diff options
Diffstat (limited to 'ironic/conductor/manager.py')
-rw-r--r-- | ironic/conductor/manager.py | 166 |
1 files changed, 78 insertions, 88 deletions
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py index ad45d2d74..bbd2355bd 100644 --- a/ironic/conductor/manager.py +++ b/ironic/conductor/manager.py @@ -63,6 +63,7 @@ from ironic.conductor import allocations from ironic.conductor import base_manager from ironic.conductor import cleaning from ironic.conductor import deployments +from ironic.conductor import inspection from ironic.conductor import notification_utils as notify_utils from ironic.conductor import periodics from ironic.conductor import steps as conductor_steps @@ -98,6 +99,8 @@ class ConductorManager(base_manager.BaseConductorManager): def __init__(self, host, topic): super(ConductorManager, self).__init__(host, topic) + # NOTE(TheJulia): This is less a metric-able count, but a means to + # sort out nodes and prioritise a subset (of non-responding nodes). self.power_state_sync_count = collections.defaultdict(int) @METRICS.timer('ConductorManager._clean_up_caches') @@ -1349,7 +1352,8 @@ class ConductorManager(base_manager.BaseConductorManager): callback=self._spawn_worker, call_args=(cleaning.do_node_clean_abort, task), err_handler=utils.provisioning_error_handler, - target_state=target_state) + target_state=target_state, + last_error=cleaning.get_last_error(node)) return if node.provision_state == states.RESCUEWAIT: @@ -1362,35 +1366,7 @@ class ConductorManager(base_manager.BaseConductorManager): return if node.provision_state == states.INSPECTWAIT: - try: - task.driver.inspect.abort(task) - except exception.UnsupportedDriverExtension: - with excutils.save_and_reraise_exception(): - intf_name = task.driver.inspect.__class__.__name__ - LOG.error('Inspect interface %(intf)s does not ' - 'support abort operation when aborting ' - 'inspection of node %(node)s', - {'intf': intf_name, 'node': node.uuid}) - except Exception as e: - with excutils.save_and_reraise_exception(): - LOG.exception('Error in aborting the inspection of ' - 'node %(node)s', {'node': node.uuid}) - error = _('Failed 
to abort inspection: %s') % e - utils.node_history_record(task.node, event=error, - event_type=states.INTROSPECTION, - error=True, - user=task.context.user_id) - node.save() - error = _('Inspection was aborted by request.') - utils.node_history_record(task.node, event=error, - event_type=states.INTROSPECTION, - error=True, - user=task.context.user_id) - utils.wipe_token_and_url(task) - task.process_event('abort') - LOG.info('Successfully aborted inspection of node %(node)s', - {'node': node.uuid}) - return + return inspection.abort_inspection(task) @METRICS.timer('ConductorManager._sync_power_states') @periodics.periodic(spacing=CONF.conductor.sync_power_state_interval, @@ -1433,6 +1409,11 @@ class ConductorManager(base_manager.BaseConductorManager): finally: waiters.wait_for_all(futures) + # report a count of the nodes + METRICS.send_gauge( + 'ConductorManager.PowerSyncNodesCount', + len(nodes)) + def _sync_power_state_nodes_task(self, context, nodes): """Invokes power state sync on nodes from synchronized queue. @@ -1451,6 +1432,7 @@ class ConductorManager(base_manager.BaseConductorManager): can do here to avoid failing a brand new deploy to a node that we've locked here, though. """ + # FIXME(comstud): Since our initial state checks are outside # of the lock (to try to avoid the lock), some checks are # repeated after grabbing the lock so we can unlock quickly. @@ -1497,6 +1479,12 @@ class ConductorManager(base_manager.BaseConductorManager): LOG.info("During sync_power_state, node %(node)s was not " "found and presumed deleted by another process.", {'node': node_uuid}) + # TODO(TheJulia): The chance exists that we orphan a node + # in power_state_sync_count, albeit it is not much data, + # it could eventually cause the memory footprint to grow + # on an exceptionally large ironic deployment. We should + # make sure we clean it up at some point, but overall given + # minimal impact, it is definite low hanging fruit. 
except exception.NodeLocked: LOG.info("During sync_power_state, node %(node)s was " "already locked by another process. Skip.", @@ -1513,6 +1501,7 @@ class ConductorManager(base_manager.BaseConductorManager): # regular power state checking, maintenance is still a required # condition. filters={'maintenance': True, 'fault': faults.POWER_FAILURE}, + node_count_metric_name='ConductorManager.PowerSyncRecoveryNodeCount', ) def _power_failure_recovery(self, task, context): """Periodic task to check power states for nodes in maintenance. @@ -1855,6 +1844,7 @@ class ConductorManager(base_manager.BaseConductorManager): predicate=lambda n, m: n.conductor_affinity != m.conductor.id, limit=lambda: CONF.conductor.periodic_max_workers, shared_task=False, + node_count_metric_name='ConductorManager.SyncLocalStateNodeCount', ) def _sync_local_state(self, task, context): """Perform any actions necessary to sync local state. @@ -2640,14 +2630,63 @@ class ConductorManager(base_manager.BaseConductorManager): # Yield on every iteration eventlet.sleep(0) + def _sensors_conductor(self, context): + """Called to collect and send metrics "sensors" for the conductor.""" + # populate the message which will be sent to ceilometer + # or other data consumer + message = {'message_id': uuidutils.generate_uuid(), + 'timestamp': datetime.datetime.utcnow(), + 'hostname': self.host} + + try: + ev_type = 'ironic.metrics' + message['event_type'] = ev_type + '.update' + sensors_data = METRICS.get_metrics_data() + except AttributeError: + # TODO(TheJulia): Remove this at some point, but right now + # don't inherently break on version mismatches when people + # disregard requirements. + LOG.warning( + 'get_sensors_data has been configured to collect ' + 'conductor metrics, however the installed ironic-lib ' + 'library lacks the functionality. 
Please update ' + 'ironic-lib to a minimum of version 5.4.0.') + except Exception as e: + LOG.exception( + "An unknown error occured while attempting to collect " + "sensor data from within the conductor. Error: %(error)s", + {'error': e}) + else: + message['payload'] = ( + self._filter_out_unsupported_types(sensors_data)) + if message['payload']: + self.sensors_notifier.info( + context, ev_type, message) + @METRICS.timer('ConductorManager._send_sensor_data') - @periodics.periodic(spacing=CONF.conductor.send_sensor_data_interval, - enabled=CONF.conductor.send_sensor_data) + @periodics.periodic(spacing=CONF.sensor_data.interval, + enabled=CONF.sensor_data.send_sensor_data) def _send_sensor_data(self, context): """Periodically collects and transmits sensor data notifications.""" + if CONF.sensor_data.enable_for_conductor: + if CONF.sensor_data.workers == 1: + # Directly call the sensors_conductor when only one + # worker is permitted, so we collect data serially + # instead. + self._sensors_conductor(context) + else: + # Also, do not apply the general threshold limit to + # the self collection of "sensor" data from the conductor, + # as we're not launching external processes, we're just reading + # from an internal data structure, if we can. + self._spawn_worker(self._sensors_conductor, context) + if not CONF.sensor_data.enable_for_nodes: + # NOTE(TheJulia): If node sensor data is not required, then + # skip the rest of this method. 
+ return filters = {} - if not CONF.conductor.send_sensor_data_for_undeployed_nodes: + if not CONF.sensor_data.enable_for_undeployed_nodes: filters['provision_state'] = states.ACTIVE nodes = queue.Queue() @@ -2655,7 +2694,7 @@ class ConductorManager(base_manager.BaseConductorManager): filters=filters): nodes.put_nowait(node_info) - number_of_threads = min(CONF.conductor.send_sensor_data_workers, + number_of_threads = min(CONF.sensor_data.workers, nodes.qsize()) futures = [] for thread_number in range(number_of_threads): @@ -2671,7 +2710,7 @@ class ConductorManager(base_manager.BaseConductorManager): break done, not_done = waiters.wait_for_all( - futures, timeout=CONF.conductor.send_sensor_data_wait_timeout) + futures, timeout=CONF.sensor_data.wait_timeout) if not_done: LOG.warning("%d workers for send sensors data did not complete", len(not_done)) @@ -2680,13 +2719,14 @@ class ConductorManager(base_manager.BaseConductorManager): """Filters out sensor data types that aren't specified in the config. Removes sensor data types that aren't specified in - CONF.conductor.send_sensor_data_types. + CONF.sensor_data.data_types. :param sensors_data: dict containing sensor types and the associated data :returns: dict with unsupported sensor types removed """ - allowed = set(x.lower() for x in CONF.conductor.send_sensor_data_types) + allowed = set(x.lower() for x in + CONF.sensor_data.data_types) if 'all' in allowed: return sensors_data @@ -2971,7 +3011,7 @@ class ConductorManager(base_manager.BaseConductorManager): task.process_event( 'inspect', callback=self._spawn_worker, - call_args=(_do_inspect_hardware, task), + call_args=(inspection.inspect_hardware, task), err_handler=utils.provisioning_error_handler) except exception.InvalidState: @@ -3791,53 +3831,3 @@ def do_sync_power_state(task, count): task, old_power_state) return count - - -@task_manager.require_exclusive_lock -def _do_inspect_hardware(task): - """Initiates inspection. 
- - :param task: a TaskManager instance with an exclusive lock - on its node. - :raises: HardwareInspectionFailure if driver doesn't - return the state as states.MANAGEABLE, states.INSPECTWAIT. - - """ - node = task.node - - def handle_failure(e, log_func=LOG.error): - utils.node_history_record(task.node, event=e, - event_type=states.INTROSPECTION, - error=True, user=task.context.user_id) - task.process_event('fail') - log_func("Failed to inspect node %(node)s: %(err)s", - {'node': node.uuid, 'err': e}) - - # Inspection cannot start in fast-track mode, wipe token and URL. - utils.wipe_token_and_url(task) - - try: - new_state = task.driver.inspect.inspect_hardware(task) - except exception.IronicException as e: - with excutils.save_and_reraise_exception(): - error = str(e) - handle_failure(error) - except Exception as e: - error = (_('Unexpected exception of type %(type)s: %(msg)s') % - {'type': type(e).__name__, 'msg': e}) - handle_failure(error, log_func=LOG.exception) - raise exception.HardwareInspectionFailure(error=error) - - if new_state == states.MANAGEABLE: - task.process_event('done') - LOG.info('Successfully inspected node %(node)s', - {'node': node.uuid}) - elif new_state == states.INSPECTWAIT: - task.process_event('wait') - LOG.info('Successfully started introspection on node %(node)s', - {'node': node.uuid}) - else: - error = (_("During inspection, driver returned unexpected " - "state %(state)s") % {'state': new_state}) - handle_failure(error) - raise exception.HardwareInspectionFailure(error=error) |