-rw-r--r--  ironic/conductor/manager.py                                        | 46
-rw-r--r--  ironic/conf/conductor.py                                           |  9
-rw-r--r--  ironic/tests/unit/conductor/test_manager.py                        | 71
-rw-r--r--  releasenotes/notes/add-parallel-power-syncs-b099d66e80aab616.yaml  |  9
4 files changed, 128 insertions, 7 deletions
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index da179364e..b46c026f8 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -1663,10 +1663,40 @@ class ConductorManager(base_manager.BaseConductorManager):
     @periodics.periodic(spacing=CONF.conductor.sync_power_state_interval,
                         enabled=CONF.conductor.sync_power_state_interval > 0)
     def _sync_power_states(self, context):
-        """Periodic task to sync power states for the nodes.
+        """Periodic task to sync power states for the nodes."""
+        filters = {'maintenance': False}
+        nodes = queue.Queue()
+        for node_info in self.iter_nodes(fields=['id'], filters=filters):
+            nodes.put(node_info)

-        Attempt to grab a lock and sync only if the following
-        conditions are met:
+        number_of_threads = min(CONF.conductor.sync_power_state_workers,
+                                CONF.conductor.periodic_max_workers,
+                                nodes.qsize())
+        futures = []
+
+        for thread_number in range(max(0, number_of_threads - 1)):
+            try:
+                futures.append(
+                    self._spawn_worker(self._sync_power_state_nodes_task,
+                                       context, nodes))
+            except exception.NoFreeConductorWorker:
+                LOG.warning("There are no more conductor workers for "
+                            "power sync task. %(workers)d workers have "
+                            "been already spawned.",
+                            {'workers': thread_number})
+                break
+
+        try:
+            self._sync_power_state_nodes_task(context, nodes)
+
+        finally:
+            waiters.wait_for_all(futures)
+
+    def _sync_power_state_nodes_task(self, context, nodes):
+        """Invokes power state sync on nodes from synchronized queue.
+
+        Attempt to grab a lock and sync only if the following conditions
+        are met:

         1) Node is mapped to this conductor.
         2) Node is not in maintenance mode.
@@ -1692,9 +1722,13 @@ class ConductorManager(base_manager.BaseConductorManager):
         # (through to its DB API call) so that we can eliminate our call
         # and first set of checks below.

-        filters = {'maintenance': False}
-        node_iter = self.iter_nodes(fields=['id'], filters=filters)
-        for (node_uuid, driver, conductor_group, node_id) in node_iter:
+        while not self._shutdown:
+            try:
+                (node_uuid, driver, conductor_group,
+                 node_id) = nodes.get_nowait()
+            except queue.Empty:
+                break
+
             try:
                 # NOTE(dtantsur): start with a shared lock, upgrade if needed
                 with task_manager.acquire(context, node_uuid,
diff --git a/ironic/conf/conductor.py b/ironic/conf/conductor.py
index 6495e805d..45024ad4f 100644
--- a/ironic/conf/conductor.py
+++ b/ironic/conf/conductor.py
@@ -24,7 +24,9 @@ opts = [
                default=100, min=3,
                help=_('The size of the workers greenthread pool. '
                       'Note that 2 threads will be reserved by the conductor '
-                      'itself for handling heart beats and periodic tasks.')),
+                      'itself for handling heart beats and periodic tasks. '
+                      'On top of that, `sync_power_state_workers` will take '
+                      'up to 7 green threads with the default value of 8.')),
     cfg.IntOpt('heartbeat_interval',
                default=10,
                help=_('Seconds between conductor heart beats.')),
@@ -77,6 +79,11 @@ opts = [
                       'number of times Ironic should try syncing the '
                       'hardware node power state with the node power state '
                       'in DB')),
+    cfg.IntOpt('sync_power_state_workers',
+               default=8, min=1,
+               help=_('The maximum number of workers that can be started '
+                      'simultaneously to sync nodes power state from the '
+                      'periodic task.')),
     cfg.IntOpt('periodic_max_workers',
                default=8,
                help=_('Maximum number of worker threads that can be started '
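To make the fan-out in the new `_sync_power_states` easier to follow outside the conductor, here is a minimal standalone sketch of the same pattern: fill a queue, spawn `workers - 1` helpers, let the thread running the periodic task drain the queue as well, then wait for the helpers. It uses the standard library (`queue`, `concurrent.futures`) in place of Ironic's green-thread pool and `futurist.waiters`, and `sync_one_node`/`drain` are hypothetical stand-ins, not Ironic APIs; treat it as an illustration of the pattern rather than the conductor code itself.

    import queue
    from concurrent.futures import ThreadPoolExecutor, wait


    def sync_one_node(node):
        # Hypothetical stand-in for the per-node work; in Ironic this is
        # "acquire a lock, query the BMC, reconcile the DB power state".
        print('synced %s' % node)


    def drain(nodes):
        # Mirrors _sync_power_state_nodes_task: every worker pulls from the
        # same queue until it is empty, so fast workers take more nodes and
        # one slow BMC no longer stalls the whole sync pass.
        while True:
            try:
                node = nodes.get_nowait()
            except queue.Empty:
                break
            sync_one_node(node)


    def sync_power_states(node_list, sync_power_state_workers=8,
                          periodic_max_workers=8):
        nodes = queue.Queue()
        for node in node_list:
            nodes.put(node)

        # Never use more threads than there are nodes or than the caps allow.
        number_of_threads = min(sync_power_state_workers,
                                periodic_max_workers,
                                nodes.qsize())

        futures = []
        with ThreadPoolExecutor(max_workers=max(1, number_of_threads)) as pool:
            # The calling thread is itself one worker, so only N - 1 helpers
            # are spawned (with the default of 8: up to 7 helpers).
            for _ in range(max(0, number_of_threads - 1)):
                futures.append(pool.submit(drain, nodes))
            try:
                drain(nodes)
            finally:
                # Equivalent of futurist.waiters.wait_for_all(futures).
                wait(futures)


    if __name__ == '__main__':
        sync_power_states(['node-%d' % i for i in range(9)])
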
diff --git a/ironic/tests/unit/conductor/test_manager.py b/ironic/tests/unit/conductor/test_manager.py
index 8e60a52d7..b5c1d2ee4 100644
--- a/ironic/tests/unit/conductor/test_manager.py
+++ b/ironic/tests/unit/conductor/test_manager.py
@@ -22,6 +22,7 @@ from collections import namedtuple
 import datetime

 import eventlet
+from futurist import waiters
 import mock
 from oslo_config import cfg
 from oslo_db import exception as db_exception
@@ -6416,6 +6417,10 @@ class ManagerDoSyncPowerStateTestCase(db_base.DbTestCase):
         self.task.upgrade_lock.assert_called_once_with()


+@mock.patch.object(waiters, 'wait_for_all',
+                   new=mock.MagicMock(return_value=(0, 0)))
+@mock.patch.object(manager.ConductorManager, '_spawn_worker',
+                   new=lambda self, fun, *args: fun(*args))
 @mock.patch.object(manager, 'do_sync_power_state')
 @mock.patch.object(task_manager, 'acquire')
 @mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
@@ -7140,6 +7145,72 @@ class ManagerTestHardwareTypeProperties(mgr_utils.ServiceSetUpMixin,
         self._check_hardware_type_properties('manual-management', expected)


+@mock.patch.object(waiters, 'wait_for_all')
+@mock.patch.object(manager.ConductorManager, '_spawn_worker')
+@mock.patch.object(manager.ConductorManager, '_sync_power_state_nodes_task')
+class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn):
+
+    def setUp(self):
+        super(ParallelPowerSyncTestCase, self).setUp()
+        self.service = manager.ConductorManager('hostname', 'test-topic')
+
+    def test__sync_power_states_9_nodes_8_workers(
+            self, sync_mock, spawn_mock, waiter_mock):
+
+        CONF.set_override('sync_power_state_workers', 8, group='conductor')
+
+        with mock.patch.object(self.service, 'iter_nodes',
+                               new=mock.MagicMock(return_value=[None] * 9)):
+
+            self.service._sync_power_states(self.context)
+
+            self.assertEqual(7, spawn_mock.call_count)
+            self.assertEqual(1, sync_mock.call_count)
+            self.assertEqual(1, waiter_mock.call_count)
+
+    def test__sync_power_states_6_nodes_8_workers(
+            self, sync_mock, spawn_mock, waiter_mock):
+
+        CONF.set_override('sync_power_state_workers', 8, group='conductor')
+
+        with mock.patch.object(self.service, 'iter_nodes',
+                               new=mock.MagicMock(return_value=[None] * 6)):
+
+            self.service._sync_power_states(self.context)
+
+            self.assertEqual(5, spawn_mock.call_count)
+            self.assertEqual(1, sync_mock.call_count)
+            self.assertEqual(1, waiter_mock.call_count)
+
+    def test__sync_power_states_1_nodes_8_workers(
+            self, sync_mock, spawn_mock, waiter_mock):
+
+        CONF.set_override('sync_power_state_workers', 8, group='conductor')
+
+        with mock.patch.object(self.service, 'iter_nodes',
+                               new=mock.MagicMock(return_value=[None])):
+
+            self.service._sync_power_states(self.context)
+
+            self.assertEqual(0, spawn_mock.call_count)
+            self.assertEqual(1, sync_mock.call_count)
+            self.assertEqual(1, waiter_mock.call_count)
+
+    def test__sync_power_states_9_nodes_1_worker(
+            self, sync_mock, spawn_mock, waiter_mock):
+
+        CONF.set_override('sync_power_state_workers', 1, group='conductor')
+
+        with mock.patch.object(self.service, 'iter_nodes',
+                               new=mock.MagicMock(return_value=[None] * 9)):
+
+            self.service._sync_power_states(self.context)
+
+            self.assertEqual(0, spawn_mock.call_count)
+            self.assertEqual(9, sync_mock.call_count)
+            self.assertEqual(1, waiter_mock.call_count)
+
+
 @mock.patch.object(task_manager, 'acquire')
 @mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
 @mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')
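The `spawn_mock.call_count` assertions above all check the same arithmetic: the periodic task thread always drains the queue itself, so only the remainder of the thread budget is handed to `_spawn_worker`. A plain-Python restatement of that expectation (this is not the test code, just a sketch of the relationship the tests pin down):

    def expected_spawn_count(node_count, sync_power_state_workers,
                             periodic_max_workers=8):
        # The periodic task itself acts as one worker, so only the remainder
        # of min(workers caps, node count) is spawned via _spawn_worker.
        threads = min(sync_power_state_workers, periodic_max_workers,
                      node_count)
        return max(0, threads - 1)


    # Mirrors the spawn_mock.call_count assertions in ParallelPowerSyncTestCase.
    assert expected_spawn_count(9, 8) == 7
    assert expected_spawn_count(6, 8) == 5
    assert expected_spawn_count(1, 8) == 0
    assert expected_spawn_count(9, 1) == 0
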
diff --git a/releasenotes/notes/add-parallel-power-syncs-b099d66e80aab616.yaml b/releasenotes/notes/add-parallel-power-syncs-b099d66e80aab616.yaml
new file mode 100644
index 000000000..c77a48274
--- /dev/null
+++ b/releasenotes/notes/add-parallel-power-syncs-b099d66e80aab616.yaml
@@ -0,0 +1,9 @@
+---
+features:
+  - |
+    Parallelizes periodic power sync calls by running up to
+    ``sync_power_state_workers`` simultaneously. The default is to run
+    up to ``8`` workers.
+    This change should let larger-scale setups run power syncs more
+    frequently and make the whole power sync procedure more resilient to slow
+    or dead BMCs.
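For operators, the new option is set in the ``[conductor]`` section of ironic.conf. A minimal illustrative snippet; the value shown is simply the documented default, not a tuning recommendation:

    [conductor]
    # New option added by this change (default 8, minimum 1). With the
    # default of 8, up to 7 extra greenthreads are taken from the workers
    # pool on top of the thread running the periodic task itself.
    sync_power_state_workers = 8
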