summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBenjamin Schubert <bschubert15@bloomberg.net>2020-07-09 19:02:20 +0100
committerBenjamin Schubert <bschubert15@bloomberg.net>2020-12-01 10:58:08 +0000
commitbe88eaec0445ff2d85b73c17a392d0e65620202b (patch)
tree1f7558750989ae902091f4bed085a8bbb7753698 /src
parent0360bc1feca1d5429cdb7fbc083727d242499733 (diff)
downloadbuildstream-be88eaec0445ff2d85b73c17a392d0e65620202b.tar.gz
plugin.py: Add a helper to run blocking processes in subprocesses
This ensures that we can cleanly cleanup processes and threads on termination of BuildStream. Plugins should use this helper whenever there is a risk of them being blocked on a syscall for an indefinite amount of time * downloadablefilesource.py: Use this new helper to do the actual download, which would prevent the process from completely blocking if we have a badly behaving upstream
Diffstat (limited to 'src')
-rw-r--r--src/buildstream/downloadablefilesource.py100
-rw-r--r--src/buildstream/plugin.py148
2 files changed, 200 insertions, 48 deletions
diff --git a/src/buildstream/downloadablefilesource.py b/src/buildstream/downloadablefilesource.py
index 495b8bd4a..b299b7f1b 100644
--- a/src/buildstream/downloadablefilesource.py
+++ b/src/buildstream/downloadablefilesource.py
@@ -92,6 +92,33 @@ class _NetrcPasswordManager:
return login, password
+def _download_file(opener, url, etag, directory):
+ default_name = os.path.basename(url)
+ request = urllib.request.Request(url)
+ request.add_header("Accept", "*/*")
+ request.add_header("User-Agent", "BuildStream/2")
+
+ if etag is not None:
+ request.add_header("If-None-Match", etag)
+
+ with contextlib.closing(opener.open(request)) as response:
+ info = response.info()
+
+ # some servers don't honor the 'If-None-Match' header
+ if etag and info["ETag"] == etag:
+ return None, None
+
+ etag = info["ETag"]
+
+ filename = info.get_filename(default_name)
+ filename = os.path.basename(filename)
+ local_file = os.path.join(directory, filename)
+ with open(local_file, "wb") as dest:
+ shutil.copyfileobj(response, dest)
+
+ return local_file, etag
+
+
class DownloadableFileSource(Source):
# pylint: disable=attribute-defined-outside-init
@@ -130,19 +157,18 @@ class DownloadableFileSource(Source):
# there is no 'track' field in the source to determine what/whether
# or not to update refs, because tracking a ref is always a conscious
# decision by the user.
- with self.timed_activity("Tracking {}".format(self.url), silent_nested=True):
- new_ref = self._ensure_mirror()
-
- if self.ref and self.ref != new_ref:
- detail = (
- "When tracking, new ref differs from current ref:\n"
- + " Tracked URL: {}\n".format(self.url)
- + " Current ref: {}\n".format(self.ref)
- + " New ref: {}\n".format(new_ref)
- )
- self.warn("Potential man-in-the-middle attack!", detail=detail)
+ new_ref = self._ensure_mirror("Tracking {}".format(self.url))
- return new_ref
+ if self.ref and self.ref != new_ref:
+ detail = (
+ "When tracking, new ref differs from current ref:\n"
+ + " Tracked URL: {}\n".format(self.url)
+ + " Current ref: {}\n".format(self.ref)
+ + " New ref: {}\n".format(new_ref)
+ )
+ self.warn("Potential man-in-the-middle attack!", detail=detail)
+
+ return new_ref
def fetch(self): # pylint: disable=arguments-differ
@@ -155,12 +181,11 @@ class DownloadableFileSource(Source):
# Download the file, raise hell if the sha256sums don't match,
# and mirror the file otherwise.
- with self.timed_activity("Fetching {}".format(self.url), silent_nested=True):
- sha256 = self._ensure_mirror()
- if sha256 != self.ref:
- raise SourceError(
- "File downloaded from {} has sha256sum '{}', not '{}'!".format(self.url, sha256, self.ref)
- )
+ sha256 = self._ensure_mirror("Fetching {}".format(self.url),)
+ if sha256 != self.ref:
+ raise SourceError(
+ "File downloaded from {} has sha256sum '{}', not '{}'!".format(self.url, sha256, self.ref)
+ )
def _warn_deprecated_etag(self, node):
etag = node.get_str("etag", None)
@@ -181,40 +206,25 @@ class DownloadableFileSource(Source):
with utils.save_file_atomic(etagfilename) as etagfile:
etagfile.write(etag)
- def _ensure_mirror(self):
+ def _ensure_mirror(self, activity_name: str):
# Downloads from the url and caches it according to its sha256sum.
try:
with self.tempdir() as td:
- default_name = os.path.basename(self.url)
- request = urllib.request.Request(self.url)
- request.add_header("Accept", "*/*")
- request.add_header("User-Agent", "BuildStream/2")
-
# We do not use etag in case what we have in cache is
# not matching ref in order to be able to recover from
# corrupted download.
- if self.ref:
- etag = self._get_etag(self.ref)
-
+ if self.ref and not self.is_cached():
# Do not re-download the file if the ETag matches.
- if etag and self.is_cached():
- request.add_header("If-None-Match", etag)
-
- opener = self.__get_urlopener()
- with contextlib.closing(opener.open(request)) as response:
- info = response.info()
-
- # some servers don't honor the 'If-None-Match' header
- if self.ref and etag and info["ETag"] == etag:
- return self.ref
+ etag = self._get_etag(self.ref)
+ else:
+ etag = None
- etag = info["ETag"]
+ local_file, new_etag = self.blocking_activity(
+ _download_file, (self.__get_urlopener(), self.url, etag, td), activity_name
+ )
- filename = info.get_filename(default_name)
- filename = os.path.basename(filename)
- local_file = os.path.join(td, filename)
- with open(local_file, "wb") as dest:
- shutil.copyfileobj(response, dest)
+ if local_file is None:
+ return self.ref
# Make sure url-specific mirror dir exists.
if not os.path.isdir(self._mirror_dir):
@@ -226,8 +236,8 @@ class DownloadableFileSource(Source):
# In case the old file was corrupted somehow.
os.rename(local_file, self._get_mirror_file(sha256))
- if etag:
- self._store_etag(sha256, etag)
+ if new_etag:
+ self._store_etag(sha256, new_etag)
return sha256
except urllib.error.HTTPError as e:
diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index 2b2382eb7..f0c1bf859 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -110,14 +110,19 @@ Class Reference
"""
import itertools
+import multiprocessing
import os
+import pickle
+import queue
+import signal
import subprocess
import sys
-from contextlib import contextmanager
-from typing import Generator, Optional, Tuple, TYPE_CHECKING
+import traceback
+from contextlib import contextmanager, suppress
+from typing import Any, Callable, Generator, Optional, Sequence, Tuple, TypeVar, TYPE_CHECKING
from weakref import WeakValueDictionary
-from . import utils
+from . import utils, _signals
from ._exceptions import PluginError, ImplError
from ._message import Message, MessageType
from .node import Node, MappingNode
@@ -131,6 +136,36 @@ if TYPE_CHECKING:
# pylint: enable=cyclic-import
+T1 = TypeVar("T1")
+
+
+# _background_job_wrapper()
+#
+# Wrapper for running jobs in the background, transparently for users
+#
+# This method will put on the queue a response of the form:
+# (PickleError, OtherError, Result)
+#
+# Args:
+# result_queue: The queue in which to pass back the result
+# target: function to execute in the background
+# args: positional arguments to give to the target function
+#
+def _background_job_wrapper(result_queue: multiprocessing.Queue, target: Callable[..., T1], args: Any) -> None:
+ result = None
+
+ try:
+ result = target(*args)
+ result_queue.put((None, result))
+ except Exception as exc: # pylint: disable=broad-except
+ try:
+ # Here we send the result again, just in case it was a PickleError
+ # in which case the same exception would be thrown down
+ result_queue.put((exc, result))
+ except pickle.PickleError as exc:
+ result_queue.put((traceback.format_exc(), None))
+
+
class Plugin:
"""Plugin()
@@ -212,6 +247,18 @@ class Plugin:
# scheduling tasks.
__TABLE = WeakValueDictionary() # type: WeakValueDictionary[int, Plugin]
+ try:
+ __multiprocessing_context = multiprocessing.get_context("forkserver")
+ except ValueError:
+ # We are on a system without `forkserver` support. Let's default to
+ # spawn. This seems to be hanging however in some rare cases.
+ #
+ # Support is not as critical for now, since we do not work on
+ # platforms not supporting forkserver for now (mainly Windows)
+ #
+ # XXX: investigate why we sometimes get deadlocks there
+ __multiprocessing_context = multiprocessing.get_context("spawn")
+
def __init__(
self,
name: str,
@@ -509,6 +556,101 @@ class Plugin:
):
yield
+ def blocking_activity(
+ self,
+ target: Callable[..., T1],
+ args: Sequence[Any],
+ activity_name: str,
+ *,
+ detail: Optional[str] = None,
+ silent_nested: bool = False
+ ) -> T1:
+ """Execute a blocking activity in the background.
+
+ This is to execute potentially blocking methods in the background,
+ in order to avoid starving the scheduler.
+
+ The function, its arguments and return value must all be pickleable,
+ as it will be run in another process.
+
+ This should be used whenever there is a potential for a blocking
+ syscall to not return in a reasonable (<1s) amount of time.
+ For example, you would use this if you were doing a request to a
+ remote server, without a timeout.
+
+ Args:
+ target: the function to execute in the background
+ args: positional arguments to pass to the method to call
+ activity_name: The name of the activity
+ detail: An optional detailed message, can be multiline output
+ silent_nested: If specified, nested messages will be silenced
+
+ Returns:
+ the return value from `target`.
+ """
+ with self.__context.messenger.timed_activity(
+ activity_name, element_name=self._get_full_name(), detail=detail, silent_nested=silent_nested
+ ):
+ result_queue = self.__multiprocessing_context.Queue()
+ proc = None
+
+ def kill_proc():
+ if proc and proc.is_alive():
+ proc.kill()
+ proc.join()
+
+ def suspend_proc():
+ if proc and proc.is_alive():
+ with suppress(ProcessLookupError):
+ os.kill(proc.pid, signal.SIGSTOP)
+
+ def resume_proc():
+ if proc and proc.is_alive():
+ with suppress(ProcessLookupError):
+ os.kill(proc.pid, signal.SIGCONT)
+
+ with _signals.suspendable(suspend_proc, resume_proc), _signals.terminator(kill_proc):
+ proc = self.__multiprocessing_context.Process(
+ target=_background_job_wrapper, args=(result_queue, target, args)
+ )
+ with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
+ proc.start()
+
+ should_continue = True
+ last_check = False
+
+ while should_continue or last_check:
+ last_check = False
+
+ try:
+ err, result = result_queue.get(timeout=1)
+ break
+ except queue.Empty:
+ if not proc.is_alive() and should_continue:
+ # Let's check one last time, just in case it stopped
+ # between our last check and now
+ last_check = True
+ should_continue = False
+ continue
+ else:
+ raise PluginError("Background process died with error code {}".format(proc.exitcode))
+
+ try:
+ proc.join(timeout=15)
+ proc.terminate()
+ except TimeoutError:
+ raise PluginError("Background process didn't exit after 15 seconds and got killed.")
+
+ if err is not None:
+ if isinstance(err, str):
+ # This was a pickle error, this is a bug
+ raise PluginError(
+ "An error happened while returning the result from a blocking activity", detail=err
+ )
+ raise err
+
+ return result
+
def call(self, *popenargs, fail: Optional[str] = None, fail_temporarily: bool = False, **kwargs) -> int:
"""A wrapper for subprocess.call()