Skip to content

实现一个线程安全的资源池

需求:第三方的某些对象实现无法保证线程安全,但并发系统要求高效地使用这些资源。

此时考虑通过多个实例来实现并发,因此我们需要一个线程安全的资源池。资源池要求支持资源的获取、释放、超时等待。

AI 提示词

Python 如何实现一个线程安全的资源池,使得线程不安全的对象可以被并发程序高效使用,要求传入 factory 和资源池最大大小。

py
from contextlib import contextmanager
from queue import Empty, Queue
from threading import Lock
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class ResourcePool(Generic[T]):
    """线程安全的资源池"""

    def __init__(self, factory: Callable[[], T], max_size: int = 0):
        """
        初始化资源池

        :param factory: 用于创建资源的工厂函数,无参数
        :param max_size: 资源池的最大大小,0 表示无限制
        """
        self.factory = factory
        self.max_size = max_size
        self._pool = Queue[T](max_size)
        self._lock = Lock()
        self._current_size = 0

    def acquire(self, timeout: float | None = None):
        """
        获取一个资源如果没有可用资源且未达到最大大小,则创建一个新资源
        否则,阻塞等待直到有资源可用

        :param timeout: 可选的超时时间(秒)
        :return: 获取的资源
        :raises queue.Empty: 如果在指定时间内未能获取到资源
        """
        try:
            resource = self._pool.get(block=True, timeout=timeout)
            return resource
        except Empty:
            with self._lock:
                if self._current_size < self.max_size:
                    resource = self.factory()
                    self._current_size += 1
                    return resource
            # 如果已经达到最大资源数,等待获取资源
            resource = self._pool.get(block=True, timeout=timeout)
            return resource

    def release(self, resource: T):
        """
        释放一个资源,将其放回资源池

        :param resource: 要释放的资源
        """
        self._pool.put(resource)

    @contextmanager
    def get_resource(self, timeout: float | None = None):
        """
        上下文管理器,用于自动获取和释放资源

        :param timeout: 获取资源的超时时间(秒)
        """
        resource = self.acquire(timeout=timeout)
        try:
            yield resource
        finally:
            self.release(resource)

    def __len__(self):
        """
        返回当前资源池中可用资源的数量
        """
        return self._pool.qsize()

示例代码:

python
pool = ResourcePool(lambda: Task(), 10)
with pool.get_resource() as task:
    task.run()