Skip to content

创建异步子进程

1. 发起异步进程

使用 asyncio.create_subprocess_shell() 可以在异步环境中创建子进程:

python
import asyncio

async def run(cmd: str | bytes):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()

    print(f"{cmd!r} exited with {proc.returncode}")
    if stdout:
        print(f"[stdout]\n{stdout.decode()}")
    if stderr:
        print(f"[stderr]\n{stderr.decode()}")

asyncio.run(run("ls /"))

2. 使用 create_subprocess_exec

除了 create_subprocess_shell(),还可以使用 create_subprocess_exec() 直接执行程序而不通过 shell:

python
import asyncio

async def run_command():
    # 使用 exec 方式,参数需要分开传递
    proc = await asyncio.create_subprocess_exec(
        'python', '-c', 'print("Hello from subprocess")',
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    
    stdout, stderr = await proc.communicate()
    
    if stdout:
        print(f"stdout: {stdout.decode()}")
    if stderr:
        print(f"stderr: {stderr.decode()}")
    
    return proc.returncode

asyncio.run(run_command())

create_subprocess_exec()create_subprocess_shell() 更安全,因为它不会通过 shell 解释命令,避免了 shell 注入攻击。

3. 实时读取子进程输出

可以逐行读取子进程的输出:

python
import asyncio

async def read_stream(stream, prefix):
    """实时读取流并打印"""
    while True:
        line = await stream.readline()
        if not line:
            break
        print(f"{prefix}: {line.decode().strip()}")

async def run_with_realtime_output(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    
    # 同时读取 stdout 和 stderr
    await asyncio.gather(
        read_stream(proc.stdout, "STDOUT"),
        read_stream(proc.stderr, "STDERR")
    )
    
    await proc.wait()
    return proc.returncode

asyncio.run(run_with_realtime_output("python -c 'import time; print(1); time.sleep(1); print(2)'"))

4. 向子进程发送输入

可以通过 stdin 向子进程发送数据:

python
import asyncio

async def send_input():
    proc = await asyncio.create_subprocess_exec(
        'python', '-c', 'import sys; print(f"Got: {sys.stdin.read()}")',
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    
    # 发送输入并等待结果
    stdout, stderr = await proc.communicate(input=b"Hello, subprocess!\n")
    
    print(stdout.decode())
    return proc.returncode

asyncio.run(send_input())

5. 超时控制

使用 asyncio.wait_for() 可以为子进程设置超时:

python
import asyncio

async def run_with_timeout(cmd, timeout=5.0):
    try:
        proc = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(),
            timeout=timeout
        )
        
        return stdout.decode(), stderr.decode(), proc.returncode
    except asyncio.TimeoutError:
        print(f"Command timed out after {timeout} seconds")
        proc.kill()
        await proc.wait()
        raise

# 运行一个可能超时的命令
try:
    result = asyncio.run(run_with_timeout("sleep 10", timeout=2.0))
except asyncio.TimeoutError:
    print("Command was killed due to timeout")

6. 并发运行多个子进程

可以同时运行多个子进程:

python
import asyncio

async def run_multiple_commands():
    commands = [
        "echo 'Command 1'",
        "echo 'Command 2'",
        "echo 'Command 3'"
    ]
    
    async def run_cmd(cmd):
        proc = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await proc.communicate()
        return cmd, stdout.decode(), proc.returncode
    
    results = await asyncio.gather(*[run_cmd(cmd) for cmd in commands])
    
    for cmd, output, returncode in results:
        print(f"Command: {cmd}")
        print(f"Output: {output}")
        print(f"Return code: {returncode}")
        print("-" * 40)

asyncio.run(run_multiple_commands())

7. Windows 下的限制和解决方案

7.1 事件循环策略

在 Windows 上,默认的事件循环策略(SelectorEventLoop)不支持异步子进程。有以下解决方案:

方案一:使用 ProactorEventLoop

python
import asyncio
import sys

if sys.platform == 'win32':
    # Windows 下需要使用 ProactorEventLoop
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

async def run_on_windows(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()
    return stdout.decode()

asyncio.run(run_on_windows("dir"))

方案二:使用 Python 3.8+ 的默认设置

Python 3.8+ 在 Windows 上默认使用 ProactorEventLoop,不需要手动设置:

python
import asyncio

# Python 3.8+ 在 Windows 上可以直接使用
async def run_on_windows():
    proc = await asyncio.create_subprocess_shell(
        "dir",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()
    print(stdout.decode())

asyncio.run(run_on_windows())

7.2 ProactorEventLoop 的兼容性问题

虽然 ProactorEventLoop 支持子进程,但某些第三方库可能不兼容。如果遇到兼容性问题,可以考虑:

使用同步的 subprocess 模块:在线程池中运行同步的子进程操作

python
import asyncio
import subprocess
from concurrent.futures import ThreadPoolExecutor

async def run_sync_subprocess(cmd):
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool,
            subprocess.run,
            cmd,
            True,  # shell=True
            subprocess.PIPE,  # stdout
            subprocess.PIPE   # stderr
        )
    return result.stdout.decode()

asyncio.run(run_sync_subprocess("dir"))

使用专门的库:如 asyncio-subprocess 或在特定情况下回退到同步实现

7.3 路径和命令差异

Windows 和 Unix 系统的命令和路径格式不同,编写跨平台代码时需要注意:

python
import asyncio
import sys
import os

async def run_ls():
    # 根据操作系统选择命令
    if sys.platform == 'win32':
        cmd = 'dir'
    else:
        cmd = 'ls -la'
    
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()
    print(stdout.decode())

# 使用 pathlib 处理路径以保证跨平台
from pathlib import Path

async def run_python_script():
    script_path = Path(__file__).parent / "script.py"
    
    proc = await asyncio.create_subprocess_exec(
        sys.executable,  # 使用当前 Python 解释器
        str(script_path),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()
    return stdout.decode()

8. 错误处理最佳实践

python
import asyncio

async def robust_run_command(cmd):
    try:
        proc = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        
        stdout, stderr = await proc.communicate()
        
        if proc.returncode != 0:
            print(f"Command failed with return code {proc.returncode}")
            print(f"stderr: {stderr.decode()}")
            return None
        
        return stdout.decode()
    except Exception as e:
        print(f"Error running command: {e}")
        return None

asyncio.run(robust_run_command("python --version"))

9. 参考资料

  1. Python 官方文档 - asyncio 子进程
  2. Python 官方文档 - subprocess 模块
  3. asyncio 在 Windows 上的注意事项