Python 多进程
1. 进程的概念
Python 的多进程可以避免 GIL(全局解释器锁)的限制,充分发挥多核 CPU 的优势。每个进程都有独立的内存空间和 Python 解释器,因此不会受到 GIL 的影响。
1.1 何时使用多进程
适合使用多进程的场景:
- CPU 密集型任务(数值计算、数据处理、图像处理等)
- 需要绕过 GIL 限制
- 任务之间相互独立,不需要频繁共享数据
不适合使用多进程的场景:
- I/O 密集型任务(使用多线程或异步 I/O 更好)
- 需要大量进程间通信
- 内存资源有限
2. 创建进程
2.1 基本使用
创建过程与多线程一致:
py
import multiprocessing as mp


def job(a: int, b: int) -> None:
    """Print both arguments — the child process's unit of work."""
    print("a:", a, "b:", b)


if __name__ == "__main__":
    # Spawn one worker; args is the positional-argument tuple for job().
    p1 = mp.Process(target=job, args=(1, 2))
    p1.start()
    p1.join()

2.2 使用 Process 类
python
from multiprocessing import Process
import os


def worker(name):
    """Report start/finish (with PID) around a CPU-bound computation."""
    print(f"Worker {name} started, PID: {os.getpid()}")
    result = sum(i * i for i in range(1000000))
    print(f"Worker {name} finished, result: {result}")


if __name__ == "__main__":
    print(f"Main process PID: {os.getpid()}")
    # Launch four workers, remembering each handle for the join below.
    processes = []
    for i in range(4):
        p = Process(target=worker, args=(f"P{i}",))
        p.start()
        processes.append(p)
    # Block until every worker has exited.
    for p in processes:
        p.join()
    print("All processes completed")

2.3 继承 Process 类
python
from multiprocessing import Process
import time


class MyProcess(Process):
    """Process subclass: the work goes in run(), triggered by start()."""

    def __init__(self, name):
        super().__init__()
        # Stored under a custom attribute so the base class's own
        # `name` property is left untouched.
        self.process_name = name

    def run(self):
        # run() executes in the child process once start() is called.
        print(f"{self.process_name} is running")
        time.sleep(2)
        print(f"{self.process_name} is done")


if __name__ == "__main__":
    processes = [MyProcess(f"Process-{i}") for i in range(3)]
    # Start all three first so they run concurrently, then join.
    for p in processes:
        p.start()
    for p in processes:
        p.join()

3. 进程间通信
3.1 使用 Queue 进行通信
Queue 是线程和进程安全的队列,可以用于进程间传递数据:
py
import multiprocessing as mp


def job(q: mp.Queue, index: int) -> None:
    """Compute a polynomial sum and push the result (offset by index)."""
    total = 0
    for i in range(1000):
        total += i + i * i + i**3
    q.put(total + index)


if __name__ == "__main__":
    q = mp.Queue()
    workers = [mp.Process(target=job, args=(q, idx)) for idx in (1, 2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    # One put per worker, so two get() calls drain the queue.
    print(q.get())
    print(q.get())

3.2 使用 Pipe 进行通信
Pipe 创建一对连接对象,适合两个进程之间的双向通信:
python
from multiprocessing import Process, Pipe


def sender(conn, messages):
    """Send each message over the connection, then close our end."""
    for msg in messages:
        conn.send(msg)
        print(f"Sent: {msg}")
    conn.close()


def receiver(conn):
    """Receive messages until every handle to the sending end is closed."""
    while True:
        try:
            msg = conn.recv()
        except EOFError:
            # recv() raises EOFError only after ALL writers are closed.
            break
        print(f"Received: {msg}")


if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    messages = ["Hello", "World", "From", "Pipe"]
    p1 = Process(target=sender, args=(parent_conn, messages))
    p2 = Process(target=receiver, args=(child_conn,))
    p1.start()
    p2.start()
    # Bug fix: the main process must close its own copies of both ends.
    # Otherwise the pipe still has an open writer here even after the
    # sender exits, recv() never raises EOFError, and p2.join() hangs.
    # NOTE(review): with the 'fork' start method the receiver process
    # also inherits parent_conn; a fully robust version would close the
    # unused end inside each child as well — confirm the start method.
    parent_conn.close()
    child_conn.close()
    p1.join()
    p2.join()

3.3 使用 Manager 共享复杂数据
Manager 可以创建共享的 Python 对象(列表、字典等):
python
from multiprocessing import Process, Manager
import time


def worker(shared_dict, shared_list, worker_id):
    """Record this worker's id in both shared containers."""
    shared_dict[worker_id] = f"Worker {worker_id}"
    shared_list.append(worker_id)
    time.sleep(0.1)


if __name__ == "__main__":
    with Manager() as manager:
        # Proxy objects: mutations are forwarded to the manager process.
        shared_dict = manager.dict()
        shared_list = manager.list()
        workers = [
            Process(target=worker, args=(shared_dict, shared_list, i))
            for i in range(5)
        ]
        for p in workers:
            p.start()
        for p in workers:
            p.join()
        print(f"Shared dict: {dict(shared_dict)}")
        print(f"Shared list: {list(shared_list)}")

4. 性能对比
4.1 多线程与多进程对比
py
import multiprocessing as mp
import threading as td
import time


def job(q: mp.Queue) -> None:
    # CPU-bound workload; the result goes through the queue because
    # child processes cannot return values directly.
    res = 0
    for i in range(10000000):
        res += i + i * i + i**3
    q.put(res)


def multicore():
    # Two processes run job() in parallel on separate cores.
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("multicore:", q.get(), q.get())


def normal():
    # Baseline: the same total workload on a single thread.
    res = 0
    for _ in range(2):
        for i in range(10000000):
            res += i + i * i + i**3
    print("normal:", res)


def multithread():
    # Two threads share one interpreter; the GIL serializes this
    # CPU-bound work, so expect no speedup over normal().
    q = mp.Queue()
    t1 = td.Thread(target=job, args=(q,))
    t2 = td.Thread(target=job, args=(q,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    res1 = q.get()
    res2 = q.get()
    print("multithread:", res1, res2)


if __name__ == "__main__":
    # Time each strategy on the identical workload.
    st = time.time()
    normal()
    et = time.time()
    print("Time:", et - st)
    st = time.time()
    multicore()
    et = time.time()
    print("Time:", et - st)
    st = time.time()
    multithread()
    et = time.time()
    print("Time:", et - st)

4.2 实际测试示例
python
import time
from multiprocessing import Process, cpu_count
from threading import Thread


def cpu_bound_task(n):
    """CPU-bound workload: sum of squares of 0..n-1."""
    total = 0
    for i in range(n):
        total += i ** 2
    return total


def benchmark_sequential(n, repeat):
    """Run the task `repeat` times back to back; return elapsed seconds."""
    start = time.time()
    for _ in range(repeat):
        cpu_bound_task(n)
    return time.time() - start


def benchmark_threads(n, repeat):
    """Run the task in `repeat` threads; return elapsed seconds."""
    start = time.time()
    threads = [Thread(target=cpu_bound_task, args=(n,)) for _ in range(repeat)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.time() - start


def benchmark_processes(n, repeat):
    """Run the task in `repeat` processes; return elapsed seconds."""
    start = time.time()
    procs = [Process(target=cpu_bound_task, args=(n,)) for _ in range(repeat)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return time.time() - start


if __name__ == "__main__":
    n = 1000000
    repeat = 4
    print(f"CPU cores: {cpu_count()}")
    print(f"Task: compute sum of squares for {n} numbers, {repeat} times")
    print(f"Sequential: {benchmark_sequential(n, repeat):.2f}s")
    print(f"Threads: {benchmark_threads(n, repeat):.2f}s")
    print(f"Processes: {benchmark_processes(n, repeat):.2f}s")

5. Pool 进程池
5.1 基本使用
py
import multiprocessing as mp


def job(x: int):
    """Return x raised to the power x."""
    return x**x


def multicore():
    """Map job() over 0..999 with a two-worker pool and print the count."""
    # Bug fix: use the pool as a context manager so its worker
    # processes are always closed and joined; the original version
    # leaked the pool (no close()/join()), which section 9.4 below
    # itself warns against.
    with mp.Pool(processes=2) as pool:
        res = pool.map(job, range(1000))
    print(len(res))
if __name__ == "__main__":
    multicore()

5.2 apply_async 异步执行
apply_async() 函数异步执行单个任务:
python
from multiprocessing import Pool
import time


def job(x):
    """Simulate one second of work, then return x squared."""
    time.sleep(1)
    return x * x


if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # Submit several tasks asynchronously; apply_async returns
        # immediately with an AsyncResult handle.
        results = []
        for i in range(10):
            res = pool.apply_async(job, (i,))
            results.append(res)
        # Collect all results; .get() blocks until each task finishes.
        for i, res in enumerate(results):
            print(f"Result {i}: {res.get()}")

5.3 map 批量执行
python
from multiprocessing import Pool


def square(x):
    """Return the square of x."""
    return x ** 2


if __name__ == "__main__":
    with Pool() as pool:
        # map blocks until every task has completed
        results = pool.map(square, range(10))
        print(results)

5.4 starmap 处理多参数
python
from multiprocessing import Pool


def add(x, y):
    """Return the sum of the two arguments."""
    return x + y


if __name__ == "__main__":
    data = [(1, 2), (3, 4), (5, 6)]
    with Pool() as pool:
        # starmap unpacks each tuple into the function's arguments
        results = pool.starmap(add, data)
        print(results)  # [3, 7, 11]

5.5 imap 迭代处理
python
from multiprocessing import Pool
import time


def process_item(x):
    """Simulate half a second of work, then return x squared."""
    time.sleep(0.5)
    return x * x


if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # imap returns a lazy iterator: results arrive one at a time,
        # in input order, as workers finish.
        for result in pool.imap(process_item, range(10)):
            print(f"Got result: {result}")

6. 共享内存
6.1 Value 和 Array
定义共享内存变量:
python
from multiprocessing import Process, Value, Array
import time


def worker(shared_value, shared_array, index):
    """Bump the shared counter and double one slot of the shared array."""
    # Bug fix: `+=` on a Value is a read-modify-write and is NOT atomic;
    # without the lock, concurrent workers can lose increments and the
    # final value may be less than the number of workers.
    with shared_value.get_lock():
        shared_value.value += 1
    # Each worker writes a distinct index, so no lock is needed here.
    shared_array[index] = shared_array[index] * 2


if __name__ == "__main__":
    # Type codes: 'd' = double (float), 'i' = signed int.
    val = Value('d', 0.0)
    array = Array('i', [1, 2, 3, 4, 5])
    processes = []
    for i in range(5):
        p = Process(target=worker, args=(val, array, i))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(f"Final value: {val.value}")
    print(f"Final array: {list(array)}")

变量的类型值可以参考 标准库文档。
6.2 共享内存的类型代码
| 类型代码 | C 类型 | Python 类型 | 字节大小 |
|---|---|---|---|
'b' | signed char | int | 1 |
'B' | unsigned char | int | 1 |
'h' | signed short | int | 2 |
'H' | unsigned short | int | 2 |
'i' | signed int | int | 2 或 4 |
'I' | unsigned int | int | 2 或 4 |
'l' | signed long | int | 4 或 8 |
'L' | unsigned long | int | 4 或 8 |
'q' | signed long long | int | 8 |
'Q' | unsigned long long | int | 8 |
'f' | float | float | 4 |
'd' | double | float | 8 |
7. Lock 锁
7.1 使用锁避免竞态条件
使用锁确保对共享资源的互斥访问:
py
import multiprocessing as mp
import time
from multiprocessing.synchronize import Lock  # imported for the type hint only


def job(v, num: int, lock: Lock):
    # Hold the lock for the whole loop so the two processes'
    # updates (and their printed values) cannot interleave.
    lock.acquire()
    for _ in range(10):
        time.sleep(0.1)
        v.value += num
        print(v.value)
    lock.release()


def multicore():
    lock = mp.Lock()
    # 'i' = C signed int, initial value 0.
    val = mp.Value("i", 0)
    p1 = mp.Process(target=job, args=(val, 1, lock))
    p2 = mp.Process(target=job, args=(val, 3, lock))
    p1.start()
    p2.start()
    p1.join()
    p2.join()


if __name__ == "__main__":
    multicore()

7.2 死锁问题
使用多个锁时要注意死锁问题:
python
from multiprocessing import Process, Lock
import time


def worker1(lock1, lock2):
    """Take lock1 first, then lock2."""
    with lock1:
        print("Worker 1 acquired lock1")
        time.sleep(0.1)
        with lock2:
            print("Worker 1 acquired lock2")


def worker2(lock1, lock2):
    """Take lock2 first, then lock1 — the opposite order to worker1.

    If each worker grabs its first lock before the other releases,
    both block forever: a classic deadlock (intentional in this demo).
    """
    with lock2:
        print("Worker 2 acquired lock2")
        time.sleep(0.1)
        with lock1:
            print("Worker 2 acquired lock1")


if __name__ == "__main__":
    lock1 = Lock()
    lock2 = Lock()
    p1 = Process(target=worker1, args=(lock1, lock2))
    p2 = Process(target=worker2, args=(lock1, lock2))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

避免死锁的方法:
- 始终以相同的顺序获取多个锁
- 使用超时机制
- 使用更高级的同步原语
8. 进程同步原语
8.1 Semaphore 信号量
限制同时访问资源的进程数:
python
from multiprocessing import Process, Semaphore
import time


def worker(sem, worker_id):
    """Do two seconds of 'work' while holding one semaphore slot."""
    with sem:
        print(f"Worker {worker_id} is working")
        time.sleep(2)
        print(f"Worker {worker_id} is done")


if __name__ == "__main__":
    # At most 3 of the 10 workers may run concurrently.
    sem = Semaphore(3)
    processes = [Process(target=worker, args=(sem, i)) for i in range(10)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

8.2 Event 事件
用于进程间的信号通知:
python
from multiprocessing import Process, Event
import time


def waiter(event, name):
    """Block until the event is set, then report."""
    print(f"{name} is waiting for event")
    event.wait()
    print(f"{name} received event")


def setter(event):
    """Sleep briefly, then wake every waiter at once."""
    print("Setter is sleeping")
    time.sleep(2)
    print("Setter is setting event")
    event.set()


if __name__ == "__main__":
    event = Event()
    p1 = Process(target=waiter, args=(event, "Process 1"))
    p2 = Process(target=waiter, args=(event, "Process 2"))
    p3 = Process(target=setter, args=(event,))
    # Both waiters block until the single set() releases them together.
    for p in (p1, p2, p3):
        p.start()
    p1.join()
    p2.join()
    p3.join()

9. 最佳实践
9.1 使用 if __name__ == "__main__"
在使用多进程时,必须使用 if __name__ == "__main__" 保护主程序代码:
python
from multiprocessing import Process


def worker():
    """Trivial child-process task."""
    print("Worker process")


# Process creation must live under this guard: child processes
# re-import the main module, and unguarded spawning would recurse.
if __name__ == "__main__":
    p = Process(target=worker)
    p.start()
    p.join()

这是因为在 Windows 上,子进程会重新导入主模块,如果不加保护会导致无限递归。
9.2 选择合适的工作进程数
python
from multiprocessing import cpu_count

# CPU-bound work: one worker per core saturates the machine
workers = cpu_count()
# I/O-bound work: more workers than cores can help
workers = cpu_count() * 2

9.3 避免序列化大对象
进程间通信需要序列化对象,传递大对象会降低性能:
python
# Bad practice: passing a large object — it must be pickled and
# copied into every child process.
large_data = [i for i in range(1000000)]


def worker(data):
    return sum(data)


# Better practice: pass a lightweight description of the data
# (here a start/end range) and let each worker build it locally.
def worker(start, end):
    return sum(range(start, end))

9.4 正确关闭进程池
python
from multiprocessing import Pool

# NOTE(review): `some_function` and `data` are placeholders defined
# elsewhere; this snippet is illustrative, not runnable as-is.
if __name__ == "__main__":
    with Pool() as pool:
        # Use the pool...
        results = pool.map(some_function, data)
        # ...close() and join() happen automatically on exit.
    # Or manage the pool manually:
    pool = Pool()
    try:
        results = pool.map(some_function, data)
    finally:
        pool.close()
        pool.join()

10. 常见问题
10.1 进程启动方法
Python 支持三种进程启动方法:
python
import multiprocessing as mp

# Show the start method currently in effect.
print(mp.get_start_method())

# The start method must be set before any process is created, and
# set_start_method() may only be called once per program.
if __name__ == "__main__":
    mp.set_start_method('spawn')  # 'fork', 'spawn', 'forkserver'

- fork(Unix/Linux 默认):复制父进程
- spawn(Windows 默认):启动新的 Python 解释器进程
- forkserver:使用服务器进程来创建新进程
10.2 共享状态的替代方案
如果需要频繁共享状态,考虑使用:
- 共享内存映射文件
- Redis 或其他缓存系统
- 数据库
- 消息队列