diff options
-rw-r--r-- | oslo/db/sqlalchemy/session.py | 54 | ||||
-rw-r--r-- | tests/sqlalchemy/test_exc_filters.py | 69 | ||||
-rw-r--r-- | tests/sqlalchemy/test_sqlalchemy.py | 50 |
3 files changed, 93 insertions, 80 deletions
diff --git a/oslo/db/sqlalchemy/session.py b/oslo/db/sqlalchemy/session.py index 13eb118..2b5a0e7 100644 --- a/oslo/db/sqlalchemy/session.py +++ b/oslo/db/sqlalchemy/session.py @@ -289,7 +289,9 @@ from sqlalchemy.interfaces import PoolListener import sqlalchemy.orm from sqlalchemy.pool import NullPool, StaticPool from sqlalchemy.sql.expression import literal_column +from sqlalchemy.sql.expression import select +from oslo.db import exception from oslo.db.openstack.common.gettextutils import _LW from oslo.db.openstack.common import timeutils from oslo.db import options @@ -335,34 +337,23 @@ def _thread_yield(dbapi_con, con_record): time.sleep(0) -def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy): - """Ensures that MySQL, PostgreSQL or DB2 connections are alive. +def _begin_ping_listener(connection): + """Ping the server at transaction begin and transparently reconnect + if a disconnect exception occurs. - Borrowed from: - http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f """ - cursor = dbapi_conn.cursor() try: - ping_sql = 'select 1' - if engine.name == 'ibm_db_sa': - # DB2 requires a table expression - ping_sql = 'select 1 from (values (1)) AS t1' - cursor.execute(ping_sql) - except Exception as ex: - if engine.dialect.is_disconnect(ex, dbapi_conn, cursor): - msg = _LW('Database server has gone away: %s') % ex - LOG.warning(msg) - - # if the database server has gone away, all connections in the pool - # have become invalid and we can safely close all of them here, - # rather than waste time on checking of every single connection - engine.dispose() - - # this will be handled by SQLAlchemy and will force it to create - # a new connection and retry the original action - raise sqla_exc.DisconnectionError(msg) - else: - raise + # run a SELECT 1. use a core select() so that + # any details like that needed by Oracle, DB2 etc. are handled. + connection.scalar(select([1])) + except exception.DBConnectionError: + # catch DBConnectionError, which is raised by the filter + # system. + # disconnect detected. The connection is now + # "invalid", but the pool should be ready to return + # new connections assuming they are good now. + # run the select again to re-validate the Connection. + connection.scalar(select([1])) def _set_session_sql_mode(dbapi_con, connection_rec, sql_mode=None): @@ -482,12 +473,9 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, if thread_checkin: sqlalchemy.event.listen(engine, 'checkin', _thread_yield) - if engine.name in ('ibm_db_sa', 'mysql', 'postgresql'): - ping_callback = functools.partial(_ping_listener, engine) - sqlalchemy.event.listen(engine, 'checkout', ping_callback) - if engine.name == 'mysql': - if mysql_sql_mode is not None: - _mysql_set_mode_callback(engine, mysql_sql_mode) + if engine.name == 'mysql': + if mysql_sql_mode is not None: + _mysql_set_mode_callback(engine, mysql_sql_mode) elif 'sqlite' in connection_dict.drivername: if not sqlite_synchronous: sqlalchemy.event.listen(engine, 'connect', @@ -522,6 +510,10 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, # register alternate exception handler exc_filters.register_engine(engine) + + # register on begin handler + sqlalchemy.event.listen(engine, "begin", _begin_ping_listener) + return engine diff --git a/tests/sqlalchemy/test_exc_filters.py b/tests/sqlalchemy/test_exc_filters.py index 7c3b250..6f95731 100644 --- a/tests/sqlalchemy/test_exc_filters.py +++ b/tests/sqlalchemy/test_exc_filters.py @@ -13,6 +13,7 @@ """Test exception filters applied to engines.""" import contextlib +import itertools import mock import six @@ -60,6 +61,16 @@ class TestsExceptionFilter(test_base.DbTestCase): """ @contextlib.contextmanager + def _dbapi_fixture(self, dialect_name): + engine = self.engine + with contextlib.nested( + mock.patch.object(engine.dialect.dbapi, "Error", + self.Error), + mock.patch.object(engine.dialect, "name", dialect_name), + ): + yield + + @contextlib.contextmanager def _fixture(self, dialect_name, exception, is_disconnect=False): def do_execute(self, cursor, statement, parameters, **kw): @@ -451,3 +462,61 @@ class IntegrationTest(test_base.DbTestCase): self.Foo.counter == sqla.func.imfake(123)) matched = self.assertRaises(sqla.exc.OperationalError, q.all) self.assertTrue("no such function" in str(matched)) + + +class TestDBDisconnected(TestsExceptionFilter): + + @contextlib.contextmanager + def _fixture(self, dialect_name, exception, num_disconnects): + engine = self.engine + + real_do_execute = engine.dialect.do_execute + counter = itertools.count(1) + + def fake_do_execute(self, *arg, **kw): + if next(counter) > num_disconnects: + return real_do_execute(self, *arg, **kw) + else: + raise exception + + with self._dbapi_fixture(dialect_name): + with contextlib.nested( + mock.patch.object(engine.dialect, + "do_execute", fake_do_execute), + mock.patch.object(engine.dialect, "is_disconnect", + mock.Mock(return_value=True)) + ): + yield + + def _test_ping_listener_disconnected(self, dialect_name, exc_obj): + with self._fixture(dialect_name, exc_obj, 1): + conn = self.engine.connect() + with conn.begin(): + self.assertEqual(conn.scalar(sqla.select([1])), 1) + self.assertFalse(conn.closed) + self.assertFalse(conn.invalidated) + self.assertTrue(conn.in_transaction()) + + with self._fixture(dialect_name, exc_obj, 2): + conn = self.engine.connect() + self.assertRaises( + exception.DBConnectionError, + conn.begin + ) + self.assertFalse(conn.closed) + self.assertFalse(conn.in_transaction()) + self.assertTrue(conn.invalidated) + + def test_mysql_ping_listener_disconnected(self): + for code in [2006, 2013, 2014, 2045, 2055]: + self._test_ping_listener_disconnected( + "mysql", + self.OperationalError('%d MySQL server has gone away' % code) + ) + + def test_db2_ping_listener_disconnected(self): + self._test_ping_listener_disconnected( + "ibm_db_sa", + self.OperationalError( + 'SQL30081N: DB2 Server connection is no longer active') + ) diff --git a/tests/sqlalchemy/test_sqlalchemy.py b/tests/sqlalchemy/test_sqlalchemy.py index 7dc070c..fac17ae 100644 --- a/tests/sqlalchemy/test_sqlalchemy.py +++ b/tests/sqlalchemy/test_sqlalchemy.py @@ -17,9 +17,7 @@ """Unit tests for SQLAlchemy specific code.""" import logging -from oslo.config import cfg -import _mysql_exceptions import fixtures import mock from oslotest import base as oslo_test @@ -28,6 +26,7 @@ from sqlalchemy import Column, MetaData, Table from sqlalchemy import Integer, String from sqlalchemy.ext.declarative import declarative_base +from oslo.config import cfg from oslo.db import exception from oslo.db import options as db_options from oslo.db.sqlalchemy import models @@ -127,53 +126,6 @@ class FakeDB2Engine(object): pass -class TestDBDisconnected(oslo_test.BaseTestCase): - - def _test_ping_listener_disconnected(self, connection): - engine_args = { - 'pool_recycle': 3600, - 'echo': False, - 'convert_unicode': True} - - engine = sqlalchemy.create_engine(connection, **engine_args) - with mock.patch.object(engine, 'dispose') as dispose_mock: - self.assertRaises(sqlalchemy.exc.DisconnectionError, - session._ping_listener, engine, - FakeDBAPIConnection(), FakeConnectionRec(), - FakeConnectionProxy()) - dispose_mock.assert_called_once_with() - - def test_mysql_ping_listener_disconnected(self): - def fake_execute(sql): - raise _mysql_exceptions.OperationalError(self.mysql_error, - ('MySQL server has ' - 'gone away')) - with mock.patch.object(FakeCursor, 'execute', - side_effect=fake_execute): - connection = 'mysql://root:password@fakehost/fakedb?charset=utf8' - for code in [2006, 2013, 2014, 2045, 2055]: - self.mysql_error = code - self._test_ping_listener_disconnected(connection) - - def test_db2_ping_listener_disconnected(self): - - def fake_execute(sql): - raise OperationalError('SQL30081N: DB2 Server ' - 'connection is no longer active') - with mock.patch.object(FakeCursor, 'execute', - side_effect=fake_execute): - # TODO(dperaza): Need a fake engine for db2 since ibm_db_sa is not - # in global requirements. Change this code to use real IBM db2 - # engine as soon as ibm_db_sa is included in global-requirements - # under openstack/requirements project. - fake_create_engine = lambda *args, **kargs: FakeDB2Engine() - with mock.patch.object(sqlalchemy, 'create_engine', - side_effect=fake_create_engine): - connection = ('ibm_db_sa://db2inst1:openstack@fakehost:50000' - '/fakedab') - self._test_ping_listener_disconnected(connection) - - class MySQLModeTestCase(test_base.MySQLOpportunisticTestCase): def __init__(self, *args, **kwargs): |