diff options
Diffstat (limited to 'lib/sqlalchemy')
-rw-r--r-- | lib/sqlalchemy/engine/base.py | 10 | ||||
-rw-r--r-- | lib/sqlalchemy/engine/interfaces.py | 53 | ||||
-rw-r--r-- | lib/sqlalchemy/engine/strategies.py | 16 | ||||
-rw-r--r-- | lib/sqlalchemy/events.py | 46 | ||||
-rw-r--r-- | lib/sqlalchemy/pool.py | 86 |
5 files changed, 194 insertions, 17 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 5921ab9ba..af310c450 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -1254,6 +1254,8 @@ class Connection(Connectable): if context: context.is_disconnect = self._is_disconnect + invalidate_pool_on_disconnect = True + if self._reentrant_error: util.raise_from_cause( exc.DBAPIError.instance(statement, @@ -1316,6 +1318,11 @@ class Connection(Connectable): sqlalchemy_exception.connection_invalidated = \ self._is_disconnect = ctx.is_disconnect + # set up potentially user-defined value for + # invalidate pool. + invalidate_pool_on_disconnect = \ + ctx.invalidate_pool_on_disconnect + if should_wrap and context: context.handle_dbapi_exception(e) @@ -1340,7 +1347,8 @@ class Connection(Connectable): del self._is_disconnect if not self.invalidated: dbapi_conn_wrapper = self.__connection - self.engine.pool._invalidate(dbapi_conn_wrapper, e) + if invalidate_pool_on_disconnect: + self.engine.pool._invalidate(dbapi_conn_wrapper, e) self.invalidate(e) if self.should_close_with_result: self.close() diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py index da8fa81eb..2dd192162 100644 --- a/lib/sqlalchemy/engine/interfaces.py +++ b/lib/sqlalchemy/engine/interfaces.py @@ -733,6 +733,41 @@ class Dialect(object): raise NotImplementedError() + @classmethod + def get_dialect_cls(cls, url): + """Given a URL, return the :class:`.Dialect` that will be used. + + This is a hook that allows an external plugin to provide functionality + around an existing dialect, by allowing the plugin to be loaded + from the url based on an entrypoint, and then the plugin returns + the actual dialect to be used. + + By default this just returns the cls. + + .. versionadded:: 1.0.3 + + """ + return cls + + @classmethod + def engine_created(cls, engine): + """A convenience hook called before returning the final :class:`.Engine`. + + If the dialect returned a different class from the + :meth:`.get_dialect_cls` + method, then the hook is called on both classes, first on + the dialect class returned by the :meth:`.get_dialect_cls` method and + then on the class on which the method was called. + + The hook should be used by dialects and/or wrappers to apply special + events to the engine or its components. In particular, it allows + a dialect-wrapping class to apply dialect-level events. + + .. versionadded:: 1.0.3 + + """ + pass + class ExecutionContext(object): """A messenger object for a Dialect that corresponds to a single @@ -1085,3 +1120,21 @@ class ExceptionContext(object): changing this flag. """ + + invalidate_pool_on_disconnect = True + """Represent whether all connections in the pool should be invalidated + when a "disconnect" condition is in effect. + + Setting this flag to False within the scope of the + :meth:`.ConnectionEvents.handle_error` event will have the effect such + that the full collection of connections in the pool will not be + invalidated during a disconnect; only the current connection that is the + subject of the error will actually be invalidated. + + The purpose of this flag is for custom disconnect-handling schemes where + the invalidation of other connections in the pool is to be performed + based on other conditions, or even on a per-connection basis. + + .. versionadded:: 1.0.3 + + """
\ No newline at end of file diff --git a/lib/sqlalchemy/engine/strategies.py b/lib/sqlalchemy/engine/strategies.py index 1fd105d67..a802e5d90 100644 --- a/lib/sqlalchemy/engine/strategies.py +++ b/lib/sqlalchemy/engine/strategies.py @@ -48,7 +48,8 @@ class DefaultEngineStrategy(EngineStrategy): # create url.URL object u = url.make_url(name_or_url) - dialect_cls = u.get_dialect() + entrypoint = u.get_dialect() + dialect_cls = entrypoint.get_dialect_cls(u) if kwargs.pop('_coerce_config', False): def pop_kwarg(key, default=None): @@ -81,11 +82,18 @@ class DefaultEngineStrategy(EngineStrategy): # assemble connection arguments (cargs, cparams) = dialect.create_connect_args(u) cparams.update(pop_kwarg('connect_args', {})) + cargs = list(cargs) # allow mutability # look for existing pool or create pool = pop_kwarg('pool', None) if pool is None: - def connect(): + def connect(connection_record=None): + if dialect._has_events: + for fn in dialect.dispatch.do_connect: + connection = fn( + dialect, connection_record, cargs, cparams) + if connection is not None: + return connection return dialect.connect(*cargs, **cparams) creator = pop_kwarg('creator', connect) @@ -157,6 +165,10 @@ class DefaultEngineStrategy(EngineStrategy): dialect.initialize(c) event.listen(pool, 'first_connect', first_connect, once=True) + dialect_cls.engine_created(engine) + if entrypoint is not dialect_cls: + entrypoint.engine_created(engine) + return engine diff --git a/lib/sqlalchemy/events.py b/lib/sqlalchemy/events.py index 22e066c88..b2d4b54a9 100644 --- a/lib/sqlalchemy/events.py +++ b/lib/sqlalchemy/events.py @@ -371,7 +371,9 @@ class PoolEvents(event.Events): """Called when a DBAPI connection is to be "invalidated". This event is called any time the :meth:`._ConnectionRecord.invalidate` - method is invoked, either from API usage or via "auto-invalidation". + method is invoked, either from API usage or via "auto-invalidation", + without the ``soft`` flag. + The event occurs before a final attempt to call ``.close()`` on the connection occurs. @@ -392,6 +394,21 @@ class PoolEvents(event.Events): """ + def soft_invalidate(self, dbapi_connection, connection_record, exception): + """Called when a DBAPI connection is to be "soft invalidated". + + This event is called any time the :meth:`._ConnectionRecord.invalidate` + method is invoked with the ``soft`` flag. + + Soft invalidation refers to when the connection record that tracks + this connection will force a reconnect after the current connection + is checked in. It does not actively close the dbapi_connection + at the point at which it is called. + + .. versionadded:: 1.0.3 + + """ + class ConnectionEvents(event.Events): """Available events for :class:`.Connectable`, which includes @@ -707,6 +724,16 @@ class ConnectionEvents(event.Events): "failed" in str(context.original_exception): raise MySpecialException("failed operation") + .. warning:: Because the :meth:`.ConnectionEvents.handle_error` + event specifically provides for exceptions to be re-thrown as + the ultimate exception raised by the failed statement, + **stack traces will be misleading** if the user-defined event + handler itself fails and throws an unexpected exception; + the stack trace may not illustrate the actual code line that + failed! It is advised to code carefully here and use + logging and/or inline debugging if unexpected exceptions are + occurring. + Alternatively, a "chained" style of event handling can be used, by configuring the handler with the ``retval=True`` modifier and returning the new exception instance from the @@ -1007,6 +1034,23 @@ class DialectEvents(event.Events): else: return target + def do_connect(self, dialect, conn_rec, cargs, cparams): + """Receive connection arguments before a connection is made. + + Return a DBAPI connection to halt further events from invoking; + the returned connection will be used. + + Alternatively, the event can manipulate the cargs and/or cparams + collections; cargs will always be a Python list that can be mutated + in-place and cparams a Python dictionary. Return None to + allow control to pass to the next event handler and ultimately + to allow the dialect to connect normally, given the updated + arguments. + + .. versionadded:: 1.0.3 + + """ + def do_executemany(self, cursor, statement, parameters, context): """Receive a cursor to have executemany() called. diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index 999cc1120..902309d75 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -219,6 +219,7 @@ class Pool(log.Identified): log.instance_logger(self, echoflag=echo) self._threadconns = threading.local() self._creator = creator + self._wrapped_creator = self._maybe_wrap_callable(creator) self._recycle = recycle self._invalidate_time = 0 self._use_threadlocal = use_threadlocal @@ -249,6 +250,32 @@ class Pool(log.Identified): for l in listeners: self.add_listener(l) + def _maybe_wrap_callable(self, fn): + """Detect if creator accepts a single argument, or is sent + as a legacy style no-arg function. + + """ + + try: + argspec = util.get_callable_argspec(fn, no_self=True) + except TypeError: + return lambda ctx: fn() + + defaulted = argspec[3] is not None and len(argspec[3]) or 0 + positionals = len(argspec[0]) - defaulted + + # look for the exact arg signature that DefaultStrategy + # sends us + if (argspec[0], argspec[3]) == (['connection_record'], (None,)): + return fn + # or just a single positional + elif positionals == 1: + return fn + # all other cases, just wrap and assume legacy "creator" callable + # thing + else: + return lambda ctx: fn() + def _close_connection(self, connection): self.logger.debug("Closing connection %r", connection) try: @@ -428,6 +455,8 @@ class _ConnectionRecord(object): """ + _soft_invalidate_time = 0 + @util.memoized_property def info(self): """The ``.info`` dictionary associated with the DBAPI connection. @@ -476,7 +505,7 @@ class _ConnectionRecord(object): if self.connection is not None: self.__close() - def invalidate(self, e=None): + def invalidate(self, e=None, soft=False): """Invalidate the DBAPI connection held by this :class:`._ConnectionRecord`. This method is called for all connection invalidations, including @@ -484,6 +513,13 @@ class _ConnectionRecord(object): :meth:`.Connection.invalidate` methods are called, as well as when any so-called "automatic invalidation" condition occurs. + :param e: an exception object indicating a reason for the invalidation. + + :param soft: if True, the connection isn't closed; instead, this + connection will be recycled on next checkout. + + .. versionadded:: 1.0.3 + .. seealso:: :ref:`pool_connection_invalidation` @@ -492,22 +528,31 @@ class _ConnectionRecord(object): # already invalidated if self.connection is None: return - self.__pool.dispatch.invalidate(self.connection, self, e) + if soft: + self.__pool.dispatch.soft_invalidate(self.connection, self, e) + else: + self.__pool.dispatch.invalidate(self.connection, self, e) if e is not None: self.__pool.logger.info( - "Invalidate connection %r (reason: %s:%s)", + "%sInvalidate connection %r (reason: %s:%s)", + "Soft " if soft else "", self.connection, e.__class__.__name__, e) else: self.__pool.logger.info( - "Invalidate connection %r", self.connection) - self.__close() - self.connection = None + "%sInvalidate connection %r", + "Soft " if soft else "", + self.connection) + if soft: + self._soft_invalidate_time = time.time() + else: + self.__close() + self.connection = None def get_connection(self): recycle = False if self.connection is None: - self.connection = self.__connect() self.info.clear() + self.connection = self.__connect() if self.__pool.dispatch.connect: self.__pool.dispatch.connect(self.connection, self) elif self.__pool._recycle > -1 and \ @@ -523,11 +568,18 @@ class _ConnectionRecord(object): self.connection ) recycle = True + elif self._soft_invalidate_time > self.starttime: + self.__pool.logger.info( + "Connection %r invalidated due to local soft invalidation; " + + "recycling", + self.connection + ) + recycle = True if recycle: self.__close() - self.connection = self.__connect() self.info.clear() + self.connection = self.__connect() if self.__pool.dispatch.connect: self.__pool.dispatch.connect(self.connection, self) return self.connection @@ -539,7 +591,7 @@ class _ConnectionRecord(object): def __connect(self): try: self.starttime = time.time() - connection = self.__pool._creator() + connection = self.__pool._wrapped_creator(self) self.__pool.logger.debug("Created new connection %r", connection) return connection except Exception as e: @@ -740,7 +792,7 @@ class _ConnectionFairy(object): """ return self._connection_record.info - def invalidate(self, e=None): + def invalidate(self, e=None, soft=False): """Mark this connection as invalidated. This method can be called directly, and is also called as a result @@ -749,6 +801,13 @@ class _ConnectionFairy(object): further use by the pool. The invalidation mechanism proceeds via the :meth:`._ConnectionRecord.invalidate` internal method. + :param e: an exception object indicating a reason for the invalidation. + + :param soft: if True, the connection isn't closed; instead, this + connection will be recycled on next checkout. + + .. versionadded:: 1.0.3 + .. seealso:: :ref:`pool_connection_invalidation` @@ -759,9 +818,10 @@ class _ConnectionFairy(object): util.warn("Can't invalidate an already-closed connection.") return if self._connection_record: - self._connection_record.invalidate(e=e) - self.connection = None - self._checkin() + self._connection_record.invalidate(e=e, soft=soft) + if not soft: + self.connection = None + self._checkin() def cursor(self, *args, **kwargs): """Return a new DBAPI cursor for the underlying connection. |