diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-02-15 23:43:51 -0500 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-02-17 14:45:04 -0500 |
commit | 5157e0aa542f390242dd7a6d27a6ce1663230e46 (patch) | |
tree | 113f0e5a83e8229c7d0cb9e9c47387e1d703cb29 /lib/sqlalchemy/util/queue.py | |
parent | 20213fd1f27fea51015d753bf94c6f40674ae86f (diff) | |
download | sqlalchemy-5157e0aa542f390242dd7a6d27a6ce1663230e46.tar.gz |
pep-484 for pool
also extends into some areas of utils, events and others
as needed.
Formalizes a public hierarchy for pool API,
with ManagesConnection -> PoolProxiedConnection /
ConnectionPoolEntry for connectionfairy / connectionrecord,
which are now what's exposed in the event API and other
APIs. all public API docs moved to the new objects.
Corrects the mypy plugin's check for sqlalchemy-stubs
not being insatlled, which has to be imported using the
dash in the name to be effective.
Change-Id: I16c2cb43b2e840d28e70a015f370a768e70f3581
Diffstat (limited to 'lib/sqlalchemy/util/queue.py')
-rw-r--r-- | lib/sqlalchemy/util/queue.py | 126 |
1 files changed, 86 insertions, 40 deletions
diff --git a/lib/sqlalchemy/util/queue.py b/lib/sqlalchemy/util/queue.py index 3062d9d8a..06b60c8bf 100644 --- a/lib/sqlalchemy/util/queue.py +++ b/lib/sqlalchemy/util/queue.py @@ -17,16 +17,26 @@ producing a ``put()`` inside the ``get()`` and therefore a reentrant condition. """ +from __future__ import annotations + import asyncio from collections import deque import threading from time import time as _time +import typing +from typing import Any +from typing import Awaitable +from typing import Deque +from typing import Generic +from typing import Optional +from typing import TypeVar from .concurrency import await_fallback from .concurrency import await_only from .langhelpers import memoized_property +_T = TypeVar("_T", bound=Any) __all__ = ["Empty", "Full", "Queue"] @@ -42,8 +52,41 @@ class Full(Exception): pass -class Queue: - def __init__(self, maxsize=0, use_lifo=False): +class QueueCommon(Generic[_T]): + maxsize: int + use_lifo: bool + + def __init__(self, maxsize: int = 0, use_lifo: bool = False): + ... + + def empty(self) -> bool: + raise NotImplementedError() + + def full(self) -> bool: + raise NotImplementedError() + + def qsize(self) -> int: + raise NotImplementedError() + + def put_nowait(self, item: _T) -> None: + raise NotImplementedError() + + def put( + self, item: _T, block: bool = True, timeout: Optional[float] = None + ) -> None: + raise NotImplementedError() + + def get_nowait(self) -> _T: + raise NotImplementedError() + + def get(self, block: bool = True, timeout: Optional[float] = None) -> _T: + raise NotImplementedError() + + +class Queue(QueueCommon[_T]): + queue: Deque[_T] + + def __init__(self, maxsize: int = 0, use_lifo: bool = False): """Initialize a queue object with a given maximum size. If `maxsize` is <= 0, the queue size is infinite. @@ -66,27 +109,29 @@ class Queue: # If this queue uses LIFO or FIFO self.use_lifo = use_lifo - def qsize(self): + def qsize(self) -> int: """Return the approximate size of the queue (not reliable!).""" with self.mutex: return self._qsize() - def empty(self): + def empty(self) -> bool: """Return True if the queue is empty, False otherwise (not reliable!).""" with self.mutex: return self._empty() - def full(self): + def full(self) -> bool: """Return True if the queue is full, False otherwise (not reliable!).""" with self.mutex: return self._full() - def put(self, item, block=True, timeout=None): + def put( + self, item: _T, block: bool = True, timeout: Optional[float] = None + ) -> None: """Put an item into the queue. If optional args `block` is True and `timeout` is None (the @@ -118,7 +163,7 @@ class Queue: self._put(item) self.not_empty.notify() - def put_nowait(self, item): + def put_nowait(self, item: _T) -> None: """Put an item into the queue without blocking. Only enqueue the item if a free slot is immediately available. @@ -126,7 +171,7 @@ class Queue: """ return self.put(item, False) - def get(self, block=True, timeout=None): + def get(self, block: bool = True, timeout: Optional[float] = None) -> _T: """Remove and return an item from the queue. If optional args `block` is True and `timeout` is None (the @@ -158,7 +203,7 @@ class Queue: self.not_full.notify() return item - def get_nowait(self): + def get_nowait(self) -> _T: """Remove and return an item from the queue without blocking. Only get an item if one is immediately available. Otherwise @@ -167,32 +212,23 @@ class Queue: return self.get(False) - # Override these methods to implement other queue organizations - # (e.g. stack or priority queue). - # These will only be called with appropriate locks held - - # Initialize the queue representation - def _init(self, maxsize): + def _init(self, maxsize: int) -> None: self.maxsize = maxsize self.queue = deque() - def _qsize(self): + def _qsize(self) -> int: return len(self.queue) - # Check whether the queue is empty - def _empty(self): + def _empty(self) -> bool: return not self.queue - # Check whether the queue is full - def _full(self): + def _full(self) -> bool: return self.maxsize > 0 and len(self.queue) == self.maxsize - # Put a new item in the queue - def _put(self, item): + def _put(self, item: _T) -> None: self.queue.append(item) - # Get an item from the queue - def _get(self): + def _get(self) -> _T: if self.use_lifo: # LIFO return self.queue.pop() @@ -201,14 +237,21 @@ class Queue: return self.queue.popleft() -class AsyncAdaptedQueue: - await_ = staticmethod(await_only) +class AsyncAdaptedQueue(QueueCommon[_T]): + if typing.TYPE_CHECKING: - def __init__(self, maxsize=0, use_lifo=False): + @staticmethod + def await_(coroutine: Awaitable[Any]) -> _T: + ... + + else: + await_ = staticmethod(await_only) + + def __init__(self, maxsize: int = 0, use_lifo: bool = False): self.use_lifo = use_lifo self.maxsize = maxsize - def empty(self): + def empty(self) -> bool: return self._queue.empty() def full(self): @@ -218,7 +261,7 @@ class AsyncAdaptedQueue: return self._queue.qsize() @memoized_property - def _queue(self): + def _queue(self) -> asyncio.Queue[_T]: # Delay creation of the queue until it is first used, to avoid # binding it to a possibly wrong event loop. # By delaying the creation of the pool we accommodate the common @@ -226,39 +269,41 @@ class AsyncAdaptedQueue: # different event loop is in present compared to when the application # is actually run. + queue: asyncio.Queue[_T] + if self.use_lifo: queue = asyncio.LifoQueue(maxsize=self.maxsize) else: queue = asyncio.Queue(maxsize=self.maxsize) return queue - def put_nowait(self, item): + def put_nowait(self, item: _T) -> None: try: - return self._queue.put_nowait(item) + self._queue.put_nowait(item) except asyncio.QueueFull as err: raise Full() from err - def put(self, item, block=True, timeout=None): + def put( + self, item: _T, block: bool = True, timeout: Optional[float] = None + ) -> None: if not block: return self.put_nowait(item) try: if timeout is not None: - return self.await_( - asyncio.wait_for(self._queue.put(item), timeout) - ) + self.await_(asyncio.wait_for(self._queue.put(item), timeout)) else: - return self.await_(self._queue.put(item)) + self.await_(self._queue.put(item)) except (asyncio.QueueFull, asyncio.TimeoutError) as err: raise Full() from err - def get_nowait(self): + def get_nowait(self) -> _T: try: return self._queue.get_nowait() except asyncio.QueueEmpty as err: raise Empty() from err - def get(self, block=True, timeout=None): + def get(self, block: bool = True, timeout: Optional[float] = None) -> _T: if not block: return self.get_nowait() @@ -273,5 +318,6 @@ class AsyncAdaptedQueue: raise Empty() from err -class FallbackAsyncAdaptedQueue(AsyncAdaptedQueue): - await_ = staticmethod(await_fallback) +class FallbackAsyncAdaptedQueue(AsyncAdaptedQueue[_T]): + if not typing.TYPE_CHECKING: + await_ = staticmethod(await_fallback) |