# Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. """Base classes for our unit tests. Allows overriding of config for use of fakes, and some black magic for inline callbacks. """ import eventlet eventlet.monkey_patch(os=False) import copy import os import shutil import sys import fixtures import testtools from oslo.config import cfg from ironic.db.sqlalchemy import migration from ironic.db.sqlalchemy import models from ironic.common import paths from ironic.db.sqlalchemy import api as sqla_api from ironic.objects import base as objects_base from ironic.openstack.common import log as logging from ironic.tests import conf_fixture from ironic.tests import policy_fixture test_opts = [ cfg.StrOpt('sqlite_clean_db', default='clean.sqlite', help='File name of clean sqlite db.'), ] CONF = cfg.CONF CONF.register_opts(test_opts) CONF.import_opt('connection', 'ironic.openstack.common.db.options', group='database') CONF.import_opt('sqlite_db', 'ironic.openstack.common.db.options', group='database') CONF.set_override('use_stderr', False) logging.setup('ironic') _DB_CACHE = None class Database(fixtures.Fixture): def __init__(self, db_api, db_migrate, sql_connection, sqlite_db, sqlite_clean_db): self.sql_connection = sql_connection self.sqlite_db = sqlite_db self.sqlite_clean_db = sqlite_clean_db self.engine = db_api.get_engine() self.engine.dispose() conn = self.engine.connect() if sql_connection == "sqlite://": self.setup_sqlite(db_migrate) elif sql_connection.startswith('sqlite:///'): testdb = paths.state_path_rel(sqlite_db) if os.path.exists(testdb): return self.setup_sqlite(db_migrate) else: db_migrate.upgrade('head') self.post_migrations() if sql_connection == "sqlite://": conn = self.engine.connect() self._DB = "".join(line for line in conn.connection.iterdump()) self.engine.dispose() else: cleandb = paths.state_path_rel(sqlite_clean_db) shutil.copyfile(testdb, cleandb) def setup_sqlite(self, db_migrate): if db_migrate.version(): return models.Base.metadata.create_all(self.engine) db_migrate.stamp('head') def setUp(self): super(Database, self).setUp() if self.sql_connection == "sqlite://": conn = self.engine.connect() conn.connection.executescript(self._DB) self.addCleanup(self.engine.dispose) else: shutil.copyfile(paths.state_path_rel(self.sqlite_clean_db), paths.state_path_rel(self.sqlite_db)) self.addCleanup(os.unlink, self.sqlite_db) def post_migrations(self): """Any addition steps that are needed outside of the migrations.""" class ReplaceModule(fixtures.Fixture): """Replace a module with a fake module.""" def __init__(self, name, new_value): self.name = name self.new_value = new_value def _restore(self, old_value): sys.modules[self.name] = old_value def setUp(self): super(ReplaceModule, self).setUp() old_value = sys.modules.get(self.name) sys.modules[self.name] = self.new_value self.addCleanup(self._restore, old_value) class TestingException(Exception): pass class TestCase(testtools.TestCase): """Test case base class for all unit tests.""" def setUp(self): """Run before each test method to initialize test environment.""" super(TestCase, self).setUp() test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0) try: test_timeout = int(test_timeout) except ValueError: # If timeout value is invalid do not set a timeout. test_timeout = 0 if test_timeout > 0: self.useFixture(fixtures.Timeout(test_timeout, gentle=True)) self.useFixture(fixtures.NestedTempfile()) self.useFixture(fixtures.TempHomeDir()) if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or os.environ.get('OS_STDOUT_CAPTURE') == '1'): stdout = self.useFixture(fixtures.StringStream('stdout')).stream self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout)) if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or os.environ.get('OS_STDERR_CAPTURE') == '1'): stderr = self.useFixture(fixtures.StringStream('stderr')).stream self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr)) self.log_fixture = self.useFixture(fixtures.FakeLogger()) self.useFixture(conf_fixture.ConfFixture(CONF)) global _DB_CACHE if not _DB_CACHE: _DB_CACHE = Database(sqla_api, migration, sql_connection=CONF.database.connection, sqlite_db=CONF.database.sqlite_db, sqlite_clean_db=CONF.sqlite_clean_db) self.useFixture(_DB_CACHE) # NOTE(danms): Make sure to reset us back to non-remote objects # for each test to avoid interactions. Also, backup the object # registry objects_base.IronicObject.indirection_api = None self._base_test_obj_backup = copy.copy( objects_base.IronicObject._obj_classes) self.addCleanup(self._restore_obj_registry) self.addCleanup(self._clear_attrs) self.useFixture(fixtures.EnvironmentVariable('http_proxy')) self.policy = self.useFixture(policy_fixture.PolicyFixture()) CONF.set_override('fatal_exception_format_errors', True) def _restore_obj_registry(self): objects_base.IronicObject._obj_classes = self._base_test_obj_backup def _clear_attrs(self): # Delete attributes that don't start with _ so they don't pin # memory around unnecessarily for the duration of the test # suite for key in [k for k in self.__dict__.keys() if k[0] != '_']: del self.__dict__[key] def config(self, **kw): """Override config options for a test.""" group = kw.pop('group', None) for k, v in kw.iteritems(): CONF.set_override(k, v, group) def path_get(self, project_file=None): """Get the absolute path to a file. Used for testing the API. :param project_file: File whose path to return. Default: None. :returns: path to the specified file, or path to project root. """ root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', ) ) if project_file: return os.path.join(root, project_file) else: return root