summaryrefslogtreecommitdiff
path: root/test/engine/test_execute.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r--test/engine/test_execute.py65
1 files changed, 63 insertions, 2 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index ced72f276..69507eabe 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -322,8 +322,8 @@ class ExecuteTest(fixtures.TestBase):
@testing.requires.ad_hoc_engines
def test_engine_level_options(self):
- eng = engines.testing_engine(options={'execution_options'
- : {'foo': 'bar'}})
+ eng = engines.testing_engine(options={'execution_options':
+ {'foo': 'bar'}})
conn = eng.contextual_connect()
eq_(conn._execution_options['foo'], 'bar')
eq_(conn.execution_options(bat='hoho')._execution_options['foo'
@@ -336,6 +336,66 @@ class ExecuteTest(fixtures.TestBase):
conn = eng.contextual_connect()
eq_(conn._execution_options['foo'], 'hoho')
+ @testing.requires.ad_hoc_engines
+ def test_generative_engine_execution_options(self):
+ eng = engines.testing_engine(options={'execution_options':
+ {'base': 'x1'}})
+
+ eng1 = eng.execution_options(foo="b1")
+ eng2 = eng.execution_options(foo="b2")
+ eng1a = eng1.execution_options(bar="a1")
+ eng2a = eng2.execution_options(foo="b3", bar="a2")
+
+ eq_(eng._execution_options,
+ {'base': 'x1'})
+ eq_(eng1._execution_options,
+ {'base': 'x1', 'foo': 'b1'})
+ eq_(eng2._execution_options,
+ {'base': 'x1', 'foo': 'b2'})
+ eq_(eng1a._execution_options,
+ {'base': 'x1', 'foo': 'b1', 'bar': 'a1'})
+ eq_(eng2a._execution_options,
+ {'base': 'x1', 'foo': 'b3', 'bar': 'a2'})
+ is_(eng1a.pool, eng.pool)
+
+ # test pool is shared
+ eng2.dispose()
+ is_(eng1a.pool, eng2.pool)
+ is_(eng.pool, eng2.pool)
+
+ @testing.requires.ad_hoc_engines
+ def test_generative_engine_event_dispatch(self):
+ canary = []
+ def l1(*arg, **kw):
+ canary.append("l1")
+ def l2(*arg, **kw):
+ canary.append("l2")
+ def l3(*arg, **kw):
+ canary.append("l3")
+
+ eng = engines.testing_engine(options={'execution_options':
+ {'base': 'x1'}})
+ event.listen(eng, "before_execute", l1)
+
+ eng1 = eng.execution_options(foo="b1")
+ event.listen(eng, "before_execute", l2)
+ event.listen(eng1, "before_execute", l3)
+
+ eng.execute(select([1]))
+ eng1.execute(select([1]))
+
+ eq_(canary, ["l1", "l2", "l3", "l1", "l2"])
+
+ @testing.requires.ad_hoc_engines
+ def test_generative_engine_event_dispatch_hasevents(self):
+ def l1(*arg, **kw):
+ pass
+ eng = create_engine(testing.db.url)
+ assert not eng._has_events
+ event.listen(eng, "before_execute", l1)
+ eng2 = eng.execution_options(foo='bar')
+ assert eng2._has_events
+
def test_unicode_test_fails_warning(self):
class MockCursor(engines.DBAPIProxyCursor):
def execute(self, stmt, params=None, **kw):
@@ -1018,6 +1078,7 @@ class EngineEventsTest(fixtures.TestBase):
def tearDown(self):
Engine.dispatch._clear()
+ Engine._has_events = False
def _assert_stmts(self, expected, received):
orig = list(received)