summaryrefslogtreecommitdiff
path: root/nova/context.py
diff options
context:
space:
mode:
authormelanie witt <melwittt@gmail.com>2017-06-20 23:35:41 +0000
committermelanie witt <melwittt@gmail.com>2017-06-29 19:41:20 +0000
commitc7e3149d7203b4753175d873822369f0e3fb4e54 (patch)
treee3dbb15fed25ed639ad86dbe02966c0a9fa13f33 /nova/context.py
parentd121a1de93dcebfe7d7ba5d41a2137bd530fde53 (diff)
downloadnova-c7e3149d7203b4753175d873822369f0e3fb4e54.tar.gz
Add scatter gather utilities for cells
This adds utilities: scatter_gather_all_cells(), scatter_gather_skip_cell0(), and scatter_gather_cells() for querying cells in parallel using eventlet green threads. Change-Id: I289932176e8029b0f9a76dbfb963f8ac218fdc06
Diffstat (limited to 'nova/context.py')
-rw-r--r--nova/context.py133
1 files changed, 133 insertions, 0 deletions
diff --git a/nova/context.py b/nova/context.py
index 6f59107695..b47b63f985 100644
--- a/nova/context.py
+++ b/nova/context.py
@@ -20,6 +20,8 @@
from contextlib import contextmanager
import copy
+import eventlet.queue
+import eventlet.timeout
from keystoneauth1.access import service_catalog as ksa_service_catalog
from keystoneauth1 import plugin
from oslo_context import context
@@ -30,6 +32,7 @@ import six
from nova import exception
from nova.i18n import _
+from nova import objects
from nova import policy
from nova import utils
@@ -38,6 +41,16 @@ LOG = logging.getLogger(__name__)
# SIGHUP and periodically based on an expiration time. Currently, none of the
# cell caches are purged, so neither is this one, for now.
CELL_CACHE = {}
+# NOTE(melwitt): Used for the scatter-gather utility to indicate we timed out
+# waiting for a result from a cell.
+did_not_respond_sentinel = object()
+# NOTE(melwitt): Used for the scatter-gather utility to indicate an exception
+# was raised gathering a result from a cell.
+raised_exception_sentinel = object()
+# FIXME(danms): Keep a global cache of the cells we find the
+# first time we look. This needs to be refreshed on a timer or
+# trigger.
+CELLS = []
class _ContextAuthPlugin(plugin.BaseAuthPlugin):
@@ -415,3 +428,123 @@ def target_cell(context, cell_mapping):
cctxt = copy.copy(context)
set_target_cell(cctxt, cell_mapping)
yield cctxt
+
+
+def scatter_gather_cells(context, cell_mappings, timeout, fn, *args, **kwargs):
+ """Target cells in parallel and return their results.
+
+ The first parameter in the signature of the function to call for each cell
+ should be of type RequestContext.
+
+ :param context: The RequestContext for querying cells
+ :param cell_mappings: The CellMappings to target in parallel
+ :param timeout: The total time in seconds to wait for all the results to be
+ gathered
+ :param fn: The function to call for each cell
+ :param args: The args for the function to call for each cell, not including
+ the RequestContext
+ :param kwargs: The kwargs for the function to call for each cell
+ :returns: A dict {cell_uuid: result} containing the joined results. The
+ did_not_respond_sentinel will be returned if a cell did not
+ respond within the timeout. The raised_exception_sentinel will
+ be returned if the call to a cell raised an exception. The
+ exception will be logged.
+ """
+ greenthreads = []
+ queue = eventlet.queue.LightQueue()
+ results = {}
+
+ def gather_result(cell_uuid, fn, *args, **kwargs):
+ try:
+ result = fn(*args, **kwargs)
+ except Exception:
+ LOG.exception('Error gathering result from cell %s', cell_uuid)
+ result = raised_exception_sentinel
+ # The queue is already synchronized.
+ queue.put((cell_uuid, result))
+
+ for cell_mapping in cell_mappings:
+ with target_cell(context, cell_mapping) as cctxt:
+ greenthreads.append((cell_mapping.uuid,
+ utils.spawn(gather_result, cell_mapping.uuid,
+ fn, cctxt, *args, **kwargs)))
+
+ with eventlet.timeout.Timeout(timeout, exception.CellTimeout):
+ try:
+ while len(results) != len(greenthreads):
+ cell_uuid, result = queue.get()
+ results[cell_uuid] = result
+ except exception.CellTimeout:
+ # NOTE(melwitt): We'll fill in did_not_respond_sentinels at the
+ # same time we kill/wait for the green threads.
+ pass
+
+ # Kill the green threads still pending and wait on those we know are done.
+ for cell_uuid, greenthread in greenthreads:
+ if cell_uuid not in results:
+ greenthread.kill()
+ results[cell_uuid] = did_not_respond_sentinel
+ LOG.warning('Timed out waiting for response from cell %s',
+ cell_uuid)
+ else:
+ greenthread.wait()
+
+ return results
+
+
+def load_cells():
+ global CELLS
+ if not CELLS:
+ CELLS = objects.CellMappingList.get_all(get_admin_context())
+ LOG.debug('Found %(count)i cells: %(cells)s',
+ dict(count=len(CELLS),
+ cells=','.join([c.identity for c in CELLS])))
+
+ if not CELLS:
+ LOG.error('No cells are configured, unable to continue')
+
+
+def scatter_gather_skip_cell0(context, fn, *args, **kwargs):
+ """Target all cells except cell0 in parallel and return their results.
+
+ The first parameter in the signature of the function to call for each cell
+ should be of type RequestContext. There is a 60 second timeout for waiting
+ on all results to be gathered.
+
+ :param context: The RequestContext for querying cells
+ :param fn: The function to call for each cell
+ :param args: The args for the function to call for each cell, not including
+ the RequestContext
+ :param kwargs: The kwargs for the function to call for each cell
+ :returns: A dict {cell_uuid: result} containing the joined results. The
+ did_not_respond_sentinel will be returned if a cell did not
+ respond within the timeout. The raised_exception_sentinel will
+ be returned if the call to a cell raised an exception. The
+ exception will be logged.
+ """
+ load_cells()
+ cell_mappings = [cell for cell in CELLS if not cell.is_cell0()]
+ return scatter_gather_cells(context, cell_mappings, 60, fn, *args,
+ **kwargs)
+
+
+def scatter_gather_all_cells(context, fn, *args, **kwargs):
+ """Target all cells in parallel and return their results.
+
+ The first parameter in the signature of the function to call for each cell
+ should be of type RequestContext. There is a 60 second timeout for waiting
+ on all results to be gathered.
+
+ :param context: The RequestContext for querying cells
+ :param fn: The function to call for each cell
+ :param args: The args for the function to call for each cell, not including
+ the RequestContext
+ :param kwargs: The kwargs for the function to call for each cell
+ :returns: A dict {cell_uuid: result} containing the joined results. The
+ did_not_respond_sentinel will be returned if a cell did not
+ respond within the timeout. The raised_exception_sentinel will
+ be returned if the call to a cell raised an exception. The
+ exception will be logged.
+ """
+ load_cells()
+ return scatter_gather_cells(context, CELLS, 60, fn, *args, **kwargs)