Skip to content

实现一个线程安全的资源池

1. 背景与需求

1.1 问题描述

在并发编程中,我们经常会遇到这样的场景:第三方库提供的某些对象实现无法保证线程安全,但并发系统要求高效地使用这些资源。

1.2 解决方案

通过创建多个实例来实现并发访问,这就需要一个线程安全的资源池。资源池需要支持以下功能:

  • 资源获取:从池中获取可用资源
  • 资源释放:将资源归还给池
  • 超时等待:支持超时机制,避免无限等待
  • 动态创建:当资源不足时动态创建新资源
  • 大小限制:限制资源池的最大容量

2. 实现方案

2.1 设计思路

我们使用以下技术实现线程安全的资源池:

  1. Queue 队列:使用 Python 的 queue.Queue 作为资源容器,它本身是线程安全的
  2. 工厂模式:通过工厂函数创建资源实例
  3. 上下文管理器:提供便捷的资源获取和释放机制
  4. 泛型支持:使用 TypeVar 支持任意类型的资源

2.2 完整实现

py
from contextlib import contextmanager
from queue import Empty, Queue
from threading import Lock
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class ResourcePool(Generic[T]):
    """线程安全的资源池"""

    def __init__(self, factory: Callable[[], T], max_size: int = 0):
        """
        初始化资源池

        :param factory: 用于创建资源的工厂函数,无参数
        :param max_size: 资源池的最大大小,0 表示无限制
        """
        self.factory = factory
        self.max_size = max_size
        self._pool = Queue[T](max_size)
        self._lock = Lock()
        self._current_size = 0

    def acquire(self, timeout: float | None = None):
        """
        获取一个资源如果没有可用资源且未达到最大大小,则创建一个新资源
        否则,阻塞等待直到有资源可用

        :param timeout: 可选的超时时间(秒)
        :return: 获取的资源
        :raises queue.Empty: 如果在指定时间内未能获取到资源
        """
        try:
            resource = self._pool.get(block=True, timeout=timeout)
            return resource
        except Empty:
            with self._lock:
                if self._current_size < self.max_size:
                    resource = self.factory()
                    self._current_size += 1
                    return resource
            # 如果已经达到最大资源数,等待获取资源
            resource = self._pool.get(block=True, timeout=timeout)
            return resource

    def release(self, resource: T):
        """
        释放一个资源,将其放回资源池

        :param resource: 要释放的资源
        """
        self._pool.put(resource)

    @contextmanager
    def get_resource(self, timeout: float | None = None):
        """
        上下文管理器,用于自动获取和释放资源

        :param timeout: 获取资源的超时时间(秒)
        """
        resource = self.acquire(timeout=timeout)
        try:
            yield resource
        finally:
            self.release(resource)

    def __len__(self):
        """
        返回当前资源池中可用资源的数量
        """
        return self._pool.qsize()

3. 使用示例

3.1 基本用法

假设我们有一个线程不安全的 Task 类:

python
class Task:
    """一个线程不安全的任务类"""
    def __init__(self):
        self.state = "idle"
    
    def run(self):
        self.state = "running"
        # 执行任务...
        print(f"Task {id(self)} is running")
        self.state = "done"

# 创建资源池,最多 10 个 Task 实例
pool = ResourcePool(lambda: Task(), 10)

# 使用上下文管理器自动获取和释放资源
with pool.get_resource() as task:
    task.run()

3.2 并发使用示例

在多线程环境中使用资源池:

python
import threading
import time

def worker(pool: ResourcePool[Task], worker_id: int):
    """工作线程函数"""
    for i in range(3):
        with pool.get_resource(timeout=5.0) as task:
            print(f"Worker {worker_id} acquired task {id(task)}")
            task.run()
            time.sleep(0.1)  # 模拟耗时操作
        print(f"Worker {worker_id} released task")

# 创建资源池
pool = ResourcePool(lambda: Task(), max_size=5)

# 创建多个工作线程
threads = []
for i in range(10):
    t = threading.Thread(target=worker, args=(pool, i))
    threads.append(t)
    t.start()

# 等待所有线程完成
for t in threads:
    t.join()

3.3 手动管理资源

如果不使用上下文管理器,也可以手动获取和释放资源:

python
pool = ResourcePool(lambda: Task(), 10)

try:
    task = pool.acquire(timeout=5.0)
    task.run()
finally:
    pool.release(task)

4. 关键特性说明

4.1 线程安全

资源池通过以下机制保证线程安全:

  • 使用 queue.Queue 作为底层存储,它是线程安全的
  • 使用 threading.Lock 保护资源创建的临界区
  • 使用原子操作更新资源计数

4.2 超时机制

acquire() 方法支持超时参数:

python
try:
    task = pool.acquire(timeout=3.0)
    # 使用资源...
except Empty:
    print("获取资源超时")

4.3 动态扩容

当资源池为空且未达到最大容量时,会自动创建新资源:

python
# max_size=5 表示最多创建 5 个资源
pool = ResourcePool(lambda: Task(), max_size=5)

4.4 容量限制

可以通过 max_size 参数限制资源池的最大容量:

  • max_size=0:无限制(需谨慎使用)
  • max_size>0:限制最大资源数

5. 适用场景

这个资源池适用于以下场景:

  1. 数据库连接池:管理数据库连接对象
  2. HTTP 客户端池:复用 HTTP 客户端实例
  3. 文件句柄池:限制同时打开的文件数量
  4. 线程不安全对象:包装任何线程不安全的对象供并发使用

6. 注意事项

使用资源池时需要注意:

  1. 资源释放:确保资源使用后正确释放,建议使用上下文管理器
  2. 超时设置:合理设置超时时间,避免死锁
  3. 资源状态:确保释放的资源处于可用状态
  4. 容量规划:根据实际需求合理设置 max_size
  5. 异常处理:在资源使用过程中捕获异常,确保资源能够被释放

7. 扩展建议

可以考虑以下扩展:

  1. 健康检查:在获取资源时检查资源是否可用
  2. 资源回收:定期清理长时间未使用的资源
  3. 统计信息:记录资源使用情况和性能指标
  4. 优先级队列:支持按优先级获取资源
  5. 资源预热:初始化时预先创建部分资源

8. 参考资料