summaryrefslogtreecommitdiff
path: root/src/buildstream/plugin.py
diff options
context:
space:
mode:
authorBenjamin Schubert <bschubert15@bloomberg.net>2020-07-09 19:02:20 +0100
committerBenjamin Schubert <bschubert15@bloomberg.net>2020-12-01 10:58:08 +0000
commitbe88eaec0445ff2d85b73c17a392d0e65620202b (patch)
tree1f7558750989ae902091f4bed085a8bbb7753698 /src/buildstream/plugin.py
parent0360bc1feca1d5429cdb7fbc083727d242499733 (diff)
downloadbuildstream-be88eaec0445ff2d85b73c17a392d0e65620202b.tar.gz
plugin.py: Add a helper to run blocking processes in subprocesses
This ensures that we can cleanly cleanup processes and threads on termination of BuildStream. Plugins should use this helper whenever there is a risk of them being blocked on a syscall for an indefinite amount of time * downloadablefilesource.py: Use this new helper to do the actual download, which would prevent the process from completely blocking if we have a badly behaving upstream
Diffstat (limited to 'src/buildstream/plugin.py')
-rw-r--r--src/buildstream/plugin.py148
1 files changed, 145 insertions, 3 deletions
diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index 2b2382eb7..f0c1bf859 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -110,14 +110,19 @@ Class Reference
"""
import itertools
+import multiprocessing
import os
+import pickle
+import queue
+import signal
import subprocess
import sys
-from contextlib import contextmanager
-from typing import Generator, Optional, Tuple, TYPE_CHECKING
+import traceback
+from contextlib import contextmanager, suppress
+from typing import Any, Callable, Generator, Optional, Sequence, Tuple, TypeVar, TYPE_CHECKING
from weakref import WeakValueDictionary
-from . import utils
+from . import utils, _signals
from ._exceptions import PluginError, ImplError
from ._message import Message, MessageType
from .node import Node, MappingNode
@@ -131,6 +136,36 @@ if TYPE_CHECKING:
# pylint: enable=cyclic-import
+T1 = TypeVar("T1")
+
+
+# _background_job_wrapper()
+#
+# Wrapper for running jobs in the background, transparently for users
+#
+# This method will put on the queue a response of the form:
+# (PickleError, OtherError, Result)
+#
+# Args:
+# result_queue: The queue in which to pass back the result
+# target: function to execute in the background
+# args: positional arguments to give to the target function
+#
+def _background_job_wrapper(result_queue: multiprocessing.Queue, target: Callable[..., T1], args: Any) -> None:
+ result = None
+
+ try:
+ result = target(*args)
+ result_queue.put((None, result))
+ except Exception as exc: # pylint: disable=broad-except
+ try:
+ # Here we send the result again, just in case it was a PickleError
+ # in which case the same exception would be thrown down
+ result_queue.put((exc, result))
+ except pickle.PickleError as exc:
+ result_queue.put((traceback.format_exc(), None))
+
+
class Plugin:
"""Plugin()
@@ -212,6 +247,18 @@ class Plugin:
# scheduling tasks.
__TABLE = WeakValueDictionary() # type: WeakValueDictionary[int, Plugin]
+ try:
+ __multiprocessing_context = multiprocessing.get_context("forkserver")
+ except ValueError:
+ # We are on a system without `forkserver` support. Let's default to
+ # spawn. This seems to be hanging however in some rare cases.
+ #
+ # Support is not as critical for now, since we do not work on
+ # platforms not supporting forkserver for now (mainly Windows)
+ #
+ # XXX: investigate why we sometimes get deadlocks there
+ __multiprocessing_context = multiprocessing.get_context("spawn")
+
def __init__(
self,
name: str,
@@ -509,6 +556,101 @@ class Plugin:
):
yield
+ def blocking_activity(
+ self,
+ target: Callable[..., T1],
+ args: Sequence[Any],
+ activity_name: str,
+ *,
+ detail: Optional[str] = None,
+ silent_nested: bool = False
+ ) -> T1:
+ """Execute a blocking activity in the background.
+
+ This is to execute potentially blocking methods in the background,
+ in order to avoid starving the scheduler.
+
+ The function, its arguments and return value must all be pickleable,
+ as it will be run in another process.
+
+ This should be used whenever there is a potential for a blocking
+ syscall to not return in a reasonable (<1s) amount of time.
+ For example, you would use this if you were doing a request to a
+ remote server, without a timeout.
+
+ Args:
+ target: the function to execute in the background
+ args: positional arguments to pass to the method to call
+ activity_name: The name of the activity
+ detail: An optional detailed message, can be multiline output
+ silent_nested: If specified, nested messages will be silenced
+
+ Returns:
+ the return value from `target`.
+ """
+ with self.__context.messenger.timed_activity(
+ activity_name, element_name=self._get_full_name(), detail=detail, silent_nested=silent_nested
+ ):
+ result_queue = self.__multiprocessing_context.Queue()
+ proc = None
+
+ def kill_proc():
+ if proc and proc.is_alive():
+ proc.kill()
+ proc.join()
+
+ def suspend_proc():
+ if proc and proc.is_alive():
+ with suppress(ProcessLookupError):
+ os.kill(proc.pid, signal.SIGSTOP)
+
+ def resume_proc():
+ if proc and proc.is_alive():
+ with suppress(ProcessLookupError):
+ os.kill(proc.pid, signal.SIGCONT)
+
+ with _signals.suspendable(suspend_proc, resume_proc), _signals.terminator(kill_proc):
+ proc = self.__multiprocessing_context.Process(
+ target=_background_job_wrapper, args=(result_queue, target, args)
+ )
+ with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
+ proc.start()
+
+ should_continue = True
+ last_check = False
+
+ while should_continue or last_check:
+ last_check = False
+
+ try:
+ err, result = result_queue.get(timeout=1)
+ break
+ except queue.Empty:
+ if not proc.is_alive() and should_continue:
+ # Let's check one last time, just in case it stopped
+ # between our last check and now
+ last_check = True
+ should_continue = False
+ continue
+ else:
+ raise PluginError("Background process died with error code {}".format(proc.exitcode))
+
+ try:
+ proc.join(timeout=15)
+ proc.terminate()
+ except TimeoutError:
+ raise PluginError("Background process didn't exit after 15 seconds and got killed.")
+
+ if err is not None:
+ if isinstance(err, str):
+ # This was a pickle error, this is a bug
+ raise PluginError(
+ "An error happened while returning the result from a blocking activity", detail=err
+ )
+ raise err
+
+ return result
+
def call(self, *popenargs, fail: Optional[str] = None, fail_temporarily: bool = False, **kwargs) -> int:
"""A wrapper for subprocess.call()