Celery 异步任务核心知识整理 
1 Celery 核心概念 
Celery 是基于 Python 实现的分布式任务队列框架,主要用于:
- 管理分布式任务队列
- 处理耗时任务
- 支持任务队列方式的任务调度
核心特性:
- 任务执行完全脱离主程序
- 可分配到不同主机运行
- 同时支持异步任务和定时任务
2 系统架构组成 
Celery 采用三组件架构:
2.1 消息中间件 (Broker) 
- 任务调度队列,存储任务
- 接收生产者发送的任务消息
- 官方推荐使用 RabbitMQ 或 Redis
- 示例配置:celery_broker = 'amqp://guest@127.0.0.1//'
2.2 任务执行单元 (Worker) 
- 实时监控消息队列
- 获取并执行队列中的任务
- 支持分布式部署多个 Worker
2.3 结果存储 (Backend) 
- 存储任务执行结果
- 支持 RabbitMQ, Redis, MongoDB 等
- 示例配置:celery_backend = 'amqp://guest@127.0.0.1//'
3 任务定义与调用 
3.1 任务类型 
- 异步任务:业务逻辑触发后发往队列
- 定时任务:由 Celery Beat 周期性触发
3.2 任务定义示例 
python
# tasks.py
from config import app
@app.task
def add(x, y):
    return x + y
@app.task
def sub(x, y):
    return x - y3.3 任务调用方式 
python
# main.py
from tasks import add
async_result = add.apply_async((1, 100))  # 异步调用
# 结果状态检查
if async_result.successful():
    result = async_result.get()
elif async_result.failed():
    print('任务失败')4 工作流程解析 
- 任务提交:生产者调用 apply_async()提交任务
- 队列存储:Broker 接收并存储任务
- 任务获取:Worker 从 Broker 拉取任务
- 任务执行:Worker 执行具体任务逻辑
- 结果存储:执行结果存入 Backend
- 结果查询:生产者通过 AsyncResult 对象查询结果
5 应用场景 
5.1 异步任务场景 
- 发送短信/邮件
- 消息推送
- 音视频处理
- 大数据处理
5.2 定时任务场景 
- 每日数据统计报表
- 定期数据清理
- 周期性系统检查
5.3 Web 框架集成(Tornado 示例) 
python
# tornado_celery.py
import tcelery
import tornado.gen
from tasks import add
tcelery.setup_nonblocking_producer()
class CheckHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        x = int(self.get_argument('x', '1'))
        y = int(self.get_argument('y', '2'))
        response = yield tornado.gen.Task(add.apply_async, args=[x, y])
        self.write({'results': response.result})6 配置与最佳实践 
6.1 基础配置 (config.py) 
python
from celery import Celery
from datetime import timedelta
app = Celery('celery', 
             broker='amqp://guest@127.0.0.1//',
             backend='amqp://guest@127.0.0.1//')
app.conf.update(
    CELERY_ACKS_LATE=True,  # 允许重试
    CELERYD_CONCURRENCY=4,  # 并发 Worker 数
    CELERYD_MAX_TASKS_PER_CHILD=500,  # 防内存泄漏
    CELERYD_TASK_TIME_LIMIT=360  # 超时时间(秒)
)
# 定时任务配置
app.conf.beat_schedule = {
    'sub_task': {
        'task': 'tasks.sub',
        'schedule': timedelta(seconds=30),
        'args': (300, 150)
    }
}6.2 最佳实践 
- 资源管理:使用信号机制管理资源 python- @worker_process_init.connect def init_worker(*args, **kwargs): # 初始化数据库连接等资源 @worker_process_shutdown.connect def release_worker(*args, **kwargs): # 释放资源
- 任务监控:通过状态跟踪任务执行 - PENDING:等待中
- STARTED:已开始
- RETRY:重试中
- FAILED:失败
- SUCCESS:成功
 
- 错误处理:自动重试机制 python- @app.task(bind=True, max_retries=3) def task_with_retry(self): try: # 业务代码 except Exception as exc: raise self.retry(exc=exc)
7 系统启动与管理 
7.1 启动 Worker 
bash
celery -A config worker --loglevel=info7.2 启动定时任务调度 
bash
celery -A config beat --loglevel=info7.3 监控建议 
- 使用 flower组件进行实时监控
- 日志级别设置为 info以上
- 定期检查任务积压情况
本文内容整理自知乎文章:https://zhuanlan.zhihu.com/p/319426628 Celery 官方文档:https://docs.celeryq.dev/en/stable/