diff options
author | Zuul <zuul@review.opendev.org> | 2021-10-14 12:20:54 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2021-10-14 12:20:54 +0000 |
commit | b93dfd54c206c4bfd0584251166dd2d03b8b0f8d (patch) | |
tree | 8d2ce5321c0aeb2acb6bb8404d6fd4bed40734a1 /ironic/conductor | |
parent | 33cc04706ff2171a4090695afc45b90b47d32237 (diff) | |
parent | cf1b42ea3d35d51d327e3aff0a05a9d402af0e15 (diff) | |
download | ironic-b93dfd54c206c4bfd0584251166dd2d03b8b0f8d.tar.gz |
Merge "Add a helper for node-based periodics"
Diffstat (limited to 'ironic/conductor')
-rw-r--r-- | ironic/conductor/manager.py | 157 | ||||
-rw-r--r-- | ironic/conductor/periodics.py | 151 |
2 files changed, 212 insertions, 96 deletions
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py index ed54aa67b..4c49bc789 100644 --- a/ironic/conductor/manager.py +++ b/ironic/conductor/manager.py @@ -45,7 +45,6 @@ import datetime import queue import eventlet -from futurist import periodics from futurist import waiters from ironic_lib import metrics_utils from oslo_log import log @@ -66,6 +65,7 @@ from ironic.conductor import base_manager from ironic.conductor import cleaning from ironic.conductor import deployments from ironic.conductor import notification_utils as notify_utils +from ironic.conductor import periodics from ironic.conductor import steps as conductor_steps from ironic.conductor import task_manager from ironic.conductor import utils @@ -1497,10 +1497,15 @@ class ConductorManager(base_manager.BaseConductorManager): eventlet.sleep(0) @METRICS.timer('ConductorManager._power_failure_recovery') - @periodics.periodic(spacing=CONF.conductor.power_failure_recovery_interval, - enabled=bool( - CONF.conductor.power_failure_recovery_interval)) - def _power_failure_recovery(self, context): + @periodics.node_periodic( + purpose='power failure recovery', + spacing=CONF.conductor.power_failure_recovery_interval, + # NOTE(kaifeng) To avoid conflicts with periodic task of the + # regular power state checking, maintenance is still a required + # condition. + filters={'maintenance': True, 'fault': faults.POWER_FAILURE}, + ) + def _power_failure_recovery(self, task, context): """Periodic task to check power states for nodes in maintenance. Attempt to grab a lock and sync only if the following @@ -1511,19 +1516,6 @@ class ConductorManager(base_manager.BaseConductorManager): 3) Node is not reserved. 4) Node is not in the ENROLL state. """ - def should_sync_power_state_for_recovery(task): - """Check if ironic should sync power state for recovery.""" - - # NOTE(dtantsur): it's also pointless (and dangerous) to - # sync power state when a power action is in progress - if (task.node.provision_state == states.ENROLL - or not task.node.maintenance - or task.node.fault != faults.POWER_FAILURE - or task.node.target_power_state - or task.node.reservation): - return False - return True - def handle_recovery(task, actual_power_state): """Handle recovery when power sync is succeeded.""" task.upgrade_lock() @@ -1546,48 +1538,33 @@ class ConductorManager(base_manager.BaseConductorManager): notify_utils.emit_power_state_corrected_notification( task, old_power_state) - # NOTE(kaifeng) To avoid conflicts with periodic task of the - # regular power state checking, maintenance is still a required - # condition. - filters = {'maintenance': True, - 'fault': faults.POWER_FAILURE} - node_iter = self.iter_nodes(fields=['id'], filters=filters) - for (node_uuid, driver, conductor_group, node_id) in node_iter: - try: - with task_manager.acquire(context, node_uuid, - purpose='power failure recovery', - shared=True) as task: - if not should_sync_power_state_for_recovery(task): - continue - try: - # Validate driver info in case of parameter changed - # in maintenance. - task.driver.power.validate(task) - # The driver may raise an exception, or may return - # ERROR. Handle both the same way. - power_state = task.driver.power.get_power_state(task) - if power_state == states.ERROR: - raise exception.PowerStateFailure( - _("Power driver returned ERROR state " - "while trying to get power state.")) - except Exception as e: - LOG.debug("During power_failure_recovery, could " - "not get power state for node %(node)s, " - "Error: %(err)s.", - {'node': task.node.uuid, 'err': e}) - else: - handle_recovery(task, power_state) - except exception.NodeNotFound: - LOG.info("During power_failure_recovery, node %(node)s was " - "not found and presumed deleted by another process.", - {'node': node_uuid}) - except exception.NodeLocked: - LOG.info("During power_failure_recovery, node %(node)s was " - "already locked by another process. Skip.", - {'node': node_uuid}) - finally: - # Yield on every iteration - eventlet.sleep(0) + # NOTE(dtantsur): it's also pointless (and dangerous) to + # sync power state when a power action is in progress + if (task.node.provision_state == states.ENROLL + or not task.node.maintenance + or task.node.fault != faults.POWER_FAILURE + or task.node.target_power_state + or task.node.reservation): + return + + try: + # Validate driver info in case of parameter changed + # in maintenance. + task.driver.power.validate(task) + # The driver may raise an exception, or may return + # ERROR. Handle both the same way. + power_state = task.driver.power.get_power_state(task) + if power_state == states.ERROR: + raise exception.PowerStateFailure( + _("Power driver returned ERROR state " + "while trying to get power state.")) + except Exception as e: + LOG.debug("During power_failure_recovery, could " + "not get power state for node %(node)s, " + "Error: %(err)s.", + {'node': task.node.uuid, 'err': e}) + else: + handle_recovery(task, power_state) @METRICS.timer('ConductorManager._check_deploy_timeouts') @periodics.periodic( @@ -1869,9 +1846,17 @@ class ConductorManager(base_manager.BaseConductorManager): ) @METRICS.timer('ConductorManager._sync_local_state') - @periodics.periodic(spacing=CONF.conductor.sync_local_state_interval, - enabled=CONF.conductor.sync_local_state_interval > 0) - def _sync_local_state(self, context): + @periodics.node_periodic( + purpose='node take over', + spacing=CONF.conductor.sync_local_state_interval, + filters={'reserved': False, 'maintenance': False, + 'provision_state': states.ACTIVE}, + predicate_extra_fields=['conductor_affinity'], + predicate=lambda n, m: n.conductor_affinity != m.conductor.id, + limit=lambda: CONF.conductor.periodic_max_workers, + shared_task=False, + ) + def _sync_local_state(self, task, context): """Perform any actions necessary to sync local state. This is called periodically to refresh the conductor's copy of the @@ -1880,40 +1865,20 @@ class ConductorManager(base_manager.BaseConductorManager): The ensuing actions could include preparing a PXE environment, updating the DHCP server, and so on. """ - filters = {'reserved': False, - 'maintenance': False, - 'provision_state': states.ACTIVE} - node_iter = self.iter_nodes(fields=['id', 'conductor_affinity'], - filters=filters) - - workers_count = 0 - for (node_uuid, driver, conductor_group, node_id, - conductor_affinity) in node_iter: - if conductor_affinity == self.conductor.id: - continue - - # Node is mapped here, but not updated by this conductor last - try: - with task_manager.acquire(context, node_uuid, - purpose='node take over') as task: - # NOTE(tenbrae): now that we have the lock, check again to - # avoid racing with deletes and other state changes - node = task.node - if (node.maintenance - or node.conductor_affinity == self.conductor.id - or node.provision_state != states.ACTIVE): - continue - - task.spawn_after(self._spawn_worker, - self._do_takeover, task) + # NOTE(tenbrae): now that we have the lock, check again to + # avoid racing with deletes and other state changes + node = task.node + if (node.maintenance + or node.conductor_affinity == self.conductor.id + or node.provision_state != states.ACTIVE): + return False - except exception.NoFreeConductorWorker: - break - except (exception.NodeLocked, exception.NodeNotFound): - continue - workers_count += 1 - if workers_count == CONF.conductor.periodic_max_workers: - break + try: + task.spawn_after(self._spawn_worker, self._do_takeover, task) + except exception.NoFreeConductorWorker: + raise periodics.Stop() + else: + return True @METRICS.timer('ConductorManager.validate_driver_interfaces') @messaging.expected_exceptions(exception.NodeLocked) diff --git a/ironic/conductor/periodics.py b/ironic/conductor/periodics.py new file mode 100644 index 000000000..ead5cbf08 --- /dev/null +++ b/ironic/conductor/periodics.py @@ -0,0 +1,151 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Conductor periodics.""" + +import collections +import functools +import inspect + +import eventlet +from futurist import periodics +from oslo_log import log + +from ironic.common import exception +from ironic.conductor import base_manager +from ironic.conductor import task_manager + + +LOG = log.getLogger(__name__) + + +def periodic(spacing, enabled=True, **kwargs): + """A decorator to define a periodic task. + + :param spacing: how often (in seconds) to run the periodic task. + :param enabled: whether the task is enabled; defaults to ``spacing > 0``. + """ + return periodics.periodic(spacing=spacing, + enabled=enabled and spacing > 0, + **kwargs) + + +class Stop(Exception): + """A signal to stop the current iteration of a periodic task.""" + + +def node_periodic(purpose, spacing, enabled=True, filters=None, + predicate=None, predicate_extra_fields=(), limit=None, + shared_task=True): + """A decorator to define a periodic task to act on nodes. + + Defines a periodic task that fetches the list of nodes mapped to the + current conductor which satisfy the provided filters. + + The decorated function must be a method on either the conductor manager + or a hardware interface. The signature is: + + * for conductor manager: ``(self, task, context)`` + * for hardware interfaces: ``(self, task, manager, context)``. + + ``NodeNotFound`` and ``NodeLocked`` exceptions are ignored. Raise ``Stop`` + to abort the current iteration of the task and reschedule it. + + :param purpose: a human-readable description of the activity, e.g. + "verifying that the cat is purring". + :param spacing: how often (in seconds) to run the periodic task. + :param enabled: whether the task is enabled; defaults to ``spacing > 0``. + :param filters: database-level filters for the nodes. + :param predicate: a callable to run on the fetched nodes *before* creating + a task for them. The only parameter will be a named tuple with fields + ``uuid``, ``driver``, ``conductor_group`` plus everything from + ``predicate_extra_fields``. If the callable accepts a 2nd parameter, + it will be the conductor manager instance. + :param predicate_extra_fields: extra fields to fetch on the initial + request and pass into the ``predicate``. Must not contain ``uuid``, + ``driver`` and ``conductor_group`` since they are always included. + :param limit: how many nodes to process before stopping the current + iteration. If ``predicate`` returns ``False``, the node is not counted. + If the decorated function returns ``False``, the node is not counted + either. Can be a callable, in which case it will be called on each + iteration to determine the limit. + :param shared_task: if ``True``, the task will have a shared lock. It is + recommended to start with a shared lock and upgrade it only if needed. + """ + node_type = collections.namedtuple( + 'Node', + ['uuid', 'driver', 'conductor_group'] + list(predicate_extra_fields) + ) + + # Accepting a conductor manager is a bit of an edge case, doing a bit of + # a signature magic to avoid passing it everywhere. + accepts_manager = (predicate is not None + and len(inspect.signature(predicate).parameters) > 1) + + def decorator(func): + @periodic(spacing=spacing, enabled=enabled) + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + # Make it work with both drivers and the conductor manager + if isinstance(self, base_manager.BaseConductorManager): + manager = self + context = args[0] + else: + manager = args[0] + context = args[1] + + if callable(limit): + local_limit = limit() + else: + local_limit = limit + assert local_limit is None or local_limit > 0 + + nodes = manager.iter_nodes(filters=filters, + fields=predicate_extra_fields) + for (node_uuid, *other) in nodes: + if predicate is not None: + node = node_type(node_uuid, *other) + if accepts_manager: + result = predicate(node, manager) + else: + result = predicate(node) + if not result: + continue + + try: + with task_manager.acquire(context, node_uuid, + purpose=purpose, + shared=shared_task) as task: + result = func(self, task, *args, **kwargs) + except exception.NodeNotFound: + LOG.info("During %(action)s, node %(node)s was not found " + "and presumed deleted by another process.", + {'node': node_uuid, 'action': purpose}) + except exception.NodeLocked: + LOG.info("During %(action)s, node %(node)s was already " + "locked by another process. Skip.", + {'node': node_uuid, 'action': purpose}) + except Stop: + break + finally: + # Yield on every iteration + eventlet.sleep(0) + + if (local_limit is not None + and (result is None or result)): + local_limit -= 1 + if not local_limit: + return + + return wrapper + + return decorator |