summaryrefslogtreecommitdiff
path: root/test/engine/test_pool.py
diff options
context:
space:
mode:
authormike bayer <mike_mp@zzzcomputing.com>2022-08-24 18:08:17 +0000
committerGerrit Code Review <gerrit@ci3.zzzcomputing.com>2022-08-24 18:08:17 +0000
commit1ea3c783b6ceaf488e34f15c3ce97eabbc3ab4d3 (patch)
tree1dffaa16ae5a09c12a1cc47a833a352fb023ef95 /test/engine/test_pool.py
parent1c9da1e1d7ef7994328de2248b69a1a582766272 (diff)
parent776abf43d7404a3fa165588fd1e1e2d5ef9a9f04 (diff)
downloadsqlalchemy-1ea3c783b6ceaf488e34f15c3ce97eabbc3ab4d3.tar.gz
Merge "integrate connection.terminate() for supporting dialects" into main
Diffstat (limited to 'test/engine/test_pool.py')
-rw-r--r--test/engine/test_pool.py20
1 files changed, 15 insertions, 5 deletions
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index c66ebfa6c..39f86e280 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -87,10 +87,13 @@ class PoolTestBase(fixtures.TestBase):
def _queuepool_dbapi_fixture(self, **kw):
dbapi = MockDBAPI()
_is_asyncio = kw.pop("_is_asyncio", False)
+ _has_terminate = kw.pop("_has_terminate", False)
p = pool.QueuePool(creator=lambda: dbapi.connect("foo.db"), **kw)
if _is_asyncio:
p._is_asyncio = True
p._dialect = _AsyncConnDialect()
+ if _has_terminate:
+ p._dialect.has_terminate = True
return dbapi, p
@@ -445,8 +448,10 @@ class PoolEventsTest(PoolTestBase):
return p, canary
- def _checkin_event_fixture(self, _is_asyncio=False):
- p = self._queuepool_fixture(_is_asyncio=_is_asyncio)
+ def _checkin_event_fixture(self, _is_asyncio=False, _has_terminate=False):
+ p = self._queuepool_fixture(
+ _is_asyncio=_is_asyncio, _has_terminate=_has_terminate
+ )
canary = []
@event.listens_for(p, "checkin")
@@ -721,9 +726,12 @@ class PoolEventsTest(PoolTestBase):
assert canary.call_args_list[0][0][0] is dbapi_con
assert canary.call_args_list[0][0][2] is exc
- @testing.combinations((True,), (False,))
- def test_checkin_event_gc(self, detach_gced):
- p, canary = self._checkin_event_fixture(_is_asyncio=detach_gced)
+ @testing.combinations((True,), (False,), argnames="is_asyncio")
+ @testing.combinations((True,), (False,), argnames="has_terminate")
+ def test_checkin_event_gc(self, is_asyncio, has_terminate):
+ p, canary = self._checkin_event_fixture(
+ _is_asyncio=is_asyncio, _has_terminate=has_terminate
+ )
c1 = p.connect()
@@ -733,6 +741,8 @@ class PoolEventsTest(PoolTestBase):
del c1
lazy_gc()
+ detach_gced = is_asyncio and not has_terminate
+
if detach_gced:
# "close_detached" is not called because for asyncio the
# connection is just lost.