worker 就是一个 Python 进程,运行在后台用于执行那些费时的、阻塞的任务。因为这些任务并没有必要把他放在 web 应用中处理,比如用户注册时发送邮件验证其有效性的操作。
启动 worker 进程非常简单,只需从工程根目录下运行命令:
- $ rqworker high normal low
- 20:52:58 RQ worker started, version 0.4.6
- 20:52:58
- 20:52:58 *** Listening on high, normal, low...
-
Worker 会从指定的队列中读取 jobs(任务),读取顺序是按照其先后顺序(high,normal,low)来处理的,worker 会不断循环处理,直到所有任务处理完成后才处于等待状态。
每个 worker 在同一时间只处理单个任务,在 worker 中没有并发处理,如果你想并发地处理任务,那么启动多个 worker 即可。
默认情况下,workers 启动后立马开始工作,直到所有任务完成时开始阻塞等待新的任务进来,不过workers还可以以一种叫burst的模式启动,这种方式启动时,workers完成所有任务后,一旦返现队列没有任务了,那么该进程就退出。
这种使用场景一般用于临时性处理一些任务,或者业务高分期临时扩展业务来处理一下这些任务。
worker 的生命周期由如下几个步骤构成:
基本上,rqworker shell 脚本是一个简单的fetch-fork执行循环,当很多任务做很多设置或者他们都依靠相同的模块,你可以在每次执行任务的时候支付这个开销(因为你在forking之后正在导入)。这是很清晰的,因为RQ并不会泄露内存,更不会变慢。
你可以使用一种模式提高其吞吐量的性能,没有任何方法告诉RQ workers去执行,但是你可以自己启动循环。
为了达到这种效果,自己提供一个脚本(取代rqworker)。简单实现例子:
- #!/usr/bin/env python
- import sys
- from rq import Queue, Connection, Worker
-
- #预加载库
- import library_that_you_want_prelaoded
-
- #提供队列名字准备去监听
- # 类似与 rqworker
- with Connection():
- qs = map(Queue, sys.argv[1:]) or [Queue()]
- w = Worker(qs)
- w.work()
-
Workers 注册到系统以他们的名字,你可以看 monitoring,默认情况下,worker的名字等于当前的hostname和当前的PID追加而成,要是要覆盖这个默认的名字,可以在启动worker的时候,使用--name 选项来指定。
任何时候,workers接收SIGINT(又叫Ctrl+C)或者SIGTERM(又叫kill),worker等待知道当前正在运行的任务完成,就会停止work循环,然后优雅地注册自己死亡。
如果在down的过程中,SIGINT或者SIGTERM再次接收了,那么worker将强制终止子进程(通过发送SIGKILL).但是仍然会注册自己死亡。
如果你喜欢通过配置文件的方式来配置rqworker,那么你可以通过类似settings.py的方式指定配置:
- REDIS_HOST = “redis.example.com”
- REDIS_PORT = 6379
-
- # 你还可以指定Redis DB去使用
- # REDIS_DB = 3
- # REDIS_PASSWORD = ‘VERY SECRET’
-
- # 要监听的队列
- Queue = [‘high’,’normal’, ‘low’]