summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2015-01-12 17:26:23 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2015-01-12 17:26:23 -0500
commitb1fc55c7ce6004311379f4002fdceddcc8da9784 (patch)
tree67a08d5b9f1627078103d5192c9477d8cca5dd00
parent98cdceb1ec7b8242472a2b929e086a362b8dad3d (diff)
downloadoslo-db-b1fc55c7ce6004311379f4002fdceddcc8da9784.tar.gz
Restore the check_foreign_keys() method.1.4.1
This method was prematurely removed from oslo.db without a deprecation phase, when Alembic added support for foreign key autodetection. oslo.db continues to use alembic for this purpose, however the ModelsMigrationsSync.check_foreign_keys() method is restored directly for those projects which were calling into it explicitly. Change-Id: Id892567bd60d6b4b88765bbfe3cd5c5e75910b25
-rw-r--r--oslo_db/sqlalchemy/test_migrations.py72
-rw-r--r--oslo_db/tests/sqlalchemy/test_migrations.py187
2 files changed, 259 insertions, 0 deletions
diff --git a/oslo_db/sqlalchemy/test_migrations.py b/oslo_db/sqlalchemy/test_migrations.py
index 8990790..9b65421 100644
--- a/oslo_db/sqlalchemy/test_migrations.py
+++ b/oslo_db/sqlalchemy/test_migrations.py
@@ -15,6 +15,7 @@
# under the License.
import abc
+import collections
import logging
import pprint
@@ -510,6 +511,77 @@ class ModelsMigrationsSync(object):
for table in tbs:
conn.execute(schema.DropTable(table))
+ FKInfo = collections.namedtuple('fk_info', ['constrained_columns',
+ 'referred_table',
+ 'referred_columns'])
+
+ def check_foreign_keys(self, metadata, bind):
+ """Compare foreign keys between model and db table.
+
+ :returns: a list that contains information about:
+
+ * should be a new key added or removed existing,
+ * name of that key,
+ * source table,
+ * referred table,
+ * constrained columns,
+ * referred columns
+
+ Output::
+
+ [('drop_key',
+ 'testtbl_fk_check_fkey',
+ 'testtbl',
+ fk_info(constrained_columns=(u'fk_check',),
+ referred_table=u'table',
+ referred_columns=(u'fk_check',)))]
+
+ DEPRECATED: this function is deprecated and will be removed from
+ oslo.db in a few releases. Alembic autogenerate.compare_metadata()
+ now includes foreign key comparison directly.
+
+ """
+
+ diff = []
+ insp = sqlalchemy.engine.reflection.Inspector.from_engine(bind)
+ # Get all tables from db
+ db_tables = insp.get_table_names()
+ # Get all tables from models
+ model_tables = metadata.tables
+ for table in db_tables:
+ if table not in model_tables:
+ continue
+ # Get all necessary information about key of current table from db
+ fk_db = dict((self._get_fk_info_from_db(i), i['name'])
+ for i in insp.get_foreign_keys(table))
+ fk_db_set = set(fk_db.keys())
+ # Get all necessary information about key of current table from
+ # models
+ fk_models = dict((self._get_fk_info_from_model(fk), fk)
+ for fk in model_tables[table].foreign_keys)
+ fk_models_set = set(fk_models.keys())
+ for key in (fk_db_set - fk_models_set):
+ diff.append(('drop_key', fk_db[key], table, key))
+ LOG.info(("Detected removed foreign key %(fk)r on "
+ "table %(table)r"), {'fk': fk_db[key],
+ 'table': table})
+ for key in (fk_models_set - fk_db_set):
+ diff.append(('add_key', fk_models[key], table, key))
+ LOG.info((
+ "Detected added foreign key for column %(fk)r on table "
+ "%(table)r"), {'fk': fk_models[key].column.name,
+ 'table': table})
+ return diff
+
+ def _get_fk_info_from_db(self, fk):
+ return self.FKInfo(tuple(fk['constrained_columns']),
+ fk['referred_table'],
+ tuple(fk['referred_columns']))
+
+ def _get_fk_info_from_model(self, fk):
+ return self.FKInfo((fk.parent.name,), fk.column.table.name,
+ (fk.column.name,))
+
def test_models_sync(self):
# recent versions of sqlalchemy and alembic are needed for running of
# this test, but we already have them in requirements
diff --git a/oslo_db/tests/sqlalchemy/test_migrations.py b/oslo_db/tests/sqlalchemy/test_migrations.py
index ad71757..1bae701 100644
--- a/oslo_db/tests/sqlalchemy/test_migrations.py
+++ b/oslo_db/tests/sqlalchemy/test_migrations.py
@@ -307,3 +307,190 @@ class ModelsMigrationsSyncPsql(ModelsMigrationSyncMixin,
def test_models_not_sync(self):
self._test_models_not_sync()
+
+
+class TestOldCheckForeignKeys(test_base.DbTestCase):
+ def setUp(self):
+ super(TestOldCheckForeignKeys, self).setUp()
+
+ test = self
+
+ class MigrateSync(migrate.ModelsMigrationsSync):
+ def get_engine(self):
+ return test.engine
+
+ def get_metadata(self):
+ return test.metadata
+
+ def db_sync(self):
+ raise NotImplementedError()
+
+ self.migration_sync = MigrateSync()
+
+ def _fk_added_fixture(self):
+ self.metadata = sa.MetaData()
+ self.metadata_migrations = sa.MetaData()
+
+ sa.Table(
+ 'testtbl_one', self.metadata,
+ sa.Column('id', sa.Integer, primary_key=True),
+ mysql_engine='InnoDB'
+ )
+
+ sa.Table(
+ 'testtbl_two', self.metadata,
+ sa.Column('id', sa.Integer, primary_key=True),
+ sa.Column('tone_id', sa.Integer),
+ mysql_engine='InnoDB'
+ )
+
+ sa.Table(
+ 'testtbl_one', self.metadata_migrations,
+ sa.Column('id', sa.Integer, primary_key=True),
+ mysql_engine='InnoDB'
+ )
+
+ sa.Table(
+ 'testtbl_two', self.metadata_migrations,
+ sa.Column('id', sa.Integer, primary_key=True),
+ sa.Column(
+ 'tone_id', sa.Integer,
+ sa.ForeignKey('testtbl_one.id', name="tone_id_fk")),
+ mysql_engine='InnoDB'
+ )
+
+ def _fk_removed_fixture(self):
+ self.metadata = sa.MetaData()
+ self.metadata_migrations = sa.MetaData()
+
+ sa.Table(
+ 'testtbl_one', self.metadata,
+ sa.Column('id', sa.Integer, primary_key=True),
+ mysql_engine='InnoDB'
+ )
+
+ sa.Table(
+ 'testtbl_two', self.metadata,
+ sa.Column('id', sa.Integer, primary_key=True),
+ sa.Column(
+ 'tone_id', sa.Integer,
+ sa.ForeignKey('testtbl_one.id', name="tone_id_fk")),
+ mysql_engine='InnoDB'
+ )
+
+ sa.Table(
+ 'testtbl_one', self.metadata_migrations,
+ sa.Column('id', sa.Integer, primary_key=True),
+ mysql_engine='InnoDB'
+ )
+
+ sa.Table(
+ 'testtbl_two', self.metadata_migrations,
+ sa.Column('id', sa.Integer, primary_key=True),
+ sa.Column('tone_id', sa.Integer),
+ mysql_engine='InnoDB'
+ )
+
+ def _fk_no_change_fixture(self):
+ self.metadata = sa.MetaData()
+ self.metadata_migrations = sa.MetaData()
+
+ sa.Table(
+ 'testtbl_one', self.metadata,
+ sa.Column('id', sa.Integer, primary_key=True),
+ mysql_engine='InnoDB'
+ )
+
+ sa.Table(
+ 'testtbl_two', self.metadata,
+ sa.Column('id', sa.Integer, primary_key=True),
+ sa.Column(
+ 'tone_id', sa.Integer,
+ sa.ForeignKey('testtbl_one.id', name="tone_id_fk")),
+ mysql_engine='InnoDB'
+ )
+
+ sa.Table(
+ 'testtbl_one', self.metadata_migrations,
+ sa.Column('id', sa.Integer, primary_key=True),
+ mysql_engine='InnoDB'
+ )
+
+ sa.Table(
+ 'testtbl_two', self.metadata_migrations,
+ sa.Column('id', sa.Integer, primary_key=True),
+ sa.Column(
+ 'tone_id', sa.Integer,
+ sa.ForeignKey('testtbl_one.id', name="tone_id_fk")),
+ mysql_engine='InnoDB'
+ )
+
+ def _run_test(self):
+ self.metadata.create_all(bind=self.engine)
+ return self.migration_sync.check_foreign_keys(
+ self.metadata_migrations, self.engine)
+
+ def _compare_diffs(self, diffs, compare_to):
+ diffs = [
+ (
+ cmd,
+ fk._get_colspec() if isinstance(fk, sa.ForeignKey)
+ else "tone_id_fk" if fk is None # sqlite workaround
+ else fk,
+ tname, fk_info
+ )
+ for cmd, fk, tname, fk_info in diffs
+ ]
+ self.assertEqual(diffs, compare_to)
+
+ def test_fk_added(self):
+ self._fk_added_fixture()
+ diffs = self._run_test()
+
+ self._compare_diffs(
+ diffs,
+ [(
+ 'add_key',
+ 'testtbl_one.id',
+ 'testtbl_two',
+ self.migration_sync.FKInfo(
+ constrained_columns=('tone_id',),
+ referred_table='testtbl_one',
+ referred_columns=('id',))
+ )]
+ )
+
+ def test_fk_removed(self):
+ self._fk_removed_fixture()
+ diffs = self._run_test()
+
+ self._compare_diffs(
+ diffs,
+ [(
+ 'drop_key',
+ "tone_id_fk",
+ 'testtbl_two',
+ self.migration_sync.FKInfo(
+ constrained_columns=('tone_id',),
+ referred_table='testtbl_one',
+ referred_columns=('id',))
+ )]
+ )
+
+ def test_fk_no_change(self):
+ self._fk_no_change_fixture()
+ diffs = self._run_test()
+
+ self._compare_diffs(
+ diffs,
+ [])
+
+
+class PGTestOldCheckForeignKeys(
+ TestOldCheckForeignKeys, test_base.PostgreSQLOpportunisticTestCase):
+ pass
+
+
+class MySQLTestOldCheckForeignKeys(
+ TestOldCheckForeignKeys, test_base.MySQLOpportunisticTestCase):
+ pass