diff options
Diffstat (limited to 'test/engine/test_transaction.py')
-rw-r--r-- | test/engine/test_transaction.py | 1460 |
1 files changed, 600 insertions, 860 deletions
diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py index 9e6142022..50da425bd 100644 --- a/test/engine/test_transaction.py +++ b/test/engine/test_transaction.py @@ -1,5 +1,3 @@ -import sys - from sqlalchemy import event from sqlalchemy import exc from sqlalchemy import func @@ -20,7 +18,6 @@ from sqlalchemy.testing import expect_warnings from sqlalchemy.testing import fixtures from sqlalchemy.testing import mock from sqlalchemy.testing import ne_ -from sqlalchemy.testing.assertions import expect_deprecated_20 from sqlalchemy.testing.assertions import expect_raises_message from sqlalchemy.testing.engines import testing_engine from sqlalchemy.testing.schema import Column @@ -45,30 +42,6 @@ class TransactionTest(fixtures.TablesTest): with testing.db.connect() as conn: yield conn - def test_interrupt_ctxmanager_engine(self, trans_ctx_manager_fixture): - fn = trans_ctx_manager_fixture - - # add commit/rollback to the legacy Connection object so that - # we can test this less-likely case in use with the legacy - # Engine.begin() context manager - class ConnWCommitRollback(testing.db._connection_cls): - def commit(self): - self.get_transaction().commit() - - def rollback(self): - self.get_transaction().rollback() - - with mock.patch.object( - testing.db, "_connection_cls", ConnWCommitRollback - ): - fn(testing.db, trans_on_subject=False, execute_on_subject=False) - - def test_interrupt_ctxmanager_connection(self, trans_ctx_manager_fixture): - fn = trans_ctx_manager_fixture - - with testing.db.connect() as conn: - fn(conn, trans_on_subject=False, execute_on_subject=True) - def test_commits(self, local_connection): users = self.tables.users connection = local_connection @@ -148,56 +121,6 @@ class TransactionTest(fixtures.TablesTest): assert not local_connection.in_transaction() @testing.combinations((True,), (False,), argnames="roll_back_in_block") - def test_ctxmanager_rolls_back(self, local_connection, roll_back_in_block): - m1 = mock.Mock() - - event.listen(local_connection, "rollback", m1.rollback) - event.listen(local_connection, "commit", m1.commit) - - with expect_raises_message(Exception, "test"): - with local_connection.begin() as trans: - if roll_back_in_block: - trans.rollback() - - if 1 == 1: - raise Exception("test") - - assert not trans.is_active - assert not local_connection.in_transaction() - assert trans._deactivated_from_connection - - eq_(m1.mock_calls, [mock.call.rollback(local_connection)]) - - @testing.combinations((True,), (False,), argnames="roll_back_in_block") - def test_ctxmanager_rolls_back_legacy_marker( - self, local_connection, roll_back_in_block - ): - m1 = mock.Mock() - - event.listen(local_connection, "rollback", m1.rollback) - event.listen(local_connection, "commit", m1.commit) - - with expect_deprecated_20( - r"Calling .begin\(\) when a transaction is already begun" - ): - with local_connection.begin() as trans: - with expect_raises_message(Exception, "test"): - with local_connection.begin() as marker_trans: - if roll_back_in_block: - marker_trans.rollback() - if 1 == 1: - raise Exception("test") - - assert not marker_trans.is_active - assert marker_trans._deactivated_from_connection - - assert not trans._deactivated_from_connection - assert not trans.is_active - assert not local_connection.in_transaction() - - eq_(m1.mock_calls, [mock.call.rollback(local_connection)]) - - @testing.combinations((True,), (False,), argnames="roll_back_in_block") @testing.requires.savepoints def test_ctxmanager_rolls_back_savepoint( self, local_connection, roll_back_in_block @@ -235,37 +158,6 @@ class TransactionTest(fixtures.TablesTest): ], ) - def test_ctxmanager_commits_real_trans_from_nested(self, local_connection): - m1 = mock.Mock() - - event.listen( - local_connection, "rollback_savepoint", m1.rollback_savepoint - ) - event.listen( - local_connection, "release_savepoint", m1.release_savepoint - ) - event.listen(local_connection, "rollback", m1.rollback) - event.listen(local_connection, "commit", m1.commit) - event.listen(local_connection, "begin", m1.begin) - event.listen(local_connection, "savepoint", m1.savepoint) - - with testing.expect_deprecated_20( - r"Calling Connection.begin_nested\(\) in 2.0 style use will return" - ): - with local_connection.begin_nested() as nested_trans: - pass - - assert not nested_trans.is_active - assert nested_trans._deactivated_from_connection - # legacy mode, no savepoint at all - eq_( - m1.mock_calls, - [ - mock.call.begin(local_connection), - mock.call.commit(local_connection), - ], - ) - def test_deactivated_warning_straight(self, local_connection): with expect_warnings( "transaction already deassociated from connection" @@ -427,39 +319,34 @@ class TransactionTest(fixtures.TablesTest): 0, ) - def test_with_interface(self, local_connection): + def test_ctxmanager_interface(self, local_connection): + # a legacy test, adapted for 2.x style, was called + # "test_with_interface". this is likely an early test for when + # the "with" construct was first added. + connection = local_connection users = self.tables.users trans = connection.begin() - trans.__enter__() - connection.execute(users.insert(), dict(user_id=1, user_name="user1")) - connection.execute(users.insert(), dict(user_id=2, user_name="user2")) - try: + + with trans: connection.execute( - users.insert(), dict(user_id=2, user_name="user2.5") + users.insert(), dict(user_id=1, user_name="user1") + ) + connection.execute( + users.insert(), dict(user_id=2, user_name="user2") ) - except Exception: - trans.__exit__(*sys.exc_info()) - assert not trans.is_active - self.assert_( - connection.exec_driver_sql( - "select count(*) from " "users" - ).scalar() - == 0 - ) + assert trans.is_active - trans = connection.begin() - trans.__enter__() - connection.execute(users.insert(), dict(user_id=1, user_name="user1")) - trans.__exit__(None, None, None) assert not trans.is_active - self.assert_( + + eq_( connection.exec_driver_sql( "select count(*) from " "users" - ).scalar() - == 1 + ).scalar(), + 2, ) + connection.rollback() def test_close(self, local_connection): connection = local_connection @@ -648,6 +535,531 @@ class TransactionTest(fixtures.TablesTest): ) eq_(result.fetchall(), []) + def test_interrupt_ctxmanager_engine(self, trans_ctx_manager_fixture): + fn = trans_ctx_manager_fixture + + fn(testing.db, trans_on_subject=False, execute_on_subject=False) + + @testing.combinations((True,), (False,), argnames="trans_on_subject") + def test_interrupt_ctxmanager_connection( + self, trans_ctx_manager_fixture, trans_on_subject + ): + fn = trans_ctx_manager_fixture + + with testing.db.connect() as conn: + fn( + conn, + trans_on_subject=trans_on_subject, + execute_on_subject=True, + ) + + def test_autobegin_rollback(self): + users = self.tables.users + with testing.db.connect() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + conn.rollback() + + eq_(conn.scalar(select(func.count(1)).select_from(users)), 0) + + @testing.requires.autocommit + def test_autocommit_isolation_level(self): + users = self.tables.users + + with testing.db.connect().execution_options( + isolation_level="AUTOCOMMIT" + ) as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + conn.rollback() + + with testing.db.connect() as conn: + eq_( + conn.scalar(select(func.count(1)).select_from(users)), + 1, + ) + + @testing.requires.autocommit + def test_no_autocommit_w_begin(self): + + with testing.db.begin() as conn: + assert_raises_message( + exc.InvalidRequestError, + r"This connection has already initialized a SQLAlchemy " + r"Transaction\(\) object via begin\(\) or autobegin; " + r"isolation_level may not be altered unless rollback\(\) or " + r"commit\(\) is called first.", + conn.execution_options, + isolation_level="AUTOCOMMIT", + ) + + @testing.requires.autocommit + def test_no_autocommit_w_autobegin(self): + + with testing.db.connect() as conn: + conn.execute(select(1)) + + assert_raises_message( + exc.InvalidRequestError, + r"This connection has already initialized a SQLAlchemy " + r"Transaction\(\) object via begin\(\) or autobegin; " + r"isolation_level may not be altered unless rollback\(\) or " + r"commit\(\) is called first.", + conn.execution_options, + isolation_level="AUTOCOMMIT", + ) + + conn.rollback() + + conn.execution_options(isolation_level="AUTOCOMMIT") + + def test_autobegin_commit(self): + users = self.tables.users + + with testing.db.connect() as conn: + + assert not conn.in_transaction() + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + assert conn.in_transaction() + conn.commit() + + assert not conn.in_transaction() + + eq_( + conn.scalar(select(func.count(1)).select_from(users)), + 1, + ) + + conn.execute(users.insert(), {"user_id": 2, "user_name": "name 2"}) + + eq_( + conn.scalar(select(func.count(1)).select_from(users)), + 2, + ) + + assert conn.in_transaction() + conn.rollback() + assert not conn.in_transaction() + + eq_( + conn.scalar(select(func.count(1)).select_from(users)), + 1, + ) + + def test_rollback_on_close(self): + canary = mock.Mock() + with testing.db.connect() as conn: + event.listen(conn, "rollback", canary) + conn.execute(select(1)) + assert conn.in_transaction() + + eq_(canary.mock_calls, [mock.call(conn)]) + + def test_no_on_close_no_transaction(self): + canary = mock.Mock() + with testing.db.connect() as conn: + event.listen(conn, "rollback", canary) + conn.execute(select(1)) + conn.rollback() + assert not conn.in_transaction() + + eq_(canary.mock_calls, [mock.call(conn)]) + + def test_rollback_on_exception(self): + canary = mock.Mock() + try: + with testing.db.connect() as conn: + event.listen(conn, "rollback", canary) + conn.execute(select(1)) + assert conn.in_transaction() + raise Exception("some error") + assert False + except: + pass + + eq_(canary.mock_calls, [mock.call(conn)]) + + def test_rollback_on_exception_if_no_trans(self): + canary = mock.Mock() + try: + with testing.db.connect() as conn: + event.listen(conn, "rollback", canary) + assert not conn.in_transaction() + raise Exception("some error") + assert False + except: + pass + + eq_(canary.mock_calls, []) + + def test_commit_no_begin(self): + with testing.db.connect() as conn: + assert not conn.in_transaction() + conn.commit() + + @testing.requires.independent_connections + def test_commit_inactive(self): + with testing.db.connect() as conn: + conn.begin() + conn.invalidate() + + assert_raises_message( + exc.InvalidRequestError, "Can't reconnect until", conn.commit + ) + + @testing.requires.independent_connections + def test_rollback_inactive(self): + users = self.tables.users + with testing.db.connect() as conn: + + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + conn.commit() + + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + conn.invalidate() + + assert_raises_message( + exc.PendingRollbackError, + "Can't reconnect", + conn.execute, + select(1), + ) + + conn.rollback() + eq_( + conn.scalar(select(func.count(1)).select_from(users)), + 1, + ) + + def test_rollback_no_begin(self): + with testing.db.connect() as conn: + assert not conn.in_transaction() + conn.rollback() + + def test_rollback_end_ctx_manager(self): + with testing.db.begin() as conn: + assert conn.in_transaction() + conn.rollback() + assert not conn.in_transaction() + + def test_rollback_end_ctx_manager_autobegin(self, local_connection): + m1 = mock.Mock() + + event.listen(local_connection, "rollback", m1.rollback) + event.listen(local_connection, "commit", m1.commit) + + with local_connection.begin() as trans: + assert local_connection.in_transaction() + trans.rollback() + assert not local_connection.in_transaction() + + # previously, would be subject to autocommit. + # now it raises + with expect_raises_message( + exc.InvalidRequestError, + "Can't operate on closed transaction inside context manager. " + "Please complete the context manager before emitting " + "further commands.", + ): + local_connection.execute(select(1)) + + assert not local_connection.in_transaction() + + @testing.combinations((True,), (False,), argnames="roll_back_in_block") + def test_ctxmanager_rolls_back(self, local_connection, roll_back_in_block): + m1 = mock.Mock() + + event.listen(local_connection, "rollback", m1.rollback) + event.listen(local_connection, "commit", m1.commit) + + with expect_raises_message(Exception, "test"): + with local_connection.begin() as trans: + if roll_back_in_block: + trans.rollback() + + if 1 == 1: + raise Exception("test") + + assert not trans.is_active + assert not local_connection.in_transaction() + assert trans._deactivated_from_connection + + eq_(m1.mock_calls, [mock.call.rollback(local_connection)]) + + @testing.requires.savepoints + def test_ctxmanager_autobegins_real_trans_from_nested( + self, local_connection + ): + # the legacy version of this test in 1.4 + # was test_ctxmanager_commits_real_trans_from_nested + m1 = mock.Mock() + + event.listen( + local_connection, "rollback_savepoint", m1.rollback_savepoint + ) + event.listen( + local_connection, "release_savepoint", m1.release_savepoint + ) + event.listen(local_connection, "rollback", m1.rollback) + event.listen(local_connection, "commit", m1.commit) + event.listen(local_connection, "begin", m1.begin) + event.listen(local_connection, "savepoint", m1.savepoint) + + with local_connection.begin_nested() as nested_trans: + pass + + assert not nested_trans.is_active + assert nested_trans._deactivated_from_connection + eq_( + m1.mock_calls, + [ + mock.call.begin(local_connection), + mock.call.savepoint(local_connection, mock.ANY), + mock.call.release_savepoint( + local_connection, mock.ANY, mock.ANY + ), + ], + ) + + def test_explicit_begin(self): + users = self.tables.users + + with testing.db.connect() as conn: + assert not conn.in_transaction() + conn.begin() + assert conn.in_transaction() + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + conn.commit() + + eq_( + conn.scalar(select(func.count(1)).select_from(users)), + 1, + ) + + def test_no_double_begin(self): + with testing.db.connect() as conn: + conn.begin() + + assert_raises_message( + exc.InvalidRequestError, + r"This connection has already initialized a SQLAlchemy " + r"Transaction\(\) object via begin\(\) or autobegin; can't " + r"call begin\(\) here unless rollback\(\) or commit\(\) is " + r"called first.", + conn.begin, + ) + + def test_no_autocommit(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + with testing.db.connect() as conn: + eq_( + conn.scalar(select(func.count(1)).select_from(users)), + 0, + ) + + def test_begin_block(self): + users = self.tables.users + + with testing.db.begin() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + with testing.db.connect() as conn: + eq_( + conn.scalar(select(func.count(1)).select_from(users)), + 1, + ) + + @testing.requires.savepoints + def test_savepoint_one(self): + users = self.tables.users + + with testing.db.begin() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + savepoint = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + eq_( + conn.scalar(select(func.count(1)).select_from(users)), + 2, + ) + savepoint.rollback() + + eq_( + conn.scalar(select(func.count(1)).select_from(users)), + 1, + ) + + with testing.db.connect() as conn: + eq_( + conn.scalar(select(func.count(1)).select_from(users)), + 1, + ) + + @testing.requires.savepoints + def test_savepoint_two(self): + users = self.tables.users + + with testing.db.begin() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + savepoint = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + eq_( + conn.scalar(select(func.count(1)).select_from(users)), + 2, + ) + savepoint.commit() + + eq_( + conn.scalar(select(func.count(1)).select_from(users)), + 2, + ) + + with testing.db.connect() as conn: + eq_( + conn.scalar(select(func.count(1)).select_from(users)), + 2, + ) + + @testing.requires.savepoints + def test_savepoint_three(self): + users = self.tables.users + + with testing.db.begin() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + conn.rollback() + + assert not conn.in_transaction() + + with testing.db.connect() as conn: + eq_( + conn.scalar(select(func.count(1)).select_from(users)), + 0, + ) + + @testing.requires.savepoints + def test_savepoint_four(self): + users = self.tables.users + + with testing.db.begin() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + sp1 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + sp2 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) + + sp2.rollback() + + assert not sp2.is_active + assert sp1.is_active + assert conn.in_transaction() + + assert not sp1.is_active + + with testing.db.connect() as conn: + eq_( + conn.scalar(select(func.count(1)).select_from(users)), + 2, + ) + + @testing.requires.savepoints + def test_savepoint_five(self): + users = self.tables.users + + with testing.db.begin() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + sp2 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) + + sp2.commit() + + assert conn.in_transaction() + + with testing.db.connect() as conn: + eq_( + conn.scalar(select(func.count(1)).select_from(users)), + 3, + ) + + @testing.requires.savepoints + def test_savepoint_six(self): + users = self.tables.users + + with testing.db.begin() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + sp1 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + assert conn._nested_transaction is sp1 + + sp2 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) + + assert conn._nested_transaction is sp2 + + sp2.commit() + + assert conn._nested_transaction is sp1 + + sp1.rollback() + + assert conn._nested_transaction is None + + assert conn.in_transaction() + + with testing.db.connect() as conn: + eq_( + conn.scalar(select(func.count(1)).select_from(users)), + 1, + ) + + @testing.requires.savepoints + def test_savepoint_seven(self): + users = self.tables.users + + conn = testing.db.connect() + trans = conn.begin() + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + sp1 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + sp2 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) + + assert conn.in_transaction() + + trans.close() + + assert not sp1.is_active + assert not sp2.is_active + assert not trans.is_active + assert conn._transaction is None + assert conn._nested_transaction is None + + with testing.db.connect() as conn: + eq_( + conn.scalar(select(func.count(1)).select_from(users)), + 0, + ) + class AutoRollbackTest(fixtures.TestBase): __backend__ = True @@ -693,7 +1105,6 @@ class IsolationLevelTest(fixtures.TestBase): __requires__ = ( "isolation_level", "ad_hoc_engines", - "legacy_isolation_level", ) __backend__ = True @@ -712,6 +1123,7 @@ class IsolationLevelTest(fixtures.TestBase): else: assert False, "no non-default isolation level available" + @testing.requires.legacy_isolation_level def test_engine_param_stays(self): eng = testing_engine() @@ -765,6 +1177,7 @@ class IsolationLevelTest(fixtures.TestBase): conn.close() + @testing.requires.legacy_isolation_level def test_reset_level_with_setting(self): eng = testing_engine( options=dict(isolation_level=self._non_default_isolation_level()) @@ -788,7 +1201,8 @@ class IsolationLevelTest(fixtures.TestBase): ) conn.close() - def test_invalid_level(self): + @testing.requires.legacy_isolation_level + def test_invalid_level_engine_param(self): eng = testing_engine(options=dict(isolation_level="FOO")) assert_raises_message( exc.ArgumentError, @@ -802,6 +1216,33 @@ class IsolationLevelTest(fixtures.TestBase): eng.connect, ) + # TODO: all the dialects seem to be manually raising ArgumentError + # individually within their set_isolation_level() methods, when this + # should be a default dialect feature so that + # error messaging etc. is consistent, including that it works for 3rd + # party dialects. + # TODO: barring that, at least implement this for the Oracle dialect + @testing.fails_on( + "oracle", + "cx_oracle dialect doesnt have argument error here, " + "raises it via the DB rejecting it", + ) + def test_invalid_level_execution_option(self): + eng = testing_engine( + options=dict(execution_options=dict(isolation_level="FOO")) + ) + assert_raises_message( + exc.ArgumentError, + "Invalid value '%s' for isolation_level. " + "Valid isolation levels for %s are %s" + % ( + "FOO", + eng.dialect.name, + ", ".join(eng.dialect._isolation_lookup), + ), + eng.connect, + ) + def test_connection_invalidated(self): eng = testing_engine() conn = eng.connect() @@ -851,28 +1292,25 @@ class IsolationLevelTest(fixtures.TestBase): c3.close() c4.close() - def test_warning_in_transaction(self): + def test_exception_in_transaction(self): eng = testing_engine() c1 = eng.connect() - with expect_warnings( - "Connection is already established with a Transaction; " - "setting isolation_level may implicitly rollback or commit " - "the existing transaction, or have no effect until next " - "transaction" + with expect_raises_message( + exc.InvalidRequestError, + r"This connection has already initialized a SQLAlchemy " + r"Transaction\(\) object via begin\(\) or autobegin; " + r"isolation_level may not be altered unless rollback\(\) or " + r"commit\(\) is called first.", ): with c1.begin(): c1 = c1.execution_options( isolation_level=self._non_default_isolation_level() ) - eq_( - eng.dialect.get_isolation_level(c1.connection), - self._non_default_isolation_level(), - ) - # stays outside of transaction + # was never set, so we are on original value eq_( eng.dialect.get_isolation_level(c1.connection), - self._non_default_isolation_level(), + self._default_isolation_level(), ) def test_per_statement_bzzt(self): @@ -1015,25 +1453,25 @@ class ConnectionCharacteristicTest(fixtures.TestBase): eq_(connection.foo, "original_value") - def test_warning_in_transaction(self, characteristic_fixture): + def test_exception_in_transaction(self, characteristic_fixture): + # this was a warning in 1.x. it appears we did not test the + # 2.0 error case in 1.4 + engine, connection = characteristic_fixture c1 = engine.connect() - with expect_warnings( - "Connection is already established with a Transaction; " - "setting foo may implicitly rollback or commit " - "the existing transaction, or have no effect until next " - "transaction" + with expect_raises_message( + exc.InvalidRequestError, + r"This connection has already initialized a SQLAlchemy " + r"Transaction\(\) object via begin\(\) or autobegin; " + r"foo may not be altered unless rollback\(\) or " + r"commit\(\) is called first.", ): with c1.begin(): c1 = c1.execution_options(foo="new_foo") - eq_( - engine.dialect.get_foo(c1.connection), - "new_foo", - ) - # stays outside of transaction - eq_(engine.dialect.get_foo(c1.connection), "new_foo") + # was never set, so we are on original value + eq_(engine.dialect.get_foo(c1.connection), "original_value") @testing.fails("no error is raised yet here.") def test_per_statement_bzzt(self, characteristic_fixture): @@ -1120,6 +1558,7 @@ class ResetAgentTest(ResetFixture, fixtures.TestBase): def test_begin_close(self, reset_agent): with reset_agent.engine.connect() as connection: trans = connection.begin() + assert not trans.is_active eq_( reset_agent.mock_calls, @@ -1130,6 +1569,7 @@ class ResetAgentTest(ResetFixture, fixtures.TestBase): with reset_agent.engine.connect() as connection: trans = connection.begin() trans.rollback() + assert not trans.is_active eq_( reset_agent.mock_calls, [ @@ -1143,6 +1583,7 @@ class ResetAgentTest(ResetFixture, fixtures.TestBase): with reset_agent.engine.connect() as connection: trans = connection.begin() trans.commit() + assert not trans.is_active eq_( reset_agent.mock_calls, [ @@ -1237,161 +1678,6 @@ class ResetAgentTest(ResetFixture, fixtures.TestBase): @testing.requires.savepoints def test_begin_nested_close(self, reset_agent): with reset_agent.engine.connect() as connection: - with testing.expect_deprecated_20( - r"Calling Connection.begin_nested\(\) in " - r"2.0 style use will return" - ): - trans = connection.begin_nested() - assert not trans.is_active - eq_( - reset_agent.mock_calls, - [ - mock.call.rollback(connection), - mock.call.do_rollback(mock.ANY), - ], - ) - - @testing.requires.savepoints - def test_begin_begin_nested_close(self, reset_agent): - with reset_agent.engine.connect() as connection: - trans = connection.begin() - trans2 = connection.begin_nested() - assert not trans2.is_active - assert not trans.is_active - eq_( - reset_agent.mock_calls, - [ - mock.call.rollback(connection), - mock.call.do_rollback(mock.ANY), - ], - ) - - @testing.requires.savepoints - def test_begin_begin_nested_rollback_commit(self, reset_agent): - with reset_agent.engine.connect() as connection: - trans = connection.begin() - trans2 = connection.begin_nested() - trans2.rollback() - trans.commit() - eq_( - reset_agent.mock_calls, - [ - mock.call.rollback_savepoint(connection, mock.ANY, mock.ANY), - mock.call.commit(connection), - mock.call.do_commit(mock.ANY), - mock.call.do_rollback(mock.ANY), - ], - ) - - @testing.requires.savepoints - def test_begin_begin_nested_rollback_rollback(self, reset_agent): - with reset_agent.engine.connect() as connection: - trans = connection.begin() - trans2 = connection.begin_nested() - trans2.rollback() - trans.rollback() - eq_( - reset_agent.mock_calls, - [ - mock.call.rollback_savepoint(connection, mock.ANY, mock.ANY), - mock.call.rollback(connection), - mock.call.do_rollback(mock.ANY), - mock.call.do_rollback(mock.ANY), - ], - ) - - @testing.requires.two_phase_transactions - def test_reset_via_agent_begin_twophase(self, reset_agent): - with reset_agent.engine.connect() as connection: - trans = connection.begin_twophase() # noqa - - # pg8000 rolls back via the rollback_twophase - eq_( - reset_agent.mock_calls[0], - mock.call.rollback_twophase(connection, mock.ANY, mock.ANY), - ) - - @testing.requires.two_phase_transactions - def test_reset_via_agent_begin_twophase_commit(self, reset_agent): - with reset_agent.engine.connect() as connection: - trans = connection.begin_twophase() - trans.commit() - eq_( - reset_agent.mock_calls[0], - mock.call.commit_twophase(connection, mock.ANY, mock.ANY), - ) - - eq_(reset_agent.mock_calls[-1], mock.call.do_rollback(mock.ANY)) - - @testing.requires.two_phase_transactions - def test_reset_via_agent_begin_twophase_rollback(self, reset_agent): - with reset_agent.engine.connect() as connection: - trans = connection.begin_twophase() - trans.rollback() - eq_( - reset_agent.mock_calls[0:2], - [ - mock.call.rollback_twophase(connection, mock.ANY, mock.ANY), - mock.call.do_rollback(mock.ANY), - ], - ) - - eq_(reset_agent.mock_calls[-1], mock.call.do_rollback(mock.ANY)) - - -class FutureResetAgentTest( - ResetFixture, fixtures.FutureEngineMixin, fixtures.TestBase -): - - __backend__ = True - - def test_reset_agent_no_conn_transaction(self, reset_agent): - with reset_agent.engine.connect(): - pass - - eq_(reset_agent.mock_calls, [mock.call.do_rollback(mock.ANY)]) - - def test_begin_close(self, reset_agent): - with reset_agent.engine.connect() as connection: - trans = connection.begin() - - assert not trans.is_active - eq_( - reset_agent.mock_calls, - [mock.call.rollback(connection), mock.call.do_rollback(mock.ANY)], - ) - - def test_begin_rollback(self, reset_agent): - with reset_agent.engine.connect() as connection: - trans = connection.begin() - trans.rollback() - assert not trans.is_active - eq_( - reset_agent.mock_calls, - [ - mock.call.rollback(connection), - mock.call.do_rollback(mock.ANY), - mock.call.do_rollback(mock.ANY), - ], - ) - - def test_begin_commit(self, reset_agent): - with reset_agent.engine.connect() as connection: - trans = connection.begin() - trans.commit() - assert not trans.is_active - eq_( - reset_agent.mock_calls, - [ - mock.call.commit(connection), - mock.call.do_commit(mock.ANY), - mock.call.do_rollback(mock.ANY), - ], - ) - - @testing.requires.savepoints - def test_begin_nested_close(self, reset_agent): - with reset_agent.engine.connect() as connection: trans = connection.begin_nested() # it's a savepoint, but root made sure it closed assert not trans.is_active @@ -1492,554 +1778,8 @@ class FutureResetAgentTest( ) eq_(reset_agent.mock_calls[-1], mock.call.do_rollback(mock.ANY)) - -class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest): - __backend__ = True - - @classmethod - def define_tables(cls, metadata): - Table( - "users", - metadata, - Column("user_id", INT, primary_key=True, autoincrement=False), - Column("user_name", VARCHAR(20)), - test_needs_acid=True, - ) - Table( - "users_autoinc", - metadata, - Column( - "user_id", INT, primary_key=True, test_needs_autoincrement=True - ), - Column("user_name", VARCHAR(20)), - test_needs_acid=True, - ) - - @testing.fixture - def local_connection(self): - with testing.db.connect() as conn: - yield conn - - def test_interrupt_ctxmanager_engine(self, trans_ctx_manager_fixture): - fn = trans_ctx_manager_fixture - - fn(testing.db, trans_on_subject=False, execute_on_subject=False) - - @testing.combinations((True,), (False,), argnames="trans_on_subject") - def test_interrupt_ctxmanager_connection( - self, trans_ctx_manager_fixture, trans_on_subject - ): - fn = trans_ctx_manager_fixture - - with testing.db.connect() as conn: - fn( - conn, - trans_on_subject=trans_on_subject, - execute_on_subject=True, - ) - - def test_autobegin_rollback(self): - users = self.tables.users - with testing.db.connect() as conn: - conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) - conn.rollback() - - eq_(conn.scalar(select(func.count(1)).select_from(users)), 0) - - @testing.requires.autocommit - def test_autocommit_isolation_level(self): - users = self.tables.users - - with testing.db.connect().execution_options( - isolation_level="AUTOCOMMIT" - ) as conn: - conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) - conn.rollback() - - with testing.db.connect() as conn: - eq_( - conn.scalar(select(func.count(1)).select_from(users)), - 1, - ) - - @testing.requires.autocommit - def test_no_autocommit_w_begin(self): - - with testing.db.begin() as conn: - assert_raises_message( - exc.InvalidRequestError, - r"This connection has already initialized a SQLAlchemy " - r"Transaction\(\) object via begin\(\) or autobegin; " - r"isolation_level may not be altered unless rollback\(\) or " - r"commit\(\) is called first.", - conn.execution_options, - isolation_level="AUTOCOMMIT", - ) - - @testing.requires.autocommit - def test_no_autocommit_w_autobegin(self): - - with testing.db.connect() as conn: - conn.execute(select(1)) - - assert_raises_message( - exc.InvalidRequestError, - r"This connection has already initialized a SQLAlchemy " - r"Transaction\(\) object via begin\(\) or autobegin; " - r"isolation_level may not be altered unless rollback\(\) or " - r"commit\(\) is called first.", - conn.execution_options, - isolation_level="AUTOCOMMIT", - ) - - conn.rollback() - - conn.execution_options(isolation_level="AUTOCOMMIT") - - def test_autobegin_commit(self): - users = self.tables.users - - with testing.db.connect() as conn: - - assert not conn.in_transaction() - conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) - - assert conn.in_transaction() - conn.commit() - - assert not conn.in_transaction() - - eq_( - conn.scalar(select(func.count(1)).select_from(users)), - 1, - ) - - conn.execute(users.insert(), {"user_id": 2, "user_name": "name 2"}) - - eq_( - conn.scalar(select(func.count(1)).select_from(users)), - 2, - ) - - assert conn.in_transaction() - conn.rollback() - assert not conn.in_transaction() - - eq_( - conn.scalar(select(func.count(1)).select_from(users)), - 1, - ) - - def test_rollback_on_close(self): - canary = mock.Mock() - with testing.db.connect() as conn: - event.listen(conn, "rollback", canary) - conn.execute(select(1)) - assert conn.in_transaction() - - eq_(canary.mock_calls, [mock.call(conn)]) - - def test_no_on_close_no_transaction(self): - canary = mock.Mock() - with testing.db.connect() as conn: - event.listen(conn, "rollback", canary) - conn.execute(select(1)) - conn.rollback() - assert not conn.in_transaction() - - eq_(canary.mock_calls, [mock.call(conn)]) - - def test_rollback_on_exception(self): - canary = mock.Mock() - try: - with testing.db.connect() as conn: - event.listen(conn, "rollback", canary) - conn.execute(select(1)) - assert conn.in_transaction() - raise Exception("some error") - assert False - except: - pass - - eq_(canary.mock_calls, [mock.call(conn)]) - - def test_rollback_on_exception_if_no_trans(self): - canary = mock.Mock() - try: - with testing.db.connect() as conn: - event.listen(conn, "rollback", canary) - assert not conn.in_transaction() - raise Exception("some error") - assert False - except: - pass - - eq_(canary.mock_calls, []) - - def test_commit_no_begin(self): - with testing.db.connect() as conn: - assert not conn.in_transaction() - conn.commit() - - @testing.requires.independent_connections - def test_commit_inactive(self): - with testing.db.connect() as conn: - conn.begin() - conn.invalidate() - - assert_raises_message( - exc.InvalidRequestError, "Can't reconnect until", conn.commit - ) - - @testing.requires.independent_connections - def test_rollback_inactive(self): - users = self.tables.users - with testing.db.connect() as conn: - - conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) - conn.commit() - - conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) - - conn.invalidate() - - assert_raises_message( - exc.PendingRollbackError, - "Can't reconnect", - conn.execute, - select(1), - ) - - conn.rollback() - eq_( - conn.scalar(select(func.count(1)).select_from(users)), - 1, - ) - - def test_rollback_no_begin(self): - with testing.db.connect() as conn: - assert not conn.in_transaction() - conn.rollback() - - def test_rollback_end_ctx_manager(self): - with testing.db.begin() as conn: - assert conn.in_transaction() - conn.rollback() - assert not conn.in_transaction() - - def test_rollback_end_ctx_manager_autobegin(self, local_connection): - m1 = mock.Mock() - - event.listen(local_connection, "rollback", m1.rollback) - event.listen(local_connection, "commit", m1.commit) - - with local_connection.begin() as trans: - assert local_connection.in_transaction() - trans.rollback() - assert not local_connection.in_transaction() - - # previously, would be subject to autocommit. - # now it raises - with expect_raises_message( - exc.InvalidRequestError, - "Can't operate on closed transaction inside context manager. " - "Please complete the context manager before emitting " - "further commands.", - ): - local_connection.execute(select(1)) - - assert not local_connection.in_transaction() - - @testing.combinations((True,), (False,), argnames="roll_back_in_block") - def test_ctxmanager_rolls_back(self, local_connection, roll_back_in_block): - m1 = mock.Mock() - - event.listen(local_connection, "rollback", m1.rollback) - event.listen(local_connection, "commit", m1.commit) - - with expect_raises_message(Exception, "test"): - with local_connection.begin() as trans: - if roll_back_in_block: - trans.rollback() - - if 1 == 1: - raise Exception("test") - - assert not trans.is_active - assert not local_connection.in_transaction() - assert trans._deactivated_from_connection - - eq_(m1.mock_calls, [mock.call.rollback(local_connection)]) - - @testing.requires.savepoints - def test_ctxmanager_autobegins_real_trans_from_nested( - self, local_connection - ): - m1 = mock.Mock() - - event.listen( - local_connection, "rollback_savepoint", m1.rollback_savepoint - ) - event.listen( - local_connection, "release_savepoint", m1.release_savepoint - ) - event.listen(local_connection, "rollback", m1.rollback) - event.listen(local_connection, "commit", m1.commit) - event.listen(local_connection, "begin", m1.begin) - event.listen(local_connection, "savepoint", m1.savepoint) - - with local_connection.begin_nested() as nested_trans: + def test_reset_agent_no_conn_transaction(self, reset_agent): + with reset_agent.engine.connect(): pass - assert not nested_trans.is_active - assert nested_trans._deactivated_from_connection - # legacy mode, no savepoint at all - eq_( - m1.mock_calls, - [ - mock.call.begin(local_connection), - mock.call.savepoint(local_connection, mock.ANY), - mock.call.release_savepoint( - local_connection, mock.ANY, mock.ANY - ), - ], - ) - - def test_explicit_begin(self): - users = self.tables.users - - with testing.db.connect() as conn: - assert not conn.in_transaction() - conn.begin() - assert conn.in_transaction() - conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) - conn.commit() - - eq_( - conn.scalar(select(func.count(1)).select_from(users)), - 1, - ) - - def test_no_double_begin(self): - with testing.db.connect() as conn: - conn.begin() - - assert_raises_message( - exc.InvalidRequestError, - r"This connection has already initialized a SQLAlchemy " - r"Transaction\(\) object via begin\(\) or autobegin; can't " - r"call begin\(\) here unless rollback\(\) or commit\(\) is " - r"called first.", - conn.begin, - ) - - def test_no_autocommit(self): - users = self.tables.users - - with testing.db.connect() as conn: - conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) - - with testing.db.connect() as conn: - eq_( - conn.scalar(select(func.count(1)).select_from(users)), - 0, - ) - - def test_begin_block(self): - users = self.tables.users - - with testing.db.begin() as conn: - conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) - - with testing.db.connect() as conn: - eq_( - conn.scalar(select(func.count(1)).select_from(users)), - 1, - ) - - @testing.requires.savepoints - def test_savepoint_one(self): - users = self.tables.users - - with testing.db.begin() as conn: - conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) - - savepoint = conn.begin_nested() - conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) - - eq_( - conn.scalar(select(func.count(1)).select_from(users)), - 2, - ) - savepoint.rollback() - - eq_( - conn.scalar(select(func.count(1)).select_from(users)), - 1, - ) - - with testing.db.connect() as conn: - eq_( - conn.scalar(select(func.count(1)).select_from(users)), - 1, - ) - - @testing.requires.savepoints - def test_savepoint_two(self): - users = self.tables.users - - with testing.db.begin() as conn: - conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) - - savepoint = conn.begin_nested() - conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) - - eq_( - conn.scalar(select(func.count(1)).select_from(users)), - 2, - ) - savepoint.commit() - - eq_( - conn.scalar(select(func.count(1)).select_from(users)), - 2, - ) - - with testing.db.connect() as conn: - eq_( - conn.scalar(select(func.count(1)).select_from(users)), - 2, - ) - - @testing.requires.savepoints - def test_savepoint_three(self): - users = self.tables.users - - with testing.db.begin() as conn: - conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) - - conn.begin_nested() - conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) - - conn.rollback() - - assert not conn.in_transaction() - - with testing.db.connect() as conn: - eq_( - conn.scalar(select(func.count(1)).select_from(users)), - 0, - ) - - @testing.requires.savepoints - def test_savepoint_four(self): - users = self.tables.users - - with testing.db.begin() as conn: - conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) - - sp1 = conn.begin_nested() - conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) - - sp2 = conn.begin_nested() - conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) - - sp2.rollback() - - assert not sp2.is_active - assert sp1.is_active - assert conn.in_transaction() - - assert not sp1.is_active - - with testing.db.connect() as conn: - eq_( - conn.scalar(select(func.count(1)).select_from(users)), - 2, - ) - - @testing.requires.savepoints - def test_savepoint_five(self): - users = self.tables.users - - with testing.db.begin() as conn: - conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) - - conn.begin_nested() - conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) - - sp2 = conn.begin_nested() - conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) - - sp2.commit() - - assert conn.in_transaction() - - with testing.db.connect() as conn: - eq_( - conn.scalar(select(func.count(1)).select_from(users)), - 3, - ) - - @testing.requires.savepoints - def test_savepoint_six(self): - users = self.tables.users - - with testing.db.begin() as conn: - conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) - - sp1 = conn.begin_nested() - conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) - - assert conn._nested_transaction is sp1 - - sp2 = conn.begin_nested() - conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) - - assert conn._nested_transaction is sp2 - - sp2.commit() - - assert conn._nested_transaction is sp1 - - sp1.rollback() - - assert conn._nested_transaction is None - - assert conn.in_transaction() - - with testing.db.connect() as conn: - eq_( - conn.scalar(select(func.count(1)).select_from(users)), - 1, - ) - - @testing.requires.savepoints - def test_savepoint_seven(self): - users = self.tables.users - - conn = testing.db.connect() - trans = conn.begin() - conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) - - sp1 = conn.begin_nested() - conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) - - sp2 = conn.begin_nested() - conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) - - assert conn.in_transaction() - - trans.close() - - assert not sp1.is_active - assert not sp2.is_active - assert not trans.is_active - assert conn._transaction is None - assert conn._nested_transaction is None - - with testing.db.connect() as conn: - eq_( - conn.scalar(select(func.count(1)).select_from(users)), - 0, - ) + eq_(reset_agent.mock_calls, [mock.call.do_rollback(mock.ANY)]) |