From 5b06e2dd0a77a5faf2c52558d825ed52d3cb6ed0 Mon Sep 17 00:00:00 2001 From: James Ennis Date: Tue, 15 Jan 2019 15:07:15 +0000 Subject: WIP: cli.py: Spawn pull jobs in another process to isolate gRPC calls --- buildstream/_frontend/cli.py | 59 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 51 insertions(+), 8 deletions(-) diff --git a/buildstream/_frontend/cli.py b/buildstream/_frontend/cli.py index 24012ba79..1752b8f2d 100644 --- a/buildstream/_frontend/cli.py +++ b/buildstream/_frontend/cli.py @@ -1,4 +1,6 @@ +import multiprocessing import os +import signal import sys from contextlib import ExitStack from fnmatch import fnmatch @@ -973,6 +975,9 @@ def artifact_pull(app, artifacts, deps, remote): none: No dependencies, just the element itself all: All dependencies """ + from .. import _signals + from .. import utils + from .._exceptions import CASError with app.initialized(session_name="Pull"): cache = app.context.artifactcache @@ -1016,14 +1021,52 @@ def artifact_pull(app, artifacts, deps, remote): # Pull buildtrees? excluded_subdirs = ["buildtree"] if app.context.pull_buildtrees else None - # Try to pull the artifact from one of the remotes - remotes = [cache.create_remote(spec) for spec in remotes] - for ref in artifacts: - if cache.contains_ref(ref): - continue - for remote in remotes: - if cache.pull_ref(ref, remote, exclude_subdirs=exclude_subdirs): - break + # Define function to be used by multiprocessing + def _pull_artifact(artifact_cache_obj, remote_specs, refs, excluded_subdirs, queue): + remotes = [artifact_cache_obj.create_remote(spec) for spec in remote_specs] + pull_attempts = {'skipped': [], 'pulled': [], 'failed': []} + for ref in refs: + if artifact_cache_obj.contains_ref(ref): + pull_attempts['skipped'].append(ref) + continue + + pulled = False + for remote in remotes: + try: + if artifact_cache_obj.pull_ref(ref, remote, excluded_subdirs=excluded_subdirs): + pull_attempts['pulled'].append((ref, remote.spec.url)) + break + except CASError as e: + queue.put(e) + raise + if not pulled: + pull_attempts['failed'].append(ref) + + queue.put(pull_attempts) + + q = multiprocessing.Queue() + p = multiprocessing.Process(target=_pull_artifact, args=(cache, remotes, artifacts, excluded_subdirs, q)) + try: + with _signals.blocked([signal.SIGINT], ignore=False): + p.start() + + result = q.get() + p.join() + except KeyboardInterrupt: + utils._kill_process_tree(p.pid) + raise + + # Output to user + if isinstance(result, dict): + for ref in result['skipped']: + click.echo("'{}' already available - pull skipped.".format(ref)) + for ref, remote in result['pulled']: + click.echo("'{}' pulled from '{}'.".format(ref, remote)) + for ref in result['failed']: + urls = [remote.url for remote in remotes] + click.echo("'{}' not available in remotes: {}".format(ref, urls)) # Ugly list print + else: + raise result ################################################################## -- cgit v1.2.1