diff options
Diffstat (limited to 'lib/sqlalchemy/pool.py')
| -rw-r--r-- | lib/sqlalchemy/pool.py | 269 |
1 files changed, 202 insertions, 67 deletions
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index 498b001c1..f84f331d5 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -1,5 +1,5 @@ # sqlalchemy/pool.py -# Copyright (C) 2005-2013 the SQLAlchemy authors and contributors <see AUTHORS file> +# Copyright (C) 2005-2014 the SQLAlchemy authors and contributors <see AUTHORS file> # # This module is part of SQLAlchemy and is released under # the MIT License: http://www.opensource.org/licenses/mit-license.php @@ -20,7 +20,7 @@ import time import traceback import weakref -from . import exc, log, event, events, interfaces, util +from . import exc, log, event, interfaces, util from .util import queue as sqla_queue from .util import threading, memoized_property, \ chop_traceback @@ -130,10 +130,10 @@ class Pool(log.Identified): :meth:`unique_connection` method is provided to bypass the threadlocal behavior installed into :meth:`connect`. - :param reset_on_return: If true, reset the database state of - connections returned to the pool. This is typically a - ROLLBACK to release locks and transaction resources. - Disable at your own peril. Defaults to True. + :param reset_on_return: Configures the action to take + on connections as they are returned to the pool. + See the argument description in :class:`.QueuePool` for + more detail. :param events: a list of 2-tuples, each of the form ``(callable, target)`` which will be passed to event.listen() @@ -185,8 +185,6 @@ class Pool(log.Identified): for l in listeners: self.add_listener(l) - dispatch = event.dispatcher(events.PoolEvents) - def _close_connection(self, connection): self.logger.debug("Closing connection %r", connection) try: @@ -218,7 +216,7 @@ class Pool(log.Identified): """ - return _ConnectionFairy.checkout(self) + return _ConnectionFairy._checkout(self) def _create_connection(self): """Called by subclasses to create a new ConnectionRecord.""" @@ -270,16 +268,17 @@ class Pool(log.Identified): """ if not self._use_threadlocal: - return _ConnectionFairy.checkout(self) + return _ConnectionFairy._checkout(self) try: rec = self._threadconns.current() except AttributeError: pass else: - return rec.checkout_existing() + if rec is not None: + return rec._checkout_existing() - return _ConnectionFairy.checkout(self, self._threadconns) + return _ConnectionFairy._checkout(self, self._threadconns) def _return_conn(self, record): """Given a _ConnectionRecord, return it to the :class:`.Pool`. @@ -310,6 +309,34 @@ class Pool(log.Identified): class _ConnectionRecord(object): + """Internal object which maintains an individual DBAPI connection + referenced by a :class:`.Pool`. + + The :class:`._ConnectionRecord` object always exists for any particular + DBAPI connection whether or not that DBAPI connection has been + "checked out". This is in contrast to the :class:`._ConnectionFairy` + which is only a public facade to the DBAPI connection while it is checked + out. + + A :class:`._ConnectionRecord` may exist for a span longer than that + of a single DBAPI connection. For example, if the + :meth:`._ConnectionRecord.invalidate` + method is called, the DBAPI connection associated with this + :class:`._ConnectionRecord` + will be discarded, but the :class:`._ConnectionRecord` may be used again, + in which case a new DBAPI connection is produced when the :class:`.Pool` + next uses this record. + + The :class:`._ConnectionRecord` is delivered along with connection + pool events, including :meth:`.PoolEvents.connect` and + :meth:`.PoolEvents.checkout`, however :class:`._ConnectionRecord` still + remains an internal object whose API and internals may change. + + .. seealso:: + + :class:`._ConnectionFairy` + + """ def __init__(self, pool): self.__pool = pool @@ -321,8 +348,23 @@ class _ConnectionRecord(object): exec_once(self.connection, self) pool.dispatch.connect(self.connection, self) + connection = None + """A reference to the actual DBAPI connection being tracked. + + May be ``None`` if this :class:`._ConnectionRecord` has been marked + as invalidated; a new DBAPI connection may replace it if the owning + pool calls upon this :class:`._ConnectionRecord` to reconnect. + + """ + @util.memoized_property def info(self): + """The ``.info`` dictionary associated with the DBAPI connection. + + This dictionary is shared among the :attr:`._ConnectionFairy.info` + and :attr:`.Connection.info` accessors. + + """ return {} @classmethod @@ -361,9 +403,22 @@ class _ConnectionRecord(object): def close(self): if self.connection is not None: - self.__pool._close_connection(self.connection) + self.__close() def invalidate(self, e=None): + """Invalidate the DBAPI connection held by this :class:`._ConnectionRecord`. + + This method is called for all connection invalidations, including + when the :meth:`._ConnectionFairy.invalidate` or :meth:`.Connection.invalidate` + methods are called, as well as when any so-called "automatic invalidation" + condition occurs. + + .. seealso:: + + :ref:`pool_connection_invalidation` + + """ + self.__pool.dispatch.invalidate(self.connection, self, e) if e is not None: self.__pool.logger.info( "Invalidate connection %r (reason: %s:%s)", @@ -424,18 +479,8 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo, fairy=None): try: fairy = fairy or _ConnectionFairy(connection, connection_record) - if pool.dispatch.reset: - pool.dispatch.reset(fairy, connection_record) - if pool._reset_on_return is reset_rollback: - if echo: - pool.logger.debug("Connection %s rollback-on-return", - connection) - pool._dialect.do_rollback(fairy) - elif pool._reset_on_return is reset_commit: - if echo: - pool.logger.debug("Connection %s commit-on-return", - connection) - pool._dialect.do_commit(fairy) + assert fairy.connection is connection + fairy._reset(pool, echo) # Immediately close detached instances if not connection_record: @@ -454,15 +499,58 @@ _refs = set() class _ConnectionFairy(object): - """Proxies a DB-API connection and provides return-on-dereference - support.""" + """Proxies a DBAPI connection and provides return-on-dereference + support. + + This is an internal object used by the :class:`.Pool` implementation + to provide context management to a DBAPI connection delivered by + that :class:`.Pool`. + + The name "fairy" is inspired by the fact that the :class:`._ConnectionFairy` + object's lifespan is transitory, as it lasts only for the length of a + specific DBAPI connection being checked out from the pool, and additionally + that as a transparent proxy, it is mostly invisible. + + .. seealso:: + + :class:`._ConnectionRecord` + + """ def __init__(self, dbapi_connection, connection_record): self.connection = dbapi_connection self._connection_record = connection_record + connection = None + """A reference to the actual DBAPI connection being tracked.""" + + _connection_record = None + """A reference to the :class:`._ConnectionRecord` object associated + with the DBAPI connection. + + This is currently an internal accessor which is subject to change. + + """ + + _reset_agent = None + """Refer to an object with a ``.commit()`` and ``.rollback()`` method; + if non-None, the "reset-on-return" feature will call upon this object + rather than directly against the dialect-level do_rollback() and do_commit() + methods. + + In practice, a :class:`.Connection` assigns a :class:`.Transaction` object + to this variable when one is in scope so that the :class:`.Transaction` + takes the job of committing or rolling back on return if + :meth:`.Connection.close` is called while the :class:`.Transaction` + still exists. + + This is essentially an "event handler" of sorts but is simplified as an + instance variable both for performance/simplicity as well as that there + can only be one "reset agent" at a time. + """ + @classmethod - def checkout(cls, pool, threadconns=None, fairy=None): + def _checkout(cls, pool, threadconns=None, fairy=None): if not fairy: fairy = _ConnectionRecord.checkout(pool) @@ -499,16 +587,40 @@ class _ConnectionFairy(object): fairy.invalidate() raise exc.InvalidRequestError("This connection is closed") - def checkout_existing(self): - return _ConnectionFairy.checkout(self._pool, fairy=self) + def _checkout_existing(self): + return _ConnectionFairy._checkout(self._pool, fairy=self) - def checkin(self): + def _checkin(self): _finalize_fairy(self.connection, self._connection_record, self._pool, None, self._echo, fairy=self) self.connection = None self._connection_record = None - _close = checkin + _close = _checkin + + def _reset(self, pool, echo): + if pool.dispatch.reset: + pool.dispatch.reset(self, self._connection_record) + if pool._reset_on_return is reset_rollback: + if echo: + pool.logger.debug("Connection %s rollback-on-return%s", + self.connection, + ", via agent" + if self._reset_agent else "") + if self._reset_agent: + self._reset_agent.rollback() + else: + pool._dialect.do_rollback(self) + elif pool._reset_on_return is reset_commit: + if echo: + pool.logger.debug("Connection %s commit-on-return%s", + self.connection, + ", via agent" + if self._reset_agent else "") + if self._reset_agent: + self._reset_agent.commit() + else: + pool._dialect.do_commit(self) @property def _logger(self): @@ -516,6 +628,9 @@ class _ConnectionFairy(object): @property def is_valid(self): + """Return True if this :class:`._ConnectionFairy` still refers + to an active DBAPI connection.""" + return self.connection is not None @util.memoized_property @@ -526,7 +641,9 @@ class _ConnectionFairy(object): The data here will follow along with the DBAPI connection including after it is returned to the connection pool and used again - in subsequent instances of :class:`.ConnectionFairy`. + in subsequent instances of :class:`._ConnectionFairy`. It is shared + with the :attr:`._ConnectionRecord.info` and :attr:`.Connection.info` + accessors. """ return self._connection_record.info @@ -534,8 +651,16 @@ class _ConnectionFairy(object): def invalidate(self, e=None): """Mark this connection as invalidated. - The connection will be immediately closed. The containing - ConnectionRecord will create a new connection when next used. + This method can be called directly, and is also called as a result + of the :meth:`.Connection.invalidate` method. When invoked, + the DBAPI connection is immediately closed and discarded from + further use by the pool. The invalidation mechanism proceeds + via the :meth:`._ConnectionRecord.invalidate` internal method. + + .. seealso:: + + :ref:`pool_connection_invalidation` + """ if self.connection is None: @@ -543,9 +668,15 @@ class _ConnectionFairy(object): if self._connection_record: self._connection_record.invalidate(e=e) self.connection = None - self.checkin() + self._checkin() def cursor(self, *args, **kwargs): + """Return a new DBAPI cursor for the underlying connection. + + This method is a proxy for the ``connection.cursor()`` DBAPI + method. + + """ return self.connection.cursor(*args, **kwargs) def __getattr__(self, key): @@ -577,7 +708,7 @@ class _ConnectionFairy(object): def close(self): self._counter -= 1 if self._counter == 0: - self.checkin() + self._checkin() @@ -659,15 +790,6 @@ class SingletonThreadPool(Pool): return c -class DummyLock(object): - - def acquire(self, wait=True): - return True - - def release(self): - pass - - class QueuePool(Pool): """A :class:`.Pool` that imposes a limit on the number of open connections. @@ -775,30 +897,27 @@ class QueuePool(Pool): self._overflow = 0 - pool_size self._max_overflow = max_overflow self._timeout = timeout - self._overflow_lock = threading.Lock() if self._max_overflow > -1 \ - else DummyLock() + self._overflow_lock = threading.Lock() def _do_return_conn(self, conn): try: self._pool.put(conn, False) except sqla_queue.Full: - conn.close() - self._overflow_lock.acquire() try: - self._overflow -= 1 + conn.close() finally: - self._overflow_lock.release() + self._dec_overflow() def _do_get(self): + use_overflow = self._max_overflow > -1 + try: - wait = self._max_overflow > -1 and \ - self._overflow >= self._max_overflow + wait = use_overflow and self._overflow >= self._max_overflow return self._pool.get(wait, self._timeout) except sqla_queue.SAAbort as aborted: return aborted.context._do_get() except sqla_queue.Empty: - if self._max_overflow > -1 and \ - self._overflow >= self._max_overflow: + if use_overflow and self._overflow >= self._max_overflow: if not wait: return self._do_get() else: @@ -807,17 +926,33 @@ class QueuePool(Pool): "connection timed out, timeout %d" % (self.size(), self.overflow(), self._timeout)) - self._overflow_lock.acquire() - try: - if self._max_overflow > -1 and \ - self._overflow >= self._max_overflow: - return self._do_get() - else: - con = self._create_connection() - self._overflow += 1 - return con - finally: - self._overflow_lock.release() + if self._inc_overflow(): + try: + return self._create_connection() + except: + self._dec_overflow() + raise + else: + return self._do_get() + + def _inc_overflow(self): + if self._max_overflow == -1: + self._overflow += 1 + return True + with self._overflow_lock: + if self._overflow < self._max_overflow: + self._overflow += 1 + return True + else: + return False + + def _dec_overflow(self): + if self._max_overflow == -1: + self._overflow -= 1 + return True + with self._overflow_lock: + self._overflow -= 1 + return True def recreate(self): self.logger.info("Pool recreating") |
