summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--oslo.db/locale/oslo.db-log-warning.pot15
-rw-r--r--oslo/db/api.py88
-rw-r--r--oslo/db/exception.py14
-rw-r--r--oslo/db/options.py48
-rw-r--r--oslo/db/sqlalchemy/compat/__init__.py7
-rw-r--r--oslo/db/sqlalchemy/compat/handle_error.py30
-rw-r--r--oslo/db/sqlalchemy/compat/utils.py1
-rw-r--r--oslo/db/sqlalchemy/exc_filters.py112
-rw-r--r--oslo/db/sqlalchemy/provision.py44
-rw-r--r--oslo/db/sqlalchemy/session.py189
-rw-r--r--oslo/db/sqlalchemy/utils.py6
-rw-r--r--requirements.txt6
-rw-r--r--test-requirements.txt2
-rw-r--r--tests/sqlalchemy/test_exc_filters.py279
-rw-r--r--tests/sqlalchemy/test_handle_error.py49
-rw-r--r--tests/sqlalchemy/test_sqlalchemy.py66
-rw-r--r--tests/sqlalchemy/test_utils.py5
-rw-r--r--tox.ini4
18 files changed, 718 insertions, 247 deletions
diff --git a/oslo.db/locale/oslo.db-log-warning.pot b/oslo.db/locale/oslo.db-log-warning.pot
index ba17943..5a4c486 100644
--- a/oslo.db/locale/oslo.db-log-warning.pot
+++ b/oslo.db/locale/oslo.db-log-warning.pot
@@ -6,9 +6,9 @@
#, fuzzy
msgid ""
msgstr ""
-"Project-Id-Version: oslo.db 0.3.0.1.g4796d06\n"
+"Project-Id-Version: oslo.db 0.3.0.44.g8839e43\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
-"POT-Creation-Date: 2014-07-14 06:03+0000\n"
+"POT-Creation-Date: 2014-07-28 06:03+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
@@ -17,21 +17,16 @@ msgstr ""
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 1.3\n"
-#: oslo/db/sqlalchemy/session.py:527
-#, python-format
-msgid "Database server has gone away: %s"
-msgstr ""
-
-#: oslo/db/sqlalchemy/session.py:580
+#: oslo/db/sqlalchemy/session.py:397
msgid "Unable to detect effective SQL mode"
msgstr ""
-#: oslo/db/sqlalchemy/session.py:588
+#: oslo/db/sqlalchemy/session.py:405
#, python-format
msgid "MySQL SQL mode is '%s', consider enabling TRADITIONAL or STRICT_ALL_TABLES"
msgstr ""
-#: oslo/db/sqlalchemy/session.py:696
+#: oslo/db/sqlalchemy/session.py:498
#, python-format
msgid "SQL connection failed. %s attempts left."
msgstr ""
diff --git a/oslo/db/api.py b/oslo/db/api.py
index 34f7c62..16d4157 100644
--- a/oslo/db/api.py
+++ b/oslo/db/api.py
@@ -13,10 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""Multiple DB API backend support.
+"""
+=================================
+Multiple DB API backend support.
+=================================
A DB backend module should implement a method named 'get_backend' which
-takes no arguments. The method can return any object that implements DB
+takes no arguments. The method can return any object that implements DB
API methods.
"""
@@ -35,19 +38,48 @@ LOG = logging.getLogger(__name__)
def safe_for_db_retry(f):
- """Enable db-retry for decorated function, if config option enabled."""
+ """Indicate api method as safe for re-connection to database.
+
+ Database connection retries will be enabled for the decorated api method.
+ Database connection failure can have many causes, which can be temporary.
+ In such cases, retrying may increase the likelihood of a successful connection.
+
+ Usage::
+
+ @safe_for_db_retry
+ def api_method(self):
+ self.engine.connect()
+
+
+ :param f: database api method.
+ :type f: function.
+ """
f.__dict__['enable_retry'] = True
return f
class wrap_db_retry(object):
- """Retry db.api methods, if DBConnectionError() raised
+ """Decorator class. Retry db.api methods, if DBConnectionError() raised.
Retry decorated db.api methods. If we enabled `use_db_reconnect`
in config, this decorator will be applied to all db.api functions,
marked with @safe_for_db_retry decorator.
- Decorator catchs DBConnectionError() and retries function in a
+ Decorator catches DBConnectionError() and retries function in a
loop until it succeeds, or until maximum retries count will be reached.
+
+ Keyword arguments:
+
+ :param retry_interval: seconds between transaction retries
+ :type retry_interval: int
+
+ :param max_retries: max number of retries before an error is raised
+ :type max_retries: int
+
+ :param inc_retry_interval: determine increase retry interval or not
+ :type inc_retry_interval: bool
+
+ :param max_retry_interval: max interval value between retries
+ :type max_retry_interval: int
"""
def __init__(self, retry_interval, max_retries, inc_retry_interval,
@@ -88,37 +120,41 @@ class wrap_db_retry(object):
class DBAPI(object):
- def __init__(self, backend_name, backend_mapping=None, lazy=False,
- **kwargs):
- """Initialize the chosen DB API backend.
+ """Initialize the chosen DB API backend.
- :param backend_name: name of the backend to load
- :type backend_name: str
+ After initialization API methods are available as normal attributes of
+ ``DBAPI`` subclass. Database API methods are supposed to be called as
+ DBAPI instance methods.
- :param backend_mapping: backend name -> module/class to load mapping
- :type backend_mapping: dict
+ :param backend_name: name of the backend to load
+ :type backend_name: str
- :param lazy: load the DB backend lazily on the first DB API method call
- :type lazy: bool
+ :param backend_mapping: backend name -> module/class to load mapping
+ :type backend_mapping: dict
+ :default backend_mapping: None
- Keyword arguments:
+ :param lazy: load the DB backend lazily on the first DB API method call
+ :type lazy: bool
+ :default lazy: False
- :keyword use_db_reconnect: retry DB transactions on disconnect or not
- :type use_db_reconnect: bool
+ :keyword use_db_reconnect: retry DB transactions on disconnect or not
+ :type use_db_reconnect: bool
- :keyword retry_interval: seconds between transaction retries
- :type retry_interval: int
+ :keyword retry_interval: seconds between transaction retries
+ :type retry_interval: int
- :keyword inc_retry_interval: increase retry interval or not
- :type inc_retry_interval: bool
+ :keyword inc_retry_interval: increase retry interval or not
+ :type inc_retry_interval: bool
- :keyword max_retry_interval: max interval value between retries
- :type max_retry_interval: int
+ :keyword max_retry_interval: max interval value between retries
+ :type max_retry_interval: int
- :keyword max_retries: max number of retries before an error is raised
- :type max_retries: int
+ :keyword max_retries: max number of retries before an error is raised
+ :type max_retries: int
+ """
- """
+ def __init__(self, backend_name, backend_mapping=None, lazy=False,
+ **kwargs):
self._backend = None
self._backend_name = backend_name
diff --git a/oslo/db/exception.py b/oslo/db/exception.py
index 4adf350..c67e4ef 100644
--- a/oslo/db/exception.py
+++ b/oslo/db/exception.py
@@ -30,11 +30,23 @@ class DBError(Exception):
class DBDuplicateEntry(DBError):
"""Wraps an implementation specific exception."""
- def __init__(self, columns=None, inner_exception=None):
+ def __init__(self, columns=None, inner_exception=None, value=None):
self.columns = columns or []
+ self.value = value
super(DBDuplicateEntry, self).__init__(inner_exception)
+class DBReferenceError(DBError):
+ """Wraps an implementation specific exception."""
+ def __init__(self, table, constraint, key, key_table,
+ inner_exception=None):
+ self.table = table
+ self.constraint = constraint
+ self.key = key
+ self.key_table = key_table
+ super(DBReferenceError, self).__init__(inner_exception)
+
+
class DBDeadlock(DBError):
def __init__(self, inner_exception=None):
super(DBDeadlock, self).__init__(inner_exception)
diff --git a/oslo/db/options.py b/oslo/db/options.py
index 72e626c..b056b1c 100644
--- a/oslo/db/options.py
+++ b/oslo/db/options.py
@@ -138,7 +138,53 @@ database_opts = [
def set_defaults(conf, connection=None, sqlite_db=None,
max_pool_size=None, max_overflow=None,
pool_timeout=None):
- """Set defaults for configuration variables."""
+ """Set defaults for configuration variables.
+
+ Overrides default options values.
+
+ :param conf: Config instance specified to set default options in it. Using
+ instances instead of a global config object prevents conflicts between
+ options declarations.
+ :type conf: oslo.config.cfg.ConfigOpts instance.
+
+ :keyword connection: SQL connection string.
+ Valid SQLite URL forms are:
+ * sqlite:///:memory: (or, sqlite://)
+ * sqlite:///relative/path/to/file.db
+ * sqlite:////absolute/path/to/file.db
+ :type connection: str
+
+ :keyword sqlite_db: path to SQLite database file.
+ :type sqlite_db: str
+
+ :keyword max_pool_size: maximum connections pool size. The size of the pool
+ to be maintained, defaults to 5, will be used if value of the parameter is
+ `None`. This is the largest number of connections that will be kept
+ persistently in the pool. Note that the pool begins with no connections;
+ once this number of connections is requested, that number of connections
+ will remain.
+ :type max_pool_size: int
+ :default max_pool_size: None
+
+ :keyword max_overflow: The maximum overflow size of the pool. When the
+ number of checked-out connections reaches the size set in pool_size,
+ additional connections will be returned up to this limit. When those
+ additional connections are returned to the pool, they are disconnected and
+ discarded. It follows then that the total number of simultaneous
+ connections the pool will allow is pool_size + max_overflow, and the total
+ number of "sleeping" connections the pool will allow is pool_size.
+ max_overflow can be set to -1 to indicate no overflow limit; no limit will
+ be placed on the total number of concurrent connections. Defaults to 10,
+ will be used if value of the parameter is `None`.
+ :type max_overflow: int
+ :default max_overflow: None
+
+ :keyword pool_timeout: The number of seconds to wait before giving up on
+ returning a connection. Defaults to 30, will be used if value of the
+ parameter is `None`.
+ :type pool_timeout: int
+ :default pool_timeout: None
+ """
conf.register_opts(database_opts, group='database')
diff --git a/oslo/db/sqlalchemy/compat/__init__.py b/oslo/db/sqlalchemy/compat/__init__.py
index 2ba9954..1510dd0 100644
--- a/oslo/db/sqlalchemy/compat/__init__.py
+++ b/oslo/db/sqlalchemy/compat/__init__.py
@@ -16,10 +16,11 @@ added at some point but for which oslo.db provides a compatible versions
for previous SQLAlchemy versions.
"""
-from oslo.db.sqlalchemy.compat import handle_error
+from oslo.db.sqlalchemy.compat import handle_error as _h_err
# trying to get: "from oslo.db.sqlalchemy import compat; compat.handle_error"
# flake8 won't let me import handle_error directly
-handle_error = handle_error.handle_error
+handle_error = _h_err.handle_error
+ExceptionContextImpl = _h_err.ExceptionContextImpl
-__all__ = ['handle_error']
+__all__ = ['handle_error', 'ExceptionContextImpl']
diff --git a/oslo/db/sqlalchemy/compat/handle_error.py b/oslo/db/sqlalchemy/compat/handle_error.py
index fd59cdd..6929008 100644
--- a/oslo/db/sqlalchemy/compat/handle_error.py
+++ b/oslo/db/sqlalchemy/compat/handle_error.py
@@ -78,7 +78,8 @@ def handle_error(engine, listener):
sqla_exc.StatementError):
sqlalchemy_exception = reraised_exception
original_exception = sqlalchemy_exception.orig
- is_disconnect = isinstance(sqlalchemy_exception,
+ self._is_disconnect = is_disconnect = \
+ isinstance(sqlalchemy_exception,
sqla_exc.DBAPIError) and sqlalchemy_exception.\
connection_invalidated
else:
@@ -104,12 +105,39 @@ def handle_error(engine, listener):
newraise = _raised
break
+ if sqlalchemy_exception and \
+ self._is_disconnect != ctx.is_disconnect:
+
+ if not ctx.is_disconnect:
+ raise NotImplementedError(
+ "Can't reset 'disconnect' status of exception "
+ "once it is set with this version of "
+ "SQLAlchemy")
+
+ sqlalchemy_exception.connection_invalidated = \
+ self._is_disconnect = ctx.is_disconnect
+ if self._is_disconnect:
+ self._do_disconnect(e)
+
if newraise:
six.reraise(type(newraise), newraise, sys.exc_info()[2])
else:
six.reraise(type(reraised_exception),
reraised_exception, sys.exc_info()[2])
+ def _do_disconnect(self, e):
+ del self._is_disconnect
+ if utils.sqla_094:
+ dbapi_conn_wrapper = self.connection
+ self.engine.pool._invalidate(dbapi_conn_wrapper, e)
+ self.invalidate(e)
+ else:
+ dbapi_conn_wrapper = self.connection
+ self.invalidate(e)
+ if not hasattr(dbapi_conn_wrapper, '_pool') or \
+ dbapi_conn_wrapper._pool is self.engine.pool:
+ self.engine.dispose()
+
engine._connection_cls = Connection
engine._oslo_handle_error_events.append(listener)
diff --git a/oslo/db/sqlalchemy/compat/utils.py b/oslo/db/sqlalchemy/compat/utils.py
index 6060dd1..a1c6d83 100644
--- a/oslo/db/sqlalchemy/compat/utils.py
+++ b/oslo/db/sqlalchemy/compat/utils.py
@@ -20,4 +20,5 @@ _SQLA_VERSION = tuple(
)
sqla_097 = _SQLA_VERSION >= (0, 9, 7)
+sqla_094 = _SQLA_VERSION >= (0, 9, 4)
sqla_08 = _SQLA_VERSION >= (0, 8)
diff --git a/oslo/db/sqlalchemy/exc_filters.py b/oslo/db/sqlalchemy/exc_filters.py
index 3ec19cc..d4d9438 100644
--- a/oslo/db/sqlalchemy/exc_filters.py
+++ b/oslo/db/sqlalchemy/exc_filters.py
@@ -57,7 +57,8 @@ def filters(dbname, exception_type, regex):
# psycopg2.extensions.TransactionRollbackError(OperationalError),
# as well as sqlalchemy.exc.DBAPIError, as SQLAlchemy will reraise it
# as this until issue #3075 is fixed.
-@filters("mysql", sqla_exc.OperationalError, r"^.*\(1213, 'Deadlock.*")
+@filters("mysql", sqla_exc.OperationalError, r"^.*\b1213\b.*Deadlock found.*")
+@filters("mysql", sqla_exc.InternalError, r"^.*\b1213\b.*Deadlock found.*")
@filters("postgresql", sqla_exc.OperationalError, r"^.*deadlock detected.*")
@filters("postgresql", sqla_exc.DBAPIError, r"^.*deadlock detected.*")
@filters("ibm_db_sa", sqla_exc.DBAPIError, r"^.*SQL0911N.*")
@@ -67,10 +68,14 @@ def _deadlock_error(operational_error, match, engine_name, is_disconnect):
NOTE(comstud): In current versions of DB backends, Deadlock violation
messages follow the structure:
- mysql:
+ mysql+mysqldb:
(OperationalError) (1213, 'Deadlock found when trying to get lock; try '
'restarting transaction') <query_str> <query_args>
+ mysql+mysqlconnector:
+ (InternalError) 1213 (40001): Deadlock found when trying to get lock; try
+ restarting transaction
+
postgresql:
(TransactionRollbackError) deadlock detected <deadlock_details>
@@ -84,9 +89,14 @@ def _deadlock_error(operational_error, match, engine_name, is_disconnect):
@filters("mysql", sqla_exc.IntegrityError,
- r"^.*\b1062\b.*Duplicate entry '[^']+' for key '([^']+)'.*$")
+ r"^.*\b1062\b.*Duplicate entry '(?P<value>[^']+)'"
+ r" for key '(?P<columns>[^']+)'.*$")
+# NOTE(pkholkin): the first regex is suitable only for PostgreSQL 9.x versions
+# the second regex is suitable for PostgreSQL 8.x versions
@filters("postgresql", sqla_exc.IntegrityError,
- r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$")
+ (r'^.*duplicate\s+key.*"(?P<columns>[^"]+)"\s*\n.*'
+ r'Key\s+\((?P<key>.*)\)=\((?P<value>.*)\)\s+already\s+exists.*$',
+ r"^.*duplicate\s+key.*\"(?P<columns>[^\"]+)\"\s*\n.*$"))
def _default_dupe_key_error(integrity_error, match, engine_name,
is_disconnect):
"""Filter for MySQL or Postgresql duplicate key error.
@@ -116,7 +126,7 @@ def _default_dupe_key_error(integrity_error, match, engine_name,
"""
- columns = match.group(1)
+ columns = match.group('columns')
# note(vsergeyev): UniqueConstraint name convention: "uniq_t0c10c2"
# where `t` it is table name and columns `c1`, `c2`
@@ -130,12 +140,14 @@ def _default_dupe_key_error(integrity_error, match, engine_name,
else:
columns = columns[len(uniqbase):].split("0")[1:]
- raise exception.DBDuplicateEntry(columns, integrity_error)
+ value = match.groupdict().get('value')
+
+ raise exception.DBDuplicateEntry(columns, integrity_error, value)
@filters("sqlite", sqla_exc.IntegrityError,
- (r"^.*columns?([^)]+)(is|are)\s+not\s+unique$",
- r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$"))
+ (r"^.*columns?(?P<columns>[^)]+)(is|are)\s+not\s+unique$",
+ r"^.*UNIQUE\s+constraint\s+failed:\s+(?P<columns>.+)$"))
def _sqlite_dupe_key_error(integrity_error, match, engine_name, is_disconnect):
"""Filter for SQLite duplicate key error.
@@ -151,11 +163,48 @@ def _sqlite_dupe_key_error(integrity_error, match, engine_name, is_disconnect):
N columns - (IntegrityError) UNIQUE constraint failed: tbl.k1, tbl.k2
"""
- columns = match.group(1)
+ columns = match.group('columns')
columns = [c.split('.')[-1] for c in columns.strip().split(", ")]
raise exception.DBDuplicateEntry(columns, integrity_error)
+@filters("sqlite", sqla_exc.IntegrityError,
+ r".*SQL error: foreign key constraint failed")
+@filters("postgresql", sqla_exc.IntegrityError,
+ r".*on table \"(?P<table>[^\"]+)\" violates "
+ "foreign key constraint \"(?P<constraint>[^\"]+)\"\s*\n"
+ "DETAIL: Key \((?P<key>.+)\)=\(.+\) "
+ "is not present in table "
+ "\"(?P<key_table>[^\"]+)\".")
+@filters("mysql", sqla_exc.IntegrityError,
+ r".* Cannot add or update a child row: "
+ "a foreign key constraint fails "
+ "\((?P<table>.+), CONSTRAINT (?P<constraint>.+) "
+ "FOREIGN KEY \((?P<key>.+)\) "
+ "REFERENCES (?P<key_table>.+) \(.+\)\)")
+def _foreign_key_error(integrity_error, match, engine_name, is_disconnect):
+ """Filter for foreign key errors."""
+ try:
+ table = match.group("table")
+ except IndexError:
+ table = None
+ try:
+ constraint = match.group("constraint")
+ except IndexError:
+ constraint = None
+ try:
+ key = match.group("key")
+ except IndexError:
+ key = None
+ try:
+ key_table = match.group("key_table")
+ except IndexError:
+ key_table = None
+
+ raise exception.DBReferenceError(table, constraint, key, key_table,
+ integrity_error)
+
+
@filters("ibm_db_sa", sqla_exc.IntegrityError, r"^.*SQL0803N.*$")
def _db2_dupe_key_error(integrity_error, match, engine_name, is_disconnect):
"""Filter for DB2 duplicate key errors.
@@ -174,7 +223,7 @@ def _db2_dupe_key_error(integrity_error, match, engine_name, is_disconnect):
raise exception.DBDuplicateEntry([], integrity_error)
-@filters("mysql", sqla_exc.DBAPIError, r".*\(1146")
+@filters("mysql", sqla_exc.DBAPIError, r".*\b1146\b")
def _raise_mysql_table_doesnt_exist_asis(
error, match, engine_name, is_disconnect):
"""Raise MySQL error 1146 as is, so that it does not conflict with
@@ -202,6 +251,15 @@ def _raise_operational_errors_directly_filter(operational_error,
raise operational_error
+# For the db2, the error code is -30081 since the db2 is still not ready
+@filters("mysql", sqla_exc.OperationalError, r".*\((?:2002|2003|2006|2013)")
+@filters("ibm_db_sa", sqla_exc.OperationalError, r".*(?:-30081)")
+def _is_db_connection_error(operational_error, match, engine_name,
+ is_disconnect):
+ """Detect the exception as indicating a recoverable error on connect."""
+ raise exception.DBConnectionError(operational_error)
+
+
@filters("*", sqla_exc.DBAPIError, r".*")
def _raise_for_remaining_DBAPIError(error, match, engine_name, is_disconnect):
"""Filter for remaining DBAPIErrors and wrap if they represent
@@ -253,12 +311,36 @@ def handler(context):
for fn, regexp in regexp_reg:
match = regexp.match(exc.message)
if match:
- fn(
- exc,
- match,
- context.connection.dialect.name,
- context.is_disconnect)
+ try:
+ fn(
+ exc,
+ match,
+ context.connection.dialect.name,
+ context.is_disconnect)
+ except exception.DBConnectionError:
+ context.is_disconnect = True
+ raise
def register_engine(engine):
compat.handle_error(engine, handler)
+
+
+def handle_connect_error(engine):
+ """Provide a special context that will allow on-connect errors
+ to be raised within the filtering context.
+
+ """
+ try:
+ return engine.connect()
+ except Exception as e:
+ if isinstance(e, sqla_exc.StatementError):
+ s_exc, orig = e, e.orig
+ else:
+ s_exc, orig = None, e
+
+ ctx = compat.ExceptionContextImpl(
+ orig, s_exc, engine, None,
+ None, None, None, False
+ )
+ handler(ctx)
diff --git a/oslo/db/sqlalchemy/provision.py b/oslo/db/sqlalchemy/provision.py
index 315d599..f1aa2cd 100644
--- a/oslo/db/sqlalchemy/provision.py
+++ b/oslo/db/sqlalchemy/provision.py
@@ -16,6 +16,7 @@
"""Provision test environment for specific DB backends"""
import argparse
+import copy
import logging
import os
import random
@@ -34,9 +35,9 @@ def get_engine(uri):
"""Engine creation
Call the function without arguments to get admin connection. Admin
- connection required to create temporary user and database for each
- particular test. Otherwise use existing connection to recreate connection
- to the temporary database.
+ connection is required to create a temporary database for each
+ particular test. Otherwise use existing connection to recreate
+ connection to the temporary database.
"""
return sqlalchemy.create_engine(uri, poolclass=sqlalchemy.pool.NullPool)
@@ -57,31 +58,33 @@ def _execute_sql(engine, sql, driver):
def create_database(engine):
- """Provide temporary user and database for each particular test."""
+ """Provide temporary database for each particular test."""
driver = engine.name
- auth = {
- 'database': ''.join(random.choice(string.ascii_lowercase)
- for i in moves.range(10)),
- 'user': engine.url.username,
- 'passwd': engine.url.password,
- }
+ database = ''.join(random.choice(string.ascii_lowercase)
+ for i in moves.range(10))
if driver == 'sqlite':
- return 'sqlite:////tmp/%s' % auth['database']
+ database = '/tmp/%s' % database
elif driver in ['mysql', 'postgresql']:
- sql = 'create database %s;' % auth['database']
+ sql = 'create database %s;' % database
_execute_sql(engine, [sql], driver)
else:
raise ValueError('Unsupported RDBMS %s' % driver)
- params = auth.copy()
- params['backend'] = driver
- return "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" % params
+ # Both shallow and deep copies may lead to surprising behaviour
+ # without knowing the implementation of sqlalchemy.engine.url.
+ # Use a shallow copy here, since we're only overriding a single
+ # property, invoking __str__ and then discarding our copy. This
+ # is currently safe and _should_ remain safe into the future.
+ new_url = copy.copy(engine.url)
+
+ new_url.database = database
+ return str(new_url)
def drop_database(admin_engine, current_uri):
- """Drop temporary database and user after each particular test."""
+ """Drop temporary database after each particular test."""
engine = get_engine(current_uri)
driver = engine.name
@@ -101,8 +104,8 @@ def drop_database(admin_engine, current_uri):
def main():
"""Controller to handle commands
- ::create: Create test user and database with random names.
- ::drop: Drop user and database created by previous command.
+ ::create: Create test databases with random names.
+ ::drop: Drop databases created by the previous command.
"""
parser = argparse.ArgumentParser(
description='Controller to handle database creation and dropping'
@@ -115,8 +118,7 @@ def main():
create = subparsers.add_parser(
'create',
- help='Create temporary test '
- 'databases and users.')
+ help='Create temporary test databases.')
create.set_defaults(which='create')
create.add_argument(
'instances_count',
@@ -125,7 +127,7 @@ def main():
drop = subparsers.add_parser(
'drop',
- help='Drop temporary test databases and users.')
+ help='Drop temporary test databases.')
drop.set_defaults(which='drop')
drop.add_argument(
'instances',
diff --git a/oslo/db/sqlalchemy/session.py b/oslo/db/sqlalchemy/session.py
index 2c91b8c..442ee21 100644
--- a/oslo/db/sqlalchemy/session.py
+++ b/oslo/db/sqlalchemy/session.py
@@ -279,17 +279,19 @@ Efficient use of soft deletes:
"""
import functools
+import itertools
import logging
import re
import time
import six
-from sqlalchemy import exc as sqla_exc
from sqlalchemy.interfaces import PoolListener
import sqlalchemy.orm
from sqlalchemy.pool import NullPool, StaticPool
from sqlalchemy.sql.expression import literal_column
+from sqlalchemy.sql.expression import select
+from oslo.db import exception
from oslo.db.openstack.common.gettextutils import _LW
from oslo.db.openstack.common import timeutils
from oslo.db import options
@@ -335,34 +337,23 @@ def _thread_yield(dbapi_con, con_record):
time.sleep(0)
-def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy):
- """Ensures that MySQL, PostgreSQL or DB2 connections are alive.
+def _begin_ping_listener(connection):
+ """Ping the server at transaction begin and transparently reconnect
+ if a disconnect exception occurs.
- Borrowed from:
- http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
"""
- cursor = dbapi_conn.cursor()
try:
- ping_sql = 'select 1'
- if engine.name == 'ibm_db_sa':
- # DB2 requires a table expression
- ping_sql = 'select 1 from (values (1)) AS t1'
- cursor.execute(ping_sql)
- except Exception as ex:
- if engine.dialect.is_disconnect(ex, dbapi_conn, cursor):
- msg = _LW('Database server has gone away: %s') % ex
- LOG.warning(msg)
-
- # if the database server has gone away, all connections in the pool
- # have become invalid and we can safely close all of them here,
- # rather than waste time on checking of every single connection
- engine.dispose()
-
- # this will be handled by SQLAlchemy and will force it to create
- # a new connection and retry the original action
- raise sqla_exc.DisconnectionError(msg)
- else:
- raise
+ # run a SELECT 1. use a core select() so that
+ # any details like that needed by Oracle, DB2 etc. are handled.
+ connection.scalar(select([1]))
+ except exception.DBConnectionError:
+ # catch DBConnectionError, which is raised by the filter
+ # system.
+ # disconnect detected. The connection is now
+ # "invalid", but the pool should be ready to return
+ # new connections assuming they are good now.
+ # run the select again to re-validate the Connection.
+ connection.scalar(select([1]))
def _set_session_sql_mode(dbapi_con, connection_rec, sql_mode=None):
@@ -424,18 +415,6 @@ def _mysql_set_mode_callback(engine, sql_mode):
_mysql_check_effective_sql_mode(engine)
-def _is_db_connection_error(args):
- """Return True if error in connecting to db."""
- # NOTE(adam_g): This is currently MySQL specific and needs to be extended
- # to support Postgres and others.
- # For the db2, the error code is -30081 since the db2 is still not ready
- conn_err_codes = ('2002', '2003', '2006', '2013', '-30081')
- for err_code in conn_err_codes:
- if args.find(err_code) != -1:
- return True
- return False
-
-
def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
idle_timeout=3600,
connection_debug=0, max_pool_size=None, max_overflow=None,
@@ -487,49 +466,48 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
if thread_checkin:
sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
- if engine.name in ('ibm_db_sa', 'mysql', 'postgresql'):
- ping_callback = functools.partial(_ping_listener, engine)
- sqlalchemy.event.listen(engine, 'checkout', ping_callback)
- if engine.name == 'mysql':
- if mysql_sql_mode is not None:
- _mysql_set_mode_callback(engine, mysql_sql_mode)
+ if engine.name == 'mysql':
+ if mysql_sql_mode is not None:
+ _mysql_set_mode_callback(engine, mysql_sql_mode)
elif 'sqlite' in connection_dict.drivername:
if not sqlite_synchronous:
sqlalchemy.event.listen(engine, 'connect',
_synchronous_switch_listener)
sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener)
- if connection_trace and engine.dialect.dbapi.__name__ == 'MySQLdb':
- _patch_mysqldb_with_stacktrace_comments()
-
- try:
- engine.connect()
- except sqla_exc.OperationalError as e:
- if not _is_db_connection_error(e.args[0]):
- raise
-
- remaining = max_retries
- if remaining == -1:
- remaining = 'infinite'
- while True:
- msg = _LW('SQL connection failed. %s attempts left.')
- LOG.warning(msg, remaining)
- if remaining != 'infinite':
- remaining -= 1
- time.sleep(retry_interval)
- try:
- engine.connect()
- break
- except sqla_exc.OperationalError as e:
- if (remaining != 'infinite' and remaining == 0) or \
- not _is_db_connection_error(e.args[0]):
- raise
+ if connection_trace:
+ _add_trace_comments(engine)
# register alternate exception handler
exc_filters.register_engine(engine)
+
+ # register on begin handler
+ sqlalchemy.event.listen(engine, "begin", _begin_ping_listener)
+
+ # initial connect + test
+ _test_connection(engine, max_retries, retry_interval)
+
return engine
+def _test_connection(engine, max_retries, retry_interval):
+ if max_retries == -1:
+ attempts = itertools.count()
+ else:
+ attempts = six.moves.range(max_retries)
+ de = None
+ for attempt in attempts:
+ try:
+ return exc_filters.handle_connect_error(engine)
+ except exception.DBConnectionError as de:
+ msg = _LW('SQL connection failed. %s attempts left.')
+ LOG.warning(msg, max_retries - attempt)
+ time.sleep(retry_interval)
+ else:
+ if de is not None:
+ six.reraise(type(de), de)
+
+
class Query(sqlalchemy.orm.query.Query):
"""Subclass of sqlalchemy.query with soft_delete() method."""
def soft_delete(self, synchronize_session='evaluate'):
@@ -552,47 +530,48 @@ def get_maker(engine, autocommit=True, expire_on_commit=False):
query_cls=Query)
-def _patch_mysqldb_with_stacktrace_comments():
- """Adds current stack trace as a comment in queries.
+def _add_trace_comments(engine):
+ """Augment statements with a trace of the immediate calling code
+ for a given statement.
- Patches MySQLdb.cursors.BaseCursor._do_query.
"""
- import MySQLdb.cursors
+
+ import os
+ import sys
import traceback
+ target_paths = set([
+ os.path.dirname(sys.modules['oslo.db'].__file__),
+ os.path.dirname(sys.modules['sqlalchemy'].__file__)
+ ])
+
+ @sqlalchemy.event.listens_for(engine, "before_cursor_execute", retval=True)
+ def before_cursor_execute(conn, cursor, statement, parameters, context,
+ executemany):
+
+ # NOTE(zzzeek) - if different steps per DB dialect are desirable
+ # here, switch out on engine.name for now.
+ stack = traceback.extract_stack()
+ our_line = None
+ for idx, (filename, line, method, function) in enumerate(stack):
+ for tgt in target_paths:
+ if filename.startswith(tgt):
+ our_line = idx
+ break
+ if our_line:
+ break
- old_mysql_do_query = MySQLdb.cursors.BaseCursor._do_query
-
- def _do_query(self, q):
- stack = ''
- for filename, line, method, function in traceback.extract_stack():
- # exclude various common things from trace
- if filename.endswith('session.py') and method == '_do_query':
- continue
- if filename.endswith('api.py') and method == 'wrapper':
- continue
- if filename.endswith('utils.py') and method == '_inner':
- continue
- if filename.endswith('exception.py') and method == '_wrap':
- continue
- # db/api is just a wrapper around db/sqlalchemy/api
- if filename.endswith('db/api.py'):
- continue
- # only trace inside oslo
- index = filename.rfind('oslo')
- if index == -1:
- continue
- stack += "File:%s:%s Method:%s() Line:%s | " \
- % (filename[index:], line, method, function)
-
- # strip trailing " | " from stack
- if stack:
- stack = stack[:-3]
- qq = "%s /* %s */" % (q, stack)
- else:
- qq = q
- old_mysql_do_query(self, qq)
+ if our_line:
+ trace = "; ".join(
+ "File: %s (%s) %s" % (
+ line[0], line[1], line[2]
+ )
+ # include three lines of context.
+ for line in stack[our_line - 3:our_line]
+
+ )
+ statement = "%s -- %s" % (statement, trace)
- setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query)
+ return statement, parameters
class EngineFacade(object):
diff --git a/oslo/db/sqlalchemy/utils.py b/oslo/db/sqlalchemy/utils.py
index a58b1bf..5aa9341 100644
--- a/oslo/db/sqlalchemy/utils.py
+++ b/oslo/db/sqlalchemy/utils.py
@@ -656,7 +656,8 @@ def _change_deleted_column_type_to_id_type_sqlite(migrate_engine, table_name,
execute()
-def get_connect_string(backend, database, user=None, passwd=None):
+def get_connect_string(backend, database, user=None, passwd=None,
+ host='localhost'):
"""Get database connection
Try to get a connection with a very specific set of values, if we get
@@ -665,11 +666,12 @@ def get_connect_string(backend, database, user=None, passwd=None):
args = {'backend': backend,
'user': user,
'passwd': passwd,
+ 'host': host,
'database': database}
if backend == 'sqlite':
template = '%(backend)s:///%(database)s'
else:
- template = "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s"
+ template = "%(backend)s://%(user)s:%(passwd)s@%(host)s/%(database)s"
return template % args
diff --git a/requirements.txt b/requirements.txt
index c2e16ce..b8b4400 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,7 @@
-alembic>=0.4.1
+alembic>=0.6.4
Babel>=1.3
iso8601>=0.1.9
-oslo.config>=1.2.1
-SQLAlchemy>=0.7.8,!=0.9.5,<=0.9.99
+oslo.config>=1.4.0.0a3
+SQLAlchemy>=0.8.4,<=0.8.99,>=0.9.7,<=0.9.99
sqlalchemy-migrate>=0.9.1
stevedore>=0.14
diff --git a/test-requirements.txt b/test-requirements.txt
index c0f57a7..1ff87b6 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -1,4 +1,4 @@
-hacking>=0.9.1,<0.10
+hacking>=0.9.2,<0.10
coverage>=3.6
discover
diff --git a/tests/sqlalchemy/test_exc_filters.py b/tests/sqlalchemy/test_exc_filters.py
index 649716c..42e4365 100644
--- a/tests/sqlalchemy/test_exc_filters.py
+++ b/tests/sqlalchemy/test_exc_filters.py
@@ -13,6 +13,7 @@
"""Test exception filters applied to engines."""
import contextlib
+import itertools
import mock
import six
@@ -20,6 +21,7 @@ import sqlalchemy as sqla
from sqlalchemy.orm import mapper
from oslo.db import exception
+from oslo.db.sqlalchemy import session
from oslo.db.sqlalchemy import test_base
_TABLE_NAME = '__tmp__test__tmp__'
@@ -41,6 +43,9 @@ class TestsExceptionFilter(test_base.DbTestCase):
class InterfaceError(Error):
pass
+ class InternalError(Error):
+ pass
+
class IntegrityError(Error):
pass
@@ -57,6 +62,16 @@ class TestsExceptionFilter(test_base.DbTestCase):
"""
@contextlib.contextmanager
+ def _dbapi_fixture(self, dialect_name):
+ engine = self.engine
+ with contextlib.nested(
+ mock.patch.object(engine.dialect.dbapi, "Error",
+ self.Error),
+ mock.patch.object(engine.dialect, "name", dialect_name),
+ ):
+ yield
+
+ @contextlib.contextmanager
def _fixture(self, dialect_name, exception, is_disconnect=False):
def do_execute(self, cursor, statement, parameters, **kw):
@@ -166,16 +181,100 @@ class TestFallthroughsAndNonDBAPI(TestsExceptionFilter):
self.assertEqual("mysqldb has an attribute error", matched.message)
+class TestRaiseReferenceError(TestsExceptionFilter):
+ def test_postgresql(self):
+ e = self._run_test(
+ "postgresql",
+ "INSERT SOMETHING",
+ self.IntegrityError(
+ "insert or update on table "
+ "\"resource_entity\" "
+ "violates foreign key constraint "
+ "\"resource_entity_entity_id_fkey\"\n"
+ "DETAIL: Key "
+ "(entity_id)=(74b5da71-5a9c-4f89-a8e9-4a2d856e6c29) "
+ "is not present in table \"entity\".\n"
+ "'INSERT INTO resource_entity (resource_id, entity_id, name) "
+ "VALUES (%(resource_id)s, "
+ "%(entity_id)s, %(name)s)' "
+ "{'entity_id': '74b5da71-5a9c-4f89-a8e9-4a2d856e6c29', "
+ "'name': u'foo', "
+ "'resource_id': 'ffb12cb4-d955-4d96-a315-5f48ea161eef'}"),
+ exception.DBReferenceError,
+ )
+ self.assertEqual("resource_entity", e.table)
+ self.assertEqual("resource_entity_entity_id_fkey", e.constraint)
+ self.assertEqual("entity_id", e.key)
+ self.assertEqual("entity", e.key_table)
+ self.assertEqual(
+ "(IntegrityError) insert or update on table "
+ "\"resource_entity\" violates foreign key constraint "
+ "\"resource_entity_entity_id_fkey\"\n"
+ "DETAIL: Key (entity_id)=(74b5da71-5a9c-4f89-a8e9-4a2d856e6c29) "
+ "is not present in table \"entity\".\n"
+ "'INSERT INTO resource_entity (resource_id, entity_id, name) "
+ "VALUES (%(resource_id)s, %(entity_id)s, %(name)s)' "
+ "{'entity_id': '74b5da71-5a9c-4f89-a8e9-4a2d856e6c29', "
+ "'name': u'foo', "
+ "'resource_id': 'ffb12cb4-d955-4d96-a315-5f48ea161eef'} "
+ "'INSERT SOMETHING' ()",
+ str(e))
+
+ def test_mysql(self):
+ e = self._run_test(
+ "mysql",
+ "INSERT SOMETHING",
+ self.IntegrityError(
+ "Cannot add or update a child row: "
+ "a foreign key constraint fails "
+ "(resource_entity, CONSTRAINT resource_entity_entity_id_fkey "
+ "FOREIGN KEY (entity_id) "
+ "REFERENCES entity (entity_id))"
+ ),
+ exception.DBReferenceError,
+ )
+ self.assertEqual("resource_entity", e.table)
+ self.assertEqual("resource_entity_entity_id_fkey", e.constraint)
+ self.assertEqual("entity_id", e.key)
+ self.assertEqual("entity", e.key_table)
+ self.assertEqual(
+ "(IntegrityError) Cannot add or update a child row: "
+ "a foreign key constraint fails "
+ "(resource_entity, CONSTRAINT resource_entity_entity_id_fkey "
+ "FOREIGN KEY (entity_id) REFERENCES entity (entity_id)) "
+ "'INSERT SOMETHING' ()",
+ str(e))
+
+ def test_sqlite(self):
+ e = self._run_test(
+ "sqlite",
+ "INSERT SOMETHING",
+ self.IntegrityError(
+ "SQL error: foreign key constraint failed"
+ ),
+ exception.DBReferenceError,
+ )
+ self.assertIsNone(e.table)
+ self.assertIsNone(e.constraint)
+ self.assertIsNone(e.key)
+ self.assertIsNone(e.key_table)
+ self.assertEqual(
+ "(IntegrityError) SQL error: foreign key "
+ "constraint failed 'INSERT SOMETHING' ()",
+ str(e))
+
+
class TestDuplicate(TestsExceptionFilter):
def _run_dupe_constraint_test(self, dialect_name, message,
- expected_columns=['a', 'b']):
+ expected_columns=['a', 'b'], expected_value=None):
matched = self._run_test(
dialect_name, "insert into table some_values",
self.IntegrityError(message),
exception.DBDuplicateEntry
)
self.assertEqual(expected_columns, matched.columns)
+ self.assertEqual(expected_value, matched.value)
def _not_dupe_constraint_test(self, dialect_name, statement, message,
expected_cls, expected_message):
@@ -196,19 +295,36 @@ class TestDuplicate(TestsExceptionFilter):
def test_mysql_mysqldb(self):
self._run_dupe_constraint_test("mysql",
'(1062, "Duplicate entry '
- '\'2-3\' for key \'uniq_tbl0a0b\'")')
+ '\'2-3\' for key \'uniq_tbl0a0b\'")', expected_value='2-3')
def test_mysql_mysqlconnector(self):
self._run_dupe_constraint_test("mysql",
'1062 (23000): Duplicate entry '
- '\'2-3\' for key \'uniq_tbl0a0b\'")')
+ '\'2-3\' for key \'uniq_tbl0a0b\'")', expected_value='2-3')
def test_postgresql(self):
self._run_dupe_constraint_test(
'postgresql',
'duplicate key value violates unique constraint'
'"uniq_tbl0a0b"'
- '\nDETAIL: Key (a, b)=(2, 3) already exists.\n'
+ '\nDETAIL: Key (a, b)=(2, 3) already exists.\n',
+ expected_value='2, 3'
+ )
+
+ def test_mysql_single(self):
+ self._run_dupe_constraint_test("mysql",
+ "1062 (23000): Duplicate entry '2' for key 'b'",
+ expected_columns=['b'],
+ expected_value='2'
+ )
+
+ def test_postgresql_single(self):
+ self._run_dupe_constraint_test(
+ 'postgresql',
+ 'duplicate key value violates unique constraint "uniq_tbl0b"\n'
+ 'DETAIL: Key (b)=(2) already exists.\n',
+ expected_columns=['b'],
+ expected_value='2'
)
def test_unsupported_backend(self):
@@ -287,7 +403,7 @@ class TestDeadlock(TestsExceptionFilter):
str(matched)
)
- def test_mysql_deadlock(self):
+ def test_mysql_mysqldb_deadlock(self):
self._run_deadlock_detect_test(
"mysql",
"(1213, 'Deadlock found when trying "
@@ -295,6 +411,14 @@ class TestDeadlock(TestsExceptionFilter):
"transaction')"
)
+ def test_mysql_mysqlconnector_deadlock(self):
+ self._run_deadlock_detect_test(
+ "mysql",
+ "1213 (40001): Deadlock found when trying to get lock; try "
+ "restarting transaction",
+ orig_exception_cls=self.InternalError
+ )
+
def test_mysql_not_deadlock(self):
self._not_deadlock_test(
"mysql",
@@ -440,3 +564,148 @@ class IntegrationTest(test_base.DbTestCase):
self.Foo.counter == sqla.func.imfake(123))
matched = self.assertRaises(sqla.exc.OperationalError, q.all)
self.assertTrue("no such function" in str(matched))
+
+
+class TestDBDisconnected(TestsExceptionFilter):
+
+ @contextlib.contextmanager
+ def _fixture(self, dialect_name, exception, num_disconnects):
+ engine = self.engine
+
+ real_do_execute = engine.dialect.do_execute
+ counter = itertools.count(1)
+
+ def fake_do_execute(self, *arg, **kw):
+ if next(counter) > num_disconnects:
+ return real_do_execute(self, *arg, **kw)
+ else:
+ raise exception
+
+ with self._dbapi_fixture(dialect_name):
+ with contextlib.nested(
+ mock.patch.object(engine.dialect,
+ "do_execute", fake_do_execute),
+ mock.patch.object(engine.dialect, "is_disconnect",
+ mock.Mock(return_value=True))
+ ):
+ yield
+
+ def _test_ping_listener_disconnected(self, dialect_name, exc_obj):
+ with self._fixture(dialect_name, exc_obj, 1):
+ conn = self.engine.connect()
+ with conn.begin():
+ self.assertEqual(conn.scalar(sqla.select([1])), 1)
+ self.assertFalse(conn.closed)
+ self.assertFalse(conn.invalidated)
+ self.assertTrue(conn.in_transaction())
+
+ with self._fixture(dialect_name, exc_obj, 2):
+ conn = self.engine.connect()
+ self.assertRaises(
+ exception.DBConnectionError,
+ conn.begin
+ )
+ self.assertFalse(conn.closed)
+ self.assertFalse(conn.in_transaction())
+ self.assertTrue(conn.invalidated)
+
+ def test_mysql_ping_listener_disconnected(self):
+ for code in [2006, 2013, 2014, 2045, 2055]:
+ self._test_ping_listener_disconnected(
+ "mysql",
+ self.OperationalError('%d MySQL server has gone away' % code)
+ )
+
+ def test_db2_ping_listener_disconnected(self):
+ self._test_ping_listener_disconnected(
+ "ibm_db_sa",
+ self.OperationalError(
+ 'SQL30081N: DB2 Server connection is no longer active')
+ )
+
+
+class TestDBConnectRetry(TestsExceptionFilter):
+
+ def _run_test(self, dialect_name, exception, count, retries):
+ counter = itertools.count()
+
+ engine = self.engine
+
+ # empty out the connection pool
+ engine.dispose()
+
+ connect_fn = engine.dialect.connect
+
+ def cant_connect(*arg, **kw):
+ if next(counter) < count:
+ raise exception
+ else:
+ return connect_fn(*arg, **kw)
+
+ with self._dbapi_fixture(dialect_name):
+ with mock.patch.object(engine.dialect, "connect", cant_connect):
+ return session._test_connection(engine, retries, .01)
+
+ def test_connect_no_retries(self):
+ conn = self._run_test(
+ "mysql",
+ self.OperationalError("Error: (2003) something wrong"),
+ 2, 0
+ )
+        # didn't connect because nothing was tried
+ self.assertIsNone(conn)
+
+    def test_connect_infinite_retries(self):
+ conn = self._run_test(
+ "mysql",
+ self.OperationalError("Error: (2003) something wrong"),
+ 2, -1
+ )
+ # conn is good
+ self.assertEqual(conn.scalar(sqla.select([1])), 1)
+
+ def test_connect_retry_past_failure(self):
+ conn = self._run_test(
+ "mysql",
+ self.OperationalError("Error: (2003) something wrong"),
+ 2, 3
+ )
+ # conn is good
+ self.assertEqual(conn.scalar(sqla.select([1])), 1)
+
+ def test_connect_retry_not_candidate_exception(self):
+ self.assertRaises(
+ sqla.exc.OperationalError, # remember, we pass OperationalErrors
+ # through at the moment :)
+ self._run_test,
+ "mysql",
+ self.OperationalError("Error: (2015) I can't connect period"),
+ 2, 3
+ )
+
+ def test_connect_retry_stops_infailure(self):
+ self.assertRaises(
+ exception.DBConnectionError,
+ self._run_test,
+ "mysql",
+ self.OperationalError("Error: (2003) something wrong"),
+ 3, 2
+ )
+
+ def test_db2_error_positive(self):
+ conn = self._run_test(
+ "ibm_db_sa",
+ self.OperationalError("blah blah -30081 blah blah"),
+ 2, -1
+ )
+ # conn is good
+ self.assertEqual(conn.scalar(sqla.select([1])), 1)
+
+ def test_db2_error_negative(self):
+ self.assertRaises(
+ sqla.exc.OperationalError,
+ self._run_test,
+ "ibm_db_sa",
+ self.OperationalError("blah blah -39981 blah blah"),
+ 2, 3
+ )
diff --git a/tests/sqlalchemy/test_handle_error.py b/tests/sqlalchemy/test_handle_error.py
index f9d1a96..a9e6f62 100644
--- a/tests/sqlalchemy/test_handle_error.py
+++ b/tests/sqlalchemy/test_handle_error.py
@@ -28,6 +28,7 @@ from sqlalchemy.types import Integer
from sqlalchemy.types import TypeDecorator
from oslo.db.sqlalchemy.compat import handle_error
+from oslo.db.sqlalchemy.compat import utils
class MyException(Exception):
@@ -141,3 +142,51 @@ class ExceptionReraiseTest(test_base.BaseTestCase):
self.assertTrue(ctx.statement.startswith("SELECT 1 "))
self.assertIs(ctx.is_disconnect, False)
self.assertIs(ctx.original_exception, nope)
+
+ def _test_alter_disconnect(self, orig_error, evt_value):
+ engine = self.engine
+
+ def evt(ctx):
+ ctx.is_disconnect = evt_value
+ handle_error(engine, evt)
+
+ # if we are under sqla 0.9.7, and we are expecting to take
+ # an "is disconnect" exception and make it not a disconnect,
+    # that isn't supported because the wrapped handler has already
+ # done the invalidation.
+ expect_failure = not utils.sqla_097 and orig_error and not evt_value
+
+ with mock.patch.object(engine.dialect, "is_disconnect",
+ mock.Mock(return_value=orig_error)):
+
+ with engine.connect() as c:
+ conn_rec = c.connection._connection_record
+ try:
+ c.execute("SELECT x FROM nonexistent")
+ assert False
+ except sqla.exc.StatementError as st:
+ self.assertFalse(expect_failure)
+
+ # check the exception's invalidation flag
+ self.assertEqual(st.connection_invalidated, evt_value)
+
+ # check the Connection object's invalidation flag
+ self.assertEqual(c.invalidated, evt_value)
+
+ # this is the ConnectionRecord object; it's invalidated
+ # when its .connection member is None
+ self.assertEqual(conn_rec.connection is None, evt_value)
+
+ except NotImplementedError as ne:
+ self.assertTrue(expect_failure)
+ self.assertEqual(str(ne),
+ "Can't reset 'disconnect' status of exception once it "
+ "is set with this version of SQLAlchemy")
+
+ def test_alter_disconnect_to_true(self):
+ self._test_alter_disconnect(False, True)
+ self._test_alter_disconnect(True, True)
+
+ def test_alter_disconnect_to_false(self):
+ self._test_alter_disconnect(True, False)
+ self._test_alter_disconnect(False, False)
diff --git a/tests/sqlalchemy/test_sqlalchemy.py b/tests/sqlalchemy/test_sqlalchemy.py
index 7dc070c..a6278d5 100644
--- a/tests/sqlalchemy/test_sqlalchemy.py
+++ b/tests/sqlalchemy/test_sqlalchemy.py
@@ -17,9 +17,7 @@
"""Unit tests for SQLAlchemy specific code."""
import logging
-from oslo.config import cfg
-import _mysql_exceptions
import fixtures
import mock
from oslotest import base as oslo_test
@@ -28,6 +26,7 @@ from sqlalchemy import Column, MetaData, Table
from sqlalchemy import Integer, String
from sqlalchemy.ext.declarative import declarative_base
+from oslo.config import cfg
from oslo.db import exception
from oslo.db import options as db_options
from oslo.db.sqlalchemy import models
@@ -127,53 +126,6 @@ class FakeDB2Engine(object):
pass
-class TestDBDisconnected(oslo_test.BaseTestCase):
-
- def _test_ping_listener_disconnected(self, connection):
- engine_args = {
- 'pool_recycle': 3600,
- 'echo': False,
- 'convert_unicode': True}
-
- engine = sqlalchemy.create_engine(connection, **engine_args)
- with mock.patch.object(engine, 'dispose') as dispose_mock:
- self.assertRaises(sqlalchemy.exc.DisconnectionError,
- session._ping_listener, engine,
- FakeDBAPIConnection(), FakeConnectionRec(),
- FakeConnectionProxy())
- dispose_mock.assert_called_once_with()
-
- def test_mysql_ping_listener_disconnected(self):
- def fake_execute(sql):
- raise _mysql_exceptions.OperationalError(self.mysql_error,
- ('MySQL server has '
- 'gone away'))
- with mock.patch.object(FakeCursor, 'execute',
- side_effect=fake_execute):
- connection = 'mysql://root:password@fakehost/fakedb?charset=utf8'
- for code in [2006, 2013, 2014, 2045, 2055]:
- self.mysql_error = code
- self._test_ping_listener_disconnected(connection)
-
- def test_db2_ping_listener_disconnected(self):
-
- def fake_execute(sql):
- raise OperationalError('SQL30081N: DB2 Server '
- 'connection is no longer active')
- with mock.patch.object(FakeCursor, 'execute',
- side_effect=fake_execute):
- # TODO(dperaza): Need a fake engine for db2 since ibm_db_sa is not
- # in global requirements. Change this code to use real IBM db2
- # engine as soon as ibm_db_sa is included in global-requirements
- # under openstack/requirements project.
- fake_create_engine = lambda *args, **kargs: FakeDB2Engine()
- with mock.patch.object(sqlalchemy, 'create_engine',
- side_effect=fake_create_engine):
- connection = ('ibm_db_sa://db2inst1:openstack@fakehost:50000'
- '/fakedab')
- self._test_ping_listener_disconnected(connection)
-
-
class MySQLModeTestCase(test_base.MySQLOpportunisticTestCase):
def __init__(self, *args, **kwargs):
@@ -499,3 +451,19 @@ class MysqlSetCallbackTest(oslo_test.BaseTestCase):
"SELECT * FROM bar",
]
self.assertEqual(exp_calls, engine._execs)
+
+
+class PatchStacktraceTest(test_base.DbTestCase):
+
+ def test_trace(self):
+ engine = self.engine
+ engine.connect()
+ with mock.patch.object(engine.dialect, "do_execute") as mock_exec:
+ session._add_trace_comments(engine)
+
+ engine.execute("select * from table")
+
+ call = mock_exec.mock_calls[0]
+
+ # we're the caller, see that we're in there
+ self.assertTrue("tests/sqlalchemy/test_sqlalchemy.py" in call[1][1])
diff --git a/tests/sqlalchemy/test_utils.py b/tests/sqlalchemy/test_utils.py
index 431fdb3..231d106 100644
--- a/tests/sqlalchemy/test_utils.py
+++ b/tests/sqlalchemy/test_utils.py
@@ -685,6 +685,11 @@ class TestConnectionUtils(test_utils.BaseTestCase):
self.assertEqual(utils.get_db_connection_info(conn_pieces),
('dude', 'pass', 'test', 'localhost'))
+ def test_connect_string_host(self):
+ self.full_credentials['host'] = 'myhost'
+ connect_string = utils.get_connect_string(**self.full_credentials)
+ self.assertEqual(connect_string, 'mysql://dude:pass@myhost/test')
+
class MyModelSoftDeletedProjectId(declarative_base(), models.ModelBase,
models.SoftDeleteMixin):
diff --git a/tox.ini b/tox.ini
index 0445c83..55189ff 100644
--- a/tox.ini
+++ b/tox.ini
@@ -24,10 +24,6 @@ commands = pip install SQLAlchemy>=0.9.0,!=0.9.5,<1.0.0
commands = pip install SQLAlchemy>=0.8.0,<0.9.0
python setup.py testr --slowest --testr-args='{posargs}'
-[testenv:sqla_07]
-commands = pip install SQLAlchemy>=0.7.7,<0.8.0
- python setup.py testr --slowest --testr-args='{posargs}'
-
[testenv:pep8]
commands = flake8