场景:

自动化很多文件均需要执行,逐个执行耗时较久,每个文档之间没有关联,考虑分布式执行

实际应用:

1.在settings.py 文件的INSTALLED_APPS中添加 'djcelery' ,或者['django_celery_beat','django_celery_results',]

2.在settings.py 文件添加如下代码:

TIME_ZONE = 'Asia/Shanghai'
CELERY_TIMEZONE = TIME_ZONE
CELERY_ENABLE_UTC = False
# 解决时区问题
DJANGO_CELERY_BEAT_TZ_AWARE = False

DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH = 191

# celery 内容等消息的格式设置
CELERY_ACCEPT_CONTENT = ['application/json', ]
# 结果序列化方案
CELERY_RESULT_SERIALIZER = 'json'  
CELERY_TASK_SERIALIZER = 'json'

# 使用redis作为中间件
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

# 单个任务的运行时间限制,否则会被杀死,任务移交给父进程
CELERY_TASK_TIME_LIMIT = 60 * 30

# celery任务执行结果的过期时间
CELERY_TASK_RESULT_EXPIRES = 0

# 每个worker执行多次任务会死掉
CELERY_WORKER_MAX_TASKS_PER_CHILD = 200

# 限制所有任务的刷新频率
CELERY_ANNOTATIONS = {'*': {'rate_limit': '10/s'}}

# celery worker每次去redis取任务的数量,默认值就是4
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

# celery worker的并发数,默认是服务器的内核数目,也是命令行-c参数指定的数目
CELERY_WORKER_CONCURRENCY = 6

# 非常重要,有些情况下可以防止死锁
CELERYD_FORCE_EXECV = True

# 允许重试
CELERY_ACKS_LATE = True

3.创建celery.py文件

# from __future__ imports must at the beginning of the file
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'test.settings')
app = Celery('test')
# 使用CELERY_ 作为前缀,在settings中写配置
app.config_from_object('django.conf:settings', namespace='CELERY')
# 发现任务文件每个app下的task.py
app.autodiscover_tasks()

4.在__init__.py(即创建celery.py的工程目录下)中配置如下内容:

# 引入celery实例对象
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ['celery_app']

todo:如何应用,如何跟踪日志

参考文档:

Celery - 分布式任务队列 — Celery 3.1.7 文档

https://www.cnblogs.com/Vera-y/p/11971252.html

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐