author     Benjamin Schubert <1593324-BenjaminSchubert@users.noreply.gitlab.com>  2020-12-04 15:25:54 +0000
committer  Benjamin Schubert <1593324-BenjaminSchubert@users.noreply.gitlab.com>  2020-12-04 15:25:54 +0000
commit     458c05751f089673dc1cae8b8edfabfa32cf21b7 (patch)
tree       7fc64a082ec4a15079e5d6907a6605ac2d19e7b6
parent     6b961f20a8b2174a342e9b92e68752b2ff19da29 (diff)
parent     c9847346c42c322cbd595a0de832711eba2e0f7d (diff)
download   buildstream-458c05751f089673dc1cae8b8edfabfa32cf21b7.tar.gz
Merge branch 'bschubert/no-multiprocessing' into 'master'
Rework the scheduler to use threads instead of processes

Closes #911, #93, and #810

See merge request BuildStream/buildstream!1982
-rw-r--r--  .pylintrc                                        |   1
-rw-r--r--  setup.cfg                                        |   2
-rwxr-xr-x  setup.py                                         |   1
-rw-r--r--  src/buildstream/_cas/cascache.py                 |  73
-rw-r--r--  src/buildstream/_cas/casdprocessmanager.py       |  95
-rw-r--r--  src/buildstream/_cas/casserver.py                |   7
-rw-r--r--  src/buildstream/_context.py                      |  14
-rw-r--r--  src/buildstream/_elementsources.py               |   4
-rw-r--r--  src/buildstream/_messenger.py                    |  60
-rw-r--r--  src/buildstream/_remote.py                       |  56
-rw-r--r--  src/buildstream/_scheduler/_multiprocessing.py   |  79
-rw-r--r--  src/buildstream/_scheduler/jobs/_job.pyi         |   1
-rw-r--r--  src/buildstream/_scheduler/jobs/_job.pyx         |  15
-rw-r--r--  src/buildstream/_scheduler/jobs/job.py           | 295
-rw-r--r--  src/buildstream/_scheduler/queues/trackqueue.py  |   7
-rw-r--r--  src/buildstream/_scheduler/scheduler.py          |  85
-rw-r--r--  src/buildstream/_signals.py                      |  83
-rw-r--r--  src/buildstream/_workspaces.py                   |   2
-rw-r--r--  src/buildstream/downloadablefilesource.py        | 105
-rw-r--r--  src/buildstream/element.py                       |  35
-rw-r--r--  src/buildstream/plugin.py                        | 147
-rw-r--r--  src/buildstream/sandbox/_sandboxbuildboxrun.py   |  14
-rw-r--r--  src/buildstream/sandbox/_sandboxremote.py        |   4
-rw-r--r--  src/buildstream/source.py                        |   9
-rw-r--r--  src/buildstream/testing/_fixtures.py             |  29
-rw-r--r--  src/buildstream/utils.py                         |  76
-rw-r--r--  tests/artifactcache/pull.py                      |   1
-rw-r--r--  tests/internals/cascache.py                      |   9
28 files changed, 697 insertions, 612 deletions
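The heart of the change sits in _scheduler/jobs: each job now runs in a thread of the scheduler's asyncio executor instead of a forked process, and termination is done by asynchronously raising an exception in that thread. Below is a minimal, self-contained sketch of these two mechanisms. It is not BuildStream's code; JobTerminated, run_job and the ctypes call are illustrative stand-ins for the new Cython helper in _scheduler/jobs/_job.pyx.

import asyncio
import ctypes
import threading
import time


class JobTerminated(BaseException):
    """Raised asynchronously inside a worker thread to ask it to stop."""


def terminate_thread(thread_id):
    # Ask CPython to raise JobTerminated in the thread with this ident;
    # this is the same mechanism the new _job.pyx helper uses via the C API.
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread_id), ctypes.py_object(JobTerminated)
    )
    assert res == 1


def run_job(ids):
    ids["job"] = threading.get_ident()
    try:
        while True:
            time.sleep(0.1)  # stand-in for the real job work
    except JobTerminated:
        return "terminated"


async def main():
    loop = asyncio.get_running_loop()
    ids = {}
    # Run the job in the default thread pool instead of a forked process
    task = loop.run_in_executor(None, run_job, ids)
    await asyncio.sleep(0.5)
    terminate_thread(ids["job"])  # politely ask the job thread to exit
    print(await task)  # -> "terminated"


asyncio.run(main())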
diff --git a/.pylintrc b/.pylintrc
index 25d5647b0..806a11395 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -8,6 +8,7 @@ extension-pkg-whitelist=
buildstream._loader._loader,
buildstream._loader.loadelement,
buildstream._loader.types,
+ buildstream._scheduler.jobs._job,
buildstream._types,
buildstream._utils,
buildstream._variables,
diff --git a/setup.cfg b/setup.cfg
index 19ffbcbc2..283d78099 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -28,7 +28,7 @@ warn_no_return = True
# Ignore missing stubs for third-party packages.
# In future, these should be re-enabled if/when stubs for them become available.
-[mypy-copyreg,grpc,pluginbase,psutil,pyroaring,ruamel]
+[mypy-copyreg,grpc,pluginbase,psutil,pyroaring,ruamel,multiprocessing.forkserver]
ignore_missing_imports=True
# Ignore issues with generated files and vendored code
diff --git a/setup.py b/setup.py
index a7c698030..18b5e42be 100755
--- a/setup.py
+++ b/setup.py
@@ -319,6 +319,7 @@ BUILD_EXTENSIONS = []
register_cython_module("buildstream.node")
register_cython_module("buildstream._loader._loader")
register_cython_module("buildstream._loader.loadelement", dependencies=["buildstream.node"])
+register_cython_module("buildstream._scheduler.jobs._job")
register_cython_module("buildstream._yaml", dependencies=["buildstream.node"])
register_cython_module("buildstream._types")
register_cython_module("buildstream._utils")
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 7936121ea..d13531c6c 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -22,11 +22,9 @@ import itertools
import os
import stat
import contextlib
-import ctypes
-import multiprocessing
-import signal
import time
from typing import Optional, List
+import threading
import grpc
@@ -34,7 +32,7 @@ from .._protos.google.rpc import code_pb2
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from .._protos.build.buildgrid import local_cas_pb2
-from .. import _signals, utils
+from .. import utils
from ..types import FastEnum, SourceRef
from .._exceptions import CASCacheError
@@ -93,6 +91,7 @@ class CASCache:
self._casd_channel = self._casd_process_manager.create_channel()
self._cache_usage_monitor = _CASCacheUsageMonitor(self._casd_channel)
+ self._cache_usage_monitor.start()
# get_cas():
#
@@ -131,8 +130,12 @@ class CASCache:
# Release resources used by CASCache.
#
def release_resources(self, messenger=None):
+ if self._casd_channel:
+ self._casd_channel.request_shutdown()
+
if self._cache_usage_monitor:
- self._cache_usage_monitor.release_resources()
+ self._cache_usage_monitor.stop()
+ self._cache_usage_monitor.join()
if self._casd_process_manager:
self.close_grpc_channels()
@@ -731,65 +734,45 @@ class _CASCacheUsage:
# This manages the subprocess that tracks cache usage information via
# buildbox-casd.
#
-class _CASCacheUsageMonitor:
+class _CASCacheUsageMonitor(threading.Thread):
def __init__(self, connection):
+ super().__init__()
self._connection = connection
-
- # Shared memory (64-bit signed integer) for current disk usage and quota
- self._disk_usage = multiprocessing.Value(ctypes.c_longlong, -1)
- self._disk_quota = multiprocessing.Value(ctypes.c_longlong, -1)
-
- # multiprocessing.Process will fork without exec on Unix.
- # This can't be allowed with background threads or open gRPC channels.
- assert utils._is_single_threaded() and connection.is_closed()
-
- # Block SIGINT, we don't want to kill the process when we interrupt the frontend
- # and this process if very lightweight.
- with _signals.blocked([signal.SIGINT], ignore=False):
- self._subprocess = multiprocessing.Process(target=self._subprocess_run)
- self._subprocess.start()
+ self._disk_usage = None
+ self._disk_quota = None
+ self._should_stop = False
def get_cache_usage(self):
- disk_usage = self._disk_usage.value
- disk_quota = self._disk_quota.value
-
- if disk_usage < 0:
- # Disk usage still unknown
- disk_usage = None
-
- if disk_quota <= 0:
- # No disk quota
- disk_quota = None
-
- return _CASCacheUsage(disk_usage, disk_quota)
-
- def release_resources(self):
- # Simply terminate the subprocess, no cleanup required in the subprocess
- self._subprocess.terminate()
+ return _CASCacheUsage(self._disk_usage, self._disk_quota)
- def _subprocess_run(self):
- # Reset SIGTERM in subprocess to default as no cleanup is necessary
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ def stop(self):
+ self._should_stop = True
- disk_usage = self._disk_usage
- disk_quota = self._disk_quota
+ def run(self):
local_cas = self._connection.get_local_cas()
- while True:
+ while not self._should_stop:
try:
# Ask buildbox-casd for current value
request = local_cas_pb2.GetLocalDiskUsageRequest()
response = local_cas.GetLocalDiskUsage(request)
# Update values in shared memory
- disk_usage.value = response.size_bytes
- disk_quota.value = response.quota_bytes
+ self._disk_usage = response.size_bytes
+ disk_quota = response.quota_bytes
+ if disk_quota == 0: # Quota == 0 means there is no quota
+ self._disk_quota = None
+ else:
+ self._disk_quota = disk_quota
except grpc.RpcError:
# Terminate loop when buildbox-casd becomes unavailable
break
# Sleep until next refresh
- time.sleep(_CACHE_USAGE_REFRESH)
+ for _ in range(_CACHE_USAGE_REFRESH * 10):
+ if self._should_stop:
+ break
+ time.sleep(0.1)
def _grouper(iterable, n):
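The _CASCacheUsageMonitor above is now an ordinary thread that polls casd and stops cooperatively rather than a forked subprocess terminated with a signal. A minimal sketch of that polling-with-stop-flag pattern (illustrative names, not the BuildStream code):

import threading
import time

REFRESH_INTERVAL = 5  # seconds, an assumption for this sketch


class UsageMonitor(threading.Thread):
    def __init__(self, poll):
        super().__init__()
        self._poll = poll            # callable returning the current usage
        self._should_stop = False
        self.usage = None

    def stop(self):
        self._should_stop = True

    def run(self):
        while not self._should_stop:
            self.usage = self._poll()
            # Sleep in short slices so stop() takes effect within ~100ms
            for _ in range(REFRESH_INTERVAL * 10):
                if self._should_stop:
                    break
                time.sleep(0.1)


monitor = UsageMonitor(lambda: 42)
monitor.start()
time.sleep(0.3)
monitor.stop()
monitor.join()
print(monitor.usage)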
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
index 32e4cce63..20ff610eb 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -17,6 +17,7 @@
#
import contextlib
+import threading
import os
import random
import shutil
@@ -240,35 +241,48 @@ class CASDChannel:
self._asset_fetch = None
self._asset_push = None
self._casd_pid = casd_pid
+ self._shutdown_requested = False
- def _establish_connection(self):
- assert self._casd_channel is None
-
- while not os.path.exists(self._socket_path):
- # casd is not ready yet, try again after a 10ms delay,
- # but don't wait for more than specified timeout period
- if time.time() > self._start_time + _CASD_TIMEOUT:
- raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
+ self._lock = threading.Lock()
- # check that process is still alive
- try:
- proc = psutil.Process(self._casd_pid)
- if proc.status() == psutil.STATUS_ZOMBIE:
- proc.wait()
+ def _establish_connection(self):
+ with self._lock:
+ if self._casd_channel is not None:
+ return
+
+ while not os.path.exists(self._socket_path):
+ # casd is not ready yet, try again after a 10ms delay,
+ # but don't wait for more than specified timeout period
+ if time.time() > self._start_time + _CASD_TIMEOUT:
+ raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
+
+ if self._shutdown_requested:
+ # Shutdown has been requested, we can exit
+ return
- if not proc.is_running():
+ # check that process is still alive
+ try:
+ proc = psutil.Process(self._casd_pid)
+ if proc.status() == psutil.STATUS_ZOMBIE:
+ proc.wait()
+
+ if not proc.is_running():
+ if self._shutdown_requested:
+ return
+ raise CASCacheError("buildbox-casd process died before connection could be established")
+ except psutil.NoSuchProcess:
+ if self._shutdown_requested:
+ return
raise CASCacheError("buildbox-casd process died before connection could be established")
- except psutil.NoSuchProcess:
- raise CASCacheError("buildbox-casd process died before connection could be established")
- time.sleep(0.01)
+ time.sleep(0.01)
- self._casd_channel = grpc.insecure_channel(self._connection_string)
- self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._casd_channel)
- self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
- self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
- self._asset_fetch = remote_asset_pb2_grpc.FetchStub(self._casd_channel)
- self._asset_push = remote_asset_pb2_grpc.PushStub(self._casd_channel)
+ self._casd_channel = grpc.insecure_channel(self._connection_string)
+ self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._casd_channel)
+ self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
+ self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
+ self._asset_fetch = remote_asset_pb2_grpc.FetchStub(self._casd_channel)
+ self._asset_push = remote_asset_pb2_grpc.PushStub(self._casd_channel)
# get_cas():
#
@@ -284,12 +298,12 @@ class CASDChannel:
# Return LocalCAS stub for buildbox-casd channel.
#
def get_local_cas(self):
- if self._casd_channel is None:
+ if self._local_cas is None:
self._establish_connection()
return self._local_cas
def get_bytestream(self):
- if self._casd_channel is None:
+ if self._bytestream is None:
self._establish_connection()
return self._bytestream
@@ -318,17 +332,30 @@ class CASDChannel:
def is_closed(self):
return self._casd_channel is None
+ # request_shutdown():
+ #
+ # Notify the channel that a shutdown of casd was requested.
+ #
+ # Thus we know that not being able to establish a connection is expected
+ # and no error will be reported in that case.
+ def request_shutdown(self) -> None:
+ self._shutdown_requested = True
+
# close():
#
# Close the casd channel.
#
def close(self):
- if self.is_closed():
- return
- self._asset_push = None
- self._asset_fetch = None
- self._local_cas = None
- self._casd_cas = None
- self._bytestream = None
- self._casd_channel.close()
- self._casd_channel = None
+ assert self._shutdown_requested, "Please request shutdown before closing"
+
+ with self._lock:
+ if self.is_closed():
+ return
+
+ self._asset_push = None
+ self._asset_fetch = None
+ self._local_cas = None
+ self._casd_cas = None
+ self._bytestream = None
+ self._casd_channel.close()
+ self._casd_channel = None
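The channel above is now established lazily under a threading.Lock so that concurrent callers cannot race to create it twice. A minimal sketch of that check-lock-recheck pattern, with illustrative names:

import threading


class LazyChannel:
    def __init__(self, connect):
        self._connect = connect      # callable that builds the real channel
        self._channel = None
        self._lock = threading.Lock()

    def get(self):
        if self._channel is None:          # fast path, no lock
            with self._lock:
                if self._channel is None:  # re-check under the lock
                    self._channel = self._connect()
        return self._channel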
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index 013fb07dd..04c5eb836 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -30,6 +30,7 @@ import grpc
import click
from .._protos.build.bazel.remote.asset.v1 import remote_asset_pb2_grpc
+from .. import _signals
from .._protos.build.bazel.remote.execution.v2 import (
remote_execution_pb2,
remote_execution_pb2_grpc,
@@ -137,7 +138,11 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.Le
_ReferenceStorageServicer(casd_channel, root, enable_push=enable_push), server
)
- yield server
+ # Ensure we have the signal handler set for SIGTERM
+ # This allows threads from gRPC to call our methods that register
+ # handlers at exit.
+ with _signals.terminator(lambda: None):
+ yield server
finally:
casd_channel.close()
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index 0c2d1a150..87b23d50c 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -564,17 +564,3 @@ class Context:
log_directory=self.logdir,
)
return self._cascache
-
- # prepare_fork():
- #
- # Prepare this process for fork without exec. This is a safeguard against
- # fork issues with multiple threads and gRPC connections.
- #
- def prepare_fork(self):
- # gRPC channels must be closed before fork.
- for cache in [self._cascache, self._artifactcache, self._sourcecache]:
- if cache:
- cache.close_grpc_channels()
-
- # Do not allow fork if there are background threads.
- return utils._is_single_threaded()
diff --git a/src/buildstream/_elementsources.py b/src/buildstream/_elementsources.py
index 69051ee8a..a15d20ecb 100644
--- a/src/buildstream/_elementsources.py
+++ b/src/buildstream/_elementsources.py
@@ -100,7 +100,7 @@ class ElementSources:
else:
new_ref = source._track()
- refs.append((source._unique_id, new_ref))
+ refs.append((source._unique_id, new_ref, old_ref != new_ref))
# Complimentary warning that the new ref will be unused.
if old_ref != new_ref and workspace:
@@ -113,7 +113,7 @@ class ElementSources:
# Sources which do not implement track() will return None, produce
# a SKIP message in the UI if all sources produce None
#
- if all(ref is None for _, ref in refs):
+ if all(ref is None for _, ref, _ in refs):
raise SkipJob("Element sources are not trackable")
return refs
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index c37eca8bc..69a309f91 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -19,10 +19,10 @@
import os
import datetime
+import threading
from contextlib import contextmanager
from . import _signals
-from . import utils
from ._exceptions import BstError
from ._message import Message, MessageType
@@ -48,15 +48,17 @@ class _TimeData:
class Messenger:
def __init__(self):
- self._message_handler = None
- self._silence_scope_depth = 0
- self._log_handle = None
- self._log_filename = None
self._state = None
self._next_render = None # A Time object
self._active_simple_tasks = 0
self._render_status_cb = None
+ self._locals = threading.local()
+ self._locals.message_handler = None
+ self._locals.log_handle = None
+ self._locals.log_filename = None
+ self._locals.silence_scope_depth = 0
+
# set_message_handler()
#
# Sets the handler for any status messages propagated through
@@ -70,7 +72,7 @@ class Messenger:
# ) -> None
#
def set_message_handler(self, handler):
- self._message_handler = handler
+ self._locals.message_handler = handler
# set_state()
#
@@ -101,7 +103,7 @@ class Messenger:
# (bool): Whether messages are currently being silenced
#
def _silent_messages(self):
- return self._silence_scope_depth > 0
+ return self._locals.silence_scope_depth > 0
# message():
#
@@ -112,16 +114,15 @@ class Messenger:
# message: A Message object
#
def message(self, message):
-
# If we are recording messages, dump a copy into the open log file.
self._record_message(message)
# Send it off to the log handler (can be the frontend,
# or it can be the child task which will propagate
# to the frontend)
- assert self._message_handler
+ assert self._locals.message_handler
- self._message_handler(message, is_silenced=self._silent_messages())
+ self._locals.message_handler(message, is_silenced=self._silent_messages())
# silence()
#
@@ -141,12 +142,12 @@ class Messenger:
yield
return
- self._silence_scope_depth += 1
+ self._locals.silence_scope_depth += 1
try:
yield
finally:
- assert self._silence_scope_depth > 0
- self._silence_scope_depth -= 1
+ assert self._locals.silence_scope_depth > 0
+ self._locals.silence_scope_depth -= 1
# timed_activity()
#
@@ -264,22 +265,21 @@ class Messenger:
#
@contextmanager
def recorded_messages(self, filename, logdir):
-
# We dont allow recursing in this context manager, and
# we also do not allow it in the main process.
- assert self._log_handle is None
- assert self._log_filename is None
- assert not utils._is_main_process()
+ assert not hasattr(self._locals, "log_handle") or self._locals.log_handle is None
+ assert not hasattr(self._locals, "log_filename") or self._locals.log_filename is None
# Create the fully qualified logfile in the log directory,
# appending the pid and .log extension at the end.
- self._log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
+ self._locals.log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
+ self._locals.silence_scope_depth = 0
# Ensure the directory exists first
- directory = os.path.dirname(self._log_filename)
+ directory = os.path.dirname(self._locals.log_filename)
os.makedirs(directory, exist_ok=True)
- with open(self._log_filename, "a") as logfile:
+ with open(self._locals.log_filename, "a") as logfile:
# Write one last line to the log and flush it to disk
def flush_log():
@@ -294,12 +294,12 @@ class Messenger:
except RuntimeError:
os.fsync(logfile.fileno())
- self._log_handle = logfile
+ self._locals.log_handle = logfile
with _signals.terminator(flush_log):
- yield self._log_filename
+ yield self._locals.log_filename
- self._log_handle = None
- self._log_filename = None
+ self._locals.log_handle = None
+ self._locals.log_filename = None
# get_log_handle()
#
@@ -311,7 +311,7 @@ class Messenger:
# (file): The active logging file handle, or None
#
def get_log_handle(self):
- return self._log_handle
+ return self._locals.log_handle
# get_log_filename()
#
@@ -323,7 +323,7 @@ class Messenger:
# (str): The active logging filename, or None
#
def get_log_filename(self):
- return self._log_filename
+ return self._locals.log_filename
# timed_suspendable()
#
@@ -345,8 +345,6 @@ class Messenger:
stopped_time = datetime.datetime.now()
def resume_time():
- nonlocal timedata
- nonlocal stopped_time
sleep_time = datetime.datetime.now() - stopped_time
timedata.start_time += sleep_time
@@ -362,7 +360,7 @@ class Messenger:
#
def _record_message(self, message):
- if self._log_handle is None:
+ if self._locals.log_handle is None:
return
INDENT = " "
@@ -405,8 +403,8 @@ class Messenger:
)
# Write to the open log file
- self._log_handle.write("{}\n".format(text))
- self._log_handle.flush()
+ self._locals.log_handle.write("{}\n".format(text))
+ self._locals.log_handle.flush()
# _render_status()
#
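With jobs running as threads, the Messenger above keeps its message handler, log handle and silence depth in threading.local() so each job thread sees only its own values. A rough, self-contained sketch of the idea (names are illustrative):

import threading


class Messenger:
    def __init__(self):
        self._locals = threading.local()

    def set_message_handler(self, handler):
        self._locals.message_handler = handler

    def message(self, text):
        handler = getattr(self._locals, "message_handler", None)
        assert handler is not None, "no handler installed in this thread"
        handler(text)


messenger = Messenger()


def worker(name):
    # Each thread installs its own handler without affecting the others
    messenger.set_message_handler(lambda t: print("[{}] {}".format(name, t)))
    messenger.message("hello")


threads = [threading.Thread(target=worker, args=(n,)) for n in ("a", "b")]
for t in threads:
    t.start()
for t in threads:
    t.join()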
diff --git a/src/buildstream/_remote.py b/src/buildstream/_remote.py
index d8b8e68fe..6d52ff56a 100644
--- a/src/buildstream/_remote.py
+++ b/src/buildstream/_remote.py
@@ -16,6 +16,7 @@
#
import os
+import threading
from collections import namedtuple
from urllib.parse import urlparse
@@ -146,41 +147,44 @@ class BaseRemote:
self.push = spec.push
self.url = spec.url
+ self._lock = threading.Lock()
+
# init():
#
# Initialize the given remote. This function must be called before
# any communication is performed, since such will otherwise fail.
#
def init(self):
- if self._initialized:
- return
-
- # Set up the communcation channel
- url = urlparse(self.spec.url)
- if url.scheme == "http":
- port = url.port or 80
- self.channel = grpc.insecure_channel("{}:{}".format(url.hostname, port))
- elif url.scheme == "https":
- port = url.port or 443
- try:
- server_cert, client_key, client_cert = _read_files(
- self.spec.server_cert, self.spec.client_key, self.spec.client_cert
+ with self._lock:
+ if self._initialized:
+ return
+
+ # Set up the communication channel
+ url = urlparse(self.spec.url)
+ if url.scheme == "http":
+ port = url.port or 80
+ self.channel = grpc.insecure_channel("{}:{}".format(url.hostname, port))
+ elif url.scheme == "https":
+ port = url.port or 443
+ try:
+ server_cert, client_key, client_cert = _read_files(
+ self.spec.server_cert, self.spec.client_key, self.spec.client_cert
+ )
+ except FileNotFoundError as e:
+ raise RemoteError("Could not read certificates: {}".format(e)) from e
+ self.server_cert = server_cert
+ self.client_key = client_key
+ self.client_cert = client_cert
+ credentials = grpc.ssl_channel_credentials(
+ root_certificates=self.server_cert, private_key=self.client_key, certificate_chain=self.client_cert
)
- except FileNotFoundError as e:
- raise RemoteError("Could not read certificates: {}".format(e)) from e
- self.server_cert = server_cert
- self.client_key = client_key
- self.client_cert = client_cert
- credentials = grpc.ssl_channel_credentials(
- root_certificates=self.server_cert, private_key=self.client_key, certificate_chain=self.client_cert
- )
- self.channel = grpc.secure_channel("{}:{}".format(url.hostname, port), credentials)
- else:
- raise RemoteError("Unsupported URL: {}".format(self.spec.url))
+ self.channel = grpc.secure_channel("{}:{}".format(url.hostname, port), credentials)
+ else:
+ raise RemoteError("Unsupported URL: {}".format(self.spec.url))
- self._configure_protocols()
+ self._configure_protocols()
- self._initialized = True
+ self._initialized = True
def __enter__(self):
return self
diff --git a/src/buildstream/_scheduler/_multiprocessing.py b/src/buildstream/_scheduler/_multiprocessing.py
deleted file mode 100644
index 4864e140c..000000000
--- a/src/buildstream/_scheduler/_multiprocessing.py
+++ /dev/null
@@ -1,79 +0,0 @@
-#
-# Copyright (C) 2019 Bloomberg Finance LP
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-#
-
-# TLDR:
-# ALWAYS use `.AsyncioSafeProcess` when you have an asyncio event loop running and need a `multiprocessing.Process`
-#
-#
-# The upstream asyncio library doesn't play well with forking subprocesses while an event loop is running.
-#
-# The main problem that affects us is that the parent and the child will share some file handlers.
-# The most important one for us is the sig_handler_fd, which the loop uses to buffer signals received
-# by the app so that the asyncio loop can treat them afterwards.
-#
-# This sharing means that when we send a signal to the child, the sighandler in the child will write
-# it back to the parent sig_handler_fd, making the parent have to treat it too.
-# This is a problem for example when we sigterm the process. The scheduler will send sigterms to all its children,
-# which in turn will make the scheduler receive N SIGTERMs (one per child). Which in turn will send sigterms to
-# the children...
-#
-# We therefore provide a `AsyncioSafeProcess` derived from multiprocessing.Process that automatically
-# tries to cleanup the loop and never calls `waitpid` on the child process, which breaks our child watchers.
-#
-#
-# Relevant issues:
-# - Asyncio: support fork (https://bugs.python.org/issue21998)
-# - Asyncio: support multiprocessing (support fork) (https://bugs.python.org/issue22087)
-# - Signal delivered to a subprocess triggers parent's handler (https://bugs.python.org/issue31489)
-#
-#
-
-import multiprocessing
-import signal
-import sys
-from asyncio import set_event_loop_policy
-
-
-# _AsyncioSafeForkAwareProcess()
-#
-# Process class that doesn't call waitpid on its own.
-# This prevents conflicts with the asyncio child watcher.
-#
-# Also automatically close any running asyncio loop before calling
-# the actual run target
-#
-class _AsyncioSafeForkAwareProcess(multiprocessing.Process):
- # pylint: disable=attribute-defined-outside-init
- def start(self):
- self._popen = self._Popen(self)
- self._sentinel = self._popen.sentinel
-
- def run(self):
- signal.set_wakeup_fd(-1)
- set_event_loop_policy(None)
-
- super().run()
-
-
-if sys.platform != "win32":
- # Set the default event loop policy to automatically close our asyncio loop in child processes
- AsyncioSafeProcess = _AsyncioSafeForkAwareProcess
-
-else:
- # Windows doesn't support ChildWatcher that way anyways, we'll need another
- # implementation if we want it
- AsyncioSafeProcess = multiprocessing.Process
diff --git a/src/buildstream/_scheduler/jobs/_job.pyi b/src/buildstream/_scheduler/jobs/_job.pyi
new file mode 100644
index 000000000..fbf3e64de
--- /dev/null
+++ b/src/buildstream/_scheduler/jobs/_job.pyi
@@ -0,0 +1 @@
+def terminate_thread(thread_id: int): ...
diff --git a/src/buildstream/_scheduler/jobs/_job.pyx b/src/buildstream/_scheduler/jobs/_job.pyx
new file mode 100644
index 000000000..82f6ab044
--- /dev/null
+++ b/src/buildstream/_scheduler/jobs/_job.pyx
@@ -0,0 +1,15 @@
+from cpython.pystate cimport PyThreadState_SetAsyncExc
+from cpython.ref cimport PyObject
+from ..._signals import TerminateException
+
+
+# terminate_thread()
+#
+# Ask a given thread to terminate by raising an exception in it.
+#
+# Args:
+# thread_id (int): the thread id in which to throw the exception
+#
+def terminate_thread(long thread_id):
+ res = PyThreadState_SetAsyncExc(thread_id, <PyObject*> TerminateException)
+ assert res == 1
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 08e40694e..2e8f5ca1a 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -25,17 +25,16 @@ import asyncio
import datetime
import itertools
import multiprocessing
-import os
-import signal
-import sys
+import threading
import traceback
# BuildStream toplevel imports
+from ... import utils
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
from ...types import FastEnum
-from ... import _signals, utils
-from .. import _multiprocessing
+from ._job import terminate_thread
+from ..._signals import TerminateException
# Return code values shutdown of job handling child processes
@@ -46,7 +45,6 @@ class _ReturnCode(FastEnum):
PERM_FAIL = 2
SKIPPED = 3
TERMINATED = 4
- KILLED = -9
# JobStatus:
@@ -131,7 +129,6 @@ class Job:
self._scheduler = scheduler # The scheduler
self._messenger = self._scheduler.context.messenger
self._pipe_r = None # The read end of a pipe for message passing
- self._process = None # The Process object
self._listening = False # Whether the parent is currently listening
self._suspended = False # Whether this job is currently suspended
self._max_retries = max_retries # Maximum number of automatic retries
@@ -144,6 +141,9 @@ class Job:
self._message_element_key = None # The task-wide element cache key
self._element = None # The Element() passed to the Job() constructor, if applicable
+ self._task = None # The task that is run
+ self._child = None
+
# set_name()
#
# Sets the name of this job
@@ -158,12 +158,14 @@ class Job:
assert not self._terminated, "Attempted to start process which was already terminated"
+ # FIXME: remove this, this is not necessary when using asyncio
self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
self._tries += 1
self._parent_start_listening()
- child_job = self.create_child_job( # pylint: disable=assignment-from-no-return
+ # FIXME: remove the parent/child separation, it's not needed anymore.
+ self._child = self.create_child_job( # pylint: disable=assignment-from-no-return
self.action_name,
self._messenger,
self._scheduler.context.logdir,
@@ -174,46 +176,28 @@ class Job:
self._message_element_key,
)
- self._process = _multiprocessing.AsyncioSafeProcess(target=child_job.child_action, args=[pipe_w],)
-
- # Block signals which are handled in the main process such that
- # the child process does not inherit the parent's state, but the main
- # process will be notified of any signal after we launch the child.
- #
- with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
- with asyncio.get_child_watcher() as watcher:
- self._process.start()
- # Register the process to call `_parent_child_completed` once it is done
-
- # Close the write end of the pipe in the parent
- pipe_w.close()
+ loop = asyncio.get_event_loop()
- # Here we delay the call to the next loop tick. This is in order to be running
- # in the main thread, as the callback itself must be thread safe.
- def on_completion(pid, returncode):
- asyncio.get_event_loop().call_soon(self._parent_child_completed, pid, returncode)
+ async def execute():
+ result = await loop.run_in_executor(None, self._child.child_action, pipe_w)
+ await self._parent_child_completed(result)
- watcher.add_child_handler(self._process.pid, on_completion)
+ self._task = loop.create_task(execute())
# terminate()
#
# Politely request that an ongoing job terminate soon.
#
- # This will send a SIGTERM signal to the Job process.
+ # This will raise an exception in the child to ask it to exit.
#
def terminate(self):
-
- # First resume the job if it's suspended
- self.resume(silent=True)
-
self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
# Make sure there is no garbage on the pipe
self._parent_stop_listening()
- # Terminate the process using multiprocessing API pathway
- if self._process:
- self._process.terminate()
+ if self._task:
+ self._child.terminate()
self._terminated = True
@@ -227,51 +211,6 @@ class Job:
def get_terminated(self):
return self._terminated
- # kill()
- #
- # Forcefully kill the process, and any children it might have.
- #
- def kill(self):
- # Force kill
- self.message(MessageType.WARN, "{} did not terminate gracefully, killing".format(self.action_name))
- if self._process:
- utils._kill_process_tree(self._process.pid)
-
- # suspend()
- #
- # Suspend this job.
- #
- def suspend(self):
- if not self._suspended:
- self.message(MessageType.STATUS, "{} suspending".format(self.action_name))
-
- try:
- # Use SIGTSTP so that child processes may handle and propagate
- # it to processes they start that become session leaders.
- os.kill(self._process.pid, signal.SIGTSTP)
-
- # For some reason we receive exactly one suspend event for
- # every SIGTSTP we send to the child process, even though the
- # child processes are setsid(). We keep a count of these so we
- # can ignore them in our event loop suspend_event().
- self._scheduler.internal_stops += 1
- self._suspended = True
- except ProcessLookupError:
- # ignore, process has already exited
- pass
-
- # resume()
- #
- # Resume this suspended job.
- #
- def resume(self, silent=False):
- if self._suspended:
- if not silent and not self._scheduler.terminated:
- self.message(MessageType.STATUS, "{} resuming".format(self.action_name))
-
- os.kill(self._process.pid, signal.SIGCONT)
- self._suspended = False
-
# set_message_element_name()
#
# This is called by Job subclasses to set the plugin instance element
@@ -380,10 +319,9 @@ class Job:
# Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
#
# Args:
- # pid (int): The PID of the child which completed
# returncode (int): The return code of the child process
#
- def _parent_child_completed(self, pid, returncode):
+ async def _parent_child_completed(self, returncode):
self._parent_shutdown()
try:
@@ -414,16 +352,9 @@ class Job:
status = JobStatus.FAIL
elif returncode == _ReturnCode.TERMINATED:
if self._terminated:
- self.message(MessageType.INFO, "Process was terminated")
- else:
- self.message(MessageType.ERROR, "Process was terminated unexpectedly")
-
- status = JobStatus.FAIL
- elif returncode == _ReturnCode.KILLED:
- if self._terminated:
- self.message(MessageType.INFO, "Process was killed")
+ self.message(MessageType.INFO, "Job terminated")
else:
- self.message(MessageType.ERROR, "Process was killed unexpectedly")
+ self.message(MessageType.ERROR, "Job was terminated unexpectedly")
status = JobStatus.FAIL
else:
@@ -434,7 +365,7 @@ class Job:
# Force the deletion of the pipe and process objects to try and clean up FDs
self._pipe_r.close()
- self._pipe_r = self._process = None
+ self._pipe_r = self._task = None
# _parent_process_envelope()
#
@@ -549,6 +480,9 @@ class ChildJob:
self._message_element_key = message_element_key
self._pipe_w = None # The write end of a pipe for message passing
+ self._thread_id = None # Thread in which the child executes its action
+ self._should_terminate = False
+ self._terminate_lock = threading.Lock()
# message():
#
@@ -615,19 +549,6 @@ class ChildJob:
# pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
#
def child_action(self, pipe_w):
-
- # This avoids some SIGTSTP signals from grandchildren
- # getting propagated up to the master process
- os.setsid()
-
- # First set back to the default signal handlers for the signals
- # we handle, and then clear their blocked state.
- #
- signal_list = [signal.SIGTSTP, signal.SIGTERM]
- for sig in signal_list:
- signal.signal(sig, signal.SIG_DFL)
- signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list)
-
# Assign the pipe we passed across the process boundaries
#
# Set the global message handler in this child
@@ -635,78 +556,108 @@ class ChildJob:
self._pipe_w = pipe_w
self._messenger.set_message_handler(self._child_message_handler)
- # Graciously handle sigterms.
- def handle_sigterm():
- self._child_shutdown(_ReturnCode.TERMINATED)
-
# Time, log and and run the action function
#
- with _signals.terminator(
- handle_sigterm
- ), self._messenger.timed_suspendable() as timeinfo, self._messenger.recorded_messages(
+ with self._messenger.timed_suspendable() as timeinfo, self._messenger.recorded_messages(
self._logfile, self._logdir
) as filename:
- self.message(MessageType.START, self.action_name, logfile=filename)
-
try:
- # Try the task action
- result = self.child_process() # pylint: disable=assignment-from-no-return
- except SkipJob as e:
- elapsed = datetime.datetime.now() - timeinfo.start_time
- self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
-
- # Alert parent of skip by return code
- self._child_shutdown(_ReturnCode.SKIPPED)
- except BstError as e:
- elapsed = datetime.datetime.now() - timeinfo.start_time
- retry_flag = e.temporary
-
- if retry_flag and (self._tries <= self._max_retries):
- self.message(
- MessageType.FAIL,
- "Try #{} failed, retrying".format(self._tries),
- elapsed=elapsed,
- logfile=filename,
- )
- else:
- self.message(
- MessageType.FAIL, str(e), elapsed=elapsed, detail=e.detail, logfile=filename, sandbox=e.sandbox
- )
+ self.message(MessageType.START, self.action_name, logfile=filename)
+
+ with self._terminate_lock:
+ self._thread_id = threading.current_thread().ident
+ if self._should_terminate:
+ return _ReturnCode.TERMINATED
+
+ try:
+ # Try the task action
+ result = self.child_process() # pylint: disable=assignment-from-no-return
+ except SkipJob as e:
+ elapsed = datetime.datetime.now() - timeinfo.start_time
+ self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
+
+ # Alert parent of skip by return code
+ return _ReturnCode.SKIPPED
+ except BstError as e:
+ elapsed = datetime.datetime.now() - timeinfo.start_time
+ retry_flag = e.temporary
+
+ if retry_flag and (self._tries <= self._max_retries):
+ self.message(
+ MessageType.FAIL,
+ "Try #{} failed, retrying".format(self._tries),
+ elapsed=elapsed,
+ logfile=filename,
+ )
+ else:
+ self.message(
+ MessageType.FAIL,
+ str(e),
+ elapsed=elapsed,
+ detail=e.detail,
+ logfile=filename,
+ sandbox=e.sandbox,
+ )
+
+ self._send_message(_MessageType.CHILD_DATA, self.child_process_data())
+
+ # Report the exception to the parent (for internal testing purposes)
+ self._child_send_error(e)
+
+ # Set return code based on whether or not the error was temporary.
+ #
+ return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL
+ except Exception: # pylint: disable=broad-except
+
+ # If an unhandled exception (not normalized to BstError) occurs, that's a bug,
+ # send the traceback and formatted exception back to the frontend
+ # and print it to the log file.
+ #
+ elapsed = datetime.datetime.now() - timeinfo.start_time
+ detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
+
+ self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename)
+ # Unhandled exceptions should permanently fail
+ return _ReturnCode.PERM_FAIL
- self._send_message(_MessageType.CHILD_DATA, self.child_process_data())
-
- # Report the exception to the parent (for internal testing purposes)
- self._child_send_error(e)
-
- # Set return code based on whether or not the error was temporary.
- #
- self._child_shutdown(_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL)
-
- except Exception: # pylint: disable=broad-except
-
- # If an unhandled (not normalized to BstError) occurs, that's a bug,
- # send the traceback and formatted exception back to the frontend
- # and print it to the log file.
- #
- elapsed = datetime.datetime.now() - timeinfo.start_time
- detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
+ else:
+ # No exception occurred in the action
+ self._send_message(_MessageType.CHILD_DATA, self.child_process_data())
+ self._child_send_result(result)
+
+ elapsed = datetime.datetime.now() - timeinfo.start_time
+ self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename)
+
+ # Shutdown needs to stay outside of the above context manager,
+ # make sure we dont try to handle SIGTERM while the process
+ # is already busy in sys.exit()
+ return _ReturnCode.OK
+ finally:
+ self._thread_id = None
+ except TerminateException:
+ self._thread_id = None
+ return _ReturnCode.TERMINATED
+ finally:
+ self._pipe_w.close()
- self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename)
- # Unhandled exceptions should permenantly fail
- self._child_shutdown(_ReturnCode.PERM_FAIL)
+ # terminate()
+ #
+ # Ask the current child thread to terminate
+ #
+ # This should only ever be called from the main thread.
+ #
+ def terminate(self):
+ assert utils._is_in_main_thread(), "Terminating the job's thread should only be done from the scheduler"
- else:
- # No exception occurred in the action
- self._send_message(_MessageType.CHILD_DATA, self.child_process_data())
- self._child_send_result(result)
+ if self._should_terminate:
+ return
- elapsed = datetime.datetime.now() - timeinfo.start_time
- self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename)
+ with self._terminate_lock:
+ self._should_terminate = True
+ if self._thread_id is None:
+ return
- # Shutdown needs to stay outside of the above context manager,
- # make sure we dont try to handle SIGTERM while the process
- # is already busy in sys.exit()
- self._child_shutdown(_ReturnCode.OK)
+ terminate_thread(self._thread_id)
#######################################################
# Local Private Methods #
@@ -758,18 +709,6 @@ class ChildJob:
if result is not None:
self._send_message(_MessageType.RESULT, result)
- # _child_shutdown()
- #
- # Shuts down the child process by cleaning up and exiting the process
- #
- # Args:
- # exit_code (_ReturnCode): The exit code to exit with
- #
- def _child_shutdown(self, exit_code):
- self._pipe_w.close()
- assert isinstance(exit_code, _ReturnCode)
- sys.exit(exit_code.value)
-
# _child_message_handler()
#
# A Context delegate for handling messages, this replaces the
diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py
index 5b3f05b57..5879cc125 100644
--- a/src/buildstream/_scheduler/queues/trackqueue.py
+++ b/src/buildstream/_scheduler/queues/trackqueue.py
@@ -56,9 +56,10 @@ class TrackQueue(Queue):
# Set the new refs in the main process one by one as they complete,
# writing to bst files this time
if result is not None:
- for unique_id, new_ref in result:
- source = Plugin._lookup(unique_id)
- source._set_ref(new_ref, save=True)
+ for unique_id, new_ref, ref_changed in result:
+ if ref_changed:
+ source = Plugin._lookup(unique_id)
+ source._set_ref(new_ref, save=True)
element._tracking_done()
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 903cd0be9..7acb062d0 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -20,12 +20,15 @@
# Jürg Billeter <juerg.billeter@codethink.co.uk>
# System imports
+import functools
import os
import asyncio
from itertools import chain
import signal
import datetime
+import multiprocessing.forkserver
import sys
+from concurrent.futures import ThreadPoolExecutor
# Local imports
from .resources import Resources
@@ -34,9 +37,7 @@ from ..types import FastEnum
from .._profile import Topics, PROFILER
from .._message import Message, MessageType
from ..plugin import Plugin
-
-
-_MAX_TIMEOUT_TO_KILL_CHILDREN = 20 # in seconds
+from .. import _signals
# A decent return code for Scheduler.run()
@@ -46,6 +47,23 @@ class SchedStatus(FastEnum):
TERMINATED = 1
+def reset_signals_on_exit(func):
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ orig_sigint = signal.getsignal(signal.SIGINT)
+ orig_sigterm = signal.getsignal(signal.SIGTERM)
+ orig_sigtstp = signal.getsignal(signal.SIGTSTP)
+
+ try:
+ return func(*args, **kwargs)
+ finally:
+ signal.signal(signal.SIGINT, orig_sigint)
+ signal.signal(signal.SIGTERM, orig_sigterm)
+ signal.signal(signal.SIGTSTP, orig_sigtstp)
+
+ return wrapper
+
+
# Scheduler()
#
# The scheduler operates on a list queues, each of which is meant to accomplish
@@ -79,7 +97,6 @@ class Scheduler:
# These are shared with the Job, but should probably be removed or made private in some way.
self.loop = None # Shared for Job access to observe the message queue
- self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py
#
# Private members
@@ -98,6 +115,14 @@ class Scheduler:
self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers)
self._state.register_task_retry_callback(self._failure_retry)
+ # Ensure that the forkserver is started before the scheduler starts.
+ # This is best done before we open any gRPC connections to casd or
+ # start other background threads.
+ # We ignore signals here, as this is the state that all Python child
+ # processes spawned from now on will inherit when starting.
+ with _signals.blocked([signal.SIGINT, signal.SIGTSTP], ignore=True):
+ multiprocessing.forkserver.ensure_running()
+
# run()
#
# Args:
@@ -113,6 +138,7 @@ class Scheduler:
# elements have been processed by each queue or when
# an error arises
#
+ @reset_signals_on_exit
def run(self, queues, casd_process_manager):
# Hold on to the queues to process
@@ -149,10 +175,18 @@ class Scheduler:
# Start the profiler
with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
- # Run the queues
- self._sched()
- self.loop.run_forever()
- self.loop.close()
+ # This is not a no-op. Since it is the first signal registration
+ # that is set, it then allows other threads to register signal
+ # handling routines, which would not be possible if the main thread
+ # hadn't set it before.
+ # FIXME: this should be done in a cleaner way
+ with _signals.suspendable(lambda: None, lambda: None), _signals.terminator(lambda: None):
+ with ThreadPoolExecutor(max_workers=sum(self.resources._max_resources.values())) as pool:
+ self.loop.set_default_executor(pool)
+ # Run the queues
+ self._sched()
+ self.loop.run_forever()
+ self.loop.close()
# Invoke the ticker callback a final time to render pending messages
self._ticker_callback()
@@ -230,8 +264,8 @@ class Scheduler:
# Restart the scheduler
#
def resume(self):
- self._resume_jobs()
self._connect_signals()
+ self._resume_jobs()
# stop()
#
@@ -351,13 +385,6 @@ class Scheduler:
# If that happens, do another round.
process_queues = any(q.dequeue_ready() for q in self.queues)
- # Make sure fork is allowed before starting jobs
- if not self.context.prepare_fork():
- message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active")
- self.context.messenger.message(message)
- self.terminate()
- return
-
# Start the jobs
#
for job in ready:
@@ -415,9 +442,10 @@ class Scheduler:
if not self.suspended:
self._suspendtime = datetime.datetime.now()
self.suspended = True
- # Notify that we're suspended
- for job in self._active_jobs:
- job.suspend()
+ _signals.is_not_suspended.clear()
+
+ for suspender in reversed(_signals.suspendable_stack):
+ suspender.suspend()
# _resume_jobs()
#
@@ -425,8 +453,10 @@ class Scheduler:
#
def _resume_jobs(self):
if self.suspended:
- for job in self._active_jobs:
- job.resume()
+ for suspender in _signals.suspendable_stack:
+ suspender.resume()
+
+ _signals.is_not_suspended.set()
self.suspended = False
# Notify that we're unsuspended
self._state.offset_start_time(datetime.datetime.now() - self._suspendtime)
@@ -458,12 +488,6 @@ class Scheduler:
# A loop registered event callback for SIGTSTP
#
def _suspend_event(self):
-
- # Ignore the feedback signals from Job.suspend()
- if self.internal_stops:
- self.internal_stops -= 1
- return
-
# No need to care if jobs were suspended or not, we _only_ handle this
# while we know jobs are not suspended.
self._suspend_jobs()
@@ -485,13 +509,6 @@ class Scheduler:
self.loop.remove_signal_handler(signal.SIGTERM)
def _terminate_jobs_real(self):
- def kill_jobs():
- for job_ in self._active_jobs:
- job_.kill()
-
- # Schedule all jobs to be killed if they have not exited after timeout
- self.loop.call_later(_MAX_TIMEOUT_TO_KILL_CHILDREN, kill_jobs)
-
for job in self._active_jobs:
job.terminate()
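The scheduler above now installs a bounded ThreadPoolExecutor as the event loop's default executor, so every job submitted through run_in_executor(None, ...) runs in that pool. A short illustrative sketch (the max_workers value and helper names are assumptions, not the real resource accounting):

import asyncio
from concurrent.futures import ThreadPoolExecutor


async def run_jobs(job_funcs):
    loop = asyncio.get_running_loop()
    # Each run_in_executor(None, ...) call lands in the bounded default pool
    return await asyncio.gather(*(loop.run_in_executor(None, fn) for fn in job_funcs))


def run_scheduler(job_funcs, max_workers=4):
    loop = asyncio.new_event_loop()
    try:
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            loop.set_default_executor(pool)
            return loop.run_until_complete(run_jobs(job_funcs))
    finally:
        loop.close()


print(run_scheduler([lambda: 1, lambda: 2]))  # -> [1, 2]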
diff --git a/src/buildstream/_signals.py b/src/buildstream/_signals.py
index 03b55b052..ef2ba1965 100644
--- a/src/buildstream/_signals.py
+++ b/src/buildstream/_signals.py
@@ -33,6 +33,23 @@ from typing import Callable, Deque
terminator_stack: Deque[Callable] = deque()
suspendable_stack: Deque[Callable] = deque()
+terminator_lock = threading.Lock()
+suspendable_lock = threading.Lock()
+
+# This event is used to block all the threads while we wait for user
+# interaction, because we cannot easily stop every Python thread except the
+# current one while waiting for user input. However, most performance
+# intensive tasks pass through a subprocess or a multiprocessing.Process,
+# and all of those are guarded by the signal handling. Thus, by setting and
+# clearing this event in the scheduler, we can enable and disable the launching
+# of processes and ensure we don't do anything resource intensive while interrupted.
+is_not_suspended = threading.Event()
+is_not_suspended.set()
+
+
+class TerminateException(BaseException):
+ pass
+
# Per process SIGTERM handler
def terminator_handler(signal_, frame):
@@ -68,6 +85,9 @@ def terminator_handler(signal_, frame):
# that while the code block is running, the supplied function
# will be called upon process termination.
#
+# /!\ The callbacks passed must only contain code that does not access
+# thread-local variables. Those will run in the main thread.
+#
# Note that after handlers are called, the termination will be handled by
# terminating immediately with os._exit(). This means that SystemExit will not
# be raised and 'finally' clauses will not be executed.
@@ -80,23 +100,27 @@ def terminator_handler(signal_, frame):
def terminator(terminate_func):
global terminator_stack # pylint: disable=global-statement
- # Signal handling only works in the main thread
- if threading.current_thread() != threading.main_thread():
- yield
- return
-
outermost = bool(not terminator_stack)
- terminator_stack.append(terminate_func)
+ assert threading.current_thread() == threading.main_thread() or not outermost
+
+ with terminator_lock:
+ terminator_stack.append(terminate_func)
+
if outermost:
original_handler = signal.signal(signal.SIGTERM, terminator_handler)
try:
yield
+ except TerminateException:
+ terminate_func()
+ raise
finally:
if outermost:
signal.signal(signal.SIGTERM, original_handler)
- terminator_stack.pop()
+
+ with terminator_lock:
+ terminator_stack.remove(terminate_func)
# Just a simple object for holding on to two callbacks
@@ -108,21 +132,25 @@ class Suspender:
# Per process SIGTSTP handler
def suspend_handler(sig, frame):
+ is_not_suspended.clear()
# Suspend callbacks from innermost frame first
- for suspender in reversed(suspendable_stack):
- suspender.suspend()
+ with suspendable_lock:
+ for suspender in reversed(suspendable_stack):
+ suspender.suspend()
- # Use SIGSTOP directly now on self, dont introduce more SIGTSTP
- #
- # Here the process sleeps until SIGCONT, which we simply
- # dont handle. We know we'll pickup execution right here
- # when we wake up.
- os.kill(os.getpid(), signal.SIGSTOP)
+ # Use SIGSTOP directly now on self, dont introduce more SIGTSTP
+ #
+ # Here the process sleeps until SIGCONT, which we simply
+ # dont handle. We know we'll pickup execution right here
+ # when we wake up.
+ os.kill(os.getpid(), signal.SIGSTOP)
- # Resume callbacks from outermost frame inwards
- for suspender in suspendable_stack:
- suspender.resume()
+ # Resume callbacks from outermost frame inwards
+ for suspender in suspendable_stack:
+ suspender.resume()
+
+ is_not_suspended.set()
# suspendable()
@@ -133,6 +161,9 @@ def suspend_handler(sig, frame):
# suspend_callback (callable): A function to call as process suspend time.
# resume_callback (callable): A function to call as process resume time.
#
+# /!\ The callbacks passed must only contain code that does not access
+# thread-local variables. Those will run in the main thread.
+#
# This must be used in code blocks which start processes that become
# their own session leader. In these cases, SIGSTOP and SIGCONT need
# to be propagated to the child process group.
@@ -146,8 +177,19 @@ def suspendable(suspend_callback, resume_callback):
global suspendable_stack # pylint: disable=global-statement
outermost = bool(not suspendable_stack)
+ assert threading.current_thread() == threading.main_thread() or not outermost
+
+ # If we are not in the main thread, ensure that we are not suspended
+ # before running.
+ # If we are in the main thread, never block on this, to ensure we
+ # don't deadlock.
+ if threading.current_thread() != threading.main_thread():
+ is_not_suspended.wait()
+
suspender = Suspender(suspend_callback, resume_callback)
- suspendable_stack.append(suspender)
+
+ with suspendable_lock:
+ suspendable_stack.append(suspender)
if outermost:
original_stop = signal.signal(signal.SIGTSTP, suspend_handler)
@@ -158,7 +200,8 @@ def suspendable(suspend_callback, resume_callback):
if outermost:
signal.signal(signal.SIGTSTP, original_stop)
- suspendable_stack.pop()
+ with suspendable_lock:
+ suspendable_stack.remove(suspender)
# blocked()
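The is_not_suspended event introduced above lets worker threads pause cooperatively: before starting anything expensive they wait on the event, which the main thread clears on SIGTSTP and sets again on resume. A small sketch of that gating pattern with illustrative handlers:

import threading
import time

is_not_suspended = threading.Event()
is_not_suspended.set()


def start_expensive_work(run):
    # Called from a worker thread: block here while the build is suspended
    is_not_suspended.wait()
    return run()


def suspend():   # main thread, on SIGTSTP
    is_not_suspended.clear()


def resume():    # main thread, on SIGCONT / user resume
    is_not_suspended.set()


# Tiny demonstration: the worker only proceeds once resume() is called
suspend()
worker = threading.Thread(target=lambda: print(start_expensive_work(lambda: "done")))
worker.start()
time.sleep(0.2)
resume()
worker.join()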
diff --git a/src/buildstream/_workspaces.py b/src/buildstream/_workspaces.py
index f1fc353a7..e51be0829 100644
--- a/src/buildstream/_workspaces.py
+++ b/src/buildstream/_workspaces.py
@@ -428,7 +428,7 @@ class Workspaces:
# create_workspace permanent
#
def save_config(self):
- assert utils._is_main_process()
+ assert utils._is_in_main_thread()
config = {
"format-version": BST_WORKSPACE_FORMAT_VERSION,
diff --git a/src/buildstream/downloadablefilesource.py b/src/buildstream/downloadablefilesource.py
index 2c438b033..b299b7f1b 100644
--- a/src/buildstream/downloadablefilesource.py
+++ b/src/buildstream/downloadablefilesource.py
@@ -92,6 +92,33 @@ class _NetrcPasswordManager:
return login, password
+def _download_file(opener, url, etag, directory):
+ default_name = os.path.basename(url)
+ request = urllib.request.Request(url)
+ request.add_header("Accept", "*/*")
+ request.add_header("User-Agent", "BuildStream/2")
+
+ if etag is not None:
+ request.add_header("If-None-Match", etag)
+
+ with contextlib.closing(opener.open(request)) as response:
+ info = response.info()
+
+ # some servers don't honor the 'If-None-Match' header
+ if etag and info["ETag"] == etag:
+ return None, None
+
+ etag = info["ETag"]
+
+ filename = info.get_filename(default_name)
+ filename = os.path.basename(filename)
+ local_file = os.path.join(directory, filename)
+ with open(local_file, "wb") as dest:
+ shutil.copyfileobj(response, dest)
+
+ return local_file, etag
+
+
class DownloadableFileSource(Source):
# pylint: disable=attribute-defined-outside-init
@@ -130,19 +157,18 @@ class DownloadableFileSource(Source):
# there is no 'track' field in the source to determine what/whether
# or not to update refs, because tracking a ref is always a conscious
# decision by the user.
- with self.timed_activity("Tracking {}".format(self.url), silent_nested=True):
- new_ref = self._ensure_mirror()
-
- if self.ref and self.ref != new_ref:
- detail = (
- "When tracking, new ref differs from current ref:\n"
- + " Tracked URL: {}\n".format(self.url)
- + " Current ref: {}\n".format(self.ref)
- + " New ref: {}\n".format(new_ref)
- )
- self.warn("Potential man-in-the-middle attack!", detail=detail)
+ new_ref = self._ensure_mirror("Tracking {}".format(self.url))
- return new_ref
+ if self.ref and self.ref != new_ref:
+ detail = (
+ "When tracking, new ref differs from current ref:\n"
+ + " Tracked URL: {}\n".format(self.url)
+ + " Current ref: {}\n".format(self.ref)
+ + " New ref: {}\n".format(new_ref)
+ )
+ self.warn("Potential man-in-the-middle attack!", detail=detail)
+
+ return new_ref
def fetch(self): # pylint: disable=arguments-differ
@@ -155,12 +181,11 @@ class DownloadableFileSource(Source):
# Download the file, raise hell if the sha256sums don't match,
# and mirror the file otherwise.
- with self.timed_activity("Fetching {}".format(self.url), silent_nested=True):
- sha256 = self._ensure_mirror()
- if sha256 != self.ref:
- raise SourceError(
- "File downloaded from {} has sha256sum '{}', not '{}'!".format(self.url, sha256, self.ref)
- )
+ sha256 = self._ensure_mirror("Fetching {}".format(self.url),)
+ if sha256 != self.ref:
+ raise SourceError(
+ "File downloaded from {} has sha256sum '{}', not '{}'!".format(self.url, sha256, self.ref)
+ )
def _warn_deprecated_etag(self, node):
etag = node.get_str("etag", None)
@@ -181,40 +206,25 @@ class DownloadableFileSource(Source):
with utils.save_file_atomic(etagfilename) as etagfile:
etagfile.write(etag)
- def _ensure_mirror(self):
+ def _ensure_mirror(self, activity_name: str):
# Downloads from the url and caches it according to its sha256sum.
try:
with self.tempdir() as td:
- default_name = os.path.basename(self.url)
- request = urllib.request.Request(self.url)
- request.add_header("Accept", "*/*")
- request.add_header("User-Agent", "BuildStream/2")
-
# We do not use etag in case what we have in cache is
# not matching ref in order to be able to recover from
# corrupted download.
- if self.ref:
- etag = self._get_etag(self.ref)
-
+ if self.ref and not self.is_cached():
# Do not re-download the file if the ETag matches.
- if etag and self.is_cached():
- request.add_header("If-None-Match", etag)
-
- opener = self.__get_urlopener()
- with contextlib.closing(opener.open(request)) as response:
- info = response.info()
-
- # some servers don't honor the 'If-None-Match' header
- if self.ref and etag and info["ETag"] == etag:
- return self.ref
+ etag = self._get_etag(self.ref)
+ else:
+ etag = None
- etag = info["ETag"]
+ local_file, new_etag = self.blocking_activity(
+ _download_file, (self.__get_urlopener(), self.url, etag, td), activity_name
+ )
- filename = info.get_filename(default_name)
- filename = os.path.basename(filename)
- local_file = os.path.join(td, filename)
- with open(local_file, "wb") as dest:
- shutil.copyfileobj(response, dest)
+ if local_file is None:
+ return self.ref
# Make sure url-specific mirror dir exists.
if not os.path.isdir(self._mirror_dir):
@@ -226,8 +236,8 @@ class DownloadableFileSource(Source):
# In case the old file was corrupted somehow.
os.rename(local_file, self._get_mirror_file(sha256))
- if etag:
- self._store_etag(sha256, etag)
+ if new_etag:
+ self._store_etag(sha256, new_etag)
return sha256
except urllib.error.HTTPError as e:
@@ -252,6 +262,11 @@ class DownloadableFileSource(Source):
return self.__default_mirror_file
+ @classmethod
+ def _reset_url_opener(cls):
+    # Needed for tests, in order to clean up the `netrc` configuration.
+ cls.__urlopener = None
+
def __get_urlopener(self):
if not DownloadableFileSource.__urlopener:
try:
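
Since blocking_activity() executes its target in a separate process, the download logic invoked above as _download_file must be a module-level, pickleable function rather than a bound method of the source. The patch adds that helper elsewhere in downloadablefilesource.py; the following is only a minimal sketch of what such a helper could look like, reconstructed from the removed inline code (the (local_file, etag) return contract is taken from the calling hunk above, the body itself is illustrative):

    import contextlib
    import os
    import shutil
    import urllib.request

    def _download_file(opener, url, etag, directory):
        # Build the request; advertise the cached ETag so the server can answer "not modified".
        request = urllib.request.Request(url, headers={"Accept": "*/*", "User-Agent": "BuildStream/2"})
        if etag is not None:
            request.add_header("If-None-Match", etag)

        with contextlib.closing(opener.open(request)) as response:
            info = response.info()
            # Some servers ignore 'If-None-Match'; a matching ETag still means "unchanged".
            if etag and info["ETag"] == etag:
                return None, None

            filename = os.path.basename(info.get_filename(os.path.basename(url)))
            local_file = os.path.join(directory, filename)
            with open(local_file, "wb") as dest:
                shutil.copyfileobj(response, dest)

        return local_file, info["ETag"]

When the helper reports "unchanged" by returning (None, None), the caller above falls back to returning self.ref without touching the mirror.
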
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index ac16e3103..6c4e45d03 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1497,15 +1497,9 @@ class Element(Plugin):
#
def _stage_sources_at(self, vdirectory, usebuildtree=False):
- context = self._get_context()
-
# It's advantageous to have this temporary directory on
# the same file system as the rest of our cache.
- with self.timed_activity("Staging sources", silent_nested=True), utils._tempdir(
- dir=context.tmpdir, prefix="staging-temp"
- ) as temp_staging_directory:
-
- import_dir = temp_staging_directory
+ with self.timed_activity("Staging sources", silent_nested=True):
if not isinstance(vdirectory, Directory):
vdirectory = FileBasedDirectory(vdirectory)
@@ -1534,8 +1528,7 @@ class Element(Plugin):
import_dir = staged_sources
# Set update_mtime to ensure deterministic mtime of sources at build time
- with utils._deterministic_umask():
- vdirectory.import_files(import_dir, update_mtime=BST_ARBITRARY_TIMESTAMP)
+ vdirectory.import_files(import_dir, update_mtime=BST_ARBITRARY_TIMESTAMP)
# Ensure deterministic owners of sources at build time
vdirectory.set_deterministic_user()
@@ -1549,7 +1542,7 @@ class Element(Plugin):
# scope (_Scope): The scope of dependencies to mark as required
#
def _set_required(self, scope=_Scope.RUN):
- assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process"
+ assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
if self.__required:
# Already done
@@ -1586,7 +1579,7 @@ class Element(Plugin):
# required in the local cache.
#
def _set_artifact_files_required(self, scope=_Scope.RUN):
- assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process"
+ assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
if self.__artifact_files_required:
# Already done
@@ -1636,7 +1629,7 @@ class Element(Plugin):
# in a subprocess.
#
def __schedule_assembly_when_necessary(self):
- assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process"
+ assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
# FIXME: We could reduce the number of function calls a bit by
# factoring this out of this method (and checking whether we
@@ -1669,7 +1662,7 @@ class Element(Plugin):
#
def _assemble_done(self, successful):
assert self.__assemble_scheduled
- assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process"
+ assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
self.__assemble_done = True
@@ -1689,8 +1682,6 @@ class Element(Plugin):
self._update_ready_for_runtime_and_cached()
if self._get_workspace() and self._cached():
- assert utils._is_main_process(), "Attempted to save workspace configuration from child process"
- #
# Note that this block can only happen in the
# main process, since `self._cached_success()` cannot
# be true when assembly is successful in the task.
@@ -1867,7 +1858,7 @@ class Element(Plugin):
# fetched_original (bool): Whether the original sources had been asked (and fetched) or not
#
def _fetch_done(self, fetched_original):
- assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process"
+ assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
self.__sources.fetch_done(fetched_original)
@@ -1913,7 +1904,7 @@ class Element(Plugin):
# This will result in updating the element state.
#
def _pull_done(self):
- assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process"
+ assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
self.__pull_done = True
@@ -2091,7 +2082,7 @@ class Element(Plugin):
# the workspaces metadata first.
#
def _open_workspace(self):
- assert utils._is_main_process(), "This writes to a global file and therefore must be run in the main process"
+ assert utils._is_in_main_thread(), "This writes to a global file and therefore must be run in the main thread"
context = self._get_context()
workspace = self._get_workspace()
@@ -2393,7 +2384,7 @@ class Element(Plugin):
# the appropriate counters.
#
def _update_ready_for_runtime_and_cached(self):
- assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process"
+ assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
if not self.__ready_for_runtime_and_cached:
if self.__runtime_deps_uncached == 0 and self.__cache_key and self._cached_success():
@@ -3139,7 +3130,7 @@ class Element(Plugin):
# in Scope.BUILD has changed in any way.
#
def __update_cache_keys(self):
- assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process"
+ assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
if self.__strict_cache_key is not None:
# Cache keys already calculated
@@ -3212,7 +3203,7 @@ class Element(Plugin):
# it can check whether an artifact exists for that cache key.
#
def __update_artifact_state(self):
- assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process"
+ assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
assert self.__artifact is None
context = self._get_context()
@@ -3247,7 +3238,7 @@ class Element(Plugin):
# a remote cache).
#
def __update_cache_key_non_strict(self):
- assert utils._is_main_process(), "This has an impact on all elements and must be run in the main process"
+ assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
# The final cache key can be None here only in non-strict mode
if self.__cache_key is None:
diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index 2b2382eb7..f98005644 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -110,14 +110,19 @@ Class Reference
"""
import itertools
+import multiprocessing
import os
+import pickle
+import queue
+import signal
import subprocess
import sys
-from contextlib import contextmanager
-from typing import Generator, Optional, Tuple, TYPE_CHECKING
+import traceback
+from contextlib import contextmanager, suppress
+from typing import Any, Callable, Generator, Optional, Sequence, Tuple, TypeVar, TYPE_CHECKING
from weakref import WeakValueDictionary
-from . import utils
+from . import utils, _signals
from ._exceptions import PluginError, ImplError
from ._message import Message, MessageType
from .node import Node, MappingNode
@@ -131,6 +136,36 @@ if TYPE_CHECKING:
# pylint: enable=cyclic-import
+T1 = TypeVar("T1")
+
+
+# _background_job_wrapper()
+#
+# Wrapper for running jobs in the background, transparently for users
+#
+# This function puts a response of the form (error, result) on the queue,
+# where `error` is None on success, an exception, or a traceback string.
+#
+# Args:
+# result_queue: The queue in which to pass back the result
+# target: function to execute in the background
+# args: positional arguments to give to the target function
+#
+def _background_job_wrapper(result_queue: multiprocessing.Queue, target: Callable[..., T1], args: Any) -> None:
+ result = None
+
+ try:
+ result = target(*args)
+ result_queue.put((None, result))
+ except Exception as exc: # pylint: disable=broad-except
+ try:
+            # Try to send the exception itself; if it cannot be pickled,
+            # fall back to sending the formatted traceback string below.
+ result_queue.put((exc, result))
+ except pickle.PickleError as exc:
+ result_queue.put((traceback.format_exc(), None))
+
+
class Plugin:
"""Plugin()
@@ -212,6 +247,18 @@ class Plugin:
# scheduling tasks.
__TABLE = WeakValueDictionary() # type: WeakValueDictionary[int, Plugin]
+ try:
+ __multiprocessing_context = multiprocessing.get_context("forkserver")
+ except ValueError:
+        # We are on a system without `forkserver` support; fall back to
+        # `spawn`, which has been observed to hang in some rare cases.
+        #
+        # This is not critical for now, since the platforms that lack
+        # forkserver support (mainly Windows) are not yet supported.
+        #
+        # XXX: investigate why we sometimes get deadlocks there
+ __multiprocessing_context = multiprocessing.get_context("spawn")
+
def __init__(
self,
name: str,
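
The forkserver-or-spawn fallback above can also be checked up front by asking which start methods the platform provides; a small illustrative alternative (not the patch's code):

    import multiprocessing

    # "forkserver" is available on most POSIX systems; on platforms that
    # lack it (e.g. Windows), fall back to "spawn".
    if "forkserver" in multiprocessing.get_all_start_methods():
        _mp_context = multiprocessing.get_context("forkserver")
    else:
        _mp_context = multiprocessing.get_context("spawn")
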
@@ -509,6 +556,100 @@ class Plugin:
):
yield
+ def blocking_activity(
+ self,
+ target: Callable[..., T1],
+ args: Sequence[Any],
+ activity_name: str,
+ *,
+ detail: Optional[str] = None,
+ silent_nested: bool = False
+ ) -> T1:
+ """Execute a blocking activity in the background.
+
+ This is to execute potentially blocking methods in the background,
+ in order to avoid starving the scheduler.
+
+ The function, its arguments and return value must all be pickleable,
+ as it will be run in another process.
+
+ This should be used whenever there is a potential for a blocking
+ syscall to not return in a reasonable (<1s) amount of time.
+ For example, you would use this if you were doing a request to a
+ remote server, without a timeout.
+
+ Args:
+ target: the function to execute in the background
+ args: positional arguments to pass to the method to call
+ activity_name: The name of the activity
+ detail: An optional detailed message, can be multiline output
+ silent_nested: If specified, nested messages will be silenced
+
+ Returns:
+ the return value from `target`.
+ """
+ with self.__context.messenger.timed_activity(
+ activity_name, element_name=self._get_full_name(), detail=detail, silent_nested=silent_nested
+ ):
+ result_queue = self.__multiprocessing_context.Queue()
+ proc = None
+
+ def kill_proc():
+ if proc and proc.is_alive():
+ proc.kill()
+ proc.join()
+
+ def suspend_proc():
+ if proc and proc.is_alive():
+ with suppress(ProcessLookupError):
+ os.kill(proc.pid, signal.SIGSTOP)
+
+ def resume_proc():
+ if proc and proc.is_alive():
+ with suppress(ProcessLookupError):
+ os.kill(proc.pid, signal.SIGCONT)
+
+ with _signals.suspendable(suspend_proc, resume_proc), _signals.terminator(kill_proc):
+ proc = self.__multiprocessing_context.Process(
+ target=_background_job_wrapper, args=(result_queue, target, args)
+ )
+ proc.start()
+
+ should_continue = True
+ last_check = False
+
+ while should_continue or last_check:
+ last_check = False
+
+ try:
+ err, result = result_queue.get(timeout=1)
+ break
+                except queue.Empty:
+                    if not proc.is_alive() and should_continue:
+                        # Check one last time, in case the process stopped
+                        # between our last check and now
+                        last_check = True
+                        should_continue = False
+                    continue
+            else:
+                raise PluginError("Background process died with error code {}".format(proc.exitcode))
+
+            proc.join(timeout=15)
+            if proc.is_alive():
+                # join() does not raise on timeout, so check explicitly
+                proc.terminate()
+                raise PluginError("Background process didn't exit after 15 seconds and got killed.")
+
+ if err is not None:
+ if isinstance(err, str):
+                # A string here means pickling the real exception failed, which is a bug
+ raise PluginError(
+ "An error happened while returning the result from a blocking activity", detail=err
+ )
+ raise err
+
+ return result
+
def call(self, *popenargs, fail: Optional[str] = None, fail_temporarily: bool = False, **kwargs) -> int:
"""A wrapper for subprocess.call()
diff --git a/src/buildstream/sandbox/_sandboxbuildboxrun.py b/src/buildstream/sandbox/_sandboxbuildboxrun.py
index 3d71b7440..1c187d7fd 100644
--- a/src/buildstream/sandbox/_sandboxbuildboxrun.py
+++ b/src/buildstream/sandbox/_sandboxbuildboxrun.py
@@ -184,17 +184,27 @@ class SandboxBuildBoxRun(SandboxREAPI):
try:
while True:
try:
- returncode = process.wait()
+                    # We don't call `process.wait()` without a timeout here.
+                    # If we did, and the child process never exited, control
+                    # would never be returned to the Python code, which would
+                    # then be unable to notice a shutdown request or to kill
+                    # the child process.
+                    # We therefore loop with a short timeout, so the Python
+                    # code can act whenever it needs to.
+ returncode = process.wait(timeout=1)
# If the process exits due to a signal, we
# brutally murder it to avoid zombies
if returncode < 0:
utils._kill_process_tree(process.pid)
+ except subprocess.TimeoutExpired:
+ continue
+
# Unlike in the bwrap case, here only the main
# process seems to receive the SIGINT. We pass
# on the signal to the child and then continue
# to wait.
- except KeyboardInterrupt:
+ except _signals.TerminateException:
process.send_signal(signal.SIGINT)
continue
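
The comment above describes a pattern that reappears in utils._call() further down: never block indefinitely in a single wait, so the calling thread regularly regains control and pending termination or suspend requests can take effect. A generic standalone sketch of the idea (not BuildStream code):

    import subprocess


    def wait_interruptibly(process: subprocess.Popen, poll_interval: float = 1.0) -> int:
        # Wait for the child in short slices; every time the timeout expires,
        # control returns to Python, so an exception raised asynchronously
        # into this thread (or any other pending request) can be acted upon.
        while True:
            try:
                return process.wait(timeout=poll_interval)
            except subprocess.TimeoutExpired:
                continue
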
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index 6cba7d611..2ac159337 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -26,7 +26,6 @@ from functools import partial
import grpc
-from .. import utils
from ..node import Node
from .._message import Message, MessageType
from ._sandboxreapi import SandboxREAPI
@@ -59,9 +58,6 @@ class SandboxRemote(SandboxREAPI):
if config is None:
return
- # gRPC doesn't support fork without exec, which is used in the main process.
- assert not utils._is_main_process()
-
self.storage_url = config.storage_service["url"]
self.exec_url = config.exec_service["url"]
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index e0a2db45d..3268d3a93 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -928,17 +928,14 @@ class Source(Plugin):
clean = node.strip_node_info()
to_modify = node.strip_node_info()
- current_ref = self.get_ref() # pylint: disable=assignment-from-no-return
-
# Set the ref regardless of whether it changed, the
# TrackQueue() will want to update a specific node with
# the ref, regardless of whether the original has changed.
self.set_ref(new_ref, to_modify)
- if current_ref == new_ref or not save:
- # Note: We do not look for and propagate changes at this point
- # which might result in desync depending if something changes about
- # tracking in the future. For now, this is quite safe.
+    # FIXME: this will save things too often, as the ref might not have
+    # changed. We should optimize this by detecting unchanged refs differently.
+ if not save:
return False
# Ensure the node is not from a junction
diff --git a/src/buildstream/testing/_fixtures.py b/src/buildstream/testing/_fixtures.py
index 98778936d..095a5f6f1 100644
--- a/src/buildstream/testing/_fixtures.py
+++ b/src/buildstream/testing/_fixtures.py
@@ -16,29 +16,48 @@
# pylint: disable=redefined-outer-name
+import time
+
import psutil
import pytest
-from buildstream import node, utils
+from buildstream import node, DownloadableFileSource
+
+
+# Number of seconds to wait for background threads to exit.
+_AWAIT_THREADS_TIMEOUT_SECONDS = 5
+
+
+def has_no_unexpected_background_threads(expected_num_threads):
+ # Use psutil as threading.active_count() doesn't include gRPC threads.
+ process = psutil.Process()
+
+ wait = 0.1
+ for _ in range(0, int(_AWAIT_THREADS_TIMEOUT_SECONDS / wait)):
+ if process.num_threads() == expected_num_threads:
+ return True
+ time.sleep(wait)
+
+ return False
@pytest.fixture(autouse=True, scope="session")
def default_thread_number():
# xdist/execnet has its own helper thread.
- # Ignore that for `utils._is_single_threaded` checks.
- utils._INITIAL_NUM_THREADS_IN_MAIN_PROCESS = psutil.Process().num_threads()
+ return psutil.Process().num_threads()
# Catch tests that don't shut down background threads, which could then lead
# to other tests hanging when BuildStream uses fork().
@pytest.fixture(autouse=True)
def thread_check(default_thread_number):
- assert utils._is_single_threaded()
+ assert has_no_unexpected_background_threads(default_thread_number)
yield
- assert utils._is_single_threaded()
+ assert has_no_unexpected_background_threads(default_thread_number)
# Reset global state in node.pyx to improve test isolation
@pytest.fixture(autouse=True)
def reset_global_node_state():
node._reset_global_state()
+ DownloadableFileSource._reset_url_opener()
diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py
index 9c6761ccc..04bbf261c 100644
--- a/src/buildstream/utils.py
+++ b/src/buildstream/utils.py
@@ -32,8 +32,9 @@ import signal
import stat
from stat import S_ISDIR
import subprocess
+from subprocess import TimeoutExpired
import tempfile
-import time
+import threading
import datetime
import itertools
from contextlib import contextmanager
@@ -60,16 +61,6 @@ BST_ARBITRARY_TIMESTAMP = calendar.timegm((2011, 11, 11, 11, 11, 11))
_ALIAS_SEPARATOR = ":"
_URI_SCHEMES = ["http", "https", "ftp", "file", "git", "sftp", "ssh"]
-# Main process pid
-_MAIN_PID = os.getpid()
-
-# The number of threads in the main process at startup.
-# This is 1 except for certain test environments (xdist/execnet).
-_INITIAL_NUM_THREADS_IN_MAIN_PROCESS = 1
-
-# Number of seconds to wait for background threads to exit.
-_AWAIT_THREADS_TIMEOUT_SECONDS = 5
-
# The process's file mode creation mask.
# Impossible to retrieve without temporarily changing it on POSIX.
_UMASK = os.umask(0o777)
@@ -868,13 +859,12 @@ def _pretty_size(size, dec_places=0):
return "{size:g}{unit}".format(size=round(psize, dec_places), unit=unit)
-# _is_main_process()
+# _is_in_main_thread()
#
-# Return whether we are in the main process or not.
+# Return whether we are running in the main thread or not
#
-def _is_main_process():
- assert _MAIN_PID is not None
- return os.getpid() == _MAIN_PID
+def _is_in_main_thread():
+ return threading.current_thread() is threading.main_thread()
# Remove a path and any empty directories leading up to it.
@@ -1392,7 +1382,20 @@ def _call(*popenargs, terminate=False, **kwargs):
process = subprocess.Popen( # pylint: disable=subprocess-popen-preexec-fn
*popenargs, preexec_fn=preexec_fn, universal_newlines=True, **kwargs
)
- output, _ = process.communicate()
+    # We don't call `process.communicate()` without a timeout here.
+    # If we did, and the child process never exited, control would
+    # never be returned to the Python code, which would then be
+    # unable to notice a shutdown request or to kill the child
+    # process.
+    # We therefore loop with a short timeout, so the Python code
+    # can act whenever it needs to.
+ while True:
+ try:
+ output, _ = process.communicate(timeout=1)
+ break
+ except TimeoutExpired:
+ continue
+
exit_code = process.poll()
return (exit_code, output)
@@ -1549,21 +1552,6 @@ def _search_upward_for_files(directory, filenames):
directory = parent_dir
-# _deterministic_umask()
-#
-# Context managed to apply a umask to a section that may be affected by a users
-# umask. Restores old mask afterwards.
-#
-@contextmanager
-def _deterministic_umask():
- old_umask = os.umask(0o022)
-
- try:
- yield
- finally:
- os.umask(old_umask)
-
-
# _get_compression:
#
# Given a file name infer the compression
@@ -1601,30 +1589,6 @@ def _get_compression(tar):
return ""
-# _is_single_threaded()
-#
-# Return whether the current Process is single-threaded. Don't count threads
-# in the main process that were created by a test environment (xdist/execnet)
-# before BuildStream was executed.
-#
-def _is_single_threaded():
- # Use psutil as threading.active_count() doesn't include gRPC threads.
- process = psutil.Process()
-
- if process.pid == _MAIN_PID:
- expected_num_threads = _INITIAL_NUM_THREADS_IN_MAIN_PROCESS
- else:
- expected_num_threads = 1
-
- # gRPC threads are not joined when shut down. Wait for them to exit.
- wait = 0.1
- for _ in range(0, int(_AWAIT_THREADS_TIMEOUT_SECONDS / wait)):
- if process.num_threads() == expected_num_threads:
- return True
- time.sleep(wait)
- return False
-
-
# _parse_version():
#
# Args:
diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py
index e6eaec960..63e6d9814 100644
--- a/tests/artifactcache/pull.py
+++ b/tests/artifactcache/pull.py
@@ -162,6 +162,7 @@ def test_pull_tree(cli, tmpdir, datafiles):
# Assert that we are not cached locally anymore
artifactcache.close_grpc_channels()
+ cas._casd_channel.request_shutdown()
cas.close_grpc_channels()
assert cli.get_element_state(project_dir, "target.bst") != "cached"
diff --git a/tests/internals/cascache.py b/tests/internals/cascache.py
index 043531c24..e27e40974 100644
--- a/tests/internals/cascache.py
+++ b/tests/internals/cascache.py
@@ -3,6 +3,7 @@ import time
from unittest.mock import MagicMock
from buildstream._cas.cascache import CASCache
+from buildstream._cas import casdprocessmanager
from buildstream._message import MessageType
from buildstream._messenger import Messenger
@@ -31,6 +32,10 @@ def test_report_when_cascache_exits_not_cleanly(tmp_path, monkeypatch):
dummy_buildbox_casd.write_text("#!/usr/bin/env sh\nwhile :\ndo\nsleep 60\ndone")
dummy_buildbox_casd.chmod(0o777)
monkeypatch.setenv("PATH", str(tmp_path), prepend=os.pathsep)
+    # FIXME: this is a hack; the fake casd script should instead create
+    #        its socket properly. This whole test suite probably needs
+    #        some cleanup
+ monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1)
messenger = MagicMock(spec_set=Messenger)
cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs")))
@@ -50,6 +55,10 @@ def test_report_when_cascache_is_forcefully_killed(tmp_path, monkeypatch):
dummy_buildbox_casd.write_text("#!/usr/bin/env sh\ntrap 'echo hello' TERM\nwhile :\ndo\nsleep 60\ndone")
dummy_buildbox_casd.chmod(0o777)
monkeypatch.setenv("PATH", str(tmp_path), prepend=os.pathsep)
+    # FIXME: this is a hack; the fake casd script should instead create
+    #        its socket properly. This whole test suite probably needs
+    #        some cleanup
+ monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1)
messenger = MagicMock(spec_set=Messenger)
cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs")))