Python Multiprocessing

1. The Concept of Processes

Python's multiprocessing escapes the limits of the GIL (Global Interpreter Lock) and can take full advantage of multi-core CPUs. Each process has its own memory space and its own Python interpreter, so processes are not constrained by the GIL.

1.1 When to Use Multiprocessing

Scenarios where multiprocessing works well

  • CPU-bound tasks (numerical computation, data processing, image processing, etc.)
  • Work that needs to bypass the GIL
  • Tasks that are largely independent and do not need to share data frequently

Scenarios where multiprocessing is a poor fit

  • I/O-bound tasks (multithreading or async I/O is usually better; see the sketch after this list)
  • Workloads that need heavy inter-process communication
  • Environments with tight memory budgets
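
As a rough illustration of this split, the sketch below runs a CPU-bound task under a process pool and an I/O-bound task under a thread pool, using concurrent.futures; the task bodies and sizes are illustrative, and actual timings depend on the machine:

python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time


def cpu_task(n: int) -> int:
    # CPU-bound: pure computation, no waiting
    return sum(i * i for i in range(n))


def io_task(delay: float) -> float:
    # I/O-bound: sleep stands in for a network or disk wait
    time.sleep(delay)
    return delay


if __name__ == "__main__":
    # CPU-bound work benefits from separate processes ...
    with ProcessPoolExecutor() as ex:
        start = time.time()
        list(ex.map(cpu_task, [2_000_000] * 4))
        print(f"processes, cpu-bound: {time.time() - start:.2f}s")

    # ... while I/O-bound work is served well by threads
    with ThreadPoolExecutor(max_workers=4) as ex:
        start = time.time()
        list(ex.map(io_task, [0.5] * 4))
        print(f"threads, io-bound: {time.time() - start:.2f}s")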

2. Creating Processes

2.1 Basic Usage

Creating a process works the same way as creating a thread:

py
import multiprocessing as mp


def job(a: int, b: int) -> None:
    print("a:", a, "b:", b)


if __name__ == "__main__":
    p1 = mp.Process(target=job, args=(1, 2))
    p1.start()
    p1.join()
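
Beyond start() and join(), a Process object exposes attributes that help with debugging. A minimal sketch (the process name is illustrative):

python
import multiprocessing as mp
import time


def job() -> None:
    time.sleep(0.5)


if __name__ == "__main__":
    p = mp.Process(target=job, name="demo-worker")
    p.start()
    print(p.name, p.pid, p.is_alive())  # e.g. demo-worker 12345 True
    p.join()
    print(p.exitcode)  # 0 after a clean exit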

2.2 Using the Process Class

python
from multiprocessing import Process
import os

def worker(name):
    print(f"Worker {name} started, PID: {os.getpid()}")
    # do some work
    result = sum(i * i for i in range(1000000))
    print(f"Worker {name} finished, result: {result}")

if __name__ == "__main__":
    print(f"Main process PID: {os.getpid()}")
    
    # create the worker processes
    processes = []
    for i in range(4):
        p = Process(target=worker, args=(f"P{i}",))
        processes.append(p)
        p.start()
    
    # wait for all processes to finish
    for p in processes:
        p.join()
    
    print("All processes completed")

2.3 Subclassing Process

python
from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self, name):
        super().__init__()
        self.process_name = name
    
    def run(self):
        print(f"{self.process_name} is running")
        time.sleep(2)
        print(f"{self.process_name} is done")

if __name__ == "__main__":
    processes = [MyProcess(f"Process-{i}") for i in range(3)]
    
    for p in processes:
        p.start()
    
    for p in processes:
        p.join()

3. Inter-Process Communication

3.1 Communicating with a Queue

Queue is a thread- and process-safe queue that can be used to pass data between processes:

py
import multiprocessing as mp


def job(q: mp.Queue, index: int) -> None:
    res = 0
    for i in range(1000):
        res += i + i * i + i**3
    q.put(res + index)


if __name__ == "__main__":
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q, 1))
    p2 = mp.Process(target=job, args=(q, 2))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(q.get())
    print(q.get())
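
A common Queue pattern is a producer/consumer pipeline, where a sentinel value tells the consumer to stop. A minimal sketch (using None as the sentinel is a convention, not a requirement):

python
import multiprocessing as mp


def producer(q: mp.Queue) -> None:
    for i in range(5):
        q.put(i)
    q.put(None)  # sentinel: no more items


def consumer(q: mp.Queue) -> None:
    while True:
        item = q.get()
        if item is None:  # sentinel received, stop
            break
        print("consumed:", item)


if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=producer, args=(q,))
    c = mp.Process(target=consumer, args=(q,))
    p.start()
    c.start()
    p.join()
    c.join()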

3.2 Communicating with a Pipe

Pipe() creates a pair of connection objects and suits two-way communication between two processes:

python
from multiprocessing import Process, Pipe

def sender(conn, messages):
    """Process that sends messages."""
    for msg in messages:
        conn.send(msg)
        print(f"Sent: {msg}")
    conn.send(None)  # sentinel: tell the receiver we are done
    conn.close()

def receiver(conn):
    """Process that receives messages until the sentinel arrives."""
    while True:
        try:
            msg = conn.recv()
        except EOFError:
            # safety net: raised only once every copy of the sending end
            # is closed; other processes may keep such a copy open, so
            # the sentinel, not EOFError, is what normally ends the loop
            break
        if msg is None:  # sentinel received
            break
        print(f"Received: {msg}")

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    
    messages = ["Hello", "World", "From", "Pipe"]
    
    p1 = Process(target=sender, args=(parent_conn, messages))
    p2 = Process(target=receiver, args=(child_conn,))
    
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()

3.3 Sharing Rich Data with a Manager

A Manager can create shared Python objects (lists, dicts, and so on):

python
from multiprocessing import Process, Manager
import time

def worker(shared_dict, shared_list, worker_id):
    """修改共享数据的工作进程"""
    shared_dict[worker_id] = f"Worker {worker_id}"
    shared_list.append(worker_id)
    time.sleep(0.1)

if __name__ == "__main__":
    with Manager() as manager:
        # create the shared objects
        shared_dict = manager.dict()
        shared_list = manager.list()
        
        processes = []
        for i in range(5):
            p = Process(target=worker, args=(shared_dict, shared_list, i))
            processes.append(p)
            p.start()
        
        for p in processes:
            p.join()
        
        print(f"Shared dict: {dict(shared_dict)}")
        print(f"Shared list: {list(shared_list)}")

4. Performance Comparison

4.1 Threads vs. Processes

On a multi-core machine, the process-based version of this CPU-bound job typically runs close to twice as fast as the sequential one, while the threaded version gains little because the GIL serializes the computation:

py
import multiprocessing as mp
import threading as td
import time


def job(q: mp.Queue) -> None:
    res = 0
    for i in range(10000000):
        res += i + i * i + i**3
    q.put(res)


def multicore():
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("multicore:", q.get(), q.get())


def normal():
    res = 0
    for _ in range(2):
        for i in range(10000000):
            res += i + i * i + i**3
    print("normal:", res)


def multithread():
    q = mp.Queue()
    t1 = td.Thread(target=job, args=(q,))
    t2 = td.Thread(target=job, args=(q,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    res1 = q.get()
    res2 = q.get()
    print("multithread:", res1, res2)


if __name__ == "__main__":
    st = time.time()
    normal()
    et = time.time()
    print("Time:", et - st)
    st = time.time()
    multicore()
    et = time.time()
    print("Time:", et - st)
    st = time.time()
    multithread()
    et = time.time()
    print("Time:", et - st)

4.2 A Hands-On Benchmark

python
import time
from multiprocessing import Process, cpu_count
from threading import Thread

def cpu_bound_task(n):
    """CPU 密集型任务"""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

def benchmark_sequential(n, repeat):
    """顺序执行"""
    start = time.time()
    for _ in range(repeat):
        cpu_bound_task(n)
    return time.time() - start

def benchmark_threads(n, repeat):
    """多线程执行"""
    start = time.time()
    threads = []
    for _ in range(repeat):
        t = Thread(target=cpu_bound_task, args=(n,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    return time.time() - start

def benchmark_processes(n, repeat):
    """多进程执行"""
    start = time.time()
    processes = []
    for _ in range(repeat):
        p = Process(target=cpu_bound_task, args=(n,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    return time.time() - start

if __name__ == "__main__":
    n = 1000000
    repeat = 4
    
    print(f"CPU cores: {cpu_count()}")
    print(f"Task: compute sum of squares for {n} numbers, {repeat} times")
    print(f"Sequential: {benchmark_sequential(n, repeat):.2f}s")
    print(f"Threads: {benchmark_threads(n, repeat):.2f}s")
    print(f"Processes: {benchmark_processes(n, repeat):.2f}s")

5. Process Pools (Pool)

5.1 Basic Usage

py
import multiprocessing as mp


def job(x: int):
    return x**x


def multicore():
    # the with block shuts the pool down when done
    with mp.Pool(processes=2) as pool:
        res = pool.map(job, range(1000))
    print(len(res))


if __name__ == "__main__":
    multicore()

5.2 Asynchronous Execution with apply_async

apply_async() submits a single task for asynchronous execution:

python
from multiprocessing import Pool
import time

def job(x):
    time.sleep(1)
    return x * x

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # submit several asynchronous tasks
        results = []
        for i in range(10):
            res = pool.apply_async(job, (i,))
            results.append(res)
        
        # collect all the results
        for i, res in enumerate(results):
            print(f"Result {i}: {res.get()}")

5.3 Batch Execution with map

python
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    with Pool() as pool:
        # map blocks until every task has finished
        results = pool.map(square, range(10))
        print(results)

5.4 Multiple Arguments with starmap

python
from multiprocessing import Pool

def add(x, y):
    return x + y

if __name__ == "__main__":
    data = [(1, 2), (3, 4), (5, 6)]
    
    with Pool() as pool:
        results = pool.starmap(add, data)
        print(results)  # [3, 7, 11]

5.5 Iterating with imap

python
from multiprocessing import Pool
import time

def process_item(x):
    time.sleep(0.5)
    return x * x

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # imap returns an iterator, so results can be consumed one by one
        for result in pool.imap(process_item, range(10)):
            print(f"Got result: {result}")

6. Shared Memory

6.1 Value and Array

Value and Array define variables that live in shared memory:

python
from multiprocessing import Process, Value, Array

def worker(shared_value, shared_array, index):
    """Worker that mutates shared memory."""
    # += on a Value is not atomic, so take its lock first
    with shared_value.get_lock():
        shared_value.value += 1
    shared_array[index] = shared_array[index] * 2

if __name__ == "__main__":
    # 'd' means double, 'i' means int
    val = Value('d', 0.0)
    array = Array('i', [1, 2, 3, 4, 5])
    
    processes = []
    for i in range(5):
        p = Process(target=worker, args=(val, array, i))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
    
    print(f"Final value: {val.value}")
    print(f"Final array: {list(array)}")

The available type codes are documented in the standard library; the most common ones appear below.

6.2 Type Codes for Shared Memory

| Type code | C type | Python type | Size in bytes |
| --- | --- | --- | --- |
| 'b' | signed char | int | 1 |
| 'B' | unsigned char | int | 1 |
| 'h' | signed short | int | 2 |
| 'H' | unsigned short | int | 2 |
| 'i' | signed int | int | 2 or 4 |
| 'I' | unsigned int | int | 2 or 4 |
| 'l' | signed long | int | 4 |
| 'L' | unsigned long | int | 4 |
| 'q' | signed long long | int | 8 |
| 'Q' | unsigned long long | int | 8 |
| 'f' | float | float | 4 |
| 'd' | double | float | 8 |
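
For larger buffers, Python 3.8+ also provides multiprocessing.shared_memory, which exposes a raw block of shared bytes addressed by name. A minimal sketch (the block size and contents are illustrative):

python
from multiprocessing import Process, shared_memory


def reader(name: str) -> None:
    # attach to an existing block by its name
    shm = shared_memory.SharedMemory(name=name)
    print(bytes(shm.buf[:5]))  # b'hello'
    shm.close()


if __name__ == "__main__":
    shm = shared_memory.SharedMemory(create=True, size=16)
    shm.buf[:5] = b"hello"

    p = Process(target=reader, args=(shm.name,))
    p.start()
    p.join()

    shm.close()
    shm.unlink()  # free the block once no process needs it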

7. Locks

7.1 Using a Lock to Avoid Race Conditions

Use a lock to ensure mutually exclusive access to a shared resource:

py
import multiprocessing as mp
import time
from multiprocessing.synchronize import Lock


def job(v, num: int, lock: Lock):
    lock.acquire()
    for _ in range(10):
        time.sleep(0.1)
        v.value += num
        print(v.value)
    lock.release()


def multicore():
    lock = mp.Lock()
    val = mp.Value("i", 0)
    p1 = mp.Process(target=job, args=(val, 1, lock))
    p2 = mp.Process(target=job, args=(val, 3, lock))
    p1.start()
    p2.start()
    p1.join()
    p2.join()


if __name__ == "__main__":
    multicore()
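
The acquire()/release() pair can also be written as a with block, which releases the lock even if the body raises an exception. The same example in that style:

python
import multiprocessing as mp
import time


def job(v, num: int, lock) -> None:
    with lock:  # released automatically, even on an exception
        for _ in range(10):
            time.sleep(0.1)
            v.value += num
            print(v.value)


if __name__ == "__main__":
    lock = mp.Lock()
    val = mp.Value("i", 0)
    p1 = mp.Process(target=job, args=(val, 1, lock))
    p2 = mp.Process(target=job, args=(val, 3, lock))
    p1.start()
    p2.start()
    p1.join()
    p2.join()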

7.2 Deadlock

When several locks are involved, watch out for deadlock; the script below can hang forever:

python
from multiprocessing import Process, Lock
import time

def worker1(lock1, lock2):
    with lock1:
        print("Worker 1 acquired lock1")
        time.sleep(0.1)
        with lock2:
            print("Worker 1 acquired lock2")

def worker2(lock1, lock2):
    # Note: the locks are taken in the opposite order from worker1, which can deadlock
    with lock2:
        print("Worker 2 acquired lock2")
        time.sleep(0.1)
        with lock1:
            print("Worker 2 acquired lock1")

if __name__ == "__main__":
    lock1 = Lock()
    lock2 = Lock()
    
    p1 = Process(target=worker1, args=(lock1, lock2))
    p2 = Process(target=worker2, args=(lock1, lock2))
    
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()

How to avoid deadlock

  • Always acquire multiple locks in the same order
  • Acquire with a timeout (see the sketch below)
  • Use higher-level synchronization primitives
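
As a sketch of the timeout approach: acquire() accepts a timeout, so a worker that cannot get the second lock can back off and retry instead of waiting forever. The backoff policy here is illustrative:

python
from multiprocessing import Process, Lock
import random
import time


def careful_worker(first, second, name):
    while True:
        with first:
            # try the second lock, but give up after one second
            if second.acquire(timeout=1):
                try:
                    print(f"{name} holds both locks")
                    return
                finally:
                    second.release()
        # could not get the second lock: the with block has released
        # the first one, so back off briefly and retry
        time.sleep(random.uniform(0.05, 0.2))  # randomized backoff


if __name__ == "__main__":
    lock1, lock2 = Lock(), Lock()
    # opposite lock orders, as in the deadlock example above
    p1 = Process(target=careful_worker, args=(lock1, lock2, "W1"))
    p2 = Process(target=careful_worker, args=(lock2, lock1, "W2"))
    p1.start()
    p2.start()
    p1.join()
    p2.join()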

8. Process Synchronization Primitives

8.1 Semaphore

A semaphore limits the number of processes that can access a resource at the same time:

python
from multiprocessing import Process, Semaphore
import time

def worker(sem, worker_id):
    with sem:
        print(f"Worker {worker_id} is working")
        time.sleep(2)
        print(f"Worker {worker_id} is done")

if __name__ == "__main__":
    # allow at most 3 processes to work at the same time
    sem = Semaphore(3)
    
    processes = [Process(target=worker, args=(sem, i)) for i in range(10)]
    
    for p in processes:
        p.start()
    
    for p in processes:
        p.join()

8.2 Event

An Event is used to signal between processes:

python
from multiprocessing import Process, Event
import time

def waiter(event, name):
    print(f"{name} is waiting for event")
    event.wait()
    print(f"{name} received event")

def setter(event):
    print("Setter is sleeping")
    time.sleep(2)
    print("Setter is setting event")
    event.set()

if __name__ == "__main__":
    event = Event()
    
    p1 = Process(target=waiter, args=(event, "Process 1"))
    p2 = Process(target=waiter, args=(event, "Process 2"))
    p3 = Process(target=setter, args=(event,))
    
    p1.start()
    p2.start()
    p3.start()
    
    p1.join()
    p2.join()
    p3.join()

9. Best Practices

9.1 Use if __name__ == "__main__"

When using multiprocessing, always guard the main program with if __name__ == "__main__":

python
from multiprocessing import Process

def worker():
    print("Worker process")

# the guard below is mandatory
if __name__ == "__main__":
    p = Process(target=worker)
    p.start()
    p.join()

The guard is needed because under the spawn start method (the default on Windows and macOS) the child process re-imports the main module; without the protection, that import would create child processes recursively.

9.2 Choosing the Number of Workers

python
from multiprocessing import cpu_count

# for CPU-bound tasks
workers = cpu_count()

# for I/O-bound tasks, more workers than cores can help
workers = cpu_count() * 2

9.3 Avoid Serializing Large Objects

Inter-process communication pickles every object it transfers, so passing large objects costs performance:

python
# bad: ship a large object to the worker
large_data = [i for i in range(1000000)]

def worker(data):
    return sum(data)

# better: pass a lightweight description of the data, such as a range
def worker(start, end):
    return sum(range(start, end))
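
For instance, the range-based worker lets a pool split a large summation into chunks without pickling any data; each process generates its own chunk. The chunk count here is illustrative:

python
from multiprocessing import Pool


def worker(start: int, end: int) -> int:
    # each process builds its own chunk instead of receiving it
    return sum(range(start, end))


if __name__ == "__main__":
    n, chunks = 1_000_000, 4
    step = n // chunks
    bounds = [(i * step, (i + 1) * step) for i in range(chunks)]

    with Pool(processes=chunks) as pool:
        partial_sums = pool.starmap(worker, bounds)

    print(sum(partial_sums))  # equals sum(range(1_000_000))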

9.4 Shutting Down Pools Correctly

python
from multiprocessing import Pool

if __name__ == "__main__":
    with Pool() as pool:
        # use the pool
        results = pool.map(some_function, data)
    # the pool is shut down automatically on exit

    # or manage the pool manually
    pool = Pool()
    try:
        results = pool.map(some_function, data)
    finally:
        pool.close()
        pool.join()

10. Common Questions

10.1 Process Start Methods

Python supports three ways of starting a new process:

python
import multiprocessing as mp

# show the current start method
print(mp.get_start_method())

# set the start method (must happen before any process is created)
if __name__ == "__main__":
    mp.set_start_method('spawn')  # 'fork', 'spawn', or 'forkserver'

  • fork (default on Linux and most other Unix systems): the child starts as a copy of the parent process
  • spawn (default on Windows and macOS): starts a fresh Python interpreter process
  • forkserver: a dedicated server process forks new workers on request
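
set_start_method() may only be called once per program. When different parts of a program need different methods, get_context() returns an object with the same API as the multiprocessing module but pinned to one start method:

python
import multiprocessing as mp


def job() -> None:
    print("hello from a spawned child")


if __name__ == "__main__":
    # a context behaves like the multiprocessing module itself
    ctx = mp.get_context("spawn")
    p = ctx.Process(target=job)
    p.start()
    p.join()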

10.2 Alternatives to Shared State

If processes need to share state frequently, consider instead:

  • Memory-mapped files
  • Redis or another caching system
  • A database
  • A message queue

11. References

  1. Python official documentation: multiprocessing
  2. Python official documentation: Concurrent Execution
  3. multiprocessing programming guidelines