diff options
author | Benjamin Schubert <bschubert15@bloomberg.net> | 2020-07-09 19:02:20 +0100 |
---|---|---|
committer | Benjamin Schubert <bschubert15@bloomberg.net> | 2020-12-01 10:58:08 +0000 |
commit | be88eaec0445ff2d85b73c17a392d0e65620202b (patch) | |
tree | 1f7558750989ae902091f4bed085a8bbb7753698 /src/buildstream/plugin.py | |
parent | 0360bc1feca1d5429cdb7fbc083727d242499733 (diff) | |
download | buildstream-be88eaec0445ff2d85b73c17a392d0e65620202b.tar.gz |
plugin.py: Add a helper to run blocking processes in subprocesses
This ensures that we can cleanly clean up processes and threads when
BuildStream terminates.
Plugins should use this helper whenever there is a risk of them being
blocked on a syscall for an indefinite amount of time.
* downloadablefilesource.py: Use this new helper to do the actual
download, which prevents the process from blocking completely if
an upstream server misbehaves.
Diffstat (limited to 'src/buildstream/plugin.py')
-rw-r--r-- | src/buildstream/plugin.py | 148 |
1 files changed, 145 insertions, 3 deletions
T1 = TypeVar("T1")


# _background_job_wrapper()
#
# Wrapper for running jobs in the background, transparently for users
#
# This function always puts exactly one response on the queue, of the form:
#    (error, result)
#
# where:
#    (None, result)     the target returned `result` successfully
#    (exception, None)  the target raised `exception`
#    (str, None)        the response could not be serialized; the string is
#                       the formatted traceback of the serialization failure
#
# Args:
#    result_queue: The queue in which to pass back the result
#    target: function to execute in the background
#    args: positional arguments to give to the target function
#
def _background_job_wrapper(result_queue: multiprocessing.Queue, target: Callable[..., T1], args: Any) -> None:
    # Imported locally: this is the pickler that multiprocessing queues use
    # to serialize their items.
    from multiprocessing.reduction import ForkingPickler

    try:
        response = (None, target(*args))
    except Exception as exc:  # pylint: disable=broad-except
        response = (exc, None)

    # A multiprocessing queue serializes its items in a background feeder
    # thread, so a serialization failure is not raised by `put()`: the
    # response would be dropped and the parent would only observe that the
    # process exited without answering. Serialize once up-front so that
    # such failures can be reported back instead.
    try:
        ForkingPickler.dumps(response)
    except Exception:  # pylint: disable=broad-except
        # Serialization failures are not always PickleError subclasses
        # (TypeError and AttributeError are common), hence the broad except.
        response = (traceback.format_exc(), None)

    result_queue.put(response)
def blocking_activity(
    self,
    target: Callable[..., T1],
    args: Sequence[Any],
    activity_name: str,
    *,
    detail: Optional[str] = None,
    silent_nested: bool = False
) -> T1:
    """Execute a blocking activity in the background.

    This is to execute potentially blocking methods in the background,
    in order to avoid starving the scheduler.

    The function, its arguments and return value must all be pickleable,
    as it will be run in another process.

    This should be used whenever there is a potential for a blocking
    syscall to not return in a reasonable (<1s) amount of time.
    For example, you would use this if you were doing a request to a
    remote server, without a timeout.

    Args:
       target: the function to execute in the background
       args: positional arguments to pass to the method to call
       activity_name: The name of the activity
       detail: An optional detailed message, can be multiline output
       silent_nested: If specified, nested messages will be silenced

    Returns:
       the return value from `target`.

    Raises:
       PluginError: if the background process exits without sending a
                    response, if it is still running 15 seconds after
                    sending its response, or if the response could not
                    be serialized.
       Exception: any exception raised by `target` in the background
                  process is re-raised in the calling process.
    """
    with self.__context.messenger.timed_activity(
        activity_name, element_name=self._get_full_name(), detail=detail, silent_nested=silent_nested
    ):
        result_queue = self.__multiprocessing_context.Queue()
        proc = None

        # The handlers below close over `proc`, which is only bound to the
        # real process once the signal handlers are installed; hence the
        # `proc and ...` guards.
        def kill_proc():
            if proc and proc.is_alive():
                proc.kill()
                proc.join()

        def suspend_proc():
            if proc and proc.is_alive():
                with suppress(ProcessLookupError):
                    os.kill(proc.pid, signal.SIGSTOP)

        def resume_proc():
            if proc and proc.is_alive():
                with suppress(ProcessLookupError):
                    os.kill(proc.pid, signal.SIGCONT)

        with _signals.suspendable(suspend_proc, resume_proc), _signals.terminator(kill_proc):
            proc = self.__multiprocessing_context.Process(
                target=_background_job_wrapper, args=(result_queue, target, args)
            )
            with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
                proc.start()

            should_continue = True
            last_check = False

            # Poll with a timeout rather than blocking forever, so that we
            # notice when the process died without sending a response.
            while should_continue or last_check:
                last_check = False

                try:
                    err, result = result_queue.get(timeout=1)
                    break
                except queue.Empty:
                    if not proc.is_alive() and should_continue:
                        # Let's check one last time, just in case it stopped
                        # between our last check and now
                        last_check = True
                        should_continue = False
                    continue
            else:
                # Only reached when the loop ended without `break`, i.e. the
                # process is dead and the final check found no response.
                raise PluginError("Background process died with error code {}".format(proc.exitcode))

            # `Process.join()` does not raise on timeout, it simply returns.
            # Whether the process actually exited has to be checked explicitly.
            proc.join(timeout=15)
            if proc.is_alive():
                proc.kill()
                proc.join()
                raise PluginError("Background process didn't exit after 15 seconds and got killed.")

        if err is not None:
            if isinstance(err, str):
                # This was a pickle error, this is a bug
                raise PluginError(
                    "An error happened while returning the result from a blocking activity", detail=err
                )
            raise err

        return result