author     Zuul <zuul@review.opendev.org>             2021-10-14 12:20:54 +0000
committer  Gerrit Code Review <review@openstack.org>  2021-10-14 12:20:54 +0000
commit     b93dfd54c206c4bfd0584251166dd2d03b8b0f8d (patch)
tree       8d2ce5321c0aeb2acb6bb8404d6fd4bed40734a1 /ironic/conductor
parent     33cc04706ff2171a4090695afc45b90b47d32237 (diff)
parent     cf1b42ea3d35d51d327e3aff0a05a9d402af0e15 (diff)
download   ironic-b93dfd54c206c4bfd0584251166dd2d03b8b0f8d.tar.gz
Merge "Add a helper for node-based periodics"
Diffstat (limited to 'ironic/conductor')
-rw-r--r--  ironic/conductor/manager.py    | 157
-rw-r--r--  ironic/conductor/periodics.py  | 151
2 files changed, 212 insertions(+), 96 deletions(-)
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index ed54aa67b..4c49bc789 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -45,7 +45,6 @@ import datetime
import queue
import eventlet
-from futurist import periodics
from futurist import waiters
from ironic_lib import metrics_utils
from oslo_log import log
@@ -66,6 +65,7 @@ from ironic.conductor import base_manager
from ironic.conductor import cleaning
from ironic.conductor import deployments
from ironic.conductor import notification_utils as notify_utils
+from ironic.conductor import periodics
from ironic.conductor import steps as conductor_steps
from ironic.conductor import task_manager
from ironic.conductor import utils
@@ -1497,10 +1497,15 @@ class ConductorManager(base_manager.BaseConductorManager):
eventlet.sleep(0)
@METRICS.timer('ConductorManager._power_failure_recovery')
- @periodics.periodic(spacing=CONF.conductor.power_failure_recovery_interval,
- enabled=bool(
- CONF.conductor.power_failure_recovery_interval))
- def _power_failure_recovery(self, context):
+ @periodics.node_periodic(
+ purpose='power failure recovery',
+ spacing=CONF.conductor.power_failure_recovery_interval,
+ # NOTE(kaifeng) To avoid conflicts with periodic task of the
+ # regular power state checking, maintenance is still a required
+ # condition.
+ filters={'maintenance': True, 'fault': faults.POWER_FAILURE},
+ )
+ def _power_failure_recovery(self, task, context):
"""Periodic task to check power states for nodes in maintenance.
Attempt to grab a lock and sync only if the following
@@ -1511,19 +1516,6 @@ class ConductorManager(base_manager.BaseConductorManager):
3) Node is not reserved.
4) Node is not in the ENROLL state.
"""
- def should_sync_power_state_for_recovery(task):
- """Check if ironic should sync power state for recovery."""
-
- # NOTE(dtantsur): it's also pointless (and dangerous) to
- # sync power state when a power action is in progress
- if (task.node.provision_state == states.ENROLL
- or not task.node.maintenance
- or task.node.fault != faults.POWER_FAILURE
- or task.node.target_power_state
- or task.node.reservation):
- return False
- return True
-
def handle_recovery(task, actual_power_state):
"""Handle recovery when power sync is succeeded."""
task.upgrade_lock()
@@ -1546,48 +1538,33 @@ class ConductorManager(base_manager.BaseConductorManager):
notify_utils.emit_power_state_corrected_notification(
task, old_power_state)
- # NOTE(kaifeng) To avoid conflicts with periodic task of the
- # regular power state checking, maintenance is still a required
- # condition.
- filters = {'maintenance': True,
- 'fault': faults.POWER_FAILURE}
- node_iter = self.iter_nodes(fields=['id'], filters=filters)
- for (node_uuid, driver, conductor_group, node_id) in node_iter:
- try:
- with task_manager.acquire(context, node_uuid,
- purpose='power failure recovery',
- shared=True) as task:
- if not should_sync_power_state_for_recovery(task):
- continue
- try:
- # Validate driver info in case of parameter changed
- # in maintenance.
- task.driver.power.validate(task)
- # The driver may raise an exception, or may return
- # ERROR. Handle both the same way.
- power_state = task.driver.power.get_power_state(task)
- if power_state == states.ERROR:
- raise exception.PowerStateFailure(
- _("Power driver returned ERROR state "
- "while trying to get power state."))
- except Exception as e:
- LOG.debug("During power_failure_recovery, could "
- "not get power state for node %(node)s, "
- "Error: %(err)s.",
- {'node': task.node.uuid, 'err': e})
- else:
- handle_recovery(task, power_state)
- except exception.NodeNotFound:
- LOG.info("During power_failure_recovery, node %(node)s was "
- "not found and presumed deleted by another process.",
- {'node': node_uuid})
- except exception.NodeLocked:
- LOG.info("During power_failure_recovery, node %(node)s was "
- "already locked by another process. Skip.",
- {'node': node_uuid})
- finally:
- # Yield on every iteration
- eventlet.sleep(0)
+ # NOTE(dtantsur): it's also pointless (and dangerous) to
+ # sync power state when a power action is in progress
+ if (task.node.provision_state == states.ENROLL
+ or not task.node.maintenance
+ or task.node.fault != faults.POWER_FAILURE
+ or task.node.target_power_state
+ or task.node.reservation):
+ return
+
+ try:
+ # Validate driver info in case of parameter changed
+ # in maintenance.
+ task.driver.power.validate(task)
+ # The driver may raise an exception, or may return
+ # ERROR. Handle both the same way.
+ power_state = task.driver.power.get_power_state(task)
+ if power_state == states.ERROR:
+ raise exception.PowerStateFailure(
+ _("Power driver returned ERROR state "
+ "while trying to get power state."))
+ except Exception as e:
+ LOG.debug("During power_failure_recovery, could "
+ "not get power state for node %(node)s, "
+ "Error: %(err)s.",
+ {'node': task.node.uuid, 'err': e})
+ else:
+ handle_recovery(task, power_state)
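For illustration, a minimal sketch of the pattern this change introduces: the decorator (added below in ironic/conductor/periodics.py) takes over node iteration, filtering, locking and error handling, so the task body only deals with an already-acquired task. The manager class, spacing value and task name here are invented, not part of this commit.

# A minimal sketch, assuming only what this commit adds; the manager
# class, spacing value and task name are hypothetical.
from oslo_log import log

from ironic.conductor import base_manager
from ironic.conductor import periodics

LOG = log.getLogger(__name__)


class ExampleManager(base_manager.BaseConductorManager):

    @periodics.node_periodic(
        purpose='checking that the cat is purring',
        spacing=300,  # seconds; a real task would read this from CONF
        filters={'maintenance': False},
    )
    def _check_purring(self, task, context):
        # By the time this body runs, the decorator has fetched the node,
        # applied the filters and acquired a shared lock on it.
        LOG.debug("Node %s appears to be purring", task.node.uuid)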
@METRICS.timer('ConductorManager._check_deploy_timeouts')
@periodics.periodic(
@@ -1869,9 +1846,17 @@ class ConductorManager(base_manager.BaseConductorManager):
)
@METRICS.timer('ConductorManager._sync_local_state')
- @periodics.periodic(spacing=CONF.conductor.sync_local_state_interval,
- enabled=CONF.conductor.sync_local_state_interval > 0)
- def _sync_local_state(self, context):
+ @periodics.node_periodic(
+ purpose='node take over',
+ spacing=CONF.conductor.sync_local_state_interval,
+ filters={'reserved': False, 'maintenance': False,
+ 'provision_state': states.ACTIVE},
+ predicate_extra_fields=['conductor_affinity'],
+ predicate=lambda n, m: n.conductor_affinity != m.conductor.id,
+ limit=lambda: CONF.conductor.periodic_max_workers,
+ shared_task=False,
+ )
+ def _sync_local_state(self, task, context):
"""Perform any actions necessary to sync local state.
This is called periodically to refresh the conductor's copy of the
@@ -1880,40 +1865,20 @@ class ConductorManager(base_manager.BaseConductorManager):
The ensuing actions could include preparing a PXE environment,
updating the DHCP server, and so on.
"""
- filters = {'reserved': False,
- 'maintenance': False,
- 'provision_state': states.ACTIVE}
- node_iter = self.iter_nodes(fields=['id', 'conductor_affinity'],
- filters=filters)
-
- workers_count = 0
- for (node_uuid, driver, conductor_group, node_id,
- conductor_affinity) in node_iter:
- if conductor_affinity == self.conductor.id:
- continue
-
- # Node is mapped here, but not updated by this conductor last
- try:
- with task_manager.acquire(context, node_uuid,
- purpose='node take over') as task:
- # NOTE(tenbrae): now that we have the lock, check again to
- # avoid racing with deletes and other state changes
- node = task.node
- if (node.maintenance
- or node.conductor_affinity == self.conductor.id
- or node.provision_state != states.ACTIVE):
- continue
-
- task.spawn_after(self._spawn_worker,
- self._do_takeover, task)
+ # NOTE(tenbrae): now that we have the lock, check again to
+ # avoid racing with deletes and other state changes
+ node = task.node
+ if (node.maintenance
+ or node.conductor_affinity == self.conductor.id
+ or node.provision_state != states.ACTIVE):
+ return False
- except exception.NoFreeConductorWorker:
- break
- except (exception.NodeLocked, exception.NodeNotFound):
- continue
- workers_count += 1
- if workers_count == CONF.conductor.periodic_max_workers:
- break
+ try:
+ task.spawn_after(self._spawn_worker, self._do_takeover, task)
+ except exception.NoFreeConductorWorker:
+ raise periodics.Stop()
+ else:
+ return True
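For reference, the predicate in the decoration above receives a named tuple built from the three default fields plus predicate_extra_fields and, because it accepts two parameters, the conductor manager as well. A small runnable sketch of that contract, with all values hypothetical:

# What the predicate above is evaluated against; all values are made up.
import collections

# node_periodic builds this type from the three default fields plus
# predicate_extra_fields=['conductor_affinity'].
Node = collections.namedtuple(
    'Node', ['uuid', 'driver', 'conductor_group', 'conductor_affinity'])

node = Node(uuid='1be26c0b-03f2-4d2e-ae87-c02d7f33c123',
            driver='ipmi', conductor_group='', conductor_affinity=42)
this_conductor_id = 7  # stands in for manager.conductor.id

# The predicate from _sync_local_state: only take over nodes that were
# last handled by a different conductor.
assert node.conductor_affinity != this_conductor_id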
@METRICS.timer('ConductorManager.validate_driver_interfaces')
@messaging.expected_exceptions(exception.NodeLocked)
diff --git a/ironic/conductor/periodics.py b/ironic/conductor/periodics.py
new file mode 100644
index 000000000..ead5cbf08
--- /dev/null
+++ b/ironic/conductor/periodics.py
@@ -0,0 +1,151 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Conductor periodics."""
+
+import collections
+import functools
+import inspect
+
+import eventlet
+from futurist import periodics
+from oslo_log import log
+
+from ironic.common import exception
+from ironic.conductor import base_manager
+from ironic.conductor import task_manager
+
+
+LOG = log.getLogger(__name__)
+
+
+def periodic(spacing, enabled=True, **kwargs):
+ """A decorator to define a periodic task.
+
+ :param spacing: how often (in seconds) to run the periodic task.
+ :param enabled: whether the task is enabled; disabled anyway if ``spacing`` is not positive.
+ """
+ return periodics.periodic(spacing=spacing,
+ enabled=enabled and spacing > 0,
+ **kwargs)
+
+
+class Stop(Exception):
+ """A signal to stop the current iteration of a periodic task."""
+
+
+def node_periodic(purpose, spacing, enabled=True, filters=None,
+ predicate=None, predicate_extra_fields=(), limit=None,
+ shared_task=True):
+ """A decorator to define a periodic task to act on nodes.
+
+ Defines a periodic task that fetches the list of nodes mapped to the
+ current conductor which satisfy the provided filters.
+
+ The decorated function must be a method on either the conductor manager
+ or a hardware interface. The signature is:
+
+ * for conductor manager: ``(self, task, context)``
+ * for hardware interfaces: ``(self, task, manager, context)``.
+
+ ``NodeNotFound`` and ``NodeLocked`` exceptions are ignored. Raise ``Stop``
+ to abort the current iteration of the task and reschedule it.
+
+ :param purpose: a human-readable description of the activity, e.g.
+ "verifying that the cat is purring".
+ :param spacing: how often (in seconds) to run the periodic task.
+ :param enabled: whether the task is enabled; disabled anyway if ``spacing`` is not positive.
+ :param filters: database-level filters for the nodes.
+ :param predicate: a callable to run on the fetched nodes *before* creating
+ a task for them. Its first parameter is a named tuple with fields
+ ``uuid``, ``driver`` and ``conductor_group`` plus everything from
+ ``predicate_extra_fields``. If the callable accepts a second parameter,
+ the conductor manager instance is passed as well.
+ :param predicate_extra_fields: extra fields to fetch on the initial
+ request and pass into the ``predicate``. Must not contain ``uuid``,
+ ``driver`` or ``conductor_group``, which are always included.
+ :param limit: how many nodes to process before stopping the current
+ iteration. A node is not counted when ``predicate`` returns a false
+ value or when the decorated function returns ``False``. Can be a
+ callable, in which case it is called on each iteration to determine
+ the limit.
+ :param shared_task: if ``True``, the task will have a shared lock. It is
+ recommended to start with a shared lock and upgrade it only if needed.
+ """
+ node_type = collections.namedtuple(
+ 'Node',
+ ['uuid', 'driver', 'conductor_group'] + list(predicate_extra_fields)
+ )
+
+ # Accepting a conductor manager is a bit of an edge case; a little
+ # signature magic avoids having to pass it everywhere.
+ accepts_manager = (predicate is not None
+ and len(inspect.signature(predicate).parameters) > 1)
+
+ def decorator(func):
+ @periodic(spacing=spacing, enabled=enabled)
+ @functools.wraps(func)
+ def wrapper(self, *args, **kwargs):
+ # Make it work with both drivers and the conductor manager
+ if isinstance(self, base_manager.BaseConductorManager):
+ manager = self
+ context = args[0]
+ else:
+ manager = args[0]
+ context = args[1]
+
+ if callable(limit):
+ local_limit = limit()
+ else:
+ local_limit = limit
+ assert local_limit is None or local_limit > 0
+
+ nodes = manager.iter_nodes(filters=filters,
+ fields=predicate_extra_fields)
+ for (node_uuid, *other) in nodes:
+ if predicate is not None:
+ node = node_type(node_uuid, *other)
+ if accepts_manager:
+ result = predicate(node, manager)
+ else:
+ result = predicate(node)
+ if not result:
+ continue
+
+ try:
+ with task_manager.acquire(context, node_uuid,
+ purpose=purpose,
+ shared=shared_task) as task:
+ result = func(self, task, *args, **kwargs)
+ except exception.NodeNotFound:
+ LOG.info("During %(action)s, node %(node)s was not found "
+ "and presumed deleted by another process.",
+ {'node': node_uuid, 'action': purpose})
+ except exception.NodeLocked:
+ LOG.info("During %(action)s, node %(node)s was already "
+ "locked by another process. Skip.",
+ {'node': node_uuid, 'action': purpose})
+ except Stop:
+ break
+ finally:
+ # Yield on every iteration
+ eventlet.sleep(0)
+
+ if (local_limit is not None
+ and (result is None or result)):
+ local_limit -= 1
+ if not local_limit:
+ return
+
+ return wrapper
+
+ return decorator
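Putting the new helper to use from a hardware interface looks roughly like the sketch below; per the docstring above, interface methods receive (self, task, manager, context). The interface class and the spacing and limit values are invented for illustration.

# A hedged usage sketch of the hardware-interface form of the helper;
# the class name, spacing and limit are hypothetical.
from ironic.conductor import periodics


class ExamplePowerInterface:
    """Stand-in for a real hardware interface."""

    @periodics.node_periodic(
        purpose='syncing example power state',
        spacing=120,
        filters={'maintenance': False},
        limit=100,  # process at most 100 nodes per iteration
    )
    def _sync_example_state(self, task, manager, context):
        # Returning False keeps this node from counting against 'limit';
        # raising periodics.Stop() ends the current iteration early.
        if task.node.target_power_state:
            return False
        # The actual work with the locked task would go here.
        return True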