diff options
-rw-r--r-- | doc/build/changelog/changelog_09.rst | 15 | ||||
-rw-r--r-- | lib/sqlalchemy/engine/base.py | 6 | ||||
-rw-r--r-- | lib/sqlalchemy/orm/strategies.py | 1 | ||||
-rw-r--r-- | lib/sqlalchemy/pool.py | 46 | ||||
-rw-r--r-- | lib/sqlalchemy/util/queue.py | 41 | ||||
-rw-r--r-- | test/engine/test_pool.py | 103 | ||||
-rw-r--r-- | test/engine/test_reconnect.py | 14 |
7 files changed, 147 insertions, 79 deletions
diff --git a/doc/build/changelog/changelog_09.rst b/doc/build/changelog/changelog_09.rst index 2e4e5f098..c4fa76e49 100644 --- a/doc/build/changelog/changelog_09.rst +++ b/doc/build/changelog/changelog_09.rst @@ -15,6 +15,21 @@ :version: 0.9.4 .. change:: + :tags: bug, engine + :tickets: 2985 + + A major improvement made to the mechanics by which the :class:`.Engine` + recycles the connection pool when a "disconnect" condition is detected; + instead of discarding the pool and explicitly closing out connections, + the pool is retained and a "generational" timestamp is updated to + reflect the current time, thereby causing all existing connections + to be recycled when they are next checked out. This greatly simplifies + the recycle process, removes the need for "waking up" connect attempts + waiting on the old pool and eliminates the race condition that many + immediately-discarded "pool" objects could be created during the + recycle operation. + + .. change:: :tags: bug, oracle :tickets: 2987 diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index d3024640b..2cad2a094 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -1091,9 +1091,7 @@ class Connection(Connectable): del self._is_disconnect dbapi_conn_wrapper = self.connection self.invalidate(e) - if not hasattr(dbapi_conn_wrapper, '_pool') or \ - dbapi_conn_wrapper._pool is self.engine.pool: - self.engine.dispose() + self.engine.pool._invalidate(dbapi_conn_wrapper) if self.should_close_with_result: self.close() @@ -1503,7 +1501,7 @@ class Engine(Connectable, log.Identified): the engine are not affected. """ - self.pool = self.pool._replace() + self.pool.dispose() def _execute_default(self, default): with self.contextual_connect() as conn: diff --git a/lib/sqlalchemy/orm/strategies.py b/lib/sqlalchemy/orm/strategies.py index 473b665c8..4a07e7856 100644 --- a/lib/sqlalchemy/orm/strategies.py +++ b/lib/sqlalchemy/orm/strategies.py @@ -528,7 +528,6 @@ class LazyLoader(AbstractRelationshipLoader): def _emit_lazyload(self, strategy_options, session, state, ident_key, passive): q = session.query(self.mapper)._adapt_all_clauses() - if self.parent_property.secondary is not None: q = q.select_from(self.mapper, self.parent_property.secondary) diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index 59c1e614a..7fc4fc659 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -210,6 +210,7 @@ class Pool(log.Identified): self._threadconns = threading.local() self._creator = creator self._recycle = recycle + self._invalidate_time = 0 self._use_threadlocal = use_threadlocal if reset_on_return in ('rollback', True, reset_rollback): self._reset_on_return = reset_rollback @@ -276,6 +277,22 @@ class Pool(log.Identified): return _ConnectionRecord(self) + def _invalidate(self, connection): + """Mark all connections established within the generation + of the given connection as invalidated. + + If this pool's last invalidate time is before when the given + connection was created, update the timestamp til now. Otherwise, + no action is performed. + + Connections with a start time prior to this pool's invalidation + time will be recycled upon next checkout. + """ + rec = getattr(connection, "_connection_record", None) + if not rec or self._invalidate_time < rec.starttime: + self._invalidate_time = time.time() + + def recreate(self): """Return a new :class:`.Pool`, of the same class as this one and configured with identical creation arguments. @@ -301,17 +318,6 @@ class Pool(log.Identified): raise NotImplementedError() - def _replace(self): - """Dispose + recreate this pool. - - Subclasses may employ special logic to - move threads waiting on this pool to the - new one. - - """ - self.dispose() - return self.recreate() - def connect(self): """Return a DBAPI connection from the pool. @@ -483,6 +489,7 @@ class _ConnectionRecord(object): self.connection = None def get_connection(self): + recycle = False if self.connection is None: self.connection = self.__connect() self.info.clear() @@ -493,6 +500,15 @@ class _ConnectionRecord(object): self.__pool.logger.info( "Connection %r exceeded timeout; recycling", self.connection) + recycle = True + elif self.__pool._invalidate_time > self.starttime: + self.__pool.logger.info( + "Connection %r invalidated due to pool invalidation; recycling", + self.connection + ) + recycle = True + + if recycle: self.__close() self.connection = self.__connect() self.info.clear() @@ -911,8 +927,6 @@ class QueuePool(Pool): try: wait = use_overflow and self._overflow >= self._max_overflow return self._pool.get(wait, self._timeout) - except sqla_queue.SAAbort as aborted: - return aborted.context._do_get() except sqla_queue.Empty: if use_overflow and self._overflow >= self._max_overflow: if not wait: @@ -974,12 +988,6 @@ class QueuePool(Pool): self._overflow = 0 - self.size() self.logger.info("Pool disposed. %s", self.status()) - def _replace(self): - self.dispose() - np = self.recreate() - self._pool.abort(np) - return np - def status(self): return "Pool size: %d Connections in pool: %d "\ "Current Overflow: %d Current Checked out "\ diff --git a/lib/sqlalchemy/util/queue.py b/lib/sqlalchemy/util/queue.py index 82ff55a5d..c98aa7fda 100644 --- a/lib/sqlalchemy/util/queue.py +++ b/lib/sqlalchemy/util/queue.py @@ -15,11 +15,6 @@ rare cases be invoked within the ``get()`` method of the Queue itself, producing a ``put()`` inside the ``get()`` and therefore a reentrant condition. -An additional change includes a special "abort" method which can be used -to immediately raise a special exception for threads that are blocking -on get(). This is to accommodate a rare race condition that can occur -within QueuePool. - """ from collections import deque @@ -27,7 +22,7 @@ from time import time as _time from .compat import threading -__all__ = ['Empty', 'Full', 'Queue', 'SAAbort'] +__all__ = ['Empty', 'Full', 'Queue'] class Empty(Exception): @@ -42,12 +37,6 @@ class Full(Exception): pass -class SAAbort(Exception): - "Special SQLA exception to abort waiting" - def __init__(self, context): - self.context = context - - class Queue: def __init__(self, maxsize=0): """Initialize a queue object with a given maximum size. @@ -68,8 +57,6 @@ class Queue: # a thread waiting to put is notified then. self.not_full = threading.Condition(self.mutex) - # when this is set, SAAbort is raised within get(). - self._sqla_abort_context = False def qsize(self): """Return the approximate size of the queue (not reliable!).""" @@ -158,13 +145,7 @@ class Queue: raise Empty elif timeout is None: while self._empty(): - # wait for only half a second, then - # loop around, so that we can see a change in - # _sqla_abort_context in case we missed the notify_all() - # called by abort() - self.not_empty.wait(.5) - if self._sqla_abort_context: - raise SAAbort(self._sqla_abort_context) + self.not_empty.wait() else: if timeout < 0: raise ValueError("'timeout' must be a positive number") @@ -174,30 +155,12 @@ class Queue: if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) - if self._sqla_abort_context: - raise SAAbort(self._sqla_abort_context) item = self._get() self.not_full.notify() return item finally: self.not_empty.release() - def abort(self, context): - """Issue an 'abort', will force any thread waiting on get() - to stop waiting and raise SAAbort. - - """ - self._sqla_abort_context = context - if not self.not_full.acquire(False): - return - try: - # note that this is now optional - # as the waiters in get() both loop around - # to check the _sqla_abort_context flag periodically - self.not_empty.notify_all() - finally: - self.not_full.release() - def get_nowait(self): """Remove and return an item from the queue without blocking. diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index fc6f3dcea..bbab0a7c3 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -7,7 +7,7 @@ from sqlalchemy.testing.util import gc_collect, lazy_gc from sqlalchemy.testing import eq_, assert_raises, is_not_ from sqlalchemy.testing.engines import testing_engine from sqlalchemy.testing import fixtures - +import random from sqlalchemy.testing.mock import Mock, call join_timeout = 10 @@ -1069,7 +1069,8 @@ class QueuePoolTest(PoolTestBase): # inside the queue, before we invalidate the other # two conns time.sleep(.2) - p2 = p._replace() + p._invalidate(c2) + c2.invalidate() for t in threads: t.join(join_timeout) @@ -1079,19 +1080,18 @@ class QueuePoolTest(PoolTestBase): @testing.requires.threading_with_mock def test_notify_waiters(self): dbapi = MockDBAPI() + canary = [] - def creator1(): + def creator(): canary.append(1) return dbapi.connect() - def creator2(): - canary.append(2) - return dbapi.connect() - p1 = pool.QueuePool(creator=creator1, + p1 = pool.QueuePool(creator=creator, pool_size=1, timeout=None, max_overflow=0) - p2 = pool.NullPool(creator=creator2) + #p2 = pool.NullPool(creator=creator2) def waiter(p): conn = p.connect() + canary.append(2) time.sleep(.5) conn.close() @@ -1104,12 +1104,14 @@ class QueuePoolTest(PoolTestBase): threads.append(t) time.sleep(.5) eq_(canary, [1]) - p1._pool.abort(p2) + + c1.invalidate() + p1._invalidate(c1) for t in threads: t.join(join_timeout) - eq_(canary, [1, 2, 2, 2, 2, 2]) + eq_(canary, [1, 1, 2, 2, 2, 2, 2]) def test_dispose_closes_pooled(self): dbapi = MockDBAPI() @@ -1251,6 +1253,21 @@ class QueuePoolTest(PoolTestBase): c3 = p.connect() assert id(c3.connection) != c_id + def test_recycle_on_invalidate(self): + p = self._queuepool_fixture(pool_size=1, + max_overflow=0) + c1 = p.connect() + c_id = id(c1.connection) + c1.close() + c2 = p.connect() + assert id(c2.connection) == c_id + + p._invalidate(c2) + c2.close() + time.sleep(.5) + c3 = p.connect() + assert id(c3.connection) != c_id + def _assert_cleanup_on_pooled_reconnect(self, dbapi, p): # p is QueuePool with size=1, max_overflow=2, # and one connection in the pool that will need to @@ -1290,6 +1307,72 @@ class QueuePoolTest(PoolTestBase): time.sleep(1) self._assert_cleanup_on_pooled_reconnect(dbapi, p) + def test_recycle_pool_no_race(self): + def slow_close(): + slow_closing_connection._slow_close() + time.sleep(.5) + + slow_closing_connection = Mock() + slow_closing_connection.connect.return_value.close = slow_close + + class Error(Exception): + pass + + dialect = Mock() + dialect.is_disconnect = lambda *arg, **kw: True + dialect.dbapi.Error = Error + + pools = [] + class TrackQueuePool(pool.QueuePool): + def __init__(self, *arg, **kw): + pools.append(self) + super(TrackQueuePool, self).__init__(*arg, **kw) + + def creator(): + return slow_closing_connection.connect() + p1 = TrackQueuePool(creator=creator, pool_size=20) + + from sqlalchemy import create_engine + eng = create_engine("postgresql://", pool=p1, _initialize=False) + eng.dialect = dialect + + # 15 total connections + conns = [eng.connect() for i in range(15)] + + # return 8 back to the pool + for conn in conns[3:10]: + conn.close() + + def attempt(conn): + time.sleep(random.random()) + try: + conn._handle_dbapi_exception(Error(), "statement", {}, Mock(), Mock()) + except tsa.exc.DBAPIError: + pass + + # run an error + invalidate operation on the remaining 7 open connections + threads = [] + for conn in conns: + t = threading.Thread(target=attempt, args=(conn, )) + t.start() + threads.append(t) + + for t in threads: + t.join() + + # return all 15 connections to the pool + for conn in conns: + conn.close() + + # re-open 15 total connections + conns = [eng.connect() for i in range(15)] + + # 15 connections have been fully closed due to invalidate + assert slow_closing_connection._slow_close.call_count == 15 + + # 15 initial connections + 15 reconnections + assert slow_closing_connection.connect.call_count == 30 + assert len(pools) <= 2, len(pools) def test_invalidate(self): p = self._queuepool_fixture(pool_size=1, max_overflow=0) diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py index ba336a1bf..a3ad9c548 100644 --- a/test/engine/test_reconnect.py +++ b/test/engine/test_reconnect.py @@ -146,16 +146,20 @@ class MockReconnectTest(fixtures.TestBase): # close shouldnt break conn.close() - is_not_(self.db.pool, db_pool) - - # ensure all connections closed (pool was recycled) + # ensure one connection closed... eq_( [c.close.mock_calls for c in self.dbapi.connections], - [[call()], [call()]] + [[call()], []] ) conn = self.db.connect() + + eq_( + [c.close.mock_calls for c in self.dbapi.connections], + [[call()], [call()], []] + ) + conn.execute(select([1])) conn.close() @@ -534,8 +538,6 @@ class RealReconnectTest(fixtures.TestBase): # invalidate() also doesn't screw up assert_raises(exc.DBAPIError, engine.connect) - # pool was recreated - assert engine.pool is not p1 def test_null_pool(self): engine = \ |