summaryrefslogtreecommitdiff
path: root/oslo/db/sqlalchemy/provision.py
blob: 315d599b960f9ecff4a56e7c03160587698abd94 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# Copyright 2013 Mirantis.inc
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Provision test environment for specific DB backends"""

import argparse
import logging
import os
import random
import string

from six import moves
import sqlalchemy

from oslo.db import exception as exc


LOG = logging.getLogger(__name__)


def get_engine(uri):
    """Engine creation

    Call the function without arguments to get admin connection. Admin
    connection required to create temporary user and database for each
    particular test. Otherwise use existing connection to recreate connection
    to the temporary database.
    """
    return sqlalchemy.create_engine(uri, poolclass=sqlalchemy.pool.NullPool)


def _execute_sql(engine, sql, driver):
    """Initialize connection, execute sql query and close it."""
    try:
        with engine.connect() as conn:
            if driver == 'postgresql':
                conn.connection.set_isolation_level(0)
            for s in sql:
                conn.execute(s)
    except sqlalchemy.exc.OperationalError:
        msg = ('%s does not match database admin '
               'credentials or database does not exist.')
        LOG.exception(msg, engine.url)
        raise exc.DBConnectionError(msg % engine.url)


def create_database(engine):
    """Provide temporary user and database for each particular test."""
    driver = engine.name

    auth = {
        'database': ''.join(random.choice(string.ascii_lowercase)
                            for i in moves.range(10)),
        'user': engine.url.username,
        'passwd': engine.url.password,
    }

    if driver == 'sqlite':
        return 'sqlite:////tmp/%s' % auth['database']
    elif driver in ['mysql', 'postgresql']:
        sql = 'create database %s;' % auth['database']
        _execute_sql(engine, [sql], driver)
    else:
        raise ValueError('Unsupported RDBMS %s' % driver)

    params = auth.copy()
    params['backend'] = driver
    return "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" % params


def drop_database(admin_engine, current_uri):
    """Drop temporary database and user after each particular test."""

    engine = get_engine(current_uri)
    driver = engine.name

    if driver == 'sqlite':
        try:
            os.remove(engine.url.database)
        except OSError:
            pass
    elif driver in ['mysql', 'postgresql']:
        sql = 'drop database %s;' % engine.url.database
        _execute_sql(admin_engine, [sql], driver)
    else:
        raise ValueError('Unsupported RDBMS %s' % driver)


def main():
    """Controller to handle commands

    ::create: Create test user and database with random names.
    ::drop: Drop user and database created by previous command.
    """
    parser = argparse.ArgumentParser(
        description='Controller to handle database creation and dropping'
        ' commands.',
        epilog='Under normal circumstances is not used directly.'
        ' Used in .testr.conf to automate test database creation'
        ' and dropping processes.')
    subparsers = parser.add_subparsers(
        help='Subcommands to manipulate temporary test databases.')

    create = subparsers.add_parser(
        'create',
        help='Create temporary test '
        'databases and users.')
    create.set_defaults(which='create')
    create.add_argument(
        'instances_count',
        type=int,
        help='Number of databases to create.')

    drop = subparsers.add_parser(
        'drop',
        help='Drop temporary test databases and users.')
    drop.set_defaults(which='drop')
    drop.add_argument(
        'instances',
        nargs='+',
        help='List of databases uri to be dropped.')

    args = parser.parse_args()

    connection_string = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION',
                                  'sqlite://')
    engine = get_engine(connection_string)
    which = args.which

    if which == "create":
        for i in range(int(args.instances_count)):
            print(create_database(engine))
    elif which == "drop":
        for db in args.instances:
            drop_database(engine, db)


if __name__ == "__main__":
    main()