 doc/build/changelog/changelog_08.rst | 11
 lib/sqlalchemy/pool.py               | 64
 test/engine/test_pool.py             | 82
 3 files changed, 126 insertions(+), 31 deletions(-)
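Note on the change below: the patch stops holding the overflow lock around the whole connection attempt. Instead, the overflow counter is bumped in its own short critical section (_inc_overflow), the potentially slow _create_connection() call runs with no lock held, and the counter is decremented again (_dec_overflow) if creation fails or the connection is later discarded. The following is a minimal, standalone sketch of that pattern only; the OverflowCounter class and checkout() helper are illustrative names for this note, not SQLAlchemy API.

    # Illustrative sketch (not SQLAlchemy code): adjust the overflow counter
    # inside a short lock, and do the slow connect outside of it.
    import threading

    class OverflowCounter(object):
        def __init__(self, max_overflow):
            self._max_overflow = max_overflow
            self._overflow = 0
            self._lock = threading.Lock()

        def inc(self):
            # Claim an overflow slot; only the counter update is guarded.
            if self._max_overflow == -1:
                self._overflow += 1
                return True
            with self._lock:
                if self._overflow < self._max_overflow:
                    self._overflow += 1
                    return True
                return False

        def dec(self):
            # Give the slot back, e.g. after a failed connect.
            if self._max_overflow == -1:
                self._overflow -= 1
                return
            with self._lock:
                self._overflow -= 1

    def checkout(counter, create_connection):
        # Create the connection with no lock held, so one hanging connect
        # cannot serialize other callers; on failure, release the slot.
        if counter.inc():
            try:
                return create_connection()
            except Exception:
                counter.dec()
                raise
        return None  # caller would instead fall back to waiting on the pool queue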
diff --git a/doc/build/changelog/changelog_08.rst b/doc/build/changelog/changelog_08.rst
index c5f882f7f..4c9cc85d9 100644
--- a/doc/build/changelog/changelog_08.rst
+++ b/doc/build/changelog/changelog_08.rst
@@ -14,6 +14,17 @@
     .. change::
         :tags: bug, engine, pool
         :versions: 0.9.0b2
+        :tickets: 2880
+
+        The :class:`.QueuePool` has been enhanced to not block new connection
+        attempts when an existing connection attempt is blocking.  Previously,
+        the production of new connections was serialized within the block
+        that monitored overflow; the overflow counter is now altered within
+        its own critical section outside of the connection process itself.
+
+    .. change::
+        :tags: bug, engine, pool
+        :versions: 0.9.0b2
         :tickets: 2522
 
         Made a slight adjustment to the logic which waits for a pooled
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index 34681ef46..20b61746d 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -658,15 +658,6 @@ class SingletonThreadPool(Pool):
         return c
 
 
-class DummyLock(object):
-
-    def acquire(self, wait=True):
-        return True
-
-    def release(self):
-        pass
-
-
 class QueuePool(Pool):
     """A :class:`.Pool` that imposes a limit on the number of open
     connections.
@@ -774,30 +765,25 @@ class QueuePool(Pool):
         self._overflow = 0 - pool_size
         self._max_overflow = max_overflow
         self._timeout = timeout
-        self._overflow_lock = threading.Lock() if self._max_overflow > -1 \
-            else DummyLock()
+        self._overflow_lock = threading.Lock()
 
     def _do_return_conn(self, conn):
         try:
             self._pool.put(conn, False)
         except sqla_queue.Full:
+            self._dec_overflow()
             conn.close()
-            self._overflow_lock.acquire()
-            try:
-                self._overflow -= 1
-            finally:
-                self._overflow_lock.release()
 
     def _do_get(self):
+        use_overflow = self._max_overflow > -1
+
         try:
-            wait = self._max_overflow > -1 and \
-                self._overflow >= self._max_overflow
+            wait = use_overflow and self._overflow >= self._max_overflow
             return self._pool.get(wait, self._timeout)
         except sqla_queue.SAAbort as aborted:
             return aborted.context._do_get()
         except sqla_queue.Empty:
-            if self._max_overflow > -1 and \
-                    self._overflow >= self._max_overflow:
+            if use_overflow and self._overflow >= self._max_overflow:
                 if not wait:
                     return self._do_get()
                 else:
@@ -806,17 +792,33 @@
                         "connection timed out, timeout %d" %
                         (self.size(), self.overflow(), self._timeout))
 
-            self._overflow_lock.acquire()
-            try:
-                if self._max_overflow > -1 and \
-                        self._overflow >= self._max_overflow:
-                    return self._do_get()
-                else:
-                    con = self._create_connection()
-                    self._overflow += 1
-                    return con
-            finally:
-                self._overflow_lock.release()
+            if self._inc_overflow():
+                try:
+                    return self._create_connection()
+                except:
+                    self._dec_overflow()
+                    raise
+            else:
+                return self._do_get()
+
+    def _inc_overflow(self):
+        if self._max_overflow == -1:
+            self._overflow += 1
+            return True
+        with self._overflow_lock:
+            if self._overflow < self._max_overflow:
+                self._overflow += 1
+                return True
+            else:
+                return False
+
+    def _dec_overflow(self):
+        if self._max_overflow == -1:
+            self._overflow -= 1
+            return True
+        with self._overflow_lock:
+            self._overflow -= 1
+            return True
 
     def recreate(self):
         self.logger.info("Pool recreating")
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index 3f05f661a..eb70bdf7f 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -873,6 +873,88 @@ class QueuePoolTest(PoolTestBase):
             lazy_gc()
             assert not pool._refs
 
+
+    def test_overflow_reset_on_failed_connect(self):
+        dbapi = Mock()
+
+        def failing_dbapi():
+            time.sleep(2)
+            raise Exception("connection failed")
+
+        creator = dbapi.connect
+        def create():
+            return creator()
+
+        p = pool.QueuePool(creator=create, pool_size=2, max_overflow=3)
+        c1 = p.connect()
+        c2 = p.connect()
+        c3 = p.connect()
+        eq_(p._overflow, 1)
+        creator = failing_dbapi
+        assert_raises(Exception, p.connect)
+        eq_(p._overflow, 1)
+
+    @testing.requires.threading_with_mock
+    def test_hanging_connect_within_overflow(self):
+        """test that a single connect() call which is hanging
+        does not block other connections from proceeding."""
+
+        dbapi = Mock()
+        mutex = threading.Lock()
+
+        def hanging_dbapi():
+            time.sleep(2)
+            with mutex:
+                return dbapi.connect()
+
+        def fast_dbapi():
+            with mutex:
+                return dbapi.connect()
+
+        creator = threading.local()
+
+        def create():
+            return creator.mock_connector()
+
+        def run_test(name, pool, should_hang):
+            if should_hang:
+                creator.mock_connector = hanging_dbapi
+            else:
+                creator.mock_connector = fast_dbapi
+
+            conn = pool.connect()
+            conn.operation(name)
+            time.sleep(1)
+            conn.close()
+
+        p = pool.QueuePool(creator=create, pool_size=2, max_overflow=3)
+
+        threads = [
+            threading.Thread(
+                target=run_test, args=("success_one", p, False)),
+            threading.Thread(
+                target=run_test, args=("success_two", p, False)),
+            threading.Thread(
+                target=run_test, args=("overflow_one", p, True)),
+            threading.Thread(
+                target=run_test, args=("overflow_two", p, False)),
+            threading.Thread(
+                target=run_test, args=("overflow_three", p, False))
+        ]
+        for t in threads:
+            t.start()
+            time.sleep(.2)
+
+        for t in threads:
+            t.join(timeout=join_timeout)
+        eq_(
+            dbapi.connect().operation.mock_calls,
+            [call("success_one"), call("success_two"),
+                call("overflow_two"), call("overflow_three"),
+                call("overflow_one")]
+        )
+
+
     @testing.requires.threading_with_mock
     def test_waiters_handled(self):
         """test that threads waiting for connections are