diff options
-rw-r--r-- | CHANGES | 11 | ||||
-rw-r--r-- | lib/sqlalchemy/engine/base.py | 3 | ||||
-rw-r--r-- | lib/sqlalchemy/pool.py | 82 | ||||
-rw-r--r-- | lib/sqlalchemy/util/queue.py | 40 | ||||
-rw-r--r-- | test/engine/test_pool.py | 63 |
5 files changed, 161 insertions, 38 deletions
@@ -146,6 +146,17 @@ underneath "0.7.xx". (use sqlalchemy.ext.horizontal_shard) - engine + - [bug] Fixed bug whereby + a disconnect detect + dispose that occurs + when the QueuePool has threads waiting + for connections would leave those + threads waiting for the duration of + the timeout on the old pool. The fix + now notifies those waiters with a special + exception case and has them move onto + the new pool. This fix may or may + not be ported to 0.7. [ticket:2522] + - [feature] Added a new system for registration of new dialects in-process without using an entrypoint. See the diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index d13344ff6..75f6ac29a 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -2253,8 +2253,7 @@ class Engine(Connectable, log.Identified): the engine are not affected. """ - self.pool.dispose() - self.pool = self.pool.recreate() + self.pool = self.pool._replace() @util.deprecated("0.7", "Use the create() method on the given schema " "object directly, i.e. :meth:`.Table.create`, " diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index 9aabe689b..f9e0f98dd 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -22,7 +22,6 @@ from sqlalchemy import exc, log, event, events, interfaces, util from sqlalchemy.util import queue as sqla_queue from sqlalchemy.util import threading, memoized_property, \ chop_traceback - proxies = {} def manage(module, **params): @@ -212,6 +211,17 @@ class Pool(log.Identified): raise NotImplementedError() + def _replace(self): + """Dispose + recreate this pool. + + Subclasses may employ special logic to + move threads waiting on this pool to the + new one. + + """ + self.dispose() + return self.recreate() + def connect(self): """Return a DBAPI connection from the pool. @@ -580,6 +590,12 @@ class SingletonThreadPool(Pool): self._cleanup() return c +class DummyLock(object): + def acquire(self, wait=True): + return True + def release(self): + pass + class QueuePool(Pool): """A :class:`.Pool` that imposes a limit on the number of open connections. @@ -688,37 +704,26 @@ class QueuePool(Pool): self._max_overflow = max_overflow self._timeout = timeout self._overflow_lock = self._max_overflow > -1 and \ - threading.Lock() or None - - def recreate(self): - self.logger.info("Pool recreating") - return self.__class__(self._creator, pool_size=self._pool.maxsize, - max_overflow=self._max_overflow, - timeout=self._timeout, - recycle=self._recycle, echo=self.echo, - logging_name=self._orig_logging_name, - use_threadlocal=self._use_threadlocal, - _dispatch=self.dispatch) + threading.Lock() or DummyLock() def _do_return_conn(self, conn): try: self._pool.put(conn, False) except sqla_queue.Full: conn.close() - if self._overflow_lock is None: + self._overflow_lock.acquire() + try: self._overflow -= 1 - else: - self._overflow_lock.acquire() - try: - self._overflow -= 1 - finally: - self._overflow_lock.release() + finally: + self._overflow_lock.release() def _do_get(self): try: wait = self._max_overflow > -1 and \ self._overflow >= self._max_overflow return self._pool.get(wait, self._timeout) + except sqla_queue.SAAbort, aborted: + return aborted.context._do_get() except sqla_queue.Empty: if self._max_overflow > -1 and \ self._overflow >= self._max_overflow: @@ -730,22 +735,27 @@ class QueuePool(Pool): "connection timed out, timeout %d" % (self.size(), self.overflow(), self._timeout)) - if self._overflow_lock is not None: - self._overflow_lock.acquire() - - if self._max_overflow > -1 and \ - self._overflow >= self._max_overflow: - if self._overflow_lock is not None: - self._overflow_lock.release() - return self._do_get() - + self._overflow_lock.acquire() try: - con = self._create_connection() - self._overflow += 1 + if self._max_overflow > -1 and \ + self._overflow >= self._max_overflow: + return self._do_get() + else: + con = self._create_connection() + self._overflow += 1 + return con finally: - if self._overflow_lock is not None: - self._overflow_lock.release() - return con + self._overflow_lock.release() + + def recreate(self): + self.logger.info("Pool recreating") + return self.__class__(self._creator, pool_size=self._pool.maxsize, + max_overflow=self._max_overflow, + timeout=self._timeout, + recycle=self._recycle, echo=self.echo, + logging_name=self._orig_logging_name, + use_threadlocal=self._use_threadlocal, + _dispatch=self.dispatch) def dispose(self): while True: @@ -758,6 +768,12 @@ class QueuePool(Pool): self._overflow = 0 - self.size() self.logger.info("Pool disposed. %s", self.status()) + def _replace(self): + self.dispose() + np = self.recreate() + self._pool.abort(np) + return np + def status(self): return "Pool size: %d Connections in pool: %d "\ "Current Overflow: %d Current Checked out "\ diff --git a/lib/sqlalchemy/util/queue.py b/lib/sqlalchemy/util/queue.py index e71ceb458..ebf736331 100644 --- a/lib/sqlalchemy/util/queue.py +++ b/lib/sqlalchemy/util/queue.py @@ -5,19 +5,28 @@ # the MIT License: http://www.opensource.org/licenses/mit-license.php """An adaptation of Py2.3/2.4's Queue module which supports reentrant -behavior, using RLock instead of Lock for its mutex object. +behavior, using RLock instead of Lock for its mutex object. The +Queue object is used exclusively by the sqlalchemy.pool.QueuePool +class. This is to support the connection pool's usage of weakref callbacks to return connections to the underlying Queue, which can in extremely rare cases be invoked within the ``get()`` method of the Queue itself, producing a ``put()`` inside the ``get()`` and therefore a reentrant -condition.""" +condition. + +An additional change includes a special "abort" method which can be used +to immediately raise a special exception for threads that are blocking +on get(). This is to accommodate a rare race condition that can occur +within QueuePool. + +""" from collections import deque from time import time as _time from sqlalchemy.util import threading -__all__ = ['Empty', 'Full', 'Queue'] +__all__ = ['Empty', 'Full', 'Queue', 'SAAbort'] class Empty(Exception): "Exception raised by Queue.get(block=0)/get_nowait()." @@ -29,6 +38,11 @@ class Full(Exception): pass +class SAAbort(Exception): + "Special SQLA exception to abort waiting" + def __init__(self, context): + self.context = context + class Queue: def __init__(self, maxsize=0): """Initialize a queue object with a given maximum size. @@ -49,6 +63,9 @@ class Queue: # a thread waiting to put is notified then. self.not_full = threading.Condition(self.mutex) + # when this is set, SAAbort is raised within get(). + self._sqla_abort_context = False + def qsize(self): """Return the approximate size of the queue (not reliable!).""" @@ -138,6 +155,8 @@ class Queue: elif timeout is None: while self._empty(): self.not_empty.wait() + if self._sqla_abort_context: + raise SAAbort(self._sqla_abort_context) else: if timeout < 0: raise ValueError("'timeout' must be a positive number") @@ -147,12 +166,27 @@ class Queue: if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) + if self._sqla_abort_context: + raise SAAbort(self._sqla_abort_context) item = self._get() self.not_full.notify() return item finally: self.not_empty.release() + def abort(self, context): + """Issue an 'abort', will force any thread waiting on get() + to stop waiting and raise SAAbort. + + """ + self._sqla_abort_context = context + if not self.not_full.acquire(False): + return + try: + self.not_empty.notify() + finally: + self.not_full.release() + def get_nowait(self): """Remove and return an item from the queue without blocking. diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 789324445..4d5572891 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -802,7 +802,70 @@ class QueuePoolTest(PoolTestBase): lazy_gc() assert not pool._refs + + def test_waiters_handled(self): + """test that threads waiting for connections are + handled when the pool is replaced. + + """ + dbapi = MockDBAPI() + def creator(): + return dbapi.connect() + success = [] + for timeout in (None, 30): + for max_overflow in (-1, 0, 3): + p = pool.QueuePool(creator=creator, + pool_size=2, timeout=timeout, + max_overflow=max_overflow) + def waiter(p): + conn = p.connect() + time.sleep(.5) + success.append(True) + conn.close() + + time.sleep(.2) + c1 = p.connect() + c2 = p.connect() + + t = threading.Thread(target=waiter, args=(p, )) + t.setDaemon(True) # so the tests dont hang if this fails + t.start() + + c1.invalidate() + c2.invalidate() + p2 = p._replace() + time.sleep(1) + eq_(len(success), 6) + + def test_dispose_closes_pooled(self): + dbapi = MockDBAPI() + def creator(): + return dbapi.connect() + + p = pool.QueuePool(creator=creator, + pool_size=2, timeout=None, + max_overflow=0) + c1 = p.connect() + c2 = p.connect() + conns = [c1.connection, c2.connection] + c1.close() + eq_([c.closed for c in conns], [False, False]) + p.dispose() + eq_([c.closed for c in conns], [True, False]) + + # currently, if a ConnectionFairy is closed + # after the pool has been disposed, there's no + # flag that states it should be invalidated + # immediately - it just gets returned to the + # pool normally... + c2.close() + eq_([c.closed for c in conns], [True, False]) + + # ...and that's the one we'll get back next. + c3 = p.connect() + assert c3.connection is conns[1] + def test_no_overflow(self): self._test_overflow(40, 0) |