diff options
author | Federico Caselli <cfederico87@gmail.com> | 2021-01-09 13:25:55 +0100 |
---|---|---|
committer | Federico Caselli <cfederico87@gmail.com> | 2021-01-21 21:42:58 +0100 |
commit | e56534995de2a97210d9c3d58183e8d245cdae94 (patch) | |
tree | 8b820ef993bb4157b107322a6bba8f3c2d78961d /test/base/test_concurrency_py3k.py | |
parent | 851a3a362ee5e05b8438f92e2e1df63c68f79d68 (diff) | |
download | sqlalchemy-e56534995de2a97210d9c3d58183e8d245cdae94.tar.gz |
Fix a couple of bugs in the asyncio implementation
Log an informative message if a connection is not closed
and the gc is reclaiming it when using an async dpapi, that
does not support running IO at that stage.
The ``AsyncAdaptedQueue`` used by default on async dpapis
should instantiate a queue only when it's first used
to avoid binding it to a possibly wrong event loop.
Fixes: #5823
Change-Id: Ibfc50e209b1937ae3d6599ae7997f028c7a92c33
Diffstat (limited to 'test/base/test_concurrency_py3k.py')
-rw-r--r-- | test/base/test_concurrency_py3k.py | 50 |
1 files changed, 50 insertions, 0 deletions
diff --git a/test/base/test_concurrency_py3k.py b/test/base/test_concurrency_py3k.py index e7ae8c9ad..8eabece92 100644 --- a/test/base/test_concurrency_py3k.py +++ b/test/base/test_concurrency_py3k.py @@ -1,12 +1,18 @@ +import threading + from sqlalchemy import exc from sqlalchemy import testing from sqlalchemy.testing import async_test from sqlalchemy.testing import eq_ +from sqlalchemy.testing import expect_raises from sqlalchemy.testing import expect_raises_message from sqlalchemy.testing import fixtures +from sqlalchemy.testing import is_true +from sqlalchemy.util import asyncio from sqlalchemy.util import await_fallback from sqlalchemy.util import await_only from sqlalchemy.util import greenlet_spawn +from sqlalchemy.util import queue try: from greenlet import greenlet @@ -152,3 +158,47 @@ class TestAsyncioCompat(fixtures.TestBase): "The current operation required an async execution but none was", ): await greenlet_spawn(run, _require_await=True) + + +class TestAsyncAdaptedQueue(fixtures.TestBase): + def test_lazy_init(self): + run = [False] + + def thread_go(q): + def go(): + q.get(timeout=0.1) + + with expect_raises(queue.Empty): + asyncio.run(greenlet_spawn(go)) + run[0] = True + + t = threading.Thread( + target=thread_go, args=[queue.AsyncAdaptedQueue()] + ) + t.start() + t.join() + + is_true(run[0]) + + def test_error_other_loop(self): + run = [False] + + def thread_go(q): + def go(): + eq_(q.get(block=False), 1) + q.get(timeout=0.1) + + with expect_raises_message( + RuntimeError, "Task .* attached to a different loop" + ): + asyncio.run(greenlet_spawn(go)) + + run[0] = True + + q = queue.AsyncAdaptedQueue() + q.put_nowait(1) + t = threading.Thread(target=thread_go, args=[q]) + t.start() + t.join() + + is_true(run[0]) |