2025年6月7日 星期六 乙巳(蛇)年 三月十一 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 编程开发 > PHP

thinkphp-queue 源码阅读笔记 之 详细介绍

时间:12-14来源:作者:点击数:9
2.1 命令模式
  • queue:subscribe 命令 [官方未提供示例, 略过]
  • queue:work 命令
    work 命令: 该命令将启动一个 work 进程来处理消息队列。
    • php think queue:work --queue helloJobQueue
  • queue:listen 命令
    listen 命令: 该命令将会启动一个 listen 进程 ,然后由 listen 进程通过 proc_open(‘php think queue:work --queue="%s" --delay=%s --memory=%s --sleep=%s --tries=%s’) 的方式来周期性地创建一次性的 work 进程来消费消息队列, 并且限制该 work 进程的执行时间, 同时通过管道来监听 work 进程的输出。
    • php think queue:listen --queue helloJobQueue
2.2 命令行参数
  • Work 模式
    • php think queue:work \
    • --daemon //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出
    • --queue helloJobQueue //要处理的队列的名称
    • --delay 0 \ //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
    • --force \ //系统处于维护状态时是否仍然处理任务,并未找到相关说明
    • --memory 128 \ //该进程允许使用的内存上限,以 M 为单位
    • --sleep 3 \ //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)
    • --tries 2 //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0
  • Listen 模式
    • php think queue:listen \
    • --queue helloJobQueue \ //监听的队列的名称
    • --delay 0 \ //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
    • --memory 128 \ //该进程允许使用的内存上限,以 M 为单位
    • --sleep 3 \ //如果队列中无任务,则多长时间后重新检查
    • --tries 0 \ //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
    • --timeout 60 // work 进程允许执行的最长时间,以秒为单位
    可以看到 listen 模式下,不包含 --deamon 参数,原因下面会说明
2.3 work 模式和 listen 模式的区别

两者都可以用于处理消息队列中的任务

区别在于:

  • 2.3.1 执行原理不同
    • work 命令是单进程的处理模式。
      按照是否设置了 --daemon 参数,work命令又可分为单次执行和循环执行两种模式。
      • 单次执行:不添加 --daemon参数,该模式下,work进程在处理完下一个消息后直接结束当前进程。当队列为空时,会sleep一段时间然后退出。
      • 循环执行:添加了 --daemon参数,该模式下,work进程会循环地处理队列中的消息,直到内存超出参数配置才结束进程。当队列为空时,会在每次循环中sleep一段时间。
    • listen 命令是 双进程 + 管道 的处理模式。
      listen命令所在的进程会循环地创建 单次执行模式的 work 进程,每次创建的 work 进程只消费一个消息就会结束, 然后 listen 进程再创建一个新的 work 进程,
      • listen 进程会定时检查当前的 work 进程执行时间是否超过了 --timeout 参数的值, 如果已超时, 则 listen 进程会 kill 掉 work 进程, 然后抛出异常
      • listen 进程会通过管道来监听当前的 work 进程的输出, 当 work 进程有输出时, listen 进程会将输出写入到 stdout / stderr
      • listen 进程会定时通过 proc_get_status() 来监控当前的 work 进程是否仍在运行, work 进程消费完一个任务之后, work 进程就结束了,其状态会变成 terminated, 此时 listen 进程就会重新创建一个新的 work 进程并对其计时, 新的 work 进程开始消费下一个任务
  • 2.3.2 结束时机不同
    • work 命令的结束时机在上面的执行原理部分已叙述,此处不再重复
    • listen 命令中,listen 进程和 work 进程会在以下情况下结束:
      • listen 进程会定时检查当前的 work 进程的执行时间是否超过了 --timeout 参数的值,如果已超时, 此时 listen 进程会先 kill 掉当前的 work 进程, 然后抛出一个 ProcessTimeoutException 异常并结束 listen 进程
      • listen 进程会定时检查自身使用的内存是否超过了 --memory 参数的值,如果已超过, 此时 listen 进程会直接 die 掉, work 进程也会自动结束.
  • 2.3.3 性能不同
    • work 命令是在脚本内部做循环,框架脚本在命令执行的初期就已加载完毕;
    • 而listen模式则是处理完一个任务之后新开一个work进程,此时会重新加载框架脚本。
      因此: work 模式的性能会比listen模式高
      注意:当代码有更新时,work 模式下需要手动去执行 php think queue:restart 命令重启队列来使改动生效;而listen 模式会自动生效,无需其他操作。
  • 2.3.4 超时控制能力
    • work 模式本质上既不能控制进程自身的运行时间,也无法限制执行中的任务的执行时间。
      举例来说,假如你在某次上线之后,在上文中的 \application\index\job\Hello.php 消费者的fire方法中添加了一段死循环 :
      • public function fire(){
      • while(true){ //死循环
      • $consoleOutPut->writeln("<info>I am looping forever inside a job.</info> \n");
      • sleep(1);
      • }
      • }
      那么这个循环将永远不能停止,直到任务所在的进程超过内存限制或者由管理员手动结束。这个过程不会有任何的告警。更严重的是,如果你配置了expire ,那么这个死循环的任务可能会污染到同样处理 helloJobQueue 队列的其他work进程,最后好几个work进程将被卡死在这段死循环中。详情后文会说明。
      work 模式下的超时控制能力,实际上应该理解为 多个work 进程配合下的过期任务重发能力。
    • 而 listen 命令可以限制 listen 进程创建的 work 进程的最大执行时间。
      listen 命令可通过 --timeout 参数限制 work 进程允许运行的最长时间,超过该时间限制后, work 进程会被强制 kill 掉, listen 进程本身也会抛出异常并结束;
    • 这里有必要补充一下 expire 和 timeout 之间的区别:
      • expire 在配置文件中设置,timeout 在 listen命令 的命令行参数中设置,而且,expire 和 timeout 是两个不同层次上的概念:
      • expire 是指任务的过期时间。这个时间是全局的,影响到所有的work进程。(不管是独立的work命令还是 listen 模式下创建的的 work 进程) 。expire 针对的对象是 任务
      • timeout 是指 work 进程的超时时间。这个时间只对当前执行的 listen 命令有效。timeout 针对的对象是 work 进程
  • 2.3.5 使用场景不同
    根据上面的介绍,可以看到,work 命令的适用场景是:
    • 任务数量较多
    • 性能要求较高
    • 任务的执行时间较短
    • 消费者类中不存在死循环,sleep() ,exit() ,die() 等容易导致bug的逻辑
    listen命令的适用场景是:
    • 任务数量较少
    • 任务的执行时间较长(如生成大型的excel报表等),
    • 任务的执行时间需要有严格限制
2.4 消息队列的开始,停止与重启
  • 开始一个消息队列:
    • php think queue:work
  • 停止所有的消息队列:
    • php think queue:restart
  • 重启所有的消息队列:
    • php think queue:restart
    • php think queue:work
2.5 多模块,多任务的处理
  • 多模块
    单模块项目推荐使用 app\job 作为任务类的命名空间
    多模块项目可用使用 app\module\job 作为任务类的命名空间 也可以放在任意可以自动加载到的地方
  • 多任务
    如果一个任务类里有多个小任务的话,在发布任务时,需要用 任务的类名@方法名 如 app\lib\job\Job2@task1app\lib\job\Job2@task2
    注意:命令行中的 --queue 参数不支持@解析
    多任务例子:
    • 在 \application\index\controller\JobTest.php 控制器中,添加 actionWithMultiTask()方法:
    • public function actionWithMultiTask(){
    • $taskType = $_GET['taskType'];
    • switch ($whichTask) {
    • case 'taskA':
    • $jobHandlerClassName = 'application\index\job\MultiTask@taskA';
    • $jobDataArr = ['a' => '1'];
    • $jobQueueName = "multiTaskJobQueue";
    • break;
    • case 'taskB':
    • $jobHandlerClassName = 'application\index\job\MultiTask@taskB';
    • $jobDataArr = ['b' => '2'];
    • $jobQueueName = "multiTaskJobQueue";
    • break;
    • default:
    • break;
    • }
    • $isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName);
    • if ($isPushed !== false) {
    • echo("the $taskType of MultiTask Job has been Pushed to ".$jobQueueName ."<br>");
    • }else{
    • throw new Exception("push a new $taskType of MultiTask Job Failed!");
    • }
    • }
    • 新增 \application\index\job\MultiTask.php 消费者类,并编写其 taskA() 和 taskB()方法
    • <?php
    • /**
    • * 文件路径: \application\index\job\MultiTask.php
    • * 这是一个消费者类,用于处理 multiTaskJobQueue 队列中的任务
    • */
    • namespace app\index\job;
    • use think\queue\Job;
    • class MultiTask {
    • public function taskA(Job $job,$data){
    • $isJobDone = $this->_doTaskA($data);
    • if ($isJobDone) {
    • $job->delete();
    • print("Info: TaskA of Job MultiTask has been done and deleted"."\n");
    • }else{
    • if ($job->attempts() > 3) {
    • $job->delete();
    • }
    • }
    • }
    • public function taskB(Job $job,$data){
    • $isJobDone = $this->_doTaskA($data);
    • if ($isJobDone) {
    • $job->delete();
    • print("Info: TaskB of Job MultiTask has been done and deleted"."\n");
    • }else{
    • if ($job->attempts() > 2) {
    • $job->release();
    • }
    • }
    • }
    • private function _doTaskA($data) {
    • print("Info: doing TaskA of Job MultiTask "."\n");
    • return true;
    • }
    • private function _doTaskB($data) {
    • print("Info: doing TaskB of Job MultiTask "."\n");
    • return true;
    • }
2.6 消息的延迟执行与定时执行

延迟执行,相对于即时执行,是用来限制某个任务的最早可执行时刻。在到达该时刻之前,该任务会被跳过。

可以利用该功能实现定时任务

使用方式:

  • 在生产者业务代码中:
  • // 即时执行
  • $isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName);
  • // 延迟 2 秒执行
  • $isPushed = Queue::later( 2, $jobHandlerClassName, $jobDataArr, $jobQueueName);
  • // 延迟到 2017-02-18 01:01:01 时刻执行
  • $time2wait = strtotime('2017-02-18 01:01:01') - strtotime('now');
  • $isPushed = Queue::later($time2wait,$jobHandlerClassName, $jobDataArr, $jobQueueName);
  • 在消费者类中:
  • // 重发,即时执行
  • $job->release();
  • // 重发,延迟 2 秒执行
  • $job->release(2);
  • // 延迟到 2017-02-18 01:01:01 时刻执行
  • $time2wait = strtotime('2017-02-18 01:01:01') - strtotime('now');
  • $job->release($time2wait);
  • 在命令行中:
  • //如果消费者类的fire()方法抛出了异常且任务未被删除时,将自动重发该任务,重发时,会设置其下次执行前延迟多少秒,默认为0
  • php think queue:work --delay 3
2.7 消息的重发

thinkphp-queue 中,消息的重发时机有3种:

  • 2.7.1 在消费者类中手动重发:
  • if( $isJobDone === false){
  • $job->release();
  • }
  • 2.7.2 work进程自动重发,需同时满足以下两个条件
    • 消费者类的 fire() 方法抛出了异常
    • 任务未被删除
  • 2.7.3 当配置了 expire 不为 null 时,work 进程内部每次查询可用任务之前,会先自动重发已过期的任务。

补充:

在database 模式下,2.7.1 和 2.7.2 中的重发逻辑是先删除原来的任务,然后插入一个新的任务。2.7.3 中的重发时机是直接更新原任务。

而在redis 模式下,3种重发都是先删除再插入。

不管是哪种重发方式,重发之后,任务的已尝试次数会在原来的基础上 +1 。

此外,消费者类中需要注意,如果 fire() 方法中可能抛出异常,那么

  • 如果不需要自动重发的话, 请在抛出异常之前将任务删除 $job->delete() ,否则会被框架自动重发。
  • 如果需要自动重发的话,请直接抛出异常,不要在 fire() 方法中又手动使用 $job->release() , 这样会导致该任务被重发两次,产生两个一样的新任务。
2.8 任务的失败回调及告警

当同时满足以下条件时,将触发任务失败回调:

  • 命令行的 --tries 参数的值大于0
  • 任务的已尝试次数大于 命令行的 --tries 参数
  • 开发者添加了 queue_failed 事件标签及其对应的回调代码
  • 消费者类中定义了 failed() 方法,用于接收任务失败的通知

注意, queue_failed 标签需要在安装了 thinkphp-queue之后 手动 去 \application\tags.php 文件中添加。

首先,我们添加 queue_failed 事件标签, 及其对应的回调方法

  • // 文件路径: \application\tags.php
  • // 应用行为扩展定义文件
  • return [
  • // 应用初始化
  • 'app_init' => [],
  • // 应用开始
  • 'app_begin' => [],
  • // 模块初始化
  • 'module_init' => [],
  • // 操作开始执行
  • 'action_begin' => [],
  • // 视图内容过滤
  • 'view_filter' => [],
  • // 日志写入
  • 'log_write' => [],
  • // 应用结束
  • 'app_end' => [],
  • // 任务失败统一回调,有四种定义方式
  • 'queue_failed'=> [
  • // 数组形式,[ 'ClassName' , 'methodName']
  • ['application\\behavior\\MyQueueFailedLogger', 'logAllFailedQueues']
  • // 字符串(静态方法),'StaicClassName::methodName'
  • // 'MyQueueFailedLogger::logAllFailedQueues'
  • // 字符串(对象方法),'ClassName',此时需在对应的ClassName类中添加一个名为 queueFailed 的方法
  • // 'application\\behavior\\MyQueueFailedLogger'
  • // 闭包形式
  • /*
  • function( &$jobObject , $extra){
  • // var_dump($jobObject);
  • return true;
  • }
  • */
  • ]
  • ];

这里,我们选择数组形式的回调方式,新增 \application\behavior\MyQueueFailedLogger 类,添加一个 logAllFailedQueues() 方法

  • <?php
  • /**
  • * 文件路径: \application\behavior\MyQueueFailedLogger.php
  • * 这是一个行为类,用于处理所有的消息队列中的任务失败回调
  • */
  • namespace app\behavior;
  • class MyQueueFailedLogger {
  • const should_run_hook_callback = true;
  • /**
  • * @param $jobObject \think\queue\Job //任务对象,保存了该任务的执行情况和业务数据
  • * @return bool true //是否需要删除任务并触发其failed() 方法
  • */
  • public function logAllFailedQueues(&$jobObject){
  • $failedJobLog = [
  • 'jobHandlerClassName' => $jobObject->getName(), // 'application\index\job\Hello'
  • 'queueName' => $jobObject->getQueue(), // 'helloJobQueue'
  • 'jobData' => $jobObject->getRawBody()['data'], // '{'a': 1 }'
  • 'attempts' => $jobObject->attempts(), // 3
  • ];
  • var_export(json_encode($failedJobLog,true));
  • // $jobObject->release(); //重发任务
  • //$jobObject->delete(); //删除任务
  • //$jobObject->failed(); //通知消费者类任务执行失败
  • return self::should_run_hook_callback;
  • }
  • }

需要注意该回调方法的返回值:

  • 返回 true 时,系统会自动删除该任务,并且自动调用消费者类中的 failed() 方法
  • 返回 false 时,系统不会自动删除该任务,也不会自动调用消费者类中的 failed() 方法,需要开发者另行处理失败任务的删除和通知。

最后,在消费者类中,添加 failed() 方法

  • /**
  • * 文件路径: \application\index\job\HelloJob.php
  • */
  • /**
  • * 该方法用于接收任务执行失败的通知,你可以发送邮件给相应的负责人员
  • * @param $jobData string|array|... //发布任务时传递的 jobData 数据
  • */
  • public function failed($jobData){
  • send_mail_to_somebody() ;
  • print("Warning: Job failed after max retries. job data is :".var_export($data,true)."\n";
  • }

这样,就可以做到任务失败的记录告警

2.9 处理过期的任务

过期这个概念用文字比较难描述清楚,建议先看一下 深入理解 中 3.4 消息处理的详细流程图

方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门
本栏推荐