summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDevananda van der Veen <devananda.vdv@gmail.com>2014-01-04 14:57:05 -0800
committerDevananda van der Veen <devananda.vdv@gmail.com>2014-01-13 12:41:29 -0800
commit0fc3ad85e90a05322e20f4c2c0fce299d1c352f1 (patch)
treee467aff74e1d5d3f4d1aaa43ec791d9883ceb692
parent520467a2958949d16a1915acae18b30fc719c824 (diff)
downloadironic-0fc3ad85e90a05322e20f4c2c0fce299d1c352f1.tar.gz
Implement consistent hashing of nodes to conductors
Implement the consistent hashing of nodes to conductors. Implement automatic routing of RPC messages from the API tier to the conductor tier, by using the consistent hash ring. Improves _sync_power_states to only check the nodes mapped to that particular conductor. Add a stub for a rebalance method which can trigger certain actions by a conductor when a node is mapped to it by changes in the hash distribution. blueprint instance-mapping-by-consistent-hash Change-Id: Ib74a8ded49e4a85964c46d4a445ff0a2df39862a
-rw-r--r--ironic/api/controllers/v1/node.py32
-rw-r--r--ironic/conductor/manager.py67
-rw-r--r--ironic/conductor/rpcapi.py81
-rw-r--r--ironic/tests/api/test_nodes.py45
-rw-r--r--ironic/tests/conductor/test_manager.py4
-rw-r--r--ironic/tests/conductor/test_rpcapi.py24
6 files changed, 197 insertions, 56 deletions
diff --git a/ironic/api/controllers/v1/node.py b/ironic/api/controllers/v1/node.py
index 9f546b33a..020615960 100644
--- a/ironic/api/controllers/v1/node.py
+++ b/ironic/api/controllers/v1/node.py
@@ -113,6 +113,8 @@ class NodeStatesController(rest.RestController):
# TODO(lucasagomes): Test if it's able to transition to the
# target state from the current one
rpc_node = objects.Node.get_by_uuid(pecan.request.context, node_uuid)
+ topic = pecan.request.rpcapi.get_topic_for(rpc_node)
+
if rpc_node.target_power_state is not None:
raise wsme.exc.ClientSideError(_("Power operation for node %s is "
"already in progress.") %
@@ -124,10 +126,11 @@ class NodeStatesController(rest.RestController):
if target in [ir_states.POWER_ON,
ir_states.POWER_OFF,
ir_states.REBOOT]:
- pecan.request.rpcapi.change_node_power_state(pecan.request.context,
- node_uuid, target)
+ pecan.request.rpcapi.change_node_power_state(
+ pecan.request.context, node_uuid, target, topic)
else:
raise exception.InvalidStateRequested(state=target, node=node_uuid)
+
# FIXME(lucasagomes): Currently WSME doesn't support returning
# the Location header. Once it's implemented we should use the
# Location to point to the /states subresource of the node so
@@ -157,6 +160,8 @@ class NodeStatesController(rest.RestController):
"""
rpc_node = objects.Node.get_by_uuid(pecan.request.context, node_uuid)
+ topic = pecan.request.rpcapi.get_topic_for(rpc_node)
+
if rpc_node.target_provision_state is not None:
msg = _('Node %s is already being provisioned.') % rpc_node['uuid']
LOG.exception(msg)
@@ -172,11 +177,11 @@ class NodeStatesController(rest.RestController):
# lock.
if target == ir_states.ACTIVE:
- pecan.request.rpcapi.do_node_deploy(pecan.request.context,
- node_uuid)
+ pecan.request.rpcapi.do_node_deploy(
+ pecan.request.context, node_uuid, topic)
elif target == ir_states.DELETED:
- pecan.request.rpcapi.do_node_tear_down(pecan.request.context,
- node_uuid)
+ pecan.request.rpcapi.do_node_tear_down(
+ pecan.request.context, node_uuid, topic)
else:
raise exception.InvalidStateRequested(state=target, node=node_uuid)
# FIXME(lucasagomes): Currently WSME doesn't support returning
@@ -341,14 +346,15 @@ class NodeVendorPassthruController(rest.RestController):
:param data: body of data to supply to the specified method.
"""
# Raise an exception if node is not found
- objects.Node.get_by_uuid(pecan.request.context, node_uuid)
+ rpc_node = objects.Node.get_by_uuid(pecan.request.context, node_uuid)
+ topic = pecan.request.rpcapi.get_topic_for(rpc_node)
# Raise an exception if method is not specified
if not method:
raise wsme.exc.ClientSideError(_("Method not specified"))
return pecan.request.rpcapi.vendor_passthru(
- pecan.request.context, node_uuid, method, data)
+ pecan.request.context, node_uuid, method, data, topic)
class NodesController(rest.RestController):
@@ -492,9 +498,10 @@ class NodesController(rest.RestController):
def validate(self, node_uuid):
"""Validate the driver interfaces."""
# check if node exists
- node = objects.Node.get_by_uuid(pecan.request.context, node_uuid)
+ rpc_node = objects.Node.get_by_uuid(pecan.request.context, node_uuid)
+ topic = pecan.request.rpcapi.get_topic_for(rpc_node)
return pecan.request.rpcapi.validate_driver_interfaces(
- pecan.request.context, node.uuid)
+ pecan.request.context, rpc_node.uuid, topic)
@wsme_pecan.wsexpose(Node, types.uuid)
def get_one(self, node_uuid):
@@ -536,6 +543,7 @@ class NodesController(rest.RestController):
raise exception.OperationNotPermitted
rpc_node = objects.Node.get_by_uuid(pecan.request.context, node_uuid)
+ topic = pecan.request.rpcapi.get_topic_for(rpc_node)
# Check if node is transitioning state
if rpc_node['target_power_state'] or \
@@ -557,8 +565,8 @@ class NodesController(rest.RestController):
rpc_node[field] = getattr(node, field)
try:
- new_node = pecan.request.rpcapi.update_node(pecan.request.context,
- rpc_node)
+ new_node = pecan.request.rpcapi.update_node(
+ pecan.request.context, rpc_node, topic)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.exception(e)
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index 2b5fa6e15..30bb73546 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -35,12 +35,19 @@ all active and cooperatively manage all nodes in the deployment. Nodes are
locked by each conductor when performing actions which change the state of that
node; these locks are represented by the
:py:class:`ironic.conductor.task_manager.TaskManager` class.
+
+A :py:class:`ironic.common.hash_ring.HashRing` is used to distribute nodes
+across the set of active conductors which support each node's driver.
+Rebalancing this ring can trigger various actions by each conductor, such as
+building or tearing down the TFTP environment for a node, notifying Neutron of
+a change, etc.
"""
from oslo.config import cfg
from ironic.common import driver_factory
from ironic.common import exception
+from ironic.common import hash_ring as hash
from ironic.common import service
from ironic.common import states
from ironic.conductor import task_manager
@@ -79,7 +86,7 @@ CONF.register_opts(conductor_opts, 'conductor')
class ConductorManager(service.PeriodicService):
"""Ironic Conductor service main class."""
- RPC_API_VERSION = '1.6'
+ RPC_API_VERSION = '1.7'
def __init__(self, host, topic):
serializer = objects_base.IronicObjectSerializer()
@@ -91,17 +98,22 @@ class ConductorManager(service.PeriodicService):
self.dbapi = dbapi.get_instance()
df = driver_factory.DriverFactory()
- drivers = df.names
+ self.drivers = df.names
+ """List of driver names which this conductor supports."""
+
try:
self.dbapi.register_conductor({'hostname': self.host,
- 'drivers': drivers})
+ 'drivers': self.drivers})
except exception.ConductorAlreadyRegistered:
LOG.warn(_("A conductor with hostname %(hostname)s "
"was previously registered. Updating registration")
% {'hostname': self.host})
self.dbapi.unregister_conductor(self.host)
self.dbapi.register_conductor({'hostname': self.host,
- 'drivers': drivers})
+ 'drivers': self.drivers})
+
+ self.driver_rings = self._get_current_driver_rings()
+ """Consistent hash ring which maps drivers to conductors."""
# TODO(deva): add stop() to call unregister_conductor
@@ -137,7 +149,7 @@ class ConductorManager(service.PeriodicService):
:param node_obj: a changed (but not saved) node object.
"""
- node_id = node_obj.get('uuid')
+ node_id = node_obj.uuid
LOG.debug(_("RPC update_node called for node %s.") % node_id)
delta = node_obj.obj_what_changed()
@@ -337,10 +349,16 @@ class ConductorManager(service.PeriodicService):
@periodic_task.periodic_task(
spacing=CONF.conductor.sync_power_state_interval)
def _sync_power_states(self, context):
- # TODO(deva): add filter by conductor<->instance mapping
filters = {'reserved': False}
- node_list = self.dbapi.get_nodeinfo_list(filters=filters)
- for node_id, in node_list:
+ columns = ['id', 'uuid', 'driver']
+ node_list = self.dbapi.get_nodeinfo_list(columns=columns,
+ filters=filters)
+ for (node_id, node_uuid, driver) in node_list:
+ # only sync power states for nodes mapped to this conductor
+ mapped_hosts = self.driver_rings[driver].get_hosts(node_uuid)
+ if self.host not in mapped_hosts:
+ continue
+
try:
with task_manager.acquire(context, node_id) as task:
node = task.node
@@ -358,12 +376,37 @@ class ConductorManager(service.PeriodicService):
node['power_state'] = power_state
node.save(context)
- except (exception.NodeLocked, exception.NodeNotFound):
- # NOTE(deva): if an instance is deleted during sync,
- # or locked by another process,
- # silently ignore it and continue
+ except exception.NodeNotFound:
+ LOG.info(_("During sync_power_state, node %(node)s was not "
+ "found and presumed deleted by another process.") %
+ {'node': node_uuid})
+ continue
+ except exception.NodeLocked:
+ LOG.info(_("During sync_power_state, node %(node)s was "
+ "already locked by another process. Skip.") %
+ {'node': node_uuid})
continue
+ def _get_current_driver_rings(self):
+ """Build the current hash ring for this ConductorManager's drivers."""
+
+ ring = {}
+ d2c = self.dbapi.get_active_driver_dict()
+
+ for driver in self.drivers:
+ ring[driver] = hash.HashRing(d2c[driver])
+ return ring
+
+ def rebalance_node_ring(self):
+ """Perform any actions necessary when rebalancing the consistent hash.
+
+ This may trigger several actions, such as calling driver.deploy.prepare
+ for nodes which are now mapped to this conductor.
+
+ """
+ # TODO(deva): implement this
+ pass
+
def validate_driver_interfaces(self, context, node_id):
"""Validate the `core` and `standardized` interfaces for drivers.
diff --git a/ironic/conductor/rpcapi.py b/ironic/conductor/rpcapi.py
index f7f543cd0..efb6a3122 100644
--- a/ironic/conductor/rpcapi.py
+++ b/ironic/conductor/rpcapi.py
@@ -19,9 +19,12 @@
Client side of the conductor RPC API.
"""
+from oslo.config import cfg
+
+from ironic.common import hash_ring as hash
+from ironic.db import api as dbapi
from ironic.objects import base as objects_base
import ironic.openstack.common.rpc.proxy
-from oslo.config import cfg
# NOTE(max_lobur): This is temporary override for Oslo setting defined in
# ironic.openstack.common.rpc.__init__.py. Should stay while Oslo is not fixed.
@@ -50,33 +53,57 @@ class ConductorAPI(ironic.openstack.common.rpc.proxy.RpcProxy):
1.5 - Added validate_driver_interfaces.
1.6 - change_node_power_state, do_node_deploy and do_node_tear_down
accept node id instead of node object.
+ 1.7 - Added topic parameter to RPC methods.
"""
- RPC_API_VERSION = '1.6'
+ RPC_API_VERSION = '1.7'
def __init__(self, topic=None):
if topic is None:
topic = MANAGER_TOPIC
+ # Initialize consistent hash ring
+ self.hash_rings = {}
+ d2c = dbapi.get_instance().get_active_driver_dict()
+ for driver in d2c.keys():
+ self.hash_rings[driver] = hash.HashRing(d2c[driver])
+
super(ConductorAPI, self).__init__(
topic=topic,
serializer=objects_base.IronicObjectSerializer(),
default_version=self.RPC_API_VERSION)
- def get_node_power_state(self, context, node_id):
+ def get_topic_for(self, node):
+ """Get the RPC topic for the conductor service which the node
+ is mapped to.
+
+ :param node: a node object.
+ :returns: an RPC topic string.
+
+ """
+ try:
+ ring = self.hash_rings[node.driver]
+ dest = ring.get_hosts(node.uuid)
+ return self.topic + "." + dest[0]
+ except KeyError:
+ return self.topic
+
+ def get_node_power_state(self, context, node_id, topic=None):
"""Ask a conductor for the node power state.
:param context: request context.
:param node_id: node id or uuid.
+ :param topic: RPC topic. Defaults to self.topic.
:returns: power status.
"""
return self.call(context,
self.make_msg('get_node_power_state',
- node_id=node_id))
+ node_id=node_id),
+ topic=topic or self.topic)
- def update_node(self, context, node_obj):
+ def update_node(self, context, node_obj, topic=None):
"""Synchronously, have a conductor update the node's information.
Update the node's information in the database and return a node object.
@@ -90,42 +117,51 @@ class ConductorAPI(ironic.openstack.common.rpc.proxy.RpcProxy):
:param context: request context.
:param node_obj: a changed (but not saved) node object.
+ :param topic: RPC topic. Defaults to self.topic.
:returns: updated node object, including all fields.
"""
return self.call(context,
self.make_msg('update_node',
- node_obj=node_obj))
+ node_obj=node_obj),
+ topic=topic or self.topic)
- def change_node_power_state(self, context, node_id, new_state):
+ def change_node_power_state(self, context, node_id, new_state, topic=None):
"""Asynchronously change power state of a node.
:param context: request context.
:param node_id: node id or uuid.
:param new_state: one of ironic.common.states power state values
+ :param topic: RPC topic. Defaults to self.topic.
"""
self.cast(context,
self.make_msg('change_node_power_state',
node_id=node_id,
- new_state=new_state))
+ new_state=new_state),
+ topic=topic or self.topic)
- def vendor_passthru(self, context, node_id, driver_method, info):
+ def vendor_passthru(self, context, node_id, driver_method, info,
+ topic=None):
"""Pass vendor specific info to a node driver.
:param context: request context.
:param node_id: node id or uuid.
:param driver_method: name of method for driver.
:param info: info for node driver.
+ :param topic: RPC topic. Defaults to self.topic.
:raises: InvalidParameterValue for parameter errors.
:raises: UnsupportedDriverExtension for unsupported extensions.
"""
+ topic = topic or self.topic
+
driver_data = self.call(context,
self.make_msg('validate_vendor_action',
- node_id=node_id,
- driver_method=driver_method,
- info=info))
+ node_id=node_id,
+ driver_method=driver_method,
+ info=info),
+ topic=topic)
# this method can do nothing if 'driver_method' intended only
# for obtain 'driver_data'
@@ -133,15 +169,17 @@ class ConductorAPI(ironic.openstack.common.rpc.proxy.RpcProxy):
self.make_msg('do_vendor_action',
node_id=node_id,
driver_method=driver_method,
- info=info))
+ info=info),
+ topic=topic)
return driver_data
- def do_node_deploy(self, context, node_id):
+ def do_node_deploy(self, context, node_id, topic=None):
"""Signal to conductor service to perform a deployment.
:param context: request context.
:param node_id: node id or uuid.
+ :param topic: RPC topic. Defaults to self.topic.
The node must already be configured and in the appropriate
undeployed state before this method is called.
@@ -149,13 +187,15 @@ class ConductorAPI(ironic.openstack.common.rpc.proxy.RpcProxy):
"""
self.cast(context,
self.make_msg('do_node_deploy',
- node_id=node_id))
+ node_id=node_id),
+ topic=topic or self.topic)
- def do_node_tear_down(self, context, node_id):
+ def do_node_tear_down(self, context, node_id, topic=None):
"""Signal to conductor service to tear down a deployment.
:param context: request context.
:param node_id: node id or uuid.
+ :param topic: RPC topic. Defaults to self.topic.
The node must already be configured and in the appropriate
deployed state before this method is called.
@@ -163,17 +203,20 @@ class ConductorAPI(ironic.openstack.common.rpc.proxy.RpcProxy):
"""
self.cast(context,
self.make_msg('do_node_tear_down',
- node_id=node_id))
+ node_id=node_id),
+ topic=topic or self.topic)
- def validate_driver_interfaces(self, context, node_id):
+ def validate_driver_interfaces(self, context, node_id, topic=None):
"""Validate the `core` and `standardized` interfaces for drivers.
:param context: request context.
:param node_id: node id or uuid.
+ :param topic: RPC topic. Defaults to self.topic.
:returns: a dictionary containing the results of each
interface validation.
"""
return self.call(context,
self.make_msg('validate_driver_interfaces',
- node_id=node_id))
+ node_id=node_id),
+ topic=topic or self.topic)
diff --git a/ironic/tests/api/test_nodes.py b/ironic/tests/api/test_nodes.py
index be6481478..473af5e58 100644
--- a/ironic/tests/api/test_nodes.py
+++ b/ironic/tests/api/test_nodes.py
@@ -331,6 +331,10 @@ class TestPatch(base.FunctionalTest):
self.chassis = self.dbapi.create_chassis(cdict)
ndict = dbutils.get_test_node()
self.node = self.dbapi.create_node(ndict)
+ p = mock.patch.object(rpcapi.ConductorAPI, 'get_topic_for')
+ self.mock_gtf = p.start()
+ self.mock_gtf.return_value = 'test-topic'
+ self.addCleanup(p.stop)
p = mock.patch.object(rpcapi.ConductorAPI, 'update_node')
self.mock_update_node = p.start()
self.addCleanup(p.stop)
@@ -350,7 +354,8 @@ class TestPatch(base.FunctionalTest):
self.assertEqual(response.status_code, 200)
self.assertEqual(self.mock_update_node.return_value.updated_at,
timeutils.parse_isotime(response.json['updated_at']))
- self.mock_update_node.assert_called_once_with(mock.ANY, mock.ANY)
+ self.mock_update_node.assert_called_once_with(
+ mock.ANY, mock.ANY, 'test-topic')
def test_update_state(self):
self.assertRaises(webtest.app.AppError, self.patch_json,
@@ -373,7 +378,8 @@ class TestPatch(base.FunctionalTest):
self.assertEqual(response.content_type, 'application/json')
self.assertEqual(response.status_code, 400)
- self.mock_update_node.assert_called_once_with(mock.ANY, mock.ANY)
+ self.mock_update_node.assert_called_once_with(
+ mock.ANY, mock.ANY, 'test-topic')
def test_update_fails_bad_state(self):
fake_err = 'Fake Power State'
@@ -388,7 +394,8 @@ class TestPatch(base.FunctionalTest):
self.assertEqual(response.content_type, 'application/json')
self.assertEqual(response.status_code, 409)
- self.mock_update_node.assert_called_once_with(mock.ANY, mock.ANY)
+ self.mock_update_node.assert_called_once_with(
+ mock.ANY, mock.ANY, 'test-topic')
def test_add_ok(self):
self.mock_update_node.return_value = self.node
@@ -400,7 +407,8 @@ class TestPatch(base.FunctionalTest):
self.assertEqual(response.content_type, 'application/json')
self.assertEqual(response.status_code, 200)
- self.mock_update_node.assert_called_once_with(mock.ANY, mock.ANY)
+ self.mock_update_node.assert_called_once_with(
+ mock.ANY, mock.ANY, 'test-topic')
def test_add_fail(self):
self.assertRaises(webtest.app.AppError, self.patch_json,
@@ -416,7 +424,8 @@ class TestPatch(base.FunctionalTest):
self.assertEqual(response.content_type, 'application/json')
self.assertEqual(response.status_code, 200)
- self.mock_update_node.assert_called_once_with(mock.ANY, mock.ANY)
+ self.mock_update_node.assert_called_once_with(
+ mock.ANY, mock.ANY, 'test-topic')
def test_remove_fail(self):
self.assertRaises(webtest.app.AppError, self.patch_json,
@@ -490,6 +499,10 @@ class TestPost(base.FunctionalTest):
cdict = dbutils.get_test_chassis()
self.chassis = self.dbapi.create_chassis(cdict)
self.addCleanup(timeutils.clear_time_override)
+ p = mock.patch.object(rpcapi.ConductorAPI, 'get_topic_for')
+ self.mock_gtf = p.start()
+ self.mock_gtf.return_value = 'test-topic'
+ self.addCleanup(p.stop)
def test_create_node(self):
ndict = post_get_test_node()
@@ -525,7 +538,8 @@ class TestPost(base.FunctionalTest):
mock_vendor.return_value = 'OK'
response = self.post_json('/nodes/%s/vendor_passthru/test' % uuid,
info, expect_errors=False)
- mock_vendor.assert_called_once_with(mock.ANY, uuid, 'test', info)
+ mock_vendor.assert_called_once_with(
+ mock.ANY, uuid, 'test', info, 'test-topic')
self.assertEqual(response.body, '"OK"')
self.assertEqual(response.status_code, 202)
@@ -543,7 +557,8 @@ class TestPost(base.FunctionalTest):
'extension': 'test'})
response = self.post_json('/nodes/%s/vendor_passthru/test' % uuid,
info, expect_errors=True)
- mock_vendor.assert_called_once_with(mock.ANY, uuid, 'test', info)
+ mock_vendor.assert_called_once_with(
+ mock.ANY, uuid, 'test', info, 'test-topic')
self.assertEqual(response.status_code, 400)
def test_vendor_passthru_without_method(self):
@@ -626,6 +641,10 @@ class TestPut(base.FunctionalTest):
self.chassis = self.dbapi.create_chassis(cdict)
ndict = dbutils.get_test_node()
self.node = self.dbapi.create_node(ndict)
+ p = mock.patch.object(rpcapi.ConductorAPI, 'get_topic_for')
+ self.mock_gtf = p.start()
+ self.mock_gtf.return_value = 'test-topic'
+ self.addCleanup(p.stop)
p = mock.patch.object(rpcapi.ConductorAPI, 'change_node_power_state')
self.mock_cnps = p.start()
self.addCleanup(p.stop)
@@ -644,7 +663,8 @@ class TestPut(base.FunctionalTest):
self.mock_cnps.assert_called_once_with(mock.ANY,
self.node['uuid'],
- states.POWER_ON)
+ states.POWER_ON,
+ 'test-topic')
def test_power_state_in_progress(self):
manager = mock.MagicMock()
@@ -655,7 +675,8 @@ class TestPut(base.FunctionalTest):
expected = [mock.call.get_by_uuid(mock.ANY, self.node['uuid']),
mock.call.change_node_power_state(mock.ANY,
self.node['uuid'],
- states.POWER_ON)]
+ states.POWER_ON,
+ 'test-topic')]
self.put_json('/nodes/%s/states/power' % self.node['uuid'],
{'target': states.POWER_ON})
@@ -680,13 +701,15 @@ class TestPut(base.FunctionalTest):
ret = self.put_json('/nodes/%s/states/provision' % self.node.uuid,
{'target': states.ACTIVE})
self.assertEqual(ret.status_code, 202)
- self.mock_dnd.assert_called_once_with(mock.ANY, self.node.uuid)
+ self.mock_dnd.assert_called_once_with(
+ mock.ANY, self.node.uuid, 'test-topic')
def test_provision_with_tear_down(self):
ret = self.put_json('/nodes/%s/states/provision' % self.node.uuid,
{'target': states.DELETED})
self.assertEqual(ret.status_code, 202)
- self.mock_dntd.assert_called_once_with(mock.ANY, self.node.uuid)
+ self.mock_dntd.assert_called_once_with(
+ mock.ANY, self.node.uuid, 'test-topic')
def test_provision_invalid_state_request(self):
ret = self.put_json('/nodes/%s/states/provision' % self.node.uuid,
diff --git a/ironic/tests/conductor/test_manager.py b/ironic/tests/conductor/test_manager.py
index 113a5c825..9a72ef2c0 100644
--- a/ironic/tests/conductor/test_manager.py
+++ b/ironic/tests/conductor/test_manager.py
@@ -120,11 +120,13 @@ class ManagerTestCase(base.DbTestCase):
# create three nodes
nodes = []
+ nodeinfo = []
for i in range(0, 3):
n = utils.get_test_node(id=i, uuid=ironic_utils.generate_uuid(),
driver='fake', power_state=states.POWER_OFF)
self.dbapi.create_node(n)
nodes.append(n['uuid'])
+ nodeinfo.append([i, n['uuid'], 'fake'])
# lock the first node
self.dbapi.reserve_nodes('fake-reserve', [nodes[0]])
@@ -136,7 +138,7 @@ class ManagerTestCase(base.DbTestCase):
'get_nodeinfo_list') as get_fnl_mock:
# delete the second node
self.dbapi.destroy_node(nodes[1])
- get_fnl_mock.return_value = [[n] for n in nodes]
+ get_fnl_mock.return_value = nodeinfo
self.service._sync_power_states(self.context)
# check that get_power only called once, which updated third node
get_power_mock.assert_called_once_with(mock.ANY, mock.ANY)
diff --git a/ironic/tests/conductor/test_rpcapi.py b/ironic/tests/conductor/test_rpcapi.py
index d3e87ba06..b38a0621e 100644
--- a/ironic/tests/conductor/test_rpcapi.py
+++ b/ironic/tests/conductor/test_rpcapi.py
@@ -41,7 +41,8 @@ class RPCAPITestCase(base.DbTestCase):
super(RPCAPITestCase, self).setUp()
self.context = context.get_admin_context()
self.dbapi = dbapi.get_instance()
- self.fake_node = json.to_primitive(dbutils.get_test_node())
+ self.fake_node = json.to_primitive(dbutils.get_test_node(
+ driver='fake-driver'))
self.fake_node_obj = objects.Node._from_db_object(
objects.Node(),
self.fake_node)
@@ -49,6 +50,26 @@ class RPCAPITestCase(base.DbTestCase):
def test_serialized_instance_has_uuid(self):
self.assertTrue('uuid' in self.fake_node)
+ def test_get_topic_for_known_driver(self):
+ CONF.set_override('host', 'fake-host')
+ self.dbapi.register_conductor({'hostname': 'fake-host',
+ 'drivers': ['fake-driver']})
+
+ rpcapi = conductor_rpcapi.ConductorAPI(topic='fake-topic')
+ expected_topic = 'fake-topic.fake-host'
+ self.assertEqual(expected_topic,
+ rpcapi.get_topic_for(self.fake_node_obj))
+
+ def test_get_topic_for_unknown_driver(self):
+ CONF.set_override('host', 'fake-host')
+ self.dbapi.register_conductor({'hostname': 'fake-host',
+ 'drivers': ['other-driver']})
+
+ rpcapi = conductor_rpcapi.ConductorAPI(topic='fake-topic')
+ expected_topic = 'fake-topic'
+ self.assertEqual(expected_topic,
+ rpcapi.get_topic_for(self.fake_node_obj))
+
def _test_rpcapi(self, method, rpc_method, **kwargs):
ctxt = context.get_admin_context()
rpcapi = conductor_rpcapi.ConductorAPI(topic='fake-topic')
@@ -69,6 +90,7 @@ class RPCAPITestCase(base.DbTestCase):
def _fake_rpc_method(*args, **kwargs):
self.fake_args = args
self.fake_kwargs = kwargs
+
if expected_retval:
return expected_retval