diff options
Diffstat (limited to 'oslo/db/sqlalchemy/test_migrations.py')
-rw-r--r-- | oslo/db/sqlalchemy/test_migrations.py | 600 |
1 files changed, 1 insertions, 599 deletions
diff --git a/oslo/db/sqlalchemy/test_migrations.py b/oslo/db/sqlalchemy/test_migrations.py index 4d9146a..bfbf0a8 100644 --- a/oslo/db/sqlalchemy/test_migrations.py +++ b/oslo/db/sqlalchemy/test_migrations.py @@ -1,5 +1,3 @@ -# Copyright 2010-2011 OpenStack Foundation -# Copyright 2012-2013 IBM Corp. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -14,600 +12,4 @@ # License for the specific language governing permissions and limitations # under the License. -import abc -import collections -import logging -import pprint - -import alembic -import alembic.autogenerate -import alembic.migration -import pkg_resources as pkg -import six -import sqlalchemy -from sqlalchemy.engine import reflection -import sqlalchemy.exc -from sqlalchemy import schema -import sqlalchemy.sql.expression as expr -import sqlalchemy.types as types - -from oslo.db._i18n import _LE -from oslo.db import exception as exc -from oslo.db.sqlalchemy import utils - -LOG = logging.getLogger(__name__) - - -@six.add_metaclass(abc.ABCMeta) -class WalkVersionsMixin(object): - """Test mixin to check upgrade and downgrade ability of migration. - - This is only suitable for testing of migrate_ migration scripts. An - abstract class mixin. `INIT_VERSION`, `REPOSITORY` and `migration_api` - attributes must be implemented in subclasses. - - .. _auxiliary-dynamic-methods: Auxiliary Methods - - Auxiliary Methods: - - `migrate_up` and `migrate_down` instance methods of the class can be - used with auxiliary methods named `_pre_upgrade_<revision_id>`, - `_check_<revision_id>`, `_post_downgrade_<revision_id>`. The methods - intended to check applied changes for correctness of data operations. - This methods should be implemented for every particular revision - which you want to check with data. Implementation recommendations for - `_pre_upgrade_<revision_id>`, `_check_<revision_id>`, - `_post_downgrade_<revision_id>` implementation: - - * `_pre_upgrade_<revision_id>`: provide a data appropriate to - a next revision. Should be used an id of revision which - going to be applied. - - * `_check_<revision_id>`: Insert, select, delete operations - with newly applied changes. The data provided by - `_pre_upgrade_<revision_id>` will be used. - - * `_post_downgrade_<revision_id>`: check for absence - (inability to use) changes provided by reverted revision. - - Execution order of auxiliary methods when revision is upgrading: - - `_pre_upgrade_###` => `upgrade` => `_check_###` - - Execution order of auxiliary methods when revision is downgrading: - - `downgrade` => `_post_downgrade_###` - - .. _migrate: https://sqlalchemy-migrate.readthedocs.org/en/latest/ - - """ - - @abc.abstractproperty - def INIT_VERSION(self): - """Initial version of a migration repository. - - Can be different from 0, if a migrations were squashed. - - :rtype: int - """ - pass - - @abc.abstractproperty - def REPOSITORY(self): - """Allows basic manipulation with migration repository. - - :returns: `migrate.versioning.repository.Repository` subclass. - """ - pass - - @abc.abstractproperty - def migration_api(self): - """Provides API for upgrading, downgrading and version manipulations. - - :returns: `migrate.api` or overloaded analog. - """ - pass - - @abc.abstractproperty - def migrate_engine(self): - """Provides engine instance. - - Should be the same instance as used when migrations are applied. In - most cases, the `engine` attribute provided by the test class in a - `setUp` method will work. - - Example of implementation: - - def migrate_engine(self): - return self.engine - - :returns: sqlalchemy engine instance - """ - pass - - def _walk_versions(self, snake_walk=False, downgrade=True): - """Check if migration upgrades and downgrades successfully. - - DEPRECATED: this function is deprecated and will be removed from - oslo.db in a few releases. Please use walk_versions() method instead. - """ - self.walk_versions(snake_walk, downgrade) - - def _migrate_down(self, version, with_data=False): - """Migrate down to a previous version of the db. - - DEPRECATED: this function is deprecated and will be removed from - oslo.db in a few releases. Please use migrate_down() method instead. - """ - return self.migrate_down(version, with_data) - - def _migrate_up(self, version, with_data=False): - """Migrate up to a new version of the db. - - DEPRECATED: this function is deprecated and will be removed from - oslo.db in a few releases. Please use migrate_up() method instead. - """ - self.migrate_up(version, with_data) - - def walk_versions(self, snake_walk=False, downgrade=True): - """Check if migration upgrades and downgrades successfully. - - Determine the latest version script from the repo, then - upgrade from 1 through to the latest, with no data - in the databases. This just checks that the schema itself - upgrades successfully. - - `walk_versions` calls `migrate_up` and `migrate_down` with - `with_data` argument to check changes with data, but these methods - can be called without any extra check outside of `walk_versions` - method. - - :param snake_walk: enables checking that each individual migration can - be upgraded/downgraded by itself. - - If we have ordered migrations 123abc, 456def, 789ghi and we run - upgrading with the `snake_walk` argument set to `True`, the - migrations will be applied in the following order: - - `123abc => 456def => 123abc => - 456def => 789ghi => 456def => 789ghi` - - :type snake_walk: bool - :param downgrade: Check downgrade behavior if True. - :type downgrade: bool - """ - - # Place the database under version control - self.migration_api.version_control(self.migrate_engine, - self.REPOSITORY, - self.INIT_VERSION) - self.assertEqual(self.INIT_VERSION, - self.migration_api.db_version(self.migrate_engine, - self.REPOSITORY)) - - LOG.debug('latest version is %s', self.REPOSITORY.latest) - versions = range(self.INIT_VERSION + 1, self.REPOSITORY.latest + 1) - - for version in versions: - # upgrade -> downgrade -> upgrade - self.migrate_up(version, with_data=True) - if snake_walk: - downgraded = self.migrate_down(version - 1, with_data=True) - if downgraded: - self.migrate_up(version) - - if downgrade: - # Now walk it back down to 0 from the latest, testing - # the downgrade paths. - for version in reversed(versions): - # downgrade -> upgrade -> downgrade - downgraded = self.migrate_down(version - 1) - - if snake_walk and downgraded: - self.migrate_up(version) - self.migrate_down(version - 1) - - def migrate_down(self, version, with_data=False): - """Migrate down to a previous version of the db. - - :param version: id of revision to downgrade. - :type version: str - :keyword with_data: Whether to verify the absence of changes from - migration(s) being downgraded, see - :ref:`auxiliary-dynamic-methods <Auxiliary Methods>`. - :type with_data: Bool - """ - - try: - self.migration_api.downgrade(self.migrate_engine, - self.REPOSITORY, version) - except NotImplementedError: - # NOTE(sirp): some migrations, namely release-level - # migrations, don't support a downgrade. - return False - - self.assertEqual(version, self.migration_api.db_version( - self.migrate_engine, self.REPOSITORY)) - - # NOTE(sirp): `version` is what we're downgrading to (i.e. the 'target' - # version). So if we have any downgrade checks, they need to be run for - # the previous (higher numbered) migration. - if with_data: - post_downgrade = getattr( - self, "_post_downgrade_%03d" % (version + 1), None) - if post_downgrade: - post_downgrade(self.migrate_engine) - - return True - - def migrate_up(self, version, with_data=False): - """Migrate up to a new version of the db. - - :param version: id of revision to upgrade. - :type version: str - :keyword with_data: Whether to verify the applied changes with data, - see :ref:`auxiliary-dynamic-methods <Auxiliary Methods>`. - :type with_data: Bool - """ - # NOTE(sdague): try block is here because it's impossible to debug - # where a failed data migration happens otherwise - try: - if with_data: - data = None - pre_upgrade = getattr( - self, "_pre_upgrade_%03d" % version, None) - if pre_upgrade: - data = pre_upgrade(self.migrate_engine) - - self.migration_api.upgrade(self.migrate_engine, - self.REPOSITORY, version) - self.assertEqual(version, - self.migration_api.db_version(self.migrate_engine, - self.REPOSITORY)) - if with_data: - check = getattr(self, "_check_%03d" % version, None) - if check: - check(self.migrate_engine, data) - except exc.DbMigrationError: - msg = _LE("Failed to migrate to version %(ver)s on engine %(eng)s") - LOG.error(msg, {"ver": version, "eng": self.migrate_engine}) - raise - - -@six.add_metaclass(abc.ABCMeta) -class ModelsMigrationsSync(object): - """A helper class for comparison of DB migration scripts and models. - - It's intended to be inherited by test cases in target projects. They have - to provide implementations for methods used internally in the test (as - we have no way to implement them here). - - test_model_sync() will run migration scripts for the engine provided and - then compare the given metadata to the one reflected from the database. - The difference between MODELS and MIGRATION scripts will be printed and - the test will fail, if the difference is not empty. The return value is - really a list of actions, that should be performed in order to make the - current database schema state (i.e. migration scripts) consistent with - models definitions. It's left up to developers to analyze the output and - decide whether the models definitions or the migration scripts should be - modified to make them consistent. - - Output:: - - [( - 'add_table', - description of the table from models - ), - ( - 'remove_table', - description of the table from database - ), - ( - 'add_column', - schema, - table name, - column description from models - ), - ( - 'remove_column', - schema, - table name, - column description from database - ), - ( - 'add_index', - description of the index from models - ), - ( - 'remove_index', - description of the index from database - ), - ( - 'add_constraint', - description of constraint from models - ), - ( - 'remove_constraint, - description of constraint from database - ), - ( - 'modify_nullable', - schema, - table name, - column name, - { - 'existing_type': type of the column from database, - 'existing_server_default': default value from database - }, - nullable from database, - nullable from models - ), - ( - 'modify_type', - schema, - table name, - column name, - { - 'existing_nullable': database nullable, - 'existing_server_default': default value from database - }, - database column type, - type of the column from models - ), - ( - 'modify_default', - schema, - table name, - column name, - { - 'existing_nullable': database nullable, - 'existing_type': type of the column from database - }, - connection column default value, - default from models - )] - - Method include_object() can be overridden to exclude some tables from - comparison (e.g. migrate_repo). - - """ - - @abc.abstractmethod - def db_sync(self, engine): - """Run migration scripts with the given engine instance. - - This method must be implemented in subclasses and run migration scripts - for a DB the given engine is connected to. - - """ - - @abc.abstractmethod - def get_engine(self): - """Return the engine instance to be used when running tests. - - This method must be implemented in subclasses and return an engine - instance to be used when running tests. - - """ - - @abc.abstractmethod - def get_metadata(self): - """Return the metadata instance to be used for schema comparison. - - This method must be implemented in subclasses and return the metadata - instance attached to the BASE model. - - """ - - def include_object(self, object_, name, type_, reflected, compare_to): - """Return True for objects that should be compared. - - :param object_: a SchemaItem object such as a Table or Column object - :param name: the name of the object - :param type_: a string describing the type of object (e.g. "table") - :param reflected: True if the given object was produced based on - table reflection, False if it's from a local - MetaData object - :param compare_to: the object being compared against, if available, - else None - - """ - - return True - - def compare_type(self, ctxt, insp_col, meta_col, insp_type, meta_type): - """Return True if types are different, False if not. - - Return None to allow the default implementation to compare these types. - - :param ctxt: alembic MigrationContext instance - :param insp_col: reflected column - :param meta_col: column from model - :param insp_type: reflected column type - :param meta_type: column type from model - - """ - - # some backends (e.g. mysql) don't provide native boolean type - BOOLEAN_METADATA = (types.BOOLEAN, types.Boolean) - BOOLEAN_SQL = BOOLEAN_METADATA + (types.INTEGER, types.Integer) - - if issubclass(type(meta_type), BOOLEAN_METADATA): - return not issubclass(type(insp_type), BOOLEAN_SQL) - - return None # tells alembic to use the default comparison method - - def compare_server_default(self, ctxt, ins_col, meta_col, - insp_def, meta_def, rendered_meta_def): - """Compare default values between model and db table. - - Return True if the defaults are different, False if not, or None to - allow the default implementation to compare these defaults. - - :param ctxt: alembic MigrationContext instance - :param insp_col: reflected column - :param meta_col: column from model - :param insp_def: reflected column default value - :param meta_def: column default value from model - :param rendered_meta_def: rendered column default value (from model) - - """ - return self._compare_server_default(ctxt.bind, meta_col, insp_def, - meta_def) - - @utils.DialectFunctionDispatcher.dispatch_for_dialect("*") - def _compare_server_default(bind, meta_col, insp_def, meta_def): - pass - - @_compare_server_default.dispatch_for('mysql') - def _compare_server_default(bind, meta_col, insp_def, meta_def): - if isinstance(meta_col.type, sqlalchemy.Boolean): - if meta_def is None or insp_def is None: - return meta_def != insp_def - return not ( - isinstance(meta_def.arg, expr.True_) and insp_def == "'1'" or - isinstance(meta_def.arg, expr.False_) and insp_def == "'0'" - ) - - if isinstance(meta_col.type, sqlalchemy.Integer): - if meta_def is None or insp_def is None: - return meta_def != insp_def - return meta_def.arg != insp_def.split("'")[1] - - @_compare_server_default.dispatch_for('postgresql') - def _compare_server_default(bind, meta_col, insp_def, meta_def): - if isinstance(meta_col.type, sqlalchemy.Enum): - if meta_def is None or insp_def is None: - return meta_def != insp_def - return insp_def != "'%s'::%s" % (meta_def.arg, meta_col.type.name) - elif isinstance(meta_col.type, sqlalchemy.String): - if meta_def is None or insp_def is None: - return meta_def != insp_def - return insp_def != "'%s'::character varying" % meta_def.arg - - def _cleanup(self): - engine = self.get_engine() - with engine.begin() as conn: - inspector = reflection.Inspector.from_engine(engine) - metadata = schema.MetaData() - tbs = [] - all_fks = [] - - for table_name in inspector.get_table_names(): - fks = [] - for fk in inspector.get_foreign_keys(table_name): - if not fk['name']: - continue - fks.append( - schema.ForeignKeyConstraint((), (), name=fk['name']) - ) - table = schema.Table(table_name, metadata, *fks) - tbs.append(table) - all_fks.extend(fks) - - for fkc in all_fks: - conn.execute(schema.DropConstraint(fkc)) - - for table in tbs: - conn.execute(schema.DropTable(table)) - - FKInfo = collections.namedtuple('fk_info', ['constrained_columns', - 'referred_table', - 'referred_columns']) - - def check_foreign_keys(self, metadata, bind): - """Compare foreign keys between model and db table. - - :returns: a list that contains information about: - - * should be a new key added or removed existing, - * name of that key, - * source table, - * referred table, - * constrained columns, - * referred columns - - Output:: - - [('drop_key', - 'testtbl_fk_check_fkey', - 'testtbl', - fk_info(constrained_columns=(u'fk_check',), - referred_table=u'table', - referred_columns=(u'fk_check',)))] - - """ - - diff = [] - insp = sqlalchemy.engine.reflection.Inspector.from_engine(bind) - # Get all tables from db - db_tables = insp.get_table_names() - # Get all tables from models - model_tables = metadata.tables - for table in db_tables: - if table not in model_tables: - continue - # Get all necessary information about key of current table from db - fk_db = dict((self._get_fk_info_from_db(i), i['name']) - for i in insp.get_foreign_keys(table)) - fk_db_set = set(fk_db.keys()) - # Get all necessary information about key of current table from - # models - fk_models = dict((self._get_fk_info_from_model(fk), fk) - for fk in model_tables[table].foreign_keys) - fk_models_set = set(fk_models.keys()) - for key in (fk_db_set - fk_models_set): - diff.append(('drop_key', fk_db[key], table, key)) - LOG.info(("Detected removed foreign key %(fk)r on " - "table %(table)r"), {'fk': fk_db[key], - 'table': table}) - for key in (fk_models_set - fk_db_set): - diff.append(('add_key', fk_models[key], table, key)) - LOG.info(( - "Detected added foreign key for column %(fk)r on table " - "%(table)r"), {'fk': fk_models[key].column.name, - 'table': table}) - return diff - - def _get_fk_info_from_db(self, fk): - return self.FKInfo(tuple(fk['constrained_columns']), - fk['referred_table'], - tuple(fk['referred_columns'])) - - def _get_fk_info_from_model(self, fk): - return self.FKInfo((fk.parent.name,), fk.column.table.name, - (fk.column.name,)) - - def test_models_sync(self): - # recent versions of sqlalchemy and alembic are needed for running of - # this test, but we already have them in requirements - try: - pkg.require('sqlalchemy>=0.8.4', 'alembic>=0.6.2') - except (pkg.VersionConflict, pkg.DistributionNotFound) as e: - self.skipTest('sqlalchemy>=0.8.4 and alembic>=0.6.3 are required' - ' for running of this test: %s' % e) - - # drop all tables after a test run - self.addCleanup(self._cleanup) - - # run migration scripts - self.db_sync(self.get_engine()) - - with self.get_engine().connect() as conn: - opts = { - 'include_object': self.include_object, - 'compare_type': self.compare_type, - 'compare_server_default': self.compare_server_default, - } - mc = alembic.migration.MigrationContext.configure(conn, opts=opts) - - # compare schemas and fail with diff, if it's not empty - diff1 = alembic.autogenerate.compare_metadata(mc, - self.get_metadata()) - diff2 = self.check_foreign_keys(self.get_metadata(), - self.get_engine()) - diff = diff1 + diff2 - if diff: - msg = pprint.pformat(diff, indent=2, width=20) - self.fail( - "Models and migration scripts aren't in sync:\n%s" % msg) +from oslo_db.sqlalchemy.test_migrations import * # noqa |