summaryrefslogtreecommitdiff
path: root/test/engine/test_execute.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r--test/engine/test_execute.py545
1 files changed, 248 insertions, 297 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 23df3b03d..afe95ba82 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -38,7 +38,6 @@ from sqlalchemy.testing import config
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_raises_message
-from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_false
@@ -105,6 +104,13 @@ class ExecuteTest(fixtures.TablesTest):
)
eq_(result, "%")
+ def test_no_strings(self, connection):
+ with expect_raises_message(
+ tsa.exc.ObjectNotExecutableError,
+ "Not an executable object: 'select 1'",
+ ):
+ connection.execute("select 1")
+
def test_raw_positional_invalid(self, connection):
assert_raises_message(
tsa.exc.ArgumentError,
@@ -754,17 +760,98 @@ class ExecuteTest(fixtures.TablesTest):
res = conn.scalars(select(users.c.user_name).order_by(users.c.user_id))
eq_(res.all(), ["sandy", "spongebob"])
+ @testing.combinations(
+ ({}, {}, {}),
+ ({"a": "b"}, {}, {"a": "b"}),
+ ({"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}),
+ argnames="conn_opts, exec_opts, expected",
+ )
+ def test_execution_opts_per_invoke(
+ self, connection, conn_opts, exec_opts, expected
+ ):
+ opts = []
-class UnicodeReturnsTest(fixtures.TestBase):
- def test_unicode_test_not_in(self):
- eng = engines.testing_engine()
- eng.dialect.returns_unicode_strings = String.RETURNS_UNKNOWN
+ @event.listens_for(connection, "before_cursor_execute")
+ def before_cursor_execute(
+ conn, cursor, statement, parameters, context, executemany
+ ):
+ opts.append(context.execution_options)
- assert_raises_message(
- tsa.exc.InvalidRequestError,
- "RETURNS_UNKNOWN is unsupported in Python 3",
- eng.connect,
- )
+ if conn_opts:
+ connection = connection.execution_options(**conn_opts)
+
+ if exec_opts:
+ connection.execute(select(1), execution_options=exec_opts)
+ else:
+ connection.execute(select(1))
+
+ eq_(opts, [expected])
+
+ @testing.combinations(
+ ({}, {}, {}, {}),
+ ({}, {"a": "b"}, {}, {"a": "b"}),
+ ({}, {"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}),
+ (
+ {"q": "z", "p": "r"},
+ {"a": "b", "p": "x", "d": "e"},
+ {"a": "c"},
+ {"q": "z", "p": "x", "a": "c", "d": "e"},
+ ),
+ argnames="stmt_opts, conn_opts, exec_opts, expected",
+ )
+ def test_execution_opts_per_invoke_execute_events(
+ self, connection, stmt_opts, conn_opts, exec_opts, expected
+ ):
+ opts = []
+
+ @event.listens_for(connection, "before_execute")
+ def before_execute(
+ conn, clauseelement, multiparams, params, execution_options
+ ):
+ opts.append(("before", execution_options))
+
+ @event.listens_for(connection, "after_execute")
+ def after_execute(
+ conn,
+ clauseelement,
+ multiparams,
+ params,
+ execution_options,
+ result,
+ ):
+ opts.append(("after", execution_options))
+
+ stmt = select(1)
+
+ if stmt_opts:
+ stmt = stmt.execution_options(**stmt_opts)
+
+ if conn_opts:
+ connection = connection.execution_options(**conn_opts)
+
+ if exec_opts:
+ connection.execute(stmt, execution_options=exec_opts)
+ else:
+ connection.execute(stmt)
+
+ eq_(opts, [("before", expected), ("after", expected)])
+
+ @testing.combinations(
+ ({"user_id": 1, "user_name": "name1"},),
+ ([{"user_id": 1, "user_name": "name1"}],),
+ (({"user_id": 1, "user_name": "name1"},),),
+ (
+ [
+ {"user_id": 1, "user_name": "name1"},
+ {"user_id": 2, "user_name": "name2"},
+ ],
+ ),
+ argnames="parameters",
+ )
+ def test_params_interpretation(self, connection, parameters):
+ users = self.tables.users
+
+ connection.execute(users.insert(), parameters)
class ConvenienceExecuteTest(fixtures.TablesTest):
@@ -822,21 +909,22 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
return_value=Mock(begin=Mock(side_effect=Exception("boom")))
)
with mock.patch.object(engine, "_connection_cls", mock_connection):
- if testing.requires.legacy_engine.enabled:
- with expect_raises_message(Exception, "boom"):
- engine.begin()
- else:
- # context manager isn't entered, doesn't actually call
- # connect() or connection.begin()
- engine.begin()
+ # context manager isn't entered, doesn't actually call
+ # connect() or connection.begin()
+ engine.begin()
- if testing.requires.legacy_engine.enabled:
- eq_(mock_connection.return_value.close.mock_calls, [call()])
- else:
- eq_(mock_connection.return_value.close.mock_calls, [])
+ eq_(mock_connection.return_value.close.mock_calls, [])
def test_transaction_engine_ctx_begin_fails_include_enter(self):
- """test #7272"""
+ """test #7272
+
+ Note this behavior for 2.0 required that we add a new flag to
+ Connection _allow_autobegin=False, so that the first-connect
+ initialization sequence in create.py does not actually run begin()
+ events. previously, the initialize sequence used a future=False
+ connection unconditionally (and I didn't notice this).
+
+ """
engine = engines.testing_engine()
close_mock = Mock()
@@ -893,23 +981,6 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
fn(conn, 5, value=8)
self._assert_fn(5, value=8)
- @testing.requires.legacy_engine
- def test_connect_as_ctx_noautocommit(self):
- fn = self._trans_fn()
- self._assert_no_data()
-
- with testing.db.connect() as conn:
- ctx = conn.execution_options(autocommit=False)
- testing.run_as_contextmanager(ctx, fn, 5, value=8)
- # autocommit is off
- self._assert_no_data()
-
-
-class FutureConvenienceExecuteTest(
- fixtures.FutureEngineMixin, ConvenienceExecuteTest
-):
- __backend__ = True
-
class CompiledCacheTest(fixtures.TestBase):
__backend__ = True
@@ -1213,51 +1284,51 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults):
with self.sql_execution_asserter(connection) as asserter:
conn = connection
execution_options = {"schema_translate_map": map_}
- conn._execute_20(
+ conn.execute(
t1.insert(), {"x": 1}, execution_options=execution_options
)
- conn._execute_20(
+ conn.execute(
t2.insert(), {"x": 1}, execution_options=execution_options
)
- conn._execute_20(
+ conn.execute(
t3.insert(), {"x": 1}, execution_options=execution_options
)
- conn._execute_20(
+ conn.execute(
t1.update().values(x=1).where(t1.c.x == 1),
execution_options=execution_options,
)
- conn._execute_20(
+ conn.execute(
t2.update().values(x=2).where(t2.c.x == 1),
execution_options=execution_options,
)
- conn._execute_20(
+ conn.execute(
t3.update().values(x=3).where(t3.c.x == 1),
execution_options=execution_options,
)
eq_(
- conn._execute_20(
+ conn.execute(
select(t1.c.x), execution_options=execution_options
).scalar(),
1,
)
eq_(
- conn._execute_20(
+ conn.execute(
select(t2.c.x), execution_options=execution_options
).scalar(),
2,
)
eq_(
- conn._execute_20(
+ conn.execute(
select(t3.c.x), execution_options=execution_options
).scalar(),
3,
)
- conn._execute_20(t1.delete(), execution_options=execution_options)
- conn._execute_20(t2.delete(), execution_options=execution_options)
- conn._execute_20(t3.delete(), execution_options=execution_options)
+ conn.execute(t1.delete(), execution_options=execution_options)
+ conn.execute(t2.delete(), execution_options=execution_options)
+ conn.execute(t3.delete(), execution_options=execution_options)
asserter.assert_(
CompiledSQL("INSERT INTO [SCHEMA__none].t1 (x) VALUES (:x)"),
@@ -1454,6 +1525,26 @@ class EngineEventsTest(fixtures.TestBase):
):
break
+ def test_engine_connect(self, testing_engine):
+ e1 = testing_engine(config.db_url)
+
+ canary = Mock()
+
+ # use a real def to trigger legacy signature decorator
+ # logic, if present
+ def thing(conn):
+ canary(conn)
+
+ event.listen(e1, "engine_connect", thing)
+
+ c1 = e1.connect()
+ c1.close()
+
+ c2 = e1.connect()
+ c2.close()
+
+ eq_(canary.mock_calls, [mock.call(c1), mock.call(c2)])
+
def test_per_engine_independence(self, testing_engine):
e1 = testing_engine(config.db_url)
e2 = testing_engine(config.db_url)
@@ -1511,11 +1602,11 @@ class EngineEventsTest(fixtures.TestBase):
canary.got_result(result)
with e1.connect() as conn:
- assert not conn._is_future
+ conn.execute(select(1)).scalar()
+
+ assert conn.in_transaction()
- with conn.begin():
- conn.execute(select(1)).scalar()
- assert conn.in_transaction()
+ conn.commit()
assert not conn.in_transaction()
@@ -1534,11 +1625,6 @@ class EngineEventsTest(fixtures.TestBase):
eq_(canary.be1.call_count, 1)
eq_(canary.be2.call_count, 1)
- if testing.requires.legacy_engine.enabled:
- conn._branch().execute(select(1))
- eq_(canary.be1.call_count, 2)
- eq_(canary.be2.call_count, 2)
-
@testing.combinations(
(True, False),
(True, True),
@@ -1586,10 +1672,18 @@ class EngineEventsTest(fixtures.TestBase):
def init(connection):
initialize(connection)
+ connection.execute(select(1))
+ # begin mock added as part of migration to future only
+ # where we don't want anything related to begin() happening
+ # as part of create
+ # note we can't use an event to ensure begin() is not called
+ # because create also blocks events from happening
with mock.patch.object(
e1.dialect, "initialize", side_effect=init
- ) as m1:
+ ) as m1, mock.patch.object(
+ e1._connection_cls, "begin"
+ ) as begin_mock:
@event.listens_for(e1, "connect", insert=True)
def go1(dbapi_conn, xyz):
@@ -1616,6 +1710,8 @@ class EngineEventsTest(fixtures.TestBase):
c1.close()
c2.close()
+ eq_(begin_mock.mock_calls, [])
+
if add_our_own_onconnect:
calls = [
mock.call.foo("custom event first"),
@@ -1676,9 +1772,6 @@ class EngineEventsTest(fixtures.TestBase):
eq_(canary.be1.call_count, 1)
- conn._branch().execute(select(1))
- eq_(canary.be1.call_count, 2)
-
def test_force_conn_events_false(self, testing_engine):
canary = Mock()
e1 = testing_engine(config.db_url, future=False)
@@ -1694,9 +1787,6 @@ class EngineEventsTest(fixtures.TestBase):
eq_(canary.be1.call_count, 0)
- conn._branch().execute(select(1))
- eq_(canary.be1.call_count, 0)
-
def test_cursor_events_ctx_execute_scalar(self, testing_engine):
canary = Mock()
e1 = testing_engine(config.db_url)
@@ -1849,9 +1939,8 @@ class EngineEventsTest(fixtures.TestBase):
# event is not called at all
eq_(m1.mock_calls, [])
- @testing.combinations((True,), (False,), argnames="future")
@testing.only_on("sqlite")
- def test_modify_statement_internal_driversql(self, connection, future):
+ def test_modify_statement_internal_driversql(self, connection):
m1 = mock.Mock()
@event.listens_for(connection, "before_execute", retval=True)
@@ -1862,16 +1951,11 @@ class EngineEventsTest(fixtures.TestBase):
return clauseelement.replace("hi", "there"), multiparams, params
eq_(
- connection._exec_driver_sql(
- "select 'hi'", [], {}, {}, future=future
- ).scalar(),
- "hi" if future else "there",
+ connection.exec_driver_sql("select 'hi'").scalar(),
+ "hi",
)
- if future:
- eq_(m1.mock_calls, [])
- else:
- eq_(m1.mock_calls, [call.run_event()])
+ eq_(m1.mock_calls, [])
def test_modify_statement_clauseelement(self, connection):
@event.listens_for(connection, "before_execute", retval=True)
@@ -1905,7 +1989,7 @@ class EngineEventsTest(fixtures.TestBase):
conn.execute(select(1).compile(dialect=e1.dialect))
conn._execute_compiled(
- select(1).compile(dialect=e1.dialect), (), {}, {}
+ select(1).compile(dialect=e1.dialect), (), {}
)
def test_execute_events(self):
@@ -2175,18 +2259,6 @@ class EngineEventsTest(fixtures.TestBase):
conn.execute(select(1))
eq_(canary, ["execute", "cursor_execute"])
- @testing.requires.legacy_engine
- def test_engine_connect(self):
- engine = engines.testing_engine()
-
- tracker = Mock()
- event.listen(engine, "engine_connect", tracker)
-
- c1 = engine.connect()
- c2 = c1._branch()
- c1.close()
- eq_(tracker.mock_calls, [call(c1, False), call(c2, True)])
-
def test_execution_options(self):
engine = engines.testing_engine()
@@ -2463,37 +2535,6 @@ class EngineEventsTest(fixtures.TestBase):
)
-class FutureEngineEventsTest(fixtures.FutureEngineMixin, EngineEventsTest):
- def test_future_fixture(self, testing_engine):
- e1 = testing_engine()
-
- assert e1._is_future
- with e1.connect() as conn:
- assert conn._is_future
-
- def test_emit_sql_in_autobegin(self, testing_engine):
- e1 = testing_engine(config.db_url)
-
- canary = Mock()
-
- @event.listens_for(e1, "begin")
- def begin(connection):
- result = connection.execute(select(1)).scalar()
- canary.got_result(result)
-
- with e1.connect() as conn:
- assert conn._is_future
- conn.execute(select(1)).scalar()
-
- assert conn.in_transaction()
-
- conn.commit()
-
- assert not conn.in_transaction()
-
- eq_(canary.mock_calls, [call.got_result(1)])
-
-
class HandleErrorTest(fixtures.TestBase):
__requires__ = ("ad_hoc_engines",)
__backend__ = True
@@ -2651,26 +2692,59 @@ class HandleErrorTest(fixtures.TestBase):
)
eq_(patched.call_count, 1)
- def test_exception_autorollback_fails(self):
+ @testing.only_on("sqlite", "using specific DB message")
+ def test_exception_no_autorollback(self):
+ """with the 2.0 engine, a SQL statement will have run
+ "autobegin", so that we are in a transaction. so if an error
+ occurs, we report the error but stay in the transaction.
+
+ previously, we'd see the rollback failing due to autorollback
+ when transaction isn't started.
+ """
engine = engines.testing_engine()
conn = engine.connect()
def boom(connection):
raise engine.dialect.dbapi.OperationalError("rollback failed")
- with expect_warnings(
- r"An exception has occurred during handling of a previous "
- r"exception. The previous exception "
- r"is.*(?:i_dont_exist|does not exist)",
- py2konly=True,
- ):
- with patch.object(conn.dialect, "do_rollback", boom):
- assert_raises_message(
- tsa.exc.OperationalError,
- "rollback failed",
- conn.exec_driver_sql,
- "insert into i_dont_exist (x) values ('y')",
- )
+ with patch.object(conn.dialect, "do_rollback", boom):
+ assert_raises_message(
+ tsa.exc.OperationalError,
+ "no such table: i_dont_exist",
+ conn.exec_driver_sql,
+ "insert into i_dont_exist (x) values ('y')",
+ )
+
+ # we're still in a transaction
+ assert conn._transaction
+
+ # only fails when we actually call rollback
+ assert_raises_message(
+ tsa.exc.OperationalError,
+ "rollback failed",
+ conn.rollback,
+ )
+
+ def test_actual_autorollback(self):
+ """manufacture an autorollback scenario that works in 2.x."""
+
+ engine = engines.testing_engine()
+ conn = engine.connect()
+
+ def boom(connection):
+ raise engine.dialect.dbapi.OperationalError("rollback failed")
+
+ @event.listens_for(conn, "begin")
+ def _do_begin(conn):
+ # run a breaking statement before begin actually happens
+ conn.exec_driver_sql("insert into i_dont_exist (x) values ('y')")
+
+ with patch.object(conn.dialect, "do_rollback", boom):
+ assert_raises_message(
+ tsa.exc.OperationalError,
+ "rollback failed",
+ conn.begin,
+ )
def test_exception_event_ad_hoc_context(self):
"""test that handle_error is called with a context in
@@ -3115,6 +3189,45 @@ class OnConnectTest(fixtures.TestBase):
dbapi.OperationalError("test"), None, None
)
+ def test_dont_create_transaction_on_initialize(self):
+ """test that engine init doesn't invoke autobegin.
+
+ this happened implicitly in 1.4 due to use of a non-future
+ connection for initialize.
+
+ to fix for 2.0 we added a new flag _allow_autobegin=False
+ for init purposes only.
+
+ """
+ e = create_engine("sqlite://")
+
+ init_connection = None
+
+ def mock_initialize(connection):
+ # definitely trigger what would normally be an autobegin
+ connection.execute(select(1))
+ nonlocal init_connection
+ init_connection = connection
+
+ with mock.patch.object(
+ e._connection_cls, "begin"
+ ) as mock_begin, mock.patch.object(
+ e.dialect, "initialize", Mock(side_effect=mock_initialize)
+ ) as mock_init:
+ conn = e.connect()
+
+ eq_(mock_begin.mock_calls, [])
+ is_not(init_connection, None)
+ is_not(conn, init_connection)
+ is_false(init_connection._allow_autobegin)
+ eq_(mock_init.mock_calls, [mock.call(init_connection)])
+
+ # assert the mock works too
+ conn.begin()
+ eq_(mock_begin.mock_calls, [mock.call()])
+
+ conn.close()
+
def test_invalidate_on_connect(self):
"""test that is_disconnect() is called during connect.
@@ -3493,168 +3606,6 @@ class DialectEventTest(fixtures.TestBase):
eq_(conn.info["boom"], "one")
-class FutureExecuteTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
- __backend__ = True
-
- @classmethod
- def define_tables(cls, metadata):
- Table(
- "users",
- metadata,
- Column("user_id", INT, primary_key=True, autoincrement=False),
- Column("user_name", VARCHAR(20)),
- test_needs_acid=True,
- )
- Table(
- "users_autoinc",
- metadata,
- Column(
- "user_id", INT, primary_key=True, test_needs_autoincrement=True
- ),
- Column("user_name", VARCHAR(20)),
- test_needs_acid=True,
- )
-
- def test_non_dict_mapping(self, connection):
- """ensure arbitrary Mapping works for execute()"""
-
- class NotADict(collections_abc.Mapping):
- def __init__(self, _data):
- self._data = _data
-
- def __iter__(self):
- return iter(self._data)
-
- def __len__(self):
- return len(self._data)
-
- def __getitem__(self, key):
- return self._data[key]
-
- def keys(self):
- return self._data.keys()
-
- nd = NotADict({"a": 10, "b": 15})
- eq_(dict(nd), {"a": 10, "b": 15})
-
- result = connection.execute(
- select(
- bindparam("a", type_=Integer), bindparam("b", type_=Integer)
- ),
- nd,
- )
- eq_(result.first(), (10, 15))
-
- def test_row_works_as_mapping(self, connection):
- """ensure the RowMapping object works as a parameter dictionary for
- execute."""
-
- result = connection.execute(
- select(literal(10).label("a"), literal(15).label("b"))
- )
- row = result.first()
- eq_(row, (10, 15))
- eq_(row._mapping, {"a": 10, "b": 15})
-
- result = connection.execute(
- select(
- bindparam("a", type_=Integer).label("a"),
- bindparam("b", type_=Integer).label("b"),
- ),
- row._mapping,
- )
- row = result.first()
- eq_(row, (10, 15))
- eq_(row._mapping, {"a": 10, "b": 15})
-
- @testing.combinations(
- ({}, {}, {}),
- ({"a": "b"}, {}, {"a": "b"}),
- ({"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}),
- argnames="conn_opts, exec_opts, expected",
- )
- def test_execution_opts_per_invoke(
- self, connection, conn_opts, exec_opts, expected
- ):
- opts = []
-
- @event.listens_for(connection, "before_cursor_execute")
- def before_cursor_execute(
- conn, cursor, statement, parameters, context, executemany
- ):
- opts.append(context.execution_options)
-
- if conn_opts:
- connection = connection.execution_options(**conn_opts)
-
- if exec_opts:
- connection.execute(select(1), execution_options=exec_opts)
- else:
- connection.execute(select(1))
-
- eq_(opts, [expected])
-
- @testing.combinations(
- ({}, {}, {}, {}),
- ({}, {"a": "b"}, {}, {"a": "b"}),
- ({}, {"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}),
- (
- {"q": "z", "p": "r"},
- {"a": "b", "p": "x", "d": "e"},
- {"a": "c"},
- {"q": "z", "p": "x", "a": "c", "d": "e"},
- ),
- argnames="stmt_opts, conn_opts, exec_opts, expected",
- )
- def test_execution_opts_per_invoke_execute_events(
- self, connection, stmt_opts, conn_opts, exec_opts, expected
- ):
- opts = []
-
- @event.listens_for(connection, "before_execute")
- def before_execute(
- conn, clauseelement, multiparams, params, execution_options
- ):
- opts.append(("before", execution_options))
-
- @event.listens_for(connection, "after_execute")
- def after_execute(
- conn,
- clauseelement,
- multiparams,
- params,
- execution_options,
- result,
- ):
- opts.append(("after", execution_options))
-
- stmt = select(1)
-
- if stmt_opts:
- stmt = stmt.execution_options(**stmt_opts)
-
- if conn_opts:
- connection = connection.execution_options(**conn_opts)
-
- if exec_opts:
- connection.execute(stmt, execution_options=exec_opts)
- else:
- connection.execute(stmt)
-
- eq_(opts, [("before", expected), ("after", expected)])
-
- def test_no_branching(self, connection):
- with testing.expect_deprecated(
- r"The Connection.connect\(\) method is considered legacy"
- ):
- assert_raises_message(
- NotImplementedError,
- "sqlalchemy.future.Connection does not support "
- "'branching' of new connections.",
- connection.connect,
- )
-
-
class SetInputSizesTest(fixtures.TablesTest):
__backend__ = True