使用步骤
1、安装
pip install django django-celery
2、新建工程
$ django-admin.py startproject celery_project
$ python manage.py startapp course
$ cd celery_project
项目结构
├── celery_project
│ ├── __init__.py
│ ├── celery_config.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── course
│ ├── __init__.py
│ ├── admin.py
│ ├── apps.py
│ ├── migrations
│ │ ├── __init__.py
│ ├── models.py
│ ├── tasks.py
│ ├── tests.py
│ └── views.py
├── db.sqlite3
└── manage.py
3、新建任务
course/tasks.py
# -*- coding: utf-8 -*-
import time
from celery.task import Task
class CourseTask(Task):
name = "course-task"
def run(self, *args, **kwargs):
print("start course task")
time.sleep(4) # 模拟耗时
print(args, kwargs)
print("end course task")
4、配置celery
celery_project/celery_config.py
# -*- coding: utf-8 -*-
import djcelery
from datetime import timedelta
djcelery.setup_loader()
BROKER_BACKEND = "redis"
BROKER_URL = 'redis://localhost:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
# 设置任务队列
CELERY_QUEUES = {
"beat_queue": {
"exchange": "beat_queue",
"exchange_type": "direct",
"binding_key": "beat_queue"
},
"work_queue": {
"exchange": "work_queue",
"exchange_type": "direct",
"binding_key": "work_queue"
}
}
# 设置默认队列
CELERY_DEFAULT_QUEUE = "work_queue"
# 注册任务函数
CELERY_IMPORTS = (
"course.tasks",
)
# 设置时区,默认UTC
CELERY_TIMEZONE = 'Asia/Shanghai'
# 有些情况下防止死锁
CELERYD_FORCE_EXECV = True
# 设置并发的worker数量
CELERYD_CONCURRENCY = 4
# 允许重试
CELERYD_ACKS_LATE = True
# 每个worker最多执行100个任务,防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD = 100
# 单个任务最大运行时间
CELERYD_TASK_TIME_LIMIT = 6 * 60
5、配置settings
celery_project/settings.py
# 添加app
INSTALLED_APPS = [
...
'djcelery',
'course'
]
# 引入 Celery设置
from .celery_config import *
6、编写视图函数
course/views.py
# -*- coding: utf-8 -*-
from course.tasks import CourseTask
from django.http import JsonResponse
def do(reqeust):
# 执行异步任务
print("start do")
CourseTask.apply_async(args=("hello", ), queue="work_queque")
print("end do")
return JsonResponse({"result": "ok"})
7、配置路由
celery_project/urls.py
from course import views
urlpatterns = [
url(r'^do/$', views.do)
]
8、启动Django服务和 worker
$ python manage.py help
$ python manage.py runserver
$ python manage.py celery worker -l INFO
# 或者
$ python manage.py celery worker -Q queue
访问测试:http://127.0.0.1:8000/do/
9、设置定时任务
celery_project/celery_config.py
# 设置定时任务
CELERYBEAT_SCHEDULE = {
"course-task": {
"task": "course-task",
"schedule": timedelta(seconds=5),
"options": {
"queue": "beat_queue"
}
}
}
10、启动beat
$ python manage.py celery beat -l INFO
监控工具flower
安装
pip install flower
启动
celery flower --address=0.0.0.0 --port=5555 --broker=redis://localhost:6379/1 --basic_auth=user:123
django中启动
python manage.py celery flower
访问管理界面 http://localhost:5555
总结
Django中使用Celery 只是多了3个步骤:
1、task任务编写
2、celery配置
3、启动celery-worker