Skip to content

Python 异步编程

Python 3.4 引入了 asyncio 模块,增加了异步编程,跟 JavaScript 的 async/await 极为类似,大大方便了异步任务的处理。

Python 的异步编程

历史上,Python 并不支持专门的异步编程语法,因为不需要。

有了多线程(threading)和多进程(multiprocessing),就没必要一定支持异步了。如果一个线程(或进程)阻塞,新建其他线程(或进程)就可以了,程序不会卡死。

但是,多线程有 “线程竞争” 的问题,处理起来很复杂,还涉及加锁。对于简单的异步任务来说(比如与网页互动),写起来很麻烦。

asyncio 的设计

asyncio 模块最大特点就是,只存在一个线程。

由于只有一个线程,就不可能多个任务同时运行。asyncio多任务合作 模式(cooperative multitasking),允许异步任务交出执行权给其他任务,等到其他任务完成,再收回执行权继续往下执行,这和 JavaScript 也是一样的。

由于代码的执行权在多个任务之间交换,所以看上去好像多个任务同时运行,其实底层只有一个线程,多个任务分享运行时间。

asyncio 模块在单线程上启动一个事件循环(event loop),时刻监听新进入循环的事件,加以处理,并不断重复这个过程,直到异步任务结束。

asyncio API

下面介绍 asyncio 模块最主要的几个 API。注意,必须使用 Python 3.7 或更高版本,早期的语法已经改变了。

第一步,import 导入 asyncio 模块。

python
import asyncio

第二步,函数前面加上 async 关键字,就变成了 async 函数。这种函数最大特点是执行可以暂停,交出执行权。

python
async def main():
    pass

第三步,在 async 函数内部的异步任务前面,加上 await 命令。

python
await asyncio.sleep(1)

上面代码中,asyncio.sleep(1) 方法可以生成一个异步任务,休眠 1 秒钟然后结束。

执行引擎遇到 await 命令,就会在异步任务开始执行之后,暂停当前 async 函数的执行,把执行权交给其他任务。等到异步任务结束,再把执行权交回 async 函数,继续往下执行。

第四步,async.run() 方法加载 async 函数,启动事件循环。

python
asyncio.run(main())

上面代码中,asyncio.run() 在事件循环上监听 async 函数 main() 的执行。等到 main() 执行完了,事件循环才会终止。

async 函数示例

python
import asyncio

async def count():
    print('One')
    await asyncio.sleep(1)
    print('Two')

async def main():
    await asyncio.gather(count(), count(), count())

asyncio.run(main())

执行结果:

console
$ python async.py
One
One
One
Two
Two
Two

脚本总的运行时间是 1 秒,而它们同步执行的时间是 3 秒。

使用 Event 进行协程同步

asyncio.Event 是一个异步事件对象,可以用于协程之间的同步。当事件被设置时,等待该事件的所有协程都会被唤醒。

python
import asyncio
import functools

def set_event(event: asyncio.Event):
    print("setting event in callback")
    event.set()

async def coro1(event: asyncio.Event):
    print("coro1 waiting for event")
    await event.wait()
    print("coro1 triggered")

async def coro2(event: asyncio.Event):
    print("coro2 waiting for event")
    await event.wait()
    print("coro2 triggered")

async def main(loop: asyncio.AbstractEventLoop):
    # Create a shared event
    event = asyncio.Event()
    print("event start state: {}".format(event.is_set()))

    loop.call_later(1, functools.partial(set_event, event))

    await asyncio.gather(coro1(event), coro2(event))
    print("event end state: {}".format(event.is_set()))

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

执行结果:

text
event start state: False
coro1 waiting for event
coro2 waiting for event
setting event in callback
coro1 triggered
coro2 triggered
event end state: True

异步编程的常见模式

超时控制

使用 asyncio.wait_for() 可以为异步操作设置超时时间:

python
import asyncio

async def long_running_task():
    await asyncio.sleep(10)
    return "completed"

async def main():
    try:
        result = await asyncio.wait_for(long_running_task(), timeout=5.0)
        print(result)
    except asyncio.TimeoutError:
        print("Task timed out!")

asyncio.run(main())

并发限制

使用 asyncio.Semaphore 可以限制并发数量:

python
import asyncio

async def worker(sem: asyncio.Semaphore, num: int):
    async with sem:
        print(f"Worker {num} is running")
        await asyncio.sleep(1)
        print(f"Worker {num} is done")

async def main():
    sem = asyncio.Semaphore(3)  # 最多 3 个并发
    tasks = [worker(sem, i) for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(main())

任务取消

可以取消正在运行的异步任务:

python
import asyncio

async def cancellable_task():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("Task was cancelled")
        raise

async def main():
    task = asyncio.create_task(cancellable_task())
    await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Caught cancellation in main")

asyncio.run(main())

异步编程最佳实践

避免阻塞调用

在异步代码中应避免使用阻塞的 I/O 操作或 CPU 密集型计算,这会阻塞整个事件循环:

python
# 错误示例
async def bad_example():
    import time
    time.sleep(1)  # 这会阻塞事件循环!

# 正确示例
async def good_example():
    await asyncio.sleep(1)  # 使用异步睡眠

使用 asyncio.run() 启动程序

Python 3.7+ 推荐使用 asyncio.run() 来启动异步程序,它会自动创建和关闭事件循环:

python
# 推荐写法
async def main():
    await some_async_function()

asyncio.run(main())

# 不推荐的旧式写法
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

合理使用 gather 和 create_task

asyncio.gather() 适合等待多个任务完成,asyncio.create_task() 适合创建后台任务:

python
async def main():
    # gather: 等待所有任务完成
    results = await asyncio.gather(
        fetch_data(1),
        fetch_data(2),
        fetch_data(3)
    )
    
    # create_task: 创建后台任务
    task = asyncio.create_task(background_work())
    # 可以继续执行其他操作
    await other_work()
    # 需要时等待后台任务
    await task

错误处理

在异步编程中,异常处理需要特别注意:

python
import asyncio

async def may_fail():
    raise ValueError("Something went wrong")

async def main():
    try:
        await may_fail()
    except ValueError as e:
        print(f"Caught error: {e}")

    # 使用 gather 时的异常处理
    results = await asyncio.gather(
        may_fail(),
        asyncio.sleep(1),
        return_exceptions=True  # 不抛出异常,而是返回异常对象
    )
    for result in results:
        if isinstance(result, Exception):
            print(f"Task failed: {result}")

asyncio.run(main())

参考资料

  1. Python 官方文档 - asyncio
  2. Python 官方文档 - asyncio 开发
  3. 阮一峰 - Python 异步编程入门
  4. 深入理解 Python asyncio
  5. Python asyncio 实践