diff options
-rw-r--r-- | doc/build/changelog/changelog_09.rst | 25 | ||||
-rw-r--r-- | lib/sqlalchemy/engine/base.py | 16 | ||||
-rw-r--r-- | lib/sqlalchemy/pool.py | 55 | ||||
-rw-r--r-- | test/engine/test_pool.py | 90 | ||||
-rw-r--r-- | test/engine/test_transaction.py | 133 |
5 files changed, 299 insertions, 20 deletions
diff --git a/doc/build/changelog/changelog_09.rst b/doc/build/changelog/changelog_09.rst index e6a77378a..74f92da06 100644 --- a/doc/build/changelog/changelog_09.rst +++ b/doc/build/changelog/changelog_09.rst @@ -15,6 +15,31 @@ :version: 0.9.2 .. change:: + :tags: bug, mysql, pool, engine + :tickets: 2907 + + :class:`.Connection` now associates a new + :class:`.RootTransaction` or :class:`.TwoPhaseTransaction` + with its immediate :class:`._ConnectionFairy` as a "reset handler" + for the span of that transaction, which takes over the task + of calling commit() or rollback() for the "reset on return" behavior + of :class:`.Pool` if the transaction was not otherwise completed. + This resolves the issue that a picky transaction + like that of MySQL two-phase will be + properly closed out when the connection is closed without an + explicit rollback or commit (e.g. no longer raises "XAER_RMFAIL" + in this case - note this only shows up in logging as the exception + is not propagated within pool reset). + This issue would arise e.g. when using an orm + :class:`.Session` with ``twophase`` set, and then + :meth:`.Session.close` is called without an explicit rollback or + commit. The change also has the effect that you will now see + an explicit "ROLLBACK" in the logs when using a :class:`.Session` + object in non-autocommit mode regardless of how that session was + discarded. Thanks to Jeff Dairiki and Laurence Rowe for isolating + the issue here. + + .. change:: :tags: feature, pool, engine Added a new pool event :meth:`.PoolEvents.invalidate`. Called when diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index ff2e6e282..5c66f4806 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -404,7 +404,7 @@ class Connection(Connectable): """ if self.__transaction is None: - self.__transaction = RootTransaction(self) + self.__transaction = self.connection._reset_agent = RootTransaction(self) return self.__transaction else: return Transaction(self, self.__transaction) @@ -425,7 +425,7 @@ class Connection(Connectable): """ if self.__transaction is None: - self.__transaction = RootTransaction(self) + self.__transaction = self.connection._reset_agent = RootTransaction(self) else: self.__transaction = NestedTransaction(self, self.__transaction) return self.__transaction @@ -453,7 +453,7 @@ class Connection(Connectable): "is already in progress.") if xid is None: xid = self.engine.dialect.create_xid() - self.__transaction = TwoPhaseTransaction(self, xid) + self.__transaction = self.connection._reset_agent = TwoPhaseTransaction(self, xid) return self.__transaction def recover_twophase(self): @@ -491,11 +491,11 @@ class Connection(Connectable): self.engine.logger.info("ROLLBACK") try: self.engine.dialect.do_rollback(self.connection) - self.__transaction = None + self.__transaction = self.connection._reset_agent = None except Exception as e: self._handle_dbapi_exception(e, None, None, None, None) else: - self.__transaction = None + self.__transaction = self.connection._reset_agent = None def _commit_impl(self, autocommit=False): if self._has_events: @@ -505,7 +505,7 @@ class Connection(Connectable): self.engine.logger.info("COMMIT") try: self.engine.dialect.do_commit(self.connection) - self.__transaction = None + self.__transaction = self.connection._reset_agent = None except Exception as e: self._handle_dbapi_exception(e, None, None, None, None) @@ -560,7 +560,7 @@ class Connection(Connectable): if self._still_open_and_connection_is_valid: assert isinstance(self.__transaction, TwoPhaseTransaction) self.engine.dialect.do_rollback_twophase(self, xid, is_prepared) - self.__transaction = None + self.__transaction = self.connection._reset_agent = None def _commit_twophase_impl(self, xid, is_prepared): if self._has_events: @@ -569,7 +569,7 @@ class Connection(Connectable): if self._still_open_and_connection_is_valid: assert isinstance(self.__transaction, TwoPhaseTransaction) self.engine.dialect.do_commit_twophase(self, xid, is_prepared) - self.__transaction = None + self.__transaction = self.connection._reset_agent = None def _autorollback(self): if not self.in_transaction(): diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index 0f0a2ac10..f84f331d5 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -479,18 +479,8 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo, fairy=None): try: fairy = fairy or _ConnectionFairy(connection, connection_record) - if pool.dispatch.reset: - pool.dispatch.reset(fairy, connection_record) - if pool._reset_on_return is reset_rollback: - if echo: - pool.logger.debug("Connection %s rollback-on-return", - connection) - pool._dialect.do_rollback(fairy) - elif pool._reset_on_return is reset_commit: - if echo: - pool.logger.debug("Connection %s commit-on-return", - connection) - pool._dialect.do_commit(fairy) + assert fairy.connection is connection + fairy._reset(pool, echo) # Immediately close detached instances if not connection_record: @@ -542,6 +532,23 @@ class _ConnectionFairy(object): """ + _reset_agent = None + """Refer to an object with a ``.commit()`` and ``.rollback()`` method; + if non-None, the "reset-on-return" feature will call upon this object + rather than directly against the dialect-level do_rollback() and do_commit() + methods. + + In practice, a :class:`.Connection` assigns a :class:`.Transaction` object + to this variable when one is in scope so that the :class:`.Transaction` + takes the job of committing or rolling back on return if + :meth:`.Connection.close` is called while the :class:`.Transaction` + still exists. + + This is essentially an "event handler" of sorts but is simplified as an + instance variable both for performance/simplicity as well as that there + can only be one "reset agent" at a time. + """ + @classmethod def _checkout(cls, pool, threadconns=None, fairy=None): if not fairy: @@ -591,6 +598,30 @@ class _ConnectionFairy(object): _close = _checkin + def _reset(self, pool, echo): + if pool.dispatch.reset: + pool.dispatch.reset(self, self._connection_record) + if pool._reset_on_return is reset_rollback: + if echo: + pool.logger.debug("Connection %s rollback-on-return%s", + self.connection, + ", via agent" + if self._reset_agent else "") + if self._reset_agent: + self._reset_agent.rollback() + else: + pool._dialect.do_rollback(self) + elif pool._reset_on_return is reset_commit: + if echo: + pool.logger.debug("Connection %s commit-on-return%s", + self.connection, + ", via agent" + if self._reset_agent else "") + if self._reset_agent: + self._reset_agent.commit() + else: + pool._dialect.do_commit(self) + @property def _logger(self): return self._pool.logger diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 10f490b48..2e4c2dc48 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -1328,6 +1328,96 @@ class QueuePoolTest(PoolTestBase): c2 = p.connect() assert c2.connection is not None +class ResetOnReturnTest(PoolTestBase): + def _fixture(self, **kw): + dbapi = Mock() + return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), **kw) + + def test_plain_rollback(self): + dbapi, p = self._fixture(reset_on_return='rollback') + + c1 = p.connect() + c1.close() + assert dbapi.connect().rollback.called + assert not dbapi.connect().commit.called + + def test_plain_commit(self): + dbapi, p = self._fixture(reset_on_return='commit') + + c1 = p.connect() + c1.close() + assert not dbapi.connect().rollback.called + assert dbapi.connect().commit.called + + def test_plain_none(self): + dbapi, p = self._fixture(reset_on_return=None) + + c1 = p.connect() + c1.close() + assert not dbapi.connect().rollback.called + assert not dbapi.connect().commit.called + + def test_agent_rollback(self): + dbapi, p = self._fixture(reset_on_return='rollback') + + class Agent(object): + def __init__(self, conn): + self.conn = conn + + def rollback(self): + self.conn.special_rollback() + + def commit(self): + self.conn.special_commit() + + c1 = p.connect() + c1._reset_agent = Agent(c1) + c1.close() + + assert dbapi.connect().special_rollback.called + assert not dbapi.connect().special_commit.called + + assert not dbapi.connect().rollback.called + assert not dbapi.connect().commit.called + + c1 = p.connect() + c1.close() + eq_(dbapi.connect().special_rollback.call_count, 1) + eq_(dbapi.connect().special_commit.call_count, 0) + + assert dbapi.connect().rollback.called + assert not dbapi.connect().commit.called + + def test_agent_commit(self): + dbapi, p = self._fixture(reset_on_return='commit') + + class Agent(object): + def __init__(self, conn): + self.conn = conn + + def rollback(self): + self.conn.special_rollback() + + def commit(self): + self.conn.special_commit() + + c1 = p.connect() + c1._reset_agent = Agent(c1) + c1.close() + assert not dbapi.connect().special_rollback.called + assert dbapi.connect().special_commit.called + + assert not dbapi.connect().rollback.called + assert not dbapi.connect().commit.called + + c1 = p.connect() + c1.close() + + eq_(dbapi.connect().special_rollback.call_count, 0) + eq_(dbapi.connect().special_commit.call_count, 1) + assert not dbapi.connect().rollback.called + assert dbapi.connect().commit.called + class SingletonThreadPoolTest(PoolTestBase): @testing.requires.threading_with_mock diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py index 9c45cfddc..e3f5fc252 100644 --- a/test/engine/test_transaction.py +++ b/test/engine/test_transaction.py @@ -3,6 +3,7 @@ from sqlalchemy.testing import eq_, assert_raises, \ import sys import time import threading +from sqlalchemy import event from sqlalchemy.testing.engines import testing_engine from sqlalchemy import create_engine, MetaData, INT, VARCHAR, Sequence, \ select, Integer, String, func, text, exc @@ -378,6 +379,138 @@ class TransactionTest(fixtures.TestBase): eq_(result.fetchall(), [('user1', ), ('user4', )]) conn.close() + @testing.requires.two_phase_transactions + def test_reset_rollback_two_phase_no_rollback(self): + # test [ticket:2907], essentially that the + # TwoPhaseTransaction is given the job of "reset on return" + # so that picky backends like MySQL correctly clear out + # their state when a connection is closed without handling + # the transaction explicitly. + + eng = testing_engine() + + # MySQL raises if you call straight rollback() on + # a connection with an XID present + @event.listens_for(eng, "invalidate") + def conn_invalidated(dbapi_con, con_record, exception): + dbapi_con.close() + raise exception + + with eng.connect() as conn: + rec = conn.connection._connection_record + raw_dbapi_con = rec.connection + xa = conn.begin_twophase() + conn.execute(users.insert(), user_id=1, user_name='user1') + + assert rec.connection is raw_dbapi_con + + with eng.connect() as conn: + result = \ + conn.execute(select([users.c.user_name]). + order_by(users.c.user_id)) + eq_(result.fetchall(), []) + +class ResetAgentTest(fixtures.TestBase): + def test_begin_close(self): + with testing.db.connect() as connection: + trans = connection.begin() + assert connection.connection._reset_agent is trans + assert not trans.is_active + + def test_begin_rollback(self): + with testing.db.connect() as connection: + trans = connection.begin() + assert connection.connection._reset_agent is trans + trans.rollback() + assert connection.connection._reset_agent is None + + def test_begin_commit(self): + with testing.db.connect() as connection: + trans = connection.begin() + assert connection.connection._reset_agent is trans + trans.commit() + assert connection.connection._reset_agent is None + + @testing.requires.savepoints + def test_begin_nested_close(self): + with testing.db.connect() as connection: + trans = connection.begin_nested() + assert connection.connection._reset_agent is trans + assert not trans.is_active + + @testing.requires.savepoints + def test_begin_begin_nested_close(self): + with testing.db.connect() as connection: + trans = connection.begin() + trans2 = connection.begin_nested() + assert connection.connection._reset_agent is trans + assert trans2.is_active # was never closed + assert not trans.is_active + + @testing.requires.savepoints + def test_begin_begin_nested_rollback_commit(self): + with testing.db.connect() as connection: + trans = connection.begin() + trans2 = connection.begin_nested() + assert connection.connection._reset_agent is trans + trans2.rollback() + assert connection.connection._reset_agent is trans + trans.commit() + assert connection.connection._reset_agent is None + + @testing.requires.savepoints + def test_begin_begin_nested_rollback_rollback(self): + with testing.db.connect() as connection: + trans = connection.begin() + trans2 = connection.begin_nested() + assert connection.connection._reset_agent is trans + trans2.rollback() + assert connection.connection._reset_agent is trans + trans.rollback() + assert connection.connection._reset_agent is None + + def test_begin_begin_rollback_rollback(self): + with testing.db.connect() as connection: + trans = connection.begin() + trans2 = connection.begin() + assert connection.connection._reset_agent is trans + trans2.rollback() + assert connection.connection._reset_agent is None + trans.rollback() + assert connection.connection._reset_agent is None + + def test_begin_begin_commit_commit(self): + with testing.db.connect() as connection: + trans = connection.begin() + trans2 = connection.begin() + assert connection.connection._reset_agent is trans + trans2.commit() + assert connection.connection._reset_agent is trans + trans.commit() + assert connection.connection._reset_agent is None + + @testing.requires.two_phase_transactions + def test_reset_via_agent_begin_twophase(self): + with testing.db.connect() as connection: + trans = connection.begin_twophase() + assert connection.connection._reset_agent is trans + + @testing.requires.two_phase_transactions + def test_reset_via_agent_begin_twophase_commit(self): + with testing.db.connect() as connection: + trans = connection.begin_twophase() + assert connection.connection._reset_agent is trans + trans.commit() + assert connection.connection._reset_agent is None + + @testing.requires.two_phase_transactions + def test_reset_via_agent_begin_twophase_rollback(self): + with testing.db.connect() as connection: + trans = connection.begin_twophase() + assert connection.connection._reset_agent is trans + trans.rollback() + assert connection.connection._reset_agent is None + class AutoRollbackTest(fixtures.TestBase): @classmethod |