summaryrefslogtreecommitdiff
path: root/oslo_db/sqlalchemy/migration_cli/ext_alembic.py
blob: e44fbc83878120dcb2d936c97a283f71db044431 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import os

import alembic
from alembic import config as alembic_config
import alembic.migration as alembic_migration
from alembic import script as alembic_script

from oslo_db.sqlalchemy.migration_cli import ext_base


class AlembicExtension(ext_base.MigrationExtensionBase):
    """Extension to provide alembic features.

    :param engine: SQLAlchemy engine instance for a given database
    :type engine: sqlalchemy.engine.Engine
    :param migration_config: Stores specific configuration for migrations
    :type migration_config: dict
    """

    order = 2

    @property
    def enabled(self):
        return os.path.exists(self.alembic_ini_path)

    def __init__(self, engine, migration_config):
        self.alembic_ini_path = migration_config.get('alembic_ini_path', '')
        self.config = alembic_config.Config(self.alembic_ini_path)
        # TODO(viktors): Remove this, when we will use Alembic 0.7.5 or
        #                higher, because the ``attributes`` dictionary was
        #                added to Alembic in version 0.7.5.
        if not hasattr(self.config, 'attributes'):
            self.config.attributes = {}
        # option should be used if script is not in default directory
        repo_path = migration_config.get('alembic_repo_path')
        if repo_path:
            self.config.set_main_option('script_location', repo_path)
        self.engine = engine

    def upgrade(self, version):
        with self.engine.begin() as connection:
            self.config.attributes['connection'] = connection
            return alembic.command.upgrade(self.config, version or 'head')

    def downgrade(self, version):
        if isinstance(version, int) or version is None or version.isdigit():
            version = 'base'
        with self.engine.begin() as connection:
            self.config.attributes['connection'] = connection
            return alembic.command.downgrade(self.config, version)

    def version(self):
        with self.engine.connect() as conn:
            context = alembic_migration.MigrationContext.configure(conn)
            return context.get_current_revision()

    def revision(self, message='', autogenerate=False):
        """Creates template for migration.

        :param message: Text that will be used for migration title
        :type message: string
        :param autogenerate: If True - generates diff based on current database
                             state
        :type autogenerate: bool
        """
        with self.engine.begin() as connection:
            self.config.attributes['connection'] = connection
            return alembic.command.revision(self.config, message=message,
                                            autogenerate=autogenerate)

    def stamp(self, revision):
        """Stamps database with provided revision.

        :param revision: Should match one from repository or head - to stamp
                         database with most recent revision
        :type revision: string
        """
        with self.engine.begin() as connection:
            self.config.attributes['connection'] = connection
            return alembic.command.stamp(self.config, revision=revision)

    def has_revision(self, rev_id):
        if rev_id in ['base', 'head']:
            return True

        # Although alembic supports relative upgrades and downgrades,
        # get_revision always returns False for relative revisions.
        # Since only alembic supports relative revisions, assume the
        # revision belongs to this plugin.
        if rev_id:  # rev_id can be None, so the check is required
            if '-' in rev_id or '+' in rev_id:
                return True

        script = alembic_script.ScriptDirectory(
            self.config.get_main_option('script_location'))
        try:
            script.get_revision(rev_id)
            return True
        except alembic.util.CommandError:
            return False