summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2014-03-22 18:45:39 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2014-03-22 18:45:39 -0400
commiteed9cfc3ae027f21a1f46a6e07fcef0724741eb2 (patch)
tree5cffcebebd3e7561fd4102709910d6e194f21830 /lib
parentbe3c185fd48c2abcc5d9f54dd0c415e15c33184f (diff)
downloadsqlalchemy-eed9cfc3ae027f21a1f46a6e07fcef0724741eb2.tar.gz
- A major improvement made to the mechanics by which the :class:`.Engine`
recycles the connection pool when a "disconnect" condition is detected; instead of discarding the pool and explicitly closing out connections, the pool is retained and a "generational" timestamp is updated to reflect the current time, thereby causing all existing connections to be recycled when they are next checked out. This greatly simplifies the recycle process, removes the need for "waking up" connect attempts waiting on the old pool and eliminates the race condition that many immediately-discarded "pool" objects could be created during the recycle operation. fixes #2985
Diffstat (limited to 'lib')
-rw-r--r--lib/sqlalchemy/engine/base.py6
-rw-r--r--lib/sqlalchemy/orm/strategies.py1
-rw-r--r--lib/sqlalchemy/pool.py46
-rw-r--r--lib/sqlalchemy/util/queue.py41
4 files changed, 31 insertions, 63 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index d3024640b..2cad2a094 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -1091,9 +1091,7 @@ class Connection(Connectable):
del self._is_disconnect
dbapi_conn_wrapper = self.connection
self.invalidate(e)
- if not hasattr(dbapi_conn_wrapper, '_pool') or \
- dbapi_conn_wrapper._pool is self.engine.pool:
- self.engine.dispose()
+ self.engine.pool._invalidate(dbapi_conn_wrapper)
if self.should_close_with_result:
self.close()
@@ -1503,7 +1501,7 @@ class Engine(Connectable, log.Identified):
the engine are not affected.
"""
- self.pool = self.pool._replace()
+ self.pool.dispose()
def _execute_default(self, default):
with self.contextual_connect() as conn:
diff --git a/lib/sqlalchemy/orm/strategies.py b/lib/sqlalchemy/orm/strategies.py
index 473b665c8..4a07e7856 100644
--- a/lib/sqlalchemy/orm/strategies.py
+++ b/lib/sqlalchemy/orm/strategies.py
@@ -528,7 +528,6 @@ class LazyLoader(AbstractRelationshipLoader):
def _emit_lazyload(self, strategy_options, session, state, ident_key, passive):
q = session.query(self.mapper)._adapt_all_clauses()
-
if self.parent_property.secondary is not None:
q = q.select_from(self.mapper, self.parent_property.secondary)
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index 59c1e614a..7fc4fc659 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -210,6 +210,7 @@ class Pool(log.Identified):
self._threadconns = threading.local()
self._creator = creator
self._recycle = recycle
+ self._invalidate_time = 0
self._use_threadlocal = use_threadlocal
if reset_on_return in ('rollback', True, reset_rollback):
self._reset_on_return = reset_rollback
@@ -276,6 +277,22 @@ class Pool(log.Identified):
return _ConnectionRecord(self)
+ def _invalidate(self, connection):
+ """Mark all connections established within the generation
+ of the given connection as invalidated.
+
+ If this pool's last invalidate time is before when the given
+ connection was created, update the timestamp til now. Otherwise,
+ no action is performed.
+
+ Connections with a start time prior to this pool's invalidation
+ time will be recycled upon next checkout.
+ """
+ rec = getattr(connection, "_connection_record", None)
+ if not rec or self._invalidate_time < rec.starttime:
+ self._invalidate_time = time.time()
+
+
def recreate(self):
"""Return a new :class:`.Pool`, of the same class as this one
and configured with identical creation arguments.
@@ -301,17 +318,6 @@ class Pool(log.Identified):
raise NotImplementedError()
- def _replace(self):
- """Dispose + recreate this pool.
-
- Subclasses may employ special logic to
- move threads waiting on this pool to the
- new one.
-
- """
- self.dispose()
- return self.recreate()
-
def connect(self):
"""Return a DBAPI connection from the pool.
@@ -483,6 +489,7 @@ class _ConnectionRecord(object):
self.connection = None
def get_connection(self):
+ recycle = False
if self.connection is None:
self.connection = self.__connect()
self.info.clear()
@@ -493,6 +500,15 @@ class _ConnectionRecord(object):
self.__pool.logger.info(
"Connection %r exceeded timeout; recycling",
self.connection)
+ recycle = True
+ elif self.__pool._invalidate_time > self.starttime:
+ self.__pool.logger.info(
+ "Connection %r invalidated due to pool invalidation; recycling",
+ self.connection
+ )
+ recycle = True
+
+ if recycle:
self.__close()
self.connection = self.__connect()
self.info.clear()
@@ -911,8 +927,6 @@ class QueuePool(Pool):
try:
wait = use_overflow and self._overflow >= self._max_overflow
return self._pool.get(wait, self._timeout)
- except sqla_queue.SAAbort as aborted:
- return aborted.context._do_get()
except sqla_queue.Empty:
if use_overflow and self._overflow >= self._max_overflow:
if not wait:
@@ -974,12 +988,6 @@ class QueuePool(Pool):
self._overflow = 0 - self.size()
self.logger.info("Pool disposed. %s", self.status())
- def _replace(self):
- self.dispose()
- np = self.recreate()
- self._pool.abort(np)
- return np
-
def status(self):
return "Pool size: %d Connections in pool: %d "\
"Current Overflow: %d Current Checked out "\
diff --git a/lib/sqlalchemy/util/queue.py b/lib/sqlalchemy/util/queue.py
index 82ff55a5d..c98aa7fda 100644
--- a/lib/sqlalchemy/util/queue.py
+++ b/lib/sqlalchemy/util/queue.py
@@ -15,11 +15,6 @@ rare cases be invoked within the ``get()`` method of the Queue itself,
producing a ``put()`` inside the ``get()`` and therefore a reentrant
condition.
-An additional change includes a special "abort" method which can be used
-to immediately raise a special exception for threads that are blocking
-on get(). This is to accommodate a rare race condition that can occur
-within QueuePool.
-
"""
from collections import deque
@@ -27,7 +22,7 @@ from time import time as _time
from .compat import threading
-__all__ = ['Empty', 'Full', 'Queue', 'SAAbort']
+__all__ = ['Empty', 'Full', 'Queue']
class Empty(Exception):
@@ -42,12 +37,6 @@ class Full(Exception):
pass
-class SAAbort(Exception):
- "Special SQLA exception to abort waiting"
- def __init__(self, context):
- self.context = context
-
-
class Queue:
def __init__(self, maxsize=0):
"""Initialize a queue object with a given maximum size.
@@ -68,8 +57,6 @@ class Queue:
# a thread waiting to put is notified then.
self.not_full = threading.Condition(self.mutex)
- # when this is set, SAAbort is raised within get().
- self._sqla_abort_context = False
def qsize(self):
"""Return the approximate size of the queue (not reliable!)."""
@@ -158,13 +145,7 @@ class Queue:
raise Empty
elif timeout is None:
while self._empty():
- # wait for only half a second, then
- # loop around, so that we can see a change in
- # _sqla_abort_context in case we missed the notify_all()
- # called by abort()
- self.not_empty.wait(.5)
- if self._sqla_abort_context:
- raise SAAbort(self._sqla_abort_context)
+ self.not_empty.wait()
else:
if timeout < 0:
raise ValueError("'timeout' must be a positive number")
@@ -174,30 +155,12 @@ class Queue:
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
- if self._sqla_abort_context:
- raise SAAbort(self._sqla_abort_context)
item = self._get()
self.not_full.notify()
return item
finally:
self.not_empty.release()
- def abort(self, context):
- """Issue an 'abort', will force any thread waiting on get()
- to stop waiting and raise SAAbort.
-
- """
- self._sqla_abort_context = context
- if not self.not_full.acquire(False):
- return
- try:
- # note that this is now optional
- # as the waiters in get() both loop around
- # to check the _sqla_abort_context flag periodically
- self.not_empty.notify_all()
- finally:
- self.not_full.release()
-
def get_nowait(self):
"""Remove and return an item from the queue without blocking.