- php think queue:work --queue helloJobQueue
- php think queue:listen --queue helloJobQueue
- php think queue:work \
- --daemon //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出
- --queue helloJobQueue //要处理的队列的名称
- --delay 0 \ //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
- --force \ //系统处于维护状态时是否仍然处理任务,并未找到相关说明
- --memory 128 \ //该进程允许使用的内存上限,以 M 为单位
- --sleep 3 \ //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)
- --tries 2 //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0
- php think queue:listen \
- --queue helloJobQueue \ //监听的队列的名称
- --delay 0 \ //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
- --memory 128 \ //该进程允许使用的内存上限,以 M 为单位
- --sleep 3 \ //如果队列中无任务,则多长时间后重新检查
- --tries 0 \ //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
- --timeout 60 // work 进程允许执行的最长时间,以秒为单位
可以看到 listen 模式下,不包含 --deamon 参数,原因下面会说明两者都可以用于处理消息队列中的任务
区别在于:
- public function fire(){
- while(true){ //死循环
- $consoleOutPut->writeln("<info>I am looping forever inside a job.</info> \n");
- sleep(1);
- }
- }
那么这个循环将永远不能停止,直到任务所在的进程超过内存限制或者由管理员手动结束。这个过程不会有任何的告警。更严重的是,如果你配置了expire ,那么这个死循环的任务可能会污染到同样处理 helloJobQueue 队列的其他work进程,最后好几个work进程将被卡死在这段死循环中。详情后文会说明。- php think queue:work
- php think queue:restart
- php think queue:restart
- php think queue:work
单模块项目推荐使用 app\job 作为任务类的命名空间
多模块项目可用使用 app\module\job 作为任务类的命名空间 也可以放在任意可以自动加载到的地方
如果一个任务类里有多个小任务的话,在发布任务时,需要用 任务的类名@方法名 如 app\lib\job\Job2@task1、app\lib\job\Job2@task2多任务例子:
注意:命令行中的 --queue 参数不支持@解析
- public function actionWithMultiTask(){
-
- $taskType = $_GET['taskType'];
- switch ($whichTask) {
- case 'taskA':
- $jobHandlerClassName = 'application\index\job\MultiTask@taskA';
- $jobDataArr = ['a' => '1'];
- $jobQueueName = "multiTaskJobQueue";
- break;
- case 'taskB':
- $jobHandlerClassName = 'application\index\job\MultiTask@taskB';
- $jobDataArr = ['b' => '2'];
- $jobQueueName = "multiTaskJobQueue";
- break;
- default:
- break;
- }
-
- $isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName);
- if ($isPushed !== false) {
- echo("the $taskType of MultiTask Job has been Pushed to ".$jobQueueName ."<br>");
- }else{
- throw new Exception("push a new $taskType of MultiTask Job Failed!");
- }
- }
- <?php
- /**
- * 文件路径: \application\index\job\MultiTask.php
- * 这是一个消费者类,用于处理 multiTaskJobQueue 队列中的任务
- */
- namespace app\index\job;
-
- use think\queue\Job;
-
- class MultiTask {
-
- public function taskA(Job $job,$data){
-
- $isJobDone = $this->_doTaskA($data);
-
- if ($isJobDone) {
- $job->delete();
- print("Info: TaskA of Job MultiTask has been done and deleted"."\n");
- }else{
- if ($job->attempts() > 3) {
- $job->delete();
- }
- }
- }
-
- public function taskB(Job $job,$data){
-
- $isJobDone = $this->_doTaskA($data);
-
- if ($isJobDone) {
- $job->delete();
- print("Info: TaskB of Job MultiTask has been done and deleted"."\n");
- }else{
- if ($job->attempts() > 2) {
- $job->release();
- }
- }
- }
-
- private function _doTaskA($data) {
- print("Info: doing TaskA of Job MultiTask "."\n");
- return true;
- }
-
- private function _doTaskB($data) {
- print("Info: doing TaskB of Job MultiTask "."\n");
- return true;
- }
延迟执行,相对于即时执行,是用来限制某个任务的最早可执行时刻。在到达该时刻之前,该任务会被跳过。
可以利用该功能实现定时任务。
使用方式:
- // 即时执行
- $isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName);
-
- // 延迟 2 秒执行
- $isPushed = Queue::later( 2, $jobHandlerClassName, $jobDataArr, $jobQueueName);
-
- // 延迟到 2017-02-18 01:01:01 时刻执行
- $time2wait = strtotime('2017-02-18 01:01:01') - strtotime('now');
- $isPushed = Queue::later($time2wait,$jobHandlerClassName, $jobDataArr, $jobQueueName);
- // 重发,即时执行
- $job->release();
-
- // 重发,延迟 2 秒执行
- $job->release(2);
-
- // 延迟到 2017-02-18 01:01:01 时刻执行
- $time2wait = strtotime('2017-02-18 01:01:01') - strtotime('now');
- $job->release($time2wait);
- //如果消费者类的fire()方法抛出了异常且任务未被删除时,将自动重发该任务,重发时,会设置其下次执行前延迟多少秒,默认为0
- php think queue:work --delay 3
thinkphp-queue 中,消息的重发时机有3种:
- if( $isJobDone === false){
- $job->release();
- }
补充:
在database 模式下,2.7.1 和 2.7.2 中的重发逻辑是先删除原来的任务,然后插入一个新的任务。2.7.3 中的重发时机是直接更新原任务。
而在redis 模式下,3种重发都是先删除再插入。
不管是哪种重发方式,重发之后,任务的已尝试次数会在原来的基础上 +1 。
此外,消费者类中需要注意,如果 fire() 方法中可能抛出异常,那么
当同时满足以下条件时,将触发任务失败回调:
注意, queue_failed 标签需要在安装了 thinkphp-queue之后 手动 去 \application\tags.php 文件中添加。
首先,我们添加 queue_failed 事件标签, 及其对应的回调方法
- // 文件路径: \application\tags.php
- // 应用行为扩展定义文件
- return [
- // 应用初始化
- 'app_init' => [],
- // 应用开始
- 'app_begin' => [],
- // 模块初始化
- 'module_init' => [],
- // 操作开始执行
- 'action_begin' => [],
- // 视图内容过滤
- 'view_filter' => [],
- // 日志写入
- 'log_write' => [],
- // 应用结束
- 'app_end' => [],
-
- // 任务失败统一回调,有四种定义方式
- 'queue_failed'=> [
-
- // 数组形式,[ 'ClassName' , 'methodName']
- ['application\\behavior\\MyQueueFailedLogger', 'logAllFailedQueues']
-
- // 字符串(静态方法),'StaicClassName::methodName'
- // 'MyQueueFailedLogger::logAllFailedQueues'
-
- // 字符串(对象方法),'ClassName',此时需在对应的ClassName类中添加一个名为 queueFailed 的方法
- // 'application\\behavior\\MyQueueFailedLogger'
-
- // 闭包形式
- /*
- function( &$jobObject , $extra){
- // var_dump($jobObject);
- return true;
- }
- */
- ]
- ];
这里,我们选择数组形式的回调方式,新增 \application\behavior\MyQueueFailedLogger 类,添加一个 logAllFailedQueues() 方法
- <?php
- /**
- * 文件路径: \application\behavior\MyQueueFailedLogger.php
- * 这是一个行为类,用于处理所有的消息队列中的任务失败回调
- */
-
- namespace app\behavior;
-
-
- class MyQueueFailedLogger {
-
- const should_run_hook_callback = true;
-
- /**
- * @param $jobObject \think\queue\Job //任务对象,保存了该任务的执行情况和业务数据
- * @return bool true //是否需要删除任务并触发其failed() 方法
- */
- public function logAllFailedQueues(&$jobObject){
-
- $failedJobLog = [
- 'jobHandlerClassName' => $jobObject->getName(), // 'application\index\job\Hello'
- 'queueName' => $jobObject->getQueue(), // 'helloJobQueue'
- 'jobData' => $jobObject->getRawBody()['data'], // '{'a': 1 }'
- 'attempts' => $jobObject->attempts(), // 3
- ];
- var_export(json_encode($failedJobLog,true));
-
- // $jobObject->release(); //重发任务
- //$jobObject->delete(); //删除任务
- //$jobObject->failed(); //通知消费者类任务执行失败
-
- return self::should_run_hook_callback;
- }
- }
需要注意该回调方法的返回值:
最后,在消费者类中,添加 failed() 方法
- /**
- * 文件路径: \application\index\job\HelloJob.php
- */
-
- /**
- * 该方法用于接收任务执行失败的通知,你可以发送邮件给相应的负责人员
- * @param $jobData string|array|... //发布任务时传递的 jobData 数据
- */
- public function failed($jobData){
- send_mail_to_somebody() ;
-
- print("Warning: Job failed after max retries. job data is :".var_export($data,true)."\n";
- }
这样,就可以做到任务失败的记录与告警
过期这个概念用文字比较难描述清楚,建议先看一下 深入理解 中 3.4 消息处理的详细流程图