diff options
Diffstat (limited to 'heat/tests/db/test_migrations.py')
-rw-r--r-- | heat/tests/db/test_migrations.py | 200 |
1 files changed, 18 insertions, 182 deletions
diff --git a/heat/tests/db/test_migrations.py b/heat/tests/db/test_migrations.py index 0dbf85bf3..31485c1b0 100644 --- a/heat/tests/db/test_migrations.py +++ b/heat/tests/db/test_migrations.py @@ -1,4 +1,3 @@ -# # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at @@ -11,31 +10,18 @@ # License for the specific language governing permissions and limitations # under the License. -""" -Tests for database migrations. This test case reads the configuration -file test_migrations.conf for database connection settings -to use in the tests. For each connection found in the config file, -the test case runs a series of test cases to ensure that migrations work -properly both upgrading and downgrading, and that no data loss occurs -if possible. -""" +"""Tests for database migrations.""" import fixtures -import os - -from migrate.versioning import repository from oslo_db.sqlalchemy import enginefacade from oslo_db.sqlalchemy import test_fixtures from oslo_db.sqlalchemy import test_migrations -from oslo_db.sqlalchemy import utils from oslotest import base as test_base import sqlalchemy import testtools -from heat.db.sqlalchemy import migrate_repo from heat.db.sqlalchemy import migration from heat.db.sqlalchemy import models -from heat.tests import common class DBNotAllowed(Exception): @@ -80,168 +66,14 @@ class TestBannedDBSchemaOperations(testtools.TestCase): self.assertRaises(DBNotAllowed, table.alter) -class HeatMigrationsCheckers(test_migrations.WalkVersionsMixin, - common.FakeLogMixin): - """Test sqlalchemy-migrate migrations.""" - - snake_walk = False - downgrade = False - - @property - def INIT_VERSION(self): - return migration.INIT_VERSION - - @property - def REPOSITORY(self): - migrate_file = migrate_repo.__file__ - return repository.Repository( - os.path.abspath(os.path.dirname(migrate_file)) - ) - - @property - def migration_api(self): - temp = __import__('oslo_db.sqlalchemy.migration', globals(), - locals(), ['versioning_api'], 0) - return temp.versioning_api - - @property - def migrate_engine(self): - return self.engine - - def migrate_up(self, version, with_data=False): - """Check that migrations don't cause downtime. - - Schema migrations can be done online, allowing for rolling upgrades. - """ - # NOTE(xek): This is a list of migrations where we allow dropping - # things. The rules for adding exceptions are very very specific. - # Chances are you don't meet the critera. - # Reviewers: DO NOT ALLOW THINGS TO BE ADDED HERE - exceptions = [ - 64, # drop constraint - 86, # drop watch_rule/watch_data tables - ] - # Reviewers: DO NOT ALLOW THINGS TO BE ADDED HERE - - # NOTE(xek): We start requiring things be additive in - # liberty, so ignore all migrations before that point. - LIBERTY_START = 63 - - if version >= LIBERTY_START and version not in exceptions: - banned = ['Table', 'Column'] - else: - banned = None - with BannedDBSchemaOperations(banned): - super(HeatMigrationsCheckers, self).migrate_up(version, with_data) - - def test_walk_versions(self): - self.walk_versions(self.snake_walk, self.downgrade) - - def assertColumnExists(self, engine, table, column): - t = utils.get_table(engine, table) - self.assertIn(column, t.c) - - def assertColumnType(self, engine, table, column, sqltype): - t = utils.get_table(engine, table) - col = getattr(t.c, column) - self.assertIsInstance(col.type, sqltype) - - def assertColumnNotExists(self, engine, table, column): - t = utils.get_table(engine, table) - self.assertNotIn(column, t.c) - - def assertColumnIsNullable(self, engine, table, column): - t = utils.get_table(engine, table) - col = getattr(t.c, column) - self.assertTrue(col.nullable) - - def assertColumnIsNotNullable(self, engine, table, column_name): - table = utils.get_table(engine, table) - column = getattr(table.c, column_name) - self.assertFalse(column.nullable) - - def assertIndexExists(self, engine, table, index): - t = utils.get_table(engine, table) - index_names = [idx.name for idx in t.indexes] - self.assertIn(index, index_names) +class HeatModelsMigrationsSync(test_migrations.ModelsMigrationsSync): - def assertIndexMembers(self, engine, table, index, members): - self.assertIndexExists(engine, table, index) - - t = utils.get_table(engine, table) - index_columns = [] - for idx in t.indexes: - if idx.name == index: - for ix in idx.columns: - index_columns.append(ix.name) - break - - self.assertEqual(sorted(members), sorted(index_columns)) - - def _check_073(self, engine, data): - # check if column still exists and is not nullable. - self.assertColumnIsNotNullable(engine, 'resource_data', 'resource_id') - # Ensure that only one foreign key exists and is created as expected. - inspector = sqlalchemy.engine.reflection.Inspector.from_engine(engine) - resource_data_fkeys = inspector.get_foreign_keys('resource_data') - self.assertEqual(1, len(resource_data_fkeys)) - fk = resource_data_fkeys[0] - self.assertEqual('fk_resource_id', fk['name']) - self.assertEqual(['resource_id'], fk['constrained_columns']) - self.assertEqual('resource', fk['referred_table']) - self.assertEqual(['id'], fk['referred_columns']) - - def _check_079(self, engine, data): - self.assertColumnExists(engine, 'resource', - 'rsrc_prop_data_id') - self.assertColumnExists(engine, 'event', - 'rsrc_prop_data_id') - column_list = [('id', False), - ('data', True), - ('encrypted', True), - ('updated_at', True), - ('created_at', True)] - - for column in column_list: - self.assertColumnExists(engine, - 'resource_properties_data', column[0]) - if not column[1]: - self.assertColumnIsNotNullable(engine, - 'resource_properties_data', - column[0]) - else: - self.assertColumnIsNullable(engine, - 'resource_properties_data', - column[0]) - - def _check_080(self, engine, data): - self.assertColumnExists(engine, 'resource', - 'attr_data_id') - - -class DbTestCase(test_fixtures.OpportunisticDBTestMixin, - test_base.BaseTestCase): def setUp(self): - super(DbTestCase, self).setUp() + super().setUp() self.engine = enginefacade.writer.get_engine() self.sessionmaker = enginefacade.writer.get_sessionmaker() - -class TestHeatMigrationsMySQL(DbTestCase, HeatMigrationsCheckers): - FIXTURE = test_fixtures.MySQLOpportunisticFixture - - -class TestHeatMigrationsPostgreSQL(DbTestCase, HeatMigrationsCheckers): - FIXTURE = test_fixtures.PostgresqlOpportunisticFixture - - -class TestHeatMigrationsSQLite(DbTestCase, HeatMigrationsCheckers): - pass - - -class ModelsMigrationSyncMixin(object): - def get_metadata(self): return models.BASE.metadata @@ -252,24 +84,28 @@ class ModelsMigrationSyncMixin(object): migration.db_sync(engine=engine) def include_object(self, object_, name, type_, reflected, compare_to): - if name in ['migrate_version'] and type_ == 'table': - return False return True -class ModelsMigrationsSyncMysql(DbTestCase, - ModelsMigrationSyncMixin, - test_migrations.ModelsMigrationsSync): +class ModelsMigrationsSyncMysql( + HeatModelsMigrationsSync, + test_fixtures.OpportunisticDBTestMixin, + test_base.BaseTestCase, +): FIXTURE = test_fixtures.MySQLOpportunisticFixture -class ModelsMigrationsSyncPostgres(DbTestCase, - ModelsMigrationSyncMixin, - test_migrations.ModelsMigrationsSync): +class ModelsMigrationsSyncPostgres( + HeatModelsMigrationsSync, + test_fixtures.OpportunisticDBTestMixin, + test_base.BaseTestCase, +): FIXTURE = test_fixtures.PostgresqlOpportunisticFixture -class ModelsMigrationsSyncSQLite(DbTestCase, - ModelsMigrationSyncMixin, - test_migrations.ModelsMigrationsSync): +class ModelsMigrationsSyncSQLite( + HeatModelsMigrationsSync, + test_fixtures.OpportunisticDBTestMixin, + test_base.BaseTestCase, +): pass |