Celery 异步任务核心知识整理
1 Celery 核心概念
Celery 是基于 Python 实现的分布式任务队列框架,主要用于:
- 管理分布式任务队列
- 处理耗时任务
- 支持任务队列方式的任务调度
核心特性:
- 任务执行完全脱离主程序
- 可分配到不同主机运行
- 同时支持异步任务和定时任务
2 系统架构组成
Celery 采用三组件架构:
2.1 消息中间件 (Broker)
- 任务调度队列,存储任务
- 接收生产者发送的任务消息
- 官方推荐使用 RabbitMQ 或 Redis
- 示例配置:
celery_broker = 'amqp://guest@127.0.0.1//'
2.2 任务执行单元 (Worker)
- 实时监控消息队列
- 获取并执行队列中的任务
- 支持分布式部署多个 Worker
2.3 结果存储 (Backend)
- 存储任务执行结果
- 支持 RabbitMQ, Redis, MongoDB 等
- 示例配置:
celery_backend = 'amqp://guest@127.0.0.1//'
3 任务定义与调用
3.1 任务类型
- 异步任务:业务逻辑触发后发往队列
- 定时任务:由 Celery Beat 周期性触发
3.2 任务定义示例
python
# tasks.py
from config import app
@app.task
def add(x, y):
return x + y
@app.task
def sub(x, y):
return x - y
3.3 任务调用方式
python
# main.py
from tasks import add
async_result = add.apply_async((1, 100)) # 异步调用
# 结果状态检查
if async_result.successful():
result = async_result.get()
elif async_result.failed():
print('任务失败')
4 工作流程解析
- 任务提交:生产者调用
apply_async()
提交任务 - 队列存储:Broker 接收并存储任务
- 任务获取:Worker 从 Broker 拉取任务
- 任务执行:Worker 执行具体任务逻辑
- 结果存储:执行结果存入 Backend
- 结果查询:生产者通过 AsyncResult 对象查询结果
5 应用场景
5.1 异步任务场景
- 发送短信/邮件
- 消息推送
- 音视频处理
- 大数据处理
5.2 定时任务场景
- 每日数据统计报表
- 定期数据清理
- 周期性系统检查
5.3 Web 框架集成(Tornado 示例)
python
# tornado_celery.py
import tcelery
import tornado.gen
from tasks import add
tcelery.setup_nonblocking_producer()
class CheckHandler(tornado.web.RequestHandler):
@tornado.gen.coroutine
def get(self):
x = int(self.get_argument('x', '1'))
y = int(self.get_argument('y', '2'))
response = yield tornado.gen.Task(add.apply_async, args=[x, y])
self.write({'results': response.result})
6 配置与最佳实践
6.1 基础配置 (config.py)
python
from celery import Celery
from datetime import timedelta
app = Celery('celery',
broker='amqp://guest@127.0.0.1//',
backend='amqp://guest@127.0.0.1//')
app.conf.update(
CELERY_ACKS_LATE=True, # 允许重试
CELERYD_CONCURRENCY=4, # 并发 Worker 数
CELERYD_MAX_TASKS_PER_CHILD=500, # 防内存泄漏
CELERYD_TASK_TIME_LIMIT=360 # 超时时间(秒)
)
# 定时任务配置
app.conf.beat_schedule = {
'sub_task': {
'task': 'tasks.sub',
'schedule': timedelta(seconds=30),
'args': (300, 150)
}
}
6.2 最佳实践
资源管理:使用信号机制管理资源
python@worker_process_init.connect def init_worker(*args, **kwargs): # 初始化数据库连接等资源 @worker_process_shutdown.connect def release_worker(*args, **kwargs): # 释放资源
任务监控:通过状态跟踪任务执行
PENDING
:等待中STARTED
:已开始RETRY
:重试中FAILED
:失败SUCCESS
:成功
错误处理:自动重试机制
python@app.task(bind=True, max_retries=3) def task_with_retry(self): try: # 业务代码 except Exception as exc: raise self.retry(exc=exc)
7 系统启动与管理
7.1 启动 Worker
bash
celery -A config worker --loglevel=info
7.2 启动定时任务调度
bash
celery -A config beat --loglevel=info
7.3 监控建议
- 使用
flower
组件进行实时监控 - 日志级别设置为
info
以上 - 定期检查任务积压情况
本文内容整理自知乎文章:https://zhuanlan.zhihu.com/p/319426628 Celery 官方文档:https://docs.celeryq.dev/en/stable/