summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/build/changelog/changelog_09.rst25
-rw-r--r--lib/sqlalchemy/engine/base.py16
-rw-r--r--lib/sqlalchemy/pool.py55
-rw-r--r--test/engine/test_pool.py90
-rw-r--r--test/engine/test_transaction.py133
5 files changed, 299 insertions, 20 deletions
diff --git a/doc/build/changelog/changelog_09.rst b/doc/build/changelog/changelog_09.rst
index e6a77378a..74f92da06 100644
--- a/doc/build/changelog/changelog_09.rst
+++ b/doc/build/changelog/changelog_09.rst
@@ -15,6 +15,31 @@
:version: 0.9.2
.. change::
+ :tags: bug, mysql, pool, engine
+ :tickets: 2907
+
+ :class:`.Connection` now associates a new
+ :class:`.RootTransaction` or :class:`.TwoPhaseTransaction`
+ with its immediate :class:`._ConnectionFairy` as a "reset handler"
+ for the span of that transaction, which takes over the task
+ of calling commit() or rollback() for the "reset on return" behavior
+ of :class:`.Pool` if the transaction was not otherwise completed.
+ This resolves the issue that a picky transaction
+ like that of MySQL two-phase will be
+ properly closed out when the connection is closed without an
+ explicit rollback or commit (e.g. no longer raises "XAER_RMFAIL"
+ in this case - note this only shows up in logging as the exception
+ is not propagated within pool reset).
+ This issue would arise e.g. when using an orm
+ :class:`.Session` with ``twophase`` set, and then
+ :meth:`.Session.close` is called without an explicit rollback or
+ commit. The change also has the effect that you will now see
+ an explicit "ROLLBACK" in the logs when using a :class:`.Session`
+ object in non-autocommit mode regardless of how that session was
+ discarded. Thanks to Jeff Dairiki and Laurence Rowe for isolating
+ the issue here.
+
+ .. change::
:tags: feature, pool, engine
Added a new pool event :meth:`.PoolEvents.invalidate`. Called when
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index ff2e6e282..5c66f4806 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -404,7 +404,7 @@ class Connection(Connectable):
"""
if self.__transaction is None:
- self.__transaction = RootTransaction(self)
+ self.__transaction = self.connection._reset_agent = RootTransaction(self)
return self.__transaction
else:
return Transaction(self, self.__transaction)
@@ -425,7 +425,7 @@ class Connection(Connectable):
"""
if self.__transaction is None:
- self.__transaction = RootTransaction(self)
+ self.__transaction = self.connection._reset_agent = RootTransaction(self)
else:
self.__transaction = NestedTransaction(self, self.__transaction)
return self.__transaction
@@ -453,7 +453,7 @@ class Connection(Connectable):
"is already in progress.")
if xid is None:
xid = self.engine.dialect.create_xid()
- self.__transaction = TwoPhaseTransaction(self, xid)
+ self.__transaction = self.connection._reset_agent = TwoPhaseTransaction(self, xid)
return self.__transaction
def recover_twophase(self):
@@ -491,11 +491,11 @@ class Connection(Connectable):
self.engine.logger.info("ROLLBACK")
try:
self.engine.dialect.do_rollback(self.connection)
- self.__transaction = None
+ self.__transaction = self.connection._reset_agent = None
except Exception as e:
self._handle_dbapi_exception(e, None, None, None, None)
else:
- self.__transaction = None
+ self.__transaction = self.connection._reset_agent = None
def _commit_impl(self, autocommit=False):
if self._has_events:
@@ -505,7 +505,7 @@ class Connection(Connectable):
self.engine.logger.info("COMMIT")
try:
self.engine.dialect.do_commit(self.connection)
- self.__transaction = None
+ self.__transaction = self.connection._reset_agent = None
except Exception as e:
self._handle_dbapi_exception(e, None, None, None, None)
@@ -560,7 +560,7 @@ class Connection(Connectable):
if self._still_open_and_connection_is_valid:
assert isinstance(self.__transaction, TwoPhaseTransaction)
self.engine.dialect.do_rollback_twophase(self, xid, is_prepared)
- self.__transaction = None
+ self.__transaction = self.connection._reset_agent = None
def _commit_twophase_impl(self, xid, is_prepared):
if self._has_events:
@@ -569,7 +569,7 @@ class Connection(Connectable):
if self._still_open_and_connection_is_valid:
assert isinstance(self.__transaction, TwoPhaseTransaction)
self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
- self.__transaction = None
+ self.__transaction = self.connection._reset_agent = None
def _autorollback(self):
if not self.in_transaction():
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index 0f0a2ac10..f84f331d5 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -479,18 +479,8 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo, fairy=None):
try:
fairy = fairy or _ConnectionFairy(connection, connection_record)
- if pool.dispatch.reset:
- pool.dispatch.reset(fairy, connection_record)
- if pool._reset_on_return is reset_rollback:
- if echo:
- pool.logger.debug("Connection %s rollback-on-return",
- connection)
- pool._dialect.do_rollback(fairy)
- elif pool._reset_on_return is reset_commit:
- if echo:
- pool.logger.debug("Connection %s commit-on-return",
- connection)
- pool._dialect.do_commit(fairy)
+ assert fairy.connection is connection
+ fairy._reset(pool, echo)
# Immediately close detached instances
if not connection_record:
@@ -542,6 +532,23 @@ class _ConnectionFairy(object):
"""
+ _reset_agent = None
+ """Refer to an object with a ``.commit()`` and ``.rollback()`` method;
+ if non-None, the "reset-on-return" feature will call upon this object
+ rather than directly against the dialect-level do_rollback() and do_commit()
+ methods.
+
+ In practice, a :class:`.Connection` assigns a :class:`.Transaction` object
+ to this variable when one is in scope so that the :class:`.Transaction`
+ takes the job of committing or rolling back on return if
+ :meth:`.Connection.close` is called while the :class:`.Transaction`
+ still exists.
+
+ This is essentially an "event handler" of sorts but is simplified as an
+ instance variable both for performance/simplicity as well as that there
+ can only be one "reset agent" at a time.
+ """
+
@classmethod
def _checkout(cls, pool, threadconns=None, fairy=None):
if not fairy:
@@ -591,6 +598,30 @@ class _ConnectionFairy(object):
_close = _checkin
+ def _reset(self, pool, echo):
+ if pool.dispatch.reset:
+ pool.dispatch.reset(self, self._connection_record)
+ if pool._reset_on_return is reset_rollback:
+ if echo:
+ pool.logger.debug("Connection %s rollback-on-return%s",
+ self.connection,
+ ", via agent"
+ if self._reset_agent else "")
+ if self._reset_agent:
+ self._reset_agent.rollback()
+ else:
+ pool._dialect.do_rollback(self)
+ elif pool._reset_on_return is reset_commit:
+ if echo:
+ pool.logger.debug("Connection %s commit-on-return%s",
+ self.connection,
+ ", via agent"
+ if self._reset_agent else "")
+ if self._reset_agent:
+ self._reset_agent.commit()
+ else:
+ pool._dialect.do_commit(self)
+
@property
def _logger(self):
return self._pool.logger
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index 10f490b48..2e4c2dc48 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -1328,6 +1328,96 @@ class QueuePoolTest(PoolTestBase):
c2 = p.connect()
assert c2.connection is not None
+class ResetOnReturnTest(PoolTestBase):
+ def _fixture(self, **kw):
+ dbapi = Mock()
+ return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), **kw)
+
+ def test_plain_rollback(self):
+ dbapi, p = self._fixture(reset_on_return='rollback')
+
+ c1 = p.connect()
+ c1.close()
+ assert dbapi.connect().rollback.called
+ assert not dbapi.connect().commit.called
+
+ def test_plain_commit(self):
+ dbapi, p = self._fixture(reset_on_return='commit')
+
+ c1 = p.connect()
+ c1.close()
+ assert not dbapi.connect().rollback.called
+ assert dbapi.connect().commit.called
+
+ def test_plain_none(self):
+ dbapi, p = self._fixture(reset_on_return=None)
+
+ c1 = p.connect()
+ c1.close()
+ assert not dbapi.connect().rollback.called
+ assert not dbapi.connect().commit.called
+
+ def test_agent_rollback(self):
+ dbapi, p = self._fixture(reset_on_return='rollback')
+
+ class Agent(object):
+ def __init__(self, conn):
+ self.conn = conn
+
+ def rollback(self):
+ self.conn.special_rollback()
+
+ def commit(self):
+ self.conn.special_commit()
+
+ c1 = p.connect()
+ c1._reset_agent = Agent(c1)
+ c1.close()
+
+ assert dbapi.connect().special_rollback.called
+ assert not dbapi.connect().special_commit.called
+
+ assert not dbapi.connect().rollback.called
+ assert not dbapi.connect().commit.called
+
+ c1 = p.connect()
+ c1.close()
+ eq_(dbapi.connect().special_rollback.call_count, 1)
+ eq_(dbapi.connect().special_commit.call_count, 0)
+
+ assert dbapi.connect().rollback.called
+ assert not dbapi.connect().commit.called
+
+ def test_agent_commit(self):
+ dbapi, p = self._fixture(reset_on_return='commit')
+
+ class Agent(object):
+ def __init__(self, conn):
+ self.conn = conn
+
+ def rollback(self):
+ self.conn.special_rollback()
+
+ def commit(self):
+ self.conn.special_commit()
+
+ c1 = p.connect()
+ c1._reset_agent = Agent(c1)
+ c1.close()
+ assert not dbapi.connect().special_rollback.called
+ assert dbapi.connect().special_commit.called
+
+ assert not dbapi.connect().rollback.called
+ assert not dbapi.connect().commit.called
+
+ c1 = p.connect()
+ c1.close()
+
+ eq_(dbapi.connect().special_rollback.call_count, 0)
+ eq_(dbapi.connect().special_commit.call_count, 1)
+ assert not dbapi.connect().rollback.called
+ assert dbapi.connect().commit.called
+
class SingletonThreadPoolTest(PoolTestBase):
@testing.requires.threading_with_mock
diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py
index 9c45cfddc..e3f5fc252 100644
--- a/test/engine/test_transaction.py
+++ b/test/engine/test_transaction.py
@@ -3,6 +3,7 @@ from sqlalchemy.testing import eq_, assert_raises, \
import sys
import time
import threading
+from sqlalchemy import event
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy import create_engine, MetaData, INT, VARCHAR, Sequence, \
select, Integer, String, func, text, exc
@@ -378,6 +379,138 @@ class TransactionTest(fixtures.TestBase):
eq_(result.fetchall(), [('user1', ), ('user4', )])
conn.close()
+ @testing.requires.two_phase_transactions
+ def test_reset_rollback_two_phase_no_rollback(self):
+ # test [ticket:2907], essentially that the
+ # TwoPhaseTransaction is given the job of "reset on return"
+ # so that picky backends like MySQL correctly clear out
+ # their state when a connection is closed without handling
+ # the transaction explicitly.
+
+ eng = testing_engine()
+
+ # MySQL raises if you call straight rollback() on
+ # a connection with an XID present
+ @event.listens_for(eng, "invalidate")
+ def conn_invalidated(dbapi_con, con_record, exception):
+ dbapi_con.close()
+ raise exception
+
+ with eng.connect() as conn:
+ rec = conn.connection._connection_record
+ raw_dbapi_con = rec.connection
+ xa = conn.begin_twophase()
+ conn.execute(users.insert(), user_id=1, user_name='user1')
+
+ assert rec.connection is raw_dbapi_con
+
+ with eng.connect() as conn:
+ result = \
+ conn.execute(select([users.c.user_name]).
+ order_by(users.c.user_id))
+ eq_(result.fetchall(), [])
+
+class ResetAgentTest(fixtures.TestBase):
+ def test_begin_close(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ assert connection.connection._reset_agent is trans
+ assert not trans.is_active
+
+ def test_begin_rollback(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ assert connection.connection._reset_agent is trans
+ trans.rollback()
+ assert connection.connection._reset_agent is None
+
+ def test_begin_commit(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ assert connection.connection._reset_agent is trans
+ trans.commit()
+ assert connection.connection._reset_agent is None
+
+ @testing.requires.savepoints
+ def test_begin_nested_close(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin_nested()
+ assert connection.connection._reset_agent is trans
+ assert not trans.is_active
+
+ @testing.requires.savepoints
+ def test_begin_begin_nested_close(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ trans2 = connection.begin_nested()
+ assert connection.connection._reset_agent is trans
+ assert trans2.is_active # was never closed
+ assert not trans.is_active
+
+ @testing.requires.savepoints
+ def test_begin_begin_nested_rollback_commit(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ trans2 = connection.begin_nested()
+ assert connection.connection._reset_agent is trans
+ trans2.rollback()
+ assert connection.connection._reset_agent is trans
+ trans.commit()
+ assert connection.connection._reset_agent is None
+
+ @testing.requires.savepoints
+ def test_begin_begin_nested_rollback_rollback(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ trans2 = connection.begin_nested()
+ assert connection.connection._reset_agent is trans
+ trans2.rollback()
+ assert connection.connection._reset_agent is trans
+ trans.rollback()
+ assert connection.connection._reset_agent is None
+
+ def test_begin_begin_rollback_rollback(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ trans2 = connection.begin()
+ assert connection.connection._reset_agent is trans
+ trans2.rollback()
+ assert connection.connection._reset_agent is None
+ trans.rollback()
+ assert connection.connection._reset_agent is None
+
+ def test_begin_begin_commit_commit(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ trans2 = connection.begin()
+ assert connection.connection._reset_agent is trans
+ trans2.commit()
+ assert connection.connection._reset_agent is trans
+ trans.commit()
+ assert connection.connection._reset_agent is None
+
+ @testing.requires.two_phase_transactions
+ def test_reset_via_agent_begin_twophase(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin_twophase()
+ assert connection.connection._reset_agent is trans
+
+ @testing.requires.two_phase_transactions
+ def test_reset_via_agent_begin_twophase_commit(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin_twophase()
+ assert connection.connection._reset_agent is trans
+ trans.commit()
+ assert connection.connection._reset_agent is None
+
+ @testing.requires.two_phase_transactions
+ def test_reset_via_agent_begin_twophase_rollback(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin_twophase()
+ assert connection.connection._reset_agent is trans
+ trans.rollback()
+ assert connection.connection._reset_agent is None
+
class AutoRollbackTest(fixtures.TestBase):
@classmethod