summaryrefslogtreecommitdiff
path: root/oslo/db/sqlalchemy/provision.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo/db/sqlalchemy/provision.py')
-rw-r--r--oslo/db/sqlalchemy/provision.py522
1 files changed, 438 insertions, 84 deletions
diff --git a/oslo/db/sqlalchemy/provision.py b/oslo/db/sqlalchemy/provision.py
index f1aa2cd..260cb5c 100644
--- a/oslo/db/sqlalchemy/provision.py
+++ b/oslo/db/sqlalchemy/provision.py
@@ -15,111 +15,459 @@
"""Provision test environment for specific DB backends"""
+import abc
import argparse
-import copy
import logging
import os
import random
+import re
import string
+import six
from six import moves
import sqlalchemy
+from sqlalchemy.engine import url as sa_url
-from oslo.db import exception as exc
-
+from oslo.db._i18n import _LI
+from oslo.db import exception
+from oslo.db.sqlalchemy import session
+from oslo.db.sqlalchemy import utils
LOG = logging.getLogger(__name__)
-def get_engine(uri):
- """Engine creation
+class ProvisionedDatabase(object):
+ """Represent a single database node that can be used for testing in
+
+ a serialized fashion.
+
+ ``ProvisionedDatabase`` includes features for full lifecycle management
+ of a node, in a way that is context-specific. Depending on how the
+ test environment runs, ``ProvisionedDatabase`` should know if it needs
+ to create and drop databases or if it is making use of a database that
+ is maintained by an external process.
- Call the function without arguments to get admin connection. Admin
- connection required to create temporary database for each
- particular test. Otherwise use existing connection to recreate
- connection to the temporary database.
"""
- return sqlalchemy.create_engine(uri, poolclass=sqlalchemy.pool.NullPool)
+ def __init__(self, database_type):
+ self.backend = Backend.backend_for_database_type(database_type)
+ self.db_token = _random_ident()
-def _execute_sql(engine, sql, driver):
- """Initialize connection, execute sql query and close it."""
- try:
- with engine.connect() as conn:
- if driver == 'postgresql':
- conn.connection.set_isolation_level(0)
- for s in sql:
- conn.execute(s)
- except sqlalchemy.exc.OperationalError:
- msg = ('%s does not match database admin '
- 'credentials or database does not exist.')
- LOG.exception(msg, engine.url)
- raise exc.DBConnectionError(msg % engine.url)
-
-
-def create_database(engine):
- """Provide temporary database for each particular test."""
- driver = engine.name
-
- database = ''.join(random.choice(string.ascii_lowercase)
- for i in moves.range(10))
-
- if driver == 'sqlite':
- database = '/tmp/%s' % database
- elif driver in ['mysql', 'postgresql']:
- sql = 'create database %s;' % database
- _execute_sql(engine, [sql], driver)
- else:
- raise ValueError('Unsupported RDBMS %s' % driver)
-
- # Both shallow and deep copies may lead to surprising behaviour
- # without knowing the implementation of sqlalchemy.engine.url.
- # Use a shallow copy here, since we're only overriding a single
- # property, invoking __str__ and then discarding our copy. This
- # is currently safe and _should_ remain safe into the future.
- new_url = copy.copy(engine.url)
-
- new_url.database = database
- return str(new_url)
-
-
-def drop_database(admin_engine, current_uri):
- """Drop temporary database after each particular test."""
-
- engine = get_engine(current_uri)
- driver = engine.name
-
- if driver == 'sqlite':
+ self.backend.create_named_database(self.db_token)
+ self.engine = self.backend.provisioned_engine(self.db_token)
+
+ def dispose(self):
+ self.engine.dispose()
+ self.backend.drop_named_database(self.db_token)
+
+
+class Backend(object):
+ """Represent a particular database backend that may be provisionable.
+
+ The ``Backend`` object maintains a database type (e.g. database without
+ specific driver type, such as "sqlite", "postgresql", etc.),
+ a target URL, a base ``Engine`` for that URL object that can be used
+ to provision databases and a ``BackendImpl`` which knows how to perform
+ operations against this type of ``Engine``.
+
+ """
+
+ backends_by_database_type = {}
+
+ def __init__(self, database_type, url):
+ self.database_type = database_type
+ self.url = url
+ self.verified = False
+ self.engine = None
+ self.impl = BackendImpl.impl(database_type)
+ Backend.backends_by_database_type[database_type] = self
+
+ @classmethod
+ def backend_for_database_type(cls, database_type):
+ """Return and verify the ``Backend`` for the given database type.
+
+ Creates the engine if it does not already exist and raises
+ ``BackendNotAvailable`` if it cannot be produced.
+
+ :return: a base ``Engine`` that allows provisioning of databases.
+
+ :raises: ``BackendNotAvailable``, if an engine for this backend
+ cannot be produced.
+
+ """
+ try:
+ backend = cls.backends_by_database_type[database_type]
+ except KeyError:
+ raise exception.BackendNotAvailable(database_type)
+ else:
+ return backend._verify()
+
+ @classmethod
+ def all_viable_backends(cls):
+ """Return an iterator of all ``Backend`` objects that are present
+
+ and provisionable.
+
+ """
+
+ for backend in cls.backends_by_database_type.values():
+ try:
+ yield backend._verify()
+ except exception.BackendNotAvailable:
+ pass
+
+ def _verify(self):
+ """Verify that this ``Backend`` is available and provisionable.
+
+ :return: this ``Backend``
+
+ :raises: ``BackendNotAvailable`` if the backend is not available.
+
+ """
+
+ if not self.verified:
+ try:
+ eng = self._ensure_backend_available(self.url)
+ except exception.BackendNotAvailable:
+ raise
+ else:
+ self.engine = eng
+ finally:
+ self.verified = True
+ if self.engine is None:
+ raise exception.BackendNotAvailable(self.database_type)
+ return self
+
+ @classmethod
+ def _ensure_backend_available(cls, url):
+ url = sa_url.make_url(str(url))
try:
- os.remove(engine.url.database)
- except OSError:
- pass
- elif driver in ['mysql', 'postgresql']:
- sql = 'drop database %s;' % engine.url.database
- _execute_sql(admin_engine, [sql], driver)
- else:
- raise ValueError('Unsupported RDBMS %s' % driver)
+ eng = sqlalchemy.create_engine(url)
+ except ImportError as i_e:
+ # SQLAlchemy performs an "import" of the DBAPI module
+ # within create_engine(). So if ibm_db_sa, cx_oracle etc.
+ # isn't installed, we get an ImportError here.
+ LOG.info(
+ _LI("The %(dbapi)s backend is unavailable: %(err)s"),
+ dict(dbapi=url.drivername, err=i_e))
+ raise exception.BackendNotAvailable("No DBAPI installed")
+ else:
+ try:
+ conn = eng.connect()
+ except sqlalchemy.exc.DBAPIError as d_e:
+ # upon connect, SQLAlchemy calls dbapi.connect(). This
+ # usually raises OperationalError and should always at
+ # least raise a SQLAlchemy-wrapped DBAPI Error.
+ LOG.info(
+ _LI("The %(dbapi)s backend is unavailable: %(err)s"),
+ dict(dbapi=url.drivername, err=d_e)
+ )
+ raise exception.BackendNotAvailable("Could not connect")
+ else:
+ conn.close()
+ return eng
+
+ def create_named_database(self, ident):
+ """Create a database with the given name."""
+
+ self.impl.create_named_database(self.engine, ident)
+
+ def drop_named_database(self, ident, conditional=False):
+ """Drop a database with the given name."""
+
+ self.impl.drop_named_database(
+ self.engine, ident,
+ conditional=conditional)
+
+ def database_exists(self, ident):
+ """Return True if a database of the given name exists."""
+
+ return self.impl.database_exists(self.engine, ident)
+
+ def provisioned_engine(self, ident):
+ """Given the URL of a particular database backend and the string
+
+ name of a particular 'database' within that backend, return
+ an Engine instance whose connections will refer directly to the
+ named database.
+
+ For hostname-based URLs, this typically involves switching just the
+ 'database' portion of the URL with the given name and creating
+ an engine.
+
+ For URLs that instead deal with DSNs, the rules may be more custom;
+ for example, the engine may need to connect to the root URL and
+ then emit a command to switch to the named database.
+
+ """
+ return self.impl.provisioned_engine(self.url, ident)
+
+ @classmethod
+ def _setup(cls):
+ """Initial startup feature will scan the environment for configured
+
+ URLs and place them into the list of URLs we will use for provisioning.
+
+ This searches through OS_TEST_DBAPI_ADMIN_CONNECTION for URLs. If
+ not present, we set up URLs based on the "opportunstic" convention,
+ e.g. username+password = "openstack_citest".
+
+ The provisioning system will then use or discard these URLs as they
+ are requested, based on whether or not the target database is actually
+ found to be available.
+
+ """
+ configured_urls = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION', None)
+ if configured_urls:
+ configured_urls = configured_urls.split(";")
+ else:
+ configured_urls = [
+ impl.create_opportunistic_driver_url()
+ for impl in BackendImpl.all_impls()
+ ]
+
+ for url_str in configured_urls:
+ url = sa_url.make_url(url_str)
+ m = re.match(r'([^+]+?)(?:\+(.+))?$', url.drivername)
+ database_type, drivertype = m.group(1, 2)
+ Backend(database_type, url)
+
+
+@six.add_metaclass(abc.ABCMeta)
+class BackendImpl(object):
+ """Provide database-specific implementations of key provisioning
+
+ functions.
+
+ ``BackendImpl`` is owned by a ``Backend`` instance which delegates
+ to it for all database-specific features.
+
+ """
+
+ @classmethod
+ def all_impls(cls):
+ """Return an iterator of all possible BackendImpl objects.
+
+ These are BackendImpls that are implemented, but not
+ necessarily provisionable.
+
+ """
+ for database_type in cls.impl.reg:
+ if database_type == '*':
+ continue
+ yield BackendImpl.impl(database_type)
+ @utils.dispatch_for_dialect("*")
+ def impl(drivername):
+ """Return a ``BackendImpl`` instance corresponding to the
-def main():
- """Controller to handle commands
+ given driver name.
+
+ This is a dispatched method which will refer to the constructor
+ of implementing subclasses.
+
+ """
+ raise NotImplementedError(
+ "No provision impl available for driver: %s" % drivername)
+
+ def __init__(self, drivername):
+ self.drivername = drivername
+
+ @abc.abstractmethod
+ def create_opportunistic_driver_url(self):
+ """Produce a string url known as the 'opportunistic' URL.
+
+ This URL is one that corresponds to an established Openstack
+ convention for a pre-established database login, which, when
+ detected as available in the local environment, is automatically
+ used as a test platform for a specific type of driver.
+
+ """
+
+ @abc.abstractmethod
+ def create_named_database(self, engine, ident):
+ """Create a database with the given name."""
+
+ @abc.abstractmethod
+ def drop_named_database(self, engine, ident, conditional=False):
+ """Drop a database with the given name."""
+
+ def provisioned_engine(self, base_url, ident):
+ """Return a provisioned engine.
+
+ Given the URL of a particular database backend and the string
+ name of a particular 'database' within that backend, return
+ an Engine instance whose connections will refer directly to the
+ named database.
+
+ For hostname-based URLs, this typically involves switching just the
+ 'database' portion of the URL with the given name and creating
+ an engine.
+
+ For URLs that instead deal with DSNs, the rules may be more custom;
+ for example, the engine may need to connect to the root URL and
+ then emit a command to switch to the named database.
+
+ """
+
+ url = sa_url.make_url(str(base_url))
+ url.database = ident
+ return session.create_engine(
+ url,
+ logging_name="%s@%s" % (self.drivername, ident))
+
+
+@BackendImpl.impl.dispatch_for("mysql")
+class MySQLBackendImpl(BackendImpl):
+ def create_opportunistic_driver_url(self):
+ return "mysql://openstack_citest:openstack_citest@localhost/"
+
+ def create_named_database(self, engine, ident):
+ with engine.connect() as conn:
+ conn.execute("CREATE DATABASE %s" % ident)
+
+ def drop_named_database(self, engine, ident, conditional=False):
+ with engine.connect() as conn:
+ if not conditional or self.database_exists(conn, ident):
+ conn.execute("DROP DATABASE %s" % ident)
+
+ def database_exists(self, engine, ident):
+ return bool(engine.scalar("SHOW DATABASES LIKE '%s'" % ident))
+
+
+@BackendImpl.impl.dispatch_for("sqlite")
+class SQLiteBackendImpl(BackendImpl):
+ def create_opportunistic_driver_url(self):
+ return "sqlite://"
+
+ def create_named_database(self, engine, ident):
+ url = self._provisioned_database_url(engine.url, ident)
+ eng = sqlalchemy.create_engine(url)
+ eng.connect().close()
+
+ def provisioned_engine(self, base_url, ident):
+ return session.create_engine(
+ self._provisioned_database_url(base_url, ident))
+
+ def drop_named_database(self, engine, ident, conditional=False):
+ url = self._provisioned_database_url(engine.url, ident)
+ filename = url.database
+ if filename and (not conditional or os.access(filename, os.F_OK)):
+ os.remove(filename)
+
+ def database_exists(self, engine, ident):
+ url = self._provisioned_database_url(engine.url, ident)
+ filename = url.database
+ return not filename or os.access(filename, os.F_OK)
+
+ def _provisioned_database_url(self, base_url, ident):
+ if base_url.database:
+ return sa_url.make_url("sqlite:////tmp/%s.db" % ident)
+ else:
+ return base_url
+
+
+@BackendImpl.impl.dispatch_for("postgresql")
+class PostgresqlBackendImpl(BackendImpl):
+ def create_opportunistic_driver_url(self):
+ return "postgresql://openstack_citest:openstack_citest"\
+ "@localhost/postgres"
+
+ def create_named_database(self, engine, ident):
+ with engine.connect().execution_options(
+ isolation_level="AUTOCOMMIT") as conn:
+ conn.execute("CREATE DATABASE %s" % ident)
+
+ def drop_named_database(self, engine, ident, conditional=False):
+ with engine.connect().execution_options(
+ isolation_level="AUTOCOMMIT") as conn:
+ self._close_out_database_users(conn, ident)
+ if conditional:
+ conn.execute("DROP DATABASE IF EXISTS %s" % ident)
+ else:
+ conn.execute("DROP DATABASE %s" % ident)
+
+ def database_exists(self, engine, ident):
+ return bool(
+ engine.scalar(
+ sqlalchemy.text(
+ "select datname from pg_database "
+ "where datname=:name"), name=ident)
+ )
+
+ def _close_out_database_users(self, conn, ident):
+ """Attempt to guarantee a database can be dropped.
+
+ Optional feature which guarantees no connections with our
+ username are attached to the DB we're going to drop.
+
+ This method has caveats; for one, the 'pid' column was named
+ 'procpid' prior to Postgresql 9.2. But more critically,
+ prior to 9.2 this operation required superuser permissions,
+ even if the connections we're closing are under the same username
+ as us. In more recent versions this restriction has been
+ lifted for same-user connections.
+
+ """
+ if conn.dialect.server_version_info >= (9, 2):
+ conn.execute(
+ sqlalchemy.text(
+ "select pg_terminate_backend(pid) "
+ "from pg_stat_activity "
+ "where usename=current_user and "
+ "pid != pg_backend_pid() "
+ "and datname=:dname"
+ ), dname=ident)
+
+
+def _random_ident():
+ return ''.join(
+ random.choice(string.ascii_lowercase)
+ for i in moves.range(10))
+
+
+def _echo_cmd(args):
+ idents = [_random_ident() for i in moves.range(args.instances_count)]
+ print("\n".join(idents))
+
+
+def _create_cmd(args):
+ idents = [_random_ident() for i in moves.range(args.instances_count)]
+
+ for backend in Backend.all_viable_backends():
+ for ident in idents:
+ backend.create_named_database(ident)
+
+ print("\n".join(idents))
+
+
+def _drop_cmd(args):
+ for backend in Backend.all_viable_backends():
+ for ident in args.instances:
+ backend.drop_named_database(ident, args.conditional)
+
+Backend._setup()
+
+
+def main(argv=None):
+ """Command line interface to create/drop databases.
::create: Create test database with random names.
::drop: Drop database created by previous command.
+ ::echo: create random names and display them; don't create.
"""
parser = argparse.ArgumentParser(
description='Controller to handle database creation and dropping'
' commands.',
- epilog='Under normal circumstances is not used directly.'
- ' Used in .testr.conf to automate test database creation'
- ' and dropping processes.')
+ epilog='Typically called by the test runner, e.g. shell script, '
+ 'testr runner via .testr.conf, or other system.')
subparsers = parser.add_subparsers(
help='Subcommands to manipulate temporary test databases.')
create = subparsers.add_parser(
'create',
help='Create temporary test databases.')
- create.set_defaults(which='create')
+ create.set_defaults(which=_create_cmd)
create.add_argument(
'instances_count',
type=int,
@@ -128,25 +476,31 @@ def main():
drop = subparsers.add_parser(
'drop',
help='Drop temporary test databases.')
- drop.set_defaults(which='drop')
+ drop.set_defaults(which=_drop_cmd)
drop.add_argument(
'instances',
nargs='+',
help='List of databases uri to be dropped.')
+ drop.add_argument(
+ '--conditional',
+ action="store_true",
+ help="Check if database exists first before dropping"
+ )
- args = parser.parse_args()
+ echo = subparsers.add_parser(
+ 'echo',
+ help="Create random database names and display only."
+ )
+ echo.set_defaults(which=_echo_cmd)
+ echo.add_argument(
+ 'instances_count',
+ type=int,
+ help='Number of identifiers to create.')
- connection_string = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION',
- 'sqlite://')
- engine = get_engine(connection_string)
- which = args.which
+ args = parser.parse_args(argv)
- if which == "create":
- for i in range(int(args.instances_count)):
- print(create_database(engine))
- elif which == "drop":
- for db in args.instances:
- drop_database(engine, db)
+ cmd = args.which
+ cmd(args)
if __name__ == "__main__":