summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDmitry Tantsur <dtantsur@protonmail.com>2021-10-05 14:35:22 +0200
committerDmitry Tantsur <dtantsur@protonmail.com>2021-10-11 17:26:06 +0200
commitcf1b42ea3d35d51d327e3aff0a05a9d402af0e15 (patch)
treedf1092433ce1ef8be0cef8dec8b5dd6c84f67976
parent7f9badb5437631fdea0f31b85aee3373f7bbb985 (diff)
downloadironic-cf1b42ea3d35d51d327e3aff0a05a9d402af0e15.tar.gz
Add a helper for node-based periodics
We have a very common pattern of periodic tasks that use iter_nodes to fetch some nodes, check them, create a task and conductor some operation. This change introduces a helper decorator for that and migrates the drivers to it. I'm intentionally leaving unit tests intact to demonstrate that the new decorator works exactly the same way (modulo cosmetic changes) as the previous hand-written code. Change-Id: Ifed4a457275d9451cc412dc80f3c09df72f50492 Story: #2009203 Task: #43522
-rw-r--r--doc/source/contributor/architecture.rst7
-rw-r--r--doc/source/contributor/deploy-steps.rst89
-rw-r--r--ironic/conductor/manager.py157
-rw-r--r--ironic/conductor/periodics.py151
-rw-r--r--ironic/drivers/modules/drac/bios.py64
-rw-r--r--ironic/drivers/modules/drac/management.py53
-rw-r--r--ironic/drivers/modules/drac/raid.py95
-rw-r--r--ironic/drivers/modules/inspector.py23
-rw-r--r--ironic/drivers/modules/irmc/raid.py132
-rw-r--r--ironic/drivers/modules/pxe_base.py33
-rw-r--r--ironic/drivers/modules/redfish/management.py116
-rw-r--r--ironic/drivers/modules/redfish/raid.py115
-rw-r--r--ironic/tests/unit/conductor/test_manager.py12
-rw-r--r--ironic/tests/unit/conductor/test_periodics.py135
-rw-r--r--ironic/tests/unit/drivers/modules/drac/test_management.py5
-rw-r--r--ironic/tests/unit/drivers/modules/drac/test_raid.py5
-rw-r--r--ironic/tests/unit/drivers/modules/irmc/test_periodic_task.py3
-rw-r--r--ironic/tests/unit/drivers/modules/redfish/test_management.py9
18 files changed, 665 insertions, 539 deletions
diff --git a/doc/source/contributor/architecture.rst b/doc/source/contributor/architecture.rst
index e246fd28b..84e58ee1a 100644
--- a/doc/source/contributor/architecture.rst
+++ b/doc/source/contributor/architecture.rst
@@ -42,7 +42,7 @@ Drivers may run their own periodic tasks, i.e. actions run repeatedly after
a certain amount of time. Such a task is created by using the periodic_
decorator on an interface method. For example
-::
+.. code-block:: python
from futurist import periodics
@@ -55,6 +55,11 @@ decorator on an interface method. For example
Here the ``spacing`` argument is a period in seconds for a given periodic task.
For example 'spacing=5' means every 5 seconds.
+Starting with the Yoga cycle, there is also a new decorator
+:py:func:`ironic.conductor.periodics.node_periodic` to create periodic tasks
+that handle nodes. See :ref:`deploy steps documentation <deploy-steps-polling>`
+for an example.
+
Driver-Specific Steps
---------------------
diff --git a/doc/source/contributor/deploy-steps.rst b/doc/source/contributor/deploy-steps.rst
index e6407d41e..a6cd6809d 100644
--- a/doc/source/contributor/deploy-steps.rst
+++ b/doc/source/contributor/deploy-steps.rst
@@ -188,6 +188,95 @@ following pattern:
return deploy_utils.reboot_to_finish_step(task)
+.. _deploy-steps-polling:
+
+Polling for completion
+~~~~~~~~~~~~~~~~~~~~~~~
+
+Finally, you may want to poll the BMC until the operation is complete. Often
+enough, this also involves a reboot. In this case you can use the
+:py:func:`ironic.conductor.periodics.node_periodic` decorator to create a
+periodic task that operates on relevant nodes:
+
+.. code-block:: python
+
+ from ironic.common import states
+ from ironic.common import utils
+ from ironic.conductor import periodics
+ from ironic.drivers import base
+ from ironic.drivers.modules import deploy_utils
+
+ _STATUS_CHECK_INTERVAL = ... # better use a configuration option
+
+ class MyManagement(base.ManagementInterface):
+ ...
+
+ @base.clean_step(priority=0)
+ def my_action(self, task):
+ ...
+
+ reboot_required = ... # your step may or may not need rebooting
+
+ # Make this node as running my_action. Often enough you will store
+ # some useful data rather than a boolean flag.
+ utils.set_node_nested_field(task.node, 'driver_internal_info',
+ 'in_my_action', True)
+
+ # Tell ironic that...
+ deploy_utils.set_async_step_flags(
+ node,
+ # ... we're waiting for IPA to come back after reboot
+ reboot=reboot_required,
+ # ... the current step shouldn't be entered again
+ skip_current_step=True,
+ # ... we'll be polling until the step is done
+ polling=True)
+
+ if reboot_required:
+ return deploy_utils.reboot_to_finish_step(task)
+
+ @periodics.node_periodic(
+ purpose='checking my action status',
+ spacing=_STATUS_CHECK_INTERVAL,
+ filters={
+ # Skip nodes that already have a lock
+ 'reserved': False,
+ # Only consider nodes that are waiting for cleaning or failed
+ # on timeout.
+ 'provision_state_in': [states.CLEANWAIT, states.CLEANFAIL],
+ },
+ # Load driver_internal_info from the database on listing
+ predicate_extra_fields=['driver_internal_info'],
+ # Only consider nodes with in_my_action
+ predicate=lambda n: n.driver_internal_info.get('in_my_action'),
+ )
+ def check_my_action(self, task, manager, context):
+ # Double-check that the node is managed by this interface
+ if not isinstance(task.driver.management, MyManagement):
+ return
+
+ if not needs_actions(): # insert your checks here
+ return
+
+ task.upgrade_lock()
+
+ ... # do any required updates
+
+ # Drop the flag so that this node is no longer considered
+ utils.pop_node_nested_field(task.node, 'driver_internal_info',
+ 'in_my_action')
+
+Note that creating a ``task`` involves an additional database query, so you
+want to avoid creating them for too many nodes in your periodic tasks. Instead:
+
+* Try to use precise ``filters`` to filter out nodes on the database level.
+ Using ``reserved`` and ``provision_state``/``provision_state_in`` are
+ recommended in most cases. See
+ :py:meth:`ironic.db.api.Connection.get_nodeinfo_list` for a list of possible
+ filters.
+* Use ``predicate`` to filter on complex fields such as
+ ``driver_internal_info``. Predicates are checked before tasks are created.
+
Implementing RAID
-----------------
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index ed54aa67b..4c49bc789 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -45,7 +45,6 @@ import datetime
import queue
import eventlet
-from futurist import periodics
from futurist import waiters
from ironic_lib import metrics_utils
from oslo_log import log
@@ -66,6 +65,7 @@ from ironic.conductor import base_manager
from ironic.conductor import cleaning
from ironic.conductor import deployments
from ironic.conductor import notification_utils as notify_utils
+from ironic.conductor import periodics
from ironic.conductor import steps as conductor_steps
from ironic.conductor import task_manager
from ironic.conductor import utils
@@ -1497,10 +1497,15 @@ class ConductorManager(base_manager.BaseConductorManager):
eventlet.sleep(0)
@METRICS.timer('ConductorManager._power_failure_recovery')
- @periodics.periodic(spacing=CONF.conductor.power_failure_recovery_interval,
- enabled=bool(
- CONF.conductor.power_failure_recovery_interval))
- def _power_failure_recovery(self, context):
+ @periodics.node_periodic(
+ purpose='power failure recovery',
+ spacing=CONF.conductor.power_failure_recovery_interval,
+ # NOTE(kaifeng) To avoid conflicts with periodic task of the
+ # regular power state checking, maintenance is still a required
+ # condition.
+ filters={'maintenance': True, 'fault': faults.POWER_FAILURE},
+ )
+ def _power_failure_recovery(self, task, context):
"""Periodic task to check power states for nodes in maintenance.
Attempt to grab a lock and sync only if the following
@@ -1511,19 +1516,6 @@ class ConductorManager(base_manager.BaseConductorManager):
3) Node is not reserved.
4) Node is not in the ENROLL state.
"""
- def should_sync_power_state_for_recovery(task):
- """Check if ironic should sync power state for recovery."""
-
- # NOTE(dtantsur): it's also pointless (and dangerous) to
- # sync power state when a power action is in progress
- if (task.node.provision_state == states.ENROLL
- or not task.node.maintenance
- or task.node.fault != faults.POWER_FAILURE
- or task.node.target_power_state
- or task.node.reservation):
- return False
- return True
-
def handle_recovery(task, actual_power_state):
"""Handle recovery when power sync is succeeded."""
task.upgrade_lock()
@@ -1546,48 +1538,33 @@ class ConductorManager(base_manager.BaseConductorManager):
notify_utils.emit_power_state_corrected_notification(
task, old_power_state)
- # NOTE(kaifeng) To avoid conflicts with periodic task of the
- # regular power state checking, maintenance is still a required
- # condition.
- filters = {'maintenance': True,
- 'fault': faults.POWER_FAILURE}
- node_iter = self.iter_nodes(fields=['id'], filters=filters)
- for (node_uuid, driver, conductor_group, node_id) in node_iter:
- try:
- with task_manager.acquire(context, node_uuid,
- purpose='power failure recovery',
- shared=True) as task:
- if not should_sync_power_state_for_recovery(task):
- continue
- try:
- # Validate driver info in case of parameter changed
- # in maintenance.
- task.driver.power.validate(task)
- # The driver may raise an exception, or may return
- # ERROR. Handle both the same way.
- power_state = task.driver.power.get_power_state(task)
- if power_state == states.ERROR:
- raise exception.PowerStateFailure(
- _("Power driver returned ERROR state "
- "while trying to get power state."))
- except Exception as e:
- LOG.debug("During power_failure_recovery, could "
- "not get power state for node %(node)s, "
- "Error: %(err)s.",
- {'node': task.node.uuid, 'err': e})
- else:
- handle_recovery(task, power_state)
- except exception.NodeNotFound:
- LOG.info("During power_failure_recovery, node %(node)s was "
- "not found and presumed deleted by another process.",
- {'node': node_uuid})
- except exception.NodeLocked:
- LOG.info("During power_failure_recovery, node %(node)s was "
- "already locked by another process. Skip.",
- {'node': node_uuid})
- finally:
- # Yield on every iteration
- eventlet.sleep(0)
+ # NOTE(dtantsur): it's also pointless (and dangerous) to
+ # sync power state when a power action is in progress
+ if (task.node.provision_state == states.ENROLL
+ or not task.node.maintenance
+ or task.node.fault != faults.POWER_FAILURE
+ or task.node.target_power_state
+ or task.node.reservation):
+ return
+
+ try:
+ # Validate driver info in case of parameter changed
+ # in maintenance.
+ task.driver.power.validate(task)
+ # The driver may raise an exception, or may return
+ # ERROR. Handle both the same way.
+ power_state = task.driver.power.get_power_state(task)
+ if power_state == states.ERROR:
+ raise exception.PowerStateFailure(
+ _("Power driver returned ERROR state "
+ "while trying to get power state."))
+ except Exception as e:
+ LOG.debug("During power_failure_recovery, could "
+ "not get power state for node %(node)s, "
+ "Error: %(err)s.",
+ {'node': task.node.uuid, 'err': e})
+ else:
+ handle_recovery(task, power_state)
@METRICS.timer('ConductorManager._check_deploy_timeouts')
@periodics.periodic(
@@ -1869,9 +1846,17 @@ class ConductorManager(base_manager.BaseConductorManager):
)
@METRICS.timer('ConductorManager._sync_local_state')
- @periodics.periodic(spacing=CONF.conductor.sync_local_state_interval,
- enabled=CONF.conductor.sync_local_state_interval > 0)
- def _sync_local_state(self, context):
+ @periodics.node_periodic(
+ purpose='node take over',
+ spacing=CONF.conductor.sync_local_state_interval,
+ filters={'reserved': False, 'maintenance': False,
+ 'provision_state': states.ACTIVE},
+ predicate_extra_fields=['conductor_affinity'],
+ predicate=lambda n, m: n.conductor_affinity != m.conductor.id,
+ limit=lambda: CONF.conductor.periodic_max_workers,
+ shared_task=False,
+ )
+ def _sync_local_state(self, task, context):
"""Perform any actions necessary to sync local state.
This is called periodically to refresh the conductor's copy of the
@@ -1880,40 +1865,20 @@ class ConductorManager(base_manager.BaseConductorManager):
The ensuing actions could include preparing a PXE environment,
updating the DHCP server, and so on.
"""
- filters = {'reserved': False,
- 'maintenance': False,
- 'provision_state': states.ACTIVE}
- node_iter = self.iter_nodes(fields=['id', 'conductor_affinity'],
- filters=filters)
-
- workers_count = 0
- for (node_uuid, driver, conductor_group, node_id,
- conductor_affinity) in node_iter:
- if conductor_affinity == self.conductor.id:
- continue
-
- # Node is mapped here, but not updated by this conductor last
- try:
- with task_manager.acquire(context, node_uuid,
- purpose='node take over') as task:
- # NOTE(tenbrae): now that we have the lock, check again to
- # avoid racing with deletes and other state changes
- node = task.node
- if (node.maintenance
- or node.conductor_affinity == self.conductor.id
- or node.provision_state != states.ACTIVE):
- continue
-
- task.spawn_after(self._spawn_worker,
- self._do_takeover, task)
+ # NOTE(tenbrae): now that we have the lock, check again to
+ # avoid racing with deletes and other state changes
+ node = task.node
+ if (node.maintenance
+ or node.conductor_affinity == self.conductor.id
+ or node.provision_state != states.ACTIVE):
+ return False
- except exception.NoFreeConductorWorker:
- break
- except (exception.NodeLocked, exception.NodeNotFound):
- continue
- workers_count += 1
- if workers_count == CONF.conductor.periodic_max_workers:
- break
+ try:
+ task.spawn_after(self._spawn_worker, self._do_takeover, task)
+ except exception.NoFreeConductorWorker:
+ raise periodics.Stop()
+ else:
+ return True
@METRICS.timer('ConductorManager.validate_driver_interfaces')
@messaging.expected_exceptions(exception.NodeLocked)
diff --git a/ironic/conductor/periodics.py b/ironic/conductor/periodics.py
new file mode 100644
index 000000000..ead5cbf08
--- /dev/null
+++ b/ironic/conductor/periodics.py
@@ -0,0 +1,151 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Conductor periodics."""
+
+import collections
+import functools
+import inspect
+
+import eventlet
+from futurist import periodics
+from oslo_log import log
+
+from ironic.common import exception
+from ironic.conductor import base_manager
+from ironic.conductor import task_manager
+
+
+LOG = log.getLogger(__name__)
+
+
+def periodic(spacing, enabled=True, **kwargs):
+ """A decorator to define a periodic task.
+
+ :param spacing: how often (in seconds) to run the periodic task.
+ :param enabled: whether the task is enabled; defaults to ``spacing > 0``.
+ """
+ return periodics.periodic(spacing=spacing,
+ enabled=enabled and spacing > 0,
+ **kwargs)
+
+
+class Stop(Exception):
+ """A signal to stop the current iteration of a periodic task."""
+
+
+def node_periodic(purpose, spacing, enabled=True, filters=None,
+ predicate=None, predicate_extra_fields=(), limit=None,
+ shared_task=True):
+ """A decorator to define a periodic task to act on nodes.
+
+ Defines a periodic task that fetches the list of nodes mapped to the
+ current conductor which satisfy the provided filters.
+
+ The decorated function must be a method on either the conductor manager
+ or a hardware interface. The signature is:
+
+ * for conductor manager: ``(self, task, context)``
+ * for hardware interfaces: ``(self, task, manager, context)``.
+
+ ``NodeNotFound`` and ``NodeLocked`` exceptions are ignored. Raise ``Stop``
+ to abort the current iteration of the task and reschedule it.
+
+ :param purpose: a human-readable description of the activity, e.g.
+ "verifying that the cat is purring".
+ :param spacing: how often (in seconds) to run the periodic task.
+ :param enabled: whether the task is enabled; defaults to ``spacing > 0``.
+ :param filters: database-level filters for the nodes.
+ :param predicate: a callable to run on the fetched nodes *before* creating
+ a task for them. The only parameter will be a named tuple with fields
+ ``uuid``, ``driver``, ``conductor_group`` plus everything from
+ ``predicate_extra_fields``. If the callable accepts a 2nd parameter,
+ it will be the conductor manager instance.
+ :param predicate_extra_fields: extra fields to fetch on the initial
+ request and pass into the ``predicate``. Must not contain ``uuid``,
+ ``driver`` and ``conductor_group`` since they are always included.
+ :param limit: how many nodes to process before stopping the current
+ iteration. If ``predicate`` returns ``False``, the node is not counted.
+ If the decorated function returns ``False``, the node is not counted
+ either. Can be a callable, in which case it will be called on each
+ iteration to determine the limit.
+ :param shared_task: if ``True``, the task will have a shared lock. It is
+ recommended to start with a shared lock and upgrade it only if needed.
+ """
+ node_type = collections.namedtuple(
+ 'Node',
+ ['uuid', 'driver', 'conductor_group'] + list(predicate_extra_fields)
+ )
+
+ # Accepting a conductor manager is a bit of an edge case, doing a bit of
+ # a signature magic to avoid passing it everywhere.
+ accepts_manager = (predicate is not None
+ and len(inspect.signature(predicate).parameters) > 1)
+
+ def decorator(func):
+ @periodic(spacing=spacing, enabled=enabled)
+ @functools.wraps(func)
+ def wrapper(self, *args, **kwargs):
+ # Make it work with both drivers and the conductor manager
+ if isinstance(self, base_manager.BaseConductorManager):
+ manager = self
+ context = args[0]
+ else:
+ manager = args[0]
+ context = args[1]
+
+ if callable(limit):
+ local_limit = limit()
+ else:
+ local_limit = limit
+ assert local_limit is None or local_limit > 0
+
+ nodes = manager.iter_nodes(filters=filters,
+ fields=predicate_extra_fields)
+ for (node_uuid, *other) in nodes:
+ if predicate is not None:
+ node = node_type(node_uuid, *other)
+ if accepts_manager:
+ result = predicate(node, manager)
+ else:
+ result = predicate(node)
+ if not result:
+ continue
+
+ try:
+ with task_manager.acquire(context, node_uuid,
+ purpose=purpose,
+ shared=shared_task) as task:
+ result = func(self, task, *args, **kwargs)
+ except exception.NodeNotFound:
+ LOG.info("During %(action)s, node %(node)s was not found "
+ "and presumed deleted by another process.",
+ {'node': node_uuid, 'action': purpose})
+ except exception.NodeLocked:
+ LOG.info("During %(action)s, node %(node)s was already "
+ "locked by another process. Skip.",
+ {'node': node_uuid, 'action': purpose})
+ except Stop:
+ break
+ finally:
+ # Yield on every iteration
+ eventlet.sleep(0)
+
+ if (local_limit is not None
+ and (result is None or result)):
+ local_limit -= 1
+ if not local_limit:
+ return
+
+ return wrapper
+
+ return decorator
diff --git a/ironic/drivers/modules/drac/bios.py b/ironic/drivers/modules/drac/bios.py
index e40089a4f..795e4f150 100644
--- a/ironic/drivers/modules/drac/bios.py
+++ b/ironic/drivers/modules/drac/bios.py
@@ -15,7 +15,6 @@
DRAC BIOS configuration specific methods
"""
-from futurist import periodics
from ironic_lib import metrics_utils
from oslo_log import log as logging
from oslo_utils import importutils
@@ -23,7 +22,7 @@ from oslo_utils import timeutils
from ironic.common import exception
from ironic.common.i18n import _
-from ironic.conductor import task_manager
+from ironic.conductor import periodics
from ironic.conductor import utils as manager_utils
from ironic.conf import CONF
from ironic.drivers import base
@@ -151,9 +150,16 @@ class DracWSManBIOS(base.BIOSInterface):
# spacing since BIOS jobs could be comparatively shorter in time than
# RAID ones currently using the raid spacing to avoid errors
# spacing parameter for periodic method
- @periodics.periodic(
- spacing=CONF.drac.query_raid_config_job_status_interval)
- def _query_bios_config_job_status(self, manager, context):
+ @periodics.node_periodic(
+ purpose='checking async bios configuration jobs',
+ spacing=CONF.drac.query_raid_config_job_status_interval,
+ filters={'reserved': False, 'maintenance': False},
+ predicate_extra_fields=['driver_internal_info'],
+ predicate=lambda n: (
+ n.driver_internal_info.get('bios_config_job_ids')
+ or n.driver_internal_info.get('factory_reset_time_before_reboot')),
+ )
+ def _query_bios_config_job_status(self, task, manager, context):
"""Periodic task to check the progress of running BIOS config jobs.
:param manager: an instance of Ironic Conductor Manager with
@@ -161,47 +167,17 @@ class DracWSManBIOS(base.BIOSInterface):
:param context: context of the request, needed when acquiring
a lock on a node. For access control.
"""
+ # skip a node not being managed by idrac driver
+ if not isinstance(task.driver.bios, DracWSManBIOS):
+ return
- filters = {'reserved': False, 'maintenance': False}
- fields = ['driver_internal_info']
+ # check bios_config_job_id exist & checks job is completed
+ if task.node.driver_internal_info.get("bios_config_job_ids"):
+ self._check_node_bios_jobs(task)
- node_list = manager.iter_nodes(fields=fields, filters=filters)
- for (node_uuid, driver, conductor_group,
- driver_internal_info) in node_list:
- try:
- # NOTE(TheJulia) Evaluate if work is actually required before
- # creating a task for every node in the deployment which does
- # not have a lock and is not in maintenance mode.
- if (not driver_internal_info.get("bios_config_job_ids")
- and not driver_internal_info.get(
- "factory_reset_time_before_reboot")):
- continue
-
- lock_purpose = 'checking async bios configuration jobs'
- # Performing read-only/non-destructive work with shared lock
- with task_manager.acquire(context, node_uuid,
- purpose=lock_purpose,
- shared=True) as task:
- # skip a node not being managed by idrac driver
- if not isinstance(task.driver.bios, DracWSManBIOS):
- continue
-
- # check bios_config_job_id exist & checks job is completed
- if driver_internal_info.get("bios_config_job_ids"):
- self._check_node_bios_jobs(task)
-
- if driver_internal_info.get(
- "factory_reset_time_before_reboot"):
- self._check_last_system_inventory_changed(task)
-
- except exception.NodeNotFound:
- LOG.info("During query_bios_config_job_status, node "
- "%(node)s was not found and presumed deleted by "
- "another process.", {'node': node_uuid})
- except exception.NodeLocked:
- LOG.info("During query_bios_config_job_status, node "
- "%(node)s was already locked by another process. "
- "Skip.", {'node': node_uuid})
+ if task.node.driver_internal_info.get(
+ "factory_reset_time_before_reboot"):
+ self._check_last_system_inventory_changed(task)
def _check_last_system_inventory_changed(self, task):
"""Check the progress of last system inventory time of a node.
diff --git a/ironic/drivers/modules/drac/management.py b/ironic/drivers/modules/drac/management.py
index dd614b42a..9b2b53431 100644
--- a/ironic/drivers/modules/drac/management.py
+++ b/ironic/drivers/modules/drac/management.py
@@ -23,7 +23,6 @@ DRAC management interface
import json
import time
-from futurist import periodics
from ironic_lib import metrics_utils
import jsonschema
from jsonschema import exceptions as json_schema_exc
@@ -34,6 +33,7 @@ from ironic.common import boot_devices
from ironic.common import exception
from ironic.common.i18n import _
from ironic.common import molds
+from ironic.conductor import periodics
from ironic.conductor import task_manager
from ironic.conductor import utils as manager_utils
from ironic.conf import CONF
@@ -485,46 +485,23 @@ class DracRedfishManagement(redfish_management.RedfishManagement):
# Export executed as part of Import async periodic task status check
@METRICS.timer('DracRedfishManagement._query_import_configuration_status')
- @periodics.periodic(
+ @periodics.node_periodic(
+ purpose='checking async import configuration task',
spacing=CONF.drac.query_import_config_job_status_interval,
- enabled=CONF.drac.query_import_config_job_status_interval > 0)
- def _query_import_configuration_status(self, manager, context):
+ filters={'reserved': False, 'maintenance': False},
+ predicate_extra_fields=['driver_internal_info'],
+ predicate=lambda n: (
+ n.driver_internal_info.get('import_task_monitor_url')
+ ),
+ )
+ def _query_import_configuration_status(self, task, manager, context):
"""Period job to check import configuration task."""
+ if not isinstance(task.driver.management, DracRedfishManagement):
+ return
- filters = {'reserved': False, 'maintenance': False}
- fields = ['driver_internal_info']
- node_list = manager.iter_nodes(fields=fields, filters=filters)
- for (node_uuid, driver, conductor_group,
- driver_internal_info) in node_list:
- try:
-
- task_monitor_url = driver_internal_info.get(
- 'import_task_monitor_url')
- # NOTE(TheJulia): Evaluate if a task montitor URL exists
- # based upon our inital DB query before pulling a task for
- # every node in the deployment which reduces the overall
- # number of DB queries triggering in the background where
- # no work is required.
- if not task_monitor_url:
- continue
-
- lock_purpose = 'checking async import configuration task'
- with task_manager.acquire(context, node_uuid,
- purpose=lock_purpose,
- shared=True) as task:
- if not isinstance(task.driver.management,
- DracRedfishManagement):
- continue
- self._check_import_configuration_task(
- task, task_monitor_url)
- except exception.NodeNotFound:
- LOG.info('During _query_import_configuration_status, node '
- '%(node)s was not found and presumed deleted by '
- 'another process.', {'node': node_uuid})
- except exception.NodeLocked:
- LOG.info('During _query_import_configuration_status, node '
- '%(node)s was already locked by another process. '
- 'Skip.', {'node': node_uuid})
+ self._check_import_configuration_task(
+ task, task.node.driver_internal_info.get(
+ 'import_task_monitor_url'))
def _check_import_configuration_task(self, task, task_monitor_url):
"""Checks progress of running import configuration task"""
diff --git a/ironic/drivers/modules/drac/raid.py b/ironic/drivers/modules/drac/raid.py
index 1bdd36d85..726f57d3a 100644
--- a/ironic/drivers/modules/drac/raid.py
+++ b/ironic/drivers/modules/drac/raid.py
@@ -18,7 +18,6 @@ DRAC RAID specific methods
from collections import defaultdict
import math
-from futurist import periodics
from ironic_lib import metrics_utils
from oslo_log import log as logging
from oslo_utils import importutils
@@ -28,7 +27,7 @@ import tenacity
from ironic.common import exception
from ironic.common.i18n import _
from ironic.common import raid as raid_common
-from ironic.conductor import task_manager
+from ironic.conductor import periodics
from ironic.conductor import utils as manager_utils
from ironic.conf import CONF
from ironic.drivers import base
@@ -1487,38 +1486,22 @@ class DracRedfishRAID(redfish_raid.RedfishRAID):
return False
@METRICS.timer('DracRedfishRAID._query_raid_tasks_status')
- @periodics.periodic(
- spacing=CONF.drac.query_raid_config_job_status_interval)
- def _query_raid_tasks_status(self, manager, context):
+ @periodics.node_periodic(
+ purpose='checking async RAID tasks',
+ spacing=CONF.drac.query_raid_config_job_status_interval,
+ filters={'reserved': False, 'maintenance': False},
+ predicate_extra_fields=['driver_internal_info'],
+ predicate=lambda n: (
+ n.driver_internal_info.get('raid_task_monitor_uris')
+ ),
+ )
+ def _query_raid_tasks_status(self, task, manager, context):
"""Periodic task to check the progress of running RAID tasks"""
+ if not isinstance(task.driver.raid, DracRedfishRAID):
+ return
- filters = {'reserved': False, 'maintenance': False}
- fields = ['driver_internal_info']
- node_list = manager.iter_nodes(fields=fields, filters=filters)
- for (node_uuid, driver, conductor_group,
- driver_internal_info) in node_list:
- task_monitor_uris = driver_internal_info.get(
- 'raid_task_monitor_uris')
- if not task_monitor_uris:
- continue
- try:
- lock_purpose = 'checking async RAID tasks'
- with task_manager.acquire(context, node_uuid,
- purpose=lock_purpose,
- shared=True) as task:
- if not isinstance(task.driver.raid,
- DracRedfishRAID):
- continue
- self._check_raid_tasks_status(
- task, task_monitor_uris)
- except exception.NodeNotFound:
- LOG.info('During _query_raid_tasks_status, node '
- '%(node)s was not found and presumed deleted by '
- 'another process.', {'node': node_uuid})
- except exception.NodeLocked:
- LOG.info('During _query_raid_tasks_status, node '
- '%(node)s was already locked by another process. '
- 'Skip.', {'node': node_uuid})
+ self._check_raid_tasks_status(
+ task, task.node.driver_internal_info.get('raid_task_monitor_uris'))
def _check_raid_tasks_status(self, task, task_mon_uris):
"""Checks RAID tasks for completion
@@ -1763,43 +1746,21 @@ class DracWSManRAID(base.RAIDInterface):
return {'logical_disks': logical_disks}
@METRICS.timer('DracRAID._query_raid_config_job_status')
- @periodics.periodic(
- spacing=CONF.drac.query_raid_config_job_status_interval)
- def _query_raid_config_job_status(self, manager, context):
+ @periodics.node_periodic(
+ purpose='checking async raid configuration jobs',
+ spacing=CONF.drac.query_raid_config_job_status_interval,
+ filters={'reserved': False, 'maintenance': False},
+ predicate_extra_fields=['driver_internal_info'],
+ predicate=lambda n: (
+ n.driver_internal_info.get('raid_config_job_ids')
+ ),
+ )
+ def _query_raid_config_job_status(self, task, manager, context):
"""Periodic task to check the progress of running RAID config jobs."""
+ if not isinstance(task.driver.raid, DracWSManRAID):
+ return
- filters = {'reserved': False, 'maintenance': False}
- fields = ['driver_internal_info']
-
- node_list = manager.iter_nodes(fields=fields, filters=filters)
- for (node_uuid, driver, conductor_group,
- driver_internal_info) in node_list:
- try:
-
- job_ids = driver_internal_info.get('raid_config_job_ids')
- # NOTE(TheJulia): Evaluate if there is work to be done
- # based upon the original DB query's results so we don't
- # proceed creating tasks for every node in the deployment.
- if not job_ids:
- continue
-
- lock_purpose = 'checking async raid configuration jobs'
- with task_manager.acquire(context, node_uuid,
- purpose=lock_purpose,
- shared=True) as task:
- if not isinstance(task.driver.raid, DracWSManRAID):
- continue
-
- self._check_node_raid_jobs(task)
-
- except exception.NodeNotFound:
- LOG.info("During query_raid_config_job_status, node "
- "%(node)s was not found and presumed deleted by "
- "another process.", {'node': node_uuid})
- except exception.NodeLocked:
- LOG.info("During query_raid_config_job_status, node "
- "%(node)s was already locked by another process. "
- "Skip.", {'node': node_uuid})
+ self._check_node_raid_jobs(task)
@METRICS.timer('DracRAID._check_node_raid_jobs')
def _check_node_raid_jobs(self, task):
diff --git a/ironic/drivers/modules/inspector.py b/ironic/drivers/modules/inspector.py
index b344abb74..1b866d0d5 100644
--- a/ironic/drivers/modules/inspector.py
+++ b/ironic/drivers/modules/inspector.py
@@ -20,7 +20,6 @@ import shlex
from urllib import parse as urlparse
import eventlet
-from futurist import periodics
import openstack
from oslo_log import log as logging
@@ -29,6 +28,7 @@ from ironic.common.i18n import _
from ironic.common import keystone
from ironic.common import states
from ironic.common import utils
+from ironic.conductor import periodics
from ironic.conductor import task_manager
from ironic.conductor import utils as cond_utils
from ironic.conf import CONF
@@ -292,21 +292,14 @@ class Inspector(base.InspectInterface):
'ironic-inspector', {'uuid': node_uuid})
_get_client(task.context).abort_introspection(node_uuid)
- @periodics.periodic(spacing=CONF.inspector.status_check_period)
- def _periodic_check_result(self, manager, context):
+ @periodics.node_periodic(
+ purpose='checking hardware inspection status',
+ spacing=CONF.inspector.status_check_period,
+ filters={'provision_state': states.INSPECTWAIT},
+ )
+ def _periodic_check_result(self, task, manager, context):
"""Periodic task checking results of inspection."""
- filters = {'provision_state': states.INSPECTWAIT}
- node_iter = manager.iter_nodes(filters=filters)
-
- for node_uuid, driver, conductor_group in node_iter:
- try:
- lock_purpose = 'checking hardware inspection status'
- with task_manager.acquire(context, node_uuid,
- shared=True,
- purpose=lock_purpose) as task:
- _check_status(task)
- except (exception.NodeLocked, exception.NodeNotFound):
- continue
+ _check_status(task)
def _start_inspection(node_uuid, context):
diff --git a/ironic/drivers/modules/irmc/raid.py b/ironic/drivers/modules/irmc/raid.py
index 8f1bd172a..3368e887d 100644
--- a/ironic/drivers/modules/irmc/raid.py
+++ b/ironic/drivers/modules/irmc/raid.py
@@ -15,7 +15,6 @@
"""
Irmc RAID specific methods
"""
-from futurist import periodics
from ironic_lib import metrics_utils
from oslo_log import log as logging
from oslo_utils import importutils
@@ -23,7 +22,7 @@ from oslo_utils import importutils
from ironic.common import exception
from ironic.common import raid as raid_common
from ironic.common import states
-from ironic.conductor import task_manager
+from ironic.conductor import periodics
from ironic.conductor import utils as manager_utils
from ironic import conf
from ironic.drivers import base
@@ -430,80 +429,63 @@ class IRMCRAID(base.RAIDInterface):
{'node_id': node_uuid, 'cfg': node.raid_config})
@METRICS.timer('IRMCRAID._query_raid_config_fgi_status')
- @periodics.periodic(
- spacing=CONF.irmc.query_raid_config_fgi_status_interval)
- def _query_raid_config_fgi_status(self, manager, context):
+ @periodics.node_periodic(
+ purpose='checking async RAID configuration tasks',
+ spacing=CONF.irmc.query_raid_config_fgi_status_interval,
+ filters={'reserved': False, 'provision_state': states.CLEANWAIT,
+ 'maintenance': False},
+ predicate_extra_fields=['raid_config'],
+ predicate=lambda n: (
+ n.raid_config and not n.raid_config.get('fgi_status')
+ ),
+ )
+ def _query_raid_config_fgi_status(self, task, manager, context):
"""Periodic tasks to check the progress of running RAID config."""
-
- filters = {'reserved': False, 'provision_state': states.CLEANWAIT,
- 'maintenance': False}
- fields = ['raid_config']
- node_list = manager.iter_nodes(fields=fields, filters=filters)
- for (node_uuid, driver, conductor_group, raid_config) in node_list:
- try:
- # NOTE(TheJulia): Evaluate based upon presence of raid
- # configuration before triggering a task, as opposed to after
- # so we don't create excess node task objects with related
- # DB queries.
- if not raid_config or raid_config.get('fgi_status'):
- continue
-
- lock_purpose = 'checking async RAID configuration tasks'
- with task_manager.acquire(context, node_uuid,
- purpose=lock_purpose,
- shared=True) as task:
- node = task.node
- node_uuid = task.node.uuid
- if not isinstance(task.driver.raid, IRMCRAID):
- continue
- if task.node.target_raid_config is None:
- continue
- task.upgrade_lock()
- if node.provision_state != states.CLEANWAIT:
- continue
- # Avoid hitting clean_callback_timeout expiration
- node.touch_provisioning()
-
- try:
- report = irmc_common.get_irmc_report(node)
- except client.scci.SCCIInvalidInputError:
- raid_config.update({'fgi_status': RAID_FAILED})
- raid_common.update_raid_info(node, raid_config)
- self._set_clean_failed(task, RAID_FAILED)
- continue
- except client.scci.SCCIClientError:
- raid_config.update({'fgi_status': RAID_FAILED})
- raid_common.update_raid_info(node, raid_config)
- self._set_clean_failed(task, RAID_FAILED)
- continue
-
- fgi_status_dict = _get_fgi_status(report, node_uuid)
- # Note(trungnv): Allow to check until RAID mechanism to be
- # completed with RAID information in report.
- if fgi_status_dict == 'completing':
- continue
- if not fgi_status_dict:
- raid_config.update({'fgi_status': RAID_FAILED})
- raid_common.update_raid_info(node, raid_config)
- self._set_clean_failed(task, fgi_status_dict)
- continue
- if all(fgi_status == 'Idle' for fgi_status in
- fgi_status_dict.values()):
- raid_config.update({'fgi_status': RAID_COMPLETED})
- raid_common.update_raid_info(node, raid_config)
- LOG.info('RAID configuration has completed on '
- 'node %(node)s with fgi_status is %(fgi)s',
- {'node': node_uuid, 'fgi': RAID_COMPLETED})
- self._resume_cleaning(task)
-
- except exception.NodeNotFound:
- LOG.info('During query_raid_config_job_status, node '
- '%(node)s was not found raid_config and presumed '
- 'deleted by another process.', {'node': node_uuid})
- except exception.NodeLocked:
- LOG.info('During query_raid_config_job_status, node '
- '%(node)s was already locked by another process. '
- 'Skip.', {'node': node_uuid})
+ node = task.node
+ node_uuid = task.node.uuid
+ if not isinstance(task.driver.raid, IRMCRAID):
+ return
+ if task.node.target_raid_config is None:
+ return
+ task.upgrade_lock()
+ if node.provision_state != states.CLEANWAIT:
+ return
+ # Avoid hitting clean_callback_timeout expiration
+ node.touch_provisioning()
+
+ raid_config = node.raid_config
+
+ try:
+ report = irmc_common.get_irmc_report(node)
+ except client.scci.SCCIInvalidInputError:
+ raid_config.update({'fgi_status': RAID_FAILED})
+ raid_common.update_raid_info(node, raid_config)
+ self._set_clean_failed(task, RAID_FAILED)
+ return
+ except client.scci.SCCIClientError:
+ raid_config.update({'fgi_status': RAID_FAILED})
+ raid_common.update_raid_info(node, raid_config)
+ self._set_clean_failed(task, RAID_FAILED)
+ return
+
+ fgi_status_dict = _get_fgi_status(report, node_uuid)
+ # Note(trungnv): Allow to check until RAID mechanism to be
+ # completed with RAID information in report.
+ if fgi_status_dict == 'completing':
+ return
+ if not fgi_status_dict:
+ raid_config.update({'fgi_status': RAID_FAILED})
+ raid_common.update_raid_info(node, raid_config)
+ self._set_clean_failed(task, fgi_status_dict)
+ return
+ if all(fgi_status == 'Idle' for fgi_status in
+ fgi_status_dict.values()):
+ raid_config.update({'fgi_status': RAID_COMPLETED})
+ raid_common.update_raid_info(node, raid_config)
+ LOG.info('RAID configuration has completed on '
+ 'node %(node)s with fgi_status is %(fgi)s',
+ {'node': node_uuid, 'fgi': RAID_COMPLETED})
+ self._resume_cleaning(task)
def _set_clean_failed(self, task, fgi_status_dict):
LOG.error('RAID configuration task failed for node %(node)s. '
diff --git a/ironic/drivers/modules/pxe_base.py b/ironic/drivers/modules/pxe_base.py
index 5fff4ae51..ab5b0d535 100644
--- a/ironic/drivers/modules/pxe_base.py
+++ b/ironic/drivers/modules/pxe_base.py
@@ -13,7 +13,6 @@
Base PXE Interface Methods
"""
-from futurist import periodics
from ironic_lib import metrics_utils
from oslo_config import cfg
from oslo_log import log as logging
@@ -24,7 +23,7 @@ from ironic.common import exception
from ironic.common.i18n import _
from ironic.common import pxe_utils
from ironic.common import states
-from ironic.conductor import task_manager
+from ironic.conductor import periodics
from ironic.conductor import utils as manager_utils
from ironic.drivers.modules import boot_mode_utils
from ironic.drivers.modules import deploy_utils
@@ -452,29 +451,23 @@ class PXEBaseMixin(object):
states.RESCUEWAIT}
@METRICS.timer('PXEBaseMixin._check_boot_timeouts')
- @periodics.periodic(spacing=CONF.pxe.boot_retry_check_interval,
- enabled=bool(CONF.pxe.boot_retry_timeout))
- def _check_boot_timeouts(self, manager, context):
+ @periodics.node_periodic(
+ purpose='checking PXE boot status',
+ spacing=CONF.pxe.boot_retry_check_interval,
+ enabled=bool(CONF.pxe.boot_retry_timeout),
+ filters={'provision_state_in': _RETRY_ALLOWED_STATES,
+ 'reserved': False,
+ 'maintenance': False,
+ 'provisioned_before': CONF.pxe.boot_retry_timeout},
+ )
+ def _check_boot_timeouts(self, task, manager, context):
"""Periodically checks whether boot has timed out and retry it.
+ :param task: a task instance.
:param manager: conductor manager.
:param context: request context.
"""
- filters = {'provision_state_in': self._RETRY_ALLOWED_STATES,
- 'reserved': False,
- 'maintenance': False,
- 'provisioned_before': CONF.pxe.boot_retry_timeout}
- node_iter = manager.iter_nodes(filters=filters)
-
- for node_uuid, driver, conductor_group in node_iter:
- try:
- lock_purpose = 'checking PXE boot status'
- with task_manager.acquire(context, node_uuid,
- shared=True,
- purpose=lock_purpose) as task:
- self._check_boot_status(task)
- except (exception.NodeLocked, exception.NodeNotFound):
- continue
+ self._check_boot_status(task)
def _check_boot_status(self, task):
if not isinstance(task.driver.boot, PXEBaseMixin):
diff --git a/ironic/drivers/modules/redfish/management.py b/ironic/drivers/modules/redfish/management.py
index 9a68d9975..ab1a105ef 100644
--- a/ironic/drivers/modules/redfish/management.py
+++ b/ironic/drivers/modules/redfish/management.py
@@ -15,7 +15,6 @@
import collections
-from futurist import periodics
from ironic_lib import metrics_utils
from oslo_log import log
from oslo_utils import importutils
@@ -29,6 +28,7 @@ from ironic.common.i18n import _
from ironic.common import indicator_states
from ironic.common import states
from ironic.common import utils
+from ironic.conductor import periodics
from ironic.conductor import task_manager
from ironic.conductor import utils as manager_utils
from ironic.conf import CONF
@@ -853,100 +853,46 @@ class RedfishManagement(base.ManagementInterface):
node.save()
@METRICS.timer('RedfishManagement._query_firmware_update_failed')
- @periodics.periodic(
+ @periodics.node_periodic(
+ purpose='checking if async firmware update failed',
spacing=CONF.redfish.firmware_update_fail_interval,
- enabled=CONF.redfish.firmware_update_fail_interval > 0)
- def _query_firmware_update_failed(self, manager, context):
+ filters={'reserved': False, 'provision_state': states.CLEANFAIL,
+ 'maintenance': True},
+ predicate_extra_fields=['driver_internal_info'],
+ predicate=lambda n: n.driver_internal_info.get('firmware_updates'),
+ )
+ def _query_firmware_update_failed(self, task, manager, context):
"""Periodic job to check for failed firmware updates."""
+ if not isinstance(task.driver.management, RedfishManagement):
+ return
- filters = {'reserved': False, 'provision_state': states.CLEANFAIL,
- 'maintenance': True}
+ node = task.node
- fields = ['driver_internal_info']
+ # A firmware update failed. Discard any remaining firmware
+ # updates so when the user takes the node out of
+ # maintenance mode, pending firmware updates do not
+ # automatically continue.
+ LOG.warning('Firmware update failed for node %(node)s. '
+ 'Discarding remaining firmware updates.',
+ {'node': node.uuid})
- node_list = manager.iter_nodes(fields=fields, filters=filters)
- for (node_uuid, driver, conductor_group,
- driver_internal_info) in node_list:
- try:
- firmware_updates = driver_internal_info.get(
- 'firmware_updates')
- # NOTE(TheJulia): If we don't have a entry upfront, we can
- # safely skip past the node as we know work here is not
- # required, otherwise minimizing the number of potential
- # nodes to visit.
- if not firmware_updates:
- continue
-
- lock_purpose = 'checking async firmware update failed.'
- with task_manager.acquire(context, node_uuid,
- purpose=lock_purpose,
- shared=True) as task:
- if not isinstance(task.driver.management,
- RedfishManagement):
- continue
-
- node = task.node
-
- # A firmware update failed. Discard any remaining firmware
- # updates so when the user takes the node out of
- # maintenance mode, pending firmware updates do not
- # automatically continue.
- LOG.warning('Firmware update failed for node %(node)s. '
- 'Discarding remaining firmware updates.',
- {'node': node.uuid})
-
- task.upgrade_lock()
- self._clear_firmware_updates(node)
-
- except exception.NodeNotFound:
- LOG.info('During _query_firmware_update_failed, node '
- '%(node)s was not found and presumed deleted by '
- 'another process.', {'node': node_uuid})
- except exception.NodeLocked:
- LOG.info('During _query_firmware_update_failed, node '
- '%(node)s was already locked by another process. '
- 'Skip.', {'node': node_uuid})
+ task.upgrade_lock()
+ self._clear_firmware_updates(node)
@METRICS.timer('RedfishManagement._query_firmware_update_status')
- @periodics.periodic(
+ @periodics.node_periodic(
+ purpose='checking async firmware update tasks',
spacing=CONF.redfish.firmware_update_status_interval,
- enabled=CONF.redfish.firmware_update_status_interval > 0)
- def _query_firmware_update_status(self, manager, context):
+ filters={'reserved': False, 'provision_state': states.CLEANWAIT},
+ predicate_extra_fields=['driver_internal_info'],
+ predicate=lambda n: n.driver_internal_info.get('firmware_updates'),
+ )
+ def _query_firmware_update_status(self, task, manager, context):
"""Periodic job to check firmware update tasks."""
+ if not isinstance(task.driver.management, RedfishManagement):
+ return
- filters = {'reserved': False, 'provision_state': states.CLEANWAIT}
- fields = ['driver_internal_info']
-
- node_list = manager.iter_nodes(fields=fields, filters=filters)
- for (node_uuid, driver, conductor_group,
- driver_internal_info) in node_list:
- try:
- firmware_updates = driver_internal_info.get(
- 'firmware_updates')
- # NOTE(TheJulia): Check and skip upfront before creating a
- # task so we don't generate additional tasks and db queries
- # for every node in CLEANWAIT which is not locked.
- if not firmware_updates:
- continue
-
- lock_purpose = 'checking async firmware update tasks.'
- with task_manager.acquire(context, node_uuid,
- purpose=lock_purpose,
- shared=True) as task:
- if not isinstance(task.driver.management,
- RedfishManagement):
- continue
-
- self._check_node_firmware_update(task)
-
- except exception.NodeNotFound:
- LOG.info('During _query_firmware_update_status, node '
- '%(node)s was not found and presumed deleted by '
- 'another process.', {'node': node_uuid})
- except exception.NodeLocked:
- LOG.info('During _query_firmware_update_status, node '
- '%(node)s was already locked by another process. '
- 'Skip.', {'node': node_uuid})
+ self._check_node_firmware_update(task)
@METRICS.timer('RedfishManagement._check_node_firmware_update')
def _check_node_firmware_update(self, task):
diff --git a/ironic/drivers/modules/redfish/raid.py b/ironic/drivers/modules/redfish/raid.py
index c01d08a9c..95052bb46 100644
--- a/ironic/drivers/modules/redfish/raid.py
+++ b/ironic/drivers/modules/redfish/raid.py
@@ -15,7 +15,6 @@
import math
-from futurist import periodics
from ironic_lib import metrics_utils
from oslo_log import log
from oslo_utils import importutils
@@ -25,7 +24,7 @@ from ironic.common import exception
from ironic.common.i18n import _
from ironic.common import raid
from ironic.common import states
-from ironic.conductor import task_manager
+from ironic.conductor import periodics
from ironic.conductor import utils as manager_utils
from ironic.conf import CONF
from ironic.drivers import base
@@ -1014,98 +1013,46 @@ class RedfishRAID(base.RAIDInterface):
node.save()
@METRICS.timer('RedfishRAID._query_raid_config_failed')
- @periodics.periodic(
+ @periodics.node_periodic(
+ purpose='checking async RAID config failed',
spacing=CONF.redfish.raid_config_fail_interval,
- enabled=CONF.redfish.raid_config_fail_interval > 0)
- def _query_raid_config_failed(self, manager, context):
+ filters={'reserved': False, 'provision_state': states.CLEANFAIL,
+ 'maintenance': True},
+ predicate_extra_fields=['driver_internal_info'],
+ predicate=lambda n: n.driver_internal_info.get('raid_configs'),
+ )
+ def _query_raid_config_failed(self, task, manager, context):
"""Periodic job to check for failed RAID configuration."""
+ if not isinstance(task.driver.raid, RedfishRAID):
+ return
- filters = {'reserved': False, 'provision_state': states.CLEANFAIL,
- 'maintenance': True}
-
- fields = ['driver_internal_info']
+ node = task.node
- node_list = manager.iter_nodes(fields=fields, filters=filters)
- for (node_uuid, driver, conductor_group,
- driver_internal_info) in node_list:
- try:
- raid_configs = driver_internal_info.get(
- 'raid_configs')
- # NOTE(TheJulia): Evaluate the presence of raid configuration
- # activity before pulling the task, so we don't needlessly
- # create database queries with tasks which would be skipped
- # anyhow.
- if not raid_configs:
- continue
+ # A RAID config failed. Discard any remaining RAID
+ # configs so when the user takes the node out of
+ # maintenance mode, pending RAID configs do not
+ # automatically continue.
+ LOG.warning('RAID configuration failed for node %(node)s. '
+ 'Discarding remaining RAID configurations.',
+ {'node': node.uuid})
- lock_purpose = 'checking async RAID config failed.'
- with task_manager.acquire(context, node_uuid,
- purpose=lock_purpose,
- shared=True) as task:
- if not isinstance(task.driver.raid, RedfishRAID):
- continue
-
- node = task.node
-
- # A RAID config failed. Discard any remaining RAID
- # configs so when the user takes the node out of
- # maintenance mode, pending RAID configs do not
- # automatically continue.
- LOG.warning('RAID configuration failed for node %(node)s. '
- 'Discarding remaining RAID configurations.',
- {'node': node.uuid})
-
- task.upgrade_lock()
- self._clear_raid_configs(node)
-
- except exception.NodeNotFound:
- LOG.info('During _query_raid_config_failed, node '
- '%(node)s was not found and presumed deleted by '
- 'another process.', {'node': node_uuid})
- except exception.NodeLocked:
- LOG.info('During _query_raid_config_failed, node '
- '%(node)s was already locked by another process. '
- 'Skip.', {'node': node_uuid})
+ task.upgrade_lock()
+ self._clear_raid_configs(node)
@METRICS.timer('RedfishRAID._query_raid_config_status')
- @periodics.periodic(
+ @periodics.node_periodic(
+ purpose='checking async RAID config tasks',
spacing=CONF.redfish.raid_config_status_interval,
- enabled=CONF.redfish.raid_config_status_interval > 0)
- def _query_raid_config_status(self, manager, context):
+ filters={'reserved': False, 'provision_state': states.CLEANWAIT},
+ predicate_extra_fields=['driver_internal_info'],
+ predicate=lambda n: n.driver_internal_info.get('raid_configs'),
+ )
+ def _query_raid_config_status(self, task, manager, context):
"""Periodic job to check RAID config tasks."""
+ if not isinstance(task.driver.raid, RedfishRAID):
+ return
- filters = {'reserved': False, 'provision_state': states.CLEANWAIT}
- fields = ['driver_internal_info']
-
- node_list = manager.iter_nodes(fields=fields, filters=filters)
- for (node_uuid, driver, conductor_group,
- driver_internal_info) in node_list:
- try:
- raid_configs = driver_internal_info.get(
- 'raid_configs')
- # NOTE(TheJulia): Skip to next record if we do not
- # have raid configuraiton tasks, so we don't pull tasks
- # for every unrelated node in CLEANWAIT.
- if not raid_configs:
- continue
-
- lock_purpose = 'checking async RAID config tasks.'
- with task_manager.acquire(context, node_uuid,
- purpose=lock_purpose,
- shared=True) as task:
- if not isinstance(task.driver.raid, RedfishRAID):
- continue
-
- self._check_node_raid_config(task)
-
- except exception.NodeNotFound:
- LOG.info('During _query_raid_config_status, node '
- '%(node)s was not found and presumed deleted by '
- 'another process.', {'node': node_uuid})
- except exception.NodeLocked:
- LOG.info('During _query_raid_config_status, node '
- '%(node)s was already locked by another process. '
- 'Skip.', {'node': node_uuid})
+ self._check_node_raid_config(task)
def _get_error_messages(self, response):
try:
diff --git a/ironic/tests/unit/conductor/test_manager.py b/ironic/tests/unit/conductor/test_manager.py
index e93b3cb78..a00bb97f8 100644
--- a/ironic/tests/unit/conductor/test_manager.py
+++ b/ironic/tests/unit/conductor/test_manager.py
@@ -5567,7 +5567,7 @@ class ManagerPowerRecoveryTestCase(mgr_utils.CommonMixIn,
self.task.driver = self.driver
self.filters = {'maintenance': True,
'fault': 'power failure'}
- self.columns = ['uuid', 'driver', 'conductor_group', 'id']
+ self.columns = ['uuid', 'driver', 'conductor_group']
def test_node_not_mapped(self, get_nodeinfo_mock,
mapped_mock, acquire_mock):
@@ -6152,7 +6152,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
self.filters = {'reserved': False,
'maintenance': False,
'provision_state': states.ACTIVE}
- self.columns = ['uuid', 'driver', 'conductor_group', 'id',
+ self.columns = ['uuid', 'driver', 'conductor_group',
'conductor_affinity']
def _assert_get_nodeinfo_args(self, get_nodeinfo_mock):
@@ -6200,7 +6200,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
self.service, self.node.uuid, self.node.driver,
self.node.conductor_group)
acquire_mock.assert_called_once_with(self.context, self.node.uuid,
- purpose=mock.ANY)
+ purpose=mock.ANY, shared=False)
# assert spawn_after has been called
self.task.spawn_after.assert_called_once_with(
self.service._spawn_worker,
@@ -6234,7 +6234,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
# assert acquire() gets called 2 times only instead of 3. When
# NoFreeConductorWorker is raised the loop should be broken
expected = [mock.call(self.context, self.node.uuid,
- purpose=mock.ANY)] * 2
+ purpose=mock.ANY, shared=False)] * 2
self.assertEqual(expected, acquire_mock.call_args_list)
# assert spawn_after has been called twice
@@ -6264,7 +6264,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
# assert acquire() gets called 3 times
expected = [mock.call(self.context, self.node.uuid,
- purpose=mock.ANY)] * 3
+ purpose=mock.ANY, shared=False)] * 3
self.assertEqual(expected, acquire_mock.call_args_list)
# assert spawn_after has been called only 2 times
@@ -6296,7 +6296,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
# assert acquire() gets called only once because of the worker limit
acquire_mock.assert_called_once_with(self.context, self.node.uuid,
- purpose=mock.ANY)
+ purpose=mock.ANY, shared=False)
# assert spawn_after has been called
self.task.spawn_after.assert_called_once_with(
diff --git a/ironic/tests/unit/conductor/test_periodics.py b/ironic/tests/unit/conductor/test_periodics.py
new file mode 100644
index 000000000..85868163a
--- /dev/null
+++ b/ironic/tests/unit/conductor/test_periodics.py
@@ -0,0 +1,135 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+
+from oslo_utils import uuidutils
+
+from ironic.common import context as ironic_context
+from ironic.conductor import base_manager
+from ironic.conductor import periodics
+from ironic.conductor import task_manager
+from ironic.tests.unit.db import base as db_base
+from ironic.tests.unit.objects import utils as obj_utils
+
+
+_FILTERS = {'maintenance': False}
+
+
+class PeriodicTestService(base_manager.BaseConductorManager):
+
+ def __init__(self, test):
+ self.test = test
+ self.nodes = []
+
+ @periodics.node_periodic(purpose="herding cats", spacing=42)
+ def simple(self, task, context):
+ self.test.assertIsInstance(context, ironic_context.RequestContext)
+ self.test.assertTrue(task.shared)
+ self.nodes.append(task.node.uuid)
+
+ @periodics.node_periodic(purpose="herding cats", spacing=42,
+ shared_task=False, filters=_FILTERS)
+ def exclusive(self, task, context):
+ self.test.assertIsInstance(context, ironic_context.RequestContext)
+ self.test.assertFalse(task.shared)
+ self.nodes.append(task.node.uuid)
+
+ @periodics.node_periodic(purpose="never running", spacing=42,
+ predicate=lambda n: n.cat != 'meow',
+ predicate_extra_fields=['cat'])
+ def never_run(self, task, context):
+ self.test.fail(f"Was not supposed to run, ran with {task.node}")
+
+ @periodics.node_periodic(purpose="herding cats", spacing=42, limit=3)
+ def limit(self, task, context):
+ self.test.assertIsInstance(context, ironic_context.RequestContext)
+ self.test.assertTrue(task.shared)
+ self.nodes.append(task.node.uuid)
+ if task.node.uuid == 'stop':
+ raise periodics.Stop()
+
+
+@mock.patch.object(PeriodicTestService, 'iter_nodes', autospec=True)
+class NodePeriodicTestCase(db_base.DbTestCase):
+
+ def setUp(self):
+ super().setUp()
+ self.service = PeriodicTestService(self)
+ self.ctx = ironic_context.get_admin_context()
+ self.uuid = uuidutils.generate_uuid()
+ self.node = obj_utils.create_test_node(self.context, uuid=self.uuid)
+
+ def test_simple(self, mock_iter_nodes):
+ mock_iter_nodes.return_value = iter([
+ (uuidutils.generate_uuid(), 'driver1', ''),
+ (self.uuid, 'driver2', 'group'),
+ ])
+
+ self.service.simple(self.ctx)
+
+ mock_iter_nodes.assert_called_once_with(self.service,
+ filters=None, fields=())
+ self.assertEqual([self.uuid], self.service.nodes)
+
+ def test_exclusive(self, mock_iter_nodes):
+ mock_iter_nodes.return_value = iter([
+ (uuidutils.generate_uuid(), 'driver1', ''),
+ (self.uuid, 'driver2', 'group'),
+ ])
+
+ self.service.exclusive(self.ctx)
+
+ mock_iter_nodes.assert_called_once_with(self.service,
+ filters=_FILTERS,
+ fields=())
+ self.assertEqual([self.uuid], self.service.nodes)
+
+ @mock.patch.object(task_manager, 'acquire', autospec=True)
+ def test_never_run(self, mock_acquire, mock_iter_nodes):
+ mock_iter_nodes.return_value = iter([
+ (self.uuid, 'driver2', 'group', 'meow'),
+ ])
+
+ self.service.never_run(self.ctx)
+
+ mock_iter_nodes.assert_called_once_with(self.service,
+ filters=None,
+ fields=['cat'])
+ self.assertEqual([], self.service.nodes)
+ mock_acquire.assert_not_called()
+
+ @mock.patch.object(task_manager, 'acquire', autospec=True)
+ def test_limit(self, mock_acquire, mock_iter_nodes):
+ mock_iter_nodes.return_value = iter([
+ (self.uuid, 'driver1', ''),
+ ] * 10)
+ mock_acquire.return_value.__enter__.return_value.node.uuid = self.uuid
+
+ self.service.limit(self.ctx)
+
+ mock_iter_nodes.assert_called_once_with(self.service,
+ filters=None, fields=())
+ self.assertEqual([self.uuid] * 3, self.service.nodes)
+
+ @mock.patch.object(task_manager, 'acquire', autospec=True)
+ def test_stop(self, mock_acquire, mock_iter_nodes):
+ mock_iter_nodes.return_value = iter([
+ (self.uuid, 'driver1', ''),
+ ] * 10)
+ mock_acquire.return_value.__enter__.return_value.node.uuid = 'stop'
+
+ self.service.limit(self.ctx)
+
+ mock_iter_nodes.assert_called_once_with(self.service,
+ filters=None, fields=())
+ self.assertEqual(['stop'], self.service.nodes)
diff --git a/ironic/tests/unit/drivers/modules/drac/test_management.py b/ironic/tests/unit/drivers/modules/drac/test_management.py
index f3d23d9a8..9d5182e89 100644
--- a/ironic/tests/unit/drivers/modules/drac/test_management.py
+++ b/ironic/tests/unit/drivers/modules/drac/test_management.py
@@ -28,6 +28,7 @@ from oslo_utils import importutils
import ironic.common.boot_devices
from ironic.common import exception
from ironic.common import molds
+from ironic.conductor import periodics
from ironic.conductor import task_manager
from ironic.conductor import utils as manager_utils
from ironic.drivers.modules import deploy_utils
@@ -1021,7 +1022,7 @@ class DracRedfishManagementTestCase(test_utils.BaseDracTest):
self.management._check_import_configuration_task.assert_not_called()
- @mock.patch.object(drac_mgmt.LOG, 'info', autospec=True)
+ @mock.patch.object(periodics.LOG, 'info', autospec=True)
@mock.patch.object(task_manager, 'acquire', autospec=True)
def test__query_import_configuration_status_node_notfound(
self, mock_acquire, mock_log):
@@ -1044,7 +1045,7 @@ class DracRedfishManagementTestCase(test_utils.BaseDracTest):
self.management._check_import_configuration_task.assert_not_called()
self.assertTrue(mock_log.called)
- @mock.patch.object(drac_mgmt.LOG, 'info', autospec=True)
+ @mock.patch.object(periodics.LOG, 'info', autospec=True)
@mock.patch.object(task_manager, 'acquire', autospec=True)
def test__query_import_configuration_status_node_locked(
self, mock_acquire, mock_log):
diff --git a/ironic/tests/unit/drivers/modules/drac/test_raid.py b/ironic/tests/unit/drivers/modules/drac/test_raid.py
index 1a5928e43..01a5ca9d1 100644
--- a/ironic/tests/unit/drivers/modules/drac/test_raid.py
+++ b/ironic/tests/unit/drivers/modules/drac/test_raid.py
@@ -25,6 +25,7 @@ import tenacity
from ironic.common import exception
from ironic.common import states
+from ironic.conductor import periodics
from ironic.conductor import task_manager
from ironic.conductor import utils as manager_utils
from ironic.conf import CONF
@@ -2592,7 +2593,7 @@ class DracRedfishRAIDTestCase(test_utils.BaseDracTest):
self.raid._check_raid_tasks_status.assert_not_called()
- @mock.patch.object(drac_raid.LOG, 'info', autospec=True)
+ @mock.patch.object(periodics.LOG, 'info', autospec=True)
@mock.patch.object(task_manager, 'acquire', autospec=True)
def test__query_raid_tasks_status_node_notfound(
self, mock_acquire, mock_log):
@@ -2610,7 +2611,7 @@ class DracRedfishRAIDTestCase(test_utils.BaseDracTest):
self.raid._check_raid_tasks_status.assert_not_called()
self.assertTrue(mock_log.called)
- @mock.patch.object(drac_raid.LOG, 'info', autospec=True)
+ @mock.patch.object(periodics.LOG, 'info', autospec=True)
@mock.patch.object(task_manager, 'acquire', autospec=True)
def test__query_raid_tasks_status_node_locked(
self, mock_acquire, mock_log):
diff --git a/ironic/tests/unit/drivers/modules/irmc/test_periodic_task.py b/ironic/tests/unit/drivers/modules/irmc/test_periodic_task.py
index 865f58962..57ba8263f 100644
--- a/ironic/tests/unit/drivers/modules/irmc/test_periodic_task.py
+++ b/ironic/tests/unit/drivers/modules/irmc/test_periodic_task.py
@@ -49,6 +49,8 @@ class iRMCPeriodicTaskTestCase(test_common.BaseIRMCTest):
{
'key': 'value'
}]}
+ self.node.raid_config = self.raid_config
+ self.node.target_raid_config = self.target_raid_config
@mock.patch.object(irmc_common, 'get_irmc_report', autospec=True)
def test__query_raid_config_fgi_status_without_node(
@@ -286,6 +288,7 @@ class iRMCPeriodicTaskTestCase(test_common.BaseIRMCTest):
mock_manager = mock.Mock()
raid_config = self.raid_config
raid_config_2 = self.raid_config.copy()
+ self.node_2.raid_config = raid_config_2
fgi_status_dict = {}
fgi_mock.side_effect = [{}, {'0': 'Idle', '1': 'Idle'}]
node_list = [(self.node_2.uuid, 'fake-hardware', '', raid_config_2),
diff --git a/ironic/tests/unit/drivers/modules/redfish/test_management.py b/ironic/tests/unit/drivers/modules/redfish/test_management.py
index 99da1265b..d5f23b93f 100644
--- a/ironic/tests/unit/drivers/modules/redfish/test_management.py
+++ b/ironic/tests/unit/drivers/modules/redfish/test_management.py
@@ -25,6 +25,7 @@ from ironic.common import components
from ironic.common import exception
from ironic.common import indicator_states
from ironic.common import states
+from ironic.conductor import periodics
from ironic.conductor import task_manager
from ironic.conductor import utils as manager_utils
from ironic.drivers.modules import deploy_utils
@@ -905,7 +906,7 @@ class RedfishManagementTestCase(db_base.DbTestCase):
management._clear_firmware_updates.assert_not_called()
- @mock.patch.object(redfish_mgmt.LOG, 'info', autospec=True)
+ @mock.patch.object(periodics.LOG, 'info', autospec=True)
@mock.patch.object(task_manager, 'acquire', autospec=True)
def test__query_firmware_update_failed_node_notfound(self, mock_acquire,
mock_log):
@@ -928,7 +929,7 @@ class RedfishManagementTestCase(db_base.DbTestCase):
management._clear_firmware_updates.assert_not_called()
self.assertTrue(mock_log.called)
- @mock.patch.object(redfish_mgmt.LOG, 'info', autospec=True)
+ @mock.patch.object(periodics.LOG, 'info', autospec=True)
@mock.patch.object(task_manager, 'acquire', autospec=True)
def test__query_firmware_update_failed_node_locked(
self, mock_acquire, mock_log):
@@ -1017,7 +1018,7 @@ class RedfishManagementTestCase(db_base.DbTestCase):
management._check_node_firmware_update.assert_not_called()
- @mock.patch.object(redfish_mgmt.LOG, 'info', autospec=True)
+ @mock.patch.object(periodics.LOG, 'info', autospec=True)
@mock.patch.object(task_manager, 'acquire', autospec=True)
def test__query_firmware_update_status_node_notfound(self, mock_acquire,
mock_log):
@@ -1040,7 +1041,7 @@ class RedfishManagementTestCase(db_base.DbTestCase):
management._check_node_firmware_update.assert_not_called()
self.assertTrue(mock_log.called)
- @mock.patch.object(redfish_mgmt.LOG, 'info', autospec=True)
+ @mock.patch.object(periodics.LOG, 'info', autospec=True)
@mock.patch.object(task_manager, 'acquire', autospec=True)
def test__query_firmware_update_status_node_locked(
self, mock_acquire, mock_log):