celery的初次使用
基本步骤:
-
选择并且安装一个消息中间件(Broker)
-
安装 Celery 并且创建第一个任务
-
运行职程(Worker)以及调用任务
-
跟踪任务的情况以及返回值
应用
创建第一个 Celery 实例程序,把创建 Celery 程序成为 Celery 应用,创建的第一个实例程序可能需要包含 Celery 中执行操作的所有入口点,例如创建任务、管理职程(Worker)等,所以必须要导入 Celery 模块。
首先创建 tasks.py:
# tasks.py
from celery import Celery
# app = Celery('tasks', broker='amqp://guest@localhost//')
app = Celery('tasks', broker='redis://127.0.0.1/2')
@app.task
def add(x, y):
return x + y
第一个参数为当前模块的名称,只有在 __main__ 模块中定义任务时才会生产名称。
第二个参数为中间人(Broker)的链接 URL ,实例中使用的 RabbitMQ(Celery默认使用的也是RabbitMQ)。
更多相关的 Celery 中间人(Broker)的选择方案,可查阅官方文档。例如,对于 RabbitMQ 可以写为 amqp://localhost ,使用 Redis 可以写为 redis://localhost。
创建了一个名称为 add 的任务,返回的俩个数字的和。
运行 Celery 职程(Worker)服务
现在可以使用 worker 参数进行执行刚刚创建职程(Worker):
celery -A tasks worker -l info
关于 Celery 可用的命令完整列表,可以通过以下命令进行查看:
celery worker --help
也可以通过以下命令查看一些 Celery 帮助选项:
celery help
调用任务
需要调用我们创建的实例任务,可以通过 delay()
进行调用。
delay()
是 apply_async()
的快捷方法,可以更好的控制任务的执行:
可以在交互式shell环境中运行下列代码
from tasks import add
add.delay(4, 4)
该任务已经有职程(Worker)开始处理,可以通过控制台输出的日志进行查看执行情况。
调用任务会返回一个 AsyncResult 的实例,用于检测任务的状态,等待任务完成获取返回值(如果任务执行失败,会抛出异常)。默认这个功能是不开启的,如果开启则需要配置 Celery 的结果后端。
执行上面代码可能遇到的问题:
新的报错
ValueError: not enough values to unpack (expected 3, got 0)
- 安装
pip install eventlet
- 然后启动worker的时候加一个参数,如下:(执行任务)
celery worker -A <mymodule> -l INFO
- 替换它
celery -A <mymodule> worker -l info -P eventlet
保存结果
如果您需要跟踪任务的状态,Celery 需要在某处存储任务的状态信息。Celery 内置了一些后端结果:SQLAlchemy/Django ORM、Memcached、Redis、 RPC (RabbitMQ/AMQP)以及自定义的后端结果存储中间件。
本次实例,使用 RPC 作为结果后端,将状态信息作为临时消息回传。后端通过 backend 参数指定给 Celery(或者通过配置模块中的 result_backend 选项设定):
app = Celery('tasks', backend='rpc://', broker='pyamqp://')
可以使用Redis作为Celery结果后端,使用RabbitMQ作为中间人(Broker)可以使用以下配置(这种组合比较流行):
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')
现在已经配置结果后端,重新调用执行任务。会得到调用任务后返回的一个 AsyncResult 实例:
>>> result = add.delay(4, 4)
ready()
可以检测是否已经处理完毕:
>>> result.ready()
False
整个任务执行过程为异步的,如果一直等待任务完成,会将异步调用转换为同步调用:
>>> result.get(timeout=1)
8
如果任务出现异常,get()
会再次引发异常,可以通过 propagate 参数进行覆盖:
>>> result.get(propagate=False)
如果任务出现异常,可以通过以下命令进行回溯:
>>> result.traceback
配置
Celery 像家用电器一样,不需要任何配置,开箱即用。它有一个输入和输出,输入端必须连接中间人(Broker),输出端可以连接到结果后端。如果仔细观察一些家用电器,会发现有很多到按钮,这就是配置。
大多数情况下,使用默认的配置就可以满足,也可以按需配置。
可以直接在程序中进行配置,也可以通过配置模块进行专门配置。例如,通过 task_serializer 选项可以指定序列化的方式:
app.conf.task_serializer = 'json'
如果需要配置多个选项,可以通过 update 进行配置:
app.conf.update(
task_serializer='json',
accept_content=['json'], # Ignore other content
result_serializer='json',
timezone='Europe/Oslo',
enable_utc=True,
)
针对大型的项目,建议使用专用配置模块,进行针对 Celery 配置。不建议使用硬编码,建议将所有的配置项集中化配置。集中化配置可以像系统管理员一样,当系统发生故障时可针对其进行微调。
可以通过 app.config_from_object()
进行加载配置模块:
app.config_from_object('celeryconfig')
其中 celeryconfig 为配置模块的名称,这个是可以自定义修改的、
在上面的实例中,需要在同级目录下创建一个名为 celeryconfig.py
的文件,添加以下内容:
celeryconfig.py
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
可以通过以下命令来进行验证配置模块是否配置正确:
$ python -m celeryconfig
Celery 也可以设置任务执行错误时的专用队列中,这只是配置模块中一小部分,详细配置如下:
# celeryconfig.py
task_routes = {
'tasks.add': 'low-priority',
}
Celery 也可以针对任务进行限速,以下为每分钟内允许执行的10个任务的配置:
# celeryconfig.py
task_annotations = {
'tasks.add': {'rate_limit': '10/m'}
}
如果使用的是 RabbitMQ 或 Redis 的话,可以在运行时进行设置任务的速率:
$ celery -A tasks control rate_limit tasks.add 10/m
worker@example.com: OK
new rate limit set successfully