summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/pool.py')
-rw-r--r--lib/sqlalchemy/pool.py269
1 files changed, 202 insertions, 67 deletions
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index 498b001c1..f84f331d5 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -1,5 +1,5 @@
# sqlalchemy/pool.py
-# Copyright (C) 2005-2013 the SQLAlchemy authors and contributors <see AUTHORS file>
+# Copyright (C) 2005-2014 the SQLAlchemy authors and contributors <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
@@ -20,7 +20,7 @@ import time
import traceback
import weakref
-from . import exc, log, event, events, interfaces, util
+from . import exc, log, event, interfaces, util
from .util import queue as sqla_queue
from .util import threading, memoized_property, \
chop_traceback
@@ -130,10 +130,10 @@ class Pool(log.Identified):
:meth:`unique_connection` method is provided to bypass the
threadlocal behavior installed into :meth:`connect`.
- :param reset_on_return: If true, reset the database state of
- connections returned to the pool. This is typically a
- ROLLBACK to release locks and transaction resources.
- Disable at your own peril. Defaults to True.
+ :param reset_on_return: Configures the action to take
+ on connections as they are returned to the pool.
+ See the argument description in :class:`.QueuePool` for
+ more detail.
:param events: a list of 2-tuples, each of the form
``(callable, target)`` which will be passed to event.listen()
@@ -185,8 +185,6 @@ class Pool(log.Identified):
for l in listeners:
self.add_listener(l)
- dispatch = event.dispatcher(events.PoolEvents)
-
def _close_connection(self, connection):
self.logger.debug("Closing connection %r", connection)
try:
@@ -218,7 +216,7 @@ class Pool(log.Identified):
"""
- return _ConnectionFairy.checkout(self)
+ return _ConnectionFairy._checkout(self)
def _create_connection(self):
"""Called by subclasses to create a new ConnectionRecord."""
@@ -270,16 +268,17 @@ class Pool(log.Identified):
"""
if not self._use_threadlocal:
- return _ConnectionFairy.checkout(self)
+ return _ConnectionFairy._checkout(self)
try:
rec = self._threadconns.current()
except AttributeError:
pass
else:
- return rec.checkout_existing()
+ if rec is not None:
+ return rec._checkout_existing()
- return _ConnectionFairy.checkout(self, self._threadconns)
+ return _ConnectionFairy._checkout(self, self._threadconns)
def _return_conn(self, record):
"""Given a _ConnectionRecord, return it to the :class:`.Pool`.
@@ -310,6 +309,34 @@ class Pool(log.Identified):
class _ConnectionRecord(object):
+ """Internal object which maintains an individual DBAPI connection
+ referenced by a :class:`.Pool`.
+
+ The :class:`._ConnectionRecord` object always exists for any particular
+ DBAPI connection whether or not that DBAPI connection has been
+ "checked out". This is in contrast to the :class:`._ConnectionFairy`
+ which is only a public facade to the DBAPI connection while it is checked
+ out.
+
+ A :class:`._ConnectionRecord` may exist for a span longer than that
+ of a single DBAPI connection. For example, if the
+ :meth:`._ConnectionRecord.invalidate`
+ method is called, the DBAPI connection associated with this
+ :class:`._ConnectionRecord`
+ will be discarded, but the :class:`._ConnectionRecord` may be used again,
+ in which case a new DBAPI connection is produced when the :class:`.Pool`
+ next uses this record.
+
+ The :class:`._ConnectionRecord` is delivered along with connection
+ pool events, including :meth:`.PoolEvents.connect` and
+ :meth:`.PoolEvents.checkout`, however :class:`._ConnectionRecord` still
+ remains an internal object whose API and internals may change.
+
+ .. seealso::
+
+ :class:`._ConnectionFairy`
+
+ """
def __init__(self, pool):
self.__pool = pool
@@ -321,8 +348,23 @@ class _ConnectionRecord(object):
exec_once(self.connection, self)
pool.dispatch.connect(self.connection, self)
+ connection = None
+ """A reference to the actual DBAPI connection being tracked.
+
+ May be ``None`` if this :class:`._ConnectionRecord` has been marked
+ as invalidated; a new DBAPI connection may replace it if the owning
+ pool calls upon this :class:`._ConnectionRecord` to reconnect.
+
+ """
+
@util.memoized_property
def info(self):
+ """The ``.info`` dictionary associated with the DBAPI connection.
+
+ This dictionary is shared among the :attr:`._ConnectionFairy.info`
+ and :attr:`.Connection.info` accessors.
+
+ """
return {}
@classmethod
@@ -361,9 +403,22 @@ class _ConnectionRecord(object):
def close(self):
if self.connection is not None:
- self.__pool._close_connection(self.connection)
+ self.__close()
def invalidate(self, e=None):
+ """Invalidate the DBAPI connection held by this :class:`._ConnectionRecord`.
+
+ This method is called for all connection invalidations, including
+ when the :meth:`._ConnectionFairy.invalidate` or :meth:`.Connection.invalidate`
+ methods are called, as well as when any so-called "automatic invalidation"
+ condition occurs.
+
+ .. seealso::
+
+ :ref:`pool_connection_invalidation`
+
+ """
+ self.__pool.dispatch.invalidate(self.connection, self, e)
if e is not None:
self.__pool.logger.info(
"Invalidate connection %r (reason: %s:%s)",
@@ -424,18 +479,8 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo, fairy=None):
try:
fairy = fairy or _ConnectionFairy(connection, connection_record)
- if pool.dispatch.reset:
- pool.dispatch.reset(fairy, connection_record)
- if pool._reset_on_return is reset_rollback:
- if echo:
- pool.logger.debug("Connection %s rollback-on-return",
- connection)
- pool._dialect.do_rollback(fairy)
- elif pool._reset_on_return is reset_commit:
- if echo:
- pool.logger.debug("Connection %s commit-on-return",
- connection)
- pool._dialect.do_commit(fairy)
+ assert fairy.connection is connection
+ fairy._reset(pool, echo)
# Immediately close detached instances
if not connection_record:
@@ -454,15 +499,58 @@ _refs = set()
class _ConnectionFairy(object):
- """Proxies a DB-API connection and provides return-on-dereference
- support."""
+ """Proxies a DBAPI connection and provides return-on-dereference
+ support.
+
+ This is an internal object used by the :class:`.Pool` implementation
+ to provide context management to a DBAPI connection delivered by
+ that :class:`.Pool`.
+
+ The name "fairy" is inspired by the fact that the :class:`._ConnectionFairy`
+ object's lifespan is transitory, as it lasts only for the length of a
+ specific DBAPI connection being checked out from the pool, and additionally
+ that as a transparent proxy, it is mostly invisible.
+
+ .. seealso::
+
+ :class:`._ConnectionRecord`
+
+ """
def __init__(self, dbapi_connection, connection_record):
self.connection = dbapi_connection
self._connection_record = connection_record
+ connection = None
+ """A reference to the actual DBAPI connection being tracked."""
+
+ _connection_record = None
+ """A reference to the :class:`._ConnectionRecord` object associated
+ with the DBAPI connection.
+
+ This is currently an internal accessor which is subject to change.
+
+ """
+
+ _reset_agent = None
+ """Refer to an object with a ``.commit()`` and ``.rollback()`` method;
+ if non-None, the "reset-on-return" feature will call upon this object
+ rather than directly against the dialect-level do_rollback() and do_commit()
+ methods.
+
+ In practice, a :class:`.Connection` assigns a :class:`.Transaction` object
+ to this variable when one is in scope so that the :class:`.Transaction`
+ takes the job of committing or rolling back on return if
+ :meth:`.Connection.close` is called while the :class:`.Transaction`
+ still exists.
+
+ This is essentially an "event handler" of sorts but is simplified as an
+ instance variable both for performance/simplicity as well as that there
+ can only be one "reset agent" at a time.
+ """
+
@classmethod
- def checkout(cls, pool, threadconns=None, fairy=None):
+ def _checkout(cls, pool, threadconns=None, fairy=None):
if not fairy:
fairy = _ConnectionRecord.checkout(pool)
@@ -499,16 +587,40 @@ class _ConnectionFairy(object):
fairy.invalidate()
raise exc.InvalidRequestError("This connection is closed")
- def checkout_existing(self):
- return _ConnectionFairy.checkout(self._pool, fairy=self)
+ def _checkout_existing(self):
+ return _ConnectionFairy._checkout(self._pool, fairy=self)
- def checkin(self):
+ def _checkin(self):
_finalize_fairy(self.connection, self._connection_record,
self._pool, None, self._echo, fairy=self)
self.connection = None
self._connection_record = None
- _close = checkin
+ _close = _checkin
+
+ def _reset(self, pool, echo):
+ if pool.dispatch.reset:
+ pool.dispatch.reset(self, self._connection_record)
+ if pool._reset_on_return is reset_rollback:
+ if echo:
+ pool.logger.debug("Connection %s rollback-on-return%s",
+ self.connection,
+ ", via agent"
+ if self._reset_agent else "")
+ if self._reset_agent:
+ self._reset_agent.rollback()
+ else:
+ pool._dialect.do_rollback(self)
+ elif pool._reset_on_return is reset_commit:
+ if echo:
+ pool.logger.debug("Connection %s commit-on-return%s",
+ self.connection,
+ ", via agent"
+ if self._reset_agent else "")
+ if self._reset_agent:
+ self._reset_agent.commit()
+ else:
+ pool._dialect.do_commit(self)
@property
def _logger(self):
@@ -516,6 +628,9 @@ class _ConnectionFairy(object):
@property
def is_valid(self):
+ """Return True if this :class:`._ConnectionFairy` still refers
+ to an active DBAPI connection."""
+
return self.connection is not None
@util.memoized_property
@@ -526,7 +641,9 @@ class _ConnectionFairy(object):
The data here will follow along with the DBAPI connection including
after it is returned to the connection pool and used again
- in subsequent instances of :class:`.ConnectionFairy`.
+ in subsequent instances of :class:`._ConnectionFairy`. It is shared
+ with the :attr:`._ConnectionRecord.info` and :attr:`.Connection.info`
+ accessors.
"""
return self._connection_record.info
@@ -534,8 +651,16 @@ class _ConnectionFairy(object):
def invalidate(self, e=None):
"""Mark this connection as invalidated.
- The connection will be immediately closed. The containing
- ConnectionRecord will create a new connection when next used.
+ This method can be called directly, and is also called as a result
+ of the :meth:`.Connection.invalidate` method. When invoked,
+ the DBAPI connection is immediately closed and discarded from
+ further use by the pool. The invalidation mechanism proceeds
+ via the :meth:`._ConnectionRecord.invalidate` internal method.
+
+ .. seealso::
+
+ :ref:`pool_connection_invalidation`
+
"""
if self.connection is None:
@@ -543,9 +668,15 @@ class _ConnectionFairy(object):
if self._connection_record:
self._connection_record.invalidate(e=e)
self.connection = None
- self.checkin()
+ self._checkin()
def cursor(self, *args, **kwargs):
+ """Return a new DBAPI cursor for the underlying connection.
+
+ This method is a proxy for the ``connection.cursor()`` DBAPI
+ method.
+
+ """
return self.connection.cursor(*args, **kwargs)
def __getattr__(self, key):
@@ -577,7 +708,7 @@ class _ConnectionFairy(object):
def close(self):
self._counter -= 1
if self._counter == 0:
- self.checkin()
+ self._checkin()
@@ -659,15 +790,6 @@ class SingletonThreadPool(Pool):
return c
-class DummyLock(object):
-
- def acquire(self, wait=True):
- return True
-
- def release(self):
- pass
-
-
class QueuePool(Pool):
"""A :class:`.Pool` that imposes a limit on the number of open connections.
@@ -775,30 +897,27 @@ class QueuePool(Pool):
self._overflow = 0 - pool_size
self._max_overflow = max_overflow
self._timeout = timeout
- self._overflow_lock = threading.Lock() if self._max_overflow > -1 \
- else DummyLock()
+ self._overflow_lock = threading.Lock()
def _do_return_conn(self, conn):
try:
self._pool.put(conn, False)
except sqla_queue.Full:
- conn.close()
- self._overflow_lock.acquire()
try:
- self._overflow -= 1
+ conn.close()
finally:
- self._overflow_lock.release()
+ self._dec_overflow()
def _do_get(self):
+ use_overflow = self._max_overflow > -1
+
try:
- wait = self._max_overflow > -1 and \
- self._overflow >= self._max_overflow
+ wait = use_overflow and self._overflow >= self._max_overflow
return self._pool.get(wait, self._timeout)
except sqla_queue.SAAbort as aborted:
return aborted.context._do_get()
except sqla_queue.Empty:
- if self._max_overflow > -1 and \
- self._overflow >= self._max_overflow:
+ if use_overflow and self._overflow >= self._max_overflow:
if not wait:
return self._do_get()
else:
@@ -807,17 +926,33 @@ class QueuePool(Pool):
"connection timed out, timeout %d" %
(self.size(), self.overflow(), self._timeout))
- self._overflow_lock.acquire()
- try:
- if self._max_overflow > -1 and \
- self._overflow >= self._max_overflow:
- return self._do_get()
- else:
- con = self._create_connection()
- self._overflow += 1
- return con
- finally:
- self._overflow_lock.release()
+ if self._inc_overflow():
+ try:
+ return self._create_connection()
+ except:
+ self._dec_overflow()
+ raise
+ else:
+ return self._do_get()
+
+ def _inc_overflow(self):
+ if self._max_overflow == -1:
+ self._overflow += 1
+ return True
+ with self._overflow_lock:
+ if self._overflow < self._max_overflow:
+ self._overflow += 1
+ return True
+ else:
+ return False
+
+ def _dec_overflow(self):
+ if self._max_overflow == -1:
+ self._overflow -= 1
+ return True
+ with self._overflow_lock:
+ self._overflow -= 1
+ return True
def recreate(self):
self.logger.info("Pool recreating")