Skip to content

Python 异步 IO 编程指南

1. 异步 IO 简介

1.1 什么是异步 IO

异步 IO(Asynchronous I/O)是一种编程模式,允许程序在等待 I/O 操作(如网络请求、文件读写)完成时不阻塞执行,而是继续执行其他任务。这可以大幅提升程序的并发性能。

1.2 为什么需要异步 IO

在传统的同步编程中:

python
import time

def task1():
    time.sleep(2)  # 模拟 I/O 操作
    return "Task 1 完成"

def task2():
    time.sleep(2)  # 模拟 I/O 操作
    return "Task 2 完成"

# 串行执行,总共需要 4 秒
result1 = task1()
result2 = task2()

使用异步 IO 可以并发执行:

python
import asyncio

async def task1():
    await asyncio.sleep(2)  # 异步等待
    return "Task 1 完成"

async def task2():
    await asyncio.sleep(2)  # 异步等待
    return "Task 2 完成"

# 并发执行,只需要 2 秒
results = await asyncio.gather(task1(), task2())

2. Python asyncio 基础

2.1 核心概念

协程(Coroutine):

  • 使用 async def 定义的函数
  • 调用协程函数返回协程对象
  • 需要使用 await 来执行

事件循环(Event Loop):

  • 管理和调度协程的执行
  • asyncio.run() 创建并运行事件循环

await 关键字:

  • 用于等待协程、Future 或 Task 完成
  • 只能在 async 函数内使用

2.2 基本使用

python
import asyncio

# 定义异步函数
async def hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# 运行异步函数
asyncio.run(hello())

2.3 并发执行多个协程

python
import asyncio

async def fetch_data(id):
    print(f"开始获取数据 {id}")
    await asyncio.sleep(2)
    print(f"完成获取数据 {id}")
    return f"数据 {id}"

async def main():
    # 方法1:使用 gather 并发执行
    results = await asyncio.gather(
        fetch_data(1),
        fetch_data(2),
        fetch_data(3)
    )
    print(results)
    
    # 方法2:使用 create_task
    task1 = asyncio.create_task(fetch_data(4))
    task2 = asyncio.create_task(fetch_data(5))
    await task1
    await task2

asyncio.run(main())

3. 在异步环境中运行同步代码

3.1 使用 asyncio.to_thread

Python 3.9+ 提供了 asyncio.to_thread() 函数,可以在线程池中运行同步函数:

python
import asyncio
from time import sleep

def blocking_task():
    """同步阻塞函数"""
    print("开始耗时任务")
    sleep(5)  # 模拟阻塞操作
    print("完成耗时任务")
    return "结果"

async def async_task():
    """异步包装"""
    result = await asyncio.to_thread(blocking_task)
    return result

async def main():
    # 并发运行多个阻塞任务
    results = await asyncio.gather(
        async_task(),
        async_task(),
        async_task()
    )
    print(results)

asyncio.run(main())

3.2 嵌套的异步和同步调用

处理复杂的嵌套调用场景:

python
import asyncio
from time.sleep

def sync_task():
    """同步函数"""
    sleep(2)
    print("同步任务完成")

async def async_wrapper():
    """异步包装器"""
    await asyncio.to_thread(sync_task)

def sync_caller():
    """同步调用者,需要运行异步函数"""
    asyncio.run(async_wrapper())

async def async_caller():
    """异步调用者,需要运行同步函数"""
    await asyncio.to_thread(sync_caller())

# 主入口
def main():
    asyncio.run(async_caller())

if __name__ == "__main__":
    main()

3.3 完整示例

基于您提供的测试代码的完整示例:

python
import asyncio
from time import sleep

def task():
    """同步阻塞任务"""
    print("执行同步任务...")
    sleep(5)
    print("同步任务完成")

async def async_task():
    """将同步任务转为异步"""
    await asyncio.to_thread(task)

def obj():
    """同步函数,调用异步任务"""
    print("obj: 开始")
    asyncio.run(async_task())
    print("obj: 结束")

async def async_obj():
    """异步包装 obj"""
    await asyncio.to_thread(obj)

async def main():
    """主异步函数"""
    print("main: 开始")
    await async_obj()
    print("main: 结束")

if __name__ == "__main__":
    asyncio.run(main())

4. 常见模式和最佳实践

4.1 异步上下文管理器

python
class AsyncResource:
    async def __aenter__(self):
        print("获取资源")
        await asyncio.sleep(1)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("释放资源")
        await asyncio.sleep(1)

async def use_resource():
    async with AsyncResource() as resource:
        print("使用资源")

4.2 异步迭代器

python
class AsyncIterator:
    def __init__(self, max_count):
        self.max_count = max_count
        self.current = 0
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.current >= self.max_count:
            raise StopAsyncIteration
        await asyncio.sleep(0.5)
        self.current += 1
        return self.current

async def iterate():
    async for item in AsyncIterator(5):
        print(f"Item: {item}")

4.3 超时控制

python
async def long_running_task():
    await asyncio.sleep(10)
    return "完成"

async def with_timeout():
    try:
        result = await asyncio.wait_for(
            long_running_task(),
            timeout=5.0
        )
        print(result)
    except asyncio.TimeoutError:
        print("任务超时")

4.4 异常处理

python
async def task_with_error():
    await asyncio.sleep(1)
    raise ValueError("出错了")

async def handle_errors():
    try:
        await task_with_error()
    except ValueError as e:
        print(f"捕获异常: {e}")

# 处理多个任务的异常
async def gather_with_errors():
    results = await asyncio.gather(
        task1(),
        task2(),
        return_exceptions=True  # 返回异常而不是抛出
    )
    for result in results:
        if isinstance(result, Exception):
            print(f"任务失败: {result}")

5. 性能对比

5.1 同步 vs 异步

python
import time
import asyncio

# 同步版本
def sync_version():
    start = time.time()
    for i in range(5):
        time.sleep(1)
    print(f"同步耗时: {time.time() - start:.2f}秒")

# 异步版本
async def async_version():
    start = time.time()
    await asyncio.gather(*[asyncio.sleep(1) for _ in range(5)])
    print(f"异步耗时: {time.time() - start:.2f}秒")

# 同步:约 5 秒
sync_version()

# 异步:约 1 秒
asyncio.run(async_version())

6. 常见陷阱

6.1 在同步函数中使用 await

错误:

python
def sync_function():
    await asyncio.sleep(1)  # SyntaxError

正确:

python
async def async_function():
    await asyncio.sleep(1)

6.2 阻塞事件循环

错误:

python
async def bad_async():
    time.sleep(5)  # 阻塞整个事件循环

正确:

python
async def good_async():
    await asyncio.to_thread(time.sleep, 5)  # 在线程中执行

6.3 嵌套的 asyncio.run()

错误:

python
async def outer():
    asyncio.run(inner())  # RuntimeError

正确:

python
async def outer():
    await inner()

7. 常用异步库

7.1 HTTP 请求

python
# aiohttp - 异步 HTTP 客户端
import aiohttp

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

7.2 数据库

python
# asyncpg - 异步 PostgreSQL 客户端
import asyncpg

async def fetch_users():
    conn = await asyncpg.connect('postgresql://...')
    rows = await conn.fetch('SELECT * FROM users')
    await conn.close()
    return rows

7.3 文件 IO

python
# aiofiles - 异步文件操作
import aiofiles

async def read_file():
    async with aiofiles.open('file.txt', 'r') as f:
        content = await f.read()
        return content

8. 参考资料