summaryrefslogtreecommitdiff
path: root/nova/tests/virt/test_virt_drivers.py
diff options
context:
space:
mode:
Diffstat (limited to 'nova/tests/virt/test_virt_drivers.py')
-rw-r--r--nova/tests/virt/test_virt_drivers.py879
1 files changed, 0 insertions, 879 deletions
diff --git a/nova/tests/virt/test_virt_drivers.py b/nova/tests/virt/test_virt_drivers.py
deleted file mode 100644
index e4136d6eed..0000000000
--- a/nova/tests/virt/test_virt_drivers.py
+++ /dev/null
@@ -1,879 +0,0 @@
-# Copyright 2010 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import base64
-import sys
-import traceback
-
-import fixtures
-import mock
-import netaddr
-from oslo.serialization import jsonutils
-from oslo.utils import importutils
-from oslo.utils import timeutils
-import six
-
-from nova.compute import manager
-from nova.console import type as ctype
-from nova import exception
-from nova import objects
-from nova.openstack.common import log as logging
-from nova import test
-from nova.tests import fake_block_device
-from nova.tests.image import fake as fake_image
-from nova.tests import utils as test_utils
-from nova.tests.virt.libvirt import fake_libvirt_utils
-from nova.virt import block_device as driver_block_device
-from nova.virt import event as virtevent
-from nova.virt import fake
-from nova.virt import libvirt
-from nova.virt.libvirt import imagebackend
-
-LOG = logging.getLogger(__name__)
-
-
-def catch_notimplementederror(f):
- """Decorator to simplify catching drivers raising NotImplementedError
-
- If a particular call makes a driver raise NotImplementedError, we
- log it so that we can extract this information afterwards as needed.
- """
- def wrapped_func(self, *args, **kwargs):
- try:
- return f(self, *args, **kwargs)
- except NotImplementedError:
- frame = traceback.extract_tb(sys.exc_info()[2])[-1]
- LOG.error("%(driver)s does not implement %(method)s "
- "required for test %(test)s" %
- {'driver': type(self.connection),
- 'method': frame[2], 'test': f.__name__})
-
- wrapped_func.__name__ = f.__name__
- wrapped_func.__doc__ = f.__doc__
- return wrapped_func
-
-
-class _FakeDriverBackendTestCase(object):
- def _setup_fakelibvirt(self):
- # So that the _supports_direct_io does the test based
- # on the current working directory, instead of the
- # default instances_path which doesn't exist
- self.flags(instances_path=self.useFixture(fixtures.TempDir()).path)
-
- # Put fakelibvirt in place
- if 'libvirt' in sys.modules:
- self.saved_libvirt = sys.modules['libvirt']
- else:
- self.saved_libvirt = None
-
- import nova.tests.virt.libvirt.fake_imagebackend as fake_imagebackend
- import nova.tests.virt.libvirt.fake_libvirt_utils as fake_libvirt_utils
- import nova.tests.virt.libvirt.fakelibvirt as fakelibvirt
-
- sys.modules['libvirt'] = fakelibvirt
- import nova.virt.libvirt.driver
- import nova.virt.libvirt.firewall
-
- self.useFixture(fixtures.MonkeyPatch(
- 'nova.virt.libvirt.driver.imagebackend',
- fake_imagebackend))
- self.useFixture(fixtures.MonkeyPatch(
- 'nova.virt.libvirt.driver.libvirt',
- fakelibvirt))
- self.useFixture(fixtures.MonkeyPatch(
- 'nova.virt.libvirt.driver.libvirt_utils',
- fake_libvirt_utils))
- self.useFixture(fixtures.MonkeyPatch(
- 'nova.virt.libvirt.imagebackend.libvirt_utils',
- fake_libvirt_utils))
- self.useFixture(fixtures.MonkeyPatch(
- 'nova.virt.libvirt.firewall.libvirt',
- fakelibvirt))
-
- self.flags(rescue_image_id="2",
- rescue_kernel_id="3",
- rescue_ramdisk_id=None,
- snapshots_directory='./',
- group='libvirt')
-
- def fake_extend(image, size):
- pass
-
- def fake_migrateToURI(*a):
- pass
-
- def fake_make_drive(_self, _path):
- pass
-
- def fake_get_instance_disk_info(_self, instance, xml=None,
- block_device_info=None):
- return '[]'
-
- def fake_delete_instance_files(_self, _instance):
- pass
-
- self.stubs.Set(nova.virt.libvirt.driver.LibvirtDriver,
- '_get_instance_disk_info',
- fake_get_instance_disk_info)
-
- self.stubs.Set(nova.virt.libvirt.driver.disk,
- 'extend', fake_extend)
-
- self.stubs.Set(nova.virt.libvirt.driver.LibvirtDriver,
- '_delete_instance_files',
- fake_delete_instance_files)
-
- # Like the existing fakelibvirt.migrateToURI, do nothing,
- # but don't fail for these tests.
- self.stubs.Set(nova.virt.libvirt.driver.libvirt.Domain,
- 'migrateToURI', fake_migrateToURI)
-
- # We can't actually make a config drive v2 because ensure_tree has
- # been faked out
- self.stubs.Set(nova.virt.configdrive.ConfigDriveBuilder,
- 'make_drive', fake_make_drive)
-
- def _teardown_fakelibvirt(self):
- # Restore libvirt
- if self.saved_libvirt:
- sys.modules['libvirt'] = self.saved_libvirt
-
- def setUp(self):
- super(_FakeDriverBackendTestCase, self).setUp()
- # TODO(sdague): it would be nice to do this in a way that only
- # the relevant backends where replaced for tests, though this
- # should not harm anything by doing it for all backends
- fake_image.stub_out_image_service(self.stubs)
- self._setup_fakelibvirt()
-
- def tearDown(self):
- fake_image.FakeImageService_reset()
- self._teardown_fakelibvirt()
- super(_FakeDriverBackendTestCase, self).tearDown()
-
-
-class VirtDriverLoaderTestCase(_FakeDriverBackendTestCase, test.TestCase):
- """Test that ComputeManager can successfully load both
- old style and new style drivers and end up with the correct
- final class.
- """
-
- # if your driver supports being tested in a fake way, it can go here
- #
- # both long form and short form drivers are supported
- new_drivers = {
- 'nova.virt.fake.FakeDriver': 'FakeDriver',
- 'nova.virt.libvirt.LibvirtDriver': 'LibvirtDriver',
- 'fake.FakeDriver': 'FakeDriver',
- 'libvirt.LibvirtDriver': 'LibvirtDriver'
- }
-
- def test_load_new_drivers(self):
- for cls, driver in self.new_drivers.iteritems():
- self.flags(compute_driver=cls)
- # NOTE(sdague) the try block is to make it easier to debug a
- # failure by knowing which driver broke
- try:
- cm = manager.ComputeManager()
- except Exception as e:
- self.fail("Couldn't load driver %s - %s" % (cls, e))
-
- self.assertEqual(cm.driver.__class__.__name__, driver,
- "Could't load driver %s" % cls)
-
- def test_fail_to_load_new_drivers(self):
- self.flags(compute_driver='nova.virt.amiga')
-
- def _fake_exit(error):
- raise test.TestingException()
-
- self.stubs.Set(sys, 'exit', _fake_exit)
- self.assertRaises(test.TestingException, manager.ComputeManager)
-
-
-class _VirtDriverTestCase(_FakeDriverBackendTestCase):
- def setUp(self):
- super(_VirtDriverTestCase, self).setUp()
-
- self.flags(instances_path=self.useFixture(fixtures.TempDir()).path)
- self.connection = importutils.import_object(self.driver_module,
- fake.FakeVirtAPI())
- self.ctxt = test_utils.get_test_admin_context()
- self.image_service = fake_image.FakeImageService()
- # NOTE(dripton): resolve_driver_format does some file reading and
- # writing and chowning that complicate testing too much by requiring
- # using real directories with proper permissions. Just stub it out
- # here; we test it in test_imagebackend.py
- self.stubs.Set(imagebackend.Image, 'resolve_driver_format',
- imagebackend.Image._get_driver_format)
-
- def _get_running_instance(self, obj=True):
- instance_ref = test_utils.get_test_instance(obj=obj)
- network_info = test_utils.get_test_network_info()
- network_info[0]['network']['subnets'][0]['meta']['dhcp_server'] = \
- '1.1.1.1'
- image_info = test_utils.get_test_image_info(None, instance_ref)
- self.connection.spawn(self.ctxt, instance_ref, image_info,
- [], 'herp', network_info=network_info)
- return instance_ref, network_info
-
- @catch_notimplementederror
- def test_init_host(self):
- self.connection.init_host('myhostname')
-
- @catch_notimplementederror
- def test_list_instances(self):
- self.connection.list_instances()
-
- @catch_notimplementederror
- def test_list_instance_uuids(self):
- self.connection.list_instance_uuids()
-
- @catch_notimplementederror
- def test_spawn(self):
- instance_ref, network_info = self._get_running_instance()
- domains = self.connection.list_instances()
- self.assertIn(instance_ref['name'], domains)
-
- num_instances = self.connection.get_num_instances()
- self.assertEqual(1, num_instances)
-
- @catch_notimplementederror
- def test_snapshot_not_running(self):
- instance_ref = test_utils.get_test_instance()
- img_ref = self.image_service.create(self.ctxt, {'name': 'snap-1'})
- self.assertRaises(exception.InstanceNotRunning,
- self.connection.snapshot,
- self.ctxt, instance_ref, img_ref['id'],
- lambda *args, **kwargs: None)
-
- @catch_notimplementederror
- def test_snapshot_running(self):
- img_ref = self.image_service.create(self.ctxt, {'name': 'snap-1'})
- instance_ref, network_info = self._get_running_instance()
- self.connection.snapshot(self.ctxt, instance_ref, img_ref['id'],
- lambda *args, **kwargs: None)
-
- @catch_notimplementederror
- def test_post_interrupted_snapshot_cleanup(self):
- instance_ref, network_info = self._get_running_instance()
- self.connection.post_interrupted_snapshot_cleanup(self.ctxt,
- instance_ref)
-
- @catch_notimplementederror
- def test_reboot(self):
- reboot_type = "SOFT"
- instance_ref, network_info = self._get_running_instance()
- self.connection.reboot(self.ctxt, instance_ref, network_info,
- reboot_type)
-
- @catch_notimplementederror
- def test_get_host_ip_addr(self):
- host_ip = self.connection.get_host_ip_addr()
-
- # Will raise an exception if it's not a valid IP at all
- ip = netaddr.IPAddress(host_ip)
-
- # For now, assume IPv4.
- self.assertEqual(ip.version, 4)
-
- @catch_notimplementederror
- def test_set_admin_password(self):
- instance, network_info = self._get_running_instance(obj=True)
- self.connection.set_admin_password(instance, 'p4ssw0rd')
-
- @catch_notimplementederror
- def test_inject_file(self):
- instance_ref, network_info = self._get_running_instance()
- self.connection.inject_file(instance_ref,
- base64.b64encode('/testfile'),
- base64.b64encode('testcontents'))
-
- @catch_notimplementederror
- def test_resume_state_on_host_boot(self):
- instance_ref, network_info = self._get_running_instance()
- self.connection.resume_state_on_host_boot(self.ctxt, instance_ref,
- network_info)
-
- @catch_notimplementederror
- def test_rescue(self):
- instance_ref, network_info = self._get_running_instance()
- self.connection.rescue(self.ctxt, instance_ref, network_info, None, '')
-
- @catch_notimplementederror
- def test_unrescue_unrescued_instance(self):
- instance_ref, network_info = self._get_running_instance()
- self.connection.unrescue(instance_ref, network_info)
-
- @catch_notimplementederror
- def test_unrescue_rescued_instance(self):
- instance_ref, network_info = self._get_running_instance()
- self.connection.rescue(self.ctxt, instance_ref, network_info, None, '')
- self.connection.unrescue(instance_ref, network_info)
-
- @catch_notimplementederror
- def test_poll_rebooting_instances(self):
- instances = [self._get_running_instance()]
- self.connection.poll_rebooting_instances(10, instances)
-
- @catch_notimplementederror
- def test_migrate_disk_and_power_off(self):
- instance_ref, network_info = self._get_running_instance()
- flavor_ref = test_utils.get_test_flavor()
- self.connection.migrate_disk_and_power_off(
- self.ctxt, instance_ref, 'dest_host', flavor_ref,
- network_info)
-
- @catch_notimplementederror
- def test_power_off(self):
- instance_ref, network_info = self._get_running_instance()
- self.connection.power_off(instance_ref)
-
- @catch_notimplementederror
- def test_power_on_running(self):
- instance_ref, network_info = self._get_running_instance()
- self.connection.power_on(self.ctxt, instance_ref,
- network_info, None)
-
- @catch_notimplementederror
- def test_power_on_powered_off(self):
- instance_ref, network_info = self._get_running_instance()
- self.connection.power_off(instance_ref)
- self.connection.power_on(self.ctxt, instance_ref, network_info, None)
-
- @catch_notimplementederror
- def test_soft_delete(self):
- instance_ref, network_info = self._get_running_instance(obj=True)
- self.connection.soft_delete(instance_ref)
-
- @catch_notimplementederror
- def test_restore_running(self):
- instance_ref, network_info = self._get_running_instance()
- self.connection.restore(instance_ref)
-
- @catch_notimplementederror
- def test_restore_soft_deleted(self):
- instance_ref, network_info = self._get_running_instance()
- self.connection.soft_delete(instance_ref)
- self.connection.restore(instance_ref)
-
- @catch_notimplementederror
- def test_pause(self):
- instance_ref, network_info = self._get_running_instance()
- self.connection.pause(instance_ref)
-
- @catch_notimplementederror
- def test_unpause_unpaused_instance(self):
- instance_ref, network_info = self._get_running_instance()
- self.connection.unpause(instance_ref)
-
- @catch_notimplementederror
- def test_unpause_paused_instance(self):
- instance_ref, network_info = self._get_running_instance()
- self.connection.pause(instance_ref)
- self.connection.unpause(instance_ref)
-
- @catch_notimplementederror
- def test_suspend(self):
- instance_ref, network_info = self._get_running_instance()
- self.connection.suspend(instance_ref)
-
- @catch_notimplementederror
- def test_resume_unsuspended_instance(self):
- instance_ref, network_info = self._get_running_instance()
- self.connection.resume(self.ctxt, instance_ref, network_info)
-
- @catch_notimplementederror
- def test_resume_suspended_instance(self):
- instance_ref, network_info = self._get_running_instance()
- self.connection.suspend(instance_ref)
- self.connection.resume(self.ctxt, instance_ref, network_info)
-
- @catch_notimplementederror
- def test_destroy_instance_nonexistent(self):
- fake_instance = {'id': 42, 'name': 'I just made this up!',
- 'uuid': 'bda5fb9e-b347-40e8-8256-42397848cb00'}
- network_info = test_utils.get_test_network_info()
- self.connection.destroy(self.ctxt, fake_instance, network_info)
-
- @catch_notimplementederror
- def test_destroy_instance(self):
- instance_ref, network_info = self._get_running_instance()
- self.assertIn(instance_ref['name'],
- self.connection.list_instances())
- self.connection.destroy(self.ctxt, instance_ref, network_info)
- self.assertNotIn(instance_ref['name'],
- self.connection.list_instances())
-
- @catch_notimplementederror
- def test_get_volume_connector(self):
- result = self.connection.get_volume_connector({'id': 'fake'})
- self.assertIn('ip', result)
- self.assertIn('initiator', result)
- self.assertIn('host', result)
-
- @catch_notimplementederror
- def test_attach_detach_volume(self):
- instance_ref, network_info = self._get_running_instance()
- connection_info = {
- "driver_volume_type": "fake",
- "serial": "fake_serial",
- "data": {}
- }
- self.assertIsNone(
- self.connection.attach_volume(None, connection_info, instance_ref,
- '/dev/sda'))
- self.assertIsNone(
- self.connection.detach_volume(connection_info, instance_ref,
- '/dev/sda'))
-
- @catch_notimplementederror
- def test_swap_volume(self):
- instance_ref, network_info = self._get_running_instance()
- self.assertIsNone(
- self.connection.attach_volume(None, {'driver_volume_type': 'fake',
- 'data': {}},
- instance_ref,
- '/dev/sda'))
- self.assertIsNone(
- self.connection.swap_volume({'driver_volume_type': 'fake',
- 'data': {}},
- {'driver_volume_type': 'fake',
- 'data': {}},
- instance_ref,
- '/dev/sda', 2))
-
- @catch_notimplementederror
- def test_attach_detach_different_power_states(self):
- instance_ref, network_info = self._get_running_instance()
- connection_info = {
- "driver_volume_type": "fake",
- "serial": "fake_serial",
- "data": {}
- }
- self.connection.power_off(instance_ref)
- self.connection.attach_volume(None, connection_info, instance_ref,
- '/dev/sda')
-
- bdm = {
- 'root_device_name': None,
- 'swap': None,
- 'ephemerals': [],
- 'block_device_mapping': driver_block_device.convert_volumes([
- fake_block_device.FakeDbBlockDeviceDict(
- {'id': 1, 'instance_uuid': instance_ref['uuid'],
- 'device_name': '/dev/sda',
- 'source_type': 'volume',
- 'destination_type': 'volume',
- 'delete_on_termination': False,
- 'snapshot_id': None,
- 'volume_id': 'abcdedf',
- 'volume_size': None,
- 'no_device': None
- }),
- ])
- }
- bdm['block_device_mapping'][0]['connection_info'] = (
- {'driver_volume_type': 'fake', 'data': {}})
- with mock.patch.object(
- driver_block_device.DriverVolumeBlockDevice, 'save'):
- self.connection.power_on(
- self.ctxt, instance_ref, network_info, bdm)
- self.connection.detach_volume(connection_info,
- instance_ref,
- '/dev/sda')
-
- @catch_notimplementederror
- def test_get_info(self):
- instance_ref, network_info = self._get_running_instance()
- info = self.connection.get_info(instance_ref)
- self.assertIn('state', info)
- self.assertIn('max_mem', info)
- self.assertIn('mem', info)
- self.assertIn('num_cpu', info)
- self.assertIn('cpu_time', info)
-
- @catch_notimplementederror
- def test_get_info_for_unknown_instance(self):
- self.assertRaises(exception.NotFound,
- self.connection.get_info,
- {'name': 'I just made this name up'})
-
- @catch_notimplementederror
- def test_get_diagnostics(self):
- instance_ref, network_info = self._get_running_instance(obj=True)
- self.connection.get_diagnostics(instance_ref)
-
- @catch_notimplementederror
- def test_get_instance_diagnostics(self):
- instance_ref, network_info = self._get_running_instance(obj=True)
- instance_ref['launched_at'] = timeutils.utcnow()
- self.connection.get_instance_diagnostics(instance_ref)
-
- @catch_notimplementederror
- def test_block_stats(self):
- instance_ref, network_info = self._get_running_instance()
- stats = self.connection.block_stats(instance_ref['name'], 'someid')
- self.assertEqual(len(stats), 5)
-
- @catch_notimplementederror
- def test_interface_stats(self):
- instance_ref, network_info = self._get_running_instance()
- stats = self.connection.interface_stats(instance_ref['name'], 'someid')
- self.assertEqual(len(stats), 8)
-
- @catch_notimplementederror
- def test_get_console_output(self):
- fake_libvirt_utils.files['dummy.log'] = ''
- instance_ref, network_info = self._get_running_instance()
- console_output = self.connection.get_console_output(self.ctxt,
- instance_ref)
- self.assertIsInstance(console_output, six.string_types)
-
- @catch_notimplementederror
- def test_get_vnc_console(self):
- instance, network_info = self._get_running_instance(obj=True)
- vnc_console = self.connection.get_vnc_console(self.ctxt, instance)
- self.assertIsInstance(vnc_console, ctype.ConsoleVNC)
-
- @catch_notimplementederror
- def test_get_spice_console(self):
- instance_ref, network_info = self._get_running_instance()
- spice_console = self.connection.get_spice_console(self.ctxt,
- instance_ref)
- self.assertIsInstance(spice_console, ctype.ConsoleSpice)
-
- @catch_notimplementederror
- def test_get_rdp_console(self):
- instance_ref, network_info = self._get_running_instance()
- rdp_console = self.connection.get_rdp_console(self.ctxt, instance_ref)
- self.assertIsInstance(rdp_console, ctype.ConsoleRDP)
-
- @catch_notimplementederror
- def test_get_serial_console(self):
- instance_ref, network_info = self._get_running_instance()
- serial_console = self.connection.get_serial_console(self.ctxt,
- instance_ref)
- self.assertIsInstance(serial_console, ctype.ConsoleSerial)
-
- @catch_notimplementederror
- def test_get_console_pool_info(self):
- instance_ref, network_info = self._get_running_instance()
- console_pool = self.connection.get_console_pool_info(instance_ref)
- self.assertIn('address', console_pool)
- self.assertIn('username', console_pool)
- self.assertIn('password', console_pool)
-
- @catch_notimplementederror
- def test_refresh_security_group_rules(self):
- # FIXME: Create security group and add the instance to it
- instance_ref, network_info = self._get_running_instance()
- self.connection.refresh_security_group_rules(1)
-
- @catch_notimplementederror
- def test_refresh_security_group_members(self):
- # FIXME: Create security group and add the instance to it
- instance_ref, network_info = self._get_running_instance()
- self.connection.refresh_security_group_members(1)
-
- @catch_notimplementederror
- def test_refresh_instance_security_rules(self):
- # FIXME: Create security group and add the instance to it
- instance_ref, network_info = self._get_running_instance()
- self.connection.refresh_instance_security_rules(instance_ref)
-
- @catch_notimplementederror
- def test_refresh_provider_fw_rules(self):
- instance_ref, network_info = self._get_running_instance()
- self.connection.refresh_provider_fw_rules()
-
- @catch_notimplementederror
- def test_ensure_filtering_for_instance(self):
- instance = test_utils.get_test_instance(obj=True)
- network_info = test_utils.get_test_network_info()
- self.connection.ensure_filtering_rules_for_instance(instance,
- network_info)
-
- @catch_notimplementederror
- def test_unfilter_instance(self):
- instance_ref = test_utils.get_test_instance()
- network_info = test_utils.get_test_network_info()
- self.connection.unfilter_instance(instance_ref, network_info)
-
- @catch_notimplementederror
- def test_live_migration(self):
- instance_ref, network_info = self._get_running_instance()
- self.connection.live_migration(self.ctxt, instance_ref, 'otherhost',
- lambda *a: None, lambda *a: None)
-
- @catch_notimplementederror
- def _check_available_resource_fields(self, host_status):
- keys = ['vcpus', 'memory_mb', 'local_gb', 'vcpus_used',
- 'memory_mb_used', 'hypervisor_type', 'hypervisor_version',
- 'hypervisor_hostname', 'cpu_info', 'disk_available_least',
- 'supported_instances']
- for key in keys:
- self.assertIn(key, host_status)
- self.assertIsInstance(host_status['hypervisor_version'], int)
-
- @catch_notimplementederror
- def test_get_available_resource(self):
- available_resource = self.connection.get_available_resource(
- 'myhostname')
- self._check_available_resource_fields(available_resource)
-
- @catch_notimplementederror
- def test_get_available_nodes(self):
- self.connection.get_available_nodes(False)
-
- @catch_notimplementederror
- def _check_host_cpu_status_fields(self, host_cpu_status):
- self.assertIn('kernel', host_cpu_status)
- self.assertIn('idle', host_cpu_status)
- self.assertIn('user', host_cpu_status)
- self.assertIn('iowait', host_cpu_status)
- self.assertIn('frequency', host_cpu_status)
-
- @catch_notimplementederror
- def test_get_host_cpu_stats(self):
- host_cpu_status = self.connection.get_host_cpu_stats()
- self._check_host_cpu_status_fields(host_cpu_status)
-
- @catch_notimplementederror
- def test_set_host_enabled(self):
- self.connection.set_host_enabled('a useless argument?', True)
-
- @catch_notimplementederror
- def test_get_host_uptime(self):
- self.connection.get_host_uptime('a useless argument?')
-
- @catch_notimplementederror
- def test_host_power_action_reboot(self):
- self.connection.host_power_action('a useless argument?', 'reboot')
-
- @catch_notimplementederror
- def test_host_power_action_shutdown(self):
- self.connection.host_power_action('a useless argument?', 'shutdown')
-
- @catch_notimplementederror
- def test_host_power_action_startup(self):
- self.connection.host_power_action('a useless argument?', 'startup')
-
- @catch_notimplementederror
- def test_add_to_aggregate(self):
- self.connection.add_to_aggregate(self.ctxt, 'aggregate', 'host')
-
- @catch_notimplementederror
- def test_remove_from_aggregate(self):
- self.connection.remove_from_aggregate(self.ctxt, 'aggregate', 'host')
-
- def test_events(self):
- got_events = []
-
- def handler(event):
- got_events.append(event)
-
- self.connection.register_event_listener(handler)
-
- event1 = virtevent.LifecycleEvent(
- "cef19ce0-0ca2-11df-855d-b19fbce37686",
- virtevent.EVENT_LIFECYCLE_STARTED)
- event2 = virtevent.LifecycleEvent(
- "cef19ce0-0ca2-11df-855d-b19fbce37686",
- virtevent.EVENT_LIFECYCLE_PAUSED)
-
- self.connection.emit_event(event1)
- self.connection.emit_event(event2)
- want_events = [event1, event2]
- self.assertEqual(want_events, got_events)
-
- event3 = virtevent.LifecycleEvent(
- "cef19ce0-0ca2-11df-855d-b19fbce37686",
- virtevent.EVENT_LIFECYCLE_RESUMED)
- event4 = virtevent.LifecycleEvent(
- "cef19ce0-0ca2-11df-855d-b19fbce37686",
- virtevent.EVENT_LIFECYCLE_STOPPED)
-
- self.connection.emit_event(event3)
- self.connection.emit_event(event4)
-
- want_events = [event1, event2, event3, event4]
- self.assertEqual(want_events, got_events)
-
- def test_event_bad_object(self):
- # Passing in something which does not inherit
- # from virtevent.Event
-
- def handler(event):
- pass
-
- self.connection.register_event_listener(handler)
-
- badevent = {
- "foo": "bar"
- }
-
- self.assertRaises(ValueError,
- self.connection.emit_event,
- badevent)
-
- def test_event_bad_callback(self):
- # Check that if a callback raises an exception,
- # it does not propagate back out of the
- # 'emit_event' call
-
- def handler(event):
- raise Exception("Hit Me!")
-
- self.connection.register_event_listener(handler)
-
- event1 = virtevent.LifecycleEvent(
- "cef19ce0-0ca2-11df-855d-b19fbce37686",
- virtevent.EVENT_LIFECYCLE_STARTED)
-
- self.connection.emit_event(event1)
-
- def test_set_bootable(self):
- self.assertRaises(NotImplementedError, self.connection.set_bootable,
- 'instance', True)
-
- @catch_notimplementederror
- def test_get_instance_disk_info(self):
- # This should be implemented by any driver that supports live migrate.
- instance_ref, network_info = self._get_running_instance()
- self.connection.get_instance_disk_info(instance_ref['name'],
- block_device_info={})
-
-
-class AbstractDriverTestCase(_VirtDriverTestCase, test.TestCase):
- def setUp(self):
- self.driver_module = "nova.virt.driver.ComputeDriver"
- super(AbstractDriverTestCase, self).setUp()
-
-
-class FakeConnectionTestCase(_VirtDriverTestCase, test.TestCase):
- def setUp(self):
- self.driver_module = 'nova.virt.fake.FakeDriver'
- fake.set_nodes(['myhostname'])
- super(FakeConnectionTestCase, self).setUp()
-
- def _check_available_resource_fields(self, host_status):
- super(FakeConnectionTestCase, self)._check_available_resource_fields(
- host_status)
-
- hypervisor_type = host_status['hypervisor_type']
- supported_instances = host_status['supported_instances']
- try:
- # supported_instances could be JSON wrapped
- supported_instances = jsonutils.loads(supported_instances)
- except TypeError:
- pass
- self.assertTrue(any(hypervisor_type in x for x in supported_instances))
-
-
-class LibvirtConnTestCase(_VirtDriverTestCase, test.TestCase):
-
- REQUIRES_LOCKING = True
-
- def setUp(self):
- # Point _VirtDriverTestCase at the right module
- self.driver_module = 'nova.virt.libvirt.LibvirtDriver'
- super(LibvirtConnTestCase, self).setUp()
- self.stubs.Set(self.connection,
- '_set_host_enabled', mock.MagicMock())
- self.useFixture(fixtures.MonkeyPatch(
- 'nova.context.get_admin_context',
- self._fake_admin_context))
-
- def _fake_admin_context(self, *args, **kwargs):
- return self.ctxt
-
- def test_force_hard_reboot(self):
- self.flags(wait_soft_reboot_seconds=0, group='libvirt')
- self.test_reboot()
-
- def test_migrate_disk_and_power_off(self):
- # there is lack of fake stuff to execute this method. so pass.
- self.skipTest("Test nothing, but this method"
- " needed to override superclass.")
-
- def test_internal_set_host_enabled(self):
- self.mox.UnsetStubs()
- service_mock = mock.MagicMock()
-
- # Previous status of the service: disabled: False
- service_mock.configure_mock(disabled_reason='None',
- disabled=False)
- with mock.patch.object(objects.Service, "get_by_compute_host",
- return_value=service_mock):
- self.connection._set_host_enabled(False, 'ERROR!')
- self.assertTrue(service_mock.disabled)
- self.assertEqual(service_mock.disabled_reason, 'AUTO: ERROR!')
-
- def test_set_host_enabled_when_auto_disabled(self):
- self.mox.UnsetStubs()
- service_mock = mock.MagicMock()
-
- # Previous status of the service: disabled: True, 'AUTO: ERROR'
- service_mock.configure_mock(disabled_reason='AUTO: ERROR',
- disabled=True)
- with mock.patch.object(objects.Service, "get_by_compute_host",
- return_value=service_mock):
- self.connection._set_host_enabled(True)
- self.assertFalse(service_mock.disabled)
- self.assertEqual(service_mock.disabled_reason, 'None')
-
- def test_set_host_enabled_when_manually_disabled(self):
- self.mox.UnsetStubs()
- service_mock = mock.MagicMock()
-
- # Previous status of the service: disabled: True, 'Manually disabled'
- service_mock.configure_mock(disabled_reason='Manually disabled',
- disabled=True)
- with mock.patch.object(objects.Service, "get_by_compute_host",
- return_value=service_mock):
- self.connection._set_host_enabled(True)
- self.assertTrue(service_mock.disabled)
- self.assertEqual(service_mock.disabled_reason, 'Manually disabled')
-
- def test_set_host_enabled_dont_override_manually_disabled(self):
- self.mox.UnsetStubs()
- service_mock = mock.MagicMock()
-
- # Previous status of the service: disabled: True, 'Manually disabled'
- service_mock.configure_mock(disabled_reason='Manually disabled',
- disabled=True)
- with mock.patch.object(objects.Service, "get_by_compute_host",
- return_value=service_mock):
- self.connection._set_host_enabled(False, 'ERROR!')
- self.assertTrue(service_mock.disabled)
- self.assertEqual(service_mock.disabled_reason, 'Manually disabled')
-
- @catch_notimplementederror
- @mock.patch.object(libvirt.driver.LibvirtDriver, '_unplug_vifs')
- def test_unplug_vifs_with_destroy_vifs_false(self, unplug_vifs_mock):
- instance_ref, network_info = self._get_running_instance()
- self.connection.cleanup(self.ctxt, instance_ref, network_info,
- destroy_vifs=False)
- self.assertEqual(unplug_vifs_mock.call_count, 0)
-
- @catch_notimplementederror
- @mock.patch.object(libvirt.driver.LibvirtDriver, '_unplug_vifs')
- def test_unplug_vifs_with_destroy_vifs_true(self, unplug_vifs_mock):
- instance_ref, network_info = self._get_running_instance()
- self.connection.cleanup(self.ctxt, instance_ref, network_info,
- destroy_vifs=True)
- self.assertEqual(unplug_vifs_mock.call_count, 1)
- unplug_vifs_mock.assert_called_once_with(instance_ref,
- network_info, True)