summaryrefslogtreecommitdiff
path: root/test/engine
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2021-11-01 15:44:44 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2021-11-07 14:30:35 -0500
commitd050193daaa8d91371c759296f3304b8641c1976 (patch)
treef3f880ccd528d1dc6c1dafa1a19b71c7c953fdce /test/engine
parent248d232459e38561999c4172acaaddd651c1a933 (diff)
downloadsqlalchemy-d050193daaa8d91371c759296f3304b8641c1976.tar.gz
fully implement future engine and remove legacy
The major action here is to lift and move future.Connection and future.Engine fully into sqlalchemy.engine.base. This removes lots of engine concepts, including: * autocommit * Connection running without a transaction, autobegin is now present in all cases * most "autorollback" is obsolete * Core-level subtransactions (i.e. MarkerTransaction) * "branched" connections, copies of connections * execution_options() returns self, not a new connection * old argument formats, distill_params(), simplifies calling scheme between engine methods * before/after_execute() events (oriented towards compiled constructs) don't emit for exec_driver_sql(). before/after_cursor_execute() is still included for this * old helper methods superseded by context managers, connection.transaction(), engine.transaction() engine.run_callable() * ancient engine-level reflection methods has_table(), table_names() * sqlalchemy.testing.engines.proxying_engine References: #7257 Change-Id: Ib20ed816642d873b84221378a9ec34480e01e82c
Diffstat (limited to 'test/engine')
-rw-r--r--test/engine/test_ddlevents.py83
-rw-r--r--test/engine/test_deprecations.py1390
-rw-r--r--test/engine/test_execute.py545
-rw-r--r--test/engine/test_logging.py23
-rw-r--r--test/engine/test_parseconnect.py16
-rw-r--r--test/engine/test_processors.py86
-rw-r--r--test/engine/test_reconnect.py135
-rw-r--r--test/engine/test_transaction.py1460
8 files changed, 947 insertions, 2791 deletions
diff --git a/test/engine/test_ddlevents.py b/test/engine/test_ddlevents.py
index d2755221d..f339ef171 100644
--- a/test/engine/test_ddlevents.py
+++ b/test/engine/test_ddlevents.py
@@ -538,8 +538,9 @@ class DDLTransactionTest(fixtures.TestBase):
finally:
m.drop_all(testing.db)
- def _listening_engine_fixture(self, future=False):
- eng = engines.testing_engine(future=future)
+ @testing.fixture
+ def listening_engine_fixture(self):
+ eng = engines.testing_engine()
m1 = mock.Mock()
@@ -558,17 +559,7 @@ class DDLTransactionTest(fixtures.TestBase):
return eng, m1
- @testing.fixture
- def listening_engine_fixture(self):
- return self._listening_engine_fixture(future=False)
-
- @testing.fixture
- def future_listening_engine_fixture(self):
- return self._listening_engine_fixture(future=True)
-
- def test_ddl_legacy_engine(
- self, metadata_fixture, listening_engine_fixture
- ):
+ def test_ddl_engine(self, metadata_fixture, listening_engine_fixture):
eng, m1 = listening_engine_fixture
metadata_fixture.create_all(eng)
@@ -583,70 +574,12 @@ class DDLTransactionTest(fixtures.TestBase):
],
)
- def test_ddl_future_engine(
- self, metadata_fixture, future_listening_engine_fixture
- ):
- eng, m1 = future_listening_engine_fixture
-
- metadata_fixture.create_all(eng)
-
- eq_(
- m1.mock_calls,
- [
- mock.call.begin(mock.ANY),
- mock.call.cursor_execute("CREATE TABLE ..."),
- mock.call.cursor_execute("CREATE TABLE ..."),
- mock.call.commit(mock.ANY),
- ],
- )
-
- def test_ddl_legacy_connection_no_transaction(
- self, metadata_fixture, listening_engine_fixture
- ):
- eng, m1 = listening_engine_fixture
-
- with eng.connect() as conn:
- with testing.expect_deprecated(
- "The current statement is being autocommitted using "
- "implicit autocommit"
- ):
- metadata_fixture.create_all(conn)
-
- eq_(
- m1.mock_calls,
- [
- mock.call.cursor_execute("CREATE TABLE ..."),
- mock.call.commit(mock.ANY),
- mock.call.cursor_execute("CREATE TABLE ..."),
- mock.call.commit(mock.ANY),
- ],
- )
-
- def test_ddl_legacy_connection_transaction(
+ def test_ddl_connection_autobegin_transaction(
self, metadata_fixture, listening_engine_fixture
):
eng, m1 = listening_engine_fixture
with eng.connect() as conn:
- with conn.begin():
- metadata_fixture.create_all(conn)
-
- eq_(
- m1.mock_calls,
- [
- mock.call.begin(mock.ANY),
- mock.call.cursor_execute("CREATE TABLE ..."),
- mock.call.cursor_execute("CREATE TABLE ..."),
- mock.call.commit(mock.ANY),
- ],
- )
-
- def test_ddl_future_connection_autobegin_transaction(
- self, metadata_fixture, future_listening_engine_fixture
- ):
- eng, m1 = future_listening_engine_fixture
-
- with eng.connect() as conn:
metadata_fixture.create_all(conn)
conn.commit()
@@ -661,10 +594,10 @@ class DDLTransactionTest(fixtures.TestBase):
],
)
- def test_ddl_future_connection_explicit_begin_transaction(
- self, metadata_fixture, future_listening_engine_fixture
+ def test_ddl_connection_explicit_begin_transaction(
+ self, metadata_fixture, listening_engine_fixture
):
- eng, m1 = future_listening_engine_fixture
+ eng, m1 = listening_engine_fixture
with eng.connect() as conn:
with conn.begin():
diff --git a/test/engine/test_deprecations.py b/test/engine/test_deprecations.py
index 3ab4a4e6c..6e7169d12 100644
--- a/test/engine/test_deprecations.py
+++ b/test/engine/test_deprecations.py
@@ -2,16 +2,12 @@ import re
import sqlalchemy as tsa
import sqlalchemy as sa
-from sqlalchemy import bindparam
from sqlalchemy import create_engine
-from sqlalchemy import DDL
from sqlalchemy import engine
from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy import ForeignKey
-from sqlalchemy import func
from sqlalchemy import inspect
-from sqlalchemy import INT
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import pool
@@ -20,7 +16,6 @@ from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import ThreadLocalMetaData
-from sqlalchemy import VARCHAR
from sqlalchemy.engine import reflection
from sqlalchemy.engine.base import Connection
from sqlalchemy.engine.base import Engine
@@ -36,11 +31,11 @@ from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_instance_of
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
+from sqlalchemy.testing.assertions import expect_deprecated
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.mock import Mock
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
-from .test_transaction import ResetFixture
def _string_deprecation_expect():
@@ -272,21 +267,6 @@ class ConnectionlessDeprecationTest(fixtures.TestBase):
is_(i1.bind, testing.db)
self.check_usage(i1)
- def test_bind_close_conn(self):
- e = testing.db
- conn = e.connect()
-
- with testing.expect_deprecated_20(
- r"The Connection.connect\(\) method is considered",
- r"The .close\(\) method on a so-called 'branched' connection is "
- r"deprecated as of 1.4, as are 'branched' connections overall, "
- r"and will be removed in a future release.",
- ):
- with conn.connect() as c2:
- assert not c2.closed
- assert not conn.closed
- assert c2.closed
-
class CreateEngineTest(fixtures.TestBase):
def test_strategy_keyword_mock(self):
@@ -329,575 +309,6 @@ class CreateEngineTest(fixtures.TestBase):
)
-class TransactionTest(ResetFixture, fixtures.TablesTest):
- __backend__ = True
-
- @classmethod
- def define_tables(cls, metadata):
- Table(
- "users",
- metadata,
- Column("user_id", Integer, primary_key=True),
- Column("user_name", String(20)),
- test_needs_acid=True,
- )
- Table("inserttable", metadata, Column("data", String(20)))
-
- @testing.fixture
- def local_connection(self):
- with testing.db.connect() as conn:
- yield conn
-
- def test_transaction_container(self):
- users = self.tables.users
-
- def go(conn, table, data):
- for d in data:
- conn.execute(table.insert(), d)
-
- with testing.expect_deprecated(
- r"The Engine.transaction\(\) method is deprecated"
- ):
- testing.db.transaction(
- go, users, [dict(user_id=1, user_name="user1")]
- )
-
- with testing.db.connect() as conn:
- eq_(conn.execute(users.select()).fetchall(), [(1, "user1")])
- with testing.expect_deprecated(
- r"The Engine.transaction\(\) method is deprecated"
- ):
- assert_raises(
- tsa.exc.DBAPIError,
- testing.db.transaction,
- go,
- users,
- [
- {"user_id": 2, "user_name": "user2"},
- {"user_id": 1, "user_name": "user3"},
- ],
- )
- with testing.db.connect() as conn:
- eq_(conn.execute(users.select()).fetchall(), [(1, "user1")])
-
- def test_begin_begin_rollback_rollback(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin()
- with testing.expect_deprecated_20(
- r"Calling .begin\(\) when a transaction is already "
- "begun, creating a 'sub' transaction"
- ):
- trans2 = connection.begin()
- trans2.rollback()
- trans.rollback()
- eq_(
- reset_agent.mock_calls,
- [
- mock.call.rollback(connection),
- mock.call.do_rollback(mock.ANY),
- mock.call.do_rollback(mock.ANY),
- ],
- )
-
- def test_begin_begin_commit_commit(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin()
- with testing.expect_deprecated_20(
- r"Calling .begin\(\) when a transaction is already "
- "begun, creating a 'sub' transaction"
- ):
- trans2 = connection.begin()
- trans2.commit()
- trans.commit()
- eq_(
- reset_agent.mock_calls,
- [
- mock.call.commit(connection),
- mock.call.do_commit(mock.ANY),
- mock.call.do_rollback(mock.ANY),
- ],
- )
-
- def test_branch_nested_rollback(self, local_connection):
- connection = local_connection
- users = self.tables.users
- connection.begin()
- with testing.expect_deprecated_20(
- r"The Connection.connect\(\) method is considered legacy"
- ):
- branched = connection.connect()
- assert branched.in_transaction()
- branched.execute(users.insert(), dict(user_id=1, user_name="user1"))
- with testing.expect_deprecated_20(
- r"Calling .begin\(\) when a transaction is already "
- "begun, creating a 'sub' transaction"
- ):
- nested = branched.begin()
- branched.execute(users.insert(), dict(user_id=2, user_name="user2"))
- nested.rollback()
- assert not connection.in_transaction()
-
- assert_raises_message(
- exc.InvalidRequestError,
- "This connection is on an inactive transaction. Please",
- connection.exec_driver_sql,
- "select 1",
- )
-
- @testing.requires.savepoints
- def test_savepoint_cancelled_by_toplevel_marker(self, local_connection):
- conn = local_connection
- users = self.tables.users
- trans = conn.begin()
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- with testing.expect_deprecated_20(
- r"Calling .begin\(\) when a transaction is already "
- "begun, creating a 'sub' transaction"
- ):
- mk1 = conn.begin()
-
- sp1 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
-
- mk1.rollback()
-
- assert not sp1.is_active
- assert not trans.is_active
- assert conn._transaction is trans
- assert conn._nested_transaction is None
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 0,
- )
-
- @testing.requires.savepoints
- def test_rollback_to_subtransaction(self, local_connection):
- connection = local_connection
- users = self.tables.users
- transaction = connection.begin()
- connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
- trans2 = connection.begin_nested()
- connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
-
- with testing.expect_deprecated_20(
- r"Calling .begin\(\) when a transaction is already "
- "begun, creating a 'sub' transaction"
- ):
- trans3 = connection.begin()
- connection.execute(users.insert(), dict(user_id=3, user_name="user3"))
- trans3.rollback()
-
- assert_raises_message(
- exc.InvalidRequestError,
- "This connection is on an inactive savepoint transaction.",
- connection.exec_driver_sql,
- "select 1",
- )
- trans2.rollback()
- assert connection._nested_transaction is None
-
- connection.execute(users.insert(), dict(user_id=4, user_name="user4"))
- transaction.commit()
- eq_(
- connection.execute(
- select(users.c.user_id).order_by(users.c.user_id)
- ).fetchall(),
- [(1,), (4,)],
- )
-
- # PG emergency shutdown:
- # select * from pg_prepared_xacts
- # ROLLBACK PREPARED '<xid>'
- # MySQL emergency shutdown:
- # for arg in `mysql -u root -e "xa recover" | cut -c 8-100 |
- # grep sa`; do mysql -u root -e "xa rollback '$arg'"; done
- @testing.requires.skip_mysql_on_windows
- @testing.requires.two_phase_transactions
- @testing.requires.savepoints
- def test_mixed_two_phase_transaction(self, local_connection):
- connection = local_connection
- users = self.tables.users
- transaction = connection.begin_twophase()
- connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
- with testing.expect_deprecated_20(
- r"Calling .begin\(\) when a transaction is already "
- "begun, creating a 'sub' transaction"
- ):
- transaction2 = connection.begin()
- connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
- transaction3 = connection.begin_nested()
- connection.execute(users.insert(), dict(user_id=3, user_name="user3"))
- with testing.expect_deprecated_20(
- r"Calling .begin\(\) when a transaction is already "
- "begun, creating a 'sub' transaction"
- ):
- transaction4 = connection.begin()
- connection.execute(users.insert(), dict(user_id=4, user_name="user4"))
- transaction4.commit()
- transaction3.rollback()
- connection.execute(users.insert(), dict(user_id=5, user_name="user5"))
- transaction2.commit()
- transaction.prepare()
- transaction.commit()
- eq_(
- connection.execute(
- select(users.c.user_id).order_by(users.c.user_id)
- ).fetchall(),
- [(1,), (2,), (5,)],
- )
-
- @testing.requires.savepoints
- def test_inactive_due_to_subtransaction_on_nested_no_commit(
- self, local_connection
- ):
- connection = local_connection
- trans = connection.begin()
-
- nested = connection.begin_nested()
-
- with testing.expect_deprecated_20(
- r"Calling .begin\(\) when a transaction is already "
- "begun, creating a 'sub' transaction"
- ):
- trans2 = connection.begin()
- trans2.rollback()
-
- assert_raises_message(
- exc.InvalidRequestError,
- "This connection is on an inactive savepoint transaction. "
- "Please rollback",
- nested.commit,
- )
- trans.commit()
-
- assert_raises_message(
- exc.InvalidRequestError,
- "This nested transaction is inactive",
- nested.commit,
- )
-
- def test_close(self, local_connection):
- connection = local_connection
- users = self.tables.users
- transaction = connection.begin()
- connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
- connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
- connection.execute(users.insert(), dict(user_id=3, user_name="user3"))
- with testing.expect_deprecated_20(
- r"Calling .begin\(\) when a transaction is already "
- "begun, creating a 'sub' transaction"
- ):
- trans2 = connection.begin()
- connection.execute(users.insert(), dict(user_id=4, user_name="user4"))
- connection.execute(users.insert(), dict(user_id=5, user_name="user5"))
- assert connection.in_transaction()
- trans2.close()
- assert connection.in_transaction()
- transaction.commit()
- assert not connection.in_transaction()
- self.assert_(
- connection.exec_driver_sql(
- "select count(*) from " "users"
- ).scalar()
- == 5
- )
- result = connection.exec_driver_sql("select * from users")
- assert len(result.fetchall()) == 5
-
- def test_close2(self, local_connection):
- connection = local_connection
- users = self.tables.users
- transaction = connection.begin()
- connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
- connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
- connection.execute(users.insert(), dict(user_id=3, user_name="user3"))
- with testing.expect_deprecated_20(
- r"Calling .begin\(\) when a transaction is already "
- "begun, creating a 'sub' transaction"
- ):
- trans2 = connection.begin()
- connection.execute(users.insert(), dict(user_id=4, user_name="user4"))
- connection.execute(users.insert(), dict(user_id=5, user_name="user5"))
- assert connection.in_transaction()
- trans2.close()
- assert connection.in_transaction()
- transaction.close()
- assert not connection.in_transaction()
- self.assert_(
- connection.exec_driver_sql(
- "select count(*) from " "users"
- ).scalar()
- == 0
- )
- result = connection.exec_driver_sql("select * from users")
- assert len(result.fetchall()) == 0
-
- def test_inactive_due_to_subtransaction_no_commit(self, local_connection):
- connection = local_connection
- trans = connection.begin()
- with testing.expect_deprecated_20(
- r"Calling .begin\(\) when a transaction is already "
- "begun, creating a 'sub' transaction"
- ):
- trans2 = connection.begin()
- trans2.rollback()
- assert_raises_message(
- exc.InvalidRequestError,
- "This connection is on an inactive transaction. Please rollback",
- trans.commit,
- )
-
- trans.rollback()
-
- assert_raises_message(
- exc.InvalidRequestError,
- "This transaction is inactive",
- trans.commit,
- )
-
- def test_nested_rollback(self, local_connection):
- connection = local_connection
- users = self.tables.users
- try:
- transaction = connection.begin()
- try:
- connection.execute(
- users.insert(), dict(user_id=1, user_name="user1")
- )
- connection.execute(
- users.insert(), dict(user_id=2, user_name="user2")
- )
- connection.execute(
- users.insert(), dict(user_id=3, user_name="user3")
- )
- with testing.expect_deprecated_20(
- r"Calling .begin\(\) when a transaction is already "
- "begun, creating a 'sub' transaction"
- ):
- trans2 = connection.begin()
- try:
- connection.execute(
- users.insert(), dict(user_id=4, user_name="user4")
- )
- connection.execute(
- users.insert(), dict(user_id=5, user_name="user5")
- )
- raise Exception("uh oh")
- trans2.commit()
- except Exception:
- trans2.rollback()
- raise
- transaction.rollback()
- except Exception:
- transaction.rollback()
- raise
- except Exception as e:
- # and not "This transaction is inactive"
- # comment moved here to fix pep8
- assert str(e) == "uh oh"
- else:
- assert False
-
- def test_nesting(self, local_connection):
- connection = local_connection
- users = self.tables.users
- transaction = connection.begin()
- connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
- connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
- connection.execute(users.insert(), dict(user_id=3, user_name="user3"))
- with testing.expect_deprecated_20(
- r"Calling .begin\(\) when a transaction is already "
- "begun, creating a 'sub' transaction"
- ):
- trans2 = connection.begin()
- connection.execute(users.insert(), dict(user_id=4, user_name="user4"))
- connection.execute(users.insert(), dict(user_id=5, user_name="user5"))
- trans2.commit()
- transaction.rollback()
- self.assert_(
- connection.exec_driver_sql(
- "select count(*) from " "users"
- ).scalar()
- == 0
- )
- result = connection.exec_driver_sql("select * from users")
- assert len(result.fetchall()) == 0
-
- def test_no_marker_on_inactive_trans(self, local_connection):
- conn = local_connection
- conn.begin()
-
- with testing.expect_deprecated_20(
- r"Calling .begin\(\) when a transaction is already "
- "begun, creating a 'sub' transaction"
- ):
- mk1 = conn.begin()
-
- mk1.rollback()
-
- assert_raises_message(
- exc.InvalidRequestError,
- "the current transaction on this connection is inactive.",
- conn.begin,
- )
-
- def test_implicit_autocommit_compiled(self):
- users = self.tables.users
-
- with testing.db.connect() as conn:
- with testing.expect_deprecated_20(
- "The current statement is being autocommitted "
- "using implicit autocommit."
- ):
- conn.execute(
- users.insert(), {"user_id": 1, "user_name": "user3"}
- )
-
- def test_implicit_autocommit_text(self):
- with testing.db.connect() as conn:
- with testing.expect_deprecated_20(
- "The current statement is being autocommitted "
- "using implicit autocommit."
- ):
- conn.execute(
- text("insert into inserttable (data) values ('thedata')")
- )
-
- def test_implicit_autocommit_driversql(self):
- with testing.db.connect() as conn:
- with testing.expect_deprecated_20(
- "The current statement is being autocommitted "
- "using implicit autocommit."
- ):
- conn.exec_driver_sql(
- "insert into inserttable (data) values ('thedata')"
- )
-
- def test_branch_autorollback(self, local_connection):
- connection = local_connection
- users = self.tables.users
- with testing.expect_deprecated_20(
- r"The Connection.connect\(\) method is considered legacy"
- ):
- branched = connection.connect()
- with testing.expect_deprecated_20(
- "The current statement is being autocommitted using "
- "implicit autocommit"
- ):
- branched.execute(
- users.insert(), dict(user_id=1, user_name="user1")
- )
- assert_raises(
- exc.DBAPIError,
- branched.execute,
- users.insert(),
- dict(user_id=1, user_name="user1"),
- )
- # can continue w/o issue
- with testing.expect_deprecated_20(
- "The current statement is being autocommitted using "
- "implicit autocommit"
- ):
- branched.execute(
- users.insert(), dict(user_id=2, user_name="user2")
- )
-
- def test_branch_orig_rollback(self, local_connection):
- connection = local_connection
- users = self.tables.users
- with testing.expect_deprecated_20(
- r"The Connection.connect\(\) method is considered legacy"
- ):
- branched = connection.connect()
- with testing.expect_deprecated_20(
- "The current statement is being autocommitted using "
- "implicit autocommit"
- ):
- branched.execute(
- users.insert(), dict(user_id=1, user_name="user1")
- )
- nested = branched.begin()
- assert branched.in_transaction()
- branched.execute(users.insert(), dict(user_id=2, user_name="user2"))
- nested.rollback()
- eq_(
- connection.exec_driver_sql("select count(*) from users").scalar(),
- 1,
- )
-
- @testing.requires.independent_connections
- def test_branch_autocommit(self, local_connection):
- users = self.tables.users
- with testing.db.connect() as connection:
- with testing.expect_deprecated_20(
- r"The Connection.connect\(\) method is considered legacy"
- ):
- branched = connection.connect()
- with testing.expect_deprecated_20(
- "The current statement is being autocommitted using "
- "implicit autocommit"
- ):
- branched.execute(
- users.insert(), dict(user_id=1, user_name="user1")
- )
-
- eq_(
- local_connection.execute(
- text("select count(*) from users")
- ).scalar(),
- 1,
- )
-
- @testing.requires.savepoints
- def test_branch_savepoint_rollback(self, local_connection):
- connection = local_connection
- users = self.tables.users
- trans = connection.begin()
- with testing.expect_deprecated_20(
- r"The Connection.connect\(\) method is considered legacy"
- ):
- branched = connection.connect()
- assert branched.in_transaction()
- branched.execute(users.insert(), dict(user_id=1, user_name="user1"))
- nested = branched.begin_nested()
- branched.execute(users.insert(), dict(user_id=2, user_name="user2"))
- nested.rollback()
- assert connection.in_transaction()
- trans.commit()
- eq_(
- connection.exec_driver_sql("select count(*) from users").scalar(),
- 1,
- )
-
- @testing.requires.two_phase_transactions
- def test_branch_twophase_rollback(self, local_connection):
- connection = local_connection
- users = self.tables.users
- with testing.expect_deprecated_20(
- r"The Connection.connect\(\) method is considered legacy"
- ):
- branched = connection.connect()
- assert not branched.in_transaction()
- with testing.expect_deprecated_20(
- r"The current statement is being autocommitted using "
- "implicit autocommit"
- ):
- branched.execute(
- users.insert(), dict(user_id=1, user_name="user1")
- )
- nested = branched.begin_twophase()
- branched.execute(users.insert(), dict(user_id=2, user_name="user2"))
- nested.rollback()
- assert not connection.in_transaction()
- eq_(
- connection.exec_driver_sql("select count(*) from users").scalar(),
- 1,
- )
-
-
class HandleInvalidatedOnConnectTest(fixtures.TestBase):
__requires__ = ("sqlite",)
@@ -987,155 +398,6 @@ def select1(db):
return str(select(1).compile(dialect=db.dialect))
-class DeprecatedEngineFeatureTest(fixtures.TablesTest):
- __backend__ = True
-
- @classmethod
- def define_tables(cls, metadata):
- cls.table = Table(
- "exec_test",
- metadata,
- Column("a", Integer),
- Column("b", Integer),
- test_needs_acid=True,
- )
-
- def _trans_fn(self, is_transaction=False):
- def go(conn, x, value=None):
- if is_transaction:
- conn = conn.connection
- conn.execute(self.table.insert().values(a=x, b=value))
-
- return go
-
- def _trans_rollback_fn(self, is_transaction=False):
- def go(conn, x, value=None):
- if is_transaction:
- conn = conn.connection
- conn.execute(self.table.insert().values(a=x, b=value))
- raise SomeException("breakage")
-
- return go
-
- def _assert_no_data(self):
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count("*")).select_from(self.table)),
- 0,
- )
-
- def _assert_fn(self, x, value=None):
- with testing.db.connect() as conn:
- eq_(conn.execute(self.table.select()).fetchall(), [(x, value)])
-
- def test_transaction_engine_fn_commit(self):
- fn = self._trans_fn()
- with testing.expect_deprecated(r"The Engine.transaction\(\) method"):
- testing.db.transaction(fn, 5, value=8)
- self._assert_fn(5, value=8)
-
- def test_transaction_engine_fn_rollback(self):
- fn = self._trans_rollback_fn()
- with testing.expect_deprecated(
- r"The Engine.transaction\(\) method is deprecated"
- ):
- assert_raises_message(
- Exception, "breakage", testing.db.transaction, fn, 5, value=8
- )
- self._assert_no_data()
-
- def test_transaction_connection_fn_commit(self):
- fn = self._trans_fn()
- with testing.db.connect() as conn:
- with testing.expect_deprecated(
- r"The Connection.transaction\(\) method is deprecated"
- ):
- conn.transaction(fn, 5, value=8)
- self._assert_fn(5, value=8)
-
- def test_transaction_connection_fn_rollback(self):
- fn = self._trans_rollback_fn()
- with testing.db.connect() as conn:
- with testing.expect_deprecated(r""):
- assert_raises(Exception, conn.transaction, fn, 5, value=8)
- self._assert_no_data()
-
- def test_execute_plain_string(self):
- with testing.db.connect() as conn:
- with _string_deprecation_expect():
- conn.execute(select1(testing.db)).scalar()
-
- def test_execute_plain_string_events(self):
-
- m1 = Mock()
- select1_str = select1(testing.db)
- with _string_deprecation_expect():
- with testing.db.connect() as conn:
- event.listen(conn, "before_execute", m1.before_execute)
- event.listen(conn, "after_execute", m1.after_execute)
- result = conn.execute(select1_str)
- eq_(
- m1.mock_calls,
- [
- mock.call.before_execute(mock.ANY, select1_str, [], {}, {}),
- mock.call.after_execute(
- mock.ANY, select1_str, [], {}, {}, result
- ),
- ],
- )
-
- def test_scalar_plain_string(self):
- with testing.db.connect() as conn:
- with _string_deprecation_expect():
- conn.scalar(select1(testing.db))
-
- # Tests for the warning when non dict params are used
- # @testing.combinations(42, (42,))
- # def test_execute_positional_non_dicts(self, args):
- # with testing.expect_deprecated(
- # r"Usage of tuple or scalars as positional arguments of "
- # ):
- # testing.db.execute(text(select1(testing.db)), args).scalar()
-
- # @testing.combinations(42, (42,))
- # def test_scalar_positional_non_dicts(self, args):
- # with testing.expect_deprecated(
- # r"Usage of tuple or scalars as positional arguments of "
- # ):
- # testing.db.scalar(text(select1(testing.db)), args)
-
-
-class DeprecatedConnectionFeatureTest(fixtures.TablesTest):
- __backend__ = True
-
- def test_execute_plain_string(self):
- with _string_deprecation_expect():
- with testing.db.connect() as conn:
- conn.execute(select1(testing.db)).scalar()
-
- def test_scalar_plain_string(self):
- with _string_deprecation_expect():
- with testing.db.connect() as conn:
- conn.scalar(select1(testing.db))
-
- # Tests for the warning when non dict params are used
- # @testing.combinations(42, (42,))
- # def test_execute_positional_non_dicts(self, args):
- # with testing.expect_deprecated(
- # r"Usage of tuple or scalars as positional arguments of "
- # ):
- # with testing.db.connect() as conn:
- # conn.execute(text(select1(testing.db)), args).scalar()
-
- # @testing.combinations(42, (42,))
- # def test_scalar_positional_non_dicts(self, args):
- # with testing.expect_deprecated(
- # r"Usage of tuple or scalars as positional arguments of "
- # ):
- # with testing.db.connect() as conn:
- # conn.scalar(text(select1(testing.db)), args)
-
-
class DeprecatedReflectionTest(fixtures.TablesTest):
@classmethod
def define_tables(cls, metadata):
@@ -1153,73 +415,6 @@ class DeprecatedReflectionTest(fixtures.TablesTest):
Column("email", String(50)),
)
- def test_exists(self):
- dont_exist = Table("dont_exist", MetaData())
- with testing.expect_deprecated(
- r"The Table.exists\(\) method is deprecated"
- ):
- is_false(dont_exist.exists(testing.db))
-
- user = self.tables.user
- with testing.expect_deprecated(
- r"The Table.exists\(\) method is deprecated"
- ):
- is_true(user.exists(testing.db))
-
- def test_create_drop_explicit(self):
- metadata = MetaData()
- table = Table("test_table", metadata, Column("foo", Integer))
- bind = testing.db
- for args in [([], {"bind": bind}), ([bind], {})]:
- metadata.create_all(*args[0], **args[1])
- with testing.expect_deprecated(
- r"The Table.exists\(\) method is deprecated"
- ):
- assert table.exists(*args[0], **args[1])
- metadata.drop_all(*args[0], **args[1])
- table.create(*args[0], **args[1])
- table.drop(*args[0], **args[1])
- with testing.expect_deprecated(
- r"The Table.exists\(\) method is deprecated"
- ):
- assert not table.exists(*args[0], **args[1])
-
- def test_create_drop_err_table(self):
- metadata = MetaData()
- table = Table("test_table", metadata, Column("foo", Integer))
-
- with testing.expect_deprecated(
- r"The Table.exists\(\) method is deprecated"
- ):
- assert_raises_message(
- tsa.exc.UnboundExecutionError,
- (
- "Table object 'test_table' is not bound to an Engine or "
- "Connection."
- ),
- table.exists,
- )
-
- def test_engine_has_table(self):
- with testing.expect_deprecated(
- r"The Engine.has_table\(\) method is deprecated"
- ):
- is_false(testing.db.has_table("dont_exist"))
-
- with testing.expect_deprecated(
- r"The Engine.has_table\(\) method is deprecated"
- ):
- is_true(testing.db.has_table("user"))
-
- def test_engine_table_names(self):
- metadata = self.tables_test_metadata
-
- with testing.expect_deprecated(
- r"The Engine.table_names\(\) method is deprecated"
- ):
- table_names = testing.db.table_names()
- is_true(set(table_names).issuperset(metadata.tables))
-
def test_reflecttable(self):
inspector = inspect(testing.db)
metadata = MetaData()
@@ -1234,304 +429,6 @@ class DeprecatedReflectionTest(fixtures.TablesTest):
eq_(res, exp)
-class ExecutionOptionsTest(fixtures.TestBase):
- def test_branched_connection_execution_options(self):
- engine = engines.testing_engine("sqlite://")
-
- conn = engine.connect()
- c2 = conn.execution_options(foo="bar")
-
- with testing.expect_deprecated_20(
- r"The Connection.connect\(\) method is considered "
- ):
- c2_branch = c2.connect()
- eq_(c2_branch._execution_options, {"foo": "bar"})
-
-
-class RawExecuteTest(fixtures.TablesTest):
- __backend__ = True
-
- @classmethod
- def define_tables(cls, metadata):
- Table(
- "users",
- metadata,
- Column("user_id", INT, primary_key=True, autoincrement=False),
- Column("user_name", VARCHAR(20)),
- )
- Table(
- "users_autoinc",
- metadata,
- Column(
- "user_id", INT, primary_key=True, test_needs_autoincrement=True
- ),
- Column("user_name", VARCHAR(20)),
- )
-
- def test_no_params_option(self, connection):
- stmt = (
- "SELECT '%'"
- + testing.db.dialect.statement_compiler(
- testing.db.dialect, None
- ).default_from()
- )
-
- with _string_deprecation_expect():
- result = (
- connection.execution_options(no_parameters=True)
- .execute(stmt)
- .scalar()
- )
- eq_(result, "%")
-
- @testing.requires.qmark_paramstyle
- def test_raw_qmark(self, connection):
- conn = connection
-
- with _string_deprecation_expect():
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- (1, "jack"),
- )
- with _string_deprecation_expect():
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- [2, "fred"],
- )
-
- with _string_deprecation_expect():
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- [3, "ed"],
- [4, "horse"],
- )
- with _string_deprecation_expect():
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- (5, "barney"),
- (6, "donkey"),
- )
-
- with _string_deprecation_expect():
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- 7,
- "sally",
- )
-
- with _string_deprecation_expect():
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "fred"),
- (3, "ed"),
- (4, "horse"),
- (5, "barney"),
- (6, "donkey"),
- (7, "sally"),
- ]
- for multiparam, param in [
- (("jack", "fred"), {}),
- ((["jack", "fred"],), {}),
- ]:
- with _string_deprecation_expect():
- res = conn.execute(
- "select * from users where user_name=? or "
- "user_name=? order by user_id",
- *multiparam,
- **param
- )
- assert res.fetchall() == [(1, "jack"), (2, "fred")]
-
- with _string_deprecation_expect():
- res = conn.execute("select * from users where user_name=?", "jack")
- assert res.fetchall() == [(1, "jack")]
-
- @testing.requires.format_paramstyle
- def test_raw_sprintf(self, connection):
- conn = connection
- with _string_deprecation_expect():
- conn.execute(
- "insert into users (user_id, user_name) " "values (%s, %s)",
- [1, "jack"],
- )
- with _string_deprecation_expect():
- conn.execute(
- "insert into users (user_id, user_name) " "values (%s, %s)",
- [2, "ed"],
- [3, "horse"],
- )
- with _string_deprecation_expect():
- conn.execute(
- "insert into users (user_id, user_name) " "values (%s, %s)",
- 4,
- "sally",
- )
- with _string_deprecation_expect():
- conn.execute("insert into users (user_id) values (%s)", 5)
- with _string_deprecation_expect():
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "ed"),
- (3, "horse"),
- (4, "sally"),
- (5, None),
- ]
- for multiparam, param in [
- (("jack", "ed"), {}),
- ((["jack", "ed"],), {}),
- ]:
- with _string_deprecation_expect():
- res = conn.execute(
- "select * from users where user_name=%s or "
- "user_name=%s order by user_id",
- *multiparam,
- **param
- )
- assert res.fetchall() == [(1, "jack"), (2, "ed")]
- with _string_deprecation_expect():
- res = conn.execute(
- "select * from users where user_name=%s", "jack"
- )
- assert res.fetchall() == [(1, "jack")]
-
- @testing.requires.pyformat_paramstyle
- def test_raw_python(self, connection):
- conn = connection
- with _string_deprecation_expect():
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (%(id)s, %(name)s)",
- {"id": 1, "name": "jack"},
- )
- with _string_deprecation_expect():
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (%(id)s, %(name)s)",
- {"id": 2, "name": "ed"},
- {"id": 3, "name": "horse"},
- )
- with _string_deprecation_expect():
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (%(id)s, %(name)s)",
- id=4,
- name="sally",
- )
- with _string_deprecation_expect():
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "ed"),
- (3, "horse"),
- (4, "sally"),
- ]
-
- @testing.requires.named_paramstyle
- def test_raw_named(self, connection):
- conn = connection
- with _string_deprecation_expect():
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (:id, :name)",
- {"id": 1, "name": "jack"},
- )
- with _string_deprecation_expect():
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (:id, :name)",
- {"id": 2, "name": "ed"},
- {"id": 3, "name": "horse"},
- )
- with _string_deprecation_expect():
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (:id, :name)",
- id=4,
- name="sally",
- )
- with _string_deprecation_expect():
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "ed"),
- (3, "horse"),
- (4, "sally"),
- ]
-
-
-class DeprecatedExecParamsTest(fixtures.TablesTest):
- __backend__ = True
-
- @classmethod
- def define_tables(cls, metadata):
- Table(
- "users",
- metadata,
- Column("user_id", INT, primary_key=True, autoincrement=False),
- Column("user_name", VARCHAR(20)),
- )
-
- Table(
- "users_autoinc",
- metadata,
- Column(
- "user_id", INT, primary_key=True, test_needs_autoincrement=True
- ),
- Column("user_name", VARCHAR(20)),
- )
-
- def test_kwargs(self, connection):
- users = self.tables.users
-
- with testing.expect_deprecated_20(
- r"The connection.execute\(\) method in "
- "SQLAlchemy 2.0 will accept parameters as a single "
- ):
- connection.execute(
- users.insert(), user_id=5, user_name="some name"
- )
-
- eq_(connection.execute(select(users)).all(), [(5, "some name")])
-
- def test_positional_dicts(self, connection):
- users = self.tables.users
-
- with testing.expect_deprecated_20(
- r"The connection.execute\(\) method in "
- "SQLAlchemy 2.0 will accept parameters as a single "
- ):
- connection.execute(
- users.insert(),
- {"user_id": 5, "user_name": "some name"},
- {"user_id": 6, "user_name": "some other name"},
- )
-
- eq_(
- connection.execute(select(users).order_by(users.c.user_id)).all(),
- [(5, "some name"), (6, "some other name")],
- )
-
- @testing.requires.empty_inserts
- def test_single_scalar(self, connection):
-
- users = self.tables.users_autoinc
-
- with testing.expect_deprecated_20(
- r"The connection.execute\(\) method in "
- "SQLAlchemy 2.0 will accept parameters as a single "
- ):
- # TODO: I'm not even sure what this exec format is or how
- # it worked if at all
- connection.execute(users.insert(), "some name")
-
- eq_(
- connection.execute(select(users).order_by(users.c.user_id)).all(),
- [(1, None)],
- )
-
-
class EngineEventsTest(fixtures.TestBase):
__requires__ = ("ad_hoc_engines",)
__backend__ = True
@@ -1555,54 +452,33 @@ class EngineEventsTest(fixtures.TestBase):
):
break
- @testing.combinations(
- ((), {"z": 10}, [], {"z": 10}, testing.requires.legacy_engine),
- )
- def test_modify_parameters_from_event_one(
- self, multiparams, params, expected_multiparams, expected_params
- ):
- # this is testing both the normalization added to parameters
- # as of I97cb4d06adfcc6b889f10d01cc7775925cffb116 as well as
- # that the return value from the event is taken as the new set
- # of parameters.
- def before_execute(
- conn, clauseelement, multiparams, params, execution_options
- ):
- eq_(multiparams, expected_multiparams)
- eq_(params, expected_params)
- return clauseelement, (), {"q": "15"}
+ def test_engine_connect(self, testing_engine):
+ e1 = testing_engine(config.db_url)
- def after_execute(
- conn, clauseelement, multiparams, params, result, execution_options
- ):
- eq_(multiparams, ())
- eq_(params, {"q": "15"})
+ canary = Mock()
- e1 = testing_engine(config.db_url)
- event.listen(e1, "before_execute", before_execute, retval=True)
- event.listen(e1, "after_execute", after_execute)
+ def thing(conn, branch):
+ canary(conn, branch)
- with e1.connect() as conn:
- with testing.expect_deprecated_20(
- r"The connection\.execute\(\) method"
- ):
- result = conn.execute(
- select(bindparam("q", type_=String)),
- *multiparams,
- **params
- )
- eq_(result.all(), [("15",)])
+ event.listen(e1, "engine_connect", thing)
- @testing.only_on("sqlite")
- def test_modify_statement_string(self, connection):
- @event.listens_for(connection, "before_execute", retval=True)
- def _modify(
- conn, clauseelement, multiparams, params, execution_options
- ):
- return clauseelement.replace("hi", "there"), multiparams, params
+ msg = (
+ r"The argument signature for the "
+ r'"ConnectionEvents.engine_connect" event listener has changed as '
+ r"of version 2.0, and conversion for the old argument signature "
+ r"will be removed in a future release. The new signature is "
+ r'"def engine_connect\(conn\)'
+ )
- with _string_deprecation_expect():
- eq_(connection.scalar("select 'hi'"), "there")
+ with expect_deprecated(msg):
+ c1 = e1.connect()
+ c1.close()
+
+ with expect_deprecated(msg):
+ c2 = e1.connect()
+ c2.close()
+
+ eq_(canary.mock_calls, [mock.call(c1, False), mock.call(c2, False)])
def test_retval_flag(self):
canary = []
@@ -1669,223 +545,3 @@ class EngineEventsTest(fixtures.TestBase):
with e1.connect() as conn:
result = conn.execute(select(1))
result.close()
-
-
-class DDLExecutionTest(fixtures.TestBase):
- def setup_test(self):
- self.engine = engines.mock_engine()
- self.metadata = MetaData()
- self.users = Table(
- "users",
- self.metadata,
- Column("user_id", Integer, primary_key=True),
- Column("user_name", String(40)),
- )
-
-
-class AutocommitKeywordFixture(object):
- def _test_keyword(self, keyword, expected=True):
- dbapi = Mock(
- connect=Mock(
- return_value=Mock(
- cursor=Mock(return_value=Mock(description=()))
- )
- )
- )
- engine = engines.testing_engine(
- options={"_initialize": False, "pool_reset_on_return": None}
- )
- engine.dialect.dbapi = dbapi
-
- with engine.connect() as conn:
- if expected:
- with testing.expect_deprecated_20(
- "The current statement is being autocommitted "
- "using implicit autocommit"
- ):
- conn.exec_driver_sql(
- "%s something table something" % keyword
- )
- else:
- conn.exec_driver_sql("%s something table something" % keyword)
-
- if expected:
- eq_(
- [n for (n, k, s) in dbapi.connect().mock_calls],
- ["cursor", "commit"],
- )
- else:
- eq_(
- [n for (n, k, s) in dbapi.connect().mock_calls], ["cursor"]
- )
-
-
-class AutocommitTextTest(AutocommitKeywordFixture, fixtures.TestBase):
- __backend__ = True
-
- def test_update(self):
- self._test_keyword("UPDATE")
-
- def test_insert(self):
- self._test_keyword("INSERT")
-
- def test_delete(self):
- self._test_keyword("DELETE")
-
- def test_alter(self):
- self._test_keyword("ALTER TABLE")
-
- def test_create(self):
- self._test_keyword("CREATE TABLE foobar")
-
- def test_drop(self):
- self._test_keyword("DROP TABLE foobar")
-
- def test_select(self):
- self._test_keyword("SELECT foo FROM table", False)
-
-
-class ExplicitAutoCommitTest(fixtures.TablesTest):
-
- """test the 'autocommit' flag on select() and text() objects.
-
- Requires PostgreSQL so that we may define a custom function which
- modifies the database."""
-
- __only_on__ = "postgresql"
-
- @classmethod
- def define_tables(cls, metadata):
- Table(
- "foo",
- metadata,
- Column("id", Integer, primary_key=True),
- Column("data", String(100)),
- )
-
- event.listen(
- metadata,
- "after_create",
- DDL(
- "create function insert_foo(varchar) "
- "returns integer as 'insert into foo(data) "
- "values ($1);select 1;' language sql"
- ),
- )
- event.listen(
- metadata, "before_drop", DDL("drop function insert_foo(varchar)")
- )
-
- def test_control(self):
-
- # test that not using autocommit does not commit
- foo = self.tables.foo
-
- conn1 = testing.db.connect()
- conn2 = testing.db.connect()
- conn1.execute(select(func.insert_foo("data1")))
- assert conn2.execute(select(foo.c.data)).fetchall() == []
- conn1.execute(text("select insert_foo('moredata')"))
- assert conn2.execute(select(foo.c.data)).fetchall() == []
- trans = conn1.begin()
- trans.commit()
- assert conn2.execute(select(foo.c.data)).fetchall() == [
- ("data1",),
- ("moredata",),
- ]
- conn1.close()
- conn2.close()
-
- def test_explicit_compiled(self):
- foo = self.tables.foo
-
- conn1 = testing.db.connect()
- conn2 = testing.db.connect()
-
- with testing.expect_deprecated_20(
- "The current statement is being autocommitted using "
- "implicit autocommit"
- ):
- conn1.execute(
- select(func.insert_foo("data1")).execution_options(
- autocommit=True
- )
- )
- assert conn2.execute(select(foo.c.data)).fetchall() == [("data1",)]
- conn1.close()
- conn2.close()
-
- def test_explicit_connection(self):
- foo = self.tables.foo
-
- conn1 = testing.db.connect()
- conn2 = testing.db.connect()
- with testing.expect_deprecated_20(
- "The current statement is being autocommitted using "
- "implicit autocommit"
- ):
- conn1.execution_options(autocommit=True).execute(
- select(func.insert_foo("data1"))
- )
- eq_(conn2.execute(select(foo.c.data)).fetchall(), [("data1",)])
-
- # connection supersedes statement
-
- conn1.execution_options(autocommit=False).execute(
- select(func.insert_foo("data2")).execution_options(autocommit=True)
- )
- eq_(conn2.execute(select(foo.c.data)).fetchall(), [("data1",)])
-
- # ditto
-
- with testing.expect_deprecated_20(
- "The current statement is being autocommitted using "
- "implicit autocommit"
- ):
- conn1.execution_options(autocommit=True).execute(
- select(func.insert_foo("data3")).execution_options(
- autocommit=False
- )
- )
- eq_(
- conn2.execute(select(foo.c.data)).fetchall(),
- [("data1",), ("data2",), ("data3",)],
- )
- conn1.close()
- conn2.close()
-
- def test_explicit_text(self):
- foo = self.tables.foo
-
- conn1 = testing.db.connect()
- conn2 = testing.db.connect()
- with testing.expect_deprecated_20(
- "The current statement is being autocommitted using "
- "implicit autocommit"
- ):
- conn1.execute(
- text("select insert_foo('moredata')").execution_options(
- autocommit=True
- )
- )
- assert conn2.execute(select(foo.c.data)).fetchall() == [("moredata",)]
- conn1.close()
- conn2.close()
-
- def test_implicit_text(self):
- foo = self.tables.foo
-
- conn1 = testing.db.connect()
- conn2 = testing.db.connect()
- with testing.expect_deprecated_20(
- "The current statement is being autocommitted using "
- "implicit autocommit"
- ):
- conn1.execute(
- text("insert into foo (data) values ('implicitdata')")
- )
- assert conn2.execute(select(foo.c.data)).fetchall() == [
- ("implicitdata",)
- ]
- conn1.close()
- conn2.close()
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 23df3b03d..afe95ba82 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -38,7 +38,6 @@ from sqlalchemy.testing import config
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_raises_message
-from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_false
@@ -105,6 +104,13 @@ class ExecuteTest(fixtures.TablesTest):
)
eq_(result, "%")
+ def test_no_strings(self, connection):
+ with expect_raises_message(
+ tsa.exc.ObjectNotExecutableError,
+ "Not an executable object: 'select 1'",
+ ):
+ connection.execute("select 1")
+
def test_raw_positional_invalid(self, connection):
assert_raises_message(
tsa.exc.ArgumentError,
@@ -754,17 +760,98 @@ class ExecuteTest(fixtures.TablesTest):
res = conn.scalars(select(users.c.user_name).order_by(users.c.user_id))
eq_(res.all(), ["sandy", "spongebob"])
+ @testing.combinations(
+ ({}, {}, {}),
+ ({"a": "b"}, {}, {"a": "b"}),
+ ({"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}),
+ argnames="conn_opts, exec_opts, expected",
+ )
+ def test_execution_opts_per_invoke(
+ self, connection, conn_opts, exec_opts, expected
+ ):
+ opts = []
-class UnicodeReturnsTest(fixtures.TestBase):
- def test_unicode_test_not_in(self):
- eng = engines.testing_engine()
- eng.dialect.returns_unicode_strings = String.RETURNS_UNKNOWN
+ @event.listens_for(connection, "before_cursor_execute")
+ def before_cursor_execute(
+ conn, cursor, statement, parameters, context, executemany
+ ):
+ opts.append(context.execution_options)
- assert_raises_message(
- tsa.exc.InvalidRequestError,
- "RETURNS_UNKNOWN is unsupported in Python 3",
- eng.connect,
- )
+ if conn_opts:
+ connection = connection.execution_options(**conn_opts)
+
+ if exec_opts:
+ connection.execute(select(1), execution_options=exec_opts)
+ else:
+ connection.execute(select(1))
+
+ eq_(opts, [expected])
+
+ @testing.combinations(
+ ({}, {}, {}, {}),
+ ({}, {"a": "b"}, {}, {"a": "b"}),
+ ({}, {"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}),
+ (
+ {"q": "z", "p": "r"},
+ {"a": "b", "p": "x", "d": "e"},
+ {"a": "c"},
+ {"q": "z", "p": "x", "a": "c", "d": "e"},
+ ),
+ argnames="stmt_opts, conn_opts, exec_opts, expected",
+ )
+ def test_execution_opts_per_invoke_execute_events(
+ self, connection, stmt_opts, conn_opts, exec_opts, expected
+ ):
+ opts = []
+
+ @event.listens_for(connection, "before_execute")
+ def before_execute(
+ conn, clauseelement, multiparams, params, execution_options
+ ):
+ opts.append(("before", execution_options))
+
+ @event.listens_for(connection, "after_execute")
+ def after_execute(
+ conn,
+ clauseelement,
+ multiparams,
+ params,
+ execution_options,
+ result,
+ ):
+ opts.append(("after", execution_options))
+
+ stmt = select(1)
+
+ if stmt_opts:
+ stmt = stmt.execution_options(**stmt_opts)
+
+ if conn_opts:
+ connection = connection.execution_options(**conn_opts)
+
+ if exec_opts:
+ connection.execute(stmt, execution_options=exec_opts)
+ else:
+ connection.execute(stmt)
+
+ eq_(opts, [("before", expected), ("after", expected)])
+
+ @testing.combinations(
+ ({"user_id": 1, "user_name": "name1"},),
+ ([{"user_id": 1, "user_name": "name1"}],),
+ (({"user_id": 1, "user_name": "name1"},),),
+ (
+ [
+ {"user_id": 1, "user_name": "name1"},
+ {"user_id": 2, "user_name": "name2"},
+ ],
+ ),
+ argnames="parameters",
+ )
+ def test_params_interpretation(self, connection, parameters):
+ users = self.tables.users
+
+ connection.execute(users.insert(), parameters)
class ConvenienceExecuteTest(fixtures.TablesTest):
@@ -822,21 +909,22 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
return_value=Mock(begin=Mock(side_effect=Exception("boom")))
)
with mock.patch.object(engine, "_connection_cls", mock_connection):
- if testing.requires.legacy_engine.enabled:
- with expect_raises_message(Exception, "boom"):
- engine.begin()
- else:
- # context manager isn't entered, doesn't actually call
- # connect() or connection.begin()
- engine.begin()
+ # context manager isn't entered, doesn't actually call
+ # connect() or connection.begin()
+ engine.begin()
- if testing.requires.legacy_engine.enabled:
- eq_(mock_connection.return_value.close.mock_calls, [call()])
- else:
- eq_(mock_connection.return_value.close.mock_calls, [])
+ eq_(mock_connection.return_value.close.mock_calls, [])
def test_transaction_engine_ctx_begin_fails_include_enter(self):
- """test #7272"""
+ """test #7272
+
+ Note this behavior for 2.0 required that we add a new flag to
+ Connection _allow_autobegin=False, so that the first-connect
+ initialization sequence in create.py does not actually run begin()
+ events. previously, the initialize sequence used a future=False
+ connection unconditionally (and I didn't notice this).
+
+ """
engine = engines.testing_engine()
close_mock = Mock()
@@ -893,23 +981,6 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
fn(conn, 5, value=8)
self._assert_fn(5, value=8)
- @testing.requires.legacy_engine
- def test_connect_as_ctx_noautocommit(self):
- fn = self._trans_fn()
- self._assert_no_data()
-
- with testing.db.connect() as conn:
- ctx = conn.execution_options(autocommit=False)
- testing.run_as_contextmanager(ctx, fn, 5, value=8)
- # autocommit is off
- self._assert_no_data()
-
-
-class FutureConvenienceExecuteTest(
- fixtures.FutureEngineMixin, ConvenienceExecuteTest
-):
- __backend__ = True
-
class CompiledCacheTest(fixtures.TestBase):
__backend__ = True
@@ -1213,51 +1284,51 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults):
with self.sql_execution_asserter(connection) as asserter:
conn = connection
execution_options = {"schema_translate_map": map_}
- conn._execute_20(
+ conn.execute(
t1.insert(), {"x": 1}, execution_options=execution_options
)
- conn._execute_20(
+ conn.execute(
t2.insert(), {"x": 1}, execution_options=execution_options
)
- conn._execute_20(
+ conn.execute(
t3.insert(), {"x": 1}, execution_options=execution_options
)
- conn._execute_20(
+ conn.execute(
t1.update().values(x=1).where(t1.c.x == 1),
execution_options=execution_options,
)
- conn._execute_20(
+ conn.execute(
t2.update().values(x=2).where(t2.c.x == 1),
execution_options=execution_options,
)
- conn._execute_20(
+ conn.execute(
t3.update().values(x=3).where(t3.c.x == 1),
execution_options=execution_options,
)
eq_(
- conn._execute_20(
+ conn.execute(
select(t1.c.x), execution_options=execution_options
).scalar(),
1,
)
eq_(
- conn._execute_20(
+ conn.execute(
select(t2.c.x), execution_options=execution_options
).scalar(),
2,
)
eq_(
- conn._execute_20(
+ conn.execute(
select(t3.c.x), execution_options=execution_options
).scalar(),
3,
)
- conn._execute_20(t1.delete(), execution_options=execution_options)
- conn._execute_20(t2.delete(), execution_options=execution_options)
- conn._execute_20(t3.delete(), execution_options=execution_options)
+ conn.execute(t1.delete(), execution_options=execution_options)
+ conn.execute(t2.delete(), execution_options=execution_options)
+ conn.execute(t3.delete(), execution_options=execution_options)
asserter.assert_(
CompiledSQL("INSERT INTO [SCHEMA__none].t1 (x) VALUES (:x)"),
@@ -1454,6 +1525,26 @@ class EngineEventsTest(fixtures.TestBase):
):
break
+ def test_engine_connect(self, testing_engine):
+ e1 = testing_engine(config.db_url)
+
+ canary = Mock()
+
+ # use a real def to trigger legacy signature decorator
+ # logic, if present
+ def thing(conn):
+ canary(conn)
+
+ event.listen(e1, "engine_connect", thing)
+
+ c1 = e1.connect()
+ c1.close()
+
+ c2 = e1.connect()
+ c2.close()
+
+ eq_(canary.mock_calls, [mock.call(c1), mock.call(c2)])
+
def test_per_engine_independence(self, testing_engine):
e1 = testing_engine(config.db_url)
e2 = testing_engine(config.db_url)
@@ -1511,11 +1602,11 @@ class EngineEventsTest(fixtures.TestBase):
canary.got_result(result)
with e1.connect() as conn:
- assert not conn._is_future
+ conn.execute(select(1)).scalar()
+
+ assert conn.in_transaction()
- with conn.begin():
- conn.execute(select(1)).scalar()
- assert conn.in_transaction()
+ conn.commit()
assert not conn.in_transaction()
@@ -1534,11 +1625,6 @@ class EngineEventsTest(fixtures.TestBase):
eq_(canary.be1.call_count, 1)
eq_(canary.be2.call_count, 1)
- if testing.requires.legacy_engine.enabled:
- conn._branch().execute(select(1))
- eq_(canary.be1.call_count, 2)
- eq_(canary.be2.call_count, 2)
-
@testing.combinations(
(True, False),
(True, True),
@@ -1586,10 +1672,18 @@ class EngineEventsTest(fixtures.TestBase):
def init(connection):
initialize(connection)
+ connection.execute(select(1))
+ # begin mock added as part of migration to future only
+ # where we don't want anything related to begin() happening
+ # as part of create
+ # note we can't use an event to ensure begin() is not called
+ # because create also blocks events from happening
with mock.patch.object(
e1.dialect, "initialize", side_effect=init
- ) as m1:
+ ) as m1, mock.patch.object(
+ e1._connection_cls, "begin"
+ ) as begin_mock:
@event.listens_for(e1, "connect", insert=True)
def go1(dbapi_conn, xyz):
@@ -1616,6 +1710,8 @@ class EngineEventsTest(fixtures.TestBase):
c1.close()
c2.close()
+ eq_(begin_mock.mock_calls, [])
+
if add_our_own_onconnect:
calls = [
mock.call.foo("custom event first"),
@@ -1676,9 +1772,6 @@ class EngineEventsTest(fixtures.TestBase):
eq_(canary.be1.call_count, 1)
- conn._branch().execute(select(1))
- eq_(canary.be1.call_count, 2)
-
def test_force_conn_events_false(self, testing_engine):
canary = Mock()
e1 = testing_engine(config.db_url, future=False)
@@ -1694,9 +1787,6 @@ class EngineEventsTest(fixtures.TestBase):
eq_(canary.be1.call_count, 0)
- conn._branch().execute(select(1))
- eq_(canary.be1.call_count, 0)
-
def test_cursor_events_ctx_execute_scalar(self, testing_engine):
canary = Mock()
e1 = testing_engine(config.db_url)
@@ -1849,9 +1939,8 @@ class EngineEventsTest(fixtures.TestBase):
# event is not called at all
eq_(m1.mock_calls, [])
- @testing.combinations((True,), (False,), argnames="future")
@testing.only_on("sqlite")
- def test_modify_statement_internal_driversql(self, connection, future):
+ def test_modify_statement_internal_driversql(self, connection):
m1 = mock.Mock()
@event.listens_for(connection, "before_execute", retval=True)
@@ -1862,16 +1951,11 @@ class EngineEventsTest(fixtures.TestBase):
return clauseelement.replace("hi", "there"), multiparams, params
eq_(
- connection._exec_driver_sql(
- "select 'hi'", [], {}, {}, future=future
- ).scalar(),
- "hi" if future else "there",
+ connection.exec_driver_sql("select 'hi'").scalar(),
+ "hi",
)
- if future:
- eq_(m1.mock_calls, [])
- else:
- eq_(m1.mock_calls, [call.run_event()])
+ eq_(m1.mock_calls, [])
def test_modify_statement_clauseelement(self, connection):
@event.listens_for(connection, "before_execute", retval=True)
@@ -1905,7 +1989,7 @@ class EngineEventsTest(fixtures.TestBase):
conn.execute(select(1).compile(dialect=e1.dialect))
conn._execute_compiled(
- select(1).compile(dialect=e1.dialect), (), {}, {}
+ select(1).compile(dialect=e1.dialect), (), {}
)
def test_execute_events(self):
@@ -2175,18 +2259,6 @@ class EngineEventsTest(fixtures.TestBase):
conn.execute(select(1))
eq_(canary, ["execute", "cursor_execute"])
- @testing.requires.legacy_engine
- def test_engine_connect(self):
- engine = engines.testing_engine()
-
- tracker = Mock()
- event.listen(engine, "engine_connect", tracker)
-
- c1 = engine.connect()
- c2 = c1._branch()
- c1.close()
- eq_(tracker.mock_calls, [call(c1, False), call(c2, True)])
-
def test_execution_options(self):
engine = engines.testing_engine()
@@ -2463,37 +2535,6 @@ class EngineEventsTest(fixtures.TestBase):
)
-class FutureEngineEventsTest(fixtures.FutureEngineMixin, EngineEventsTest):
- def test_future_fixture(self, testing_engine):
- e1 = testing_engine()
-
- assert e1._is_future
- with e1.connect() as conn:
- assert conn._is_future
-
- def test_emit_sql_in_autobegin(self, testing_engine):
- e1 = testing_engine(config.db_url)
-
- canary = Mock()
-
- @event.listens_for(e1, "begin")
- def begin(connection):
- result = connection.execute(select(1)).scalar()
- canary.got_result(result)
-
- with e1.connect() as conn:
- assert conn._is_future
- conn.execute(select(1)).scalar()
-
- assert conn.in_transaction()
-
- conn.commit()
-
- assert not conn.in_transaction()
-
- eq_(canary.mock_calls, [call.got_result(1)])
-
-
class HandleErrorTest(fixtures.TestBase):
__requires__ = ("ad_hoc_engines",)
__backend__ = True
@@ -2651,26 +2692,59 @@ class HandleErrorTest(fixtures.TestBase):
)
eq_(patched.call_count, 1)
- def test_exception_autorollback_fails(self):
+ @testing.only_on("sqlite", "using specific DB message")
+ def test_exception_no_autorollback(self):
+ """with the 2.0 engine, a SQL statement will have run
+ "autobegin", so that we are in a transaction. so if an error
+ occurs, we report the error but stay in the transaction.
+
+ previously, we'd see the rollback failing due to autorollback
+ when transaction isn't started.
+ """
engine = engines.testing_engine()
conn = engine.connect()
def boom(connection):
raise engine.dialect.dbapi.OperationalError("rollback failed")
- with expect_warnings(
- r"An exception has occurred during handling of a previous "
- r"exception. The previous exception "
- r"is.*(?:i_dont_exist|does not exist)",
- py2konly=True,
- ):
- with patch.object(conn.dialect, "do_rollback", boom):
- assert_raises_message(
- tsa.exc.OperationalError,
- "rollback failed",
- conn.exec_driver_sql,
- "insert into i_dont_exist (x) values ('y')",
- )
+ with patch.object(conn.dialect, "do_rollback", boom):
+ assert_raises_message(
+ tsa.exc.OperationalError,
+ "no such table: i_dont_exist",
+ conn.exec_driver_sql,
+ "insert into i_dont_exist (x) values ('y')",
+ )
+
+ # we're still in a transaction
+ assert conn._transaction
+
+ # only fails when we actually call rollback
+ assert_raises_message(
+ tsa.exc.OperationalError,
+ "rollback failed",
+ conn.rollback,
+ )
+
+ def test_actual_autorollback(self):
+ """manufacture an autorollback scenario that works in 2.x."""
+
+ engine = engines.testing_engine()
+ conn = engine.connect()
+
+ def boom(connection):
+ raise engine.dialect.dbapi.OperationalError("rollback failed")
+
+ @event.listens_for(conn, "begin")
+ def _do_begin(conn):
+ # run a breaking statement before begin actually happens
+ conn.exec_driver_sql("insert into i_dont_exist (x) values ('y')")
+
+ with patch.object(conn.dialect, "do_rollback", boom):
+ assert_raises_message(
+ tsa.exc.OperationalError,
+ "rollback failed",
+ conn.begin,
+ )
def test_exception_event_ad_hoc_context(self):
"""test that handle_error is called with a context in
@@ -3115,6 +3189,45 @@ class OnConnectTest(fixtures.TestBase):
dbapi.OperationalError("test"), None, None
)
+ def test_dont_create_transaction_on_initialize(self):
+ """test that engine init doesn't invoke autobegin.
+
+ this happened implicitly in 1.4 due to use of a non-future
+ connection for initialize.
+
+ to fix for 2.0 we added a new flag _allow_autobegin=False
+ for init purposes only.
+
+ """
+ e = create_engine("sqlite://")
+
+ init_connection = None
+
+ def mock_initialize(connection):
+ # definitely trigger what would normally be an autobegin
+ connection.execute(select(1))
+ nonlocal init_connection
+ init_connection = connection
+
+ with mock.patch.object(
+ e._connection_cls, "begin"
+ ) as mock_begin, mock.patch.object(
+ e.dialect, "initialize", Mock(side_effect=mock_initialize)
+ ) as mock_init:
+ conn = e.connect()
+
+ eq_(mock_begin.mock_calls, [])
+ is_not(init_connection, None)
+ is_not(conn, init_connection)
+ is_false(init_connection._allow_autobegin)
+ eq_(mock_init.mock_calls, [mock.call(init_connection)])
+
+ # assert the mock works too
+ conn.begin()
+ eq_(mock_begin.mock_calls, [mock.call()])
+
+ conn.close()
+
def test_invalidate_on_connect(self):
"""test that is_disconnect() is called during connect.
@@ -3493,168 +3606,6 @@ class DialectEventTest(fixtures.TestBase):
eq_(conn.info["boom"], "one")
-class FutureExecuteTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
- __backend__ = True
-
- @classmethod
- def define_tables(cls, metadata):
- Table(
- "users",
- metadata,
- Column("user_id", INT, primary_key=True, autoincrement=False),
- Column("user_name", VARCHAR(20)),
- test_needs_acid=True,
- )
- Table(
- "users_autoinc",
- metadata,
- Column(
- "user_id", INT, primary_key=True, test_needs_autoincrement=True
- ),
- Column("user_name", VARCHAR(20)),
- test_needs_acid=True,
- )
-
- def test_non_dict_mapping(self, connection):
- """ensure arbitrary Mapping works for execute()"""
-
- class NotADict(collections_abc.Mapping):
- def __init__(self, _data):
- self._data = _data
-
- def __iter__(self):
- return iter(self._data)
-
- def __len__(self):
- return len(self._data)
-
- def __getitem__(self, key):
- return self._data[key]
-
- def keys(self):
- return self._data.keys()
-
- nd = NotADict({"a": 10, "b": 15})
- eq_(dict(nd), {"a": 10, "b": 15})
-
- result = connection.execute(
- select(
- bindparam("a", type_=Integer), bindparam("b", type_=Integer)
- ),
- nd,
- )
- eq_(result.first(), (10, 15))
-
- def test_row_works_as_mapping(self, connection):
- """ensure the RowMapping object works as a parameter dictionary for
- execute."""
-
- result = connection.execute(
- select(literal(10).label("a"), literal(15).label("b"))
- )
- row = result.first()
- eq_(row, (10, 15))
- eq_(row._mapping, {"a": 10, "b": 15})
-
- result = connection.execute(
- select(
- bindparam("a", type_=Integer).label("a"),
- bindparam("b", type_=Integer).label("b"),
- ),
- row._mapping,
- )
- row = result.first()
- eq_(row, (10, 15))
- eq_(row._mapping, {"a": 10, "b": 15})
-
- @testing.combinations(
- ({}, {}, {}),
- ({"a": "b"}, {}, {"a": "b"}),
- ({"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}),
- argnames="conn_opts, exec_opts, expected",
- )
- def test_execution_opts_per_invoke(
- self, connection, conn_opts, exec_opts, expected
- ):
- opts = []
-
- @event.listens_for(connection, "before_cursor_execute")
- def before_cursor_execute(
- conn, cursor, statement, parameters, context, executemany
- ):
- opts.append(context.execution_options)
-
- if conn_opts:
- connection = connection.execution_options(**conn_opts)
-
- if exec_opts:
- connection.execute(select(1), execution_options=exec_opts)
- else:
- connection.execute(select(1))
-
- eq_(opts, [expected])
-
- @testing.combinations(
- ({}, {}, {}, {}),
- ({}, {"a": "b"}, {}, {"a": "b"}),
- ({}, {"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}),
- (
- {"q": "z", "p": "r"},
- {"a": "b", "p": "x", "d": "e"},
- {"a": "c"},
- {"q": "z", "p": "x", "a": "c", "d": "e"},
- ),
- argnames="stmt_opts, conn_opts, exec_opts, expected",
- )
- def test_execution_opts_per_invoke_execute_events(
- self, connection, stmt_opts, conn_opts, exec_opts, expected
- ):
- opts = []
-
- @event.listens_for(connection, "before_execute")
- def before_execute(
- conn, clauseelement, multiparams, params, execution_options
- ):
- opts.append(("before", execution_options))
-
- @event.listens_for(connection, "after_execute")
- def after_execute(
- conn,
- clauseelement,
- multiparams,
- params,
- execution_options,
- result,
- ):
- opts.append(("after", execution_options))
-
- stmt = select(1)
-
- if stmt_opts:
- stmt = stmt.execution_options(**stmt_opts)
-
- if conn_opts:
- connection = connection.execution_options(**conn_opts)
-
- if exec_opts:
- connection.execute(stmt, execution_options=exec_opts)
- else:
- connection.execute(stmt)
-
- eq_(opts, [("before", expected), ("after", expected)])
-
- def test_no_branching(self, connection):
- with testing.expect_deprecated(
- r"The Connection.connect\(\) method is considered legacy"
- ):
- assert_raises_message(
- NotImplementedError,
- "sqlalchemy.future.Connection does not support "
- "'branching' of new connections.",
- connection.connect,
- )
-
-
class SetInputSizesTest(fixtures.TablesTest):
__backend__ = True
diff --git a/test/engine/test_logging.py b/test/engine/test_logging.py
index c5f8b69b6..bd5bde775 100644
--- a/test/engine/test_logging.py
+++ b/test/engine/test_logging.py
@@ -105,7 +105,7 @@ class LogParamsTest(fixtures.TestBase):
)
def test_log_positional_array(self):
- with self.eng.connect() as conn:
+ with self.eng.begin() as conn:
exc_info = assert_raises(
tsa.exc.DBAPIError,
conn.execute,
@@ -119,7 +119,7 @@ class LogParamsTest(fixtures.TestBase):
)
eq_regex(
- self.buf.buffer[1].message,
+ self.buf.buffer[2].message,
r"\[generated .*\] \(\[1, 2, 3\], 'hi'\)",
)
@@ -799,25 +799,26 @@ class EchoTest(fixtures.TestBase):
e1.echo = True
- with e1.connect() as conn:
+ with e1.begin() as conn:
conn.execute(select(1)).close()
- with e2.connect() as conn:
+ with e2.begin() as conn:
conn.execute(select(2)).close()
e1.echo = False
- with e1.connect() as conn:
+ with e1.begin() as conn:
conn.execute(select(3)).close()
- with e2.connect() as conn:
+ with e2.begin() as conn:
conn.execute(select(4)).close()
e2.echo = True
- with e1.connect() as conn:
+ with e1.begin() as conn:
conn.execute(select(5)).close()
- with e2.connect() as conn:
+ with e2.begin() as conn:
conn.execute(select(6)).close()
- assert self.buf.buffer[0].getMessage().startswith("SELECT 1")
- assert self.buf.buffer[2].getMessage().startswith("SELECT 6")
- assert len(self.buf.buffer) == 4
+ assert self.buf.buffer[1].getMessage().startswith("SELECT 1")
+
+ assert self.buf.buffer[5].getMessage().startswith("SELECT 6")
+ assert len(self.buf.buffer) == 8
diff --git a/test/engine/test_parseconnect.py b/test/engine/test_parseconnect.py
index 0cc7c0508..044dc2cbd 100644
--- a/test/engine/test_parseconnect.py
+++ b/test/engine/test_parseconnect.py
@@ -17,6 +17,7 @@ from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
from sqlalchemy.testing.assertions import expect_deprecated
+from sqlalchemy.testing.assertions import expect_raises_message
from sqlalchemy.testing.mock import call
from sqlalchemy.testing.mock import MagicMock
from sqlalchemy.testing.mock import Mock
@@ -482,7 +483,7 @@ class CreateEngineTest(fixtures.TestBase):
)
assert e.echo is True
- def test_engine_from_config_future(self):
+ def test_engine_from_config_future_parameter_ignored(self):
dbapi = mock_dbapi
config = {
@@ -491,10 +492,9 @@ class CreateEngineTest(fixtures.TestBase):
"sqlalchemy.future": "true",
}
- e = engine_from_config(config, module=dbapi, _initialize=False)
- assert e._is_future
+ engine_from_config(config, module=dbapi, _initialize=False)
- def test_engine_from_config_not_future(self):
+ def test_engine_from_config_future_false_raises(self):
dbapi = mock_dbapi
config = {
@@ -503,8 +503,12 @@ class CreateEngineTest(fixtures.TestBase):
"sqlalchemy.future": "false",
}
- e = engine_from_config(config, module=dbapi, _initialize=False)
- assert not e._is_future
+ with expect_raises_message(
+ exc.ArgumentError,
+ r"The 'future' parameter passed to create_engine\(\) "
+ r"may only be set to True.",
+ ):
+ engine_from_config(config, module=dbapi, _initialize=False)
def test_pool_reset_on_return_from_config(self):
dbapi = mock_dbapi
diff --git a/test/engine/test_processors.py b/test/engine/test_processors.py
index ad643a446..943ae32f0 100644
--- a/test/engine/test_processors.py
+++ b/test/engine/test_processors.py
@@ -1,7 +1,6 @@
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
-from sqlalchemy.testing import mock
class _BooleanProcessorTest(fixtures.TestBase):
@@ -104,88 +103,3 @@ class CDateProcessorTest(_DateProcessorTest):
from sqlalchemy import cprocessors
cls.module = cprocessors
-
-
-class _DistillArgsTest(fixtures.TestBase):
- def test_distill_none(self):
- eq_(self.module._distill_params(mock.Mock(), None, None), [])
-
- def test_distill_no_multi_no_param(self):
- eq_(self.module._distill_params(mock.Mock(), (), {}), [])
-
- def test_distill_dict_multi_none_param(self):
- eq_(
- self.module._distill_params(mock.Mock(), None, {"foo": "bar"}),
- [{"foo": "bar"}],
- )
-
- def test_distill_dict_multi_empty_param(self):
- eq_(
- self.module._distill_params(mock.Mock(), (), {"foo": "bar"}),
- [{"foo": "bar"}],
- )
-
- def test_distill_single_dict(self):
- eq_(
- self.module._distill_params(mock.Mock(), ({"foo": "bar"},), {}),
- [{"foo": "bar"}],
- )
-
- def test_distill_single_list_strings(self):
- eq_(
- self.module._distill_params(mock.Mock(), (["foo", "bar"],), {}),
- [["foo", "bar"]],
- )
-
- def test_distill_single_list_tuples(self):
- eq_(
- self.module._distill_params(
- mock.Mock(), ([("foo", "bar"), ("bat", "hoho")],), {}
- ),
- [("foo", "bar"), ("bat", "hoho")],
- )
-
- def test_distill_single_list_tuple(self):
- eq_(
- self.module._distill_params(mock.Mock(), ([("foo", "bar")],), {}),
- [("foo", "bar")],
- )
-
- def test_distill_multi_list_tuple(self):
- eq_(
- self.module._distill_params(
- mock.Mock(), ([("foo", "bar")], [("bar", "bat")]), {}
- ),
- ([("foo", "bar")], [("bar", "bat")]),
- )
-
- def test_distill_multi_strings(self):
- eq_(
- self.module._distill_params(mock.Mock(), ("foo", "bar"), {}),
- [("foo", "bar")],
- )
-
- def test_distill_single_list_dicts(self):
- eq_(
- self.module._distill_params(
- mock.Mock(), ([{"foo": "bar"}, {"foo": "hoho"}],), {}
- ),
- [{"foo": "bar"}, {"foo": "hoho"}],
- )
-
- def test_distill_single_string(self):
- eq_(self.module._distill_params(mock.Mock(), ("arg",), {}), [["arg"]])
-
- def test_distill_multi_string_tuple(self):
- eq_(
- self.module._distill_params(mock.Mock(), (("arg", "arg"),), {}),
- [("arg", "arg")],
- )
-
-
-class PyDistillArgsTest(_DistillArgsTest):
- @classmethod
- def setup_test_class(cls):
- from sqlalchemy.engine import util
-
- cls.module = util
diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py
index 20f6ec299..0c7f86a62 100644
--- a/test/engine/test_reconnect.py
+++ b/test/engine/test_reconnect.py
@@ -18,6 +18,7 @@ from sqlalchemy.testing import assert_raises_message_context_ok
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_raises
+from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_false
@@ -27,7 +28,6 @@ from sqlalchemy.testing import ne_
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.mock import call
from sqlalchemy.testing.mock import Mock
-from sqlalchemy.testing.mock import patch
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
from sqlalchemy.testing.util import gc_collect
@@ -563,7 +563,8 @@ class MockReconnectTest(fixtures.TestBase):
# error stays consistent
assert_raises_message(
tsa.exc.PendingRollbackError,
- "This connection is on an inactive transaction. Please rollback",
+ r"Can't reconnect until invalid transaction is rolled back. "
+ r"Please rollback\(\) fully before proceeding",
conn.execute,
select(1),
)
@@ -571,7 +572,8 @@ class MockReconnectTest(fixtures.TestBase):
assert_raises_message(
tsa.exc.PendingRollbackError,
- "This connection is on an inactive transaction. Please rollback",
+ r"Can't reconnect until invalid transaction is rolled back. "
+ r"Please rollback\(\) fully before proceeding",
trans.commit,
)
@@ -579,7 +581,8 @@ class MockReconnectTest(fixtures.TestBase):
assert_raises_message(
tsa.exc.PendingRollbackError,
- "This connection is on an inactive transaction. Please rollback",
+ r"Can't reconnect until invalid transaction is rolled back. "
+ r"Please rollback\(\) fully before proceeding",
conn.execute,
select(1),
)
@@ -607,13 +610,22 @@ class MockReconnectTest(fixtures.TestBase):
self.dbapi.shutdown()
- assert_raises(tsa.exc.DBAPIError, conn.execute, select(1))
+ with expect_raises(tsa.exc.DBAPIError):
+ conn.execute(select(1))
assert not conn.closed
assert conn.invalidated
eq_([c.close.mock_calls for c in self.dbapi.connections], [[call()]])
+ # trans was autobegin. they have to call rollback
+ with expect_raises(tsa.exc.PendingRollbackError):
+ conn.execute(select(1))
+
+ # ok
+ conn.rollback()
+
+ # now we are good
# test reconnects
conn.execute(select(1))
assert not conn.invalidated
@@ -659,17 +671,22 @@ class MockReconnectTest(fixtures.TestBase):
conn.close()
def test_noreconnect_rollback(self):
+ # this test changes in 2.x due to autobegin.
+
conn = self.db.connect()
+ conn.execute(select(1))
+
self.dbapi.shutdown("rollback_no_disconnect")
- # raises error
- assert_raises_message(
+ # previously, running a select() here which would fail would then
+ # trigger autorollback which would also fail, this is not the
+ # case now as autorollback does not normally occur
+ with expect_raises_message(
tsa.exc.DBAPIError,
- "something broke on rollback but we didn't " "lose the connection",
- conn.execute,
- select(1),
- )
+ r"something broke on rollback but we didn't lose the connection",
+ ):
+ conn.rollback()
assert not conn.closed
assert not conn.invalidated
@@ -695,8 +712,7 @@ class MockReconnectTest(fixtures.TestBase):
assert_raises_message(
tsa.exc.DBAPIError,
"Lost the DB connection on rollback",
- conn.execute,
- select(1),
+ conn.rollback,
)
assert not conn.closed
@@ -987,6 +1003,12 @@ class RealReconnectTest(fixtures.TestBase):
assert conn.invalidated
assert conn.invalidated
+
+ with expect_raises(tsa.exc.PendingRollbackError):
+ conn.execute(select(1))
+
+ conn.rollback()
+
eq_(conn.execute(select(1)).scalar(), 1)
assert not conn.invalidated
@@ -995,6 +1017,8 @@ class RealReconnectTest(fixtures.TestBase):
_assert_invalidated(conn.execute, select(1))
assert conn.invalidated
+ conn.rollback()
+
eq_(conn.execute(select(1)).scalar(), 1)
assert not conn.invalidated
@@ -1016,61 +1040,6 @@ class RealReconnectTest(fixtures.TestBase):
# pool isn't replaced
assert self.engine.pool is p2
- def test_branched_invalidate_branch_to_parent(self):
- with self.engine.connect() as c1:
-
- with patch.object(self.engine.pool, "logger") as logger:
- with testing.expect_deprecated_20(
- r"The Connection.connect\(\) method is considered legacy"
- ):
- c1_branch = c1.connect()
- eq_(c1_branch.execute(select(1)).scalar(), 1)
-
- self.engine.test_shutdown()
-
- _assert_invalidated(c1_branch.execute, select(1))
- assert c1.invalidated
- assert c1_branch.invalidated
-
- c1_branch._revalidate_connection()
- assert not c1.invalidated
- assert not c1_branch.invalidated
-
- assert "Invalidate connection" in logger.mock_calls[0][1][0]
-
- def test_branched_invalidate_parent_to_branch(self):
- with self.engine.connect() as c1:
- with testing.expect_deprecated_20(
- r"The Connection.connect\(\) method is considered legacy"
- ):
- c1_branch = c1.connect()
- eq_(c1_branch.execute(select(1)).scalar(), 1)
-
- self.engine.test_shutdown()
-
- _assert_invalidated(c1.execute, select(1))
- assert c1.invalidated
- assert c1_branch.invalidated
-
- c1._revalidate_connection()
- assert not c1.invalidated
- assert not c1_branch.invalidated
-
- def test_branch_invalidate_state(self):
- with self.engine.connect() as c1:
- with testing.expect_deprecated_20(
- r"The Connection.connect\(\) method is considered legacy"
- ):
- c1_branch = c1.connect()
-
- eq_(c1_branch.execute(select(1)).scalar(), 1)
-
- self.engine.test_shutdown()
-
- _assert_invalidated(c1_branch.execute, select(1))
- assert not c1_branch.closed
- assert not c1_branch._still_open_and_dbapi_connection_is_valid
-
def test_ensure_is_disconnect_gets_connection(self):
def is_disconnect(e, conn, cursor):
# connection is still present
@@ -1085,6 +1054,10 @@ class RealReconnectTest(fixtures.TestBase):
self.engine.test_shutdown()
assert_raises(tsa.exc.DBAPIError, conn.execute, select(1))
+ # aiosqlite is not able to run close() here without an
+ # error.
+ conn.invalidate()
+
def test_rollback_on_invalid_plain(self):
with self.engine.connect() as conn:
trans = conn.begin()
@@ -1149,6 +1122,7 @@ class RealReconnectTest(fixtures.TestBase):
_assert_invalidated(conn.execute, select(1))
assert not conn.closed
assert conn.invalidated
+ conn.rollback()
eq_(conn.execute(select(1)).scalar(), 1)
assert not conn.invalidated
@@ -1379,9 +1353,7 @@ class ReconnectRecipeTest(fixtures.TestBase):
# to get a real "cut the server off" kind of fixture we'd need to do
# something in provisioning that seeks out the TCP connection at the
# OS level and kills it.
- __only_on__ = ("mysql+mysqldb", "mysql+pymysql")
-
- future = False
+ __only_on__ = ("+mysqldb", "+pymysql")
def make_engine(self, engine):
num_retries = 3
@@ -1404,12 +1376,7 @@ class ReconnectRecipeTest(fixtures.TestBase):
)
connection.invalidate()
- if self.future:
- connection.rollback()
- else:
- trans = connection.get_transaction()
- if trans:
- trans.rollback()
+ connection.rollback()
time.sleep(retry_interval)
context.cursor = (
@@ -1446,9 +1413,7 @@ class ReconnectRecipeTest(fixtures.TestBase):
__backend__ = True
def setup_test(self):
- self.engine = engines.reconnecting_engine(
- options=dict(future=self.future)
- )
+ self.engine = engines.reconnecting_engine()
self.meta = MetaData()
self.table = Table(
"sometable",
@@ -1533,12 +1498,4 @@ class ReconnectRecipeTest(fixtures.TestBase):
{"id": 6, "name": "some name 6"},
],
)
- if self.future:
- conn.rollback()
- else:
- trans = conn.get_transaction()
- trans.rollback()
-
-
-class FutureReconnectRecipeTest(ReconnectRecipeTest):
- future = True
+ conn.rollback()
diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py
index 9e6142022..50da425bd 100644
--- a/test/engine/test_transaction.py
+++ b/test/engine/test_transaction.py
@@ -1,5 +1,3 @@
-import sys
-
from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy import func
@@ -20,7 +18,6 @@ from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import mock
from sqlalchemy.testing import ne_
-from sqlalchemy.testing.assertions import expect_deprecated_20
from sqlalchemy.testing.assertions import expect_raises_message
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.schema import Column
@@ -45,30 +42,6 @@ class TransactionTest(fixtures.TablesTest):
with testing.db.connect() as conn:
yield conn
- def test_interrupt_ctxmanager_engine(self, trans_ctx_manager_fixture):
- fn = trans_ctx_manager_fixture
-
- # add commit/rollback to the legacy Connection object so that
- # we can test this less-likely case in use with the legacy
- # Engine.begin() context manager
- class ConnWCommitRollback(testing.db._connection_cls):
- def commit(self):
- self.get_transaction().commit()
-
- def rollback(self):
- self.get_transaction().rollback()
-
- with mock.patch.object(
- testing.db, "_connection_cls", ConnWCommitRollback
- ):
- fn(testing.db, trans_on_subject=False, execute_on_subject=False)
-
- def test_interrupt_ctxmanager_connection(self, trans_ctx_manager_fixture):
- fn = trans_ctx_manager_fixture
-
- with testing.db.connect() as conn:
- fn(conn, trans_on_subject=False, execute_on_subject=True)
-
def test_commits(self, local_connection):
users = self.tables.users
connection = local_connection
@@ -148,56 +121,6 @@ class TransactionTest(fixtures.TablesTest):
assert not local_connection.in_transaction()
@testing.combinations((True,), (False,), argnames="roll_back_in_block")
- def test_ctxmanager_rolls_back(self, local_connection, roll_back_in_block):
- m1 = mock.Mock()
-
- event.listen(local_connection, "rollback", m1.rollback)
- event.listen(local_connection, "commit", m1.commit)
-
- with expect_raises_message(Exception, "test"):
- with local_connection.begin() as trans:
- if roll_back_in_block:
- trans.rollback()
-
- if 1 == 1:
- raise Exception("test")
-
- assert not trans.is_active
- assert not local_connection.in_transaction()
- assert trans._deactivated_from_connection
-
- eq_(m1.mock_calls, [mock.call.rollback(local_connection)])
-
- @testing.combinations((True,), (False,), argnames="roll_back_in_block")
- def test_ctxmanager_rolls_back_legacy_marker(
- self, local_connection, roll_back_in_block
- ):
- m1 = mock.Mock()
-
- event.listen(local_connection, "rollback", m1.rollback)
- event.listen(local_connection, "commit", m1.commit)
-
- with expect_deprecated_20(
- r"Calling .begin\(\) when a transaction is already begun"
- ):
- with local_connection.begin() as trans:
- with expect_raises_message(Exception, "test"):
- with local_connection.begin() as marker_trans:
- if roll_back_in_block:
- marker_trans.rollback()
- if 1 == 1:
- raise Exception("test")
-
- assert not marker_trans.is_active
- assert marker_trans._deactivated_from_connection
-
- assert not trans._deactivated_from_connection
- assert not trans.is_active
- assert not local_connection.in_transaction()
-
- eq_(m1.mock_calls, [mock.call.rollback(local_connection)])
-
- @testing.combinations((True,), (False,), argnames="roll_back_in_block")
@testing.requires.savepoints
def test_ctxmanager_rolls_back_savepoint(
self, local_connection, roll_back_in_block
@@ -235,37 +158,6 @@ class TransactionTest(fixtures.TablesTest):
],
)
- def test_ctxmanager_commits_real_trans_from_nested(self, local_connection):
- m1 = mock.Mock()
-
- event.listen(
- local_connection, "rollback_savepoint", m1.rollback_savepoint
- )
- event.listen(
- local_connection, "release_savepoint", m1.release_savepoint
- )
- event.listen(local_connection, "rollback", m1.rollback)
- event.listen(local_connection, "commit", m1.commit)
- event.listen(local_connection, "begin", m1.begin)
- event.listen(local_connection, "savepoint", m1.savepoint)
-
- with testing.expect_deprecated_20(
- r"Calling Connection.begin_nested\(\) in 2.0 style use will return"
- ):
- with local_connection.begin_nested() as nested_trans:
- pass
-
- assert not nested_trans.is_active
- assert nested_trans._deactivated_from_connection
- # legacy mode, no savepoint at all
- eq_(
- m1.mock_calls,
- [
- mock.call.begin(local_connection),
- mock.call.commit(local_connection),
- ],
- )
-
def test_deactivated_warning_straight(self, local_connection):
with expect_warnings(
"transaction already deassociated from connection"
@@ -427,39 +319,34 @@ class TransactionTest(fixtures.TablesTest):
0,
)
- def test_with_interface(self, local_connection):
+ def test_ctxmanager_interface(self, local_connection):
+ # a legacy test, adapted for 2.x style, was called
+ # "test_with_interface". this is likely an early test for when
+ # the "with" construct was first added.
+
connection = local_connection
users = self.tables.users
trans = connection.begin()
- trans.__enter__()
- connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
- connection.execute(users.insert(), dict(user_id=2, user_name="user2"))
- try:
+
+ with trans:
connection.execute(
- users.insert(), dict(user_id=2, user_name="user2.5")
+ users.insert(), dict(user_id=1, user_name="user1")
+ )
+ connection.execute(
+ users.insert(), dict(user_id=2, user_name="user2")
)
- except Exception:
- trans.__exit__(*sys.exc_info())
- assert not trans.is_active
- self.assert_(
- connection.exec_driver_sql(
- "select count(*) from " "users"
- ).scalar()
- == 0
- )
+ assert trans.is_active
- trans = connection.begin()
- trans.__enter__()
- connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
- trans.__exit__(None, None, None)
assert not trans.is_active
- self.assert_(
+
+ eq_(
connection.exec_driver_sql(
"select count(*) from " "users"
- ).scalar()
- == 1
+ ).scalar(),
+ 2,
)
+ connection.rollback()
def test_close(self, local_connection):
connection = local_connection
@@ -648,6 +535,531 @@ class TransactionTest(fixtures.TablesTest):
)
eq_(result.fetchall(), [])
+ def test_interrupt_ctxmanager_engine(self, trans_ctx_manager_fixture):
+ fn = trans_ctx_manager_fixture
+
+ fn(testing.db, trans_on_subject=False, execute_on_subject=False)
+
+ @testing.combinations((True,), (False,), argnames="trans_on_subject")
+ def test_interrupt_ctxmanager_connection(
+ self, trans_ctx_manager_fixture, trans_on_subject
+ ):
+ fn = trans_ctx_manager_fixture
+
+ with testing.db.connect() as conn:
+ fn(
+ conn,
+ trans_on_subject=trans_on_subject,
+ execute_on_subject=True,
+ )
+
+ def test_autobegin_rollback(self):
+ users = self.tables.users
+ with testing.db.connect() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+ conn.rollback()
+
+ eq_(conn.scalar(select(func.count(1)).select_from(users)), 0)
+
+ @testing.requires.autocommit
+ def test_autocommit_isolation_level(self):
+ users = self.tables.users
+
+ with testing.db.connect().execution_options(
+ isolation_level="AUTOCOMMIT"
+ ) as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+ conn.rollback()
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ @testing.requires.autocommit
+ def test_no_autocommit_w_begin(self):
+
+ with testing.db.begin() as conn:
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"This connection has already initialized a SQLAlchemy "
+ r"Transaction\(\) object via begin\(\) or autobegin; "
+ r"isolation_level may not be altered unless rollback\(\) or "
+ r"commit\(\) is called first.",
+ conn.execution_options,
+ isolation_level="AUTOCOMMIT",
+ )
+
+ @testing.requires.autocommit
+ def test_no_autocommit_w_autobegin(self):
+
+ with testing.db.connect() as conn:
+ conn.execute(select(1))
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"This connection has already initialized a SQLAlchemy "
+ r"Transaction\(\) object via begin\(\) or autobegin; "
+ r"isolation_level may not be altered unless rollback\(\) or "
+ r"commit\(\) is called first.",
+ conn.execution_options,
+ isolation_level="AUTOCOMMIT",
+ )
+
+ conn.rollback()
+
+ conn.execution_options(isolation_level="AUTOCOMMIT")
+
+ def test_autobegin_commit(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+
+ assert not conn.in_transaction()
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ assert conn.in_transaction()
+ conn.commit()
+
+ assert not conn.in_transaction()
+
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name 2"})
+
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 2,
+ )
+
+ assert conn.in_transaction()
+ conn.rollback()
+ assert not conn.in_transaction()
+
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ def test_rollback_on_close(self):
+ canary = mock.Mock()
+ with testing.db.connect() as conn:
+ event.listen(conn, "rollback", canary)
+ conn.execute(select(1))
+ assert conn.in_transaction()
+
+ eq_(canary.mock_calls, [mock.call(conn)])
+
+ def test_no_on_close_no_transaction(self):
+ canary = mock.Mock()
+ with testing.db.connect() as conn:
+ event.listen(conn, "rollback", canary)
+ conn.execute(select(1))
+ conn.rollback()
+ assert not conn.in_transaction()
+
+ eq_(canary.mock_calls, [mock.call(conn)])
+
+ def test_rollback_on_exception(self):
+ canary = mock.Mock()
+ try:
+ with testing.db.connect() as conn:
+ event.listen(conn, "rollback", canary)
+ conn.execute(select(1))
+ assert conn.in_transaction()
+ raise Exception("some error")
+ assert False
+ except:
+ pass
+
+ eq_(canary.mock_calls, [mock.call(conn)])
+
+ def test_rollback_on_exception_if_no_trans(self):
+ canary = mock.Mock()
+ try:
+ with testing.db.connect() as conn:
+ event.listen(conn, "rollback", canary)
+ assert not conn.in_transaction()
+ raise Exception("some error")
+ assert False
+ except:
+ pass
+
+ eq_(canary.mock_calls, [])
+
+ def test_commit_no_begin(self):
+ with testing.db.connect() as conn:
+ assert not conn.in_transaction()
+ conn.commit()
+
+ @testing.requires.independent_connections
+ def test_commit_inactive(self):
+ with testing.db.connect() as conn:
+ conn.begin()
+ conn.invalidate()
+
+ assert_raises_message(
+ exc.InvalidRequestError, "Can't reconnect until", conn.commit
+ )
+
+ @testing.requires.independent_connections
+ def test_rollback_inactive(self):
+ users = self.tables.users
+ with testing.db.connect() as conn:
+
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+ conn.commit()
+
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ conn.invalidate()
+
+ assert_raises_message(
+ exc.PendingRollbackError,
+ "Can't reconnect",
+ conn.execute,
+ select(1),
+ )
+
+ conn.rollback()
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ def test_rollback_no_begin(self):
+ with testing.db.connect() as conn:
+ assert not conn.in_transaction()
+ conn.rollback()
+
+ def test_rollback_end_ctx_manager(self):
+ with testing.db.begin() as conn:
+ assert conn.in_transaction()
+ conn.rollback()
+ assert not conn.in_transaction()
+
+ def test_rollback_end_ctx_manager_autobegin(self, local_connection):
+ m1 = mock.Mock()
+
+ event.listen(local_connection, "rollback", m1.rollback)
+ event.listen(local_connection, "commit", m1.commit)
+
+ with local_connection.begin() as trans:
+ assert local_connection.in_transaction()
+ trans.rollback()
+ assert not local_connection.in_transaction()
+
+ # previously, would be subject to autocommit.
+ # now it raises
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ "Can't operate on closed transaction inside context manager. "
+ "Please complete the context manager before emitting "
+ "further commands.",
+ ):
+ local_connection.execute(select(1))
+
+ assert not local_connection.in_transaction()
+
+ @testing.combinations((True,), (False,), argnames="roll_back_in_block")
+ def test_ctxmanager_rolls_back(self, local_connection, roll_back_in_block):
+ m1 = mock.Mock()
+
+ event.listen(local_connection, "rollback", m1.rollback)
+ event.listen(local_connection, "commit", m1.commit)
+
+ with expect_raises_message(Exception, "test"):
+ with local_connection.begin() as trans:
+ if roll_back_in_block:
+ trans.rollback()
+
+ if 1 == 1:
+ raise Exception("test")
+
+ assert not trans.is_active
+ assert not local_connection.in_transaction()
+ assert trans._deactivated_from_connection
+
+ eq_(m1.mock_calls, [mock.call.rollback(local_connection)])
+
+ @testing.requires.savepoints
+ def test_ctxmanager_autobegins_real_trans_from_nested(
+ self, local_connection
+ ):
+ # the legacy version of this test in 1.4
+ # was test_ctxmanager_commits_real_trans_from_nested
+ m1 = mock.Mock()
+
+ event.listen(
+ local_connection, "rollback_savepoint", m1.rollback_savepoint
+ )
+ event.listen(
+ local_connection, "release_savepoint", m1.release_savepoint
+ )
+ event.listen(local_connection, "rollback", m1.rollback)
+ event.listen(local_connection, "commit", m1.commit)
+ event.listen(local_connection, "begin", m1.begin)
+ event.listen(local_connection, "savepoint", m1.savepoint)
+
+ with local_connection.begin_nested() as nested_trans:
+ pass
+
+ assert not nested_trans.is_active
+ assert nested_trans._deactivated_from_connection
+ eq_(
+ m1.mock_calls,
+ [
+ mock.call.begin(local_connection),
+ mock.call.savepoint(local_connection, mock.ANY),
+ mock.call.release_savepoint(
+ local_connection, mock.ANY, mock.ANY
+ ),
+ ],
+ )
+
+ def test_explicit_begin(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+ assert not conn.in_transaction()
+ conn.begin()
+ assert conn.in_transaction()
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+ conn.commit()
+
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ def test_no_double_begin(self):
+ with testing.db.connect() as conn:
+ conn.begin()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"This connection has already initialized a SQLAlchemy "
+ r"Transaction\(\) object via begin\(\) or autobegin; can't "
+ r"call begin\(\) here unless rollback\(\) or commit\(\) is "
+ r"called first.",
+ conn.begin,
+ )
+
+ def test_no_autocommit(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 0,
+ )
+
+ def test_begin_block(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_one(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ savepoint = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 2,
+ )
+ savepoint.rollback()
+
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_two(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ savepoint = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 2,
+ )
+ savepoint.commit()
+
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 2,
+ )
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 2,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_three(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ conn.rollback()
+
+ assert not conn.in_transaction()
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 0,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_four(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ sp1 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ sp2 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+
+ sp2.rollback()
+
+ assert not sp2.is_active
+ assert sp1.is_active
+ assert conn.in_transaction()
+
+ assert not sp1.is_active
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 2,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_five(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ sp2 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+
+ sp2.commit()
+
+ assert conn.in_transaction()
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 3,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_six(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ sp1 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ assert conn._nested_transaction is sp1
+
+ sp2 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+
+ assert conn._nested_transaction is sp2
+
+ sp2.commit()
+
+ assert conn._nested_transaction is sp1
+
+ sp1.rollback()
+
+ assert conn._nested_transaction is None
+
+ assert conn.in_transaction()
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_seven(self):
+ users = self.tables.users
+
+ conn = testing.db.connect()
+ trans = conn.begin()
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ sp1 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ sp2 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+
+ assert conn.in_transaction()
+
+ trans.close()
+
+ assert not sp1.is_active
+ assert not sp2.is_active
+ assert not trans.is_active
+ assert conn._transaction is None
+ assert conn._nested_transaction is None
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 0,
+ )
+
class AutoRollbackTest(fixtures.TestBase):
__backend__ = True
@@ -693,7 +1105,6 @@ class IsolationLevelTest(fixtures.TestBase):
__requires__ = (
"isolation_level",
"ad_hoc_engines",
- "legacy_isolation_level",
)
__backend__ = True
@@ -712,6 +1123,7 @@ class IsolationLevelTest(fixtures.TestBase):
else:
assert False, "no non-default isolation level available"
+ @testing.requires.legacy_isolation_level
def test_engine_param_stays(self):
eng = testing_engine()
@@ -765,6 +1177,7 @@ class IsolationLevelTest(fixtures.TestBase):
conn.close()
+ @testing.requires.legacy_isolation_level
def test_reset_level_with_setting(self):
eng = testing_engine(
options=dict(isolation_level=self._non_default_isolation_level())
@@ -788,7 +1201,8 @@ class IsolationLevelTest(fixtures.TestBase):
)
conn.close()
- def test_invalid_level(self):
+ @testing.requires.legacy_isolation_level
+ def test_invalid_level_engine_param(self):
eng = testing_engine(options=dict(isolation_level="FOO"))
assert_raises_message(
exc.ArgumentError,
@@ -802,6 +1216,33 @@ class IsolationLevelTest(fixtures.TestBase):
eng.connect,
)
+ # TODO: all the dialects seem to be manually raising ArgumentError
+ # individually within their set_isolation_level() methods, when this
+ # should be a default dialect feature so that
+ # error messaging etc. is consistent, including that it works for 3rd
+ # party dialects.
+ # TODO: barring that, at least implement this for the Oracle dialect
+ @testing.fails_on(
+ "oracle",
+ "cx_oracle dialect doesnt have argument error here, "
+ "raises it via the DB rejecting it",
+ )
+ def test_invalid_level_execution_option(self):
+ eng = testing_engine(
+ options=dict(execution_options=dict(isolation_level="FOO"))
+ )
+ assert_raises_message(
+ exc.ArgumentError,
+ "Invalid value '%s' for isolation_level. "
+ "Valid isolation levels for %s are %s"
+ % (
+ "FOO",
+ eng.dialect.name,
+ ", ".join(eng.dialect._isolation_lookup),
+ ),
+ eng.connect,
+ )
+
def test_connection_invalidated(self):
eng = testing_engine()
conn = eng.connect()
@@ -851,28 +1292,25 @@ class IsolationLevelTest(fixtures.TestBase):
c3.close()
c4.close()
- def test_warning_in_transaction(self):
+ def test_exception_in_transaction(self):
eng = testing_engine()
c1 = eng.connect()
- with expect_warnings(
- "Connection is already established with a Transaction; "
- "setting isolation_level may implicitly rollback or commit "
- "the existing transaction, or have no effect until next "
- "transaction"
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ r"This connection has already initialized a SQLAlchemy "
+ r"Transaction\(\) object via begin\(\) or autobegin; "
+ r"isolation_level may not be altered unless rollback\(\) or "
+ r"commit\(\) is called first.",
):
with c1.begin():
c1 = c1.execution_options(
isolation_level=self._non_default_isolation_level()
)
- eq_(
- eng.dialect.get_isolation_level(c1.connection),
- self._non_default_isolation_level(),
- )
- # stays outside of transaction
+ # was never set, so we are on original value
eq_(
eng.dialect.get_isolation_level(c1.connection),
- self._non_default_isolation_level(),
+ self._default_isolation_level(),
)
def test_per_statement_bzzt(self):
@@ -1015,25 +1453,25 @@ class ConnectionCharacteristicTest(fixtures.TestBase):
eq_(connection.foo, "original_value")
- def test_warning_in_transaction(self, characteristic_fixture):
+ def test_exception_in_transaction(self, characteristic_fixture):
+ # this was a warning in 1.x. it appears we did not test the
+ # 2.0 error case in 1.4
+
engine, connection = characteristic_fixture
c1 = engine.connect()
- with expect_warnings(
- "Connection is already established with a Transaction; "
- "setting foo may implicitly rollback or commit "
- "the existing transaction, or have no effect until next "
- "transaction"
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ r"This connection has already initialized a SQLAlchemy "
+ r"Transaction\(\) object via begin\(\) or autobegin; "
+ r"foo may not be altered unless rollback\(\) or "
+ r"commit\(\) is called first.",
):
with c1.begin():
c1 = c1.execution_options(foo="new_foo")
- eq_(
- engine.dialect.get_foo(c1.connection),
- "new_foo",
- )
- # stays outside of transaction
- eq_(engine.dialect.get_foo(c1.connection), "new_foo")
+ # was never set, so we are on original value
+ eq_(engine.dialect.get_foo(c1.connection), "original_value")
@testing.fails("no error is raised yet here.")
def test_per_statement_bzzt(self, characteristic_fixture):
@@ -1120,6 +1558,7 @@ class ResetAgentTest(ResetFixture, fixtures.TestBase):
def test_begin_close(self, reset_agent):
with reset_agent.engine.connect() as connection:
trans = connection.begin()
+
assert not trans.is_active
eq_(
reset_agent.mock_calls,
@@ -1130,6 +1569,7 @@ class ResetAgentTest(ResetFixture, fixtures.TestBase):
with reset_agent.engine.connect() as connection:
trans = connection.begin()
trans.rollback()
+ assert not trans.is_active
eq_(
reset_agent.mock_calls,
[
@@ -1143,6 +1583,7 @@ class ResetAgentTest(ResetFixture, fixtures.TestBase):
with reset_agent.engine.connect() as connection:
trans = connection.begin()
trans.commit()
+ assert not trans.is_active
eq_(
reset_agent.mock_calls,
[
@@ -1237,161 +1678,6 @@ class ResetAgentTest(ResetFixture, fixtures.TestBase):
@testing.requires.savepoints
def test_begin_nested_close(self, reset_agent):
with reset_agent.engine.connect() as connection:
- with testing.expect_deprecated_20(
- r"Calling Connection.begin_nested\(\) in "
- r"2.0 style use will return"
- ):
- trans = connection.begin_nested()
- assert not trans.is_active
- eq_(
- reset_agent.mock_calls,
- [
- mock.call.rollback(connection),
- mock.call.do_rollback(mock.ANY),
- ],
- )
-
- @testing.requires.savepoints
- def test_begin_begin_nested_close(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin()
- trans2 = connection.begin_nested()
- assert not trans2.is_active
- assert not trans.is_active
- eq_(
- reset_agent.mock_calls,
- [
- mock.call.rollback(connection),
- mock.call.do_rollback(mock.ANY),
- ],
- )
-
- @testing.requires.savepoints
- def test_begin_begin_nested_rollback_commit(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin()
- trans2 = connection.begin_nested()
- trans2.rollback()
- trans.commit()
- eq_(
- reset_agent.mock_calls,
- [
- mock.call.rollback_savepoint(connection, mock.ANY, mock.ANY),
- mock.call.commit(connection),
- mock.call.do_commit(mock.ANY),
- mock.call.do_rollback(mock.ANY),
- ],
- )
-
- @testing.requires.savepoints
- def test_begin_begin_nested_rollback_rollback(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin()
- trans2 = connection.begin_nested()
- trans2.rollback()
- trans.rollback()
- eq_(
- reset_agent.mock_calls,
- [
- mock.call.rollback_savepoint(connection, mock.ANY, mock.ANY),
- mock.call.rollback(connection),
- mock.call.do_rollback(mock.ANY),
- mock.call.do_rollback(mock.ANY),
- ],
- )
-
- @testing.requires.two_phase_transactions
- def test_reset_via_agent_begin_twophase(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin_twophase() # noqa
-
- # pg8000 rolls back via the rollback_twophase
- eq_(
- reset_agent.mock_calls[0],
- mock.call.rollback_twophase(connection, mock.ANY, mock.ANY),
- )
-
- @testing.requires.two_phase_transactions
- def test_reset_via_agent_begin_twophase_commit(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin_twophase()
- trans.commit()
- eq_(
- reset_agent.mock_calls[0],
- mock.call.commit_twophase(connection, mock.ANY, mock.ANY),
- )
-
- eq_(reset_agent.mock_calls[-1], mock.call.do_rollback(mock.ANY))
-
- @testing.requires.two_phase_transactions
- def test_reset_via_agent_begin_twophase_rollback(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin_twophase()
- trans.rollback()
- eq_(
- reset_agent.mock_calls[0:2],
- [
- mock.call.rollback_twophase(connection, mock.ANY, mock.ANY),
- mock.call.do_rollback(mock.ANY),
- ],
- )
-
- eq_(reset_agent.mock_calls[-1], mock.call.do_rollback(mock.ANY))
-
-
-class FutureResetAgentTest(
- ResetFixture, fixtures.FutureEngineMixin, fixtures.TestBase
-):
-
- __backend__ = True
-
- def test_reset_agent_no_conn_transaction(self, reset_agent):
- with reset_agent.engine.connect():
- pass
-
- eq_(reset_agent.mock_calls, [mock.call.do_rollback(mock.ANY)])
-
- def test_begin_close(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin()
-
- assert not trans.is_active
- eq_(
- reset_agent.mock_calls,
- [mock.call.rollback(connection), mock.call.do_rollback(mock.ANY)],
- )
-
- def test_begin_rollback(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin()
- trans.rollback()
- assert not trans.is_active
- eq_(
- reset_agent.mock_calls,
- [
- mock.call.rollback(connection),
- mock.call.do_rollback(mock.ANY),
- mock.call.do_rollback(mock.ANY),
- ],
- )
-
- def test_begin_commit(self, reset_agent):
- with reset_agent.engine.connect() as connection:
- trans = connection.begin()
- trans.commit()
- assert not trans.is_active
- eq_(
- reset_agent.mock_calls,
- [
- mock.call.commit(connection),
- mock.call.do_commit(mock.ANY),
- mock.call.do_rollback(mock.ANY),
- ],
- )
-
- @testing.requires.savepoints
- def test_begin_nested_close(self, reset_agent):
- with reset_agent.engine.connect() as connection:
trans = connection.begin_nested()
# it's a savepoint, but root made sure it closed
assert not trans.is_active
@@ -1492,554 +1778,8 @@ class FutureResetAgentTest(
)
eq_(reset_agent.mock_calls[-1], mock.call.do_rollback(mock.ANY))
-
-class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
- __backend__ = True
-
- @classmethod
- def define_tables(cls, metadata):
- Table(
- "users",
- metadata,
- Column("user_id", INT, primary_key=True, autoincrement=False),
- Column("user_name", VARCHAR(20)),
- test_needs_acid=True,
- )
- Table(
- "users_autoinc",
- metadata,
- Column(
- "user_id", INT, primary_key=True, test_needs_autoincrement=True
- ),
- Column("user_name", VARCHAR(20)),
- test_needs_acid=True,
- )
-
- @testing.fixture
- def local_connection(self):
- with testing.db.connect() as conn:
- yield conn
-
- def test_interrupt_ctxmanager_engine(self, trans_ctx_manager_fixture):
- fn = trans_ctx_manager_fixture
-
- fn(testing.db, trans_on_subject=False, execute_on_subject=False)
-
- @testing.combinations((True,), (False,), argnames="trans_on_subject")
- def test_interrupt_ctxmanager_connection(
- self, trans_ctx_manager_fixture, trans_on_subject
- ):
- fn = trans_ctx_manager_fixture
-
- with testing.db.connect() as conn:
- fn(
- conn,
- trans_on_subject=trans_on_subject,
- execute_on_subject=True,
- )
-
- def test_autobegin_rollback(self):
- users = self.tables.users
- with testing.db.connect() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
- conn.rollback()
-
- eq_(conn.scalar(select(func.count(1)).select_from(users)), 0)
-
- @testing.requires.autocommit
- def test_autocommit_isolation_level(self):
- users = self.tables.users
-
- with testing.db.connect().execution_options(
- isolation_level="AUTOCOMMIT"
- ) as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
- conn.rollback()
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 1,
- )
-
- @testing.requires.autocommit
- def test_no_autocommit_w_begin(self):
-
- with testing.db.begin() as conn:
- assert_raises_message(
- exc.InvalidRequestError,
- r"This connection has already initialized a SQLAlchemy "
- r"Transaction\(\) object via begin\(\) or autobegin; "
- r"isolation_level may not be altered unless rollback\(\) or "
- r"commit\(\) is called first.",
- conn.execution_options,
- isolation_level="AUTOCOMMIT",
- )
-
- @testing.requires.autocommit
- def test_no_autocommit_w_autobegin(self):
-
- with testing.db.connect() as conn:
- conn.execute(select(1))
-
- assert_raises_message(
- exc.InvalidRequestError,
- r"This connection has already initialized a SQLAlchemy "
- r"Transaction\(\) object via begin\(\) or autobegin; "
- r"isolation_level may not be altered unless rollback\(\) or "
- r"commit\(\) is called first.",
- conn.execution_options,
- isolation_level="AUTOCOMMIT",
- )
-
- conn.rollback()
-
- conn.execution_options(isolation_level="AUTOCOMMIT")
-
- def test_autobegin_commit(self):
- users = self.tables.users
-
- with testing.db.connect() as conn:
-
- assert not conn.in_transaction()
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- assert conn.in_transaction()
- conn.commit()
-
- assert not conn.in_transaction()
-
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 1,
- )
-
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name 2"})
-
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 2,
- )
-
- assert conn.in_transaction()
- conn.rollback()
- assert not conn.in_transaction()
-
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 1,
- )
-
- def test_rollback_on_close(self):
- canary = mock.Mock()
- with testing.db.connect() as conn:
- event.listen(conn, "rollback", canary)
- conn.execute(select(1))
- assert conn.in_transaction()
-
- eq_(canary.mock_calls, [mock.call(conn)])
-
- def test_no_on_close_no_transaction(self):
- canary = mock.Mock()
- with testing.db.connect() as conn:
- event.listen(conn, "rollback", canary)
- conn.execute(select(1))
- conn.rollback()
- assert not conn.in_transaction()
-
- eq_(canary.mock_calls, [mock.call(conn)])
-
- def test_rollback_on_exception(self):
- canary = mock.Mock()
- try:
- with testing.db.connect() as conn:
- event.listen(conn, "rollback", canary)
- conn.execute(select(1))
- assert conn.in_transaction()
- raise Exception("some error")
- assert False
- except:
- pass
-
- eq_(canary.mock_calls, [mock.call(conn)])
-
- def test_rollback_on_exception_if_no_trans(self):
- canary = mock.Mock()
- try:
- with testing.db.connect() as conn:
- event.listen(conn, "rollback", canary)
- assert not conn.in_transaction()
- raise Exception("some error")
- assert False
- except:
- pass
-
- eq_(canary.mock_calls, [])
-
- def test_commit_no_begin(self):
- with testing.db.connect() as conn:
- assert not conn.in_transaction()
- conn.commit()
-
- @testing.requires.independent_connections
- def test_commit_inactive(self):
- with testing.db.connect() as conn:
- conn.begin()
- conn.invalidate()
-
- assert_raises_message(
- exc.InvalidRequestError, "Can't reconnect until", conn.commit
- )
-
- @testing.requires.independent_connections
- def test_rollback_inactive(self):
- users = self.tables.users
- with testing.db.connect() as conn:
-
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
- conn.commit()
-
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
-
- conn.invalidate()
-
- assert_raises_message(
- exc.PendingRollbackError,
- "Can't reconnect",
- conn.execute,
- select(1),
- )
-
- conn.rollback()
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 1,
- )
-
- def test_rollback_no_begin(self):
- with testing.db.connect() as conn:
- assert not conn.in_transaction()
- conn.rollback()
-
- def test_rollback_end_ctx_manager(self):
- with testing.db.begin() as conn:
- assert conn.in_transaction()
- conn.rollback()
- assert not conn.in_transaction()
-
- def test_rollback_end_ctx_manager_autobegin(self, local_connection):
- m1 = mock.Mock()
-
- event.listen(local_connection, "rollback", m1.rollback)
- event.listen(local_connection, "commit", m1.commit)
-
- with local_connection.begin() as trans:
- assert local_connection.in_transaction()
- trans.rollback()
- assert not local_connection.in_transaction()
-
- # previously, would be subject to autocommit.
- # now it raises
- with expect_raises_message(
- exc.InvalidRequestError,
- "Can't operate on closed transaction inside context manager. "
- "Please complete the context manager before emitting "
- "further commands.",
- ):
- local_connection.execute(select(1))
-
- assert not local_connection.in_transaction()
-
- @testing.combinations((True,), (False,), argnames="roll_back_in_block")
- def test_ctxmanager_rolls_back(self, local_connection, roll_back_in_block):
- m1 = mock.Mock()
-
- event.listen(local_connection, "rollback", m1.rollback)
- event.listen(local_connection, "commit", m1.commit)
-
- with expect_raises_message(Exception, "test"):
- with local_connection.begin() as trans:
- if roll_back_in_block:
- trans.rollback()
-
- if 1 == 1:
- raise Exception("test")
-
- assert not trans.is_active
- assert not local_connection.in_transaction()
- assert trans._deactivated_from_connection
-
- eq_(m1.mock_calls, [mock.call.rollback(local_connection)])
-
- @testing.requires.savepoints
- def test_ctxmanager_autobegins_real_trans_from_nested(
- self, local_connection
- ):
- m1 = mock.Mock()
-
- event.listen(
- local_connection, "rollback_savepoint", m1.rollback_savepoint
- )
- event.listen(
- local_connection, "release_savepoint", m1.release_savepoint
- )
- event.listen(local_connection, "rollback", m1.rollback)
- event.listen(local_connection, "commit", m1.commit)
- event.listen(local_connection, "begin", m1.begin)
- event.listen(local_connection, "savepoint", m1.savepoint)
-
- with local_connection.begin_nested() as nested_trans:
+ def test_reset_agent_no_conn_transaction(self, reset_agent):
+ with reset_agent.engine.connect():
pass
- assert not nested_trans.is_active
- assert nested_trans._deactivated_from_connection
- # legacy mode, no savepoint at all
- eq_(
- m1.mock_calls,
- [
- mock.call.begin(local_connection),
- mock.call.savepoint(local_connection, mock.ANY),
- mock.call.release_savepoint(
- local_connection, mock.ANY, mock.ANY
- ),
- ],
- )
-
- def test_explicit_begin(self):
- users = self.tables.users
-
- with testing.db.connect() as conn:
- assert not conn.in_transaction()
- conn.begin()
- assert conn.in_transaction()
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
- conn.commit()
-
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 1,
- )
-
- def test_no_double_begin(self):
- with testing.db.connect() as conn:
- conn.begin()
-
- assert_raises_message(
- exc.InvalidRequestError,
- r"This connection has already initialized a SQLAlchemy "
- r"Transaction\(\) object via begin\(\) or autobegin; can't "
- r"call begin\(\) here unless rollback\(\) or commit\(\) is "
- r"called first.",
- conn.begin,
- )
-
- def test_no_autocommit(self):
- users = self.tables.users
-
- with testing.db.connect() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 0,
- )
-
- def test_begin_block(self):
- users = self.tables.users
-
- with testing.db.begin() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 1,
- )
-
- @testing.requires.savepoints
- def test_savepoint_one(self):
- users = self.tables.users
-
- with testing.db.begin() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- savepoint = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
-
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 2,
- )
- savepoint.rollback()
-
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 1,
- )
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 1,
- )
-
- @testing.requires.savepoints
- def test_savepoint_two(self):
- users = self.tables.users
-
- with testing.db.begin() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- savepoint = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
-
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 2,
- )
- savepoint.commit()
-
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 2,
- )
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 2,
- )
-
- @testing.requires.savepoints
- def test_savepoint_three(self):
- users = self.tables.users
-
- with testing.db.begin() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
-
- conn.rollback()
-
- assert not conn.in_transaction()
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 0,
- )
-
- @testing.requires.savepoints
- def test_savepoint_four(self):
- users = self.tables.users
-
- with testing.db.begin() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- sp1 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
-
- sp2 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
-
- sp2.rollback()
-
- assert not sp2.is_active
- assert sp1.is_active
- assert conn.in_transaction()
-
- assert not sp1.is_active
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 2,
- )
-
- @testing.requires.savepoints
- def test_savepoint_five(self):
- users = self.tables.users
-
- with testing.db.begin() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
-
- sp2 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
-
- sp2.commit()
-
- assert conn.in_transaction()
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 3,
- )
-
- @testing.requires.savepoints
- def test_savepoint_six(self):
- users = self.tables.users
-
- with testing.db.begin() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- sp1 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
-
- assert conn._nested_transaction is sp1
-
- sp2 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
-
- assert conn._nested_transaction is sp2
-
- sp2.commit()
-
- assert conn._nested_transaction is sp1
-
- sp1.rollback()
-
- assert conn._nested_transaction is None
-
- assert conn.in_transaction()
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 1,
- )
-
- @testing.requires.savepoints
- def test_savepoint_seven(self):
- users = self.tables.users
-
- conn = testing.db.connect()
- trans = conn.begin()
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
-
- sp1 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
-
- sp2 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
-
- assert conn.in_transaction()
-
- trans.close()
-
- assert not sp1.is_active
- assert not sp2.is_active
- assert not trans.is_active
- assert conn._transaction is None
- assert conn._nested_transaction is None
-
- with testing.db.connect() as conn:
- eq_(
- conn.scalar(select(func.count(1)).select_from(users)),
- 0,
- )
+ eq_(reset_agent.mock_calls, [mock.call.do_rollback(mock.ANY)])