summaryrefslogtreecommitdiff
path: root/test/engine/test_execute.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r--test/engine/test_execute.py140
1 files changed, 110 insertions, 30 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 9795e4c10..1d2aebf97 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -1014,6 +1014,53 @@ class ResultProxyTest(fixtures.TestBase):
finally:
r.close()
+class ExecutionOptionsTest(fixtures.TestBase):
+ def test_dialect_conn_options(self):
+ engine = testing_engine("sqlite://")
+ engine.dialect = Mock()
+ conn = engine.connect()
+ c2 = conn.execution_options(foo="bar")
+ eq_(
+ engine.dialect.set_connection_execution_options.mock_calls,
+ [call(c2, {"foo": "bar"})]
+ )
+
+ def test_dialect_engine_options(self):
+ engine = testing_engine("sqlite://")
+ engine.dialect = Mock()
+ e2 = engine.execution_options(foo="bar")
+ eq_(
+ engine.dialect.set_engine_execution_options.mock_calls,
+ [call(e2, {"foo": "bar"})]
+ )
+
+ def test_dialect_engine_construction_options(self):
+ dialect = Mock()
+ engine = Engine(Mock(), dialect, Mock(),
+ execution_options={"foo": "bar"})
+ eq_(
+ dialect.set_engine_execution_options.mock_calls,
+ [call(engine, {"foo": "bar"})]
+ )
+
+ def test_propagate_engine_to_connection(self):
+ engine = testing_engine("sqlite://",
+ options=dict(execution_options={"foo": "bar"}))
+ conn = engine.connect()
+ eq_(conn._execution_options, {"foo": "bar"})
+
+ def test_propagate_option_engine_to_connection(self):
+ e1 = testing_engine("sqlite://",
+ options=dict(execution_options={"foo": "bar"}))
+ e2 = e1.execution_options(bat="hoho")
+ c1 = e1.connect()
+ c2 = e2.connect()
+ eq_(c1._execution_options, {"foo": "bar"})
+ eq_(c2._execution_options, {"foo": "bar", "bat": "hoho"})
+
+
+
+
class AlternateResultProxyTest(fixtures.TestBase):
__requires__ = ('sqlite', )
@@ -1101,63 +1148,58 @@ class EngineEventsTest(fixtures.TestBase):
e1 = testing_engine(config.db_url)
e2 = testing_engine(config.db_url)
- canary = []
- def before_exec(conn, stmt, *arg):
- canary.append(stmt)
- event.listen(e1, "before_execute", before_exec)
+ canary = Mock()
+ event.listen(e1, "before_execute", canary)
s1 = select([1])
s2 = select([2])
e1.execute(s1)
e2.execute(s2)
- eq_(canary, [s1])
- event.listen(e2, "before_execute", before_exec)
+ eq_(
+ [arg[1][1] for arg in canary.mock_calls], [s1]
+ )
+ event.listen(e2, "before_execute", canary)
e1.execute(s1)
e2.execute(s2)
- eq_(canary, [s1, s1, s2])
+ eq_([arg[1][1] for arg in canary.mock_calls], [s1, s1, s2])
def test_per_engine_plus_global(self):
- canary = []
- def be1(conn, stmt, *arg):
- canary.append('be1')
- def be2(conn, stmt, *arg):
- canary.append('be2')
- def be3(conn, stmt, *arg):
- canary.append('be3')
-
- event.listen(Engine, "before_execute", be1)
+ canary = Mock()
+ event.listen(Engine, "before_execute", canary.be1)
e1 = testing_engine(config.db_url)
e2 = testing_engine(config.db_url)
- event.listen(e1, "before_execute", be2)
+ event.listen(e1, "before_execute", canary.be2)
- event.listen(Engine, "before_execute", be3)
+ event.listen(Engine, "before_execute", canary.be3)
e1.connect()
e2.connect()
- canary[:] = []
+
e1.execute(select([1]))
+ canary.be1.assert_call_count(1)
+ canary.be2.assert_call_count(1)
+
e2.execute(select([1]))
- eq_(canary, ['be1', 'be3', 'be2', 'be1', 'be3'])
+ canary.be1.assert_call_count(2)
+ canary.be2.assert_call_count(1)
+ canary.be3.assert_call_count(2)
def test_per_connection_plus_engine(self):
- canary = []
- def be1(conn, stmt, *arg):
- canary.append('be1')
- def be2(conn, stmt, *arg):
- canary.append('be2')
+ canary = Mock()
e1 = testing_engine(config.db_url)
- event.listen(e1, "before_execute", be1)
+ event.listen(e1, "before_execute", canary.be1)
conn = e1.connect()
- event.listen(conn, "before_execute", be2)
- canary[:] = []
+ event.listen(conn, "before_execute", canary.be2)
conn.execute(select([1]))
- eq_(canary, ['be2', 'be1'])
+ canary.be1.assert_call_count(1)
+ canary.be2.assert_call_count(1)
conn._branch().execute(select([1]))
- eq_(canary, ['be2', 'be1', 'be2', 'be1'])
+ canary.be1.assert_call_count(2)
+ canary.be2.assert_call_count(2)
def test_argument_format_execute(self):
def before_execute(conn, clauseelement, multiparams, params):
@@ -1320,6 +1362,44 @@ class EngineEventsTest(fixtures.TestBase):
canary, ['execute', 'cursor_execute']
)
+ def test_engine_connect(self):
+ engine = engines.testing_engine()
+
+ tracker = Mock()
+ event.listen(engine, "engine_connect", tracker)
+
+ c1 = engine.connect()
+ c2 = c1._branch()
+ c1.close()
+ eq_(
+ tracker.mock_calls,
+ [call(c1, False), call(c2, True)]
+ )
+
+ def test_execution_options(self):
+ engine = engines.testing_engine()
+
+ engine_tracker = Mock()
+ conn_tracker = Mock()
+
+ event.listen(engine, "set_engine_execution_options", engine_tracker)
+ event.listen(engine, "set_connection_execution_options", conn_tracker)
+
+ e2 = engine.execution_options(e1='opt_e1')
+ c1 = engine.connect()
+ c2 = c1.execution_options(c1='opt_c1')
+ c3 = e2.connect()
+ c4 = c3.execution_options(c3='opt_c3')
+ eq_(
+ engine_tracker.mock_calls,
+ [call(e2, {'e1': 'opt_e1'})]
+ )
+ eq_(
+ conn_tracker.mock_calls,
+ [call(c2, {"c1": "opt_c1"}), call(c4, {"c3": "opt_c3"})]
+ )
+
+
@testing.requires.sequences
@testing.provide_metadata
def test_cursor_execute(self):