diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2015-01-12 17:26:23 -0500 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2015-01-12 17:26:23 -0500 |
commit | b1fc55c7ce6004311379f4002fdceddcc8da9784 (patch) | |
tree | 67a08d5b9f1627078103d5192c9477d8cca5dd00 /oslo_db | |
parent | 98cdceb1ec7b8242472a2b929e086a362b8dad3d (diff) | |
download | oslo-db-b1fc55c7ce6004311379f4002fdceddcc8da9784.tar.gz |
Restore the check_foreign_keys() method.1.4.1
This method was prematurely removed from oslo.db without
a deprecation phase, when Alembic added support for
foreign key autodetection. oslo.db continues to use
alembic for this purpose, however the
ModelsMigrationsSync.check_foreign_keys() method is restored
directly for those projects which were calling into it
explicitly.
Change-Id: Id892567bd60d6b4b88765bbfe3cd5c5e75910b25
Diffstat (limited to 'oslo_db')
-rw-r--r-- | oslo_db/sqlalchemy/test_migrations.py | 72 | ||||
-rw-r--r-- | oslo_db/tests/sqlalchemy/test_migrations.py | 187 |
2 files changed, 259 insertions, 0 deletions
diff --git a/oslo_db/sqlalchemy/test_migrations.py b/oslo_db/sqlalchemy/test_migrations.py index 8990790..9b65421 100644 --- a/oslo_db/sqlalchemy/test_migrations.py +++ b/oslo_db/sqlalchemy/test_migrations.py @@ -15,6 +15,7 @@ # under the License. import abc +import collections import logging import pprint @@ -510,6 +511,77 @@ class ModelsMigrationsSync(object): for table in tbs: conn.execute(schema.DropTable(table)) + FKInfo = collections.namedtuple('fk_info', ['constrained_columns', + 'referred_table', + 'referred_columns']) + + def check_foreign_keys(self, metadata, bind): + """Compare foreign keys between model and db table. + + :returns: a list that contains information about: + + * should be a new key added or removed existing, + * name of that key, + * source table, + * referred table, + * constrained columns, + * referred columns + + Output:: + + [('drop_key', + 'testtbl_fk_check_fkey', + 'testtbl', + fk_info(constrained_columns=(u'fk_check',), + referred_table=u'table', + referred_columns=(u'fk_check',)))] + + DEPRECATED: this function is deprecated and will be removed from + oslo.db in a few releases. Alembic autogenerate.compare_metadata() + now includes foreign key comparison directly. + + """ + + diff = [] + insp = sqlalchemy.engine.reflection.Inspector.from_engine(bind) + # Get all tables from db + db_tables = insp.get_table_names() + # Get all tables from models + model_tables = metadata.tables + for table in db_tables: + if table not in model_tables: + continue + # Get all necessary information about key of current table from db + fk_db = dict((self._get_fk_info_from_db(i), i['name']) + for i in insp.get_foreign_keys(table)) + fk_db_set = set(fk_db.keys()) + # Get all necessary information about key of current table from + # models + fk_models = dict((self._get_fk_info_from_model(fk), fk) + for fk in model_tables[table].foreign_keys) + fk_models_set = set(fk_models.keys()) + for key in (fk_db_set - fk_models_set): + diff.append(('drop_key', fk_db[key], table, key)) + LOG.info(("Detected removed foreign key %(fk)r on " + "table %(table)r"), {'fk': fk_db[key], + 'table': table}) + for key in (fk_models_set - fk_db_set): + diff.append(('add_key', fk_models[key], table, key)) + LOG.info(( + "Detected added foreign key for column %(fk)r on table " + "%(table)r"), {'fk': fk_models[key].column.name, + 'table': table}) + return diff + + def _get_fk_info_from_db(self, fk): + return self.FKInfo(tuple(fk['constrained_columns']), + fk['referred_table'], + tuple(fk['referred_columns'])) + + def _get_fk_info_from_model(self, fk): + return self.FKInfo((fk.parent.name,), fk.column.table.name, + (fk.column.name,)) + def test_models_sync(self): # recent versions of sqlalchemy and alembic are needed for running of # this test, but we already have them in requirements diff --git a/oslo_db/tests/sqlalchemy/test_migrations.py b/oslo_db/tests/sqlalchemy/test_migrations.py index ad71757..1bae701 100644 --- a/oslo_db/tests/sqlalchemy/test_migrations.py +++ b/oslo_db/tests/sqlalchemy/test_migrations.py @@ -307,3 +307,190 @@ class ModelsMigrationsSyncPsql(ModelsMigrationSyncMixin, def test_models_not_sync(self): self._test_models_not_sync() + + +class TestOldCheckForeignKeys(test_base.DbTestCase): + def setUp(self): + super(TestOldCheckForeignKeys, self).setUp() + + test = self + + class MigrateSync(migrate.ModelsMigrationsSync): + def get_engine(self): + return test.engine + + def get_metadata(self): + return test.metadata + + def db_sync(self): + raise NotImplementedError() + + self.migration_sync = MigrateSync() + + def _fk_added_fixture(self): + self.metadata = sa.MetaData() + self.metadata_migrations = sa.MetaData() + + sa.Table( + 'testtbl_one', self.metadata, + sa.Column('id', sa.Integer, primary_key=True), + mysql_engine='InnoDB' + ) + + sa.Table( + 'testtbl_two', self.metadata, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('tone_id', sa.Integer), + mysql_engine='InnoDB' + ) + + sa.Table( + 'testtbl_one', self.metadata_migrations, + sa.Column('id', sa.Integer, primary_key=True), + mysql_engine='InnoDB' + ) + + sa.Table( + 'testtbl_two', self.metadata_migrations, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column( + 'tone_id', sa.Integer, + sa.ForeignKey('testtbl_one.id', name="tone_id_fk")), + mysql_engine='InnoDB' + ) + + def _fk_removed_fixture(self): + self.metadata = sa.MetaData() + self.metadata_migrations = sa.MetaData() + + sa.Table( + 'testtbl_one', self.metadata, + sa.Column('id', sa.Integer, primary_key=True), + mysql_engine='InnoDB' + ) + + sa.Table( + 'testtbl_two', self.metadata, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column( + 'tone_id', sa.Integer, + sa.ForeignKey('testtbl_one.id', name="tone_id_fk")), + mysql_engine='InnoDB' + ) + + sa.Table( + 'testtbl_one', self.metadata_migrations, + sa.Column('id', sa.Integer, primary_key=True), + mysql_engine='InnoDB' + ) + + sa.Table( + 'testtbl_two', self.metadata_migrations, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('tone_id', sa.Integer), + mysql_engine='InnoDB' + ) + + def _fk_no_change_fixture(self): + self.metadata = sa.MetaData() + self.metadata_migrations = sa.MetaData() + + sa.Table( + 'testtbl_one', self.metadata, + sa.Column('id', sa.Integer, primary_key=True), + mysql_engine='InnoDB' + ) + + sa.Table( + 'testtbl_two', self.metadata, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column( + 'tone_id', sa.Integer, + sa.ForeignKey('testtbl_one.id', name="tone_id_fk")), + mysql_engine='InnoDB' + ) + + sa.Table( + 'testtbl_one', self.metadata_migrations, + sa.Column('id', sa.Integer, primary_key=True), + mysql_engine='InnoDB' + ) + + sa.Table( + 'testtbl_two', self.metadata_migrations, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column( + 'tone_id', sa.Integer, + sa.ForeignKey('testtbl_one.id', name="tone_id_fk")), + mysql_engine='InnoDB' + ) + + def _run_test(self): + self.metadata.create_all(bind=self.engine) + return self.migration_sync.check_foreign_keys( + self.metadata_migrations, self.engine) + + def _compare_diffs(self, diffs, compare_to): + diffs = [ + ( + cmd, + fk._get_colspec() if isinstance(fk, sa.ForeignKey) + else "tone_id_fk" if fk is None # sqlite workaround + else fk, + tname, fk_info + ) + for cmd, fk, tname, fk_info in diffs + ] + self.assertEqual(diffs, compare_to) + + def test_fk_added(self): + self._fk_added_fixture() + diffs = self._run_test() + + self._compare_diffs( + diffs, + [( + 'add_key', + 'testtbl_one.id', + 'testtbl_two', + self.migration_sync.FKInfo( + constrained_columns=('tone_id',), + referred_table='testtbl_one', + referred_columns=('id',)) + )] + ) + + def test_fk_removed(self): + self._fk_removed_fixture() + diffs = self._run_test() + + self._compare_diffs( + diffs, + [( + 'drop_key', + "tone_id_fk", + 'testtbl_two', + self.migration_sync.FKInfo( + constrained_columns=('tone_id',), + referred_table='testtbl_one', + referred_columns=('id',)) + )] + ) + + def test_fk_no_change(self): + self._fk_no_change_fixture() + diffs = self._run_test() + + self._compare_diffs( + diffs, + []) + + +class PGTestOldCheckForeignKeys( + TestOldCheckForeignKeys, test_base.PostgreSQLOpportunisticTestCase): + pass + + +class MySQLTestOldCheckForeignKeys( + TestOldCheckForeignKeys, test_base.MySQLOpportunisticTestCase): + pass |