From 7a9735ddb73c2d1b94eaf1a73b277259fdc37628 Mon Sep 17 00:00:00 2001 From: Benjamin Schubert Date: Sat, 29 Aug 2020 09:36:16 +0000 Subject: utils.py: Don't block on the call's `communicate` call This ensures that, if we were to receive signals or other things while we are on this blocking call, we would be able to process them instead of waiting for the end of the process --- src/buildstream/utils.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py index 9c6761ccc..4eb62d988 100644 --- a/src/buildstream/utils.py +++ b/src/buildstream/utils.py @@ -32,6 +32,7 @@ import signal import stat from stat import S_ISDIR import subprocess +from subprocess import TimeoutExpired import tempfile import time import datetime @@ -1392,7 +1393,20 @@ def _call(*popenargs, terminate=False, **kwargs): process = subprocess.Popen( # pylint: disable=subprocess-popen-preexec-fn *popenargs, preexec_fn=preexec_fn, universal_newlines=True, **kwargs ) - output, _ = process.communicate() + # Here, we don't use `process.communicate()` directly without a timeout + # This is because, if we were to do that, and the process would never + # output anything, the control would never be given back to the python + # process, which might thus not be able to check for request to + # shutdown, or kill the process. + # We therefore loop with a timeout, to ensure the python process + # can act if it needs. + while True: + try: + output, _ = process.communicate(timeout=1) + break + except TimeoutExpired: + continue + exit_code = process.poll() return (exit_code, output) -- cgit v1.2.1 From c7fc96494d6e268a6a48e0ff68c35763200d5d77 Mon Sep 17 00:00:00 2001 From: Benjamin Schubert Date: Wed, 17 Jun 2020 20:57:37 +0000 Subject: _signals.py: allow calling signal handler from non-main threads * This modifies the signal terminator so that it can be called from any thread. This checks that either: - The signal handler is already in place - Or the caller is in the main thread, allowing to set the signal handler. This also removes the exact callback that was added instead of removing the last one, and fixes the `suspend_handler` to do the same. This is required, as we don't know which interleaving of calls will be done, and we can't guarantee that the last one is the right one to remove --- src/buildstream/_cas/casserver.py | 7 ++++++- src/buildstream/_signals.py | 19 ++++++++++++------- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py index 013fb07dd..04c5eb836 100644 --- a/src/buildstream/_cas/casserver.py +++ b/src/buildstream/_cas/casserver.py @@ -30,6 +30,7 @@ import grpc import click from .._protos.build.bazel.remote.asset.v1 import remote_asset_pb2_grpc +from .. import _signals from .._protos.build.bazel.remote.execution.v2 import ( remote_execution_pb2, remote_execution_pb2_grpc, @@ -137,7 +138,11 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.Le _ReferenceStorageServicer(casd_channel, root, enable_push=enable_push), server ) - yield server + # Ensure we have the signal handler set for SIGTERM + # This allows threads from GRPC to call our methods that do register + # handlers at exit. + with _signals.terminator(lambda: None): + yield server finally: casd_channel.close() diff --git a/src/buildstream/_signals.py b/src/buildstream/_signals.py index 03b55b052..8032752a8 100644 --- a/src/buildstream/_signals.py +++ b/src/buildstream/_signals.py @@ -33,6 +33,9 @@ from typing import Callable, Deque terminator_stack: Deque[Callable] = deque() suspendable_stack: Deque[Callable] = deque() +terminator_lock = threading.Lock() +suspendable_lock = threading.Lock() + # Per process SIGTERM handler def terminator_handler(signal_, frame): @@ -80,13 +83,10 @@ def terminator_handler(signal_, frame): def terminator(terminate_func): global terminator_stack # pylint: disable=global-statement - # Signal handling only works in the main thread - if threading.current_thread() != threading.main_thread(): - yield - return - outermost = bool(not terminator_stack) + assert threading.current_thread() == threading.main_thread() or not outermost + terminator_stack.append(terminate_func) if outermost: original_handler = signal.signal(signal.SIGTERM, terminator_handler) @@ -96,7 +96,9 @@ def terminator(terminate_func): finally: if outermost: signal.signal(signal.SIGTERM, original_handler) - terminator_stack.pop() + + with terminator_lock: + terminator_stack.remove(terminate_func) # Just a simple object for holding on to two callbacks @@ -146,6 +148,8 @@ def suspendable(suspend_callback, resume_callback): global suspendable_stack # pylint: disable=global-statement outermost = bool(not suspendable_stack) + assert threading.current_thread() == threading.main_thread() or not outermost + suspender = Suspender(suspend_callback, resume_callback) suspendable_stack.append(suspender) @@ -158,7 +162,8 @@ def suspendable(suspend_callback, resume_callback): if outermost: signal.signal(signal.SIGTSTP, original_stop) - suspendable_stack.pop() + with suspendable_lock: + suspendable_stack.remove(suspender) # blocked() -- cgit v1.2.1 From 11b2aa717718b315d005ab5b29b56ec521424528 Mon Sep 17 00:00:00 2001 From: Benjamin Schubert Date: Tue, 29 Sep 2020 21:15:35 +0000 Subject: element.py: Stop setting a deterministic umask for staging sources This does not behaves as we would expect, as it is not always consistent, and doesn't have any impact in most cases. We should revisit our handling of permissions and umasks separately, in the meantime, this is required in order to fix building with a threaded scheduler, as it would otherwise introduce concurrency errors --- src/buildstream/element.py | 11 ++--------- src/buildstream/utils.py | 15 --------------- 2 files changed, 2 insertions(+), 24 deletions(-) diff --git a/src/buildstream/element.py b/src/buildstream/element.py index ac16e3103..b6b4b801c 100644 --- a/src/buildstream/element.py +++ b/src/buildstream/element.py @@ -1497,15 +1497,9 @@ class Element(Plugin): # def _stage_sources_at(self, vdirectory, usebuildtree=False): - context = self._get_context() - # It's advantageous to have this temporary directory on # the same file system as the rest of our cache. - with self.timed_activity("Staging sources", silent_nested=True), utils._tempdir( - dir=context.tmpdir, prefix="staging-temp" - ) as temp_staging_directory: - - import_dir = temp_staging_directory + with self.timed_activity("Staging sources", silent_nested=True): if not isinstance(vdirectory, Directory): vdirectory = FileBasedDirectory(vdirectory) @@ -1534,8 +1528,7 @@ class Element(Plugin): import_dir = staged_sources # Set update_mtime to ensure deterministic mtime of sources at build time - with utils._deterministic_umask(): - vdirectory.import_files(import_dir, update_mtime=BST_ARBITRARY_TIMESTAMP) + vdirectory.import_files(import_dir, update_mtime=BST_ARBITRARY_TIMESTAMP) # Ensure deterministic owners of sources at build time vdirectory.set_deterministic_user() diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py index 4eb62d988..956ad1d65 100644 --- a/src/buildstream/utils.py +++ b/src/buildstream/utils.py @@ -1563,21 +1563,6 @@ def _search_upward_for_files(directory, filenames): directory = parent_dir -# _deterministic_umask() -# -# Context managed to apply a umask to a section that may be affected by a users -# umask. Restores old mask afterwards. -# -@contextmanager -def _deterministic_umask(): - old_umask = os.umask(0o022) - - try: - yield - finally: - os.umask(old_umask) - - # _get_compression: # # Given a file name infer the compression -- cgit v1.2.1 From 0360bc1feca1d5429cdb7fbc083727d242499733 Mon Sep 17 00:00:00 2001 From: Benjamin Schubert Date: Sat, 29 Aug 2020 14:08:52 +0000 Subject: downloadablefilesource.py: Reset the file opener between every test This is required when we run this in the main process, with the threaded scheduler rework. Otherwise the state is kept between tests --- src/buildstream/downloadablefilesource.py | 5 +++++ src/buildstream/testing/_fixtures.py | 3 ++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/buildstream/downloadablefilesource.py b/src/buildstream/downloadablefilesource.py index 2c438b033..495b8bd4a 100644 --- a/src/buildstream/downloadablefilesource.py +++ b/src/buildstream/downloadablefilesource.py @@ -252,6 +252,11 @@ class DownloadableFileSource(Source): return self.__default_mirror_file + @classmethod + def _reset_url_opener(cls): + # Needed for tests, in order to cleanup the `netrc` configuration. + cls.__urlopener = None + def __get_urlopener(self): if not DownloadableFileSource.__urlopener: try: diff --git a/src/buildstream/testing/_fixtures.py b/src/buildstream/testing/_fixtures.py index 98778936d..520f68587 100644 --- a/src/buildstream/testing/_fixtures.py +++ b/src/buildstream/testing/_fixtures.py @@ -19,7 +19,7 @@ import psutil import pytest -from buildstream import node, utils +from buildstream import node, utils, DownloadableFileSource @pytest.fixture(autouse=True, scope="session") @@ -42,3 +42,4 @@ def thread_check(default_thread_number): @pytest.fixture(autouse=True) def reset_global_node_state(): node._reset_global_state() + DownloadableFileSource._reset_url_opener() -- cgit v1.2.1 From be88eaec0445ff2d85b73c17a392d0e65620202b Mon Sep 17 00:00:00 2001 From: Benjamin Schubert Date: Thu, 9 Jul 2020 19:02:20 +0100 Subject: plugin.py: Add a helper to run blocking processes in subprocesses This ensures that we can cleanly cleanup processes and threads on termination of BuildStream. Plugins should use this helper whenever there is a risk of them being blocked on a syscall for an indefinite amount of time * downloadablefilesource.py: Use this new helper to do the actual download, which would prevent the process from completely blocking if we have a badly behaving upstream --- src/buildstream/downloadablefilesource.py | 100 +++++++++++--------- src/buildstream/plugin.py | 148 +++++++++++++++++++++++++++++- 2 files changed, 200 insertions(+), 48 deletions(-) diff --git a/src/buildstream/downloadablefilesource.py b/src/buildstream/downloadablefilesource.py index 495b8bd4a..b299b7f1b 100644 --- a/src/buildstream/downloadablefilesource.py +++ b/src/buildstream/downloadablefilesource.py @@ -92,6 +92,33 @@ class _NetrcPasswordManager: return login, password +def _download_file(opener, url, etag, directory): + default_name = os.path.basename(url) + request = urllib.request.Request(url) + request.add_header("Accept", "*/*") + request.add_header("User-Agent", "BuildStream/2") + + if etag is not None: + request.add_header("If-None-Match", etag) + + with contextlib.closing(opener.open(request)) as response: + info = response.info() + + # some servers don't honor the 'If-None-Match' header + if etag and info["ETag"] == etag: + return None, None + + etag = info["ETag"] + + filename = info.get_filename(default_name) + filename = os.path.basename(filename) + local_file = os.path.join(directory, filename) + with open(local_file, "wb") as dest: + shutil.copyfileobj(response, dest) + + return local_file, etag + + class DownloadableFileSource(Source): # pylint: disable=attribute-defined-outside-init @@ -130,19 +157,18 @@ class DownloadableFileSource(Source): # there is no 'track' field in the source to determine what/whether # or not to update refs, because tracking a ref is always a conscious # decision by the user. - with self.timed_activity("Tracking {}".format(self.url), silent_nested=True): - new_ref = self._ensure_mirror() - - if self.ref and self.ref != new_ref: - detail = ( - "When tracking, new ref differs from current ref:\n" - + " Tracked URL: {}\n".format(self.url) - + " Current ref: {}\n".format(self.ref) - + " New ref: {}\n".format(new_ref) - ) - self.warn("Potential man-in-the-middle attack!", detail=detail) + new_ref = self._ensure_mirror("Tracking {}".format(self.url)) - return new_ref + if self.ref and self.ref != new_ref: + detail = ( + "When tracking, new ref differs from current ref:\n" + + " Tracked URL: {}\n".format(self.url) + + " Current ref: {}\n".format(self.ref) + + " New ref: {}\n".format(new_ref) + ) + self.warn("Potential man-in-the-middle attack!", detail=detail) + + return new_ref def fetch(self): # pylint: disable=arguments-differ @@ -155,12 +181,11 @@ class DownloadableFileSource(Source): # Download the file, raise hell if the sha256sums don't match, # and mirror the file otherwise. - with self.timed_activity("Fetching {}".format(self.url), silent_nested=True): - sha256 = self._ensure_mirror() - if sha256 != self.ref: - raise SourceError( - "File downloaded from {} has sha256sum '{}', not '{}'!".format(self.url, sha256, self.ref) - ) + sha256 = self._ensure_mirror("Fetching {}".format(self.url),) + if sha256 != self.ref: + raise SourceError( + "File downloaded from {} has sha256sum '{}', not '{}'!".format(self.url, sha256, self.ref) + ) def _warn_deprecated_etag(self, node): etag = node.get_str("etag", None) @@ -181,40 +206,25 @@ class DownloadableFileSource(Source): with utils.save_file_atomic(etagfilename) as etagfile: etagfile.write(etag) - def _ensure_mirror(self): + def _ensure_mirror(self, activity_name: str): # Downloads from the url and caches it according to its sha256sum. try: with self.tempdir() as td: - default_name = os.path.basename(self.url) - request = urllib.request.Request(self.url) - request.add_header("Accept", "*/*") - request.add_header("User-Agent", "BuildStream/2") - # We do not use etag in case what we have in cache is # not matching ref in order to be able to recover from # corrupted download. - if self.ref: - etag = self._get_etag(self.ref) - + if self.ref and not self.is_cached(): # Do not re-download the file if the ETag matches. - if etag and self.is_cached(): - request.add_header("If-None-Match", etag) - - opener = self.__get_urlopener() - with contextlib.closing(opener.open(request)) as response: - info = response.info() - - # some servers don't honor the 'If-None-Match' header - if self.ref and etag and info["ETag"] == etag: - return self.ref + etag = self._get_etag(self.ref) + else: + etag = None - etag = info["ETag"] + local_file, new_etag = self.blocking_activity( + _download_file, (self.__get_urlopener(), self.url, etag, td), activity_name + ) - filename = info.get_filename(default_name) - filename = os.path.basename(filename) - local_file = os.path.join(td, filename) - with open(local_file, "wb") as dest: - shutil.copyfileobj(response, dest) + if local_file is None: + return self.ref # Make sure url-specific mirror dir exists. if not os.path.isdir(self._mirror_dir): @@ -226,8 +236,8 @@ class DownloadableFileSource(Source): # In case the old file was corrupted somehow. os.rename(local_file, self._get_mirror_file(sha256)) - if etag: - self._store_etag(sha256, etag) + if new_etag: + self._store_etag(sha256, new_etag) return sha256 except urllib.error.HTTPError as e: diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py index 2b2382eb7..f0c1bf859 100644 --- a/src/buildstream/plugin.py +++ b/src/buildstream/plugin.py @@ -110,14 +110,19 @@ Class Reference """ import itertools +import multiprocessing import os +import pickle +import queue +import signal import subprocess import sys -from contextlib import contextmanager -from typing import Generator, Optional, Tuple, TYPE_CHECKING +import traceback +from contextlib import contextmanager, suppress +from typing import Any, Callable, Generator, Optional, Sequence, Tuple, TypeVar, TYPE_CHECKING from weakref import WeakValueDictionary -from . import utils +from . import utils, _signals from ._exceptions import PluginError, ImplError from ._message import Message, MessageType from .node import Node, MappingNode @@ -131,6 +136,36 @@ if TYPE_CHECKING: # pylint: enable=cyclic-import +T1 = TypeVar("T1") + + +# _background_job_wrapper() +# +# Wrapper for running jobs in the background, transparently for users +# +# This method will put on the queue a response of the form: +# (PickleError, OtherError, Result) +# +# Args: +# result_queue: The queue in which to pass back the result +# target: function to execute in the background +# args: positional arguments to give to the target function +# +def _background_job_wrapper(result_queue: multiprocessing.Queue, target: Callable[..., T1], args: Any) -> None: + result = None + + try: + result = target(*args) + result_queue.put((None, result)) + except Exception as exc: # pylint: disable=broad-except + try: + # Here we send the result again, just in case it was a PickleError + # in which case the same exception would be thrown down + result_queue.put((exc, result)) + except pickle.PickleError as exc: + result_queue.put((traceback.format_exc(), None)) + + class Plugin: """Plugin() @@ -212,6 +247,18 @@ class Plugin: # scheduling tasks. __TABLE = WeakValueDictionary() # type: WeakValueDictionary[int, Plugin] + try: + __multiprocessing_context = multiprocessing.get_context("forkserver") + except ValueError: + # We are on a system without `forkserver` support. Let's default to + # spawn. This seems to be hanging however in some rare cases. + # + # Support is not as critical for now, since we do not work on + # platforms not supporting forkserver for now (mainly Windows) + # + # XXX: investigate why we sometimes get deadlocks there + __multiprocessing_context = multiprocessing.get_context("spawn") + def __init__( self, name: str, @@ -509,6 +556,101 @@ class Plugin: ): yield + def blocking_activity( + self, + target: Callable[..., T1], + args: Sequence[Any], + activity_name: str, + *, + detail: Optional[str] = None, + silent_nested: bool = False + ) -> T1: + """Execute a blocking activity in the background. + + This is to execute potentially blocking methods in the background, + in order to avoid starving the scheduler. + + The function, its arguments and return value must all be pickleable, + as it will be run in another process. + + This should be used whenever there is a potential for a blocking + syscall to not return in a reasonable (<1s) amount of time. + For example, you would use this if you were doing a request to a + remote server, without a timeout. + + Args: + target: the function to execute in the background + args: positional arguments to pass to the method to call + activity_name: The name of the activity + detail: An optional detailed message, can be multiline output + silent_nested: If specified, nested messages will be silenced + + Returns: + the return value from `target`. + """ + with self.__context.messenger.timed_activity( + activity_name, element_name=self._get_full_name(), detail=detail, silent_nested=silent_nested + ): + result_queue = self.__multiprocessing_context.Queue() + proc = None + + def kill_proc(): + if proc and proc.is_alive(): + proc.kill() + proc.join() + + def suspend_proc(): + if proc and proc.is_alive(): + with suppress(ProcessLookupError): + os.kill(proc.pid, signal.SIGSTOP) + + def resume_proc(): + if proc and proc.is_alive(): + with suppress(ProcessLookupError): + os.kill(proc.pid, signal.SIGCONT) + + with _signals.suspendable(suspend_proc, resume_proc), _signals.terminator(kill_proc): + proc = self.__multiprocessing_context.Process( + target=_background_job_wrapper, args=(result_queue, target, args) + ) + with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False): + proc.start() + + should_continue = True + last_check = False + + while should_continue or last_check: + last_check = False + + try: + err, result = result_queue.get(timeout=1) + break + except queue.Empty: + if not proc.is_alive() and should_continue: + # Let's check one last time, just in case it stopped + # between our last check and now + last_check = True + should_continue = False + continue + else: + raise PluginError("Background process died with error code {}".format(proc.exitcode)) + + try: + proc.join(timeout=15) + proc.terminate() + except TimeoutError: + raise PluginError("Background process didn't exit after 15 seconds and got killed.") + + if err is not None: + if isinstance(err, str): + # This was a pickle error, this is a bug + raise PluginError( + "An error happened while returning the result from a blocking activity", detail=err + ) + raise err + + return result + def call(self, *popenargs, fail: Optional[str] = None, fail_temporarily: bool = False, **kwargs) -> int: """A wrapper for subprocess.call() -- cgit v1.2.1 From 705d0023f65621b23b6b0828306dc5b4ee094b45 Mon Sep 17 00:00:00 2001 From: Benjamin Schubert Date: Fri, 3 Jul 2020 12:57:06 +0000 Subject: scheduler.py: Use threads instead of processes for jobs This changes how the scheduler works and adapts all the code that needs adapting in order to be able to run in threads instead of in subprocesses, which helps with Windows support, and will allow some simplifications in the main pipeline. This addresses the following issues: * Fix #810: All CAS calls are now made in the master process, and thus share the same connection to the cas server * Fix #93: We don't start as many child processes anymore, so the risk of starving the machine are way less * Fix #911: We now use `forkserver` for starting processes. We also don't use subprocesses for jobs so we should be starting less subprocesses And the following highlevel changes where made: * cascache.py: Run the CasCacheUsageMonitor in a thread instead of a subprocess. * casdprocessmanager.py: Ensure start and stop of the process are thread safe. * job.py: Run the child in a thread instead of a process, adapt how we stop a thread, since we ca't use signals anymore. * _multiprocessing.py: Not needed anymore, we are not using `fork()`. * scheduler.py: Run the scheduler with a threadpool, to run the child jobs in. Also adapt how our signal handling is done, since we are not receiving signals from our children anymore, and can't kill them the same way. * sandbox: Stop using blocking signals to wait on the process, and use timeouts all the time. * messenger.py: Use a thread-local context for the handler, to allow for multiple parameters in the same process. * _remote.py: Ensure the start of the connection is thread safe * _signal.py: Allow blocking entering in the signal's context managers by setting an event. This is to ensure no thread runs long-running code while we asked the scheduler to pause. This also ensures all the signal handlers is thread safe. * source.py: Change check around saving the source's ref. We are now running in the same process, and thus the ref will already have been changed. --- .pylintrc | 1 + setup.cfg | 2 +- setup.py | 1 + src/buildstream/_cas/cascache.py | 73 +++--- src/buildstream/_cas/casdprocessmanager.py | 95 +++++--- src/buildstream/_context.py | 14 -- src/buildstream/_elementsources.py | 4 +- src/buildstream/_messenger.py | 60 +++-- src/buildstream/_remote.py | 56 ++--- src/buildstream/_scheduler/_multiprocessing.py | 79 ------- src/buildstream/_scheduler/jobs/_job.pyi | 1 + src/buildstream/_scheduler/jobs/_job.pyx | 15 ++ src/buildstream/_scheduler/jobs/job.py | 295 ++++++++++-------------- src/buildstream/_scheduler/queues/trackqueue.py | 7 +- src/buildstream/_scheduler/scheduler.py | 83 ++++--- src/buildstream/_signals.py | 64 +++-- src/buildstream/_workspaces.py | 2 +- src/buildstream/element.py | 24 +- src/buildstream/plugin.py | 3 +- src/buildstream/sandbox/_sandboxbuildboxrun.py | 14 +- src/buildstream/sandbox/_sandboxremote.py | 4 - src/buildstream/source.py | 9 +- src/buildstream/utils.py | 10 +- tests/artifactcache/pull.py | 1 + tests/internals/cascache.py | 9 + 25 files changed, 434 insertions(+), 492 deletions(-) delete mode 100644 src/buildstream/_scheduler/_multiprocessing.py create mode 100644 src/buildstream/_scheduler/jobs/_job.pyi create mode 100644 src/buildstream/_scheduler/jobs/_job.pyx diff --git a/.pylintrc b/.pylintrc index 25d5647b0..806a11395 100644 --- a/.pylintrc +++ b/.pylintrc @@ -8,6 +8,7 @@ extension-pkg-whitelist= buildstream._loader._loader, buildstream._loader.loadelement, buildstream._loader.types, + buildstream._scheduler.jobs._job, buildstream._types, buildstream._utils, buildstream._variables, diff --git a/setup.cfg b/setup.cfg index 19ffbcbc2..283d78099 100644 --- a/setup.cfg +++ b/setup.cfg @@ -28,7 +28,7 @@ warn_no_return = True # Ignore missing stubs for third-party packages. # In future, these should be re-enabled if/when stubs for them become available. -[mypy-copyreg,grpc,pluginbase,psutil,pyroaring,ruamel] +[mypy-copyreg,grpc,pluginbase,psutil,pyroaring,ruamel,multiprocessing.forkserver] ignore_missing_imports=True # Ignore issues with generated files and vendored code diff --git a/setup.py b/setup.py index a7c698030..18b5e42be 100755 --- a/setup.py +++ b/setup.py @@ -319,6 +319,7 @@ BUILD_EXTENSIONS = [] register_cython_module("buildstream.node") register_cython_module("buildstream._loader._loader") register_cython_module("buildstream._loader.loadelement", dependencies=["buildstream.node"]) +register_cython_module("buildstream._scheduler.jobs._job") register_cython_module("buildstream._yaml", dependencies=["buildstream.node"]) register_cython_module("buildstream._types") register_cython_module("buildstream._utils") diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index 7936121ea..d13531c6c 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -22,11 +22,9 @@ import itertools import os import stat import contextlib -import ctypes -import multiprocessing -import signal import time from typing import Optional, List +import threading import grpc @@ -34,7 +32,7 @@ from .._protos.google.rpc import code_pb2 from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 from .._protos.build.buildgrid import local_cas_pb2 -from .. import _signals, utils +from .. import utils from ..types import FastEnum, SourceRef from .._exceptions import CASCacheError @@ -93,6 +91,7 @@ class CASCache: self._casd_channel = self._casd_process_manager.create_channel() self._cache_usage_monitor = _CASCacheUsageMonitor(self._casd_channel) + self._cache_usage_monitor.start() # get_cas(): # @@ -131,8 +130,12 @@ class CASCache: # Release resources used by CASCache. # def release_resources(self, messenger=None): + if self._casd_channel: + self._casd_channel.request_shutdown() + if self._cache_usage_monitor: - self._cache_usage_monitor.release_resources() + self._cache_usage_monitor.stop() + self._cache_usage_monitor.join() if self._casd_process_manager: self.close_grpc_channels() @@ -731,65 +734,45 @@ class _CASCacheUsage: # This manages the subprocess that tracks cache usage information via # buildbox-casd. # -class _CASCacheUsageMonitor: +class _CASCacheUsageMonitor(threading.Thread): def __init__(self, connection): + super().__init__() self._connection = connection - - # Shared memory (64-bit signed integer) for current disk usage and quota - self._disk_usage = multiprocessing.Value(ctypes.c_longlong, -1) - self._disk_quota = multiprocessing.Value(ctypes.c_longlong, -1) - - # multiprocessing.Process will fork without exec on Unix. - # This can't be allowed with background threads or open gRPC channels. - assert utils._is_single_threaded() and connection.is_closed() - - # Block SIGINT, we don't want to kill the process when we interrupt the frontend - # and this process if very lightweight. - with _signals.blocked([signal.SIGINT], ignore=False): - self._subprocess = multiprocessing.Process(target=self._subprocess_run) - self._subprocess.start() + self._disk_usage = None + self._disk_quota = None + self._should_stop = False def get_cache_usage(self): - disk_usage = self._disk_usage.value - disk_quota = self._disk_quota.value - - if disk_usage < 0: - # Disk usage still unknown - disk_usage = None - - if disk_quota <= 0: - # No disk quota - disk_quota = None - - return _CASCacheUsage(disk_usage, disk_quota) - - def release_resources(self): - # Simply terminate the subprocess, no cleanup required in the subprocess - self._subprocess.terminate() + return _CASCacheUsage(self._disk_usage, self._disk_quota) - def _subprocess_run(self): - # Reset SIGTERM in subprocess to default as no cleanup is necessary - signal.signal(signal.SIGTERM, signal.SIG_DFL) + def stop(self): + self._should_stop = True - disk_usage = self._disk_usage - disk_quota = self._disk_quota + def run(self): local_cas = self._connection.get_local_cas() - while True: + while not self._should_stop: try: # Ask buildbox-casd for current value request = local_cas_pb2.GetLocalDiskUsageRequest() response = local_cas.GetLocalDiskUsage(request) # Update values in shared memory - disk_usage.value = response.size_bytes - disk_quota.value = response.quota_bytes + self._disk_usage = response.size_bytes + disk_quota = response.quota_bytes + if disk_quota == 0: # Quota == 0 means there is no quota + self._disk_quota = None + else: + self._disk_quota = disk_quota except grpc.RpcError: # Terminate loop when buildbox-casd becomes unavailable break # Sleep until next refresh - time.sleep(_CACHE_USAGE_REFRESH) + for _ in range(_CACHE_USAGE_REFRESH * 10): + if self._should_stop: + break + time.sleep(0.1) def _grouper(iterable, n): diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py index 32e4cce63..20ff610eb 100644 --- a/src/buildstream/_cas/casdprocessmanager.py +++ b/src/buildstream/_cas/casdprocessmanager.py @@ -17,6 +17,7 @@ # import contextlib +import threading import os import random import shutil @@ -240,35 +241,48 @@ class CASDChannel: self._asset_fetch = None self._asset_push = None self._casd_pid = casd_pid + self._shutdown_requested = False - def _establish_connection(self): - assert self._casd_channel is None - - while not os.path.exists(self._socket_path): - # casd is not ready yet, try again after a 10ms delay, - # but don't wait for more than specified timeout period - if time.time() > self._start_time + _CASD_TIMEOUT: - raise CASCacheError("Timed out waiting for buildbox-casd to become ready") + self._lock = threading.Lock() - # check that process is still alive - try: - proc = psutil.Process(self._casd_pid) - if proc.status() == psutil.STATUS_ZOMBIE: - proc.wait() + def _establish_connection(self): + with self._lock: + if self._casd_channel is not None: + return + + while not os.path.exists(self._socket_path): + # casd is not ready yet, try again after a 10ms delay, + # but don't wait for more than specified timeout period + if time.time() > self._start_time + _CASD_TIMEOUT: + raise CASCacheError("Timed out waiting for buildbox-casd to become ready") + + if self._shutdown_requested: + # Shutdown has been requested, we can exit + return - if not proc.is_running(): + # check that process is still alive + try: + proc = psutil.Process(self._casd_pid) + if proc.status() == psutil.STATUS_ZOMBIE: + proc.wait() + + if not proc.is_running(): + if self._shutdown_requested: + return + raise CASCacheError("buildbox-casd process died before connection could be established") + except psutil.NoSuchProcess: + if self._shutdown_requested: + return raise CASCacheError("buildbox-casd process died before connection could be established") - except psutil.NoSuchProcess: - raise CASCacheError("buildbox-casd process died before connection could be established") - time.sleep(0.01) + time.sleep(0.01) - self._casd_channel = grpc.insecure_channel(self._connection_string) - self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._casd_channel) - self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel) - self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel) - self._asset_fetch = remote_asset_pb2_grpc.FetchStub(self._casd_channel) - self._asset_push = remote_asset_pb2_grpc.PushStub(self._casd_channel) + self._casd_channel = grpc.insecure_channel(self._connection_string) + self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._casd_channel) + self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel) + self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel) + self._asset_fetch = remote_asset_pb2_grpc.FetchStub(self._casd_channel) + self._asset_push = remote_asset_pb2_grpc.PushStub(self._casd_channel) # get_cas(): # @@ -284,12 +298,12 @@ class CASDChannel: # Return LocalCAS stub for buildbox-casd channel. # def get_local_cas(self): - if self._casd_channel is None: + if self._local_cas is None: self._establish_connection() return self._local_cas def get_bytestream(self): - if self._casd_channel is None: + if self._bytestream is None: self._establish_connection() return self._bytestream @@ -318,17 +332,30 @@ class CASDChannel: def is_closed(self): return self._casd_channel is None + # request_shutdown(): + # + # Notify the channel that a shutdown of casd was requested. + # + # Thus we know that not being able to establish a connection is expected + # and no error will be reported in that case. + def request_shutdown(self) -> None: + self._shutdown_requested = True + # close(): # # Close the casd channel. # def close(self): - if self.is_closed(): - return - self._asset_push = None - self._asset_fetch = None - self._local_cas = None - self._casd_cas = None - self._bytestream = None - self._casd_channel.close() - self._casd_channel = None + assert self._shutdown_requested, "Please request shutdown before closing" + + with self._lock: + if self.is_closed(): + return + + self._asset_push = None + self._asset_fetch = None + self._local_cas = None + self._casd_cas = None + self._bytestream = None + self._casd_channel.close() + self._casd_channel = None diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py index 0c2d1a150..87b23d50c 100644 --- a/src/buildstream/_context.py +++ b/src/buildstream/_context.py @@ -564,17 +564,3 @@ class Context: log_directory=self.logdir, ) return self._cascache - - # prepare_fork(): - # - # Prepare this process for fork without exec. This is a safeguard against - # fork issues with multiple threads and gRPC connections. - # - def prepare_fork(self): - # gRPC channels must be closed before fork. - for cache in [self._cascache, self._artifactcache, self._sourcecache]: - if cache: - cache.close_grpc_channels() - - # Do not allow fork if there are background threads. - return utils._is_single_threaded() diff --git a/src/buildstream/_elementsources.py b/src/buildstream/_elementsources.py index 69051ee8a..a15d20ecb 100644 --- a/src/buildstream/_elementsources.py +++ b/src/buildstream/_elementsources.py @@ -100,7 +100,7 @@ class ElementSources: else: new_ref = source._track() - refs.append((source._unique_id, new_ref)) + refs.append((source._unique_id, new_ref, old_ref != new_ref)) # Complimentary warning that the new ref will be unused. if old_ref != new_ref and workspace: @@ -113,7 +113,7 @@ class ElementSources: # Sources which do not implement track() will return None, produce # a SKIP message in the UI if all sources produce None # - if all(ref is None for _, ref in refs): + if all(ref is None for _, ref, _ in refs): raise SkipJob("Element sources are not trackable") return refs diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py index c37eca8bc..69a309f91 100644 --- a/src/buildstream/_messenger.py +++ b/src/buildstream/_messenger.py @@ -19,10 +19,10 @@ import os import datetime +import threading from contextlib import contextmanager from . import _signals -from . import utils from ._exceptions import BstError from ._message import Message, MessageType @@ -48,15 +48,17 @@ class _TimeData: class Messenger: def __init__(self): - self._message_handler = None - self._silence_scope_depth = 0 - self._log_handle = None - self._log_filename = None self._state = None self._next_render = None # A Time object self._active_simple_tasks = 0 self._render_status_cb = None + self._locals = threading.local() + self._locals.message_handler = None + self._locals.log_handle = None + self._locals.log_filename = None + self._locals.silence_scope_depth = 0 + # set_message_handler() # # Sets the handler for any status messages propagated through @@ -70,7 +72,7 @@ class Messenger: # ) -> None # def set_message_handler(self, handler): - self._message_handler = handler + self._locals.message_handler = handler # set_state() # @@ -101,7 +103,7 @@ class Messenger: # (bool): Whether messages are currently being silenced # def _silent_messages(self): - return self._silence_scope_depth > 0 + return self._locals.silence_scope_depth > 0 # message(): # @@ -112,16 +114,15 @@ class Messenger: # message: A Message object # def message(self, message): - # If we are recording messages, dump a copy into the open log file. self._record_message(message) # Send it off to the log handler (can be the frontend, # or it can be the child task which will propagate # to the frontend) - assert self._message_handler + assert self._locals.message_handler - self._message_handler(message, is_silenced=self._silent_messages()) + self._locals.message_handler(message, is_silenced=self._silent_messages()) # silence() # @@ -141,12 +142,12 @@ class Messenger: yield return - self._silence_scope_depth += 1 + self._locals.silence_scope_depth += 1 try: yield finally: - assert self._silence_scope_depth > 0 - self._silence_scope_depth -= 1 + assert self._locals.silence_scope_depth > 0 + self._locals.silence_scope_depth -= 1 # timed_activity() # @@ -264,22 +265,21 @@ class Messenger: # @contextmanager def recorded_messages(self, filename, logdir): - # We dont allow recursing in this context manager, and # we also do not allow it in the main process. - assert self._log_handle is None - assert self._log_filename is None - assert not utils._is_main_process() + assert not hasattr(self._locals, "log_handle") or self._locals.log_handle is None + assert not hasattr(self._locals, "log_filename") or self._locals.log_filename is None # Create the fully qualified logfile in the log directory, # appending the pid and .log extension at the end. - self._log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid())) + self._locals.log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid())) + self._locals.silence_scope_depth = 0 # Ensure the directory exists first - directory = os.path.dirname(self._log_filename) + directory = os.path.dirname(self._locals.log_filename) os.makedirs(directory, exist_ok=True) - with open(self._log_filename, "a") as logfile: + with open(self._locals.log_filename, "a") as logfile: # Write one last line to the log and flush it to disk def flush_log(): @@ -294,12 +294,12 @@ class Messenger: except RuntimeError: os.fsync(logfile.fileno()) - self._log_handle = logfile + self._locals.log_handle = logfile with _signals.terminator(flush_log): - yield self._log_filename + yield self._locals.log_filename - self._log_handle = None - self._log_filename = None + self._locals.log_handle = None + self._locals.log_filename = None # get_log_handle() # @@ -311,7 +311,7 @@ class Messenger: # (file): The active logging file handle, or None # def get_log_handle(self): - return self._log_handle + return self._locals.log_handle # get_log_filename() # @@ -323,7 +323,7 @@ class Messenger: # (str): The active logging filename, or None # def get_log_filename(self): - return self._log_filename + return self._locals.log_filename # timed_suspendable() # @@ -345,8 +345,6 @@ class Messenger: stopped_time = datetime.datetime.now() def resume_time(): - nonlocal timedata - nonlocal stopped_time sleep_time = datetime.datetime.now() - stopped_time timedata.start_time += sleep_time @@ -362,7 +360,7 @@ class Messenger: # def _record_message(self, message): - if self._log_handle is None: + if self._locals.log_handle is None: return INDENT = " " @@ -405,8 +403,8 @@ class Messenger: ) # Write to the open log file - self._log_handle.write("{}\n".format(text)) - self._log_handle.flush() + self._locals.log_handle.write("{}\n".format(text)) + self._locals.log_handle.flush() # _render_status() # diff --git a/src/buildstream/_remote.py b/src/buildstream/_remote.py index d8b8e68fe..6d52ff56a 100644 --- a/src/buildstream/_remote.py +++ b/src/buildstream/_remote.py @@ -16,6 +16,7 @@ # import os +import threading from collections import namedtuple from urllib.parse import urlparse @@ -146,41 +147,44 @@ class BaseRemote: self.push = spec.push self.url = spec.url + self._lock = threading.Lock() + # init(): # # Initialize the given remote. This function must be called before # any communication is performed, since such will otherwise fail. # def init(self): - if self._initialized: - return - - # Set up the communcation channel - url = urlparse(self.spec.url) - if url.scheme == "http": - port = url.port or 80 - self.channel = grpc.insecure_channel("{}:{}".format(url.hostname, port)) - elif url.scheme == "https": - port = url.port or 443 - try: - server_cert, client_key, client_cert = _read_files( - self.spec.server_cert, self.spec.client_key, self.spec.client_cert + with self._lock: + if self._initialized: + return + + # Set up the communcation channel + url = urlparse(self.spec.url) + if url.scheme == "http": + port = url.port or 80 + self.channel = grpc.insecure_channel("{}:{}".format(url.hostname, port)) + elif url.scheme == "https": + port = url.port or 443 + try: + server_cert, client_key, client_cert = _read_files( + self.spec.server_cert, self.spec.client_key, self.spec.client_cert + ) + except FileNotFoundError as e: + raise RemoteError("Could not read certificates: {}".format(e)) from e + self.server_cert = server_cert + self.client_key = client_key + self.client_cert = client_cert + credentials = grpc.ssl_channel_credentials( + root_certificates=self.server_cert, private_key=self.client_key, certificate_chain=self.client_cert ) - except FileNotFoundError as e: - raise RemoteError("Could not read certificates: {}".format(e)) from e - self.server_cert = server_cert - self.client_key = client_key - self.client_cert = client_cert - credentials = grpc.ssl_channel_credentials( - root_certificates=self.server_cert, private_key=self.client_key, certificate_chain=self.client_cert - ) - self.channel = grpc.secure_channel("{}:{}".format(url.hostname, port), credentials) - else: - raise RemoteError("Unsupported URL: {}".format(self.spec.url)) + self.channel = grpc.secure_channel("{}:{}".format(url.hostname, port), credentials) + else: + raise RemoteError("Unsupported URL: {}".format(self.spec.url)) - self._configure_protocols() + self._configure_protocols() - self._initialized = True + self._initialized = True def __enter__(self): return self diff --git a/src/buildstream/_scheduler/_multiprocessing.py b/src/buildstream/_scheduler/_multiprocessing.py deleted file mode 100644 index 4864e140c..000000000 --- a/src/buildstream/_scheduler/_multiprocessing.py +++ /dev/null @@ -1,79 +0,0 @@ -# -# Copyright (C) 2019 Bloomberg Finance LP -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library. If not, see . -# - -# TLDR: -# ALWAYS use `.AsyncioSafeProcess` when you have an asyncio event loop running and need a `multiprocessing.Process` -# -# -# The upstream asyncio library doesn't play well with forking subprocesses while an event loop is running. -# -# The main problem that affects us is that the parent and the child will share some file handlers. -# The most important one for us is the sig_handler_fd, which the loop uses to buffer signals received -# by the app so that the asyncio loop can treat them afterwards. -# -# This sharing means that when we send a signal to the child, the sighandler in the child will write -# it back to the parent sig_handler_fd, making the parent have to treat it too. -# This is a problem for example when we sigterm the process. The scheduler will send sigterms to all its children, -# which in turn will make the scheduler receive N SIGTERMs (one per child). Which in turn will send sigterms to -# the children... -# -# We therefore provide a `AsyncioSafeProcess` derived from multiprocessing.Process that automatically -# tries to cleanup the loop and never calls `waitpid` on the child process, which breaks our child watchers. -# -# -# Relevant issues: -# - Asyncio: support fork (https://bugs.python.org/issue21998) -# - Asyncio: support multiprocessing (support fork) (https://bugs.python.org/issue22087) -# - Signal delivered to a subprocess triggers parent's handler (https://bugs.python.org/issue31489) -# -# - -import multiprocessing -import signal -import sys -from asyncio import set_event_loop_policy - - -# _AsyncioSafeForkAwareProcess() -# -# Process class that doesn't call waitpid on its own. -# This prevents conflicts with the asyncio child watcher. -# -# Also automatically close any running asyncio loop before calling -# the actual run target -# -class _AsyncioSafeForkAwareProcess(multiprocessing.Process): - # pylint: disable=attribute-defined-outside-init - def start(self): - self._popen = self._Popen(self) - self._sentinel = self._popen.sentinel - - def run(self): - signal.set_wakeup_fd(-1) - set_event_loop_policy(None) - - super().run() - - -if sys.platform != "win32": - # Set the default event loop policy to automatically close our asyncio loop in child processes - AsyncioSafeProcess = _AsyncioSafeForkAwareProcess - -else: - # Windows doesn't support ChildWatcher that way anyways, we'll need another - # implementation if we want it - AsyncioSafeProcess = multiprocessing.Process diff --git a/src/buildstream/_scheduler/jobs/_job.pyi b/src/buildstream/_scheduler/jobs/_job.pyi new file mode 100644 index 000000000..fbf3e64de --- /dev/null +++ b/src/buildstream/_scheduler/jobs/_job.pyi @@ -0,0 +1 @@ +def terminate_thread(thread_id: int): ... diff --git a/src/buildstream/_scheduler/jobs/_job.pyx b/src/buildstream/_scheduler/jobs/_job.pyx new file mode 100644 index 000000000..82f6ab044 --- /dev/null +++ b/src/buildstream/_scheduler/jobs/_job.pyx @@ -0,0 +1,15 @@ +from cpython.pystate cimport PyThreadState_SetAsyncExc +from cpython.ref cimport PyObject +from ..._signals import TerminateException + + +# terminate_thread() +# +# Ask a given a given thread to terminate by raising an exception in it. +# +# Args: +# thread_id (int): the thread id in which to throw the exception +# +def terminate_thread(long thread_id): + res = PyThreadState_SetAsyncExc(thread_id, TerminateException) + assert res == 1 diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 08e40694e..2e8f5ca1a 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -25,17 +25,16 @@ import asyncio import datetime import itertools import multiprocessing -import os -import signal -import sys +import threading import traceback # BuildStream toplevel imports +from ... import utils from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob from ..._message import Message, MessageType, unconditional_messages from ...types import FastEnum -from ... import _signals, utils -from .. import _multiprocessing +from ._job import terminate_thread +from ..._signals import TerminateException # Return code values shutdown of job handling child processes @@ -46,7 +45,6 @@ class _ReturnCode(FastEnum): PERM_FAIL = 2 SKIPPED = 3 TERMINATED = 4 - KILLED = -9 # JobStatus: @@ -131,7 +129,6 @@ class Job: self._scheduler = scheduler # The scheduler self._messenger = self._scheduler.context.messenger self._pipe_r = None # The read end of a pipe for message passing - self._process = None # The Process object self._listening = False # Whether the parent is currently listening self._suspended = False # Whether this job is currently suspended self._max_retries = max_retries # Maximum number of automatic retries @@ -144,6 +141,9 @@ class Job: self._message_element_key = None # The task-wide element cache key self._element = None # The Element() passed to the Job() constructor, if applicable + self._task = None # The task that is run + self._child = None + # set_name() # # Sets the name of this job @@ -158,12 +158,14 @@ class Job: assert not self._terminated, "Attempted to start process which was already terminated" + # FIXME: remove this, this is not necessary when using asyncio self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False) self._tries += 1 self._parent_start_listening() - child_job = self.create_child_job( # pylint: disable=assignment-from-no-return + # FIXME: remove the parent/child separation, it's not needed anymore. + self._child = self.create_child_job( # pylint: disable=assignment-from-no-return self.action_name, self._messenger, self._scheduler.context.logdir, @@ -174,46 +176,28 @@ class Job: self._message_element_key, ) - self._process = _multiprocessing.AsyncioSafeProcess(target=child_job.child_action, args=[pipe_w],) - - # Block signals which are handled in the main process such that - # the child process does not inherit the parent's state, but the main - # process will be notified of any signal after we launch the child. - # - with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False): - with asyncio.get_child_watcher() as watcher: - self._process.start() - # Register the process to call `_parent_child_completed` once it is done - - # Close the write end of the pipe in the parent - pipe_w.close() + loop = asyncio.get_event_loop() - # Here we delay the call to the next loop tick. This is in order to be running - # in the main thread, as the callback itself must be thread safe. - def on_completion(pid, returncode): - asyncio.get_event_loop().call_soon(self._parent_child_completed, pid, returncode) + async def execute(): + result = await loop.run_in_executor(None, self._child.child_action, pipe_w) + await self._parent_child_completed(result) - watcher.add_child_handler(self._process.pid, on_completion) + self._task = loop.create_task(execute()) # terminate() # # Politely request that an ongoing job terminate soon. # - # This will send a SIGTERM signal to the Job process. + # This will raise an exception in the child to ask it to exit. # def terminate(self): - - # First resume the job if it's suspended - self.resume(silent=True) - self.message(MessageType.STATUS, "{} terminating".format(self.action_name)) # Make sure there is no garbage on the pipe self._parent_stop_listening() - # Terminate the process using multiprocessing API pathway - if self._process: - self._process.terminate() + if self._task: + self._child.terminate() self._terminated = True @@ -227,51 +211,6 @@ class Job: def get_terminated(self): return self._terminated - # kill() - # - # Forcefully kill the process, and any children it might have. - # - def kill(self): - # Force kill - self.message(MessageType.WARN, "{} did not terminate gracefully, killing".format(self.action_name)) - if self._process: - utils._kill_process_tree(self._process.pid) - - # suspend() - # - # Suspend this job. - # - def suspend(self): - if not self._suspended: - self.message(MessageType.STATUS, "{} suspending".format(self.action_name)) - - try: - # Use SIGTSTP so that child processes may handle and propagate - # it to processes they start that become session leaders. - os.kill(self._process.pid, signal.SIGTSTP) - - # For some reason we receive exactly one suspend event for - # every SIGTSTP we send to the child process, even though the - # child processes are setsid(). We keep a count of these so we - # can ignore them in our event loop suspend_event(). - self._scheduler.internal_stops += 1 - self._suspended = True - except ProcessLookupError: - # ignore, process has already exited - pass - - # resume() - # - # Resume this suspended job. - # - def resume(self, silent=False): - if self._suspended: - if not silent and not self._scheduler.terminated: - self.message(MessageType.STATUS, "{} resuming".format(self.action_name)) - - os.kill(self._process.pid, signal.SIGCONT) - self._suspended = False - # set_message_element_name() # # This is called by Job subclasses to set the plugin instance element @@ -380,10 +319,9 @@ class Job: # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler() # # Args: - # pid (int): The PID of the child which completed # returncode (int): The return code of the child process # - def _parent_child_completed(self, pid, returncode): + async def _parent_child_completed(self, returncode): self._parent_shutdown() try: @@ -414,16 +352,9 @@ class Job: status = JobStatus.FAIL elif returncode == _ReturnCode.TERMINATED: if self._terminated: - self.message(MessageType.INFO, "Process was terminated") - else: - self.message(MessageType.ERROR, "Process was terminated unexpectedly") - - status = JobStatus.FAIL - elif returncode == _ReturnCode.KILLED: - if self._terminated: - self.message(MessageType.INFO, "Process was killed") + self.message(MessageType.INFO, "Job terminated") else: - self.message(MessageType.ERROR, "Process was killed unexpectedly") + self.message(MessageType.ERROR, "Job was terminated unexpectedly") status = JobStatus.FAIL else: @@ -434,7 +365,7 @@ class Job: # Force the deletion of the pipe and process objects to try and clean up FDs self._pipe_r.close() - self._pipe_r = self._process = None + self._pipe_r = self._task = None # _parent_process_envelope() # @@ -549,6 +480,9 @@ class ChildJob: self._message_element_key = message_element_key self._pipe_w = None # The write end of a pipe for message passing + self._thread_id = None # Thread in which the child executes its action + self._should_terminate = False + self._terminate_lock = threading.Lock() # message(): # @@ -615,19 +549,6 @@ class ChildJob: # pipe_w (multiprocessing.connection.Connection): The message pipe for IPC # def child_action(self, pipe_w): - - # This avoids some SIGTSTP signals from grandchildren - # getting propagated up to the master process - os.setsid() - - # First set back to the default signal handlers for the signals - # we handle, and then clear their blocked state. - # - signal_list = [signal.SIGTSTP, signal.SIGTERM] - for sig in signal_list: - signal.signal(sig, signal.SIG_DFL) - signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list) - # Assign the pipe we passed across the process boundaries # # Set the global message handler in this child @@ -635,78 +556,108 @@ class ChildJob: self._pipe_w = pipe_w self._messenger.set_message_handler(self._child_message_handler) - # Graciously handle sigterms. - def handle_sigterm(): - self._child_shutdown(_ReturnCode.TERMINATED) - # Time, log and and run the action function # - with _signals.terminator( - handle_sigterm - ), self._messenger.timed_suspendable() as timeinfo, self._messenger.recorded_messages( + with self._messenger.timed_suspendable() as timeinfo, self._messenger.recorded_messages( self._logfile, self._logdir ) as filename: - self.message(MessageType.START, self.action_name, logfile=filename) - try: - # Try the task action - result = self.child_process() # pylint: disable=assignment-from-no-return - except SkipJob as e: - elapsed = datetime.datetime.now() - timeinfo.start_time - self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename) - - # Alert parent of skip by return code - self._child_shutdown(_ReturnCode.SKIPPED) - except BstError as e: - elapsed = datetime.datetime.now() - timeinfo.start_time - retry_flag = e.temporary - - if retry_flag and (self._tries <= self._max_retries): - self.message( - MessageType.FAIL, - "Try #{} failed, retrying".format(self._tries), - elapsed=elapsed, - logfile=filename, - ) - else: - self.message( - MessageType.FAIL, str(e), elapsed=elapsed, detail=e.detail, logfile=filename, sandbox=e.sandbox - ) + self.message(MessageType.START, self.action_name, logfile=filename) + + with self._terminate_lock: + self._thread_id = threading.current_thread().ident + if self._should_terminate: + return _ReturnCode.TERMINATED + + try: + # Try the task action + result = self.child_process() # pylint: disable=assignment-from-no-return + except SkipJob as e: + elapsed = datetime.datetime.now() - timeinfo.start_time + self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename) + + # Alert parent of skip by return code + return _ReturnCode.SKIPPED + except BstError as e: + elapsed = datetime.datetime.now() - timeinfo.start_time + retry_flag = e.temporary + + if retry_flag and (self._tries <= self._max_retries): + self.message( + MessageType.FAIL, + "Try #{} failed, retrying".format(self._tries), + elapsed=elapsed, + logfile=filename, + ) + else: + self.message( + MessageType.FAIL, + str(e), + elapsed=elapsed, + detail=e.detail, + logfile=filename, + sandbox=e.sandbox, + ) + + self._send_message(_MessageType.CHILD_DATA, self.child_process_data()) + + # Report the exception to the parent (for internal testing purposes) + self._child_send_error(e) + + # Set return code based on whether or not the error was temporary. + # + return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL + except Exception: # pylint: disable=broad-except + + # If an unhandled (not normalized to BstError) occurs, that's a bug, + # send the traceback and formatted exception back to the frontend + # and print it to the log file. + # + elapsed = datetime.datetime.now() - timeinfo.start_time + detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc()) + + self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename) + # Unhandled exceptions should permenantly fail + return _ReturnCode.PERM_FAIL - self._send_message(_MessageType.CHILD_DATA, self.child_process_data()) - - # Report the exception to the parent (for internal testing purposes) - self._child_send_error(e) - - # Set return code based on whether or not the error was temporary. - # - self._child_shutdown(_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL) - - except Exception: # pylint: disable=broad-except - - # If an unhandled (not normalized to BstError) occurs, that's a bug, - # send the traceback and formatted exception back to the frontend - # and print it to the log file. - # - elapsed = datetime.datetime.now() - timeinfo.start_time - detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc()) + else: + # No exception occurred in the action + self._send_message(_MessageType.CHILD_DATA, self.child_process_data()) + self._child_send_result(result) + + elapsed = datetime.datetime.now() - timeinfo.start_time + self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename) + + # Shutdown needs to stay outside of the above context manager, + # make sure we dont try to handle SIGTERM while the process + # is already busy in sys.exit() + return _ReturnCode.OK + finally: + self._thread_id = None + except TerminateException: + self._thread_id = None + return _ReturnCode.TERMINATED + finally: + self._pipe_w.close() - self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename) - # Unhandled exceptions should permenantly fail - self._child_shutdown(_ReturnCode.PERM_FAIL) + # terminate() + # + # Ask the the current child thread to terminate + # + # This should only ever be called from the main thread. + # + def terminate(self): + assert utils._is_in_main_thread(), "Terminating the job's thread should only be done from the scheduler" - else: - # No exception occurred in the action - self._send_message(_MessageType.CHILD_DATA, self.child_process_data()) - self._child_send_result(result) + if self._should_terminate: + return - elapsed = datetime.datetime.now() - timeinfo.start_time - self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename) + with self._terminate_lock: + self._should_terminate = True + if self._thread_id is None: + return - # Shutdown needs to stay outside of the above context manager, - # make sure we dont try to handle SIGTERM while the process - # is already busy in sys.exit() - self._child_shutdown(_ReturnCode.OK) + terminate_thread(self._thread_id) ####################################################### # Local Private Methods # @@ -758,18 +709,6 @@ class ChildJob: if result is not None: self._send_message(_MessageType.RESULT, result) - # _child_shutdown() - # - # Shuts down the child process by cleaning up and exiting the process - # - # Args: - # exit_code (_ReturnCode): The exit code to exit with - # - def _child_shutdown(self, exit_code): - self._pipe_w.close() - assert isinstance(exit_code, _ReturnCode) - sys.exit(exit_code.value) - # _child_message_handler() # # A Context delegate for handling messages, this replaces the diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py index 5b3f05b57..5879cc125 100644 --- a/src/buildstream/_scheduler/queues/trackqueue.py +++ b/src/buildstream/_scheduler/queues/trackqueue.py @@ -56,9 +56,10 @@ class TrackQueue(Queue): # Set the new refs in the main process one by one as they complete, # writing to bst files this time if result is not None: - for unique_id, new_ref in result: - source = Plugin._lookup(unique_id) - source._set_ref(new_ref, save=True) + for unique_id, new_ref, ref_changed in result: + if ref_changed: + source = Plugin._lookup(unique_id) + source._set_ref(new_ref, save=True) element._tracking_done() diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 903cd0be9..b46314a9a 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -20,12 +20,15 @@ # Jürg Billeter # System imports +import functools import os import asyncio from itertools import chain import signal import datetime +import multiprocessing.forkserver import sys +from concurrent.futures import ThreadPoolExecutor # Local imports from .resources import Resources @@ -34,9 +37,7 @@ from ..types import FastEnum from .._profile import Topics, PROFILER from .._message import Message, MessageType from ..plugin import Plugin - - -_MAX_TIMEOUT_TO_KILL_CHILDREN = 20 # in seconds +from .. import _signals # A decent return code for Scheduler.run() @@ -46,6 +47,23 @@ class SchedStatus(FastEnum): TERMINATED = 1 +def reset_signals_on_exit(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + orig_sigint = signal.getsignal(signal.SIGINT) + orig_sigterm = signal.getsignal(signal.SIGTERM) + orig_sigtstp = signal.getsignal(signal.SIGTSTP) + + try: + return func(*args, **kwargs) + finally: + signal.signal(signal.SIGINT, orig_sigint) + signal.signal(signal.SIGTERM, orig_sigterm) + signal.signal(signal.SIGTSTP, orig_sigtstp) + + return wrapper + + # Scheduler() # # The scheduler operates on a list queues, each of which is meant to accomplish @@ -79,7 +97,6 @@ class Scheduler: # These are shared with the Job, but should probably be removed or made private in some way. self.loop = None # Shared for Job access to observe the message queue - self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py # # Private members @@ -98,6 +115,14 @@ class Scheduler: self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers) self._state.register_task_retry_callback(self._failure_retry) + # Ensure that the forkserver is started before we start. + # This is best run before we do any GRPC connections to casd or have + # other background threads started. + # We ignore signals here, as this is the state all the python child + # processes from now on will have when starting + with _signals.blocked([signal.SIGINT, signal.SIGTSTP], ignore=True): + multiprocessing.forkserver.ensure_running() + # run() # # Args: @@ -113,6 +138,7 @@ class Scheduler: # elements have been processed by each queue or when # an error arises # + @reset_signals_on_exit def run(self, queues, casd_process_manager): # Hold on to the queues to process @@ -149,10 +175,18 @@ class Scheduler: # Start the profiler with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)): - # Run the queues - self._sched() - self.loop.run_forever() - self.loop.close() + # This is not a no-op. Since it is the first signal registration + # that is set, it allows then other threads to register signal + # handling routines, which would not be possible if the main thread + # hadn't set it before. + # FIXME: this should be done in a cleaner way + with _signals.suspendable(lambda: None, lambda: None), _signals.terminator(lambda: None): + with ThreadPoolExecutor(max_workers=sum(self.resources._max_resources.values())) as pool: + self.loop.set_default_executor(pool) + # Run the queues + self._sched() + self.loop.run_forever() + self.loop.close() # Invoke the ticker callback a final time to render pending messages self._ticker_callback() @@ -351,13 +385,6 @@ class Scheduler: # If that happens, do another round. process_queues = any(q.dequeue_ready() for q in self.queues) - # Make sure fork is allowed before starting jobs - if not self.context.prepare_fork(): - message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active") - self.context.messenger.message(message) - self.terminate() - return - # Start the jobs # for job in ready: @@ -415,9 +442,10 @@ class Scheduler: if not self.suspended: self._suspendtime = datetime.datetime.now() self.suspended = True - # Notify that we're suspended - for job in self._active_jobs: - job.suspend() + _signals.is_not_suspended.clear() + + for suspender in reversed(_signals.suspendable_stack): + suspender.suspend() # _resume_jobs() # @@ -425,8 +453,10 @@ class Scheduler: # def _resume_jobs(self): if self.suspended: - for job in self._active_jobs: - job.resume() + for suspender in _signals.suspendable_stack: + suspender.resume() + + _signals.is_not_suspended.set() self.suspended = False # Notify that we're unsuspended self._state.offset_start_time(datetime.datetime.now() - self._suspendtime) @@ -458,12 +488,6 @@ class Scheduler: # A loop registered event callback for SIGTSTP # def _suspend_event(self): - - # Ignore the feedback signals from Job.suspend() - if self.internal_stops: - self.internal_stops -= 1 - return - # No need to care if jobs were suspended or not, we _only_ handle this # while we know jobs are not suspended. self._suspend_jobs() @@ -485,13 +509,6 @@ class Scheduler: self.loop.remove_signal_handler(signal.SIGTERM) def _terminate_jobs_real(self): - def kill_jobs(): - for job_ in self._active_jobs: - job_.kill() - - # Schedule all jobs to be killed if they have not exited after timeout - self.loop.call_later(_MAX_TIMEOUT_TO_KILL_CHILDREN, kill_jobs) - for job in self._active_jobs: job.terminate() diff --git a/src/buildstream/_signals.py b/src/buildstream/_signals.py index 8032752a8..ef2ba1965 100644 --- a/src/buildstream/_signals.py +++ b/src/buildstream/_signals.py @@ -36,6 +36,20 @@ suspendable_stack: Deque[Callable] = deque() terminator_lock = threading.Lock() suspendable_lock = threading.Lock() +# This event is used to block all the threads while we wait for user +# interaction. This is because we can't stop all the pythin threads but the +# one easily when waiting for user input. However, most performance intensive +# tasks will pass through a subprocess or a multiprocess.Process and all of +# those are guarded by the signal handling. Thus, by setting and unsetting this +# event in the scheduler, we can enable and disable the launching of processes +# and ensure we don't do anything resource intensive while being interrupted. +is_not_suspended = threading.Event() +is_not_suspended.set() + + +class TerminateException(BaseException): + pass + # Per process SIGTERM handler def terminator_handler(signal_, frame): @@ -71,6 +85,9 @@ def terminator_handler(signal_, frame): # that while the code block is running, the supplied function # will be called upon process termination. # +# /!\ The callbacks passed must only contain code that does not acces thread +# local variables. Those will run in the main thread. +# # Note that after handlers are called, the termination will be handled by # terminating immediately with os._exit(). This means that SystemExit will not # be raised and 'finally' clauses will not be executed. @@ -87,12 +104,17 @@ def terminator(terminate_func): assert threading.current_thread() == threading.main_thread() or not outermost - terminator_stack.append(terminate_func) + with terminator_lock: + terminator_stack.append(terminate_func) + if outermost: original_handler = signal.signal(signal.SIGTERM, terminator_handler) try: yield + except TerminateException: + terminate_func() + raise finally: if outermost: signal.signal(signal.SIGTERM, original_handler) @@ -110,21 +132,25 @@ class Suspender: # Per process SIGTSTP handler def suspend_handler(sig, frame): + is_not_suspended.clear() # Suspend callbacks from innermost frame first - for suspender in reversed(suspendable_stack): - suspender.suspend() + with suspendable_lock: + for suspender in reversed(suspendable_stack): + suspender.suspend() - # Use SIGSTOP directly now on self, dont introduce more SIGTSTP - # - # Here the process sleeps until SIGCONT, which we simply - # dont handle. We know we'll pickup execution right here - # when we wake up. - os.kill(os.getpid(), signal.SIGSTOP) + # Use SIGSTOP directly now on self, dont introduce more SIGTSTP + # + # Here the process sleeps until SIGCONT, which we simply + # dont handle. We know we'll pickup execution right here + # when we wake up. + os.kill(os.getpid(), signal.SIGSTOP) - # Resume callbacks from outermost frame inwards - for suspender in suspendable_stack: - suspender.resume() + # Resume callbacks from outermost frame inwards + for suspender in suspendable_stack: + suspender.resume() + + is_not_suspended.set() # suspendable() @@ -135,6 +161,9 @@ def suspend_handler(sig, frame): # suspend_callback (callable): A function to call as process suspend time. # resume_callback (callable): A function to call as process resume time. # +# /!\ The callbacks passed must only contain code that does not acces thread +# local variables. Those will run in the main thread. +# # This must be used in code blocks which start processes that become # their own session leader. In these cases, SIGSTOP and SIGCONT need # to be propagated to the child process group. @@ -150,8 +179,17 @@ def suspendable(suspend_callback, resume_callback): outermost = bool(not suspendable_stack) assert threading.current_thread() == threading.main_thread() or not outermost + # If we are not in the main thread, ensure that we are not suspended + # before running. + # If we are in the main thread, never block on this, to ensure we + # don't deadlock. + if threading.current_thread() != threading.main_thread(): + is_not_suspended.wait() + suspender = Suspender(suspend_callback, resume_callback) - suspendable_stack.append(suspender) + + with suspendable_lock: + suspendable_stack.append(suspender) if outermost: original_stop = signal.signal(signal.SIGTSTP, suspend_handler) diff --git a/src/buildstream/_workspaces.py b/src/buildstream/_workspaces.py index f1fc353a7..e51be0829 100644 --- a/src/buildstream/_workspaces.py +++ b/src/buildstream/_workspaces.py @@ -428,7 +428,7 @@ class Workspaces: # create_workspace permanent # def save_config(self): - assert utils._is_main_process() + assert utils._is_in_main_thread() config = { "format-version": BST_WORKSPACE_FORMAT_VERSION, diff --git a/src/buildstream/element.py b/src/buildstream/element.py index b6b4b801c..6c4e45d03 100644 --- a/src/buildstream/element.py +++ b/src/buildstream/element.py @@ -1542,7 +1542,7 @@ class Element(Plugin): # scope (_Scope): The scope of dependencies to mark as required # def _set_required(self, scope=_Scope.RUN): - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" if self.__required: # Already done @@ -1579,7 +1579,7 @@ class Element(Plugin): # required in the local cache. # def _set_artifact_files_required(self, scope=_Scope.RUN): - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" if self.__artifact_files_required: # Already done @@ -1629,7 +1629,7 @@ class Element(Plugin): # in a subprocess. # def __schedule_assembly_when_necessary(self): - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" # FIXME: We could reduce the number of function calls a bit by # factoring this out of this method (and checking whether we @@ -1662,7 +1662,7 @@ class Element(Plugin): # def _assemble_done(self, successful): assert self.__assemble_scheduled - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" self.__assemble_done = True @@ -1682,8 +1682,6 @@ class Element(Plugin): self._update_ready_for_runtime_and_cached() if self._get_workspace() and self._cached(): - assert utils._is_main_process(), "Attempted to save workspace configuration from child process" - # # Note that this block can only happen in the # main process, since `self._cached_success()` cannot # be true when assembly is successful in the task. @@ -1860,7 +1858,7 @@ class Element(Plugin): # fetched_original (bool): Whether the original sources had been asked (and fetched) or not # def _fetch_done(self, fetched_original): - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" self.__sources.fetch_done(fetched_original) @@ -1906,7 +1904,7 @@ class Element(Plugin): # This will result in updating the element state. # def _pull_done(self): - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" self.__pull_done = True @@ -2084,7 +2082,7 @@ class Element(Plugin): # the workspaces metadata first. # def _open_workspace(self): - assert utils._is_main_process(), "This writes to a global file and therefore must be run in the main process" + assert utils._is_in_main_thread(), "This writes to a global file and therefore must be run in the main thread" context = self._get_context() workspace = self._get_workspace() @@ -2386,7 +2384,7 @@ class Element(Plugin): # the appropriate counters. # def _update_ready_for_runtime_and_cached(self): - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" if not self.__ready_for_runtime_and_cached: if self.__runtime_deps_uncached == 0 and self.__cache_key and self._cached_success(): @@ -3132,7 +3130,7 @@ class Element(Plugin): # in Scope.BUILD has changed in any way. # def __update_cache_keys(self): - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" if self.__strict_cache_key is not None: # Cache keys already calculated @@ -3205,7 +3203,7 @@ class Element(Plugin): # it can check whether an artifact exists for that cache key. # def __update_artifact_state(self): - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" assert self.__artifact is None context = self._get_context() @@ -3240,7 +3238,7 @@ class Element(Plugin): # a remote cache). # def __update_cache_key_non_strict(self): - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" # The final cache key can be None here only in non-strict mode if self.__cache_key is None: diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py index f0c1bf859..f98005644 100644 --- a/src/buildstream/plugin.py +++ b/src/buildstream/plugin.py @@ -613,8 +613,7 @@ class Plugin: proc = self.__multiprocessing_context.Process( target=_background_job_wrapper, args=(result_queue, target, args) ) - with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False): - proc.start() + proc.start() should_continue = True last_check = False diff --git a/src/buildstream/sandbox/_sandboxbuildboxrun.py b/src/buildstream/sandbox/_sandboxbuildboxrun.py index 3d71b7440..1c187d7fd 100644 --- a/src/buildstream/sandbox/_sandboxbuildboxrun.py +++ b/src/buildstream/sandbox/_sandboxbuildboxrun.py @@ -184,17 +184,27 @@ class SandboxBuildBoxRun(SandboxREAPI): try: while True: try: - returncode = process.wait() + # Here, we don't use `process.wait()` directly without a timeout + # This is because, if we were to do that, and the process would never + # output anything, the control would never be given back to the python + # process, which might thus not be able to check for request to + # shutdown, or kill the process. + # We therefore loop with a timeout, to ensure the python process + # can act if it needs. + returncode = process.wait(timeout=1) # If the process exits due to a signal, we # brutally murder it to avoid zombies if returncode < 0: utils._kill_process_tree(process.pid) + except subprocess.TimeoutExpired: + continue + # Unlike in the bwrap case, here only the main # process seems to receive the SIGINT. We pass # on the signal to the child and then continue # to wait. - except KeyboardInterrupt: + except _signals.TerminateException: process.send_signal(signal.SIGINT) continue diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index 6cba7d611..2ac159337 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -26,7 +26,6 @@ from functools import partial import grpc -from .. import utils from ..node import Node from .._message import Message, MessageType from ._sandboxreapi import SandboxREAPI @@ -59,9 +58,6 @@ class SandboxRemote(SandboxREAPI): if config is None: return - # gRPC doesn't support fork without exec, which is used in the main process. - assert not utils._is_main_process() - self.storage_url = config.storage_service["url"] self.exec_url = config.exec_service["url"] diff --git a/src/buildstream/source.py b/src/buildstream/source.py index e0a2db45d..3268d3a93 100644 --- a/src/buildstream/source.py +++ b/src/buildstream/source.py @@ -928,17 +928,14 @@ class Source(Plugin): clean = node.strip_node_info() to_modify = node.strip_node_info() - current_ref = self.get_ref() # pylint: disable=assignment-from-no-return - # Set the ref regardless of whether it changed, the # TrackQueue() will want to update a specific node with # the ref, regardless of whether the original has changed. self.set_ref(new_ref, to_modify) - if current_ref == new_ref or not save: - # Note: We do not look for and propagate changes at this point - # which might result in desync depending if something changes about - # tracking in the future. For now, this is quite safe. + # FIXME: this will save things too often, as a ref might not have + # changed. We should optimize this to detect it differently + if not save: return False # Ensure the node is not from a junction diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py index 956ad1d65..8e6a51273 100644 --- a/src/buildstream/utils.py +++ b/src/buildstream/utils.py @@ -34,6 +34,7 @@ from stat import S_ISDIR import subprocess from subprocess import TimeoutExpired import tempfile +import threading import time import datetime import itertools @@ -869,13 +870,12 @@ def _pretty_size(size, dec_places=0): return "{size:g}{unit}".format(size=round(psize, dec_places), unit=unit) -# _is_main_process() +# _is_in_main_thread() # -# Return whether we are in the main process or not. +# Return whether we are running in the main thread or not # -def _is_main_process(): - assert _MAIN_PID is not None - return os.getpid() == _MAIN_PID +def _is_in_main_thread(): + return threading.current_thread() is threading.main_thread() # Remove a path and any empty directories leading up to it. diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py index e6eaec960..63e6d9814 100644 --- a/tests/artifactcache/pull.py +++ b/tests/artifactcache/pull.py @@ -162,6 +162,7 @@ def test_pull_tree(cli, tmpdir, datafiles): # Assert that we are not cached locally anymore artifactcache.close_grpc_channels() + cas._casd_channel.request_shutdown() cas.close_grpc_channels() assert cli.get_element_state(project_dir, "target.bst") != "cached" diff --git a/tests/internals/cascache.py b/tests/internals/cascache.py index 043531c24..e27e40974 100644 --- a/tests/internals/cascache.py +++ b/tests/internals/cascache.py @@ -3,6 +3,7 @@ import time from unittest.mock import MagicMock from buildstream._cas.cascache import CASCache +from buildstream._cas import casdprocessmanager from buildstream._message import MessageType from buildstream._messenger import Messenger @@ -31,6 +32,10 @@ def test_report_when_cascache_exits_not_cleanly(tmp_path, monkeypatch): dummy_buildbox_casd.write_text("#!/usr/bin/env sh\nwhile :\ndo\nsleep 60\ndone") dummy_buildbox_casd.chmod(0o777) monkeypatch.setenv("PATH", str(tmp_path), prepend=os.pathsep) + # FIXME: this is a hack, we should instead have a socket be created nicely + # on the fake casd script. This whole test suite probably would + # need some cleanup + monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1) messenger = MagicMock(spec_set=Messenger) cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs"))) @@ -50,6 +55,10 @@ def test_report_when_cascache_is_forcefully_killed(tmp_path, monkeypatch): dummy_buildbox_casd.write_text("#!/usr/bin/env sh\ntrap 'echo hello' TERM\nwhile :\ndo\nsleep 60\ndone") dummy_buildbox_casd.chmod(0o777) monkeypatch.setenv("PATH", str(tmp_path), prepend=os.pathsep) + # FIXME: this is a hack, we should instead have a socket be created nicely + # on the fake casd script. This whole test suite probably would + # need some cleanup + monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1) messenger = MagicMock(spec_set=Messenger) cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs"))) -- cgit v1.2.1 From 6cec972cc7b3f51ecc22a6de8f4b636383f0f099 Mon Sep 17 00:00:00 2001 From: Benjamin Schubert Date: Wed, 16 Sep 2020 09:59:57 +0000 Subject: utils.py: remove 'is_single_threaded' method, we use it only in tests This moves it to tests with a simplified usage, since we don't use it anywhere else --- src/buildstream/testing/_fixtures.py | 28 +++++++++++++++++++++++----- src/buildstream/utils.py | 35 ----------------------------------- 2 files changed, 23 insertions(+), 40 deletions(-) diff --git a/src/buildstream/testing/_fixtures.py b/src/buildstream/testing/_fixtures.py index 520f68587..095a5f6f1 100644 --- a/src/buildstream/testing/_fixtures.py +++ b/src/buildstream/testing/_fixtures.py @@ -16,26 +16,44 @@ # pylint: disable=redefined-outer-name +import time + import psutil import pytest -from buildstream import node, utils, DownloadableFileSource +from buildstream import node, DownloadableFileSource + + +# Number of seconds to wait for background threads to exit. +_AWAIT_THREADS_TIMEOUT_SECONDS = 5 + + +def has_no_unexpected_background_threads(expected_num_threads): + # Use psutil as threading.active_count() doesn't include gRPC threads. + process = psutil.Process() + + wait = 0.1 + for _ in range(0, int(_AWAIT_THREADS_TIMEOUT_SECONDS / wait)): + if process.num_threads() == expected_num_threads: + return True + time.sleep(wait) + + return False @pytest.fixture(autouse=True, scope="session") def default_thread_number(): # xdist/execnet has its own helper thread. - # Ignore that for `utils._is_single_threaded` checks. - utils._INITIAL_NUM_THREADS_IN_MAIN_PROCESS = psutil.Process().num_threads() + return psutil.Process().num_threads() # Catch tests that don't shut down background threads, which could then lead # to other tests hanging when BuildStream uses fork(). @pytest.fixture(autouse=True) def thread_check(default_thread_number): - assert utils._is_single_threaded() + assert has_no_unexpected_background_threads(default_thread_number) yield - assert utils._is_single_threaded() + assert has_no_unexpected_background_threads(default_thread_number) # Reset global state in node.pyx to improve test isolation diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py index 8e6a51273..04bbf261c 100644 --- a/src/buildstream/utils.py +++ b/src/buildstream/utils.py @@ -35,7 +35,6 @@ import subprocess from subprocess import TimeoutExpired import tempfile import threading -import time import datetime import itertools from contextlib import contextmanager @@ -62,16 +61,6 @@ BST_ARBITRARY_TIMESTAMP = calendar.timegm((2011, 11, 11, 11, 11, 11)) _ALIAS_SEPARATOR = ":" _URI_SCHEMES = ["http", "https", "ftp", "file", "git", "sftp", "ssh"] -# Main process pid -_MAIN_PID = os.getpid() - -# The number of threads in the main process at startup. -# This is 1 except for certain test environments (xdist/execnet). -_INITIAL_NUM_THREADS_IN_MAIN_PROCESS = 1 - -# Number of seconds to wait for background threads to exit. -_AWAIT_THREADS_TIMEOUT_SECONDS = 5 - # The process's file mode creation mask. # Impossible to retrieve without temporarily changing it on POSIX. _UMASK = os.umask(0o777) @@ -1600,30 +1589,6 @@ def _get_compression(tar): return "" -# _is_single_threaded() -# -# Return whether the current Process is single-threaded. Don't count threads -# in the main process that were created by a test environment (xdist/execnet) -# before BuildStream was executed. -# -def _is_single_threaded(): - # Use psutil as threading.active_count() doesn't include gRPC threads. - process = psutil.Process() - - if process.pid == _MAIN_PID: - expected_num_threads = _INITIAL_NUM_THREADS_IN_MAIN_PROCESS - else: - expected_num_threads = 1 - - # gRPC threads are not joined when shut down. Wait for them to exit. - wait = 0.1 - for _ in range(0, int(_AWAIT_THREADS_TIMEOUT_SECONDS / wait)): - if process.num_threads() == expected_num_threads: - return True - time.sleep(wait) - return False - - # _parse_version(): # # Args: -- cgit v1.2.1 From c9847346c42c322cbd595a0de832711eba2e0f7d Mon Sep 17 00:00:00 2001 From: Benjamin Schubert Date: Tue, 24 Nov 2020 21:15:32 +0000 Subject: scheduler.py: Reconnect signal handlers sooner This reduces a race condition where a sigint received shortly after restarting the scheduler would cause the schedulert to crash. --- src/buildstream/_scheduler/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index b46314a9a..7acb062d0 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -264,8 +264,8 @@ class Scheduler: # Restart the scheduler # def resume(self): - self._resume_jobs() self._connect_signals() + self._resume_jobs() # stop() # -- cgit v1.2.1