summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/util/queue.py
diff options
context:
space:
mode:
authorFederico Caselli <cfederico87@gmail.com>2021-01-09 13:25:55 +0100
committerFederico Caselli <cfederico87@gmail.com>2021-01-21 21:42:58 +0100
commite56534995de2a97210d9c3d58183e8d245cdae94 (patch)
tree8b820ef993bb4157b107322a6bba8f3c2d78961d /lib/sqlalchemy/util/queue.py
parent851a3a362ee5e05b8438f92e2e1df63c68f79d68 (diff)
downloadsqlalchemy-e56534995de2a97210d9c3d58183e8d245cdae94.tar.gz
Fix a couple of bugs in the asyncio implementation
Log an informative message if a connection is not closed and the gc is reclaiming it when using an async dpapi, that does not support running IO at that stage. The ``AsyncAdaptedQueue`` used by default on async dpapis should instantiate a queue only when it's first used to avoid binding it to a possibly wrong event loop. Fixes: #5823 Change-Id: Ibfc50e209b1937ae3d6599ae7997f028c7a92c33
Diffstat (limited to 'lib/sqlalchemy/util/queue.py')
-rw-r--r--lib/sqlalchemy/util/queue.py32
1 files changed, 25 insertions, 7 deletions
diff --git a/lib/sqlalchemy/util/queue.py b/lib/sqlalchemy/util/queue.py
index ca5a3abde..30e388248 100644
--- a/lib/sqlalchemy/util/queue.py
+++ b/lib/sqlalchemy/util/queue.py
@@ -26,6 +26,7 @@ from .compat import threading
from .concurrency import asyncio
from .concurrency import await_fallback
from .concurrency import await_only
+from .langhelpers import memoized_property
__all__ = ["Empty", "Full", "Queue"]
@@ -206,15 +207,32 @@ class AsyncAdaptedQueue:
await_ = staticmethod(await_only)
def __init__(self, maxsize=0, use_lifo=False):
- if use_lifo:
- self._queue = asyncio.LifoQueue(maxsize=maxsize)
- else:
- self._queue = asyncio.Queue(maxsize=maxsize)
self.use_lifo = use_lifo
self.maxsize = maxsize
- self.empty = self._queue.empty
- self.full = self._queue.full
- self.qsize = self._queue.qsize
+
+ def empty(self):
+ return self._queue.empty()
+
+ def full(self):
+ return self._queue.full()
+
+ def qsize(self):
+ return self._queue.qsize()
+
+ @memoized_property
+ def _queue(self):
+ # Delay creation of the queue until it is first used, to avoid
+ # binding it to a possibly wrong event loop.
+ # By delaying the creation of the pool we accommodate the common
+ # usage pattern of instanciating the engine at module level, where a
+ # different event loop is in present compared to when the application
+ # is actually run.
+
+ if self.use_lifo:
+ queue = asyncio.LifoQueue(maxsize=self.maxsize)
+ else:
+ queue = asyncio.Queue(maxsize=self.maxsize)
+ return queue
def put_nowait(self, item):
try: