From 646fc51732b60b46385ceff0c7ea2c23372bbd72 Mon Sep 17 00:00:00 2001 From: Fabian Wiesel Date: Wed, 2 Feb 2022 10:13:06 +0100 Subject: Transport context to all threads The nova.utils.spawn and spawn_n methods transport the context (and profiling information) to the newly created threads. But the same isn't done when submitting work to thread-pools in the ComputeManager. The code doing that for spawn and spawn_n is extracted to a new function and called to submit the work to the thread-pools. Closes-Bug: #1962574 Change-Id: I9085deaa8cf0b167d87db68e4afc4a463c00569c --- nova/compute/manager.py | 7 +++-- nova/conductor/manager.py | 4 +-- nova/tests/unit/compute/test_compute_mgr.py | 8 ++++- nova/utils.py | 47 ++++++++++++++--------------- 4 files changed, 37 insertions(+), 29 deletions(-) (limited to 'nova') diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 9f8479a30e..de52973b0b 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -8621,7 +8621,8 @@ class ComputeManager(manager.Manager): # in order to be able to track and abort it in the future. self._waiting_live_migrations[instance.uuid] = (None, None) try: - future = self._live_migration_executor.submit( + future = nova.utils.pass_context( + self._live_migration_executor.submit, self._do_live_migration, context, dest, instance, block_migration, migration, migrate_data) self._waiting_live_migrations[instance.uuid] = (migration, future) @@ -9866,7 +9867,9 @@ class ComputeManager(manager.Manager): else: LOG.debug('Triggering sync for uuid %s', uuid) self._syncs_in_progress[uuid] = True - self._sync_power_pool.spawn_n(_sync, db_instance) + nova.utils.pass_context(self._sync_power_pool.spawn_n, + _sync, + db_instance) def _query_driver_power_state_and_sync(self, context, db_instance): if db_instance.task_state is not None: diff --git a/nova/conductor/manager.py b/nova/conductor/manager.py index f6b0815d1b..c6946a8de5 100644 --- a/nova/conductor/manager.py +++ b/nova/conductor/manager.py @@ -2049,8 +2049,8 @@ class ComputeTaskManager: skipped_host(target_ctxt, host, image_ids) continue - fetch_pool.spawn_n(wrap_cache_images, target_ctxt, host, - image_ids) + utils.pass_context(fetch_pool.spawn_n, wrap_cache_images, + target_ctxt, host, image_ids) # Wait until all those things finish fetch_pool.waitall() diff --git a/nova/tests/unit/compute/test_compute_mgr.py b/nova/tests/unit/compute/test_compute_mgr.py index 31267d3007..d56f12fecb 100644 --- a/nova/tests/unit/compute/test_compute_mgr.py +++ b/nova/tests/unit/compute/test_compute_mgr.py @@ -9154,9 +9154,15 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase, self.assertEqual(driver_console.get_connection_info.return_value, console) + @mock.patch('nova.utils.pass_context') @mock.patch('nova.compute.manager.ComputeManager.' '_do_live_migration') - def _test_max_concurrent_live(self, mock_lm): + def _test_max_concurrent_live(self, mock_lm, mock_pass_context): + # pass_context wraps the function, which doesn't work with a mock + # So we simply mock it too + def _mock_pass_context(runner, func, *args, **kwargs): + return runner(func, *args, **kwargs) + mock_pass_context.side_effect = _mock_pass_context @mock.patch('nova.objects.Migration.save') def _do_it(mock_mig_save): diff --git a/nova/utils.py b/nova/utils.py index 664056a09f..b5d45c58b5 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -632,15 +632,13 @@ def _serialize_profile_info(): return trace_info -def spawn(func, *args, **kwargs): - """Passthrough method for eventlet.spawn. - - This utility exists so that it can be stubbed for testing without - interfering with the service spawns. +def pass_context(runner, func, *args, **kwargs): + """Generalised passthrough method - It will also grab the context from the threadlocal store and add it to - the store on the new thread. This allows for continuity in logging the - context when using this method to spawn a new thread. + It will grab the context from the threadlocal store and add it to + the store on the runner. This allows for continuity in logging the + context when using this method to spawn a new thread through the + runner function """ _context = common_context.get_current() profiler_info = _serialize_profile_info() @@ -655,11 +653,11 @@ def spawn(func, *args, **kwargs): profiler.init(**profiler_info) return func(*args, **kwargs) - return eventlet.spawn(context_wrapper, *args, **kwargs) + return runner(context_wrapper, *args, **kwargs) -def spawn_n(func, *args, **kwargs): - """Passthrough method for eventlet.spawn_n. +def spawn(func, *args, **kwargs): + """Passthrough method for eventlet.spawn. This utility exists so that it can be stubbed for testing without interfering with the service spawns. @@ -668,25 +666,26 @@ def spawn_n(func, *args, **kwargs): the store on the new thread. This allows for continuity in logging the context when using this method to spawn a new thread. """ - _context = common_context.get_current() - profiler_info = _serialize_profile_info() - @functools.wraps(func) - def context_wrapper(*args, **kwargs): - # NOTE: If update_store is not called after spawn_n it won't be - # available for the logger to pull from threadlocal storage. - if _context is not None: - _context.update_store() - if profiler_info and profiler: - profiler.init(**profiler_info) - func(*args, **kwargs) + return pass_context(eventlet.spawn, func, *args, **kwargs) + + +def spawn_n(func, *args, **kwargs): + """Passthrough method for eventlet.spawn_n. + + This utility exists so that it can be stubbed for testing without + interfering with the service spawns. - eventlet.spawn_n(context_wrapper, *args, **kwargs) + It will also grab the context from the threadlocal store and add it to + the store on the new thread. This allows for continuity in logging the + context when using this method to spawn a new thread. + """ + pass_context(eventlet.spawn_n, func, *args, **kwargs) def tpool_execute(func, *args, **kwargs): """Run func in a native thread""" - tpool.execute(func, *args, **kwargs) + return pass_context(tpool.execute, func, *args, **kwargs) def is_none_string(val): -- cgit v1.2.1