 -rw-r--r--  ironic/conductor/manager.py                                        | 46
 -rw-r--r--  ironic/conf/conductor.py                                           |  9
 -rw-r--r--  ironic/tests/unit/conductor/test_manager.py                        | 71
 -rw-r--r--  releasenotes/notes/add-parallel-power-syncs-b099d66e80aab616.yaml  |  9
 4 files changed, 128 insertions, 7 deletions
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index da179364e..b46c026f8 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -1663,10 +1663,40 @@ class ConductorManager(base_manager.BaseConductorManager):
     @periodics.periodic(spacing=CONF.conductor.sync_power_state_interval,
                         enabled=CONF.conductor.sync_power_state_interval > 0)
     def _sync_power_states(self, context):
-        """Periodic task to sync power states for the nodes.
+        """Periodic task to sync power states for the nodes."""
+        filters = {'maintenance': False}
+        nodes = queue.Queue()
+        for node_info in self.iter_nodes(fields=['id'], filters=filters):
+            nodes.put(node_info)
 
-        Attempt to grab a lock and sync only if the following
-        conditions are met:
+        number_of_threads = min(CONF.conductor.sync_power_state_workers,
+                                CONF.conductor.periodic_max_workers,
+                                nodes.qsize())
+        futures = []
+
+        for thread_number in range(max(0, number_of_threads - 1)):
+            try:
+                futures.append(
+                    self._spawn_worker(self._sync_power_state_nodes_task,
+                                       context, nodes))
+            except exception.NoFreeConductorWorker:
+                LOG.warning("There are no more conductor workers for "
+                            "power sync task. %(workers)d workers have "
+                            "been already spawned.",
+                            {'workers': thread_number})
+                break
+
+        try:
+            self._sync_power_state_nodes_task(context, nodes)
+
+        finally:
+            waiters.wait_for_all(futures)
+
+    def _sync_power_state_nodes_task(self, context, nodes):
+        """Invokes power state sync on nodes from synchronized queue.
+
+        Attempt to grab a lock and sync only if the following conditions
+        are met:
 
         1) Node is mapped to this conductor.
         2) Node is not in maintenance mode.
@@ -1692,9 +1722,13 @@ class ConductorManager(base_manager.BaseConductorManager):
         # (through to its DB API call) so that we can eliminate our call
         # and first set of checks below.
 
-        filters = {'maintenance': False}
-        node_iter = self.iter_nodes(fields=['id'], filters=filters)
-        for (node_uuid, driver, conductor_group, node_id) in node_iter:
+        while not self._shutdown:
+            try:
+                (node_uuid, driver, conductor_group,
+                 node_id) = nodes.get_nowait()
+            except queue.Empty:
+                break
+
             try:
                 # NOTE(dtantsur): start with a shared lock, upgrade if needed
                 with task_manager.acquire(context, node_uuid,
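
NOTE (reviewer sketch): the change above is a fan-out over a shared queue. The periodic task enqueues every non-maintenance node, spawns up to number_of_threads - 1 extra workers, joins the drain loop itself, and finally blocks in waiters.wait_for_all() so it never returns while workers are still running. Below is a minimal standalone illustration of that pattern; it uses a plain futurist.ThreadPoolExecutor in place of the conductor's green-thread _spawn_worker, and the names and print placeholder are illustrative, not ironic code:

    import queue

    import futurist
    from futurist import waiters

    def drain(nodes):
        # Consume until the shared queue is empty; queue.Empty ends the
        # loop, mirroring the get_nowait() handling in the diff above.
        while True:
            try:
                node = nodes.get_nowait()
            except queue.Empty:
                return
            print('syncing power state of', node)

    nodes = queue.Queue()
    for n in range(9):
        nodes.put('node-%d' % n)

    # Eight threads in total: seven submitted here, plus the caller below.
    executor = futurist.ThreadPoolExecutor(max_workers=7)
    futures = [executor.submit(drain, nodes) for _ in range(7)]
    try:
        drain(nodes)  # the periodic task's own thread drains the queue too
    finally:
        done, not_done = waiters.wait_for_all(futures)  # blocks until all finish
        executor.shutdown()
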
diff --git a/ironic/conf/conductor.py b/ironic/conf/conductor.py
index 6495e805d..45024ad4f 100644
--- a/ironic/conf/conductor.py
+++ b/ironic/conf/conductor.py
@@ -24,7 +24,9 @@ opts = [
                default=100, min=3,
                help=_('The size of the workers greenthread pool. '
                       'Note that 2 threads will be reserved by the conductor '
-                      'itself for handling heart beats and periodic tasks.')),
+                      'itself for handling heart beats and periodic tasks. '
+                      'On top of that, `sync_power_state_workers` will take '
+                      'up to 7 green threads with the default value of 8.')),
     cfg.IntOpt('heartbeat_interval',
                default=10,
                help=_('Seconds between conductor heart beats.')),
@@ -77,6 +79,11 @@ opts = [
                       'number of times Ironic should try syncing the '
                       'hardware node power state with the node power state '
                       'in DB')),
+    cfg.IntOpt('sync_power_state_workers',
+               default=8, min=1,
+               help=_('The maximum number of workers that can be started '
+                      'simultaneously to sync nodes power state from the '
+                      'periodic task.')),
     cfg.IntOpt('periodic_max_workers',
                default=8,
                help=_('Maximum number of worker threads that can be started '
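
NOTE (reviewer sketch): the new option interacts with the two existing caps. The effective degree of parallelism is min(sync_power_state_workers, periodic_max_workers, number of queued nodes), and one of those threads is the periodic task itself, so at most number_of_threads - 1 workers are spawned. A tiny standalone sketch of the arithmetic (effective_threads is a hypothetical helper, not ironic code); the expected values match the new test cases further down:

    def effective_threads(sync_power_state_workers,
                          periodic_max_workers,
                          queued_nodes):
        # Mirrors the min() in _sync_power_states: never use more threads
        # than there are nodes to sync, and honour both configuration caps.
        return min(sync_power_state_workers, periodic_max_workers,
                   queued_nodes)

    # One thread is always the periodic task itself; the rest are spawned.
    assert effective_threads(8, 8, 9) - 1 == 7  # 9 nodes, 8 workers: 7 spawns
    assert effective_threads(8, 8, 6) - 1 == 5  # 6 nodes, 8 workers: 5 spawns
    assert effective_threads(8, 8, 1) - 1 == 0  # 1 node: no extra workers
    assert effective_threads(1, 8, 9) - 1 == 0  # 1 worker drains all 9 nodes
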
diff --git a/ironic/tests/unit/conductor/test_manager.py b/ironic/tests/unit/conductor/test_manager.py
index 8e60a52d7..b5c1d2ee4 100644
--- a/ironic/tests/unit/conductor/test_manager.py
+++ b/ironic/tests/unit/conductor/test_manager.py
@@ -22,6 +22,7 @@ from collections import namedtuple
 import datetime
 
 import eventlet
+from futurist import waiters
 import mock
 from oslo_config import cfg
 from oslo_db import exception as db_exception
@@ -6416,6 +6417,10 @@ class ManagerDoSyncPowerStateTestCase(db_base.DbTestCase):
         self.task.upgrade_lock.assert_called_once_with()
 
 
+@mock.patch.object(waiters, 'wait_for_all',
+                   new=mock.MagicMock(return_value=(0, 0)))
+@mock.patch.object(manager.ConductorManager, '_spawn_worker',
+                   new=lambda self, fun, *args: fun(*args))
 @mock.patch.object(manager, 'do_sync_power_state')
 @mock.patch.object(task_manager, 'acquire')
 @mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
@@ -7140,6 +7145,72 @@ class ManagerTestHardwareTypeProperties(mgr_utils.ServiceSetUpMixin,
         self._check_hardware_type_properties('manual-management', expected)
 
 
+@mock.patch.object(waiters, 'wait_for_all')
+@mock.patch.object(manager.ConductorManager, '_spawn_worker')
+@mock.patch.object(manager.ConductorManager, '_sync_power_state_nodes_task')
+class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
+
+    def setUp(self):
+        super(ParallelPowerSyncTestCase, self).setUp()
+        self.service = manager.ConductorManager('hostname', 'test-topic')
+
+    def test__sync_power_states_9_nodes_8_workers(
+            self, sync_mock, spawn_mock, waiter_mock):
+
+        CONF.set_override('sync_power_state_workers', 8, group='conductor')
+
+        with mock.patch.object(self.service, 'iter_nodes',
+                               new=mock.MagicMock(return_value=[None] * 9)):
+
+            self.service._sync_power_states(self.context)
+
+            self.assertEqual(7, spawn_mock.call_count)
+            self.assertEqual(1, sync_mock.call_count)
+            self.assertEqual(1, waiter_mock.call_count)
+
+    def test__sync_power_states_6_nodes_8_workers(
+            self, sync_mock, spawn_mock, waiter_mock):
+
+        CONF.set_override('sync_power_state_workers', 8, group='conductor')
+
+        with mock.patch.object(self.service, 'iter_nodes',
+                               new=mock.MagicMock(return_value=[None] * 6)):
+
+            self.service._sync_power_states(self.context)
+
+            self.assertEqual(5, spawn_mock.call_count)
+            self.assertEqual(1, sync_mock.call_count)
+            self.assertEqual(1, waiter_mock.call_count)
+
+    def test__sync_power_states_1_nodes_8_workers(
+            self, sync_mock, spawn_mock, waiter_mock):
+
+        CONF.set_override('sync_power_state_workers', 8, group='conductor')
+
+        with mock.patch.object(self.service, 'iter_nodes',
+                               new=mock.MagicMock(return_value=[None])):
+
+            self.service._sync_power_states(self.context)
+
+            self.assertEqual(0, spawn_mock.call_count)
+            self.assertEqual(1, sync_mock.call_count)
+            self.assertEqual(1, waiter_mock.call_count)
+
+    def test__sync_power_states_9_nodes_1_worker(
+            self, sync_mock, spawn_mock, waiter_mock):
+
+        CONF.set_override('sync_power_state_workers', 1, group='conductor')
+
+        with mock.patch.object(self.service, 'iter_nodes',
+                               new=mock.MagicMock(return_value=[None] * 9)):
+
+            self.service._sync_power_states(self.context)
+
+            self.assertEqual(0, spawn_mock.call_count)
+            self.assertEqual(1, sync_mock.call_count)
+            self.assertEqual(1, waiter_mock.call_count)
+
+
 @mock.patch.object(task_manager, 'acquire')
 @mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
 @mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')
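
NOTE (reviewer sketch): two patching strategies appear in this file. The new ParallelPowerSyncTestCase mocks _sync_power_state_nodes_task and _spawn_worker outright to count invocations, while the pre-existing power-sync test class (patched near the top of this diff) replaces _spawn_worker with a lambda that runs the task inline, keeping the old single-threaded assertions valid. A toy illustration of that inline substitution (Service is hypothetical, not ironic code):

    import mock

    class Service(object):
        def _spawn_worker(self, func, *args):
            raise AssertionError('would submit to the green thread pool')

        def run(self):
            self._spawn_worker(print, 'work item')

    # Swapping _spawn_worker for an inline call keeps execution synchronous
    # and deterministic, so call counts can be asserted without real threads.
    with mock.patch.object(Service, '_spawn_worker',
                           new=lambda self, func, *args: func(*args)):
        Service().run()  # prints 'work item' immediately, no pool involved
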
diff --git a/releasenotes/notes/add-parallel-power-syncs-b099d66e80aab616.yaml b/releasenotes/notes/add-parallel-power-syncs-b099d66e80aab616.yaml
new file mode 100644
index 000000000..c77a48274
--- /dev/null
+++ b/releasenotes/notes/add-parallel-power-syncs-b099d66e80aab616.yaml
@@ -0,0 +1,9 @@
+---
+features:
+  - |
+    Parallelizes periodic power sync calls by running up to
+    ``sync_power_state_workers`` workers simultaneously. The default is
+    to run up to ``8`` workers.
+    This change should let larger-scale setups run power syncs more
+    frequently and make the whole power sync procedure more resilient
+    to slow or dead BMCs.