author     Benjamin Schubert <bschubert15@bloomberg.net>  2020-07-09 19:02:20 +0100
committer  Benjamin Schubert <bschubert15@bloomberg.net>  2020-12-01 10:58:08 +0000
commit     be88eaec0445ff2d85b73c17a392d0e65620202b
tree       1f7558750989ae902091f4bed085a8bbb7753698 /src
parent     0360bc1feca1d5429cdb7fbc083727d242499733
plugin.py: Add a helper to run blocking calls in subprocesses
This ensures that we can cleanly clean up processes and threads when
BuildStream terminates.
Plugins should use this helper whenever there is a risk of being blocked
on a syscall for an indefinite amount of time (see the usage sketch after
this message).
* downloadablefilesource.py: Use this new helper to perform the actual
  download, so that a badly behaving upstream can no longer block the
  process indefinitely.
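
As a usage sketch (not part of this patch): a plugin might call the new
helper as below. The MySource class, the _fetch_url function and the
self.url attribute are hypothetical illustrations; only
Plugin.blocking_activity() comes from this change.

import urllib.request

from buildstream import Source


def _fetch_url(url):
    # Runs in a child process: it must live at module level so that it
    # can be pickled under the forkserver/spawn start methods.
    with urllib.request.urlopen(url) as response:
        return response.read()


class MySource(Source):
    def fetch(self):  # pylint: disable=arguments-differ
        # The target, its args and its return value must all be pickleable
        data = self.blocking_activity(
            _fetch_url, (self.url,), "Fetching {}".format(self.url)
        )
        self._save_to_mirror(data)  # hypothetical helper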
Diffstat (limited to 'src')
-rw-r--r--  src/buildstream/downloadablefilesource.py | 100
-rw-r--r--  src/buildstream/plugin.py                 | 148
2 files changed, 200 insertions(+), 48 deletions(-)
diff --git a/src/buildstream/downloadablefilesource.py b/src/buildstream/downloadablefilesource.py
index 495b8bd4a..b299b7f1b 100644
--- a/src/buildstream/downloadablefilesource.py
+++ b/src/buildstream/downloadablefilesource.py
@@ -92,6 +92,33 @@ class _NetrcPasswordManager:
         return login, password
 
 
+def _download_file(opener, url, etag, directory):
+    default_name = os.path.basename(url)
+    request = urllib.request.Request(url)
+    request.add_header("Accept", "*/*")
+    request.add_header("User-Agent", "BuildStream/2")
+
+    if etag is not None:
+        request.add_header("If-None-Match", etag)
+
+    with contextlib.closing(opener.open(request)) as response:
+        info = response.info()
+
+        # some servers don't honor the 'If-None-Match' header
+        if etag and info["ETag"] == etag:
+            return None, None
+
+        etag = info["ETag"]
+
+        filename = info.get_filename(default_name)
+        filename = os.path.basename(filename)
+        local_file = os.path.join(directory, filename)
+        with open(local_file, "wb") as dest:
+            shutil.copyfileobj(response, dest)
+
+    return local_file, etag
+
+
 class DownloadableFileSource(Source):
     # pylint: disable=attribute-defined-outside-init
 
@@ -130,19 +157,18 @@ class DownloadableFileSource(Source):
         # there is no 'track' field in the source to determine what/whether
         # or not to update refs, because tracking a ref is always a conscious
         # decision by the user.
-        with self.timed_activity("Tracking {}".format(self.url), silent_nested=True):
-            new_ref = self._ensure_mirror()
-
-            if self.ref and self.ref != new_ref:
-                detail = (
-                    "When tracking, new ref differs from current ref:\n"
-                    + "  Tracked URL: {}\n".format(self.url)
-                    + "  Current ref: {}\n".format(self.ref)
-                    + "  New ref: {}\n".format(new_ref)
-                )
-                self.warn("Potential man-in-the-middle attack!", detail=detail)
+        new_ref = self._ensure_mirror("Tracking {}".format(self.url))
 
-            return new_ref
+        if self.ref and self.ref != new_ref:
+            detail = (
+                "When tracking, new ref differs from current ref:\n"
+                + "  Tracked URL: {}\n".format(self.url)
+                + "  Current ref: {}\n".format(self.ref)
+                + "  New ref: {}\n".format(new_ref)
+            )
+            self.warn("Potential man-in-the-middle attack!", detail=detail)
+
+        return new_ref
 
     def fetch(self):  # pylint: disable=arguments-differ
 
@@ -155,12 +181,11 @@ class DownloadableFileSource(Source):
 
         # Download the file, raise hell if the sha256sums don't match,
         # and mirror the file otherwise.
-        with self.timed_activity("Fetching {}".format(self.url), silent_nested=True):
-            sha256 = self._ensure_mirror()
-            if sha256 != self.ref:
-                raise SourceError(
-                    "File downloaded from {} has sha256sum '{}', not '{}'!".format(self.url, sha256, self.ref)
-                )
+        sha256 = self._ensure_mirror("Fetching {}".format(self.url))
+        if sha256 != self.ref:
+            raise SourceError(
+                "File downloaded from {} has sha256sum '{}', not '{}'!".format(self.url, sha256, self.ref)
+            )
 
     def _warn_deprecated_etag(self, node):
         etag = node.get_str("etag", None)
@@ -181,40 +206,25 @@ class DownloadableFileSource(Source):
         with utils.save_file_atomic(etagfilename) as etagfile:
             etagfile.write(etag)
 
-    def _ensure_mirror(self):
+    def _ensure_mirror(self, activity_name: str):
         # Downloads from the url and caches it according to its sha256sum.
         try:
             with self.tempdir() as td:
-                default_name = os.path.basename(self.url)
-                request = urllib.request.Request(self.url)
-                request.add_header("Accept", "*/*")
-                request.add_header("User-Agent", "BuildStream/2")
-
                 # We do not use etag in case what we have in cache is
                 # not matching ref in order to be able to recover from
                 # corrupted download.
-                if self.ref:
-                    etag = self._get_etag(self.ref)
-
+                if self.ref and not self.is_cached():
                     # Do not re-download the file if the ETag matches.
-                    if etag and self.is_cached():
-                        request.add_header("If-None-Match", etag)
-
-                opener = self.__get_urlopener()
-                with contextlib.closing(opener.open(request)) as response:
-                    info = response.info()
-
-                    # some servers don't honor the 'If-None-Match' header
-                    if self.ref and etag and info["ETag"] == etag:
-                        return self.ref
+                    etag = self._get_etag(self.ref)
+                else:
+                    etag = None
 
-                    etag = info["ETag"]
+                local_file, new_etag = self.blocking_activity(
+                    _download_file, (self.__get_urlopener(), self.url, etag, td), activity_name
+                )
 
-                    filename = info.get_filename(default_name)
-                    filename = os.path.basename(filename)
-                    local_file = os.path.join(td, filename)
-                    with open(local_file, "wb") as dest:
-                        shutil.copyfileobj(response, dest)
+                if local_file is None:
+                    return self.ref
 
                 # Make sure url-specific mirror dir exists.
                 if not os.path.isdir(self._mirror_dir):
@@ -226,8 +236,8 @@ class DownloadableFileSource(Source):
                 # In case the old file was corrupted somehow.
                 os.rename(local_file, self._get_mirror_file(sha256))
 
-                if etag:
-                    self._store_etag(sha256, etag)
+                if new_etag:
+                    self._store_etag(sha256, new_etag)
 
                 return sha256
         except urllib.error.HTTPError as e:
diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index 2b2382eb7..f0c1bf859 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -110,14 +110,19 @@ Class Reference
 """
 
 import itertools
+import multiprocessing
 import os
+import pickle
+import queue
+import signal
 import subprocess
 import sys
-from contextlib import contextmanager
-from typing import Generator, Optional, Tuple, TYPE_CHECKING
+import traceback
+from contextlib import contextmanager, suppress
+from typing import Any, Callable, Generator, Optional, Sequence, Tuple, TypeVar, TYPE_CHECKING
 from weakref import WeakValueDictionary
 
-from . import utils
+from . import utils, _signals
 from ._exceptions import PluginError, ImplError
 from ._message import Message, MessageType
 from .node import Node, MappingNode
@@ -131,6 +136,36 @@ if TYPE_CHECKING:
 
 # pylint: enable=cyclic-import
 
 
+T1 = TypeVar("T1")
+
+
+# _background_job_wrapper()
+#
+# Wrapper for running jobs in the background, transparently for users
+#
+# This function will put on the queue a response of the form (err, result),
+# where 'err' is None on success, the raised exception if it could be
+# pickled, or a formatted traceback string otherwise.
+#
+# Args:
+#   result_queue: The queue in which to pass back the result
+#   target: function to execute in the background
+#   args: positional arguments to give to the target function
+#
+def _background_job_wrapper(result_queue: multiprocessing.Queue, target: Callable[..., T1], args: Any) -> None:
+    result = None
+
+    try:
+        result = target(*args)
+        result_queue.put((None, result))
+    except Exception as exc:  # pylint: disable=broad-except
+        try:
+            # Send the exception itself; if it cannot be pickled, this
+            # put() raises a PickleError and we fall back to sending
+            # the formatted traceback as a string instead.
+            result_queue.put((exc, result))
+        except pickle.PickleError:
+            result_queue.put((traceback.format_exc(), None))
+
+
 class Plugin:
     """Plugin()
 
@@ -212,6 +247,18 @@ class Plugin:
     #          scheduling tasks.
     __TABLE = WeakValueDictionary()  # type: WeakValueDictionary[int, Plugin]
 
+    try:
+        __multiprocessing_context = multiprocessing.get_context("forkserver")
+    except ValueError:
+        # We are on a system without 'forkserver' support. Default to
+        # 'spawn', which however seems to hang in some rare cases.
+        #
+        # This is not critical for now, since we do not yet support the
+        # platforms that lack forkserver (mainly Windows).
+        #
+        # XXX: investigate why we sometimes get deadlocks there
+        __multiprocessing_context = multiprocessing.get_context("spawn")
+
     def __init__(
         self,
         name: str,
@@ -509,6 +556,101 @@ class Plugin:
     ):
         yield
 
+    def blocking_activity(
+        self,
+        target: Callable[..., T1],
+        args: Sequence[Any],
+        activity_name: str,
+        *,
+        detail: Optional[str] = None,
+        silent_nested: bool = False
+    ) -> T1:
+        """Execute a blocking activity in the background.
+
+        This is to execute potentially blocking methods in the background,
+        in order to avoid starving the scheduler.
+
+        The function, its arguments and its return value must all be
+        pickleable, as it will be run in another process.
+
+        This should be used whenever a blocking syscall might not return
+        in a reasonable (< 1s) amount of time; for example, a request to
+        a remote server without a timeout.
+
+        Args:
+            target: the function to execute in the background
+            args: positional arguments to pass to the method to call
+            activity_name: The name of the activity
+            detail: An optional detailed message, can be multiline output
+            silent_nested: If specified, nested messages will be silenced
+
+        Returns:
+            the return value from `target`.
+        """
+        with self.__context.messenger.timed_activity(
+            activity_name, element_name=self._get_full_name(), detail=detail, silent_nested=silent_nested
+        ):
+            result_queue = self.__multiprocessing_context.Queue()
+            proc = None
+
+            def kill_proc():
+                if proc and proc.is_alive():
+                    proc.kill()
+                    proc.join()
+
+            def suspend_proc():
+                if proc and proc.is_alive():
+                    with suppress(ProcessLookupError):
+                        os.kill(proc.pid, signal.SIGSTOP)
+
+            def resume_proc():
+                if proc and proc.is_alive():
+                    with suppress(ProcessLookupError):
+                        os.kill(proc.pid, signal.SIGCONT)
+
+            with _signals.suspendable(suspend_proc, resume_proc), _signals.terminator(kill_proc):
+                proc = self.__multiprocessing_context.Process(
+                    target=_background_job_wrapper, args=(result_queue, target, args)
+                )
+                with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
+                    proc.start()
+
+                should_continue = True
+                last_check = False
+
+                while should_continue or last_check:
+                    last_check = False
+
+                    try:
+                        err, result = result_queue.get(timeout=1)
+                        break
+                    except queue.Empty:
+                        if not proc.is_alive() and should_continue:
+                            # Let's check one last time, just in case it stopped
+                            # between our last check and now
+                            last_check = True
+                            should_continue = False
+                        continue
+                else:
+                    raise PluginError("Background process died with error code {}".format(proc.exitcode))
+
+                try:
+                    proc.join(timeout=15)
+                    proc.terminate()
+                except TimeoutError:
+                    raise PluginError("Background process didn't exit after 15 seconds and got killed.")
+
+                if err is not None:
+                    if isinstance(err, str):
+                        # This was a pickle error, this is a bug
+                        raise PluginError(
+                            "An error happened while returning the result from a blocking activity", detail=err
+                        )
+                    raise err
+
+                return result
+
     def call(self, *popenargs, fail: Optional[str] = None, fail_temporarily: bool = False, **kwargs) -> int:
         """A wrapper for subprocess.call()
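
The parent/child interaction above boils down to a small (err, result)
queue protocol. The following standalone sketch shows the same pattern,
including the final liveness check; it is plain stdlib, not BuildStream
code, and run_blocking and _wrapper are illustrative names only.

import multiprocessing
import queue


def _wrapper(result_queue, target, args):
    # Child side: always report exactly one (err, result) tuple back.
    try:
        result_queue.put((None, target(*args)))
    except Exception as exc:  # pylint: disable=broad-except
        result_queue.put((exc, None))


def run_blocking(target, args):
    ctx = multiprocessing.get_context("spawn")
    result_queue = ctx.Queue()
    proc = ctx.Process(target=_wrapper, args=(result_queue, target, args))
    proc.start()

    last_check = False
    while True:
        try:
            err, result = result_queue.get(timeout=1)
            break
        except queue.Empty:
            if last_check:
                raise RuntimeError("child died with exit code {}".format(proc.exitcode))
            if not proc.is_alive():
                # The child may have exited between the timeout and this
                # liveness check, so poll the queue one final time.
                last_check = True

    proc.join()
    if err is not None:
        raise err
    return result


if __name__ == "__main__":
    print(run_blocking(pow, (2, 10)))  # prints 1024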