author    Jenkins <jenkins@review.openstack.org>  2013-11-28 18:37:20 +0000
committer Gerrit Code Review <review@openstack.org>  2013-11-28 18:37:20 +0000
commit    e19bebc6e6f7c79684bf5cb495bbce4f6b709443 (patch)
tree      374b1e656f50b8bacd005622fade6df710dce964 /ironic
parent    f6672ff5228bad23ac64f4d529da6950c3efb005 (diff)
parent    e6c48cfa448457df8dae234b8d6b320791890faf (diff)
download  ironic-e19bebc6e6f7c79684bf5cb495bbce4f6b709443.tar.gz
Merge "Sync some db changes from Oslo"
Diffstat (limited to 'ironic')
-rw-r--r--  ironic/openstack/common/db/exception.py           |  15
-rw-r--r--  ironic/openstack/common/db/sqlalchemy/models.py    |  21
-rw-r--r--  ironic/openstack/common/db/sqlalchemy/session.py   | 232
-rw-r--r--  ironic/openstack/common/db/sqlalchemy/utils.py     | 384
4 files changed, 562 insertions, 90 deletions
diff --git a/ironic/openstack/common/db/exception.py b/ironic/openstack/common/db/exception.py
index 894eb2d33..ea712b17c 100644
--- a/ironic/openstack/common/db/exception.py
+++ b/ironic/openstack/common/db/exception.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
@@ -18,7 +16,7 @@
"""DB related custom exceptions."""
-from ironic.openstack.common.gettextutils import _
+from ironic.openstack.common.gettextutils import _ # noqa
class DBError(Exception):
@@ -43,3 +41,14 @@ class DBDeadlock(DBError):
class DBInvalidUnicodeParameter(Exception):
message = _("Invalid Parameter: "
"Unicode is not supported by the current database.")
+
+
+class DbMigrationError(DBError):
+ """Wraps migration specific exception."""
+ def __init__(self, message=None):
+ super(DbMigrationError, self).__init__(str(message))
+
+
+class DBConnectionError(DBError):
+ """Wraps connection specific exception."""
+ pass
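
A minimal usage sketch (not part of the commit) of the two exception classes this hunk adds; the do_query() helper is hypothetical:

    from ironic.openstack.common.db import exception

    def do_query():
        # hypothetical DB call standing in for code that hits the database
        raise exception.DBConnectionError('lost connection to MySQL server')

    try:
        do_query()
    except exception.DBConnectionError:
        # connection failures can now be told apart from other DBError cases
        print('database unreachable, retrying later')
    except exception.DBError:
        raise
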
diff --git a/ironic/openstack/common/db/sqlalchemy/models.py b/ironic/openstack/common/db/sqlalchemy/models.py
index e90d4110e..d55d0e58a 100644
--- a/ironic/openstack/common/db/sqlalchemy/models.py
+++ b/ironic/openstack/common/db/sqlalchemy/models.py
@@ -22,11 +22,13 @@
SQLAlchemy models.
"""
+import six
+
from sqlalchemy import Column, Integer
from sqlalchemy import DateTime
from sqlalchemy.orm import object_mapper
-from ironic.openstack.common.db.sqlalchemy.session import get_session
+from ironic.openstack.common.db.sqlalchemy import session as sa
from ironic.openstack.common import timeutils
@@ -37,7 +39,7 @@ class ModelBase(object):
def save(self, session=None):
"""Save this object."""
if not session:
- session = get_session()
+ session = sa.get_session()
# NOTE(boris-42): This part of code should be look like:
# sesssion.add(self)
# session.flush()
@@ -59,31 +61,34 @@ class ModelBase(object):
def get(self, key, default=None):
return getattr(self, key, default)
+ def _get_extra_keys(self):
+ return []
+
def __iter__(self):
columns = dict(object_mapper(self).columns).keys()
# NOTE(russellb): Allow models to specify other keys that can be looked
# up, beyond the actual db columns. An example would be the 'name'
# property for an Instance.
- if hasattr(self, '_extra_keys'):
- columns.extend(self._extra_keys())
+ columns.extend(self._get_extra_keys())
self._i = iter(columns)
return self
def next(self):
- n = self._i.next()
+ n = six.advance_iterator(self._i)
return n, getattr(self, n)
def update(self, values):
"""Make the model object behave like a dict."""
- for k, v in values.iteritems():
+ for k, v in six.iteritems(values):
setattr(self, k, v)
def iteritems(self):
"""Make the model object behave like a dict.
- Includes attributes from joins."""
+ Includes attributes from joins.
+ """
local = dict(self)
- joined = dict([(k, v) for k, v in self.__dict__.iteritems()
+ joined = dict([(k, v) for k, v in six.iteritems(self.__dict__)
if not k[0] == '_'])
local.update(joined)
return local.iteritems()
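
The six calls introduced above are the standard Python 2/3 bridges; a standalone illustration with a plain dict (not an Ironic model):

    import six

    values = {'uuid': 'abc-123', 'power_state': 'power on'}

    # six.iteritems(d) -> d.iteritems() on Python 2, d.items() on Python 3
    for key, value in six.iteritems(values):
        print(key, value)

    # six.advance_iterator(it) -> it.next() on Python 2, next(it) on Python 3
    it = iter(values)
    print(six.advance_iterator(it))
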
diff --git a/ironic/openstack/common/db/sqlalchemy/session.py b/ironic/openstack/common/db/sqlalchemy/session.py
index 3e06c5460..c453c75b3 100644
--- a/ironic/openstack/common/db/sqlalchemy/session.py
+++ b/ironic/openstack/common/db/sqlalchemy/session.py
@@ -241,11 +241,11 @@ Efficient use of soft deletes:
# This will produce count(bar_refs) db requests.
"""
+import functools
import os.path
import re
import time
-from eventlet import greenthread
from oslo.config import cfg
import six
from sqlalchemy import exc as sqla_exc
@@ -256,12 +256,10 @@ from sqlalchemy.pool import NullPool, StaticPool
from sqlalchemy.sql.expression import literal_column
from ironic.openstack.common.db import exception
+from ironic.openstack.common.gettextutils import _ # noqa
from ironic.openstack.common import log as logging
-from ironic.openstack.common.gettextutils import _
from ironic.openstack.common import timeutils
-DEFAULT = 'DEFAULT'
-
sqlite_db_opts = [
cfg.StrOpt('sqlite_db',
default='ironic.sqlite',
@@ -278,74 +276,113 @@ database_opts = [
'../', '$sqlite_db')),
help='The SQLAlchemy connection string used to connect to the '
'database',
- deprecated_name='sql_connection',
- deprecated_group=DEFAULT,
- secret=True),
+ deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('sql_connection',
+ group='DATABASE'),
+ cfg.DeprecatedOpt('connection',
+ group='sql'), ]),
+ cfg.StrOpt('slave_connection',
+ default='',
+ help='The SQLAlchemy connection string used to connect to the '
+ 'slave database'),
cfg.IntOpt('idle_timeout',
default=3600,
- deprecated_name='sql_idle_timeout',
- deprecated_group=DEFAULT,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('sql_idle_timeout',
+ group='DATABASE')],
help='timeout before idle sql connections are reaped'),
cfg.IntOpt('min_pool_size',
default=1,
- deprecated_name='sql_min_pool_size',
- deprecated_group=DEFAULT,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('sql_min_pool_size',
+ group='DATABASE')],
help='Minimum number of SQL connections to keep open in a '
'pool'),
cfg.IntOpt('max_pool_size',
- default=5,
- deprecated_name='sql_max_pool_size',
- deprecated_group=DEFAULT,
+ default=None,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('sql_max_pool_size',
+ group='DATABASE')],
help='Maximum number of SQL connections to keep open in a '
'pool'),
cfg.IntOpt('max_retries',
default=10,
- deprecated_name='sql_max_retries',
- deprecated_group=DEFAULT,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('sql_max_retries',
+ group='DATABASE')],
help='maximum db connection retries during startup. '
'(setting -1 implies an infinite retry count)'),
cfg.IntOpt('retry_interval',
default=10,
- deprecated_name='sql_retry_interval',
- deprecated_group=DEFAULT,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('reconnect_interval',
+ group='DATABASE')],
help='interval between retries of opening a sql connection'),
cfg.IntOpt('max_overflow',
default=None,
- deprecated_name='sql_max_overflow',
- deprecated_group=DEFAULT,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('sqlalchemy_max_overflow',
+ group='DATABASE')],
help='If set, use this value for max_overflow with sqlalchemy'),
cfg.IntOpt('connection_debug',
default=0,
- deprecated_name='sql_connection_debug',
- deprecated_group=DEFAULT,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug',
+ group='DEFAULT')],
help='Verbosity of SQL debugging information. 0=None, '
'100=Everything'),
cfg.BoolOpt('connection_trace',
default=False,
- deprecated_name='sql_connection_trace',
- deprecated_group=DEFAULT,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace',
+ group='DEFAULT')],
help='Add python stack traces to SQL as comment strings'),
+ cfg.IntOpt('pool_timeout',
+ default=None,
+ deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout',
+ group='DATABASE')],
+ help='If set, use this value for pool_timeout with sqlalchemy'),
]
CONF = cfg.CONF
CONF.register_opts(sqlite_db_opts)
CONF.register_opts(database_opts, 'database')
+
LOG = logging.getLogger(__name__)
_ENGINE = None
_MAKER = None
+_SLAVE_ENGINE = None
+_SLAVE_MAKER = None
-def set_defaults(sql_connection, sqlite_db):
+def set_defaults(sql_connection, sqlite_db, max_pool_size=None,
+ max_overflow=None, pool_timeout=None):
"""Set defaults for configuration variables."""
cfg.set_defaults(database_opts,
connection=sql_connection)
cfg.set_defaults(sqlite_db_opts,
sqlite_db=sqlite_db)
+ # Update the QueuePool defaults
+ if max_pool_size is not None:
+ cfg.set_defaults(database_opts,
+ max_pool_size=max_pool_size)
+ if max_overflow is not None:
+ cfg.set_defaults(database_opts,
+ max_overflow=max_overflow)
+ if pool_timeout is not None:
+ cfg.set_defaults(database_opts,
+ pool_timeout=pool_timeout)
def cleanup():
global _ENGINE, _MAKER
+ global _SLAVE_ENGINE, _SLAVE_MAKER
if _MAKER:
_MAKER.close_all()
@@ -353,11 +390,16 @@ def cleanup():
if _ENGINE:
_ENGINE.dispose()
_ENGINE = None
+ if _SLAVE_MAKER:
+ _SLAVE_MAKER.close_all()
+ _SLAVE_MAKER = None
+ if _SLAVE_ENGINE:
+ _SLAVE_ENGINE.dispose()
+ _SLAVE_ENGINE = None
class SqliteForeignKeysListener(PoolListener):
- """
- Ensures that the foreign key constraints are enforced in SQLite.
+ """Ensures that the foreign key constraints are enforced in SQLite.
The foreign key constraints are disabled by default in SQLite,
so the foreign key constraints will be enabled here for every
@@ -368,15 +410,25 @@ class SqliteForeignKeysListener(PoolListener):
def get_session(autocommit=True, expire_on_commit=False,
- sqlite_fk=False):
+ sqlite_fk=False, slave_session=False):
"""Return a SQLAlchemy session."""
global _MAKER
+ global _SLAVE_MAKER
+ maker = _MAKER
- if _MAKER is None:
- engine = get_engine(sqlite_fk=sqlite_fk)
- _MAKER = get_maker(engine, autocommit, expire_on_commit)
+ if slave_session:
+ maker = _SLAVE_MAKER
- session = _MAKER()
+ if maker is None:
+ engine = get_engine(sqlite_fk=sqlite_fk, slave_engine=slave_session)
+ maker = get_maker(engine, autocommit, expire_on_commit)
+
+ if slave_session:
+ _SLAVE_MAKER = maker
+ else:
+ _MAKER = maker
+
+ session = maker()
return session
@@ -406,24 +458,31 @@ _DUP_KEY_RE_DB = {
def _raise_if_duplicate_entry_error(integrity_error, engine_name):
- """
+ """Raise exception if two entries are duplicated.
+
In this function will be raised DBDuplicateEntry exception if integrity
error wrap unique constraint violation.
"""
def get_columns_from_uniq_cons_or_name(columns):
- # note(boris-42): UniqueConstraint name convention: "uniq_c1_x_c2_x_c3"
- # means that columns c1, c2, c3 are in UniqueConstraint.
+ # note(vsergeyev): UniqueConstraint name convention: "uniq_t0c10c2"
+ # where `t` it is table name and columns `c1`, `c2`
+ # are in UniqueConstraint.
uniqbase = "uniq_"
if not columns.startswith(uniqbase):
if engine_name == "postgresql":
return [columns[columns.index("_") + 1:columns.rindex("_")]]
return [columns]
- return columns[len(uniqbase):].split("_x_")
+ return columns[len(uniqbase):].split("0")[1:]
if engine_name not in ["mysql", "sqlite", "postgresql"]:
return
+ # FIXME(johannes): The usage of the .message attribute has been
+ # deprecated since Python 2.6. However, the exceptions raised by
+ # SQLAlchemy can differ when using unicode() and accessing .message.
+ # An audit across all three supported engines will be necessary to
+ # ensure there are no regressions.
m = _DUP_KEY_RE_DB[engine_name].match(integrity_error.message)
if not m:
return
@@ -448,13 +507,19 @@ _DEADLOCK_RE_DB = {
def _raise_if_deadlock_error(operational_error, engine_name):
- """
+ """Raise exception on deadlock condition.
+
Raise DBDeadlock exception if OperationalError contains a Deadlock
condition.
"""
re = _DEADLOCK_RE_DB.get(engine_name)
if re is None:
return
+ # FIXME(johannes): The usage of the .message attribute has been
+ # deprecated since Python 2.6. However, the exceptions raised by
+ # SQLAlchemy can differ when using unicode() and accessing .message.
+ # An audit across all three supported engines will be necessary to
+ # ensure there are no regressions.
m = re.match(operational_error.message)
if not m:
return
@@ -462,6 +527,7 @@ def _raise_if_deadlock_error(operational_error, engine_name):
def _wrap_db_error(f):
+ @functools.wraps(f)
def _wrap(*args, **kwargs):
try:
return f(*args, **kwargs)
@@ -486,17 +552,29 @@ def _wrap_db_error(f):
except Exception as e:
LOG.exception(_('DB exception wrapped.'))
raise exception.DBError(e)
- _wrap.func_name = f.func_name
return _wrap
-def get_engine(sqlite_fk=False):
+def get_engine(sqlite_fk=False, slave_engine=False):
"""Return a SQLAlchemy engine."""
global _ENGINE
- if _ENGINE is None:
- _ENGINE = create_engine(CONF.database.connection,
- sqlite_fk=sqlite_fk)
- return _ENGINE
+ global _SLAVE_ENGINE
+ engine = _ENGINE
+ db_uri = CONF.database.connection
+
+ if slave_engine:
+ engine = _SLAVE_ENGINE
+ db_uri = CONF.database.slave_connection
+
+ if engine is None:
+ engine = create_engine(db_uri,
+ sqlite_fk=sqlite_fk)
+ if slave_engine:
+ _SLAVE_ENGINE = engine
+ else:
+ _ENGINE = engine
+
+ return engine
def _synchronous_switch_listener(dbapi_conn, connection_rec):
@@ -513,20 +591,20 @@ def _add_regexp_listener(dbapi_con, con_record):
dbapi_con.create_function('regexp', 2, regexp)
-def _greenthread_yield(dbapi_con, con_record):
- """
- Ensure other greenthreads get a chance to execute by forcing a context
- switch. With common database backends (eg MySQLdb and sqlite), there is
- no implicit yield caused by network I/O since they are implemented by
- C libraries that eventlet cannot monkey patch.
+def _thread_yield(dbapi_con, con_record):
+ """Ensure other greenthreads get a chance to be executed.
+
+ If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will
+ execute instead of time.sleep(0).
+ Force a context switch. With common database backends (eg MySQLdb and
+ sqlite), there is no implicit yield caused by network I/O since they are
+ implemented by C libraries that eventlet cannot monkey patch.
"""
- greenthread.sleep(0)
+ time.sleep(0)
def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
- """
- Ensures that MySQL connections checked out of the
- pool are alive.
+ """Ensures that MySQL connections checked out of the pool are alive.
Borrowed from:
http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
@@ -545,7 +623,8 @@ def _is_db_connection_error(args):
"""Return True if error in connecting to db."""
# NOTE(adam_g): This is currently MySQL specific and needs to be extended
# to support Postgres and others.
- conn_err_codes = ('2002', '2003', '2006')
+ # For the db2, the error code is -30081 since the db2 is still not ready
+ conn_err_codes = ('2002', '2003', '2006', '-30081')
for err_code in conn_err_codes:
if args.find(err_code) != -1:
return True
@@ -554,6 +633,11 @@ def _is_db_connection_error(args):
def create_engine(sql_connection, sqlite_fk=False):
"""Return a new SQLAlchemy engine."""
+ # NOTE(geekinutah): At this point we could be connecting to the normal
+ # db handle or the slave db handle. Things like
+ # _wrap_db_error aren't going to work well if their
+ # backends don't match. Let's check.
+ _assert_matching_drivers()
connection_dict = sqlalchemy.engine.url.make_url(sql_connection)
engine_args = {
@@ -577,13 +661,16 @@ def create_engine(sql_connection, sqlite_fk=False):
engine_args["poolclass"] = StaticPool
engine_args["connect_args"] = {'check_same_thread': False}
else:
- engine_args['pool_size'] = CONF.database.max_pool_size
+ if CONF.database.max_pool_size is not None:
+ engine_args['pool_size'] = CONF.database.max_pool_size
if CONF.database.max_overflow is not None:
engine_args['max_overflow'] = CONF.database.max_overflow
+ if CONF.database.pool_timeout is not None:
+ engine_args['pool_timeout'] = CONF.database.pool_timeout
engine = sqlalchemy.create_engine(sql_connection, **engine_args)
- sqlalchemy.event.listen(engine, 'checkin', _greenthread_yield)
+ sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
if 'mysql' in connection_dict.drivername:
sqlalchemy.event.listen(engine, 'checkout', _ping_listener)
@@ -656,8 +743,9 @@ def get_maker(engine, autocommit=True, expire_on_commit=False):
def _patch_mysqldb_with_stacktrace_comments():
- """Adds current stack trace as a comment in queries by patching
- MySQLdb.cursors.BaseCursor._do_query.
+ """Adds current stack trace as a comment in queries.
+
+ Patches MySQLdb.cursors.BaseCursor._do_query.
"""
import MySQLdb.cursors
import traceback
@@ -666,25 +754,25 @@ def _patch_mysqldb_with_stacktrace_comments():
def _do_query(self, q):
stack = ''
- for file, line, method, function in traceback.extract_stack():
+ for filename, line, method, function in traceback.extract_stack():
# exclude various common things from trace
- if file.endswith('session.py') and method == '_do_query':
+ if filename.endswith('session.py') and method == '_do_query':
continue
- if file.endswith('api.py') and method == 'wrapper':
+ if filename.endswith('api.py') and method == 'wrapper':
continue
- if file.endswith('utils.py') and method == '_inner':
+ if filename.endswith('utils.py') and method == '_inner':
continue
- if file.endswith('exception.py') and method == '_wrap':
+ if filename.endswith('exception.py') and method == '_wrap':
continue
# db/api is just a wrapper around db/sqlalchemy/api
- if file.endswith('db/api.py'):
+ if filename.endswith('db/api.py'):
continue
# only trace inside ironic
- index = file.rfind('ironic')
+ index = filename.rfind('ironic')
if index == -1:
continue
stack += "File:%s:%s Method:%s() Line:%s | " \
- % (file[index:], line, method, function)
+ % (filename[index:], line, method, function)
# strip trailing " | " from stack
if stack:
@@ -695,3 +783,15 @@ def _patch_mysqldb_with_stacktrace_comments():
old_mysql_do_query(self, qq)
setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query)
+
+
+def _assert_matching_drivers():
+ """Make sure slave handle and normal handle have the same driver."""
+ # NOTE(geekinutah): There's no use case for writing to one backend and
+ # reading from another. Who knows what the future holds?
+ if CONF.database.slave_connection == '':
+ return
+
+ normal = sqlalchemy.engine.url.make_url(CONF.database.connection)
+ slave = sqlalchemy.engine.url.make_url(CONF.database.slave_connection)
+ assert normal.drivername == slave.drivername
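
A hedged sketch of how the new slave_connection option and slave_session flag might be exercised; the connection URLs are placeholders, not values from the commit:

    from oslo.config import cfg
    from ironic.openstack.common.db.sqlalchemy import session as db_session

    cfg.CONF.set_override('connection',
                          'mysql://ironic:secret@db-master/ironic',
                          group='database')
    cfg.CONF.set_override('slave_connection',
                          'mysql://ironic:secret@db-replica/ironic',
                          group='database')

    # Writes keep using the primary engine; read-mostly code can opt into the
    # replica. create_engine() asserts both URLs use the same driver.
    write_session = db_session.get_session()
    read_session = db_session.get_session(slave_session=True)
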
diff --git a/ironic/openstack/common/db/sqlalchemy/utils.py b/ironic/openstack/common/db/sqlalchemy/utils.py
index 9d5949891..dd4a799ca 100644
--- a/ironic/openstack/common/db/sqlalchemy/utils.py
+++ b/ironic/openstack/common/db/sqlalchemy/utils.py
@@ -18,16 +18,43 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""Implementation of paginate query."""
+import re
+from migrate.changeset import UniqueConstraint
import sqlalchemy
+from sqlalchemy import Boolean
+from sqlalchemy import CheckConstraint
+from sqlalchemy import Column
+from sqlalchemy.engine import reflection
+from sqlalchemy.ext.compiler import compiles
+from sqlalchemy import func
+from sqlalchemy import Index
+from sqlalchemy import Integer
+from sqlalchemy import MetaData
+from sqlalchemy.sql.expression import literal_column
+from sqlalchemy.sql.expression import UpdateBase
+from sqlalchemy.sql import select
+from sqlalchemy import String
+from sqlalchemy import Table
+from sqlalchemy.types import NullType
+
+from ironic.openstack.common.gettextutils import _ # noqa
-from ironic.openstack.common.gettextutils import _
from ironic.openstack.common import log as logging
+from ironic.openstack.common import timeutils
LOG = logging.getLogger(__name__)
+_DBURL_REGEX = re.compile(r"[^:]+://([^:]+):([^@]+)@.+")
+
+
+def sanitize_db_url(url):
+ match = _DBURL_REGEX.match(url)
+ if match:
+ return '%s****:****%s' % (url[:match.start(1)], url[match.end(2):])
+ return url
+
class InvalidSortKey(Exception):
message = _("Sort key supplied was not valid.")
@@ -85,11 +112,14 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
# Add sorting
for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs):
- sort_dir_func = {
- 'asc': sqlalchemy.asc,
- 'desc': sqlalchemy.desc,
- }[current_sort_dir]
-
+ try:
+ sort_dir_func = {
+ 'asc': sqlalchemy.asc,
+ 'desc': sqlalchemy.desc,
+ }[current_sort_dir]
+ except KeyError:
+ raise ValueError(_("Unknown sort direction, "
+ "must be 'desc' or 'asc'"))
try:
sort_key_attr = getattr(model, current_sort_key)
except AttributeError:
@@ -114,11 +144,8 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
model_attr = getattr(model, sort_keys[i])
if sort_dirs[i] == 'desc':
crit_attrs.append((model_attr < marker_values[i]))
- elif sort_dirs[i] == 'asc':
- crit_attrs.append((model_attr > marker_values[i]))
else:
- raise ValueError(_("Unknown sort direction, "
- "must be 'desc' or 'asc'"))
+ crit_attrs.append((model_attr > marker_values[i]))
criteria = sqlalchemy.sql.and_(*crit_attrs)
criteria_list.append(criteria)
@@ -138,6 +165,337 @@ def get_table(engine, name):
Needed because the models don't work for us in migrations
as models will be far out of sync with the current data.
"""
- metadata = sqlalchemy.MetaData()
+ metadata = MetaData()
metadata.bind = engine
- return sqlalchemy.Table(name, metadata, autoload=True)
+ return Table(name, metadata, autoload=True)
+
+
+class InsertFromSelect(UpdateBase):
+ """Form the base for `INSERT INTO table (SELECT ... )` statement."""
+ def __init__(self, table, select):
+ self.table = table
+ self.select = select
+
+
+@compiles(InsertFromSelect)
+def visit_insert_from_select(element, compiler, **kw):
+ """Form the `INSERT INTO table (SELECT ... )` statement."""
+ return "INSERT INTO %s %s" % (
+ compiler.process(element.table, asfrom=True),
+ compiler.process(element.select))
+
+
+class ColumnError(Exception):
+ """Error raised when no column or an invalid column is found."""
+
+
+def _get_not_supported_column(col_name_col_instance, column_name):
+ try:
+ column = col_name_col_instance[column_name]
+ except KeyError:
+ msg = _("Please specify column %s in col_name_col_instance "
+ "param. It is required because column has unsupported "
+ "type by sqlite).")
+ raise ColumnError(msg % column_name)
+
+ if not isinstance(column, Column):
+ msg = _("col_name_col_instance param has wrong type of "
+ "column instance for column %s It should be instance "
+ "of sqlalchemy.Column.")
+ raise ColumnError(msg % column_name)
+ return column
+
+
+def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns,
+ **col_name_col_instance):
+ """Drop unique constraint from table.
+
+ This method drops UC from table and works for mysql, postgresql and sqlite.
+ In mysql and postgresql we are able to use "alter table" construction.
+ Sqlalchemy doesn't support some sqlite column types and replaces their
+ type with NullType in metadata. We process these columns and replace
+ NullType with the correct column type.
+
+ :param migrate_engine: sqlalchemy engine
+ :param table_name: name of table that contains uniq constraint.
+ :param uc_name: name of uniq constraint that will be dropped.
+ :param columns: columns that are in uniq constraint.
+ :param col_name_col_instance: contains pair column_name=column_instance.
+ column_instance is instance of Column. These params
+ are required only for columns that have unsupported
+ types by sqlite. For example BigInteger.
+ """
+
+ meta = MetaData()
+ meta.bind = migrate_engine
+ t = Table(table_name, meta, autoload=True)
+
+ if migrate_engine.name == "sqlite":
+ override_cols = [
+ _get_not_supported_column(col_name_col_instance, col.name)
+ for col in t.columns
+ if isinstance(col.type, NullType)
+ ]
+ for col in override_cols:
+ t.columns.replace(col)
+
+ uc = UniqueConstraint(*columns, table=t, name=uc_name)
+ uc.drop()
+
+
+def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
+ use_soft_delete, *uc_column_names):
+ """Drop all old rows having the same values for columns in uc_columns.
+
+ This method drop (or mark ad `deleted` if use_soft_delete is True) old
+ duplicate rows form table with name `table_name`.
+
+ :param migrate_engine: Sqlalchemy engine
+ :param table_name: Table with duplicates
+ :param use_soft_delete: If True - values will be marked as `deleted`,
+ if False - values will be removed from table
+ :param uc_column_names: Unique constraint columns
+ """
+ meta = MetaData()
+ meta.bind = migrate_engine
+
+ table = Table(table_name, meta, autoload=True)
+ columns_for_group_by = [table.c[name] for name in uc_column_names]
+
+ columns_for_select = [func.max(table.c.id)]
+ columns_for_select.extend(columns_for_group_by)
+
+ duplicated_rows_select = select(columns_for_select,
+ group_by=columns_for_group_by,
+ having=func.count(table.c.id) > 1)
+
+ for row in migrate_engine.execute(duplicated_rows_select):
+ # NOTE(boris-42): Do not remove row that has the biggest ID.
+ delete_condition = table.c.id != row[0]
+ is_none = None # workaround for pyflakes
+ delete_condition &= table.c.deleted_at == is_none
+ for name in uc_column_names:
+ delete_condition &= table.c[name] == row[name]
+
+ rows_to_delete_select = select([table.c.id]).where(delete_condition)
+ for row in migrate_engine.execute(rows_to_delete_select).fetchall():
+ LOG.info(_("Deleting duplicated row with id: %(id)s from table: "
+ "%(table)s") % dict(id=row[0], table=table_name))
+
+ if use_soft_delete:
+ delete_statement = table.update().\
+ where(delete_condition).\
+ values({
+ 'deleted': literal_column('id'),
+ 'updated_at': literal_column('updated_at'),
+ 'deleted_at': timeutils.utcnow()
+ })
+ else:
+ delete_statement = table.delete().where(delete_condition)
+ migrate_engine.execute(delete_statement)
+
+
+def _get_default_deleted_value(table):
+ if isinstance(table.c.id.type, Integer):
+ return 0
+ if isinstance(table.c.id.type, String):
+ return ""
+ raise ColumnError(_("Unsupported id columns type"))
+
+
+def _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes):
+ table = get_table(migrate_engine, table_name)
+
+ insp = reflection.Inspector.from_engine(migrate_engine)
+ real_indexes = insp.get_indexes(table_name)
+ existing_index_names = dict(
+ [(index['name'], index['column_names']) for index in real_indexes])
+
+ # NOTE(boris-42): Restore indexes on `deleted` column
+ for index in indexes:
+ if 'deleted' not in index['column_names']:
+ continue
+ name = index['name']
+ if name in existing_index_names:
+ column_names = [table.c[c] for c in existing_index_names[name]]
+ old_index = Index(name, *column_names, unique=index["unique"])
+ old_index.drop(migrate_engine)
+
+ column_names = [table.c[c] for c in index['column_names']]
+ new_index = Index(index["name"], *column_names, unique=index["unique"])
+ new_index.create(migrate_engine)
+
+
+def change_deleted_column_type_to_boolean(migrate_engine, table_name,
+ **col_name_col_instance):
+ if migrate_engine.name == "sqlite":
+ return _change_deleted_column_type_to_boolean_sqlite(
+ migrate_engine, table_name, **col_name_col_instance)
+ insp = reflection.Inspector.from_engine(migrate_engine)
+ indexes = insp.get_indexes(table_name)
+
+ table = get_table(migrate_engine, table_name)
+
+ old_deleted = Column('old_deleted', Boolean, default=False)
+ old_deleted.create(table, populate_default=False)
+
+ table.update().\
+ where(table.c.deleted == table.c.id).\
+ values(old_deleted=True).\
+ execute()
+
+ table.c.deleted.drop()
+ table.c.old_deleted.alter(name="deleted")
+
+ _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes)
+
+
+def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name,
+ **col_name_col_instance):
+ insp = reflection.Inspector.from_engine(migrate_engine)
+ table = get_table(migrate_engine, table_name)
+
+ columns = []
+ for column in table.columns:
+ column_copy = None
+ if column.name != "deleted":
+ if isinstance(column.type, NullType):
+ column_copy = _get_not_supported_column(col_name_col_instance,
+ column.name)
+ else:
+ column_copy = column.copy()
+ else:
+ column_copy = Column('deleted', Boolean, default=0)
+ columns.append(column_copy)
+
+ constraints = [constraint.copy() for constraint in table.constraints]
+
+ meta = table.metadata
+ new_table = Table(table_name + "__tmp__", meta,
+ *(columns + constraints))
+ new_table.create()
+
+ indexes = []
+ for index in insp.get_indexes(table_name):
+ column_names = [new_table.c[c] for c in index['column_names']]
+ indexes.append(Index(index["name"], *column_names,
+ unique=index["unique"]))
+
+ c_select = []
+ for c in table.c:
+ if c.name != "deleted":
+ c_select.append(c)
+ else:
+ c_select.append(table.c.deleted == table.c.id)
+
+ ins = InsertFromSelect(new_table, select(c_select))
+ migrate_engine.execute(ins)
+
+ table.drop()
+ [index.create(migrate_engine) for index in indexes]
+
+ new_table.rename(table_name)
+ new_table.update().\
+ where(new_table.c.deleted == new_table.c.id).\
+ values(deleted=True).\
+ execute()
+
+
+def change_deleted_column_type_to_id_type(migrate_engine, table_name,
+ **col_name_col_instance):
+ if migrate_engine.name == "sqlite":
+ return _change_deleted_column_type_to_id_type_sqlite(
+ migrate_engine, table_name, **col_name_col_instance)
+ insp = reflection.Inspector.from_engine(migrate_engine)
+ indexes = insp.get_indexes(table_name)
+
+ table = get_table(migrate_engine, table_name)
+
+ new_deleted = Column('new_deleted', table.c.id.type,
+ default=_get_default_deleted_value(table))
+ new_deleted.create(table, populate_default=True)
+
+ deleted = True # workaround for pyflakes
+ table.update().\
+ where(table.c.deleted == deleted).\
+ values(new_deleted=table.c.id).\
+ execute()
+ table.c.deleted.drop()
+ table.c.new_deleted.alter(name="deleted")
+
+ _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes)
+
+
+def _change_deleted_column_type_to_id_type_sqlite(migrate_engine, table_name,
+ **col_name_col_instance):
+ # NOTE(boris-42): sqlaclhemy-migrate can't drop column with check
+ # constraints in sqlite DB and our `deleted` column has
+ # 2 check constraints. So there is only one way to remove
+ # these constraints:
+ # 1) Create new table with the same columns, constraints
+ # and indexes. (except deleted column).
+ # 2) Copy all data from old to new table.
+ # 3) Drop old table.
+ # 4) Rename new table to old table name.
+ insp = reflection.Inspector.from_engine(migrate_engine)
+ meta = MetaData(bind=migrate_engine)
+ table = Table(table_name, meta, autoload=True)
+ default_deleted_value = _get_default_deleted_value(table)
+
+ columns = []
+ for column in table.columns:
+ column_copy = None
+ if column.name != "deleted":
+ if isinstance(column.type, NullType):
+ column_copy = _get_not_supported_column(col_name_col_instance,
+ column.name)
+ else:
+ column_copy = column.copy()
+ else:
+ column_copy = Column('deleted', table.c.id.type,
+ default=default_deleted_value)
+ columns.append(column_copy)
+
+ def is_deleted_column_constraint(constraint):
+ # NOTE(boris-42): There is no other way to check is CheckConstraint
+ # associated with deleted column.
+ if not isinstance(constraint, CheckConstraint):
+ return False
+ sqltext = str(constraint.sqltext)
+ return (sqltext.endswith("deleted in (0, 1)") or
+ sqltext.endswith("deleted IN (:deleted_1, :deleted_2)"))
+
+ constraints = []
+ for constraint in table.constraints:
+ if not is_deleted_column_constraint(constraint):
+ constraints.append(constraint.copy())
+
+ new_table = Table(table_name + "__tmp__", meta,
+ *(columns + constraints))
+ new_table.create()
+
+ indexes = []
+ for index in insp.get_indexes(table_name):
+ column_names = [new_table.c[c] for c in index['column_names']]
+ indexes.append(Index(index["name"], *column_names,
+ unique=index["unique"]))
+
+ ins = InsertFromSelect(new_table, table.select())
+ migrate_engine.execute(ins)
+
+ table.drop()
+ [index.create(migrate_engine) for index in indexes]
+
+ new_table.rename(table_name)
+ deleted = True # workaround for pyflakes
+ new_table.update().\
+ where(new_table.c.deleted == deleted).\
+ values(deleted=new_table.c.id).\
+ execute()
+
+ # NOTE(boris-42): Fix value of deleted column: False -> "" or 0.
+ deleted = False # workaround for pyflakes
+ new_table.update().\
+ where(new_table.c.deleted == deleted).\
+ values(deleted=default_deleted_value).\
+ execute()
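
Of the helpers added to utils.py, sanitize_db_url() is the easiest to show in isolation; a short sketch with a placeholder URL, following the regex defined above:

    from ironic.openstack.common.db.sqlalchemy import utils

    url = 'mysql://ironic:s3cret@db-host/ironic'
    print(utils.sanitize_db_url(url))
    # prints: mysql://****:****@db-host/ironic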