diff options
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r-- | test/engine/test_execute.py | 129 |
1 files changed, 127 insertions, 2 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index b0256d325..cba3972f6 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -1,7 +1,7 @@ # coding: utf-8 from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, \ - config, is_ + config, is_, is_not_ import re from sqlalchemy.testing.util import picklers from sqlalchemy.interfaces import ConnectionProxy @@ -1943,6 +1943,47 @@ class HandleErrorTest(fixtures.TestBase): self._test_alter_disconnect(True, False) self._test_alter_disconnect(False, False) + @testing.requires.independent_connections + def _test_alter_invalidate_pool_to_false(self, set_to_false): + orig_error = True + + engine = engines.testing_engine() + + @event.listens_for(engine, "handle_error") + def evt(ctx): + if set_to_false: + ctx.invalidate_pool_on_disconnect = False + + c1, c2, c3 = engine.pool.connect(), \ + engine.pool.connect(), engine.pool.connect() + crecs = [conn._connection_record for conn in (c1, c2, c3)] + c1.close() + c2.close() + c3.close() + + with patch.object(engine.dialect, "is_disconnect", + Mock(return_value=orig_error)): + + with engine.connect() as c: + target_crec = c.connection._connection_record + try: + c.execute("SELECT x FROM nonexistent") + assert False + except tsa.exc.StatementError as st: + eq_(st.connection_invalidated, True) + + for crec in crecs: + if crec is target_crec or not set_to_false: + is_not_(crec.connection, crec.get_connection()) + else: + is_(crec.connection, crec.get_connection()) + + def test_alter_invalidate_pool_to_false(self): + self._test_alter_invalidate_pool_to_false(True) + + def test_alter_invalidate_pool_stays_true(self): + self._test_alter_invalidate_pool_to_false(False) + def test_handle_error_event_connect_isolation_level(self): engine = engines.testing_engine() @@ -2133,7 +2174,7 @@ class HandleInvalidatedOnConnectTest(fixtures.TestBase): conn.invalidate() - eng.pool._creator = Mock( + eng.pool._wrapped_creator = Mock( side_effect=self.ProgrammingError( "Cannot operate on a closed database.")) @@ -2532,3 +2573,87 @@ class DialectEventTest(fixtures.TestBase): def test_cursor_execute_wo_replace(self): self._test_cursor_execute(False) + + def test_connect_replace_params(self): + e = engines.testing_engine(options={"_initialize": False}) + + @event.listens_for(e, "do_connect") + def evt(dialect, conn_rec, cargs, cparams): + cargs[:] = ['foo', 'hoho'] + cparams.clear() + cparams['bar'] = 'bat' + conn_rec.info['boom'] = "bap" + + m1 = Mock() + e.dialect.connect = m1.real_connect + + with e.connect() as conn: + eq_(m1.mock_calls, [call.real_connect('foo', 'hoho', bar='bat')]) + eq_(conn.info['boom'], 'bap') + + def test_connect_do_connect(self): + e = engines.testing_engine(options={"_initialize": False}) + + m1 = Mock() + + @event.listens_for(e, "do_connect") + def evt1(dialect, conn_rec, cargs, cparams): + cargs[:] = ['foo', 'hoho'] + cparams.clear() + cparams['bar'] = 'bat' + conn_rec.info['boom'] = "one" + + @event.listens_for(e, "do_connect") + def evt2(dialect, conn_rec, cargs, cparams): + conn_rec.info['bap'] = "two" + return m1.our_connect(cargs, cparams) + + with e.connect() as conn: + # called with args + eq_( + m1.mock_calls, + [call.our_connect(['foo', 'hoho'], {'bar': 'bat'})]) + + eq_(conn.info['boom'], "one") + eq_(conn.info['bap'], "two") + + # returned our mock connection + is_(conn.connection.connection, m1.our_connect()) + + def test_connect_do_connect_info_there_after_recycle(self): + # test that info is maintained after the do_connect() + # event for a soft invalidation. + + e = engines.testing_engine(options={"_initialize": False}) + + @event.listens_for(e, "do_connect") + def evt1(dialect, conn_rec, cargs, cparams): + conn_rec.info['boom'] = "one" + + conn = e.connect() + eq_(conn.info['boom'], "one") + + conn.connection.invalidate(soft=True) + conn.close() + conn = e.connect() + eq_(conn.info['boom'], "one") + + def test_connect_do_connect_info_there_after_invalidate(self): + # test that info is maintained after the do_connect() + # event for a hard invalidation. + + e = engines.testing_engine(options={"_initialize": False}) + + @event.listens_for(e, "do_connect") + def evt1(dialect, conn_rec, cargs, cparams): + assert not conn_rec.info + conn_rec.info['boom'] = "one" + + conn = e.connect() + eq_(conn.info['boom'], "one") + + conn.connection.invalidate() + conn = e.connect() + eq_(conn.info['boom'], "one") + + |