summaryrefslogtreecommitdiff
path: root/test/engine/test_execute.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r--test/engine/test_execute.py129
1 files changed, 127 insertions, 2 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index b0256d325..cba3972f6 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -1,7 +1,7 @@
# coding: utf-8
from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, \
- config, is_
+ config, is_, is_not_
import re
from sqlalchemy.testing.util import picklers
from sqlalchemy.interfaces import ConnectionProxy
@@ -1943,6 +1943,47 @@ class HandleErrorTest(fixtures.TestBase):
self._test_alter_disconnect(True, False)
self._test_alter_disconnect(False, False)
+ @testing.requires.independent_connections
+ def _test_alter_invalidate_pool_to_false(self, set_to_false):
+ orig_error = True
+
+ engine = engines.testing_engine()
+
+ @event.listens_for(engine, "handle_error")
+ def evt(ctx):
+ if set_to_false:
+ ctx.invalidate_pool_on_disconnect = False
+
+ c1, c2, c3 = engine.pool.connect(), \
+ engine.pool.connect(), engine.pool.connect()
+ crecs = [conn._connection_record for conn in (c1, c2, c3)]
+ c1.close()
+ c2.close()
+ c3.close()
+
+ with patch.object(engine.dialect, "is_disconnect",
+ Mock(return_value=orig_error)):
+
+ with engine.connect() as c:
+ target_crec = c.connection._connection_record
+ try:
+ c.execute("SELECT x FROM nonexistent")
+ assert False
+ except tsa.exc.StatementError as st:
+ eq_(st.connection_invalidated, True)
+
+ for crec in crecs:
+ if crec is target_crec or not set_to_false:
+ is_not_(crec.connection, crec.get_connection())
+ else:
+ is_(crec.connection, crec.get_connection())
+
+ def test_alter_invalidate_pool_to_false(self):
+ self._test_alter_invalidate_pool_to_false(True)
+
+ def test_alter_invalidate_pool_stays_true(self):
+ self._test_alter_invalidate_pool_to_false(False)
+
def test_handle_error_event_connect_isolation_level(self):
engine = engines.testing_engine()
@@ -2133,7 +2174,7 @@ class HandleInvalidatedOnConnectTest(fixtures.TestBase):
conn.invalidate()
- eng.pool._creator = Mock(
+ eng.pool._wrapped_creator = Mock(
side_effect=self.ProgrammingError(
"Cannot operate on a closed database."))
@@ -2532,3 +2573,87 @@ class DialectEventTest(fixtures.TestBase):
def test_cursor_execute_wo_replace(self):
self._test_cursor_execute(False)
+
+ def test_connect_replace_params(self):
+ e = engines.testing_engine(options={"_initialize": False})
+
+ @event.listens_for(e, "do_connect")
+ def evt(dialect, conn_rec, cargs, cparams):
+ cargs[:] = ['foo', 'hoho']
+ cparams.clear()
+ cparams['bar'] = 'bat'
+ conn_rec.info['boom'] = "bap"
+
+ m1 = Mock()
+ e.dialect.connect = m1.real_connect
+
+ with e.connect() as conn:
+ eq_(m1.mock_calls, [call.real_connect('foo', 'hoho', bar='bat')])
+ eq_(conn.info['boom'], 'bap')
+
+ def test_connect_do_connect(self):
+ e = engines.testing_engine(options={"_initialize": False})
+
+ m1 = Mock()
+
+ @event.listens_for(e, "do_connect")
+ def evt1(dialect, conn_rec, cargs, cparams):
+ cargs[:] = ['foo', 'hoho']
+ cparams.clear()
+ cparams['bar'] = 'bat'
+ conn_rec.info['boom'] = "one"
+
+ @event.listens_for(e, "do_connect")
+ def evt2(dialect, conn_rec, cargs, cparams):
+ conn_rec.info['bap'] = "two"
+ return m1.our_connect(cargs, cparams)
+
+ with e.connect() as conn:
+ # called with args
+ eq_(
+ m1.mock_calls,
+ [call.our_connect(['foo', 'hoho'], {'bar': 'bat'})])
+
+ eq_(conn.info['boom'], "one")
+ eq_(conn.info['bap'], "two")
+
+ # returned our mock connection
+ is_(conn.connection.connection, m1.our_connect())
+
+ def test_connect_do_connect_info_there_after_recycle(self):
+ # test that info is maintained after the do_connect()
+ # event for a soft invalidation.
+
+ e = engines.testing_engine(options={"_initialize": False})
+
+ @event.listens_for(e, "do_connect")
+ def evt1(dialect, conn_rec, cargs, cparams):
+ conn_rec.info['boom'] = "one"
+
+ conn = e.connect()
+ eq_(conn.info['boom'], "one")
+
+ conn.connection.invalidate(soft=True)
+ conn.close()
+ conn = e.connect()
+ eq_(conn.info['boom'], "one")
+
+ def test_connect_do_connect_info_there_after_invalidate(self):
+ # test that info is maintained after the do_connect()
+ # event for a hard invalidation.
+
+ e = engines.testing_engine(options={"_initialize": False})
+
+ @event.listens_for(e, "do_connect")
+ def evt1(dialect, conn_rec, cargs, cparams):
+ assert not conn_rec.info
+ conn_rec.info['boom'] = "one"
+
+ conn = e.connect()
+ eq_(conn.info['boom'], "one")
+
+ conn.connection.invalidate()
+ conn = e.connect()
+ eq_(conn.info['boom'], "one")
+
+