diff options
author | Benjamin Schubert <1593324-BenjaminSchubert@users.noreply.gitlab.com> | 2020-12-04 15:25:54 +0000 |
---|---|---|
committer | Benjamin Schubert <1593324-BenjaminSchubert@users.noreply.gitlab.com> | 2020-12-04 15:25:54 +0000 |
commit | 458c05751f089673dc1cae8b8edfabfa32cf21b7 (patch) | |
tree | 7fc64a082ec4a15079e5d6907a6605ac2d19e7b6 | |
parent | 6b961f20a8b2174a342e9b92e68752b2ff19da29 (diff) | |
parent | c9847346c42c322cbd595a0de832711eba2e0f7d (diff) | |
download | buildstream-458c05751f089673dc1cae8b8edfabfa32cf21b7.tar.gz |
Merge branch 'bschubert/no-multiprocessing' into 'master'
Rework the scheduler to use threads instead of processes
Closes #911, #93, and #810
See merge request BuildStream/buildstream!1982
28 files changed, 697 insertions, 612 deletions
@@ -8,6 +8,7 @@ extension-pkg-whitelist= buildstream._loader._loader, buildstream._loader.loadelement, buildstream._loader.types, + buildstream._scheduler.jobs._job, buildstream._types, buildstream._utils, buildstream._variables, @@ -28,7 +28,7 @@ warn_no_return = True # Ignore missing stubs for third-party packages. # In future, these should be re-enabled if/when stubs for them become available. -[mypy-copyreg,grpc,pluginbase,psutil,pyroaring,ruamel] +[mypy-copyreg,grpc,pluginbase,psutil,pyroaring,ruamel,multiprocessing.forkserver] ignore_missing_imports=True # Ignore issues with generated files and vendored code @@ -319,6 +319,7 @@ BUILD_EXTENSIONS = [] register_cython_module("buildstream.node") register_cython_module("buildstream._loader._loader") register_cython_module("buildstream._loader.loadelement", dependencies=["buildstream.node"]) +register_cython_module("buildstream._scheduler.jobs._job") register_cython_module("buildstream._yaml", dependencies=["buildstream.node"]) register_cython_module("buildstream._types") register_cython_module("buildstream._utils") diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index 7936121ea..d13531c6c 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -22,11 +22,9 @@ import itertools import os import stat import contextlib -import ctypes -import multiprocessing -import signal import time from typing import Optional, List +import threading import grpc @@ -34,7 +32,7 @@ from .._protos.google.rpc import code_pb2 from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 from .._protos.build.buildgrid import local_cas_pb2 -from .. import _signals, utils +from .. import utils from ..types import FastEnum, SourceRef from .._exceptions import CASCacheError @@ -93,6 +91,7 @@ class CASCache: self._casd_channel = self._casd_process_manager.create_channel() self._cache_usage_monitor = _CASCacheUsageMonitor(self._casd_channel) + self._cache_usage_monitor.start() # get_cas(): # @@ -131,8 +130,12 @@ class CASCache: # Release resources used by CASCache. # def release_resources(self, messenger=None): + if self._casd_channel: + self._casd_channel.request_shutdown() + if self._cache_usage_monitor: - self._cache_usage_monitor.release_resources() + self._cache_usage_monitor.stop() + self._cache_usage_monitor.join() if self._casd_process_manager: self.close_grpc_channels() @@ -731,65 +734,45 @@ class _CASCacheUsage: # This manages the subprocess that tracks cache usage information via # buildbox-casd. # -class _CASCacheUsageMonitor: +class _CASCacheUsageMonitor(threading.Thread): def __init__(self, connection): + super().__init__() self._connection = connection - - # Shared memory (64-bit signed integer) for current disk usage and quota - self._disk_usage = multiprocessing.Value(ctypes.c_longlong, -1) - self._disk_quota = multiprocessing.Value(ctypes.c_longlong, -1) - - # multiprocessing.Process will fork without exec on Unix. - # This can't be allowed with background threads or open gRPC channels. - assert utils._is_single_threaded() and connection.is_closed() - - # Block SIGINT, we don't want to kill the process when we interrupt the frontend - # and this process if very lightweight. - with _signals.blocked([signal.SIGINT], ignore=False): - self._subprocess = multiprocessing.Process(target=self._subprocess_run) - self._subprocess.start() + self._disk_usage = None + self._disk_quota = None + self._should_stop = False def get_cache_usage(self): - disk_usage = self._disk_usage.value - disk_quota = self._disk_quota.value - - if disk_usage < 0: - # Disk usage still unknown - disk_usage = None - - if disk_quota <= 0: - # No disk quota - disk_quota = None - - return _CASCacheUsage(disk_usage, disk_quota) - - def release_resources(self): - # Simply terminate the subprocess, no cleanup required in the subprocess - self._subprocess.terminate() + return _CASCacheUsage(self._disk_usage, self._disk_quota) - def _subprocess_run(self): - # Reset SIGTERM in subprocess to default as no cleanup is necessary - signal.signal(signal.SIGTERM, signal.SIG_DFL) + def stop(self): + self._should_stop = True - disk_usage = self._disk_usage - disk_quota = self._disk_quota + def run(self): local_cas = self._connection.get_local_cas() - while True: + while not self._should_stop: try: # Ask buildbox-casd for current value request = local_cas_pb2.GetLocalDiskUsageRequest() response = local_cas.GetLocalDiskUsage(request) # Update values in shared memory - disk_usage.value = response.size_bytes - disk_quota.value = response.quota_bytes + self._disk_usage = response.size_bytes + disk_quota = response.quota_bytes + if disk_quota == 0: # Quota == 0 means there is no quota + self._disk_quota = None + else: + self._disk_quota = disk_quota except grpc.RpcError: # Terminate loop when buildbox-casd becomes unavailable break # Sleep until next refresh - time.sleep(_CACHE_USAGE_REFRESH) + for _ in range(_CACHE_USAGE_REFRESH * 10): + if self._should_stop: + break + time.sleep(0.1) def _grouper(iterable, n): diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py index 32e4cce63..20ff610eb 100644 --- a/src/buildstream/_cas/casdprocessmanager.py +++ b/src/buildstream/_cas/casdprocessmanager.py @@ -17,6 +17,7 @@ # import contextlib +import threading import os import random import shutil @@ -240,35 +241,48 @@ class CASDChannel: self._asset_fetch = None self._asset_push = None self._casd_pid = casd_pid + self._shutdown_requested = False - def _establish_connection(self): - assert self._casd_channel is None - - while not os.path.exists(self._socket_path): - # casd is not ready yet, try again after a 10ms delay, - # but don't wait for more than specified timeout period - if time.time() > self._start_time + _CASD_TIMEOUT: - raise CASCacheError("Timed out waiting for buildbox-casd to become ready") + self._lock = threading.Lock() - # check that process is still alive - try: - proc = psutil.Process(self._casd_pid) - if proc.status() == psutil.STATUS_ZOMBIE: - proc.wait() + def _establish_connection(self): + with self._lock: + if self._casd_channel is not None: + return + + while not os.path.exists(self._socket_path): + # casd is not ready yet, try again after a 10ms delay, + # but don't wait for more than specified timeout period + if time.time() > self._start_time + _CASD_TIMEOUT: + raise CASCacheError("Timed out waiting for buildbox-casd to become ready") + + if self._shutdown_requested: + # Shutdown has been requested, we can exit + return - if not proc.is_running(): + # check that process is still alive + try: + proc = psutil.Process(self._casd_pid) + if proc.status() == psutil.STATUS_ZOMBIE: + proc.wait() + + if not proc.is_running(): + if self._shutdown_requested: + return + raise CASCacheError("buildbox-casd process died before connection could be established") + except psutil.NoSuchProcess: + if self._shutdown_requested: + return raise CASCacheError("buildbox-casd process died before connection could be established") - except psutil.NoSuchProcess: - raise CASCacheError("buildbox-casd process died before connection could be established") - time.sleep(0.01) + time.sleep(0.01) - self._casd_channel = grpc.insecure_channel(self._connection_string) - self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._casd_channel) - self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel) - self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel) - self._asset_fetch = remote_asset_pb2_grpc.FetchStub(self._casd_channel) - self._asset_push = remote_asset_pb2_grpc.PushStub(self._casd_channel) + self._casd_channel = grpc.insecure_channel(self._connection_string) + self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._casd_channel) + self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel) + self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel) + self._asset_fetch = remote_asset_pb2_grpc.FetchStub(self._casd_channel) + self._asset_push = remote_asset_pb2_grpc.PushStub(self._casd_channel) # get_cas(): # @@ -284,12 +298,12 @@ class CASDChannel: # Return LocalCAS stub for buildbox-casd channel. # def get_local_cas(self): - if self._casd_channel is None: + if self._local_cas is None: self._establish_connection() return self._local_cas def get_bytestream(self): - if self._casd_channel is None: + if self._bytestream is None: self._establish_connection() return self._bytestream @@ -318,17 +332,30 @@ class CASDChannel: def is_closed(self): return self._casd_channel is None + # request_shutdown(): + # + # Notify the channel that a shutdown of casd was requested. + # + # Thus we know that not being able to establish a connection is expected + # and no error will be reported in that case. + def request_shutdown(self) -> None: + self._shutdown_requested = True + # close(): # # Close the casd channel. # def close(self): - if self.is_closed(): - return - self._asset_push = None - self._asset_fetch = None - self._local_cas = None - self._casd_cas = None - self._bytestream = None - self._casd_channel.close() - self._casd_channel = None + assert self._shutdown_requested, "Please request shutdown before closing" + + with self._lock: + if self.is_closed(): + return + + self._asset_push = None + self._asset_fetch = None + self._local_cas = None + self._casd_cas = None + self._bytestream = None + self._casd_channel.close() + self._casd_channel = None diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py index 013fb07dd..04c5eb836 100644 --- a/src/buildstream/_cas/casserver.py +++ b/src/buildstream/_cas/casserver.py @@ -30,6 +30,7 @@ import grpc import click from .._protos.build.bazel.remote.asset.v1 import remote_asset_pb2_grpc +from .. import _signals from .._protos.build.bazel.remote.execution.v2 import ( remote_execution_pb2, remote_execution_pb2_grpc, @@ -137,7 +138,11 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.Le _ReferenceStorageServicer(casd_channel, root, enable_push=enable_push), server ) - yield server + # Ensure we have the signal handler set for SIGTERM + # This allows threads from GRPC to call our methods that do register + # handlers at exit. + with _signals.terminator(lambda: None): + yield server finally: casd_channel.close() diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py index 0c2d1a150..87b23d50c 100644 --- a/src/buildstream/_context.py +++ b/src/buildstream/_context.py @@ -564,17 +564,3 @@ class Context: log_directory=self.logdir, ) return self._cascache - - # prepare_fork(): - # - # Prepare this process for fork without exec. This is a safeguard against - # fork issues with multiple threads and gRPC connections. - # - def prepare_fork(self): - # gRPC channels must be closed before fork. - for cache in [self._cascache, self._artifactcache, self._sourcecache]: - if cache: - cache.close_grpc_channels() - - # Do not allow fork if there are background threads. - return utils._is_single_threaded() diff --git a/src/buildstream/_elementsources.py b/src/buildstream/_elementsources.py index 69051ee8a..a15d20ecb 100644 --- a/src/buildstream/_elementsources.py +++ b/src/buildstream/_elementsources.py @@ -100,7 +100,7 @@ class ElementSources: else: new_ref = source._track() - refs.append((source._unique_id, new_ref)) + refs.append((source._unique_id, new_ref, old_ref != new_ref)) # Complimentary warning that the new ref will be unused. if old_ref != new_ref and workspace: @@ -113,7 +113,7 @@ class ElementSources: # Sources which do not implement track() will return None, produce # a SKIP message in the UI if all sources produce None # - if all(ref is None for _, ref in refs): + if all(ref is None for _, ref, _ in refs): raise SkipJob("Element sources are not trackable") return refs diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py index c37eca8bc..69a309f91 100644 --- a/src/buildstream/_messenger.py +++ b/src/buildstream/_messenger.py @@ -19,10 +19,10 @@ import os import datetime +import threading from contextlib import contextmanager from . import _signals -from . import utils from ._exceptions import BstError from ._message import Message, MessageType @@ -48,15 +48,17 @@ class _TimeData: class Messenger: def __init__(self): - self._message_handler = None - self._silence_scope_depth = 0 - self._log_handle = None - self._log_filename = None self._state = None self._next_render = None # A Time object self._active_simple_tasks = 0 self._render_status_cb = None + self._locals = threading.local() + self._locals.message_handler = None + self._locals.log_handle = None + self._locals.log_filename = None + self._locals.silence_scope_depth = 0 + # set_message_handler() # # Sets the handler for any status messages propagated through @@ -70,7 +72,7 @@ class Messenger: # ) -> None # def set_message_handler(self, handler): - self._message_handler = handler + self._locals.message_handler = handler # set_state() # @@ -101,7 +103,7 @@ class Messenger: # (bool): Whether messages are currently being silenced # def _silent_messages(self): - return self._silence_scope_depth > 0 + return self._locals.silence_scope_depth > 0 # message(): # @@ -112,16 +114,15 @@ class Messenger: # message: A Message object # def message(self, message): - # If we are recording messages, dump a copy into the open log file. self._record_message(message) # Send it off to the log handler (can be the frontend, # or it can be the child task which will propagate # to the frontend) - assert self._message_handler + assert self._locals.message_handler - self._message_handler(message, is_silenced=self._silent_messages()) + self._locals.message_handler(message, is_silenced=self._silent_messages()) # silence() # @@ -141,12 +142,12 @@ class Messenger: yield return - self._silence_scope_depth += 1 + self._locals.silence_scope_depth += 1 try: yield finally: - assert self._silence_scope_depth > 0 - self._silence_scope_depth -= 1 + assert self._locals.silence_scope_depth > 0 + self._locals.silence_scope_depth -= 1 # timed_activity() # @@ -264,22 +265,21 @@ class Messenger: # @contextmanager def recorded_messages(self, filename, logdir): - # We dont allow recursing in this context manager, and # we also do not allow it in the main process. - assert self._log_handle is None - assert self._log_filename is None - assert not utils._is_main_process() + assert not hasattr(self._locals, "log_handle") or self._locals.log_handle is None + assert not hasattr(self._locals, "log_filename") or self._locals.log_filename is None # Create the fully qualified logfile in the log directory, # appending the pid and .log extension at the end. - self._log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid())) + self._locals.log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid())) + self._locals.silence_scope_depth = 0 # Ensure the directory exists first - directory = os.path.dirname(self._log_filename) + directory = os.path.dirname(self._locals.log_filename) os.makedirs(directory, exist_ok=True) - with open(self._log_filename, "a") as logfile: + with open(self._locals.log_filename, "a") as logfile: # Write one last line to the log and flush it to disk def flush_log(): @@ -294,12 +294,12 @@ class Messenger: except RuntimeError: os.fsync(logfile.fileno()) - self._log_handle = logfile + self._locals.log_handle = logfile with _signals.terminator(flush_log): - yield self._log_filename + yield self._locals.log_filename - self._log_handle = None - self._log_filename = None + self._locals.log_handle = None + self._locals.log_filename = None # get_log_handle() # @@ -311,7 +311,7 @@ class Messenger: # (file): The active logging file handle, or None # def get_log_handle(self): - return self._log_handle + return self._locals.log_handle # get_log_filename() # @@ -323,7 +323,7 @@ class Messenger: # (str): The active logging filename, or None # def get_log_filename(self): - return self._log_filename + return self._locals.log_filename # timed_suspendable() # @@ -345,8 +345,6 @@ class Messenger: stopped_time = datetime.datetime.now() def resume_time(): - nonlocal timedata - nonlocal stopped_time sleep_time = datetime.datetime.now() - stopped_time timedata.start_time += sleep_time @@ -362,7 +360,7 @@ class Messenger: # def _record_message(self, message): - if self._log_handle is None: + if self._locals.log_handle is None: return INDENT = " " @@ -405,8 +403,8 @@ class Messenger: ) # Write to the open log file - self._log_handle.write("{}\n".format(text)) - self._log_handle.flush() + self._locals.log_handle.write("{}\n".format(text)) + self._locals.log_handle.flush() # _render_status() # diff --git a/src/buildstream/_remote.py b/src/buildstream/_remote.py index d8b8e68fe..6d52ff56a 100644 --- a/src/buildstream/_remote.py +++ b/src/buildstream/_remote.py @@ -16,6 +16,7 @@ # import os +import threading from collections import namedtuple from urllib.parse import urlparse @@ -146,41 +147,44 @@ class BaseRemote: self.push = spec.push self.url = spec.url + self._lock = threading.Lock() + # init(): # # Initialize the given remote. This function must be called before # any communication is performed, since such will otherwise fail. # def init(self): - if self._initialized: - return - - # Set up the communcation channel - url = urlparse(self.spec.url) - if url.scheme == "http": - port = url.port or 80 - self.channel = grpc.insecure_channel("{}:{}".format(url.hostname, port)) - elif url.scheme == "https": - port = url.port or 443 - try: - server_cert, client_key, client_cert = _read_files( - self.spec.server_cert, self.spec.client_key, self.spec.client_cert + with self._lock: + if self._initialized: + return + + # Set up the communcation channel + url = urlparse(self.spec.url) + if url.scheme == "http": + port = url.port or 80 + self.channel = grpc.insecure_channel("{}:{}".format(url.hostname, port)) + elif url.scheme == "https": + port = url.port or 443 + try: + server_cert, client_key, client_cert = _read_files( + self.spec.server_cert, self.spec.client_key, self.spec.client_cert + ) + except FileNotFoundError as e: + raise RemoteError("Could not read certificates: {}".format(e)) from e + self.server_cert = server_cert + self.client_key = client_key + self.client_cert = client_cert + credentials = grpc.ssl_channel_credentials( + root_certificates=self.server_cert, private_key=self.client_key, certificate_chain=self.client_cert ) - except FileNotFoundError as e: - raise RemoteError("Could not read certificates: {}".format(e)) from e - self.server_cert = server_cert - self.client_key = client_key - self.client_cert = client_cert - credentials = grpc.ssl_channel_credentials( - root_certificates=self.server_cert, private_key=self.client_key, certificate_chain=self.client_cert - ) - self.channel = grpc.secure_channel("{}:{}".format(url.hostname, port), credentials) - else: - raise RemoteError("Unsupported URL: {}".format(self.spec.url)) + self.channel = grpc.secure_channel("{}:{}".format(url.hostname, port), credentials) + else: + raise RemoteError("Unsupported URL: {}".format(self.spec.url)) - self._configure_protocols() + self._configure_protocols() - self._initialized = True + self._initialized = True def __enter__(self): return self diff --git a/src/buildstream/_scheduler/_multiprocessing.py b/src/buildstream/_scheduler/_multiprocessing.py deleted file mode 100644 index 4864e140c..000000000 --- a/src/buildstream/_scheduler/_multiprocessing.py +++ /dev/null @@ -1,79 +0,0 @@ -# -# Copyright (C) 2019 Bloomberg Finance LP -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library. If not, see <http://www.gnu.org/licenses/>. -# - -# TLDR: -# ALWAYS use `.AsyncioSafeProcess` when you have an asyncio event loop running and need a `multiprocessing.Process` -# -# -# The upstream asyncio library doesn't play well with forking subprocesses while an event loop is running. -# -# The main problem that affects us is that the parent and the child will share some file handlers. -# The most important one for us is the sig_handler_fd, which the loop uses to buffer signals received -# by the app so that the asyncio loop can treat them afterwards. -# -# This sharing means that when we send a signal to the child, the sighandler in the child will write -# it back to the parent sig_handler_fd, making the parent have to treat it too. -# This is a problem for example when we sigterm the process. The scheduler will send sigterms to all its children, -# which in turn will make the scheduler receive N SIGTERMs (one per child). Which in turn will send sigterms to -# the children... -# -# We therefore provide a `AsyncioSafeProcess` derived from multiprocessing.Process that automatically -# tries to cleanup the loop and never calls `waitpid` on the child process, which breaks our child watchers. -# -# -# Relevant issues: -# - Asyncio: support fork (https://bugs.python.org/issue21998) -# - Asyncio: support multiprocessing (support fork) (https://bugs.python.org/issue22087) -# - Signal delivered to a subprocess triggers parent's handler (https://bugs.python.org/issue31489) -# -# - -import multiprocessing -import signal -import sys -from asyncio import set_event_loop_policy - - -# _AsyncioSafeForkAwareProcess() -# -# Process class that doesn't call waitpid on its own. -# This prevents conflicts with the asyncio child watcher. -# -# Also automatically close any running asyncio loop before calling -# the actual run target -# -class _AsyncioSafeForkAwareProcess(multiprocessing.Process): - # pylint: disable=attribute-defined-outside-init - def start(self): - self._popen = self._Popen(self) - self._sentinel = self._popen.sentinel - - def run(self): - signal.set_wakeup_fd(-1) - set_event_loop_policy(None) - - super().run() - - -if sys.platform != "win32": - # Set the default event loop policy to automatically close our asyncio loop in child processes - AsyncioSafeProcess = _AsyncioSafeForkAwareProcess - -else: - # Windows doesn't support ChildWatcher that way anyways, we'll need another - # implementation if we want it - AsyncioSafeProcess = multiprocessing.Process diff --git a/src/buildstream/_scheduler/jobs/_job.pyi b/src/buildstream/_scheduler/jobs/_job.pyi new file mode 100644 index 000000000..fbf3e64de --- /dev/null +++ b/src/buildstream/_scheduler/jobs/_job.pyi @@ -0,0 +1 @@ +def terminate_thread(thread_id: int): ... diff --git a/src/buildstream/_scheduler/jobs/_job.pyx b/src/buildstream/_scheduler/jobs/_job.pyx new file mode 100644 index 000000000..82f6ab044 --- /dev/null +++ b/src/buildstream/_scheduler/jobs/_job.pyx @@ -0,0 +1,15 @@ +from cpython.pystate cimport PyThreadState_SetAsyncExc +from cpython.ref cimport PyObject +from ..._signals import TerminateException + + +# terminate_thread() +# +# Ask a given a given thread to terminate by raising an exception in it. +# +# Args: +# thread_id (int): the thread id in which to throw the exception +# +def terminate_thread(long thread_id): + res = PyThreadState_SetAsyncExc(thread_id, <PyObject*> TerminateException) + assert res == 1 diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 08e40694e..2e8f5ca1a 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -25,17 +25,16 @@ import asyncio import datetime import itertools import multiprocessing -import os -import signal -import sys +import threading import traceback # BuildStream toplevel imports +from ... import utils from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob from ..._message import Message, MessageType, unconditional_messages from ...types import FastEnum -from ... import _signals, utils -from .. import _multiprocessing +from ._job import terminate_thread +from ..._signals import TerminateException # Return code values shutdown of job handling child processes @@ -46,7 +45,6 @@ class _ReturnCode(FastEnum): PERM_FAIL = 2 SKIPPED = 3 TERMINATED = 4 - KILLED = -9 # JobStatus: @@ -131,7 +129,6 @@ class Job: self._scheduler = scheduler # The scheduler self._messenger = self._scheduler.context.messenger self._pipe_r = None # The read end of a pipe for message passing - self._process = None # The Process object self._listening = False # Whether the parent is currently listening self._suspended = False # Whether this job is currently suspended self._max_retries = max_retries # Maximum number of automatic retries @@ -144,6 +141,9 @@ class Job: self._message_element_key = None # The task-wide element cache key self._element = None # The Element() passed to the Job() constructor, if applicable + self._task = None # The task that is run + self._child = None + # set_name() # # Sets the name of this job @@ -158,12 +158,14 @@ class Job: assert not self._terminated, "Attempted to start process which was already terminated" + # FIXME: remove this, this is not necessary when using asyncio self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False) self._tries += 1 self._parent_start_listening() - child_job = self.create_child_job( # pylint: disable=assignment-from-no-return + # FIXME: remove the parent/child separation, it's not needed anymore. + self._child = self.create_child_job( # pylint: disable=assignment-from-no-return self.action_name, self._messenger, self._scheduler.context.logdir, @@ -174,46 +176,28 @@ class Job: self._message_element_key, ) - self._process = _multiprocessing.AsyncioSafeProcess(target=child_job.child_action, args=[pipe_w],) - - # Block signals which are handled in the main process such that - # the child process does not inherit the parent's state, but the main - # process will be notified of any signal after we launch the child. - # - with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False): - with asyncio.get_child_watcher() as watcher: - self._process.start() - # Register the process to call `_parent_child_completed` once it is done - - # Close the write end of the pipe in the parent - pipe_w.close() + loop = asyncio.get_event_loop() - # Here we delay the call to the next loop tick. This is in order to be running - # in the main thread, as the callback itself must be thread safe. - def on_completion(pid, returncode): - asyncio.get_event_loop().call_soon(self._parent_child_completed, pid, returncode) + async def execute(): + result = await loop.run_in_executor(None, self._child.child_action, pipe_w) + await self._parent_child_completed(result) - watcher.add_child_handler(self._process.pid, on_completion) + self._task = loop.create_task(execute()) # terminate() # # Politely request that an ongoing job terminate soon. # - # This will send a SIGTERM signal to the Job process. + # This will raise an exception in the child to ask it to exit. # def terminate(self): - - # First resume the job if it's suspended - self.resume(silent=True) - self.message(MessageType.STATUS, "{} terminating".format(self.action_name)) # Make sure there is no garbage on the pipe self._parent_stop_listening() - # Terminate the process using multiprocessing API pathway - if self._process: - self._process.terminate() + if self._task: + self._child.terminate() self._terminated = True @@ -227,51 +211,6 @@ class Job: def get_terminated(self): return self._terminated - # kill() - # - # Forcefully kill the process, and any children it might have. - # - def kill(self): - # Force kill - self.message(MessageType.WARN, "{} did not terminate gracefully, killing".format(self.action_name)) - if self._process: - utils._kill_process_tree(self._process.pid) - - # suspend() - # - # Suspend this job. - # - def suspend(self): - if not self._suspended: - self.message(MessageType.STATUS, "{} suspending".format(self.action_name)) - - try: - # Use SIGTSTP so that child processes may handle and propagate - # it to processes they start that become session leaders. - os.kill(self._process.pid, signal.SIGTSTP) - - # For some reason we receive exactly one suspend event for - # every SIGTSTP we send to the child process, even though the - # child processes are setsid(). We keep a count of these so we - # can ignore them in our event loop suspend_event(). - self._scheduler.internal_stops += 1 - self._suspended = True - except ProcessLookupError: - # ignore, process has already exited - pass - - # resume() - # - # Resume this suspended job. - # - def resume(self, silent=False): - if self._suspended: - if not silent and not self._scheduler.terminated: - self.message(MessageType.STATUS, "{} resuming".format(self.action_name)) - - os.kill(self._process.pid, signal.SIGCONT) - self._suspended = False - # set_message_element_name() # # This is called by Job subclasses to set the plugin instance element @@ -380,10 +319,9 @@ class Job: # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler() # # Args: - # pid (int): The PID of the child which completed # returncode (int): The return code of the child process # - def _parent_child_completed(self, pid, returncode): + async def _parent_child_completed(self, returncode): self._parent_shutdown() try: @@ -414,16 +352,9 @@ class Job: status = JobStatus.FAIL elif returncode == _ReturnCode.TERMINATED: if self._terminated: - self.message(MessageType.INFO, "Process was terminated") - else: - self.message(MessageType.ERROR, "Process was terminated unexpectedly") - - status = JobStatus.FAIL - elif returncode == _ReturnCode.KILLED: - if self._terminated: - self.message(MessageType.INFO, "Process was killed") + self.message(MessageType.INFO, "Job terminated") else: - self.message(MessageType.ERROR, "Process was killed unexpectedly") + self.message(MessageType.ERROR, "Job was terminated unexpectedly") status = JobStatus.FAIL else: @@ -434,7 +365,7 @@ class Job: # Force the deletion of the pipe and process objects to try and clean up FDs self._pipe_r.close() - self._pipe_r = self._process = None + self._pipe_r = self._task = None # _parent_process_envelope() # @@ -549,6 +480,9 @@ class ChildJob: self._message_element_key = message_element_key self._pipe_w = None # The write end of a pipe for message passing + self._thread_id = None # Thread in which the child executes its action + self._should_terminate = False + self._terminate_lock = threading.Lock() # message(): # @@ -615,19 +549,6 @@ class ChildJob: # pipe_w (multiprocessing.connection.Connection): The message pipe for IPC # def child_action(self, pipe_w): - - # This avoids some SIGTSTP signals from grandchildren - # getting propagated up to the master process - os.setsid() - - # First set back to the default signal handlers for the signals - # we handle, and then clear their blocked state. - # - signal_list = [signal.SIGTSTP, signal.SIGTERM] - for sig in signal_list: - signal.signal(sig, signal.SIG_DFL) - signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list) - # Assign the pipe we passed across the process boundaries # # Set the global message handler in this child @@ -635,78 +556,108 @@ class ChildJob: self._pipe_w = pipe_w self._messenger.set_message_handler(self._child_message_handler) - # Graciously handle sigterms. - def handle_sigterm(): - self._child_shutdown(_ReturnCode.TERMINATED) - # Time, log and and run the action function # - with _signals.terminator( - handle_sigterm - ), self._messenger.timed_suspendable() as timeinfo, self._messenger.recorded_messages( + with self._messenger.timed_suspendable() as timeinfo, self._messenger.recorded_messages( self._logfile, self._logdir ) as filename: - self.message(MessageType.START, self.action_name, logfile=filename) - try: - # Try the task action - result = self.child_process() # pylint: disable=assignment-from-no-return - except SkipJob as e: - elapsed = datetime.datetime.now() - timeinfo.start_time - self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename) - - # Alert parent of skip by return code - self._child_shutdown(_ReturnCode.SKIPPED) - except BstError as e: - elapsed = datetime.datetime.now() - timeinfo.start_time - retry_flag = e.temporary - - if retry_flag and (self._tries <= self._max_retries): - self.message( - MessageType.FAIL, - "Try #{} failed, retrying".format(self._tries), - elapsed=elapsed, - logfile=filename, - ) - else: - self.message( - MessageType.FAIL, str(e), elapsed=elapsed, detail=e.detail, logfile=filename, sandbox=e.sandbox - ) + self.message(MessageType.START, self.action_name, logfile=filename) + + with self._terminate_lock: + self._thread_id = threading.current_thread().ident + if self._should_terminate: + return _ReturnCode.TERMINATED + + try: + # Try the task action + result = self.child_process() # pylint: disable=assignment-from-no-return + except SkipJob as e: + elapsed = datetime.datetime.now() - timeinfo.start_time + self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename) + + # Alert parent of skip by return code + return _ReturnCode.SKIPPED + except BstError as e: + elapsed = datetime.datetime.now() - timeinfo.start_time + retry_flag = e.temporary + + if retry_flag and (self._tries <= self._max_retries): + self.message( + MessageType.FAIL, + "Try #{} failed, retrying".format(self._tries), + elapsed=elapsed, + logfile=filename, + ) + else: + self.message( + MessageType.FAIL, + str(e), + elapsed=elapsed, + detail=e.detail, + logfile=filename, + sandbox=e.sandbox, + ) + + self._send_message(_MessageType.CHILD_DATA, self.child_process_data()) + + # Report the exception to the parent (for internal testing purposes) + self._child_send_error(e) + + # Set return code based on whether or not the error was temporary. + # + return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL + except Exception: # pylint: disable=broad-except + + # If an unhandled (not normalized to BstError) occurs, that's a bug, + # send the traceback and formatted exception back to the frontend + # and print it to the log file. + # + elapsed = datetime.datetime.now() - timeinfo.start_time + detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc()) + + self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename) + # Unhandled exceptions should permenantly fail + return _ReturnCode.PERM_FAIL - self._send_message(_MessageType.CHILD_DATA, self.child_process_data()) - - # Report the exception to the parent (for internal testing purposes) - self._child_send_error(e) - - # Set return code based on whether or not the error was temporary. - # - self._child_shutdown(_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL) - - except Exception: # pylint: disable=broad-except - - # If an unhandled (not normalized to BstError) occurs, that's a bug, - # send the traceback and formatted exception back to the frontend - # and print it to the log file. - # - elapsed = datetime.datetime.now() - timeinfo.start_time - detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc()) + else: + # No exception occurred in the action + self._send_message(_MessageType.CHILD_DATA, self.child_process_data()) + self._child_send_result(result) + + elapsed = datetime.datetime.now() - timeinfo.start_time + self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename) + + # Shutdown needs to stay outside of the above context manager, + # make sure we dont try to handle SIGTERM while the process + # is already busy in sys.exit() + return _ReturnCode.OK + finally: + self._thread_id = None + except TerminateException: + self._thread_id = None + return _ReturnCode.TERMINATED + finally: + self._pipe_w.close() - self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename) - # Unhandled exceptions should permenantly fail - self._child_shutdown(_ReturnCode.PERM_FAIL) + # terminate() + # + # Ask the the current child thread to terminate + # + # This should only ever be called from the main thread. + # + def terminate(self): + assert utils._is_in_main_thread(), "Terminating the job's thread should only be done from the scheduler" - else: - # No exception occurred in the action - self._send_message(_MessageType.CHILD_DATA, self.child_process_data()) - self._child_send_result(result) + if self._should_terminate: + return - elapsed = datetime.datetime.now() - timeinfo.start_time - self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename) + with self._terminate_lock: + self._should_terminate = True + if self._thread_id is None: + return - # Shutdown needs to stay outside of the above context manager, - # make sure we dont try to handle SIGTERM while the process - # is already busy in sys.exit() - self._child_shutdown(_ReturnCode.OK) + terminate_thread(self._thread_id) ####################################################### # Local Private Methods # @@ -758,18 +709,6 @@ class ChildJob: if result is not None: self._send_message(_MessageType.RESULT, result) - # _child_shutdown() - # - # Shuts down the child process by cleaning up and exiting the process - # - # Args: - # exit_code (_ReturnCode): The exit code to exit with - # - def _child_shutdown(self, exit_code): - self._pipe_w.close() - assert isinstance(exit_code, _ReturnCode) - sys.exit(exit_code.value) - # _child_message_handler() # # A Context delegate for handling messages, this replaces the diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py index 5b3f05b57..5879cc125 100644 --- a/src/buildstream/_scheduler/queues/trackqueue.py +++ b/src/buildstream/_scheduler/queues/trackqueue.py @@ -56,9 +56,10 @@ class TrackQueue(Queue): # Set the new refs in the main process one by one as they complete, # writing to bst files this time if result is not None: - for unique_id, new_ref in result: - source = Plugin._lookup(unique_id) - source._set_ref(new_ref, save=True) + for unique_id, new_ref, ref_changed in result: + if ref_changed: + source = Plugin._lookup(unique_id) + source._set_ref(new_ref, save=True) element._tracking_done() diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 903cd0be9..7acb062d0 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -20,12 +20,15 @@ # Jürg Billeter <juerg.billeter@codethink.co.uk> # System imports +import functools import os import asyncio from itertools import chain import signal import datetime +import multiprocessing.forkserver import sys +from concurrent.futures import ThreadPoolExecutor # Local imports from .resources import Resources @@ -34,9 +37,7 @@ from ..types import FastEnum from .._profile import Topics, PROFILER from .._message import Message, MessageType from ..plugin import Plugin - - -_MAX_TIMEOUT_TO_KILL_CHILDREN = 20 # in seconds +from .. import _signals # A decent return code for Scheduler.run() @@ -46,6 +47,23 @@ class SchedStatus(FastEnum): TERMINATED = 1 +def reset_signals_on_exit(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + orig_sigint = signal.getsignal(signal.SIGINT) + orig_sigterm = signal.getsignal(signal.SIGTERM) + orig_sigtstp = signal.getsignal(signal.SIGTSTP) + + try: + return func(*args, **kwargs) + finally: + signal.signal(signal.SIGINT, orig_sigint) + signal.signal(signal.SIGTERM, orig_sigterm) + signal.signal(signal.SIGTSTP, orig_sigtstp) + + return wrapper + + # Scheduler() # # The scheduler operates on a list queues, each of which is meant to accomplish @@ -79,7 +97,6 @@ class Scheduler: # These are shared with the Job, but should probably be removed or made private in some way. self.loop = None # Shared for Job access to observe the message queue - self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py # # Private members @@ -98,6 +115,14 @@ class Scheduler: self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers) self._state.register_task_retry_callback(self._failure_retry) + # Ensure that the forkserver is started before we start. + # This is best run before we do any GRPC connections to casd or have + # other background threads started. + # We ignore signals here, as this is the state all the python child + # processes from now on will have when starting + with _signals.blocked([signal.SIGINT, signal.SIGTSTP], ignore=True): + multiprocessing.forkserver.ensure_running() + # run() # # Args: @@ -113,6 +138,7 @@ class Scheduler: # elements have been processed by each queue or when # an error arises # + @reset_signals_on_exit def run(self, queues, casd_process_manager): # Hold on to the queues to process @@ -149,10 +175,18 @@ class Scheduler: # Start the profiler with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)): - # Run the queues - self._sched() - self.loop.run_forever() - self.loop.close() + # This is not a no-op. Since it is the first signal registration + # that is set, it allows then other threads to register signal + # handling routines, which would not be possible if the main thread + # hadn't set it before. + # FIXME: this should be done in a cleaner way + with _signals.suspendable(lambda: None, lambda: None), _signals.terminator(lambda: None): + with ThreadPoolExecutor(max_workers=sum(self.resources._max_resources.values())) as pool: + self.loop.set_default_executor(pool) + # Run the queues + self._sched() + self.loop.run_forever() + self.loop.close() # Invoke the ticker callback a final time to render pending messages self._ticker_callback() @@ -230,8 +264,8 @@ class Scheduler: # Restart the scheduler # def resume(self): - self._resume_jobs() self._connect_signals() + self._resume_jobs() # stop() # @@ -351,13 +385,6 @@ class Scheduler: # If that happens, do another round. process_queues = any(q.dequeue_ready() for q in self.queues) - # Make sure fork is allowed before starting jobs - if not self.context.prepare_fork(): - message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active") - self.context.messenger.message(message) - self.terminate() - return - # Start the jobs # for job in ready: @@ -415,9 +442,10 @@ class Scheduler: if not self.suspended: self._suspendtime = datetime.datetime.now() self.suspended = True - # Notify that we're suspended - for job in self._active_jobs: - job.suspend() + _signals.is_not_suspended.clear() + + for suspender in reversed(_signals.suspendable_stack): + suspender.suspend() # _resume_jobs() # @@ -425,8 +453,10 @@ class Scheduler: # def _resume_jobs(self): if self.suspended: - for job in self._active_jobs: - job.resume() + for suspender in _signals.suspendable_stack: + suspender.resume() + + _signals.is_not_suspended.set() self.suspended = False # Notify that we're unsuspended self._state.offset_start_time(datetime.datetime.now() - self._suspendtime) @@ -458,12 +488,6 @@ class Scheduler: # A loop registered event callback for SIGTSTP # def _suspend_event(self): - - # Ignore the feedback signals from Job.suspend() - if self.internal_stops: - self.internal_stops -= 1 - return - # No need to care if jobs were suspended or not, we _only_ handle this # while we know jobs are not suspended. self._suspend_jobs() @@ -485,13 +509,6 @@ class Scheduler: self.loop.remove_signal_handler(signal.SIGTERM) def _terminate_jobs_real(self): - def kill_jobs(): - for job_ in self._active_jobs: - job_.kill() - - # Schedule all jobs to be killed if they have not exited after timeout - self.loop.call_later(_MAX_TIMEOUT_TO_KILL_CHILDREN, kill_jobs) - for job in self._active_jobs: job.terminate() diff --git a/src/buildstream/_signals.py b/src/buildstream/_signals.py index 03b55b052..ef2ba1965 100644 --- a/src/buildstream/_signals.py +++ b/src/buildstream/_signals.py @@ -33,6 +33,23 @@ from typing import Callable, Deque terminator_stack: Deque[Callable] = deque() suspendable_stack: Deque[Callable] = deque() +terminator_lock = threading.Lock() +suspendable_lock = threading.Lock() + +# This event is used to block all the threads while we wait for user +# interaction. This is because we can't stop all the pythin threads but the +# one easily when waiting for user input. However, most performance intensive +# tasks will pass through a subprocess or a multiprocess.Process and all of +# those are guarded by the signal handling. Thus, by setting and unsetting this +# event in the scheduler, we can enable and disable the launching of processes +# and ensure we don't do anything resource intensive while being interrupted. +is_not_suspended = threading.Event() +is_not_suspended.set() + + +class TerminateException(BaseException): + pass + # Per process SIGTERM handler def terminator_handler(signal_, frame): @@ -68,6 +85,9 @@ def terminator_handler(signal_, frame): # that while the code block is running, the supplied function # will be called upon process termination. # +# /!\ The callbacks passed must only contain code that does not acces thread +# local variables. Those will run in the main thread. +# # Note that after handlers are called, the termination will be handled by # terminating immediately with os._exit(). This means that SystemExit will not # be raised and 'finally' clauses will not be executed. @@ -80,23 +100,27 @@ def terminator_handler(signal_, frame): def terminator(terminate_func): global terminator_stack # pylint: disable=global-statement - # Signal handling only works in the main thread - if threading.current_thread() != threading.main_thread(): - yield - return - outermost = bool(not terminator_stack) - terminator_stack.append(terminate_func) + assert threading.current_thread() == threading.main_thread() or not outermost + + with terminator_lock: + terminator_stack.append(terminate_func) + if outermost: original_handler = signal.signal(signal.SIGTERM, terminator_handler) try: yield + except TerminateException: + terminate_func() + raise finally: if outermost: signal.signal(signal.SIGTERM, original_handler) - terminator_stack.pop() + + with terminator_lock: + terminator_stack.remove(terminate_func) # Just a simple object for holding on to two callbacks @@ -108,21 +132,25 @@ class Suspender: # Per process SIGTSTP handler def suspend_handler(sig, frame): + is_not_suspended.clear() # Suspend callbacks from innermost frame first - for suspender in reversed(suspendable_stack): - suspender.suspend() + with suspendable_lock: + for suspender in reversed(suspendable_stack): + suspender.suspend() - # Use SIGSTOP directly now on self, dont introduce more SIGTSTP - # - # Here the process sleeps until SIGCONT, which we simply - # dont handle. We know we'll pickup execution right here - # when we wake up. - os.kill(os.getpid(), signal.SIGSTOP) + # Use SIGSTOP directly now on self, dont introduce more SIGTSTP + # + # Here the process sleeps until SIGCONT, which we simply + # dont handle. We know we'll pickup execution right here + # when we wake up. + os.kill(os.getpid(), signal.SIGSTOP) - # Resume callbacks from outermost frame inwards - for suspender in suspendable_stack: - suspender.resume() + # Resume callbacks from outermost frame inwards + for suspender in suspendable_stack: + suspender.resume() + + is_not_suspended.set() # suspendable() @@ -133,6 +161,9 @@ def suspend_handler(sig, frame): # suspend_callback (callable): A function to call as process suspend time. # resume_callback (callable): A function to call as process resume time. # +# /!\ The callbacks passed must only contain code that does not acces thread +# local variables. Those will run in the main thread. +# # This must be used in code blocks which start processes that become # their own session leader. In these cases, SIGSTOP and SIGCONT need # to be propagated to the child process group. @@ -146,8 +177,19 @@ def suspendable(suspend_callback, resume_callback): global suspendable_stack # pylint: disable=global-statement outermost = bool(not suspendable_stack) + assert threading.current_thread() == threading.main_thread() or not outermost + + # If we are not in the main thread, ensure that we are not suspended + # before running. + # If we are in the main thread, never block on this, to ensure we + # don't deadlock. + if threading.current_thread() != threading.main_thread(): + is_not_suspended.wait() + suspender = Suspender(suspend_callback, resume_callback) - suspendable_stack.append(suspender) + + with suspendable_lock: + suspendable_stack.append(suspender) if outermost: original_stop = signal.signal(signal.SIGTSTP, suspend_handler) @@ -158,7 +200,8 @@ def suspendable(suspend_callback, resume_callback): if outermost: signal.signal(signal.SIGTSTP, original_stop) - suspendable_stack.pop() + with suspendable_lock: + suspendable_stack.remove(suspender) # blocked() diff --git a/src/buildstream/_workspaces.py b/src/buildstream/_workspaces.py index f1fc353a7..e51be0829 100644 --- a/src/buildstream/_workspaces.py +++ b/src/buildstream/_workspaces.py @@ -428,7 +428,7 @@ class Workspaces: # create_workspace permanent # def save_config(self): - assert utils._is_main_process() + assert utils._is_in_main_thread() config = { "format-version": BST_WORKSPACE_FORMAT_VERSION, diff --git a/src/buildstream/downloadablefilesource.py b/src/buildstream/downloadablefilesource.py index 2c438b033..b299b7f1b 100644 --- a/src/buildstream/downloadablefilesource.py +++ b/src/buildstream/downloadablefilesource.py @@ -92,6 +92,33 @@ class _NetrcPasswordManager: return login, password +def _download_file(opener, url, etag, directory): + default_name = os.path.basename(url) + request = urllib.request.Request(url) + request.add_header("Accept", "*/*") + request.add_header("User-Agent", "BuildStream/2") + + if etag is not None: + request.add_header("If-None-Match", etag) + + with contextlib.closing(opener.open(request)) as response: + info = response.info() + + # some servers don't honor the 'If-None-Match' header + if etag and info["ETag"] == etag: + return None, None + + etag = info["ETag"] + + filename = info.get_filename(default_name) + filename = os.path.basename(filename) + local_file = os.path.join(directory, filename) + with open(local_file, "wb") as dest: + shutil.copyfileobj(response, dest) + + return local_file, etag + + class DownloadableFileSource(Source): # pylint: disable=attribute-defined-outside-init @@ -130,19 +157,18 @@ class DownloadableFileSource(Source): # there is no 'track' field in the source to determine what/whether # or not to update refs, because tracking a ref is always a conscious # decision by the user. - with self.timed_activity("Tracking {}".format(self.url), silent_nested=True): - new_ref = self._ensure_mirror() - - if self.ref and self.ref != new_ref: - detail = ( - "When tracking, new ref differs from current ref:\n" - + " Tracked URL: {}\n".format(self.url) - + " Current ref: {}\n".format(self.ref) - + " New ref: {}\n".format(new_ref) - ) - self.warn("Potential man-in-the-middle attack!", detail=detail) + new_ref = self._ensure_mirror("Tracking {}".format(self.url)) - return new_ref + if self.ref and self.ref != new_ref: + detail = ( + "When tracking, new ref differs from current ref:\n" + + " Tracked URL: {}\n".format(self.url) + + " Current ref: {}\n".format(self.ref) + + " New ref: {}\n".format(new_ref) + ) + self.warn("Potential man-in-the-middle attack!", detail=detail) + + return new_ref def fetch(self): # pylint: disable=arguments-differ @@ -155,12 +181,11 @@ class DownloadableFileSource(Source): # Download the file, raise hell if the sha256sums don't match, # and mirror the file otherwise. - with self.timed_activity("Fetching {}".format(self.url), silent_nested=True): - sha256 = self._ensure_mirror() - if sha256 != self.ref: - raise SourceError( - "File downloaded from {} has sha256sum '{}', not '{}'!".format(self.url, sha256, self.ref) - ) + sha256 = self._ensure_mirror("Fetching {}".format(self.url),) + if sha256 != self.ref: + raise SourceError( + "File downloaded from {} has sha256sum '{}', not '{}'!".format(self.url, sha256, self.ref) + ) def _warn_deprecated_etag(self, node): etag = node.get_str("etag", None) @@ -181,40 +206,25 @@ class DownloadableFileSource(Source): with utils.save_file_atomic(etagfilename) as etagfile: etagfile.write(etag) - def _ensure_mirror(self): + def _ensure_mirror(self, activity_name: str): # Downloads from the url and caches it according to its sha256sum. try: with self.tempdir() as td: - default_name = os.path.basename(self.url) - request = urllib.request.Request(self.url) - request.add_header("Accept", "*/*") - request.add_header("User-Agent", "BuildStream/2") - # We do not use etag in case what we have in cache is # not matching ref in order to be able to recover from # corrupted download. - if self.ref: - etag = self._get_etag(self.ref) - + if self.ref and not self.is_cached(): # Do not re-download the file if the ETag matches. - if etag and self.is_cached(): - request.add_header("If-None-Match", etag) - - opener = self.__get_urlopener() - with contextlib.closing(opener.open(request)) as response: - info = response.info() - - # some servers don't honor the 'If-None-Match' header - if self.ref and etag and info["ETag"] == etag: - return self.ref + etag = self._get_etag(self.ref) + else: + etag = None - etag = info["ETag"] + local_file, new_etag = self.blocking_activity( + _download_file, (self.__get_urlopener(), self.url, etag, td), activity_name + ) - filename = info.get_filename(default_name) - filename = os.path.basename(filename) - local_file = os.path.join(td, filename) - with open(local_file, "wb") as dest: - shutil.copyfileobj(response, dest) + if local_file is None: + return self.ref # Make sure url-specific mirror dir exists. if not os.path.isdir(self._mirror_dir): @@ -226,8 +236,8 @@ class DownloadableFileSource(Source): # In case the old file was corrupted somehow. os.rename(local_file, self._get_mirror_file(sha256)) - if etag: - self._store_etag(sha256, etag) + if new_etag: + self._store_etag(sha256, new_etag) return sha256 except urllib.error.HTTPError as e: @@ -252,6 +262,11 @@ class DownloadableFileSource(Source): return self.__default_mirror_file + @classmethod + def _reset_url_opener(cls): + # Needed for tests, in order to cleanup the `netrc` configuration. + cls.__urlopener = None + def __get_urlopener(self): if not DownloadableFileSource.__urlopener: try: diff --git a/src/buildstream/element.py b/src/buildstream/element.py index ac16e3103..6c4e45d03 100644 --- a/src/buildstream/element.py +++ b/src/buildstream/element.py @@ -1497,15 +1497,9 @@ class Element(Plugin): # def _stage_sources_at(self, vdirectory, usebuildtree=False): - context = self._get_context() - # It's advantageous to have this temporary directory on # the same file system as the rest of our cache. - with self.timed_activity("Staging sources", silent_nested=True), utils._tempdir( - dir=context.tmpdir, prefix="staging-temp" - ) as temp_staging_directory: - - import_dir = temp_staging_directory + with self.timed_activity("Staging sources", silent_nested=True): if not isinstance(vdirectory, Directory): vdirectory = FileBasedDirectory(vdirectory) @@ -1534,8 +1528,7 @@ class Element(Plugin): import_dir = staged_sources # Set update_mtime to ensure deterministic mtime of sources at build time - with utils._deterministic_umask(): - vdirectory.import_files(import_dir, update_mtime=BST_ARBITRARY_TIMESTAMP) + vdirectory.import_files(import_dir, update_mtime=BST_ARBITRARY_TIMESTAMP) # Ensure deterministic owners of sources at build time vdirectory.set_deterministic_user() @@ -1549,7 +1542,7 @@ class Element(Plugin): # scope (_Scope): The scope of dependencies to mark as required # def _set_required(self, scope=_Scope.RUN): - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" if self.__required: # Already done @@ -1586,7 +1579,7 @@ class Element(Plugin): # required in the local cache. # def _set_artifact_files_required(self, scope=_Scope.RUN): - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" if self.__artifact_files_required: # Already done @@ -1636,7 +1629,7 @@ class Element(Plugin): # in a subprocess. # def __schedule_assembly_when_necessary(self): - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" # FIXME: We could reduce the number of function calls a bit by # factoring this out of this method (and checking whether we @@ -1669,7 +1662,7 @@ class Element(Plugin): # def _assemble_done(self, successful): assert self.__assemble_scheduled - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" self.__assemble_done = True @@ -1689,8 +1682,6 @@ class Element(Plugin): self._update_ready_for_runtime_and_cached() if self._get_workspace() and self._cached(): - assert utils._is_main_process(), "Attempted to save workspace configuration from child process" - # # Note that this block can only happen in the # main process, since `self._cached_success()` cannot # be true when assembly is successful in the task. @@ -1867,7 +1858,7 @@ class Element(Plugin): # fetched_original (bool): Whether the original sources had been asked (and fetched) or not # def _fetch_done(self, fetched_original): - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" self.__sources.fetch_done(fetched_original) @@ -1913,7 +1904,7 @@ class Element(Plugin): # This will result in updating the element state. # def _pull_done(self): - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" self.__pull_done = True @@ -2091,7 +2082,7 @@ class Element(Plugin): # the workspaces metadata first. # def _open_workspace(self): - assert utils._is_main_process(), "This writes to a global file and therefore must be run in the main process" + assert utils._is_in_main_thread(), "This writes to a global file and therefore must be run in the main thread" context = self._get_context() workspace = self._get_workspace() @@ -2393,7 +2384,7 @@ class Element(Plugin): # the appropriate counters. # def _update_ready_for_runtime_and_cached(self): - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" if not self.__ready_for_runtime_and_cached: if self.__runtime_deps_uncached == 0 and self.__cache_key and self._cached_success(): @@ -3139,7 +3130,7 @@ class Element(Plugin): # in Scope.BUILD has changed in any way. # def __update_cache_keys(self): - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" if self.__strict_cache_key is not None: # Cache keys already calculated @@ -3212,7 +3203,7 @@ class Element(Plugin): # it can check whether an artifact exists for that cache key. # def __update_artifact_state(self): - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" assert self.__artifact is None context = self._get_context() @@ -3247,7 +3238,7 @@ class Element(Plugin): # a remote cache). # def __update_cache_key_non_strict(self): - assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process" + assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread" # The final cache key can be None here only in non-strict mode if self.__cache_key is None: diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py index 2b2382eb7..f98005644 100644 --- a/src/buildstream/plugin.py +++ b/src/buildstream/plugin.py @@ -110,14 +110,19 @@ Class Reference """ import itertools +import multiprocessing import os +import pickle +import queue +import signal import subprocess import sys -from contextlib import contextmanager -from typing import Generator, Optional, Tuple, TYPE_CHECKING +import traceback +from contextlib import contextmanager, suppress +from typing import Any, Callable, Generator, Optional, Sequence, Tuple, TypeVar, TYPE_CHECKING from weakref import WeakValueDictionary -from . import utils +from . import utils, _signals from ._exceptions import PluginError, ImplError from ._message import Message, MessageType from .node import Node, MappingNode @@ -131,6 +136,36 @@ if TYPE_CHECKING: # pylint: enable=cyclic-import +T1 = TypeVar("T1") + + +# _background_job_wrapper() +# +# Wrapper for running jobs in the background, transparently for users +# +# This method will put on the queue a response of the form: +# (PickleError, OtherError, Result) +# +# Args: +# result_queue: The queue in which to pass back the result +# target: function to execute in the background +# args: positional arguments to give to the target function +# +def _background_job_wrapper(result_queue: multiprocessing.Queue, target: Callable[..., T1], args: Any) -> None: + result = None + + try: + result = target(*args) + result_queue.put((None, result)) + except Exception as exc: # pylint: disable=broad-except + try: + # Here we send the result again, just in case it was a PickleError + # in which case the same exception would be thrown down + result_queue.put((exc, result)) + except pickle.PickleError as exc: + result_queue.put((traceback.format_exc(), None)) + + class Plugin: """Plugin() @@ -212,6 +247,18 @@ class Plugin: # scheduling tasks. __TABLE = WeakValueDictionary() # type: WeakValueDictionary[int, Plugin] + try: + __multiprocessing_context = multiprocessing.get_context("forkserver") + except ValueError: + # We are on a system without `forkserver` support. Let's default to + # spawn. This seems to be hanging however in some rare cases. + # + # Support is not as critical for now, since we do not work on + # platforms not supporting forkserver for now (mainly Windows) + # + # XXX: investigate why we sometimes get deadlocks there + __multiprocessing_context = multiprocessing.get_context("spawn") + def __init__( self, name: str, @@ -509,6 +556,100 @@ class Plugin: ): yield + def blocking_activity( + self, + target: Callable[..., T1], + args: Sequence[Any], + activity_name: str, + *, + detail: Optional[str] = None, + silent_nested: bool = False + ) -> T1: + """Execute a blocking activity in the background. + + This is to execute potentially blocking methods in the background, + in order to avoid starving the scheduler. + + The function, its arguments and return value must all be pickleable, + as it will be run in another process. + + This should be used whenever there is a potential for a blocking + syscall to not return in a reasonable (<1s) amount of time. + For example, you would use this if you were doing a request to a + remote server, without a timeout. + + Args: + target: the function to execute in the background + args: positional arguments to pass to the method to call + activity_name: The name of the activity + detail: An optional detailed message, can be multiline output + silent_nested: If specified, nested messages will be silenced + + Returns: + the return value from `target`. + """ + with self.__context.messenger.timed_activity( + activity_name, element_name=self._get_full_name(), detail=detail, silent_nested=silent_nested + ): + result_queue = self.__multiprocessing_context.Queue() + proc = None + + def kill_proc(): + if proc and proc.is_alive(): + proc.kill() + proc.join() + + def suspend_proc(): + if proc and proc.is_alive(): + with suppress(ProcessLookupError): + os.kill(proc.pid, signal.SIGSTOP) + + def resume_proc(): + if proc and proc.is_alive(): + with suppress(ProcessLookupError): + os.kill(proc.pid, signal.SIGCONT) + + with _signals.suspendable(suspend_proc, resume_proc), _signals.terminator(kill_proc): + proc = self.__multiprocessing_context.Process( + target=_background_job_wrapper, args=(result_queue, target, args) + ) + proc.start() + + should_continue = True + last_check = False + + while should_continue or last_check: + last_check = False + + try: + err, result = result_queue.get(timeout=1) + break + except queue.Empty: + if not proc.is_alive() and should_continue: + # Let's check one last time, just in case it stopped + # between our last check and now + last_check = True + should_continue = False + continue + else: + raise PluginError("Background process died with error code {}".format(proc.exitcode)) + + try: + proc.join(timeout=15) + proc.terminate() + except TimeoutError: + raise PluginError("Background process didn't exit after 15 seconds and got killed.") + + if err is not None: + if isinstance(err, str): + # This was a pickle error, this is a bug + raise PluginError( + "An error happened while returning the result from a blocking activity", detail=err + ) + raise err + + return result + def call(self, *popenargs, fail: Optional[str] = None, fail_temporarily: bool = False, **kwargs) -> int: """A wrapper for subprocess.call() diff --git a/src/buildstream/sandbox/_sandboxbuildboxrun.py b/src/buildstream/sandbox/_sandboxbuildboxrun.py index 3d71b7440..1c187d7fd 100644 --- a/src/buildstream/sandbox/_sandboxbuildboxrun.py +++ b/src/buildstream/sandbox/_sandboxbuildboxrun.py @@ -184,17 +184,27 @@ class SandboxBuildBoxRun(SandboxREAPI): try: while True: try: - returncode = process.wait() + # Here, we don't use `process.wait()` directly without a timeout + # This is because, if we were to do that, and the process would never + # output anything, the control would never be given back to the python + # process, which might thus not be able to check for request to + # shutdown, or kill the process. + # We therefore loop with a timeout, to ensure the python process + # can act if it needs. + returncode = process.wait(timeout=1) # If the process exits due to a signal, we # brutally murder it to avoid zombies if returncode < 0: utils._kill_process_tree(process.pid) + except subprocess.TimeoutExpired: + continue + # Unlike in the bwrap case, here only the main # process seems to receive the SIGINT. We pass # on the signal to the child and then continue # to wait. - except KeyboardInterrupt: + except _signals.TerminateException: process.send_signal(signal.SIGINT) continue diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index 6cba7d611..2ac159337 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -26,7 +26,6 @@ from functools import partial import grpc -from .. import utils from ..node import Node from .._message import Message, MessageType from ._sandboxreapi import SandboxREAPI @@ -59,9 +58,6 @@ class SandboxRemote(SandboxREAPI): if config is None: return - # gRPC doesn't support fork without exec, which is used in the main process. - assert not utils._is_main_process() - self.storage_url = config.storage_service["url"] self.exec_url = config.exec_service["url"] diff --git a/src/buildstream/source.py b/src/buildstream/source.py index e0a2db45d..3268d3a93 100644 --- a/src/buildstream/source.py +++ b/src/buildstream/source.py @@ -928,17 +928,14 @@ class Source(Plugin): clean = node.strip_node_info() to_modify = node.strip_node_info() - current_ref = self.get_ref() # pylint: disable=assignment-from-no-return - # Set the ref regardless of whether it changed, the # TrackQueue() will want to update a specific node with # the ref, regardless of whether the original has changed. self.set_ref(new_ref, to_modify) - if current_ref == new_ref or not save: - # Note: We do not look for and propagate changes at this point - # which might result in desync depending if something changes about - # tracking in the future. For now, this is quite safe. + # FIXME: this will save things too often, as a ref might not have + # changed. We should optimize this to detect it differently + if not save: return False # Ensure the node is not from a junction diff --git a/src/buildstream/testing/_fixtures.py b/src/buildstream/testing/_fixtures.py index 98778936d..095a5f6f1 100644 --- a/src/buildstream/testing/_fixtures.py +++ b/src/buildstream/testing/_fixtures.py @@ -16,29 +16,48 @@ # pylint: disable=redefined-outer-name +import time + import psutil import pytest -from buildstream import node, utils +from buildstream import node, DownloadableFileSource + + +# Number of seconds to wait for background threads to exit. +_AWAIT_THREADS_TIMEOUT_SECONDS = 5 + + +def has_no_unexpected_background_threads(expected_num_threads): + # Use psutil as threading.active_count() doesn't include gRPC threads. + process = psutil.Process() + + wait = 0.1 + for _ in range(0, int(_AWAIT_THREADS_TIMEOUT_SECONDS / wait)): + if process.num_threads() == expected_num_threads: + return True + time.sleep(wait) + + return False @pytest.fixture(autouse=True, scope="session") def default_thread_number(): # xdist/execnet has its own helper thread. - # Ignore that for `utils._is_single_threaded` checks. - utils._INITIAL_NUM_THREADS_IN_MAIN_PROCESS = psutil.Process().num_threads() + return psutil.Process().num_threads() # Catch tests that don't shut down background threads, which could then lead # to other tests hanging when BuildStream uses fork(). @pytest.fixture(autouse=True) def thread_check(default_thread_number): - assert utils._is_single_threaded() + assert has_no_unexpected_background_threads(default_thread_number) yield - assert utils._is_single_threaded() + assert has_no_unexpected_background_threads(default_thread_number) # Reset global state in node.pyx to improve test isolation @pytest.fixture(autouse=True) def reset_global_node_state(): node._reset_global_state() + DownloadableFileSource._reset_url_opener() diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py index 9c6761ccc..04bbf261c 100644 --- a/src/buildstream/utils.py +++ b/src/buildstream/utils.py @@ -32,8 +32,9 @@ import signal import stat from stat import S_ISDIR import subprocess +from subprocess import TimeoutExpired import tempfile -import time +import threading import datetime import itertools from contextlib import contextmanager @@ -60,16 +61,6 @@ BST_ARBITRARY_TIMESTAMP = calendar.timegm((2011, 11, 11, 11, 11, 11)) _ALIAS_SEPARATOR = ":" _URI_SCHEMES = ["http", "https", "ftp", "file", "git", "sftp", "ssh"] -# Main process pid -_MAIN_PID = os.getpid() - -# The number of threads in the main process at startup. -# This is 1 except for certain test environments (xdist/execnet). -_INITIAL_NUM_THREADS_IN_MAIN_PROCESS = 1 - -# Number of seconds to wait for background threads to exit. -_AWAIT_THREADS_TIMEOUT_SECONDS = 5 - # The process's file mode creation mask. # Impossible to retrieve without temporarily changing it on POSIX. _UMASK = os.umask(0o777) @@ -868,13 +859,12 @@ def _pretty_size(size, dec_places=0): return "{size:g}{unit}".format(size=round(psize, dec_places), unit=unit) -# _is_main_process() +# _is_in_main_thread() # -# Return whether we are in the main process or not. +# Return whether we are running in the main thread or not # -def _is_main_process(): - assert _MAIN_PID is not None - return os.getpid() == _MAIN_PID +def _is_in_main_thread(): + return threading.current_thread() is threading.main_thread() # Remove a path and any empty directories leading up to it. @@ -1392,7 +1382,20 @@ def _call(*popenargs, terminate=False, **kwargs): process = subprocess.Popen( # pylint: disable=subprocess-popen-preexec-fn *popenargs, preexec_fn=preexec_fn, universal_newlines=True, **kwargs ) - output, _ = process.communicate() + # Here, we don't use `process.communicate()` directly without a timeout + # This is because, if we were to do that, and the process would never + # output anything, the control would never be given back to the python + # process, which might thus not be able to check for request to + # shutdown, or kill the process. + # We therefore loop with a timeout, to ensure the python process + # can act if it needs. + while True: + try: + output, _ = process.communicate(timeout=1) + break + except TimeoutExpired: + continue + exit_code = process.poll() return (exit_code, output) @@ -1549,21 +1552,6 @@ def _search_upward_for_files(directory, filenames): directory = parent_dir -# _deterministic_umask() -# -# Context managed to apply a umask to a section that may be affected by a users -# umask. Restores old mask afterwards. -# -@contextmanager -def _deterministic_umask(): - old_umask = os.umask(0o022) - - try: - yield - finally: - os.umask(old_umask) - - # _get_compression: # # Given a file name infer the compression @@ -1601,30 +1589,6 @@ def _get_compression(tar): return "" -# _is_single_threaded() -# -# Return whether the current Process is single-threaded. Don't count threads -# in the main process that were created by a test environment (xdist/execnet) -# before BuildStream was executed. -# -def _is_single_threaded(): - # Use psutil as threading.active_count() doesn't include gRPC threads. - process = psutil.Process() - - if process.pid == _MAIN_PID: - expected_num_threads = _INITIAL_NUM_THREADS_IN_MAIN_PROCESS - else: - expected_num_threads = 1 - - # gRPC threads are not joined when shut down. Wait for them to exit. - wait = 0.1 - for _ in range(0, int(_AWAIT_THREADS_TIMEOUT_SECONDS / wait)): - if process.num_threads() == expected_num_threads: - return True - time.sleep(wait) - return False - - # _parse_version(): # # Args: diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py index e6eaec960..63e6d9814 100644 --- a/tests/artifactcache/pull.py +++ b/tests/artifactcache/pull.py @@ -162,6 +162,7 @@ def test_pull_tree(cli, tmpdir, datafiles): # Assert that we are not cached locally anymore artifactcache.close_grpc_channels() + cas._casd_channel.request_shutdown() cas.close_grpc_channels() assert cli.get_element_state(project_dir, "target.bst") != "cached" diff --git a/tests/internals/cascache.py b/tests/internals/cascache.py index 043531c24..e27e40974 100644 --- a/tests/internals/cascache.py +++ b/tests/internals/cascache.py @@ -3,6 +3,7 @@ import time from unittest.mock import MagicMock from buildstream._cas.cascache import CASCache +from buildstream._cas import casdprocessmanager from buildstream._message import MessageType from buildstream._messenger import Messenger @@ -31,6 +32,10 @@ def test_report_when_cascache_exits_not_cleanly(tmp_path, monkeypatch): dummy_buildbox_casd.write_text("#!/usr/bin/env sh\nwhile :\ndo\nsleep 60\ndone") dummy_buildbox_casd.chmod(0o777) monkeypatch.setenv("PATH", str(tmp_path), prepend=os.pathsep) + # FIXME: this is a hack, we should instead have a socket be created nicely + # on the fake casd script. This whole test suite probably would + # need some cleanup + monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1) messenger = MagicMock(spec_set=Messenger) cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs"))) @@ -50,6 +55,10 @@ def test_report_when_cascache_is_forcefully_killed(tmp_path, monkeypatch): dummy_buildbox_casd.write_text("#!/usr/bin/env sh\ntrap 'echo hello' TERM\nwhile :\ndo\nsleep 60\ndone") dummy_buildbox_casd.chmod(0o777) monkeypatch.setenv("PATH", str(tmp_path), prepend=os.pathsep) + # FIXME: this is a hack, we should instead have a socket be created nicely + # on the fake casd script. This whole test suite probably would + # need some cleanup + monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1) messenger = MagicMock(spec_set=Messenger) cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs"))) |