searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Celery分布式任务支持多消费者实践

2023-06-20 02:22:22
85
0

1.背景

Celery 是一个功能强大、灵活可扩展的任务队列框架,广泛应用于异步任务处理、定时任务调度和分布式系统中。它简化了任务的管理和执行,提高了系统的可靠性、性能和可扩展性。无论是构建高性能的 Web 应用程序、处理大量的异步任务还是调度定时任务,Celery 都是一个非常有价值的工具。

在某些场景下,我们可能需要配置多个消费者来处理不同类型的任务,以便更好地分配和利用系统资源。本文将重点介绍如何配置 Celery 多消费者,以及如何确保不同任务由不同消费者进行消费。

2.Celery组件介绍

2.1 Celery Worker

是负责执行任务的后台工作进程。它从消息代理(如 RabbitMQ 或 Redis)接收任务消息,并按照预定义的任务路由和调度规则执行任务。可以启动多个 Celery Worker 来实现并发处理和分布式处理任务。

2.2 Celery Beat

是 Celery 的任务调度器。它负责按照预定的时间表和规则触发定时任务的执行。Celery Beat 使用了底层的消息代理来触发任务的调度,确保任务在指定的时间点或时间间隔内被执行。

2.3 消息代理Broker

是 Celery 的核心组件之一。它负责接收、存储和分发任务消息。常用的消息代理包括 RabbitMQ、Redis等。消息代理实现了任务消息的队列和分发机制,确保任务能够安全、可靠地传递给 Celery Worker 进行执行。

2.4 结果存储组件Result Backend

是用于存储任务的执行结果。当任务执行完成后,Celery 将结果存储到指定的后端,如数据库、缓存或消息代理。结果存储允许您轻松地获取任务的执行结果,进行后续处理或展示。

3.配置多消费者

如果想在同一个应用程序app中配置不同的任务队列,以便不同的消费者处理不同的队列,可以按照以下步骤进行配置:

3.1 加载 Celery 配置

在 Django 项目的根目录中的 celery.py 文件中,创建一个 Celery 实例,并从 Django 的配置中加载 Celery 配置:

from celery import Celery

from django.conf import settings

app = Celery('project')

app.config_from_object(settings, namespace='CELERY')

3.2 配置 Celery 的队列和路由

在 Django 的 settings.py 文件中,配置 Celery 的队列和路由:

CELERY_QUEUES = {
    Queue('default', Exchange('default'), routing_key='default', exchange_type="topic"),
    Queue('long_task', Exchange('long_task'), routing_key='long_task', exchange_type='topic')
    # 可以配置更多队列
}

CELERY_ROUTES = {
    'app1.tasks.task1': {'queue': 'default'},
    'app1.tasks.task2': {'queue': 'long_task'},
    # 配置其他任务和队列的映射关系
}

CELERY_DEFAULT_QUEUE = 'default'  # 设置默认队列名称
CELERY_DEFAULT_ROUTING_KEY = 'default'  # 设置默认路由键

# 配置 Celery Broker(消息代理)

CELERY_BROKER_URL = 'amqp://localhost'  # RabbitMQ 的连接 URL 或 Redis 的连接 URL
CELERY_RESULT_BACKEND = 'db+postgresql://your_username:your_password@localhost/your_database'  # 结果存储后端配置

在上面的示例中,我们定义了两个队列(queue1 和 queue2),并将任务 app1.tasks.task1 分配给 queue1,将任务 app1.tasks.task2 分配给 queue2。也可以根据实际需求配置更多的队列和任务的映射关系。

3.3 指定队列

在任务定义中,为每个任务指定相应的队列:

from celery import shared_task

@shared_task(queue='default')
def task1():

    # 处理队列1的任务逻辑

@shared_task(queue='long_task')
def task2():

    # 处理队列2的任务逻辑

通过为任务添加queue参数,并指定相应的队列名称,将任务与特定的队列关联起来。

3.4 启动Worker

启动多个 Celery Worker,并为每个 Worker 指定相应的队列:

celery -A project worker -Q default  -c 20

celery -A project worker -Q long_task -c 20

在每个命令中使用-Q参数来指定要处理的队列名称。

 

4.总结

通过以上步骤,我们可以在同一个应用程序app中配置不同的队列,并由不同的消费者处理不同的队列任务。这样可以提高系统的可伸缩性和性能,使任务处理更加高效和灵活,更好地管理任务的分配和利用系统资源。

0条评论
0 / 1000
郑****腾
2文章数
0粉丝数
郑****腾
2 文章 | 0 粉丝
郑****腾
2文章数
0粉丝数
郑****腾
2 文章 | 0 粉丝
原创

Celery分布式任务支持多消费者实践

2023-06-20 02:22:22
85
0

1.背景

Celery 是一个功能强大、灵活可扩展的任务队列框架,广泛应用于异步任务处理、定时任务调度和分布式系统中。它简化了任务的管理和执行,提高了系统的可靠性、性能和可扩展性。无论是构建高性能的 Web 应用程序、处理大量的异步任务还是调度定时任务,Celery 都是一个非常有价值的工具。

在某些场景下,我们可能需要配置多个消费者来处理不同类型的任务,以便更好地分配和利用系统资源。本文将重点介绍如何配置 Celery 多消费者,以及如何确保不同任务由不同消费者进行消费。

2.Celery组件介绍

2.1 Celery Worker

是负责执行任务的后台工作进程。它从消息代理(如 RabbitMQ 或 Redis)接收任务消息,并按照预定义的任务路由和调度规则执行任务。可以启动多个 Celery Worker 来实现并发处理和分布式处理任务。

2.2 Celery Beat

是 Celery 的任务调度器。它负责按照预定的时间表和规则触发定时任务的执行。Celery Beat 使用了底层的消息代理来触发任务的调度,确保任务在指定的时间点或时间间隔内被执行。

2.3 消息代理Broker

是 Celery 的核心组件之一。它负责接收、存储和分发任务消息。常用的消息代理包括 RabbitMQ、Redis等。消息代理实现了任务消息的队列和分发机制,确保任务能够安全、可靠地传递给 Celery Worker 进行执行。

2.4 结果存储组件Result Backend

是用于存储任务的执行结果。当任务执行完成后,Celery 将结果存储到指定的后端,如数据库、缓存或消息代理。结果存储允许您轻松地获取任务的执行结果,进行后续处理或展示。

3.配置多消费者

如果想在同一个应用程序app中配置不同的任务队列,以便不同的消费者处理不同的队列,可以按照以下步骤进行配置:

3.1 加载 Celery 配置

在 Django 项目的根目录中的 celery.py 文件中,创建一个 Celery 实例,并从 Django 的配置中加载 Celery 配置:

from celery import Celery

from django.conf import settings

app = Celery('project')

app.config_from_object(settings, namespace='CELERY')

3.2 配置 Celery 的队列和路由

在 Django 的 settings.py 文件中,配置 Celery 的队列和路由:

CELERY_QUEUES = {
    Queue('default', Exchange('default'), routing_key='default', exchange_type="topic"),
    Queue('long_task', Exchange('long_task'), routing_key='long_task', exchange_type='topic')
    # 可以配置更多队列
}

CELERY_ROUTES = {
    'app1.tasks.task1': {'queue': 'default'},
    'app1.tasks.task2': {'queue': 'long_task'},
    # 配置其他任务和队列的映射关系
}

CELERY_DEFAULT_QUEUE = 'default'  # 设置默认队列名称
CELERY_DEFAULT_ROUTING_KEY = 'default'  # 设置默认路由键

# 配置 Celery Broker(消息代理)

CELERY_BROKER_URL = 'amqp://localhost'  # RabbitMQ 的连接 URL 或 Redis 的连接 URL
CELERY_RESULT_BACKEND = 'db+postgresql://your_username:your_password@localhost/your_database'  # 结果存储后端配置

在上面的示例中,我们定义了两个队列(queue1 和 queue2),并将任务 app1.tasks.task1 分配给 queue1,将任务 app1.tasks.task2 分配给 queue2。也可以根据实际需求配置更多的队列和任务的映射关系。

3.3 指定队列

在任务定义中,为每个任务指定相应的队列:

from celery import shared_task

@shared_task(queue='default')
def task1():

    # 处理队列1的任务逻辑

@shared_task(queue='long_task')
def task2():

    # 处理队列2的任务逻辑

通过为任务添加queue参数,并指定相应的队列名称,将任务与特定的队列关联起来。

3.4 启动Worker

启动多个 Celery Worker,并为每个 Worker 指定相应的队列:

celery -A project worker -Q default  -c 20

celery -A project worker -Q long_task -c 20

在每个命令中使用-Q参数来指定要处理的队列名称。

 

4.总结

通过以上步骤,我们可以在同一个应用程序app中配置不同的队列,并由不同的消费者处理不同的队列任务。这样可以提高系统的可伸缩性和性能,使任务处理更加高效和灵活,更好地管理任务的分配和利用系统资源。

文章来自个人专栏
Celery实践
1 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0