summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES11
-rw-r--r--lib/sqlalchemy/engine/base.py3
-rw-r--r--lib/sqlalchemy/pool.py82
-rw-r--r--lib/sqlalchemy/util/queue.py40
-rw-r--r--test/engine/test_pool.py63
5 files changed, 161 insertions, 38 deletions
diff --git a/CHANGES b/CHANGES
index c6e00d8b2..9721775f7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -146,6 +146,17 @@ underneath "0.7.xx".
(use sqlalchemy.ext.horizontal_shard)
- engine
+ - [bug] Fixed bug whereby
+ a disconnect detect + dispose that occurs
+ when the QueuePool has threads waiting
+ for connections would leave those
+ threads waiting for the duration of
+ the timeout on the old pool. The fix
+ now notifies those waiters with a special
+ exception case and has them move on to
+ the new pool. This fix may or may
+ not be ported to 0.7. [ticket:2522]
+
- [feature] Added a new system
for registration of new dialects in-process
without using an entrypoint. See the
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index d13344ff6..75f6ac29a 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -2253,8 +2253,7 @@ class Engine(Connectable, log.Identified):
the engine are not affected.
"""
- self.pool.dispose()
- self.pool = self.pool.recreate()
+ self.pool = self.pool._replace()
@util.deprecated("0.7", "Use the create() method on the given schema "
"object directly, i.e. :meth:`.Table.create`, "
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index 9aabe689b..f9e0f98dd 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -22,7 +22,6 @@ from sqlalchemy import exc, log, event, events, interfaces, util
from sqlalchemy.util import queue as sqla_queue
from sqlalchemy.util import threading, memoized_property, \
chop_traceback
-
proxies = {}
def manage(module, **params):
@@ -212,6 +211,17 @@ class Pool(log.Identified):
raise NotImplementedError()
+ def _replace(self):
+ """Dispose + recreate this pool.
+
+ Subclasses may employ special logic to
+ move threads waiting on this pool to the
+ new one.
+
+ """
+ self.dispose()
+ return self.recreate()
+
def connect(self):
"""Return a DBAPI connection from the pool.
@@ -580,6 +590,12 @@ class SingletonThreadPool(Pool):
self._cleanup()
return c
+class DummyLock(object):
+ def acquire(self, wait=True):
+ return True
+ def release(self):
+ pass
+
class QueuePool(Pool):
"""A :class:`.Pool` that imposes a limit on the number of open connections.
@@ -688,37 +704,26 @@ class QueuePool(Pool):
self._max_overflow = max_overflow
self._timeout = timeout
self._overflow_lock = self._max_overflow > -1 and \
- threading.Lock() or None
-
- def recreate(self):
- self.logger.info("Pool recreating")
- return self.__class__(self._creator, pool_size=self._pool.maxsize,
- max_overflow=self._max_overflow,
- timeout=self._timeout,
- recycle=self._recycle, echo=self.echo,
- logging_name=self._orig_logging_name,
- use_threadlocal=self._use_threadlocal,
- _dispatch=self.dispatch)
+ threading.Lock() or DummyLock()
def _do_return_conn(self, conn):
try:
self._pool.put(conn, False)
except sqla_queue.Full:
conn.close()
- if self._overflow_lock is None:
+ self._overflow_lock.acquire()
+ try:
self._overflow -= 1
- else:
- self._overflow_lock.acquire()
- try:
- self._overflow -= 1
- finally:
- self._overflow_lock.release()
+ finally:
+ self._overflow_lock.release()
def _do_get(self):
try:
wait = self._max_overflow > -1 and \
self._overflow >= self._max_overflow
return self._pool.get(wait, self._timeout)
+ except sqla_queue.SAAbort, aborted:
+ return aborted.context._do_get()
except sqla_queue.Empty:
if self._max_overflow > -1 and \
self._overflow >= self._max_overflow:
@@ -730,22 +735,27 @@ class QueuePool(Pool):
"connection timed out, timeout %d" %
(self.size(), self.overflow(), self._timeout))
- if self._overflow_lock is not None:
- self._overflow_lock.acquire()
-
- if self._max_overflow > -1 and \
- self._overflow >= self._max_overflow:
- if self._overflow_lock is not None:
- self._overflow_lock.release()
- return self._do_get()
-
+ self._overflow_lock.acquire()
try:
- con = self._create_connection()
- self._overflow += 1
+ if self._max_overflow > -1 and \
+ self._overflow >= self._max_overflow:
+ return self._do_get()
+ else:
+ con = self._create_connection()
+ self._overflow += 1
+ return con
finally:
- if self._overflow_lock is not None:
- self._overflow_lock.release()
- return con
+ self._overflow_lock.release()
+
+ def recreate(self):
+ self.logger.info("Pool recreating")
+ return self.__class__(self._creator, pool_size=self._pool.maxsize,
+ max_overflow=self._max_overflow,
+ timeout=self._timeout,
+ recycle=self._recycle, echo=self.echo,
+ logging_name=self._orig_logging_name,
+ use_threadlocal=self._use_threadlocal,
+ _dispatch=self.dispatch)
def dispose(self):
while True:
@@ -758,6 +768,12 @@ class QueuePool(Pool):
self._overflow = 0 - self.size()
self.logger.info("Pool disposed. %s", self.status())
+ def _replace(self):
+ self.dispose()
+ np = self.recreate()
+ self._pool.abort(np)
+ return np
+
def status(self):
return "Pool size: %d Connections in pool: %d "\
"Current Overflow: %d Current Checked out "\
diff --git a/lib/sqlalchemy/util/queue.py b/lib/sqlalchemy/util/queue.py
index e71ceb458..ebf736331 100644
--- a/lib/sqlalchemy/util/queue.py
+++ b/lib/sqlalchemy/util/queue.py
@@ -5,19 +5,28 @@
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""An adaptation of Py2.3/2.4's Queue module which supports reentrant
-behavior, using RLock instead of Lock for its mutex object.
+behavior, using RLock instead of Lock for its mutex object. The
+Queue object is used exclusively by the sqlalchemy.pool.QueuePool
+class.
This is to support the connection pool's usage of weakref callbacks to return
connections to the underlying Queue, which can in extremely
rare cases be invoked within the ``get()`` method of the Queue itself,
producing a ``put()`` inside the ``get()`` and therefore a reentrant
-condition."""
+condition.
+
+An additional change includes a special "abort" method which can be used
+to immediately raise a special exception for threads that are blocking
+on get(). This is to accommodate a rare race condition that can occur
+within QueuePool.
+
+"""
from collections import deque
from time import time as _time
from sqlalchemy.util import threading
-__all__ = ['Empty', 'Full', 'Queue']
+__all__ = ['Empty', 'Full', 'Queue', 'SAAbort']
class Empty(Exception):
"Exception raised by Queue.get(block=0)/get_nowait()."
@@ -29,6 +38,11 @@ class Full(Exception):
pass
+class SAAbort(Exception):
+ "Special SQLA exception to abort waiting"
+ def __init__(self, context):
+ self.context = context
+
class Queue:
def __init__(self, maxsize=0):
"""Initialize a queue object with a given maximum size.
@@ -49,6 +63,9 @@ class Queue:
# a thread waiting to put is notified then.
self.not_full = threading.Condition(self.mutex)
+ # when this is set, SAAbort is raised within get().
+ self._sqla_abort_context = False
+
def qsize(self):
"""Return the approximate size of the queue (not reliable!)."""
@@ -138,6 +155,8 @@ class Queue:
elif timeout is None:
while self._empty():
self.not_empty.wait()
+ if self._sqla_abort_context:
+ raise SAAbort(self._sqla_abort_context)
else:
if timeout < 0:
raise ValueError("'timeout' must be a positive number")
@@ -147,12 +166,27 @@ class Queue:
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
+ if self._sqla_abort_context:
+ raise SAAbort(self._sqla_abort_context)
item = self._get()
self.not_full.notify()
return item
finally:
self.not_empty.release()
+ def abort(self, context):
+ """Issue an 'abort', will force any thread waiting on get()
+ to stop waiting and raise SAAbort.
+
+ """
+ self._sqla_abort_context = context
+ if not self.not_full.acquire(False):
+ return
+ try:
+ self.not_empty.notify()
+ finally:
+ self.not_full.release()
+
def get_nowait(self):
"""Remove and return an item from the queue without blocking.
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index 789324445..4d5572891 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -802,7 +802,70 @@ class QueuePoolTest(PoolTestBase):
lazy_gc()
assert not pool._refs
+
+ def test_waiters_handled(self):
+ """test that threads waiting for connections are
+ handled when the pool is replaced.
+
+ """
+ dbapi = MockDBAPI()
+ def creator():
+ return dbapi.connect()
+ success = []
+ for timeout in (None, 30):
+ for max_overflow in (-1, 0, 3):
+ p = pool.QueuePool(creator=creator,
+ pool_size=2, timeout=timeout,
+ max_overflow=max_overflow)
+ def waiter(p):
+ conn = p.connect()
+ time.sleep(.5)
+ success.append(True)
+ conn.close()
+
+ time.sleep(.2)
+ c1 = p.connect()
+ c2 = p.connect()
+
+ t = threading.Thread(target=waiter, args=(p, ))
+ t.setDaemon(True) # so the tests don't hang if this fails
+ t.start()
+
+ c1.invalidate()
+ c2.invalidate()
+ p2 = p._replace()
+ time.sleep(1)
+ eq_(len(success), 6)
+
+ def test_dispose_closes_pooled(self):
+ dbapi = MockDBAPI()
+ def creator():
+ return dbapi.connect()
+
+ p = pool.QueuePool(creator=creator,
+ pool_size=2, timeout=None,
+ max_overflow=0)
+ c1 = p.connect()
+ c2 = p.connect()
+ conns = [c1.connection, c2.connection]
+ c1.close()
+ eq_([c.closed for c in conns], [False, False])
+ p.dispose()
+ eq_([c.closed for c in conns], [True, False])
+
+ # currently, if a ConnectionFairy is closed
+ # after the pool has been disposed, there's no
+ # flag that states it should be invalidated
+ # immediately - it just gets returned to the
+ # pool normally...
+ c2.close()
+ eq_([c.closed for c in conns], [True, False])
+
+ # ...and that's the one we'll get back next.
+ c3 = p.connect()
+ assert c3.connection is conns[1]
+
def test_no_overflow(self):
self._test_overflow(40, 0)