Python 异步编程
转载
Python 3.4 引入了 asyncio 模块,增加了异步编程,跟 JavaScript 的 async/await 极为类似,大大方便了异步任务的处理。
Python 的异步编程
历史上,Python 并不支持专门的异步编程语法,因为不需要。
有了多线程(threading)和多进程(multiprocessing),就没必要一定支持异步了。如果一个线程(或进程)阻塞,新建其他线程(或进程)就可以了,程序不会卡死。
但是,多线程有 “线程竞争” 的问题,处理起来很复杂,还涉及加锁。对于简单的异步任务来说(比如与网页互动),写起来很麻烦。
asyncio 的设计
asyncio 模块最大特点就是,只存在一个线程。
由于只有一个线程,就不可能多个任务同时运行。asyncio 是 多任务合作 模式(cooperative multitasking),允许异步任务交出执行权给其他任务,等到其他任务完成,再收回执行权继续往下执行,这和 JavaScript 也是一样的。
由于代码的执行权在多个任务之间交换,所以看上去好像多个任务同时运行,其实底层只有一个线程,多个任务分享运行时间。
asyncio 模块在单线程上启动一个事件循环(event loop),时刻监听新进入循环的事件,加以处理,并不断重复这个过程,直到异步任务结束。
asyncio API
下面介绍 asyncio 模块最主要的几个 API。注意,必须使用 Python 3.7 或更高版本,早期的语法已经改变了。
第一步,import 导入 asyncio 模块。
import asyncio第二步,函数前面加上 async 关键字,就变成了 async 函数。这种函数最大特点是执行可以暂停,交出执行权。
async def main():
pass第三步,在 async 函数内部的异步任务前面,加上 await 命令。
await asyncio.sleep(1)上面代码中,asyncio.sleep(1) 方法可以生成一个异步任务,休眠 1 秒钟然后结束。
执行引擎遇到 await 命令,就会在异步任务开始执行之后,暂停当前 async 函数的执行,把执行权交给其他任务。等到异步任务结束,再把执行权交回 async 函数,继续往下执行。
第四步,async.run() 方法加载 async 函数,启动事件循环。
asyncio.run(main())上面代码中,asyncio.run() 在事件循环上监听 async 函数 main() 的执行。等到 main() 执行完了,事件循环才会终止。
async 函数示例
import asyncio
async def count():
print('One')
await asyncio.sleep(1)
print('Two')
async def main():
await asyncio.gather(count(), count(), count())
asyncio.run(main())执行结果:
$ python async.py
One
One
One
Two
Two
Two脚本总的运行时间是 1 秒,而它们同步执行的时间是 3 秒。
使用 Event 进行协程同步
asyncio.Event 是一个异步事件对象,可以用于协程之间的同步。当事件被设置时,等待该事件的所有协程都会被唤醒。
import asyncio
import functools
def set_event(event: asyncio.Event):
print("setting event in callback")
event.set()
async def coro1(event: asyncio.Event):
print("coro1 waiting for event")
await event.wait()
print("coro1 triggered")
async def coro2(event: asyncio.Event):
print("coro2 waiting for event")
await event.wait()
print("coro2 triggered")
async def main(loop: asyncio.AbstractEventLoop):
# Create a shared event
event = asyncio.Event()
print("event start state: {}".format(event.is_set()))
loop.call_later(1, functools.partial(set_event, event))
await asyncio.gather(coro1(event), coro2(event))
print("event end state: {}".format(event.is_set()))
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()执行结果:
event start state: False
coro1 waiting for event
coro2 waiting for event
setting event in callback
coro1 triggered
coro2 triggered
event end state: True异步编程的常见模式
超时控制
使用 asyncio.wait_for() 可以为异步操作设置超时时间:
import asyncio
async def long_running_task():
await asyncio.sleep(10)
return "completed"
async def main():
try:
result = await asyncio.wait_for(long_running_task(), timeout=5.0)
print(result)
except asyncio.TimeoutError:
print("Task timed out!")
asyncio.run(main())并发限制
使用 asyncio.Semaphore 可以限制并发数量:
import asyncio
async def worker(sem: asyncio.Semaphore, num: int):
async with sem:
print(f"Worker {num} is running")
await asyncio.sleep(1)
print(f"Worker {num} is done")
async def main():
sem = asyncio.Semaphore(3) # 最多 3 个并发
tasks = [worker(sem, i) for i in range(10)]
await asyncio.gather(*tasks)
asyncio.run(main())任务取消
可以取消正在运行的异步任务:
import asyncio
async def cancellable_task():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
print("Task was cancelled")
raise
async def main():
task = asyncio.create_task(cancellable_task())
await asyncio.sleep(1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Caught cancellation in main")
asyncio.run(main())异步编程最佳实践
避免阻塞调用
在异步代码中应避免使用阻塞的 I/O 操作或 CPU 密集型计算,这会阻塞整个事件循环:
# 错误示例
async def bad_example():
import time
time.sleep(1) # 这会阻塞事件循环!
# 正确示例
async def good_example():
await asyncio.sleep(1) # 使用异步睡眠使用 asyncio.run() 启动程序
Python 3.7+ 推荐使用 asyncio.run() 来启动异步程序,它会自动创建和关闭事件循环:
# 推荐写法
async def main():
await some_async_function()
asyncio.run(main())
# 不推荐的旧式写法
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()合理使用 gather 和 create_task
asyncio.gather() 适合等待多个任务完成,asyncio.create_task() 适合创建后台任务:
async def main():
# gather: 等待所有任务完成
results = await asyncio.gather(
fetch_data(1),
fetch_data(2),
fetch_data(3)
)
# create_task: 创建后台任务
task = asyncio.create_task(background_work())
# 可以继续执行其他操作
await other_work()
# 需要时等待后台任务
await task错误处理
在异步编程中,异常处理需要特别注意:
import asyncio
async def may_fail():
raise ValueError("Something went wrong")
async def main():
try:
await may_fail()
except ValueError as e:
print(f"Caught error: {e}")
# 使用 gather 时的异常处理
results = await asyncio.gather(
may_fail(),
asyncio.sleep(1),
return_exceptions=True # 不抛出异常,而是返回异常对象
)
for result in results:
if isinstance(result, Exception):
print(f"Task failed: {result}")
asyncio.run(main())