实现一个线程安全的资源池
需求:第三方的某些对象实现无法保证线程安全,但并发系统要求高效地使用这些资源。
此时考虑通过多个实例来实现并发,因此我们需要一个线程安全的资源池。资源池要求支持资源的获取、释放、超时等待。
AI 提示词
Python 如何实现一个线程安全的资源池,使得线程不安全的对象可以被并发程序高效使用,要求传入 factory
和资源池最大大小。
py
from contextlib import contextmanager
from queue import Empty, Queue
from threading import Lock
from typing import Callable, Generic, TypeVar
T = TypeVar("T")
class ResourcePool(Generic[T]):
"""线程安全的资源池"""
def __init__(self, factory: Callable[[], T], max_size: int = 0):
"""
初始化资源池
:param factory: 用于创建资源的工厂函数,无参数
:param max_size: 资源池的最大大小,0 表示无限制
"""
self.factory = factory
self.max_size = max_size
self._pool = Queue[T](max_size)
self._lock = Lock()
self._current_size = 0
def acquire(self, timeout: float | None = None):
"""
获取一个资源如果没有可用资源且未达到最大大小,则创建一个新资源
否则,阻塞等待直到有资源可用
:param timeout: 可选的超时时间(秒)
:return: 获取的资源
:raises queue.Empty: 如果在指定时间内未能获取到资源
"""
try:
resource = self._pool.get(block=True, timeout=timeout)
return resource
except Empty:
with self._lock:
if self._current_size < self.max_size:
resource = self.factory()
self._current_size += 1
return resource
# 如果已经达到最大资源数,等待获取资源
resource = self._pool.get(block=True, timeout=timeout)
return resource
def release(self, resource: T):
"""
释放一个资源,将其放回资源池
:param resource: 要释放的资源
"""
self._pool.put(resource)
@contextmanager
def get_resource(self, timeout: float | None = None):
"""
上下文管理器,用于自动获取和释放资源
:param timeout: 获取资源的超时时间(秒)
"""
resource = self.acquire(timeout=timeout)
try:
yield resource
finally:
self.release(resource)
def __len__(self):
"""
返回当前资源池中可用资源的数量
"""
return self._pool.qsize()
示例代码:
python
pool = ResourcePool(lambda: Task(), 10)
with pool.get_resource() as task:
task.run()