diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/engine/test_execute.py | 129 | ||||
-rw-r--r-- | test/engine/test_parseconnect.py | 29 | ||||
-rw-r--r-- | test/engine/test_pool.py | 98 |
3 files changed, 251 insertions, 5 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index b0256d325..cba3972f6 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -1,7 +1,7 @@ # coding: utf-8 from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, \ - config, is_ + config, is_, is_not_ import re from sqlalchemy.testing.util import picklers from sqlalchemy.interfaces import ConnectionProxy @@ -1943,6 +1943,47 @@ class HandleErrorTest(fixtures.TestBase): self._test_alter_disconnect(True, False) self._test_alter_disconnect(False, False) + @testing.requires.independent_connections + def _test_alter_invalidate_pool_to_false(self, set_to_false): + orig_error = True + + engine = engines.testing_engine() + + @event.listens_for(engine, "handle_error") + def evt(ctx): + if set_to_false: + ctx.invalidate_pool_on_disconnect = False + + c1, c2, c3 = engine.pool.connect(), \ + engine.pool.connect(), engine.pool.connect() + crecs = [conn._connection_record for conn in (c1, c2, c3)] + c1.close() + c2.close() + c3.close() + + with patch.object(engine.dialect, "is_disconnect", + Mock(return_value=orig_error)): + + with engine.connect() as c: + target_crec = c.connection._connection_record + try: + c.execute("SELECT x FROM nonexistent") + assert False + except tsa.exc.StatementError as st: + eq_(st.connection_invalidated, True) + + for crec in crecs: + if crec is target_crec or not set_to_false: + is_not_(crec.connection, crec.get_connection()) + else: + is_(crec.connection, crec.get_connection()) + + def test_alter_invalidate_pool_to_false(self): + self._test_alter_invalidate_pool_to_false(True) + + def test_alter_invalidate_pool_stays_true(self): + self._test_alter_invalidate_pool_to_false(False) + def test_handle_error_event_connect_isolation_level(self): engine = engines.testing_engine() @@ -2133,7 +2174,7 @@ class HandleInvalidatedOnConnectTest(fixtures.TestBase): conn.invalidate() - eng.pool._creator = Mock( + eng.pool._wrapped_creator = Mock( side_effect=self.ProgrammingError( "Cannot operate on a closed database.")) @@ -2532,3 +2573,87 @@ class DialectEventTest(fixtures.TestBase): def test_cursor_execute_wo_replace(self): self._test_cursor_execute(False) + + def test_connect_replace_params(self): + e = engines.testing_engine(options={"_initialize": False}) + + @event.listens_for(e, "do_connect") + def evt(dialect, conn_rec, cargs, cparams): + cargs[:] = ['foo', 'hoho'] + cparams.clear() + cparams['bar'] = 'bat' + conn_rec.info['boom'] = "bap" + + m1 = Mock() + e.dialect.connect = m1.real_connect + + with e.connect() as conn: + eq_(m1.mock_calls, [call.real_connect('foo', 'hoho', bar='bat')]) + eq_(conn.info['boom'], 'bap') + + def test_connect_do_connect(self): + e = engines.testing_engine(options={"_initialize": False}) + + m1 = Mock() + + @event.listens_for(e, "do_connect") + def evt1(dialect, conn_rec, cargs, cparams): + cargs[:] = ['foo', 'hoho'] + cparams.clear() + cparams['bar'] = 'bat' + conn_rec.info['boom'] = "one" + + @event.listens_for(e, "do_connect") + def evt2(dialect, conn_rec, cargs, cparams): + conn_rec.info['bap'] = "two" + return m1.our_connect(cargs, cparams) + + with e.connect() as conn: + # called with args + eq_( + m1.mock_calls, + [call.our_connect(['foo', 'hoho'], {'bar': 'bat'})]) + + eq_(conn.info['boom'], "one") + eq_(conn.info['bap'], "two") + + # returned our mock connection + is_(conn.connection.connection, m1.our_connect()) + + def test_connect_do_connect_info_there_after_recycle(self): + # test that info is maintained after the do_connect() + # event for a soft invalidation. + + e = engines.testing_engine(options={"_initialize": False}) + + @event.listens_for(e, "do_connect") + def evt1(dialect, conn_rec, cargs, cparams): + conn_rec.info['boom'] = "one" + + conn = e.connect() + eq_(conn.info['boom'], "one") + + conn.connection.invalidate(soft=True) + conn.close() + conn = e.connect() + eq_(conn.info['boom'], "one") + + def test_connect_do_connect_info_there_after_invalidate(self): + # test that info is maintained after the do_connect() + # event for a hard invalidation. + + e = engines.testing_engine(options={"_initialize": False}) + + @event.listens_for(e, "do_connect") + def evt1(dialect, conn_rec, cargs, cparams): + assert not conn_rec.info + conn_rec.info['boom'] = "one" + + conn = e.connect() + eq_(conn.info['boom'], "one") + + conn.connection.invalidate() + conn = e.connect() + eq_(conn.info['boom'], "one") + + diff --git a/test/engine/test_parseconnect.py b/test/engine/test_parseconnect.py index 9f1b5ceba..fb1f338e6 100644 --- a/test/engine/test_parseconnect.py +++ b/test/engine/test_parseconnect.py @@ -5,7 +5,7 @@ from sqlalchemy.engine.default import DefaultDialect import sqlalchemy as tsa from sqlalchemy.testing import fixtures from sqlalchemy import testing -from sqlalchemy.testing.mock import Mock, MagicMock +from sqlalchemy.testing.mock import Mock, MagicMock, call from sqlalchemy import event from sqlalchemy import select @@ -340,6 +340,33 @@ class TestRegNewDBAPI(fixtures.TestBase): e = create_engine("mysql+my_mock_dialect://") assert isinstance(e.dialect, MockDialect) + @testing.requires.sqlite + def test_wrapper_hooks(self): + def get_dialect_cls(url): + url.drivername = "sqlite" + return url.get_dialect() + + global WrapperFactory + WrapperFactory = Mock() + WrapperFactory.get_dialect_cls.side_effect = get_dialect_cls + + from sqlalchemy.dialects import registry + registry.register("wrapperdialect", __name__, "WrapperFactory") + + from sqlalchemy.dialects import sqlite + e = create_engine("wrapperdialect://") + + eq_(e.dialect.name, "sqlite") + assert isinstance(e.dialect, sqlite.dialect) + + eq_( + WrapperFactory.mock_calls, + [ + call.get_dialect_cls(url.make_url("sqlite://")), + call.engine_created(e) + ] + ) + class MockDialect(DefaultDialect): @classmethod diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index ff45b2d51..3d93cda89 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -4,11 +4,11 @@ from sqlalchemy import pool, select, event import sqlalchemy as tsa from sqlalchemy import testing from sqlalchemy.testing.util import gc_collect, lazy_gc -from sqlalchemy.testing import eq_, assert_raises, is_not_ +from sqlalchemy.testing import eq_, assert_raises, is_not_, is_ from sqlalchemy.testing.engines import testing_engine from sqlalchemy.testing import fixtures import random -from sqlalchemy.testing.mock import Mock, call +from sqlalchemy.testing.mock import Mock, call, patch import weakref join_timeout = 10 @@ -335,6 +335,13 @@ class PoolEventsTest(PoolTestBase): return p, canary + def _soft_invalidate_event_fixture(self): + p = self._queuepool_fixture() + canary = Mock() + event.listen(p, 'soft_invalidate', canary) + + return p, canary + def test_first_connect_event(self): p, canary = self._first_connect_event_fixture() @@ -438,6 +445,31 @@ class PoolEventsTest(PoolTestBase): c1.close() eq_(canary, ['reset']) + def test_soft_invalidate_event_no_exception(self): + p, canary = self._soft_invalidate_event_fixture() + + c1 = p.connect() + c1.close() + assert not canary.called + c1 = p.connect() + dbapi_con = c1.connection + c1.invalidate(soft=True) + assert canary.call_args_list[0][0][0] is dbapi_con + assert canary.call_args_list[0][0][2] is None + + def test_soft_invalidate_event_exception(self): + p, canary = self._soft_invalidate_event_fixture() + + c1 = p.connect() + c1.close() + assert not canary.called + c1 = p.connect() + dbapi_con = c1.connection + exc = Exception("hi") + c1.invalidate(exc, soft=True) + assert canary.call_args_list[0][0][0] is dbapi_con + assert canary.call_args_list[0][0][2] is exc + def test_invalidate_event_no_exception(self): p, canary = self._invalidate_event_fixture() @@ -1130,6 +1162,44 @@ class QueuePoolTest(PoolTestBase): eq_(len(success), 12, "successes: %s" % success) + def test_connrec_invalidated_within_checkout_no_race(self): + """Test that a concurrent ConnectionRecord.invalidate() which + occurs after the ConnectionFairy has called _ConnectionRecord.checkout() + but before the ConnectionFairy tests "fairy.connection is None" + will not result in an InvalidRequestError. + + This use case assumes that a listener on the checkout() event + will be raising DisconnectionError so that a reconnect attempt + may occur. + + """ + dbapi = MockDBAPI() + + def creator(): + return dbapi.connect() + + p = pool.QueuePool(creator=creator, pool_size=1, max_overflow=0) + + conn = p.connect() + conn.close() + + _existing_checkout = pool._ConnectionRecord.checkout + + @classmethod + def _decorate_existing_checkout(cls, *arg, **kw): + fairy = _existing_checkout(*arg, **kw) + connrec = fairy._connection_record + connrec.invalidate() + return fairy + + with patch( + "sqlalchemy.pool._ConnectionRecord.checkout", + _decorate_existing_checkout): + conn = p.connect() + is_(conn._connection_record.connection, None) + conn.close() + + @testing.requires.threading_with_mock @testing.requires.timing_intensive def test_notify_waiters(self): @@ -1323,12 +1393,36 @@ class QueuePoolTest(PoolTestBase): c2 = p.connect() assert id(c2.connection) == c_id + c2_rec = c2._connection_record p._invalidate(c2) + assert c2_rec.connection is None c2.close() time.sleep(.5) c3 = p.connect() assert id(c3.connection) != c_id + @testing.requires.timing_intensive + def test_recycle_on_soft_invalidate(self): + p = self._queuepool_fixture(pool_size=1, + max_overflow=0) + c1 = p.connect() + c_id = id(c1.connection) + c1.close() + c2 = p.connect() + assert id(c2.connection) == c_id + + c2_rec = c2._connection_record + c2.invalidate(soft=True) + assert c2_rec.connection is c2.connection + + c2.close() + time.sleep(.5) + c3 = p.connect() + assert id(c3.connection) != c_id + assert c3._connection_record is c2_rec + assert c2_rec.connection is c3.connection + + def _assert_cleanup_on_pooled_reconnect(self, dbapi, p): # p is QueuePool with size=1, max_overflow=2, # and one connection in the pool that will need to |