创建异步子进程
1. 发起异步进程
使用 asyncio.create_subprocess_shell() 可以在异步环境中创建子进程:
python
import asyncio
async def run(cmd: str | bytes):
proc = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
print(f"{cmd!r} exited with {proc.returncode}")
if stdout:
print(f"[stdout]\n{stdout.decode()}")
if stderr:
print(f"[stderr]\n{stderr.decode()}")
asyncio.run(run("ls /"))

2. 使用 create_subprocess_exec
除了 create_subprocess_shell(),还可以使用 create_subprocess_exec() 直接执行程序而不通过 shell:
python
import asyncio
async def run_command():
    """Run a Python one-liner in a child process without going through a shell.

    With the exec-style API the program and each of its arguments are passed
    as separate strings. Captured stdout/stderr are printed when non-empty.

    Returns:
        The exit code of the child process.
    """
    # Local import keeps this snippet self-contained.
    import sys

    proc = await asyncio.create_subprocess_exec(
        # sys.executable is the interpreter running this code; a bare
        # "python" may be missing from PATH or be a different interpreter.
        sys.executable, '-c', 'print("Hello from subprocess")',
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()

    if stdout:
        print(f"stdout: {stdout.decode()}")
    if stderr:
        print(f"stderr: {stderr.decode()}")

    return proc.returncode
asyncio.run(run_command())

create_subprocess_exec() 比 create_subprocess_shell() 更安全,因为它不会通过 shell 解释命令,避免了 shell 注入攻击。
3. 实时读取子进程输出
可以逐行读取子进程的输出:
python
import asyncio
async def read_stream(stream, prefix):
    """Print every line read from *stream*, tagged with *prefix*.

    Lines are awaited one at a time via ``stream.readline()``; reading stops
    at the first empty result. Each line is decoded and stripped of
    surrounding whitespace before being printed as ``"<prefix>: <line>"``.

    Args:
        stream: Object exposing an awaitable ``readline()`` that returns bytes.
        prefix: Label placed in front of every printed line.
    """
    while raw_line := await stream.readline():
        text = raw_line.decode().strip()
        print(f"{prefix}: {text}")
async def run_with_realtime_output(cmd):
    """Run *cmd* in a shell and print its output line by line as it arrives.

    Args:
        cmd: The shell command line to execute.

    Returns:
        The exit code of the child process.
    """
    pipe = asyncio.subprocess.PIPE
    child = await asyncio.create_subprocess_shell(cmd, stdout=pipe, stderr=pipe)

    # Read stdout and stderr concurrently, each tagged with its own prefix.
    stdout_reader = read_stream(child.stdout, "STDOUT")
    stderr_reader = read_stream(child.stderr, "STDERR")
    await asyncio.gather(stdout_reader, stderr_reader)

    await child.wait()
    return child.returncode
asyncio.run(run_with_realtime_output("python -c 'import time; print(1); time.sleep(1); print(2)'"))

4. 向子进程发送输入
可以通过 stdin 向子进程发送数据:
python
import asyncio
async def send_input():
    """Send data to a child process through stdin and print what it answers.

    The child is a Python one-liner that reads all of stdin and echoes it
    back prefixed with ``Got: ``. The child's decoded stdout is printed.

    Returns:
        The exit code of the child process.
    """
    # Local import keeps this snippet self-contained.
    import sys

    proc = await asyncio.create_subprocess_exec(
        # sys.executable is the interpreter running this code; a bare
        # "python" may be missing from PATH or be a different interpreter.
        sys.executable, '-c', 'import sys; print(f"Got: {sys.stdin.read()}")',
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    # communicate() writes the input, closes stdin, and waits for the result.
    stdout, stderr = await proc.communicate(input=b"Hello, subprocess!\n")
    print(stdout.decode())

    return proc.returncode
asyncio.run(send_input())

5. 超时控制
使用 asyncio.wait_for() 可以为子进程设置超时:
python
import asyncio
async def run_with_timeout(cmd, timeout=5.0):
    """Run *cmd* in a shell, killing it if it does not finish within *timeout*.

    Args:
        cmd: The shell command line to execute.
        timeout: Maximum number of seconds to wait for the command.

    Returns:
        A ``(stdout, stderr, returncode)`` tuple with both streams decoded.

    Raises:
        asyncio.TimeoutError: If the command ran past *timeout*. The process
            is killed and reaped before the exception is re-raised.
    """
    # Start the process outside the try block so the timeout handler below
    # can always rely on ``proc`` being bound.
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    try:
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        print(f"Command timed out after {timeout} seconds")
        try:
            proc.kill()
        except ProcessLookupError:
            # The child exited on its own just as the timeout fired.
            pass
        # Reap the child so it does not linger as a zombie.
        await proc.wait()
        raise

    return stdout.decode(), stderr.decode(), proc.returncode
# Run a command that may exceed its timeout
try:
    result = asyncio.run(run_with_timeout("sleep 10", timeout=2.0))
except asyncio.TimeoutError:
    print("Command was killed due to timeout")

6. 并发运行多个子进程
可以同时运行多个子进程:
python
import asyncio
async def run_multiple_commands():
    """Run three echo commands concurrently and print a report for each.

    Every command is started through the shell with stdout/stderr captured.
    Once all have finished, the command line, decoded stdout and exit code
    of each are printed, followed by a 40-character dashed separator.
    """
    commands = [
        "echo 'Command 1'",
        "echo 'Command 2'",
        "echo 'Command 3'",
    ]

    async def run_cmd(cmd):
        """Run one command; return ``(cmd, decoded stdout, returncode)``."""
        pipe = asyncio.subprocess.PIPE
        child = await asyncio.create_subprocess_shell(cmd, stdout=pipe, stderr=pipe)
        out_bytes, _err_bytes = await child.communicate()
        return cmd, out_bytes.decode(), child.returncode

    # gather() returns results in the same order as the awaitables passed in.
    pending = [run_cmd(command) for command in commands]
    reports = await asyncio.gather(*pending)

    separator = "-" * 40
    for command, output, returncode in reports:
        print(f"Command: {command}")
        print(f"Output: {output}")
        print(f"Return code: {returncode}")
        print(separator)
asyncio.run(run_multiple_commands())

7. Windows 下的限制和解决方案
7.1 事件循环策略
在 Windows 上,Python 3.7 及更早版本默认使用的事件循环(SelectorEventLoop)不支持异步子进程(Python 3.8 起默认改为 ProactorEventLoop)。有以下解决方案:
方案一:使用 ProactorEventLoop
python
import asyncio
import sys
if sys.platform == 'win32':
    # On Windows, the ProactorEventLoop must be used
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
async def run_on_windows(cmd):
    """Run *cmd* through the shell and return its decoded standard output.

    Args:
        cmd: The shell command line to execute.

    Returns:
        Everything the command wrote to stdout, decoded to ``str``.
    """
    pipe = asyncio.subprocess.PIPE
    child = await asyncio.create_subprocess_shell(cmd, stdout=pipe, stderr=pipe)

    # stderr is captured as well, but only stdout is returned.
    out_bytes, _err_bytes = await child.communicate()
    return out_bytes.decode()
asyncio.run(run_on_windows("dir"))

方案二:使用 Python 3.8+ 的默认设置
Python 3.8+ 在 Windows 上默认使用 ProactorEventLoop,不需要手动设置:
python
import asyncio
# Python 3.8+ 在 Windows 上可以直接使用
async def run_on_windows():
    """Run ``dir`` through the shell and print its decoded standard output."""
    pipe = asyncio.subprocess.PIPE
    child = await asyncio.create_subprocess_shell("dir", stdout=pipe, stderr=pipe)

    # stderr is captured as well, but only stdout is printed.
    listing, _errors = await child.communicate()
    print(listing.decode())
asyncio.run(run_on_windows())

7.2 ProactorEventLoop 的兼容性问题
虽然 ProactorEventLoop 支持子进程,但某些第三方库可能不兼容。如果遇到兼容性问题,可以考虑:
使用同步的 subprocess 模块:在线程池中运行同步的子进程操作
python
import asyncio
import subprocess
from concurrent.futures import ThreadPoolExecutor
async def run_sync_subprocess(cmd):
    """Run *cmd* with the blocking ``subprocess`` module on a worker thread.

    The blocking call is handed to a thread pool so the event loop stays free
    while the command runs.

    Args:
        cmd: The shell command line to execute.

    Returns:
        Everything the command wrote to stdout, decoded to ``str``.
    """
    loop = asyncio.get_running_loop()

    def _run_blocking():
        # run_in_executor() forwards only positional arguments, and
        # subprocess.run() passes extra positionals to Popen as
        # bufsize/executable/stdin. Bind the options by keyword instead.
        return subprocess.run(
            cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, _run_blocking)

    return result.stdout.decode()
asyncio.run(run_sync_subprocess("dir"))

使用专门的库:如 asyncio-subprocess 或在特定情况下回退到同步实现
7.3 路径和命令差异
Windows 和 Unix 系统的命令和路径格式不同,编写跨平台代码时需要注意:
python
import asyncio
import sys
import os
async def run_ls():
    """List the current directory with the platform's command and print it.

    ``dir`` is used when ``sys.platform`` is ``'win32'`` and ``ls -la``
    everywhere else. The command's decoded stdout is printed.
    """
    # Choose the listing command according to the operating system.
    listing_cmd = 'dir' if sys.platform == 'win32' else 'ls -la'

    pipe = asyncio.subprocess.PIPE
    child = await asyncio.create_subprocess_shell(listing_cmd, stdout=pipe, stderr=pipe)

    # stderr is captured as well, but only stdout is printed.
    listing, _errors = await child.communicate()
    print(listing.decode())
# 使用 pathlib 处理路径以保证跨平台
from pathlib import Path
async def run_python_script():
script_path = Path(__file__).parent / "script.py"
proc = await asyncio.create_subprocess_exec(
sys.executable, # 使用当前 Python 解释器
str(script_path),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
return stdout.decode()8. 错误处理最佳实践
python
import asyncio
async def robust_run_command(cmd):
    """Run *cmd* in a shell and return its stdout, or ``None`` on any failure.

    A non-zero exit code is reported by printing the code and the decoded
    stderr. Any exception raised while starting the process, talking to it,
    or decoding its output is printed as well.

    Args:
        cmd: The shell command line to execute.

    Returns:
        The decoded stdout when the command exits with code 0, else ``None``.
    """
    try:
        pipe = asyncio.subprocess.PIPE
        child = await asyncio.create_subprocess_shell(cmd, stdout=pipe, stderr=pipe)
        out_bytes, err_bytes = await child.communicate()

        # Success path first; everything below handles the failure case.
        if child.returncode == 0:
            return out_bytes.decode()

        print(f"Command failed with return code {child.returncode}")
        print(f"stderr: {err_bytes.decode()}")
        return None
    except Exception as e:
        print(f"Error running command: {e}")
        return None
asyncio.run(robust_run_command("python --version"))