path: root/ironic/conductor/manager.py
Diffstat (limited to 'ironic/conductor/manager.py')
-rw-r--r-- ironic/conductor/manager.py | 166
1 file changed, 78 insertions(+), 88 deletions(-)
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index ad45d2d74..bbd2355bd 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -63,6 +63,7 @@ from ironic.conductor import allocations
from ironic.conductor import base_manager
from ironic.conductor import cleaning
from ironic.conductor import deployments
+from ironic.conductor import inspection
from ironic.conductor import notification_utils as notify_utils
from ironic.conductor import periodics
from ironic.conductor import steps as conductor_steps
@@ -98,6 +99,8 @@ class ConductorManager(base_manager.BaseConductorManager):
def __init__(self, host, topic):
super(ConductorManager, self).__init__(host, topic)
+ # NOTE(TheJulia): This is not so much a metric-able count as a means
+ # to sort out nodes and prioritise a subset (of non-responding nodes).
self.power_state_sync_count = collections.defaultdict(int)
@METRICS.timer('ConductorManager._clean_up_caches')
@@ -1349,7 +1352,8 @@ class ConductorManager(base_manager.BaseConductorManager):
callback=self._spawn_worker,
call_args=(cleaning.do_node_clean_abort, task),
err_handler=utils.provisioning_error_handler,
- target_state=target_state)
+ target_state=target_state,
+ last_error=cleaning.get_last_error(node))
return
if node.provision_state == states.RESCUEWAIT:
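Note: the abort path above now records a last_error supplied by the cleaning module. A minimal sketch of what cleaning.get_last_error might return, assuming it simply formats an abort message from the node's current clean step (only the helper name comes from this diff; the body below is a guess):

# Hypothetical body for ironic.conductor.cleaning.get_last_error; only the
# function name is taken from this diff.
from ironic.common.i18n import _

def get_last_error(node):
    # Compose the error recorded on the node when a clean abort is requested.
    last_error = _('By request, the clean operation was aborted')
    if node.clean_step:
        last_error += (_(' during or after the completion of step "%s"')
                       % node.clean_step.get('step'))
    return last_error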
@@ -1362,35 +1366,7 @@ class ConductorManager(base_manager.BaseConductorManager):
return
if node.provision_state == states.INSPECTWAIT:
- try:
- task.driver.inspect.abort(task)
- except exception.UnsupportedDriverExtension:
- with excutils.save_and_reraise_exception():
- intf_name = task.driver.inspect.__class__.__name__
- LOG.error('Inspect interface %(intf)s does not '
- 'support abort operation when aborting '
- 'inspection of node %(node)s',
- {'intf': intf_name, 'node': node.uuid})
- except Exception as e:
- with excutils.save_and_reraise_exception():
- LOG.exception('Error in aborting the inspection of '
- 'node %(node)s', {'node': node.uuid})
- error = _('Failed to abort inspection: %s') % e
- utils.node_history_record(task.node, event=error,
- event_type=states.INTROSPECTION,
- error=True,
- user=task.context.user_id)
- node.save()
- error = _('Inspection was aborted by request.')
- utils.node_history_record(task.node, event=error,
- event_type=states.INTROSPECTION,
- error=True,
- user=task.context.user_id)
- utils.wipe_token_and_url(task)
- task.process_event('abort')
- LOG.info('Successfully aborted inspection of node %(node)s',
- {'node': node.uuid})
- return
+ return inspection.abort_inspection(task)
@METRICS.timer('ConductorManager._sync_power_states')
@periodics.periodic(spacing=CONF.conductor.sync_power_state_interval,
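The inline INSPECTWAIT abort handling removed above now lives in the new ironic.conductor.inspection module. A sketch of what inspection.abort_inspection presumably contains, reconstructed from the removed lines (the actual module may differ in detail):

# Reconstructed from the code removed above; treat this as an approximation
# of ironic/conductor/inspection.py rather than the actual file contents.
from oslo_log import log
from oslo_utils import excutils

from ironic.common import exception
from ironic.common.i18n import _
from ironic.common import states
from ironic.conductor import task_manager
from ironic.conductor import utils

LOG = log.getLogger(__name__)

@task_manager.require_exclusive_lock  # decorator assumed; callers hold a lock
def abort_inspection(task):
    """Abort inspection of the node owned by this task."""
    node = task.node
    try:
        task.driver.inspect.abort(task)
    except exception.UnsupportedDriverExtension:
        with excutils.save_and_reraise_exception():
            LOG.error('Inspect interface %(intf)s does not support abort '
                      'operation when aborting inspection of node %(node)s',
                      {'intf': task.driver.inspect.__class__.__name__,
                       'node': node.uuid})
    except Exception as e:
        with excutils.save_and_reraise_exception():
            LOG.exception('Error in aborting the inspection of '
                          'node %(node)s', {'node': node.uuid})
            error = _('Failed to abort inspection: %s') % e
            utils.node_history_record(node, event=error,
                                      event_type=states.INTROSPECTION,
                                      error=True, user=task.context.user_id)
            node.save()
    error = _('Inspection was aborted by request.')
    utils.node_history_record(node, event=error,
                              event_type=states.INTROSPECTION,
                              error=True, user=task.context.user_id)
    utils.wipe_token_and_url(task)
    task.process_event('abort')
    LOG.info('Successfully aborted inspection of node %(node)s',
             {'node': node.uuid})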
@@ -1433,6 +1409,11 @@ class ConductorManager(base_manager.BaseConductorManager):
finally:
waiters.wait_for_all(futures)
+ # report a count of the nodes
+ METRICS.send_gauge(
+ 'ConductorManager.PowerSyncNodesCount',
+ len(nodes))
+
def _sync_power_state_nodes_task(self, context, nodes):
"""Invokes power state sync on nodes from synchronized queue.
@@ -1451,6 +1432,7 @@ class ConductorManager(base_manager.BaseConductorManager):
can do here to avoid failing a brand new deploy to a node that
we've locked here, though.
"""
+
# FIXME(comstud): Since our initial state checks are outside
# of the lock (to try to avoid the lock), some checks are
# repeated after grabbing the lock so we can unlock quickly.
@@ -1497,6 +1479,12 @@ class ConductorManager(base_manager.BaseConductorManager):
LOG.info("During sync_power_state, node %(node)s was not "
"found and presumed deleted by another process.",
{'node': node_uuid})
+ # TODO(TheJulia): The chance exists that we orphan a node
+ # in power_state_sync_count. While it is not much data, it
+ # could eventually cause the memory footprint to grow on an
+ # exceptionally large ironic deployment. We should make sure
+ # we clean it up at some point, but given the minimal impact
+ # it is definite low-hanging fruit.
except exception.NodeLocked:
LOG.info("During sync_power_state, node %(node)s was "
"already locked by another process. Skip.",
@@ -1513,6 +1501,7 @@ class ConductorManager(base_manager.BaseConductorManager):
# regular power state checking, maintenance is still a required
# condition.
filters={'maintenance': True, 'fault': faults.POWER_FAILURE},
+ node_count_metric_name='ConductorManager.PowerSyncRecoveryNodeCount',
)
def _power_failure_recovery(self, task, context):
"""Periodic task to check power states for nodes in maintenance.
@@ -1855,6 +1844,7 @@ class ConductorManager(base_manager.BaseConductorManager):
predicate=lambda n, m: n.conductor_affinity != m.conductor.id,
limit=lambda: CONF.conductor.periodic_max_workers,
shared_task=False,
+ node_count_metric_name='ConductorManager.SyncLocalStateNodeCount',
)
def _sync_local_state(self, task, context):
"""Perform any actions necessary to sync local state.
@@ -2640,14 +2630,63 @@ class ConductorManager(base_manager.BaseConductorManager):
# Yield on every iteration
eventlet.sleep(0)
+ def _sensors_conductor(self, context):
+ """Called to collect and send metrics "sensors" for the conductor."""
+ # populate the message which will be sent to ceilometer
+ # or other data consumer
+ message = {'message_id': uuidutils.generate_uuid(),
+ 'timestamp': datetime.datetime.utcnow(),
+ 'hostname': self.host}
+
+ try:
+ ev_type = 'ironic.metrics'
+ message['event_type'] = ev_type + '.update'
+ sensors_data = METRICS.get_metrics_data()
+ except AttributeError:
+ # TODO(TheJulia): Remove this at some point, but right now
+ # don't inherently break on version mismatches when people
+ # disregard requirements.
+ LOG.warning(
+ 'get_sensors_data has been configured to collect '
+ 'conductor metrics, however the installed ironic-lib '
+ 'library lacks the functionality. Please update '
+ 'ironic-lib to a minimum of version 5.4.0.')
+ except Exception as e:
+ LOG.exception(
+ "An unknown error occured while attempting to collect "
+ "sensor data from within the conductor. Error: %(error)s",
+ {'error': e})
+ else:
+ message['payload'] = (
+ self._filter_out_unsupported_types(sensors_data))
+ if message['payload']:
+ self.sensors_notifier.info(
+ context, ev_type, message)
+
@METRICS.timer('ConductorManager._send_sensor_data')
- @periodics.periodic(spacing=CONF.conductor.send_sensor_data_interval,
- enabled=CONF.conductor.send_sensor_data)
+ @periodics.periodic(spacing=CONF.sensor_data.interval,
+ enabled=CONF.sensor_data.send_sensor_data)
def _send_sensor_data(self, context):
"""Periodically collects and transmits sensor data notifications."""
+ if CONF.sensor_data.enable_for_conductor:
+ if CONF.sensor_data.workers == 1:
+ # Directly call the sensors_conductor when only one
+ # worker is permitted, so we collect data serially
+ # instead.
+ self._sensors_conductor(context)
+ else:
+ # Also, do not apply the general threshold limit to
+ # the self collection of "sensor" data from the conductor,
+ # as we're not launching external processes; we're just
+ # reading from an internal data structure, if we can.
+ self._spawn_worker(self._sensors_conductor, context)
+ if not CONF.sensor_data.enable_for_nodes:
+ # NOTE(TheJulia): If node sensor data is not required, then
+ # skip the rest of this method.
+ return
filters = {}
- if not CONF.conductor.send_sensor_data_for_undeployed_nodes:
+ if not CONF.sensor_data.enable_for_undeployed_nodes:
filters['provision_state'] = states.ACTIVE
nodes = queue.Queue()
@@ -2655,7 +2694,7 @@ class ConductorManager(base_manager.BaseConductorManager):
filters=filters):
nodes.put_nowait(node_info)
- number_of_threads = min(CONF.conductor.send_sensor_data_workers,
+ number_of_threads = min(CONF.sensor_data.workers,
nodes.qsize())
futures = []
for thread_number in range(number_of_threads):
@@ -2671,7 +2710,7 @@ class ConductorManager(base_manager.BaseConductorManager):
break
done, not_done = waiters.wait_for_all(
- futures, timeout=CONF.conductor.send_sensor_data_wait_timeout)
+ futures, timeout=CONF.sensor_data.wait_timeout)
if not_done:
LOG.warning("%d workers for send sensors data did not complete",
len(not_done))
@@ -2680,13 +2719,14 @@ class ConductorManager(base_manager.BaseConductorManager):
"""Filters out sensor data types that aren't specified in the config.
Removes sensor data types that aren't specified in
- CONF.conductor.send_sensor_data_types.
+ CONF.sensor_data.data_types.
:param sensors_data: dict containing sensor types and the associated
data
:returns: dict with unsupported sensor types removed
"""
- allowed = set(x.lower() for x in CONF.conductor.send_sensor_data_types)
+ allowed = set(x.lower() for x in
+ CONF.sensor_data.data_types)
if 'all' in allowed:
return sensors_data
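The sensor-data settings referenced throughout this change move from [conductor] options to a new [sensor_data] group. A hedged sketch of that option group, with names taken from the usages above; defaults and help strings are guesses rather than the actual ironic configuration:

# Hypothetical registration of the [sensor_data] group; option names come
# from CONF.sensor_data usages in this diff, everything else is assumed.
from oslo_config import cfg

sensor_data_opts = [
    cfg.BoolOpt('send_sensor_data', default=False,
                help='Enable sending sensor data notifications.'),
    cfg.IntOpt('interval', default=600,
               help='Seconds between sensor data collection runs.'),
    cfg.IntOpt('workers', default=4,
               help='Number of workers used to collect node sensor data.'),
    cfg.IntOpt('wait_timeout', default=300,
               help='Seconds to wait for sensor collection workers.'),
    cfg.BoolOpt('enable_for_conductor', default=True,
                help='Collect "sensor" metrics from the conductor itself.'),
    cfg.BoolOpt('enable_for_nodes', default=True,
                help='Collect sensor data from managed nodes.'),
    cfg.BoolOpt('enable_for_undeployed_nodes', default=False,
                help='Also collect sensor data from undeployed nodes.'),
    cfg.ListOpt('data_types', default=['all'],
                help='Sensor data types to forward; "all" keeps everything.'),
]

CONF = cfg.CONF
CONF.register_opts(sensor_data_opts, group='sensor_data')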
@@ -2971,7 +3011,7 @@ class ConductorManager(base_manager.BaseConductorManager):
task.process_event(
'inspect',
callback=self._spawn_worker,
- call_args=(_do_inspect_hardware, task),
+ call_args=(inspection.inspect_hardware, task),
err_handler=utils.provisioning_error_handler)
except exception.InvalidState:
@@ -3791,53 +3831,3 @@ def do_sync_power_state(task, count):
task, old_power_state)
return count
-
-
-@task_manager.require_exclusive_lock
-def _do_inspect_hardware(task):
- """Initiates inspection.
-
- :param task: a TaskManager instance with an exclusive lock
- on its node.
- :raises: HardwareInspectionFailure if driver doesn't
- return the state as states.MANAGEABLE, states.INSPECTWAIT.
-
- """
- node = task.node
-
- def handle_failure(e, log_func=LOG.error):
- utils.node_history_record(task.node, event=e,
- event_type=states.INTROSPECTION,
- error=True, user=task.context.user_id)
- task.process_event('fail')
- log_func("Failed to inspect node %(node)s: %(err)s",
- {'node': node.uuid, 'err': e})
-
- # Inspection cannot start in fast-track mode, wipe token and URL.
- utils.wipe_token_and_url(task)
-
- try:
- new_state = task.driver.inspect.inspect_hardware(task)
- except exception.IronicException as e:
- with excutils.save_and_reraise_exception():
- error = str(e)
- handle_failure(error)
- except Exception as e:
- error = (_('Unexpected exception of type %(type)s: %(msg)s') %
- {'type': type(e).__name__, 'msg': e})
- handle_failure(error, log_func=LOG.exception)
- raise exception.HardwareInspectionFailure(error=error)
-
- if new_state == states.MANAGEABLE:
- task.process_event('done')
- LOG.info('Successfully inspected node %(node)s',
- {'node': node.uuid})
- elif new_state == states.INSPECTWAIT:
- task.process_event('wait')
- LOG.info('Successfully started introspection on node %(node)s',
- {'node': node.uuid})
- else:
- error = (_("During inspection, driver returned unexpected "
- "state %(state)s") % {'state': new_state})
- handle_failure(error)
- raise exception.HardwareInspectionFailure(error=error)