diff options
authorJenkins <>2015-01-13 15:13:04 +0000
committerGerrit Code Review <>2015-01-13 15:13:04 +0000
commit67c6f02bb9cff33320a2a92ec3dff41c7ac75337 (patch)
parentb1fc55c7ce6004311379f4002fdceddcc8da9784 (diff)
parent046e576467fdf8eb03fa0dae946bf4c1897e3896 (diff)
Merge "Ensure DBConnectionError is raised on failed revalidate"
5 files changed, 379 insertions, 135 deletions
diff --git a/oslo_db/sqlalchemy/compat/ b/oslo_db/sqlalchemy/compat/
index b49d5c4..3dc29ac 100644
--- a/oslo_db/sqlalchemy/compat/
+++ b/oslo_db/sqlalchemy/compat/
@@ -23,8 +23,6 @@ from oslo_db.sqlalchemy.compat import handle_error as _h_err
# flake8 won't let me import handle_error directly
engine_connect = _e_conn.engine_connect
handle_error = _h_err.handle_error
-handle_connect_context = _h_err.handle_connect_context
__all__ = [
- 'engine_connect', 'handle_error',
- 'handle_connect_context']
+ 'engine_connect', 'handle_error']
diff --git a/oslo_db/sqlalchemy/compat/ b/oslo_db/sqlalchemy/compat/
index 7e476a0..5169857 100644
--- a/oslo_db/sqlalchemy/compat/
+++ b/oslo_db/sqlalchemy/compat/
@@ -16,10 +16,10 @@
-import contextlib
import sys
import six
+from sqlalchemy.engine import base as engine_base
from sqlalchemy.engine import Engine
from sqlalchemy import event
from sqlalchemy import exc as sqla_exc
@@ -39,117 +39,294 @@ def handle_error(engine, listener):
if utils.sqla_100:
event.listen(engine, "handle_error", listener)
- elif utils.sqla_097:
+ assert isinstance(engine, Engine), \
+ "engine argument must be an Engine instance, not a Connection"
+ if not utils.sqla_097:
+ _rework_handle_exception_for_events(engine)
+ engine._oslo_handle_error_events.append(listener)
+ _rework_connect_and_revalidate_for_events(engine)
+ if utils.sqla_097:
# ctx.engine added per
def wrap_listener(ctx):
- ctx.engine = ctx.connection.engine
+ if isinstance(ctx, engine_base.ExceptionContextImpl):
+ ctx.engine = ctx.connection.engine
return listener(ctx)
event.listen(engine, "handle_error", wrap_listener)
- return
- assert isinstance(engine, Engine), \
- "engine argument must be an Engine instance, not a Connection"
- # use a Connection-wrapper class to wrap _handle_dbapi_exception.
- if not getattr(engine._connection_cls,
- '_oslo_handle_error_wrapper', False):
- engine._oslo_handle_error_events = []
- class Connection(engine._connection_cls):
- _oslo_handle_error_wrapper = True
+def _rework_handle_exception_for_events(engine):
+ """Patch the _handle_dbapi_error() system on Connection.
- def _handle_dbapi_exception(self, e, statement, parameters,
- cursor, context):
+ This allows the 0.9.7-style handle_error() event to be available on
+ the Connection object.
- try:
- super(Connection, self)._handle_dbapi_exception(
- e, statement, parameters, cursor, context)
- except Exception as reraised_exception:
- # all versions:
- # _handle_dbapi_exception reraises all DBAPI errors
- # 0.8 and above:
- # reraises all errors unconditionally
- pass
- else:
- # 0.7.8:
- # _handle_dbapi_exception does not unconditionally
- # re-raise
- reraised_exception = e
- _oslo_handle_error_events = getattr(
- self.engine,
- '_oslo_handle_error_events',
- False)
- newraise = None
- if _oslo_handle_error_events:
- if isinstance(reraised_exception,
- sqla_exc.StatementError):
- sqlalchemy_exception = reraised_exception
- original_exception = sqlalchemy_exception.orig
- self._is_disconnect = is_disconnect = (
- isinstance(sqlalchemy_exception,
- sqla_exc.DBAPIError)
- and sqlalchemy_exception.connection_invalidated)
- else:
- sqlalchemy_exception = None
- original_exception = reraised_exception
- is_disconnect = False
- # new handle_error event
- ctx = ExceptionContextImpl(
- original_exception, sqlalchemy_exception,
- self.engine, self, cursor, statement,
- parameters, context, is_disconnect)
- for fn in _oslo_handle_error_events:
- try:
- # handler returns an exception;
- # call next handler in a chain
- per_fn = fn(ctx)
- if per_fn is not None:
- ctx.chained_exception = newraise = per_fn
- except Exception as _raised:
- # handler raises an exception - stop processing
- newraise = _raised
- break
- if sqlalchemy_exception and \
- self._is_disconnect != ctx.is_disconnect:
- if not ctx.is_disconnect:
- raise NotImplementedError(
- "Can't reset 'disconnect' status of exception "
- "once it is set with this version of "
- "SQLAlchemy")
- sqlalchemy_exception.connection_invalidated = \
- self._is_disconnect = ctx.is_disconnect
- if self._is_disconnect:
- self._do_disconnect(e)
- if newraise:
- six.reraise(type(newraise), newraise, sys.exc_info()[2])
- else:
- six.reraise(type(reraised_exception),
- reraised_exception, sys.exc_info()[2])
- def _do_disconnect(self, e):
- del self._is_disconnect
- if utils.sqla_094:
- dbapi_conn_wrapper = self.connection
- self.engine.pool._invalidate(dbapi_conn_wrapper, e)
- self.invalidate(e)
+ """
+ engine._oslo_handle_error_events = []
+ class Connection(engine._connection_cls):
+ def _handle_dbapi_exception(self, e, statement, parameters,
+ cursor, context):
+ try:
+ super(Connection, self)._handle_dbapi_exception(
+ e, statement, parameters, cursor, context)
+ except Exception as reraised_exception:
+ # all versions:
+ # _handle_dbapi_exception reraises all DBAPI errors
+ # 0.8 and above:
+ # reraises all errors unconditionally
+ pass
+ else:
+ # 0.7.8:
+ # _handle_dbapi_exception does not unconditionally
+ # re-raise
+ reraised_exception = e
+ _oslo_handle_error_events = getattr(
+ self.engine,
+ '_oslo_handle_error_events',
+ False)
+ newraise = None
+ if _oslo_handle_error_events:
+ if isinstance(reraised_exception,
+ sqla_exc.StatementError):
+ sqlalchemy_exception = reraised_exception
+ original_exception = sqlalchemy_exception.orig
+ self._is_disconnect = is_disconnect = (
+ isinstance(sqlalchemy_exception,
+ sqla_exc.DBAPIError)
+ and sqlalchemy_exception.connection_invalidated)
- dbapi_conn_wrapper = self.connection
- self.invalidate(e)
- if not hasattr(dbapi_conn_wrapper, '_pool') or \
- dbapi_conn_wrapper._pool is self.engine.pool:
- self.engine.dispose()
+ sqlalchemy_exception = None
+ original_exception = reraised_exception
+ is_disconnect = False
+ # new handle_error event
+ ctx = ExceptionContextImpl(
+ original_exception, sqlalchemy_exception,
+ self.engine, self, cursor, statement,
+ parameters, context, is_disconnect)
+ for fn in _oslo_handle_error_events:
+ try:
+ # handler returns an exception;
+ # call next handler in a chain
+ per_fn = fn(ctx)
+ if per_fn is not None:
+ ctx.chained_exception = newraise = per_fn
+ except Exception as _raised:
+ # handler raises an exception - stop processing
+ newraise = _raised
+ break
+ if sqlalchemy_exception and \
+ self._is_disconnect != ctx.is_disconnect:
+ if not ctx.is_disconnect:
+ raise NotImplementedError(
+ "Can't reset 'disconnect' status of exception "
+ "once it is set with this version of "
+ "SQLAlchemy")
+ sqlalchemy_exception.connection_invalidated = \
+ self._is_disconnect = ctx.is_disconnect
+ if self._is_disconnect:
+ self._do_disconnect(e)
+ if newraise:
+ six.reraise(type(newraise), newraise, sys.exc_info()[2])
+ else:
+ six.reraise(type(reraised_exception),
+ reraised_exception, sys.exc_info()[2])
+ def _do_disconnect(self, e):
+ del self._is_disconnect
+ if utils.sqla_094:
+ dbapi_conn_wrapper = self.connection
+ self.engine.pool._invalidate(dbapi_conn_wrapper, e)
+ self.invalidate(e)
+ else:
+ dbapi_conn_wrapper = self.connection
+ self.invalidate(e)
+ if not hasattr(dbapi_conn_wrapper, '_pool') or \
+ dbapi_conn_wrapper._pool is self.engine.pool:
+ self.engine.dispose()
+ engine._connection_cls = Connection
+def _rework_connect_and_revalidate_for_events(engine):
+ """Patch the _revalidate_connection() system on Connection.
+ This applies 1.0's _revalidate_connection() approach into an 0.9
+ version of SQLAlchemy, and consists of three steps:
+ 1. wrap the pool._creator function, which in 0.9 has a local
+ call to sqlalchemy.exc.DBAPIError.instance(), so that this exception is
+ again unwrapped back to the original DBAPI-specific Error, then raise
+ that. This is essentially the same as if the dbapi.connect() isn't
+ wrapped in the first place, which is how SQLAlchemy 1.0 now functions.
+ 2. patch the Engine object's raw_connection() method. In SQLAlchemy 1.0,
+ this is now where the error wrapping occurs when a pool connect attempt
+ is made. Here, when raw_connection() is called without a hosting
+ Connection, we send exception raises to
+ _handle_dbapi_exception_noconnection(), here copied from SQLAlchemy
+ 1.0, which is an alternate version of Connection._handle_dbapi_exception()
+ tailored for an initial connect failure when there is no
+ Connection object being dealt with. This allows the error handler
+ events to be called.
+ 3. patch the Connection class to follow 1.0's behavior for
+ _revalidate_connection(); here, the call to engine.raw_connection()
+ will pass the raised error to Connection._handle_dbapi_exception(),
+ again allowing error handler events to be called.
- engine._connection_cls = Connection
- engine._oslo_handle_error_events.append(listener)
+ """
+ _orig_connect = engine.pool._creator
+ def connect():
+ try:
+ return _orig_connect()
+ except sqla_exc.DBAPIError as err:
+ original_exception = err.orig
+ raise original_exception
+ engine.pool._creator = connect
+ self = engine
+ def contextual_connect(close_with_result=False, **kwargs):
+ return self._connection_cls(
+ self,
+ self._wrap_pool_connect(self.pool.connect, None),
+ close_with_result=close_with_result,
+ **kwargs)
+ def _wrap_pool_connect(fn, connection):
+ dialect = self.dialect
+ try:
+ return fn()
+ except dialect.dbapi.Error as e:
+ if connection is None:
+ _handle_dbapi_exception_noconnection(
+ e, dialect, self)
+ else:
+ six.reraise(*sys.exc_info())
+ def raw_connection(_connection=None):
+ return self._wrap_pool_connect(
+ self.pool.unique_connection, _connection)
+ engine.contextual_connect = contextual_connect
+ engine._wrap_pool_connect = _wrap_pool_connect
+ engine.raw_connection = raw_connection
+ class Connection(engine._connection_cls):
+ @property
+ def connection(self):
+ "The underlying DB-API connection managed by this Connection."
+ try:
+ return self.__connection
+ except AttributeError:
+ try:
+ return self._revalidate_connection()
+ except Exception as e:
+ self._handle_dbapi_exception(e, None, None, None, None)
+ def _handle_dbapi_exception(self,
+ e,
+ statement,
+ parameters,
+ cursor,
+ context):
+ if self.invalidated:
+ # 0.9's _handle_dbapi_exception() can't handle
+ # a Connection that is invalidated already, meaning
+ # its "__connection" attribute is not set. So if we are
+ # in that case, call our "no connection" invalidator.
+ # this is fine as we are only supporting handle_error listeners
+ # that are applied at the engine level.
+ _handle_dbapi_exception_noconnection(
+ e, self.dialect, self.engine)
+ else:
+ super(Connection, self)._handle_dbapi_exception(
+ e, statement, parameters, cursor, context)
+ def _revalidate_connection(self):
+ if self._Connection__can_reconnect and self._Connection__invalid:
+ if self._Connection__transaction is not None:
+ raise sqla_exc.InvalidRequestError(
+ "Can't reconnect until invalid "
+ "transaction is rolled back")
+ self._Connection__connection = self.engine.raw_connection(
+ _connection=self)
+ self._Connection__invalid = False
+ return self._Connection__connection
+ raise sqla_exc.ResourceClosedError("This Connection is closed")
+ engine._connection_cls = Connection
+def _handle_dbapi_exception_noconnection(e, dialect, engine):
+ exc_info = sys.exc_info()
+ is_disconnect = dialect.is_disconnect(e, None, None)
+ should_wrap = isinstance(e, dialect.dbapi.Error)
+ if should_wrap:
+ sqlalchemy_exception = sqla_exc.DBAPIError.instance(
+ None,
+ None,
+ e,
+ dialect.dbapi.Error,
+ connection_invalidated=is_disconnect)
+ else:
+ sqlalchemy_exception = None
+ newraise = None
+ ctx = ExceptionContextImpl(
+ e, sqlalchemy_exception, engine, None, None, None,
+ None, None, is_disconnect)
+ if hasattr(engine, '_oslo_handle_error_events'):
+ fns = engine._oslo_handle_error_events
+ else:
+ fns = engine.dispatch.handle_error
+ for fn in fns:
+ try:
+ # handler returns an exception;
+ # call next handler in a chain
+ per_fn = fn(ctx)
+ if per_fn is not None:
+ ctx.chained_exception = newraise = per_fn
+ except Exception as _raised:
+ # handler raises an exception - stop processing
+ newraise = _raised
+ break
+ if sqlalchemy_exception and \
+ is_disconnect != ctx.is_disconnect:
+ sqlalchemy_exception.connection_invalidated = \
+ is_disconnect = ctx.is_disconnect
+ if newraise:
+ six.reraise(type(newraise), newraise, exc_info[2])
+ elif should_wrap:
+ six.reraise(
+ type(sqlalchemy_exception), sqlalchemy_exception, exc_info[2])
+ else:
+ six.reraise(*exc_info)
class ExceptionContextImpl(object):
@@ -266,24 +443,3 @@ class ExceptionContextImpl(object):
:meth:`.ConnectionEvents.handle_error` handler.
-def handle_connect_context(handler, engine):
- """Wrap connect() routines with a "handle error" context."""
- try:
- yield
- except Exception as e:
- if utils.sqla_100:
- raise
- if isinstance(e, sqla_exc.StatementError):
- s_exc, orig = e, e.orig
- else:
- s_exc, orig = None, e
- ctx = ExceptionContextImpl(
- orig, s_exc, engine, None, None,
- None, None, None, False
- )
- handler(ctx)
diff --git a/oslo_db/sqlalchemy/ b/oslo_db/sqlalchemy/
index aa5b55a..5df1eb7 100644
--- a/oslo_db/sqlalchemy/
+++ b/oslo_db/sqlalchemy/
@@ -346,14 +346,10 @@ def register_engine(engine):
def handle_connect_error(engine):
- """Handle connect error.
+ """Connect to the engine, including handle_error handlers.
- Provide a special context that will allow on-connect errors
- to be treated within the filtering context.
- This routine is dependent on SQLAlchemy version, as version 1.0.0
- provides this functionality natively.
+ The compat library now builds this into the engine.connect()
+ system as per SQLAlchemy 1.0's behavior.
- with compat.handle_connect_context(handler, engine):
- return engine.connect()
+ return engine.connect()
diff --git a/oslo_db/sqlalchemy/ b/oslo_db/sqlalchemy/
index 24bf31d..ef253b9 100644
--- a/oslo_db/sqlalchemy/
+++ b/oslo_db/sqlalchemy/
@@ -576,7 +576,7 @@ def _test_connection(engine, max_retries, retry_interval):
de_ref = None
for attempt in attempts:
- return exc_filters.handle_connect_error(engine)
+ return engine.connect()
except exception.DBConnectionError as de:
msg = _LW('SQL connection failed. %s attempts left.')
LOG.warning(msg, max_retries - attempt)
diff --git a/oslo_db/tests/sqlalchemy/ b/oslo_db/tests/sqlalchemy/
index b2845a2..edab9d6 100644
--- a/oslo_db/tests/sqlalchemy/
+++ b/oslo_db/tests/sqlalchemy/
@@ -94,13 +94,16 @@ class TestsExceptionFilter(_SQLAExceptionMatcher, oslo_test_base.BaseTestCase):
self.engine.connect().close() # initialize
- def _dbapi_fixture(self, dialect_name):
+ def _dbapi_fixture(self, dialect_name, is_disconnect=False):
engine = self.engine
with test_utils.nested(
mock.patch.object(engine.dialect, "name", dialect_name),
+ mock.patch.object(engine.dialect,
+ "is_disconnect",
+ lambda *args: is_disconnect)
@@ -846,3 +849,94 @@ class TestDBConnectRetry(TestsExceptionFilter):
self.OperationalError("blah blah -39981 blah blah"),
2, 3
+class TestDBConnectPingWrapping(TestsExceptionFilter):
+ def setUp(self):
+ super(TestDBConnectPingWrapping, self).setUp()
+ compat.engine_connect(self.engine, session._connect_ping_listener)
+ @contextlib.contextmanager
+ def _fixture(
+ self, dialect_name, exception, good_conn_count,
+ is_disconnect=True):
+ engine = self.engine
+ # empty out the connection pool
+ engine.dispose()
+ connect_fn = engine.dialect.connect
+ real_do_execute = engine.dialect.do_execute
+ counter = itertools.count(1)
+ def cant_execute(*arg, **kw):
+ value = next(counter)
+ if value > good_conn_count:
+ raise exception
+ else:
+ return real_do_execute(*arg, **kw)
+ def cant_connect(*arg, **kw):
+ value = next(counter)
+ if value > good_conn_count:
+ raise exception
+ else:
+ return connect_fn(*arg, **kw)
+ with self._dbapi_fixture(dialect_name, is_disconnect=is_disconnect):
+ with mock.patch.object(engine.dialect, "connect", cant_connect):
+ with mock.patch.object(
+ engine.dialect, "do_execute", cant_execute):
+ yield
+ def _test_ping_listener_disconnected(
+ self, dialect_name, exc_obj, is_disconnect=True):
+ with self._fixture(dialect_name, exc_obj, 3, is_disconnect):
+ conn = self.engine.connect()
+ self.assertEqual(conn.scalar([1])), 1)
+ conn.close()
+ with self._fixture(dialect_name, exc_obj, 1, is_disconnect):
+ self.assertRaises(
+ exception.DBConnectionError,
+ self.engine.connect
+ )
+ self.assertRaises(
+ exception.DBConnectionError,
+ self.engine.connect
+ )
+ self.assertRaises(
+ exception.DBConnectionError,
+ self.engine.connect
+ )
+ with self._fixture(dialect_name, exc_obj, 1, is_disconnect):
+ self.assertRaises(
+ exception.DBConnectionError,
+ self.engine.contextual_connect
+ )
+ self.assertRaises(
+ exception.DBConnectionError,
+ self.engine.contextual_connect
+ )
+ self.assertRaises(
+ exception.DBConnectionError,
+ self.engine.contextual_connect
+ )
+ def test_mysql_w_disconnect_flag(self):
+ for code in [2002, 2003, 2002]:
+ self._test_ping_listener_disconnected(
+ "mysql",
+ self.OperationalError('%d MySQL server has gone away' % code)
+ )
+ def test_mysql_wo_disconnect_flag(self):
+ for code in [2002, 2003]:
+ self._test_ping_listener_disconnected(
+ "mysql",
+ self.OperationalError('%d MySQL server has gone away' % code),
+ is_disconnect=False
+ )