summaryrefslogtreecommitdiff
path: root/test/engine/test_transaction.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_transaction.py')
-rw-r--r--test/engine/test_transaction.py1460
1 files changed, 600 insertions, 860 deletions
diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py
index 9e6142022..50da425bd 100644
--- a/test/engine/test_transaction.py
+++ b/test/engine/test_transaction.py
@@ -1,5 +1,3 @@
-import sys
-
from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy import func
@@ -20,7 +18,6 @@ from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import mock
from sqlalchemy.testing import ne_
-from sqlalchemy.testing.assertions import expect_deprecated_20
from sqlalchemy.testing.assertions import expect_raises_message
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.schema import Column
@@ -45,30 +42,6 @@ class TransactionTest(fixtures.TablesTest):
with testing.db.connect() as conn:
yield conn
- def test_interrupt_ctxmanager_engine(self, trans_ctx_manager_fixture):
- fn = trans_ctx_manager_fixture
-
- # add commit/rollback to the legacy Connection object so that
- # we can test this less-likely case in use with the legacy
- # Engine.begin() context manager
- class ConnWCommitRollback(testing.db._connection_cls):
- def commit(self):
- self.get_transaction().commit()
-
- def rollback(self):
- self.get_transaction().rollback()
-
- with mock.patch.object(
- testing.db, "_connection_cls", ConnWCommitRollback
- ):
- fn(testing.db, trans_on_subject=False, execute_on_subject=False)
-
- def test_interrupt_ctxmanager_connection(self, trans_ctx_manager_fixture):
- fn = trans_ctx_manager_fixture
-
- with testing.db.connect() as conn:
- fn(conn, trans_on_subject=False, execute_on_subject=True)
-
def test_commits(self, local_connection):
users = self.tables.users
connection = local_connection
@@ -148,56 +121,6 @@ class TransactionTest(fixtures.TablesTest):
assert not local_connection.in_transaction()
@testing.combinations((True,), (False,), argnames="roll_back_in_block")
- def test_ctxmanager_rolls_back(self, local_connection, roll_back_in_block):
- m1 = mock.Mock()
-
- event.listen(local_connection, "rollback", m1.rollback)
- event.listen(local_connection, "commit", m1.commit)
-
- with expect_raises_message(Exception, "test"):
- with local_connection.begin() as trans:
- if roll_back_in_block:
- trans.rollback()
-
- if 1 == 1:
- raise Exception("test")
-
- assert not trans.is_active
- assert not local_connection.in_transaction()
- assert trans._deactivated_from_connection
-
- eq_(m1.mock_calls, [mock.call.rollback(local_connection)])
-
- @testing.combinations((True,), (False,), argnames="roll_back_in_block")
- def test_ctxmanager_rolls_back_legacy_marker(
- self, local_connection, roll_back_in_block
- ):
- m1 = mock.Mock()
-
- event.listen(local_connection, "rollback", m1.rollback)
- event.listen(local_connection, "commit", m1.commit)
-
- with expect_deprecated_20(
- r"Calling .begin\(\) when a transaction is already begun"
- ):
- with local_connection.begin() as trans:
- with expect_raises_message(Exception, "test"):
- with local_connection.begin() as marker_trans:
- if roll_back_in_block:
- marker_trans.rollback()
- if 1 == 1:
- raise Exception("test")
-
- assert not marker_trans.is_active
- assert marker_trans._deactivated_from_connection
-
- assert not trans._deactivated_from_connection
- assert not trans.is_active
- assert not local_connection.in_transaction()
-
- eq_(m1.mock_calls, [mock.call.rollback(local_connection)])
-
- @testing.combinations((True,), (False,), argnames="roll_back_in_block")
@testing.requires.savepoints
def test_ctxmanager_rolls_back_savepoint(
self, local_connection, roll_back_in_block
@@ -235,37 +158,6 @@ class TransactionTest(fixtures.TablesTest):
],
)
- def test_ctxmanager_commits_real_trans_from_nested(self, local_connection):
- m1 = mock.Mock()
-
- event.listen(
- local_connection, "rollback_savepoint", m1.rollback_savepoint
- )
- event.listen(
- local_connection, "release_savepoint", m1.release_savepoint
- )
- event.listen(local_connection, "rollback", m1.rollback)
- event.listen(local_connection, "commit", m1.commit)
- event.listen(local_connection, "begin", m1.begin)
- event.listen(local_connection, "savepoint", m1.savepoint)
-
- with testing.expect_deprecated_20(
- r"Calling Connection.begin_nested\(\) in 2.0 style use will return"
- ):
- with local_connection.begin_nested() as nested_trans:
- pass
-
- assert not nested_trans.is_active
- assert nested_trans._deactivated_from_connection
- # legacy mode, no savepoint at all
- eq_(
- m1.mock_calls,
- [
- mock.call.begin(local_connection),
- mock.call.commit(local_connection),
- ],
- )
-
def test_deactivated_warning_straight(self, local_connection):
with expect_warnings(
"transaction already deassociated from connection"
@@ -427,39 +319,34 @@ class TransactionTest(fixtures.TablesTest):
0,
)
- def test_with_interface(self, local_connection):
+ def test_ctxmanager_interface(self, local_connection):
+ # a legacy test, adapted for 2.x style, was called
+ # "test_with_interface". this is likely an early test for when
+ # the "with" construct was first added.
+
connection = local_connection
users = self.tables.users
trans = connection.begin()
- trans.__enter__()
- connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
- connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
- try:
+
+ with trans:
connection.execute(
- users.insert(), dict(user_id=2, user_name="user2.5")
+ users.insert(), dict(user_id=1, user_name="user1")
+ )
+ connection.execute(
+ users.insert(), dict(user_id=2, user_name="user2")
)
- except Exception:
- trans.__exit__(*sys.exc_info())
- assert not trans.is_active
- self.assert_(
- connection.exec_driver_sql(
- "select count(*) from " "users"
- ).scalar()
- == 0
- )
+ assert trans.is_active
- trans = connection.begin()
- trans.__enter__()
- connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
- trans.__exit__(None, None, None)
assert not trans.is_active
- self.assert_(
+
+ eq_(
connection.exec_driver_sql(
"select count(*) from " "users"
- ).scalar()
- == 1
+ ).scalar(),
+ 2,
)
+ connection.rollback()
def test_close(self, local_connection):
connection = local_connection
@@ -648,6 +535,531 @@ class TransactionTest(fixtures.TablesTest):
)
eq_(result.fetchall(), [])
+ def test_interrupt_ctxmanager_engine(self, trans_ctx_manager_fixture):
+ fn = trans_ctx_manager_fixture
+
+ fn(testing.db, trans_on_subject=False, execute_on_subject=False)
+
+ @testing.combinations((True,), (False,), argnames="trans_on_subject")
+ def test_interrupt_ctxmanager_connection(
+ self, trans_ctx_manager_fixture, trans_on_subject
+ ):
+ fn = trans_ctx_manager_fixture
+
+ with testing.db.connect() as conn:
+ fn(
+ conn,
+ trans_on_subject=trans_on_subject,
+ execute_on_subject=True,
+ )
+
+ def test_autobegin_rollback(self):
+ users = self.tables.users
+ with testing.db.connect() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+ conn.rollback()
+
+ eq_(conn.scalar(select(func.count(1)).select_from(users)), 0)
+
+ @testing.requires.autocommit
+ def test_autocommit_isolation_level(self):
+ users = self.tables.users
+
+ with testing.db.connect().execution_options(
+ isolation_level="AUTOCOMMIT"
+ ) as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+ conn.rollback()
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ @testing.requires.autocommit
+ def test_no_autocommit_w_begin(self):
+
+ with testing.db.begin() as conn:
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"This connection has already initialized a SQLAlchemy "
+ r"Transaction\(\) object via begin\(\) or autobegin; "
+ r"isolation_level may not be altered unless rollback\(\) or "
+ r"commit\(\) is called first.",
+ conn.execution_options,
+ isolation_level="AUTOCOMMIT",
+ )
+
+ @testing.requires.autocommit
+ def test_no_autocommit_w_autobegin(self):
+
+ with testing.db.connect() as conn:
+ conn.execute(select(1))
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"This connection has already initialized a SQLAlchemy "
+ r"Transaction\(\) object via begin\(\) or autobegin; "
+ r"isolation_level may not be altered unless rollback\(\) or "
+ r"commit\(\) is called first.",
+ conn.execution_options,
+ isolation_level="AUTOCOMMIT",
+ )
+
+ conn.rollback()
+
+ conn.execution_options(isolation_level="AUTOCOMMIT")
+
+ def test_autobegin_commit(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+
+ assert not conn.in_transaction()
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ assert conn.in_transaction()
+ conn.commit()
+
+ assert not conn.in_transaction()
+
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name 2"})
+
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 2,
+ )
+
+ assert conn.in_transaction()
+ conn.rollback()
+ assert not conn.in_transaction()
+
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ def test_rollback_on_close(self):
+ canary = mock.Mock()
+ with testing.db.connect() as conn:
+ event.listen(conn, "rollback", canary)
+ conn.execute(select(1))
+ assert conn.in_transaction()
+
+ eq_(canary.mock_calls, [mock.call(conn)])
+
+ def test_no_on_close_no_transaction(self):
+ canary = mock.Mock()
+ with testing.db.connect() as conn:
+ event.listen(conn, "rollback", canary)
+ conn.execute(select(1))
+ conn.rollback()
+ assert not conn.in_transaction()
+
+ eq_(canary.mock_calls, [mock.call(conn)])
+
+ def test_rollback_on_exception(self):
+ canary = mock.Mock()
+ try:
+ with testing.db.connect() as conn:
+ event.listen(conn, "rollback", canary)
+ conn.execute(select(1))
+ assert conn.in_transaction()
+ raise Exception("some error")
+ assert False
+ except:
+ pass
+
+ eq_(canary.mock_calls, [mock.call(conn)])
+
+ def test_rollback_on_exception_if_no_trans(self):
+ canary = mock.Mock()
+ try:
+ with testing.db.connect() as conn:
+ event.listen(conn, "rollback", canary)
+ assert not conn.in_transaction()
+ raise Exception("some error")
+ assert False
+ except:
+ pass
+
+ eq_(canary.mock_calls, [])
+
+ def test_commit_no_begin(self):
+ with testing.db.connect() as conn:
+ assert not conn.in_transaction()
+ conn.commit()
+
+ @testing.requires.independent_connections
+ def test_commit_inactive(self):
+ with testing.db.connect() as conn:
+ conn.begin()
+ conn.invalidate()
+
+ assert_raises_message(
+ exc.InvalidRequestError, "Can't reconnect until", conn.commit
+ )
+
+ @testing.requires.independent_connections
+ def test_rollback_inactive(self):
+ users = self.tables.users
+ with testing.db.connect() as conn:
+
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+ conn.commit()
+
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ conn.invalidate()
+
+ assert_raises_message(
+ exc.PendingRollbackError,
+ "Can't reconnect",
+ conn.execute,
+ select(1),
+ )
+
+ conn.rollback()
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ def test_rollback_no_begin(self):
+ with testing.db.connect() as conn:
+ assert not conn.in_transaction()
+ conn.rollback()
+
+ def test_rollback_end_ctx_manager(self):
+ with testing.db.begin() as conn:
+ assert conn.in_transaction()
+ conn.rollback()
+ assert not conn.in_transaction()
+
+ def test_rollback_end_ctx_manager_autobegin(self, local_connection):
+ m1 = mock.Mock()
+
+ event.listen(local_connection, "rollback", m1.rollback)
+ event.listen(local_connection, "commit", m1.commit)
+
+ with local_connection.begin() as trans:
+ assert local_connection.in_transaction()
+ trans.rollback()
+ assert not local_connection.in_transaction()
+
+ # previously, would be subject to autocommit.
+ # now it raises
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ "Can't operate on closed transaction inside context manager. "
+ "Please complete the context manager before emitting "
+ "further commands.",
+ ):
+ local_connection.execute(select(1))
+
+ assert not local_connection.in_transaction()
+
+ @testing.combinations((True,), (False,), argnames="roll_back_in_block")
+ def test_ctxmanager_rolls_back(self, local_connection, roll_back_in_block):
+ m1 = mock.Mock()
+
+ event.listen(local_connection, "rollback", m1.rollback)
+ event.listen(local_connection, "commit", m1.commit)
+
+ with expect_raises_message(Exception, "test"):
+ with local_connection.begin() as trans:
+ if roll_back_in_block:
+ trans.rollback()
+
+ if 1 == 1:
+ raise Exception("test")
+
+ assert not trans.is_active
+ assert not local_connection.in_transaction()
+ assert trans._deactivated_from_connection
+
+ eq_(m1.mock_calls, [mock.call.rollback(local_connection)])
+
+ @testing.requires.savepoints
+ def test_ctxmanager_autobegins_real_trans_from_nested(
+ self, local_connection
+ ):
+ # the legacy version of this test in 1.4
+ # was test_ctxmanager_commits_real_trans_from_nested
+ m1 = mock.Mock()
+
+ event.listen(
+ local_connection, "rollback_savepoint", m1.rollback_savepoint
+ )
+ event.listen(
+ local_connection, "release_savepoint", m1.release_savepoint
+ )
+ event.listen(local_connection, "rollback", m1.rollback)
+ event.listen(local_connection, "commit", m1.commit)
+ event.listen(local_connection, "begin", m1.begin)
+ event.listen(local_connection, "savepoint", m1.savepoint)
+
+ with local_connection.begin_nested() as nested_trans:
+ pass
+
+ assert not nested_trans.is_active
+ assert nested_trans._deactivated_from_connection
+ eq_(
+ m1.mock_calls,
+ [
+ mock.call.begin(local_connection),
+ mock.call.savepoint(local_connection, mock.ANY),
+ mock.call.release_savepoint(
+ local_connection, mock.ANY, mock.ANY
+ ),
+ ],
+ )
+
+ def test_explicit_begin(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+ assert not conn.in_transaction()
+ conn.begin()
+ assert conn.in_transaction()
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+ conn.commit()
+
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ def test_no_double_begin(self):
+ with testing.db.connect() as conn:
+ conn.begin()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"This connection has already initialized a SQLAlchemy "
+ r"Transaction\(\) object via begin\(\) or autobegin; can't "
+ r"call begin\(\) here unless rollback\(\) or commit\(\) is "
+ r"called first.",
+ conn.begin,
+ )
+
+ def test_no_autocommit(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 0,
+ )
+
+ def test_begin_block(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_one(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ savepoint = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 2,
+ )
+ savepoint.rollback()
+
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_two(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ savepoint = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 2,
+ )
+ savepoint.commit()
+
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 2,
+ )
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 2,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_three(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ conn.rollback()
+
+ assert not conn.in_transaction()
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 0,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_four(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ sp1 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ sp2 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+
+ sp2.rollback()
+
+ assert not sp2.is_active
+ assert sp1.is_active
+ assert conn.in_transaction()
+
+ assert not sp1.is_active
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 2,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_five(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ sp2 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+
+ sp2.commit()
+
+ assert conn.in_transaction()
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 3,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_six(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ sp1 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ assert conn._nested_transaction is sp1
+
+ sp2 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+
+ assert conn._nested_transaction is sp2
+
+ sp2.commit()
+
+ assert conn._nested_transaction is sp1
+
+ sp1.rollback()
+
+ assert conn._nested_transaction is None
+
+ assert conn.in_transaction()
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_seven(self):
+ users = self.tables.users
+
+ conn = testing.db.connect()
+ trans = conn.begin()
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ sp1 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ sp2 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+
+ assert conn.in_transaction()
+
+ trans.close()
+
+ assert not sp1.is_active
+ assert not sp2.is_active
+ assert not trans.is_active
+ assert conn._transaction is None
+ assert conn._nested_transaction is None
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 0,
+ )
+
class AutoRollbackTest(fixtures.TestBase):
__backend__ = True
@@ -693,7 +1105,6 @@ class IsolationLevelTest(fixtures.TestBase):
__requires__ = (
"isolation_level",
"ad_hoc_engines",
- "legacy_isolation_level",
)
__backend__ = True
@@ -712,6 +1123,7 @@ class IsolationLevelTest(fixtures.TestBase):
else:
assert False, "no non-default isolation level available"
+ @testing.requires.legacy_isolation_level
def test_engine_param_stays(self):
eng = testing_engine()
@@ -765,6 +1177,7 @@ class IsolationLevelTest(fixtures.TestBase):
conn.close()
+ @testing.requires.legacy_isolation_level
def test_reset_level_with_setting(self):
eng = testing_engine(
options=dict(isolation_level=self._non_default_isolation_level())
@@ -788,7 +1201,8 @@ class IsolationLevelTest(fixtures.TestBase):
)
conn.close()
- def test_invalid_level(self):
+ @testing.requires.legacy_isolation_level
+ def test_invalid_level_engine_param(self):
eng = testing_engine(options=dict(isolation_level="FOO"))
assert_raises_message(
exc.ArgumentError,
@@ -802,6 +1216,33 @@ class IsolationLevelTest(fixtures.TestBase):
eng.connect,
)
+ # TODO: all the dialects seem to be manually raising ArgumentError
+ # individually within their set_isolation_level() methods, when this
+ # should be a default dialect feature so that
+ # error messaging etc. is consistent, including that it works for 3rd
+ # party dialects.
+ # TODO: barring that, at least implement this for the Oracle dialect
+ @testing.fails_on(
+ "oracle",
+ "cx_oracle dialect doesnt have argument error here, "
+ "raises it via the DB rejecting it",
+ )
+ def test_invalid_level_execution_option(self):
+ eng = testing_engine(
+ options=dict(execution_options=dict(isolation_level="FOO"))
+ )
+ assert_raises_message(
+ exc.ArgumentError,
+ "Invalid value '%s' for isolation_level. "
+ "Valid isolation levels for %s are %s"
+ % (
+ "FOO",
+ eng.dialect.name,
+ ", ".join(eng.dialect._isolation_lookup),
+ ),
+ eng.connect,
+ )
+
def test_connection_invalidated(self):
eng = testing_engine()
conn = eng.connect()
@@ -851,28 +1292,25 @@ class IsolationLevelTest(fixtures.TestBase):
c3.close()
c4.close()
- def test_warning_in_transaction(self):
+ def test_exception_in_transaction(self):
eng = testing_engine()
c1 = eng.connect()
- with expect_warnings(
- "Connection is already established with a Transaction; "
- "setting isolation_level may implicitly rollback or commit "
- "the existing transaction, or have no effect until next "
- "transaction"
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ r"This connection has already initialized a SQLAlchemy "
+ r"Transaction\(\) object via begin\(\) or autobegin; "
+ r"isolation_level may not be altered unless rollback\(\) or "
+ r"commit\(\) is called first.",
):
with c1.begin():
c1 = c1.execution_options(
isolation_level=self._non_default_isolation_level()
)
- eq_(
- eng.dialect.get_isolation_level(c1.connection),
- self._non_default_isolation_level(),
- )
- # stays outside of transaction
+ # was never set, so we are on original value
eq_(
eng.dialect.get_isolation_level(c1.connection),
- self._non_default_isolation_level(),
+ self._default_isolation_level(),
)
def test_per_statement_bzzt(self):
@@ -1015,25 +1453,25 @@ class ConnectionCharacteristicTest(fixtures.TestBase):
eq_(connection.foo, "original_value")
- def test_warning_in_transaction(self, characteristic_fixture):
+ def test_exception_in_transaction(self, characteristic_fixture):
+ # this was a warning in 1.x. it appears we did not test the
+ # 2.0 error case in 1.4
+
engine, connection = characteristic_fixture
c1 = engine.connect()
- with expect_warnings(
- "Connection is already established with a Transaction; "
- "setting foo may implicitly rollback or commit "
- "the existing transaction, or have no effect until next "
- "transaction"
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ r"This connection has already initialized a SQLAlchemy "
+ r"Transaction\(\) object via begin\(\) or autobegin; "
+ r"foo may not be altered unless rollback\(\) or "
+ r"commit\(\) is called first.",
):
with c1.begin():
c1 = c1.execution_options(foo="new_foo")
- eq_(
- engine.dialect.get_foo(c1.connection),
- "new_foo",
- )
- # stays outside of transaction
- eq_(engine.dialect.get_foo(c1.connection), "new_foo")
+ # was never set, so we are on original value
+ eq_(engine.dialect.get_foo(c1.connection), "original_value")
@testing.fails("no error is raised yet here.")
def test_per_statement_bzzt(self, characteristic_fixture):
@@ -1120,6 +1558,7 @@ class ResetAgentTest(ResetFixture, fixtures.TestBase):
def test_begin_close(self, reset_agent):
with reset_agent.engine.connect() as connection:
trans = connection.begin()
+
assert not trans.is_active
eq_(
reset_agent.mock_calls,
@@ -1130,6 +1569,7 @@ class ResetAgentTest(ResetFixture, fixtures.TestBase):
with reset_agent.engine.connect() as connection:
trans = connection.begin()
trans.rollback()
+ assert not trans.is_active
eq_(
reset_agent.mock_calls,
[
@@ -1143,6 +1583,7 @@ class ResetAgentTest(ResetFixture, fixtures.TestBase):
with reset_agent.engine.connect() as connection:
trans = connection.begin()
trans.commit()
+ assert not trans.is_active
eq_(
reset_agent.mock_calls,
[
@@ -1237,161 +1678,6 @@ class ResetAgentTest(ResetFixture, fixtures.TestBase):
@testing.requires.savepoints
def test_begin_nested_close(self, reset_agent):
with reset_agent.engine.connect() as connection:
- with testing.expect_deprecated_20(
- r"Calling Connection.begin_nested\(\) in "
- r"2.0 style use will return"
- ):
- trans = connection.begin_nested()
- assert not trans.is_active
- eq_(
- reset_agent.mock_calls,
- [
- mock.call.rollback(connection),
- mock.call.do_rollback(mock.ANY),
- ],
- )
-
- @testing.requires.savepoints
- def test_begin_begin_nested_close(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin()
- trans2 = connection.begin_nested()
- assert not trans2.is_active
- assert not trans.is_active
- eq_(
- reset_agent.mock_calls,
- [
- mock.call.rollback(connection),
- mock.call.do_rollback(mock.ANY),
- ],
- )
-
- @testing.requires.savepoints
- def test_begin_begin_nested_rollback_commit(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin()
- trans2 = connection.begin_nested()
- trans2.rollback()
- trans.commit()
- eq_(
- reset_agent.mock_calls,
- [
- mock.call.rollback_savepoint(connection, mock.ANY, mock.ANY),
- mock.call.commit(connection),
- mock.call.do_commit(mock.ANY),
- mock.call.do_rollback(mock.ANY),
- ],
- )
-
- @testing.requires.savepoints
- def test_begin_begin_nested_rollback_rollback(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin()
- trans2 = connection.begin_nested()
- trans2.rollback()
- trans.rollback()
- eq_(
- reset_agent.mock_calls,
- [
- mock.call.rollback_savepoint(connection, mock.ANY, mock.ANY),
- mock.call.rollback(connection),
- mock.call.do_rollback(mock.ANY),
- mock.call.do_rollback(mock.ANY),
- ],
- )
-
- @testing.requires.two_phase_transactions
- def test_reset_via_agent_begin_twophase(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin_twophase() # noqa
-
- # pg8000 rolls back via the rollback_twophase
- eq_(
- reset_agent.mock_calls[0],
- mock.call.rollback_twophase(connection, mock.ANY, mock.ANY),
- )
-
- @testing.requires.two_phase_transactions
- def test_reset_via_agent_begin_twophase_commit(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin_twophase()
- trans.commit()
- eq_(
- reset_agent.mock_calls[0],
- mock.call.commit_twophase(connection, mock.ANY, mock.ANY),
- )
-
- eq_(reset_agent.mock_calls[-1], mock.call.do_rollback(mock.ANY))
-
- @testing.requires.two_phase_transactions
- def test_reset_via_agent_begin_twophase_rollback(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin_twophase()
- trans.rollback()
- eq_(
- reset_agent.mock_calls[0:2],
- [
- mock.call.rollback_twophase(connection, mock.ANY, mock.ANY),
- mock.call.do_rollback(mock.ANY),
- ],
- )
-
- eq_(reset_agent.mock_calls[-1], mock.call.do_rollback(mock.ANY))
-
-
-class FutureResetAgentTest(
- ResetFixture, fixtures.FutureEngineMixin, fixtures.TestBase
-):
-
- __backend__ = True
-
- def test_reset_agent_no_conn_transaction(self, reset_agent):
- with reset_agent.engine.connect():
- pass
-
- eq_(reset_agent.mock_calls, [mock.call.do_rollback(mock.ANY)])
-
- def test_begin_close(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin()
-
- assert not trans.is_active
- eq_(
- reset_agent.mock_calls,
- [mock.call.rollback(connection), mock.call.do_rollback(mock.ANY)],
- )
-
- def test_begin_rollback(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin()
- trans.rollback()
- assert not trans.is_active
- eq_(
- reset_agent.mock_calls,
- [
- mock.call.rollback(connection),
- mock.call.do_rollback(mock.ANY),
- mock.call.do_rollback(mock.ANY),
- ],
- )
-
- def test_begin_commit(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin()
- trans.commit()
- assert not trans.is_active
- eq_(
- reset_agent.mock_calls,
- [
- mock.call.commit(connection),
- mock.call.do_commit(mock.ANY),
- mock.call.do_rollback(mock.ANY),
- ],
- )
-
- @testing.requires.savepoints
- def test_begin_nested_close(self, reset_agent):
- with reset_agent.engine.connect() as connection:
trans = connection.begin_nested()
# it's a savepoint, but root made sure it closed
assert not trans.is_active
@@ -1492,554 +1778,8 @@ class FutureResetAgentTest(
)
eq_(reset_agent.mock_calls[-1], mock.call.do_rollback(mock.ANY))
-
-class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
- __backend__ = True
-
- @classmethod
- def define_tables(cls, metadata):
- Table(
- "users",
- metadata,
- Column("user_id", INT, primary_key=True, autoincrement=False),
- Column("user_name", VARCHAR(20)),
- test_needs_acid=True,
- )
- Table(
- "users_autoinc",
- metadata,
- Column(
- "user_id", INT, primary_key=True, test_needs_autoincrement=True
- ),
- Column("user_name", VARCHAR(20)),
- test_needs_acid=True,
- )
-
- @testing.fixture
- def local_connection(self):
- with testing.db.connect() as conn:
- yield conn
-
- def test_interrupt_ctxmanager_engine(self, trans_ctx_manager_fixture):
- fn = trans_ctx_manager_fixture
-
- fn(testing.db, trans_on_subject=False, execute_on_subject=False)
-
- @testing.combinations((True,), (False,), argnames="trans_on_subject")
- def test_interrupt_ctxmanager_connection(
- self, trans_ctx_manager_fixture, trans_on_subject
- ):
- fn = trans_ctx_manager_fixture
-
- with testing.db.connect() as conn:
- fn(
- conn,
- trans_on_subject=trans_on_subject,
- execute_on_subject=True,
- )
-
- def test_autobegin_rollback(self):
- users = self.tables.users
- with testing.db.connect() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
- conn.rollback()
-
- eq_(conn.scalar(select(func.count(1)).select_from(users)), 0)
-
- @testing.requires.autocommit
- def test_autocommit_isolation_level(self):
- users = self.tables.users
-
- with testing.db.connect().execution_options(
- isolation_level="AUTOCOMMIT"
- ) as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
- conn.rollback()
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 1,
- )
-
- @testing.requires.autocommit
- def test_no_autocommit_w_begin(self):
-
- with testing.db.begin() as conn:
- assert_raises_message(
- exc.InvalidRequestError,
- r"This connection has already initialized a SQLAlchemy "
- r"Transaction\(\) object via begin\(\) or autobegin; "
- r"isolation_level may not be altered unless rollback\(\) or "
- r"commit\(\) is called first.",
- conn.execution_options,
- isolation_level="AUTOCOMMIT",
- )
-
- @testing.requires.autocommit
- def test_no_autocommit_w_autobegin(self):
-
- with testing.db.connect() as conn:
- conn.execute(select(1))
-
- assert_raises_message(
- exc.InvalidRequestError,
- r"This connection has already initialized a SQLAlchemy "
- r"Transaction\(\) object via begin\(\) or autobegin; "
- r"isolation_level may not be altered unless rollback\(\) or "
- r"commit\(\) is called first.",
- conn.execution_options,
- isolation_level="AUTOCOMMIT",
- )
-
- conn.rollback()
-
- conn.execution_options(isolation_level="AUTOCOMMIT")
-
- def test_autobegin_commit(self):
- users = self.tables.users
-
- with testing.db.connect() as conn:
-
- assert not conn.in_transaction()
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- assert conn.in_transaction()
- conn.commit()
-
- assert not conn.in_transaction()
-
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 1,
- )
-
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name 2"})
-
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 2,
- )
-
- assert conn.in_transaction()
- conn.rollback()
- assert not conn.in_transaction()
-
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 1,
- )
-
- def test_rollback_on_close(self):
- canary = mock.Mock()
- with testing.db.connect() as conn:
- event.listen(conn, "rollback", canary)
- conn.execute(select(1))
- assert conn.in_transaction()
-
- eq_(canary.mock_calls, [mock.call(conn)])
-
- def test_no_on_close_no_transaction(self):
- canary = mock.Mock()
- with testing.db.connect() as conn:
- event.listen(conn, "rollback", canary)
- conn.execute(select(1))
- conn.rollback()
- assert not conn.in_transaction()
-
- eq_(canary.mock_calls, [mock.call(conn)])
-
- def test_rollback_on_exception(self):
- canary = mock.Mock()
- try:
- with testing.db.connect() as conn:
- event.listen(conn, "rollback", canary)
- conn.execute(select(1))
- assert conn.in_transaction()
- raise Exception("some error")
- assert False
- except:
- pass
-
- eq_(canary.mock_calls, [mock.call(conn)])
-
- def test_rollback_on_exception_if_no_trans(self):
- canary = mock.Mock()
- try:
- with testing.db.connect() as conn:
- event.listen(conn, "rollback", canary)
- assert not conn.in_transaction()
- raise Exception("some error")
- assert False
- except:
- pass
-
- eq_(canary.mock_calls, [])
-
- def test_commit_no_begin(self):
- with testing.db.connect() as conn:
- assert not conn.in_transaction()
- conn.commit()
-
- @testing.requires.independent_connections
- def test_commit_inactive(self):
- with testing.db.connect() as conn:
- conn.begin()
- conn.invalidate()
-
- assert_raises_message(
- exc.InvalidRequestError, "Can't reconnect until", conn.commit
- )
-
- @testing.requires.independent_connections
- def test_rollback_inactive(self):
- users = self.tables.users
- with testing.db.connect() as conn:
-
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
- conn.commit()
-
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
-
- conn.invalidate()
-
- assert_raises_message(
- exc.PendingRollbackError,
- "Can't reconnect",
- conn.execute,
- select(1),
- )
-
- conn.rollback()
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 1,
- )
-
- def test_rollback_no_begin(self):
- with testing.db.connect() as conn:
- assert not conn.in_transaction()
- conn.rollback()
-
- def test_rollback_end_ctx_manager(self):
- with testing.db.begin() as conn:
- assert conn.in_transaction()
- conn.rollback()
- assert not conn.in_transaction()
-
- def test_rollback_end_ctx_manager_autobegin(self, local_connection):
- m1 = mock.Mock()
-
- event.listen(local_connection, "rollback", m1.rollback)
- event.listen(local_connection, "commit", m1.commit)
-
- with local_connection.begin() as trans:
- assert local_connection.in_transaction()
- trans.rollback()
- assert not local_connection.in_transaction()
-
- # previously, would be subject to autocommit.
- # now it raises
- with expect_raises_message(
- exc.InvalidRequestError,
- "Can't operate on closed transaction inside context manager. "
- "Please complete the context manager before emitting "
- "further commands.",
- ):
- local_connection.execute(select(1))
-
- assert not local_connection.in_transaction()
-
- @testing.combinations((True,), (False,), argnames="roll_back_in_block")
- def test_ctxmanager_rolls_back(self, local_connection, roll_back_in_block):
- m1 = mock.Mock()
-
- event.listen(local_connection, "rollback", m1.rollback)
- event.listen(local_connection, "commit", m1.commit)
-
- with expect_raises_message(Exception, "test"):
- with local_connection.begin() as trans:
- if roll_back_in_block:
- trans.rollback()
-
- if 1 == 1:
- raise Exception("test")
-
- assert not trans.is_active
- assert not local_connection.in_transaction()
- assert trans._deactivated_from_connection
-
- eq_(m1.mock_calls, [mock.call.rollback(local_connection)])
-
- @testing.requires.savepoints
- def test_ctxmanager_autobegins_real_trans_from_nested(
- self, local_connection
- ):
- m1 = mock.Mock()
-
- event.listen(
- local_connection, "rollback_savepoint", m1.rollback_savepoint
- )
- event.listen(
- local_connection, "release_savepoint", m1.release_savepoint
- )
- event.listen(local_connection, "rollback", m1.rollback)
- event.listen(local_connection, "commit", m1.commit)
- event.listen(local_connection, "begin", m1.begin)
- event.listen(local_connection, "savepoint", m1.savepoint)
-
- with local_connection.begin_nested() as nested_trans:
+ def test_reset_agent_no_conn_transaction(self, reset_agent):
+ with reset_agent.engine.connect():
pass
- assert not nested_trans.is_active
- assert nested_trans._deactivated_from_connection
- # legacy mode, no savepoint at all
- eq_(
- m1.mock_calls,
- [
- mock.call.begin(local_connection),
- mock.call.savepoint(local_connection, mock.ANY),
- mock.call.release_savepoint(
- local_connection, mock.ANY, mock.ANY
- ),
- ],
- )
-
- def test_explicit_begin(self):
- users = self.tables.users
-
- with testing.db.connect() as conn:
- assert not conn.in_transaction()
- conn.begin()
- assert conn.in_transaction()
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
- conn.commit()
-
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 1,
- )
-
- def test_no_double_begin(self):
- with testing.db.connect() as conn:
- conn.begin()
-
- assert_raises_message(
- exc.InvalidRequestError,
- r"This connection has already initialized a SQLAlchemy "
- r"Transaction\(\) object via begin\(\) or autobegin; can't "
- r"call begin\(\) here unless rollback\(\) or commit\(\) is "
- r"called first.",
- conn.begin,
- )
-
- def test_no_autocommit(self):
- users = self.tables.users
-
- with testing.db.connect() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 0,
- )
-
- def test_begin_block(self):
- users = self.tables.users
-
- with testing.db.begin() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 1,
- )
-
- @testing.requires.savepoints
- def test_savepoint_one(self):
- users = self.tables.users
-
- with testing.db.begin() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- savepoint = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
-
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 2,
- )
- savepoint.rollback()
-
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 1,
- )
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 1,
- )
-
- @testing.requires.savepoints
- def test_savepoint_two(self):
- users = self.tables.users
-
- with testing.db.begin() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- savepoint = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
-
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 2,
- )
- savepoint.commit()
-
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 2,
- )
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 2,
- )
-
- @testing.requires.savepoints
- def test_savepoint_three(self):
- users = self.tables.users
-
- with testing.db.begin() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
-
- conn.rollback()
-
- assert not conn.in_transaction()
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 0,
- )
-
- @testing.requires.savepoints
- def test_savepoint_four(self):
- users = self.tables.users
-
- with testing.db.begin() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- sp1 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
-
- sp2 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
-
- sp2.rollback()
-
- assert not sp2.is_active
- assert sp1.is_active
- assert conn.in_transaction()
-
- assert not sp1.is_active
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 2,
- )
-
- @testing.requires.savepoints
- def test_savepoint_five(self):
- users = self.tables.users
-
- with testing.db.begin() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
-
- sp2 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
-
- sp2.commit()
-
- assert conn.in_transaction()
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 3,
- )
-
- @testing.requires.savepoints
- def test_savepoint_six(self):
- users = self.tables.users
-
- with testing.db.begin() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- sp1 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
-
- assert conn._nested_transaction is sp1
-
- sp2 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
-
- assert conn._nested_transaction is sp2
-
- sp2.commit()
-
- assert conn._nested_transaction is sp1
-
- sp1.rollback()
-
- assert conn._nested_transaction is None
-
- assert conn.in_transaction()
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 1,
- )
-
- @testing.requires.savepoints
- def test_savepoint_seven(self):
- users = self.tables.users
-
- conn = testing.db.connect()
- trans = conn.begin()
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- sp1 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
-
- sp2 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
-
- assert conn.in_transaction()
-
- trans.close()
-
- assert not sp1.is_active
- assert not sp2.is_active
- assert not trans.is_active
- assert conn._transaction is None
- assert conn._nested_transaction is None
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 0,
- )
+ eq_(reset_agent.mock_calls, [mock.call.do_rollback(mock.ANY)])