diff options
author | melanie witt <melwittt@gmail.com> | 2017-06-20 23:35:41 +0000 |
---|---|---|
committer | melanie witt <melwittt@gmail.com> | 2017-06-29 19:41:20 +0000 |
commit | c7e3149d7203b4753175d873822369f0e3fb4e54 (patch) | |
tree | e3dbb15fed25ed639ad86dbe02966c0a9fa13f33 /nova/context.py | |
parent | d121a1de93dcebfe7d7ba5d41a2137bd530fde53 (diff) | |
download | nova-c7e3149d7203b4753175d873822369f0e3fb4e54.tar.gz |
Add scatter gather utilities for cells
This adds utilities: scatter_gather_all_cells(),
scatter_gather_skip_cell0(), and scatter_gather_cells() for querying
cells in parallel using eventlet green threads.
Change-Id: I289932176e8029b0f9a76dbfb963f8ac218fdc06
Diffstat (limited to 'nova/context.py')
-rw-r--r-- | nova/context.py | 133 |
1 files changed, 133 insertions, 0 deletions
diff --git a/nova/context.py b/nova/context.py index 6f59107695..b47b63f985 100644 --- a/nova/context.py +++ b/nova/context.py @@ -20,6 +20,8 @@ from contextlib import contextmanager import copy +import eventlet.queue +import eventlet.timeout from keystoneauth1.access import service_catalog as ksa_service_catalog from keystoneauth1 import plugin from oslo_context import context @@ -30,6 +32,7 @@ import six from nova import exception from nova.i18n import _ +from nova import objects from nova import policy from nova import utils @@ -38,6 +41,16 @@ LOG = logging.getLogger(__name__) # SIGHUP and periodically based on an expiration time. Currently, none of the # cell caches are purged, so neither is this one, for now. CELL_CACHE = {} +# NOTE(melwitt): Used for the scatter-gather utility to indicate we timed out +# waiting for a result from a cell. +did_not_respond_sentinel = object() +# NOTE(melwitt): Used for the scatter-gather utility to indicate an exception +# was raised gathering a result from a cell. +raised_exception_sentinel = object() +# FIXME(danms): Keep a global cache of the cells we find the +# first time we look. This needs to be refreshed on a timer or +# trigger. +CELLS = [] class _ContextAuthPlugin(plugin.BaseAuthPlugin): @@ -415,3 +428,123 @@ def target_cell(context, cell_mapping): cctxt = copy.copy(context) set_target_cell(cctxt, cell_mapping) yield cctxt + + +def scatter_gather_cells(context, cell_mappings, timeout, fn, *args, **kwargs): + """Target cells in parallel and return their results. + + The first parameter in the signature of the function to call for each cell + should be of type RequestContext. + + :param context: The RequestContext for querying cells + :param cell_mappings: The CellMappings to target in parallel + :param timeout: The total time in seconds to wait for all the results to be + gathered + :param fn: The function to call for each cell + :param args: The args for the function to call for each cell, not including + the RequestContext + :param kwargs: The kwargs for the function to call for each cell + :returns: A dict {cell_uuid: result} containing the joined results. The + did_not_respond_sentinel will be returned if a cell did not + respond within the timeout. The raised_exception_sentinel will + be returned if the call to a cell raised an exception. The + exception will be logged. + """ + greenthreads = [] + queue = eventlet.queue.LightQueue() + results = {} + + def gather_result(cell_uuid, fn, *args, **kwargs): + try: + result = fn(*args, **kwargs) + except Exception: + LOG.exception('Error gathering result from cell %s', cell_uuid) + result = raised_exception_sentinel + # The queue is already synchronized. + queue.put((cell_uuid, result)) + + for cell_mapping in cell_mappings: + with target_cell(context, cell_mapping) as cctxt: + greenthreads.append((cell_mapping.uuid, + utils.spawn(gather_result, cell_mapping.uuid, + fn, cctxt, *args, **kwargs))) + + with eventlet.timeout.Timeout(timeout, exception.CellTimeout): + try: + while len(results) != len(greenthreads): + cell_uuid, result = queue.get() + results[cell_uuid] = result + except exception.CellTimeout: + # NOTE(melwitt): We'll fill in did_not_respond_sentinels at the + # same time we kill/wait for the green threads. + pass + + # Kill the green threads still pending and wait on those we know are done. + for cell_uuid, greenthread in greenthreads: + if cell_uuid not in results: + greenthread.kill() + results[cell_uuid] = did_not_respond_sentinel + LOG.warning('Timed out waiting for response from cell %s', + cell_uuid) + else: + greenthread.wait() + + return results + + +def load_cells(): + global CELLS + if not CELLS: + CELLS = objects.CellMappingList.get_all(get_admin_context()) + LOG.debug('Found %(count)i cells: %(cells)s', + dict(count=len(CELLS), + cells=','.join([c.identity for c in CELLS]))) + + if not CELLS: + LOG.error('No cells are configured, unable to continue') + + +def scatter_gather_skip_cell0(context, fn, *args, **kwargs): + """Target all cells except cell0 in parallel and return their results. + + The first parameter in the signature of the function to call for each cell + should be of type RequestContext. There is a 60 second timeout for waiting + on all results to be gathered. + + :param context: The RequestContext for querying cells + :param fn: The function to call for each cell + :param args: The args for the function to call for each cell, not including + the RequestContext + :param kwargs: The kwargs for the function to call for each cell + :returns: A dict {cell_uuid: result} containing the joined results. The + did_not_respond_sentinel will be returned if a cell did not + respond within the timeout. The raised_exception_sentinel will + be returned if the call to a cell raised an exception. The + exception will be logged. + """ + load_cells() + cell_mappings = [cell for cell in CELLS if not cell.is_cell0()] + return scatter_gather_cells(context, cell_mappings, 60, fn, *args, + **kwargs) + + +def scatter_gather_all_cells(context, fn, *args, **kwargs): + """Target all cells in parallel and return their results. + + The first parameter in the signature of the function to call for each cell + should be of type RequestContext. There is a 60 second timeout for waiting + on all results to be gathered. + + :param context: The RequestContext for querying cells + :param fn: The function to call for each cell + :param args: The args for the function to call for each cell, not including + the RequestContext + :param kwargs: The kwargs for the function to call for each cell + :returns: A dict {cell_uuid: result} containing the joined results. The + did_not_respond_sentinel will be returned if a cell did not + respond within the timeout. The raised_exception_sentinel will + be returned if the call to a cell raised an exception. The + exception will be logged. + """ + load_cells() + return scatter_gather_cells(context, CELLS, 60, fn, *args, **kwargs) |