summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2013-07-02 13:14:21 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2013-07-02 13:14:21 -0400
commitd3d10c982c8a44c85a0114c491207297eac7611d (patch)
treebdfda394fb23cc8d65c0acb77ca070937d93580a /lib/sqlalchemy
parent38c5e870a7883df0ae104df828217e326f6cff6a (diff)
downloadsqlalchemy-d3d10c982c8a44c85a0114c491207297eac7611d.tar.gz
- refactor pool a bit so that intent between ConnectionRecord/ConnectionFairy is clear;
make sure that the DBAPI connection passed to the reset-on-return events/dialect hooks is also a "fairy", so that dictionaries like "info" are available. [ticket:2770] - rework the execution_options system so that the dialect is given the job of making any immediate adjustments based on a set event. move the "isolation level" logic to use this new system. Also work things out so that even engine-level execution options can be used for things like isolation level; the dialect attaches a connect-event handler in this case to handle the task. - to support this new system as well as further extensibiltiy of execution options add events engine_connect(), set_connection_execution_options(), set_engine_execution_options()
Diffstat (limited to 'lib/sqlalchemy')
-rw-r--r--lib/sqlalchemy/engine/base.py28
-rw-r--r--lib/sqlalchemy/engine/default.py19
-rw-r--r--lib/sqlalchemy/engine/strategies.py9
-rw-r--r--lib/sqlalchemy/events.py101
-rw-r--r--lib/sqlalchemy/pool.py198
5 files changed, 245 insertions, 110 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index 2d9f3af94..f69bd3d4b 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -46,7 +46,7 @@ class Connection(Connectable):
def __init__(self, engine, connection=None, close_with_result=False,
_branch=False, _execution_options=None,
_dispatch=None,
- _has_events=False):
+ _has_events=None):
"""Construct a new Connection.
The constructor here is not public and is only called only by an
@@ -67,7 +67,8 @@ class Connection(Connectable):
self.dispatch = _dispatch
elif engine._has_events:
self.dispatch = self.dispatch._join(engine.dispatch)
- self._has_events = _has_events or engine._has_events
+ self._has_events = _has_events or (
+ _has_events is None and engine._has_events)
self._echo = self.engine._should_log_info()
if _execution_options:
@@ -76,6 +77,9 @@ class Connection(Connectable):
else:
self._execution_options = engine._execution_options
+ if self._has_events:
+ self.dispatch.engine_connect(self, _branch)
+
def _branch(self):
"""Return a new Connection which references this Connection's
engine and connection; but does not have close_with_result enabled,
@@ -200,16 +204,11 @@ class Connection(Connectable):
"""
c = self._clone()
c._execution_options = c._execution_options.union(opt)
- if 'isolation_level' in opt:
- c._set_isolation_level()
+ if self._has_events:
+ self.dispatch.set_connection_execution_options(c, opt)
+ self.dialect.set_connection_execution_options(c, opt)
return c
- def _set_isolation_level(self):
- self.dialect.set_isolation_level(self.connection,
- self._execution_options['isolation_level'])
- self.connection._connection_record.finalize_callback = \
- self.dialect.reset_isolation_level
-
@property
def closed(self):
"""Return True if this connection is closed."""
@@ -1336,15 +1335,10 @@ class Engine(Connectable, log.Identified):
:meth:`.Engine.execution_options`
"""
- if 'isolation_level' in opt:
- raise exc.ArgumentError(
- "'isolation_level' execution option may "
- "only be specified on Connection.execution_options(). "
- "To set engine-wide isolation level, "
- "use the isolation_level argument to create_engine()."
- )
self._execution_options = \
self._execution_options.union(opt)
+ self.dispatch.set_engine_execution_options(self, opt)
+ self.dialect.set_engine_execution_options(self, opt)
def execution_options(self, **opt):
"""Return a new :class:`.Engine` that will provide
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 2ad7002c4..3e8e96a42 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -19,6 +19,7 @@ from ..sql import compiler, expression
from .. import exc, types as sqltypes, util, pool, processors
import codecs
import weakref
+from .. import event
AUTOCOMMIT_REGEXP = re.compile(
r'\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER)',
@@ -289,6 +290,24 @@ class DefaultDialect(interfaces.Dialect):
opts.update(url.query)
return [[], opts]
+ def set_engine_execution_options(self, engine, opts):
+ if 'isolation_level' in opts:
+ isolation_level = opts['isolation_level']
+ @event.listens_for(engine, "engine_connect")
+ def set_isolation(connection, branch):
+ if not branch:
+ self._set_connection_isolation(connection, isolation_level)
+
+ def set_connection_execution_options(self, connection, opts):
+ if 'isolation_level' in opts:
+ self._set_connection_isolation(connection, opts['isolation_level'])
+
+ def _set_connection_isolation(self, connection, level):
+ self.set_isolation_level(connection.connection, level)
+ connection.connection._connection_record.\
+ finalize_callback.append(self.reset_isolation_level)
+
+
def do_begin(self, dbapi_connection):
pass
diff --git a/lib/sqlalchemy/engine/strategies.py b/lib/sqlalchemy/engine/strategies.py
index c65986ca2..3ca91968b 100644
--- a/lib/sqlalchemy/engine/strategies.py
+++ b/lib/sqlalchemy/engine/strategies.py
@@ -150,13 +150,8 @@ class DefaultEngineStrategy(EngineStrategy):
event.listen(pool, 'connect', on_connect)
def first_connect(dbapi_connection, connection_record):
- c = base.Connection(engine, connection=dbapi_connection)
-
- # TODO: removing this allows the on connect activities
- # to generate events. tests currently assume these aren't
- # sent. do we want users to get all the initial connect
- # activities as events ?
- c._has_events = False
+ c = base.Connection(engine, connection=dbapi_connection,
+ _has_events=False)
dialect.initialize(c)
event.listen(pool, 'first_connect', first_connect)
diff --git a/lib/sqlalchemy/events.py b/lib/sqlalchemy/events.py
index ae2e4ed93..7f11232ac 100644
--- a/lib/sqlalchemy/events.py
+++ b/lib/sqlalchemy/events.py
@@ -319,6 +319,10 @@ class PoolEvents(event.Events):
connection will be disposed and a fresh connection retrieved.
Processing of all checkout listeners will abort and restart
using the new connection.
+
+ .. seealso:: :meth:`.ConnectionEvents.connect` - a similar event
+ which occurs upon creation of a new :class:`.Connection`.
+
"""
def checkin(self, dbapi_connection, connection_record):
@@ -615,6 +619,103 @@ class ConnectionEvents(event.Events):
"""
+ def engine_connect(self, conn, branch):
+ """Intercept the creation of a new :class:`.Connection`.
+
+ This event is called typically as the direct result of calling
+ the :meth:`.Engine.connect` method.
+
+ It differs from the :meth:`.PoolEvents.connect` method, which
+ refers to the actual connection to a database at the DBAPI level;
+ a DBAPI connection may be pooled and reused for many operations.
+ In contrast, this event refers only to the production of a higher level
+ :class:`.Connection` wrapper around such a DBAPI connection.
+
+ It also differs from the :meth:`.PoolEvents.checkout` event
+ in that it is specific to the :class:`.Connection` object, not the
+ DBAPI connection that :meth:`.PoolEvents.checkout` deals with, although
+ this DBAPI connection is available here via the :attr:`.Connection.connection`
+ attribute. But note there can in fact
+ be multiple :meth:`.PoolEvents.checkout` events within the lifespan
+ of a single :class:`.Connection` object, if that :class:`.Connection`
+ is invalidated and re-established. There can also be multiple
+ :class:`.Connection` objects generated for the same already-checked-out
+ DBAPI connection, in the case that a "branch" of a :class:`.Connection`
+ is produced.
+
+ :param conn: :class:`.Connection` object.
+ :param branch: if True, this is a "branch" of an existing
+ :class:`.Connection`. A branch is generated within the course
+ of a statement execution to invoke supplemental statements, most
+ typically to pre-execute a SELECT of a default value for the purposes
+ of an INSERT statement.
+
+ .. versionadded:: 0.9.0
+
+ .. seealso::
+
+ :meth:`.PoolEvents.checkout` the lower-level pool checkout event
+ for an individual DBAPI connection
+
+ :meth:`.ConnectionEvents.set_connection_execution_options` - a copy of a
+ :class:`.Connection` is also made when the
+ :meth:`.Connection.execution_options` method is called.
+
+ """
+
+ def set_connection_execution_options(self, conn, opts):
+ """Intercept when the :meth:`.Connection.execution_options`
+ method is called.
+
+ This method is called after the new :class:`.Connection` has been
+ produced, with the newly updated execution options collection, but
+ before the :class:`.Dialect` has acted upon any of those new options.
+
+ Note that this method is not called when a new :class:`.Connection`
+ is produced which is inheriting execution options from its parent
+ :class:`.Engine`; to intercept this condition, use the
+ :meth:`.ConnectionEvents.connect` event.
+
+ :param conn: The newly copied :class:`.Connection` object
+
+ :param opts: dictionary of options that were passed to the
+ :meth:`.Connection.execution_options` method.
+
+ .. versionadded:: 0.9.0
+
+ .. seealso::
+
+ :meth:`.ConnectionEvents.set_engine_execution_options` - event
+ which is called when :meth:`.Engine.execution_options` is called.
+
+
+ """
+
+ def set_engine_execution_options(self, engine, opts):
+ """Intercept when the :meth:`.Engine.execution_options`
+ method is called.
+
+ The :meth:`.Engine.execution_options` method produces a shallow
+ copy of the :class:`.Engine` which stores the new options. That new
+ :class:`.Engine` is passed here. A particular application of this
+ method is to add a :meth:`.ConnectionEvents.engine_connect` event
+ handler to the given :class:`.Engine` which will perform some per-
+ :class:`.Connection` task specific to these execution options.
+
+ :param conn: The newly copied :class:`.Engine` object
+
+ :param opts: dictionary of options that were passed to the
+ :meth:`.Connection.execution_options` method.
+
+ .. versionadded:: 0.9.0
+
+ .. seealso::
+
+ :meth:`.ConnectionEvents.set_connection_execution_options` - event
+ which is called when :meth:`.Connection.execution_options` is called.
+
+ """
+
def begin(self, conn):
"""Intercept begin() events.
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index dcf3d9e39..97411dd3a 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -25,6 +25,7 @@ from .util import queue as sqla_queue
from .util import threading, memoized_property, \
chop_traceback
+from collections import deque
proxies = {}
@@ -217,7 +218,7 @@ class Pool(log.Identified):
"""
- return _ConnectionFairy(self).checkout()
+ return _ConnectionFairy.checkout(self)
def _create_connection(self):
"""Called by subclasses to create a new ConnectionRecord."""
@@ -269,18 +270,16 @@ class Pool(log.Identified):
"""
if not self._use_threadlocal:
- return _ConnectionFairy(self).checkout()
+ return _ConnectionFairy.checkout(self)
try:
rec = self._threadconns.current()
- if rec:
- return rec.checkout()
except AttributeError:
pass
+ else:
+ return rec.checkout_existing()
- agent = _ConnectionFairy(self)
- self._threadconns.current = weakref.ref(agent)
- return agent.checkout()
+ return _ConnectionFairy.checkout(self, self._threadconns)
def _return_conn(self, record):
"""Given a _ConnectionRecord, return it to the :class:`.Pool`.
@@ -311,11 +310,11 @@ class Pool(log.Identified):
class _ConnectionRecord(object):
- finalize_callback = None
def __init__(self, pool):
self.__pool = pool
self.connection = self.__connect()
+ self.finalize_callback = deque()
pool.dispatch.first_connect.\
for_modify(pool.dispatch).\
@@ -326,6 +325,36 @@ class _ConnectionRecord(object):
def info(self):
return {}
+ @classmethod
+ def checkout(cls, pool):
+ rec = pool._do_get()
+ dbapi_connection = rec.get_connection()
+ fairy = _ConnectionFairy(dbapi_connection, rec)
+ rec.fairy_ref = weakref.ref(
+ fairy,
+ lambda ref: _finalize_fairy and \
+ _finalize_fairy(
+ dbapi_connection,
+ rec, pool, ref, pool._echo)
+ )
+ _refs.add(rec)
+ if pool._echo:
+ pool.logger.debug("Connection %r checked out from pool",
+ dbapi_connection)
+ return fairy
+
+ def checkin(self):
+ self.fairy_ref = None
+ connection = self.connection
+ pool = self.__pool
+ while self.finalize_callback:
+ finalizer = self.finalize_callback.pop()
+ finalizer(connection)
+ if pool.dispatch.checkin:
+ pool.dispatch.checkin(connection, self)
+ pool._return_conn(self)
+
+
def close(self):
if self.connection is not None:
self.__pool._close_connection(self.connection)
@@ -373,11 +402,15 @@ class _ConnectionRecord(object):
raise
-def _finalize_fairy(connection, connection_record, pool, ref, echo):
+def _finalize_fairy(connection, connection_record, pool, ref, echo, fairy=None):
+ """Cleanup for a :class:`._ConnectionFairy` whether or not it's already
+ been garbage collected.
+
+ """
_refs.discard(connection_record)
if ref is not None and \
- connection_record.fairy is not ref:
+ connection_record.fairy_ref is not ref:
return
if connection is not None:
@@ -386,35 +419,31 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo):
connection)
try:
+ fairy = fairy or _ConnectionFairy(connection, connection_record)
if pool.dispatch.reset:
- pool.dispatch.reset(connection, connection_record)
+ pool.dispatch.reset(fairy, connection_record)
if pool._reset_on_return is reset_rollback:
if echo:
pool.logger.debug("Connection %s rollback-on-return",
connection)
- pool._dialect.do_rollback(connection)
+ pool._dialect.do_rollback(fairy)
elif pool._reset_on_return is reset_commit:
if echo:
- pool.logger.debug("Conneciton %s commit-on-return",
+ pool.logger.debug("Connection %s commit-on-return",
connection)
- pool._dialect.do_commit(connection)
+ pool._dialect.do_commit(fairy)
+
# Immediately close detached instances
- if connection_record is None:
+ if not connection_record:
pool._close_connection(connection)
except Exception as e:
- if connection_record is not None:
+ if connection_record:
connection_record.invalidate(e=e)
if isinstance(e, (SystemExit, KeyboardInterrupt)):
raise
- if connection_record is not None:
- connection_record.fairy = None
- if connection_record.finalize_callback:
- connection_record.finalize_callback(connection)
- del connection_record.finalize_callback
- if pool.dispatch.checkin:
- pool.dispatch.checkin(connection, connection_record)
- pool._return_conn(connection_record)
+ if connection_record:
+ connection_record.checkin()
_refs = set()
@@ -424,27 +453,58 @@ class _ConnectionFairy(object):
"""Proxies a DB-API connection and provides return-on-dereference
support."""
- def __init__(self, pool):
- self._pool = pool
- self.__counter = 0
- self._echo = _echo = pool._should_log_debug()
- try:
- rec = self._connection_record = pool._do_get()
- conn = self.connection = self._connection_record.get_connection()
- rec.fairy = weakref.ref(
- self,
- lambda ref: _finalize_fairy and \
- _finalize_fairy(conn, rec, pool, ref, _echo)
- )
- _refs.add(rec)
- except:
- # helps with endless __getattr__ loops later on
- self.connection = None
- self._connection_record = None
- raise
- if self._echo:
- self._pool.logger.debug("Connection %r checked out from pool",
- self.connection)
+ def __init__(self, dbapi_connection, connection_record):
+ self.connection = dbapi_connection
+ self._connection_record = connection_record
+
+ @classmethod
+ def checkout(cls, pool, threadconns=None, fairy=None):
+ if not fairy:
+ fairy = _ConnectionRecord.checkout(pool)
+
+ fairy._pool = pool
+ fairy._counter = 0
+ fairy._echo = pool._should_log_debug()
+
+ if threadconns is not None:
+ threadconns.current = weakref.ref(fairy)
+
+ if fairy.connection is None:
+ raise exc.InvalidRequestError("This connection is closed")
+ fairy._counter += 1
+
+ if not pool.dispatch.checkout or fairy._counter != 1:
+ return fairy
+
+ # Pool listeners can trigger a reconnection on checkout
+ attempts = 2
+ while attempts > 0:
+ try:
+ pool.dispatch.checkout(fairy.connection,
+ fairy._connection_record,
+ fairy)
+ return fairy
+ except exc.DisconnectionError as e:
+ pool.logger.info(
+ "Disconnection detected on checkout: %s", e)
+ fairy._connection_record.invalidate(e)
+ fairy.connection = fairy._connection_record.get_connection()
+ attempts -= 1
+
+ pool.logger.info("Reconnection attempts exhausted on checkout")
+ fairy.invalidate()
+ raise exc.InvalidRequestError("This connection is closed")
+
+ def checkout_existing(self):
+ return _ConnectionFairy.checkout(self._pool, fairy=self)
+
+ def checkin(self):
+ _finalize_fairy(self.connection, self._connection_record,
+ self._pool, None, self._echo, fairy=self)
+ self.connection = None
+ self._connection_record = None
+
+ _close = checkin
@property
def _logger(self):
@@ -465,10 +525,7 @@ class _ConnectionFairy(object):
in subsequent instances of :class:`.ConnectionFairy`.
"""
- try:
- return self._connection_record.info
- except AttributeError:
- raise exc.InvalidRequestError("This connection is closed")
+ return self._connection_record.info
def invalidate(self, e=None):
"""Mark this connection as invalidated.
@@ -479,10 +536,10 @@ class _ConnectionFairy(object):
if self.connection is None:
raise exc.InvalidRequestError("This connection is closed")
- if self._connection_record is not None:
+ if self._connection_record:
self._connection_record.invalidate(e=e)
self.connection = None
- self._close()
+ self.checkin()
def cursor(self, *args, **kwargs):
return self.connection.cursor(*args, **kwargs)
@@ -490,32 +547,6 @@ class _ConnectionFairy(object):
def __getattr__(self, key):
return getattr(self.connection, key)
- def checkout(self):
- if self.connection is None:
- raise exc.InvalidRequestError("This connection is closed")
- self.__counter += 1
-
- if not self._pool.dispatch.checkout or self.__counter != 1:
- return self
-
- # Pool listeners can trigger a reconnection on checkout
- attempts = 2
- while attempts > 0:
- try:
- self._pool.dispatch.checkout(self.connection,
- self._connection_record,
- self)
- return self
- except exc.DisconnectionError as e:
- self._pool.logger.info(
- "Disconnection detected on checkout: %s", e)
- self._connection_record.invalidate(e)
- self.connection = self._connection_record.get_connection()
- attempts -= 1
-
- self._pool.logger.info("Reconnection attempts exhausted on checkout")
- self.invalidate()
- raise exc.InvalidRequestError("This connection is closed")
def detach(self):
"""Separate this connection from its Pool.
@@ -532,22 +563,17 @@ class _ConnectionFairy(object):
if self._connection_record is not None:
_refs.remove(self._connection_record)
- self._connection_record.fairy = None
+ self._connection_record.fairy_ref = None
self._connection_record.connection = None
self._pool._do_return_conn(self._connection_record)
self.info = self.info.copy()
self._connection_record = None
def close(self):
- self.__counter -= 1
- if self.__counter == 0:
- self._close()
+ self._counter -= 1
+ if self._counter == 0:
+ self.checkin()
- def _close(self):
- _finalize_fairy(self.connection, self._connection_record,
- self._pool, None, self._echo)
- self.connection = None
- self._connection_record = None
class SingletonThreadPool(Pool):