path: root/ironic/conductor/manager.py
Diffstat (limited to 'ironic/conductor/manager.py')
-rw-r--r--  ironic/conductor/manager.py  167
1 file changed, 147 insertions(+), 20 deletions(-)
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index 13d11d1d9..74e3192cf 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -73,6 +73,7 @@ from ironic.conf import CONF
from ironic.drivers import base as drivers_base
from ironic.drivers.modules import deploy_utils
from ironic.drivers.modules import image_cache
+from ironic.drivers.modules import inspect_utils
from ironic import objects
from ironic.objects import base as objects_base
from ironic.objects import fields
@@ -97,6 +98,8 @@ class ConductorManager(base_manager.BaseConductorManager):
def __init__(self, host, topic):
super(ConductorManager, self).__init__(host, topic)
+ # NOTE(TheJulia): This is less a metric-able count than a means to
+ # sort out nodes and prioritise a subset (of non-responding nodes).
self.power_state_sync_count = collections.defaultdict(int)
@METRICS.timer('ConductorManager._clean_up_caches')
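The NOTE above frames power_state_sync_count as a prioritisation aid for persistently non-responding nodes rather than a published metric. A minimal, self-contained sketch of that bookkeeping pattern, with hypothetical helper names that are not part of this change:

import collections

# Consecutive power-sync failure count, keyed by node UUID.
power_state_sync_count = collections.defaultdict(int)

def record_sync_failure(node_uuid):
    power_state_sync_count[node_uuid] += 1

def record_sync_success(node_uuid):
    # Reset on success so only persistently failing nodes keep climbing.
    power_state_sync_count.pop(node_uuid, None)

def worst_offenders_first():
    # Nodes with the most consecutive failures sort first, so a periodic
    # task can prioritise (or flag) that subset ahead of healthy nodes.
    return sorted(power_state_sync_count,
                  key=power_state_sync_count.get, reverse=True)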
@@ -886,7 +889,8 @@ class ConductorManager(base_manager.BaseConductorManager):
exception.NodeInMaintenance,
exception.InstanceDeployFailure,
exception.InvalidStateRequested,
- exception.NodeProtected)
+ exception.NodeProtected,
+ exception.ConcurrentActionLimit)
def do_node_deploy(self, context, node_id, rebuild=False,
configdrive=None, deploy_steps=None):
"""RPC method to initiate deployment to a node.
@@ -910,8 +914,11 @@ class ConductorManager(base_manager.BaseConductorManager):
:raises: InvalidStateRequested when the requested state is not a valid
target from the current state.
:raises: NodeProtected if the node is protected.
+ :raises: ConcurrentActionLimit if this action would exceed the maximum
+ number of configured concurrent actions of this type.
"""
LOG.debug("RPC do_node_deploy called for node %s.", node_id)
+ self._concurrent_action_limit(action='provisioning')
event = 'rebuild' if rebuild else 'deploy'
# NOTE(comstud): If the _sync_power_states() periodic task happens
@@ -983,7 +990,8 @@ class ConductorManager(base_manager.BaseConductorManager):
exception.NodeLocked,
exception.InstanceDeployFailure,
exception.InvalidStateRequested,
- exception.NodeProtected)
+ exception.NodeProtected,
+ exception.ConcurrentActionLimit)
def do_node_tear_down(self, context, node_id):
"""RPC method to tear down an existing node deployment.
@@ -998,8 +1006,11 @@ class ConductorManager(base_manager.BaseConductorManager):
:raises: InvalidStateRequested when the requested state is not a valid
target from the current state.
:raises: NodeProtected if the node is protected.
+ :raises: ConcurrentActionLimit if this action would exceed the maximum
+ number of configured concurrent actions of this type.
"""
LOG.debug("RPC do_node_tear_down called for node %s.", node_id)
+ self._concurrent_action_limit(action='unprovisioning')
with task_manager.acquire(context, node_id, shared=False,
purpose='node tear down') as task:
@@ -1121,7 +1132,8 @@ class ConductorManager(base_manager.BaseConductorManager):
exception.InvalidStateRequested,
exception.NodeInMaintenance,
exception.NodeLocked,
- exception.NoFreeConductorWorker)
+ exception.NoFreeConductorWorker,
+ exception.ConcurrentActionLimit)
def do_node_clean(self, context, node_id, clean_steps,
disable_ramdisk=False):
"""RPC method to initiate manual cleaning.
@@ -1150,7 +1162,10 @@ class ConductorManager(base_manager.BaseConductorManager):
:raises: NodeLocked if node is locked by another conductor.
:raises: NoFreeConductorWorker when there is no free worker to start
async task.
+ :raises: ConcurrentActionLimit if this action would exceed the
+ configured limits of the deployment.
"""
+ self._concurrent_action_limit(action='cleaning')
with task_manager.acquire(context, node_id, shared=False,
purpose='node manual cleaning') as task:
node = task.node
@@ -1336,7 +1351,8 @@ class ConductorManager(base_manager.BaseConductorManager):
callback=self._spawn_worker,
call_args=(cleaning.do_node_clean_abort, task),
err_handler=utils.provisioning_error_handler,
- target_state=target_state)
+ target_state=target_state,
+ last_error=cleaning.get_last_error(node))
return
if node.provision_state == states.RESCUEWAIT:
@@ -1420,6 +1436,11 @@ class ConductorManager(base_manager.BaseConductorManager):
finally:
waiters.wait_for_all(futures)
+ # report a count of the nodes
+ METRICS.send_gauge(
+ 'ConductorManager.PowerSyncNodesCount',
+ len(nodes))
+
def _sync_power_state_nodes_task(self, context, nodes):
"""Invokes power state sync on nodes from synchronized queue.
@@ -1438,6 +1459,7 @@ class ConductorManager(base_manager.BaseConductorManager):
can do here to avoid failing a brand new deploy to a node that
we've locked here, though.
"""
+
# FIXME(comstud): Since our initial state checks are outside
# of the lock (to try to avoid the lock), some checks are
# repeated after grabbing the lock so we can unlock quickly.
@@ -1484,6 +1506,12 @@ class ConductorManager(base_manager.BaseConductorManager):
LOG.info("During sync_power_state, node %(node)s was not "
"found and presumed deleted by another process.",
{'node': node_uuid})
+ # TODO(TheJulia): There is a chance that we orphan a node
+ # in power_state_sync_count. While it is not much data,
+ # it could eventually cause the memory footprint to grow
+ # on an exceptionally large ironic deployment. We should
+ # make sure we clean it up at some point, but given the
+ # minimal impact, it is definitely low-hanging fruit.
except exception.NodeLocked:
LOG.info("During sync_power_state, node %(node)s was "
"already locked by another process. Skip.",
@@ -1500,6 +1528,7 @@ class ConductorManager(base_manager.BaseConductorManager):
# regular power state checking, maintenance is still a required
# condition.
filters={'maintenance': True, 'fault': faults.POWER_FAILURE},
+ node_count_metric_name='ConductorManager.PowerSyncRecoveryNodeCount',
)
def _power_failure_recovery(self, task, context):
"""Periodic task to check power states for nodes in maintenance.
@@ -1761,10 +1790,6 @@ class ConductorManager(base_manager.BaseConductorManager):
if task.node.console_enabled:
notify_utils.emit_console_notification(
task, 'console_restore', fields.NotificationStatus.START)
- # NOTE(kaifeng) Clear allocated_ipmi_terminal_port if exists,
- # so current conductor can allocate a new free port from local
- # resources.
- task.node.del_driver_internal_info('allocated_ipmi_terminal_port')
try:
task.driver.console.start_console(task)
except Exception as err:
@@ -1846,6 +1871,7 @@ class ConductorManager(base_manager.BaseConductorManager):
predicate=lambda n, m: n.conductor_affinity != m.conductor.id,
limit=lambda: CONF.conductor.periodic_max_workers,
shared_task=False,
+ node_count_metric_name='ConductorManager.SyncLocalStateNodeCount',
)
def _sync_local_state(self, task, context):
"""Perform any actions necessary to sync local state.
@@ -2011,6 +2037,26 @@ class ConductorManager(base_manager.BaseConductorManager):
node.console_enabled = False
notify_utils.emit_console_notification(
task, 'console_set', fields.NotificationStatus.END)
+ # Destroy Swift Inventory entries for this node
+ try:
+ inspect_utils.clean_up_swift_entries(task)
+ except exception.SwiftObjectStillExists as e:
+ if node.maintenance:
+ # Maintenance -> Allow orphaning
+ LOG.warning('Swift object orphaned during destruction of '
+ 'node %(node)s: %(e)s',
+ {'node': node.uuid, 'e': e})
+ else:
+ LOG.error('Swift object cannot be orphaned without '
+ 'maintenance mode during destruction of node '
+ '%(node)s: %(e)s', {'node': node.uuid, 'e': e})
+ raise
+ except Exception as err:
+ LOG.error('Failed to delete Swift entries related '
+ 'to the node %(node)s: %(err)s.',
+ {'node': node.uuid, 'err': err})
+ raise
+
node.destroy()
LOG.info('Successfully deleted node %(node)s.',
{'node': node.uuid})
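The new block above encodes a policy decision: a Swift entry that cannot be deleted may only be orphaned when the node is in maintenance mode; otherwise node destruction is aborted. A minimal sketch of that decision, with hypothetical names standing in for the ironic exception, logger and cleanup call:

def destroy_cleanup(node, cleanup, log, StillExistsError):
    """Run cleanup; tolerate a leftover object only in maintenance mode."""
    try:
        cleanup(node)
    except StillExistsError as e:
        if node.maintenance:
            log.warning('Object orphaned for node %s: %s', node.uuid, e)
            return
        # Without maintenance mode, refuse to orphan the object.
        raise
    except Exception as err:
        log.error('Cleanup failed for node %s: %s', node.uuid, err)
        raise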
@@ -2191,18 +2237,16 @@ class ConductorManager(base_manager.BaseConductorManager):
"""
LOG.debug('RPC set_console_mode called for node %(node)s with '
'enabled %(enabled)s', {'node': node_id, 'enabled': enabled})
-
- with task_manager.acquire(context, node_id, shared=False,
+ with task_manager.acquire(context, node_id, shared=True,
purpose='setting console mode') as task:
node = task.node
-
task.driver.console.validate(task)
-
if enabled == node.console_enabled:
op = 'enabled' if enabled else 'disabled'
LOG.info("No console action was triggered because the "
"console is already %s", op)
else:
+ task.upgrade_lock()
node.last_error = None
node.save()
task.spawn_after(self._spawn_worker,
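The rewritten block above acquires the task with a shared lock so that validation and the already-in-the-requested-state path do not block other operations, and only upgrades to an exclusive lock when the console state will actually change. A sketch of that pattern under the task_manager semantics shown in this diff (everything except acquire, upgrade_lock and the console validate call is hypothetical):

from ironic.conductor import task_manager

def set_console(context, node_id, enabled):
    # Shared lock: the read-only validate/no-op path stays cheap.
    with task_manager.acquire(context, node_id, shared=True,
                              purpose='setting console mode') as task:
        task.driver.console.validate(task)
        if enabled != task.node.console_enabled:
            # Exclusive lock only when node state is going to change.
            task.upgrade_lock()
            ...  # dispatch the actual enable/disable work here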
@@ -2613,14 +2657,63 @@ class ConductorManager(base_manager.BaseConductorManager):
# Yield on every iteration
eventlet.sleep(0)
+ def _sensors_conductor(self, context):
+ """Called to collect and send metrics "sensors" for the conductor."""
+ # populate the message which will be sent to ceilometer
+ # or other data consumer
+ message = {'message_id': uuidutils.generate_uuid(),
+ 'timestamp': datetime.datetime.utcnow(),
+ 'hostname': self.host}
+
+ try:
+ ev_type = 'ironic.metrics'
+ message['event_type'] = ev_type + '.update'
+ sensors_data = METRICS.get_metrics_data()
+ except AttributeError:
+ # TODO(TheJulia): Remove this at some point, but right now
+ # don't inherently break on version mismatches when people
+ # disregard requirements.
+ LOG.warning(
+ 'get_sensors_data has been configured to collect '
+ 'conductor metrics, however the installed ironic-lib '
+ 'library lacks the functionality. Please update '
+ 'ironic-lib to a minimum of version 5.4.0.')
+ except Exception as e:
+ LOG.exception(
+ "An unknown error occured while attempting to collect "
+ "sensor data from within the conductor. Error: %(error)s",
+ {'error': e})
+ else:
+ message['payload'] = (
+ self._filter_out_unsupported_types(sensors_data))
+ if message['payload']:
+ self.sensors_notifier.info(
+ context, ev_type, message)
+
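The AttributeError branch above exists because ironic-lib releases older than 5.4.0 do not provide get_metrics_data(); the conductor logs a warning rather than failing. A minimal, library-agnostic sketch of that defensive pattern with hypothetical names:

def collect_optional_metrics(metrics_logger, log):
    try:
        return metrics_logger.get_metrics_data()
    except AttributeError:
        # The installed library predates the metrics-collection API.
        log.warning('Metrics collection was requested, but the installed '
                    'library does not expose get_metrics_data(); skipping.')
        return None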
@METRICS.timer('ConductorManager._send_sensor_data')
- @periodics.periodic(spacing=CONF.conductor.send_sensor_data_interval,
- enabled=CONF.conductor.send_sensor_data)
+ @periodics.periodic(spacing=CONF.sensor_data.interval,
+ enabled=CONF.sensor_data.send_sensor_data)
def _send_sensor_data(self, context):
"""Periodically collects and transmits sensor data notifications."""
+ if CONF.sensor_data.enable_for_conductor:
+ if CONF.sensor_data.workers == 1:
+ # Directly call the sensors_conductor when only one
+ # worker is permitted, so we collect data serially
+ # instead.
+ self._sensors_conductor(context)
+ else:
+ # Also, do not apply the general threshold limit to
+ # the self-collection of "sensor" data from the conductor,
+ # as we're not launching external processes; we're just
+ # reading from an internal data structure, if we can.
+ self._spawn_worker(self._sensors_conductor, context)
+ if not CONF.sensor_data.enable_for_nodes:
+ # NOTE(TheJulia): If node sensor data is not required, then
+ # skip the rest of this method.
+ return
filters = {}
- if not CONF.conductor.send_sensor_data_for_undeployed_nodes:
+ if not CONF.sensor_data.enable_for_undeployed_nodes:
filters['provision_state'] = states.ACTIVE
nodes = queue.Queue()
@@ -2628,7 +2721,7 @@ class ConductorManager(base_manager.BaseConductorManager):
filters=filters):
nodes.put_nowait(node_info)
- number_of_threads = min(CONF.conductor.send_sensor_data_workers,
+ number_of_threads = min(CONF.sensor_data.workers,
nodes.qsize())
futures = []
for thread_number in range(number_of_threads):
@@ -2644,7 +2737,7 @@ class ConductorManager(base_manager.BaseConductorManager):
break
done, not_done = waiters.wait_for_all(
- futures, timeout=CONF.conductor.send_sensor_data_wait_timeout)
+ futures, timeout=CONF.sensor_data.wait_timeout)
if not_done:
LOG.warning("%d workers for send sensors data did not complete",
len(not_done))
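The collection loop above caps its thread count at min(CONF.sensor_data.workers, nodes.qsize()) and bounds the overall wait with CONF.sensor_data.wait_timeout. A stdlib-only sketch of the same bounded fan-out pattern, using concurrent.futures with hypothetical names rather than ironic's futurist-based helpers:

import concurrent.futures

def fan_out(work_queue, worker_fn, max_workers, wait_timeout):
    # Never start more threads than there is queued work.
    number_of_threads = min(max_workers, work_queue.qsize())
    if not number_of_threads:
        return
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=number_of_threads)
    futures = [pool.submit(worker_fn, work_queue)
               for _ in range(number_of_threads)]
    done, not_done = concurrent.futures.wait(futures, timeout=wait_timeout)
    if not_done:
        print('%d workers did not complete within %ss'
              % (len(not_done), wait_timeout))
    # Stop waiting; any stragglers keep running, which matches the
    # warning-only handling above.
    pool.shutdown(wait=False)

Each worker_fn is expected to pull items with work_queue.get_nowait() until the queue is empty, mirroring the per-thread loop in the diff.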
@@ -2653,13 +2746,14 @@ class ConductorManager(base_manager.BaseConductorManager):
"""Filters out sensor data types that aren't specified in the config.
Removes sensor data types that aren't specified in
- CONF.conductor.send_sensor_data_types.
+ CONF.sensor_data.data_types.
:param sensors_data: dict containing sensor types and the associated
data
:returns: dict with unsupported sensor types removed
"""
- allowed = set(x.lower() for x in CONF.conductor.send_sensor_data_types)
+ allowed = set(x.lower() for x in
+ CONF.sensor_data.data_types)
if 'all' in allowed:
return sensors_data
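The 'all' short-circuit above means CONF.sensor_data.data_types can disable filtering entirely. A small self-contained sketch of the filtering behaviour described in the docstring (hypothetical function name):

def filter_sensor_types(sensors_data, data_types):
    allowed = {x.lower() for x in data_types}
    if 'all' in allowed:
        return sensors_data
    return {sensor_type: values
            for sensor_type, values in sensors_data.items()
            if sensor_type.lower() in allowed}

# filter_sensor_types({'Temperature': {...}, 'Fan': {...}}, ['temperature'])
# keeps only the 'Temperature' entry.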
@@ -3457,7 +3551,6 @@ class ConductorManager(base_manager.BaseConductorManager):
self.conductor.id):
# Another conductor has taken over, skipping
continue
-
LOG.debug('Taking over allocation %s', allocation.uuid)
allocations.do_allocate(context, allocation)
except Exception:
@@ -3549,6 +3642,40 @@ class ConductorManager(base_manager.BaseConductorManager):
# impact DB access if done in excess.
eventlet.sleep(0)
+ def _concurrent_action_limit(self, action):
+ """Check Concurrency limits and block operations if needed.
+
+ This method is used to serve as a central place for the logic
+ for checks on concurrency limits. If a limit is reached, then
+ an appropriate exception is raised.
+
+ :raises: ConcurrentActionLimit if the concurrency limit set in
+ the system configuration is exceeded.
+ """
+ # NOTE(TheJulia): Keeping this all in one place for simplicity.
+ if action == 'provisioning':
+ node_count = self.dbapi.count_nodes_in_provision_state([
+ states.DEPLOYING,
+ states.DEPLOYWAIT
+ ])
+ if node_count >= CONF.conductor.max_concurrent_deploy:
+ raise exception.ConcurrentActionLimit(
+ task_type=action)
+
+ if action == 'unprovisioning' or action == 'cleaning':
+ # NOTE(TheJulia): This also checks for the deleting state,
+ # which is super transitory, *but* a node can still land in
+ # that state. So in order to guard against a DoS attack, we
+ # need to check even that transitory node state.
+ node_count = self.dbapi.count_nodes_in_provision_state([
+ states.DELETING,
+ states.CLEANING,
+ states.CLEANWAIT
+ ])
+ if node_count >= CONF.conductor.max_concurrent_clean:
+ raise exception.ConcurrentActionLimit(
+ task_type=action)
+
@METRICS.timer('get_vendor_passthru_metadata')
def get_vendor_passthru_metadata(route_dict):
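The _concurrent_action_limit helper above counts nodes already in the relevant provision states and refuses to start new work once a configured ceiling is reached. A minimal, self-contained sketch of the same admission-control pattern, with hypothetical names in place of ironic's DB API and config:

class ConcurrentActionLimitReached(Exception):
    pass

def check_concurrency_limit(count_in_states, states, limit, action):
    """Raise if starting another 'action' would exceed 'limit'.

    count_in_states is any callable returning how many nodes are
    currently in the given provision states (ironic uses a DB count).
    """
    node_count = count_in_states(states)
    if node_count >= limit:
        raise ConcurrentActionLimitReached(
            'Refusing to start %s: %d nodes already in states %s '
            '(limit %d)' % (action, node_count, states, limit))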