diff options
author | Hans Lindgren <hanlind@kth.se> | 2015-03-23 17:49:50 +0100 |
---|---|---|
committer | Hans Lindgren <hanlind@kth.se> | 2015-09-30 22:06:18 +0200 |
commit | 4e0b995a4948332694b6d0d4d7890af4784b8cc0 (patch) | |
tree | a6b3d7bbeb4050ef872076b4953a7c971803b20f | |
parent | b0013d93ffeaed53bc28d9558def26bdb7041ed7 (diff) | |
download | nova-4e0b995a4948332694b6d0d4d7890af4784b8cc0.tar.gz |
Remove conductor 2.x RPC API
*** DO NOT MERGE UNTIL MITAKA OPENS ***
A previous change adding support for the 3.x rpc interface retained
compatibility with 2.x as a transition point. This commit removes the
old API from the server side.
Related to blueprint liberty-bump-object-and-rpcapi-versions
UpgradeImpact
Change-Id: I757541cfb35b23eb4d242a15b223ee2db02593e3
-rw-r--r-- | nova/conductor/manager.py | 406 | ||||
-rw-r--r-- | nova/tests/unit/conductor/test_conductor.py | 683 |
2 files changed, 4 insertions, 1085 deletions
diff --git a/nova/conductor/manager.py b/nova/conductor/manager.py index c4259c9146..1a226db251 100644 --- a/nova/conductor/manager.py +++ b/nova/conductor/manager.py @@ -22,13 +22,8 @@ from oslo_log import log as logging import oslo_messaging as messaging from oslo_serialization import jsonutils from oslo_utils import excutils -from oslo_utils import timeutils import six -from nova.api.ec2 import ec2utils -from nova import block_device -from nova.cells import rpcapi as cells_rpcapi -from nova.compute import api as compute_api from nova.compute import rpcapi as compute_rpcapi from nova.compute import task_states from nova.compute import utils as compute_utils @@ -40,11 +35,8 @@ from nova import exception from nova.i18n import _, _LE, _LW from nova import image from nova import manager -from nova import network -from nova.network.security_group import openstack_driver from nova import objects from nova.objects import base as nova_object -from nova import quota from nova import rpc from nova.scheduler import client as scheduler_client from nova.scheduler import utils as scheduler_utils @@ -54,21 +46,6 @@ from nova import utils LOG = logging.getLogger(__name__) CONF = cfg.CONF -# Instead of having a huge list of arguments to instance_update(), we just -# accept a dict of fields to update and use this whitelist to validate it. -allowed_updates = ['task_state', 'vm_state', 'expected_task_state', - 'power_state', 'access_ip_v4', 'access_ip_v6', - 'launched_at', 'terminated_at', 'host', 'node', - 'memory_mb', 'vcpus', 'root_gb', 'ephemeral_gb', - 'instance_type_id', 'root_device_name', 'launched_on', - 'progress', 'vm_mode', 'default_ephemeral_device', - 'default_swap_device', 'root_device_name', - 'system_metadata', 'updated_at' - ] - -# Fields that we want to convert back into a datetime object. -datetime_fields = ['launched_at', 'terminated_at', 'updated_at'] - class ConductorManager(manager.Manager): """Mission: Conduct things. @@ -83,357 +60,18 @@ class ConductorManager(manager.Manager): namespace. See the ComputeTaskManager class for details. """ - target = messaging.Target(version='2.3') + target = messaging.Target(version='3.0') def __init__(self, *args, **kwargs): super(ConductorManager, self).__init__(service_name='conductor', *args, **kwargs) - self.security_group_api = ( - openstack_driver.get_openstack_security_group_driver()) - self._network_api = None - self._compute_api = None self.compute_task_mgr = ComputeTaskManager() - self.cells_rpcapi = cells_rpcapi.CellsAPI() self.additional_endpoints.append(self.compute_task_mgr) - self.additional_endpoints.append(_ConductorManagerV3Proxy(self)) - - @property - def network_api(self): - # NOTE(danms): We need to instantiate our network_api on first use - # to avoid the circular dependency that exists between our init - # and network_api's - if self._network_api is None: - self._network_api = network.API() - return self._network_api - - @property - def compute_api(self): - if self._compute_api is None: - self._compute_api = compute_api.API() - return self._compute_api - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - @messaging.expected_exceptions(KeyError, ValueError, - exception.InvalidUUID, - exception.InstanceNotFound, - exception.UnexpectedTaskStateError) - def instance_update(self, context, instance_uuid, - updates, service): - for key, value in six.iteritems(updates): - if key not in allowed_updates: - LOG.error(_LE("Instance update attempted for " - "'%(key)s' on %(instance_uuid)s"), - {'key': key, 'instance_uuid': instance_uuid}) - raise KeyError("unexpected update keyword '%s'" % key) - if key in datetime_fields and isinstance(value, six.string_types): - updates[key] = timeutils.parse_strtime(value) - - instance = objects.Instance(context=context, uuid=instance_uuid, - **updates) - instance.obj_reset_changes(['uuid']) - instance.save() - return nova_object.obj_to_primitive(instance) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - @messaging.expected_exceptions(exception.InstanceNotFound) - def instance_get_by_uuid(self, context, instance_uuid, - columns_to_join): - return jsonutils.to_primitive( - self.db.instance_get_by_uuid(context, instance_uuid, - columns_to_join)) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def instance_get_all_by_host(self, context, host, node, - columns_to_join): - if node is not None: - result = self.db.instance_get_all_by_host_and_node( - context.elevated(), host, node) - else: - result = self.db.instance_get_all_by_host(context.elevated(), host, - columns_to_join) - return jsonutils.to_primitive(result) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def migration_get_in_progress_by_host_and_node(self, context, - host, node): - migrations = self.db.migration_get_in_progress_by_host_and_node( - context, host, node) - return jsonutils.to_primitive(migrations) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - @messaging.expected_exceptions(exception.AggregateHostExists) - def aggregate_host_add(self, context, aggregate, host): - host_ref = self.db.aggregate_host_add(context.elevated(), - aggregate['id'], host) - - return jsonutils.to_primitive(host_ref) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - @messaging.expected_exceptions(exception.AggregateHostNotFound) - def aggregate_host_delete(self, context, aggregate, host): - self.db.aggregate_host_delete(context.elevated(), - aggregate['id'], host) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def aggregate_metadata_get_by_host(self, context, host, - key='availability_zone'): - result = self.db.aggregate_metadata_get_by_host(context, host, key) - return jsonutils.to_primitive(result) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def bw_usage_update(self, context, uuid, mac, start_period, - bw_in, bw_out, last_ctr_in, last_ctr_out, - last_refreshed, update_cells): - if [bw_in, bw_out, last_ctr_in, last_ctr_out].count(None) != 4: - self.db.bw_usage_update(context, uuid, mac, start_period, - bw_in, bw_out, last_ctr_in, last_ctr_out, - last_refreshed, - update_cells=update_cells) - usage = self.db.bw_usage_get(context, uuid, start_period, mac) - return jsonutils.to_primitive(usage) def provider_fw_rule_get_all(self, context): rules = self.db.provider_fw_rule_get_all(context) return jsonutils.to_primitive(rules) - # NOTE(danms): This can be removed in version 3.0 of the RPC API - def agent_build_get_by_triple(self, context, hypervisor, os, architecture): - info = self.db.agent_build_get_by_triple(context, hypervisor, os, - architecture) - return jsonutils.to_primitive(info) - - # NOTE(ndipanov): This can be removed in version 3.0 of the RPC API - def block_device_mapping_update_or_create(self, context, values, create): - if create is None: - bdm = self.db.block_device_mapping_update_or_create(context, - values) - elif create is True: - bdm = self.db.block_device_mapping_create(context, values) - else: - bdm = self.db.block_device_mapping_update(context, - values['id'], - values) - bdm_obj = objects.BlockDeviceMapping._from_db_object( - context, objects.BlockDeviceMapping(), bdm) - self.cells_rpcapi.bdm_update_or_create_at_top(context, bdm_obj, - create=create) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def block_device_mapping_get_all_by_instance(self, context, instance, - legacy): - bdms = self.db.block_device_mapping_get_all_by_instance( - context, instance['uuid']) - if legacy: - bdms = block_device.legacy_mapping(bdms) - return jsonutils.to_primitive(bdms) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def instance_get_all_by_filters(self, context, filters, sort_key, - sort_dir, columns_to_join, - use_slave): - result = self.db.instance_get_all_by_filters( - context, filters, sort_key, sort_dir, - columns_to_join=columns_to_join, use_slave=use_slave) - return jsonutils.to_primitive(result) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def instance_get_active_by_window_joined(self, context, begin, end, - project_id, host): - result = self.db.instance_get_active_by_window_joined( - context, begin, end, project_id, host) - return jsonutils.to_primitive(result) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def instance_destroy(self, context, instance): - if not isinstance(instance, objects.Instance): - instance = objects.Instance._from_db_object(context, - objects.Instance(), - instance) - instance.destroy() - return nova_object.obj_to_primitive(instance) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def instance_fault_create(self, context, values): - result = self.db.instance_fault_create(context, values) - return jsonutils.to_primitive(result) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def vol_usage_update(self, context, vol_id, rd_req, rd_bytes, wr_req, - wr_bytes, instance, last_refreshed, update_totals): - vol_usage = self.db.vol_usage_update(context, vol_id, - rd_req, rd_bytes, - wr_req, wr_bytes, - instance['uuid'], - instance['project_id'], - instance['user_id'], - instance['availability_zone'], - update_totals) - - # We have just updated the database, so send the notification now - vol_usage = objects.VolumeUsage._from_db_object( - context, objects.VolumeUsage(), vol_usage) - self.notifier.info(context, 'volume.usage', - compute_utils.usage_volume_info(vol_usage)) - - # NOTE(hanlind): This method can be removed in version 3.0 of the RPC API - @messaging.expected_exceptions(exception.ComputeHostNotFound, - exception.HostBinaryNotFound) - def service_get_all_by(self, context, topic, host, binary): - if not any((topic, host, binary)): - result = self.db.service_get_all(context) - elif all((topic, host)): - if topic == 'compute': - result = self.db.service_get_by_compute_host(context, host) - # NOTE(sbauza): Only Juno computes are still calling this - # conductor method for getting service_get_by_compute_node, - # but expect a compute_node field so we can safely add it. - result['compute_node' - ] = objects.ComputeNodeList.get_all_by_host( - context, result['host']) - # FIXME(comstud) Potentially remove this on bump to v3.0 - result = [result] - else: - result = self.db.service_get_by_host_and_topic(context, - host, topic) - elif all((host, binary)): - result = self.db.service_get_by_host_and_binary( - context, host, binary) - elif topic: - result = self.db.service_get_all_by_topic(context, topic) - elif host: - result = self.db.service_get_all_by_host(context, host) - - return jsonutils.to_primitive(result) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - @messaging.expected_exceptions(exception.InstanceActionNotFound) - def action_event_start(self, context, values): - evt = self.db.action_event_start(context, values) - return jsonutils.to_primitive(evt) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - @messaging.expected_exceptions(exception.InstanceActionNotFound, - exception.InstanceActionEventNotFound) - def action_event_finish(self, context, values): - evt = self.db.action_event_finish(context, values) - return jsonutils.to_primitive(evt) - - # NOTE(hanlind): This method can be removed in version 3.0 of the RPC API - def service_create(self, context, values): - svc = self.db.service_create(context, values) - return jsonutils.to_primitive(svc) - - # NOTE(hanlind): This method can be removed in version 3.0 of the RPC API - @messaging.expected_exceptions(exception.ServiceNotFound) - def service_destroy(self, context, service_id): - self.db.service_destroy(context, service_id) - - # NOTE(hanlind): This method can be removed in version 3.0 of the RPC API - def compute_node_create(self, context, values): - result = self.db.compute_node_create(context, values) - return jsonutils.to_primitive(result) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def compute_node_update(self, context, node, values): - result = self.db.compute_node_update(context, node['id'], values) - return jsonutils.to_primitive(result) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def compute_node_delete(self, context, node): - result = self.db.compute_node_delete(context, node['id']) - return jsonutils.to_primitive(result) - - # NOTE(hanlind): This method can be removed in version 3.0 of the RPC API - @messaging.expected_exceptions(exception.ServiceNotFound) - def service_update(self, context, service, values): - svc = self.db.service_update(context, service['id'], values) - return jsonutils.to_primitive(svc) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def task_log_get(self, context, task_name, begin, end, host, state): - result = self.db.task_log_get(context, task_name, begin, end, host, - state) - return jsonutils.to_primitive(result) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def task_log_begin_task(self, context, task_name, begin, end, host, - task_items, message): - result = self.db.task_log_begin_task(context.elevated(), task_name, - begin, end, host, task_items, - message) - return jsonutils.to_primitive(result) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def task_log_end_task(self, context, task_name, begin, end, host, - errors, message): - result = self.db.task_log_end_task(context.elevated(), task_name, - begin, end, host, errors, message) - return jsonutils.to_primitive(result) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def notify_usage_exists(self, context, instance, current_period, - ignore_missing_network_data, - system_metadata, extra_usage_info): - if not isinstance(instance, objects.Instance): - attrs = ['metadata', 'system_metadata'] - instance = objects.Instance._from_db_object(context, - objects.Instance(), - instance, - expected_attrs=attrs) - compute_utils.notify_usage_exists(self.notifier, context, instance, - current_period, - ignore_missing_network_data, - system_metadata, extra_usage_info) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def security_groups_trigger_handler(self, context, event, args): - self.security_group_api.trigger_handler(event, context, *args) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def security_groups_trigger_members_refresh(self, context, group_ids): - self.security_group_api.trigger_members_refresh(context, group_ids) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def network_migrate_instance_start(self, context, instance, migration): - self.network_api.migrate_instance_start(context, instance, migration) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def network_migrate_instance_finish(self, context, instance, migration): - self.network_api.migrate_instance_finish(context, instance, migration) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def quota_commit(self, context, reservations, project_id=None, - user_id=None): - quota.QUOTAS.commit(context, reservations, project_id=project_id, - user_id=user_id) - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def quota_rollback(self, context, reservations, project_id=None, - user_id=None): - quota.QUOTAS.rollback(context, reservations, project_id=project_id, - user_id=user_id) - - # NOTE(hanlind): This method can be removed in version 3.0 of the RPC API - def get_ec2_ids(self, context, instance): - ec2_ids = {} - - ec2_ids['instance-id'] = ec2utils.id_to_ec2_inst_id(instance['uuid']) - ec2_ids['ami-id'] = ec2utils.glance_id_to_ec2_id(context, - instance['image_ref']) - for image_type in ['kernel', 'ramdisk']: - image_id = instance.get('%s_id' % image_type) - if image_id is not None: - ec2_image_type = ec2utils.image_type(image_type) - ec2_id = ec2utils.glance_id_to_ec2_id(context, image_id, - ec2_image_type) - ec2_ids['%s-id' % image_type] = ec2_id - - return ec2_ids - - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def compute_unrescue(self, context, instance): - self.compute_api.unrescue(context, instance) - def _object_dispatch(self, target, method, args, kwargs): """Dispatch a call to an object method. @@ -448,20 +86,6 @@ class ConductorManager(manager.Manager): except Exception: raise messaging.ExpectedException() - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def object_class_action(self, context, objname, objmethod, - objver, args, kwargs): - """Perform a classmethod action on an object.""" - objclass = nova_object.NovaObject.obj_class_from_name(objname, - objver) - args = tuple([context] + list(args)) - result = self._object_dispatch(objclass, objmethod, args, kwargs) - # NOTE(danms): The RPC layer will convert to primitives for us, - # but in this case, we need to honor the version the client is - # asking for, so we do it before returning here. - return (result.obj_to_primitive(target_version=objver) - if isinstance(result, nova_object.NovaObject) else result) - def object_class_action_versions(self, context, objname, objmethod, object_versions, args, kwargs): objclass = nova_object.NovaObject.obj_class_from_name( @@ -496,10 +120,6 @@ class ConductorManager(manager.Manager): updates['obj_what_changed'] = objinst.obj_what_changed() return updates, result - # NOTE(hanlind): This can be removed in version 3.0 of the RPC API - def object_backport(self, context, objinst, target_version): - return objinst.obj_to_primitive(target_version=target_version) - def object_backport_versions(self, context, objinst, object_versions): target = object_versions[objinst.obj_name()] LOG.debug('Backporting %(obj)s to %(ver)s with versions %(manifest)s', @@ -919,27 +539,3 @@ class ComputeTaskManager(base.Base): preserve_ephemeral=preserve_ephemeral, migration=migration, host=host, node=node, limits=limits) - - -class _ConductorManagerV3Proxy(object): - - target = messaging.Target(version='3.0') - - def __init__(self, manager): - self.manager = manager - - def provider_fw_rule_get_all(self, context): - return self.manager.provider_fw_rule_get_all(context) - - def object_class_action_versions(self, context, objname, objmethod, - object_versions, args, kwargs): - return self.manager.object_class_action_versions( - context, objname, objmethod, object_versions, args, kwargs) - - def object_action(self, context, objinst, objmethod, args, kwargs): - return self.manager.object_action(context, objinst, objmethod, args, - kwargs) - - def object_backport_versions(self, context, objinst, object_versions): - return self.manager.object_backport_versions(context, objinst, - object_versions) diff --git a/nova/tests/unit/conductor/test_conductor.py b/nova/tests/unit/conductor/test_conductor.py index c1fce5f5fc..c710e7dfb0 100644 --- a/nova/tests/unit/conductor/test_conductor.py +++ b/nova/tests/unit/conductor/test_conductor.py @@ -25,12 +25,9 @@ import oslo_messaging as messaging from oslo_utils import timeutils import six -from nova.api.ec2 import ec2utils -from nova.compute import arch from nova.compute import flavors from nova.compute import rpcapi as compute_rpcapi from nova.compute import task_states -from nova.compute import utils as compute_utils from nova.compute import vm_states from nova import conductor from nova.conductor import api as conductor_api @@ -40,13 +37,10 @@ from nova.conductor.tasks import live_migrate from nova.conductor.tasks import migrate from nova import context from nova import db -from nova.db.sqlalchemy import models from nova import exception as exc from nova.image import api as image_api -from nova import notifications from nova import objects from nova.objects import base as obj_base -from nova.objects import block_device as block_device_obj from nova.objects import fields from nova import rpc from nova.scheduler import client as scheduler_client @@ -54,18 +48,13 @@ from nova.scheduler import utils as scheduler_utils from nova import test from nova.tests.unit import cast_as_call from nova.tests.unit.compute import test_compute -from nova.tests.unit import fake_block_device from nova.tests.unit import fake_instance from nova.tests.unit import fake_notifier from nova.tests.unit import fake_server_actions from nova.tests.unit import fake_utils -from nova.tests.unit.objects import test_volume_usage from nova import utils -FAKE_IMAGE_REF = 'fake-image-ref' - - class FakeContext(context.RequestContext): def elevated(self): """Return a consistent elevated context so we can detect it.""" @@ -111,216 +100,6 @@ class ConductorTestCase(_BaseTestCase, test.TestCase): self.conductor = conductor_manager.ConductorManager() self.conductor_manager = self.conductor - def _create_fake_instance(self, params=None, type_name='m1.tiny'): - if not params: - params = {} - - inst = {} - inst['vm_state'] = vm_states.ACTIVE - inst['image_ref'] = FAKE_IMAGE_REF - inst['reservation_id'] = 'r-fakeres' - inst['user_id'] = self.user_id - inst['project_id'] = self.project_id - inst['host'] = 'fake_host' - type_id = flavors.get_flavor_by_name(type_name)['id'] - inst['instance_type_id'] = type_id - inst['ami_launch_index'] = 0 - inst['memory_mb'] = 0 - inst['vcpus'] = 0 - inst['root_gb'] = 0 - inst['ephemeral_gb'] = 0 - inst['architecture'] = arch.X86_64 - inst['os_type'] = 'Linux' - inst['availability_zone'] = 'fake-az' - inst.update(params) - return db.instance_create(self.context, inst) - - def _do_update(self, instance_uuid, **updates): - return self.conductor.instance_update(self.context, instance_uuid, - updates, None) - - def test_instance_update(self): - instance = self._create_fake_instance() - new_inst = self._do_update(instance['uuid'], - vm_state=vm_states.STOPPED) - instance = db.instance_get_by_uuid(self.context, instance['uuid']) - self.assertEqual(instance['vm_state'], vm_states.STOPPED) - self.assertEqual(new_inst['vm_state'], instance['vm_state']) - - def test_instance_update_invalid_key(self): - # NOTE(danms): the real DB API call ignores invalid keys - if self.db is None: - self.conductor = utils.ExceptionHelper(self.conductor) - self.assertRaises(KeyError, - self._do_update, 'any-uuid', foobar=1) - - def test_instance_get_by_uuid(self): - orig_instance = self._create_fake_instance() - copy_instance = self.conductor.instance_get_by_uuid( - self.context, orig_instance['uuid'], None) - self.assertEqual(orig_instance['name'], - copy_instance['name']) - - def test_block_device_mapping_update_or_create(self): - fake_bdm = {'id': 1, 'device_name': 'foo', - 'source_type': 'volume', 'volume_id': 'fake-vol-id', - 'destination_type': 'volume'} - fake_bdm = fake_block_device.FakeDbBlockDeviceDict(fake_bdm) - fake_bdm2 = {'id': 1, 'device_name': 'foo2', - 'source_type': 'volume', 'volume_id': 'fake-vol-id', - 'destination_type': 'volume'} - fake_bdm2 = fake_block_device.FakeDbBlockDeviceDict(fake_bdm2) - cells_rpcapi = self.conductor.cells_rpcapi - self.mox.StubOutWithMock(db, 'block_device_mapping_create') - self.mox.StubOutWithMock(db, 'block_device_mapping_update') - self.mox.StubOutWithMock(db, 'block_device_mapping_update_or_create') - self.mox.StubOutWithMock(cells_rpcapi, - 'bdm_update_or_create_at_top') - db.block_device_mapping_create(self.context, - fake_bdm).AndReturn(fake_bdm2) - cells_rpcapi.bdm_update_or_create_at_top( - self.context, mox.IsA(block_device_obj.BlockDeviceMapping), - create=True) - db.block_device_mapping_update(self.context, fake_bdm['id'], - fake_bdm).AndReturn(fake_bdm2) - cells_rpcapi.bdm_update_or_create_at_top( - self.context, mox.IsA(block_device_obj.BlockDeviceMapping), - create=False) - self.mox.ReplayAll() - self.conductor.block_device_mapping_update_or_create(self.context, - fake_bdm, - create=True) - self.conductor.block_device_mapping_update_or_create(self.context, - fake_bdm, - create=False) - - def test_instance_get_all_by_filters(self): - filters = {'foo': 'bar'} - self.mox.StubOutWithMock(db, 'instance_get_all_by_filters') - db.instance_get_all_by_filters(self.context, filters, - 'fake-key', 'fake-sort', - columns_to_join=None, use_slave=False) - self.mox.ReplayAll() - self.conductor.instance_get_all_by_filters(self.context, filters, - 'fake-key', 'fake-sort', - None, False) - - def test_instance_get_all_by_filters_use_slave(self): - filters = {'foo': 'bar'} - self.mox.StubOutWithMock(db, 'instance_get_all_by_filters') - db.instance_get_all_by_filters(self.context, filters, - 'fake-key', 'fake-sort', - columns_to_join=None, use_slave=True) - self.mox.ReplayAll() - self.conductor.instance_get_all_by_filters(self.context, filters, - 'fake-key', 'fake-sort', - columns_to_join=None, - use_slave=True) - - def test_instance_get_all_by_host(self): - self.mox.StubOutWithMock(db, 'instance_get_all_by_host') - self.mox.StubOutWithMock(db, 'instance_get_all_by_host_and_node') - db.instance_get_all_by_host(self.context.elevated(), - 'host', None).AndReturn('result') - db.instance_get_all_by_host_and_node(self.context.elevated(), 'host', - 'node').AndReturn('result') - self.mox.ReplayAll() - result = self.conductor.instance_get_all_by_host(self.context, 'host', - None, None) - self.assertEqual(result, 'result') - result = self.conductor.instance_get_all_by_host(self.context, 'host', - 'node', None) - self.assertEqual(result, 'result') - - def _test_stubbed(self, name, dbargs, condargs, - db_result_listified=False, db_exception=None): - - self.mox.StubOutWithMock(db, name) - if db_exception: - getattr(db, name)(self.context, *dbargs).AndRaise(db_exception) - getattr(db, name)(self.context, *dbargs).AndRaise(db_exception) - else: - getattr(db, name)(self.context, *dbargs).AndReturn(condargs) - if name == 'service_get_by_compute_host': - self.mox.StubOutWithMock( - objects.ComputeNodeList, 'get_all_by_host') - objects.ComputeNodeList.get_all_by_host( - self.context, mox.IgnoreArg() - ).AndReturn(['fake-compute']) - self.mox.ReplayAll() - if db_exception: - self.assertRaises(messaging.ExpectedException, - self.conductor.service_get_all_by, - self.context, **condargs) - - self.conductor = utils.ExceptionHelper(self.conductor) - - self.assertRaises(db_exception.__class__, - self.conductor.service_get_all_by, - self.context, **condargs) - else: - result = self.conductor.service_get_all_by(self.context, - **condargs) - if db_result_listified: - if name == 'service_get_by_compute_host': - condargs['compute_node'] = ['fake-compute'] - self.assertEqual([condargs], result) - else: - self.assertEqual(condargs, result) - - def test_service_get_all(self): - self._test_stubbed('service_get_all', (), - dict(host=None, topic=None, binary=None)) - - def test_service_get_by_host_and_topic(self): - self._test_stubbed('service_get_by_host_and_topic', - ('host', 'topic'), - dict(topic='topic', host='host', binary=None)) - - def test_service_get_all_by_topic(self): - self._test_stubbed('service_get_all_by_topic', - ('topic',), - dict(topic='topic', host=None, binary=None)) - - def test_service_get_all_by_host(self): - self._test_stubbed('service_get_all_by_host', - ('host',), - dict(host='host', topic=None, binary=None)) - - def test_service_get_by_compute_host(self): - self._test_stubbed('service_get_by_compute_host', - ('host',), - dict(topic='compute', host='host', binary=None), - db_result_listified=True) - - def test_service_get_by_host_and_binary(self): - self._test_stubbed('service_get_by_host_and_binary', - ('host', 'binary'), - dict(host='host', binary='binary', topic=None)) - - def test_service_get_by_compute_host_not_found(self): - self._test_stubbed('service_get_by_compute_host', - ('host',), - dict(topic='compute', host='host', binary=None), - db_exception=exc.ComputeHostNotFound(host='host')) - - def test_service_get_by_host_and_binary_not_found(self): - self._test_stubbed('service_get_by_host_and_binary', - ('host', 'binary'), - dict(host='host', binary='binary', topic=None), - db_exception=exc.HostBinaryNotFound(binary='binary', - host='host')) - - def test_security_groups_trigger_handler(self): - self.mox.StubOutWithMock(self.conductor_manager.security_group_api, - 'trigger_handler') - self.conductor_manager.security_group_api.trigger_handler('event', - self.context, - 'args') - self.mox.ReplayAll() - self.conductor.security_groups_trigger_handler(self.context, - 'event', ['args']) - def _test_object_action(self, is_classmethod, raise_exception): class TestObject(obj_base.NovaObject): def foo(self, raise_exception=False): @@ -343,8 +122,9 @@ class ConductorTestCase(_BaseTestCase, test.TestCase): # so use a list here to make sure we can handle it fake_args = [] if is_classmethod: - result = self.conductor.object_class_action( - self.context, TestObject.obj_name(), 'bar', '1.0', + versions = {'TestObject': '1.0'} + result = self.conductor.object_class_action_versions( + self.context, TestObject.obj_name(), 'bar', versions, fake_args, {'raise_exception': raise_exception}) else: updates, result = self.conductor.object_action( @@ -410,399 +190,6 @@ class ConductorTestCase(_BaseTestCase, test.TestCase): m.return_value.obj_to_primitive.assert_called_once_with( target_version='1.2', version_manifest=versions) - def _test_expected_exceptions(self, db_method, conductor_method, errors, - *args, **kwargs): - # Tests that expected exceptions are handled properly. - for error in errors: - with mock.patch.object(db, db_method, side_effect=error): - self.assertRaises(messaging.ExpectedException, - conductor_method, - self.context, *args, **kwargs) - - def test_action_event_start_expected_exceptions(self): - error = exc.InstanceActionNotFound(request_id='1', instance_uuid='2') - self._test_expected_exceptions( - 'action_event_start', self.conductor.action_event_start, [error], - {'foo': 'bar'}) - - def test_action_event_finish_expected_exceptions(self): - errors = (exc.InstanceActionNotFound(request_id='1', - instance_uuid='2'), - exc.InstanceActionEventNotFound(event='1', action_id='2')) - self._test_expected_exceptions( - 'action_event_finish', self.conductor.action_event_finish, - errors, {'foo': 'bar'}) - - def test_instance_update_expected_exceptions(self): - errors = (exc.InvalidUUID(uuid='foo'), - exc.InstanceNotFound(instance_id=1), - exc.UnexpectedTaskStateError(instance_uuid='fake_uuid', - expected={'task_state': 'foo'}, - actual={'task_state': 'bar'})) - self._test_expected_exceptions( - 'instance_update', self.conductor.instance_update, - errors, None, {'foo': 'bar'}, None) - - def test_instance_get_by_uuid_expected_exceptions(self): - error = exc.InstanceNotFound(instance_id=1) - self._test_expected_exceptions( - 'instance_get_by_uuid', self.conductor.instance_get_by_uuid, - [error], None, []) - - def test_aggregate_host_add_expected_exceptions(self): - error = exc.AggregateHostExists(aggregate_id=1, host='foo') - self._test_expected_exceptions( - 'aggregate_host_add', self.conductor.aggregate_host_add, - [error], {'id': 1}, None) - - def test_aggregate_host_delete_expected_exceptions(self): - error = exc.AggregateHostNotFound(aggregate_id=1, host='foo') - self._test_expected_exceptions( - 'aggregate_host_delete', self.conductor.aggregate_host_delete, - [error], {'id': 1}, None) - - def test_service_update_expected_exceptions(self): - error = exc.ServiceNotFound(service_id=1) - self._test_expected_exceptions( - 'service_update', - self.conductor.service_update, - [error], {'id': 1}, None) - - def test_service_destroy_expected_exceptions(self): - error = exc.ServiceNotFound(service_id=1) - self._test_expected_exceptions( - 'service_destroy', - self.conductor.service_destroy, - [error], 1) - - def _setup_aggregate_with_host(self): - aggregate_ref = db.aggregate_create(self.context.elevated(), - {'name': 'foo'}, metadata={'availability_zone': 'foo'}) - - self.conductor.aggregate_host_add(self.context, aggregate_ref, 'bar') - - aggregate_ref = db.aggregate_get(self.context.elevated(), - aggregate_ref['id']) - - return aggregate_ref - - def test_aggregate_host_add(self): - aggregate_ref = self._setup_aggregate_with_host() - - self.assertIn('bar', aggregate_ref['hosts']) - - db.aggregate_delete(self.context.elevated(), aggregate_ref['id']) - - def test_aggregate_host_delete(self): - aggregate_ref = self._setup_aggregate_with_host() - - self.conductor.aggregate_host_delete(self.context, aggregate_ref, - 'bar') - - aggregate_ref = db.aggregate_get(self.context.elevated(), - aggregate_ref['id']) - - self.assertNotIn('bar', aggregate_ref['hosts']) - - db.aggregate_delete(self.context.elevated(), aggregate_ref['id']) - - def test_network_migrate_instance_start(self): - self.mox.StubOutWithMock(self.conductor_manager.network_api, - 'migrate_instance_start') - self.conductor_manager.network_api.migrate_instance_start(self.context, - 'instance', - 'migration') - self.mox.ReplayAll() - self.conductor.network_migrate_instance_start(self.context, - 'instance', - 'migration') - - def test_network_migrate_instance_finish(self): - self.mox.StubOutWithMock(self.conductor_manager.network_api, - 'migrate_instance_finish') - self.conductor_manager.network_api.migrate_instance_finish( - self.context, 'instance', 'migration') - self.mox.ReplayAll() - self.conductor.network_migrate_instance_finish(self.context, - 'instance', - 'migration') - - def test_instance_destroy(self): - instance = objects.Instance(id=1, uuid='fake-uuid') - - @mock.patch.object(instance, 'destroy') - @mock.patch.object(obj_base, 'obj_to_primitive', - return_value='fake-result') - def do_test(mock_to_primitive, mock_destroy): - result = self.conductor.instance_destroy(self.context, instance) - mock_destroy.assert_called_once_with() - mock_to_primitive.assert_called_once_with(instance) - self.assertEqual(result, 'fake-result') - do_test() - - def test_compute_unrescue(self): - self.mox.StubOutWithMock(self.conductor_manager.compute_api, - 'unrescue') - self.conductor_manager.compute_api.unrescue(self.context, 'instance') - self.mox.ReplayAll() - self.conductor.compute_unrescue(self.context, 'instance') - - def test_instance_get_active_by_window_joined(self): - self.mox.StubOutWithMock(db, 'instance_get_active_by_window_joined') - db.instance_get_active_by_window_joined(self.context, 'fake-begin', - 'fake-end', 'fake-proj', - 'fake-host') - self.mox.ReplayAll() - self.conductor.instance_get_active_by_window_joined( - self.context, 'fake-begin', 'fake-end', 'fake-proj', 'fake-host') - - def test_instance_fault_create(self): - self.mox.StubOutWithMock(db, 'instance_fault_create') - db.instance_fault_create(self.context, 'fake-values').AndReturn( - 'fake-result') - self.mox.ReplayAll() - result = self.conductor.instance_fault_create(self.context, - 'fake-values') - self.assertEqual(result, 'fake-result') - - def test_action_event_start(self): - self.mox.StubOutWithMock(db, 'action_event_start') - db.action_event_start(self.context, mox.IgnoreArg()) - self.mox.ReplayAll() - self.conductor.action_event_start(self.context, {}) - - def test_action_event_finish(self): - self.mox.StubOutWithMock(db, 'action_event_finish') - db.action_event_finish(self.context, mox.IgnoreArg()) - self.mox.ReplayAll() - self.conductor.action_event_finish(self.context, {}) - - def test_agent_build_get_by_triple(self): - self.mox.StubOutWithMock(db, 'agent_build_get_by_triple') - db.agent_build_get_by_triple(self.context, 'fake-hv', 'fake-os', - 'fake-arch').AndReturn('it worked') - self.mox.ReplayAll() - result = self.conductor.agent_build_get_by_triple(self.context, - 'fake-hv', - 'fake-os', - 'fake-arch') - self.assertEqual(result, 'it worked') - - def test_bw_usage_update(self): - self.mox.StubOutWithMock(db, 'bw_usage_update') - self.mox.StubOutWithMock(db, 'bw_usage_get') - - update_args = (self.context, 'uuid', 'mac', 0, 10, 20, 5, 10, 20) - get_args = (self.context, 'uuid', 0, 'mac') - - db.bw_usage_update(*update_args, update_cells=True) - db.bw_usage_get(*get_args).AndReturn('foo') - - self.mox.ReplayAll() - result = self.conductor.bw_usage_update(*update_args, - update_cells=True) - self.assertEqual(result, 'foo') - - @mock.patch.object(notifications, 'audit_period_bounds') - @mock.patch.object(notifications, 'bandwidth_usage') - @mock.patch.object(compute_utils, 'notify_about_instance_usage') - def test_notify_usage_exists(self, mock_notify, mock_bw, mock_audit): - info = { - 'audit_period_beginning': 'start', - 'audit_period_ending': 'end', - 'bandwidth': 'bw_usage', - 'image_meta': {}, - 'extra': 'info', - } - instance = objects.Instance(id=1, system_metadata={}) - - mock_audit.return_value = ('start', 'end') - mock_bw.return_value = 'bw_usage' - - self.conductor.notify_usage_exists(self.context, instance, False, True, - system_metadata={}, - extra_usage_info=dict(extra='info')) - - class MatchInstance(object): - def __eq__(self, thing): - return thing.id == instance.id - - notifier = self.conductor_manager.notifier - mock_audit.assert_called_once_with(False) - mock_bw.assert_called_once_with(MatchInstance(), 'start', True) - mock_notify.assert_called_once_with(notifier, self.context, - MatchInstance(), - 'exists', system_metadata={}, - extra_usage_info=info) - - def test_get_ec2_ids(self): - expected = { - 'instance-id': 'ec2-inst-id', - 'ami-id': 'ec2-ami-id', - 'kernel-id': 'ami-kernel-ec2-kernelid', - 'ramdisk-id': 'ami-ramdisk-ec2-ramdiskid', - } - inst = { - 'uuid': 'fake-uuid', - 'kernel_id': 'ec2-kernelid', - 'ramdisk_id': 'ec2-ramdiskid', - 'image_ref': 'fake-image', - } - self.mox.StubOutWithMock(ec2utils, 'id_to_ec2_inst_id') - self.mox.StubOutWithMock(ec2utils, 'glance_id_to_ec2_id') - self.mox.StubOutWithMock(ec2utils, 'image_type') - - ec2utils.id_to_ec2_inst_id(inst['uuid']).AndReturn( - expected['instance-id']) - ec2utils.glance_id_to_ec2_id(self.context, - inst['image_ref']).AndReturn( - expected['ami-id']) - for image_type in ['kernel', 'ramdisk']: - image_id = inst['%s_id' % image_type] - ec2utils.image_type(image_type).AndReturn('ami-' + image_type) - ec2utils.glance_id_to_ec2_id(self.context, image_id, - 'ami-' + image_type).AndReturn( - 'ami-%s-ec2-%sid' % (image_type, image_type)) - - self.mox.ReplayAll() - result = self.conductor.get_ec2_ids(self.context, inst) - self.assertEqual(result, expected) - - def test_migration_get_in_progress_by_host_and_node(self): - self.mox.StubOutWithMock(db, - 'migration_get_in_progress_by_host_and_node') - db.migration_get_in_progress_by_host_and_node( - self.context, 'fake-host', 'fake-node').AndReturn('fake-result') - self.mox.ReplayAll() - result = self.conductor.migration_get_in_progress_by_host_and_node( - self.context, 'fake-host', 'fake-node') - self.assertEqual(result, 'fake-result') - - def test_aggregate_metadata_get_by_host(self): - self.mox.StubOutWithMock(db, 'aggregate_metadata_get_by_host') - db.aggregate_metadata_get_by_host(self.context, 'host', - 'key').AndReturn('result') - self.mox.ReplayAll() - result = self.conductor.aggregate_metadata_get_by_host(self.context, - 'host', 'key') - self.assertEqual(result, 'result') - - def test_block_device_mapping_get_all_by_instance(self): - fake_inst = {'uuid': 'fake-uuid'} - self.mox.StubOutWithMock(db, - 'block_device_mapping_get_all_by_instance') - db.block_device_mapping_get_all_by_instance( - self.context, fake_inst['uuid']).AndReturn('fake-result') - self.mox.ReplayAll() - result = self.conductor.block_device_mapping_get_all_by_instance( - self.context, fake_inst, legacy=False) - self.assertEqual(result, 'fake-result') - - def test_compute_node_update(self): - node = {'id': 'fake-id'} - self.mox.StubOutWithMock(db, 'compute_node_update') - db.compute_node_update(self.context, node['id'], {'fake': 'values'}).\ - AndReturn('fake-result') - self.mox.ReplayAll() - result = self.conductor.compute_node_update(self.context, node, - {'fake': 'values'}) - self.assertEqual(result, 'fake-result') - - def test_compute_node_delete(self): - node = {'id': 'fake-id'} - self.mox.StubOutWithMock(db, 'compute_node_delete') - db.compute_node_delete(self.context, node['id']).AndReturn(None) - self.mox.ReplayAll() - result = self.conductor.compute_node_delete(self.context, node) - self.assertIsNone(result) - - def test_task_log_get(self): - self.mox.StubOutWithMock(db, 'task_log_get') - db.task_log_get(self.context, 'task', 'begin', 'end', 'host', - 'state').AndReturn('result') - self.mox.ReplayAll() - result = self.conductor.task_log_get(self.context, 'task', 'begin', - 'end', 'host', 'state') - self.assertEqual(result, 'result') - - def test_task_log_get_with_no_state(self): - self.mox.StubOutWithMock(db, 'task_log_get') - db.task_log_get(self.context, 'task', 'begin', 'end', - 'host', None).AndReturn('result') - self.mox.ReplayAll() - result = self.conductor.task_log_get(self.context, 'task', 'begin', - 'end', 'host', None) - self.assertEqual(result, 'result') - - def test_task_log_begin_task(self): - self.mox.StubOutWithMock(db, 'task_log_begin_task') - db.task_log_begin_task(self.context.elevated(), 'task', 'begin', - 'end', 'host', 'items', - 'message').AndReturn('result') - self.mox.ReplayAll() - result = self.conductor.task_log_begin_task( - self.context, 'task', 'begin', 'end', 'host', 'items', 'message') - self.assertEqual(result, 'result') - - def test_task_log_end_task(self): - self.mox.StubOutWithMock(db, 'task_log_end_task') - db.task_log_end_task(self.context.elevated(), 'task', 'begin', 'end', - 'host', 'errors', 'message').AndReturn('result') - self.mox.ReplayAll() - result = self.conductor.task_log_end_task( - self.context, 'task', 'begin', 'end', 'host', 'errors', 'message') - self.assertEqual(result, 'result') - - def test_security_groups_trigger_members_refresh(self): - self.mox.StubOutWithMock(self.conductor_manager.security_group_api, - 'trigger_members_refresh') - self.conductor_manager.security_group_api.trigger_members_refresh( - self.context, [1, 2, 3]) - self.mox.ReplayAll() - self.conductor.security_groups_trigger_members_refresh(self.context, - [1, 2, 3]) - - def test_vol_usage_update(self): - self.mox.StubOutWithMock(db, 'vol_usage_update') - self.mox.StubOutWithMock(compute_utils, 'usage_volume_info') - - fake_inst = {'uuid': 'fake-uuid', - 'project_id': 'fake-project', - 'user_id': 'fake-user', - 'availability_zone': 'fake-az', - } - - db.vol_usage_update(self.context, 'fake-vol', 22, 33, 44, 55, - fake_inst['uuid'], - fake_inst['project_id'], - fake_inst['user_id'], - fake_inst['availability_zone'], - False).AndReturn(test_volume_usage.fake_vol_usage) - compute_utils.usage_volume_info( - mox.IsA(objects.VolumeUsage)).AndReturn('fake-info') - - self.mox.ReplayAll() - - self.conductor.vol_usage_update(self.context, 'fake-vol', - 22, 33, 44, 55, fake_inst, None, False) - - self.assertEqual(1, len(fake_notifier.NOTIFICATIONS)) - msg = fake_notifier.NOTIFICATIONS[0] - self.assertEqual('conductor.%s' % self.conductor_manager.host, - msg.publisher_id) - self.assertEqual('volume.usage', msg.event_type) - self.assertEqual('INFO', msg.priority) - self.assertEqual('fake-info', msg.payload) - - def test_compute_node_create(self): - self.mox.StubOutWithMock(db, 'compute_node_create') - db.compute_node_create(self.context, 'fake-values').AndReturn( - 'fake-result') - self.mox.ReplayAll() - result = self.conductor.compute_node_create(self.context, - 'fake-values') - self.assertEqual(result, 'fake-result') - class ConductorRPCAPITestCase(_BaseTestCase, test.TestCase): """Conductor RPC API Tests.""" @@ -890,49 +277,6 @@ class ConductorImportTest(test.TestCase): conductor_api.LocalComputeTaskAPI) -class ConductorPolicyTest(test.TestCase): - def test_all_allowed_keys(self): - ctxt = context.RequestContext('fake-user', 'fake-project') - conductor = conductor_manager.ConductorManager() - updates = {} - for key in conductor_manager.allowed_updates: - if key in conductor_manager.datetime_fields: - updates[key] = timeutils.utcnow() - elif key == 'access_ip_v4': - updates[key] = '10.0.0.2' - elif key == 'access_ip_v6': - updates[key] = '2001:db8:0:1::1' - elif key in ('instance_type_id', 'memory_mb', 'ephemeral_gb', - 'root_gb', 'vcpus', 'power_state', 'progress'): - updates[key] = 5 - elif key == 'system_metadata': - updates[key] = {'foo': 'foo'} - else: - updates[key] = 'foo' - - def fake_save(inst): - # id that comes back from db after updating - inst.id = 1 - - with mock.patch.object(objects.Instance, 'save', - side_effect=fake_save, - autospec=True) as mock_save: - conductor.instance_update(ctxt, 'fake-instance', updates, - 'conductor') - mock_save.assert_called_once_with(mock.ANY) - - def test_allowed_keys_are_real(self): - instance = models.Instance() - keys = list(conductor_manager.allowed_updates) - - # NOTE(danms): expected_task_state is a parameter that gets - # passed to the db layer, but is not actually an instance attribute - del keys[keys.index('expected_task_state')] - - for key in keys: - self.assertTrue(hasattr(instance, key)) - - class _BaseTaskTestCase(object): def setUp(self): super(_BaseTaskTestCase, self).setUp() @@ -2105,24 +1449,3 @@ class ConductorLocalComputeTaskAPITestCase(ConductorTaskAPITestCase): super(ConductorLocalComputeTaskAPITestCase, self).setUp() self.conductor = conductor_api.LocalComputeTaskAPI() self.conductor_manager = self.conductor._manager._target - - -class ConductorV3ManagerProxyTestCase(test.NoDBTestCase): - def test_v3_manager_proxy(self): - manager = conductor_manager.ConductorManager() - proxy = conductor_manager._ConductorManagerV3Proxy(manager) - ctxt = context.get_admin_context() - - methods = [ - # (method, number_of_args) - ('provider_fw_rule_get_all', 0), - ('object_class_action_versions', 5), - ('object_action', 4), - ('object_backport_versions', 2), - ] - - for method, num_args in methods: - args = range(num_args) - with mock.patch.object(manager, method) as mock_method: - getattr(proxy, method)(ctxt, *args) - mock_method.assert_called_once_with(ctxt, *args) |