diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2019-11-22 18:40:26 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-11-22 18:40:26 +0000 |
commit | 9aa48dd2ceb91d74c92154384a2c10bef6335c1f (patch) | |
tree | 2e20c8d5a63bc216111ec587acde8e1be885487d | |
parent | cdf15d0658cd57c2666963119b4e05c2ff815ff7 (diff) | |
parent | 315a3df6dfbadff97928ae0896512d069cbf9e65 (diff) | |
download | buildstream-9aa48dd2ceb91d74c92154384a2c10bef6335c1f.tar.gz |
Merge branch 'aevri/casdprocessmanager' into 'master'
cascache: refactor, extract CASDProcessManager and CASDConnection
See merge request BuildStream/buildstream!1638
-rw-r--r-- | src/buildstream/_cas/cascache.py | 233 | ||||
-rw-r--r-- | src/buildstream/_cas/casdprocessmanager.py | 233 | ||||
-rw-r--r-- | src/buildstream/_cas/casremote.py | 6 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 12 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 2 |
5 files changed, 289 insertions, 197 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index c1f2b30b0..98581d351 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -25,23 +25,20 @@ import errno import contextlib import ctypes import multiprocessing -import shutil import signal -import subprocess -import tempfile import time import grpc from .._protos.google.rpc import code_pb2 -from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc -from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 +from .._protos.build.buildgrid import local_cas_pb2 from .. import _signals, utils from ..types import FastEnum from .._exceptions import CASCacheError -from .._message import Message, MessageType +from .casdprocessmanager import CASDProcessManager from .casremote import _CASBatchRead, _CASBatchUpdate _BUFFER_SIZE = 65536 @@ -50,8 +47,6 @@ _BUFFER_SIZE = 65536 # Refresh interval for disk usage of local cache in seconds _CACHE_USAGE_REFRESH = 5 -_CASD_MAX_LOGFILES = 10 - class CASLogLevel(FastEnum): WARNING = "warning" @@ -78,53 +73,35 @@ class CASCache: os.makedirs(os.path.join(self.casdir, "objects"), exist_ok=True) os.makedirs(self.tmpdir, exist_ok=True) - self._casd_channel = None - self._casd_cas = None - self._local_cas = None self._cache_usage_monitor = None self._cache_usage_monitor_forbidden = False + self._casd_process_manager = None + self._casd_channel = None if casd: - # Place socket in global/user temporary directory to avoid hitting - # the socket path length limit. - self._casd_socket_tempdir = tempfile.mkdtemp(prefix="buildstream") - self._casd_socket_path = os.path.join(self._casd_socket_tempdir, "casd.sock") - - casd_args = [utils.get_host_tool("buildbox-casd")] - casd_args.append("--bind=unix:" + self._casd_socket_path) - casd_args.append("--log-level=" + log_level.value) - - if cache_quota is not None: - casd_args.append("--quota-high={}".format(int(cache_quota))) - casd_args.append("--quota-low={}".format(int(cache_quota / 2))) - - if protect_session_blobs: - casd_args.append("--protect-session-blobs") - - casd_args.append(path) - - self._casd_start_time = time.time() - self.casd_logfile = self._rotate_and_get_next_logfile() - - with open(self.casd_logfile, "w") as logfile_fp: - # Block SIGINT on buildbox-casd, we don't need to stop it - # The frontend will take care of it if needed - with _signals.blocked([signal.SIGINT], ignore=False): - self._casd_process = subprocess.Popen( - casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT - ) + log_dir = os.path.join(self.casdir, "logs") + self._casd_process_manager = CASDProcessManager( + path, log_dir, log_level, cache_quota, protect_session_blobs + ) - self._cache_usage_monitor = _CASCacheUsageMonitor(self) - else: - self._casd_process = None + self._casd_channel = self._casd_process_manager.create_channel() + self._cache_usage_monitor = _CASCacheUsageMonitor(self._casd_channel) def __getstate__(self): + # Note that we can't use jobpickler's + # 'get_state_for_child_job_pickling' protocol here, since CASCache's + # are passed to subprocesses other than child jobs. e.g. + # test.utils.ArtifactShare. + state = self.__dict__.copy() - # Popen objects are not pickle-able, however, child processes only - # need the information whether a casd subprocess was started or not. - assert "_casd_process" in state - state["_casd_process"] = bool(self._casd_process) + # Child jobs do not need to manage the CASD process, they only need a + # connection to CASD. + if state["_casd_process_manager"] is not None: + state["_casd_process_manager"] = None + # In order to be pickle-able, the connection must be in the initial + # 'closed' state. + state["_casd_channel"] = self._casd_process_manager.create_channel() # The usage monitor is not pickle-able, but we also don't need it in # child processes currently. Make sure that if this changes, we get a @@ -136,43 +113,21 @@ class CASCache: return state - def _init_casd(self): - assert self._casd_process, "CASCache was instantiated without buildbox-casd" - - if not self._casd_channel: - while not os.path.exists(self._casd_socket_path): - # casd is not ready yet, try again after a 10ms delay, - # but don't wait for more than 15s - if time.time() > self._casd_start_time + 15: - raise CASCacheError("Timed out waiting for buildbox-casd to become ready") - - time.sleep(0.01) - - self._casd_channel = grpc.insecure_channel("unix:" + self._casd_socket_path) - self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel) - self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel) - - # Call GetCapabilities() to establish connection to casd - capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self._casd_channel) - capabilities.GetCapabilities(remote_execution_pb2.GetCapabilitiesRequest()) - - # _get_cas(): + # get_cas(): # # Return ContentAddressableStorage stub for buildbox-casd channel. # - def _get_cas(self): - if not self._casd_cas: - self._init_casd() - return self._casd_cas + def get_cas(self): + assert self._casd_channel, "CASCache was created without a channel" + return self._casd_channel.get_cas() - # _get_local_cas(): + # get_local_cas(): # # Return LocalCAS stub for buildbox-casd channel. # - def _get_local_cas(self): - if not self._local_cas: - self._init_casd() - return self._local_cas + def get_local_cas(self): + assert self._casd_channel, "CASCache was created without a channel" + return self._casd_channel.get_local_cas() # preflight(): # @@ -184,24 +139,13 @@ class CASCache: if not (os.path.isdir(headdir) and os.path.isdir(objdir)): raise CASCacheError("CAS repository check failed for '{}'".format(self.casdir)) - # has_open_grpc_channels(): - # - # Return whether there are gRPC channel instances. This is used to safeguard - # against fork() with open gRPC channels. - # - def has_open_grpc_channels(self): - return bool(self._casd_channel) - # close_grpc_channels(): # # Close the casd channel if it exists # def close_grpc_channels(self): if self._casd_channel: - self._local_cas = None - self._casd_cas = None self._casd_channel.close() - self._casd_channel = None # release_resources(): # @@ -211,10 +155,10 @@ class CASCache: if self._cache_usage_monitor: self._cache_usage_monitor.release_resources() - if self._casd_process: + if self._casd_process_manager: self.close_grpc_channels() - self._terminate_casd_process(messenger) - shutil.rmtree(self._casd_socket_tempdir) + self._casd_process_manager.release_resources(messenger) + self._casd_process_manager = None # contains(): # @@ -255,7 +199,7 @@ class CASCache: # Returns: True if the directory is available in the local cache # def contains_directory(self, digest, *, with_files): - local_cas = self._get_local_cas() + local_cas = self.get_local_cas() request = local_cas_pb2.FetchTreeRequest() request.root_digest.CopyFrom(digest) @@ -414,7 +358,7 @@ class CASCache: request.path.append(path) - local_cas = self._get_local_cas() + local_cas = self.get_local_cas() response = local_cas.CaptureFiles(request) @@ -441,7 +385,7 @@ class CASCache: # (Digest): The digest of the imported directory # def import_directory(self, path): - local_cas = self._get_local_cas() + local_cas = self.get_local_cas() request = local_cas_pb2.CaptureTreeRequest() request.path.append(path) @@ -561,7 +505,7 @@ class CASCache: # Returns: List of missing Digest objects # def remote_missing_blobs(self, remote, blobs): - cas = self._get_cas() + cas = self.get_cas() instance_name = remote.local_cas_instance_name missing_blobs = dict() @@ -701,30 +645,6 @@ class CASCache: # Local Private Methods # ################################################ - # _rotate_and_get_next_logfile() - # - # Get the logfile to use for casd - # - # This will ensure that we don't create too many casd log files by - # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around. - # - # Returns: - # (str): the path to the log file to use - # - def _rotate_and_get_next_logfile(self): - log_dir = os.path.join(self.casdir, "logs") - - try: - existing_logs = sorted(os.listdir(log_dir)) - except FileNotFoundError: - os.makedirs(log_dir) - else: - while len(existing_logs) >= _CASD_MAX_LOGFILES: - logfile_to_delete = existing_logs.pop(0) - os.remove(os.path.join(log_dir, logfile_to_delete)) - - return os.path.join(log_dir, str(self._casd_start_time) + ".log") - def _refpath(self, ref): return os.path.join(self.casdir, "refs", "heads", ref) @@ -993,67 +913,6 @@ class CASCache: # Upload any blobs missing on the server self.send_blobs(remote, missing_blobs) - # _terminate_casd_process() - # - # Terminate the buildbox casd process - # - # Args: - # messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend - # - def _terminate_casd_process(self, messenger=None): - return_code = self._casd_process.poll() - - if return_code is not None: - # buildbox-casd is already dead - self._casd_process = None - - if messenger: - messenger.message( - Message( - MessageType.BUG, - "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format( - return_code, self.casd_logfile - ), - ) - ) - return - - self._casd_process.terminate() - - try: - # Don't print anything if buildbox-casd terminates quickly - return_code = self._casd_process.wait(timeout=0.5) - except subprocess.TimeoutExpired: - if messenger: - cm = messenger.timed_activity("Terminating buildbox-casd") - else: - cm = contextlib.suppress() - with cm: - try: - return_code = self._casd_process.wait(timeout=15) - except subprocess.TimeoutExpired: - self._casd_process.kill() - self._casd_process.wait(timeout=15) - - if messenger: - messenger.message( - Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed") - ) - self._casd_process = None - return - - if return_code != 0 and messenger: - messenger.message( - Message( - MessageType.BUG, - "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format( - return_code, self.casd_logfile - ), - ) - ) - - self._casd_process = None - # get_cache_usage(): # # Fetches the current usage of the CAS local cache. @@ -1065,16 +924,16 @@ class CASCache: assert not self._cache_usage_monitor_forbidden return self._cache_usage_monitor.get_cache_usage() - # get_casd_process() + # get_casd_process_manager() # # Get the underlying buildbox-casd process # # Returns: # (subprocess.Process): The casd process that is used for the current cascache # - def get_casd_process(self): - assert self._casd_process is not None, "This should only be called with a running buildbox-casd process" - return self._casd_process + def get_casd_process_manager(self): + assert self._casd_process_manager is not None, "Only call this with a running buildbox-casd process" + return self._casd_process_manager # _CASCacheUsage @@ -1115,8 +974,8 @@ class _CASCacheUsage: # buildbox-casd. # class _CASCacheUsageMonitor: - def __init__(self, cas): - self.cas = cas + def __init__(self, connection): + self._connection = connection # Shared memory (64-bit signed integer) for current disk usage and quota self._disk_usage = multiprocessing.Value(ctypes.c_longlong, -1) @@ -1124,7 +983,7 @@ class _CASCacheUsageMonitor: # multiprocessing.Process will fork without exec on Unix. # This can't be allowed with background threads or open gRPC channels. - assert utils._is_single_threaded() and not cas.has_open_grpc_channels() + assert utils._is_single_threaded() and connection.is_closed() # Block SIGINT, we don't want to kill the process when we interrupt the frontend # and this process if very lightweight. @@ -1156,7 +1015,7 @@ class _CASCacheUsageMonitor: disk_usage = self._disk_usage disk_quota = self._disk_quota - local_cas = self.cas._get_local_cas() + local_cas = self._connection.get_local_cas() while True: try: diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py new file mode 100644 index 000000000..e4a58d7d5 --- /dev/null +++ b/src/buildstream/_cas/casdprocessmanager.py @@ -0,0 +1,233 @@ +# +# Copyright (C) 2018 Codethink Limited +# Copyright (C) 2018-2019 Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# + +import contextlib +import os +import shutil +import signal +import subprocess +import tempfile +import time + +import grpc + +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc +from .._protos.build.buildgrid import local_cas_pb2_grpc + +from .. import _signals, utils +from .._exceptions import CASCacheError +from .._message import Message, MessageType + +_CASD_MAX_LOGFILES = 10 + + +# CASDProcessManager +# +# This manages the subprocess that runs buildbox-casd. +# +# Args: +# path (str): The root directory for the CAS repository +# log_dir (str): The directory for the logs +# log_level (LogLevel): Log level to give to buildbox-casd for logging +# cache_quota (int): User configured cache quota +# protect_session_blobs (bool): Disable expiry for blobs used in the current session +# +class CASDProcessManager: + def __init__(self, path, log_dir, log_level, cache_quota, protect_session_blobs): + self._log_dir = log_dir + + # Place socket in global/user temporary directory to avoid hitting + # the socket path length limit. + self._socket_tempdir = tempfile.mkdtemp(prefix="buildstream") + self._socket_path = os.path.join(self._socket_tempdir, "casd.sock") + self._connection_string = "unix:" + self._socket_path + + casd_args = [utils.get_host_tool("buildbox-casd")] + casd_args.append("--bind=" + self._connection_string) + casd_args.append("--log-level=" + log_level.value) + + if cache_quota is not None: + casd_args.append("--quota-high={}".format(int(cache_quota))) + casd_args.append("--quota-low={}".format(int(cache_quota / 2))) + + if protect_session_blobs: + casd_args.append("--protect-session-blobs") + + casd_args.append(path) + + self._start_time = time.time() + self._logfile = self._rotate_and_get_next_logfile() + + with open(self._logfile, "w") as logfile_fp: + # Block SIGINT on buildbox-casd, we don't need to stop it + # The frontend will take care of it if needed + with _signals.blocked([signal.SIGINT], ignore=False): + self.process = subprocess.Popen(casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT) + + # _rotate_and_get_next_logfile() + # + # Get the logfile to use for casd + # + # This will ensure that we don't create too many casd log files by + # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around. + # + # Returns: + # (str): the path to the log file to use + # + def _rotate_and_get_next_logfile(self): + try: + existing_logs = sorted(os.listdir(self._log_dir)) + except FileNotFoundError: + os.makedirs(self._log_dir) + else: + while len(existing_logs) >= _CASD_MAX_LOGFILES: + logfile_to_delete = existing_logs.pop(0) + os.remove(os.path.join(self._log_dir, logfile_to_delete)) + + return os.path.join(self._log_dir, str(self._start_time) + ".log") + + # release_resources() + # + # Terminate the process and release related resources. + # + def release_resources(self, messenger=None): + self._terminate(messenger) + self.process = None + shutil.rmtree(self._socket_tempdir) + + # _terminate() + # + # Terminate the buildbox casd process. + # + def _terminate(self, messenger=None): + return_code = self.process.poll() + + if return_code is not None: + # buildbox-casd is already dead + + if messenger: + messenger.message( + Message( + MessageType.BUG, + "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format( + return_code, self._logfile + ), + ) + ) + return + + self.process.terminate() + + try: + # Don't print anything if buildbox-casd terminates quickly + return_code = self.process.wait(timeout=0.5) + except subprocess.TimeoutExpired: + if messenger: + cm = messenger.timed_activity("Terminating buildbox-casd") + else: + cm = contextlib.suppress() + with cm: + try: + return_code = self.process.wait(timeout=15) + except subprocess.TimeoutExpired: + self.process.kill() + self.process.wait(timeout=15) + + if messenger: + messenger.message( + Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed") + ) + return + + if return_code != 0 and messenger: + messenger.message( + Message( + MessageType.BUG, + "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(return_code, self._logfile), + ) + ) + + # create_channel(): + # + # Return a CASDChannel, note that the actual connection is not necessarily + # established until it is needed. + # + def create_channel(self): + return CASDChannel(self._socket_path, self._connection_string, self._start_time) + + +class CASDChannel: + def __init__(self, socket_path, connection_string, start_time): + self._socket_path = socket_path + self._connection_string = connection_string + self._start_time = start_time + self._casd_channel = None + self._casd_cas = None + self._local_cas = None + + def _establish_connection(self): + assert self._casd_channel is None + + while not os.path.exists(self._socket_path): + # casd is not ready yet, try again after a 10ms delay, + # but don't wait for more than 15s + if time.time() > self._start_time + 15: + raise CASCacheError("Timed out waiting for buildbox-casd to become ready") + + time.sleep(0.01) + + self._casd_channel = grpc.insecure_channel(self._connection_string) + self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel) + self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel) + + # get_cas(): + # + # Return ContentAddressableStorage stub for buildbox-casd channel. + # + def get_cas(self): + if self._casd_channel is None: + self._establish_connection() + return self._casd_cas + + # get_local_cas(): + # + # Return LocalCAS stub for buildbox-casd channel. + # + def get_local_cas(self): + if self._casd_channel is None: + self._establish_connection() + return self._local_cas + + # is_closed(): + # + # Return whether this connection is closed or not. + # + def is_closed(self): + return self._casd_channel is None + + # close(): + # + # Close the casd channel. + # + def close(self): + if self.is_closed(): + return + self._local_cas = None + self._casd_cas = None + self._casd_channel.close() + self._casd_channel = None diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py index ee6f4679c..93f4e500c 100644 --- a/src/buildstream/_cas/casremote.py +++ b/src/buildstream/_cas/casremote.py @@ -53,7 +53,7 @@ class CASRemote(BaseRemote): # be called outside of init(). # def _configure_protocols(self): - local_cas = self.cascache._get_local_cas() + local_cas = self.cascache.get_local_cas() request = local_cas_pb2.GetInstanceNameForRemoteRequest() request.url = self.spec.url if self.spec.instance_name: @@ -113,7 +113,7 @@ class _CASBatchRead: if not self._requests: return - local_cas = self._remote.cascache._get_local_cas() + local_cas = self._remote.cascache.get_local_cas() for request in self._requests: batch_response = local_cas.FetchMissingBlobs(request) @@ -167,7 +167,7 @@ class _CASBatchUpdate: if not self._requests: return - local_cas = self._remote.cascache._get_local_cas() + local_cas = self._remote.cascache.get_local_cas() for request in self._requests: batch_response = local_cas.UploadMissingBlobs(request) diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 171281bd9..80bd9fb50 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -152,8 +152,8 @@ class Scheduler: # # Args: # queues (list): A list of Queue objects - # casd_processes (subprocess.Process): The subprocess which runs casd in order to be notified - # of failures. + # casd_process_manager (cascache.CASDProcessManager): The subprocess which runs casd, in order to be notified + # of failures. # # Returns: # (SchedStatus): How the scheduling terminated @@ -163,7 +163,7 @@ class Scheduler: # elements have been processed by each queue or when # an error arises # - def run(self, queues, casd_process): + def run(self, queues, casd_process_manager): # Hold on to the queues to process self.queues = queues @@ -183,9 +183,9 @@ class Scheduler: self._connect_signals() # Watch casd while running to ensure it doesn't die - self._casd_process = casd_process + self._casd_process = casd_process_manager.process _watcher = asyncio.get_child_watcher() - _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure) + _watcher.add_child_handler(self._casd_process.pid, self._abort_on_casd_failure) # Start the profiler with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)): @@ -195,7 +195,7 @@ class Scheduler: self.loop.close() # Stop watching casd - _watcher.remove_child_handler(casd_process.pid) + _watcher.remove_child_handler(self._casd_process.pid) self._casd_process = None # Stop handling unix signals diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 0a3495ce5..bab5cb100 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1373,7 +1373,7 @@ class Stream: if self._session_start_callback is not None: self._session_start_callback() - status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process()) + status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process_manager()) if status == SchedStatus.ERROR: raise StreamError() |