summaryrefslogtreecommitdiff
path: root/oslo
diff options
context:
space:
mode:
authorVictor Sergeyev <vsergeyev@mirantis.com>2014-04-18 11:33:16 +0300
committerVictor Sergeyev <vsergeyev@mirantis.com>2014-04-18 11:33:16 +0300
commite4cfa6d39d2aa53af64ab34de97183f98fbeb667 (patch)
tree219fee2ad0aa87c7471977470d563f48d1b97cd9 /oslo
parentb5e4815563334958d081e0fef30f25113d80555f (diff)
downloadoslo-db-e4cfa6d39d2aa53af64ab34de97183f98fbeb667.tar.gz
Fix the graduate.sh script result
Diffstat (limited to 'oslo')
-rw-r--r--oslo/__init__.py15
-rw-r--r--oslo/db/__init__.py0
-rw-r--r--oslo/db/api.py162
-rw-r--r--oslo/db/exception.py56
-rw-r--r--oslo/db/options.py171
-rw-r--r--oslo/db/sqlalchemy/__init__.py0
-rw-r--r--oslo/db/sqlalchemy/migration.py278
-rw-r--r--oslo/db/sqlalchemy/migration_cli/README.rst9
-rw-r--r--oslo/db/sqlalchemy/migration_cli/__init__.py0
-rw-r--r--oslo/db/sqlalchemy/migration_cli/ext_alembic.py78
-rw-r--r--oslo/db/sqlalchemy/migration_cli/ext_base.py79
-rw-r--r--oslo/db/sqlalchemy/migration_cli/ext_migrate.py69
-rw-r--r--oslo/db/sqlalchemy/migration_cli/manager.py71
-rw-r--r--oslo/db/sqlalchemy/models.py119
-rw-r--r--oslo/db/sqlalchemy/provision.py157
-rw-r--r--oslo/db/sqlalchemy/session.py904
-rw-r--r--oslo/db/sqlalchemy/test_base.py165
-rw-r--r--oslo/db/sqlalchemy/test_migrations.conf7
-rw-r--r--oslo/db/sqlalchemy/test_migrations.py269
-rw-r--r--oslo/db/sqlalchemy/utils.py655
20 files changed, 3264 insertions, 0 deletions
diff --git a/oslo/__init__.py b/oslo/__init__.py
new file mode 100644
index 0000000..c659cac
--- /dev/null
+++ b/oslo/__init__.py
@@ -0,0 +1,15 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+__import__('pkg_resources').declare_namespace(__name__) \ No newline at end of file
diff --git a/oslo/db/__init__.py b/oslo/db/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/oslo/db/__init__.py
diff --git a/oslo/db/api.py b/oslo/db/api.py
new file mode 100644
index 0000000..28e9a82
--- /dev/null
+++ b/oslo/db/api.py
@@ -0,0 +1,162 @@
+# Copyright (c) 2013 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Multiple DB API backend support.
+
+A DB backend module should implement a method named 'get_backend' which
+takes no arguments. The method can return any object that implements DB
+API methods.
+"""
+
+import functools
+import logging
+import threading
+import time
+
+from oslo.db import exception
+from openstack.common.gettextutils import _LE
+from openstack.common import importutils
+
+
+LOG = logging.getLogger(__name__)
+
+
+def safe_for_db_retry(f):
+ """Enable db-retry for decorated function, if config option enabled."""
+ f.__dict__['enable_retry'] = True
+ return f
+
+
+class wrap_db_retry(object):
+ """Retry db.api methods, if DBConnectionError() raised
+
+ Retry decorated db.api methods. If we enabled `use_db_reconnect`
+ in config, this decorator will be applied to all db.api functions,
+ marked with @safe_for_db_retry decorator.
+ Decorator catchs DBConnectionError() and retries function in a
+ loop until it succeeds, or until maximum retries count will be reached.
+ """
+
+ def __init__(self, retry_interval, max_retries, inc_retry_interval,
+ max_retry_interval):
+ super(wrap_db_retry, self).__init__()
+
+ self.retry_interval = retry_interval
+ self.max_retries = max_retries
+ self.inc_retry_interval = inc_retry_interval
+ self.max_retry_interval = max_retry_interval
+
+ def __call__(self, f):
+ @functools.wraps(f)
+ def wrapper(*args, **kwargs):
+ next_interval = self.retry_interval
+ remaining = self.max_retries
+
+ while True:
+ try:
+ return f(*args, **kwargs)
+ except exception.DBConnectionError as e:
+ if remaining == 0:
+ LOG.exception(_LE('DB exceeded retry limit.'))
+ raise exception.DBError(e)
+ if remaining != -1:
+ remaining -= 1
+ LOG.exception(_LE('DB connection error.'))
+ # NOTE(vsergeyev): We are using patched time module, so
+ # this effectively yields the execution
+ # context to another green thread.
+ time.sleep(next_interval)
+ if self.inc_retry_interval:
+ next_interval = min(
+ next_interval * 2,
+ self.max_retry_interval
+ )
+ return wrapper
+
+
+class DBAPI(object):
+ def __init__(self, backend_name, backend_mapping=None, lazy=False,
+ **kwargs):
+ """Initialize the chosen DB API backend.
+
+ :param backend_name: name of the backend to load
+ :type backend_name: str
+
+ :param backend_mapping: backend name -> module/class to load mapping
+ :type backend_mapping: dict
+
+ :param lazy: load the DB backend lazily on the first DB API method call
+ :type lazy: bool
+
+ Keyword arguments:
+
+ :keyword use_db_reconnect: retry DB transactions on disconnect or not
+ :type use_db_reconnect: bool
+
+ :keyword retry_interval: seconds between transaction retries
+ :type retry_interval: int
+
+ :keyword inc_retry_interval: increase retry interval or not
+ :type inc_retry_interval: bool
+
+ :keyword max_retry_interval: max interval value between retries
+ :type max_retry_interval: int
+
+ :keyword max_retries: max number of retries before an error is raised
+ :type max_retries: int
+
+ """
+
+ self._backend = None
+ self._backend_name = backend_name
+ self._backend_mapping = backend_mapping or {}
+ self._lock = threading.Lock()
+
+ if not lazy:
+ self._load_backend()
+
+ self.use_db_reconnect = kwargs.get('use_db_reconnect', False)
+ self.retry_interval = kwargs.get('retry_interval', 1)
+ self.inc_retry_interval = kwargs.get('inc_retry_interval', True)
+ self.max_retry_interval = kwargs.get('max_retry_interval', 10)
+ self.max_retries = kwargs.get('max_retries', 20)
+
+ def _load_backend(self):
+ with self._lock:
+ if not self._backend:
+ # Import the untranslated name if we don't have a mapping
+ backend_path = self._backend_mapping.get(self._backend_name,
+ self._backend_name)
+ backend_mod = importutils.import_module(backend_path)
+ self._backend = backend_mod.get_backend()
+
+ def __getattr__(self, key):
+ if not self._backend:
+ self._load_backend()
+
+ attr = getattr(self._backend, key)
+ if not hasattr(attr, '__call__'):
+ return attr
+ # NOTE(vsergeyev): If `use_db_reconnect` option is set to True, retry
+ # DB API methods, decorated with @safe_for_db_retry
+ # on disconnect.
+ if self.use_db_reconnect and hasattr(attr, 'enable_retry'):
+ attr = wrap_db_retry(
+ retry_interval=self.retry_interval,
+ max_retries=self.max_retries,
+ inc_retry_interval=self.inc_retry_interval,
+ max_retry_interval=self.max_retry_interval)(attr)
+
+ return attr
diff --git a/oslo/db/exception.py b/oslo/db/exception.py
new file mode 100644
index 0000000..601063e
--- /dev/null
+++ b/oslo/db/exception.py
@@ -0,0 +1,56 @@
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""DB related custom exceptions."""
+
+import six
+
+from openstack.common.gettextutils import _
+
+
+class DBError(Exception):
+ """Wraps an implementation specific exception."""
+ def __init__(self, inner_exception=None):
+ self.inner_exception = inner_exception
+ super(DBError, self).__init__(six.text_type(inner_exception))
+
+
+class DBDuplicateEntry(DBError):
+ """Wraps an implementation specific exception."""
+ def __init__(self, columns=[], inner_exception=None):
+ self.columns = columns
+ super(DBDuplicateEntry, self).__init__(inner_exception)
+
+
+class DBDeadlock(DBError):
+ def __init__(self, inner_exception=None):
+ super(DBDeadlock, self).__init__(inner_exception)
+
+
+class DBInvalidUnicodeParameter(Exception):
+ message = _("Invalid Parameter: "
+ "Unicode is not supported by the current database.")
+
+
+class DbMigrationError(DBError):
+ """Wraps migration specific exception."""
+ def __init__(self, message=None):
+ super(DbMigrationError, self).__init__(message)
+
+
+class DBConnectionError(DBError):
+ """Wraps connection specific exception."""
+ pass
diff --git a/oslo/db/options.py b/oslo/db/options.py
new file mode 100644
index 0000000..f60f470
--- /dev/null
+++ b/oslo/db/options.py
@@ -0,0 +1,171 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+
+from oslo.config import cfg
+
+
+database_opts = [
+ cfg.StrOpt('sqlite_db',
+ deprecated_group='DEFAULT',
+ default='oslo.sqlite',
+ help='The file name to use with SQLite'),
+ cfg.BoolOpt('sqlite_synchronous',
+ deprecated_group='DEFAULT',
+ default=True,
+ help='If True, SQLite uses synchronous mode'),
+ cfg.StrOpt('backend',
+ default='sqlalchemy',
+ deprecated_name='db_backend',
+ deprecated_group='DEFAULT',
+ help='The backend to use for db'),
+ cfg.StrOpt('connection',
+ help='The SQLAlchemy connection string used to connect to the '
+ 'database',
+ secret=True,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('sql_connection',
+ group='DATABASE'),
+ cfg.DeprecatedOpt('connection',
+ group='sql'), ]),
+ cfg.StrOpt('mysql_sql_mode',
+ default='TRADITIONAL',
+ help='The SQL mode to be used for MySQL sessions. '
+ 'This option, including the default, overrides any '
+ 'server-set SQL mode. To use whatever SQL mode '
+ 'is set by the server configuration, '
+ 'set this to no value. Example: mysql_sql_mode='),
+ cfg.IntOpt('idle_timeout',
+ default=3600,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('sql_idle_timeout',
+ group='DATABASE'),
+ cfg.DeprecatedOpt('idle_timeout',
+ group='sql')],
+ help='Timeout before idle sql connections are reaped'),
+ cfg.IntOpt('min_pool_size',
+ default=1,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('sql_min_pool_size',
+ group='DATABASE')],
+ help='Minimum number of SQL connections to keep open in a '
+ 'pool'),
+ cfg.IntOpt('max_pool_size',
+ default=None,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('sql_max_pool_size',
+ group='DATABASE')],
+ help='Maximum number of SQL connections to keep open in a '
+ 'pool'),
+ cfg.IntOpt('max_retries',
+ default=10,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('sql_max_retries',
+ group='DATABASE')],
+ help='Maximum db connection retries during startup. '
+ '(setting -1 implies an infinite retry count)'),
+ cfg.IntOpt('retry_interval',
+ default=10,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('reconnect_interval',
+ group='DATABASE')],
+ help='Interval between retries of opening a sql connection'),
+ cfg.IntOpt('max_overflow',
+ default=None,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('sqlalchemy_max_overflow',
+ group='DATABASE')],
+ help='If set, use this value for max_overflow with sqlalchemy'),
+ cfg.IntOpt('connection_debug',
+ default=0,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug',
+ group='DEFAULT')],
+ help='Verbosity of SQL debugging information. 0=None, '
+ '100=Everything'),
+ cfg.BoolOpt('connection_trace',
+ default=False,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace',
+ group='DEFAULT')],
+ help='Add python stack traces to SQL as comment strings'),
+ cfg.IntOpt('pool_timeout',
+ default=None,
+ deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout',
+ group='DATABASE')],
+ help='If set, use this value for pool_timeout with sqlalchemy'),
+ cfg.BoolOpt('use_db_reconnect',
+ default=False,
+ help='Enable the experimental use of database reconnect '
+ 'on connection lost'),
+ cfg.IntOpt('db_retry_interval',
+ default=1,
+ help='seconds between db connection retries'),
+ cfg.BoolOpt('db_inc_retry_interval',
+ default=True,
+ help='Whether to increase interval between db connection '
+ 'retries, up to db_max_retry_interval'),
+ cfg.IntOpt('db_max_retry_interval',
+ default=10,
+ help='max seconds between db connection retries, if '
+ 'db_inc_retry_interval is enabled'),
+ cfg.IntOpt('db_max_retries',
+ default=20,
+ help='maximum db connection retries before error is raised. '
+ '(setting -1 implies an infinite retry count)'),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(database_opts, 'database')
+
+
+def set_defaults(sql_connection, sqlite_db, max_pool_size=None,
+ max_overflow=None, pool_timeout=None):
+ """Set defaults for configuration variables."""
+ cfg.set_defaults(database_opts,
+ connection=sql_connection,
+ sqlite_db=sqlite_db)
+ # Update the QueuePool defaults
+ if max_pool_size is not None:
+ cfg.set_defaults(database_opts,
+ max_pool_size=max_pool_size)
+ if max_overflow is not None:
+ cfg.set_defaults(database_opts,
+ max_overflow=max_overflow)
+ if pool_timeout is not None:
+ cfg.set_defaults(database_opts,
+ pool_timeout=pool_timeout)
+
+
+def list_opts():
+ """Returns a list of oslo.config options available in the library.
+
+ The returned list includes all oslo.config options which may be registered
+ at runtime by the library.
+
+ Each element of the list is a tuple. The first element is the name of the
+ group under which the list of elements in the second element will be
+ registered. A group name of None corresponds to the [DEFAULT] group in
+ config files.
+
+ The purpose of this is to allow tools like the Oslo sample config file
+ generator to discover the options exposed to users by this library.
+
+ :returns: a list of (group_name, opts) tuples
+ """
+ return [('database', copy.deepcopy(database_opts))]
diff --git a/oslo/db/sqlalchemy/__init__.py b/oslo/db/sqlalchemy/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/oslo/db/sqlalchemy/__init__.py
diff --git a/oslo/db/sqlalchemy/migration.py b/oslo/db/sqlalchemy/migration.py
new file mode 100644
index 0000000..5c69d27
--- /dev/null
+++ b/oslo/db/sqlalchemy/migration.py
@@ -0,0 +1,278 @@
+# coding: utf-8
+#
+# Copyright (c) 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# Base on code in migrate/changeset/databases/sqlite.py which is under
+# the following license:
+#
+# The MIT License
+#
+# Copyright (c) 2009 Evan Rosson, Jan Dittberner, Domen Kožar
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+import os
+import re
+
+from migrate.changeset import ansisql
+from migrate.changeset.databases import sqlite
+from migrate import exceptions as versioning_exceptions
+from migrate.versioning import api as versioning_api
+from migrate.versioning.repository import Repository
+import sqlalchemy
+from sqlalchemy.schema import UniqueConstraint
+
+from oslo.db import exception
+from openstack.common.gettextutils import _
+
+
+def _get_unique_constraints(self, table):
+ """Retrieve information about existing unique constraints of the table
+
+ This feature is needed for _recreate_table() to work properly.
+ Unfortunately, it's not available in sqlalchemy 0.7.x/0.8.x.
+
+ """
+
+ data = table.metadata.bind.execute(
+ """SELECT sql
+ FROM sqlite_master
+ WHERE
+ type='table' AND
+ name=:table_name""",
+ table_name=table.name
+ ).fetchone()[0]
+
+ UNIQUE_PATTERN = "CONSTRAINT (\w+) UNIQUE \(([^\)]+)\)"
+ return [
+ UniqueConstraint(
+ *[getattr(table.columns, c.strip(' "')) for c in cols.split(",")],
+ name=name
+ )
+ for name, cols in re.findall(UNIQUE_PATTERN, data)
+ ]
+
+
+def _recreate_table(self, table, column=None, delta=None, omit_uniques=None):
+ """Recreate the table properly
+
+ Unlike the corresponding original method of sqlalchemy-migrate this one
+ doesn't drop existing unique constraints when creating a new one.
+
+ """
+
+ table_name = self.preparer.format_table(table)
+
+ # we remove all indexes so as not to have
+ # problems during copy and re-create
+ for index in table.indexes:
+ index.drop()
+
+ # reflect existing unique constraints
+ for uc in self._get_unique_constraints(table):
+ table.append_constraint(uc)
+ # omit given unique constraints when creating a new table if required
+ table.constraints = set([
+ cons for cons in table.constraints
+ if omit_uniques is None or cons.name not in omit_uniques
+ ])
+
+ self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name)
+ self.execute()
+
+ insertion_string = self._modify_table(table, column, delta)
+
+ table.create(bind=self.connection)
+ self.append(insertion_string % {'table_name': table_name})
+ self.execute()
+ self.append('DROP TABLE migration_tmp')
+ self.execute()
+
+
+def _visit_migrate_unique_constraint(self, *p, **k):
+ """Drop the given unique constraint
+
+ The corresponding original method of sqlalchemy-migrate just
+ raises NotImplemented error
+
+ """
+
+ self.recreate_table(p[0].table, omit_uniques=[p[0].name])
+
+
+def patch_migrate():
+ """A workaround for SQLite's inability to alter things
+
+ SQLite abilities to alter tables are very limited (please read
+ http://www.sqlite.org/lang_altertable.html for more details).
+ E. g. one can't drop a column or a constraint in SQLite. The
+ workaround for this is to recreate the original table omitting
+ the corresponding constraint (or column).
+
+ sqlalchemy-migrate library has recreate_table() method that
+ implements this workaround, but it does it wrong:
+
+ - information about unique constraints of a table
+ is not retrieved. So if you have a table with one
+ unique constraint and a migration adding another one
+ you will end up with a table that has only the
+ latter unique constraint, and the former will be lost
+
+ - dropping of unique constraints is not supported at all
+
+ The proper way to fix this is to provide a pull-request to
+ sqlalchemy-migrate, but the project seems to be dead. So we
+ can go on with monkey-patching of the lib at least for now.
+
+ """
+
+ # this patch is needed to ensure that recreate_table() doesn't drop
+ # existing unique constraints of the table when creating a new one
+ helper_cls = sqlite.SQLiteHelper
+ helper_cls.recreate_table = _recreate_table
+ helper_cls._get_unique_constraints = _get_unique_constraints
+
+ # this patch is needed to be able to drop existing unique constraints
+ constraint_cls = sqlite.SQLiteConstraintDropper
+ constraint_cls.visit_migrate_unique_constraint = \
+ _visit_migrate_unique_constraint
+ constraint_cls.__bases__ = (ansisql.ANSIColumnDropper,
+ sqlite.SQLiteConstraintGenerator)
+
+
+def db_sync(engine, abs_path, version=None, init_version=0, sanity_check=True):
+ """Upgrade or downgrade a database.
+
+ Function runs the upgrade() or downgrade() functions in change scripts.
+
+ :param engine: SQLAlchemy engine instance for a given database
+ :param abs_path: Absolute path to migrate repository.
+ :param version: Database will upgrade/downgrade until this version.
+ If None - database will update to the latest
+ available version.
+ :param init_version: Initial database version
+ :param sanity_check: Require schema sanity checking for all tables
+ """
+
+ if version is not None:
+ try:
+ version = int(version)
+ except ValueError:
+ raise exception.DbMigrationError(
+ message=_("version should be an integer"))
+
+ current_version = db_version(engine, abs_path, init_version)
+ repository = _find_migrate_repo(abs_path)
+ if sanity_check:
+ _db_schema_sanity_check(engine)
+ if version is None or version > current_version:
+ return versioning_api.upgrade(engine, repository, version)
+ else:
+ return versioning_api.downgrade(engine, repository,
+ version)
+
+
+def _db_schema_sanity_check(engine):
+ """Ensure all database tables were created with required parameters.
+
+ :param engine: SQLAlchemy engine instance for a given database
+
+ """
+
+ if engine.name == 'mysql':
+ onlyutf8_sql = ('SELECT TABLE_NAME,TABLE_COLLATION '
+ 'from information_schema.TABLES '
+ 'where TABLE_SCHEMA=%s and '
+ 'TABLE_COLLATION NOT LIKE "%%utf8%%"')
+
+ # NOTE(morganfainberg): exclude the sqlalchemy-migrate and alembic
+ # versioning tables from the tables we need to verify utf8 status on.
+ # Non-standard table names are not supported.
+ EXCLUDED_TABLES = ['migrate_version', 'alembic_version']
+
+ table_names = [res[0] for res in
+ engine.execute(onlyutf8_sql, engine.url.database) if
+ res[0].lower() not in EXCLUDED_TABLES]
+
+ if len(table_names) > 0:
+ raise ValueError(_('Tables "%s" have non utf8 collation, '
+ 'please make sure all tables are CHARSET=utf8'
+ ) % ','.join(table_names))
+
+
+def db_version(engine, abs_path, init_version):
+ """Show the current version of the repository.
+
+ :param engine: SQLAlchemy engine instance for a given database
+ :param abs_path: Absolute path to migrate repository
+ :param version: Initial database version
+ """
+ repository = _find_migrate_repo(abs_path)
+ try:
+ return versioning_api.db_version(engine, repository)
+ except versioning_exceptions.DatabaseNotControlledError:
+ meta = sqlalchemy.MetaData()
+ meta.reflect(bind=engine)
+ tables = meta.tables
+ if len(tables) == 0 or 'alembic_version' in tables:
+ db_version_control(engine, abs_path, version=init_version)
+ return versioning_api.db_version(engine, repository)
+ else:
+ raise exception.DbMigrationError(
+ message=_(
+ "The database is not under version control, but has "
+ "tables. Please stamp the current version of the schema "
+ "manually."))
+
+
+def db_version_control(engine, abs_path, version=None):
+ """Mark a database as under this repository's version control.
+
+ Once a database is under version control, schema changes should
+ only be done via change scripts in this repository.
+
+ :param engine: SQLAlchemy engine instance for a given database
+ :param abs_path: Absolute path to migrate repository
+ :param version: Initial database version
+ """
+ repository = _find_migrate_repo(abs_path)
+ versioning_api.version_control(engine, repository, version)
+ return version
+
+
+def _find_migrate_repo(abs_path):
+ """Get the project's change script repository
+
+ :param abs_path: Absolute path to migrate repository
+ """
+ if not os.path.exists(abs_path):
+ raise exception.DbMigrationError("Path %s not found" % abs_path)
+ return Repository(abs_path)
diff --git a/oslo/db/sqlalchemy/migration_cli/README.rst b/oslo/db/sqlalchemy/migration_cli/README.rst
new file mode 100644
index 0000000..ebbbdcb
--- /dev/null
+++ b/oslo/db/sqlalchemy/migration_cli/README.rst
@@ -0,0 +1,9 @@
+This module could be used either for:
+1. Smooth transition from migrate tool to alembic
+2. As standalone alembic tool
+
+Core points:
+1. Upgrade/downgrade database with usage of alembic/migrate migrations
+or both
+2. Compatibility with oslo.config
+3. The way to autogenerate new revisions or stamps
diff --git a/oslo/db/sqlalchemy/migration_cli/__init__.py b/oslo/db/sqlalchemy/migration_cli/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/oslo/db/sqlalchemy/migration_cli/__init__.py
diff --git a/oslo/db/sqlalchemy/migration_cli/ext_alembic.py b/oslo/db/sqlalchemy/migration_cli/ext_alembic.py
new file mode 100644
index 0000000..be3d8c6
--- /dev/null
+++ b/oslo/db/sqlalchemy/migration_cli/ext_alembic.py
@@ -0,0 +1,78 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+
+import alembic
+from alembic import config as alembic_config
+import alembic.migration as alembic_migration
+
+from oslo.db.sqlalchemy.migration_cli import ext_base
+from oslo.db.sqlalchemy import session as db_session
+
+
+class AlembicExtension(ext_base.MigrationExtensionBase):
+
+ order = 2
+
+ @property
+ def enabled(self):
+ return os.path.exists(self.alembic_ini_path)
+
+ def __init__(self, migration_config):
+ """Extension to provide alembic features.
+
+ :param migration_config: Stores specific configuration for migrations
+ :type migration_config: dict
+ """
+ self.alembic_ini_path = migration_config.get('alembic_ini_path', '')
+ self.config = alembic_config.Config(self.alembic_ini_path)
+ # option should be used if script is not in default directory
+ repo_path = migration_config.get('alembic_repo_path')
+ if repo_path:
+ self.config.set_main_option('script_location', repo_path)
+ self.db_url = migration_config['db_url']
+
+ def upgrade(self, version):
+ return alembic.command.upgrade(self.config, version or 'head')
+
+ def downgrade(self, version):
+ if isinstance(version, int) or version is None or version.isdigit():
+ version = 'base'
+ return alembic.command.downgrade(self.config, version)
+
+ def version(self):
+ engine = db_session.create_engine(self.db_url)
+ with engine.connect() as conn:
+ context = alembic_migration.MigrationContext.configure(conn)
+ return context.get_current_revision()
+
+ def revision(self, message='', autogenerate=False):
+ """Creates template for migration.
+
+ :param message: Text that will be used for migration title
+ :type message: string
+ :param autogenerate: If True - generates diff based on current database
+ state
+ :type autogenerate: bool
+ """
+ return alembic.command.revision(self.config, message=message,
+ autogenerate=autogenerate)
+
+ def stamp(self, revision):
+ """Stamps database with provided revision.
+
+ :param revision: Should match one from repository or head - to stamp
+ database with most recent revision
+ :type revision: string
+ """
+ return alembic.command.stamp(self.config, revision=revision)
diff --git a/oslo/db/sqlalchemy/migration_cli/ext_base.py b/oslo/db/sqlalchemy/migration_cli/ext_base.py
new file mode 100644
index 0000000..271cd0a
--- /dev/null
+++ b/oslo/db/sqlalchemy/migration_cli/ext_base.py
@@ -0,0 +1,79 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+
+import six
+
+
+@six.add_metaclass(abc.ABCMeta)
+class MigrationExtensionBase(object):
+
+ #used to sort migration in logical order
+ order = 0
+
+ @property
+ def enabled(self):
+ """Used for availability verification of a plugin.
+
+ :rtype: bool
+ """
+ return False
+
+ @abc.abstractmethod
+ def upgrade(self, version):
+ """Used for upgrading database.
+
+ :param version: Desired database version
+ :type version: string
+ """
+
+ @abc.abstractmethod
+ def downgrade(self, version):
+ """Used for downgrading database.
+
+ :param version: Desired database version
+ :type version: string
+ """
+
+ @abc.abstractmethod
+ def version(self):
+ """Current database version.
+
+ :returns: Databse version
+ :rtype: string
+ """
+
+ def revision(self, *args, **kwargs):
+ """Used to generate migration script.
+
+ In migration engines that support this feature, it should generate
+ new migration script.
+
+ Accept arbitrary set of arguments.
+ """
+ raise NotImplementedError()
+
+ def stamp(self, *args, **kwargs):
+ """Stamps database based on plugin features.
+
+ Accept arbitrary set of arguments.
+ """
+ raise NotImplementedError()
+
+ def __cmp__(self, other):
+ """Used for definition of plugin order.
+
+ :param other: MigrationExtensionBase instance
+ :rtype: bool
+ """
+ return self.order > other.order
diff --git a/oslo/db/sqlalchemy/migration_cli/ext_migrate.py b/oslo/db/sqlalchemy/migration_cli/ext_migrate.py
new file mode 100644
index 0000000..cf5280b
--- /dev/null
+++ b/oslo/db/sqlalchemy/migration_cli/ext_migrate.py
@@ -0,0 +1,69 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+
+from oslo.db.sqlalchemy import migration
+from oslo.db.sqlalchemy.migration_cli import ext_base
+from oslo.db.sqlalchemy import session as db_session
+from openstack.common.gettextutils import _LE
+
+
+LOG = logging.getLogger(__name__)
+
+
+class MigrateExtension(ext_base.MigrationExtensionBase):
+ """Extension to provide sqlalchemy-migrate features.
+
+ :param migration_config: Stores specific configuration for migrations
+ :type migration_config: dict
+ """
+
+ order = 1
+
+ def __init__(self, migration_config):
+ self.repository = migration_config.get('migration_repo_path', '')
+ self.init_version = migration_config.get('init_version', 0)
+ self.db_url = migration_config['db_url']
+ self.engine = db_session.create_engine(self.db_url)
+
+ @property
+ def enabled(self):
+ return os.path.exists(self.repository)
+
+ def upgrade(self, version):
+ version = None if version == 'head' else version
+ return migration.db_sync(
+ self.engine, self.repository, version,
+ init_version=self.init_version)
+
+ def downgrade(self, version):
+ try:
+ #version for migrate should be valid int - else skip
+ if version in ('base', None):
+ version = self.init_version
+ version = int(version)
+ return migration.db_sync(
+ self.engine, self.repository, version,
+ init_version=self.init_version)
+ except ValueError:
+ LOG.error(
+ _LE('Migration number for migrate plugin must be valid '
+ 'integer or empty, if you want to downgrade '
+ 'to initial state')
+ )
+ raise
+
+ def version(self):
+ return migration.db_version(
+ self.engine, self.repository, init_version=self.init_version)
diff --git a/oslo/db/sqlalchemy/migration_cli/manager.py b/oslo/db/sqlalchemy/migration_cli/manager.py
new file mode 100644
index 0000000..ccc2712
--- /dev/null
+++ b/oslo/db/sqlalchemy/migration_cli/manager.py
@@ -0,0 +1,71 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from stevedore import enabled
+
+
+MIGRATION_NAMESPACE = 'openstack.common.migration'
+
+
+def check_plugin_enabled(ext):
+ """Used for EnabledExtensionManager"""
+ return ext.obj.enabled
+
+
+class MigrationManager(object):
+
+ def __init__(self, migration_config):
+ self._manager = enabled.EnabledExtensionManager(
+ MIGRATION_NAMESPACE,
+ check_plugin_enabled,
+ invoke_kwds={'migration_config': migration_config},
+ invoke_on_load=True
+ )
+ if not self._plugins:
+ raise ValueError('There must be at least one plugin active.')
+
+ @property
+ def _plugins(self):
+ return sorted(ext.obj for ext in self._manager.extensions)
+
+ def upgrade(self, revision):
+ """Upgrade database with all available backends."""
+ results = []
+ for plugin in self._plugins:
+ results.append(plugin.upgrade(revision))
+ return results
+
+ def downgrade(self, revision):
+ """Downgrade database with available backends."""
+ #downgrading should be performed in reversed order
+ results = []
+ for plugin in reversed(self._plugins):
+ results.append(plugin.downgrade(revision))
+ return results
+
+ def version(self):
+ """Return last version of db."""
+ last = None
+ for plugin in self._plugins:
+ version = plugin.version()
+ if version:
+ last = version
+ return last
+
+ def revision(self, message, autogenerate):
+ """Generate template or autogenerated revision."""
+ #revision should be done only by last plugin
+ return self._plugins[-1].revision(message, autogenerate)
+
+ def stamp(self, revision):
+ """Create stamp for a given revision."""
+ return self._plugins[-1].stamp(revision)
diff --git a/oslo/db/sqlalchemy/models.py b/oslo/db/sqlalchemy/models.py
new file mode 100644
index 0000000..d52edcd
--- /dev/null
+++ b/oslo/db/sqlalchemy/models.py
@@ -0,0 +1,119 @@
+# Copyright (c) 2011 X.commerce, a business unit of eBay Inc.
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Piston Cloud Computing, Inc.
+# Copyright 2012 Cloudscaling Group, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+SQLAlchemy models.
+"""
+
+import six
+
+from sqlalchemy import Column, Integer
+from sqlalchemy import DateTime
+from sqlalchemy.orm import object_mapper
+
+from openstack.common import timeutils
+
+
+class ModelBase(six.Iterator):
+ """Base class for models."""
+ __table_initialized__ = False
+
+ def save(self, session):
+ """Save this object."""
+
+ # NOTE(boris-42): This part of code should be look like:
+ # session.add(self)
+ # session.flush()
+ # But there is a bug in sqlalchemy and eventlet that
+ # raises NoneType exception if there is no running
+ # transaction and rollback is called. As long as
+ # sqlalchemy has this bug we have to create transaction
+ # explicitly.
+ with session.begin(subtransactions=True):
+ session.add(self)
+ session.flush()
+
+ def __setitem__(self, key, value):
+ setattr(self, key, value)
+
+ def __getitem__(self, key):
+ return getattr(self, key)
+
+ def get(self, key, default=None):
+ return getattr(self, key, default)
+
+ @property
+ def _extra_keys(self):
+ """Specifies custom fields
+
+ Subclasses can override this property to return a list
+ of custom fields that should be included in their dict
+ representation.
+
+ For reference check tests/db/sqlalchemy/test_models.py
+ """
+ return []
+
+ def __iter__(self):
+ columns = list(dict(object_mapper(self).columns).keys())
+ # NOTE(russellb): Allow models to specify other keys that can be looked
+ # up, beyond the actual db columns. An example would be the 'name'
+ # property for an Instance.
+ columns.extend(self._extra_keys)
+ self._i = iter(columns)
+ return self
+
+ # In Python 3, __next__() has replaced next().
+ def __next__(self):
+ n = six.advance_iterator(self._i)
+ return n, getattr(self, n)
+
+ def next(self):
+ return self.__next__()
+
+ def update(self, values):
+ """Make the model object behave like a dict."""
+ for k, v in six.iteritems(values):
+ setattr(self, k, v)
+
+ def iteritems(self):
+ """Make the model object behave like a dict.
+
+ Includes attributes from joins.
+ """
+ local = dict(self)
+ joined = dict([(k, v) for k, v in six.iteritems(self.__dict__)
+ if not k[0] == '_'])
+ local.update(joined)
+ return six.iteritems(local)
+
+
+class TimestampMixin(object):
+ created_at = Column(DateTime, default=lambda: timeutils.utcnow())
+ updated_at = Column(DateTime, onupdate=lambda: timeutils.utcnow())
+
+
+class SoftDeleteMixin(object):
+ deleted_at = Column(DateTime)
+ deleted = Column(Integer, default=0)
+
+ def soft_delete(self, session):
+ """Mark this object as deleted."""
+ self.deleted = self.id
+ self.deleted_at = timeutils.utcnow()
+ self.save(session=session)
diff --git a/oslo/db/sqlalchemy/provision.py b/oslo/db/sqlalchemy/provision.py
new file mode 100644
index 0000000..598305b
--- /dev/null
+++ b/oslo/db/sqlalchemy/provision.py
@@ -0,0 +1,157 @@
+# Copyright 2013 Mirantis.inc
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Provision test environment for specific DB backends"""
+
+import argparse
+import logging
+import os
+import random
+import string
+
+from six import moves
+import sqlalchemy
+
+from oslo.db import exception as exc
+
+
+LOG = logging.getLogger(__name__)
+
+
+def get_engine(uri):
+ """Engine creation
+
+ Call the function without arguments to get admin connection. Admin
+ connection required to create temporary user and database for each
+ particular test. Otherwise use existing connection to recreate connection
+ to the temporary database.
+ """
+ return sqlalchemy.create_engine(uri, poolclass=sqlalchemy.pool.NullPool)
+
+
+def _execute_sql(engine, sql, driver):
+ """Initialize connection, execute sql query and close it."""
+ try:
+ with engine.connect() as conn:
+ if driver == 'postgresql':
+ conn.connection.set_isolation_level(0)
+ for s in sql:
+ conn.execute(s)
+ except sqlalchemy.exc.OperationalError:
+ msg = ('%s does not match database admin '
+ 'credentials or database does not exist.')
+ LOG.exception(msg % engine.url)
+ raise exc.DBConnectionError(msg % engine.url)
+
+
+def create_database(engine):
+ """Provide temporary user and database for each particular test."""
+ driver = engine.name
+
+ auth = {
+ 'database': ''.join(random.choice(string.ascii_lowercase)
+ for i in moves.range(10)),
+ 'user': engine.url.username,
+ 'passwd': engine.url.password,
+ }
+
+ sqls = [
+ "drop database if exists %(database)s;",
+ "create database %(database)s;"
+ ]
+
+ if driver == 'sqlite':
+ return 'sqlite:////tmp/%s' % auth['database']
+ elif driver in ['mysql', 'postgresql']:
+ sql_query = map(lambda x: x % auth, sqls)
+ _execute_sql(engine, sql_query, driver)
+ else:
+ raise ValueError('Unsupported RDBMS %s' % driver)
+
+ params = auth.copy()
+ params['backend'] = driver
+ return "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" % params
+
+
+def drop_database(admin_engine, current_uri):
+ """Drop temporary database and user after each particular test."""
+
+ engine = get_engine(current_uri)
+ driver = engine.name
+ auth = {'database': engine.url.database, 'user': engine.url.username}
+
+ if driver == 'sqlite':
+ try:
+ os.remove(auth['database'])
+ except OSError:
+ pass
+ elif driver in ['mysql', 'postgresql']:
+ sql = "drop database if exists %(database)s;"
+ _execute_sql(admin_engine, [sql % auth], driver)
+ else:
+ raise ValueError('Unsupported RDBMS %s' % driver)
+
+
+def main():
+ """Controller to handle commands
+
+ ::create: Create test user and database with random names.
+ ::drop: Drop user and database created by previous command.
+ """
+ parser = argparse.ArgumentParser(
+ description='Controller to handle database creation and dropping'
+ ' commands.',
+ epilog='Under normal circumstances is not used directly.'
+ ' Used in .testr.conf to automate test database creation'
+ ' and dropping processes.')
+ subparsers = parser.add_subparsers(
+ help='Subcommands to manipulate temporary test databases.')
+
+ create = subparsers.add_parser(
+ 'create',
+ help='Create temporary test '
+ 'databases and users.')
+ create.set_defaults(which='create')
+ create.add_argument(
+ 'instances_count',
+ type=int,
+ help='Number of databases to create.')
+
+ drop = subparsers.add_parser(
+ 'drop',
+ help='Drop temporary test databases and users.')
+ drop.set_defaults(which='drop')
+ drop.add_argument(
+ 'instances',
+ nargs='+',
+ help='List of databases uri to be dropped.')
+
+ args = parser.parse_args()
+
+ connection_string = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION',
+ 'sqlite://')
+ engine = get_engine(connection_string)
+ which = args.which
+
+ if which == "create":
+ for i in range(int(args.instances_count)):
+ print(create_database(engine))
+ elif which == "drop":
+ for db in args.instances:
+ drop_database(engine, db)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/oslo/db/sqlalchemy/session.py b/oslo/db/sqlalchemy/session.py
new file mode 100644
index 0000000..90ff934
--- /dev/null
+++ b/oslo/db/sqlalchemy/session.py
@@ -0,0 +1,904 @@
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Session Handling for SQLAlchemy backend.
+
+Recommended ways to use sessions within this framework:
+
+* Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``.
+ `model_query()` will implicitly use a session when called without one
+ supplied. This is the ideal situation because it will allow queries
+ to be automatically retried if the database connection is interrupted.
+
+ .. note:: Automatic retry will be enabled in a future patch.
+
+ It is generally fine to issue several queries in a row like this. Even though
+ they may be run in separate transactions and/or separate sessions, each one
+ will see the data from the prior calls. If needed, undo- or rollback-like
+ functionality should be handled at a logical level. For an example, look at
+ the code around quotas and `reservation_rollback()`.
+
+ Examples:
+
+ .. code:: python
+
+ def get_foo(context, foo):
+ return (model_query(context, models.Foo).
+ filter_by(foo=foo).
+ first())
+
+ def update_foo(context, id, newfoo):
+ (model_query(context, models.Foo).
+ filter_by(id=id).
+ update({'foo': newfoo}))
+
+ def create_foo(context, values):
+ foo_ref = models.Foo()
+ foo_ref.update(values)
+ foo_ref.save()
+ return foo_ref
+
+
+* Within the scope of a single method, keep all the reads and writes within
+ the context managed by a single session. In this way, the session's
+ `__exit__` handler will take care of calling `flush()` and `commit()` for
+ you. If using this approach, you should not explicitly call `flush()` or
+ `commit()`. Any error within the context of the session will cause the
+ session to emit a `ROLLBACK`. Database errors like `IntegrityError` will be
+ raised in `session`'s `__exit__` handler, and any try/except within the
+ context managed by `session` will not be triggered. And catching other
+ non-database errors in the session will not trigger the ROLLBACK, so
+ exception handlers should always be outside the session, unless the
+ developer wants to do a partial commit on purpose. If the connection is
+ dropped before this is possible, the database will implicitly roll back the
+ transaction.
+
+ .. note:: Statements in the session scope will not be automatically retried.
+
+ If you create models within the session, they need to be added, but you
+ do not need to call `model.save()`:
+
+ .. code:: python
+
+ def create_many_foo(context, foos):
+ session = sessionmaker()
+ with session.begin():
+ for foo in foos:
+ foo_ref = models.Foo()
+ foo_ref.update(foo)
+ session.add(foo_ref)
+
+ def update_bar(context, foo_id, newbar):
+ session = sessionmaker()
+ with session.begin():
+ foo_ref = (model_query(context, models.Foo, session).
+ filter_by(id=foo_id).
+ first())
+ (model_query(context, models.Bar, session).
+ filter_by(id=foo_ref['bar_id']).
+ update({'bar': newbar}))
+
+ .. note:: `update_bar` is a trivially simple example of using
+ ``with session.begin``. Whereas `create_many_foo` is a good example of
+ when a transaction is needed, it is always best to use as few queries as
+ possible.
+
+ The two queries in `update_bar` can be better expressed using a single query
+ which avoids the need for an explicit transaction. It can be expressed like
+ so:
+
+ .. code:: python
+
+ def update_bar(context, foo_id, newbar):
+ subq = (model_query(context, models.Foo.id).
+ filter_by(id=foo_id).
+ limit(1).
+ subquery())
+ (model_query(context, models.Bar).
+ filter_by(id=subq.as_scalar()).
+ update({'bar': newbar}))
+
+ For reference, this emits approximately the following SQL statement:
+
+ .. code:: sql
+
+ UPDATE bar SET bar = ${newbar}
+ WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1);
+
+ .. note:: `create_duplicate_foo` is a trivially simple example of catching an
+ exception while using ``with session.begin``. Here create two duplicate
+ instances with same primary key, must catch the exception out of context
+ managed by a single session:
+
+ .. code:: python
+
+ def create_duplicate_foo(context):
+ foo1 = models.Foo()
+ foo2 = models.Foo()
+ foo1.id = foo2.id = 1
+ session = sessionmaker()
+ try:
+ with session.begin():
+ session.add(foo1)
+ session.add(foo2)
+ except exception.DBDuplicateEntry as e:
+ handle_error(e)
+
+* Passing an active session between methods. Sessions should only be passed
+ to private methods. The private method must use a subtransaction; otherwise
+ SQLAlchemy will throw an error when you call `session.begin()` on an existing
+ transaction. Public methods should not accept a session parameter and should
+ not be involved in sessions within the caller's scope.
+
+ Note that this incurs more overhead in SQLAlchemy than the above means
+ due to nesting transactions, and it is not possible to implicitly retry
+ failed database operations when using this approach.
+
+ This also makes code somewhat more difficult to read and debug, because a
+ single database transaction spans more than one method. Error handling
+ becomes less clear in this situation. When this is needed for code clarity,
+ it should be clearly documented.
+
+ .. code:: python
+
+ def myfunc(foo):
+ session = sessionmaker()
+ with session.begin():
+ # do some database things
+ bar = _private_func(foo, session)
+ return bar
+
+ def _private_func(foo, session=None):
+ if not session:
+ session = sessionmaker()
+ with session.begin(subtransaction=True):
+ # do some other database things
+ return bar
+
+
+There are some things which it is best to avoid:
+
+* Don't keep a transaction open any longer than necessary.
+
+ This means that your ``with session.begin()`` block should be as short
+ as possible, while still containing all the related calls for that
+ transaction.
+
+* Avoid ``with_lockmode('UPDATE')`` when possible.
+
+ In MySQL/InnoDB, when a ``SELECT ... FOR UPDATE`` query does not match
+ any rows, it will take a gap-lock. This is a form of write-lock on the
+ "gap" where no rows exist, and prevents any other writes to that space.
+ This can effectively prevent any INSERT into a table by locking the gap
+ at the end of the index. Similar problems will occur if the SELECT FOR UPDATE
+ has an overly broad WHERE clause, or doesn't properly use an index.
+
+ One idea proposed at ODS Fall '12 was to use a normal SELECT to test the
+ number of rows matching a query, and if only one row is returned,
+ then issue the SELECT FOR UPDATE.
+
+ The better long-term solution is to use
+ ``INSERT .. ON DUPLICATE KEY UPDATE``.
+ However, this can not be done until the "deleted" columns are removed and
+ proper UNIQUE constraints are added to the tables.
+
+
+Enabling soft deletes:
+
+* To use/enable soft-deletes, the `SoftDeleteMixin` must be added
+ to your model class. For example:
+
+ .. code:: python
+
+ class NovaBase(models.SoftDeleteMixin, models.ModelBase):
+ pass
+
+
+Efficient use of soft deletes:
+
+* There are two possible ways to mark a record as deleted:
+ `model.soft_delete()` and `query.soft_delete()`.
+
+ The `model.soft_delete()` method works with a single already-fetched entry.
+ `query.soft_delete()` makes only one db request for all entries that
+ correspond to the query.
+
+* In almost all cases you should use `query.soft_delete()`. Some examples:
+
+ .. code:: python
+
+ def soft_delete_bar():
+ count = model_query(BarModel).find(some_condition).soft_delete()
+ if count == 0:
+ raise Exception("0 entries were soft deleted")
+
+ def complex_soft_delete_with_synchronization_bar(session=None):
+ if session is None:
+ session = sessionmaker()
+ with session.begin(subtransactions=True):
+ count = (model_query(BarModel).
+ find(some_condition).
+ soft_delete(synchronize_session=True))
+ # Here synchronize_session is required, because we
+ # don't know what is going on in outer session.
+ if count == 0:
+ raise Exception("0 entries were soft deleted")
+
+* There is only one situation where `model.soft_delete()` is appropriate: when
+ you fetch a single record, work with it, and mark it as deleted in the same
+ transaction.
+
+ .. code:: python
+
+ def soft_delete_bar_model():
+ session = sessionmaker()
+ with session.begin():
+ bar_ref = model_query(BarModel).find(some_condition).first()
+ # Work with bar_ref
+ bar_ref.soft_delete(session=session)
+
+ However, if you need to work with all entries that correspond to query and
+ then soft delete them you should use the `query.soft_delete()` method:
+
+ .. code:: python
+
+ def soft_delete_multi_models():
+ session = sessionmaker()
+ with session.begin():
+ query = (model_query(BarModel, session=session).
+ find(some_condition))
+ model_refs = query.all()
+ # Work with model_refs
+ query.soft_delete(synchronize_session=False)
+ # synchronize_session=False should be set if there is no outer
+ # session and these entries are not used after this.
+
+ When working with many rows, it is very important to use query.soft_delete,
+ which issues a single query. Using `model.soft_delete()`, as in the following
+ example, is very inefficient.
+
+ .. code:: python
+
+ for bar_ref in bar_refs:
+ bar_ref.soft_delete(session=session)
+ # This will produce count(bar_refs) db requests.
+
+"""
+
+import functools
+import logging
+import re
+import time
+
+import six
+from sqlalchemy import exc as sqla_exc
+from sqlalchemy.interfaces import PoolListener
+import sqlalchemy.orm
+from sqlalchemy.pool import NullPool, StaticPool
+from sqlalchemy.sql.expression import literal_column
+
+from oslo.db import exception
+from openstack.common.gettextutils import _LE, _LW
+from openstack.common import timeutils
+
+
+LOG = logging.getLogger(__name__)
+
+
+class SqliteForeignKeysListener(PoolListener):
+ """Ensures that the foreign key constraints are enforced in SQLite.
+
+ The foreign key constraints are disabled by default in SQLite,
+ so the foreign key constraints will be enabled here for every
+ database connection
+ """
+ def connect(self, dbapi_con, con_record):
+ dbapi_con.execute('pragma foreign_keys=ON')
+
+
+# note(boris-42): In current versions of DB backends unique constraint
+# violation messages follow the structure:
+#
+# sqlite:
+# 1 column - (IntegrityError) column c1 is not unique
+# N columns - (IntegrityError) column c1, c2, ..., N are not unique
+#
+# sqlite since 3.7.16:
+# 1 column - (IntegrityError) UNIQUE constraint failed: tbl.k1
+#
+# N columns - (IntegrityError) UNIQUE constraint failed: tbl.k1, tbl.k2
+#
+# postgres:
+# 1 column - (IntegrityError) duplicate key value violates unique
+# constraint "users_c1_key"
+# N columns - (IntegrityError) duplicate key value violates unique
+# constraint "name_of_our_constraint"
+#
+# mysql:
+# 1 column - (IntegrityError) (1062, "Duplicate entry 'value_of_c1' for key
+# 'c1'")
+# N columns - (IntegrityError) (1062, "Duplicate entry 'values joined
+# with -' for key 'name_of_our_constraint'")
+#
+# ibm_db_sa:
+# N columns - (IntegrityError) SQL0803N One or more values in the INSERT
+# statement, UPDATE statement, or foreign key update caused by a
+# DELETE statement are not valid because the primary key, unique
+# constraint or unique index identified by "2" constrains table
+# "NOVA.KEY_PAIRS" from having duplicate values for the index
+# key.
+_DUP_KEY_RE_DB = {
+ "sqlite": (re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"),
+ re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")),
+ "postgresql": (re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),),
+ "mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),),
+ "ibm_db_sa": (re.compile(r"^.*SQL0803N.*$"),),
+}
+
+
+def _raise_if_duplicate_entry_error(integrity_error, engine_name):
+ """Raise exception if two entries are duplicated.
+
+ In this function will be raised DBDuplicateEntry exception if integrity
+ error wrap unique constraint violation.
+ """
+
+ def get_columns_from_uniq_cons_or_name(columns):
+ # note(vsergeyev): UniqueConstraint name convention: "uniq_t0c10c2"
+ # where `t` it is table name and columns `c1`, `c2`
+ # are in UniqueConstraint.
+ uniqbase = "uniq_"
+ if not columns.startswith(uniqbase):
+ if engine_name == "postgresql":
+ return [columns[columns.index("_") + 1:columns.rindex("_")]]
+ return [columns]
+ return columns[len(uniqbase):].split("0")[1:]
+
+ if engine_name not in ("ibm_db_sa", "mysql", "sqlite", "postgresql"):
+ return
+
+ # FIXME(johannes): The usage of the .message attribute has been
+ # deprecated since Python 2.6. However, the exceptions raised by
+ # SQLAlchemy can differ when using unicode() and accessing .message.
+ # An audit across all three supported engines will be necessary to
+ # ensure there are no regressions.
+ for pattern in _DUP_KEY_RE_DB[engine_name]:
+ match = pattern.match(integrity_error.message)
+ if match:
+ break
+ else:
+ return
+
+ # NOTE(mriedem): The ibm_db_sa integrity error message doesn't provide the
+ # columns so we have to omit that from the DBDuplicateEntry error.
+ columns = ''
+
+ if engine_name != 'ibm_db_sa':
+ columns = match.group(1)
+
+ if engine_name == "sqlite":
+ columns = [c.split('.')[-1] for c in columns.strip().split(", ")]
+ else:
+ columns = get_columns_from_uniq_cons_or_name(columns)
+ raise exception.DBDuplicateEntry(columns, integrity_error)
+
+
+# NOTE(comstud): In current versions of DB backends, Deadlock violation
+# messages follow the structure:
+#
+# mysql:
+# (OperationalError) (1213, 'Deadlock found when trying to get lock; try '
+# 'restarting transaction') <query_str> <query_args>
+_DEADLOCK_RE_DB = {
+ "mysql": re.compile(r"^.*\(1213, 'Deadlock.*")
+}
+
+
+def _raise_if_deadlock_error(operational_error, engine_name):
+ """Raise exception on deadlock condition.
+
+ Raise DBDeadlock exception if OperationalError contains a Deadlock
+ condition.
+ """
+ re = _DEADLOCK_RE_DB.get(engine_name)
+ if re is None:
+ return
+ # FIXME(johannes): The usage of the .message attribute has been
+ # deprecated since Python 2.6. However, the exceptions raised by
+ # SQLAlchemy can differ when using unicode() and accessing .message.
+ # An audit across all three supported engines will be necessary to
+ # ensure there are no regressions.
+ m = re.match(operational_error.message)
+ if not m:
+ return
+ raise exception.DBDeadlock(operational_error)
+
+
+def _wrap_db_error(f):
+ @functools.wraps(f)
+ def _wrap(self, *args, **kwargs):
+ try:
+ assert issubclass(
+ self.__class__, sqlalchemy.orm.session.Session
+ ), ('_wrap_db_error() can only be applied to methods of '
+ 'subclasses of sqlalchemy.orm.session.Session.')
+
+ return f(self, *args, **kwargs)
+ except UnicodeEncodeError:
+ raise exception.DBInvalidUnicodeParameter()
+ except sqla_exc.OperationalError as e:
+ _raise_if_db_connection_lost(e, self.bind)
+ _raise_if_deadlock_error(e, self.bind.dialect.name)
+ # NOTE(comstud): A lot of code is checking for OperationalError
+ # so let's not wrap it for now.
+ raise
+ # note(boris-42): We should catch unique constraint violation and
+ # wrap it by our own DBDuplicateEntry exception. Unique constraint
+ # violation is wrapped by IntegrityError.
+ except sqla_exc.IntegrityError as e:
+ # note(boris-42): SqlAlchemy doesn't unify errors from different
+ # DBs so we must do this. Also in some tables (for example
+ # instance_types) there are more than one unique constraint. This
+ # means we should get names of columns, which values violate
+ # unique constraint, from error message.
+ _raise_if_duplicate_entry_error(e, self.bind.dialect.name)
+ raise exception.DBError(e)
+ except Exception as e:
+ LOG.exception(_LE('DB exception wrapped.'))
+ raise exception.DBError(e)
+ return _wrap
+
+
+def _synchronous_switch_listener(dbapi_conn, connection_rec):
+ """Switch sqlite connections to non-synchronous mode."""
+ dbapi_conn.execute("PRAGMA synchronous = OFF")
+
+
+def _add_regexp_listener(dbapi_con, con_record):
+ """Add REGEXP function to sqlite connections."""
+
+ def regexp(expr, item):
+ reg = re.compile(expr)
+ return reg.search(six.text_type(item)) is not None
+ dbapi_con.create_function('regexp', 2, regexp)
+
+
+def _thread_yield(dbapi_con, con_record):
+ """Ensure other greenthreads get a chance to be executed.
+
+ If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will
+ execute instead of time.sleep(0).
+ Force a context switch. With common database backends (eg MySQLdb and
+ sqlite), there is no implicit yield caused by network I/O since they are
+ implemented by C libraries that eventlet cannot monkey patch.
+ """
+ time.sleep(0)
+
+
+def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy):
+ """Ensures that MySQL, PostgreSQL or DB2 connections are alive.
+
+ Borrowed from:
+ http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
+ """
+ cursor = dbapi_conn.cursor()
+ try:
+ ping_sql = 'select 1'
+ if engine.name == 'ibm_db_sa':
+ # DB2 requires a table expression
+ ping_sql = 'select 1 from (values (1)) AS t1'
+ cursor.execute(ping_sql)
+ except Exception as ex:
+ if engine.dialect.is_disconnect(ex, dbapi_conn, cursor):
+ msg = _LW('Database server has gone away: %s') % ex
+ LOG.warning(msg)
+
+ # if the database server has gone away, all connections in the pool
+ # have become invalid and we can safely close all of them here,
+ # rather than waste time on checking of every single connection
+ engine.dispose()
+
+ # this will be handled by SQLAlchemy and will force it to create
+ # a new connection and retry the original action
+ raise sqla_exc.DisconnectionError(msg)
+ else:
+ raise
+
+
+def _set_session_sql_mode(dbapi_con, connection_rec, sql_mode=None):
+ """Set the sql_mode session variable.
+
+ MySQL supports several server modes. The default is None, but sessions
+ may choose to enable server modes like TRADITIONAL, ANSI,
+ several STRICT_* modes and others.
+
+ Note: passing in '' (empty string) for sql_mode clears
+ the SQL mode for the session, overriding a potentially set
+ server default.
+ """
+
+ cursor = dbapi_con.cursor()
+ cursor.execute("SET SESSION sql_mode = %s", [sql_mode])
+
+
+def _mysql_get_effective_sql_mode(engine):
+ """Returns the effective SQL mode for connections from the engine pool.
+
+ Returns ``None`` if the mode isn't available, otherwise returns the mode.
+
+ """
+ # Get the real effective SQL mode. Even when unset by
+ # our own config, the server may still be operating in a specific
+ # SQL mode as set by the server configuration.
+ # Also note that the checkout listener will be called on execute to
+ # set the mode if it's registered.
+ row = engine.execute("SHOW VARIABLES LIKE 'sql_mode'").fetchone()
+ if row is None:
+ return
+ return row[1]
+
+
+def _mysql_check_effective_sql_mode(engine):
+ """Logs a message based on the effective SQL mode for MySQL connections."""
+ realmode = _mysql_get_effective_sql_mode(engine)
+
+ if realmode is None:
+ LOG.warning(_LW('Unable to detect effective SQL mode'))
+ return
+
+ LOG.debug('MySQL server mode set to %s', realmode)
+ # 'TRADITIONAL' mode enables several other modes, so
+ # we need a substring match here
+ if not ('TRADITIONAL' in realmode.upper() or
+ 'STRICT_ALL_TABLES' in realmode.upper()):
+ LOG.warning(_LW("MySQL SQL mode is '%s', "
+ "consider enabling TRADITIONAL or STRICT_ALL_TABLES"),
+ realmode)
+
+
+def _mysql_set_mode_callback(engine, sql_mode):
+ if sql_mode is not None:
+ mode_callback = functools.partial(_set_session_sql_mode,
+ sql_mode=sql_mode)
+ sqlalchemy.event.listen(engine, 'connect', mode_callback)
+ _mysql_check_effective_sql_mode(engine)
+
+
+def _is_db_connection_error(args):
+ """Return True if error in connecting to db."""
+ # NOTE(adam_g): This is currently MySQL specific and needs to be extended
+ # to support Postgres and others.
+ # For the db2, the error code is -30081 since the db2 is still not ready
+ conn_err_codes = ('2002', '2003', '2006', '2013', '-30081')
+ for err_code in conn_err_codes:
+ if args.find(err_code) != -1:
+ return True
+ return False
+
+
+def _raise_if_db_connection_lost(error, engine):
+ # NOTE(vsergeyev): Function is_disconnect(e, connection, cursor)
+ # requires connection and cursor in incoming parameters,
+ # but we have no possibility to create connection if DB
+ # is not available, so in such case reconnect fails.
+ # But is_disconnect() ignores these parameters, so it
+ # makes sense to pass to function None as placeholder
+ # instead of connection and cursor.
+ if engine.dialect.is_disconnect(error, None, None):
+ raise exception.DBConnectionError(error)
+
+
+def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
+ idle_timeout=3600,
+ connection_debug=0, max_pool_size=None, max_overflow=None,
+ pool_timeout=None, sqlite_synchronous=True,
+ connection_trace=False, max_retries=10, retry_interval=10):
+ """Return a new SQLAlchemy engine."""
+
+ connection_dict = sqlalchemy.engine.url.make_url(sql_connection)
+
+ engine_args = {
+ "pool_recycle": idle_timeout,
+ 'convert_unicode': True,
+ }
+
+ logger = logging.getLogger('sqlalchemy.engine')
+
+ # Map SQL debug level to Python log level
+ if connection_debug >= 100:
+ logger.setLevel(logging.DEBUG)
+ elif connection_debug >= 50:
+ logger.setLevel(logging.INFO)
+ else:
+ logger.setLevel(logging.WARNING)
+
+ if "sqlite" in connection_dict.drivername:
+ if sqlite_fk:
+ engine_args["listeners"] = [SqliteForeignKeysListener()]
+ engine_args["poolclass"] = NullPool
+
+ if sql_connection == "sqlite://":
+ engine_args["poolclass"] = StaticPool
+ engine_args["connect_args"] = {'check_same_thread': False}
+ else:
+ if max_pool_size is not None:
+ engine_args['pool_size'] = max_pool_size
+ if max_overflow is not None:
+ engine_args['max_overflow'] = max_overflow
+ if pool_timeout is not None:
+ engine_args['pool_timeout'] = pool_timeout
+
+ engine = sqlalchemy.create_engine(sql_connection, **engine_args)
+
+ sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
+
+ if engine.name in ('ibm_db_sa', 'mysql', 'postgresql'):
+ ping_callback = functools.partial(_ping_listener, engine)
+ sqlalchemy.event.listen(engine, 'checkout', ping_callback)
+ if engine.name == 'mysql':
+ if mysql_sql_mode:
+ _mysql_set_mode_callback(engine, mysql_sql_mode)
+ elif 'sqlite' in connection_dict.drivername:
+ if not sqlite_synchronous:
+ sqlalchemy.event.listen(engine, 'connect',
+ _synchronous_switch_listener)
+ sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener)
+
+ if connection_trace and engine.dialect.dbapi.__name__ == 'MySQLdb':
+ _patch_mysqldb_with_stacktrace_comments()
+
+ try:
+ engine.connect()
+ except sqla_exc.OperationalError as e:
+ if not _is_db_connection_error(e.args[0]):
+ raise
+
+ remaining = max_retries
+ if remaining == -1:
+ remaining = 'infinite'
+ while True:
+ msg = _LW('SQL connection failed. %s attempts left.')
+ LOG.warning(msg % remaining)
+ if remaining != 'infinite':
+ remaining -= 1
+ time.sleep(retry_interval)
+ try:
+ engine.connect()
+ break
+ except sqla_exc.OperationalError as e:
+ if (remaining != 'infinite' and remaining == 0) or \
+ not _is_db_connection_error(e.args[0]):
+ raise
+ return engine
+
+
+class Query(sqlalchemy.orm.query.Query):
+ """Subclass of sqlalchemy.query with soft_delete() method."""
+ def soft_delete(self, synchronize_session='evaluate'):
+ return self.update({'deleted': literal_column('id'),
+ 'updated_at': literal_column('updated_at'),
+ 'deleted_at': timeutils.utcnow()},
+ synchronize_session=synchronize_session)
+
+
+class Session(sqlalchemy.orm.session.Session):
+ """Custom Session class to avoid SqlAlchemy Session monkey patching."""
+ @_wrap_db_error
+ def query(self, *args, **kwargs):
+ return super(Session, self).query(*args, **kwargs)
+
+ @_wrap_db_error
+ def flush(self, *args, **kwargs):
+ return super(Session, self).flush(*args, **kwargs)
+
+ @_wrap_db_error
+ def execute(self, *args, **kwargs):
+ return super(Session, self).execute(*args, **kwargs)
+
+
+def get_maker(engine, autocommit=True, expire_on_commit=False):
+ """Return a SQLAlchemy sessionmaker using the given engine."""
+ return sqlalchemy.orm.sessionmaker(bind=engine,
+ class_=Session,
+ autocommit=autocommit,
+ expire_on_commit=expire_on_commit,
+ query_cls=Query)
+
+
+def _patch_mysqldb_with_stacktrace_comments():
+ """Adds current stack trace as a comment in queries.
+
+ Patches MySQLdb.cursors.BaseCursor._do_query.
+ """
+ import MySQLdb.cursors
+ import traceback
+
+ old_mysql_do_query = MySQLdb.cursors.BaseCursor._do_query
+
+ def _do_query(self, q):
+ stack = ''
+ for filename, line, method, function in traceback.extract_stack():
+ # exclude various common things from trace
+ if filename.endswith('session.py') and method == '_do_query':
+ continue
+ if filename.endswith('api.py') and method == 'wrapper':
+ continue
+ if filename.endswith('utils.py') and method == '_inner':
+ continue
+ if filename.endswith('exception.py') and method == '_wrap':
+ continue
+ # db/api is just a wrapper around db/sqlalchemy/api
+ if filename.endswith('db/api.py'):
+ continue
+ # only trace inside oslo
+ index = filename.rfind('oslo')
+ if index == -1:
+ continue
+ stack += "File:%s:%s Method:%s() Line:%s | " \
+ % (filename[index:], line, method, function)
+
+ # strip trailing " | " from stack
+ if stack:
+ stack = stack[:-3]
+ qq = "%s /* %s */" % (q, stack)
+ else:
+ qq = q
+ old_mysql_do_query(self, qq)
+
+ setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query)
+
+
+class EngineFacade(object):
+ """A helper class for removing of global engine instances from oslo.db.
+
+ As a library, oslo.db can't decide where to store/when to create engine
+ and sessionmaker instances, so this must be left for a target application.
+
+ On the other hand, in order to simplify the adoption of oslo.db changes,
+ we'll provide a helper class, which creates engine and sessionmaker
+ on its instantiation and provides get_engine()/get_session() methods
+ that are compatible with corresponding utility functions that currently
+ exist in target projects, e.g. in Nova.
+
+ engine/sessionmaker instances will still be global (and they are meant to
+ be global), but they will be stored in the app context, rather that in the
+ oslo.db context.
+
+ Note: using of this helper is completely optional and you are encouraged to
+ integrate engine/sessionmaker instances into your apps any way you like
+ (e.g. one might want to bind a session to a request context). Two important
+ things to remember:
+
+ 1. An Engine instance is effectively a pool of DB connections, so it's
+ meant to be shared (and it's thread-safe).
+ 2. A Session instance is not meant to be shared and represents a DB
+ transactional context (i.e. it's not thread-safe). sessionmaker is
+ a factory of sessions.
+
+ """
+
+ def __init__(self, sql_connection,
+ sqlite_fk=False, autocommit=True,
+ expire_on_commit=False, **kwargs):
+ """Initialize engine and sessionmaker instances.
+
+ :param sqlite_fk: enable foreign keys in SQLite
+ :type sqlite_fk: bool
+
+ :param autocommit: use autocommit mode for created Session instances
+ :type autocommit: bool
+
+ :param expire_on_commit: expire session objects on commit
+ :type expire_on_commit: bool
+
+ Keyword arguments:
+
+ :keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions.
+ (defaults to TRADITIONAL)
+ :keyword idle_timeout: timeout before idle sql connections are reaped
+ (defaults to 3600)
+ :keyword connection_debug: verbosity of SQL debugging information.
+ 0=None, 100=Everything (defaults to 0)
+ :keyword max_pool_size: maximum number of SQL connections to keep open
+ in a pool (defaults to SQLAlchemy settings)
+ :keyword max_overflow: if set, use this value for max_overflow with
+ sqlalchemy (defaults to SQLAlchemy settings)
+ :keyword pool_timeout: if set, use this value for pool_timeout with
+ sqlalchemy (defaults to SQLAlchemy settings)
+ :keyword sqlite_synchronous: if True, SQLite uses synchronous mode
+ (defaults to True)
+ :keyword connection_trace: add python stack traces to SQL as comment
+ strings (defaults to False)
+ :keyword max_retries: maximum db connection retries during startup.
+ (setting -1 implies an infinite retry count)
+ (defaults to 10)
+ :keyword retry_interval: interval between retries of opening a sql
+ connection (defaults to 10)
+
+ """
+
+ super(EngineFacade, self).__init__()
+
+ self._engine = create_engine(
+ sql_connection=sql_connection,
+ sqlite_fk=sqlite_fk,
+ mysql_sql_mode=kwargs.get('mysql_sql_mode', 'TRADITIONAL'),
+ idle_timeout=kwargs.get('idle_timeout', 3600),
+ connection_debug=kwargs.get('connection_debug', 0),
+ max_pool_size=kwargs.get('max_pool_size'),
+ max_overflow=kwargs.get('max_overflow'),
+ pool_timeout=kwargs.get('pool_timeout'),
+ sqlite_synchronous=kwargs.get('sqlite_synchronous', True),
+ connection_trace=kwargs.get('connection_trace', False),
+ max_retries=kwargs.get('max_retries', 10),
+ retry_interval=kwargs.get('retry_interval', 10))
+ self._session_maker = get_maker(
+ engine=self._engine,
+ autocommit=autocommit,
+ expire_on_commit=expire_on_commit)
+
+ def get_engine(self):
+ """Get the engine instance (note, that it's shared)."""
+
+ return self._engine
+
+ def get_session(self, **kwargs):
+ """Get a Session instance.
+
+ If passed, keyword arguments values override the ones used when the
+ sessionmaker instance was created.
+
+ :keyword autocommit: use autocommit mode for created Session instances
+ :type autocommit: bool
+
+ :keyword expire_on_commit: expire session objects on commit
+ :type expire_on_commit: bool
+
+ """
+
+ for arg in kwargs:
+ if arg not in ('autocommit', 'expire_on_commit'):
+ del kwargs[arg]
+
+ return self._session_maker(**kwargs)
+
+ @classmethod
+ def from_config(cls, connection_string, conf,
+ sqlite_fk=False, autocommit=True, expire_on_commit=False):
+ """Initialize EngineFacade using oslo.config config instance options.
+
+ :param connection_string: SQLAlchemy connection string
+ :type connection_string: string
+
+ :param conf: oslo.config config instance
+ :type conf: oslo.config.cfg.ConfigOpts
+
+ :param sqlite_fk: enable foreign keys in SQLite
+ :type sqlite_fk: bool
+
+ :param autocommit: use autocommit mode for created Session instances
+ :type autocommit: bool
+
+ :param expire_on_commit: expire session objects on commit
+ :type expire_on_commit: bool
+
+ """
+
+ return cls(sql_connection=connection_string,
+ sqlite_fk=sqlite_fk,
+ autocommit=autocommit,
+ expire_on_commit=expire_on_commit,
+ **dict(conf.database.items()))
diff --git a/oslo/db/sqlalchemy/test_base.py b/oslo/db/sqlalchemy/test_base.py
new file mode 100644
index 0000000..7664faa
--- /dev/null
+++ b/oslo/db/sqlalchemy/test_base.py
@@ -0,0 +1,165 @@
+# Copyright (c) 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import functools
+import os
+
+import fixtures
+from oslotest import base as test_base
+import six
+
+from oslo.db.sqlalchemy import provision
+from oslo.db.sqlalchemy import session
+from oslo.db.sqlalchemy import utils
+
+
+class DbFixture(fixtures.Fixture):
+ """Basic database fixture.
+
+ Allows to run tests on various db backends, such as SQLite, MySQL and
+ PostgreSQL. By default use sqlite backend. To override default backend
+ uri set env variable OS_TEST_DBAPI_CONNECTION with database admin
+ credentials for specific backend.
+ """
+
+ def _get_uri(self):
+ return os.getenv('OS_TEST_DBAPI_CONNECTION', 'sqlite://')
+
+ def __init__(self, test):
+ super(DbFixture, self).__init__()
+
+ self.test = test
+
+ def cleanUp(self):
+ self.test.engine.dispose()
+
+ def setUp(self):
+ super(DbFixture, self).setUp()
+
+ self.test.engine = session.create_engine(self._get_uri())
+ self.test.sessionmaker = session.get_maker(self.test.engine)
+
+
+class DbTestCase(test_base.BaseTestCase):
+ """Base class for testing of DB code.
+
+ Using `DbFixture`. Intended to be the main database test case to use all
+ the tests on a given backend with user defined uri. Backend specific
+ tests should be decorated with `backend_specific` decorator.
+ """
+
+ FIXTURE = DbFixture
+
+ def setUp(self):
+ super(DbTestCase, self).setUp()
+ self.useFixture(self.FIXTURE(self))
+
+
+ALLOWED_DIALECTS = ['sqlite', 'mysql', 'postgresql']
+
+
+def backend_specific(*dialects):
+ """Decorator to skip backend specific tests on inappropriate engines.
+
+ ::dialects: list of dialects names under which the test will be launched.
+ """
+ def wrap(f):
+ @functools.wraps(f)
+ def ins_wrap(self):
+ if not set(dialects).issubset(ALLOWED_DIALECTS):
+ raise ValueError(
+ "Please use allowed dialects: %s" % ALLOWED_DIALECTS)
+ if self.engine.name not in dialects:
+ msg = ('The test "%s" can be run '
+ 'only on %s. Current engine is %s.')
+ args = (f.__name__, ' '.join(dialects), self.engine.name)
+ self.skip(msg % args)
+ else:
+ return f(self)
+ return ins_wrap
+ return wrap
+
+
+@six.add_metaclass(abc.ABCMeta)
+class OpportunisticFixture(DbFixture):
+ """Base fixture to use default CI databases.
+
+ The databases exist in OpenStack CI infrastructure. But for the
+ correct functioning in local environment the databases must be
+ created manually.
+ """
+
+ DRIVER = abc.abstractproperty(lambda: None)
+ DBNAME = PASSWORD = USERNAME = 'openstack_citest'
+
+ def setUp(self):
+ self._provisioning_engine = provision.get_engine(
+ utils.get_connect_string(backend=self.DRIVER,
+ user=self.USERNAME,
+ passwd=self.PASSWORD,
+ database=self.DBNAME)
+ )
+ self._uri = provision.create_database(self._provisioning_engine)
+
+ super(OpportunisticFixture, self).setUp()
+
+ def cleanUp(self):
+ super(OpportunisticFixture, self).cleanUp()
+
+ provision.drop_database(self._provisioning_engine, self._uri)
+
+ def _get_uri(self):
+ return self._uri
+
+
+@six.add_metaclass(abc.ABCMeta)
+class OpportunisticTestCase(DbTestCase):
+ """Base test case to use default CI databases.
+
+ The subclasses of the test case are running only when openstack_citest
+ database is available otherwise a tests will be skipped.
+ """
+
+ FIXTURE = abc.abstractproperty(lambda: None)
+
+ def setUp(self):
+ credentials = {
+ 'backend': self.FIXTURE.DRIVER,
+ 'user': self.FIXTURE.USERNAME,
+ 'passwd': self.FIXTURE.PASSWORD,
+ 'database': self.FIXTURE.DBNAME}
+
+ if self.FIXTURE.DRIVER and not utils.is_backend_avail(**credentials):
+ msg = '%s backend is not available.' % self.FIXTURE.DRIVER
+ return self.skip(msg)
+
+ super(OpportunisticTestCase, self).setUp()
+
+
+class MySQLOpportunisticFixture(OpportunisticFixture):
+ DRIVER = 'mysql'
+
+
+class PostgreSQLOpportunisticFixture(OpportunisticFixture):
+ DRIVER = 'postgresql'
+
+
+class MySQLOpportunisticTestCase(OpportunisticTestCase):
+ FIXTURE = MySQLOpportunisticFixture
+
+
+class PostgreSQLOpportunisticTestCase(OpportunisticTestCase):
+ FIXTURE = PostgreSQLOpportunisticFixture
diff --git a/oslo/db/sqlalchemy/test_migrations.conf b/oslo/db/sqlalchemy/test_migrations.conf
new file mode 100644
index 0000000..e5e60f3
--- /dev/null
+++ b/oslo/db/sqlalchemy/test_migrations.conf
@@ -0,0 +1,7 @@
+[DEFAULT]
+# Set up any number of migration data stores you want, one
+# The "name" used in the test is the config variable key.
+#sqlite=sqlite:///test_migrations.db
+sqlite=sqlite://
+#mysql=mysql://root:@localhost/test_migrations
+#postgresql=postgresql://user:pass@localhost/test_migrations
diff --git a/oslo/db/sqlalchemy/test_migrations.py b/oslo/db/sqlalchemy/test_migrations.py
new file mode 100644
index 0000000..886bb04
--- /dev/null
+++ b/oslo/db/sqlalchemy/test_migrations.py
@@ -0,0 +1,269 @@
+# Copyright 2010-2011 OpenStack Foundation
+# Copyright 2012-2013 IBM Corp.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import functools
+import logging
+import os
+import subprocess
+
+import lockfile
+from oslotest import base as test_base
+from six import moves
+from six.moves.urllib import parse
+import sqlalchemy
+import sqlalchemy.exc
+
+from oslo.db.sqlalchemy import utils
+from openstack.common.gettextutils import _LE
+
+LOG = logging.getLogger(__name__)
+
+
+def _have_mysql(user, passwd, database):
+ present = os.environ.get('TEST_MYSQL_PRESENT')
+ if present is None:
+ return utils.is_backend_avail(backend='mysql',
+ user=user,
+ passwd=passwd,
+ database=database)
+ return present.lower() in ('', 'true')
+
+
+def _have_postgresql(user, passwd, database):
+ present = os.environ.get('TEST_POSTGRESQL_PRESENT')
+ if present is None:
+ return utils.is_backend_avail(backend='postgres',
+ user=user,
+ passwd=passwd,
+ database=database)
+ return present.lower() in ('', 'true')
+
+
+def _set_db_lock(lock_path=None, lock_prefix=None):
+ def decorator(f):
+ @functools.wraps(f)
+ def wrapper(*args, **kwargs):
+ try:
+ path = lock_path or os.environ.get("OSLO_LOCK_PATH")
+ lock = lockfile.FileLock(os.path.join(path, lock_prefix))
+ with lock:
+ LOG.debug('Got lock "%s"' % f.__name__)
+ return f(*args, **kwargs)
+ finally:
+ LOG.debug('Lock released "%s"' % f.__name__)
+ return wrapper
+ return decorator
+
+
+class BaseMigrationTestCase(test_base.BaseTestCase):
+ """Base class fort testing of migration utils."""
+
+ def __init__(self, *args, **kwargs):
+ super(BaseMigrationTestCase, self).__init__(*args, **kwargs)
+
+ self.DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__),
+ 'test_migrations.conf')
+ # Test machines can set the TEST_MIGRATIONS_CONF variable
+ # to override the location of the config file for migration testing
+ self.CONFIG_FILE_PATH = os.environ.get('TEST_MIGRATIONS_CONF',
+ self.DEFAULT_CONFIG_FILE)
+ self.test_databases = {}
+ self.migration_api = None
+
+ def setUp(self):
+ super(BaseMigrationTestCase, self).setUp()
+
+ # Load test databases from the config file. Only do this
+ # once. No need to re-run this on each test...
+ LOG.debug('config_path is %s' % self.CONFIG_FILE_PATH)
+ if os.path.exists(self.CONFIG_FILE_PATH):
+ cp = moves.configparser.RawConfigParser()
+ try:
+ cp.read(self.CONFIG_FILE_PATH)
+ defaults = cp.defaults()
+ for key, value in defaults.items():
+ self.test_databases[key] = value
+ except moves.configparser.ParsingError as e:
+ self.fail("Failed to read test_migrations.conf config "
+ "file. Got error: %s" % e)
+ else:
+ self.fail("Failed to find test_migrations.conf config "
+ "file.")
+
+ self.engines = {}
+ for key, value in self.test_databases.items():
+ self.engines[key] = sqlalchemy.create_engine(value)
+
+ # We start each test case with a completely blank slate.
+ self._reset_databases()
+
+ def tearDown(self):
+ # We destroy the test data store between each test case,
+ # and recreate it, which ensures that we have no side-effects
+ # from the tests
+ self._reset_databases()
+ super(BaseMigrationTestCase, self).tearDown()
+
+ def execute_cmd(self, cmd=None):
+ process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ output = process.communicate()[0]
+ LOG.debug(output)
+ self.assertEqual(0, process.returncode,
+ "Failed to run: %s\n%s" % (cmd, output))
+
+ def _reset_pg(self, conn_pieces):
+ (user,
+ password,
+ database,
+ host) = utils.get_db_connection_info(conn_pieces)
+ os.environ['PGPASSWORD'] = password
+ os.environ['PGUSER'] = user
+ # note(boris-42): We must create and drop database, we can't
+ # drop database which we have connected to, so for such
+ # operations there is a special database template1.
+ sqlcmd = ("psql -w -U %(user)s -h %(host)s -c"
+ " '%(sql)s' -d template1")
+
+ sql = ("drop database if exists %s;") % database
+ droptable = sqlcmd % {'user': user, 'host': host, 'sql': sql}
+ self.execute_cmd(droptable)
+
+ sql = ("create database %s;") % database
+ createtable = sqlcmd % {'user': user, 'host': host, 'sql': sql}
+ self.execute_cmd(createtable)
+
+ os.unsetenv('PGPASSWORD')
+ os.unsetenv('PGUSER')
+
+ @_set_db_lock(lock_prefix='migration_tests-')
+ def _reset_databases(self):
+ for key, engine in self.engines.items():
+ conn_string = self.test_databases[key]
+ conn_pieces = parse.urlparse(conn_string)
+ engine.dispose()
+ if conn_string.startswith('sqlite'):
+ # We can just delete the SQLite database, which is
+ # the easiest and cleanest solution
+ db_path = conn_pieces.path.strip('/')
+ if os.path.exists(db_path):
+ os.unlink(db_path)
+ # No need to recreate the SQLite DB. SQLite will
+ # create it for us if it's not there...
+ elif conn_string.startswith('mysql'):
+ # We can execute the MySQL client to destroy and re-create
+ # the MYSQL database, which is easier and less error-prone
+ # than using SQLAlchemy to do this via MetaData...trust me.
+ (user, password, database, host) = \
+ utils.get_db_connection_info(conn_pieces)
+ sql = ("drop database if exists %(db)s; "
+ "create database %(db)s;") % {'db': database}
+ cmd = ("mysql -u \"%(user)s\" -p\"%(password)s\" -h %(host)s "
+ "-e \"%(sql)s\"") % {'user': user, 'password': password,
+ 'host': host, 'sql': sql}
+ self.execute_cmd(cmd)
+ elif conn_string.startswith('postgresql'):
+ self._reset_pg(conn_pieces)
+
+
+class WalkVersionsMixin(object):
+ def _walk_versions(self, engine=None, snake_walk=False, downgrade=True):
+ # Determine latest version script from the repo, then
+ # upgrade from 1 through to the latest, with no data
+ # in the databases. This just checks that the schema itself
+ # upgrades successfully.
+
+ # Place the database under version control
+ self.migration_api.version_control(engine, self.REPOSITORY,
+ self.INIT_VERSION)
+ self.assertEqual(self.INIT_VERSION,
+ self.migration_api.db_version(engine,
+ self.REPOSITORY))
+
+ LOG.debug('latest version is %s' % self.REPOSITORY.latest)
+ versions = range(self.INIT_VERSION + 1, self.REPOSITORY.latest + 1)
+
+ for version in versions:
+ # upgrade -> downgrade -> upgrade
+ self._migrate_up(engine, version, with_data=True)
+ if snake_walk:
+ downgraded = self._migrate_down(
+ engine, version - 1, with_data=True)
+ if downgraded:
+ self._migrate_up(engine, version)
+
+ if downgrade:
+ # Now walk it back down to 0 from the latest, testing
+ # the downgrade paths.
+ for version in reversed(versions):
+ # downgrade -> upgrade -> downgrade
+ downgraded = self._migrate_down(engine, version - 1)
+
+ if snake_walk and downgraded:
+ self._migrate_up(engine, version)
+ self._migrate_down(engine, version - 1)
+
+ def _migrate_down(self, engine, version, with_data=False):
+ try:
+ self.migration_api.downgrade(engine, self.REPOSITORY, version)
+ except NotImplementedError:
+ # NOTE(sirp): some migrations, namely release-level
+ # migrations, don't support a downgrade.
+ return False
+
+ self.assertEqual(
+ version, self.migration_api.db_version(engine, self.REPOSITORY))
+
+ # NOTE(sirp): `version` is what we're downgrading to (i.e. the 'target'
+ # version). So if we have any downgrade checks, they need to be run for
+ # the previous (higher numbered) migration.
+ if with_data:
+ post_downgrade = getattr(
+ self, "_post_downgrade_%03d" % (version + 1), None)
+ if post_downgrade:
+ post_downgrade(engine)
+
+ return True
+
+ def _migrate_up(self, engine, version, with_data=False):
+ """migrate up to a new version of the db.
+
+ We allow for data insertion and post checks at every
+ migration version with special _pre_upgrade_### and
+ _check_### functions in the main test.
+ """
+ # NOTE(sdague): try block is here because it's impossible to debug
+ # where a failed data migration happens otherwise
+ try:
+ if with_data:
+ data = None
+ pre_upgrade = getattr(
+ self, "_pre_upgrade_%03d" % version, None)
+ if pre_upgrade:
+ data = pre_upgrade(engine)
+
+ self.migration_api.upgrade(engine, self.REPOSITORY, version)
+ self.assertEqual(version,
+ self.migration_api.db_version(engine,
+ self.REPOSITORY))
+ if with_data:
+ check = getattr(self, "_check_%03d" % version, None)
+ if check:
+ check(engine, data)
+ except Exception:
+ LOG.error(_LE("Failed to migrate to version %s on engine %s") %
+ (version, engine))
+ raise
diff --git a/oslo/db/sqlalchemy/utils.py b/oslo/db/sqlalchemy/utils.py
new file mode 100644
index 0000000..02daea5
--- /dev/null
+++ b/oslo/db/sqlalchemy/utils.py
@@ -0,0 +1,655 @@
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2010-2011 OpenStack Foundation.
+# Copyright 2012 Justin Santa Barbara
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import re
+
+import sqlalchemy
+from sqlalchemy import Boolean
+from sqlalchemy import CheckConstraint
+from sqlalchemy import Column
+from sqlalchemy.engine import reflection
+from sqlalchemy.ext.compiler import compiles
+from sqlalchemy import func
+from sqlalchemy import Index
+from sqlalchemy import Integer
+from sqlalchemy import MetaData
+from sqlalchemy import or_
+from sqlalchemy.sql.expression import literal_column
+from sqlalchemy.sql.expression import UpdateBase
+from sqlalchemy import String
+from sqlalchemy import Table
+from sqlalchemy.types import NullType
+
+from openstack.common import context as request_context
+from oslo.db.sqlalchemy import models
+from openstack.common.gettextutils import _, _LI, _LW
+from openstack.common import timeutils
+
+
+LOG = logging.getLogger(__name__)
+
+_DBURL_REGEX = re.compile(r"[^:]+://([^:]+):([^@]+)@.+")
+
+
+def sanitize_db_url(url):
+ match = _DBURL_REGEX.match(url)
+ if match:
+ return '%s****:****%s' % (url[:match.start(1)], url[match.end(2):])
+ return url
+
+
+class InvalidSortKey(Exception):
+ message = _("Sort key supplied was not valid.")
+
+
+# copy from glance/db/sqlalchemy/api.py
+def paginate_query(query, model, limit, sort_keys, marker=None,
+ sort_dir=None, sort_dirs=None):
+ """Returns a query with sorting / pagination criteria added.
+
+ Pagination works by requiring a unique sort_key, specified by sort_keys.
+ (If sort_keys is not unique, then we risk looping through values.)
+ We use the last row in the previous page as the 'marker' for pagination.
+ So we must return values that follow the passed marker in the order.
+ With a single-valued sort_key, this would be easy: sort_key > X.
+ With a compound-values sort_key, (k1, k2, k3) we must do this to repeat
+ the lexicographical ordering:
+ (k1 > X1) or (k1 == X1 && k2 > X2) or (k1 == X1 && k2 == X2 && k3 > X3)
+
+ We also have to cope with different sort_directions.
+
+ Typically, the id of the last row is used as the client-facing pagination
+ marker, then the actual marker object must be fetched from the db and
+ passed in to us as marker.
+
+ :param query: the query object to which we should add paging/sorting
+ :param model: the ORM model class
+ :param limit: maximum number of items to return
+ :param sort_keys: array of attributes by which results should be sorted
+ :param marker: the last item of the previous page; we returns the next
+ results after this value.
+ :param sort_dir: direction in which results should be sorted (asc, desc)
+ :param sort_dirs: per-column array of sort_dirs, corresponding to sort_keys
+
+ :rtype: sqlalchemy.orm.query.Query
+ :return: The query with sorting/pagination added.
+ """
+
+ if 'id' not in sort_keys:
+ # TODO(justinsb): If this ever gives a false-positive, check
+ # the actual primary key, rather than assuming its id
+ LOG.warning(_LW('Id not in sort_keys; is sort_keys unique?'))
+
+ assert(not (sort_dir and sort_dirs))
+
+ # Default the sort direction to ascending
+ if sort_dirs is None and sort_dir is None:
+ sort_dir = 'asc'
+
+ # Ensure a per-column sort direction
+ if sort_dirs is None:
+ sort_dirs = [sort_dir for _sort_key in sort_keys]
+
+ assert(len(sort_dirs) == len(sort_keys))
+
+ # Add sorting
+ for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs):
+ try:
+ sort_dir_func = {
+ 'asc': sqlalchemy.asc,
+ 'desc': sqlalchemy.desc,
+ }[current_sort_dir]
+ except KeyError:
+ raise ValueError(_("Unknown sort direction, "
+ "must be 'desc' or 'asc'"))
+ try:
+ sort_key_attr = getattr(model, current_sort_key)
+ except AttributeError:
+ raise InvalidSortKey()
+ query = query.order_by(sort_dir_func(sort_key_attr))
+
+ # Add pagination
+ if marker is not None:
+ marker_values = []
+ for sort_key in sort_keys:
+ v = getattr(marker, sort_key)
+ marker_values.append(v)
+
+ # Build up an array of sort criteria as in the docstring
+ criteria_list = []
+ for i in range(len(sort_keys)):
+ crit_attrs = []
+ for j in range(i):
+ model_attr = getattr(model, sort_keys[j])
+ crit_attrs.append((model_attr == marker_values[j]))
+
+ model_attr = getattr(model, sort_keys[i])
+ if sort_dirs[i] == 'desc':
+ crit_attrs.append((model_attr < marker_values[i]))
+ else:
+ crit_attrs.append((model_attr > marker_values[i]))
+
+ criteria = sqlalchemy.sql.and_(*crit_attrs)
+ criteria_list.append(criteria)
+
+ f = sqlalchemy.sql.or_(*criteria_list)
+ query = query.filter(f)
+
+ if limit is not None:
+ query = query.limit(limit)
+
+ return query
+
+
+def _read_deleted_filter(query, db_model, read_deleted):
+ if 'deleted' not in db_model.__table__.columns:
+ raise ValueError(_("There is no `deleted` column in `%s` table. "
+ "Project doesn't use soft-deleted feature.")
+ % db_model.__name__)
+
+ default_deleted_value = db_model.__table__.c.deleted.default.arg
+ if read_deleted == 'no':
+ query = query.filter(db_model.deleted == default_deleted_value)
+ elif read_deleted == 'yes':
+ pass # omit the filter to include deleted and active
+ elif read_deleted == 'only':
+ query = query.filter(db_model.deleted != default_deleted_value)
+ else:
+ raise ValueError(_("Unrecognized read_deleted value '%s'")
+ % read_deleted)
+ return query
+
+
+def _project_filter(query, db_model, context, project_only):
+ if project_only and 'project_id' not in db_model.__table__.columns:
+ raise ValueError(_("There is no `project_id` column in `%s` table.")
+ % db_model.__name__)
+
+ if request_context.is_user_context(context) and project_only:
+ if project_only == 'allow_none':
+ is_none = None
+ query = query.filter(or_(db_model.project_id == context.project_id,
+ db_model.project_id == is_none))
+ else:
+ query = query.filter(db_model.project_id == context.project_id)
+
+ return query
+
+
+def model_query(context, model, session, args=None, project_only=False,
+ read_deleted=None):
+ """Query helper that accounts for context's `read_deleted` field.
+
+ :param context: context to query under
+
+ :param model: Model to query. Must be a subclass of ModelBase.
+ :type model: models.ModelBase
+
+ :param session: The session to use.
+ :type session: sqlalchemy.orm.session.Session
+
+ :param args: Arguments to query. If None - model is used.
+ :type args: tuple
+
+ :param project_only: If present and context is user-type, then restrict
+ query to match the context's project_id. If set to
+ 'allow_none', restriction includes project_id = None.
+ :type project_only: bool
+
+ :param read_deleted: If present, overrides context's read_deleted field.
+ :type read_deleted: bool
+
+ Usage:
+
+ ..code:: python
+
+ result = (utils.model_query(context, models.Instance, session=session)
+ .filter_by(uuid=instance_uuid)
+ .all())
+
+ query = utils.model_query(
+ context, Node,
+ session=session,
+ args=(func.count(Node.id), func.sum(Node.ram))
+ ).filter_by(project_id=project_id)
+
+ """
+
+ if not read_deleted:
+ if hasattr(context, 'read_deleted'):
+ # NOTE(viktors): some projects use `read_deleted` attribute in
+ # their contexts instead of `show_deleted`.
+ read_deleted = context.read_deleted
+ else:
+ read_deleted = context.show_deleted
+
+ if not issubclass(model, models.ModelBase):
+ raise TypeError(_("model should be a subclass of ModelBase"))
+
+ query = session.query(model) if not args else session.query(*args)
+ query = _read_deleted_filter(query, model, read_deleted)
+ query = _project_filter(query, model, context, project_only)
+
+ return query
+
+
+def get_table(engine, name):
+ """Returns an sqlalchemy table dynamically from db.
+
+ Needed because the models don't work for us in migrations
+ as models will be far out of sync with the current data.
+
+ .. warning::
+
+ Do not use this method when creating ForeignKeys in database migrations
+ because sqlalchemy needs the same MetaData object to hold information
+ about the parent table and the reference table in the ForeignKey. This
+ method uses a unique MetaData object per table object so it won't work
+ with ForeignKey creation.
+ """
+ metadata = MetaData()
+ metadata.bind = engine
+ return Table(name, metadata, autoload=True)
+
+
+class InsertFromSelect(UpdateBase):
+ """Form the base for `INSERT INTO table (SELECT ... )` statement."""
+ def __init__(self, table, select):
+ self.table = table
+ self.select = select
+
+
+@compiles(InsertFromSelect)
+def visit_insert_from_select(element, compiler, **kw):
+ """Form the `INSERT INTO table (SELECT ... )` statement."""
+ return "INSERT INTO %s %s" % (
+ compiler.process(element.table, asfrom=True),
+ compiler.process(element.select))
+
+
+class ColumnError(Exception):
+ """Error raised when no column or an invalid column is found."""
+
+
+def _get_not_supported_column(col_name_col_instance, column_name):
+ try:
+ column = col_name_col_instance[column_name]
+ except KeyError:
+ msg = _("Please specify column %s in col_name_col_instance "
+ "param. It is required because column has unsupported "
+ "type by sqlite).")
+ raise ColumnError(msg % column_name)
+
+ if not isinstance(column, Column):
+ msg = _("col_name_col_instance param has wrong type of "
+ "column instance for column %s It should be instance "
+ "of sqlalchemy.Column.")
+ raise ColumnError(msg % column_name)
+ return column
+
+
+def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns,
+ **col_name_col_instance):
+ """Drop unique constraint from table.
+
+ DEPRECATED: this function is deprecated and will be removed from oslo.db
+ in a few releases. Please use UniqueConstraint.drop() method directly for
+ sqlalchemy-migrate migration scripts.
+
+ This method drops UC from table and works for mysql, postgresql and sqlite.
+ In mysql and postgresql we are able to use "alter table" construction.
+ Sqlalchemy doesn't support some sqlite column types and replaces their
+ type with NullType in metadata. We process these columns and replace
+ NullType with the correct column type.
+
+ :param migrate_engine: sqlalchemy engine
+ :param table_name: name of table that contains uniq constraint.
+ :param uc_name: name of uniq constraint that will be dropped.
+ :param columns: columns that are in uniq constraint.
+ :param col_name_col_instance: contains pair column_name=column_instance.
+ column_instance is instance of Column. These params
+ are required only for columns that have unsupported
+ types by sqlite. For example BigInteger.
+ """
+
+ from migrate.changeset import UniqueConstraint
+
+ meta = MetaData()
+ meta.bind = migrate_engine
+ t = Table(table_name, meta, autoload=True)
+
+ if migrate_engine.name == "sqlite":
+ override_cols = [
+ _get_not_supported_column(col_name_col_instance, col.name)
+ for col in t.columns
+ if isinstance(col.type, NullType)
+ ]
+ for col in override_cols:
+ t.columns.replace(col)
+
+ uc = UniqueConstraint(*columns, table=t, name=uc_name)
+ uc.drop()
+
+
+def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
+ use_soft_delete, *uc_column_names):
+ """Drop all old rows having the same values for columns in uc_columns.
+
+ This method drop (or mark ad `deleted` if use_soft_delete is True) old
+ duplicate rows form table with name `table_name`.
+
+ :param migrate_engine: Sqlalchemy engine
+ :param table_name: Table with duplicates
+ :param use_soft_delete: If True - values will be marked as `deleted`,
+ if False - values will be removed from table
+ :param uc_column_names: Unique constraint columns
+ """
+ meta = MetaData()
+ meta.bind = migrate_engine
+
+ table = Table(table_name, meta, autoload=True)
+ columns_for_group_by = [table.c[name] for name in uc_column_names]
+
+ columns_for_select = [func.max(table.c.id)]
+ columns_for_select.extend(columns_for_group_by)
+
+ duplicated_rows_select = sqlalchemy.sql.select(
+ columns_for_select, group_by=columns_for_group_by,
+ having=func.count(table.c.id) > 1)
+
+ for row in migrate_engine.execute(duplicated_rows_select):
+ # NOTE(boris-42): Do not remove row that has the biggest ID.
+ delete_condition = table.c.id != row[0]
+ is_none = None # workaround for pyflakes
+ delete_condition &= table.c.deleted_at == is_none
+ for name in uc_column_names:
+ delete_condition &= table.c[name] == row[name]
+
+ rows_to_delete_select = sqlalchemy.sql.select(
+ [table.c.id]).where(delete_condition)
+ for row in migrate_engine.execute(rows_to_delete_select).fetchall():
+ LOG.info(_LI("Deleting duplicated row with id: %(id)s from table: "
+ "%(table)s") % dict(id=row[0], table=table_name))
+
+ if use_soft_delete:
+ delete_statement = table.update().\
+ where(delete_condition).\
+ values({
+ 'deleted': literal_column('id'),
+ 'updated_at': literal_column('updated_at'),
+ 'deleted_at': timeutils.utcnow()
+ })
+ else:
+ delete_statement = table.delete().where(delete_condition)
+ migrate_engine.execute(delete_statement)
+
+
+def _get_default_deleted_value(table):
+ if isinstance(table.c.id.type, Integer):
+ return 0
+ if isinstance(table.c.id.type, String):
+ return ""
+ raise ColumnError(_("Unsupported id columns type"))
+
+
+def _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes):
+ table = get_table(migrate_engine, table_name)
+
+ insp = reflection.Inspector.from_engine(migrate_engine)
+ real_indexes = insp.get_indexes(table_name)
+ existing_index_names = dict(
+ [(index['name'], index['column_names']) for index in real_indexes])
+
+ # NOTE(boris-42): Restore indexes on `deleted` column
+ for index in indexes:
+ if 'deleted' not in index['column_names']:
+ continue
+ name = index['name']
+ if name in existing_index_names:
+ column_names = [table.c[c] for c in existing_index_names[name]]
+ old_index = Index(name, *column_names, unique=index["unique"])
+ old_index.drop(migrate_engine)
+
+ column_names = [table.c[c] for c in index['column_names']]
+ new_index = Index(index["name"], *column_names, unique=index["unique"])
+ new_index.create(migrate_engine)
+
+
+def change_deleted_column_type_to_boolean(migrate_engine, table_name,
+ **col_name_col_instance):
+ if migrate_engine.name == "sqlite":
+ return _change_deleted_column_type_to_boolean_sqlite(
+ migrate_engine, table_name, **col_name_col_instance)
+ insp = reflection.Inspector.from_engine(migrate_engine)
+ indexes = insp.get_indexes(table_name)
+
+ table = get_table(migrate_engine, table_name)
+
+ old_deleted = Column('old_deleted', Boolean, default=False)
+ old_deleted.create(table, populate_default=False)
+
+ table.update().\
+ where(table.c.deleted == table.c.id).\
+ values(old_deleted=True).\
+ execute()
+
+ table.c.deleted.drop()
+ table.c.old_deleted.alter(name="deleted")
+
+ _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes)
+
+
+def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name,
+ **col_name_col_instance):
+ insp = reflection.Inspector.from_engine(migrate_engine)
+ table = get_table(migrate_engine, table_name)
+
+ columns = []
+ for column in table.columns:
+ column_copy = None
+ if column.name != "deleted":
+ if isinstance(column.type, NullType):
+ column_copy = _get_not_supported_column(col_name_col_instance,
+ column.name)
+ else:
+ column_copy = column.copy()
+ else:
+ column_copy = Column('deleted', Boolean, default=0)
+ columns.append(column_copy)
+
+ constraints = [constraint.copy() for constraint in table.constraints]
+
+ meta = table.metadata
+ new_table = Table(table_name + "__tmp__", meta,
+ *(columns + constraints))
+ new_table.create()
+
+ indexes = []
+ for index in insp.get_indexes(table_name):
+ column_names = [new_table.c[c] for c in index['column_names']]
+ indexes.append(Index(index["name"], *column_names,
+ unique=index["unique"]))
+
+ c_select = []
+ for c in table.c:
+ if c.name != "deleted":
+ c_select.append(c)
+ else:
+ c_select.append(table.c.deleted == table.c.id)
+
+ ins = InsertFromSelect(new_table, sqlalchemy.sql.select(c_select))
+ migrate_engine.execute(ins)
+
+ table.drop()
+ [index.create(migrate_engine) for index in indexes]
+
+ new_table.rename(table_name)
+ new_table.update().\
+ where(new_table.c.deleted == new_table.c.id).\
+ values(deleted=True).\
+ execute()
+
+
+def change_deleted_column_type_to_id_type(migrate_engine, table_name,
+ **col_name_col_instance):
+ if migrate_engine.name == "sqlite":
+ return _change_deleted_column_type_to_id_type_sqlite(
+ migrate_engine, table_name, **col_name_col_instance)
+ insp = reflection.Inspector.from_engine(migrate_engine)
+ indexes = insp.get_indexes(table_name)
+
+ table = get_table(migrate_engine, table_name)
+
+ new_deleted = Column('new_deleted', table.c.id.type,
+ default=_get_default_deleted_value(table))
+ new_deleted.create(table, populate_default=True)
+
+ deleted = True # workaround for pyflakes
+ table.update().\
+ where(table.c.deleted == deleted).\
+ values(new_deleted=table.c.id).\
+ execute()
+ table.c.deleted.drop()
+ table.c.new_deleted.alter(name="deleted")
+
+ _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes)
+
+
+def _change_deleted_column_type_to_id_type_sqlite(migrate_engine, table_name,
+ **col_name_col_instance):
+ # NOTE(boris-42): sqlaclhemy-migrate can't drop column with check
+ # constraints in sqlite DB and our `deleted` column has
+ # 2 check constraints. So there is only one way to remove
+ # these constraints:
+ # 1) Create new table with the same columns, constraints
+ # and indexes. (except deleted column).
+ # 2) Copy all data from old to new table.
+ # 3) Drop old table.
+ # 4) Rename new table to old table name.
+ insp = reflection.Inspector.from_engine(migrate_engine)
+ meta = MetaData(bind=migrate_engine)
+ table = Table(table_name, meta, autoload=True)
+ default_deleted_value = _get_default_deleted_value(table)
+
+ columns = []
+ for column in table.columns:
+ column_copy = None
+ if column.name != "deleted":
+ if isinstance(column.type, NullType):
+ column_copy = _get_not_supported_column(col_name_col_instance,
+ column.name)
+ else:
+ column_copy = column.copy()
+ else:
+ column_copy = Column('deleted', table.c.id.type,
+ default=default_deleted_value)
+ columns.append(column_copy)
+
+ def is_deleted_column_constraint(constraint):
+ # NOTE(boris-42): There is no other way to check is CheckConstraint
+ # associated with deleted column.
+ if not isinstance(constraint, CheckConstraint):
+ return False
+ sqltext = str(constraint.sqltext)
+ return (sqltext.endswith("deleted in (0, 1)") or
+ sqltext.endswith("deleted IN (:deleted_1, :deleted_2)"))
+
+ constraints = []
+ for constraint in table.constraints:
+ if not is_deleted_column_constraint(constraint):
+ constraints.append(constraint.copy())
+
+ new_table = Table(table_name + "__tmp__", meta,
+ *(columns + constraints))
+ new_table.create()
+
+ indexes = []
+ for index in insp.get_indexes(table_name):
+ column_names = [new_table.c[c] for c in index['column_names']]
+ indexes.append(Index(index["name"], *column_names,
+ unique=index["unique"]))
+
+ ins = InsertFromSelect(new_table, table.select())
+ migrate_engine.execute(ins)
+
+ table.drop()
+ [index.create(migrate_engine) for index in indexes]
+
+ new_table.rename(table_name)
+ deleted = True # workaround for pyflakes
+ new_table.update().\
+ where(new_table.c.deleted == deleted).\
+ values(deleted=new_table.c.id).\
+ execute()
+
+ # NOTE(boris-42): Fix value of deleted column: False -> "" or 0.
+ deleted = False # workaround for pyflakes
+ new_table.update().\
+ where(new_table.c.deleted == deleted).\
+ values(deleted=default_deleted_value).\
+ execute()
+
+
+def get_connect_string(backend, database, user=None, passwd=None):
+ """Get database connection
+
+ Try to get a connection with a very specific set of values, if we get
+ these then we'll run the tests, otherwise they are skipped
+ """
+ args = {'backend': backend,
+ 'user': user,
+ 'passwd': passwd,
+ 'database': database}
+ if backend == 'sqlite':
+ template = '%(backend)s:///%(database)s'
+ else:
+ template = "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s"
+ return template % args
+
+
+def is_backend_avail(backend, database, user=None, passwd=None):
+ try:
+ connect_uri = get_connect_string(backend=backend,
+ database=database,
+ user=user,
+ passwd=passwd)
+ engine = sqlalchemy.create_engine(connect_uri)
+ connection = engine.connect()
+ except Exception:
+ # intentionally catch all to handle exceptions even if we don't
+ # have any backend code loaded.
+ return False
+ else:
+ connection.close()
+ engine.dispose()
+ return True
+
+
+def get_db_connection_info(conn_pieces):
+ database = conn_pieces.path.strip('/')
+ loc_pieces = conn_pieces.netloc.split('@')
+ host = loc_pieces[1]
+
+ auth_pieces = loc_pieces[0].split(':')
+ user = auth_pieces[0]
+ password = ""
+ if len(auth_pieces) > 1:
+ password = auth_pieces[1].strip()
+
+ return (user, password, database, host)