summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--oslo/db/sqlalchemy/session.py54
-rw-r--r--tests/sqlalchemy/test_exc_filters.py69
-rw-r--r--tests/sqlalchemy/test_sqlalchemy.py50
3 files changed, 93 insertions, 80 deletions
diff --git a/oslo/db/sqlalchemy/session.py b/oslo/db/sqlalchemy/session.py
index 13eb118..2b5a0e7 100644
--- a/oslo/db/sqlalchemy/session.py
+++ b/oslo/db/sqlalchemy/session.py
@@ -289,7 +289,9 @@ from sqlalchemy.interfaces import PoolListener
import sqlalchemy.orm
from sqlalchemy.pool import NullPool, StaticPool
from sqlalchemy.sql.expression import literal_column
+from sqlalchemy.sql.expression import select
+from oslo.db import exception
from oslo.db.openstack.common.gettextutils import _LW
from oslo.db.openstack.common import timeutils
from oslo.db import options
@@ -335,34 +337,23 @@ def _thread_yield(dbapi_con, con_record):
time.sleep(0)
-def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy):
- """Ensures that MySQL, PostgreSQL or DB2 connections are alive.
+def _begin_ping_listener(connection):
+ """Ping the server at transaction begin and transparently reconnect
+ if a disconnect exception occurs.
- Borrowed from:
- http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
"""
- cursor = dbapi_conn.cursor()
try:
- ping_sql = 'select 1'
- if engine.name == 'ibm_db_sa':
- # DB2 requires a table expression
- ping_sql = 'select 1 from (values (1)) AS t1'
- cursor.execute(ping_sql)
- except Exception as ex:
- if engine.dialect.is_disconnect(ex, dbapi_conn, cursor):
- msg = _LW('Database server has gone away: %s') % ex
- LOG.warning(msg)
-
- # if the database server has gone away, all connections in the pool
- # have become invalid and we can safely close all of them here,
- # rather than waste time on checking of every single connection
- engine.dispose()
-
- # this will be handled by SQLAlchemy and will force it to create
- # a new connection and retry the original action
- raise sqla_exc.DisconnectionError(msg)
- else:
- raise
+ # run a SELECT 1. use a core select() so that
+ # any details like that needed by Oracle, DB2 etc. are handled.
+ connection.scalar(select([1]))
+ except exception.DBConnectionError:
+ # catch DBConnectionError, which is raised by the filter
+ # system.
+ # disconnect detected. The connection is now
+ # "invalid", but the pool should be ready to return
+ # new connections assuming they are good now.
+ # run the select again to re-validate the Connection.
+ connection.scalar(select([1]))
def _set_session_sql_mode(dbapi_con, connection_rec, sql_mode=None):
@@ -482,12 +473,9 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
if thread_checkin:
sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
- if engine.name in ('ibm_db_sa', 'mysql', 'postgresql'):
- ping_callback = functools.partial(_ping_listener, engine)
- sqlalchemy.event.listen(engine, 'checkout', ping_callback)
- if engine.name == 'mysql':
- if mysql_sql_mode is not None:
- _mysql_set_mode_callback(engine, mysql_sql_mode)
+ if engine.name == 'mysql':
+ if mysql_sql_mode is not None:
+ _mysql_set_mode_callback(engine, mysql_sql_mode)
elif 'sqlite' in connection_dict.drivername:
if not sqlite_synchronous:
sqlalchemy.event.listen(engine, 'connect',
@@ -522,6 +510,10 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
# register alternate exception handler
exc_filters.register_engine(engine)
+
+ # register on begin handler
+ sqlalchemy.event.listen(engine, "begin", _begin_ping_listener)
+
return engine
diff --git a/tests/sqlalchemy/test_exc_filters.py b/tests/sqlalchemy/test_exc_filters.py
index 7c3b250..6f95731 100644
--- a/tests/sqlalchemy/test_exc_filters.py
+++ b/tests/sqlalchemy/test_exc_filters.py
@@ -13,6 +13,7 @@
"""Test exception filters applied to engines."""
import contextlib
+import itertools
import mock
import six
@@ -60,6 +61,16 @@ class TestsExceptionFilter(test_base.DbTestCase):
"""
@contextlib.contextmanager
+ def _dbapi_fixture(self, dialect_name):
+ engine = self.engine
+ with contextlib.nested(
+ mock.patch.object(engine.dialect.dbapi, "Error",
+ self.Error),
+ mock.patch.object(engine.dialect, "name", dialect_name),
+ ):
+ yield
+
+ @contextlib.contextmanager
def _fixture(self, dialect_name, exception, is_disconnect=False):
def do_execute(self, cursor, statement, parameters, **kw):
@@ -451,3 +462,61 @@ class IntegrationTest(test_base.DbTestCase):
self.Foo.counter == sqla.func.imfake(123))
matched = self.assertRaises(sqla.exc.OperationalError, q.all)
self.assertTrue("no such function" in str(matched))
+
+
+class TestDBDisconnected(TestsExceptionFilter):
+
+ @contextlib.contextmanager
+ def _fixture(self, dialect_name, exception, num_disconnects):
+ engine = self.engine
+
+ real_do_execute = engine.dialect.do_execute
+ counter = itertools.count(1)
+
+ def fake_do_execute(self, *arg, **kw):
+ if next(counter) > num_disconnects:
+ return real_do_execute(self, *arg, **kw)
+ else:
+ raise exception
+
+ with self._dbapi_fixture(dialect_name):
+ with contextlib.nested(
+ mock.patch.object(engine.dialect,
+ "do_execute", fake_do_execute),
+ mock.patch.object(engine.dialect, "is_disconnect",
+ mock.Mock(return_value=True))
+ ):
+ yield
+
+ def _test_ping_listener_disconnected(self, dialect_name, exc_obj):
+ with self._fixture(dialect_name, exc_obj, 1):
+ conn = self.engine.connect()
+ with conn.begin():
+ self.assertEqual(conn.scalar(sqla.select([1])), 1)
+ self.assertFalse(conn.closed)
+ self.assertFalse(conn.invalidated)
+ self.assertTrue(conn.in_transaction())
+
+ with self._fixture(dialect_name, exc_obj, 2):
+ conn = self.engine.connect()
+ self.assertRaises(
+ exception.DBConnectionError,
+ conn.begin
+ )
+ self.assertFalse(conn.closed)
+ self.assertFalse(conn.in_transaction())
+ self.assertTrue(conn.invalidated)
+
+ def test_mysql_ping_listener_disconnected(self):
+ for code in [2006, 2013, 2014, 2045, 2055]:
+ self._test_ping_listener_disconnected(
+ "mysql",
+ self.OperationalError('%d MySQL server has gone away' % code)
+ )
+
+ def test_db2_ping_listener_disconnected(self):
+ self._test_ping_listener_disconnected(
+ "ibm_db_sa",
+ self.OperationalError(
+ 'SQL30081N: DB2 Server connection is no longer active')
+ )
diff --git a/tests/sqlalchemy/test_sqlalchemy.py b/tests/sqlalchemy/test_sqlalchemy.py
index 7dc070c..fac17ae 100644
--- a/tests/sqlalchemy/test_sqlalchemy.py
+++ b/tests/sqlalchemy/test_sqlalchemy.py
@@ -17,9 +17,7 @@
"""Unit tests for SQLAlchemy specific code."""
import logging
-from oslo.config import cfg
-import _mysql_exceptions
import fixtures
import mock
from oslotest import base as oslo_test
@@ -28,6 +26,7 @@ from sqlalchemy import Column, MetaData, Table
from sqlalchemy import Integer, String
from sqlalchemy.ext.declarative import declarative_base
+from oslo.config import cfg
from oslo.db import exception
from oslo.db import options as db_options
from oslo.db.sqlalchemy import models
@@ -127,53 +126,6 @@ class FakeDB2Engine(object):
pass
-class TestDBDisconnected(oslo_test.BaseTestCase):
-
- def _test_ping_listener_disconnected(self, connection):
- engine_args = {
- 'pool_recycle': 3600,
- 'echo': False,
- 'convert_unicode': True}
-
- engine = sqlalchemy.create_engine(connection, **engine_args)
- with mock.patch.object(engine, 'dispose') as dispose_mock:
- self.assertRaises(sqlalchemy.exc.DisconnectionError,
- session._ping_listener, engine,
- FakeDBAPIConnection(), FakeConnectionRec(),
- FakeConnectionProxy())
- dispose_mock.assert_called_once_with()
-
- def test_mysql_ping_listener_disconnected(self):
- def fake_execute(sql):
- raise _mysql_exceptions.OperationalError(self.mysql_error,
- ('MySQL server has '
- 'gone away'))
- with mock.patch.object(FakeCursor, 'execute',
- side_effect=fake_execute):
- connection = 'mysql://root:password@fakehost/fakedb?charset=utf8'
- for code in [2006, 2013, 2014, 2045, 2055]:
- self.mysql_error = code
- self._test_ping_listener_disconnected(connection)
-
- def test_db2_ping_listener_disconnected(self):
-
- def fake_execute(sql):
- raise OperationalError('SQL30081N: DB2 Server '
- 'connection is no longer active')
- with mock.patch.object(FakeCursor, 'execute',
- side_effect=fake_execute):
- # TODO(dperaza): Need a fake engine for db2 since ibm_db_sa is not
- # in global requirements. Change this code to use real IBM db2
- # engine as soon as ibm_db_sa is included in global-requirements
- # under openstack/requirements project.
- fake_create_engine = lambda *args, **kargs: FakeDB2Engine()
- with mock.patch.object(sqlalchemy, 'create_engine',
- side_effect=fake_create_engine):
- connection = ('ibm_db_sa://db2inst1:openstack@fakehost:50000'
- '/fakedab')
- self._test_ping_listener_disconnected(connection)
-
-
class MySQLModeTestCase(test_base.MySQLOpportunisticTestCase):
def __init__(self, *args, **kwargs):