diff options
-rw-r--r-- | doc/build/changelog/unreleased_20/9237.rst | 40 | ||||
-rw-r--r-- | lib/sqlalchemy/pool/base.py | 65 | ||||
-rw-r--r-- | test/engine/test_pool.py | 83 | ||||
-rw-r--r-- | test/engine/test_transaction.py | 160 | ||||
-rw-r--r-- | test/ext/asyncio/test_engine_py3k.py | 63 |
5 files changed, 289 insertions, 122 deletions
diff --git a/doc/build/changelog/unreleased_20/9237.rst b/doc/build/changelog/unreleased_20/9237.rst new file mode 100644 index 000000000..18813cc73 --- /dev/null +++ b/doc/build/changelog/unreleased_20/9237.rst @@ -0,0 +1,40 @@ +.. change:: + :tags: bug, asyncio + :tickets: 9237 + + Repaired a regression caused by the fix for :ticket:`8419` which caused + asyncpg connections to be reset (i.e. transaction ``rollback()`` called) + and returned to the pool normally in the case that the connection were not + explicitly returned to the connection pool and was instead being + intercepted by Python garbage collection, which would fail if the garbage + collection operation were being called outside of the asyncio event loop, + leading to a large amount of stack trace activity dumped into logging + and standard output. + + The correct behavior is restored, which is that all asyncio connections + that are garbage collected due to not being explicitly returned to the + connection pool are detached from the pool and discarded, along with a + warning, rather than being returned to the pool, as they cannot be reliably + reset. In the case of asyncpg connections, the asyncpg-specific + ``terminate()`` method will be used to end the connection more gracefully + within this process as opposed to just dropping it. + + This change includes a small behavioral change that is hoped to be useful + for debugging asyncio applications, where the warning that's emitted in the + case of asyncio connections being unexpectedly garbage collected has been + made slightly more aggressive by moving it outside of a ``try/except`` + block and into a ``finally:`` block, where it will emit unconditionally + regardless of whether the detach/termination operation succeeded or not. 
It + will also have the effect that applications or test suites which promote + Python warnings to exceptions will see this as a full exception raise, + whereas previously it was not possible for this warning to actually + propagate as an exception. Applications and test suites which need to + tolerate this warning in the interim should adjust the Python warnings + filter to allow these warnings to not raise. + + The behavior for traditional sync connections remains unchanged, that + garbage collected connections continue to be returned to the pool normally + without emitting a warning. This will likely be changed in a future major + release to at least emit a similar warning as is emitted for asyncio + drivers, as it is a usage error for pooled connections to be intercepted by + garbage collection without being properly returned to the pool. diff --git a/lib/sqlalchemy/pool/base.py b/lib/sqlalchemy/pool/base.py index a77f65205..d67f32442 100644 --- a/lib/sqlalchemy/pool/base.py +++ b/lib/sqlalchemy/pool/base.py @@ -382,7 +382,10 @@ class Pool(log.Identified, event.EventTarget): self._dialect.do_close(connection) except BaseException as e: self.logger.error( - "Exception closing connection %r", connection, exc_info=True + f"Exception {'terminating' if terminate else 'closing'} " + f"connection %r", + connection, + exc_info=True, ) if not isinstance(e, Exception): raise @@ -941,27 +944,32 @@ def _finalize_fairy( if is_gc_cleanup: assert ref is not None _strong_ref_connection_records.pop(ref, None) - elif fairy: - _strong_ref_connection_records.pop(weakref.ref(fairy), None) - - if is_gc_cleanup: assert connection_record is not None if connection_record.fairy_ref is not ref: return assert dbapi_connection is None dbapi_connection = connection_record.dbapi_connection + elif fairy: + _strong_ref_connection_records.pop(weakref.ref(fairy), None) + # null pool is not _is_asyncio but can be used also with async dialects - dont_restore_gced = ( - pool._dialect.is_async and 
not pool._dialect.has_terminate - ) + dont_restore_gced = pool._dialect.is_async if dont_restore_gced: detach = connection_record is None or is_gc_cleanup - can_manipulate_connection = ref is None + can_manipulate_connection = not is_gc_cleanup + can_close_or_terminate_connection = ( + not pool._dialect.is_async or pool._dialect.has_terminate + ) + requires_terminate_for_close = ( + pool._dialect.is_async and pool._dialect.has_terminate + ) + else: detach = connection_record is None - can_manipulate_connection = True + can_manipulate_connection = can_close_or_terminate_connection = True + requires_terminate_for_close = False if dbapi_connection is not None: if connection_record and echo: @@ -992,25 +1000,14 @@ def _finalize_fairy( fairy._pool = pool fairy.detach() - if can_manipulate_connection: + if can_close_or_terminate_connection: if pool.dispatch.close_detached: pool.dispatch.close_detached(dbapi_connection) - pool._close_connection(dbapi_connection) - else: - message = ( - "The garbage collector is trying to clean up " - f"connection {dbapi_connection!r}. This feature is " - "unsupported on asyncio " - 'dbapis that lack a "terminate" feature, since no ' - "IO can be performed at this stage to " - "reset the connection. Please close out all " - "connections when they are no longer used, calling " - "``close()`` or using a context manager to " - "manage their lifetime." 
+ pool._close_connection( + dbapi_connection, + terminate=requires_terminate_for_close, ) - pool.logger.error(message) - util.warn(message) except BaseException as e: pool.logger.error( @@ -1020,6 +1017,24 @@ def _finalize_fairy( connection_record.invalidate(e=e) if not isinstance(e, Exception): raise + finally: + if detach and is_gc_cleanup and dont_restore_gced: + message = ( + "The garbage collector is trying to clean up " + f"non-checked-in connection {dbapi_connection!r}, " + f"""which will be { + 'dropped, as it cannot be safely terminated' + if not can_close_or_terminate_connection + else 'terminated' + }. """ + "Please ensure that SQLAlchemy pooled connections are " + "returned to " + "the pool explicitly, either by calling ``close()`` " + "or by using appropriate context managers to manage " + "their lifecycle." + ) + pool.logger.error(message) + util.warn(message) if connection_record and connection_record.fairy_ref is not None: connection_record.checkin() diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 4fddcc871..6730d7012 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -23,6 +23,7 @@ from sqlalchemy.testing import assert_raises_context_ok from sqlalchemy.testing import assert_warns_message from sqlalchemy.testing import eq_ from sqlalchemy.testing import expect_raises +from sqlalchemy.testing import expect_warnings from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ from sqlalchemy.testing import is_none @@ -456,6 +457,13 @@ class PoolEventsTest(PoolTestBase): ) canary = [] + @event.listens_for(p, "reset") + def reset(conn, rec, state): + canary.append( + f"""reset_{'rollback_ok' + if state.asyncio_safe else 'no_rollback'}""" + ) + @event.listens_for(p, "checkin") def checkin(*arg, **kw): canary.append("checkin") @@ -668,7 +676,7 @@ class PoolEventsTest(PoolTestBase): c1 = p.connect() eq_(canary, []) c1.close() - eq_(canary, ["checkin"]) + eq_(canary, ["reset_rollback_ok", "checkin"]) 
def test_reset_event(self): p, canary = self._reset_event_fixture() @@ -728,11 +736,13 @@ class PoolEventsTest(PoolTestBase): assert canary.call_args_list[0][0][0] is dbapi_con assert canary.call_args_list[0][0][2] is exc - @testing.combinations((True,), (False,), argnames="is_asyncio") - @testing.combinations((True,), (False,), argnames="has_terminate") + @testing.variation("is_asyncio", [True, False]) + @testing.variation("has_terminate", [True, False]) def test_checkin_event_gc(self, is_asyncio, has_terminate): + """tests for #8419, which have been modified for 2.0 in #9237""" + p, canary = self._checkin_event_fixture( - _is_asyncio=is_asyncio, _has_terminate=has_terminate + _is_asyncio=bool(is_asyncio), _has_terminate=bool(has_terminate) ) c1 = p.connect() @@ -740,18 +750,38 @@ class PoolEventsTest(PoolTestBase): dbapi_connection = weakref.ref(c1.dbapi_connection) eq_(canary, []) - del c1 - lazy_gc() - detach_gced = is_asyncio and not has_terminate + if is_asyncio: + if has_terminate: + with expect_warnings( + "The garbage collector is trying to clean up.*which will " + "be terminated." + ): + del c1 + lazy_gc() + else: + with expect_warnings( + "The garbage collector is trying to clean up.*which will " + "be dropped, as it cannot be safely terminated." + ): + del c1 + lazy_gc() + else: + del c1 + lazy_gc() + + detach_gced = is_asyncio if detach_gced: - # "close_detached" is not called because for asyncio the - # connection is just lost. - eq_(canary, ["detach"]) + if has_terminate: + eq_(canary, ["reset_no_rollback", "detach", "close_detached"]) + else: + # "close_detached" is not called because for asyncio without + # terminate the connection is just lost. 
+ eq_(canary, ["reset_no_rollback", "detach"]) else: - eq_(canary, ["checkin"]) + eq_(canary, ["reset_rollback_ok", "checkin"]) gc_collect() if detach_gced: @@ -769,10 +799,13 @@ class PoolEventsTest(PoolTestBase): eq_(canary, []) c1.close() - eq_(canary, ["checkin"]) + eq_(canary, ["reset_rollback_ok", "checkin"]) c2.close() - eq_(canary, ["checkin", "checkin"]) + eq_( + canary, + ["reset_rollback_ok", "checkin", "reset_rollback_ok", "checkin"], + ) def test_listen_targets_scope(self): canary = [] @@ -1686,28 +1719,32 @@ class QueuePoolTest(PoolTestBase): raise tsa.exc.DisconnectionError() conn = pool.connect() - old_dbapi_conn = conn.dbapi_connection + normally_closed_dbapi_conn = conn.dbapi_connection conn.close() - eq_(old_dbapi_conn.mock_calls, [call.rollback()]) + eq_(normally_closed_dbapi_conn.mock_calls, [call.rollback()]) - old_dbapi_conn.boom = "yes" + normally_closed_dbapi_conn.boom = "yes" conn = pool.connect() - dbapi_conn = conn.dbapi_connection + + # normally closed conn was checked out again but had a problem, + # so was replaced + eq_( + normally_closed_dbapi_conn.mock_calls, + [call.rollback(), call.close()], + ) + + not_closed_dbapi_conn = conn.dbapi_connection del conn gc_collect() if detach_gced: # new connection was detached + abandoned on return - eq_(dbapi_conn.mock_calls, []) + eq_(not_closed_dbapi_conn.mock_calls, []) else: # new connection reset and returned to pool - eq_(dbapi_conn.mock_calls, [call.rollback()]) - - # old connection was just closed - did not get an - # erroneous reset on return - eq_(old_dbapi_conn.mock_calls, [call.rollback(), call.close()]) + eq_(not_closed_dbapi_conn.mock_calls, [call.rollback()]) @testing.requires.timing_intensive def test_recycle_pool_no_race(self): diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py index e24c84820..1b3c4c17e 100644 --- a/test/engine/test_transaction.py +++ b/test/engine/test_transaction.py @@ -1069,25 +1069,25 @@ class 
TransactionTest(fixtures.TablesTest): def test_savepoint_seven(self): users = self.tables.users - conn = testing.db.connect() - trans = conn.begin() - conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + with testing.db.connect() as conn: + trans = conn.begin() + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) - sp1 = conn.begin_nested() - conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + sp1 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) - sp2 = conn.begin_nested() - conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) + sp2 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) - assert conn.in_transaction() + assert conn.in_transaction() - trans.close() + trans.close() - assert not sp1.is_active - assert not sp2.is_active - assert not trans.is_active - assert conn._transaction is None - assert conn._nested_transaction is None + assert not sp1.is_active + assert not sp2.is_active + assert not trans.is_active + assert conn._transaction is None + assert conn._nested_transaction is None with testing.db.connect() as conn: eq_( @@ -1163,41 +1163,47 @@ class IsolationLevelTest(fixtures.TestBase): def test_engine_param_stays(self): eng = testing_engine() - isolation_level = eng.dialect.get_isolation_level( - eng.connect().connection.dbapi_connection - ) + with eng.connect() as conn: + isolation_level = eng.dialect.get_isolation_level( + conn.connection.dbapi_connection + ) level = self._non_default_isolation_level() ne_(isolation_level, level) eng = testing_engine(options=dict(isolation_level=level)) - eq_( - eng.dialect.get_isolation_level( - eng.connect().connection.dbapi_connection - ), - level, - ) + with eng.connect() as conn: + eq_( + eng.dialect.get_isolation_level( + conn.connection.dbapi_connection + ), + level, + ) # check that it stays - conn = eng.connect() - eq_( - 
eng.dialect.get_isolation_level(conn.connection.dbapi_connection), - level, - ) - conn.close() + with eng.connect() as conn: + eq_( + eng.dialect.get_isolation_level( + conn.connection.dbapi_connection + ), + level, + ) - conn = eng.connect() - eq_( - eng.dialect.get_isolation_level(conn.connection.dbapi_connection), - level, - ) - conn.close() + with eng.connect() as conn: + eq_( + eng.dialect.get_isolation_level( + conn.connection.dbapi_connection + ), + level, + ) def test_default_level(self): eng = testing_engine(options=dict()) - isolation_level = eng.dialect.get_isolation_level( - eng.connect().connection.dbapi_connection - ) + + with eng.connect() as conn: + isolation_level = eng.dialect.get_isolation_level( + conn.connection.dbapi_connection + ) eq_(isolation_level, self._default_isolation_level()) def test_reset_level(self): @@ -1335,16 +1341,16 @@ class IsolationLevelTest(fixtures.TestBase): def test_connection_invalidated(self): eng = testing_engine() - conn = eng.connect() - c2 = conn.execution_options( - isolation_level=self._non_default_isolation_level() - ) - c2.invalidate() - c2.connection + with eng.connect() as conn: + c2 = conn.execution_options( + isolation_level=self._non_default_isolation_level() + ) + c2.invalidate() + c2.connection - # TODO: do we want to rebuild the previous isolation? - # for now, this is current behavior so we will leave it. - eq_(c2.get_isolation_level(), self._default_isolation_level()) + # TODO: do we want to rebuild the previous isolation? + # for now, this is current behavior so we will leave it. 
+ eq_(c2.get_isolation_level(), self._default_isolation_level()) def test_per_connection(self): from sqlalchemy.pool import QueuePool @@ -1384,24 +1390,26 @@ class IsolationLevelTest(fixtures.TestBase): def test_exception_in_transaction(self): eng = testing_engine() - c1 = eng.connect() - with expect_raises_message( - exc.InvalidRequestError, - r"This connection has already initialized a SQLAlchemy " - r"Transaction\(\) object via begin\(\) or autobegin; " - r"isolation_level may not be altered unless rollback\(\) or " - r"commit\(\) is called first.", - ): - with c1.begin(): - c1 = c1.execution_options( - isolation_level=self._non_default_isolation_level() - ) + with eng.connect() as c1: + with expect_raises_message( + exc.InvalidRequestError, + r"This connection has already initialized a SQLAlchemy " + r"Transaction\(\) object via begin\(\) or autobegin; " + r"isolation_level may not be altered unless rollback\(\) or " + r"commit\(\) is called first.", + ): + with c1.begin(): + c1 = c1.execution_options( + isolation_level=self._non_default_isolation_level() + ) - # was never set, so we are on original value - eq_( - eng.dialect.get_isolation_level(c1.connection.dbapi_connection), - self._default_isolation_level(), - ) + # was never set, so we are on original value + eq_( + eng.dialect.get_isolation_level( + c1.connection.dbapi_connection + ), + self._default_isolation_level(), + ) def test_per_statement_bzzt(self): assert_raises_message( @@ -1424,22 +1432,26 @@ class IsolationLevelTest(fixtures.TestBase): } ), ) - conn = eng.connect() - eq_( - eng.dialect.get_isolation_level(conn.connection.dbapi_connection), - self._non_default_isolation_level(), - ) + with eng.connect() as conn: + eq_( + eng.dialect.get_isolation_level( + conn.connection.dbapi_connection + ), + self._non_default_isolation_level(), + ) def test_per_option_engine(self): eng = testing_engine(testing.db.url).execution_options( isolation_level=self._non_default_isolation_level() ) - conn = 
eng.connect() - eq_( - eng.dialect.get_isolation_level(conn.connection.dbapi_connection), - self._non_default_isolation_level(), - ) + with eng.connect() as conn: + eq_( + eng.dialect.get_isolation_level( + conn.connection.dbapi_connection + ), + self._non_default_isolation_level(), + ) def test_isolation_level_accessors_connection_default(self): eng = testing_engine(testing.db.url) diff --git a/test/ext/asyncio/test_engine_py3k.py b/test/ext/asyncio/test_engine_py3k.py index 2eebb433d..bce669e4f 100644 --- a/test/ext/asyncio/test_engine_py3k.py +++ b/test/ext/asyncio/test_engine_py3k.py @@ -31,6 +31,7 @@ from sqlalchemy.testing import combinations from sqlalchemy.testing import config from sqlalchemy.testing import engines from sqlalchemy.testing import eq_ +from sqlalchemy.testing import eq_regex from sqlalchemy.testing import expect_raises from sqlalchemy.testing import expect_raises_message from sqlalchemy.testing import fixtures @@ -339,6 +340,68 @@ class AsyncEngineTest(EngineFixture): is_false(t1 == None) + @testing.variation("simulate_gc", [True, False]) + def test_appropriate_warning_for_gced_connection( + self, async_engine, simulate_gc + ): + """test #9237 which builds upon a not really complete solution + added for #8419.""" + + async def go(): + conn = await async_engine.connect() + await conn.begin() + await conn.execute(select(1)) + pool_connection = await conn.get_raw_connection() + return pool_connection + + from sqlalchemy.util.concurrency import await_only + + pool_connection = await_only(go()) + + rec = pool_connection._connection_record + ref = rec.fairy_ref + pool = pool_connection._pool + echo = False + + if simulate_gc: + # not using expect_warnings() here because we also want to do a + # negative test for warnings, and we want to absolutely make sure + # the thing here that emits the warning is the correct path + from sqlalchemy.pool.base import _finalize_fairy + + with mock.patch.object( + pool._dialect, + "do_rollback", + 
mock.Mock(side_effect=Exception("can't run rollback")), + ), mock.patch("sqlalchemy.util.warn") as m: + + _finalize_fairy( + None, rec, pool, ref, echo, transaction_was_reset=False + ) + + if async_engine.dialect.has_terminate: + expected_msg = ( + "The garbage collector is trying to clean up.*which will " + "be terminated." + ) + else: + expected_msg = ( + "The garbage collector is trying to clean up.*which will " + "be dropped, as it cannot be safely terminated." + ) + + # [1] == .args, not in 3.7 + eq_regex(m.mock_calls[0][1][0], expected_msg) + else: + # the warning emitted by the pool is inside of a try/except: + # so it's impossible right now to have this warning "raise". + # for now, test by using mock.patch + + with mock.patch("sqlalchemy.util.warn") as m: + pool_connection.close() + + eq_(m.mock_calls, []) + def test_clear_compiled_cache(self, async_engine): async_engine.sync_engine._compiled_cache["foo"] = "bar" eq_(async_engine.sync_engine._compiled_cache["foo"], "bar") |