summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Behrens <cbehrens@codestud.com>2012-04-13 05:54:48 +0000
committerChris Behrens <cbehrens@codestud.com>2013-01-04 20:45:05 +0000
commitf9a868e86ce11f786538547c301b805bd68a1697 (patch)
tree9c75001829a75e47efe28e7ef034c79ded8cde76
parent48487f1a4b8f8fa538f90716e293ac8d67853311 (diff)
downloadnova-f9a868e86ce11f786538547c301b805bd68a1697.tar.gz
Cells: Add the main code.
This introduces *EXPERIMENTAL* compute cells functionality as a way to scale nova in a more distributed fashion without having to use complicated technologies like DB and message queue clustering. Cells are configured as a tree and the top level cell should contain nova-api without any nova-computes while child cells contain everything except nova-api. One can think of a cell as a normal nova deployment in that each cell has its own DB server and message queue broker. The top level cell keeps a subset of data about ALL instances in all cells in its DB. Child cells send messages to the top level cell when instances change state. Data in 1 child cell is not shared with another child cell. A new service, nova-cells, is introduced that handles communication between cells and picking of a cell for new instances. This service is required for every cell. Communication between cells is pluggable with the only option currently implemented being communication via RPC. Cells scheduling is separate from host scheduling. nova-cells first picks a cell (currently randomly -- future patches add filtering/weighing functionality and decisions can be based on broadcasts of capacity/capabilities). Once a cell has been selected and the new build request has reached its nova-cells service, it'll be sent over to the host scheduler in that cell and the build proceeds as it does without cells. New config options are introduced for enabling and configuring the cells code. Cells is disabled by default. All of the config options below go under a '[cells]' section in nova.conf. These are the options that one may want to tweak: enable -- Turn on cells code (default is False) name -- Name of the current cell. capabilities -- List of arbitrary key=value pairs defining capabilities of the current cell. These are sent to parent cells, but aren't used in scheduling until later filter/weight support is added.
call_timeout -- How long to wait for replies from calls between cells. When using cells, the compute API class must be changed in the API cell, so that requests can be proxied via nova-cells down to the correct cell properly. Thus, config requirements for API cell: -- [DEFAULT] compute_api_class=nova.compute.cells_api.ComputeCellsAPI. [cells] enable=True name=api-cell -- Config requirements for child cell: -- [cells] enable=True name=child-cell1 -- Another requirement is populating the 'cells' DB table in each cell. Each cell needs to know about its parent and children and how to communicate with them (message broker location, credentials, etc). Implements blueprint nova-compute-cells DocImpact Change-Id: I1b52788ea9d7753365d175abf39bdbc22ba822fe
-rwxr-xr-xbin/nova-cells53
-rw-r--r--nova/cells/__init__.py19
-rw-r--r--nova/cells/driver.py41
-rw-r--r--nova/cells/manager.py136
-rw-r--r--nova/cells/messaging.py1047
-rw-r--r--nova/cells/opts.py44
-rw-r--r--nova/cells/rpc_driver.py165
-rw-r--r--nova/cells/rpcapi.py138
-rw-r--r--nova/cells/scheduler.py136
-rw-r--r--nova/cells/state.py346
-rw-r--r--nova/compute/api.py8
-rw-r--r--nova/compute/cells_api.py471
-rw-r--r--nova/db/api.py69
-rw-r--r--nova/exception.py28
-rw-r--r--nova/tests/cells/__init__.py19
-rw-r--r--nova/tests/cells/fakes.py191
-rw-r--r--nova/tests/cells/test_cells_manager.py151
-rw-r--r--nova/tests/cells/test_cells_messaging.py913
-rw-r--r--nova/tests/cells/test_cells_rpc_driver.py218
-rw-r--r--nova/tests/cells/test_cells_rpcapi.py206
-rw-r--r--nova/tests/cells/test_cells_scheduler.py206
-rw-r--r--nova/tests/compute/test_compute.py4
-rw-r--r--nova/tests/compute/test_compute_cells.py99
-rw-r--r--setup.py1
24 files changed, 4696 insertions, 13 deletions
diff --git a/bin/nova-cells b/bin/nova-cells
new file mode 100755
index 0000000000..a7e16ef532
--- /dev/null
+++ b/bin/nova-cells
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Starter script for Nova Cells Service."""
+
+import eventlet
+# Monkey-patch via eventlet before the remaining modules are imported.
+eventlet.monkey_patch()
+
+import os
+import sys
+
+# If ../nova/__init__.py exists, add ../ to Python search path, so that
+# it will override what happens to be installed in /usr/(local/)lib/python...
+# The candidate directory is two levels above the path of this script
+# (sys.argv[0]).
+possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
+ os.pardir,
+ os.pardir))
+if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
+ sys.path.insert(0, possible_topdir)
+
+from nova import config
+from nova.openstack.common import cfg
+from nova.openstack.common import log as logging
+from nova import service
+from nova import utils
+
+CONF = cfg.CONF
+# 'topic' and 'manager' are declared in nova.cells.opts; import them so that
+# CONF.cells.topic and CONF.cells.manager can be read below.
+CONF.import_opt('topic', 'nova.cells.opts', group='cells')
+CONF.import_opt('manager', 'nova.cells.opts', group='cells')
+
+if __name__ == '__main__':
+ # Parse the command line/config and set up logging, then create the
+ # nova-cells service from the configured topic and manager and run it.
+ config.parse_args(sys.argv)
+ logging.setup('nova')
+ utils.monkey_patch()
+ server = service.Service.create(binary='nova-cells',
+ topic=CONF.cells.topic,
+ manager=CONF.cells.manager)
+ service.serve(server)
+ service.wait()
diff --git a/nova/cells/__init__.py b/nova/cells/__init__.py
new file mode 100644
index 0000000000..47d21a14b3
--- /dev/null
+++ b/nova/cells/__init__.py
@@ -0,0 +1,19 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 Rackspace Hosting
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Cells
+"""
diff --git a/nova/cells/driver.py b/nova/cells/driver.py
new file mode 100644
index 0000000000..04e29dddfa
--- /dev/null
+++ b/nova/cells/driver.py
@@ -0,0 +1,41 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Cells Communication Driver
+"""
+
+
+class BaseCellsDriver(object):
+ """The base class for cells communication.
+
+ One instance of this class will be created for every neighbor cell
+ that we find in the DB and it will be associated with the cell in
+ its CellState.
+
+ One instance is also created by the cells manager for setting up
+ the consumers.
+
+ Every method below raises NotImplementedError, so a concrete driver
+ has to override each of them.
+ """
+ def start_consumers(self, msg_runner):
+ """Start any consumers the driver may need.
+
+ :param msg_runner: the message runner supplied by the caller
+ :raises NotImplementedError: always, in this base class
+ """
+ raise NotImplementedError()
+
+ def stop_consumers(self):
+ """Stop consuming messages.
+
+ :raises NotImplementedError: always, in this base class
+ """
+ raise NotImplementedError()
+
+ def send_message_to_cell(self, cell_state, message):
+ """Send a message to a cell.
+
+ :param cell_state: presumably the CellState of the destination
+ cell -- TODO confirm against the concrete driver
+ :param message: the message to deliver
+ :raises NotImplementedError: always, in this base class
+ """
+ raise NotImplementedError()
diff --git a/nova/cells/manager.py b/nova/cells/manager.py
new file mode 100644
index 0000000000..a1352601cb
--- /dev/null
+++ b/nova/cells/manager.py
@@ -0,0 +1,136 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Cells Service Manager
+"""
+
+from nova.cells import messaging
+from nova.cells import state as cells_state
+from nova import context
+from nova import manager
+from nova.openstack.common import cfg
+from nova.openstack.common import importutils
+from nova.openstack.common import log as logging
+
+# Options owned by this module; they are registered under the 'cells'
+# group below.
+cell_manager_opts = [
+ cfg.StrOpt('driver',
+ default='nova.cells.rpc_driver.CellsRPCDriver',
+ help='Cells communication driver to use'),
+]
+
+
+LOG = logging.getLogger(__name__)
+
+CONF = cfg.CONF
+CONF.register_opts(cell_manager_opts, group='cells')
+
+
+class CellsManager(manager.Manager):
+ """The nova-cells manager class. This class defines RPC
+ methods that the local cell may call. This class is NOT used for
+ messages coming from other cells. That communication is
+ driver-specific.
+
+ Communication to other cells happens via the messaging module. The
+ MessageRunner from that module will handle routing the message to
+ the correct cell via the communications driver. Most methods below
+ create 'targeted' (where we want to route a message to a specific cell)
+ or 'broadcast' (where we want a message to go to multiple cells)
+ messages.
+
+ Scheduling requests get passed to the scheduler class.
+ """
+ RPC_API_VERSION = '1.0'
+
+ def __init__(self, *args, **kwargs):
+ """Create the state manager, message runner and cells driver.
+
+ An optional 'cell_state_manager' keyword supplies the callable used
+ to build the state manager; it is removed from kwargs before the
+ base class is initialized and defaults to
+ cells_state.CellStateManager. The driver class is loaded from
+ CONF.cells.driver.
+ """
+ # Mostly for tests.
+ cell_state_manager = kwargs.pop('cell_state_manager', None)
+ super(CellsManager, self).__init__(*args, **kwargs)
+ if cell_state_manager is None:
+ cell_state_manager = cells_state.CellStateManager
+ self.state_manager = cell_state_manager()
+ self.msg_runner = messaging.MessageRunner(self.state_manager)
+ cells_driver_cls = importutils.import_class(
+ CONF.cells.driver)
+ self.driver = cells_driver_cls()
+
+ def post_start_hook(self):
+ """Have the driver start its consumers for inter-cell communication.
+ Also ask our child cells for their capacities and capabilities so
+ we get them more quickly than just waiting for the next periodic
+ update. Receiving the updates from the children will cause us to
+ update our parents. If we don't have any children, just update
+ our parents immediately.
+ """
+ # FIXME(comstud): There's currently no hooks when services are
+ # stopping, so we have no way to stop consumers cleanly.
+ self.driver.start_consumers(self.msg_runner)
+ ctxt = context.get_admin_context()
+ if self.state_manager.get_child_cells():
+ self.msg_runner.ask_children_for_capabilities(ctxt)
+ self.msg_runner.ask_children_for_capacities(ctxt)
+ else:
+ self._update_our_parents(ctxt)
+
+ @manager.periodic_task
+ def _update_our_parents(self, ctxt):
+ """Update our parent cells with our capabilities and capacity
+ if we're at the bottom of the tree.
+ """
+ # NOTE(review): nothing in this method checks that we are at the
+ # bottom of the tree; post_start_hook() only calls it when there are
+ # no child cells, but it is also decorated as a periodic task --
+ # confirm the intended behavior.
+ self.msg_runner.tell_parents_our_capabilities(ctxt)
+ self.msg_runner.tell_parents_our_capacities(ctxt)
+
+ def schedule_run_instance(self, ctxt, host_sched_kwargs):
+ """Pick a cell (possibly ourselves) to build new instance(s)
+ and forward the request accordingly.
+ """
+ # Target is ourselves first.
+ our_cell = self.state_manager.get_my_state()
+ self.msg_runner.schedule_run_instance(ctxt, our_cell,
+ host_sched_kwargs)
+
+ def run_compute_api_method(self, ctxt, cell_name, method_info, call):
+ """Call a compute API method in a specific cell.
+
+ When 'call' is true, the value carried by the response is returned
+ (or its exception raised) via value_or_raise(); otherwise nothing
+ is returned.
+ """
+ response = self.msg_runner.run_compute_api_method(ctxt,
+ cell_name,
+ method_info,
+ call)
+ if call:
+ return response.value_or_raise()
+
+ def instance_update_at_top(self, ctxt, instance):
+ """Update an instance at the top level cell."""
+ self.msg_runner.instance_update_at_top(ctxt, instance)
+
+ def instance_destroy_at_top(self, ctxt, instance):
+ """Destroy an instance at the top level cell."""
+ self.msg_runner.instance_destroy_at_top(ctxt, instance)
+
+ def instance_delete_everywhere(self, ctxt, instance, delete_type):
+ """This is used by API cell when it didn't know what cell
+ an instance was in, but the instance was requested to be
+ deleted or soft_deleted. So, we'll broadcast this everywhere.
+ """
+ self.msg_runner.instance_delete_everywhere(ctxt, instance,
+ delete_type)
+
+ def instance_fault_create_at_top(self, ctxt, instance_fault):
+ """Create an instance fault at the top level cell."""
+ self.msg_runner.instance_fault_create_at_top(ctxt, instance_fault)
+
+ def bw_usage_update_at_top(self, ctxt, bw_update_info):
+ """Update bandwidth usage at top level cell."""
+ self.msg_runner.bw_usage_update_at_top(ctxt, bw_update_info)
diff --git a/nova/cells/messaging.py b/nova/cells/messaging.py
new file mode 100644
index 0000000000..e5617e7427
--- /dev/null
+++ b/nova/cells/messaging.py
@@ -0,0 +1,1047 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Cell messaging module.
+
+This module defines the different message types that are passed between
+cells and the methods that they can call when the target cell has been
+reached.
+
+The interface into this module is the MessageRunner class.
+"""
+import sys
+
+from eventlet import queue
+
+from nova.cells import state as cells_state
+from nova import compute
+from nova import context
+from nova.db import base
+from nova import exception
+from nova.openstack.common import cfg
+from nova.openstack.common import excutils
+from nova.openstack.common import importutils
+from nova.openstack.common import jsonutils
+from nova.openstack.common import log as logging
+from nova.openstack.common.rpc import common as rpc_common
+from nova.openstack.common import uuidutils
+from nova import utils
+
+
+# Options owned by this module; they are registered under the 'cells'
+# group below.
+cell_messaging_opts = [
+ cfg.IntOpt('max_hop_count',
+ default=10,
+ help='Maximum number of hops for cells routing.'),
+ cfg.StrOpt('scheduler',
+ default='nova.cells.scheduler.CellsScheduler',
+ help='Cells scheduler to use')]
+
+CONF = cfg.CONF
+# 'name' and 'call_timeout' are declared in nova.cells.opts.
+CONF.import_opt('name', 'nova.cells.opts', group='cells')
+CONF.import_opt('call_timeout', 'nova.cells.opts', group='cells')
+CONF.register_opts(cell_messaging_opts, group='cells')
+
+LOG = logging.getLogger(__name__)
+
+# Separator used between cell names for the 'full cell name' and routing
+# path.
+_PATH_CELL_SEP = '!'
+
+
+def _reverse_path(path):
+    """Return 'path' with its cell names in the opposite order.
+
+    Cell names are separated by _PATH_CELL_SEP.  Used for sending
+    responses upstream.
+    """
+    return _PATH_CELL_SEP.join(reversed(path.split(_PATH_CELL_SEP)))
+
+
+def _response_cell_name_from_path(routing_path, neighbor_only=False):
+    """Return the cell path that a response to 'routing_path' should target.
+
+    The routing path is reversed so that it leads from the last cell on
+    the path back to the first one.  If 'neighbor_only' is True, only the
+    first two cells of the reversed path are kept, so the response is
+    addressed to the adjacent cell instead of the original requester.  A
+    path that holds a single cell is returned unchanged.
+    """
+    path_parts = _reverse_path(routing_path).split(_PATH_CELL_SEP)
+    if neighbor_only:
+        # Work on cell names rather than on the joined string: a
+        # 'len(path) == 1' test counts characters, not cells.  Slicing
+        # already leaves a one-cell path untouched.
+        path_parts = path_parts[:2]
+    return _PATH_CELL_SEP.join(path_parts)
+
+
+#
+# Message classes.
+#
+
+
+class _BaseMessage(object):
+ """Base message class. It defines data that is passed with every
+ single message through every cell.
+
+ Messages are JSON-ified before sending and turned back into a
+ class instance when being received.
+
+ Every message has a unique ID. This is used to route responses
+ back to callers. In the future, this might be used to detect
+ receiving the same message more than once.
+
+ routing_path is updated on every hop through a cell. The current
+ cell name is appended to it (cells are separated by
+ _PATH_CELL_SEP ('!')). This is used to tell if we've reached the
+ target cell and also to determine the source of a message for
+ responses by reversing it.
+
+ hop_count is incremented and compared against max_hop_count. The
+ only current usefulness of this is to break out of a routing loop
+ if someone has a broken config.
+
+ fanout means to send to all nova-cells services running in a cell.
+ This is useful for capacity and capability broadcasting as well
+ as making sure responses get back to the nova-cells service that
+ is waiting.
+ """
+
+ # Override message_type in a subclass
+ message_type = None
+
+ # Names of the attributes that _to_dict()/to_json() serialize. Each
+ # instance works on its own copy (see __init__), so subclasses can
+ # append to it without touching this class-level list.
+ base_attrs_to_json = ['message_type',
+ 'ctxt',
+ 'method_name',
+ 'method_kwargs',
+ 'direction',
+ 'need_response',
+ 'fanout',
+ 'uuid',
+ 'routing_path',
+ 'hop_count',
+ 'max_hop_count']
+
+ def __init__(self, msg_runner, ctxt, method_name, method_kwargs,
+ direction, need_response=False, fanout=False, uuid=None,
+ routing_path=None, hop_count=0, max_hop_count=None,
+ **kwargs):
+ """Store the message attributes and record the hop through this cell.
+
+ A uuid is generated when none is given and max_hop_count falls back
+ to CONF.cells.max_hop_count. Creating the instance appends our cell
+ name to routing_path and increments hop_count (see _append_hop()).
+ Extra keyword arguments are accepted and not used here.
+ """
+ self.ctxt = ctxt
+ # NOTE(review): resp_queue is set to None again at the end of this
+ # method.
+ self.resp_queue = None
+ self.msg_runner = msg_runner
+ self.state_manager = msg_runner.state_manager
+ # Copy these.
+ self.base_attrs_to_json = self.base_attrs_to_json[:]
+ # Normally this would just be CONF.cells.name, but going through
+ # the msg_runner allows us to stub it more easily.
+ self.our_path_part = self.msg_runner.our_name
+ self.uuid = uuid
+ if self.uuid is None:
+ self.uuid = uuidutils.generate_uuid()
+ self.method_name = method_name
+ self.method_kwargs = method_kwargs
+ self.direction = direction
+ self.need_response = need_response
+ self.fanout = fanout
+ self.routing_path = routing_path
+ self.hop_count = hop_count
+ if max_hop_count is None:
+ max_hop_count = CONF.cells.max_hop_count
+ self.max_hop_count = max_hop_count
+ self.is_broadcast = False
+ self._append_hop()
+ # Each sub-class should set this when the message is inited
+ self.next_hops = []
+ self.resp_queue = None
+
+ def __repr__(self):
+ """Show the class name and the serialized attributes, leaving out
+ method_kwargs.
+ """
+ _dict = self._to_dict()
+ _dict.pop('method_kwargs')
+ return "<%s: %s>" % (self.__class__.__name__, _dict)
+
+ def _append_hop(self):
+ """Add our hop to the routing_path and increment hop_count."""
+ # An empty/None routing_path gets no leading separator.
+ routing_path = (self.routing_path and
+ self.routing_path + _PATH_CELL_SEP or '')
+ self.routing_path = routing_path + self.our_path_part
+ self.hop_count += 1
+
+ def _at_max_hop_count(self, do_raise=True):
+ """Check if we're at the max hop count. If we are and do_raise is
+ True, raise CellMaxHopCountReached. If we are at the max and
+ do_raise is False... return True, else False.
+ """
+ if self.hop_count >= self.max_hop_count:
+ if do_raise:
+ raise exception.CellMaxHopCountReached(
+ hop_count=self.hop_count)
+ return True
+ return False
+
+ def _process_locally(self):
+ """It's been determined that we should process this message in this
+ cell. Go through the MessageRunner to call the appropriate
+ method for this message. Catch the response and/or exception and
+ encode it within a Response instance. Return it so the caller
+ can potentially return it to another cell... or return it to
+ a caller waiting in this cell.
+ """
+ try:
+ resp_value = self.msg_runner._process_message_locally(self)
+ failure = False
+ except Exception as exc:
+ # On failure the Response carries the sys.exc_info() tuple.
+ resp_value = sys.exc_info()
+ failure = True
+ LOG.exception(_("Error processing message locally: %(exc)s"),
+ locals())
+ return Response(self.routing_path, resp_value, failure)
+
+ def _setup_response_queue(self):
+ """Shortcut to creating a response queue in the MessageRunner."""
+ self.resp_queue = self.msg_runner._setup_response_queue(self)
+
+ def _cleanup_response_queue(self):
+ """Shortcut to deleting a response queue in the MessageRunner.
+
+ Does nothing when no response queue is set.
+ """
+ if self.resp_queue:
+ self.msg_runner._cleanup_response_queue(self)
+ self.resp_queue = None
+
+ def _wait_for_json_responses(self, num_responses=1):
+ """Wait for response(s) to be put into the eventlet queue. Since
+ each queue entry actually contains a list of JSON-ified responses,
+ combine them all into a single list to return.
+
+ Destroy the eventlet queue when done.
+
+ Returns None when no response queue was set up. Raises CellTimeout
+ when a queue entry does not arrive within CONF.cells.call_timeout;
+ the timeout applies to each entry separately.
+ """
+ if not self.resp_queue:
+ # Source is not actually expecting a response
+ return
+ responses = []
+ wait_time = CONF.cells.call_timeout
+ try:
+ for x in xrange(num_responses):
+ json_responses = self.resp_queue.get(timeout=wait_time)
+ responses.extend(json_responses)
+ except queue.Empty:
+ raise exception.CellTimeout()
+ finally:
+ # The queue is removed on success and on failure alike.
+ self._cleanup_response_queue()
+ return responses
+
+ def _send_json_responses(self, json_responses, neighbor_only=False,
+ fanout=False):
+ """Send list of responses to this message. Responses passed here
+ are JSON-ified. Targeted messages have a single response while
+ Broadcast messages may have multiple responses.
+
+ If this cell was the source of the message, these responses will
+ be returned from self.process().
+
+ Otherwise, we will route the response to the source of the
+ request. If 'neighbor_only' is True, the response will be sent
+ to the neighbor cell, not the original requester. Broadcast
+ messages get aggregated at each hop, so neighbor_only will be
+ True for those messages.
+
+ Returns a list of Response instances when this cell is the source,
+ otherwise None (also when no response is needed).
+ """
+ if not self.need_response:
+ return
+ if self.source_is_us():
+ responses = []
+ for json_response in json_responses:
+ responses.append(Response.from_json(json_response))
+ return responses
+ # The response travels in the opposite direction of the request.
+ direction = self.direction == 'up' and 'down' or 'up'
+ response_kwargs = {'orig_message': self.to_json(),
+ 'responses': json_responses}
+ target_cell = _response_cell_name_from_path(self.routing_path,
+ neighbor_only=neighbor_only)
+ response = self.msg_runner._create_response_message(self.ctxt,
+ direction, target_cell, self.uuid, response_kwargs,
+ fanout=fanout)
+ response.process()
+
+ def _send_response(self, response, neighbor_only=False):
+ """Send a response to this message. If the source of the
+ request was ourselves, just return the response. It'll be
+ passed back to the caller of self.process(). See DocString for
+ _send_json_responses() as it handles most of the real work for
+ this method.
+
+ 'response' is an instance of Response class.
+
+ Returns None when no response is needed or when the response was
+ forwarded to another cell.
+ """
+ if not self.need_response:
+ return
+ if self.source_is_us():
+ return response
+ self._send_json_responses([response.to_json()],
+ neighbor_only=neighbor_only)
+
+ def _send_response_from_exception(self, exc_info):
+ """Take an exception as returned from sys.exc_info(), encode
+ it in a Response, and send it.
+
+ Returns whatever _send_response() returns.
+ """
+ response = Response(self.routing_path, exc_info, True)
+ return self._send_response(response)
+
+ def _to_dict(self):
+ """Convert a message to a dictionary. Only used internally."""
+ _dict = {}
+ for key in self.base_attrs_to_json:
+ _dict[key] = getattr(self, key)
+ return _dict
+
+ def to_json(self):
+ """Convert a message into JSON for sending to a sibling cell."""
+ _dict = self._to_dict()
+ # Convert context to dict.
+ _dict['ctxt'] = _dict['ctxt'].to_dict()
+ return jsonutils.dumps(_dict)
+
+ def source_is_us(self):
+ """Did this cell create this message?"""
+ # True only while the routing path consists of just our own name.
+ return self.routing_path == self.our_path_part
+
+ def process(self):
+ """Process a message. Deal with it locally and/or forward it to a
+ sibling cell.
+
+ Override in a subclass.
+ """
+ raise NotImplementedError()
+
+
+class _TargetedMessage(_BaseMessage):
+ """A targeted message is a message that is destined for a specific
+ single cell.
+
+ 'target_cell' can be a full cell name like 'api!child-cell' or it can
+ be an instance of the CellState class if the target is a neighbor cell.
+ """
+ message_type = 'targeted'
+
+ def __init__(self, msg_runner, ctxt, method_name, method_kwargs,
+ direction, target_cell, **kwargs):
+ """Set up the base message and normalize 'target_cell' to a full
+ cell path, which is also added to the serialized attributes.
+ """
+ super(_TargetedMessage, self).__init__(msg_runner, ctxt,
+ method_name, method_kwargs, direction, **kwargs)
+ if isinstance(target_cell, cells_state.CellState):
+ # Neighbor cell or ourselves. Convert it to a 'full path'.
+ if target_cell.is_me:
+ target_cell = self.our_path_part
+ else:
+ target_cell = '%s%s%s' % (self.our_path_part,
+ _PATH_CELL_SEP,
+ target_cell.name)
+ self.target_cell = target_cell
+ self.base_attrs_to_json.append('target_cell')
+
+ def _get_next_hop(self):
+ """Return the cell state for the next hop. If the current cell is
+ the target, our own cell state (state_manager.my_cell_state) is
+ returned.
+
+ Raises CellRoutingInconsistency when the target has fewer hops than
+ the path travelled so far, when the target path does not start with
+ the routing path, or when the next cell is not a known parent
+ (direction 'up') or child (any other direction).
+ """
+ if self.target_cell == self.routing_path:
+ return self.state_manager.my_cell_state
+ target_cell = self.target_cell
+ routing_path = self.routing_path
+ # Hops are counted as the number of separators in each path.
+ current_hops = routing_path.count(_PATH_CELL_SEP)
+ next_hop_num = current_hops + 1
+ dest_hops = target_cell.count(_PATH_CELL_SEP)
+ if dest_hops < current_hops:
+ reason = _("destination is %(target_cell)s but routing_path "
+ "is %(routing_path)s") % locals()
+ raise exception.CellRoutingInconsistency(reason=reason)
+ dest_name_parts = target_cell.split(_PATH_CELL_SEP)
+ # The part of the target path covered so far has to match the path
+ # this message actually took.
+ if (_PATH_CELL_SEP.join(dest_name_parts[:next_hop_num]) !=
+ routing_path):
+ reason = _("destination is %(target_cell)s but routing_path "
+ "is %(routing_path)s") % locals()
+ raise exception.CellRoutingInconsistency(reason=reason)
+ next_hop_name = dest_name_parts[next_hop_num]
+ if self.direction == 'up':
+ next_hop = self.state_manager.get_parent_cell(next_hop_name)
+ else:
+ next_hop = self.state_manager.get_child_cell(next_hop_name)
+ if not next_hop:
+ cell_type = 'parent' if self.direction == 'up' else 'child'
+ reason = _("Unknown %(cell_type)s when routing to "
+ "%(target_cell)s") % locals()
+ raise exception.CellRoutingInconsistency(reason=reason)
+ return next_hop
+
+ def process(self):
+ """Process a targeted message. This is called for all cells
+ that touch this message. If the local cell is the one that
+ created this message, we reply directly with a Response instance.
+ If the local cell is not the target, an eventlet queue is created
+ and we wait for the response to show up via another thread
+ receiving the Response back.
+
+ Responses to targeted messages are routed directly back to the
+ source. No eventlet queues are created in intermediate hops.
+
+ All exceptions for processing the message across the whole
+ routing path are caught and encoded within the Response and
+ returned to the caller.
+ """
+ try:
+ next_hop = self._get_next_hop()
+ except Exception as exc:
+ exc_info = sys.exc_info()
+ LOG.exception(_("Error locating next hop for message: %(exc)s"),
+ locals())
+ return self._send_response_from_exception(exc_info)
+
+ if next_hop.is_me:
+ # Final destination.
+ response = self._process_locally()
+ return self._send_response(response)
+
+ # Need to forward via neighbor cell.
+ if self.need_response and self.source_is_us():
+ # A response is needed and the source of the message is
+ # this cell. Create the eventlet queue.
+ self._setup_response_queue()
+ wait_for_response = True
+ else:
+ wait_for_response = False
+
+ try:
+ # This is inside the try block, so we can encode the
+ # exception and return it to the caller.
+ if self.hop_count >= self.max_hop_count:
+ raise exception.CellMaxHopCountReached(
+ hop_count=self.hop_count)
+ next_hop.send_message(self)
+ except Exception as exc:
+ exc_info = sys.exc_info()
+ err_str = _("Failed to send message to cell: %(next_hop)s: "
+ "%(exc)s")
+ LOG.exception(err_str, locals())
+ self._cleanup_response_queue()
+ return self._send_response_from_exception(exc_info)
+
+ if wait_for_response:
+ # Targeted messages only have 1 response.
+ # NOTE(review): this wait is outside any try block, so an
+ # exception raised while waiting (e.g. CellTimeout) is raised to
+ # the caller instead of being encoded in a Response -- confirm
+ # this is intended given the DocString above.
+ remote_response = self._wait_for_json_responses()[0]
+ return Response.from_json(remote_response)
+
+
+class _BroadcastMessage(_BaseMessage):
+    """A broadcast message. This means to call a method in every single
+    cell going in a certain direction.
+    """
+    message_type = 'broadcast'
+
+    def __init__(self, msg_runner, ctxt, method_name, method_kwargs,
+                 direction, run_locally=True, **kwargs):
+        """Set up the base message and mark it as a broadcast.
+
+        'run_locally' controls whether this cell also processes the
+        message itself in addition to forwarding it.
+        """
+        super(_BroadcastMessage, self).__init__(msg_runner, ctxt,
+                method_name, method_kwargs, direction, **kwargs)
+        # The local cell creating this message has the option
+        # to be able to process the message locally or not.
+        self.run_locally = run_locally
+        self.is_broadcast = True
+
+    def _get_next_hops(self):
+        """Return the neighbor cells this message gets forwarded to.
+
+        These are the child cells when the direction is 'down' and the
+        parent cells otherwise.  An empty list is returned once the
+        maximum hop count has been reached, which stops the broadcast
+        from travelling any further.
+        """
+        if self.hop_count >= self.max_hop_count:
+            return []
+        if self.direction == 'down':
+            return self.state_manager.get_child_cells()
+        return self.state_manager.get_parent_cells()
+
+    def _send_to_cells(self, target_cells):
+        """Send a message to multiple cells."""
+        for cell in target_cells:
+            cell.send_message(self)
+
+    def _send_json_responses(self, json_responses, neighbor_only=True,
+                             fanout=True):
+        """Responses to broadcast messages always need to go to the
+        neighbor cell from which we received this message.  That
+        cell aggregates the responses and makes sure to forward them
+        to the correct source.
+
+        'neighbor_only' and 'fanout' are accepted only to stay call
+        compatible with _BaseMessage._send_json_responses(); the inherited
+        _send_response() passes 'neighbor_only' by keyword, which would
+        otherwise raise a TypeError on every error path of process().
+        Their values are ignored and both are always sent as True.
+        """
+        return super(_BroadcastMessage, self)._send_json_responses(
+                json_responses, neighbor_only=True, fanout=True)
+
+    def process(self):
+        """Process a broadcast message.  This is called for all cells
+        that touch this message.
+
+        The message is sent to all cells in the certain direction and
+        the creator of this message has the option of whether or not
+        to process it locally as well.
+
+        If responses from all cells are required, each hop creates an
+        eventlet queue and waits for responses from its immediate
+        neighbor cells.  All responses are then aggregated into a
+        single list and are returned to the neighbor cell until the
+        source is reached.
+
+        When the source is reached, a list of Response instances are
+        returned to the caller.
+
+        All exceptions for processing the message across the whole
+        routing path are caught and encoded within the Response and
+        returned to the caller.  It is possible to get a mix of
+        successful responses and failure responses.  The caller is
+        responsible for dealing with this.
+        """
+        try:
+            next_hops = self._get_next_hops()
+        except Exception as exc:
+            exc_info = sys.exc_info()
+            LOG.exception(_("Error locating next hops for message: %(exc)s"),
+                          locals())
+            return self._send_response_from_exception(exc_info)
+
+        # Short circuit if we don't need to respond
+        if not self.need_response:
+            if self.run_locally:
+                self._process_locally()
+            self._send_to_cells(next_hops)
+            return
+
+        # We'll need to aggregate all of the responses (from ourself
+        # and our sibling cells) into 1 response
+        try:
+            self._setup_response_queue()
+            self._send_to_cells(next_hops)
+        except Exception as exc:
+            # Error just trying to send to cells.  Send a single response
+            # with the failure.
+            exc_info = sys.exc_info()
+            LOG.exception(_("Error sending message to next hops: %(exc)s"),
+                          locals())
+            self._cleanup_response_queue()
+            return self._send_response_from_exception(exc_info)
+
+        if self.run_locally:
+            # Run locally and store the Response.
+            local_response = self._process_locally()
+        else:
+            local_response = None
+
+        try:
+            # One queue entry is expected from every cell we forwarded to.
+            remote_responses = self._wait_for_json_responses(
+                    num_responses=len(next_hops))
+        except Exception as exc:
+            # Error waiting for responses, most likely a timeout.
+            # Send a single response back with the failure.
+            exc_info = sys.exc_info()
+            err_str = _("Error waiting for responses from neighbor cells: "
+                        "%(exc)s")
+            LOG.exception(err_str, locals())
+            return self._send_response_from_exception(exc_info)
+
+        if local_response:
+            remote_responses.append(local_response.to_json())
+        return self._send_json_responses(remote_responses)
+
+
+class _ResponseMessage(_TargetedMessage):
+ """A response message is really just a special targeted message,
+ saying to call 'parse_responses' when we reach the source of a 'call'.
+
+ The 'fanout' attribute on this message may be true if we're responding
+ to a broadcast or if we're about to respond to the source of an
+ original target message. Because multiple nova-cells services may
+ be running within a cell, we need to make sure the response gets
+ back to the correct one, so we have to fanout.
+ """
+ message_type = 'response'
+
+ def __init__(self, msg_runner, ctxt, method_name, method_kwargs,
+ direction, target_cell, response_uuid, **kwargs):
+ super(_ResponseMessage, self).__init__(msg_runner, ctxt,
+ method_name, method_kwargs, direction, target_cell, **kwargs)
+ self.response_uuid = response_uuid
+ self.base_attrs_to_json.append('response_uuid')
+
+ def process(self):
+ """Process a response. If the target is the local cell, process
+ the response here. Otherwise, forward it to where it needs to
+ go.
+ """
+ next_hop = self._get_next_hop()
+ if next_hop.is_me:
+ self._process_locally()
+ return
+ if self.fanout is False:
+ # Really there's 1 more hop on each of these below, but
+ # it doesn't matter for this logic.
+ target_hops = self.target_cell.count(_PATH_CELL_SEP)
+ current_hops = self.routing_path.count(_PATH_CELL_SEP)
+ if current_hops + 1 == target_hops:
+ # Next hop is the target.. so we must fanout. See
+ # DocString above.
+ self.fanout = True
+ next_hop.send_message(self)
+
+
+#
+# Methods that may be called when processing messages after reaching
+# a target cell.
+#
+
+
+class _BaseMessageMethods(base.Base):
+ """Base class for defining methods by message types."""
+ def __init__(self, msg_runner):
+ super(_BaseMessageMethods, self).__init__()
+ self.msg_runner = msg_runner
+ self.state_manager = msg_runner.state_manager
+ self.compute_api = compute.API()
+
+
+class _ResponseMessageMethods(_BaseMessageMethods):
+ """Methods that are called from a ResponseMessage. There's only
+ 1 method (parse_responses) and it is called when the message reaches
+ the source of a 'call'. All we do is stuff the response into the
+ eventlet queue to signal the caller that's waiting.
+ """
+ def parse_responses(self, message, orig_message, responses):
+ self.msg_runner._put_response(message.response_uuid,
+ responses)
+
+
+class _TargetedMessageMethods(_BaseMessageMethods):
+ """These are the methods that can be called when routing a message
+ to a specific cell.
+ """
+ def __init__(self, *args, **kwargs):
+ super(_TargetedMessageMethods, self).__init__(*args, **kwargs)
+
+ def schedule_run_instance(self, message, host_sched_kwargs):
+ """Parent cell told us to schedule new instance creation."""
+ self.msg_runner.scheduler.run_instance(message, host_sched_kwargs)
+
+ def run_compute_api_method(self, message, method_info):
+ """Run a method in the compute api class."""
+ method = method_info['method']
+ fn = getattr(self.compute_api, method, None)
+ if not fn:
+ detail = _("Unknown method '%(method)s' in compute API")
+ raise exception.CellServiceAPIMethodNotFound(
+ detail=detail % locals())
+ args = list(method_info['method_args'])
+ # 1st arg is instance_uuid that we need to turn into the
+ # instance object.
+ instance_uuid = args[0]
+ try:
+ instance = self.db.instance_get_by_uuid(message.ctxt,
+ instance_uuid)
+ except exception.InstanceNotFound:
+ with excutils.save_and_reraise_exception():
+ # Must be a race condition. Let's try to resolve it by
+ # telling the top level cells that this instance doesn't
+ # exist.
+ instance = {'uuid': instance_uuid}
+ self.msg_runner.instance_destroy_at_top(message.ctxt,
+ instance)
+ args[0] = instance
+ return fn(message.ctxt, *args, **method_info['method_kwargs'])
+
+ def update_capabilities(self, message, cell_name, capabilities):
+ """A child cell told us about their capabilities."""
+ LOG.debug(_("Received capabilities from child cell "
+ "%(cell_name)s: %(capabilities)s"), locals())
+ self.state_manager.update_cell_capabilities(cell_name,
+ capabilities)
+ # Go ahead and update our parents now that a child updated us
+ self.msg_runner.tell_parents_our_capabilities(message.ctxt)
+
+ def update_capacities(self, message, cell_name, capacities):
+ """A child cell told us about their capacity."""
+ LOG.debug(_("Received capacities from child cell "
+ "%(cell_name)s: %(capacities)s"), locals())
+ self.state_manager.update_cell_capacities(cell_name,
+ capacities)
+ # Go ahead and update our parents now that a child updated us
+ self.msg_runner.tell_parents_our_capacities(message.ctxt)
+
+ def announce_capabilities(self, message):
+ """A parent cell has told us to send our capabilities, so let's
+ do so.
+ """
+ self.msg_runner.tell_parents_our_capabilities(message.ctxt)
+
+ def announce_capacities(self, message):
+ """A parent cell has told us to send our capacity, so let's
+ do so.
+ """
+ self.msg_runner.tell_parents_our_capacities(message.ctxt)
+
+
+class _BroadcastMessageMethods(_BaseMessageMethods):
+ """These are the methods that can be called as a part of a broadcast
+ message.
+ """
+ def _at_the_top(self):
+ """Are we the API level?"""
+ return not self.state_manager.get_parent_cells()
+
+ def instance_update_at_top(self, message, instance, **kwargs):
+ """Update an instance in the DB if we're a top level cell."""
+ if not self._at_the_top():
+ return
+ instance_uuid = instance['uuid']
+ routing_path = message.routing_path
+ instance['cell_name'] = _reverse_path(routing_path)
+ # Remove things that we can't update in the top level cells.
+ # 'cell_name' is included in this list.. because we'll set it
+ # ourselves based on the reverse of the routing path. metadata
+ # is only updated in the API cell, so we don't listen to what
+ # the child cell tells us.
+ items_to_remove = ['id', 'security_groups', 'instance_type',
+ 'volumes', 'cell_name', 'name', 'metadata']
+ for key in items_to_remove:
+ instance.pop(key, None)
+
+ # Fixup info_cache. We'll have to update this separately if
+ # it exists.
+ info_cache = instance.pop('info_cache', None)
+ if info_cache is not None:
+ info_cache.pop('id', None)
+ info_cache.pop('instance', None)
+
+ # Fixup system_metadata (should be a dict for update, not a list)
+ if ('system_metadata' in instance and
+ isinstance(instance['system_metadata'], list)):
+ sys_metadata = dict([(md['key'], md['value'])
+ for md in instance['system_metadata']])
+ instance['system_metadata'] = sys_metadata
+
+ LOG.debug(_("Got update for instance %(instance_uuid)s: "
+ "%(instance)s") % locals())
+
+ # It's possible due to some weird condition that the instance
+ # was already set as deleted... so we'll attempt to update
+ # it with permissions that allows us to read deleted.
+ with utils.temporary_mutation(message.ctxt, read_deleted="yes"):
+ try:
+ self.db.instance_update(message.ctxt, instance_uuid,
+ instance, update_cells=False)
+ except exception.NotFound:
+ # FIXME(comstud): Strange. Need to handle quotas here,
+ # if we actually want this code to remain..
+ self.db.instance_create(message.ctxt, instance)
+ if info_cache:
+ self.db.instance_info_cache_update(message.ctxt, instance_uuid,
+ info_cache, update_cells=False)
+
+ def instance_destroy_at_top(self, message, instance, **kwargs):
+ """Destroy an instance from the DB if we're a top level cell."""
+ if not self._at_the_top():
+ return
+ instance_uuid = instance['uuid']
+ LOG.debug(_("Got update to delete instance %(instance_uuid)s") %
+ locals())
+ try:
+ self.db.instance_destroy(message.ctxt, instance_uuid,
+ update_cells=False)
+ except exception.InstanceNotFound:
+ pass
+
+ def instance_delete_everywhere(self, message, instance, delete_type,
+ **kwargs):
+ """Call compute API delete() or soft_delete() in every cell.
+ This is used when the API cell doesn't know what cell an instance
+ belongs to but the instance was requested to be deleted or
+ soft-deleted. So, we'll run it everywhere.
+ """
+ LOG.debug(_("Got broadcast to %(delete_type)s delete instance"),
+ locals(), instance=instance)
+ if delete_type == 'soft':
+ self.compute_api.soft_delete(message.ctxt, instance)
+ else:
+ self.compute_api.delete(message.ctxt, instance)
+
+ def instance_fault_create_at_top(self, message, instance_fault, **kwargs):
+ """Destroy an instance from the DB if we're a top level cell."""
+ if not self._at_the_top():
+ return
+ items_to_remove = ['id']
+ for key in items_to_remove:
+ instance_fault.pop(key, None)
+ log_str = _("Got message to create instance fault: "
+ "%(instance_fault)s")
+ LOG.debug(log_str, locals())
+ self.db.instance_fault_create(message.ctxt, instance_fault)
+
+ def bw_usage_update_at_top(self, message, bw_update_info, **kwargs):
+ """Update Bandwidth usage in the DB if we're a top level cell."""
+ if not self._at_the_top():
+ return
+ self.db.bw_usage_update(message.ctxt, **bw_update_info)
+
+
+# Maps the 'message_type' string carried in a serialized message to the
+# Message class used to rebuild it (see MessageRunner.message_from_json).
+_CELL_MESSAGE_TYPE_TO_MESSAGE_CLS = {'targeted': _TargetedMessage,
+ 'broadcast': _BroadcastMessage,
+ 'response': _ResponseMessage}
+# Maps the same 'message_type' strings to the class holding the methods
+# that may be run locally for that type; MessageRunner instantiates each
+# of these once.
+_CELL_MESSAGE_TYPE_TO_METHODS_CLS = {'targeted': _TargetedMessageMethods,
+ 'broadcast': _BroadcastMessageMethods,
+ 'response': _ResponseMessageMethods}
+
+
+#
+# Below are the public interfaces into this module.
+#
+
+
+class MessageRunner(object):
+ """This class is the main interface into creating messages and
+ processing them.
+
+ Public methods in this class are typically called by the CellsManager
+ to create a new message and process it with the exception of
+ 'message_from_json' which should be used by CellsDrivers to convert
+ a JSONified message it has received back into the appropriate Message
+ class.
+
+ Private methods are used internally when we need to keep some
+ 'global' state. For instance, eventlet queues used for responses are
+ held in this class. Also, when a Message is process()ed above and
+ it's determined we should take action locally,
+ _process_message_locally() will be called.
+
+ When needing to add a new method to call in a Cell2Cell message,
+ define the new method below and also add it to the appropriate
+ MessageMethods class where the real work will be done.
+ """
+
+ def __init__(self, state_manager):
+ self.state_manager = state_manager
+ cells_scheduler_cls = importutils.import_class(
+ CONF.cells.scheduler)
+ self.scheduler = cells_scheduler_cls(self)
+ self.response_queues = {}
+ self.methods_by_type = {}
+ self.our_name = CONF.cells.name
+ for msg_type, cls in _CELL_MESSAGE_TYPE_TO_METHODS_CLS.iteritems():
+ self.methods_by_type[msg_type] = cls(self)
+
+ def _process_message_locally(self, message):
+ """Message processing will call this when its determined that
+ the message should be processed within this cell. Find the
+ method to call based on the message type, and call it. The
+ caller is responsible for catching exceptions and returning
+ results to cells, if needed.
+ """
+ methods = self.methods_by_type[message.message_type]
+ fn = getattr(methods, message.method_name)
+ return fn(message, **message.method_kwargs)
+
+ def _put_response(self, response_uuid, response):
+ """Put a response into a response queue. This is called when
+ a _ResponseMessage is processed in the cell that initiated a
+ 'call' to another cell.
+ """
+ resp_queue = self.response_queues.get(response_uuid)
+ if not resp_queue:
+ # Response queue is gone. We must have restarted or we
+ # received a response after our timeout period.
+ return
+ resp_queue.put(response)
+
+ def _setup_response_queue(self, message):
+ """Set up an eventlet queue to use to wait for replies.
+
+ Replies come back from the target cell as a _ResponseMessage
+ being sent back to the source.
+ """
+ resp_queue = queue.Queue()
+ self.response_queues[message.uuid] = resp_queue
+ return resp_queue
+
+ def _cleanup_response_queue(self, message):
+ """Stop tracking the response queue either because we're
+ done receiving responses, or we've timed out.
+ """
+ try:
+ del self.response_queues[message.uuid]
+ except KeyError:
+ # Ignore if queue is gone already somehow.
+ pass
+
+ def _create_response_message(self, ctxt, direction, target_cell,
+ response_uuid, response_kwargs, **kwargs):
+ """Create a ResponseMessage. This is used internally within
+ the messaging module.
+ """
+ return _ResponseMessage(self, ctxt, 'parse_responses',
+ response_kwargs, direction, target_cell,
+ response_uuid, **kwargs)
+
+ def message_from_json(self, json_message):
+ """Turns a message in JSON format into an appropriate Message
+ instance. This is called when cells receive a message from
+ another cell.
+ """
+ message_dict = jsonutils.loads(json_message)
+ message_type = message_dict.pop('message_type')
+ # Need to convert context back.
+ ctxt = message_dict['ctxt']
+ message_dict['ctxt'] = context.RequestContext.from_dict(ctxt)
+ message_cls = _CELL_MESSAGE_TYPE_TO_MESSAGE_CLS[message_type]
+ return message_cls(self, **message_dict)
+
+ def ask_children_for_capabilities(self, ctxt):
+ """Tell child cells to send us capabilities. This is typically
+ called on startup of the nova-cells service.
+ """
+ child_cells = self.state_manager.get_child_cells()
+ for child_cell in child_cells:
+ message = _TargetedMessage(self, ctxt,
+ 'announce_capabilities',
+ dict(), 'down', child_cell)
+ message.process()
+
+ def ask_children_for_capacities(self, ctxt):
+ """Tell child cells to send us capacities. This is typically
+ called on startup of the nova-cells service.
+ """
+ child_cells = self.state_manager.get_child_cells()
+ for child_cell in child_cells:
+ message = _TargetedMessage(self, ctxt, 'announce_capacities',
+ dict(), 'down', child_cell)
+ message.process()
+
+ def tell_parents_our_capabilities(self, ctxt):
+ """Send our capabilities to parent cells."""
+ parent_cells = self.state_manager.get_parent_cells()
+ if not parent_cells:
+ return
+ my_cell_info = self.state_manager.get_my_state()
+ capabs = self.state_manager.get_our_capabilities()
+ LOG.debug(_("Updating parents with our capabilities: %(capabs)s"),
+ locals())
+ # We have to turn the sets into lists so they can potentially
+ # be json encoded when the raw message is sent.
+ for key, values in capabs.items():
+ capabs[key] = list(values)
+ method_kwargs = {'cell_name': my_cell_info.name,
+ 'capabilities': capabs}
+ for cell in parent_cells:
+ message = _TargetedMessage(self, ctxt, 'update_capabilities',
+ method_kwargs, 'up', cell, fanout=True)
+ message.process()
+
+ def tell_parents_our_capacities(self, ctxt):
+ """Send our capacities to parent cells."""
+ parent_cells = self.state_manager.get_parent_cells()
+ if not parent_cells:
+ return
+ my_cell_info = self.state_manager.get_my_state()
+ capacities = self.state_manager.get_our_capacities()
+ LOG.debug(_("Updating parents with our capacities: %(capacities)s"),
+ locals())
+ method_kwargs = {'cell_name': my_cell_info.name,
+ 'capacities': capacities}
+ for cell in parent_cells:
+ message = _TargetedMessage(self, ctxt, 'update_capacities',
+ method_kwargs, 'up', cell, fanout=True)
+ message.process()
+
+ def schedule_run_instance(self, ctxt, target_cell, host_sched_kwargs):
+ """Called by the scheduler to tell a child cell to schedule
+ a new instance for build.
+ """
+ method_kwargs = dict(host_sched_kwargs=host_sched_kwargs)
+ message = _TargetedMessage(self, ctxt, 'schedule_run_instance',
+ method_kwargs, 'down',
+ target_cell)
+ message.process()
+
+ def run_compute_api_method(self, ctxt, cell_name, method_info, call):
+ """Call a compute API method in a specific cell."""
+ message = _TargetedMessage(self, ctxt, 'run_compute_api_method',
+ dict(method_info=method_info), 'down',
+ cell_name, need_response=call)
+ return message.process()
+
+ def instance_update_at_top(self, ctxt, instance):
+ """Update an instance at the top level cell."""
+ message = _BroadcastMessage(self, ctxt, 'instance_update_at_top',
+ dict(instance=instance), 'up',
+ run_locally=False)
+ message.process()
+
+ def instance_destroy_at_top(self, ctxt, instance):
+ """Destroy an instance at the top level cell."""
+ message = _BroadcastMessage(self, ctxt, 'instance_destroy_at_top',
+ dict(instance=instance), 'up',
+ run_locally=False)
+ message.process()
+
+ def instance_delete_everywhere(self, ctxt, instance, delete_type):
+ """This is used by API cell when it didn't know what cell
+ an instance was in, but the instance was requested to be
+ deleted or soft_deleted. So, we'll broadcast this everywhere.
+ """
+ method_kwargs = dict(instance=instance, delete_type=delete_type)
+ message = _BroadcastMessage(self, ctxt,
+ 'instance_delete_everywhere',
+ method_kwargs, 'down',
+ run_locally=False)
+ message.process()
+
+ def instance_fault_create_at_top(self, ctxt, instance_fault):
+ """Create an instance fault at the top level cell."""
+ message = _BroadcastMessage(self, ctxt,
+ 'instance_fault_create_at_top',
+ dict(instance_fault=instance_fault),
+ 'up', run_locally=False)
+ message.process()
+
+ def bw_usage_update_at_top(self, ctxt, bw_update_info):
+ """Update bandwidth usage at top level cell."""
+ message = _BroadcastMessage(self, ctxt, 'bw_usage_update_at_top',
+ dict(bw_update_info=bw_update_info),
+ 'up', run_locally=False)
+ message.process()
+
+ @staticmethod
+ def get_message_types():
+ return _CELL_MESSAGE_TYPE_TO_MESSAGE_CLS.keys()
+
+
+class Response(object):
+ """Holds a response from a cell. If there was a failure, 'failure'
+ will be True and 'response' will contain an encoded Exception.
+ """
+ def __init__(self, cell_name, value, failure):
+ self.failure = failure
+ self.cell_name = cell_name
+ self.value = value
+
+ def to_json(self):
+ resp_value = self.value
+ if self.failure:
+ resp_value = rpc_common.serialize_remote_exception(resp_value,
+ log_failure=False)
+ _dict = {'cell_name': self.cell_name,
+ 'value': resp_value,
+ 'failure': self.failure}
+ return jsonutils.dumps(_dict)
+
+ @classmethod
+ def from_json(cls, json_message):
+ _dict = jsonutils.loads(json_message)
+ if _dict['failure']:
+ resp_value = rpc_common.deserialize_remote_exception(
+ CONF, _dict['value'])
+ _dict['value'] = resp_value
+ return cls(**_dict)
+
+ def value_or_raise(self):
+ if self.failure:
+ if isinstance(self.value, (tuple, list)):
+ raise self.value[0], self.value[1], self.value[2]
+ else:
+ raise self.value
+ return self.value
diff --git a/nova/cells/opts.py b/nova/cells/opts.py
new file mode 100644
index 0000000000..45b453ebc0
--- /dev/null
+++ b/nova/cells/opts.py
@@ -0,0 +1,44 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 Rackspace Hosting
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Global cells config options
+"""
+
+from nova.openstack.common import cfg
+
+# Options shared by the cells code.  They are registered below under
+# the 'cells' group, i.e. the '[cells]' section of the configuration.
+cells_opts = [
+ cfg.BoolOpt('enable',
+ default=False,
+ help='Enable cell functionality'),
+ cfg.StrOpt('topic',
+ default='cells',
+ help='the topic cells nodes listen on'),
+ cfg.StrOpt('manager',
+ default='nova.cells.manager.CellsManager',
+ help='Manager for cells'),
+ cfg.StrOpt('name',
+ default='nova',
+ help='name of this cell'),
+ cfg.ListOpt('capabilities',
+ default=['hypervisor=xenserver;kvm', 'os=linux;windows'],
+ help='Key/Multi-value list with the capabilities of the cell'),
+ cfg.IntOpt('call_timeout',
+ default=60,
+ help='Seconds to wait for response from a call to a cell.'),
+]
+
+cfg.CONF.register_opts(cells_opts, group='cells')
diff --git a/nova/cells/rpc_driver.py b/nova/cells/rpc_driver.py
new file mode 100644
index 0000000000..5e420aa8eb
--- /dev/null
+++ b/nova/cells/rpc_driver.py
@@ -0,0 +1,165 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Cells RPC Communication Driver
+"""
+from nova.cells import driver
+from nova.openstack.common import cfg
+from nova.openstack.common import rpc
+from nova.openstack.common.rpc import dispatcher as rpc_dispatcher
+from nova.openstack.common.rpc import proxy as rpc_proxy
+
+# Option specific to the RPC driver; registered under the 'cells' group.
+cell_rpc_driver_opts = [
+ cfg.StrOpt('rpc_driver_queue_base',
+ default='cells.intercell',
+ help="Base queue name to use when communicating between "
+ "cells. Various topics by message type will be "
+ "appended to this.")]
+
+CONF = cfg.CONF
+CONF.register_opts(cell_rpc_driver_opts, group='cells')
+CONF.import_opt('call_timeout', 'nova.cells.opts', group='cells')
+
+# Version of the cell<->cell RPC API, used by both the client side
+# (InterCellRPCAPI via CellsRPCDriver) and the dispatcher below.
+_CELL_TO_CELL_RPC_API_VERSION = '1.0'
+
+
+class CellsRPCDriver(driver.BaseCellsDriver):
+ """Driver for cell<->cell communication via RPC. This is used to
+ setup the RPC consumers as well as to send a message to another cell.
+
+ One instance of this class will be created for every neighbor cell
+ that we find in the DB and it will be associated with the cell in
+ its CellState.
+
+ One instance is also created by the cells manager for setting up
+ the consumers.
+ """
+ BASE_RPC_API_VERSION = _CELL_TO_CELL_RPC_API_VERSION
+
+ def __init__(self, *args, **kwargs):
+ super(CellsRPCDriver, self).__init__(*args, **kwargs)
+ self.rpc_connections = []
+ self.intercell_rpcapi = InterCellRPCAPI(
+ self.BASE_RPC_API_VERSION)
+
+ def _start_consumer(self, dispatcher, topic):
+ """Start an RPC consumer."""
+ conn = rpc.create_connection(new=True)
+ conn.create_consumer(topic, dispatcher, fanout=False)
+ conn.create_consumer(topic, dispatcher, fanout=True)
+ self.rpc_connections.append(conn)
+ conn.consume_in_thread()
+ return conn
+
+ def start_consumers(self, msg_runner):
+ """Start RPC consumers.
+
+ Start up 2 separate consumers for handling inter-cell
+ communication via RPC. Both handle the same types of
+ messages, but requests/replies are separated to solve
+ potential deadlocks. (If we used the same queue for both,
+ it's possible to exhaust the RPC thread pool while we wait
+ for replies.. such that we'd never consume a reply.)
+ """
+ topic_base = CONF.cells.rpc_driver_queue_base
+ proxy_manager = InterCellRPCDispatcher(msg_runner)
+ dispatcher = rpc_dispatcher.RpcDispatcher([proxy_manager])
+ for msg_type in msg_runner.get_message_types():
+ topic = '%s.%s' % (topic_base, msg_type)
+ self._start_consumer(dispatcher, topic)
+
+ def stop_consumers(self):
+ """Stop RPC consumers.
+
+ NOTE: Currently there's no hooks when stopping services
+ to have managers cleanup, so this is not currently called.
+ """
+ for conn in self.rpc_connections:
+ conn.close()
+
+ def send_message_to_cell(self, cell_state, message):
+ """Use the IntercellRPCAPI to send a message to a cell."""
+ self.intercell_rpcapi.send_message_to_cell(cell_state, message)
+
+
+class InterCellRPCAPI(rpc_proxy.RpcProxy):
+ """Client side of the Cell<->Cell RPC API.
+
+ The CellsRPCDriver uses this to make calls to another cell.
+
+ API version history:
+ 1.0 - Initial version.
+ """
+ def __init__(self, default_version):
+ super(InterCellRPCAPI, self).__init__(None, default_version)
+
+ @staticmethod
+ def _get_server_params_for_cell(next_hop):
+ """Turn the DB information for a cell into the parameters
+ needed for the RPC call.
+ """
+ param_map = {'username': 'username',
+ 'password': 'password',
+ 'rpc_host': 'hostname',
+ 'rpc_port': 'port',
+ 'rpc_virtual_host': 'virtual_host'}
+ server_params = {}
+ for source, target in param_map.items():
+ if next_hop.db_info[source]:
+ server_params[target] = next_hop.db_info[source]
+ return server_params
+
+ def send_message_to_cell(self, cell_state, message):
+ """Send a message to another cell by JSON-ifying the message and
+ making an RPC cast to 'process_message'. If the message says to
+ fanout, do it. The topic that is used will be
+ 'CONF.rpc_driver_queue_base.<message_type>'.
+ """
+ ctxt = message.ctxt
+ json_message = message.to_json()
+ rpc_message = self.make_msg('process_message', message=json_message)
+ topic_base = CONF.cells.rpc_driver_queue_base
+ topic = '%s.%s' % (topic_base, message.message_type)
+ server_params = self._get_server_params_for_cell(cell_state)
+ if message.fanout:
+ self.fanout_cast_to_server(ctxt, server_params,
+ rpc_message, topic=topic)
+ else:
+ self.cast_to_server(ctxt, server_params,
+ rpc_message, topic=topic)
+
+
+class InterCellRPCDispatcher(object):
+ """RPC Dispatcher to handle messages received from other cells.
+
+ All messages received here have come from a sibling cell. Depending
+ on the ultimate target and type of message, we may process the message
+ in this cell, relay the message to another sibling cell, or both. This
+ logic is defined by the message class in the messaging module.
+ """
+ BASE_RPC_API_VERSION = _CELL_TO_CELL_RPC_API_VERSION
+
+ def __init__(self, msg_runner):
+ """Init the Intercell RPC Dispatcher."""
+ self.msg_runner = msg_runner
+
+ def process_message(self, _ctxt, message):
+ """We received a message from another cell. Use the MessageRunner
+ to turn this from JSON back into an instance of the correct
+ Message class. Then process it!
+ """
+ message = self.msg_runner.message_from_json(message)
+ message.process()
diff --git a/nova/cells/rpcapi.py b/nova/cells/rpcapi.py
new file mode 100644
index 0000000000..8ce2988295
--- /dev/null
+++ b/nova/cells/rpcapi.py
@@ -0,0 +1,138 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Client side of nova-cells RPC API (for talking to the nova-cells service
+within a cell).
+
+This is different than communication between child and parent nova-cells
+services. That communication is handled by the cells driver via the
+messaging module.
+"""
+
+from nova.openstack.common import cfg
+from nova.openstack.common import jsonutils
+from nova.openstack.common import log as logging
+from nova.openstack.common.rpc import proxy as rpc_proxy
+
+LOG = logging.getLogger(__name__)
+
+# 'enable' and 'topic' are declared in nova.cells.opts; importing them
+# here makes CONF.cells.enable / CONF.cells.topic usable in this module.
+CONF = cfg.CONF
+CONF.import_opt('enable', 'nova.cells.opts', group='cells')
+CONF.import_opt('topic', 'nova.cells.opts', group='cells')
+
+
+class CellsAPI(rpc_proxy.RpcProxy):
+ '''Cells client-side RPC API
+
+ API version history:
+
+ 1.0 - Initial version.
+ '''
+ BASE_RPC_API_VERSION = '1.0'
+
+ def __init__(self):
+ super(CellsAPI, self).__init__(topic=CONF.cells.topic,
+ default_version=self.BASE_RPC_API_VERSION)
+
+ def cast_compute_api_method(self, ctxt, cell_name, method,
+ *args, **kwargs):
+ """Make a cast to a compute API method in a certain cell."""
+ method_info = {'method': method,
+ 'method_args': args,
+ 'method_kwargs': kwargs}
+ self.cast(ctxt, self.make_msg('run_compute_api_method',
+ cell_name=cell_name,
+ method_info=method_info,
+ call=False))
+
+ def call_compute_api_method(self, ctxt, cell_name, method,
+ *args, **kwargs):
+ """Make a call to a compute API method in a certain cell."""
+ method_info = {'method': method,
+ 'method_args': args,
+ 'method_kwargs': kwargs}
+ return self.call(ctxt, self.make_msg('run_compute_api_method',
+ cell_name=cell_name,
+ method_info=method_info,
+ call=True))
+
+ def schedule_run_instance(self, ctxt, **kwargs):
+ """Schedule a new instance for creation."""
+ self.cast(ctxt, self.make_msg('schedule_run_instance',
+ host_sched_kwargs=kwargs))
+
+ def instance_update_at_top(self, ctxt, instance):
+ """Update instance at API level."""
+ if not CONF.cells.enable:
+ return
+ # Make sure we have a dict, not a SQLAlchemy model
+ instance_p = jsonutils.to_primitive(instance)
+ self.cast(ctxt, self.make_msg('instance_update_at_top',
+ instance=instance_p))
+
+ def instance_destroy_at_top(self, ctxt, instance):
+ """Destroy instance at API level."""
+ if not CONF.cells.enable:
+ return
+ instance_p = jsonutils.to_primitive(instance)
+ self.cast(ctxt, self.make_msg('instance_destroy_at_top',
+ instance=instance_p))
+
+ def instance_delete_everywhere(self, ctxt, instance, delete_type):
+ """Delete instance everywhere. delete_type may be 'soft'
+ or 'hard'. This is generally only used to resolve races
+ when API cell doesn't know to what cell an instance belongs.
+ """
+ if not CONF.cells.enable:
+ return
+ instance_p = jsonutils.to_primitive(instance)
+ self.cast(ctxt, self.make_msg('instance_delete_everywhere',
+ instance=instance_p,
+ delete_type=delete_type))
+
+ def instance_fault_create_at_top(self, ctxt, instance_fault):
+ """Create an instance fault at the top."""
+ if not CONF.cells.enable:
+ return
+ instance_fault_p = jsonutils.to_primitive(instance_fault)
+ self.cast(ctxt, self.make_msg('instance_fault_create_at_top',
+ instance_fault=instance_fault_p))
+
+ def bw_usage_update_at_top(self, ctxt, uuid, mac, start_period,
+ bw_in, bw_out, last_ctr_in, last_ctr_out, last_refreshed=None):
+ """Broadcast upwards that bw_usage was updated."""
+ if not CONF.cells.enable:
+ return
+ bw_update_info = {'uuid': uuid,
+ 'mac': mac,
+ 'start_period': start_period,
+ 'bw_in': bw_in,
+ 'bw_out': bw_out,
+ 'last_ctr_in': last_ctr_in,
+ 'last_ctr_out': last_ctr_out,
+ 'last_refreshed': last_refreshed}
+ self.cast(ctxt, self.make_msg('bw_usage_update_at_top',
+ bw_update_info=bw_update_info))
+
+ def instance_info_cache_update_at_top(self, ctxt, instance_info_cache):
+ """Broadcast up that an instance's info_cache has changed."""
+ if not CONF.cells.enable:
+ return
+ iicache = jsonutils.to_primitive(instance_info_cache)
+ instance = {'uuid': iicache['instance_uuid'],
+ 'info_cache': iicache}
+ self.cast(ctxt, self.make_msg('instance_update_at_top',
+ instance=instance))
diff --git a/nova/cells/scheduler.py b/nova/cells/scheduler.py
new file mode 100644
index 0000000000..0b730290a9
--- /dev/null
+++ b/nova/cells/scheduler.py
@@ -0,0 +1,136 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Cells Scheduler
+"""
+import random
+import time
+
+from nova import compute
+from nova.compute import vm_states
+from nova.db import base
+from nova import exception
+from nova.openstack.common import cfg
+from nova.openstack.common import log as logging
+from nova.scheduler import rpcapi as scheduler_rpcapi
+
+cell_scheduler_opts = [
+ cfg.IntOpt('scheduler_retries',
+ default=10,
+ help='How many retries when no cells are available.'),
+ cfg.IntOpt('scheduler_retry_delay',
+ default=2,
+ help='How often to retry in seconds when no cells are '
+ 'available.')
+]
+
+LOG = logging.getLogger(__name__)
+
+CONF = cfg.CONF
+CONF.register_opts(cell_scheduler_opts, group='cells')
+
+
+class CellsScheduler(base.Base):
+ """The cells scheduler."""
+
+    def __init__(self, msg_runner):
+        """Keep hold of the message runner and its state manager, and
+        create the compute and host scheduler APIs used for scheduling.
+        """
+        super(CellsScheduler, self).__init__()
+        self.msg_runner = msg_runner
+        self.state_manager = self.msg_runner.state_manager
+        self.compute_api = compute.API()
+        self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
+
+    def _create_instances_here(self, ctxt, request_spec):
+        """Create a DB entry for every instance uuid in the request
+        spec and report each new instance through
+        msg_runner.instance_update_at_top().
+        """
+        properties = request_spec['instance_properties']
+        for uuid in request_spec['instance_uuids']:
+            # The same properties dict is reused; only the uuid changes.
+            properties['uuid'] = uuid
+            new_instance = (
+                self.compute_api.create_db_entry_for_new_instance(
+                    ctxt,
+                    request_spec['instance_type'],
+                    request_spec['image'],
+                    properties,
+                    request_spec['security_group'],
+                    request_spec['block_device_mapping']))
+            self.msg_runner.instance_update_at_top(ctxt, new_instance)
+
+    def _get_possible_cells(self):
+        """Return the set of cells that may be scheduled to.
+
+        This is every child cell, plus our own cell when there are no
+        child cells or when we have capacity info of our own.
+        """
+        candidates = set(self.state_manager.get_child_cells())
+        me = self.state_manager.get_my_state()
+        if candidates and not me.capacities:
+            # We have children and no capacity info: children only.
+            return candidates
+        candidates.add(me)
+        return candidates
+
+    def _run_instance(self, message, host_sched_kwargs):
+        """Attempt to schedule instance(s) in a randomly chosen cell.
+
+        If the chosen cell is this one, the instance DB entries are
+        created here and the request is handed to the host scheduler.
+        Otherwise the request is passed to
+        msg_runner.schedule_run_instance() for the chosen cell.
+
+        :raises: exception.NoCellsAvailable if we have no cells to try
+        """
+        ctxt = message.ctxt
+        request_spec = host_sched_kwargs['request_spec']
+
+        cells = self._get_possible_cells()
+        if not cells:
+            raise exception.NoCellsAvailable()
+        cells = list(cells)
+
+        # Random selection for now
+        random.shuffle(cells)
+        target_cell = cells[0]
+
+        # NOTE: there is no local named 'routing_path', so formatting
+        # this message from locals() could never work.  Take the value
+        # from the message instead (None if it doesn't carry one).
+        LOG.debug(_("Scheduling with routing_path=%(routing_path)s"),
+                  {'routing_path': getattr(message, 'routing_path', None)})
+
+        if target_cell.is_me:
+            # Need to create instance DB entries as the host scheduler
+            # expects that the instance(s) already exists.
+            self._create_instances_here(ctxt, request_spec)
+            self.scheduler_rpcapi.run_instance(ctxt,
+                    **host_sched_kwargs)
+            return
+        self.msg_runner.schedule_run_instance(ctxt, target_cell,
+                                              host_sched_kwargs)
+
+    def run_instance(self, message, host_sched_kwargs):
+        """Pick a cell where we should create a new instance.
+
+        While no cells are available, scheduling is retried up to
+        CONF.cells.scheduler_retries times, sleeping at least one second
+        between attempts.  If scheduling fails for any reason, every
+        requested instance is set to the ERROR state: the update is
+        sent via msg_runner.instance_update_at_top() and, best-effort,
+        applied to the local DB.
+        """
+        retries = max(0, CONF.cells.scheduler_retries)
+        try:
+            for attempt in xrange(retries + 1):
+                try:
+                    return self._run_instance(message, host_sched_kwargs)
+                except exception.NoCellsAvailable:
+                    if attempt == retries:
+                        raise
+                    sleep_time = max(1, CONF.cells.scheduler_retry_delay)
+                    LOG.info(_("No cells available when scheduling. Will "
+                               "retry in %(sleep_time)s second(s)"),
+                             {'sleep_time': sleep_time})
+                    time.sleep(sleep_time)
+        except Exception:
+            request_spec = host_sched_kwargs['request_spec']
+            instance_uuids = request_spec['instance_uuids']
+            LOG.exception(_("Error scheduling instances %(instance_uuids)s"),
+                          {'instance_uuids': instance_uuids})
+            ctxt = message.ctxt
+            error_state = {'vm_state': vm_states.ERROR}
+            for instance_uuid in instance_uuids:
+                self.msg_runner.instance_update_at_top(ctxt,
+                        {'uuid': instance_uuid,
+                         'vm_state': vm_states.ERROR})
+                try:
+                    self.db.instance_update(ctxt, instance_uuid,
+                                            error_state)
+                except Exception as exc:
+                    # Still best-effort, but leave a trace instead of
+                    # silently dropping the failure.
+                    LOG.debug(_("Unable to set instance %(instance_uuid)s "
+                                "to ERROR in the local DB: %(exc)s"),
+                              {'instance_uuid': instance_uuid, 'exc': exc})
diff --git a/nova/cells/state.py b/nova/cells/state.py
new file mode 100644
index 0000000000..c6f8f32202
--- /dev/null
+++ b/nova/cells/state.py
@@ -0,0 +1,346 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+CellState Manager
+"""
+import copy
+import datetime
+import functools
+
+from nova.cells import rpc_driver
+from nova import context
+from nova.db import base
+from nova.openstack.common import cfg
+from nova.openstack.common import lockutils
+from nova.openstack.common import log as logging
+from nova.openstack.common import timeutils
+
+cell_state_manager_opts = [
+ cfg.IntOpt('db_check_interval',
+ default=60,
+ help='Seconds between getting fresh cell info from db.'),
+]
+
+
+LOG = logging.getLogger(__name__)
+
+CONF = cfg.CONF
+CONF.import_opt('host', 'nova.config')
+CONF.import_opt('name', 'nova.cells.opts', group='cells')
+#CONF.import_opt('capabilities', 'nova.cells.opts', group='cells')
+CONF.register_opts(cell_state_manager_opts, group='cells')
+
+
+class CellState(object):
+    """Holds information for a particular cell."""
+
+    def __init__(self, cell_name, is_me=False):
+        self.name = cell_name
+        self.is_me = is_me
+        # Start out as "never seen"
+        self.last_seen = datetime.datetime.min
+        self.capabilities = {}
+        self.capacities = {}
+        self.db_info = {}
+        # TODO(comstud): The DB will specify the driver to use to talk
+        # to this cell, but there's no column for this yet.  The only
+        # available driver is the rpc driver.
+        self.driver = rpc_driver.CellsRPCDriver()
+
+    def update_db_info(self, cell_db_info):
+        """Replace our DB info with everything from cell_db_info except
+        its 'name' entry.
+        """
+        self.db_info = dict((key, value)
+                            for key, value in cell_db_info.iteritems()
+                            if key != 'name')
+
+    def update_capabilities(self, cell_metadata):
+        """Store new capabilities and mark the cell as seen just now."""
+        self.last_seen = timeutils.utcnow()
+        self.capabilities = cell_metadata
+
+    def update_capacities(self, capacities):
+        """Store new capacity info and mark the cell as seen just now."""
+        self.last_seen = timeutils.utcnow()
+        self.capacities = capacities
+
+    def get_cell_info(self):
+        """Return subset of cell information for OS API use.
+
+        The name and capabilities are always included; the DB fields
+        are only added when we have DB info for this cell.
+        """
+        info = {'name': self.name, 'capabilities': self.capabilities}
+        if self.db_info:
+            exposed_fields = ('id', 'is_parent', 'weight_scale',
+                              'weight_offset', 'username', 'rpc_host',
+                              'rpc_port')
+            info.update((field, self.db_info[field])
+                        for field in exposed_fields)
+        return info
+
+    def send_message(self, message):
+        """Send a message to a cell by handing ourselves and the
+        message to the driver.
+        """
+        self.driver.send_message_to_cell(self, message)
+
+    def __repr__(self):
+        return "Cell '%s' (%s)" % (self.name,
+                                   "me" if self.is_me else "not_me")
+
+
+def sync_from_db(f):
+    """Decorator for methods that use cell information.
+
+    Before the wrapped method runs, self._cell_db_sync() is called if
+    self._time_to_sync() says a refresh from the DB is due.
+    """
+    def _sync_then_call(self, *args, **kwargs):
+        needs_sync = self._time_to_sync()
+        if needs_sync:
+            self._cell_db_sync()
+        return f(self, *args, **kwargs)
+    return functools.wraps(f)(_sync_then_call)
+
+
+class CellStateManager(base.Base):
+    def __init__(self, cell_state_cls=None):
+        """Build the state for our own cell, load the other cells from
+        the DB and parse our capabilities from the configuration.
+
+        :param cell_state_cls: class used to hold per-cell state;
+                               CellState is used when not given
+        """
+        super(CellStateManager, self).__init__()
+        self.cell_state_cls = cell_state_cls or CellState
+        self.my_cell_state = self.cell_state_cls(CONF.cells.name,
+                                                 is_me=True)
+        self.parent_cells = {}
+        self.child_cells = {}
+        self.last_cell_db_check = datetime.datetime.min
+        self._cell_db_sync()
+        # Each capability is configured as 'name=value', where value
+        # may hold several ';' separated values.
+        capabilities = {}
+        for entry in CONF.cells.capabilities:
+            name, value = entry.split('=', 1)
+            # split() yields a one item list when there is no ';'
+            capabilities[name] = set(value.split(';'))
+        self.my_cell_state.update_capabilities(capabilities)
+
+    def _refresh_cells_from_db(self, ctxt):
+        """Make our cell info map match the db."""
+        db_cells_by_name = dict((db_cell['name'], db_cell)
+                                for db_cell in self.db.cell_get_all(ctxt))
+
+        # Refresh the cells we already know about.  A cell that is gone
+        # from the DB, or that switched between parent and child, is
+        # dropped (a switched cell is re-added below).
+        for known_cells in (self.parent_cells, self.child_cells):
+            for cell_name, cell_state in list(known_cells.items()):
+                was_parent = cell_state.db_info['is_parent']
+                db_cell = db_cells_by_name.get(cell_name)
+                if not db_cell or was_parent != db_cell['is_parent']:
+                    del known_cells[cell_name]
+                    continue
+                cell_state.update_db_info(db_cell)
+
+        # Add new cells
+        for cell_name, db_cell in db_cells_by_name.items():
+            known_cells = (self.parent_cells if db_cell['is_parent']
+                           else self.child_cells)
+            if cell_name not in known_cells:
+                known_cells[cell_name] = self.cell_state_cls(cell_name)
+            known_cells[cell_name].update_db_info(db_cell)
+
+    def _time_to_sync(self):
+        """Is it time to sync the DB against our memory cache?
+
+        Returns True when at least CONF.cells.db_check_interval seconds
+        have passed since self.last_cell_db_check.
+        """
+        diff = timeutils.utcnow() - self.last_cell_db_check
+        # NOTE: timedelta.seconds only holds the part of the delta
+        # below one day, so whole days have to be counted as well.
+        # Otherwise the initial delta from datetime.min (or any gap of
+        # more than a day) can look smaller than the interval.
+        # timedelta.total_seconds() is not used since it needs
+        # Python 2.7.
+        elapsed = diff.days * 86400 + diff.seconds
+        return elapsed >= CONF.cells.db_check_interval
+
+    def _update_our_capacity(self, context):
+        """Update our capacity in the self.my_cell_state CellState.
+
+        This will add/update 2 entries in our CellState.capacities,
+        'ram_free' and 'disk_free'.
+
+        The values of these are both dictionaries with the following
+        format:
+
+        {'total_mb': <total_memory_free_in_the_cell>,
+         'units_by_mb': <units_dictionary>}
+
+        <units_dictionary> contains the number of units that we can
+        build for every instance_type that we have.  This number is
+        computed by looking at room available on every compute_node.
+
+        Take the following instance_types as an example:
+
+        [{'memory_mb': 1024, 'root_gb': 10, 'ephemeral_gb': 100},
+         {'memory_mb': 2048, 'root_gb': 20, 'ephemeral_gb': 200}]
+
+        capacities['ram_free']['units_by_mb'] would contain the following:
+
+        {'1024': <number_of_instances_that_will_fit>,
+         '2048': <number_of_instances_that_will_fit>}
+
+        capacities['disk_free']['units_by_mb'] would contain the following:
+
+        {'112640': <number_of_instances_that_will_fit>,
+         '225280': <number_of_instances_that_will_fit>}
+
+        Units are in MB, so 112640 = (10 + 100) * 1024.
+
+        If there are no usable compute nodes, the capacities are set to
+        an empty dict.
+
+        NOTE(comstud): Perhaps we should only report a single number
+        available per instance_type.
+        """
+
+        compute_hosts = {}
+
+        def _get_compute_hosts():
+            # Only compute nodes that have an enabled service count.
+            compute_nodes = self.db.compute_node_get_all(context)
+            for compute in compute_nodes:
+                service = compute['service']
+                if not service or service['disabled']:
+                    continue
+                host = service['host']
+                compute_hosts[host] = {
+                        'free_ram_mb': compute['free_ram_mb'],
+                        'free_disk_mb': compute['free_disk_gb'] * 1024}
+
+        _get_compute_hosts()
+        if not compute_hosts:
+            self.my_cell_state.update_capacities({})
+            return
+
+        ram_mb_free_units = {}
+        disk_mb_free_units = {}
+        total_ram_mb_free = 0
+        total_disk_mb_free = 0
+
+        def _free_units(tot, per_inst):
+            # Never negative, and 0 when the instance type needs none
+            # of the resource (which also avoids dividing by zero).
+            if per_inst:
+                return max(0, int(tot / per_inst))
+            else:
+                return 0
+
+        def _update_from_values(values, instance_type):
+            # Add how many instances of instance_type fit on the host
+            # described by 'values' to the per-size unit counters.
+            memory_mb = instance_type['memory_mb']
+            disk_mb = (instance_type['root_gb'] +
+                       instance_type['ephemeral_gb']) * 1024
+            ram_mb_free_units.setdefault(str(memory_mb), 0)
+            disk_mb_free_units.setdefault(str(disk_mb), 0)
+            # NOTE: read from the 'values' argument rather than from
+            # the loop variable of the enclosing scope.
+            ram_free_units = _free_units(values['free_ram_mb'],
+                                         memory_mb)
+            disk_free_units = _free_units(values['free_disk_mb'],
+                                          disk_mb)
+            ram_mb_free_units[str(memory_mb)] += ram_free_units
+            disk_mb_free_units[str(disk_mb)] += disk_free_units
+
+        instance_types = self.db.instance_type_get_all(context)
+
+        for compute_values in compute_hosts.values():
+            total_ram_mb_free += compute_values['free_ram_mb']
+            total_disk_mb_free += compute_values['free_disk_mb']
+            for instance_type in instance_types:
+                _update_from_values(compute_values, instance_type)
+
+        capacities = {'ram_free': {'total_mb': total_ram_mb_free,
+                                   'units_by_mb': ram_mb_free_units},
+                      'disk_free': {'total_mb': total_disk_mb_free,
+                                    'units_by_mb': disk_mb_free_units}}
+        self.my_cell_state.update_capacities(capacities)
+
+    @lockutils.synchronized('cell-db-sync', 'nova-')
+    def _cell_db_sync(self):
+        """Update status for all cells if it's time.
+
+        Most calls come from the sync_from_db decorator, which checks
+        the time before calling but does so outside of the lock.  The
+        check is repeated here, under the lock, to prevent multiple
+        threads from pulling the information simultaneously.
+        """
+        if not self._time_to_sync():
+            return
+        LOG.debug(_("Updating cell cache from db."))
+        # Record the check time before doing the refresh.
+        self.last_cell_db_check = timeutils.utcnow()
+        admin_ctxt = context.get_admin_context()
+        self._refresh_cells_from_db(admin_ctxt)
+        self._update_our_capacity(admin_ctxt)
+
+ @sync_from_db
+ def get_my_state(self):
+ """Return information for my (this) cell."""
+ return self.my_cell_state
+
+ @sync_from_db
+ def get_child_cells(self):
+ """Return list of child cell_infos."""
+ return self.child_cells.values()
+
+ @sync_from_db
+ def get_parent_cells(self):
+ """Return list of parent cell_infos."""
+ return self.parent_cells.values()
+
+ @sync_from_db
+ def get_parent_cell(self, cell_name):
+ return self.parent_cells.get(cell_name)
+
+ @sync_from_db
+ def get_child_cell(self, cell_name):
+ return self.child_cells.get(cell_name)
+
+    @sync_from_db
+    def update_cell_capabilities(self, cell_name, capabilities):
+        """Update capabilities for a child or parent cell.
+
+        Child cells are looked up first.  An unknown cell is logged as
+        an error and otherwise ignored.  Every value in 'capabilities'
+        is converted to a set in place.
+        """
+        cell = (self.child_cells.get(cell_name) or
+                self.parent_cells.get(cell_name))
+        if not cell:
+            LOG.error(_("Unknown cell '%(cell_name)s' when trying to "
+                        "update capabilities"), {'cell_name': cell_name})
+            return
+        # Make sure capabilities are sets.
+        for capab_name in list(capabilities.keys()):
+            capabilities[capab_name] = set(capabilities[capab_name])
+        cell.update_capabilities(capabilities)
+
+    @sync_from_db
+    def update_cell_capacities(self, cell_name, capacities):
+        """Update capacities for a child or parent cell.
+
+        Child cells are looked up first.  An unknown cell is logged as
+        an error and otherwise ignored.
+        """
+        cell = (self.child_cells.get(cell_name) or
+                self.parent_cells.get(cell_name))
+        if cell:
+            cell.update_capacities(capacities)
+            return
+        LOG.error(_("Unknown cell '%(cell_name)s' when trying to "
+                    "update capacities"), {'cell_name': cell_name})
+
+    @sync_from_db
+    def get_our_capabilities(self, include_children=True):
+        """Return a copy of our capabilities, merged with the
+        capabilities of every child cell unless include_children is
+        false.
+        """
+        merged = copy.deepcopy(self.my_cell_state.capabilities)
+        if not include_children:
+            return merged
+        for child in self.child_cells.values():
+            for capab_name, values in child.capabilities.items():
+                merged.setdefault(capab_name, set([]))
+                merged[capab_name] |= values
+        return merged
+
+    def _add_to_dict(self, target, src):
+        """Add the values of src onto target, in place.
+
+        Nested dictionaries are merged recursively; any other value is
+        added to the value already in target, starting from 0.
+        """
+        for key, value in src.items():
+            if not isinstance(value, dict):
+                target[key] = target.setdefault(key, 0) + value
+            else:
+                self._add_to_dict(target.setdefault(key, {}), value)
+
+    @sync_from_db
+    def get_our_capacities(self, include_children=True):
+        """Return a copy of our capacities, with the capacities of
+        every child cell added in unless include_children is false.
+        """
+        totals = copy.deepcopy(self.my_cell_state.capacities)
+        if not include_children:
+            return totals
+        for child in self.child_cells.values():
+            self._add_to_dict(totals, child.capacities)
+        return totals
diff --git a/nova/compute/api.py b/nova/compute/api.py
index 757f78f2d4..abbc0bd92f 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -1954,6 +1954,14 @@ class API(base.Base):
return {'url': connect_info['access_url']}
+    def get_vnc_connect_info(self, context, instance, console_type):
+        """Used in a child cell to get console info.
+
+        :raises: exception.InstanceNotReady if the instance has no host
+        """
+        if instance['host']:
+            return self.compute_rpcapi.get_vnc_console(context,
+                    instance=instance, console_type=console_type)
+        raise exception.InstanceNotReady(instance_id=instance['uuid'])
+
@wrap_check_policy
def get_console_output(self, context, instance, tail_length=None):
"""Get console output for an instance."""
diff --git a/nova/compute/cells_api.py b/nova/compute/cells_api.py
new file mode 100644
index 0000000000..cdbccebb17
--- /dev/null
+++ b/nova/compute/cells_api.py
@@ -0,0 +1,471 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Compute API that proxies via Cells Service"""
+
+from nova import block_device
+from nova.cells import rpcapi as cells_rpcapi
+from nova.compute import api as compute_api
+from nova.compute import task_states
+from nova.compute import vm_states
+from nova import exception
+from nova.openstack.common import excutils
+from nova.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+check_instance_state = compute_api.check_instance_state
+wrap_check_policy = compute_api.wrap_check_policy
+check_policy = compute_api.check_policy
+check_instance_lock = compute_api.check_instance_lock
+
+
+def validate_cell(fn):
+    """Decorator that validates the instance's cell before calling fn.
+
+    self._validate_cell() is called with the instance and the name of
+    the decorated method; whatever it raises propagates to the caller.
+    """
+    def _wrapped(self, context, instance, *args, **kwargs):
+        self._validate_cell(instance, fn.__name__)
+        return fn(self, context, instance, *args, **kwargs)
+    _wrapped.__name__ = fn.__name__
+    # Keep the docstring of the wrapped method as well, so it is not
+    # lost on decorated API methods.
+    _wrapped.__doc__ = fn.__doc__
+    return _wrapped
+
+
+class ComputeRPCAPINoOp(object):
+    """Stand-in for the compute RPC API: every attribute is a callable
+    that accepts any arguments, does nothing and returns None.
+    """
+
+    def __getattr__(self, key):
+        def _do_nothing(*args, **kwargs):
+            return None
+        return _do_nothing
+
+
+class SchedulerRPCAPIRedirect(object):
+    """Stand-in for the scheduler RPC API.
+
+    run_instance() is redirected to the cells RPC API; any other
+    attribute is a callable that does nothing and returns None.
+    """
+
+    def __init__(self, cells_rpcapi_obj):
+        self.cells_rpcapi = cells_rpcapi_obj
+
+    def __getattr__(self, key):
+        # Only reached for attributes that are not defined above/below.
+        def _do_nothing(*args, **kwargs):
+            return None
+        return _do_nothing
+
+    def run_instance(self, context, **kwargs):
+        """Hand the build request to cells_rpcapi.schedule_run_instance."""
+        self.cells_rpcapi.schedule_run_instance(context, **kwargs)
+
+
+class ComputeCellsAPI(compute_api.API):
+    def __init__(self, *args, **kwargs):
+        """Set up the compute API so that nothing goes to compute
+        directly and scheduler run_instance requests go to cells.
+        """
+        super(ComputeCellsAPI, self).__init__(*args, **kwargs)
+        cells_api = cells_rpcapi.CellsAPI()
+        self.cells_rpcapi = cells_api
+        # Avoid casts/calls directly to compute
+        self.compute_rpcapi = ComputeRPCAPINoOp()
+        # Redirect scheduler run_instance to cells.
+        self.scheduler_rpcapi = SchedulerRPCAPIRedirect(cells_api)
+
+ def _cell_read_only(self, cell_name):
+ """Is the target cell in a read-only mode?"""
+ # FIXME(comstud): Add support for this.
+ return False
+
+    def _validate_cell(self, instance, method):
+        """Check that 'method' may be run against the instance's cell.
+
+        :raises: exception.InstanceUnknownCell if the instance has no
+                 cell_name
+        :raises: exception.InstanceInvalidState if the cell is in
+                 read-only mode
+        """
+        cell_name = instance['cell_name']
+        if not cell_name:
+            raise exception.InstanceUnknownCell(
+                    instance_uuid=instance['uuid'])
+        if not self._cell_read_only(cell_name):
+            return
+        raise exception.InstanceInvalidState(
+                attr="vm_state",
+                instance_uuid=instance['uuid'],
+                state="temporary_readonly",
+                method=method)
+
+    def _cast_to_cells(self, context, instance, method, *args, **kwargs):
+        """Cast a compute API method for an instance to its cell.
+
+        :raises: exception.InstanceUnknownCell if the instance has no
+                 cell_name
+        """
+        uuid = instance['uuid']
+        target_cell = instance['cell_name']
+        if target_cell:
+            self.cells_rpcapi.cast_compute_api_method(
+                    context, target_cell, method, uuid, *args, **kwargs)
+            return
+        raise exception.InstanceUnknownCell(instance_uuid=uuid)
+
+    def _call_to_cells(self, context, instance, method, *args, **kwargs):
+        """Call a compute API method for an instance in its cell and
+        return the result.
+
+        :raises: exception.InstanceUnknownCell if the instance has no
+                 cell_name
+        """
+        uuid = instance['uuid']
+        target_cell = instance['cell_name']
+        if target_cell:
+            return self.cells_rpcapi.call_compute_api_method(
+                    context, target_cell, method, uuid, *args, **kwargs)
+        raise exception.InstanceUnknownCell(instance_uuid=uuid)
+
+ def _check_requested_networks(self, context, requested_networks):
+ """Override compute API's checking of this. It'll happen in
+ child cell
+ """
+ return
+
+ def _validate_image_href(self, context, image_href):
+ """Override compute API's checking of this. It'll happen in
+ child cell
+ """
+ return
+
+    def _create_image(self, context, instance, name, image_type,
+            backup_type=None, rotation=None, extra_properties=None):
+        """Run 'backup' (when backup_type is given) or 'snapshot' in the
+        instance's cell and return the result.
+        """
+        if not backup_type:
+            return self._call_to_cells(context, instance, 'snapshot',
+                    name, extra_properties=extra_properties)
+        return self._call_to_cells(context, instance, 'backup',
+                name, backup_type, rotation,
+                extra_properties=extra_properties)
+
+ def create(self, *args, **kwargs):
+ """We can use the base functionality, but I left this here just
+ for completeness.
+ """
+ return super(ComputeCellsAPI, self).create(*args, **kwargs)
+
+    @validate_cell
+    def update(self, context, instance, **kwargs):
+        """Update an instance here and cast the update to its cell.
+
+        Returns whatever the parent class' update() returns.
+        """
+        result = super(ComputeCellsAPI, self).update(context,
+                instance, **kwargs)
+        # We need to skip vm_state/task_state updates... those will
+        # happen via a _cast_to_cells for running a different
+        # compute api method
+        forwarded = dict((key, value) for key, value in kwargs.items()
+                         if key not in ('vm_state', 'task_state'))
+        if forwarded:
+            try:
+                self._cast_to_cells(context, instance, 'update',
+                        **forwarded)
+            except exception.InstanceUnknownCell:
+                pass
+        return result
+
+    def _local_delete(self, context, instance, bdms):
+        """Delete locally, but only for an instance without a cell."""
+        # This will get called for every delete in the API cell
+        # because _delete() in compute/api.py will not find a
+        # service when checking if it's up.
+        # We need to only take action if there's no cell_name.  Our
+        # overrides of delete() and soft_delete() will take care of
+        # the rest.
+        if instance['cell_name']:
+            return
+        return super(ComputeCellsAPI, self)._local_delete(context,
+                instance, bdms)
+
+ def soft_delete(self, context, instance):
+ self._handle_cell_delete(context, instance,
+ super(ComputeCellsAPI, self).soft_delete, 'soft_delete')
+
+ def delete(self, context, instance):
+ self._handle_cell_delete(context, instance,
+ super(ComputeCellsAPI, self).delete, 'delete')
+
+    def _handle_cell_delete(self, context, instance, method, method_name):
+        """Terminate an instance here and in the cell that owns it.
+
+        :param method: the parent class' bound delete()/soft_delete(),
+                       run against the local DB
+        :param method_name: name of the compute API method to cast to
+                            the instance's cell ('delete' or
+                            'soft_delete')
+        :raises: exception.InstanceInvalidState if the instance's cell
+                 is in read-only mode
+        """
+        # We can't use the decorator because we have special logic in the
+        # case we don't know the cell_name...
+        cell_name = instance['cell_name']
+        if cell_name and self._cell_read_only(cell_name):
+            raise exception.InstanceInvalidState(
+                    attr="vm_state",
+                    instance_uuid=instance['uuid'],
+                    state="temporary_readonly",
+                    method=method_name)
+        method(context, instance)
+        try:
+            self._cast_to_cells(context, instance, method_name)
+        except exception.InstanceUnknownCell:
+            # If there's no cell, there's also no host... which means
+            # the instance was destroyed from the DB here.  Let's just
+            # broadcast a message down to all cells and hope this ends
+            # up resolving itself...  Worse case.. the instance will
+            # show back up again here.
+            # NOTE: 'method' is a bound method, so the delete type has
+            # to be derived from method_name; comparing 'method' to a
+            # string made every delete a 'hard' one.
+            delete_type = 'soft' if method_name == 'soft_delete' else 'hard'
+            # The cells rpcapi method takes the instance itself (it
+            # converts it to primitives), not just its uuid.
+            self.cells_rpcapi.instance_delete_everywhere(context,
+                    instance, delete_type)
+
+ @validate_cell
+ def restore(self, context, instance):
+ """Restore a previously deleted (but not reclaimed) instance."""
+ super(ComputeCellsAPI, self).restore(context, instance)
+ self._cast_to_cells(context, instance, 'restore')
+
+ @validate_cell
+ def force_delete(self, context, instance):
+ """Force delete a previously deleted (but not reclaimed) instance."""
+ super(ComputeCellsAPI, self).force_delete(context, instance)
+ self._cast_to_cells(context, instance, 'force_delete')
+
+    @validate_cell
+    def stop(self, context, instance, do_cast=True):
+        """Stop an instance.
+
+        With do_cast the request is cast to the instance's cell and
+        nothing is returned; otherwise the cell is called and its
+        result is returned.
+        """
+        super(ComputeCellsAPI, self).stop(context, instance)
+        if not do_cast:
+            return self._call_to_cells(context, instance, 'stop',
+                    do_cast=False)
+        self._cast_to_cells(context, instance, 'stop', do_cast=True)
+
+ @validate_cell
+ def start(self, context, instance):
+ """Start an instance."""
+ super(ComputeCellsAPI, self).start(context, instance)
+ self._cast_to_cells(context, instance, 'start')
+
+ @validate_cell
+ def reboot(self, context, instance, *args, **kwargs):
+ """Reboot the given instance."""
+ super(ComputeCellsAPI, self).reboot(context, instance,
+ *args, **kwargs)
+ self._cast_to_cells(context, instance, 'reboot', *args,
+ **kwargs)
+
+ @validate_cell
+ def rebuild(self, context, instance, *args, **kwargs):
+ """Rebuild the given instance with the provided attributes."""
+ super(ComputeCellsAPI, self).rebuild(context, instance, *args,
+ **kwargs)
+ self._cast_to_cells(context, instance, 'rebuild', *args, **kwargs)
+
+ @check_instance_state(vm_state=[vm_states.RESIZED])
+ @validate_cell
+ def revert_resize(self, context, instance):
+ """Reverts a resize, deleting the 'new' instance in the process."""
+ # NOTE(markwash): regular api manipulates the migration here, but we
+ # don't have access to it. So to preserve the interface just update the
+ # vm and task state.
+ self.update(context, instance,
+ task_state=task_states.RESIZE_REVERTING)
+ self._cast_to_cells(context, instance, 'revert_resize')
+
+ @check_instance_state(vm_state=[vm_states.RESIZED])
+ @validate_cell
+ def confirm_resize(self, context, instance):
+ """Confirms a migration/resize and deletes the 'old' instance."""
+ # NOTE(markwash): regular api manipulates migration here, but we don't
+ # have the migration in the api database. So to preserve the interface
+ # just update the vm and task state without calling super()
+ self.update(context, instance, task_state=None,
+ vm_state=vm_states.ACTIVE)
+ self._cast_to_cells(context, instance, 'confirm_resize')
+
+ @check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.STOPPED],
+ task_state=[None])
+ @validate_cell
+ def resize(self, context, instance, *args, **kwargs):
+ """Resize (ie, migrate) a running instance.
+
+ If flavor_id is None, the process is considered a migration, keeping
+ the original flavor_id. If flavor_id is not None, the instance should
+ be migrated to a new host and resized to the new flavor_id.
+ """
+ super(ComputeCellsAPI, self).resize(context, instance, *args,
+ **kwargs)
+ # FIXME(comstud): pass new instance_type object down to a method
+ # that'll unfold it
+ self._cast_to_cells(context, instance, 'resize', *args, **kwargs)
+
+ @validate_cell
+ def add_fixed_ip(self, context, instance, *args, **kwargs):
+ """Add fixed_ip from specified network to given instance."""
+ super(ComputeCellsAPI, self).add_fixed_ip(context, instance,
+ *args, **kwargs)
+ self._cast_to_cells(context, instance, 'add_fixed_ip',
+ *args, **kwargs)
+
+ @validate_cell
+ def remove_fixed_ip(self, context, instance, *args, **kwargs):
+ """Remove fixed_ip from specified network to given instance."""
+ super(ComputeCellsAPI, self).remove_fixed_ip(context, instance,
+ *args, **kwargs)
+ self._cast_to_cells(context, instance, 'remove_fixed_ip',
+ *args, **kwargs)
+
+ @validate_cell
+ def pause(self, context, instance):
+ """Pause the given instance."""
+ super(ComputeCellsAPI, self).pause(context, instance)
+ self._cast_to_cells(context, instance, 'pause')
+
+ @validate_cell
+ def unpause(self, context, instance):
+ """Unpause the given instance."""
+ super(ComputeCellsAPI, self).unpause(context, instance)
+ self._cast_to_cells(context, instance, 'unpause')
+
+ def set_host_enabled(self, context, host, enabled):
+ """Sets the specified host's ability to accept new instances."""
+ # FIXME(comstud): Since there's no instance here, we have no
+ # idea which cell should be the target.
+ pass
+
+ def host_power_action(self, context, host, action):
+ """Reboots, shuts down or powers up the host."""
+ # FIXME(comstud): Since there's no instance here, we have no
+ # idea which cell should be the target.
+ pass
+
+ def get_diagnostics(self, context, instance):
+ """Retrieve diagnostics for the given instance."""
+ # FIXME(comstud): Cache this?
+ # Also: only calling super() to get state/policy checking
+ super(ComputeCellsAPI, self).get_diagnostics(context, instance)
+ return self._call_to_cells(context, instance, 'get_diagnostics')
+
+ @validate_cell
+ def suspend(self, context, instance):
+ """Suspend the given instance."""
+ super(ComputeCellsAPI, self).suspend(context, instance)
+ self._cast_to_cells(context, instance, 'suspend')
+
+ @validate_cell
+ def resume(self, context, instance):
+ """Resume the given instance."""
+ super(ComputeCellsAPI, self).resume(context, instance)
+ self._cast_to_cells(context, instance, 'resume')
+
+ @validate_cell
+ def rescue(self, context, instance, rescue_password=None):
+ """Rescue the given instance."""
+ super(ComputeCellsAPI, self).rescue(context, instance,
+ rescue_password=rescue_password)
+ self._cast_to_cells(context, instance, 'rescue',
+ rescue_password=rescue_password)
+
+ @validate_cell
+ def unrescue(self, context, instance):
+ """Unrescue the given instance."""
+ super(ComputeCellsAPI, self).unrescue(context, instance)
+ self._cast_to_cells(context, instance, 'unrescue')
+
+ @validate_cell
+ def set_admin_password(self, context, instance, password=None):
+ """Set the root/admin password for the given instance."""
+ super(ComputeCellsAPI, self).set_admin_password(context, instance,
+ password=password)
+ self._cast_to_cells(context, instance, 'set_admin_password',
+ password=password)
+
+ @validate_cell
+ def inject_file(self, context, instance, *args, **kwargs):
+ """Write a file to the given instance."""
+ super(ComputeCellsAPI, self).inject_file(context, instance, *args,
+ **kwargs)
+ self._cast_to_cells(context, instance, 'inject_file', *args, **kwargs)
+
+ @wrap_check_policy
+ @validate_cell
+ def get_vnc_console(self, context, instance, console_type):
+ """Get a url to a VNC Console."""
+ if not instance['host']:
+ raise exception.InstanceNotReady(instance_id=instance['uuid'])
+
+ connect_info = self._call_to_cells(context, instance,
+ 'get_vnc_connect_info', console_type)
+
+ self.consoleauth_rpcapi.authorize_console(context,
+ connect_info['token'], console_type, connect_info['host'],
+ connect_info['port'], connect_info['internal_access_path'])
+ return {'url': connect_info['access_url']}
+
+ @validate_cell
+ def get_console_output(self, context, instance, *args, **kwargs):
+ """Get console output for an an instance."""
+ # NOTE(comstud): Calling super() just to get policy check
+ super(ComputeCellsAPI, self).get_console_output(context, instance,
+ *args, **kwargs)
+ return self._call_to_cells(context, instance, 'get_console_output',
+ *args, **kwargs)
+
+ def lock(self, context, instance):
+ """Lock the given instance."""
+ super(ComputeCellsAPI, self).lock(context, instance)
+ self._cast_to_cells(context, instance, 'lock')
+
+    def unlock(self, context, instance):
+        """Unlock the given instance here and in its cell."""
+        # NOTE: this has to run the parent's unlock(); calling lock()
+        # here would lock the instance in this cell while asking the
+        # instance's cell to unlock it.
+        super(ComputeCellsAPI, self).unlock(context, instance)
+        self._cast_to_cells(context, instance, 'unlock')
+
+ @validate_cell
+ def reset_network(self, context, instance):
+ """Reset networking on the instance."""
+ super(ComputeCellsAPI, self).reset_network(context, instance)
+ self._cast_to_cells(context, instance, 'reset_network')
+
+ @validate_cell
+ def inject_network_info(self, context, instance):
+ """Inject network info for the instance."""
+ super(ComputeCellsAPI, self).inject_network_info(context, instance)
+ self._cast_to_cells(context, instance, 'inject_network_info')
+
+ @wrap_check_policy
+ @validate_cell
+ def attach_volume(self, context, instance, volume_id, device=None):
+ """Attach an existing volume to an existing instance."""
+ if device and not block_device.match_device(device):
+ raise exception.InvalidDevicePath(path=device)
+ device = self.compute_rpcapi.reserve_block_device_name(
+ context, device=device, instance=instance, volume_id=volume_id)
+ try:
+ volume = self.volume_api.get(context, volume_id)
+ self.volume_api.check_attach(context, volume)
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ self.db.block_device_mapping_destroy_by_instance_and_device(
+ context, instance['uuid'], device)
+ self._cast_to_cells(context, instance, 'attach_volume',
+ volume_id, device)
+
+ @check_instance_lock
+ @validate_cell
+ def _detach_volume(self, context, instance, volume_id):
+ """Detach a volume from an instance."""
+ check_policy(context, 'detach_volume', instance)
+
+ volume = self.volume_api.get(context, volume_id)
+ self.volume_api.check_detach(context, volume)
+ self._cast_to_cells(context, instance, 'detach_volume',
+ volume_id)
+
+ @wrap_check_policy
+ @validate_cell
+ def associate_floating_ip(self, context, instance, address):
+ """Makes calls to network_api to associate_floating_ip.
+
+ :param address: is a string floating ip address
+ """
+ self._cast_to_cells(context, instance, 'associate_floating_ip',
+ address)
+
+ @validate_cell
+ def delete_instance_metadata(self, context, instance, key):
+ """Delete the given metadata item from an instance."""
+ super(ComputeCellsAPI, self).delete_instance_metadata(context,
+ instance, key)
+ self._cast_to_cells(context, instance, 'delete_instance_metadata',
+ key)
+
+ @wrap_check_policy
+ @validate_cell
+ def update_instance_metadata(self, context, instance,
+ metadata, delete=False):
+ rv = super(ComputeCellsAPI, self).update_instance_metadata(context,
+ instance, metadata, delete=delete)
+ try:
+ self._cast_to_cells(context, instance,
+ 'update_instance_metadata',
+ metadata, delete=delete)
+ except exception.InstanceUnknownCell:
+ pass
+ return rv
diff --git a/nova/db/api.py b/nova/db/api.py
index 3e350fc754..1322c29e9c 100644
--- a/nova/db/api.py
+++ b/nova/db/api.py
@@ -43,8 +43,10 @@ these objects be simple dictionaries.
"""
+from nova.cells import rpcapi as cells_rpcapi
from nova import exception
from nova.openstack.common import cfg
+from nova.openstack.common import log as logging
from nova import utils
@@ -68,6 +70,7 @@ CONF.register_opts(db_opts)
IMPL = utils.LazyPluggable('db_backend',
sqlalchemy='nova.db.sqlalchemy.api')
+LOG = logging.getLogger(__name__)
class NoMoreNetworks(exception.NovaException):
@@ -566,9 +569,16 @@ def instance_data_get_for_project(context, project_id, session=None):
session=session)
-def instance_destroy(context, instance_uuid, constraint=None):
+def instance_destroy(context, instance_uuid, constraint=None,
+ update_cells=True):
"""Destroy the instance or raise if it does not exist."""
- return IMPL.instance_destroy(context, instance_uuid, constraint)
+ rv = IMPL.instance_destroy(context, instance_uuid, constraint)
+ if update_cells:
+ try:
+ cells_rpcapi.CellsAPI().instance_destroy_at_top(context, rv)
+ except Exception:
+ LOG.exception(_("Failed to notify cells of instance destroy"))
+ return rv
def instance_get_by_uuid(context, uuid):
@@ -665,13 +675,19 @@ def instance_test_and_set(context, instance_uuid, attr, ok_states,
ok_states, new_state)
-def instance_update(context, instance_uuid, values):
+def instance_update(context, instance_uuid, values, update_cells=True):
"""Set the given properties on an instance and update it.
Raises NotFound if instance does not exist.
"""
- return IMPL.instance_update(context, instance_uuid, values)
+ rv = IMPL.instance_update(context, instance_uuid, values)
+ if update_cells:
+ try:
+ cells_rpcapi.CellsAPI().instance_update_at_top(context, rv)
+ except Exception:
+ LOG.exception(_("Failed to notify cells of instance update"))
+ return rv
def instance_update_and_get_original(context, instance_uuid, values):
@@ -687,8 +703,12 @@ def instance_update_and_get_original(context, instance_uuid, values):
Raises NotFound if instance does not exist.
"""
- return IMPL.instance_update_and_get_original(context, instance_uuid,
- values)
+ rv = IMPL.instance_update_and_get_original(context, instance_uuid, values)
+ try:
+ cells_rpcapi.CellsAPI().instance_update_at_top(context, rv[1])
+ except Exception:
+ LOG.exception(_("Failed to notify cells of instance update"))
+ return rv
def instance_add_security_group(context, instance_id, security_group_id):
@@ -714,13 +734,21 @@ def instance_info_cache_get(context, instance_uuid):
return IMPL.instance_info_cache_get(context, instance_uuid)
-def instance_info_cache_update(context, instance_uuid, values):
+def instance_info_cache_update(context, instance_uuid, values,
+                               update_cells=True):
     """Update an instance info cache record in the table.
     :param instance_uuid: = uuid of info cache's instance
     :param values: = dict containing column values to update
+    :param update_cells: = if True, also send the updated record to
+                           cells via instance_info_cache_update_at_top
     """
-    return IMPL.instance_info_cache_update(context, instance_uuid, values)
+    rv = IMPL.instance_info_cache_update(context, instance_uuid, values)
+    # Honor update_cells like the other *_at_top notifiers in this module;
+    # callers passing update_cells=False must not trigger a notification.
+    if update_cells:
+        try:
+            cells_rpcapi.CellsAPI().instance_info_cache_update_at_top(
+                    context, rv)
+        except Exception:
+            # A failed notification is logged, never raised: the DB
+            # update itself has already succeeded.
+            LOG.exception(_("Failed to notify cells of instance info cache "
+                            "update"))
+    return rv
def instance_info_cache_delete(context, instance_uuid):
@@ -1354,7 +1382,7 @@ def instance_metadata_delete(context, instance_uuid, key):
def instance_metadata_update(context, instance_uuid, metadata, delete):
"""Update metadata if it exists, otherwise create it."""
return IMPL.instance_metadata_update(context, instance_uuid,
- metadata, delete)
+ metadata, delete)
####################
@@ -1414,12 +1442,21 @@ def bw_usage_get_by_uuids(context, uuids, start_period):
def bw_usage_update(context, uuid, mac, start_period, bw_in, bw_out,
- last_ctr_in, last_ctr_out, last_refreshed=None):
+ last_ctr_in, last_ctr_out, last_refreshed=None,
+ update_cells=True):
"""Update cached bandwidth usage for an instance's network based on mac
address. Creates new record if needed.
"""
- return IMPL.bw_usage_update(context, uuid, mac, start_period, bw_in,
+ rv = IMPL.bw_usage_update(context, uuid, mac, start_period, bw_in,
bw_out, last_ctr_in, last_ctr_out, last_refreshed=last_refreshed)
+ if update_cells:
+ try:
+ cells_rpcapi.CellsAPI().bw_usage_update_at_top(context,
+ uuid, mac, start_period, bw_in, bw_out,
+ last_ctr_in, last_ctr_out, last_refreshed)
+ except Exception:
+ LOG.exception(_("Failed to notify cells of bw_usage update"))
+ return rv
####################
@@ -1555,9 +1592,15 @@ def aggregate_host_delete(context, aggregate_id, host):
####################
-def instance_fault_create(context, values):
+def instance_fault_create(context, values, update_cells=True):
"""Create a new Instance Fault."""
- return IMPL.instance_fault_create(context, values)
+ rv = IMPL.instance_fault_create(context, values)
+ if update_cells:
+ try:
+ cells_rpcapi.CellsAPI().instance_fault_create_at_top(context, rv)
+ except Exception:
+ LOG.exception(_("Failed to notify cells of instance fault"))
+ return rv
def instance_fault_get_by_instance_uuids(context, instance_uuids):
diff --git a/nova/exception.py b/nova/exception.py
index ee0a88a95e..c484b51202 100644
--- a/nova/exception.py
+++ b/nova/exception.py
@@ -769,6 +769,34 @@ class CellNotFound(NotFound):
message = _("Cell %(cell_id)s could not be found.")
+class CellRoutingInconsistency(NovaException):
+ message = _("Inconsistency in cell routing: %(reason)s")
+
+
+class CellServiceAPIMethodNotFound(NotFound):
+ message = _("Service API method not found: %(detail)s")
+
+
+class CellTimeout(NotFound):
+ message = _("Timeout waiting for response from cell")
+
+
+class CellMaxHopCountReached(NovaException):
+ message = _("Cell message has reached maximum hop count: %(hop_count)s")
+
+
+class NoCellsAvailable(NovaException):
+ message = _("No cells available matching scheduling criteria.")
+
+
+class CellError(NovaException):
+ message = _("Exception received during cell processing: %(exc_name)s.")
+
+
+class InstanceUnknownCell(NotFound):
+ message = _("Cell is not known for instance %(instance_uuid)s")
+
+
class SchedulerHostFilterNotFound(NotFound):
message = _("Scheduler Host Filter %(filter_name)s could not be found.")
diff --git a/nova/tests/cells/__init__.py b/nova/tests/cells/__init__.py
new file mode 100644
index 0000000000..d1bf725f78
--- /dev/null
+++ b/nova/tests/cells/__init__.py
@@ -0,0 +1,19 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# NOTE(vish): this forces the fixtures from tests/__init__.py:setup() to work
+from nova.tests import *
diff --git a/nova/tests/cells/fakes.py b/nova/tests/cells/fakes.py
new file mode 100644
index 0000000000..a9de530d1d
--- /dev/null
+++ b/nova/tests/cells/fakes.py
@@ -0,0 +1,191 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Fakes For Cells tests.
+"""
+
+from nova.cells import driver
+from nova.cells import manager as cells_manager
+from nova.cells import messaging
+from nova.cells import state as cells_state
+import nova.db
+from nova.db import base
+from nova.openstack.common import cfg
+
+CONF = cfg.CONF
+CONF.import_opt('name', 'nova.cells.opts', group='cells')
+
+
+# Fake Cell Hierarchy
+FAKE_TOP_LEVEL_CELL_NAME = 'api-cell'
+FAKE_CELL_LAYOUT = [{'child-cell1': []},
+ {'child-cell2': [{'grandchild-cell1': []}]},
+ {'child-cell3': [{'grandchild-cell2': []},
+ {'grandchild-cell3': []}]},
+ {'child-cell4': []}]
+
+# build_cell_stub_infos() below will take the above layout and create
+# a fake view of the DB from the perspective of each of the cells.
+# For each cell, a CellStubInfo will be created with this info.
+CELL_NAME_TO_STUB_INFO = {}
+
+
+class FakeDBApi(object):
+ """DB API stand-in that serves a fixed list of cell entries.
+
+ cell_get_all() returns the entries given at construction and
+ compute_node_get_all() returns an empty list; any other attribute
+ lookup falls through to the real nova.db module via __getattr__.
+ """
+ def __init__(self, cell_db_entries):
+ self.cell_db_entries = cell_db_entries
+
+ def __getattr__(self, key):
+ # Only reached for names not defined on this class.
+ return getattr(nova.db, key)
+
+ def cell_get_all(self, ctxt):
+ return self.cell_db_entries
+
+ def compute_node_get_all(self, ctxt):
+ return []
+
+
+class FakeCellsDriver(driver.BaseCellsDriver):
+ pass
+
+
+class FakeCellState(cells_state.CellState):
+ """CellState whose send_message() delivers the message in-process."""
+ def send_message(self, message):
+ # Look up the fake message runner registered under this cell's name,
+ # round-trip the message through JSON, and process it right away.
+ message_runner = get_message_runner(self.name)
+ orig_ctxt = message.ctxt
+ json_message = message.to_json()
+ message = message_runner.message_from_json(json_message)
+ # Restore this so we can use mox and verify same context
+ message.ctxt = orig_ctxt
+ message.process()
+
+
+class FakeCellStateManager(cells_state.CellStateManager):
+ def __init__(self, *args, **kwargs):
+ super(FakeCellStateManager, self).__init__(*args,
+ cell_state_cls=FakeCellState, **kwargs)
+
+
+class FakeCellsManager(cells_manager.CellsManager):
+ def __init__(self, *args, **kwargs):
+ super(FakeCellsManager, self).__init__(*args,
+ cell_state_manager=FakeCellStateManager,
+ **kwargs)
+
+
+class CellStubInfo(object):
+ """Per-cell test fixture holding a FakeCellsManager and its DB entries."""
+ def __init__(self, test_case, cell_name, db_entries):
+ self.test_case = test_case
+ self.cell_name = cell_name
+ self.db_entries = db_entries
+
+ def fake_base_init(_self, *args, **kwargs):
+ _self.db = FakeDBApi(db_entries)
+
+ # Stub base.Base.__init__ so it installs a FakeDBApi as .db --
+ # presumably so the manager created next sees only db_entries; confirm.
+ test_case.stubs.Set(base.Base, '__init__', fake_base_init)
+ self.cells_manager = FakeCellsManager()
+ # Fix the cell name, as it normally uses CONF.cells.name
+ msg_runner = self.cells_manager.msg_runner
+ msg_runner.our_name = self.cell_name
+ self.cells_manager.state_manager.my_cell_state.name = self.cell_name
+
+
+def _build_cell_stub_info(test_case, our_name, parent_path, children):
+ """Create the CellStubInfo for our_name and, recursively, its children.
+
+ Builds the list of fake cell DB entries as seen from our_name: one
+ is_parent=True entry for the last component of parent_path (when
+ parent_path is non-empty) and one is_parent=False entry per child.
+ The result is stored in CELL_NAME_TO_STUB_INFO keyed by our_name.
+
+ :param children: list of single-key dicts mapping a child cell name to
+ its own list of children (same shape as FAKE_CELL_LAYOUT)
+ """
+ cell_db_entries = []
+ cur_db_id = 1
+ sep_char = messaging._PATH_CELL_SEP
+ if parent_path:
+ cell_db_entries.append(
+ dict(id=cur_db_id,
+ name=parent_path.split(sep_char)[-1],
+ is_parent=True,
+ username='username%s' % cur_db_id,
+ password='password%s' % cur_db_id,
+ rpc_host='rpc_host%s' % cur_db_id,
+ rpc_port='rpc_port%s' % cur_db_id,
+ rpc_virtual_host='rpc_vhost%s' % cur_db_id))
+ cur_db_id += 1
+ our_path = parent_path + sep_char + our_name
+ else:
+ our_path = our_name
+ for child in children:
+ for child_name, grandchildren in child.items():
+ # Recurse first so each descendant gets its own stub info.
+ _build_cell_stub_info(test_case, child_name, our_path,
+ grandchildren)
+ cell_entry = dict(id=cur_db_id,
+ name=child_name,
+ username='username%s' % cur_db_id,
+ password='password%s' % cur_db_id,
+ rpc_host='rpc_host%s' % cur_db_id,
+ rpc_port='rpc_port%s' % cur_db_id,
+ rpc_virtual_host='rpc_vhost%s' % cur_db_id,
+ is_parent=False)
+ cell_db_entries.append(cell_entry)
+ cur_db_id += 1
+ stub_info = CellStubInfo(test_case, our_name, cell_db_entries)
+ CELL_NAME_TO_STUB_INFO[our_name] = stub_info
+
+
+def _build_cell_stub_infos(test_case):
+ _build_cell_stub_info(test_case, FAKE_TOP_LEVEL_CELL_NAME, '',
+ FAKE_CELL_LAYOUT)
+
+
+def init(test_case):
+ global CELL_NAME_TO_STUB_INFO
+ test_case.flags(driver='nova.tests.cells.fakes.FakeCellsDriver',
+ group='cells')
+ CELL_NAME_TO_STUB_INFO = {}
+ _build_cell_stub_infos(test_case)
+
+
+def _get_cell_stub_info(cell_name):
+ return CELL_NAME_TO_STUB_INFO[cell_name]
+
+
+def get_state_manager(cell_name):
+ return _get_cell_stub_info(cell_name).cells_manager.state_manager
+
+
+def get_cell_state(cur_cell_name, tgt_cell_name):
+    """Return cur_cell_name's view of tgt_cell_name.
+
+    Child cells are consulted first, then parent cells; the result is
+    None when neither mapping has an entry for tgt_cell_name.
+    """
+    manager = get_state_manager(cur_cell_name)
+    child = manager.child_cells.get(tgt_cell_name)
+    if child is not None:
+        return child
+    return manager.parent_cells.get(tgt_cell_name)
+
+
+def get_cells_manager(cell_name):
+ return _get_cell_stub_info(cell_name).cells_manager
+
+
+def get_message_runner(cell_name):
+ return _get_cell_stub_info(cell_name).cells_manager.msg_runner
+
+
+def stub_tgt_method(test_case, cell_name, method_name, method):
+    """Install method as method_name on a cell's 'targeted' message methods."""
+    runner = get_message_runner(cell_name)
+    setattr(runner.methods_by_type['targeted'], method_name, method)
+
+
+def stub_bcast_method(test_case, cell_name, method_name, method):
+    """Install method as method_name on a cell's 'broadcast' message methods."""
+    runner = get_message_runner(cell_name)
+    bcast_msg_methods = runner.methods_by_type['broadcast']
+    setattr(bcast_msg_methods, method_name, method)
+
+
+def stub_bcast_methods(test_case, method_name, method):
+ for cell_name in CELL_NAME_TO_STUB_INFO.keys():
+ stub_bcast_method(test_case, cell_name, method_name, method)
diff --git a/nova/tests/cells/test_cells_manager.py b/nova/tests/cells/test_cells_manager.py
new file mode 100644
index 0000000000..5a2b83145a
--- /dev/null
+++ b/nova/tests/cells/test_cells_manager.py
@@ -0,0 +1,151 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Tests For CellsManager
+"""
+from nova.cells import messaging
+from nova import context
+from nova import test
+from nova.tests.cells import fakes
+
+
+class CellsManagerClassTestCase(test.TestCase):
+ """Test case for CellsManager class"""
+
+ def setUp(self):
+ super(CellsManagerClassTestCase, self).setUp()
+ fakes.init(self)
+ # pick a child cell to use for tests.
+ self.our_cell = 'grandchild-cell1'
+ self.cells_manager = fakes.get_cells_manager(self.our_cell)
+ self.msg_runner = self.cells_manager.msg_runner
+ self.driver = self.cells_manager.driver
+ self.ctxt = 'fake_context'
+
+ def test_post_start_hook_child_cell(self):
+ self.mox.StubOutWithMock(self.driver, 'start_consumers')
+ self.mox.StubOutWithMock(context, 'get_admin_context')
+ self.mox.StubOutWithMock(self.cells_manager, '_update_our_parents')
+
+ self.driver.start_consumers(self.msg_runner)
+ context.get_admin_context().AndReturn(self.ctxt)
+ self.cells_manager._update_our_parents(self.ctxt)
+ self.mox.ReplayAll()
+ self.cells_manager.post_start_hook()
+
+ def test_post_start_hook_middle_cell(self):
+ cells_manager = fakes.get_cells_manager('child-cell2')
+ msg_runner = cells_manager.msg_runner
+ driver = cells_manager.driver
+
+ self.mox.StubOutWithMock(driver, 'start_consumers')
+ self.mox.StubOutWithMock(context, 'get_admin_context')
+ self.mox.StubOutWithMock(msg_runner,
+ 'ask_children_for_capabilities')
+ self.mox.StubOutWithMock(msg_runner,
+ 'ask_children_for_capacities')
+
+ driver.start_consumers(msg_runner)
+ context.get_admin_context().AndReturn(self.ctxt)
+ msg_runner.ask_children_for_capabilities(self.ctxt)
+ msg_runner.ask_children_for_capacities(self.ctxt)
+ self.mox.ReplayAll()
+ cells_manager.post_start_hook()
+
+ def test_update_our_parents(self):
+ self.mox.StubOutWithMock(self.msg_runner,
+ 'tell_parents_our_capabilities')
+ self.mox.StubOutWithMock(self.msg_runner,
+ 'tell_parents_our_capacities')
+
+ self.msg_runner.tell_parents_our_capabilities(self.ctxt)
+ self.msg_runner.tell_parents_our_capacities(self.ctxt)
+ self.mox.ReplayAll()
+ self.cells_manager._update_our_parents(self.ctxt)
+
+ def test_schedule_run_instance(self):
+ host_sched_kwargs = 'fake_host_sched_kwargs_silently_passed'
+ self.mox.StubOutWithMock(self.msg_runner, 'schedule_run_instance')
+ our_cell = self.msg_runner.state_manager.get_my_state()
+ self.msg_runner.schedule_run_instance(self.ctxt, our_cell,
+ host_sched_kwargs)
+ self.mox.ReplayAll()
+ self.cells_manager.schedule_run_instance(self.ctxt,
+ host_sched_kwargs=host_sched_kwargs)
+
+ def test_run_compute_api_method(self):
+ # Args should just be silently passed through
+ cell_name = 'fake-cell-name'
+ method_info = 'fake-method-info'
+
+ fake_response = messaging.Response('fake', 'fake', False)
+
+ self.mox.StubOutWithMock(self.msg_runner,
+ 'run_compute_api_method')
+ self.mox.StubOutWithMock(fake_response,
+ 'value_or_raise')
+ self.msg_runner.run_compute_api_method(self.ctxt,
+ cell_name,
+ method_info,
+ True).AndReturn(fake_response)
+ fake_response.value_or_raise().AndReturn('fake-response')
+ self.mox.ReplayAll()
+ response = self.cells_manager.run_compute_api_method(
+ self.ctxt, cell_name=cell_name, method_info=method_info,
+ call=True)
+ self.assertEqual('fake-response', response)
+
+ def test_instance_update_at_top(self):
+ self.mox.StubOutWithMock(self.msg_runner, 'instance_update_at_top')
+ self.msg_runner.instance_update_at_top(self.ctxt, 'fake-instance')
+ self.mox.ReplayAll()
+ self.cells_manager.instance_update_at_top(self.ctxt,
+ instance='fake-instance')
+
+ def test_instance_destroy_at_top(self):
+ self.mox.StubOutWithMock(self.msg_runner, 'instance_destroy_at_top')
+ self.msg_runner.instance_destroy_at_top(self.ctxt, 'fake-instance')
+ self.mox.ReplayAll()
+ self.cells_manager.instance_destroy_at_top(self.ctxt,
+ instance='fake-instance')
+
+ def test_instance_delete_everywhere(self):
+ self.mox.StubOutWithMock(self.msg_runner,
+ 'instance_delete_everywhere')
+ self.msg_runner.instance_delete_everywhere(self.ctxt,
+ 'fake-instance',
+ 'fake-type')
+ self.mox.ReplayAll()
+ self.cells_manager.instance_delete_everywhere(
+ self.ctxt, instance='fake-instance',
+ delete_type='fake-type')
+
+ def test_instance_fault_create_at_top(self):
+ self.mox.StubOutWithMock(self.msg_runner,
+ 'instance_fault_create_at_top')
+ self.msg_runner.instance_fault_create_at_top(self.ctxt,
+ 'fake-fault')
+ self.mox.ReplayAll()
+ self.cells_manager.instance_fault_create_at_top(
+ self.ctxt, instance_fault='fake-fault')
+
+ def test_bw_usage_update_at_top(self):
+ self.mox.StubOutWithMock(self.msg_runner,
+ 'bw_usage_update_at_top')
+ self.msg_runner.bw_usage_update_at_top(self.ctxt,
+ 'fake-bw-info')
+ self.mox.ReplayAll()
+ self.cells_manager.bw_usage_update_at_top(
+ self.ctxt, bw_update_info='fake-bw-info')
diff --git a/nova/tests/cells/test_cells_messaging.py b/nova/tests/cells/test_cells_messaging.py
new file mode 100644
index 0000000000..d728c9474c
--- /dev/null
+++ b/nova/tests/cells/test_cells_messaging.py
@@ -0,0 +1,913 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Tests For Cells Messaging module
+"""
+
+from nova.cells import messaging
+from nova import context
+from nova import exception
+from nova.openstack.common import cfg
+from nova import test
+from nova.tests.cells import fakes
+
+
+CONF = cfg.CONF
+CONF.import_opt('host', 'nova.config')
+CONF.import_opt('name', 'nova.cells.opts', group='cells')
+CONF.import_opt('allowed_rpc_exception_modules',
+ 'nova.openstack.common.rpc')
+
+
+class CellsMessageClassesTestCase(test.TestCase):
+ """Test case for the main Cells Message classes."""
+ def setUp(self):
+ super(CellsMessageClassesTestCase, self).setUp()
+ fakes.init(self)
+ self.ctxt = context.RequestContext('fake', 'fake')
+ # Need to be able to deserialize test.TestingException.
+ allowed_modules = CONF.allowed_rpc_exception_modules
+ allowed_modules.append('nova.test')
+ self.flags(allowed_rpc_exception_modules=allowed_modules)
+ self.our_name = 'api-cell'
+ self.msg_runner = fakes.get_message_runner(self.our_name)
+ self.state_manager = self.msg_runner.state_manager
+
+ def test_reverse_path(self):
+ path = 'a!b!c!d'
+ expected = 'd!c!b!a'
+ rev_path = messaging._reverse_path(path)
+ self.assertEqual(rev_path, expected)
+
+ def test_response_cell_name_from_path(self):
+ # test array with tuples of inputs/expected outputs
+ test_paths = [('cell1', 'cell1'),
+ ('cell1!cell2', 'cell2!cell1'),
+ ('cell1!cell2!cell3', 'cell3!cell2!cell1')]
+
+ for test_input, expected_output in test_paths:
+ self.assertEqual(expected_output,
+ messaging._response_cell_name_from_path(test_input))
+
+ def test_response_cell_name_from_path_neighbor_only(self):
+ # test array with tuples of inputs/expected outputs
+ test_paths = [('cell1', 'cell1'),
+ ('cell1!cell2', 'cell2!cell1'),
+ ('cell1!cell2!cell3', 'cell3!cell2')]
+
+ for test_input, expected_output in test_paths:
+ self.assertEqual(expected_output,
+ messaging._response_cell_name_from_path(test_input,
+ neighbor_only=True))
+
+    def test_targeted_message(self):
+        """A new 'down' targeted message keeps its arguments and picks the
+        child cell on the path to the target as its next hop.
+        """
+        self.flags(max_hop_count=99, group='cells')
+        target_cell = 'api-cell!child-cell2!grandchild-cell1'
+        method = 'fake_method'
+        method_kwargs = dict(arg1=1, arg2=2)
+        direction = 'down'
+        tgt_message = messaging._TargetedMessage(self.msg_runner,
+                                                  self.ctxt, method,
+                                                  method_kwargs, direction,
+                                                  target_cell)
+        self.assertEqual(self.ctxt, tgt_message.ctxt)
+        self.assertEqual(method, tgt_message.method_name)
+        self.assertEqual(method_kwargs, tgt_message.method_kwargs)
+        self.assertEqual(direction, tgt_message.direction)
+        # The message must keep the target path it was constructed with.
+        self.assertEqual(target_cell, tgt_message.target_cell)
+        self.assertFalse(tgt_message.fanout)
+        self.assertFalse(tgt_message.need_response)
+        self.assertEqual(self.our_name, tgt_message.routing_path)
+        self.assertEqual(1, tgt_message.hop_count)
+        self.assertEqual(99, tgt_message.max_hop_count)
+        self.assertFalse(tgt_message.is_broadcast)
+        # Correct next hop?
+        next_hop = tgt_message._get_next_hop()
+        child_cell = self.state_manager.get_child_cell('child-cell2')
+        self.assertEqual(child_cell, next_hop)
+
+    def test_create_targeted_message_with_response(self):
+        """A new 'up' targeted message with need_response=True keeps its
+        arguments and picks the parent cell as its next hop.
+        """
+        self.flags(max_hop_count=99, group='cells')
+        our_name = 'child-cell1'
+        target_cell = 'child-cell1!api-cell'
+        msg_runner = fakes.get_message_runner(our_name)
+        method = 'fake_method'
+        method_kwargs = dict(arg1=1, arg2=2)
+        direction = 'up'
+        tgt_message = messaging._TargetedMessage(msg_runner,
+                                                  self.ctxt, method,
+                                                  method_kwargs, direction,
+                                                  target_cell,
+                                                  need_response=True)
+        self.assertEqual(self.ctxt, tgt_message.ctxt)
+        self.assertEqual(method, tgt_message.method_name)
+        self.assertEqual(method_kwargs, tgt_message.method_kwargs)
+        self.assertEqual(direction, tgt_message.direction)
+        # The message must keep the target path it was constructed with.
+        self.assertEqual(target_cell, tgt_message.target_cell)
+        self.assertFalse(tgt_message.fanout)
+        self.assertTrue(tgt_message.need_response)
+        self.assertEqual(our_name, tgt_message.routing_path)
+        self.assertEqual(1, tgt_message.hop_count)
+        self.assertEqual(99, tgt_message.max_hop_count)
+        self.assertFalse(tgt_message.is_broadcast)
+        # Correct next hop?
+        next_hop = tgt_message._get_next_hop()
+        parent_cell = msg_runner.state_manager.get_parent_cell('api-cell')
+        self.assertEqual(parent_cell, next_hop)
+
+ def test_targeted_message_when_target_is_cell_state(self):
+ method = 'fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+ target_cell = self.state_manager.get_child_cell('child-cell2')
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell)
+ self.assertEqual('api-cell!child-cell2', tgt_message.target_cell)
+ # Correct next hop?
+ next_hop = tgt_message._get_next_hop()
+ self.assertEqual(target_cell, next_hop)
+
+ def test_targeted_message_when_target_cell_state_is_me(self):
+ method = 'fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+ target_cell = self.state_manager.get_my_state()
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell)
+ self.assertEqual('api-cell', tgt_message.target_cell)
+ # Correct next hop?
+ next_hop = tgt_message._get_next_hop()
+ self.assertEqual(target_cell, next_hop)
+
+    def test_create_broadcast_message(self):
+        """A new 'down' broadcast message keeps its arguments and targets
+        all child cells as next hops.
+        """
+        # A single flags() call sets both the cell name and the hop limit.
+        self.flags(name='api-cell', max_hop_count=99, group='cells')
+        method = 'fake_method'
+        method_kwargs = dict(arg1=1, arg2=2)
+        direction = 'down'
+        bcast_message = messaging._BroadcastMessage(self.msg_runner,
+                                                    self.ctxt, method,
+                                                    method_kwargs, direction)
+        self.assertEqual(self.ctxt, bcast_message.ctxt)
+        self.assertEqual(method, bcast_message.method_name)
+        self.assertEqual(method_kwargs, bcast_message.method_kwargs)
+        self.assertEqual(direction, bcast_message.direction)
+        self.assertFalse(bcast_message.fanout)
+        self.assertFalse(bcast_message.need_response)
+        self.assertEqual(self.our_name, bcast_message.routing_path)
+        self.assertEqual(1, bcast_message.hop_count)
+        self.assertEqual(99, bcast_message.max_hop_count)
+        self.assertTrue(bcast_message.is_broadcast)
+        # Correct next hops?
+        next_hops = bcast_message._get_next_hops()
+        child_cells = self.state_manager.get_child_cells()
+        self.assertEqual(child_cells, next_hops)
+
+ def test_create_broadcast_message_with_response(self):
+ self.flags(max_hop_count=99, group='cells')
+ our_name = 'child-cell1'
+ msg_runner = fakes.get_message_runner(our_name)
+ method = 'fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'up'
+ bcast_message = messaging._BroadcastMessage(msg_runner, self.ctxt,
+ method, method_kwargs, direction, need_response=True)
+ self.assertEqual(self.ctxt, bcast_message.ctxt)
+ self.assertEqual(method, bcast_message.method_name)
+ self.assertEqual(method_kwargs, bcast_message.method_kwargs)
+ self.assertEqual(direction, bcast_message.direction)
+ self.assertFalse(bcast_message.fanout)
+ self.assertTrue(bcast_message.need_response)
+ self.assertEqual(our_name, bcast_message.routing_path)
+ self.assertEqual(1, bcast_message.hop_count)
+ self.assertEqual(99, bcast_message.max_hop_count)
+ self.assertTrue(bcast_message.is_broadcast)
+ # Correct next hops?
+ next_hops = bcast_message._get_next_hops()
+ parent_cells = msg_runner.state_manager.get_parent_cells()
+ self.assertEqual(parent_cells, next_hops)
+
+ def test_self_targeted_message(self):
+ target_cell = 'api-cell'
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ call_info = {}
+
+ def our_fake_method(message, **kwargs):
+ call_info['context'] = message.ctxt
+ call_info['routing_path'] = message.routing_path
+ call_info['kwargs'] = kwargs
+
+ fakes.stub_tgt_method(self, 'api-cell', 'our_fake_method',
+ our_fake_method)
+
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell)
+ tgt_message.process()
+
+ self.assertEqual(self.ctxt, call_info['context'])
+ self.assertEqual(method_kwargs, call_info['kwargs'])
+ self.assertEqual(target_cell, call_info['routing_path'])
+
+ def test_child_targeted_message(self):
+ target_cell = 'api-cell!child-cell1'
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ call_info = {}
+
+ def our_fake_method(message, **kwargs):
+ call_info['context'] = message.ctxt
+ call_info['routing_path'] = message.routing_path
+ call_info['kwargs'] = kwargs
+
+ fakes.stub_tgt_method(self, 'child-cell1', 'our_fake_method',
+ our_fake_method)
+
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell)
+ tgt_message.process()
+
+ self.assertEqual(self.ctxt, call_info['context'])
+ self.assertEqual(method_kwargs, call_info['kwargs'])
+ self.assertEqual(target_cell, call_info['routing_path'])
+
+ def test_grandchild_targeted_message(self):
+ target_cell = 'api-cell!child-cell2!grandchild-cell1'
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ call_info = {}
+
+ def our_fake_method(message, **kwargs):
+ call_info['context'] = message.ctxt
+ call_info['routing_path'] = message.routing_path
+ call_info['kwargs'] = kwargs
+
+ fakes.stub_tgt_method(self, 'grandchild-cell1', 'our_fake_method',
+ our_fake_method)
+
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell)
+ tgt_message.process()
+
+ self.assertEqual(self.ctxt, call_info['context'])
+ self.assertEqual(method_kwargs, call_info['kwargs'])
+ self.assertEqual(target_cell, call_info['routing_path'])
+
+    def test_grandchild_targeted_message_with_response(self):
+        """A targeted message routed two hops down runs the stubbed method
+        and returns that method's value in the response.
+        """
+        target_cell = 'api-cell!child-cell2!grandchild-cell1'
+        method = 'our_fake_method'
+        method_kwargs = dict(arg1=1, arg2=2)
+        direction = 'down'
+
+        call_info = {}
+
+        def our_fake_method(message, **kwargs):
+            call_info['context'] = message.ctxt
+            call_info['routing_path'] = message.routing_path
+            call_info['kwargs'] = kwargs
+            return 'our_fake_response'
+
+        fakes.stub_tgt_method(self, 'grandchild-cell1', 'our_fake_method',
+                              our_fake_method)
+
+        tgt_message = messaging._TargetedMessage(self.msg_runner,
+                                                  self.ctxt, method,
+                                                  method_kwargs, direction,
+                                                  target_cell,
+                                                  need_response=True)
+        response = tgt_message.process()
+
+        self.assertEqual(self.ctxt, call_info['context'])
+        self.assertEqual(method_kwargs, call_info['kwargs'])
+        self.assertEqual(target_cell, call_info['routing_path'])
+        self.assertFalse(response.failure)
+        # assertEqual, not assertTrue: assertTrue would treat the expected
+        # string as the failure message and accept any truthy value.
+        self.assertEqual('our_fake_response', response.value_or_raise())
+
+ def test_grandchild_targeted_message_with_error(self):
+ target_cell = 'api-cell!child-cell2!grandchild-cell1'
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ def our_fake_method(message, **kwargs):
+ raise test.TestingException('this should be returned')
+
+ fakes.stub_tgt_method(self, 'grandchild-cell1', 'our_fake_method',
+ our_fake_method)
+
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell,
+ need_response=True)
+ response = tgt_message.process()
+ self.assertTrue(response.failure)
+ self.assertRaises(test.TestingException, response.value_or_raise)
+
+ def test_grandchild_targeted_message_max_hops(self):
+ self.flags(max_hop_count=2, group='cells')
+ target_cell = 'api-cell!child-cell2!grandchild-cell1'
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ def our_fake_method(message, **kwargs):
+ raise test.TestingException('should not be reached')
+
+ fakes.stub_tgt_method(self, 'grandchild-cell1', 'our_fake_method',
+ our_fake_method)
+
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell,
+ need_response=True)
+ response = tgt_message.process()
+ self.assertTrue(response.failure)
+ self.assertRaises(exception.CellMaxHopCountReached,
+ response.value_or_raise)
+
+ def test_targeted_message_invalid_cell(self):
+ # The last hop of the path ('grandchild-cell4') is expected to be
+ # unroutable, so the response must be a failure that raises
+ # CellRoutingInconsistency.
+ # NOTE(review): presumably the fake topology defines no
+ # 'grandchild-cell4' under 'child-cell2' -- confirm in fakes.
+ target_cell = 'api-cell!child-cell2!grandchild-cell4'
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell,
+ need_response=True)
+ response = tgt_message.process()
+ self.assertTrue(response.failure)
+ self.assertRaises(exception.CellRoutingInconsistency,
+ response.value_or_raise)
+
+ def test_targeted_message_invalid_cell2(self):
+ # Same as the invalid-cell case, but here the first element of the
+ # routing path ('unknown-cell') is the bad one; the response must
+ # still be a failure raising CellRoutingInconsistency.
+ target_cell = 'unknown-cell!child-cell2'
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ tgt_message = messaging._TargetedMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs, direction,
+ target_cell,
+ need_response=True)
+ response = tgt_message.process()
+ self.assertTrue(response.failure)
+ self.assertRaises(exception.CellRoutingInconsistency,
+ response.value_or_raise)
+
+ def test_broadcast_routing(self):
+ # Broadcast 'down' with run_locally=True and record the routing
+ # path seen by each handler invocation; a set is used so only
+ # distinct paths are counted.
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ cells = set()
+
+ def our_fake_method(message, **kwargs):
+ cells.add(message.routing_path)
+
+ fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
+
+ bcast_message = messaging._BroadcastMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs,
+ direction,
+ run_locally=True)
+ bcast_message.process()
+ # fakes creates 8 cells (including ourself).
+ self.assertEqual(len(cells), 8)
+
+ def test_broadcast_routing_up(self):
+ # Broadcast 'up' starting from 'grandchild-cell3' (using its own
+ # message runner rather than self.msg_runner) and check the exact
+ # set of routing paths seen by the handlers.
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'up'
+ msg_runner = fakes.get_message_runner('grandchild-cell3')
+
+ cells = set()
+
+ def our_fake_method(message, **kwargs):
+ cells.add(message.routing_path)
+
+ fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
+
+ bcast_message = messaging._BroadcastMessage(msg_runner, self.ctxt,
+ method, method_kwargs,
+ direction,
+ run_locally=True)
+ bcast_message.process()
+ # Paths are reversed, since going 'up'
+ expected = set(['grandchild-cell3', 'grandchild-cell3!child-cell3',
+ 'grandchild-cell3!child-cell3!api-cell'])
+ self.assertEqual(expected, cells)
+
+ def test_broadcast_routing_without_ourselves(self):
+ # Same as the plain 'down' broadcast, but with run_locally=False
+ # the originating cell is expected not to run the handler itself.
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ cells = set()
+
+ def our_fake_method(message, **kwargs):
+ cells.add(message.routing_path)
+
+ fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
+
+ bcast_message = messaging._BroadcastMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs,
+ direction,
+ run_locally=False)
+ bcast_message.process()
+ # fakes creates 8 cells (including ourself). So we should see
+ # only 7 here.
+ self.assertEqual(len(cells), 7)
+
+ def test_broadcast_routing_with_response(self):
+ # With need_response=True, process() returns a collection of
+ # responses. Each handler returns a string built from its routing
+ # path, which must match the response's cell_name.
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ def our_fake_method(message, **kwargs):
+ return 'response-%s' % message.routing_path
+
+ fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
+
+ bcast_message = messaging._BroadcastMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs,
+ direction,
+ run_locally=True,
+ need_response=True)
+ responses = bcast_message.process()
+ self.assertEqual(len(responses), 8)
+ for response in responses:
+ self.assertFalse(response.failure)
+ self.assertEqual('response-%s' % response.cell_name,
+ response.value_or_raise())
+
+ def test_broadcast_routing_with_response_max_hops(self):
+ # Limiting max_hop_count to 2 should cut the broadcast short: only
+ # 5 successful responses are expected instead of the 8 seen without
+ # a hop limit.
+ self.flags(max_hop_count=2, group='cells')
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ def our_fake_method(message, **kwargs):
+ return 'response-%s' % message.routing_path
+
+ fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
+
+ bcast_message = messaging._BroadcastMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs,
+ direction,
+ run_locally=True,
+ need_response=True)
+ responses = bcast_message.process()
+ # Should only get responses from our immediate children (and
+ # ourselves)
+ self.assertEqual(len(responses), 5)
+ for response in responses:
+ self.assertFalse(response.failure)
+ self.assertEqual('response-%s' % response.cell_name,
+ response.value_or_raise())
+
+ def test_broadcast_routing_with_all_erroring(self):
+ # Every handler raises, so all 8 responses must be failures that
+ # re-raise TestingException from value_or_raise().
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ def our_fake_method(message, **kwargs):
+ raise test.TestingException('fake failure')
+
+ fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
+
+ bcast_message = messaging._BroadcastMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs,
+ direction,
+ run_locally=True,
+ need_response=True)
+ responses = bcast_message.process()
+ self.assertEqual(len(responses), 8)
+ for response in responses:
+ self.assertTrue(response.failure)
+ self.assertRaises(test.TestingException, response.value_or_raise)
+
+ def test_broadcast_routing_with_two_erroring(self):
+ # Mixed outcome: the handler is stubbed to succeed everywhere and
+ # then overridden to raise for 'child-cell2' and 'grandchild-cell3'.
+ # All 8 responses must still arrive: 2 failures and 6 successes.
+ method = 'our_fake_method'
+ method_kwargs = dict(arg1=1, arg2=2)
+ direction = 'down'
+
+ def our_fake_method_failing(message, **kwargs):
+ raise test.TestingException('fake failure')
+
+ def our_fake_method(message, **kwargs):
+ return 'response-%s' % message.routing_path
+
+ # Order matters: stub the succeeding handler for all cells first,
+ # then replace it with the failing one for the two chosen cells.
+ fakes.stub_bcast_methods(self, 'our_fake_method', our_fake_method)
+ fakes.stub_bcast_method(self, 'child-cell2', 'our_fake_method',
+ our_fake_method_failing)
+ fakes.stub_bcast_method(self, 'grandchild-cell3', 'our_fake_method',
+ our_fake_method_failing)
+
+ bcast_message = messaging._BroadcastMessage(self.msg_runner,
+ self.ctxt, method,
+ method_kwargs,
+ direction,
+ run_locally=True,
+ need_response=True)
+ responses = bcast_message.process()
+ self.assertEqual(len(responses), 8)
+ failure_responses = [resp for resp in responses if resp.failure]
+ success_responses = [resp for resp in responses if not resp.failure]
+ self.assertEqual(len(failure_responses), 2)
+ self.assertEqual(len(success_responses), 6)
+
+ for response in success_responses:
+ self.assertFalse(response.failure)
+ self.assertEqual('response-%s' % response.cell_name,
+ response.value_or_raise())
+
+ # The failures must come from exactly the two overridden cells,
+ # identified here by their full routing paths.
+ for response in failure_responses:
+ self.assertIn(response.cell_name, ['api-cell!child-cell2',
+ 'api-cell!child-cell3!grandchild-cell3'])
+ self.assertTrue(response.failure)
+ self.assertRaises(test.TestingException, response.value_or_raise)
+
+
+class CellsTargetedMethodsTestCase(test.TestCase):
+ """Test case for _TargetedMessageMethods class. Most of these
+ tests actually test the full path from the MessageRunner through
+ to the functionality of the message method. Hits 2 birds with 1
+ stone, even though it's a little more than a unit test.
+ """
+ def setUp(self):
+ # Initialise the cells fakes, create a request context, and default
+ # to messages going from 'api-cell' to its child 'child-cell2'.
+ # Individual tests may call _setup_attrs() again to change this.
+ super(CellsTargetedMethodsTestCase, self).setUp()
+ fakes.init(self)
+ self.ctxt = context.RequestContext('fake', 'fake')
+ self._setup_attrs('api-cell', 'api-cell!child-cell2')
+
+ def _setup_attrs(self, source_cell, target_cell):
+ # Cache the fake objects for the source and target cells on self so
+ # tests can stub and assert against them.
+ #
+ # source_cell: short name of the cell the message is sent from.
+ # target_cell: full '!'-separated routing path of the target; only
+ # its last element is used to look up the target's cells manager.
+ self.tgt_cell_name = target_cell
+ self.src_msg_runner = fakes.get_message_runner(source_cell)
+ self.src_state_manager = self.src_msg_runner.state_manager
+ tgt_shortname = target_cell.split('!')[-1]
+ self.tgt_cell_mgr = fakes.get_cells_manager(tgt_shortname)
+ self.tgt_msg_runner = self.tgt_cell_mgr.msg_runner
+ self.tgt_scheduler = self.tgt_msg_runner.scheduler
+ self.tgt_state_manager = self.tgt_msg_runner.state_manager
+ methods_cls = self.tgt_msg_runner.methods_by_type['targeted']
+ self.tgt_methods_cls = methods_cls
+ self.tgt_compute_api = methods_cls.compute_api
+ self.tgt_db_inst = methods_cls.db
+
+ def test_schedule_run_instance(self):
+ # schedule_run_instance() sent from the source cell should end up
+ # calling the target cell scheduler's run_instance() with the same
+ # context and host_sched_kwargs.
+ host_sched_kwargs = {'filter_properties': {},
+ 'key1': 'value1',
+ 'key2': 'value2'}
+ self.mox.StubOutWithMock(self.tgt_scheduler, 'run_instance')
+ self.tgt_scheduler.run_instance(self.ctxt, host_sched_kwargs)
+ self.mox.ReplayAll()
+ self.src_msg_runner.schedule_run_instance(self.ctxt,
+ self.tgt_cell_name,
+ host_sched_kwargs)
+
+ def test_call_compute_api_method(self):
+ # run_compute_api_method() should, in the target cell, look up the
+ # instance by the uuid given as the first method arg, call the named
+ # compute API method with that instance in place of the uuid plus
+ # the remaining args/kwargs, and return the result in the response.
+
+ instance_uuid = 'fake_instance_uuid'
+ method_info = {'method': 'reboot',
+ 'method_args': (instance_uuid, 2, 3),
+ 'method_kwargs': {'arg1': 'val1', 'arg2': 'val2'}}
+ self.mox.StubOutWithMock(self.tgt_compute_api, 'reboot')
+ self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_get_by_uuid')
+
+ self.tgt_db_inst.instance_get_by_uuid(self.ctxt,
+ instance_uuid).AndReturn(
+ 'fake_instance')
+ self.tgt_compute_api.reboot(self.ctxt, 'fake_instance', 2, 3,
+ arg1='val1', arg2='val2').AndReturn('fake_result')
+ self.mox.ReplayAll()
+
+ # The trailing True is passed positionally.
+ # NOTE(review): presumably this is the "call"/need-response flag --
+ # confirm against MessageRunner.run_compute_api_method.
+ response = self.src_msg_runner.run_compute_api_method(
+ self.ctxt,
+ self.tgt_cell_name,
+ method_info,
+ True)
+ result = response.value_or_raise()
+ self.assertEqual('fake_result', result)
+
+ def test_call_compute_api_method_unknown_instance(self):
+ # Unknown instance should send a broadcast up that instance
+ # is gone.
+ instance_uuid = 'fake_instance_uuid'
+ # Minimal instance dict expected to be passed to
+ # instance_destroy_at_top(): only the uuid is known.
+ instance = {'uuid': instance_uuid}
+ method_info = {'method': 'reboot',
+ 'method_args': (instance_uuid, 2, 3),
+ 'method_kwargs': {'arg1': 'val1', 'arg2': 'val2'}}
+
+ self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_get_by_uuid')
+ self.mox.StubOutWithMock(self.tgt_msg_runner,
+ 'instance_destroy_at_top')
+
+ self.tgt_db_inst.instance_get_by_uuid(self.ctxt,
+ 'fake_instance_uuid').AndRaise(
+ exception.InstanceNotFound(instance_id=instance_uuid))
+ self.tgt_msg_runner.instance_destroy_at_top(self.ctxt, instance)
+
+ self.mox.ReplayAll()
+
+ response = self.src_msg_runner.run_compute_api_method(
+ self.ctxt,
+ self.tgt_cell_name,
+ method_info,
+ True)
+ # The lookup failure must also be propagated back to the caller.
+ self.assertRaises(exception.InstanceNotFound,
+ response.value_or_raise)
+
+ def test_update_capabilities(self):
+ # tell_parents_our_capabilities() in 'child-cell2' should make the
+ # parent record child-cell2's capabilities (sets converted to
+ # lists) and then announce its own capabilities to its parents.
+ # Route up to API
+ self._setup_attrs('child-cell2', 'child-cell2!api-cell')
+ capabs = {'cap1': set(['val1', 'val2']),
+ 'cap2': set(['val3'])}
+ # The list(set([])) seems silly, but we can't assume the order
+ # of the list... This behavior should match the code we're
+ # testing... which is check that a set was converted to a list.
+ expected_capabs = {'cap1': list(set(['val1', 'val2'])),
+ 'cap2': ['val3']}
+ self.mox.StubOutWithMock(self.src_state_manager,
+ 'get_our_capabilities')
+ self.mox.StubOutWithMock(self.tgt_state_manager,
+ 'update_cell_capabilities')
+ self.mox.StubOutWithMock(self.tgt_msg_runner,
+ 'tell_parents_our_capabilities')
+ self.src_state_manager.get_our_capabilities().AndReturn(capabs)
+ self.tgt_state_manager.update_cell_capabilities('child-cell2',
+ expected_capabs)
+ self.tgt_msg_runner.tell_parents_our_capabilities(self.ctxt)
+
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.tell_parents_our_capabilities(self.ctxt)
+
+ def test_update_capacities(self):
+ # Capacity counterpart of test_update_capabilities: the value
+ # returned by the source's get_our_capacities() should reach the
+ # parent's update_cell_capacities() unchanged, after which the
+ # parent announces its own capacities upward.
+ self._setup_attrs('child-cell2', 'child-cell2!api-cell')
+ capacs = 'fake_capacs'
+ self.mox.StubOutWithMock(self.src_state_manager,
+ 'get_our_capacities')
+ self.mox.StubOutWithMock(self.tgt_state_manager,
+ 'update_cell_capacities')
+ self.mox.StubOutWithMock(self.tgt_msg_runner,
+ 'tell_parents_our_capacities')
+ self.src_state_manager.get_our_capacities().AndReturn(capacs)
+ self.tgt_state_manager.update_cell_capacities('child-cell2',
+ capacs)
+ self.tgt_msg_runner.tell_parents_our_capacities(self.ctxt)
+
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.tell_parents_our_capacities(self.ctxt)
+
+ def test_announce_capabilities(self):
+ # ask_children_for_capabilities() on the API cell should cause the
+ # child cell to call tell_parents_our_capabilities().
+ self._setup_attrs('api-cell', 'api-cell!child-cell1')
+ # To make this easier to test, make us only have 1 child cell.
+ cell_state = self.src_state_manager.child_cells['child-cell1']
+ self.src_state_manager.child_cells = {'child-cell1': cell_state}
+
+ self.mox.StubOutWithMock(self.tgt_msg_runner,
+ 'tell_parents_our_capabilities')
+ self.tgt_msg_runner.tell_parents_our_capabilities(self.ctxt)
+
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.ask_children_for_capabilities(self.ctxt)
+
+ def test_announce_capacities(self):
+ # ask_children_for_capacities() on the API cell should cause the
+ # child cell to call tell_parents_our_capacities().
+ self._setup_attrs('api-cell', 'api-cell!child-cell1')
+ # To make this easier to test, make us only have 1 child cell.
+ cell_state = self.src_state_manager.child_cells['child-cell1']
+ self.src_state_manager.child_cells = {'child-cell1': cell_state}
+
+ self.mox.StubOutWithMock(self.tgt_msg_runner,
+ 'tell_parents_our_capacities')
+ self.tgt_msg_runner.tell_parents_our_capacities(self.ctxt)
+
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.ask_children_for_capacities(self.ctxt)
+
+
+class CellsBroadcastMethodsTestCase(test.TestCase):
+ """Test case for _BroadcastMessageMethods class. Most of these
+ tests actually test the full path from the MessageRunner through
+ to the functionality of the message method. Hits 2 birds with 1
+ stone, even though it's a little more than a unit test.
+ """
+
+ def setUp(self):
+ # Initialise the cells fakes and default to an 'up' broadcast
+ # (see _setup_attrs); downward tests call _setup_attrs(up=False).
+ super(CellsBroadcastMethodsTestCase, self).setUp()
+ fakes.init(self)
+ self.ctxt = context.RequestContext('fake', 'fake')
+ self._setup_attrs()
+
+ def _setup_attrs(self, up=True):
+ # Cache the message runner, broadcast methods object, db and
+ # compute API for three cells: source, middle and target.
+ #
+ # up=True: source is 'grandchild-cell1', target is 'api-cell'.
+ # up=False: the reverse. The middle cell is always 'child-cell2'.
+ mid_cell = 'child-cell2'
+ if up:
+ src_cell = 'grandchild-cell1'
+ tgt_cell = 'api-cell'
+ else:
+ src_cell = 'api-cell'
+ tgt_cell = 'grandchild-cell1'
+
+ self.src_msg_runner = fakes.get_message_runner(src_cell)
+ methods_cls = self.src_msg_runner.methods_by_type['broadcast']
+ self.src_methods_cls = methods_cls
+ self.src_db_inst = methods_cls.db
+ self.src_compute_api = methods_cls.compute_api
+
+ self.mid_msg_runner = fakes.get_message_runner(mid_cell)
+ methods_cls = self.mid_msg_runner.methods_by_type['broadcast']
+ self.mid_methods_cls = methods_cls
+ self.mid_db_inst = methods_cls.db
+ self.mid_compute_api = methods_cls.compute_api
+
+ self.tgt_msg_runner = fakes.get_message_runner(tgt_cell)
+ methods_cls = self.tgt_msg_runner.methods_by_type['broadcast']
+ self.tgt_methods_cls = methods_cls
+ self.tgt_db_inst = methods_cls.db
+ self.tgt_compute_api = methods_cls.compute_api
+
+ def test_at_the_top(self):
+ # With the default 'up' setup the target is 'api-cell'; only it
+ # should report being at the top of the cell tree.
+ self.assertTrue(self.tgt_methods_cls._at_the_top())
+ self.assertFalse(self.mid_methods_cls._at_the_top())
+ self.assertFalse(self.src_methods_cls._at_the_top())
+
+ def test_instance_update_at_top(self):
+ # instance_update_at_top() broadcast from the grandchild should
+ # update the DB only in the top (target) cell, using a trimmed
+ # copy of the instance and of its info cache.
+ fake_info_cache = {'id': 1,
+ 'instance': 'fake_instance',
+ 'other': 'moo'}
+ fake_sys_metadata = [{'id': 1,
+ 'key': 'key1',
+ 'value': 'value1'},
+ {'id': 2,
+ 'key': 'key2',
+ 'value': 'value2'}]
+ fake_instance = {'id': 2,
+ 'uuid': 'fake_uuid',
+ 'security_groups': 'fake',
+ 'instance_type': 'fake',
+ 'volumes': 'fake',
+ 'cell_name': 'fake',
+ 'name': 'fake',
+ 'metadata': 'fake',
+ 'info_cache': fake_info_cache,
+ 'system_metadata': fake_sys_metadata,
+ 'other': 'meow'}
+ # Expected shapes at the top: system_metadata flattened from a list
+ # of rows into a key/value dict, 'id' and 'instance' dropped from
+ # the info cache, and only 'system_metadata', 'other' and 'uuid'
+ # left on the instance.
+ expected_sys_metadata = {'key1': 'value1',
+ 'key2': 'value2'}
+ expected_info_cache = {'other': 'moo'}
+ expected_instance = {'system_metadata': expected_sys_metadata,
+ 'other': 'meow',
+ 'uuid': 'fake_uuid'}
+
+ # To show these should not be called in src/mid-level cell
+ self.mox.StubOutWithMock(self.src_db_inst, 'instance_update')
+ self.mox.StubOutWithMock(self.src_db_inst,
+ 'instance_info_cache_update')
+ self.mox.StubOutWithMock(self.mid_db_inst, 'instance_update')
+ self.mox.StubOutWithMock(self.mid_db_inst,
+ 'instance_info_cache_update')
+
+ self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_update')
+ self.mox.StubOutWithMock(self.tgt_db_inst,
+ 'instance_info_cache_update')
+ self.tgt_db_inst.instance_update(self.ctxt, 'fake_uuid',
+ expected_instance,
+ update_cells=False)
+ self.tgt_db_inst.instance_info_cache_update(self.ctxt, 'fake_uuid',
+ expected_info_cache,
+ update_cells=False)
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.instance_update_at_top(self.ctxt, fake_instance)
+
+ def test_instance_destroy_at_top(self):
+ # instance_destroy_at_top() broadcast from the grandchild should
+ # destroy the instance by uuid in the top (target) cell's DB only.
+ fake_instance = {'uuid': 'fake_uuid'}
+
+ # To show these should not be called in src/mid-level cell
+ # NOTE(review): only the src cell's db is stubbed here, although
+ # the comment above also mentions the mid-level cell (compare
+ # test_instance_update_at_top, which stubs both) -- confirm whether
+ # a mid_db_inst stub was intended.
+ self.mox.StubOutWithMock(self.src_db_inst, 'instance_destroy')
+
+ self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_destroy')
+ self.tgt_db_inst.instance_destroy(self.ctxt, 'fake_uuid',
+ update_cells=False)
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.instance_destroy_at_top(self.ctxt, fake_instance)
+
+ def test_instance_hard_delete_everywhere(self):
+ # A 'hard' instance_delete_everywhere() from the API cell should
+ # call compute_api.delete() in the mid and target cells, but not
+ # in the API cell itself.
+ # Reset this, as this is a broadcast down.
+ self._setup_attrs(up=False)
+ instance = {'uuid': 'meow'}
+
+ # Should not be called in src (API cell)
+ self.mox.StubOutWithMock(self.src_compute_api, 'delete')
+
+ self.mox.StubOutWithMock(self.mid_compute_api, 'delete')
+ self.mox.StubOutWithMock(self.tgt_compute_api, 'delete')
+
+ self.mid_compute_api.delete(self.ctxt, instance)
+ self.tgt_compute_api.delete(self.ctxt, instance)
+
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.instance_delete_everywhere(self.ctxt,
+ instance, 'hard')
+
+ def test_instance_soft_delete_everywhere(self):
+ # A 'soft' instance_delete_everywhere() from the API cell should
+ # call compute_api.soft_delete() in the mid and target cells, but
+ # not in the API cell itself.
+ # Reset this, as this is a broadcast down.
+ self._setup_attrs(up=False)
+ instance = {'uuid': 'meow'}
+
+ # Should not be called in src (API cell)
+ self.mox.StubOutWithMock(self.src_compute_api, 'soft_delete')
+
+ self.mox.StubOutWithMock(self.mid_compute_api, 'soft_delete')
+ self.mox.StubOutWithMock(self.tgt_compute_api, 'soft_delete')
+
+ self.mid_compute_api.soft_delete(self.ctxt, instance)
+ self.tgt_compute_api.soft_delete(self.ctxt, instance)
+
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.instance_delete_everywhere(self.ctxt,
+ instance, 'soft')
+
+ def test_instance_fault_create_at_top(self):
+ # instance_fault_create_at_top() broadcast from the grandchild
+ # should create the fault only in the top cell's DB, with the 'id'
+ # key removed from the fault dict.
+ fake_instance_fault = {'id': 1,
+ 'other stuff': 2,
+ 'more stuff': 3}
+ expected_instance_fault = {'other stuff': 2,
+ 'more stuff': 3}
+
+ # Shouldn't be called for these 2 cells
+ self.mox.StubOutWithMock(self.src_db_inst, 'instance_fault_create')
+ self.mox.StubOutWithMock(self.mid_db_inst, 'instance_fault_create')
+
+ self.mox.StubOutWithMock(self.tgt_db_inst, 'instance_fault_create')
+ self.tgt_db_inst.instance_fault_create(self.ctxt,
+ expected_instance_fault)
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.instance_fault_create_at_top(self.ctxt,
+ fake_instance_fault)
+
+ def test_bw_usage_update_at_top(self):
+ # bw_usage_update_at_top() broadcast from the grandchild should
+ # call db.bw_usage_update() only in the top cell, expanding the
+ # update-info dict into keyword arguments.
+ fake_bw_update_info = {'uuid': 'fake_uuid',
+ 'mac': 'fake_mac',
+ 'start_period': 'fake_start_period',
+ 'bw_in': 'fake_bw_in',
+ 'bw_out': 'fake_bw_out',
+ 'last_ctr_in': 'fake_last_ctr_in',
+ 'last_ctr_out': 'fake_last_ctr_out',
+ 'last_refreshed': 'fake_last_refreshed'}
+
+ # Shouldn't be called for these 2 cells
+ self.mox.StubOutWithMock(self.src_db_inst, 'bw_usage_update')
+ self.mox.StubOutWithMock(self.mid_db_inst, 'bw_usage_update')
+
+ self.mox.StubOutWithMock(self.tgt_db_inst, 'bw_usage_update')
+ self.tgt_db_inst.bw_usage_update(self.ctxt, **fake_bw_update_info)
+
+ self.mox.ReplayAll()
+
+ self.src_msg_runner.bw_usage_update_at_top(self.ctxt,
+ fake_bw_update_info)
diff --git a/nova/tests/cells/test_cells_rpc_driver.py b/nova/tests/cells/test_cells_rpc_driver.py
new file mode 100644
index 0000000000..a44fe93765
--- /dev/null
+++ b/nova/tests/cells/test_cells_rpc_driver.py
@@ -0,0 +1,218 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Tests For Cells RPC Communication Driver
+"""
+
+from nova.cells import messaging
+from nova.cells import rpc_driver
+from nova import context
+from nova.openstack.common import cfg
+from nova.openstack.common import rpc
+from nova.openstack.common.rpc import dispatcher as rpc_dispatcher
+from nova import test
+from nova.tests.cells import fakes
+
+CONF = cfg.CONF
+CONF.import_opt('rpc_driver_queue_base', 'nova.cells.rpc_driver',
+ group='cells')
+
+
+class CellsRPCDriverTestCase(test.TestCase):
+ """Test case for Cells communication via RPC."""
+
+ def setUp(self):
+ # Initialise the cells fakes and create the RPC driver under test.
+ super(CellsRPCDriverTestCase, self).setUp()
+ fakes.init(self)
+ self.ctxt = context.RequestContext('fake', 'fake')
+ self.driver = rpc_driver.CellsRPCDriver()
+
+ def test_start_consumers(self):
+ # start_consumers() should create new RPC connections and register,
+ # for each message type, both a fanout and a non-fanout consumer on
+ # the topic '<rpc_driver_queue_base>.<message_type>', remembering
+ # the connections on the driver.
+ self.flags(rpc_driver_queue_base='cells.intercell42', group='cells')
+ rpc_consumers = []
+ rpc_conns = []
+ fake_msg_runner = fakes.get_message_runner('api-cell')
+ call_info = {}
+
+ # The fakes below name their instance argument '_self' so that
+ # 'self' inside them still refers to this test case, which is
+ # needed for the assert* calls.
+ class FakeInterCellRPCDispatcher(object):
+ def __init__(_self, msg_runner):
+ self.assertEqual(fake_msg_runner, msg_runner)
+ call_info['intercell_dispatcher'] = _self
+
+ class FakeRPCDispatcher(object):
+ def __init__(_self, proxy_objs):
+ self.assertEqual([call_info['intercell_dispatcher']],
+ proxy_objs)
+ call_info['rpc_dispatcher'] = _self
+
+ class FakeRPCConn(object):
+ def create_consumer(_self, topic, proxy_obj, **kwargs):
+ self.assertEqual(call_info['rpc_dispatcher'], proxy_obj)
+ rpc_consumers.append((topic, kwargs))
+
+ def consume_in_thread(_self):
+ pass
+
+ def _fake_create_connection(new):
+ self.assertTrue(new)
+ fake_conn = FakeRPCConn()
+ rpc_conns.append(fake_conn)
+ return fake_conn
+
+ self.stubs.Set(rpc, 'create_connection', _fake_create_connection)
+ self.stubs.Set(rpc_driver, 'InterCellRPCDispatcher',
+ FakeInterCellRPCDispatcher)
+ self.stubs.Set(rpc_dispatcher, 'RpcDispatcher', FakeRPCDispatcher)
+
+ self.driver.start_consumers(fake_msg_runner)
+
+ for message_type in ['broadcast', 'response', 'targeted']:
+ topic = 'cells.intercell42.' + message_type
+ self.assertIn((topic, {'fanout': True}), rpc_consumers)
+ self.assertIn((topic, {'fanout': False}), rpc_consumers)
+ self.assertEqual(rpc_conns, self.driver.rpc_connections)
+
+ def test_stop_consumers(self):
+ # stop_consumers() should close every connection held in
+ # driver.rpc_connections; the list comparison also checks that
+ # they are closed in list order.
+ call_info = {'closed': []}
+
+ class FakeRPCConn(object):
+ def close(self):
+ call_info['closed'].append(self)
+
+ fake_conns = [FakeRPCConn() for x in xrange(5)]
+ self.driver.rpc_connections = fake_conns
+ self.driver.stop_consumers()
+ self.assertEqual(fake_conns, call_info['closed'])
+
+ def test_send_message_to_cell_cast(self):
+ # For a message with fanout=False, send_message_to_cell() should
+ # build a 'process_message' RPC message carrying the JSON form of
+ # the cells message and cast it to the cell's RPC server on the
+ # '<queue base>.targeted' topic.
+ msg_runner = fakes.get_message_runner('api-cell')
+ cell_state = fakes.get_cell_state('api-cell', 'child-cell2')
+ message = messaging._TargetedMessage(msg_runner,
+ self.ctxt, 'fake', 'fake', 'down', cell_state, fanout=False)
+
+ call_info = {}
+
+ def _fake_make_msg(method, **kwargs):
+ call_info['rpc_method'] = method
+ call_info['rpc_kwargs'] = kwargs
+ return 'fake-message'
+
+ def _fake_cast_to_server(*args, **kwargs):
+ call_info['cast_args'] = args
+ call_info['cast_kwargs'] = kwargs
+
+ self.stubs.Set(rpc, 'cast_to_server', _fake_cast_to_server)
+ self.stubs.Set(self.driver.intercell_rpcapi, 'make_msg',
+ _fake_make_msg)
+ self.stubs.Set(self.driver.intercell_rpcapi, 'cast_to_server',
+ _fake_cast_to_server)
+
+ self.driver.send_message_to_cell(cell_state, message)
+ # NOTE(review): these values presumably mirror the connection info
+ # that fakes defines for 'child-cell2' -- confirm in fakes.
+ expected_server_params = {'hostname': 'rpc_host2',
+ 'password': 'password2',
+ 'port': 'rpc_port2',
+ 'username': 'username2',
+ 'virtual_host': 'rpc_vhost2'}
+ expected_cast_args = (self.ctxt, expected_server_params,
+ 'fake-message')
+ expected_cast_kwargs = {'topic': 'cells.intercell.targeted'}
+ expected_rpc_kwargs = {'message': message.to_json()}
+ self.assertEqual(expected_cast_args, call_info['cast_args'])
+ self.assertEqual(expected_cast_kwargs, call_info['cast_kwargs'])
+ self.assertEqual('process_message', call_info['rpc_method'])
+ self.assertEqual(expected_rpc_kwargs, call_info['rpc_kwargs'])
+
+ def test_send_message_to_cell_fanout_cast(self):
+ # Fanout counterpart of test_send_message_to_cell_cast: with
+ # fanout=True the driver should use fanout_cast_to_server() with
+ # the same arguments and topic.
+ msg_runner = fakes.get_message_runner('api-cell')
+ cell_state = fakes.get_cell_state('api-cell', 'child-cell2')
+ message = messaging._TargetedMessage(msg_runner,
+ self.ctxt, 'fake', 'fake', 'down', cell_state, fanout=True)
+
+ call_info = {}
+
+ def _fake_make_msg(method, **kwargs):
+ call_info['rpc_method'] = method
+ call_info['rpc_kwargs'] = kwargs
+ return 'fake-message'
+
+ def _fake_fanout_cast_to_server(*args, **kwargs):
+ call_info['cast_args'] = args
+ call_info['cast_kwargs'] = kwargs
+
+ self.stubs.Set(rpc, 'fanout_cast_to_server',
+ _fake_fanout_cast_to_server)
+ self.stubs.Set(self.driver.intercell_rpcapi, 'make_msg',
+ _fake_make_msg)
+ self.stubs.Set(self.driver.intercell_rpcapi,
+ 'fanout_cast_to_server', _fake_fanout_cast_to_server)
+
+ self.driver.send_message_to_cell(cell_state, message)
+ # NOTE(review): these values presumably mirror the connection info
+ # that fakes defines for 'child-cell2' -- confirm in fakes.
+ expected_server_params = {'hostname': 'rpc_host2',
+ 'password': 'password2',
+ 'port': 'rpc_port2',
+ 'username': 'username2',
+ 'virtual_host': 'rpc_vhost2'}
+ expected_cast_args = (self.ctxt, expected_server_params,
+ 'fake-message')
+ expected_cast_kwargs = {'topic': 'cells.intercell.targeted'}
+ expected_rpc_kwargs = {'message': message.to_json()}
+ self.assertEqual(expected_cast_args, call_info['cast_args'])
+ self.assertEqual(expected_cast_kwargs, call_info['cast_kwargs'])
+ self.assertEqual('process_message', call_info['rpc_method'])
+ self.assertEqual(expected_rpc_kwargs, call_info['rpc_kwargs'])
+
+ def test_rpc_topic_uses_message_type(self):
+ # The RPC topic should be '<rpc_driver_queue_base>.<message_type>'.
+ # The message_type attribute is overwritten with a made-up value to
+ # show the topic is derived from it and not from the message class.
+ self.flags(rpc_driver_queue_base='cells.intercell42', group='cells')
+ msg_runner = fakes.get_message_runner('api-cell')
+ cell_state = fakes.get_cell_state('api-cell', 'child-cell2')
+ message = messaging._BroadcastMessage(msg_runner,
+ self.ctxt, 'fake', 'fake', 'down', fanout=True)
+ message.message_type = 'fake-message-type'
+
+ call_info = {}
+
+ def _fake_fanout_cast_to_server(*args, **kwargs):
+ call_info['topic'] = kwargs.get('topic')
+
+ self.stubs.Set(self.driver.intercell_rpcapi,
+ 'fanout_cast_to_server', _fake_fanout_cast_to_server)
+
+ self.driver.send_message_to_cell(cell_state, message)
+ self.assertEqual('cells.intercell42.fake-message-type',
+ call_info['topic'])
+
+ def test_process_message(self):
+ # InterCellRPCDispatcher.process_message() should rebuild the
+ # message via msg_runner.message_from_json() and then call
+ # process() on the result.
+ msg_runner = fakes.get_message_runner('api-cell')
+ dispatcher = rpc_driver.InterCellRPCDispatcher(msg_runner)
+ message = messaging._BroadcastMessage(msg_runner,
+ self.ctxt, 'fake', 'fake', 'down', fanout=True)
+
+ call_info = {}
+
+ def _fake_message_from_json(json_message):
+ call_info['json_message'] = json_message
+ self.assertEqual(message.to_json(), json_message)
+ return message
+
+ def _fake_process():
+ call_info['process_called'] = True
+
+ self.stubs.Set(msg_runner, 'message_from_json',
+ _fake_message_from_json)
+ self.stubs.Set(message, 'process', _fake_process)
+
+ dispatcher.process_message(self.ctxt, message.to_json())
+ self.assertEqual(message.to_json(), call_info['json_message'])
+ self.assertTrue(call_info['process_called'])
diff --git a/nova/tests/cells/test_cells_rpcapi.py b/nova/tests/cells/test_cells_rpcapi.py
new file mode 100644
index 0000000000..b51bfa0c1b
--- /dev/null
+++ b/nova/tests/cells/test_cells_rpcapi.py
@@ -0,0 +1,206 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Tests For Cells RPCAPI
+"""
+
+from nova.cells import rpcapi as cells_rpcapi
+from nova.openstack.common import cfg
+from nova.openstack.common import rpc
+from nova import test
+
+CONF = cfg.CONF
+CONF.import_opt('topic', 'nova.cells.opts', group='cells')
+
+
+class CellsAPITestCase(test.TestCase):
+ """Test case for cells.api interfaces."""
+
+ def setUp(self):
+ # Enable cells with a fake topic and create the CellsAPI under
+ # test. The context is a plain string because the tests only
+ # check that it is passed through.
+ super(CellsAPITestCase, self).setUp()
+ self.fake_topic = 'fake_topic'
+ self.fake_context = 'fake_context'
+ self.flags(topic=self.fake_topic, enable=True, group='cells')
+ self.cells_rpcapi = cells_rpcapi.CellsAPI()
+
+ def _stub_rpc_method(self, rpc_method, result):
+ # Replace rpc.<rpc_method> with a fake that records the context,
+ # topic and message it receives and returns 'result'.
+ #
+ # Returns the call_info dict; it starts empty and is filled in
+ # only when the fake is actually called.
+ call_info = {}
+
+ def fake_rpc_method(ctxt, topic, msg, *args, **kwargs):
+ call_info['context'] = ctxt
+ call_info['topic'] = topic
+ call_info['msg'] = msg
+ return result
+
+ self.stubs.Set(rpc, rpc_method, fake_rpc_method)
+ return call_info
+
+ def _check_result(self, call_info, method, args, version=None):
+ # Assert that the RPC call recorded in call_info used the fake
+ # context and topic and carried the given method name, args and
+ # version. When version is None, the API's BASE_RPC_API_VERSION is
+ # expected.
+ if version is None:
+ version = self.cells_rpcapi.BASE_RPC_API_VERSION
+ self.assertEqual(self.fake_context, call_info['context'])
+ self.assertEqual(self.fake_topic, call_info['topic'])
+ self.assertEqual(method, call_info['msg']['method'])
+ self.assertEqual(version, call_info['msg']['version'])
+ self.assertEqual(args, call_info['msg']['args'])
+
+ def test_cast_compute_api_method(self):
+ # cast_compute_api_method() should issue an rpc cast of
+ # 'run_compute_api_method' with the method name, args and kwargs
+ # bundled into 'method_info' and 'call' set to False.
+ fake_cell_name = 'fake_cell_name'
+ fake_method = 'fake_method'
+ fake_method_args = (1, 2)
+ fake_method_kwargs = {'kwarg1': 10, 'kwarg2': 20}
+
+ expected_method_info = {'method': fake_method,
+ 'method_args': fake_method_args,
+ 'method_kwargs': fake_method_kwargs}
+ expected_args = {'method_info': expected_method_info,
+ 'cell_name': fake_cell_name,
+ 'call': False}
+
+ call_info = self._stub_rpc_method('cast', None)
+
+ self.cells_rpcapi.cast_compute_api_method(self.fake_context,
+ fake_cell_name, fake_method,
+ *fake_method_args, **fake_method_kwargs)
+ self._check_result(call_info, 'run_compute_api_method',
+ expected_args)
+
+ def test_call_compute_api_method(self):
+ # call_compute_api_method() should issue an rpc call (not a cast)
+ # of 'run_compute_api_method' with 'call' set to True and return
+ # whatever the rpc call returns.
+ fake_cell_name = 'fake_cell_name'
+ fake_method = 'fake_method'
+ fake_method_args = (1, 2)
+ fake_method_kwargs = {'kwarg1': 10, 'kwarg2': 20}
+ fake_response = 'fake_response'
+
+ expected_method_info = {'method': fake_method,
+ 'method_args': fake_method_args,
+ 'method_kwargs': fake_method_kwargs}
+ expected_args = {'method_info': expected_method_info,
+ 'cell_name': fake_cell_name,
+ 'call': True}
+
+ call_info = self._stub_rpc_method('call', fake_response)
+
+ result = self.cells_rpcapi.call_compute_api_method(self.fake_context,
+ fake_cell_name, fake_method,
+ *fake_method_args, **fake_method_kwargs)
+ self._check_result(call_info, 'run_compute_api_method',
+ expected_args)
+ self.assertEqual(fake_response, result)
+
+ def test_schedule_run_instance(self):
+ # schedule_run_instance() should cast 'schedule_run_instance' with
+ # all keyword arguments collected under 'host_sched_kwargs'.
+ call_info = self._stub_rpc_method('cast', None)
+
+ self.cells_rpcapi.schedule_run_instance(
+ self.fake_context, arg1=1, arg2=2, arg3=3)
+
+ expected_args = {'host_sched_kwargs': {'arg1': 1,
+ 'arg2': 2,
+ 'arg3': 3}}
+ self._check_result(call_info, 'schedule_run_instance',
+ expected_args)
+
+ def test_instance_update_at_top(self):
+ # instance_update_at_top() should cast 'instance_update_at_top'
+ # with the instance dict passed through as-is; no filtering of the
+ # instance is expected at the rpcapi layer.
+ fake_info_cache = {'id': 1,
+ 'instance': 'fake_instance',
+ 'other': 'moo'}
+ fake_sys_metadata = [{'id': 1,
+ 'key': 'key1',
+ 'value': 'value1'},
+ {'id': 2,
+ 'key': 'key2',
+ 'value': 'value2'}]
+ fake_instance = {'id': 2,
+ 'security_groups': 'fake',
+ 'instance_type': 'fake',
+ 'volumes': 'fake',
+ 'cell_name': 'fake',
+ 'name': 'fake',
+ 'metadata': 'fake',
+ 'info_cache': fake_info_cache,
+ 'system_metadata': fake_sys_metadata,
+ 'other': 'meow'}
+
+ call_info = self._stub_rpc_method('cast', None)
+
+ self.cells_rpcapi.instance_update_at_top(
+ self.fake_context, fake_instance)
+
+ expected_args = {'instance': fake_instance}
+ self._check_result(call_info, 'instance_update_at_top',
+ expected_args)
+
+ def test_instance_destroy_at_top(self):
+ # instance_destroy_at_top() should cast 'instance_destroy_at_top'
+ # with the instance dict as its only argument.
+ fake_instance = {'uuid': 'fake-uuid'}
+
+ call_info = self._stub_rpc_method('cast', None)
+
+ self.cells_rpcapi.instance_destroy_at_top(
+ self.fake_context, fake_instance)
+
+ expected_args = {'instance': fake_instance}
+ self._check_result(call_info, 'instance_destroy_at_top',
+ expected_args)
+
+ def test_instance_delete_everywhere(self):
+ # instance_delete_everywhere() should cast
+ # 'instance_delete_everywhere' with the instance and the delete
+ # type, which is passed through without validation here.
+ fake_instance = {'uuid': 'fake-uuid'}
+
+ call_info = self._stub_rpc_method('cast', None)
+
+ self.cells_rpcapi.instance_delete_everywhere(
+ self.fake_context, fake_instance,
+ 'fake-type')
+
+ expected_args = {'instance': fake_instance,
+ 'delete_type': 'fake-type'}
+ self._check_result(call_info, 'instance_delete_everywhere',
+ expected_args)
+
+ def test_instance_fault_create_at_top(self):
+ # instance_fault_create_at_top() should cast
+ # 'instance_fault_create_at_top' with the fault dict unchanged
+ # (the 'id' key is still present at this layer).
+ fake_instance_fault = {'id': 2,
+ 'other': 'meow'}
+
+ call_info = self._stub_rpc_method('cast', None)
+
+ self.cells_rpcapi.instance_fault_create_at_top(
+ self.fake_context, fake_instance_fault)
+
+ expected_args = {'instance_fault': fake_instance_fault}
+ self._check_result(call_info, 'instance_fault_create_at_top',
+ expected_args)
+
+ def test_bw_usage_update_at_top(self):
+ # bw_usage_update_at_top() takes positional and keyword arguments
+ # and should pack them into one named 'bw_update_info' dict that is
+ # cast as 'bw_usage_update_at_top'.
+ update_args = ('fake_uuid', 'fake_mac', 'fake_start_period',
+ 'fake_bw_in', 'fake_bw_out', 'fake_ctr_in',
+ 'fake_ctr_out')
+ update_kwargs = {'last_refreshed': 'fake_refreshed'}
+
+ call_info = self._stub_rpc_method('cast', None)
+
+ self.cells_rpcapi.bw_usage_update_at_top(
+ self.fake_context, *update_args, **update_kwargs)
+
+ # Positional args map, in order, to uuid, mac, start_period, bw_in,
+ # bw_out, last_ctr_in and last_ctr_out.
+ bw_update_info = {'uuid': 'fake_uuid',
+ 'mac': 'fake_mac',
+ 'start_period': 'fake_start_period',
+ 'bw_in': 'fake_bw_in',
+ 'bw_out': 'fake_bw_out',
+ 'last_ctr_in': 'fake_ctr_in',
+ 'last_ctr_out': 'fake_ctr_out',
+ 'last_refreshed': 'fake_refreshed'}
+
+ expected_args = {'bw_update_info': bw_update_info}
+ self._check_result(call_info, 'bw_usage_update_at_top',
+ expected_args)
diff --git a/nova/tests/cells/test_cells_scheduler.py b/nova/tests/cells/test_cells_scheduler.py
new file mode 100644
index 0000000000..66e7e245e1
--- /dev/null
+++ b/nova/tests/cells/test_cells_scheduler.py
@@ -0,0 +1,206 @@
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Tests For CellsScheduler
+"""
+import time
+
+from nova.compute import vm_states
+from nova import context
+from nova import db
+from nova import exception
+from nova.openstack.common import cfg
+from nova.openstack.common import uuidutils
+from nova import test
+from nova.tests.cells import fakes
+
+ # Import the 'scheduler_retries' option from nova.cells.scheduler into the
+ # 'cells' group; the retry tests below override it with self.flags().
+ CONF = cfg.CONF
+ CONF.import_opt('scheduler_retries', 'nova.cells.scheduler', group='cells')
+
+
+ class CellsSchedulerTestCase(test.TestCase):
+ """Test case for CellsScheduler class"""
+
+ def setUp(self):
+ # Build a message runner for the fake cell named 'api-cell' and keep
+ # handles to its scheduler, state manager and own cell state.
+ super(CellsSchedulerTestCase, self).setUp()
+ fakes.init(self)
+ self.msg_runner = fakes.get_message_runner('api-cell')
+ self.scheduler = self.msg_runner.scheduler
+ self.state_manager = self.msg_runner.state_manager
+ self.my_cell_state = self.state_manager.get_my_state()
+ self.ctxt = context.RequestContext('fake', 'fake')
+ # Three generated uuids stand in for the instances being built.
+ instance_uuids = []
+ for x in xrange(3):
+ instance_uuids.append(uuidutils.generate_uuid())
+ self.instance_uuids = instance_uuids
+ self.request_spec = {'instance_uuids': instance_uuids,
+ 'other': 'stuff'}
+
+ def test_create_instances_here(self):
+ # _create_instances_here() should create one DB instance per uuid in
+ # the request spec, using the given instance properties, and call
+ # instance_update_at_top once per instance, in uuid order.
+ # Just grab the first instance type
+ inst_type = db.instance_type_get(self.ctxt, 1)
+ image = {'properties': {}}
+ instance_props = {'hostname': 'meow',
+ 'display_name': 'moo',
+ 'image_ref': 'fake_image_ref',
+ 'user_id': self.ctxt.user_id,
+ 'project_id': self.ctxt.project_id}
+ request_spec = {'instance_type': inst_type,
+ 'image': image,
+ 'security_group': ['default'],
+ 'block_device_mapping': [],
+ 'instance_properties': instance_props,
+ 'instance_uuids': self.instance_uuids}
+
+ call_info = {'uuids': []}
+
+ # Record the uuid of every instance reported to the top cell.
+ def _fake_instance_update_at_top(_ctxt, instance):
+ call_info['uuids'].append(instance['uuid'])
+
+ self.stubs.Set(self.msg_runner, 'instance_update_at_top',
+ _fake_instance_update_at_top)
+
+ self.scheduler._create_instances_here(self.ctxt, request_spec)
+ self.assertEqual(self.instance_uuids, call_info['uuids'])
+
+ # Each instance must now exist in the DB with the requested props.
+ for instance_uuid in self.instance_uuids:
+ instance = db.instance_get_by_uuid(self.ctxt, instance_uuid)
+ self.assertEqual('meow', instance['hostname'])
+ self.assertEqual('moo', instance['display_name'])
+ self.assertEqual('fake_image_ref', instance['image_ref'])
+
+ def test_run_instance_selects_child_cell(self):
+ # With this cell's capacities emptied, scheduling a build should end
+ # with schedule_run_instance being re-issued for one of the child
+ # cells, carrying the same context and host scheduler kwargs.
+ # Make sure there's no capacity info so we're sure to
+ # select a child cell
+ our_cell_info = self.state_manager.get_my_state()
+ our_cell_info.capacities = {}
+
+ call_info = {'times': 0}
+
+ orig_fn = self.msg_runner.schedule_run_instance
+
+ def msg_runner_schedule_run_instance(ctxt, target_cell,
+ host_sched_kwargs):
+ # This gets called twice. Once for our running it
+ # in this cell.. and then it'll get called when the
+ # child cell is picked. So, first time.. just run it
+ # like normal.
+ if not call_info['times']:
+ call_info['times'] += 1
+ return orig_fn(ctxt, target_cell, host_sched_kwargs)
+ # Second call: capture what would be sent to the chosen cell.
+ call_info['ctxt'] = ctxt
+ call_info['target_cell'] = target_cell
+ call_info['host_sched_kwargs'] = host_sched_kwargs
+
+ self.stubs.Set(self.msg_runner, 'schedule_run_instance',
+ msg_runner_schedule_run_instance)
+
+ host_sched_kwargs = {'request_spec': self.request_spec}
+ self.msg_runner.schedule_run_instance(self.ctxt,
+ self.my_cell_state, host_sched_kwargs)
+
+ self.assertEqual(self.ctxt, call_info['ctxt'])
+ self.assertEqual(host_sched_kwargs, call_info['host_sched_kwargs'])
+ child_cells = self.state_manager.get_child_cells()
+ self.assertIn(call_info['target_cell'], child_cells)
+
+ def test_run_instance_selects_current_cell(self):
+ # With no child cells the build is handled in this cell: the
+ # scheduler's _create_instances_here gets the request spec and the
+ # host scheduler rpcapi's run_instance gets the full kwargs.
+ # Make sure there's no child cells so that we will be
+ # selected
+ self.state_manager.child_cells = {}
+
+ call_info = {}
+
+ def fake_create_instances_here(ctxt, request_spec):
+ call_info['ctxt'] = ctxt
+ call_info['request_spec'] = request_spec
+
+ def fake_rpc_run_instance(ctxt, **host_sched_kwargs):
+ call_info['host_sched_kwargs'] = host_sched_kwargs
+
+ self.stubs.Set(self.scheduler, '_create_instances_here',
+ fake_create_instances_here)
+ self.stubs.Set(self.scheduler.scheduler_rpcapi,
+ 'run_instance', fake_rpc_run_instance)
+
+ host_sched_kwargs = {'request_spec': self.request_spec,
+ 'other': 'stuff'}
+ self.msg_runner.schedule_run_instance(self.ctxt,
+ self.my_cell_state, host_sched_kwargs)
+
+ self.assertEqual(self.ctxt, call_info['ctxt'])
+ self.assertEqual(self.request_spec, call_info['request_spec'])
+ self.assertEqual(host_sched_kwargs, call_info['host_sched_kwargs'])
+
+ def test_run_instance_retries_when_no_cells_avail(self):
+ # When _run_instance keeps raising NoCellsAvailable, scheduling is
+ # attempted 8 times with scheduler_retries=7, after which every
+ # instance in the request is updated to the ERROR vm_state.
+ self.flags(scheduler_retries=7, group='cells')
+
+ host_sched_kwargs = {'request_spec': self.request_spec}
+
+ call_info = {'num_tries': 0, 'errored_uuids': []}
+
+ def fake_run_instance(message, host_sched_kwargs):
+ call_info['num_tries'] += 1
+ raise exception.NoCellsAvailable()
+
+ # time.sleep is replaced with a no-op; presumably the scheduler
+ # sleeps between retries (not visible in this chunk).
+ def fake_sleep(_secs):
+ return
+
+ def fake_instance_update(ctxt, instance_uuid, values):
+ self.assertEqual(vm_states.ERROR, values['vm_state'])
+ call_info['errored_uuids'].append(instance_uuid)
+
+ self.stubs.Set(self.scheduler, '_run_instance', fake_run_instance)
+ self.stubs.Set(time, 'sleep', fake_sleep)
+ self.stubs.Set(db, 'instance_update', fake_instance_update)
+
+ self.msg_runner.schedule_run_instance(self.ctxt,
+ self.my_cell_state, host_sched_kwargs)
+
+ self.assertEqual(8, call_info['num_tries'])
+ self.assertEqual(self.instance_uuids, call_info['errored_uuids'])
+
+ def test_run_instance_on_random_exception(self):
+ # An exception other than NoCellsAvailable must not be retried:
+ # _run_instance is tried once, and each instance is set to ERROR
+ # both via db.instance_update and via instance_update_at_top.
+ self.flags(scheduler_retries=7, group='cells')
+
+ host_sched_kwargs = {'request_spec': self.request_spec}
+
+ call_info = {'num_tries': 0,
+ 'errored_uuids1': [],
+ 'errored_uuids2': []}
+
+ def fake_run_instance(message, host_sched_kwargs):
+ call_info['num_tries'] += 1
+ raise test.TestingException()
+
+ # errored_uuids1 tracks the local DB update ...
+ def fake_instance_update(ctxt, instance_uuid, values):
+ self.assertEqual(vm_states.ERROR, values['vm_state'])
+ call_info['errored_uuids1'].append(instance_uuid)
+
+ # ... and errored_uuids2 tracks the update sent to the top cell.
+ def fake_instance_update_at_top(ctxt, instance):
+ self.assertEqual(vm_states.ERROR, instance['vm_state'])
+ call_info['errored_uuids2'].append(instance['uuid'])
+
+ self.stubs.Set(self.scheduler, '_run_instance', fake_run_instance)
+ self.stubs.Set(db, 'instance_update', fake_instance_update)
+ self.stubs.Set(self.msg_runner, 'instance_update_at_top',
+ fake_instance_update_at_top)
+
+ self.msg_runner.schedule_run_instance(self.ctxt,
+ self.my_cell_state, host_sched_kwargs)
+ # Shouldn't retry
+ self.assertEqual(1, call_info['num_tries'])
+ self.assertEqual(self.instance_uuids, call_info['errored_uuids1'])
+ self.assertEqual(self.instance_uuids, call_info['errored_uuids2'])
diff --git a/nova/tests/compute/test_compute.py b/nova/tests/compute/test_compute.py
index d335f66756..104fd4e681 100644
--- a/nova/tests/compute/test_compute.py
+++ b/nova/tests/compute/test_compute.py
@@ -3696,8 +3696,12 @@ class ComputeAPITestCase(BaseTestCase):
{'vm_state': vm_states.SOFT_DELETED,
'task_state': None})
+ # Ensure quotas are committed
self.mox.StubOutWithMock(nova.quota.QUOTAS, 'commit')
nova.quota.QUOTAS.commit(mox.IgnoreArg(), mox.IgnoreArg())
+ if self.__class__.__name__ == 'CellsComputeAPITestCase':
+ # Called a 2nd time (for the child cell) when testing cells
+ nova.quota.QUOTAS.commit(mox.IgnoreArg(), mox.IgnoreArg())
self.mox.ReplayAll()
self.compute_api.restore(self.context, instance)
diff --git a/nova/tests/compute/test_compute_cells.py b/nova/tests/compute/test_compute_cells.py
new file mode 100644
index 0000000000..aa4b448d44
--- /dev/null
+++ b/nova/tests/compute/test_compute_cells.py
@@ -0,0 +1,99 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+# Copyright (c) 2012 Rackspace Hosting
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Tests For Compute w/ Cells
+"""
+from nova.compute import cells_api as compute_cells_api
+from nova.openstack.common import log as logging
+from nova.tests.compute import test_compute
+
+
+ LOG = logging.getLogger('nova.tests.test_compute_cells')
+
+ # Set by each test case's setUp() to the compute API object created by the
+ # parent test class; the stub_*_to_cells helpers below dispatch to it.
+ ORIG_COMPUTE_API = None
+
+
+ def stub_call_to_cells(context, instance, method, *args, **kwargs):
+ # Replacement for the API's '_call_to_cells' (installed by
+ # deploy_stubs): look up `method` by name on ORIG_COMPUTE_API, invoke
+ # it with the same arguments and return its result.
+ fn = getattr(ORIG_COMPUTE_API, method)
+ return fn(context, instance, *args, **kwargs)
+
+
+ def stub_cast_to_cells(context, instance, method, *args, **kwargs):
+ # Replacement for the API's '_cast_to_cells' (installed by
+ # deploy_stubs): invoke `method` by name on ORIG_COMPUTE_API with the
+ # same arguments; the result is discarded, so this returns None.
+ fn = getattr(ORIG_COMPUTE_API, method)
+ fn(context, instance, *args, **kwargs)
+
+
+ def deploy_stubs(stubs, api):
+ # Replace `api`'s '_call_to_cells' and '_cast_to_cells' attributes with
+ # the module-level stubs above, using the given stubs object's Set().
+ stubs.Set(api, '_call_to_cells', stub_call_to_cells)
+ stubs.Set(api, '_cast_to_cells', stub_cast_to_cells)
+
+
+ # Runs the inherited ComputeAPITestCase tests with self.compute_api swapped
+ # for a ComputeCellsAPI whose cell calls/casts are redirected to the
+ # original compute API object.
+ class CellsComputeAPITestCase(test_compute.ComputeAPITestCase):
+ def setUp(self):
+ super(CellsComputeAPITestCase, self).setUp()
+ # Remember the API object built by the parent setUp so the
+ # module-level stubs can dispatch to it.
+ global ORIG_COMPUTE_API
+ ORIG_COMPUTE_API = self.compute_api
+
+ # Stub: always reports the cell as not read-only.
+ def _fake_cell_read_only(*args, **kwargs):
+ return False
+
+ # Stub: cell validation does nothing.
+ def _fake_validate_cell(*args, **kwargs):
+ return
+
+ # Stub: return the instance untouched instead of updating it.
+ def _nop_update(context, instance, **kwargs):
+ return instance
+
+ self.compute_api = compute_cells_api.ComputeCellsAPI()
+ self.stubs.Set(self.compute_api, '_cell_read_only',
+ _fake_cell_read_only)
+ self.stubs.Set(self.compute_api, '_validate_cell',
+ _fake_validate_cell)
+
+ # NOTE(belliott) Don't update the instance state
+ # for the tests at the API layer. Let it happen after
+ # the stub cast to cells so that expected_task_states
+ # match.
+ self.stubs.Set(self.compute_api, 'update', _nop_update)
+
+ deploy_stubs(self.stubs, self.compute_api)
+
+ def tearDown(self):
+ # Put the original compute API object back before the parent
+ # tearDown runs.
+ global ORIG_COMPUTE_API
+ self.compute_api = ORIG_COMPUTE_API
+ super(CellsComputeAPITestCase, self).tearDown()
+
+ # The inherited tests below are overridden only to skip them.
+ def test_instance_metadata(self):
+ self.skipTest("Test is incompatible with cells.")
+
+ def test_live_migrate(self):
+ self.skipTest("Test is incompatible with cells.")
+
+ def test_get_backdoor_port(self):
+ self.skipTest("Test is incompatible with cells.")
+
+ # Runs the inherited ComputePolicyTestCase tests with self.compute_api
+ # swapped for a ComputeCellsAPI whose cell calls/casts are redirected to
+ # the original compute API object.
+ class CellsComputePolicyTestCase(test_compute.ComputePolicyTestCase):
+ def setUp(self):
+ super(CellsComputePolicyTestCase, self).setUp()
+ # Remember the API object built by the parent setUp so the
+ # module-level stubs can dispatch to it.
+ global ORIG_COMPUTE_API
+ ORIG_COMPUTE_API = self.compute_api
+ self.compute_api = compute_cells_api.ComputeCellsAPI()
+ deploy_stubs(self.stubs, self.compute_api)
+
+ def tearDown(self):
+ # Put the original compute API object back before the parent
+ # tearDown runs.
+ global ORIG_COMPUTE_API
+ self.compute_api = ORIG_COMPUTE_API
+ super(CellsComputePolicyTestCase, self).tearDown()
diff --git a/setup.py b/setup.py
index f3da546187..e13ae4f64e 100644
--- a/setup.py
+++ b/setup.py
@@ -50,6 +50,7 @@ setuptools.setup(name='nova',
'bin/nova-api-metadata',
'bin/nova-api-os-compute',
'bin/nova-rpc-zmq-receiver',
+ 'bin/nova-cells',
'bin/nova-cert',
'bin/nova-clear-rabbit-queues',
'bin/nova-compute',