summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/util/queue.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2022-02-15 23:43:51 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2022-02-17 14:45:04 -0500
commit5157e0aa542f390242dd7a6d27a6ce1663230e46 (patch)
tree113f0e5a83e8229c7d0cb9e9c47387e1d703cb29 /lib/sqlalchemy/util/queue.py
parent20213fd1f27fea51015d753bf94c6f40674ae86f (diff)
downloadsqlalchemy-5157e0aa542f390242dd7a6d27a6ce1663230e46.tar.gz
pep-484 for pool
also extends into some areas of utils, events and others as needed. Formalizes a public hierarchy for pool API, with ManagesConnection -> PoolProxiedConnection / ConnectionPoolEntry for connectionfairy / connectionrecord, which are now what's exposed in the event API and other APIs. all public API docs moved to the new objects. Corrects the mypy plugin's check for sqlalchemy-stubs not being insatlled, which has to be imported using the dash in the name to be effective. Change-Id: I16c2cb43b2e840d28e70a015f370a768e70f3581
Diffstat (limited to 'lib/sqlalchemy/util/queue.py')
-rw-r--r--lib/sqlalchemy/util/queue.py126
1 files changed, 86 insertions, 40 deletions
diff --git a/lib/sqlalchemy/util/queue.py b/lib/sqlalchemy/util/queue.py
index 3062d9d8a..06b60c8bf 100644
--- a/lib/sqlalchemy/util/queue.py
+++ b/lib/sqlalchemy/util/queue.py
@@ -17,16 +17,26 @@ producing a ``put()`` inside the ``get()`` and therefore a reentrant
condition.
"""
+from __future__ import annotations
+
import asyncio
from collections import deque
import threading
from time import time as _time
+import typing
+from typing import Any
+from typing import Awaitable
+from typing import Deque
+from typing import Generic
+from typing import Optional
+from typing import TypeVar
from .concurrency import await_fallback
from .concurrency import await_only
from .langhelpers import memoized_property
+_T = TypeVar("_T", bound=Any)
__all__ = ["Empty", "Full", "Queue"]
@@ -42,8 +52,41 @@ class Full(Exception):
pass
-class Queue:
- def __init__(self, maxsize=0, use_lifo=False):
+class QueueCommon(Generic[_T]):
+ maxsize: int
+ use_lifo: bool
+
+ def __init__(self, maxsize: int = 0, use_lifo: bool = False):
+ ...
+
+ def empty(self) -> bool:
+ raise NotImplementedError()
+
+ def full(self) -> bool:
+ raise NotImplementedError()
+
+ def qsize(self) -> int:
+ raise NotImplementedError()
+
+ def put_nowait(self, item: _T) -> None:
+ raise NotImplementedError()
+
+ def put(
+ self, item: _T, block: bool = True, timeout: Optional[float] = None
+ ) -> None:
+ raise NotImplementedError()
+
+ def get_nowait(self) -> _T:
+ raise NotImplementedError()
+
+ def get(self, block: bool = True, timeout: Optional[float] = None) -> _T:
+ raise NotImplementedError()
+
+
+class Queue(QueueCommon[_T]):
+ queue: Deque[_T]
+
+ def __init__(self, maxsize: int = 0, use_lifo: bool = False):
"""Initialize a queue object with a given maximum size.
If `maxsize` is <= 0, the queue size is infinite.
@@ -66,27 +109,29 @@ class Queue:
# If this queue uses LIFO or FIFO
self.use_lifo = use_lifo
- def qsize(self):
+ def qsize(self) -> int:
"""Return the approximate size of the queue (not reliable!)."""
with self.mutex:
return self._qsize()
- def empty(self):
+ def empty(self) -> bool:
"""Return True if the queue is empty, False otherwise (not
reliable!)."""
with self.mutex:
return self._empty()
- def full(self):
+ def full(self) -> bool:
"""Return True if the queue is full, False otherwise (not
reliable!)."""
with self.mutex:
return self._full()
- def put(self, item, block=True, timeout=None):
+ def put(
+ self, item: _T, block: bool = True, timeout: Optional[float] = None
+ ) -> None:
"""Put an item into the queue.
If optional args `block` is True and `timeout` is None (the
@@ -118,7 +163,7 @@ class Queue:
self._put(item)
self.not_empty.notify()
- def put_nowait(self, item):
+ def put_nowait(self, item: _T) -> None:
"""Put an item into the queue without blocking.
Only enqueue the item if a free slot is immediately available.
@@ -126,7 +171,7 @@ class Queue:
"""
return self.put(item, False)
- def get(self, block=True, timeout=None):
+ def get(self, block: bool = True, timeout: Optional[float] = None) -> _T:
"""Remove and return an item from the queue.
If optional args `block` is True and `timeout` is None (the
@@ -158,7 +203,7 @@ class Queue:
self.not_full.notify()
return item
- def get_nowait(self):
+ def get_nowait(self) -> _T:
"""Remove and return an item from the queue without blocking.
Only get an item if one is immediately available. Otherwise
@@ -167,32 +212,23 @@ class Queue:
return self.get(False)
- # Override these methods to implement other queue organizations
- # (e.g. stack or priority queue).
- # These will only be called with appropriate locks held
-
- # Initialize the queue representation
- def _init(self, maxsize):
+ def _init(self, maxsize: int) -> None:
self.maxsize = maxsize
self.queue = deque()
- def _qsize(self):
+ def _qsize(self) -> int:
return len(self.queue)
- # Check whether the queue is empty
- def _empty(self):
+ def _empty(self) -> bool:
return not self.queue
- # Check whether the queue is full
- def _full(self):
+ def _full(self) -> bool:
return self.maxsize > 0 and len(self.queue) == self.maxsize
- # Put a new item in the queue
- def _put(self, item):
+ def _put(self, item: _T) -> None:
self.queue.append(item)
- # Get an item from the queue
- def _get(self):
+ def _get(self) -> _T:
if self.use_lifo:
# LIFO
return self.queue.pop()
@@ -201,14 +237,21 @@ class Queue:
return self.queue.popleft()
-class AsyncAdaptedQueue:
- await_ = staticmethod(await_only)
+class AsyncAdaptedQueue(QueueCommon[_T]):
+ if typing.TYPE_CHECKING:
- def __init__(self, maxsize=0, use_lifo=False):
+ @staticmethod
+ def await_(coroutine: Awaitable[Any]) -> _T:
+ ...
+
+ else:
+ await_ = staticmethod(await_only)
+
+ def __init__(self, maxsize: int = 0, use_lifo: bool = False):
self.use_lifo = use_lifo
self.maxsize = maxsize
- def empty(self):
+ def empty(self) -> bool:
return self._queue.empty()
def full(self):
@@ -218,7 +261,7 @@ class AsyncAdaptedQueue:
return self._queue.qsize()
@memoized_property
- def _queue(self):
+ def _queue(self) -> asyncio.Queue[_T]:
# Delay creation of the queue until it is first used, to avoid
# binding it to a possibly wrong event loop.
# By delaying the creation of the pool we accommodate the common
@@ -226,39 +269,41 @@ class AsyncAdaptedQueue:
# different event loop is in present compared to when the application
# is actually run.
+ queue: asyncio.Queue[_T]
+
if self.use_lifo:
queue = asyncio.LifoQueue(maxsize=self.maxsize)
else:
queue = asyncio.Queue(maxsize=self.maxsize)
return queue
- def put_nowait(self, item):
+ def put_nowait(self, item: _T) -> None:
try:
- return self._queue.put_nowait(item)
+ self._queue.put_nowait(item)
except asyncio.QueueFull as err:
raise Full() from err
- def put(self, item, block=True, timeout=None):
+ def put(
+ self, item: _T, block: bool = True, timeout: Optional[float] = None
+ ) -> None:
if not block:
return self.put_nowait(item)
try:
if timeout is not None:
- return self.await_(
- asyncio.wait_for(self._queue.put(item), timeout)
- )
+ self.await_(asyncio.wait_for(self._queue.put(item), timeout))
else:
- return self.await_(self._queue.put(item))
+ self.await_(self._queue.put(item))
except (asyncio.QueueFull, asyncio.TimeoutError) as err:
raise Full() from err
- def get_nowait(self):
+ def get_nowait(self) -> _T:
try:
return self._queue.get_nowait()
except asyncio.QueueEmpty as err:
raise Empty() from err
- def get(self, block=True, timeout=None):
+ def get(self, block: bool = True, timeout: Optional[float] = None) -> _T:
if not block:
return self.get_nowait()
@@ -273,5 +318,6 @@ class AsyncAdaptedQueue:
raise Empty() from err
-class FallbackAsyncAdaptedQueue(AsyncAdaptedQueue):
- await_ = staticmethod(await_fallback)
+class FallbackAsyncAdaptedQueue(AsyncAdaptedQueue[_T]):
+ if not typing.TYPE_CHECKING:
+ await_ = staticmethod(await_fallback)