Python 协程
1. 协程的概念
协程是子例程的更一般形式。
子例程可以在某一点进入并在另一点退出。协程则可以在许多不同的点上进入、退出和恢复。它们可通过 async def 语句来实现。参见 PEP 492[1]。
- @3.5+ 协程(coroutine)可以在多个位置上挂起和恢复执行
await表达式,async for以及async with只能在协程函数体中使用- 使用
async def语法定义的函数总是为协程函数,即使它们不包含await或async关键字 - 在协程函数体中使用
yield from表达式将引发SyntaxError
详细解释见 官方文档语言参考手册。
协程与普通函数的区别在于:
- 普通函数:调用时立即执行,返回结果或抛出异常
- 协程函数:调用时返回一个协程对象,需要通过事件循环执行
# 普通函数
def normal_function():
return "result"
# 协程函数
async def coroutine_function():
return "result"
# 调用对比
result1 = normal_function() # 立即执行,返回 "result"
result2 = coroutine_function() # 返回协程对象,不会执行
# 需要使用 await 或事件循环执行
import asyncio
result3 = asyncio.run(coroutine_function()) # 执行协程,返回 "result"2. 协程函数
返回一个 Coroutine 对象的函数。
协程函数可通过 async def 语句来定义,并可能包含 await、async for 和 async with 关键字。这些特性是由 PEP 492 引入的。
2.1 基本协程函数示例
import asyncio
async def fetch_data(id: int):
print(f"Fetching data {id}...")
await asyncio.sleep(1) # 模拟 I/O 操作
print(f"Data {id} fetched")
return f"Data {id}"
async def main():
# 顺序执行
result1 = await fetch_data(1)
result2 = await fetch_data(2)
print(f"Results: {result1}, {result2}")
asyncio.run(main())2.2 并发执行协程
import asyncio
async def fetch_data(id: int):
print(f"Fetching data {id}...")
await asyncio.sleep(1)
print(f"Data {id} fetched")
return f"Data {id}"
async def main():
# 并发执行
results = await asyncio.gather(
fetch_data(1),
fetch_data(2),
fetch_data(3)
)
print(f"Results: {results}")
asyncio.run(main())3. PEP 492
PEP 是 Python 语言发展的提案。
PEP 492 提出使用 async 和 await 语法实现协程,将协程作为 Python 中的一个正式的单独概念,并增加相应的支持语法。
该提案在 Python 3.5 版本实现。
3.1 PEP 492 引入的特性
PEP 492 引入了以下关键特性:
- 原生协程类型:使用
async def定义的协程是独立的类型 - await 表达式:用于等待协程、Future 或 Task 对象
- async with:异步上下文管理器
- async for:异步迭代器
import asyncio
# async with 示例
class AsyncResource:
async def __aenter__(self):
print("Acquiring resource")
await asyncio.sleep(0.1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Releasing resource")
await asyncio.sleep(0.1)
async def use_resource():
async with AsyncResource() as resource:
print("Using resource")
# async for 示例
class AsyncCounter:
def __init__(self, max):
self.max = max
self.current = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.current < self.max:
await asyncio.sleep(0.1)
self.current += 1
return self.current
raise StopAsyncIteration
async def count():
async for num in AsyncCounter(5):
print(num)
async def main():
await use_resource()
await count()
asyncio.run(main())4. 协程的生命周期
协程对象有以下几种状态:
import asyncio
async def example_coroutine():
print("Coroutine is running")
await asyncio.sleep(1)
return "Done"
# 创建协程对象(但未执行)
coro = example_coroutine()
print(f"Coroutine object: {coro}")
# 执行协程
async def main():
result = await coro
print(f"Result: {result}")
asyncio.run(main())注意:如果创建了协程对象但从未 await 它,Python 会发出警告。
5. 实用技巧
5.1 在异步代码中调用同步函数
对于一些同步函数,如果我们在异步代码中直接调用这些函数,会导致事件循环被阻塞,从而影响整个程序的性能。因此最好的方法是将这些函数放在线程中运行。
GIL 限制
由于 GIL 的存在,asyncio.to_thread() 通常不会对 CPU 密集型函数产生显著的性能提升,故通常只能被用来将 I/O 密集型函数变为非阻塞的。但是,对于会释放 GIL 的扩展模块或无此限制的替代性 Python 实现来说,asyncio.to_thread() 也可被用于 CPU 密集型函数。
在 Python 3.9 之后,我们可以使用 asyncio.to_thread() 函数来在异步代码中调用同步函数[2]。这会默认使用 Python 的 ThreadPoolExecutor 来运行函数。
示例:
import asyncio
async def async_task():
print("Start task")
await asyncio.sleep(1)
print("End task")
if __name__ == "__main__":
asyncio.run(async_task())Python 3.8 及更早版本的替代方法:
import asyncio
from concurrent.futures import ThreadPoolExecutor
def blocking_io():
# 模拟阻塞 I/O
import time
time.sleep(1)
return "Done"
async def main():
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_io)
print(result)
asyncio.run(main())5.2 在同步代码中调用异步函数
有时,我们需要在同步代码中调用异步函数,这时我们可以使用 asyncio.run() 函数来运行异步函数。
import asyncio
async def async_task():
print("Start task")
await asyncio.sleep(1)
print("End task")
if __name__ == "__main__":
asyncio.run(async_task())但是,以上这种情况仅适用于代码在单个线程中运行,如果代码在多个线程中运行,我们可以使用 asyncio.run_coroutine_threadsafe() 函数来运行异步函数。
注意,此时协程必须运行在一个正在工作的事件循环中,否则会引发 RuntimeError 异常或者死锁。因此下面的代码用于在一个线程调用正在另一个线程中运行的事件循环。
import asyncio
from typing import Any, Coroutine, TypeVar
T = TypeVar("T")
def coro_to_sync(
coro: Coroutine[Any, Any, T],
loop: asyncio.AbstractEventLoop | None = None,
) -> T:
"""将协程转换为同步函数,此函数只由主线程调用"""
loop = loop or asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(coro, loop)
return future.result()5.3 协程的取消
可以取消正在运行的协程:
import asyncio
async def long_running_task():
try:
while True:
await asyncio.sleep(1)
print("Task is running...")
except asyncio.CancelledError:
print("Task was cancelled")
raise
async def main():
task = asyncio.create_task(long_running_task())
# 让任务运行一会儿
await asyncio.sleep(3)
# 取消任务
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Caught cancellation in main")
asyncio.run(main())5.4 协程超时
使用 asyncio.wait_for() 设置超时:
import asyncio
async def slow_operation():
await asyncio.sleep(10)
return "Done"
async def main():
try:
result = await asyncio.wait_for(slow_operation(), timeout=2.0)
print(result)
except asyncio.TimeoutError:
print("Operation timed out")
asyncio.run(main())5.5 协程屏蔽取消
使用 asyncio.shield() 可以保护协程不被取消:
import asyncio
async def critical_operation():
await asyncio.sleep(2)
return "Critical operation completed"
async def main():
task = asyncio.create_task(
asyncio.shield(critical_operation())
)
await asyncio.sleep(1)
task.cancel()
try:
result = await task
print(result)
except asyncio.CancelledError:
print("Task was cancelled, but critical operation continues")
asyncio.run(main())6. 协程与生成器的对比
虽然协程和生成器在语法上有相似之处(都可以暂停和恢复),但它们的用途不同:
| 特性 | 生成器 | 协程 |
|---|---|---|
| 定义方式 | def + yield | async def + await |
| 执行方式 | next() 或 for 循环 | await 或事件循环 |
| 主要用途 | 惰性迭代,生成数据序列 | 异步 I/O,并发编程 |
| 返回值 | 通过 yield 产生多个值 | 通过 return 返回单个值 |
| 暂停机制 | 交出值后暂停 | 等待异步操作完成后暂停 |
# 生成器示例
def generator_example():
for i in range(3):
yield i
for value in generator_example():
print(value)
# 协程示例
async def coroutine_example():
for i in range(3):
await asyncio.sleep(0.1)
print(i)
asyncio.run(coroutine_example())7. 常见错误和陷阱
7.1 忘记 await
import asyncio
async def get_data():
await asyncio.sleep(1)
return "data"
async def wrong():
# 错误:忘记 await,result 是协程对象而不是结果
result = get_data()
print(result) # <coroutine object get_data at 0x...>
async def correct():
# 正确:使用 await
result = await get_data()
print(result) # "data"
asyncio.run(correct())7.2 在同步代码中使用 await
# 错误:不能在非异步函数中使用 await
def wrong():
result = await some_coroutine() # SyntaxError
# 正确:使用 asyncio.run()
def correct():
result = asyncio.run(some_coroutine())7.3 阻塞事件循环
import asyncio
import time
async def wrong():
# 错误:使用同步的 sleep 会阻塞事件循环
time.sleep(1)
async def correct():
# 正确:使用异步的 sleep
await asyncio.sleep(1)