summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2012-06-22 12:24:08 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2012-06-22 12:24:08 -0400
commit5f0a7bb152b30dd7b05771725a7ffe16e3af8f8a (patch)
treebc17038b5eb1a82ce41accbba56d9228d540858f
parent51a3a9ac8a76096a6a25eb2cc7404970561d5123 (diff)
downloadsqlalchemy-5f0a7bb152b30dd7b05771725a7ffe16e3af8f8a.tar.gz
- [bug] Fixed bug whereby
a disconnect detect + dispose that occurs when the QueuePool has threads waiting for connections would leave those threads waiting for the duration of the timeout on the old pool. The fix now notifies those waiters with a special exception case and has them move onto the new pool. This fix may or may not be ported to 0.7. [ticket:2522]
-rw-r--r--CHANGES11
-rw-r--r--lib/sqlalchemy/engine/base.py3
-rw-r--r--lib/sqlalchemy/pool.py82
-rw-r--r--lib/sqlalchemy/util/queue.py40
-rw-r--r--test/engine/test_pool.py63
5 files changed, 161 insertions, 38 deletions
diff --git a/CHANGES b/CHANGES
index c6e00d8b2..9721775f7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -146,6 +146,17 @@ underneath "0.7.xx".
(use sqlalchemy.ext.horizontal_shard)
- engine
+ - [bug] Fixed bug whereby
+ a disconnect detect + dispose that occurs
+ when the QueuePool has threads waiting
+ for connections would leave those
+ threads waiting for the duration of
+ the timeout on the old pool. The fix
+ now notifies those waiters with a special
+ exception case and has them move onto
+ the new pool. This fix may or may
+ not be ported to 0.7. [ticket:2522]
+
- [feature] Added a new system
for registration of new dialects in-process
without using an entrypoint. See the
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index d13344ff6..75f6ac29a 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -2253,8 +2253,7 @@ class Engine(Connectable, log.Identified):
the engine are not affected.
"""
- self.pool.dispose()
- self.pool = self.pool.recreate()
+ self.pool = self.pool._replace()
@util.deprecated("0.7", "Use the create() method on the given schema "
"object directly, i.e. :meth:`.Table.create`, "
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index 9aabe689b..f9e0f98dd 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -22,7 +22,6 @@ from sqlalchemy import exc, log, event, events, interfaces, util
from sqlalchemy.util import queue as sqla_queue
from sqlalchemy.util import threading, memoized_property, \
chop_traceback
-
proxies = {}
def manage(module, **params):
@@ -212,6 +211,17 @@ class Pool(log.Identified):
raise NotImplementedError()
+ def _replace(self):
+ """Dispose + recreate this pool.
+
+ Subclasses may employ special logic to
+ move threads waiting on this pool to the
+ new one.
+
+ """
+ self.dispose()
+ return self.recreate()
+
def connect(self):
"""Return a DBAPI connection from the pool.
@@ -580,6 +590,12 @@ class SingletonThreadPool(Pool):
self._cleanup()
return c
+class DummyLock(object):
+ def acquire(self, wait=True):
+ return True
+ def release(self):
+ pass
+
class QueuePool(Pool):
"""A :class:`.Pool` that imposes a limit on the number of open connections.
@@ -688,37 +704,26 @@ class QueuePool(Pool):
self._max_overflow = max_overflow
self._timeout = timeout
self._overflow_lock = self._max_overflow > -1 and \
- threading.Lock() or None
-
- def recreate(self):
- self.logger.info("Pool recreating")
- return self.__class__(self._creator, pool_size=self._pool.maxsize,
- max_overflow=self._max_overflow,
- timeout=self._timeout,
- recycle=self._recycle, echo=self.echo,
- logging_name=self._orig_logging_name,
- use_threadlocal=self._use_threadlocal,
- _dispatch=self.dispatch)
+ threading.Lock() or DummyLock()
def _do_return_conn(self, conn):
try:
self._pool.put(conn, False)
except sqla_queue.Full:
conn.close()
- if self._overflow_lock is None:
+ self._overflow_lock.acquire()
+ try:
self._overflow -= 1
- else:
- self._overflow_lock.acquire()
- try:
- self._overflow -= 1
- finally:
- self._overflow_lock.release()
+ finally:
+ self._overflow_lock.release()
def _do_get(self):
try:
wait = self._max_overflow > -1 and \
self._overflow >= self._max_overflow
return self._pool.get(wait, self._timeout)
+ except sqla_queue.SAAbort, aborted:
+ return aborted.context._do_get()
except sqla_queue.Empty:
if self._max_overflow > -1 and \
self._overflow >= self._max_overflow:
@@ -730,22 +735,27 @@ class QueuePool(Pool):
"connection timed out, timeout %d" %
(self.size(), self.overflow(), self._timeout))
- if self._overflow_lock is not None:
- self._overflow_lock.acquire()
-
- if self._max_overflow > -1 and \
- self._overflow >= self._max_overflow:
- if self._overflow_lock is not None:
- self._overflow_lock.release()
- return self._do_get()
-
+ self._overflow_lock.acquire()
try:
- con = self._create_connection()
- self._overflow += 1
+ if self._max_overflow > -1 and \
+ self._overflow >= self._max_overflow:
+ return self._do_get()
+ else:
+ con = self._create_connection()
+ self._overflow += 1
+ return con
finally:
- if self._overflow_lock is not None:
- self._overflow_lock.release()
- return con
+ self._overflow_lock.release()
+
+ def recreate(self):
+ self.logger.info("Pool recreating")
+ return self.__class__(self._creator, pool_size=self._pool.maxsize,
+ max_overflow=self._max_overflow,
+ timeout=self._timeout,
+ recycle=self._recycle, echo=self.echo,
+ logging_name=self._orig_logging_name,
+ use_threadlocal=self._use_threadlocal,
+ _dispatch=self.dispatch)
def dispose(self):
while True:
@@ -758,6 +768,12 @@ class QueuePool(Pool):
self._overflow = 0 - self.size()
self.logger.info("Pool disposed. %s", self.status())
+ def _replace(self):
+ self.dispose()
+ np = self.recreate()
+ self._pool.abort(np)
+ return np
+
def status(self):
return "Pool size: %d Connections in pool: %d "\
"Current Overflow: %d Current Checked out "\
diff --git a/lib/sqlalchemy/util/queue.py b/lib/sqlalchemy/util/queue.py
index e71ceb458..ebf736331 100644
--- a/lib/sqlalchemy/util/queue.py
+++ b/lib/sqlalchemy/util/queue.py
@@ -5,19 +5,28 @@
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""An adaptation of Py2.3/2.4's Queue module which supports reentrant
-behavior, using RLock instead of Lock for its mutex object.
+behavior, using RLock instead of Lock for its mutex object. The
+Queue object is used exclusively by the sqlalchemy.pool.QueuePool
+class.
This is to support the connection pool's usage of weakref callbacks to return
connections to the underlying Queue, which can in extremely
rare cases be invoked within the ``get()`` method of the Queue itself,
producing a ``put()`` inside the ``get()`` and therefore a reentrant
-condition."""
+condition.
+
+An additional change includes a special "abort" method which can be used
+to immediately raise a special exception for threads that are blocking
+on get(). This is to accommodate a rare race condition that can occur
+within QueuePool.
+
+"""
from collections import deque
from time import time as _time
from sqlalchemy.util import threading
-__all__ = ['Empty', 'Full', 'Queue']
+__all__ = ['Empty', 'Full', 'Queue', 'SAAbort']
class Empty(Exception):
"Exception raised by Queue.get(block=0)/get_nowait()."
@@ -29,6 +38,11 @@ class Full(Exception):
pass
+class SAAbort(Exception):
+ "Special SQLA exception to abort waiting"
+ def __init__(self, context):
+ self.context = context
+
class Queue:
def __init__(self, maxsize=0):
"""Initialize a queue object with a given maximum size.
@@ -49,6 +63,9 @@ class Queue:
# a thread waiting to put is notified then.
self.not_full = threading.Condition(self.mutex)
+ # when this is set, SAAbort is raised within get().
+ self._sqla_abort_context = False
+
def qsize(self):
"""Return the approximate size of the queue (not reliable!)."""
@@ -138,6 +155,8 @@ class Queue:
elif timeout is None:
while self._empty():
self.not_empty.wait()
+ if self._sqla_abort_context:
+ raise SAAbort(self._sqla_abort_context)
else:
if timeout < 0:
raise ValueError("'timeout' must be a positive number")
@@ -147,12 +166,27 @@ class Queue:
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
+ if self._sqla_abort_context:
+ raise SAAbort(self._sqla_abort_context)
item = self._get()
self.not_full.notify()
return item
finally:
self.not_empty.release()
+ def abort(self, context):
+ """Issue an 'abort', will force any thread waiting on get()
+ to stop waiting and raise SAAbort.
+
+ """
+ self._sqla_abort_context = context
+ if not self.not_full.acquire(False):
+ return
+ try:
+ self.not_empty.notify()
+ finally:
+ self.not_full.release()
+
def get_nowait(self):
"""Remove and return an item from the queue without blocking.
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index 789324445..4d5572891 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -802,7 +802,70 @@ class QueuePoolTest(PoolTestBase):
lazy_gc()
assert not pool._refs
+
+ def test_waiters_handled(self):
+ """test that threads waiting for connections are
+ handled when the pool is replaced.
+
+ """
+ dbapi = MockDBAPI()
+ def creator():
+ return dbapi.connect()
+ success = []
+ for timeout in (None, 30):
+ for max_overflow in (-1, 0, 3):
+ p = pool.QueuePool(creator=creator,
+ pool_size=2, timeout=timeout,
+ max_overflow=max_overflow)
+ def waiter(p):
+ conn = p.connect()
+ time.sleep(.5)
+ success.append(True)
+ conn.close()
+
+ time.sleep(.2)
+ c1 = p.connect()
+ c2 = p.connect()
+
+ t = threading.Thread(target=waiter, args=(p, ))
+ t.setDaemon(True) # so the tests dont hang if this fails
+ t.start()
+
+ c1.invalidate()
+ c2.invalidate()
+ p2 = p._replace()
+ time.sleep(1)
+ eq_(len(success), 6)
+
+ def test_dispose_closes_pooled(self):
+ dbapi = MockDBAPI()
+ def creator():
+ return dbapi.connect()
+
+ p = pool.QueuePool(creator=creator,
+ pool_size=2, timeout=None,
+ max_overflow=0)
+ c1 = p.connect()
+ c2 = p.connect()
+ conns = [c1.connection, c2.connection]
+ c1.close()
+ eq_([c.closed for c in conns], [False, False])
+ p.dispose()
+ eq_([c.closed for c in conns], [True, False])
+
+ # currently, if a ConnectionFairy is closed
+ # after the pool has been disposed, there's no
+ # flag that states it should be invalidated
+ # immediately - it just gets returned to the
+ # pool normally...
+ c2.close()
+ eq_([c.closed for c in conns], [True, False])
+
+ # ...and that's the one we'll get back next.
+ c3 = p.connect()
+ assert c3.connection is conns[1]
+
def test_no_overflow(self):
self._test_overflow(40, 0)