diff options
Diffstat (limited to 'lib/sqlalchemy')
-rw-r--r-- | lib/sqlalchemy/engine/base.py | 6 | ||||
-rw-r--r-- | lib/sqlalchemy/orm/strategies.py | 1 | ||||
-rw-r--r-- | lib/sqlalchemy/pool.py | 46 | ||||
-rw-r--r-- | lib/sqlalchemy/util/queue.py | 41 |
4 files changed, 31 insertions, 63 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index d3024640b..2cad2a094 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -1091,9 +1091,7 @@ class Connection(Connectable): del self._is_disconnect dbapi_conn_wrapper = self.connection self.invalidate(e) - if not hasattr(dbapi_conn_wrapper, '_pool') or \ - dbapi_conn_wrapper._pool is self.engine.pool: - self.engine.dispose() + self.engine.pool._invalidate(dbapi_conn_wrapper) if self.should_close_with_result: self.close() @@ -1503,7 +1501,7 @@ class Engine(Connectable, log.Identified): the engine are not affected. """ - self.pool = self.pool._replace() + self.pool.dispose() def _execute_default(self, default): with self.contextual_connect() as conn: diff --git a/lib/sqlalchemy/orm/strategies.py b/lib/sqlalchemy/orm/strategies.py index 473b665c8..4a07e7856 100644 --- a/lib/sqlalchemy/orm/strategies.py +++ b/lib/sqlalchemy/orm/strategies.py @@ -528,7 +528,6 @@ class LazyLoader(AbstractRelationshipLoader): def _emit_lazyload(self, strategy_options, session, state, ident_key, passive): q = session.query(self.mapper)._adapt_all_clauses() - if self.parent_property.secondary is not None: q = q.select_from(self.mapper, self.parent_property.secondary) diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index 59c1e614a..7fc4fc659 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -210,6 +210,7 @@ class Pool(log.Identified): self._threadconns = threading.local() self._creator = creator self._recycle = recycle + self._invalidate_time = 0 self._use_threadlocal = use_threadlocal if reset_on_return in ('rollback', True, reset_rollback): self._reset_on_return = reset_rollback @@ -276,6 +277,22 @@ class Pool(log.Identified): return _ConnectionRecord(self) + def _invalidate(self, connection): + """Mark all connections established within the generation + of the given connection as invalidated. + + If this pool's last invalidate time is before when the given + connection was created, update the timestamp til now. Otherwise, + no action is performed. + + Connections with a start time prior to this pool's invalidation + time will be recycled upon next checkout. + """ + rec = getattr(connection, "_connection_record", None) + if not rec or self._invalidate_time < rec.starttime: + self._invalidate_time = time.time() + + def recreate(self): """Return a new :class:`.Pool`, of the same class as this one and configured with identical creation arguments. @@ -301,17 +318,6 @@ class Pool(log.Identified): raise NotImplementedError() - def _replace(self): - """Dispose + recreate this pool. - - Subclasses may employ special logic to - move threads waiting on this pool to the - new one. - - """ - self.dispose() - return self.recreate() - def connect(self): """Return a DBAPI connection from the pool. @@ -483,6 +489,7 @@ class _ConnectionRecord(object): self.connection = None def get_connection(self): + recycle = False if self.connection is None: self.connection = self.__connect() self.info.clear() @@ -493,6 +500,15 @@ class _ConnectionRecord(object): self.__pool.logger.info( "Connection %r exceeded timeout; recycling", self.connection) + recycle = True + elif self.__pool._invalidate_time > self.starttime: + self.__pool.logger.info( + "Connection %r invalidated due to pool invalidation; recycling", + self.connection + ) + recycle = True + + if recycle: self.__close() self.connection = self.__connect() self.info.clear() @@ -911,8 +927,6 @@ class QueuePool(Pool): try: wait = use_overflow and self._overflow >= self._max_overflow return self._pool.get(wait, self._timeout) - except sqla_queue.SAAbort as aborted: - return aborted.context._do_get() except sqla_queue.Empty: if use_overflow and self._overflow >= self._max_overflow: if not wait: @@ -974,12 +988,6 @@ class QueuePool(Pool): self._overflow = 0 - self.size() self.logger.info("Pool disposed. %s", self.status()) - def _replace(self): - self.dispose() - np = self.recreate() - self._pool.abort(np) - return np - def status(self): return "Pool size: %d Connections in pool: %d "\ "Current Overflow: %d Current Checked out "\ diff --git a/lib/sqlalchemy/util/queue.py b/lib/sqlalchemy/util/queue.py index 82ff55a5d..c98aa7fda 100644 --- a/lib/sqlalchemy/util/queue.py +++ b/lib/sqlalchemy/util/queue.py @@ -15,11 +15,6 @@ rare cases be invoked within the ``get()`` method of the Queue itself, producing a ``put()`` inside the ``get()`` and therefore a reentrant condition. -An additional change includes a special "abort" method which can be used -to immediately raise a special exception for threads that are blocking -on get(). This is to accommodate a rare race condition that can occur -within QueuePool. - """ from collections import deque @@ -27,7 +22,7 @@ from time import time as _time from .compat import threading -__all__ = ['Empty', 'Full', 'Queue', 'SAAbort'] +__all__ = ['Empty', 'Full', 'Queue'] class Empty(Exception): @@ -42,12 +37,6 @@ class Full(Exception): pass -class SAAbort(Exception): - "Special SQLA exception to abort waiting" - def __init__(self, context): - self.context = context - - class Queue: def __init__(self, maxsize=0): """Initialize a queue object with a given maximum size. @@ -68,8 +57,6 @@ class Queue: # a thread waiting to put is notified then. self.not_full = threading.Condition(self.mutex) - # when this is set, SAAbort is raised within get(). - self._sqla_abort_context = False def qsize(self): """Return the approximate size of the queue (not reliable!).""" @@ -158,13 +145,7 @@ class Queue: raise Empty elif timeout is None: while self._empty(): - # wait for only half a second, then - # loop around, so that we can see a change in - # _sqla_abort_context in case we missed the notify_all() - # called by abort() - self.not_empty.wait(.5) - if self._sqla_abort_context: - raise SAAbort(self._sqla_abort_context) + self.not_empty.wait() else: if timeout < 0: raise ValueError("'timeout' must be a positive number") @@ -174,30 +155,12 @@ class Queue: if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) - if self._sqla_abort_context: - raise SAAbort(self._sqla_abort_context) item = self._get() self.not_full.notify() return item finally: self.not_empty.release() - def abort(self, context): - """Issue an 'abort', will force any thread waiting on get() - to stop waiting and raise SAAbort. - - """ - self._sqla_abort_context = context - if not self.not_full.acquire(False): - return - try: - # note that this is now optional - # as the waiters in get() both loop around - # to check the _sqla_abort_context flag periodically - self.not_empty.notify_all() - finally: - self.not_full.release() - def get_nowait(self): """Remove and return an item from the queue without blocking. |