summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy')
-rw-r--r--lib/sqlalchemy/engine/base.py10
-rw-r--r--lib/sqlalchemy/engine/interfaces.py53
-rw-r--r--lib/sqlalchemy/engine/strategies.py16
-rw-r--r--lib/sqlalchemy/events.py46
-rw-r--r--lib/sqlalchemy/pool.py86
5 files changed, 194 insertions, 17 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index 5921ab9ba..af310c450 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -1254,6 +1254,8 @@ class Connection(Connectable):
if context:
context.is_disconnect = self._is_disconnect
+ invalidate_pool_on_disconnect = True
+
if self._reentrant_error:
util.raise_from_cause(
exc.DBAPIError.instance(statement,
@@ -1316,6 +1318,11 @@ class Connection(Connectable):
sqlalchemy_exception.connection_invalidated = \
self._is_disconnect = ctx.is_disconnect
+ # set up potentially user-defined value for
+ # invalidate pool.
+ invalidate_pool_on_disconnect = \
+ ctx.invalidate_pool_on_disconnect
+
if should_wrap and context:
context.handle_dbapi_exception(e)
@@ -1340,7 +1347,8 @@ class Connection(Connectable):
del self._is_disconnect
if not self.invalidated:
dbapi_conn_wrapper = self.__connection
- self.engine.pool._invalidate(dbapi_conn_wrapper, e)
+ if invalidate_pool_on_disconnect:
+ self.engine.pool._invalidate(dbapi_conn_wrapper, e)
self.invalidate(e)
if self.should_close_with_result:
self.close()
diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py
index da8fa81eb..2dd192162 100644
--- a/lib/sqlalchemy/engine/interfaces.py
+++ b/lib/sqlalchemy/engine/interfaces.py
@@ -733,6 +733,41 @@ class Dialect(object):
raise NotImplementedError()
+ @classmethod
+ def get_dialect_cls(cls, url):
+ """Given a URL, return the :class:`.Dialect` that will be used.
+
+ This is a hook that allows an external plugin to provide functionality
+ around an existing dialect, by allowing the plugin to be loaded
+ from the url based on an entrypoint, and then the plugin returns
+ the actual dialect to be used.
+
+ By default this just returns the cls.
+
+ .. versionadded:: 1.0.3
+
+ """
+ return cls
+
+ @classmethod
+ def engine_created(cls, engine):
+ """A convenience hook called before returning the final :class:`.Engine`.
+
+ If the dialect returned a different class from the
+ :meth:`.get_dialect_cls`
+ method, then the hook is called on both classes, first on
+ the dialect class returned by the :meth:`.get_dialect_cls` method and
+ then on the class on which the method was called.
+
+ The hook should be used by dialects and/or wrappers to apply special
+ events to the engine or its components. In particular, it allows
+ a dialect-wrapping class to apply dialect-level events.
+
+ .. versionadded:: 1.0.3
+
+ """
+ pass
+
class ExecutionContext(object):
"""A messenger object for a Dialect that corresponds to a single
@@ -1085,3 +1120,21 @@ class ExceptionContext(object):
changing this flag.
"""
+
+ invalidate_pool_on_disconnect = True
+ """Represent whether all connections in the pool should be invalidated
+ when a "disconnect" condition is in effect.
+
+ Setting this flag to False within the scope of the
+ :meth:`.ConnectionEvents.handle_error` event will have the effect such
+ that the full collection of connections in the pool will not be
+ invalidated during a disconnect; only the current connection that is the
+ subject of the error will actually be invalidated.
+
+ The purpose of this flag is for custom disconnect-handling schemes where
+ the invalidation of other connections in the pool is to be performed
+ based on other conditions, or even on a per-connection basis.
+
+ .. versionadded:: 1.0.3
+
+ """ \ No newline at end of file
diff --git a/lib/sqlalchemy/engine/strategies.py b/lib/sqlalchemy/engine/strategies.py
index 1fd105d67..a802e5d90 100644
--- a/lib/sqlalchemy/engine/strategies.py
+++ b/lib/sqlalchemy/engine/strategies.py
@@ -48,7 +48,8 @@ class DefaultEngineStrategy(EngineStrategy):
# create url.URL object
u = url.make_url(name_or_url)
- dialect_cls = u.get_dialect()
+ entrypoint = u.get_dialect()
+ dialect_cls = entrypoint.get_dialect_cls(u)
if kwargs.pop('_coerce_config', False):
def pop_kwarg(key, default=None):
@@ -81,11 +82,18 @@ class DefaultEngineStrategy(EngineStrategy):
# assemble connection arguments
(cargs, cparams) = dialect.create_connect_args(u)
cparams.update(pop_kwarg('connect_args', {}))
+ cargs = list(cargs) # allow mutability
# look for existing pool or create
pool = pop_kwarg('pool', None)
if pool is None:
- def connect():
+ def connect(connection_record=None):
+ if dialect._has_events:
+ for fn in dialect.dispatch.do_connect:
+ connection = fn(
+ dialect, connection_record, cargs, cparams)
+ if connection is not None:
+ return connection
return dialect.connect(*cargs, **cparams)
creator = pop_kwarg('creator', connect)
@@ -157,6 +165,10 @@ class DefaultEngineStrategy(EngineStrategy):
dialect.initialize(c)
event.listen(pool, 'first_connect', first_connect, once=True)
+ dialect_cls.engine_created(engine)
+ if entrypoint is not dialect_cls:
+ entrypoint.engine_created(engine)
+
return engine
diff --git a/lib/sqlalchemy/events.py b/lib/sqlalchemy/events.py
index 22e066c88..b2d4b54a9 100644
--- a/lib/sqlalchemy/events.py
+++ b/lib/sqlalchemy/events.py
@@ -371,7 +371,9 @@ class PoolEvents(event.Events):
"""Called when a DBAPI connection is to be "invalidated".
This event is called any time the :meth:`._ConnectionRecord.invalidate`
- method is invoked, either from API usage or via "auto-invalidation".
+ method is invoked, either from API usage or via "auto-invalidation",
+ without the ``soft`` flag.
+
The event occurs before a final attempt to call ``.close()`` on the
connection occurs.
@@ -392,6 +394,21 @@ class PoolEvents(event.Events):
"""
+ def soft_invalidate(self, dbapi_connection, connection_record, exception):
+ """Called when a DBAPI connection is to be "soft invalidated".
+
+ This event is called any time the :meth:`._ConnectionRecord.invalidate`
+ method is invoked with the ``soft`` flag.
+
+ Soft invalidation refers to when the connection record that tracks
+ this connection will force a reconnect after the current connection
+ is checked in. It does not actively close the dbapi_connection
+ at the point at which it is called.
+
+ .. versionadded:: 1.0.3
+
+ """
+
class ConnectionEvents(event.Events):
"""Available events for :class:`.Connectable`, which includes
@@ -707,6 +724,16 @@ class ConnectionEvents(event.Events):
"failed" in str(context.original_exception):
raise MySpecialException("failed operation")
+ .. warning:: Because the :meth:`.ConnectionEvents.handle_error`
+ event specifically provides for exceptions to be re-thrown as
+ the ultimate exception raised by the failed statement,
+ **stack traces will be misleading** if the user-defined event
+ handler itself fails and throws an unexpected exception;
+ the stack trace may not illustrate the actual code line that
+ failed! It is advised to code carefully here and use
+ logging and/or inline debugging if unexpected exceptions are
+ occurring.
+
Alternatively, a "chained" style of event handling can be
used, by configuring the handler with the ``retval=True``
modifier and returning the new exception instance from the
@@ -1007,6 +1034,23 @@ class DialectEvents(event.Events):
else:
return target
+ def do_connect(self, dialect, conn_rec, cargs, cparams):
+ """Receive connection arguments before a connection is made.
+
+ Return a DBAPI connection to halt further events from invoking;
+ the returned connection will be used.
+
+ Alternatively, the event can manipulate the cargs and/or cparams
+ collections; cargs will always be a Python list that can be mutated
+ in-place and cparams a Python dictionary. Return None to
+ allow control to pass to the next event handler and ultimately
+ to allow the dialect to connect normally, given the updated
+ arguments.
+
+ .. versionadded:: 1.0.3
+
+ """
+
def do_executemany(self, cursor, statement, parameters, context):
"""Receive a cursor to have executemany() called.
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index 999cc1120..902309d75 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -219,6 +219,7 @@ class Pool(log.Identified):
log.instance_logger(self, echoflag=echo)
self._threadconns = threading.local()
self._creator = creator
+ self._wrapped_creator = self._maybe_wrap_callable(creator)
self._recycle = recycle
self._invalidate_time = 0
self._use_threadlocal = use_threadlocal
@@ -249,6 +250,32 @@ class Pool(log.Identified):
for l in listeners:
self.add_listener(l)
+ def _maybe_wrap_callable(self, fn):
+ """Detect if creator accepts a single argument, or is sent
+ as a legacy style no-arg function.
+
+ """
+
+ try:
+ argspec = util.get_callable_argspec(fn, no_self=True)
+ except TypeError:
+ return lambda ctx: fn()
+
+ defaulted = argspec[3] is not None and len(argspec[3]) or 0
+ positionals = len(argspec[0]) - defaulted
+
+ # look for the exact arg signature that DefaultStrategy
+ # sends us
+ if (argspec[0], argspec[3]) == (['connection_record'], (None,)):
+ return fn
+ # or just a single positional
+ elif positionals == 1:
+ return fn
+ # all other cases, just wrap and assume legacy "creator" callable
+ # thing
+ else:
+ return lambda ctx: fn()
+
def _close_connection(self, connection):
self.logger.debug("Closing connection %r", connection)
try:
@@ -428,6 +455,8 @@ class _ConnectionRecord(object):
"""
+ _soft_invalidate_time = 0
+
@util.memoized_property
def info(self):
"""The ``.info`` dictionary associated with the DBAPI connection.
@@ -476,7 +505,7 @@ class _ConnectionRecord(object):
if self.connection is not None:
self.__close()
- def invalidate(self, e=None):
+ def invalidate(self, e=None, soft=False):
"""Invalidate the DBAPI connection held by this :class:`._ConnectionRecord`.
This method is called for all connection invalidations, including
@@ -484,6 +513,13 @@ class _ConnectionRecord(object):
:meth:`.Connection.invalidate` methods are called, as well as when any
so-called "automatic invalidation" condition occurs.
+ :param e: an exception object indicating a reason for the invalidation.
+
+ :param soft: if True, the connection isn't closed; instead, this
+ connection will be recycled on next checkout.
+
+ .. versionadded:: 1.0.3
+
.. seealso::
:ref:`pool_connection_invalidation`
@@ -492,22 +528,31 @@ class _ConnectionRecord(object):
# already invalidated
if self.connection is None:
return
- self.__pool.dispatch.invalidate(self.connection, self, e)
+ if soft:
+ self.__pool.dispatch.soft_invalidate(self.connection, self, e)
+ else:
+ self.__pool.dispatch.invalidate(self.connection, self, e)
if e is not None:
self.__pool.logger.info(
- "Invalidate connection %r (reason: %s:%s)",
+ "%sInvalidate connection %r (reason: %s:%s)",
+ "Soft " if soft else "",
self.connection, e.__class__.__name__, e)
else:
self.__pool.logger.info(
- "Invalidate connection %r", self.connection)
- self.__close()
- self.connection = None
+ "%sInvalidate connection %r",
+ "Soft " if soft else "",
+ self.connection)
+ if soft:
+ self._soft_invalidate_time = time.time()
+ else:
+ self.__close()
+ self.connection = None
def get_connection(self):
recycle = False
if self.connection is None:
- self.connection = self.__connect()
self.info.clear()
+ self.connection = self.__connect()
if self.__pool.dispatch.connect:
self.__pool.dispatch.connect(self.connection, self)
elif self.__pool._recycle > -1 and \
@@ -523,11 +568,18 @@ class _ConnectionRecord(object):
self.connection
)
recycle = True
+ elif self._soft_invalidate_time > self.starttime:
+ self.__pool.logger.info(
+ "Connection %r invalidated due to local soft invalidation; " +
+ "recycling",
+ self.connection
+ )
+ recycle = True
if recycle:
self.__close()
- self.connection = self.__connect()
self.info.clear()
+ self.connection = self.__connect()
if self.__pool.dispatch.connect:
self.__pool.dispatch.connect(self.connection, self)
return self.connection
@@ -539,7 +591,7 @@ class _ConnectionRecord(object):
def __connect(self):
try:
self.starttime = time.time()
- connection = self.__pool._creator()
+ connection = self.__pool._wrapped_creator(self)
self.__pool.logger.debug("Created new connection %r", connection)
return connection
except Exception as e:
@@ -740,7 +792,7 @@ class _ConnectionFairy(object):
"""
return self._connection_record.info
- def invalidate(self, e=None):
+ def invalidate(self, e=None, soft=False):
"""Mark this connection as invalidated.
This method can be called directly, and is also called as a result
@@ -749,6 +801,13 @@ class _ConnectionFairy(object):
further use by the pool. The invalidation mechanism proceeds
via the :meth:`._ConnectionRecord.invalidate` internal method.
+ :param e: an exception object indicating a reason for the invalidation.
+
+ :param soft: if True, the connection isn't closed; instead, this
+ connection will be recycled on next checkout.
+
+ .. versionadded:: 1.0.3
+
.. seealso::
:ref:`pool_connection_invalidation`
@@ -759,9 +818,10 @@ class _ConnectionFairy(object):
util.warn("Can't invalidate an already-closed connection.")
return
if self._connection_record:
- self._connection_record.invalidate(e=e)
- self.connection = None
- self._checkin()
+ self._connection_record.invalidate(e=e, soft=soft)
+ if not soft:
+ self.connection = None
+ self._checkin()
def cursor(self, *args, **kwargs):
"""Return a new DBAPI cursor for the underlying connection.