diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-07-02 13:14:21 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-07-02 13:14:21 -0400 |
commit | d3d10c982c8a44c85a0114c491207297eac7611d (patch) | |
tree | bdfda394fb23cc8d65c0acb77ca070937d93580a /lib/sqlalchemy | |
parent | 38c5e870a7883df0ae104df828217e326f6cff6a (diff) | |
download | sqlalchemy-d3d10c982c8a44c85a0114c491207297eac7611d.tar.gz |
- refactor pool a bit so that intent between ConnectionRecord/ConnectionFairy is clear;
make sure that the DBAPI connection passed to the reset-on-return events/dialect hooks
is also a "fairy", so that dictionaries like "info" are available. [ticket:2770]
- rework the execution_options system so that the dialect is given the job of making
any immediate adjustments based on a set event. move the "isolation level" logic to use
this new system. Also work things out so that even engine-level execution options
can be used for things like isolation level; the dialect attaches a connect-event
handler in this case to handle the task.
- to support this new system as well as further extensibiltiy of execution options
add events engine_connect(), set_connection_execution_options(), set_engine_execution_options()
Diffstat (limited to 'lib/sqlalchemy')
-rw-r--r-- | lib/sqlalchemy/engine/base.py | 28 | ||||
-rw-r--r-- | lib/sqlalchemy/engine/default.py | 19 | ||||
-rw-r--r-- | lib/sqlalchemy/engine/strategies.py | 9 | ||||
-rw-r--r-- | lib/sqlalchemy/events.py | 101 | ||||
-rw-r--r-- | lib/sqlalchemy/pool.py | 198 |
5 files changed, 245 insertions, 110 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 2d9f3af94..f69bd3d4b 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -46,7 +46,7 @@ class Connection(Connectable): def __init__(self, engine, connection=None, close_with_result=False, _branch=False, _execution_options=None, _dispatch=None, - _has_events=False): + _has_events=None): """Construct a new Connection. The constructor here is not public and is only called only by an @@ -67,7 +67,8 @@ class Connection(Connectable): self.dispatch = _dispatch elif engine._has_events: self.dispatch = self.dispatch._join(engine.dispatch) - self._has_events = _has_events or engine._has_events + self._has_events = _has_events or ( + _has_events is None and engine._has_events) self._echo = self.engine._should_log_info() if _execution_options: @@ -76,6 +77,9 @@ class Connection(Connectable): else: self._execution_options = engine._execution_options + if self._has_events: + self.dispatch.engine_connect(self, _branch) + def _branch(self): """Return a new Connection which references this Connection's engine and connection; but does not have close_with_result enabled, @@ -200,16 +204,11 @@ class Connection(Connectable): """ c = self._clone() c._execution_options = c._execution_options.union(opt) - if 'isolation_level' in opt: - c._set_isolation_level() + if self._has_events: + self.dispatch.set_connection_execution_options(c, opt) + self.dialect.set_connection_execution_options(c, opt) return c - def _set_isolation_level(self): - self.dialect.set_isolation_level(self.connection, - self._execution_options['isolation_level']) - self.connection._connection_record.finalize_callback = \ - self.dialect.reset_isolation_level - @property def closed(self): """Return True if this connection is closed.""" @@ -1336,15 +1335,10 @@ class Engine(Connectable, log.Identified): :meth:`.Engine.execution_options` """ - if 'isolation_level' in opt: - raise exc.ArgumentError( - "'isolation_level' execution option may " - "only be specified on Connection.execution_options(). " - "To set engine-wide isolation level, " - "use the isolation_level argument to create_engine()." - ) self._execution_options = \ self._execution_options.union(opt) + self.dispatch.set_engine_execution_options(self, opt) + self.dialect.set_engine_execution_options(self, opt) def execution_options(self, **opt): """Return a new :class:`.Engine` that will provide diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 2ad7002c4..3e8e96a42 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -19,6 +19,7 @@ from ..sql import compiler, expression from .. import exc, types as sqltypes, util, pool, processors import codecs import weakref +from .. import event AUTOCOMMIT_REGEXP = re.compile( r'\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER)', @@ -289,6 +290,24 @@ class DefaultDialect(interfaces.Dialect): opts.update(url.query) return [[], opts] + def set_engine_execution_options(self, engine, opts): + if 'isolation_level' in opts: + isolation_level = opts['isolation_level'] + @event.listens_for(engine, "engine_connect") + def set_isolation(connection, branch): + if not branch: + self._set_connection_isolation(connection, isolation_level) + + def set_connection_execution_options(self, connection, opts): + if 'isolation_level' in opts: + self._set_connection_isolation(connection, opts['isolation_level']) + + def _set_connection_isolation(self, connection, level): + self.set_isolation_level(connection.connection, level) + connection.connection._connection_record.\ + finalize_callback.append(self.reset_isolation_level) + + def do_begin(self, dbapi_connection): pass diff --git a/lib/sqlalchemy/engine/strategies.py b/lib/sqlalchemy/engine/strategies.py index c65986ca2..3ca91968b 100644 --- a/lib/sqlalchemy/engine/strategies.py +++ b/lib/sqlalchemy/engine/strategies.py @@ -150,13 +150,8 @@ class DefaultEngineStrategy(EngineStrategy): event.listen(pool, 'connect', on_connect) def first_connect(dbapi_connection, connection_record): - c = base.Connection(engine, connection=dbapi_connection) - - # TODO: removing this allows the on connect activities - # to generate events. tests currently assume these aren't - # sent. do we want users to get all the initial connect - # activities as events ? - c._has_events = False + c = base.Connection(engine, connection=dbapi_connection, + _has_events=False) dialect.initialize(c) event.listen(pool, 'first_connect', first_connect) diff --git a/lib/sqlalchemy/events.py b/lib/sqlalchemy/events.py index ae2e4ed93..7f11232ac 100644 --- a/lib/sqlalchemy/events.py +++ b/lib/sqlalchemy/events.py @@ -319,6 +319,10 @@ class PoolEvents(event.Events): connection will be disposed and a fresh connection retrieved. Processing of all checkout listeners will abort and restart using the new connection. + + .. seealso:: :meth:`.ConnectionEvents.connect` - a similar event + which occurs upon creation of a new :class:`.Connection`. + """ def checkin(self, dbapi_connection, connection_record): @@ -615,6 +619,103 @@ class ConnectionEvents(event.Events): """ + def engine_connect(self, conn, branch): + """Intercept the creation of a new :class:`.Connection`. + + This event is called typically as the direct result of calling + the :meth:`.Engine.connect` method. + + It differs from the :meth:`.PoolEvents.connect` method, which + refers to the actual connection to a database at the DBAPI level; + a DBAPI connection may be pooled and reused for many operations. + In contrast, this event refers only to the production of a higher level + :class:`.Connection` wrapper around such a DBAPI connection. + + It also differs from the :meth:`.PoolEvents.checkout` event + in that it is specific to the :class:`.Connection` object, not the + DBAPI connection that :meth:`.PoolEvents.checkout` deals with, although + this DBAPI connection is available here via the :attr:`.Connection.connection` + attribute. But note there can in fact + be multiple :meth:`.PoolEvents.checkout` events within the lifespan + of a single :class:`.Connection` object, if that :class:`.Connection` + is invalidated and re-established. There can also be multiple + :class:`.Connection` objects generated for the same already-checked-out + DBAPI connection, in the case that a "branch" of a :class:`.Connection` + is produced. + + :param conn: :class:`.Connection` object. + :param branch: if True, this is a "branch" of an existing + :class:`.Connection`. A branch is generated within the course + of a statement execution to invoke supplemental statements, most + typically to pre-execute a SELECT of a default value for the purposes + of an INSERT statement. + + .. versionadded:: 0.9.0 + + .. seealso:: + + :meth:`.PoolEvents.checkout` the lower-level pool checkout event + for an individual DBAPI connection + + :meth:`.ConnectionEvents.set_connection_execution_options` - a copy of a + :class:`.Connection` is also made when the + :meth:`.Connection.execution_options` method is called. + + """ + + def set_connection_execution_options(self, conn, opts): + """Intercept when the :meth:`.Connection.execution_options` + method is called. + + This method is called after the new :class:`.Connection` has been + produced, with the newly updated execution options collection, but + before the :class:`.Dialect` has acted upon any of those new options. + + Note that this method is not called when a new :class:`.Connection` + is produced which is inheriting execution options from its parent + :class:`.Engine`; to intercept this condition, use the + :meth:`.ConnectionEvents.connect` event. + + :param conn: The newly copied :class:`.Connection` object + + :param opts: dictionary of options that were passed to the + :meth:`.Connection.execution_options` method. + + .. versionadded:: 0.9.0 + + .. seealso:: + + :meth:`.ConnectionEvents.set_engine_execution_options` - event + which is called when :meth:`.Engine.execution_options` is called. + + + """ + + def set_engine_execution_options(self, engine, opts): + """Intercept when the :meth:`.Engine.execution_options` + method is called. + + The :meth:`.Engine.execution_options` method produces a shallow + copy of the :class:`.Engine` which stores the new options. That new + :class:`.Engine` is passed here. A particular application of this + method is to add a :meth:`.ConnectionEvents.engine_connect` event + handler to the given :class:`.Engine` which will perform some per- + :class:`.Connection` task specific to these execution options. + + :param conn: The newly copied :class:`.Engine` object + + :param opts: dictionary of options that were passed to the + :meth:`.Connection.execution_options` method. + + .. versionadded:: 0.9.0 + + .. seealso:: + + :meth:`.ConnectionEvents.set_connection_execution_options` - event + which is called when :meth:`.Connection.execution_options` is called. + + """ + def begin(self, conn): """Intercept begin() events. diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index dcf3d9e39..97411dd3a 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -25,6 +25,7 @@ from .util import queue as sqla_queue from .util import threading, memoized_property, \ chop_traceback +from collections import deque proxies = {} @@ -217,7 +218,7 @@ class Pool(log.Identified): """ - return _ConnectionFairy(self).checkout() + return _ConnectionFairy.checkout(self) def _create_connection(self): """Called by subclasses to create a new ConnectionRecord.""" @@ -269,18 +270,16 @@ class Pool(log.Identified): """ if not self._use_threadlocal: - return _ConnectionFairy(self).checkout() + return _ConnectionFairy.checkout(self) try: rec = self._threadconns.current() - if rec: - return rec.checkout() except AttributeError: pass + else: + return rec.checkout_existing() - agent = _ConnectionFairy(self) - self._threadconns.current = weakref.ref(agent) - return agent.checkout() + return _ConnectionFairy.checkout(self, self._threadconns) def _return_conn(self, record): """Given a _ConnectionRecord, return it to the :class:`.Pool`. @@ -311,11 +310,11 @@ class Pool(log.Identified): class _ConnectionRecord(object): - finalize_callback = None def __init__(self, pool): self.__pool = pool self.connection = self.__connect() + self.finalize_callback = deque() pool.dispatch.first_connect.\ for_modify(pool.dispatch).\ @@ -326,6 +325,36 @@ class _ConnectionRecord(object): def info(self): return {} + @classmethod + def checkout(cls, pool): + rec = pool._do_get() + dbapi_connection = rec.get_connection() + fairy = _ConnectionFairy(dbapi_connection, rec) + rec.fairy_ref = weakref.ref( + fairy, + lambda ref: _finalize_fairy and \ + _finalize_fairy( + dbapi_connection, + rec, pool, ref, pool._echo) + ) + _refs.add(rec) + if pool._echo: + pool.logger.debug("Connection %r checked out from pool", + dbapi_connection) + return fairy + + def checkin(self): + self.fairy_ref = None + connection = self.connection + pool = self.__pool + while self.finalize_callback: + finalizer = self.finalize_callback.pop() + finalizer(connection) + if pool.dispatch.checkin: + pool.dispatch.checkin(connection, self) + pool._return_conn(self) + + def close(self): if self.connection is not None: self.__pool._close_connection(self.connection) @@ -373,11 +402,15 @@ class _ConnectionRecord(object): raise -def _finalize_fairy(connection, connection_record, pool, ref, echo): +def _finalize_fairy(connection, connection_record, pool, ref, echo, fairy=None): + """Cleanup for a :class:`._ConnectionFairy` whether or not it's already + been garbage collected. + + """ _refs.discard(connection_record) if ref is not None and \ - connection_record.fairy is not ref: + connection_record.fairy_ref is not ref: return if connection is not None: @@ -386,35 +419,31 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo): connection) try: + fairy = fairy or _ConnectionFairy(connection, connection_record) if pool.dispatch.reset: - pool.dispatch.reset(connection, connection_record) + pool.dispatch.reset(fairy, connection_record) if pool._reset_on_return is reset_rollback: if echo: pool.logger.debug("Connection %s rollback-on-return", connection) - pool._dialect.do_rollback(connection) + pool._dialect.do_rollback(fairy) elif pool._reset_on_return is reset_commit: if echo: - pool.logger.debug("Conneciton %s commit-on-return", + pool.logger.debug("Connection %s commit-on-return", connection) - pool._dialect.do_commit(connection) + pool._dialect.do_commit(fairy) + # Immediately close detached instances - if connection_record is None: + if not connection_record: pool._close_connection(connection) except Exception as e: - if connection_record is not None: + if connection_record: connection_record.invalidate(e=e) if isinstance(e, (SystemExit, KeyboardInterrupt)): raise - if connection_record is not None: - connection_record.fairy = None - if connection_record.finalize_callback: - connection_record.finalize_callback(connection) - del connection_record.finalize_callback - if pool.dispatch.checkin: - pool.dispatch.checkin(connection, connection_record) - pool._return_conn(connection_record) + if connection_record: + connection_record.checkin() _refs = set() @@ -424,27 +453,58 @@ class _ConnectionFairy(object): """Proxies a DB-API connection and provides return-on-dereference support.""" - def __init__(self, pool): - self._pool = pool - self.__counter = 0 - self._echo = _echo = pool._should_log_debug() - try: - rec = self._connection_record = pool._do_get() - conn = self.connection = self._connection_record.get_connection() - rec.fairy = weakref.ref( - self, - lambda ref: _finalize_fairy and \ - _finalize_fairy(conn, rec, pool, ref, _echo) - ) - _refs.add(rec) - except: - # helps with endless __getattr__ loops later on - self.connection = None - self._connection_record = None - raise - if self._echo: - self._pool.logger.debug("Connection %r checked out from pool", - self.connection) + def __init__(self, dbapi_connection, connection_record): + self.connection = dbapi_connection + self._connection_record = connection_record + + @classmethod + def checkout(cls, pool, threadconns=None, fairy=None): + if not fairy: + fairy = _ConnectionRecord.checkout(pool) + + fairy._pool = pool + fairy._counter = 0 + fairy._echo = pool._should_log_debug() + + if threadconns is not None: + threadconns.current = weakref.ref(fairy) + + if fairy.connection is None: + raise exc.InvalidRequestError("This connection is closed") + fairy._counter += 1 + + if not pool.dispatch.checkout or fairy._counter != 1: + return fairy + + # Pool listeners can trigger a reconnection on checkout + attempts = 2 + while attempts > 0: + try: + pool.dispatch.checkout(fairy.connection, + fairy._connection_record, + fairy) + return fairy + except exc.DisconnectionError as e: + pool.logger.info( + "Disconnection detected on checkout: %s", e) + fairy._connection_record.invalidate(e) + fairy.connection = fairy._connection_record.get_connection() + attempts -= 1 + + pool.logger.info("Reconnection attempts exhausted on checkout") + fairy.invalidate() + raise exc.InvalidRequestError("This connection is closed") + + def checkout_existing(self): + return _ConnectionFairy.checkout(self._pool, fairy=self) + + def checkin(self): + _finalize_fairy(self.connection, self._connection_record, + self._pool, None, self._echo, fairy=self) + self.connection = None + self._connection_record = None + + _close = checkin @property def _logger(self): @@ -465,10 +525,7 @@ class _ConnectionFairy(object): in subsequent instances of :class:`.ConnectionFairy`. """ - try: - return self._connection_record.info - except AttributeError: - raise exc.InvalidRequestError("This connection is closed") + return self._connection_record.info def invalidate(self, e=None): """Mark this connection as invalidated. @@ -479,10 +536,10 @@ class _ConnectionFairy(object): if self.connection is None: raise exc.InvalidRequestError("This connection is closed") - if self._connection_record is not None: + if self._connection_record: self._connection_record.invalidate(e=e) self.connection = None - self._close() + self.checkin() def cursor(self, *args, **kwargs): return self.connection.cursor(*args, **kwargs) @@ -490,32 +547,6 @@ class _ConnectionFairy(object): def __getattr__(self, key): return getattr(self.connection, key) - def checkout(self): - if self.connection is None: - raise exc.InvalidRequestError("This connection is closed") - self.__counter += 1 - - if not self._pool.dispatch.checkout or self.__counter != 1: - return self - - # Pool listeners can trigger a reconnection on checkout - attempts = 2 - while attempts > 0: - try: - self._pool.dispatch.checkout(self.connection, - self._connection_record, - self) - return self - except exc.DisconnectionError as e: - self._pool.logger.info( - "Disconnection detected on checkout: %s", e) - self._connection_record.invalidate(e) - self.connection = self._connection_record.get_connection() - attempts -= 1 - - self._pool.logger.info("Reconnection attempts exhausted on checkout") - self.invalidate() - raise exc.InvalidRequestError("This connection is closed") def detach(self): """Separate this connection from its Pool. @@ -532,22 +563,17 @@ class _ConnectionFairy(object): if self._connection_record is not None: _refs.remove(self._connection_record) - self._connection_record.fairy = None + self._connection_record.fairy_ref = None self._connection_record.connection = None self._pool._do_return_conn(self._connection_record) self.info = self.info.copy() self._connection_record = None def close(self): - self.__counter -= 1 - if self.__counter == 0: - self._close() + self._counter -= 1 + if self._counter == 0: + self.checkin() - def _close(self): - _finalize_fairy(self.connection, self._connection_record, - self._pool, None, self._echo) - self.connection = None - self._connection_record = None class SingletonThreadPool(Pool): |