summaryrefslogtreecommitdiff
path: root/oslo_db/tests
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2022-08-12 18:47:57 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2023-03-21 13:59:26 -0400
commit94d6e24ca19b0116eed00d5ccdb8a538918c6dcf (patch)
tree7006674c4fcd747ae7c679ac4d2a79f77075866a /oslo_db/tests
parent877bcfc6a6ed16ba6885f47824df6b1f5ac60b4e (diff)
downloadoslo-db-94d6e24ca19b0116eed00d5ccdb8a538918c6dcf.tar.gz
Remove sqlalchemy-migrate
sqlalchemy-migrate does not support SQLAlchemy 2.x and we're not going to invest the time in changing this. Remove integration of sqlalchemy-migrate in oslo.db, allowing us to support SQLAlchemy 2.x fully. Note that we do not remove the 'migration_cli' module entirely yet. While this is deprecated, it is possible to use this with alembic. New users shouldn't be switching to it, but any existing users can continue to use this module for some time yet. Change-Id: Ic3d6bd318038d723b0d50d39e45f8e26289e9a57 Sem-Ver: api-break
Diffstat (limited to 'oslo_db/tests')
-rw-r--r--oslo_db/tests/fixtures.py37
-rw-r--r--oslo_db/tests/sqlalchemy/test_migrate_cli.py83
-rw-r--r--oslo_db/tests/sqlalchemy/test_migration_common.py287
-rw-r--r--oslo_db/tests/sqlalchemy/test_migrations.py162
-rw-r--r--oslo_db/tests/sqlalchemy/test_utils.py151
5 files changed, 10 insertions, 710 deletions
diff --git a/oslo_db/tests/fixtures.py b/oslo_db/tests/fixtures.py
index 62a88fb..14d7f07 100644
--- a/oslo_db/tests/fixtures.py
+++ b/oslo_db/tests/fixtures.py
@@ -24,49 +24,32 @@ class WarningsFixture(fixtures.Fixture):
self._original_warning_filters = warnings.filters[:]
+ # Enable deprecation warnings
+
warnings.simplefilter('once', DeprecationWarning)
- # Except things we've deprecated but are still testing until removal
+ # Except for things we've deprecated but are still testing until
+ # removal
warnings.filterwarnings(
'ignore',
category=DeprecationWarning,
- module='oslo_db')
+ module='oslo_db',
+ )
# Enable generic warnings to ensure we're not doing anything odd
warnings.filterwarnings(
'error',
- category=sqla_exc.SAWarning)
+ category=sqla_exc.SAWarning,
+ )
# Enable deprecation warnings to capture upcoming SQLAlchemy changes
warnings.filterwarnings(
'error',
- category=sqla_exc.SADeprecationWarning)
-
- # ...but filter things that aren't our fault
-
- # FIXME(stephenfin): These are caused by sqlalchemy-migrate, not us,
- # and should be removed when we drop support for that library
-
- warnings.filterwarnings(
- 'ignore',
- message=r'Passing a string to Connection.execute\(\) .*',
- module='migrate',
- category=sqla_exc.SADeprecationWarning)
-
- warnings.filterwarnings(
- 'once',
- message=r'The current statement is being autocommitted .*',
- module='migrate',
- category=sqla_exc.SADeprecationWarning)
-
- warnings.filterwarnings(
- 'ignore',
- message=r'The Engine.execute\(\) method is considered legacy .*',
- module='migrate',
- category=sqla_exc.SADeprecationWarning)
+ category=sqla_exc.SADeprecationWarning,
+ )
self.addCleanup(self._reset_warning_filters)
diff --git a/oslo_db/tests/sqlalchemy/test_migrate_cli.py b/oslo_db/tests/sqlalchemy/test_migrate_cli.py
index 21716d0..ca9485f 100644
--- a/oslo_db/tests/sqlalchemy/test_migrate_cli.py
+++ b/oslo_db/tests/sqlalchemy/test_migrate_cli.py
@@ -17,7 +17,6 @@ import sqlalchemy
from oslo_db import exception
from oslo_db.sqlalchemy.migration_cli import ext_alembic
-from oslo_db.sqlalchemy.migration_cli import ext_migrate
from oslo_db.sqlalchemy.migration_cli import manager
from oslo_db.tests import base as test_base
@@ -127,88 +126,6 @@ class TestAlembicExtension(test_base.BaseTestCase):
'test')
-@mock.patch(('oslo_db.sqlalchemy.migration_cli.'
- 'ext_migrate.migration'))
-class TestMigrateExtension(test_base.BaseTestCase):
-
- def setUp(self):
- self.migration_config = {'migration_repo_path': '.',
- 'db_url': 'sqlite://'}
- self.engine = sqlalchemy.create_engine(self.migration_config['db_url'])
- self.migrate = ext_migrate.MigrateExtension(
- self.engine, self.migration_config)
- super(TestMigrateExtension, self).setUp()
-
- def test_check_enabled_true(self, migration):
- self.assertTrue(self.migrate.enabled)
-
- def test_check_enabled_false(self, migration):
- self.migration_config['migration_repo_path'] = ''
- migrate = ext_migrate.MigrateExtension(
- self.engine, self.migration_config)
- self.assertFalse(migrate.enabled)
-
- def test_upgrade_head(self, migration):
- self.migrate.upgrade('head')
- migration.db_sync.assert_called_once_with(
- self.migrate.engine, self.migrate.repository, None, init_version=0)
-
- def test_upgrade_normal(self, migration):
- self.migrate.upgrade(111)
- migration.db_sync.assert_called_once_with(
- mock.ANY, self.migrate.repository, 111, init_version=0)
-
- def test_downgrade_init_version_from_base(self, migration):
- self.migrate.downgrade('base')
- migration.db_sync.assert_called_once_with(
- self.migrate.engine, self.migrate.repository, mock.ANY,
- init_version=mock.ANY)
-
- def test_downgrade_init_version_from_none(self, migration):
- self.migrate.downgrade(None)
- migration.db_sync.assert_called_once_with(
- self.migrate.engine, self.migrate.repository, mock.ANY,
- init_version=mock.ANY)
-
- def test_downgrade_normal(self, migration):
- self.migrate.downgrade(101)
- migration.db_sync.assert_called_once_with(
- self.migrate.engine, self.migrate.repository, 101, init_version=0)
-
- def test_version(self, migration):
- self.migrate.version()
- migration.db_version.assert_called_once_with(
- self.migrate.engine, self.migrate.repository, init_version=0)
-
- def test_change_init_version(self, migration):
- self.migration_config['init_version'] = 101
- migrate = ext_migrate.MigrateExtension(
- self.engine, self.migration_config)
- migrate.downgrade(None)
- migration.db_sync.assert_called_once_with(
- migrate.engine,
- self.migrate.repository,
- self.migration_config['init_version'],
- init_version=self.migration_config['init_version'])
-
- def test_has_revision(self, command):
- with mock.patch(('oslo_db.sqlalchemy.migration_cli.'
- 'ext_migrate.migrate_version')) as mocked:
- self.migrate.has_revision('test')
- mocked.Collection().version.assert_called_once_with('test')
- # tip of the branch should always be True
- self.assertIs(True, self.migrate.has_revision(None))
-
- def test_has_revision_negative(self, command):
- with mock.patch(('oslo_db.sqlalchemy.migration_cli.'
- 'ext_migrate.migrate_version')) as mocked:
- mocked.Collection().version.side_effect = ValueError
- self.assertIs(False, self.migrate.has_revision('test'))
- mocked.Collection().version.assert_called_once_with('test')
- # relative revision, should be False for migrate
- self.assertIs(False, self.migrate.has_revision('+1'))
-
-
class TestMigrationManager(test_base.BaseTestCase):
def setUp(self):
diff --git a/oslo_db/tests/sqlalchemy/test_migration_common.py b/oslo_db/tests/sqlalchemy/test_migration_common.py
deleted file mode 100644
index d0b10cd..0000000
--- a/oslo_db/tests/sqlalchemy/test_migration_common.py
+++ /dev/null
@@ -1,287 +0,0 @@
-# Copyright 2013 Mirantis Inc.
-# All Rights Reserved
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-import os
-import tempfile
-from unittest import mock
-
-from migrate import exceptions as migrate_exception
-from migrate.versioning import api as versioning_api
-import sqlalchemy
-
-from oslo_db import exception as db_exception
-from oslo_db.sqlalchemy import migration
-from oslo_db.tests.sqlalchemy import base as test_base
-from oslo_db.tests import utils as test_utils
-
-
-class TestMigrationCommon(test_base._DbTestCase):
- def setUp(self):
- super(TestMigrationCommon, self).setUp()
-
- migration._REPOSITORY = None
- self.path = tempfile.mkdtemp('test_migration')
- self.path1 = tempfile.mkdtemp('test_migration')
- self.return_value = '/home/openstack/migrations'
- self.return_value1 = '/home/extension/migrations'
- self.init_version = 1
- self.test_version = 123
-
- self.patcher_repo = mock.patch.object(migration, 'Repository')
- self.repository = self.patcher_repo.start()
- self.repository.side_effect = [self.return_value, self.return_value1]
-
- self.mock_api_db = mock.patch.object(versioning_api, 'db_version')
- self.mock_api_db_version = self.mock_api_db.start()
- self.mock_api_db_version.return_value = self.test_version
-
- def tearDown(self):
- os.rmdir(self.path)
- self.mock_api_db.stop()
- self.patcher_repo.stop()
- super(TestMigrationCommon, self).tearDown()
-
- def test_find_migrate_repo_path_not_found(self):
- self.assertRaises(
- db_exception.DBMigrationError,
- migration._find_migrate_repo,
- "/foo/bar/",
- )
- self.assertIsNone(migration._REPOSITORY)
-
- def test_find_migrate_repo_called_once(self):
- my_repository = migration._find_migrate_repo(self.path)
- self.repository.assert_called_once_with(self.path)
- self.assertEqual(self.return_value, my_repository)
-
- def test_find_migrate_repo_called_few_times(self):
- repo1 = migration._find_migrate_repo(self.path)
- repo2 = migration._find_migrate_repo(self.path1)
- self.assertNotEqual(repo1, repo2)
-
- def test_db_version_control(self):
- with test_utils.nested(
- mock.patch.object(migration, '_find_migrate_repo'),
- mock.patch.object(versioning_api, 'version_control'),
- ) as (mock_find_repo, mock_version_control):
- mock_find_repo.return_value = self.return_value
-
- version = migration.db_version_control(
- self.engine, self.path, self.test_version)
-
- self.assertEqual(self.test_version, version)
- mock_version_control.assert_called_once_with(
- self.engine, self.return_value, self.test_version)
-
- @mock.patch.object(migration, '_find_migrate_repo')
- @mock.patch.object(versioning_api, 'version_control')
- def test_db_version_control_version_less_than_actual_version(
- self, mock_version_control, mock_find_repo):
- mock_find_repo.return_value = self.return_value
- mock_version_control.side_effect = (migrate_exception.
- DatabaseAlreadyControlledError)
- self.assertRaises(db_exception.DBMigrationError,
- migration.db_version_control, self.engine,
- self.path, self.test_version - 1)
-
- @mock.patch.object(migration, '_find_migrate_repo')
- @mock.patch.object(versioning_api, 'version_control')
- def test_db_version_control_version_greater_than_actual_version(
- self, mock_version_control, mock_find_repo):
- mock_find_repo.return_value = self.return_value
- mock_version_control.side_effect = (migrate_exception.
- InvalidVersionError)
- self.assertRaises(db_exception.DBMigrationError,
- migration.db_version_control, self.engine,
- self.path, self.test_version + 1)
-
- def test_db_version_return(self):
- ret_val = migration.db_version(self.engine, self.path,
- self.init_version)
- self.assertEqual(self.test_version, ret_val)
-
- def test_db_version_raise_not_controlled_error_first(self):
- with mock.patch.object(migration, 'db_version_control') as mock_ver:
-
- self.mock_api_db_version.side_effect = [
- migrate_exception.DatabaseNotControlledError('oups'),
- self.test_version]
-
- ret_val = migration.db_version(self.engine, self.path,
- self.init_version)
- self.assertEqual(self.test_version, ret_val)
- mock_ver.assert_called_once_with(self.engine, self.path,
- version=self.init_version)
-
- def test_db_version_raise_not_controlled_error_tables(self):
- with mock.patch.object(sqlalchemy, 'MetaData') as mock_meta:
- self.mock_api_db_version.side_effect = \
- migrate_exception.DatabaseNotControlledError('oups')
- my_meta = mock.MagicMock()
- my_meta.tables = {'a': 1, 'b': 2}
- mock_meta.return_value = my_meta
-
- self.assertRaises(
- db_exception.DBMigrationError, migration.db_version,
- self.engine, self.path, self.init_version)
-
- @mock.patch.object(versioning_api, 'version_control')
- def test_db_version_raise_not_controlled_error_no_tables(self, mock_vc):
- with mock.patch.object(sqlalchemy, 'MetaData') as mock_meta:
- self.mock_api_db_version.side_effect = (
- migrate_exception.DatabaseNotControlledError('oups'),
- self.init_version)
- my_meta = mock.MagicMock()
- my_meta.tables = {}
- mock_meta.return_value = my_meta
- migration.db_version(self.engine, self.path, self.init_version)
-
- mock_vc.assert_called_once_with(self.engine, self.return_value1,
- self.init_version)
-
- @mock.patch.object(versioning_api, 'version_control')
- def test_db_version_raise_not_controlled_alembic_tables(self, mock_vc):
- # When there are tables but the alembic control table
- # (alembic_version) is present, attempt to version the db.
- # This simulates the case where there is are multiple repos (different
- # abs_paths) and a different path has been versioned already.
- with mock.patch.object(sqlalchemy, 'MetaData') as mock_meta:
- self.mock_api_db_version.side_effect = [
- migrate_exception.DatabaseNotControlledError('oups'), None]
- my_meta = mock.MagicMock()
- my_meta.tables = {'alembic_version': 1, 'b': 2}
- mock_meta.return_value = my_meta
-
- migration.db_version(self.engine, self.path, self.init_version)
-
- mock_vc.assert_called_once_with(self.engine, self.return_value1,
- self.init_version)
-
- @mock.patch.object(versioning_api, 'version_control')
- def test_db_version_raise_not_controlled_migrate_tables(self, mock_vc):
- # When there are tables but the sqlalchemy-migrate control table
- # (migrate_version) is present, attempt to version the db.
- # This simulates the case where there is are multiple repos (different
- # abs_paths) and a different path has been versioned already.
- with mock.patch.object(sqlalchemy, 'MetaData') as mock_meta:
- self.mock_api_db_version.side_effect = [
- migrate_exception.DatabaseNotControlledError('oups'), None]
- my_meta = mock.MagicMock()
- my_meta.tables = {'migrate_version': 1, 'b': 2}
- mock_meta.return_value = my_meta
-
- migration.db_version(self.engine, self.path, self.init_version)
-
- mock_vc.assert_called_once_with(self.engine, self.return_value1,
- self.init_version)
-
- def test_db_sync_wrong_version(self):
- self.assertRaises(db_exception.DBMigrationError,
- migration.db_sync, self.engine, self.path, 'foo')
-
- @mock.patch.object(versioning_api, 'upgrade')
- def test_db_sync_script_not_present(self, upgrade):
- # For non existent migration script file sqlalchemy-migrate will raise
- # VersionNotFoundError which will be wrapped in DBMigrationError.
- upgrade.side_effect = migrate_exception.VersionNotFoundError
- self.assertRaises(db_exception.DBMigrationError,
- migration.db_sync, self.engine, self.path,
- self.test_version + 1)
-
- @mock.patch.object(versioning_api, 'upgrade')
- def test_db_sync_known_error_raised(self, upgrade):
- upgrade.side_effect = migrate_exception.KnownError
- self.assertRaises(db_exception.DBMigrationError,
- migration.db_sync, self.engine, self.path,
- self.test_version + 1)
-
- def test_db_sync_upgrade(self):
- init_ver = 55
- with test_utils.nested(
- mock.patch.object(migration, '_find_migrate_repo'),
- mock.patch.object(versioning_api, 'upgrade')
- ) as (mock_find_repo, mock_upgrade):
-
- mock_find_repo.return_value = self.return_value
- self.mock_api_db_version.return_value = self.test_version - 1
-
- migration.db_sync(self.engine, self.path, self.test_version,
- init_ver)
-
- mock_upgrade.assert_called_once_with(
- self.engine, self.return_value, self.test_version)
-
- def test_db_sync_downgrade(self):
- with test_utils.nested(
- mock.patch.object(migration, '_find_migrate_repo'),
- mock.patch.object(versioning_api, 'downgrade')
- ) as (mock_find_repo, mock_downgrade):
-
- mock_find_repo.return_value = self.return_value
- self.mock_api_db_version.return_value = self.test_version + 1
-
- migration.db_sync(self.engine, self.path, self.test_version)
-
- mock_downgrade.assert_called_once_with(
- self.engine, self.return_value, self.test_version)
-
- def test_db_sync_sanity_called(self):
- with test_utils.nested(
- mock.patch.object(migration, '_find_migrate_repo'),
- mock.patch.object(migration, '_db_schema_sanity_check'),
- mock.patch.object(versioning_api, 'downgrade')
- ) as (mock_find_repo, mock_sanity, mock_downgrade):
-
- mock_find_repo.return_value = self.return_value
- migration.db_sync(self.engine, self.path, self.test_version)
-
- self.assertEqual([mock.call(self.engine), mock.call(self.engine)],
- mock_sanity.call_args_list)
-
- def test_db_sync_sanity_skipped(self):
- with test_utils.nested(
- mock.patch.object(migration, '_find_migrate_repo'),
- mock.patch.object(migration, '_db_schema_sanity_check'),
- mock.patch.object(versioning_api, 'downgrade')
- ) as (mock_find_repo, mock_sanity, mock_downgrade):
-
- mock_find_repo.return_value = self.return_value
- migration.db_sync(self.engine, self.path, self.test_version,
- sanity_check=False)
-
- self.assertFalse(mock_sanity.called)
-
- def test_db_sanity_table_not_utf8(self):
- with mock.patch.object(self, 'engine') as mock_eng:
- type(mock_eng).name = mock.PropertyMock(return_value='mysql')
- mock_eng.execute.return_value = [['table_A', 'latin1'],
- ['table_B', 'latin1']]
-
- self.assertRaises(ValueError, migration._db_schema_sanity_check,
- mock_eng)
-
- def test_db_sanity_table_not_utf8_exclude_migrate_tables(self):
- with mock.patch.object(self, 'engine') as mock_eng:
- type(mock_eng).name = mock.PropertyMock(return_value='mysql')
- # NOTE(morganfainberg): Check both lower and upper case versions
- # of the migration table names (validate case insensitivity in
- # the sanity check.
- mock_eng.execute.return_value = [['migrate_version', 'latin1'],
- ['alembic_version', 'latin1'],
- ['MIGRATE_VERSION', 'latin1'],
- ['ALEMBIC_VERSION', 'latin1']]
-
- migration._db_schema_sanity_check(mock_eng)
diff --git a/oslo_db/tests/sqlalchemy/test_migrations.py b/oslo_db/tests/sqlalchemy/test_migrations.py
index fe15191..275c277 100644
--- a/oslo_db/tests/sqlalchemy/test_migrations.py
+++ b/oslo_db/tests/sqlalchemy/test_migrations.py
@@ -16,175 +16,13 @@
from unittest import mock
-import fixtures
-from migrate.versioning import api as versioning_api
import sqlalchemy as sa
from sqlalchemy import orm
-from oslo_db import exception as exc
from oslo_db.sqlalchemy import test_migrations as migrate
-from oslo_db.tests import base as test_base
from oslo_db.tests.sqlalchemy import base as db_test_base
-class TestWalkVersions(test_base.BaseTestCase, migrate.WalkVersionsMixin):
- migration_api = mock.MagicMock()
- REPOSITORY = mock.MagicMock()
- engine = mock.MagicMock()
- INIT_VERSION = versioning_api.VerNum(4)
-
- @property
- def migrate_engine(self):
- return self.engine
-
- def test_migrate_up(self):
- self.migration_api.db_version.return_value = 141
-
- self.migrate_up(141)
-
- self.migration_api.upgrade.assert_called_with(
- self.engine, self.REPOSITORY, 141)
- self.migration_api.db_version.assert_called_with(
- self.engine, self.REPOSITORY)
-
- @staticmethod
- def _fake_upgrade_boom(*args, **kwargs):
- raise exc.DBMigrationError("boom")
-
- def test_migrate_up_fail(self):
- version = 141
- self.migration_api.db_version.return_value = version
- expected_output = (
- "Failed to migrate to version %(version)s on "
- "engine %(engine)s\n" %
- {'version': version, 'engine': self.engine})
-
- with mock.patch.object(
- self.migration_api,
- 'upgrade',
- side_effect=self._fake_upgrade_boom,
- ):
- log = self.useFixture(fixtures.FakeLogger())
- self.assertRaises(exc.DBMigrationError, self.migrate_up, version)
- self.assertEqual(expected_output, log.output)
-
- def test_migrate_up_with_data(self):
- test_value = {"a": 1, "b": 2}
- self.migration_api.db_version.return_value = 141
- self._pre_upgrade_141 = mock.MagicMock()
- self._pre_upgrade_141.return_value = test_value
- self._check_141 = mock.MagicMock()
-
- self.migrate_up(141, True)
-
- self._pre_upgrade_141.assert_called_with(self.engine)
- self._check_141.assert_called_with(self.engine, test_value)
-
- def test_migrate_down(self):
- self.migration_api.db_version.return_value = 42
-
- self.assertTrue(self.migrate_down(42))
- self.migration_api.db_version.assert_called_with(
- self.engine, self.REPOSITORY)
-
- def test_migrate_down_not_implemented(self):
- with mock.patch.object(
- self.migration_api,
- 'downgrade',
- side_effect=NotImplementedError,
- ):
- self.assertFalse(self.migrate_down(self.engine, 42))
-
- def test_migrate_down_with_data(self):
- self._post_downgrade_043 = mock.MagicMock()
- self.migration_api.db_version.return_value = 42
-
- self.migrate_down(42, True)
-
- self._post_downgrade_043.assert_called_with(self.engine)
-
- @mock.patch.object(migrate.WalkVersionsMixin, 'migrate_up')
- @mock.patch.object(migrate.WalkVersionsMixin, 'migrate_down')
- def test_walk_versions_all_default(self, migrate_up, migrate_down):
- self.REPOSITORY.latest = versioning_api.VerNum(20)
- self.migration_api.db_version.return_value = self.INIT_VERSION
-
- self.walk_versions()
-
- self.migration_api.version_control.assert_called_with(
- self.engine, self.REPOSITORY, self.INIT_VERSION)
- self.migration_api.db_version.assert_called_with(
- self.engine, self.REPOSITORY)
-
- versions = range(int(self.INIT_VERSION) + 1,
- int(self.REPOSITORY.latest) + 1)
- upgraded = [mock.call(v, with_data=True)
- for v in versions]
- self.assertEqual(upgraded, self.migrate_up.call_args_list)
-
- downgraded = [mock.call(v - 1) for v in reversed(versions)]
- self.assertEqual(downgraded, self.migrate_down.call_args_list)
-
- @mock.patch.object(migrate.WalkVersionsMixin, 'migrate_up')
- @mock.patch.object(migrate.WalkVersionsMixin, 'migrate_down')
- def test_walk_versions_all_true(self, migrate_up, migrate_down):
- self.REPOSITORY.latest = versioning_api.VerNum(20)
- self.migration_api.db_version.return_value = self.INIT_VERSION
-
- self.walk_versions(snake_walk=True, downgrade=True)
-
- versions = range(int(self.INIT_VERSION) + 1,
- int(self.REPOSITORY.latest) + 1)
- upgraded = []
- for v in versions:
- upgraded.append(mock.call(v, with_data=True))
- upgraded.append(mock.call(v))
- upgraded.extend([mock.call(v) for v in reversed(versions)])
- self.assertEqual(upgraded, self.migrate_up.call_args_list)
-
- downgraded_1 = [mock.call(v - 1, with_data=True) for v in versions]
- downgraded_2 = []
- for v in reversed(versions):
- downgraded_2.append(mock.call(v - 1))
- downgraded_2.append(mock.call(v - 1))
- downgraded = downgraded_1 + downgraded_2
- self.assertEqual(downgraded, self.migrate_down.call_args_list)
-
- @mock.patch.object(migrate.WalkVersionsMixin, 'migrate_up')
- @mock.patch.object(migrate.WalkVersionsMixin, 'migrate_down')
- def test_walk_versions_true_false(self, migrate_up, migrate_down):
- self.REPOSITORY.latest = versioning_api.VerNum(20)
- self.migration_api.db_version.return_value = self.INIT_VERSION
-
- self.walk_versions(snake_walk=True, downgrade=False)
-
- versions = range(int(self.INIT_VERSION) + 1,
- int(self.REPOSITORY.latest) + 1)
-
- upgraded = []
- for v in versions:
- upgraded.append(mock.call(v, with_data=True))
- upgraded.append(mock.call(v))
- self.assertEqual(upgraded, self.migrate_up.call_args_list)
-
- downgraded = [mock.call(v - 1, with_data=True) for v in versions]
- self.assertEqual(downgraded, self.migrate_down.call_args_list)
-
- @mock.patch.object(migrate.WalkVersionsMixin, 'migrate_up')
- @mock.patch.object(migrate.WalkVersionsMixin, 'migrate_down')
- def test_walk_versions_all_false(self, migrate_up, migrate_down):
- self.REPOSITORY.latest = versioning_api.VerNum(20)
- self.migration_api.db_version.return_value = self.INIT_VERSION
-
- self.walk_versions(snake_walk=False, downgrade=False)
-
- versions = range(int(self.INIT_VERSION) + 1,
- int(self.REPOSITORY.latest) + 1)
-
- upgraded = [mock.call(v, with_data=True) for v in versions]
- self.assertEqual(upgraded, self.migrate_up.call_args_list)
-
-
class ModelsMigrationSyncMixin(db_test_base._DbTestCase):
def setUp(self):
diff --git a/oslo_db/tests/sqlalchemy/test_utils.py b/oslo_db/tests/sqlalchemy/test_utils.py
index 6b75796..8fe11c5 100644
--- a/oslo_db/tests/sqlalchemy/test_utils.py
+++ b/oslo_db/tests/sqlalchemy/test_utils.py
@@ -18,7 +18,6 @@ from urllib import parse
import fixtures
import sqlalchemy
-from sqlalchemy.dialects import mysql
from sqlalchemy import Boolean, Index, Integer, DateTime, String
from sqlalchemy import CheckConstraint
from sqlalchemy import MetaData, Table, Column
@@ -782,129 +781,6 @@ class TestMigrationUtils(db_test_base._DbTestCase):
for value in soft_deleted_values:
self.assertIn(value['id'], deleted_rows_ids)
- def test_change_deleted_column_type_does_not_drop_index(self):
- table_name = 'abc'
-
- indexes = {
- 'idx_a_deleted': ['a', 'deleted'],
- 'idx_b_deleted': ['b', 'deleted'],
- 'idx_a': ['a']
- }
-
- index_instances = [Index(name, *columns)
- for name, columns in indexes.items()]
-
- table = Table(table_name, self.meta,
- Column('id', Integer, primary_key=True),
- Column('a', String(255)),
- Column('b', String(255)),
- Column('deleted', Boolean),
- *index_instances)
- table.create(self.engine)
- utils.change_deleted_column_type_to_id_type(self.engine, table_name)
- utils.change_deleted_column_type_to_boolean(self.engine, table_name)
-
- insp = sqlalchemy.inspect(self.engine)
- real_indexes = insp.get_indexes(table_name)
- self.assertEqual(3, len(real_indexes))
- for index in real_indexes:
- name = index['name']
- self.assertIn(name, indexes)
- self.assertEqual(set(indexes[name]),
- set(index['column_names']))
-
- def test_change_deleted_column_type_to_id_type_integer(self):
- table_name = 'abc'
- table = Table(table_name, self.meta,
- Column('id', Integer, primary_key=True),
- Column('deleted', Boolean))
- table.create(self.engine)
- utils.change_deleted_column_type_to_id_type(self.engine, table_name)
-
- table = utils.get_table(self.engine, table_name)
- self.assertIsInstance(table.c.deleted.type, Integer)
-
- def test_change_deleted_column_type_to_id_type_string(self):
- table_name = 'abc'
- table = Table(table_name, self.meta,
- Column('id', String(255), primary_key=True),
- Column('deleted', Boolean))
- table.create(self.engine)
- utils.change_deleted_column_type_to_id_type(self.engine, table_name)
-
- table = utils.get_table(self.engine, table_name)
- self.assertIsInstance(table.c.deleted.type, String)
-
- @db_test_base.backend_specific('sqlite')
- def test_change_deleted_column_type_to_id_type_custom(self):
- table_name = 'abc'
- table = Table(table_name, self.meta,
- Column('id', Integer, primary_key=True),
- Column('foo', CustomType),
- Column('deleted', Boolean))
- table.create(self.engine)
-
- fooColumn = Column('foo', CustomType())
- utils.change_deleted_column_type_to_id_type(self.engine, table_name,
- foo=fooColumn)
-
- table = utils.get_table(self.engine, table_name)
-
- self.assertIsInstance(table.c.deleted.type, Integer)
-
- def test_change_deleted_column_type_to_boolean(self):
- expected_types = {'mysql': mysql.TINYINT}
- table_name = 'abc'
- table = Table(table_name, self.meta,
- Column('id', Integer, primary_key=True),
- Column('deleted', Integer))
- table.create(self.engine)
-
- utils.change_deleted_column_type_to_boolean(self.engine, table_name)
-
- table = utils.get_table(self.engine, table_name)
- self.assertIsInstance(table.c.deleted.type,
- expected_types.get(self.engine.name, Boolean))
-
- def test_change_deleted_column_type_to_boolean_with_fc(self):
- expected_types = {'mysql': mysql.TINYINT}
- table_name_1 = 'abc'
- table_name_2 = 'bcd'
-
- table_1 = Table(table_name_1, self.meta,
- Column('id', Integer, primary_key=True),
- Column('deleted', Integer))
- table_1.create(self.engine)
-
- table_2 = Table(table_name_2, self.meta,
- Column('id', Integer, primary_key=True),
- Column('foreign_id', Integer,
- ForeignKey('%s.id' % table_name_1)),
- Column('deleted', Integer))
- table_2.create(self.engine)
-
- utils.change_deleted_column_type_to_boolean(self.engine, table_name_2)
-
- table = utils.get_table(self.engine, table_name_2)
- self.assertIsInstance(table.c.deleted.type,
- expected_types.get(self.engine.name, Boolean))
-
- @db_test_base.backend_specific('sqlite')
- def test_change_deleted_column_type_to_boolean_type_custom(self):
- table_name = 'abc'
- table = Table(table_name, self.meta,
- Column('id', Integer, primary_key=True),
- Column('foo', CustomType),
- Column('deleted', Integer))
- table.create(self.engine)
-
- fooColumn = Column('foo', CustomType())
- utils.change_deleted_column_type_to_boolean(self.engine, table_name,
- foo=fooColumn)
-
- table = utils.get_table(self.engine, table_name)
- self.assertIsInstance(table.c.deleted.type, Boolean)
-
def test_detect_boolean_deleted_constraint_detection(self):
table_name = 'abc'
table = Table(table_name, self.meta,
@@ -922,33 +798,6 @@ class TestMigrationUtils(db_test_base._DbTestCase):
)
)
- @db_test_base.backend_specific('sqlite')
- def test_change_deleted_column_type_sqlite_drops_check_constraint(self):
- table_name = 'abc'
- table = Table(table_name, self.meta,
- Column('id', Integer, primary_key=True),
- Column('deleted', Boolean))
- table.create(self.engine)
-
- utils._change_deleted_column_type_to_id_type_sqlite(self.engine,
- table_name)
- table = Table(table_name, self.meta, autoload_with=self.engine)
- # NOTE(I159): if the CHECK constraint has been dropped (expected
- # behavior), any integer value can be inserted, otherwise only 1 or 0.
- # NOTE(zzzeek): SQLAlchemy 1.2 Boolean type will disallow non 1/0
- # value here, 1.1 also coerces to "1/0" so use raw SQL to test the
- # constraint
- with self.engine.connect() as conn, conn.begin():
- conn.exec_driver_sql(
- "INSERT INTO abc (deleted) VALUES (?)",
- (10, ),
- )
-
- self.assertEqual(
- 10,
- conn.scalar(sql.text("SELECT deleted FROM abc")),
- )
-
def test_get_foreign_key_constraint_name(self):
table_1 = Table('table_name_1', self.meta,
Column('id', Integer, primary_key=True),