Python 异步 IO 编程指南
1. 异步 IO 简介
1.1 什么是异步 IO
异步 IO(Asynchronous I/O)是一种编程模式,允许程序在等待 I/O 操作(如网络请求、文件读写)完成时不阻塞执行,而是继续执行其他任务。这可以大幅提升程序的并发性能。
1.2 为什么需要异步 IO
在传统的同步编程中:
python
import time
def task1():
time.sleep(2) # 模拟 I/O 操作
return "Task 1 完成"
def task2():
time.sleep(2) # 模拟 I/O 操作
return "Task 2 完成"
# 串行执行,总共需要 4 秒
result1 = task1()
result2 = task2()使用异步 IO 可以并发执行:
python
import asyncio
async def task1():
await asyncio.sleep(2) # 异步等待
return "Task 1 完成"
async def task2():
await asyncio.sleep(2) # 异步等待
return "Task 2 完成"
# 并发执行,只需要 2 秒
results = await asyncio.gather(task1(), task2())2. Python asyncio 基础
2.1 核心概念
协程(Coroutine):
- 使用
async def定义的函数 - 调用协程函数返回协程对象
- 需要使用
await来执行
事件循环(Event Loop):
- 管理和调度协程的执行
asyncio.run()创建并运行事件循环
await 关键字:
- 用于等待协程、Future 或 Task 完成
- 只能在
async函数内使用
2.2 基本使用
python
import asyncio
# 定义异步函数
async def hello():
print("Hello")
await asyncio.sleep(1)
print("World")
# 运行异步函数
asyncio.run(hello())2.3 并发执行多个协程
python
import asyncio
async def fetch_data(id):
print(f"开始获取数据 {id}")
await asyncio.sleep(2)
print(f"完成获取数据 {id}")
return f"数据 {id}"
async def main():
# 方法1:使用 gather 并发执行
results = await asyncio.gather(
fetch_data(1),
fetch_data(2),
fetch_data(3)
)
print(results)
# 方法2:使用 create_task
task1 = asyncio.create_task(fetch_data(4))
task2 = asyncio.create_task(fetch_data(5))
await task1
await task2
asyncio.run(main())3. 在异步环境中运行同步代码
3.1 使用 asyncio.to_thread
Python 3.9+ 提供了 asyncio.to_thread() 函数,可以在线程池中运行同步函数:
python
import asyncio
from time import sleep
def blocking_task():
"""同步阻塞函数"""
print("开始耗时任务")
sleep(5) # 模拟阻塞操作
print("完成耗时任务")
return "结果"
async def async_task():
"""异步包装"""
result = await asyncio.to_thread(blocking_task)
return result
async def main():
# 并发运行多个阻塞任务
results = await asyncio.gather(
async_task(),
async_task(),
async_task()
)
print(results)
asyncio.run(main())3.2 嵌套的异步和同步调用
处理复杂的嵌套调用场景:
python
import asyncio
from time.sleep
def sync_task():
"""同步函数"""
sleep(2)
print("同步任务完成")
async def async_wrapper():
"""异步包装器"""
await asyncio.to_thread(sync_task)
def sync_caller():
"""同步调用者,需要运行异步函数"""
asyncio.run(async_wrapper())
async def async_caller():
"""异步调用者,需要运行同步函数"""
await asyncio.to_thread(sync_caller())
# 主入口
def main():
asyncio.run(async_caller())
if __name__ == "__main__":
main()3.3 完整示例
基于您提供的测试代码的完整示例:
python
import asyncio
from time import sleep
def task():
"""同步阻塞任务"""
print("执行同步任务...")
sleep(5)
print("同步任务完成")
async def async_task():
"""将同步任务转为异步"""
await asyncio.to_thread(task)
def obj():
"""同步函数,调用异步任务"""
print("obj: 开始")
asyncio.run(async_task())
print("obj: 结束")
async def async_obj():
"""异步包装 obj"""
await asyncio.to_thread(obj)
async def main():
"""主异步函数"""
print("main: 开始")
await async_obj()
print("main: 结束")
if __name__ == "__main__":
asyncio.run(main())4. 常见模式和最佳实践
4.1 异步上下文管理器
python
class AsyncResource:
async def __aenter__(self):
print("获取资源")
await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("释放资源")
await asyncio.sleep(1)
async def use_resource():
async with AsyncResource() as resource:
print("使用资源")4.2 异步迭代器
python
class AsyncIterator:
def __init__(self, max_count):
self.max_count = max_count
self.current = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.max_count:
raise StopAsyncIteration
await asyncio.sleep(0.5)
self.current += 1
return self.current
async def iterate():
async for item in AsyncIterator(5):
print(f"Item: {item}")4.3 超时控制
python
async def long_running_task():
await asyncio.sleep(10)
return "完成"
async def with_timeout():
try:
result = await asyncio.wait_for(
long_running_task(),
timeout=5.0
)
print(result)
except asyncio.TimeoutError:
print("任务超时")4.4 异常处理
python
async def task_with_error():
await asyncio.sleep(1)
raise ValueError("出错了")
async def handle_errors():
try:
await task_with_error()
except ValueError as e:
print(f"捕获异常: {e}")
# 处理多个任务的异常
async def gather_with_errors():
results = await asyncio.gather(
task1(),
task2(),
return_exceptions=True # 返回异常而不是抛出
)
for result in results:
if isinstance(result, Exception):
print(f"任务失败: {result}")5. 性能对比
5.1 同步 vs 异步
python
import time
import asyncio
# 同步版本
def sync_version():
start = time.time()
for i in range(5):
time.sleep(1)
print(f"同步耗时: {time.time() - start:.2f}秒")
# 异步版本
async def async_version():
start = time.time()
await asyncio.gather(*[asyncio.sleep(1) for _ in range(5)])
print(f"异步耗时: {time.time() - start:.2f}秒")
# 同步:约 5 秒
sync_version()
# 异步:约 1 秒
asyncio.run(async_version())6. 常见陷阱
6.1 在同步函数中使用 await
❌ 错误:
python
def sync_function():
await asyncio.sleep(1) # SyntaxError✅ 正确:
python
async def async_function():
await asyncio.sleep(1)6.2 阻塞事件循环
❌ 错误:
python
async def bad_async():
time.sleep(5) # 阻塞整个事件循环✅ 正确:
python
async def good_async():
await asyncio.to_thread(time.sleep, 5) # 在线程中执行6.3 嵌套的 asyncio.run()
❌ 错误:
python
async def outer():
asyncio.run(inner()) # RuntimeError✅ 正确:
python
async def outer():
await inner()7. 常用异步库
7.1 HTTP 请求
python
# aiohttp - 异步 HTTP 客户端
import aiohttp
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()7.2 数据库
python
# asyncpg - 异步 PostgreSQL 客户端
import asyncpg
async def fetch_users():
conn = await asyncpg.connect('postgresql://...')
rows = await conn.fetch('SELECT * FROM users')
await conn.close()
return rows7.3 文件 IO
python
# aiofiles - 异步文件操作
import aiofiles
async def read_file():
async with aiofiles.open('file.txt', 'r') as f:
content = await f.read()
return content8. 参考资料
- https://docs.python.org/zh-cn/3/library/asyncio.html - Python asyncio 官方文档
- https://realpython.com/async-io-python/ - Real Python: Async IO in Python
- https://github.com/aio-libs - 异步库集合
- https://www.python.org/dev/peps/pep-0492/ - PEP 492: async/await 语法