Diffstat (limited to 'test')
-rw-r--r--  test/engine/test_execute.py      | 129
-rw-r--r--  test/engine/test_parseconnect.py |  29
-rw-r--r--  test/engine/test_pool.py         |  98
3 files changed, 251 insertions(+), 5 deletions(-)
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index b0256d325..cba3972f6 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -1,7 +1,7 @@
 # coding: utf-8
 from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, \
-    config, is_
+    config, is_, is_not_
 import re
 from sqlalchemy.testing.util import picklers
 from sqlalchemy.interfaces import ConnectionProxy
@@ -1943,6 +1943,47 @@ class HandleErrorTest(fixtures.TestBase):
         self._test_alter_disconnect(True, False)
         self._test_alter_disconnect(False, False)

+    @testing.requires.independent_connections
+    def _test_alter_invalidate_pool_to_false(self, set_to_false):
+        orig_error = True
+
+        engine = engines.testing_engine()
+
+        @event.listens_for(engine, "handle_error")
+        def evt(ctx):
+            if set_to_false:
+                ctx.invalidate_pool_on_disconnect = False
+
+        c1, c2, c3 = engine.pool.connect(), \
+            engine.pool.connect(), engine.pool.connect()
+        crecs = [conn._connection_record for conn in (c1, c2, c3)]
+        c1.close()
+        c2.close()
+        c3.close()
+
+        with patch.object(engine.dialect, "is_disconnect",
+                          Mock(return_value=orig_error)):
+
+            with engine.connect() as c:
+                target_crec = c.connection._connection_record
+                try:
+                    c.execute("SELECT x FROM nonexistent")
+                    assert False
+                except tsa.exc.StatementError as st:
+                    eq_(st.connection_invalidated, True)
+
+        for crec in crecs:
+            if crec is target_crec or not set_to_false:
+                is_not_(crec.connection, crec.get_connection())
+            else:
+                is_(crec.connection, crec.get_connection())
+
+    def test_alter_invalidate_pool_to_false(self):
+        self._test_alter_invalidate_pool_to_false(True)
+
+    def test_alter_invalidate_pool_stays_true(self):
+        self._test_alter_invalidate_pool_to_false(False)
+
     def test_handle_error_event_connect_isolation_level(self):
         engine = engines.testing_engine()
@@ -2133,7 +2174,7 @@ class HandleInvalidatedOnConnectTest(fixtures.TestBase):
         conn.invalidate()
-        eng.pool._creator = Mock(
+        eng.pool._wrapped_creator = Mock(
             side_effect=self.ProgrammingError(
                 "Cannot operate on a closed database."))
@@ -2532,3 +2573,87 @@ class DialectEventTest(fixtures.TestBase):
     def test_cursor_execute_wo_replace(self):
         self._test_cursor_execute(False)
+
+    def test_connect_replace_params(self):
+        e = engines.testing_engine(options={"_initialize": False})
+
+        @event.listens_for(e, "do_connect")
+        def evt(dialect, conn_rec, cargs, cparams):
+            cargs[:] = ['foo', 'hoho']
+            cparams.clear()
+            cparams['bar'] = 'bat'
+            conn_rec.info['boom'] = "bap"
+
+        m1 = Mock()
+        e.dialect.connect = m1.real_connect
+
+        with e.connect() as conn:
+            eq_(m1.mock_calls, [call.real_connect('foo', 'hoho', bar='bat')])
+            eq_(conn.info['boom'], 'bap')
+
+    def test_connect_do_connect(self):
+        e = engines.testing_engine(options={"_initialize": False})
+
+        m1 = Mock()
+
+        @event.listens_for(e, "do_connect")
+        def evt1(dialect, conn_rec, cargs, cparams):
+            cargs[:] = ['foo', 'hoho']
+            cparams.clear()
+            cparams['bar'] = 'bat'
+            conn_rec.info['boom'] = "one"
+
+        @event.listens_for(e, "do_connect")
+        def evt2(dialect, conn_rec, cargs, cparams):
+            conn_rec.info['bap'] = "two"
+            return m1.our_connect(cargs, cparams)
+
+        with e.connect() as conn:
+            # called with args
+            eq_(
+                m1.mock_calls,
+                [call.our_connect(['foo', 'hoho'], {'bar': 'bat'})])
+
+            eq_(conn.info['boom'], "one")
+            eq_(conn.info['bap'], "two")
+
+            # returned our mock connection
+            is_(conn.connection.connection, m1.our_connect())
+
+    def test_connect_do_connect_info_there_after_recycle(self):
+        # test that info is maintained after the do_connect()
+        # event for a soft invalidation.
+
+        e = engines.testing_engine(options={"_initialize": False})
+
+        @event.listens_for(e, "do_connect")
+        def evt1(dialect, conn_rec, cargs, cparams):
+            conn_rec.info['boom'] = "one"
+
+        conn = e.connect()
+        eq_(conn.info['boom'], "one")
+
+        conn.connection.invalidate(soft=True)
+        conn.close()
+        conn = e.connect()
+        eq_(conn.info['boom'], "one")
+
+    def test_connect_do_connect_info_there_after_invalidate(self):
+        # test that info is maintained after the do_connect()
+        # event for a hard invalidation.
+
+        e = engines.testing_engine(options={"_initialize": False})
+
+        @event.listens_for(e, "do_connect")
+        def evt1(dialect, conn_rec, cargs, cparams):
+            assert not conn_rec.info
+            conn_rec.info['boom'] = "one"
+
+        conn = e.connect()
+        eq_(conn.info['boom'], "one")
+
+        conn.connection.invalidate()
+        conn = e.connect()
+        eq_(conn.info['boom'], "one")
+
+
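The test_execute.py additions above exercise two hooks: the "do_connect" event, whose listeners may rewrite the args/kwargs handed to the DBAPI connect() (or return a connection outright), and the invalidate_pool_on_disconnect flag on the context passed to "handle_error". A minimal usage sketch based only on the call signatures shown in those tests; the URL, credential source, and info key are placeholders, not part of the change:

    from sqlalchemy import create_engine, event

    engine = create_engine("postgresql://scott:tiger@localhost/test")  # placeholder URL

    @event.listens_for(engine, "do_connect")
    def provide_token(dialect, conn_rec, cargs, cparams):
        # mutate the positional/keyword arguments passed to the DBAPI
        # connect() in place; returning a non-None value would instead be
        # used directly as the DBAPI connection
        cparams["password"] = "freshly-fetched-token"   # hypothetical credential source
        conn_rec.info["token_issued"] = True             # stays on the pooled record

    @event.listens_for(engine, "handle_error")
    def keep_pool_on_disconnect(ctx):
        # on a disconnect error, invalidate only the failed connection and
        # leave the rest of the pool alone (the default invalidates the pool)
        ctx.invalidate_pool_on_disconnect = False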
diff --git a/test/engine/test_parseconnect.py b/test/engine/test_parseconnect.py
index 9f1b5ceba..fb1f338e6 100644
--- a/test/engine/test_parseconnect.py
+++ b/test/engine/test_parseconnect.py
@@ -5,7 +5,7 @@ from sqlalchemy.engine.default import DefaultDialect
import sqlalchemy as tsa
from sqlalchemy.testing import fixtures
from sqlalchemy import testing
-from sqlalchemy.testing.mock import Mock, MagicMock
+from sqlalchemy.testing.mock import Mock, MagicMock, call
from sqlalchemy import event
from sqlalchemy import select
@@ -340,6 +340,33 @@ class TestRegNewDBAPI(fixtures.TestBase):
         e = create_engine("mysql+my_mock_dialect://")
         assert isinstance(e.dialect, MockDialect)

+    @testing.requires.sqlite
+    def test_wrapper_hooks(self):
+        def get_dialect_cls(url):
+            url.drivername = "sqlite"
+            return url.get_dialect()
+
+        global WrapperFactory
+        WrapperFactory = Mock()
+        WrapperFactory.get_dialect_cls.side_effect = get_dialect_cls
+
+        from sqlalchemy.dialects import registry
+        registry.register("wrapperdialect", __name__, "WrapperFactory")
+
+        from sqlalchemy.dialects import sqlite
+        e = create_engine("wrapperdialect://")
+
+        eq_(e.dialect.name, "sqlite")
+        assert isinstance(e.dialect, sqlite.dialect)
+
+        eq_(
+            WrapperFactory.mock_calls,
+            [
+                call.get_dialect_cls(url.make_url("sqlite://")),
+                call.engine_created(e)
+            ]
+        )
+

 class MockDialect(DefaultDialect):
     @classmethod
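The test_wrapper_hooks test above shows that create_engine() now consults two hooks on whatever entry point the dialect registry resolves: get_dialect_cls(url), which may hand back a different dialect class than the entry point itself, and engine_created(engine), called once the Engine exists. A rough sketch of a wrapper entry point along those lines; the class name, drivername, and module path are illustrative only:

    class TracingDialectWrapper(object):
        """Hypothetical plugin registered under a custom drivername."""

        @classmethod
        def get_dialect_cls(cls, url):
            # return the dialect that should actually do the work; here we
            # simply redirect the URL to SQLite and let it resolve normally
            url.drivername = "sqlite"
            return url.get_dialect()

        @classmethod
        def engine_created(cls, engine):
            # last-chance hook once the Engine is built, e.g. to attach
            # event listeners or record the engine somewhere
            print("engine created for", engine.url)

    # registration would make create_engine("tracing://") resolve through
    # the wrapper (module path is made up for this sketch):
    # from sqlalchemy.dialects import registry
    # registry.register("tracing", "myapp.tracing_dialect", "TracingDialectWrapper")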
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index ff45b2d51..3d93cda89 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -4,11 +4,11 @@ from sqlalchemy import pool, select, event
import sqlalchemy as tsa
from sqlalchemy import testing
from sqlalchemy.testing.util import gc_collect, lazy_gc
-from sqlalchemy.testing import eq_, assert_raises, is_not_
+from sqlalchemy.testing import eq_, assert_raises, is_not_, is_
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing import fixtures
import random
-from sqlalchemy.testing.mock import Mock, call
+from sqlalchemy.testing.mock import Mock, call, patch
import weakref
join_timeout = 10
@@ -335,6 +335,13 @@ class PoolEventsTest(PoolTestBase):
         return p, canary

+    def _soft_invalidate_event_fixture(self):
+        p = self._queuepool_fixture()
+        canary = Mock()
+        event.listen(p, 'soft_invalidate', canary)
+
+        return p, canary
+
     def test_first_connect_event(self):
         p, canary = self._first_connect_event_fixture()
@@ -438,6 +445,31 @@ class PoolEventsTest(PoolTestBase):
         c1.close()
         eq_(canary, ['reset'])

+    def test_soft_invalidate_event_no_exception(self):
+        p, canary = self._soft_invalidate_event_fixture()
+
+        c1 = p.connect()
+        c1.close()
+        assert not canary.called
+        c1 = p.connect()
+        dbapi_con = c1.connection
+        c1.invalidate(soft=True)
+        assert canary.call_args_list[0][0][0] is dbapi_con
+        assert canary.call_args_list[0][0][2] is None
+
+    def test_soft_invalidate_event_exception(self):
+        p, canary = self._soft_invalidate_event_fixture()
+
+        c1 = p.connect()
+        c1.close()
+        assert not canary.called
+        c1 = p.connect()
+        dbapi_con = c1.connection
+        exc = Exception("hi")
+        c1.invalidate(exc, soft=True)
+        assert canary.call_args_list[0][0][0] is dbapi_con
+        assert canary.call_args_list[0][0][2] is exc
+
     def test_invalidate_event_no_exception(self):
         p, canary = self._invalidate_event_fixture()
@@ -1130,6 +1162,44 @@ class QueuePoolTest(PoolTestBase):
         eq_(len(success), 12, "successes: %s" % success)

+    def test_connrec_invalidated_within_checkout_no_race(self):
+        """Test that a concurrent ConnectionRecord.invalidate() which
+        occurs after the ConnectionFairy has called _ConnectionRecord.checkout()
+        but before the ConnectionFairy tests "fairy.connection is None"
+        will not result in an InvalidRequestError.
+
+        This use case assumes that a listener on the checkout() event
+        will be raising DisconnectionError so that a reconnect attempt
+        may occur.
+
+        """
+        dbapi = MockDBAPI()
+
+        def creator():
+            return dbapi.connect()
+
+        p = pool.QueuePool(creator=creator, pool_size=1, max_overflow=0)
+
+        conn = p.connect()
+        conn.close()
+
+        _existing_checkout = pool._ConnectionRecord.checkout
+
+        @classmethod
+        def _decorate_existing_checkout(cls, *arg, **kw):
+            fairy = _existing_checkout(*arg, **kw)
+            connrec = fairy._connection_record
+            connrec.invalidate()
+            return fairy
+
+        with patch(
+                "sqlalchemy.pool._ConnectionRecord.checkout",
+                _decorate_existing_checkout):
+            conn = p.connect()
+            is_(conn._connection_record.connection, None)
+            conn.close()
+
     @testing.requires.threading_with_mock
     @testing.requires.timing_intensive
     def test_notify_waiters(self):
@@ -1323,12 +1393,36 @@ class QueuePoolTest(PoolTestBase):
         c2 = p.connect()
         assert id(c2.connection) == c_id
+        c2_rec = c2._connection_record
         p._invalidate(c2)
+        assert c2_rec.connection is None
         c2.close()
         time.sleep(.5)
         c3 = p.connect()
         assert id(c3.connection) != c_id

+    @testing.requires.timing_intensive
+    def test_recycle_on_soft_invalidate(self):
+        p = self._queuepool_fixture(pool_size=1,
+                                    max_overflow=0)
+        c1 = p.connect()
+        c_id = id(c1.connection)
+        c1.close()
+        c2 = p.connect()
+        assert id(c2.connection) == c_id
+
+        c2_rec = c2._connection_record
+        c2.invalidate(soft=True)
+        assert c2_rec.connection is c2.connection
+
+        c2.close()
+        time.sleep(.5)
+        c3 = p.connect()
+        assert id(c3.connection) != c_id
+        assert c3._connection_record is c2_rec
+        assert c2_rec.connection is c3.connection
+
+
     def _assert_cleanup_on_pooled_reconnect(self, dbapi, p):
         # p is QueuePool with size=1, max_overflow=2,
         # and one connection in the pool that will need to
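The pool tests above outline the new soft invalidation path: invalidate(soft=True) leaves the current DBAPI connection in use, keeps the same _ConnectionRecord, emits the new "soft_invalidate" pool event, and has the record open a fresh connection at a later checkout. A brief sketch of how this might be used from application code, assuming a plain in-memory SQLite engine purely for illustration:

    from sqlalchemy import create_engine, event

    engine = create_engine("sqlite://")  # illustrative engine only

    @event.listens_for(engine.pool, "soft_invalidate")
    def on_soft_invalidate(dbapi_connection, connection_record, exception):
        # fires when a connection is flagged for replacement at a later
        # checkout rather than being torn down immediately
        print("connection will be recycled:", exception)

    conn = engine.connect()
    # mark the underlying DBAPI connection as stale without interrupting
    # whoever currently holds it; the pool swaps in a new DBAPI connection
    # on a subsequent checkout instead of discarding the record outright
    conn.connection.invalidate(soft=True)
    conn.close()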