summaryrefslogtreecommitdiff
path: root/test/base/test_concurrency_py3k.py
diff options
context:
space:
mode:
authorFederico Caselli <cfederico87@gmail.com>2021-01-09 13:25:55 +0100
committerFederico Caselli <cfederico87@gmail.com>2021-01-21 21:42:58 +0100
commite56534995de2a97210d9c3d58183e8d245cdae94 (patch)
tree8b820ef993bb4157b107322a6bba8f3c2d78961d /test/base/test_concurrency_py3k.py
parent851a3a362ee5e05b8438f92e2e1df63c68f79d68 (diff)
downloadsqlalchemy-e56534995de2a97210d9c3d58183e8d245cdae94.tar.gz
Fix a couple of bugs in the asyncio implementation
Log an informative message if a connection is not closed and the gc is reclaiming it when using an async dpapi, that does not support running IO at that stage. The ``AsyncAdaptedQueue`` used by default on async dpapis should instantiate a queue only when it's first used to avoid binding it to a possibly wrong event loop. Fixes: #5823 Change-Id: Ibfc50e209b1937ae3d6599ae7997f028c7a92c33
Diffstat (limited to 'test/base/test_concurrency_py3k.py')
-rw-r--r--test/base/test_concurrency_py3k.py50
1 files changed, 50 insertions, 0 deletions
diff --git a/test/base/test_concurrency_py3k.py b/test/base/test_concurrency_py3k.py
index e7ae8c9ad..8eabece92 100644
--- a/test/base/test_concurrency_py3k.py
+++ b/test/base/test_concurrency_py3k.py
@@ -1,12 +1,18 @@
+import threading
+
from sqlalchemy import exc
from sqlalchemy import testing
from sqlalchemy.testing import async_test
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import is_true
+from sqlalchemy.util import asyncio
from sqlalchemy.util import await_fallback
from sqlalchemy.util import await_only
from sqlalchemy.util import greenlet_spawn
+from sqlalchemy.util import queue
try:
from greenlet import greenlet
@@ -152,3 +158,47 @@ class TestAsyncioCompat(fixtures.TestBase):
"The current operation required an async execution but none was",
):
await greenlet_spawn(run, _require_await=True)
+
+
+class TestAsyncAdaptedQueue(fixtures.TestBase):
+ def test_lazy_init(self):
+ run = [False]
+
+ def thread_go(q):
+ def go():
+ q.get(timeout=0.1)
+
+ with expect_raises(queue.Empty):
+ asyncio.run(greenlet_spawn(go))
+ run[0] = True
+
+ t = threading.Thread(
+ target=thread_go, args=[queue.AsyncAdaptedQueue()]
+ )
+ t.start()
+ t.join()
+
+ is_true(run[0])
+
+ def test_error_other_loop(self):
+ run = [False]
+
+ def thread_go(q):
+ def go():
+ eq_(q.get(block=False), 1)
+ q.get(timeout=0.1)
+
+ with expect_raises_message(
+ RuntimeError, "Task .* attached to a different loop"
+ ):
+ asyncio.run(greenlet_spawn(go))
+
+ run[0] = True
+
+ q = queue.AsyncAdaptedQueue()
+ q.put_nowait(1)
+ t = threading.Thread(target=thread_go, args=[q])
+ t.start()
+ t.join()
+
+ is_true(run[0])