Skip to content

Celery 异步任务核心知识整理

1 Celery 核心概念

Celery 是基于 Python 实现的分布式任务队列框架,主要用于:

  • 管理分布式任务队列
  • 处理耗时任务
  • 支持任务队列方式的任务调度

核心特性:

  • 任务执行完全脱离主程序
  • 可分配到不同主机运行
  • 同时支持异步任务和定时任务

2 系统架构组成

Celery 采用三组件架构:

2.1 消息中间件 (Broker)

  • 任务调度队列,存储任务
  • 接收生产者发送的任务消息
  • 官方推荐使用 RabbitMQ 或 Redis
  • 示例配置:celery_broker = 'amqp://guest@127.0.0.1//'

2.2 任务执行单元 (Worker)

  • 实时监控消息队列
  • 获取并执行队列中的任务
  • 支持分布式部署多个 Worker

2.3 结果存储 (Backend)

  • 存储任务执行结果
  • 支持 RabbitMQ, Redis, MongoDB 等
  • 示例配置:celery_backend = 'amqp://guest@127.0.0.1//'

3 任务定义与调用

3.1 任务类型

  • 异步任务:业务逻辑触发后发往队列
  • 定时任务:由 Celery Beat 周期性触发

3.2 任务定义示例

python
# tasks.py
from config import app

@app.task
def add(x, y):
    return x + y

@app.task
def sub(x, y):
    return x - y

3.3 任务调用方式

python
# main.py
from tasks import add

async_result = add.apply_async((1, 100))  # 异步调用

# 结果状态检查
if async_result.successful():
    result = async_result.get()
elif async_result.failed():
    print('任务失败')

4 工作流程解析

  1. 任务提交:生产者调用 apply_async() 提交任务
  2. 队列存储:Broker 接收并存储任务
  3. 任务获取:Worker 从 Broker 拉取任务
  4. 任务执行:Worker 执行具体任务逻辑
  5. 结果存储:执行结果存入 Backend
  6. 结果查询:生产者通过 AsyncResult 对象查询结果

5 应用场景

5.1 异步任务场景

  • 发送短信/邮件
  • 消息推送
  • 音视频处理
  • 大数据处理

5.2 定时任务场景

  • 每日数据统计报表
  • 定期数据清理
  • 周期性系统检查

5.3 Web 框架集成(Tornado 示例)

python
# tornado_celery.py
import tcelery
import tornado.gen
from tasks import add

tcelery.setup_nonblocking_producer()

class CheckHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        x = int(self.get_argument('x', '1'))
        y = int(self.get_argument('y', '2'))
        response = yield tornado.gen.Task(add.apply_async, args=[x, y])
        self.write({'results': response.result})

6 配置与最佳实践

6.1 基础配置 (config.py)

python
from celery import Celery
from datetime import timedelta

app = Celery('celery', 
             broker='amqp://guest@127.0.0.1//',
             backend='amqp://guest@127.0.0.1//')

app.conf.update(
    CELERY_ACKS_LATE=True,  # 允许重试
    CELERYD_CONCURRENCY=4,  # 并发 Worker 数
    CELERYD_MAX_TASKS_PER_CHILD=500,  # 防内存泄漏
    CELERYD_TASK_TIME_LIMIT=360  # 超时时间(秒)
)

# 定时任务配置
app.conf.beat_schedule = {
    'sub_task': {
        'task': 'tasks.sub',
        'schedule': timedelta(seconds=30),
        'args': (300, 150)
    }
}

6.2 最佳实践

  1. 资源管理:使用信号机制管理资源

    python
    @worker_process_init.connect
    def init_worker(*args, **kwargs):
        # 初始化数据库连接等资源
    
    @worker_process_shutdown.connect
    def release_worker(*args, **kwargs):
        # 释放资源
  2. 任务监控:通过状态跟踪任务执行

    • PENDING:等待中
    • STARTED:已开始
    • RETRY:重试中
    • FAILED:失败
    • SUCCESS:成功
  3. 错误处理:自动重试机制

    python
    @app.task(bind=True, max_retries=3)
    def task_with_retry(self):
        try:
            # 业务代码
        except Exception as exc:
            raise self.retry(exc=exc)

7 系统启动与管理

7.1 启动 Worker

bash
celery -A config worker --loglevel=info

7.2 启动定时任务调度

bash
celery -A config beat --loglevel=info

7.3 监控建议

  • 使用 flower 组件进行实时监控
  • 日志级别设置为 info 以上
  • 定期检查任务积压情况

本文内容整理自知乎文章:https://zhuanlan.zhihu.com/p/319426628 Celery 官方文档:https://docs.celeryq.dev/en/stable/