Celery 是一个简单、灵活且可靠的分布式系统,可以处理大量消息,同时为操作提供维护此类系统所需的工具。
它是一个专注于实时处理的任务队列,同时也支持任务调度。
1、celery的介绍
1.1 什么是任务队列
任务队列用作跨线程或机器分配工作的机制。
任务队列的输入是称为任务的工作单元。专用的工作进程不断监视任务队列以执行新工作。
Celery 通过消息进行通信,通常使用代理在客户和工作人员之间进行调解。为了启动任务,客户端将一条消息添加到队列中,然后代理将该消息传递给工作人员。
Celery 系统可以由多个 worker 和 broker 组成,让位于高可用性和横向扩展。
1.2 celery的使用
Celery需要消息传输来发送和接收消息。
RabbitMQ 和 Redis 代理传输功能完备,但也支持无数其他实验性解决方案,包括使用 SQLite 进行本地开发。
Celery可以在单机上运行,也可以在多台机器上运行,甚至可以跨数据中心运行。
1.3 celery的介绍
简单:
Celery 易于使用和维护,并且不需要配置文件。
from celery import Celery
app = Celery('hello', broker='amqp://guest@localhost//')
@app.task
def hello():
return 'hello world'
高可用:
Worker 和 clients 会在连接丢失或失败时自动重试,一些 broker 以Primary/Primary或Primary/Replica复制的方式支持 HA。
快速:
单个 Celery 进程每分钟可以处理数百万个任务,往返延迟为亚毫秒级(使用 RabbitMQ、librabbitmq 和优化设置)。
灵活:
Celery的几乎每个部分都可以单独扩展或使用,自定义池实现、序列化程序、压缩方案、日志记录、调度程序、消费者、生产者、代理传输等等。
1.4 celery支持
-
中间人
-
结果存储
-
AMQP、 Redis
-
Memcached
-
SQLAlchemy、Django ORM
-
Apache Cassandra, Elasticsearch, Riak
-
MongoDB, CouchDB, Couchbase, ArangoDB
-
Amazon DynamoDB, Amazon S3
-
Microsoft Azure Block Blob, Microsoft Azure Cosmos DB
-
File system
-
-
并发
-
序列化
-
pickle、json、yaml、msgpack
-
zlib、bzip2 compression
-
Cryptographic message signing
-
1.5 celery特点功能
-
监控
可以针对整个流程进行监控,内置的工具或可以实时说明当前集群的概况。
-
调度
可以通过调度功能在一段时间内指定任务的执行时间 datetime,也可以根据简单每隔一段时间进行执行重复的任务,支持分钟、小时、星期几,也支持某一天或某一年的Crontab表达式。
-
工作流
可以通过“canvas“进行组成工作流,其中包含分组、链接、分块等等。
简单和复杂的工作流程可以使用一组“canvas“组成,其中包含分组、链接、分块等。
-
资源(内存)泄漏保护
--max-tasks-per-child
参数适用于可能会出现资源泄漏(例如:内存泄漏)的任务。
-
时间和速率的限制
可以控制每秒/分钟/小时执行任务的次数,或者任务执行的最长时间,也将这些设置为默认值,针对特定的任务或程序进行定制化配置。
-
自定义组件
开发者可以定制化每一个Worker以及额外的组件。Worker是用 “bootsteps” 构建的-一个依赖关系图,可以对Worker的内部进行细粒度控制。
1.6 celery框架集成
Celery可以快速的集成一些常用的Web框架,详细如下:
Web框架 |
集成包 |
不需要 |
|
针对 Django ,请参考 Django 的初次使用。
集成包并不是必须安全的,但使用它们可以更加快速和方便的开发,有时它们会在 fork(2) 中添加例如数据库关闭连接的回调。
2、 celery简单项目实战
2.1 项目布局:
celery.celery.py:
from celery import Celery
app = Celery('proj',
broker='redis://192.168.124.49:6379/0',
backend='redis://192.168.124.49:6379/0',
include=['celery_proj.tasks'])
# Optional configuration, see the application user guide.
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
broker
参数指定要使用的代理的 URL。backend
参数指定要使用的结果后端。- include参数是工作程序启动时要导入的模块列表。您需要在此处添加我们的任务模块,以便worker能够找到我们的任务。
celery_proj.tasks.py:
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
2.2 启动worker
celery程序可以用来启动worker(需要在proj上面的目录下运行worker)
celery -A celery_proj.celery worker -l info -P eventlet
注意:在windows中执行时,添加-P eventlet,否则可能执行任务时报错。
--help:可以通过传入标志来获取完整的命令行参数列表:
celery worker --help
Usage: celery worker [OPTIONS]
Start worker instance.
Examples -------- $ celery --app=proj worker -l INFO $ celery -A proj worker
-l INFO -Q hipri,lopri $ celery -A proj worker --concurrency=4 $ celery -A
proj worker --concurrency=1000 -P eventlet $ celery worker --autoscale=10,0
Worker Options:
-n, --hostname HOSTNAME Set custom hostname (e.g., 'w1@%%h').
Expands: %%h (hostname), %%n (name) and %%d,
(domain).
-D, --detach Start worker as a background process.
-S, --statedb PATH Path to the state database. The extension
'.db' may be appended to the filename.
-l, --loglevel [DEBUG|INFO|WARNING|ERROR|CRITICAL|FATAL]
Logging level.
-O [default|fair] Apply optimization profile.
--prefetch-multiplier <prefetch multiplier>
Set custom prefetch multiplier value for
this worker instance.
Pool Options:
-c, --concurrency <concurrency>
Number of child processes processing the
queue. The default is the number of CPUs
available on your system.
-P, --pool [prefork|eventlet|gevent|solo|processes|threads]
Pool implementation.
-E, --task-events, --events Send task-related events that can be
captured by monitors like celery events,
celerymon, and others.
--time-limit FLOAT Enables a hard time limit (in seconds
int/float) for tasks.
--soft-time-limit FLOAT Enables a soft time limit (in seconds
int/float) for tasks.
--max-tasks-per-child INTEGER Maximum number of tasks a pool worker can
execute before it's terminated and replaced
by a new worker.
--max-memory-per-child INTEGER Maximum amount of resident memory, in KiB,
that may be consumed by a child process
before it will be replaced by a new one. If
a single task causes a child process to
exceed this limit, the task will be
completed and the child process will be
replaced afterwards. Default: no limit.
Queue Options:
--purge, --discard
-Q, --queues COMMA SEPARATED LIST
-X, --exclude-queues COMMA SEPARATED LIST
-I, --include COMMA SEPARATED LIST
Features:
--without-gossip
--without-mingle
--without-heartbeat
--heartbeat-interval INTEGER
--autoscale <MIN WORKERS>, <MAX WORKERS>
Embedded Beat Options:
-B, --beat
-s, --schedule-filename, --schedule TEXT
--scheduler TEXT
Daemonization Options:
-f, --logfile TEXT
--pidfile TEXT
--uid TEXT
--uid TEXT
--gid TEXT
--umask TEXT
--executable TEXT
Options:
--help Show this message and exit.
在生产环境中,您需要在后台运行 worker。
守护进程脚本使用celery multi命令在后台启动一个或多个 worker:
celery multi start w1 -A proj -l INFO
也可以重新启动它:
celery multi restart w1 -A proj -l INFO
停止它:
celery multi stop w1 -A proj -l INFO
该stop
命令是异步的,因此它不会等待 worker 关闭。您可能希望改用该stopwait
命令,以确保在退出之前完成所有当前正在执行的任务:
celery multi stopwait w1 -A proj -l INFO
注意:celery multi不存储有关worker的信息,因此您需要在重新启动时使用相同的命令行参数。停止时只能使用相同的 pidfile 和 logfile 参数。
默认情况下,它将在当前目录中创建 pid 和日志文件。为了防止多个 worker 相互叠加启动,我们建议您将它们放在一个专用目录中:
mkdir -p /var/run/celery
mkdir -p /var/log/celery
celery multi start w1 -A proj -l INFO --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n%I.log
使用 multi 命令,可以启动多个 worker,并且还有一个强大的命令行语法来为不同的 worker 指定参数,例如:
celery multi start 10 -A proj -l INFO -Q:1-3 images,video -Q:4,5 data -Q default -L:4,5 debug
2.3 任务调度
使用以下方法调用任务delay():
from proj.tasks import add
add.delay(2, 2)
此方法实际上是另一种方法的星号参数快捷方式 apply_async()
:
add.apply_async((2, 2))
后者使您能够指定执行选项,例如运行时间(倒计时)、它应该发送到的队列等等:
add.apply_async((2, 2), queue='lopri', countdown=10)
在上面的示例中,任务将被发送到一个名为的队列lopri
,并且该任务最早将在消息发送后 10 秒执行。
直接应用任务将在当前进程中执行任务,因此不会发送消息:
from celery_proj.tasks import add, mul
ret = add(2, 3)
print(ret)
这三个方法 - delay()
、apply_async()
和 applying ( __call__
) 构成了 Celery 调用 API,它也用于签名。
每个任务调用都将被赋予一个唯一标识符(UUID)——这就是任务 ID。
如果您配置了结果后端,则可以检索任务的返回值:
from celery_proj.tasks import add
ret = add.delay(2, 3)
print(ret)
print(ret.get(timeout=1))
# 通过查看id属性找到任务的 ID
print(ret.id)
如果任务引发异常,还可以检查异常和回溯,实际上result.get()
默认情况下会传播任何错误:
from celery_proj.tasks import add
ret = add.delay(2, '3')
print(ret.get(timeout=1))
如果您不希望错误传播,您可以通过传递来禁用它propagate
:
from celery_proj.tasks import add
ret = add.delay(2, '3')
print(ret.get(propagate=False, timeout=1))
在这种情况下,它将返回引发的异常实例——因此要检查任务是成功还是失败,您必须在结果实例上使用相应的方法:
from celery_proj.tasks import add
ret = add.delay(2, '3')
print(ret.get(propagate=False, timeout=1))
print(ret.failed())
print(ret.successful())
那么它如何知道任务是否失败呢?它可以通过查看任务状态来找出:
from celery_proj.tasks import add
ret = add.delay(2, '3')
print(ret.get(propagate=False, timeout=1))
print(ret.state)
一个任务只能处于一个状态,但它可以在多个状态之间进行。典型任务的阶段可以是:
启动状态是一种特殊状态,只有在 task_track_started启用设置或 @task(track_started=True)为任务设置了选项时才会记录该状态。
挂起状态实际上不是记录状态,而是任何未知任务 ID 的默认状态:您可以从这个例子中看到这一点:
from celery_proj.celery import app
res = app.AsyncResult('this-id-does-not-exist')
print(res.state)
如果重试任务,阶段可能会变得更加复杂。为了演示,对于重试两次的任务,阶段将是:
2.4 签名
签名包装了单个任务调用的参数和执行选项,使其可以传递给函数,甚至可以序列化并通过网络发送。
add
您可以使用 arguments和 10 秒倒计时为任务创建签名,如下所示:(2, 2)
还有一个使用星号参数的快捷方式:
签名实例还支持调用 API,这意味着它们具有delay
和apply_async
方法。
但区别在于签名可能已经指定了参数签名。该add
任务有两个参数,因此指定两个参数的签名将构成完整的签名:
也可以制作不完整的签名来创建我们所说的 partials:
s2
现在是需要另一个参数才能完成的部分签名,这可以在调用签名时解决:
在这里,您将参数 8 添加到现有参数 2 之前,形成了 的完整签名。add(8, 2)
关键字参数也可以稍后添加;然后将它们与任何现有的关键字参数合并,但新参数优先:
如前所述,签名支持调用 API:这意味着
- sig.apply_async(args=(), kwargs={}, **options):使用可选的部分参数和部分关键字参数调用签名。还支持部分执行选项。
- sig.delay(*args, **kwargs):任何参数都将添加到签名中的参数之前,关键字参数与任何现有键合并。
2.5 路由
Celery 支持 AMQP 提供的所有路由工具,但它也支持将消息发送到命名队列的简单路由。
该task_routes设置使您能够按名称路由任务并将所有内容集中在一个位置:
app.conf.update(
task_routes = {
'proj.tasks.add': {'queue': 'hipri'},
},
)
还可以在运行时使用以下queue
参数指定队列apply_async
:
from proj.tasks import add
add.apply_async((2, 2), queue='hipri')
可以通过指定选项让worker从该队列中消费:celery worker -Q
可以使用逗号分隔的列表指定多个队列。例如,您可以让worker同时从默认队列和队列中消费,其中默认队列是由于历史原因hipri
而命名的:celery
队列的顺序无关紧要,因为worker会给队列同等的权重。
2.6 时区
所有时间和日期,内部和消息中都使用 UTC 时区。
当worker收到一条消息时,例如设置了倒计时,它会将 UTC 时间转换为本地时间。如果您希望使用与系统时区不同的时区,则必须使用以下timezone设置进行配置:
app.conf.timezone = 'Europe/London'