summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/build/changelog/unreleased_20/9237.rst40
-rw-r--r--lib/sqlalchemy/pool/base.py65
-rw-r--r--test/engine/test_pool.py83
-rw-r--r--test/engine/test_transaction.py160
-rw-r--r--test/ext/asyncio/test_engine_py3k.py63
5 files changed, 289 insertions, 122 deletions
diff --git a/doc/build/changelog/unreleased_20/9237.rst b/doc/build/changelog/unreleased_20/9237.rst
new file mode 100644
index 000000000..18813cc73
--- /dev/null
+++ b/doc/build/changelog/unreleased_20/9237.rst
@@ -0,0 +1,40 @@
+.. change::
+ :tags: bug, asyncio
+ :tickets: 9237
+
+ Repaired a regression caused by the fix for :ticket:`8419` which caused
+ asyncpg connections to be reset (i.e. transaction ``rollback()`` called)
+ and returned to the pool normally in the case that the connection were not
+ explicitly returned to the connection pool and was instead being
+ intercepted by Python garbage collection, which would fail if the garbage
+ collection operation were being called outside of the asyncio event loop,
+ leading to a large amount of stack trace activity dumped into logging
+ and standard output.
+
+ The correct behavior is restored, which is that all asyncio connections
+ that are garbage collected due to not being explicitly returned to the
+ connection pool are detached from the pool and discarded, along with a
+ warning, rather than being returned the pool, as they cannot be reliably
+ reset. In the case of asyncpg connections, the asyncpg-specific
+ ``terminate()`` method will be used to end the connection more gracefully
+ within this process as opposed to just dropping it.
+
+ This change includes a small behavioral change that is hoped to be useful
+ for debugging asyncio applications, where the warning that's emitted in the
+ case of asyncio connections being unexpectedly garbage collected has been
+ made slightly more aggressive by moving it outside of a ``try/except``
+ block and into a ``finally:`` block, where it will emit unconditionally
+ regardless of whether the detach/termination operation succeeded or not. It
+ will also have the effect that applications or test suites which promote
+ Python warnings to exceptions will see this as a full exception raise,
+ whereas previously it was not possible for this warning to actually
+ propagate as an exception. Applications and test suites which need to
+ tolerate this warning in the interim should adjust the Python warnings
+ filter to allow these warnings to not raise.
+
+ The behavior for traditional sync connections remains unchanged, that
+ garbage collected connections continue to be returned to the pool normally
+ without emitting a warning. This will likely be changed in a future major
+ release to at least emit a similar warning as is emitted for asyncio
+ drivers, as it is a usage error for pooled connections to be intercepted by
+ garbage collection without being properly returned to the pool.
diff --git a/lib/sqlalchemy/pool/base.py b/lib/sqlalchemy/pool/base.py
index a77f65205..d67f32442 100644
--- a/lib/sqlalchemy/pool/base.py
+++ b/lib/sqlalchemy/pool/base.py
@@ -382,7 +382,10 @@ class Pool(log.Identified, event.EventTarget):
self._dialect.do_close(connection)
except BaseException as e:
self.logger.error(
- "Exception closing connection %r", connection, exc_info=True
+ f"Exception {'terminating' if terminate else 'closing'} "
+ f"connection %r",
+ connection,
+ exc_info=True,
)
if not isinstance(e, Exception):
raise
@@ -941,27 +944,32 @@ def _finalize_fairy(
if is_gc_cleanup:
assert ref is not None
_strong_ref_connection_records.pop(ref, None)
- elif fairy:
- _strong_ref_connection_records.pop(weakref.ref(fairy), None)
-
- if is_gc_cleanup:
assert connection_record is not None
if connection_record.fairy_ref is not ref:
return
assert dbapi_connection is None
dbapi_connection = connection_record.dbapi_connection
+ elif fairy:
+ _strong_ref_connection_records.pop(weakref.ref(fairy), None)
+
# null pool is not _is_asyncio but can be used also with async dialects
- dont_restore_gced = (
- pool._dialect.is_async and not pool._dialect.has_terminate
- )
+ dont_restore_gced = pool._dialect.is_async
if dont_restore_gced:
detach = connection_record is None or is_gc_cleanup
- can_manipulate_connection = ref is None
+ can_manipulate_connection = not is_gc_cleanup
+ can_close_or_terminate_connection = (
+ not pool._dialect.is_async or pool._dialect.has_terminate
+ )
+ requires_terminate_for_close = (
+ pool._dialect.is_async and pool._dialect.has_terminate
+ )
+
else:
detach = connection_record is None
- can_manipulate_connection = True
+ can_manipulate_connection = can_close_or_terminate_connection = True
+ requires_terminate_for_close = False
if dbapi_connection is not None:
if connection_record and echo:
@@ -992,25 +1000,14 @@ def _finalize_fairy(
fairy._pool = pool
fairy.detach()
- if can_manipulate_connection:
+ if can_close_or_terminate_connection:
if pool.dispatch.close_detached:
pool.dispatch.close_detached(dbapi_connection)
- pool._close_connection(dbapi_connection)
- else:
- message = (
- "The garbage collector is trying to clean up "
- f"connection {dbapi_connection!r}. This feature is "
- "unsupported on asyncio "
- 'dbapis that lack a "terminate" feature, since no '
- "IO can be performed at this stage to "
- "reset the connection. Please close out all "
- "connections when they are no longer used, calling "
- "``close()`` or using a context manager to "
- "manage their lifetime."
+ pool._close_connection(
+ dbapi_connection,
+ terminate=requires_terminate_for_close,
)
- pool.logger.error(message)
- util.warn(message)
except BaseException as e:
pool.logger.error(
@@ -1020,6 +1017,24 @@ def _finalize_fairy(
connection_record.invalidate(e=e)
if not isinstance(e, Exception):
raise
+ finally:
+ if detach and is_gc_cleanup and dont_restore_gced:
+ message = (
+ "The garbage collector is trying to clean up "
+ f"non-checked-in connection {dbapi_connection!r}, "
+ f"""which will be {
+ 'dropped, as it cannot be safely terminated'
+ if not can_close_or_terminate_connection
+ else 'terminated'
+ }. """
+ "Please ensure that SQLAlchemy pooled connections are "
+ "returned to "
+ "the pool explicitly, either by calling ``close()`` "
+ "or by using appropriate context managers to manage "
+ "their lifecycle."
+ )
+ pool.logger.error(message)
+ util.warn(message)
if connection_record and connection_record.fairy_ref is not None:
connection_record.checkin()
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index 4fddcc871..6730d7012 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -23,6 +23,7 @@ from sqlalchemy.testing import assert_raises_context_ok
from sqlalchemy.testing import assert_warns_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_raises
+from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_none
@@ -456,6 +457,13 @@ class PoolEventsTest(PoolTestBase):
)
canary = []
+ @event.listens_for(p, "reset")
+ def reset(conn, rec, state):
+ canary.append(
+ f"""reset_{'rollback_ok'
+ if state.asyncio_safe else 'no_rollback'}"""
+ )
+
@event.listens_for(p, "checkin")
def checkin(*arg, **kw):
canary.append("checkin")
@@ -668,7 +676,7 @@ class PoolEventsTest(PoolTestBase):
c1 = p.connect()
eq_(canary, [])
c1.close()
- eq_(canary, ["checkin"])
+ eq_(canary, ["reset_rollback_ok", "checkin"])
def test_reset_event(self):
p, canary = self._reset_event_fixture()
@@ -728,11 +736,13 @@ class PoolEventsTest(PoolTestBase):
assert canary.call_args_list[0][0][0] is dbapi_con
assert canary.call_args_list[0][0][2] is exc
- @testing.combinations((True,), (False,), argnames="is_asyncio")
- @testing.combinations((True,), (False,), argnames="has_terminate")
+ @testing.variation("is_asyncio", [True, False])
+ @testing.variation("has_terminate", [True, False])
def test_checkin_event_gc(self, is_asyncio, has_terminate):
+ """tests for #8419, which have been modified for 2.0 in #9237"""
+
p, canary = self._checkin_event_fixture(
- _is_asyncio=is_asyncio, _has_terminate=has_terminate
+ _is_asyncio=bool(is_asyncio), _has_terminate=bool(has_terminate)
)
c1 = p.connect()
@@ -740,18 +750,38 @@ class PoolEventsTest(PoolTestBase):
dbapi_connection = weakref.ref(c1.dbapi_connection)
eq_(canary, [])
- del c1
- lazy_gc()
- detach_gced = is_asyncio and not has_terminate
+ if is_asyncio:
+ if has_terminate:
+ with expect_warnings(
+ "The garbage collector is trying to clean up.*which will "
+ "be terminated."
+ ):
+ del c1
+ lazy_gc()
+ else:
+ with expect_warnings(
+ "The garbage collector is trying to clean up.*which will "
+ "be dropped, as it cannot be safely terminated."
+ ):
+ del c1
+ lazy_gc()
+ else:
+ del c1
+ lazy_gc()
+
+ detach_gced = is_asyncio
if detach_gced:
- # "close_detached" is not called because for asyncio the
- # connection is just lost.
- eq_(canary, ["detach"])
+ if has_terminate:
+ eq_(canary, ["reset_no_rollback", "detach", "close_detached"])
+ else:
+ # "close_detached" is not called because for asyncio without
+ # terminate the connection is just lost.
+ eq_(canary, ["reset_no_rollback", "detach"])
else:
- eq_(canary, ["checkin"])
+ eq_(canary, ["reset_rollback_ok", "checkin"])
gc_collect()
if detach_gced:
@@ -769,10 +799,13 @@ class PoolEventsTest(PoolTestBase):
eq_(canary, [])
c1.close()
- eq_(canary, ["checkin"])
+ eq_(canary, ["reset_rollback_ok", "checkin"])
c2.close()
- eq_(canary, ["checkin", "checkin"])
+ eq_(
+ canary,
+ ["reset_rollback_ok", "checkin", "reset_rollback_ok", "checkin"],
+ )
def test_listen_targets_scope(self):
canary = []
@@ -1686,28 +1719,32 @@ class QueuePoolTest(PoolTestBase):
raise tsa.exc.DisconnectionError()
conn = pool.connect()
- old_dbapi_conn = conn.dbapi_connection
+ normally_closed_dbapi_conn = conn.dbapi_connection
conn.close()
- eq_(old_dbapi_conn.mock_calls, [call.rollback()])
+ eq_(normally_closed_dbapi_conn.mock_calls, [call.rollback()])
- old_dbapi_conn.boom = "yes"
+ normally_closed_dbapi_conn.boom = "yes"
conn = pool.connect()
- dbapi_conn = conn.dbapi_connection
+
+ # normally closed conn was checked out again but had a problem,
+ # so was replaced
+ eq_(
+ normally_closed_dbapi_conn.mock_calls,
+ [call.rollback(), call.close()],
+ )
+
+ not_closed_dbapi_conn = conn.dbapi_connection
del conn
gc_collect()
if detach_gced:
# new connection was detached + abandoned on return
- eq_(dbapi_conn.mock_calls, [])
+ eq_(not_closed_dbapi_conn.mock_calls, [])
else:
# new connection reset and returned to pool
- eq_(dbapi_conn.mock_calls, [call.rollback()])
-
- # old connection was just closed - did not get an
- # erroneous reset on return
- eq_(old_dbapi_conn.mock_calls, [call.rollback(), call.close()])
+ eq_(not_closed_dbapi_conn.mock_calls, [call.rollback()])
@testing.requires.timing_intensive
def test_recycle_pool_no_race(self):
diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py
index e24c84820..1b3c4c17e 100644
--- a/test/engine/test_transaction.py
+++ b/test/engine/test_transaction.py
@@ -1069,25 +1069,25 @@ class TransactionTest(fixtures.TablesTest):
def test_savepoint_seven(self):
users = self.tables.users
- conn = testing.db.connect()
- trans = conn.begin()
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+ with testing.db.connect() as conn:
+ trans = conn.begin()
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
- sp1 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+ sp1 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
- sp2 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+ sp2 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
- assert conn.in_transaction()
+ assert conn.in_transaction()
- trans.close()
+ trans.close()
- assert not sp1.is_active
- assert not sp2.is_active
- assert not trans.is_active
- assert conn._transaction is None
- assert conn._nested_transaction is None
+ assert not sp1.is_active
+ assert not sp2.is_active
+ assert not trans.is_active
+ assert conn._transaction is None
+ assert conn._nested_transaction is None
with testing.db.connect() as conn:
eq_(
@@ -1163,41 +1163,47 @@ class IsolationLevelTest(fixtures.TestBase):
def test_engine_param_stays(self):
eng = testing_engine()
- isolation_level = eng.dialect.get_isolation_level(
- eng.connect().connection.dbapi_connection
- )
+ with eng.connect() as conn:
+ isolation_level = eng.dialect.get_isolation_level(
+ conn.connection.dbapi_connection
+ )
level = self._non_default_isolation_level()
ne_(isolation_level, level)
eng = testing_engine(options=dict(isolation_level=level))
- eq_(
- eng.dialect.get_isolation_level(
- eng.connect().connection.dbapi_connection
- ),
- level,
- )
+ with eng.connect() as conn:
+ eq_(
+ eng.dialect.get_isolation_level(
+ conn.connection.dbapi_connection
+ ),
+ level,
+ )
# check that it stays
- conn = eng.connect()
- eq_(
- eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
- level,
- )
- conn.close()
+ with eng.connect() as conn:
+ eq_(
+ eng.dialect.get_isolation_level(
+ conn.connection.dbapi_connection
+ ),
+ level,
+ )
- conn = eng.connect()
- eq_(
- eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
- level,
- )
- conn.close()
+ with eng.connect() as conn:
+ eq_(
+ eng.dialect.get_isolation_level(
+ conn.connection.dbapi_connection
+ ),
+ level,
+ )
def test_default_level(self):
eng = testing_engine(options=dict())
- isolation_level = eng.dialect.get_isolation_level(
- eng.connect().connection.dbapi_connection
- )
+
+ with eng.connect() as conn:
+ isolation_level = eng.dialect.get_isolation_level(
+ conn.connection.dbapi_connection
+ )
eq_(isolation_level, self._default_isolation_level())
def test_reset_level(self):
@@ -1335,16 +1341,16 @@ class IsolationLevelTest(fixtures.TestBase):
def test_connection_invalidated(self):
eng = testing_engine()
- conn = eng.connect()
- c2 = conn.execution_options(
- isolation_level=self._non_default_isolation_level()
- )
- c2.invalidate()
- c2.connection
+ with eng.connect() as conn:
+ c2 = conn.execution_options(
+ isolation_level=self._non_default_isolation_level()
+ )
+ c2.invalidate()
+ c2.connection
- # TODO: do we want to rebuild the previous isolation?
- # for now, this is current behavior so we will leave it.
- eq_(c2.get_isolation_level(), self._default_isolation_level())
+ # TODO: do we want to rebuild the previous isolation?
+ # for now, this is current behavior so we will leave it.
+ eq_(c2.get_isolation_level(), self._default_isolation_level())
def test_per_connection(self):
from sqlalchemy.pool import QueuePool
@@ -1384,24 +1390,26 @@ class IsolationLevelTest(fixtures.TestBase):
def test_exception_in_transaction(self):
eng = testing_engine()
- c1 = eng.connect()
- with expect_raises_message(
- exc.InvalidRequestError,
- r"This connection has already initialized a SQLAlchemy "
- r"Transaction\(\) object via begin\(\) or autobegin; "
- r"isolation_level may not be altered unless rollback\(\) or "
- r"commit\(\) is called first.",
- ):
- with c1.begin():
- c1 = c1.execution_options(
- isolation_level=self._non_default_isolation_level()
- )
+ with eng.connect() as c1:
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ r"This connection has already initialized a SQLAlchemy "
+ r"Transaction\(\) object via begin\(\) or autobegin; "
+ r"isolation_level may not be altered unless rollback\(\) or "
+ r"commit\(\) is called first.",
+ ):
+ with c1.begin():
+ c1 = c1.execution_options(
+ isolation_level=self._non_default_isolation_level()
+ )
- # was never set, so we are on original value
- eq_(
- eng.dialect.get_isolation_level(c1.connection.dbapi_connection),
- self._default_isolation_level(),
- )
+ # was never set, so we are on original value
+ eq_(
+ eng.dialect.get_isolation_level(
+ c1.connection.dbapi_connection
+ ),
+ self._default_isolation_level(),
+ )
def test_per_statement_bzzt(self):
assert_raises_message(
@@ -1424,22 +1432,26 @@ class IsolationLevelTest(fixtures.TestBase):
}
),
)
- conn = eng.connect()
- eq_(
- eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
- self._non_default_isolation_level(),
- )
+ with eng.connect() as conn:
+ eq_(
+ eng.dialect.get_isolation_level(
+ conn.connection.dbapi_connection
+ ),
+ self._non_default_isolation_level(),
+ )
def test_per_option_engine(self):
eng = testing_engine(testing.db.url).execution_options(
isolation_level=self._non_default_isolation_level()
)
- conn = eng.connect()
- eq_(
- eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
- self._non_default_isolation_level(),
- )
+ with eng.connect() as conn:
+ eq_(
+ eng.dialect.get_isolation_level(
+ conn.connection.dbapi_connection
+ ),
+ self._non_default_isolation_level(),
+ )
def test_isolation_level_accessors_connection_default(self):
eng = testing_engine(testing.db.url)
diff --git a/test/ext/asyncio/test_engine_py3k.py b/test/ext/asyncio/test_engine_py3k.py
index 2eebb433d..bce669e4f 100644
--- a/test/ext/asyncio/test_engine_py3k.py
+++ b/test/ext/asyncio/test_engine_py3k.py
@@ -31,6 +31,7 @@ from sqlalchemy.testing import combinations
from sqlalchemy.testing import config
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import eq_regex
from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
@@ -339,6 +340,68 @@ class AsyncEngineTest(EngineFixture):
is_false(t1 == None)
+ @testing.variation("simulate_gc", [True, False])
+ def test_appropriate_warning_for_gced_connection(
+ self, async_engine, simulate_gc
+ ):
+ """test #9237 which builds upon a not really complete solution
+ added for #8419."""
+
+ async def go():
+ conn = await async_engine.connect()
+ await conn.begin()
+ await conn.execute(select(1))
+ pool_connection = await conn.get_raw_connection()
+ return pool_connection
+
+ from sqlalchemy.util.concurrency import await_only
+
+ pool_connection = await_only(go())
+
+ rec = pool_connection._connection_record
+ ref = rec.fairy_ref
+ pool = pool_connection._pool
+ echo = False
+
+ if simulate_gc:
+ # not using expect_warnings() here because we also want to do a
+ # negative test for warnings, and we want to absolutely make sure
+ # the thing here that emits the warning is the correct path
+ from sqlalchemy.pool.base import _finalize_fairy
+
+ with mock.patch.object(
+ pool._dialect,
+ "do_rollback",
+ mock.Mock(side_effect=Exception("can't run rollback")),
+ ), mock.patch("sqlalchemy.util.warn") as m:
+
+ _finalize_fairy(
+ None, rec, pool, ref, echo, transaction_was_reset=False
+ )
+
+ if async_engine.dialect.has_terminate:
+ expected_msg = (
+ "The garbage collector is trying to clean up.*which will "
+ "be terminated."
+ )
+ else:
+ expected_msg = (
+ "The garbage collector is trying to clean up.*which will "
+ "be dropped, as it cannot be safely terminated."
+ )
+
+ # [1] == .args, not in 3.7
+ eq_regex(m.mock_calls[0][1][0], expected_msg)
+ else:
+ # the warning emitted by the pool is inside of a try/except:
+ # so it's impossible right now to have this warning "raise".
+ # for now, test by using mock.patch
+
+ with mock.patch("sqlalchemy.util.warn") as m:
+ pool_connection.close()
+
+ eq_(m.mock_calls, [])
+
def test_clear_compiled_cache(self, async_engine):
async_engine.sync_engine._compiled_cache["foo"] = "bar"
eq_(async_engine.sync_engine._compiled_cache["foo"], "bar")