summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMax Lobur <max_lobur@outlook.com>2014-01-22 10:13:39 -0500
committerMax Lobur <max_lobur@outlook.com>2014-02-13 09:08:37 -0500
commitc4f2f26edf5cd1f5880f86853fcc611281605e63 (patch)
tree81075a4ff1bbe94cf6214f96302bb0fcb5025913
parentcb3246848f5c49ce7c418cedcf06fba01bbf4aa9 (diff)
downloadironic-c4f2f26edf5cd1f5880f86853fcc611281605e63.tar.gz
Fix race condition when changing node states
To fix race condition we're adding a mechanism of background task execution in conductor. The conductor will get synchrozed AMQP call, reserve lock, start the background task and return empty response to the API. In case when lock cannot be acquired or background tasks pool is full, the exception is sent back to the API and the task is not started. Also the patch adds an ability to control resource locks manually. This feauture used to release lock in the end of background task. Change-Id: I4095de2d82058ea5e052531698e67a0947424435 Closes-Bug: #1259910
-rw-r--r--ironic/api/controllers/v1/node.py21
-rw-r--r--ironic/common/exception.py6
-rw-r--r--ironic/conductor/manager.py52
-rw-r--r--ironic/conductor/rpcapi.py15
-rw-r--r--ironic/conductor/task_manager.py113
-rw-r--r--ironic/conductor/utils.py8
-rw-r--r--ironic/tests/api/test_nodes.py26
-rw-r--r--ironic/tests/conductor/test_conductor_utils.py189
-rw-r--r--ironic/tests/conductor/test_manager.py189
-rw-r--r--ironic/tests/conductor/test_rpcapi.py2
10 files changed, 441 insertions, 180 deletions
diff --git a/ironic/api/controllers/v1/node.py b/ironic/api/controllers/v1/node.py
index 6240660ab..50f665451 100644
--- a/ironic/api/controllers/v1/node.py
+++ b/ironic/api/controllers/v1/node.py
@@ -120,21 +120,14 @@ class NodeStatesController(rest.RestController):
rpc_node = objects.Node.get_by_uuid(pecan.request.context, node_uuid)
topic = pecan.request.rpcapi.get_topic_for(rpc_node)
- if rpc_node.target_power_state is not None:
- raise wsme.exc.ClientSideError(_("Power operation for node %s is "
- "already in progress.") %
- rpc_node['uuid'],
- status_code=409)
- # Note that there is a race condition. The node state(s) could change
- # by the time the RPC call is made and the TaskManager manager gets a
- # lock.
- if target in [ir_states.POWER_ON,
- ir_states.POWER_OFF,
- ir_states.REBOOT]:
- pecan.request.rpcapi.change_node_power_state(
- pecan.request.context, node_uuid, target, topic)
- else:
+ if target not in [ir_states.POWER_ON,
+ ir_states.POWER_OFF,
+ ir_states.REBOOT]:
raise exception.InvalidStateRequested(state=target, node=node_uuid)
+
+ pecan.request.rpcapi.change_node_power_state(pecan.request.context,
+ node_uuid, target, topic)
+
# FIXME(lucasagomes): Currently WSME doesn't support returning
# the Location header. Once it's implemented we should use the
# Location to point to the /states subresource of the node so
diff --git a/ironic/common/exception.py b/ironic/common/exception.py
index 26792bc0c..64413c10b 100644
--- a/ironic/common/exception.py
+++ b/ironic/common/exception.py
@@ -322,3 +322,9 @@ class HTTPNotFound(NotFound):
class ConfigNotFound(IronicException):
message = _("Could not find config at %(path)s")
+
+
+class NoFreeConductorWorker(IronicException):
+ message = _('Requested action cannot be performed due to lack of free '
+ 'conductor workers.')
+ code = 503 # Service Unavailable (temporary).
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index 8a7a1b03a..6ebdd85d6 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -43,6 +43,8 @@ building or tearing down the TFTP environment for a node, notifying Neutron of
a change, etc.
"""
+from eventlet import greenpool
+
from oslo.config import cfg
from ironic.common import driver_factory
@@ -55,10 +57,12 @@ from ironic.conductor import utils
from ironic.db import api as dbapi
from ironic.objects import base as objects_base
from ironic.openstack.common import excutils
+from ironic.openstack.common import lockutils
from ironic.openstack.common import log
from ironic.openstack.common import periodic_task
MANAGER_TOPIC = 'ironic.conductor_manager'
+WORKER_SPAWN_lOCK = "conductor_worker_spawn"
LOG = log.getLogger(__name__)
@@ -115,6 +119,9 @@ class ConductorManager(service.PeriodicService):
self.driver_rings = self._get_current_driver_rings()
"""Consistent hash ring which maps drivers to conductors."""
+ self._worker_pool = greenpool.GreenPool(size=CONF.rpc_thread_pool_size)
+ """GreenPool of background workers for performing tasks async."""
+
# TODO(deva): add stop() to call unregister_conductor
def initialize_service_hook(self, service):
@@ -181,25 +188,35 @@ class ConductorManager(service.PeriodicService):
def change_node_power_state(self, context, node_id, new_state):
"""RPC method to encapsulate changes to a node's state.
- Perform actions such as power on, power off. It waits for the power
- action to finish, then if successful, it updates the power_state for
- the node with the new power state.
+ Perform actions such as power on, power off. The validation and power
+ action are performed in background (async). Once the power action is
+ finished and successful, it updates the power_state for the node with
+ the new power state.
:param context: an admin context.
:param node_id: the id or uuid of a node.
:param new_state: the desired power state of the node.
- :raises: InvalidParameterValue when the wrong state is specified
- or the wrong driver info is specified.
- :raises: other exceptions by the node's power driver if something
- wrong occurred during the power action.
+ :raises: NoFreeConductorWorker when there is no free worker to start
+ async task.
"""
LOG.debug(_("RPC change_node_power_state called for node %(node)s. "
"The desired new state is %(state)s.")
% {'node': node_id, 'state': new_state})
- with task_manager.acquire(context, node_id, shared=False) as task:
- utils.node_power_action(task, task.node, new_state)
+ task = task_manager.TaskManager(context)
+ task.acquire_resources(node_id, shared=False)
+
+ try:
+ # Start requested action in the background.
+ thread = self._spawn_worker(utils.node_power_action,
+ task, task.node, new_state)
+ # Release node lock at the end.
+ thread.link(lambda t: task.release_resources())
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ # Release node lock if error occurred.
+ task.release_resources()
# NOTE(deva): There is a race condition in the RPC API for vendor_passthru.
# Between the validate_vendor_action and do_vendor_action calls, it's
@@ -481,3 +498,20 @@ class ConductorManager(service.PeriodicService):
reason=msg)
return node
+
+ @lockutils.synchronized(WORKER_SPAWN_lOCK, 'ironic-')
+ def _spawn_worker(self, func, *args, **kwargs):
+
+ """Create a greenthread to run func(*args, **kwargs).
+
+ Spawns a greenthread if there are free slots in pool, otherwise raises
+ exception. Execution control returns immediately to the caller.
+
+ :returns: GreenThread object.
+ :raises: NoFreeConductorWorker if worker pool is currently full.
+
+ """
+ if self._worker_pool.free():
+ return self._worker_pool.spawn(func, *args, **kwargs)
+ else:
+ raise exception.NoFreeConductorWorker()
diff --git a/ironic/conductor/rpcapi.py b/ironic/conductor/rpcapi.py
index 2b5a59807..e375fc25c 100644
--- a/ironic/conductor/rpcapi.py
+++ b/ironic/conductor/rpcapi.py
@@ -127,19 +127,22 @@ class ConductorAPI(ironic.openstack.common.rpc.proxy.RpcProxy):
topic=topic or self.topic)
def change_node_power_state(self, context, node_id, new_state, topic=None):
- """Asynchronously change power state of a node.
+ """Synchronously, acquire lock and start the conductor background task
+ to change power state of a node.
:param context: request context.
:param node_id: node id or uuid.
:param new_state: one of ironic.common.states power state values
:param topic: RPC topic. Defaults to self.topic.
+ :raises: NoFreeConductorWorker when there is no free worker to start
+ async task.
"""
- self.cast(context,
- self.make_msg('change_node_power_state',
- node_id=node_id,
- new_state=new_state),
- topic=topic or self.topic)
+ return self.call(context,
+ self.make_msg('change_node_power_state',
+ node_id=node_id,
+ new_state=new_state),
+ topic=topic or self.topic)
def vendor_passthru(self, context, node_id, driver_method, info,
topic=None):
diff --git a/ironic/conductor/task_manager.py b/ironic/conductor/task_manager.py
index e583e9a22..259c16c6a 100644
--- a/ironic/conductor/task_manager.py
+++ b/ironic/conductor/task_manager.py
@@ -52,6 +52,31 @@ a shorthand to access it. For example::
driver = task.node.driver
driver.power.power_on(task.node)
+If you need to execute task-requiring code in the background thread the
+TaskManager provides the interface to manage resource locks manually. Common
+approach is to use manager._spawn_worker method and release resources using
+link method of the returned thread object.
+For example (somewhere inside conductor manager)::
+
+ task = task_manager.TaskManager(context)
+ task.acquire_resources(node_id, shared=False)
+
+ try:
+ # Start requested action in the background.
+ thread = self._spawn_worker(utils.node_power_action,
+ task, task.node, new_state)
+ # Release node lock at the end.
+ thread.link(lambda t: task.release_resources())
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ # Release node lock if error occurred.
+ task.release_resources()
+
+link callback will be called whenever:
+ - background task finished with no errors.
+ - background task has crashed with exception.
+ - callback was added after the background task has finished or crashed.
+
Eventually, driver functionality may be wrapped by tasks to facilitate
multi-node tasks more easily. Once implemented, it might look like this::
@@ -67,8 +92,6 @@ multi-node tasks more easily. Once implemented, it might look like this::
states = task.get_power_state()
"""
-
-import contextlib
from oslo.config import cfg
from ironic.common import exception
@@ -94,14 +117,8 @@ def require_exclusive_lock(f):
return wrapper
-@contextlib.contextmanager
def acquire(context, node_ids, shared=False, driver_name=None):
- """Context manager for acquiring a lock on one or more Nodes.
-
- Acquire a lock atomically on a non-empty set of nodes. The lock
- can be either shared or exclusive. Shared locks may be used for
- read-only or non-disruptive actions only, and must be considerate
- to what other threads may be doing on the nodes at the same time.
+ """Shortcut for acquiring a lock on one or more Nodes.
:param context: Request context.
:param node_ids: A list of ids or uuids of nodes to lock.
@@ -111,36 +128,66 @@ def acquire(context, node_ids, shared=False, driver_name=None):
:returns: An instance of :class:`TaskManager`.
"""
-
- t = TaskManager(context, shared)
-
- # instead of generating an exception, DTRT and convert to a list
- if not isinstance(node_ids, list):
- node_ids = [node_ids]
-
- try:
- if not shared:
- t.dbapi.reserve_nodes(CONF.host, node_ids)
- for id in node_ids:
- t.resources.append(resource_manager.NodeManager.acquire(
- id, t, driver_name))
- yield t
- finally:
- for id in [r.id for r in t.resources]:
- resource_manager.NodeManager.release(id, t)
- if not shared:
- t.dbapi.release_nodes(CONF.host, node_ids)
+ mgr = TaskManager(context)
+ mgr.acquire_resources(node_ids, shared, driver_name)
+ return mgr
class TaskManager(object):
"""Context manager for tasks."""
- def __init__(self, context, shared):
+ def __init__(self, context):
self.context = context
- self.shared = shared
self.resources = []
+ self.shared = False
self.dbapi = dbapi.get_instance()
+ def acquire_resources(self, node_ids, shared=False, driver_name=None):
+ """Acquire a lock on one or more Nodes.
+
+ Acquire a lock atomically on a non-empty set of nodes. The lock
+ can be either shared or exclusive. Shared locks may be used for
+ read-only or non-disruptive actions only, and must be considerate
+ to what other threads may be doing on the nodes at the same time.
+
+ :param node_ids: A list of ids or uuids of nodes to lock.
+ :param shared: Boolean indicating whether to take a shared or exclusive
+ lock. Default: False.
+ :param driver_name: Name of Driver. Default: None.
+
+ """
+ # Do not allow multiple acquire calls.
+ if self.resources:
+ raise exception.IronicException(
+ _("Task manager already has resources."))
+
+ self.shared = shared
+
+ # instead of generating an exception, DTRT and convert to a list
+ if not isinstance(node_ids, list):
+ node_ids = [node_ids]
+
+ if not self.shared:
+ self.dbapi.reserve_nodes(CONF.host, node_ids)
+ for node_id in node_ids:
+ node_mgr = resource_manager.NodeManager.acquire(node_id, self,
+ driver_name)
+ self.resources.append(node_mgr)
+
+ def release_resources(self):
+ """Release all the resources acquired for this TaskManager."""
+ # Do not allow multiple release calls.
+ if not self.resources:
+ raise exception.IronicException(
+ _("Task manager doesn't have resources to release."))
+
+ node_ids = [r.id for r in self.resources]
+ for node_id in node_ids:
+ resource_manager.NodeManager.release(node_id, self)
+ if not self.shared:
+ self.dbapi.release_nodes(CONF.host, node_ids)
+ self.resources = []
+
@property
def node(self):
"""Special accessor for single-node tasks."""
@@ -167,3 +214,9 @@ class TaskManager(object):
else:
raise AttributeError(_("Multi-node TaskManager "
"can't select single node manager from the list"))
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.release_resources()
diff --git a/ironic/conductor/utils.py b/ironic/conductor/utils.py
index 83fd7e908..1f9140087 100644
--- a/ironic/conductor/utils.py
+++ b/ironic/conductor/utils.py
@@ -25,11 +25,19 @@ LOG = log.getLogger(__name__)
def node_power_action(task, node, state):
"""Change power state or reset for a node.
+ Validate whether the given power transition is possible and perform
+ power action.
+
:param task: a TaskManager instance.
:param node: the Node object to act upon.
:param state: Any power state from ironic.common.states. If the
state is 'REBOOT' then a reboot will be attempted, otherwise
the node power state is directly set to 'state'.
+ :raises: InvalidParameterValue when the wrong state is specified
+ or the wrong driver info is specified.
+ :raises: other exceptions by the node's power driver if something
+ wrong occurred during the power action.
+
"""
context = task.context
new_state = states.POWER_ON if state == states.REBOOT else state
diff --git a/ironic/tests/api/test_nodes.py b/ironic/tests/api/test_nodes.py
index 4d2c78f37..718b3ea9a 100644
--- a/ironic/tests/api/test_nodes.py
+++ b/ironic/tests/api/test_nodes.py
@@ -26,7 +26,6 @@ from ironic.common import exception
from ironic.common import states
from ironic.common import utils
from ironic.conductor import rpcapi
-from ironic import objects
from ironic.openstack.common import timeutils
from ironic.tests.api import base
from ironic.tests.db import utils as dbutils
@@ -732,31 +731,6 @@ class TestPut(base.FunctionalTest):
states.POWER_ON,
'test-topic')
- def test_power_state_in_progress(self):
- manager = mock.MagicMock()
- with mock.patch.object(objects.Node, 'get_by_uuid') as mock_get_node:
- mock_get_node.return_value = self.node
- manager.attach_mock(mock_get_node, 'get_by_uuid')
- manager.attach_mock(self.mock_cnps, 'change_node_power_state')
- expected = [mock.call.get_by_uuid(mock.ANY, self.node['uuid']),
- mock.call.change_node_power_state(mock.ANY,
- self.node['uuid'],
- states.POWER_ON,
- 'test-topic')]
-
- self.put_json('/nodes/%s/states/power' % self.node['uuid'],
- {'target': states.POWER_ON})
- self.assertEqual(expected, manager.mock_calls)
-
- self.dbapi.update_node(self.node['uuid'],
- {'target_power_state': 'fake'})
- response = self.put_json('/nodes/%s/states/power' % self.node['uuid'],
- {'target': states.POWER_ON},
- expect_errors=True)
- self.assertEqual('application/json', response.content_type)
- self.assertEqual(409, response.status_code)
- self.assertTrue(response.json['error_message'])
-
def test_power_invalid_state_request(self):
ret = self.put_json('/nodes/%s/states/power' % self.node.uuid,
{'target': 'not-supported'}, expect_errors=True)
diff --git a/ironic/tests/conductor/test_conductor_utils.py b/ironic/tests/conductor/test_conductor_utils.py
index ee891c853..e5fc2d3e5 100644
--- a/ironic/tests/conductor/test_conductor_utils.py
+++ b/ironic/tests/conductor/test_conductor_utils.py
@@ -16,8 +16,9 @@ import mock
from ironic.common import exception
from ironic.common import states
-from ironic.conductor import manager
+from ironic.common import utils as cmn_utils
from ironic.conductor import task_manager
+from ironic.conductor import utils as conductor_utils
from ironic.db import api as dbapi
from ironic.openstack.common import context
from ironic.tests.conductor import utils as mgr_utils
@@ -25,197 +26,195 @@ from ironic.tests.db import base
from ironic.tests.db import utils
-class PowerActionTestCase(base.DbTestCase):
+class NodePowerActionTestCase(base.DbTestCase):
def setUp(self):
- super(PowerActionTestCase, self).setUp()
- self.service = manager.ConductorManager('test-host', 'test-topic')
+ super(NodePowerActionTestCase, self).setUp()
self.context = context.get_admin_context()
self.dbapi = dbapi.get_instance()
self.driver = mgr_utils.get_mocked_node_manager()
+ self.task = task_manager.TaskManager(self.context)
- def test_change_node_power_state_power_on(self):
- """Test if change_node_power_state to turn node power on
- is successful or not.
- """
- ndict = utils.get_test_node(driver='fake',
+ def test_node_power_action_power_on(self):
+ """Test node_power_action to turn node power on."""
+ ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(),
+ driver='fake',
power_state=states.POWER_OFF)
node = self.dbapi.create_node(ndict)
+ self.task.acquire_resources(node.uuid)
with mock.patch.object(self.driver.power, 'get_power_state') \
as get_power_mock:
get_power_mock.return_value = states.POWER_OFF
- self.service.change_node_power_state(self.context,
- node['uuid'], states.POWER_ON)
+ conductor_utils.node_power_action(self.task, self.task.node,
+ states.POWER_ON)
+
node.refresh(self.context)
get_power_mock.assert_called_once_with(mock.ANY, mock.ANY)
self.assertEqual(node['power_state'], states.POWER_ON)
- self.assertEqual(node['target_power_state'], None)
- self.assertEqual(node['last_error'], None)
+ self.assertIsNone(node['target_power_state'])
+ self.assertIsNone(node['last_error'])
- def test_change_node_power_state_power_off(self):
- """Test if change_node_power_state to turn node power off
- is successful or not.
- """
- ndict = utils.get_test_node(driver='fake',
+ def test_node_power_action_power_off(self):
+ """Test node_power_action to turn node power off."""
+ ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(),
+ driver='fake',
power_state=states.POWER_ON)
node = self.dbapi.create_node(ndict)
+ self.task.acquire_resources(node.uuid)
with mock.patch.object(self.driver.power, 'get_power_state') \
as get_power_mock:
get_power_mock.return_value = states.POWER_ON
- self.service.change_node_power_state(self.context, node['uuid'],
- states.POWER_OFF)
+ conductor_utils.node_power_action(self.task, self.task.node,
+ states.POWER_OFF)
+
node.refresh(self.context)
get_power_mock.assert_called_once_with(mock.ANY, mock.ANY)
self.assertEqual(node['power_state'], states.POWER_OFF)
- self.assertEqual(node['target_power_state'], None)
- self.assertEqual(node['last_error'], None)
+ self.assertIsNone(node['target_power_state'])
+ self.assertIsNone(node['last_error'])
- def test_change_node_power_state_reboot(self):
+ def test_node_power_action_power_reboot(self):
"""Test for reboot a node."""
- ndict = utils.get_test_node(driver='fake',
+ ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(),
+ driver='fake',
power_state=states.POWER_ON)
node = self.dbapi.create_node(ndict)
+ self.task.acquire_resources(node.uuid)
with mock.patch.object(self.driver.power, 'reboot') as reboot_mock:
- self.service.change_node_power_state(self.context, node['uuid'],
- states.REBOOT)
+ conductor_utils.node_power_action(self.task, self.task.node,
+ states.REBOOT)
+
node.refresh(self.context)
reboot_mock.assert_called_once()
self.assertEqual(node['power_state'], states.POWER_ON)
- self.assertEqual(node['target_power_state'], None)
- self.assertEqual(node['last_error'], None)
+ self.assertIsNone(node['target_power_state'])
+ self.assertIsNone(node['last_error'])
- def test_change_node_power_state_invalid_state(self):
+ def test_node_power_action_invalid_state(self):
"""Test if an exception is thrown when changing to an invalid
power state.
"""
- ndict = utils.get_test_node(driver='fake',
+ ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(),
+ driver='fake',
power_state=states.POWER_ON)
node = self.dbapi.create_node(ndict)
+ self.task.acquire_resources(node.uuid)
with mock.patch.object(self.driver.power, 'get_power_state') \
as get_power_mock:
get_power_mock.return_value = states.POWER_ON
self.assertRaises(exception.InvalidParameterValue,
- self.service.change_node_power_state,
- self.context,
- node['uuid'],
- "POWER")
+ conductor_utils.node_power_action,
+ self.task,
+ self.task.node,
+ "INVALID_POWER_STATE")
+
node.refresh(self.context)
get_power_mock.assert_called_once_with(mock.ANY, mock.ANY)
self.assertEqual(node['power_state'], states.POWER_ON)
- self.assertEqual(node['target_power_state'], None)
- self.assertNotEqual(node['last_error'], None)
+ self.assertIsNone(node['target_power_state'])
+ self.assertIsNotNone(node['last_error'])
# last_error is cleared when a new transaction happens
- self.service.change_node_power_state(self.context, node['uuid'],
- states.POWER_OFF)
+ conductor_utils.node_power_action(self.task, self.task.node,
+ states.POWER_OFF)
node.refresh(self.context)
self.assertEqual(node['power_state'], states.POWER_OFF)
- self.assertEqual(node['target_power_state'], None)
- self.assertEqual(node['last_error'], None)
-
- def test_change_node_power_state_already_locked(self):
- """Test if an exception is thrown when applying an exclusive
- lock to the node failed.
- """
- ndict = utils.get_test_node(driver='fake',
- power_state=states.POWER_ON)
- node = self.dbapi.create_node(ndict)
-
- # check if the node is locked
- with task_manager.acquire(self.context, node['id'], shared=False):
- self.assertRaises(exception.NodeLocked,
- self.service.change_node_power_state,
- self.context,
- node['uuid'],
- states.POWER_ON)
- node.refresh(self.context)
- self.assertEqual(node['power_state'], states.POWER_ON)
- self.assertEqual(node['target_power_state'], None)
- self.assertEqual(node['last_error'], None)
+ self.assertIsNone(node['target_power_state'])
+ self.assertIsNone(node['last_error'])
- def test_change_node_power_state_already_being_processed(self):
+ def test_node_power_action_already_being_processed(self):
"""The target_power_state is expected to be None so it isn't
checked in the code. This is what happens if it is not None.
(Eg, if a conductor had died during a previous power-off
attempt and left the target_power_state set to states.POWER_OFF,
and the user is attempting to power-off again.)
"""
- ndict = utils.get_test_node(driver='fake',
+ ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(),
+ driver='fake',
power_state=states.POWER_ON,
target_power_state=states.POWER_OFF)
node = self.dbapi.create_node(ndict)
+ self.task.acquire_resources(node.uuid)
+
+ conductor_utils.node_power_action(self.task, self.task.node,
+ states.POWER_OFF)
- self.service.change_node_power_state(self.context, node['uuid'],
- states.POWER_OFF)
node.refresh(self.context)
self.assertEqual(node['power_state'], states.POWER_OFF)
self.assertEqual(node['target_power_state'], states.NOSTATE)
- self.assertEqual(node['last_error'], None)
+ self.assertIsNone(node['last_error'])
- def test_change_node_power_state_in_same_state(self):
+ def test_node_power_action_in_same_state(self):
"""Test that we don't try to set the power state if the requested
state is the same as the current state.
"""
- ndict = utils.get_test_node(driver='fake',
+ ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(),
+ driver='fake',
last_error='anything but None',
power_state=states.POWER_ON)
node = self.dbapi.create_node(ndict)
+ self.task.acquire_resources(node.uuid)
with mock.patch.object(self.driver.power, 'get_power_state') \
as get_power_mock:
get_power_mock.return_value = states.POWER_ON
+
with mock.patch.object(self.driver.power, 'set_power_state') \
as set_power_mock:
- set_power_mock.side_effect = exception.IronicException()
+ conductor_utils.node_power_action(self.task, self.task.node,
+ states.POWER_ON)
- self.service.change_node_power_state(self.context,
- node['uuid'],
- states.POWER_ON)
- node.refresh(self.context)
- get_power_mock.assert_called_once_with(mock.ANY, mock.ANY)
- self.assertFalse(set_power_mock.called)
- self.assertEqual(node['power_state'], states.POWER_ON)
- self.assertEqual(node['target_power_state'], None)
- self.assertEqual(node['last_error'], None)
+ node.refresh(self.context)
+ get_power_mock.assert_called_once_with(mock.ANY, mock.ANY)
+ self.assertFalse(set_power_mock.called,
+ "set_power_state unexpectedly called")
+ self.assertEqual(node['power_state'], states.POWER_ON)
+ self.assertIsNone(node['target_power_state'])
+ self.assertIsNone(node['last_error'])
- def test_change_node_power_state_invalid_driver_info(self):
+ def test_node_power_action_invalid_driver_info(self):
"""Test if an exception is thrown when the driver validation
fails.
"""
- ndict = utils.get_test_node(driver='fake',
+ ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(),
+ driver='fake',
power_state=states.POWER_ON)
node = self.dbapi.create_node(ndict)
+ self.task.acquire_resources(node.uuid)
with mock.patch.object(self.driver.power, 'validate') \
as validate_mock:
validate_mock.side_effect = exception.InvalidParameterValue(
- 'wrong power driver info')
+ 'wrong power driver info')
self.assertRaises(exception.InvalidParameterValue,
- self.service.change_node_power_state,
- self.context,
- node['uuid'],
+ conductor_utils.node_power_action,
+ self.task,
+ self.task.node,
states.POWER_ON)
+
node.refresh(self.context)
validate_mock.assert_called_once_with(mock.ANY)
self.assertEqual(node['power_state'], states.POWER_ON)
- self.assertEqual(node['target_power_state'], None)
- self.assertNotEqual(node['last_error'], None)
+ self.assertIsNone(node['target_power_state'])
+ self.assertIsNotNone(node['last_error'])
- def test_change_node_power_state_set_power_failure(self):
+ def test_node_power_action_set_power_failure(self):
"""Test if an exception is thrown when the set_power call
fails.
"""
- ndict = utils.get_test_node(driver='fake',
+ ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(),
+ driver='fake',
power_state=states.POWER_OFF)
node = self.dbapi.create_node(ndict)
+ self.task.acquire_resources(node.uuid)
with mock.patch.object(self.driver.power, 'get_power_state') \
as get_power_mock:
@@ -224,15 +223,17 @@ class PowerActionTestCase(base.DbTestCase):
get_power_mock.return_value = states.POWER_OFF
set_power_mock.side_effect = exception.IronicException()
- self.assertRaises(exception.IronicException,
- self.service.change_node_power_state,
- self.context,
- node['uuid'],
- states.POWER_ON)
+ self.assertRaises(
+ exception.IronicException,
+ conductor_utils.node_power_action,
+ self.task,
+ self.task.node,
+ states.POWER_ON)
+
node.refresh(self.context)
get_power_mock.assert_called_once_with(mock.ANY, mock.ANY)
set_power_mock.assert_called_once_with(mock.ANY, mock.ANY,
states.POWER_ON)
self.assertEqual(node['power_state'], states.POWER_OFF)
- self.assertEqual(node['target_power_state'], None)
- self.assertNotEqual(node['last_error'], None)
+ self.assertIsNone(node['target_power_state'])
+ self.assertIsNotNone(node['last_error'])
diff --git a/ironic/tests/conductor/test_manager.py b/ironic/tests/conductor/test_manager.py
index 9dac36ffd..ed16c943e 100644
--- a/ironic/tests/conductor/test_manager.py
+++ b/ironic/tests/conductor/test_manager.py
@@ -19,6 +19,8 @@
"""Test class for Ironic ManagerService."""
+import time
+
import mock
from oslo.config import cfg
from testtools.matchers import HasLength
@@ -29,6 +31,7 @@ from ironic.common import states
from ironic.common import utils as ironic_utils
from ironic.conductor import manager
from ironic.conductor import task_manager
+from ironic.conductor import utils as conductor_utils
from ironic.db import api as dbapi
from ironic import objects
from ironic.openstack.common import context
@@ -207,6 +210,124 @@ class ManagerTestCase(base.DbTestCase):
self.assertEqual(state, states.POWER_ON)
self.assertEqual(get_power_mock.call_args_list, expected)
+ def test_change_node_power_state_power_on(self):
+ # Test change_node_power_state including integration with
+ # conductor.utils.node_power_action and lower.
+ n = utils.get_test_node(driver='fake',
+ power_state=states.POWER_OFF)
+ db_node = self.dbapi.create_node(n)
+ self.service.start()
+
+ with mock.patch.object(self.driver.power, 'get_power_state') \
+ as get_power_mock:
+ get_power_mock.return_value = states.POWER_OFF
+
+ self.service.change_node_power_state(self.context,
+ db_node.uuid,
+ states.POWER_ON)
+ self.service._worker_pool.waitall()
+
+ get_power_mock.assert_called_once_with(mock.ANY, mock.ANY)
+ db_node.refresh(self.context)
+ self.assertEqual(states.POWER_ON, db_node.power_state)
+ self.assertIsNone(db_node.target_power_state)
+ self.assertIsNone(db_node.last_error)
+ # Verify the reservation has been cleared by
+ # background task's link callback.
+ self.assertIsNone(db_node.reservation)
+
+ @mock.patch.object(conductor_utils, 'node_power_action')
+ def test_change_node_power_state_node_already_locked(self,
+ pwr_act_mock):
+ # Test change_node_power_state with mocked
+ # conductor.utils.node_power_action.
+ fake_reservation = 'fake-reserv'
+ pwr_state = states.POWER_ON
+ n = utils.get_test_node(driver='fake',
+ power_state=pwr_state,
+ reservation=fake_reservation)
+ db_node = self.dbapi.create_node(n)
+ self.service.start()
+
+ self.assertRaises(exception.NodeLocked,
+ self.service.change_node_power_state,
+ self.context,
+ db_node.uuid,
+ states.POWER_ON)
+ # In this test worker should not be spawned, but waiting to make sure
+ # the below perform_mock assertion is valid.
+ self.service._worker_pool.waitall()
+ self.assertFalse(pwr_act_mock.called, 'node_power_action has been '
+ 'unexpectedly called.')
+ # Verify existing reservation wasn't broken.
+ db_node.refresh(self.context)
+ self.assertEqual(fake_reservation, db_node.reservation)
+
+ def test_change_node_power_state_worker_pool_full(self):
+ # Test change_node_power_state including integration with
+ # conductor.utils.node_power_action and lower.
+ initial_state = states.POWER_OFF
+ n = utils.get_test_node(driver='fake',
+ power_state=initial_state)
+ db_node = self.dbapi.create_node(n)
+ self.service.start()
+
+ with mock.patch.object(self.service, '_spawn_worker') \
+ as spawn_mock:
+ spawn_mock.side_effect = exception.NoFreeConductorWorker()
+
+ self.assertRaises(exception.NoFreeConductorWorker,
+ self.service.change_node_power_state,
+ self.context,
+ db_node.uuid,
+ states.POWER_ON)
+
+ spawn_mock.assert_called_once_with(mock.ANY, mock.ANY,
+ mock.ANY, mock.ANY)
+ db_node.refresh(self.context)
+ self.assertEqual(initial_state, db_node.power_state)
+ self.assertIsNone(db_node.target_power_state)
+ self.assertIsNone(db_node.last_error)
+ # Verify the picked reservation has been cleared due to full pool.
+ self.assertIsNone(db_node.reservation)
+
+ def test_change_node_power_state_exception_in_background_task(
+ self):
+ # Test change_node_power_state including integration with
+ # conductor.utils.node_power_action and lower.
+ initial_state = states.POWER_OFF
+ n = utils.get_test_node(driver='fake',
+ power_state=initial_state)
+ db_node = self.dbapi.create_node(n)
+ self.service.start()
+
+ with mock.patch.object(self.driver.power, 'get_power_state') \
+ as get_power_mock:
+ get_power_mock.return_value = states.POWER_OFF
+
+ with mock.patch.object(self.driver.power, 'set_power_state') \
+ as set_power_mock:
+ new_state = states.POWER_ON
+ set_power_mock.side_effect = exception.PowerStateFailure(
+ pstate=new_state
+ )
+
+ self.service.change_node_power_state(self.context,
+ db_node.uuid,
+ new_state)
+ self.service._worker_pool.waitall()
+
+ get_power_mock.assert_called_once_with(mock.ANY, mock.ANY)
+ set_power_mock.assert_called_once_with(mock.ANY, mock.ANY,
+ new_state)
+ db_node.refresh(self.context)
+ self.assertEqual(initial_state, db_node.power_state)
+ self.assertIsNone(db_node.target_power_state)
+ self.assertIsNotNone(db_node.last_error)
+ # Verify the reservation has been cleared by background task's
+ # link callback despite exception in background task.
+ self.assertIsNone(db_node.reservation)
+
def test_update_node(self):
ndict = utils.get_test_node(driver='fake', extra={'test': 'one'})
node = self.dbapi.create_node(ndict)
@@ -517,3 +638,71 @@ class ManagerTestCase(base.DbTestCase):
self.context, node.uuid, False)
node.refresh(self.context)
self.assertFalse(node.maintenance)
+
+ def test__spawn_worker(self):
+ func_mock = mock.Mock()
+ args = (1, 2, "test")
+ kwargs = dict(kw1='test1', kw2='test2')
+ self.service.start()
+
+ thread = self.service._spawn_worker(func_mock, *args, **kwargs)
+ self.service._worker_pool.waitall()
+
+ self.assertIsNotNone(thread)
+ func_mock.assert_called_once_with(*args, **kwargs)
+
+ # The tests below related to greenthread. We have they to assert our
+ # assumptions about greenthread behavior.
+
+ def test__spawn_link_callback_added_during_execution(self):
+ def func():
+ time.sleep(1)
+ link_callback = mock.Mock()
+ self.service.start()
+
+ thread = self.service._spawn_worker(func)
+ # func_mock executing at this moment
+ thread.link(link_callback)
+ self.service._worker_pool.waitall()
+
+ link_callback.assert_called_once_with(thread)
+
+ def test__spawn_link_callback_added_after_execution(self):
+ def func():
+ pass
+ link_callback = mock.Mock()
+ self.service.start()
+
+ thread = self.service._spawn_worker(func)
+ self.service._worker_pool.waitall()
+ # func_mock finished at this moment
+ thread.link(link_callback)
+
+ link_callback.assert_called_once_with(thread)
+
+ def test__spawn_link_callback_exception_inside_thread(self):
+ def func():
+ time.sleep(1)
+ raise Exception()
+ link_callback = mock.Mock()
+ self.service.start()
+
+ thread = self.service._spawn_worker(func)
+ # func_mock executing at this moment
+ thread.link(link_callback)
+ self.service._worker_pool.waitall()
+
+ link_callback.assert_called_once_with(thread)
+
+ def test__spawn_link_callback_added_after_exception_inside_thread(self):
+ def func():
+ raise Exception()
+ link_callback = mock.Mock()
+ self.service.start()
+
+ thread = self.service._spawn_worker(func)
+ self.service._worker_pool.waitall()
+ # func_mock finished at this moment
+ thread.link(link_callback)
+
+ link_callback.assert_called_once_with(thread)
diff --git a/ironic/tests/conductor/test_rpcapi.py b/ironic/tests/conductor/test_rpcapi.py
index 86e0c366d..67d5eadb8 100644
--- a/ironic/tests/conductor/test_rpcapi.py
+++ b/ironic/tests/conductor/test_rpcapi.py
@@ -117,7 +117,7 @@ class RPCAPITestCase(base.DbTestCase):
def test_change_node_power_state(self):
self._test_rpcapi('change_node_power_state',
- 'cast',
+ 'call',
node_id=self.fake_node['uuid'],
new_state=states.POWER_ON)