# # Copyright (C) 2018 Codethink Limited # Copyright (C) 2018-2019 Bloomberg Finance LP # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see . # import contextlib import threading import os import random import shutil import signal import stat import subprocess import tempfile import time import psutil import grpc from .._protos.build.bazel.remote.asset.v1 import remote_asset_pb2_grpc from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc from .._protos.build.buildgrid import local_cas_pb2_grpc from .._protos.google.bytestream import bytestream_pb2_grpc from .. import _signals, utils from .._exceptions import CASCacheError _CASD_MAX_LOGFILES = 10 _CASD_TIMEOUT = 300 # in seconds # CASDProcessManager # # This manages the subprocess that runs buildbox-casd. 
#
# Args:
#     path (str): The root directory for the CAS repository
#     log_dir (str): The directory for the logs
#     log_level (LogLevel): Log level to give to buildbox-casd for logging
#     cache_quota (int): User configured cache quota
#     protect_session_blobs (bool): Disable expiry for blobs used in the current session
#
class CASDProcessManager:
    def __init__(self, path, log_dir, log_level, cache_quota, protect_session_blobs):
        self._log_dir = log_dir

        # This creates a private temporary directory (self._socket_tempdir)
        # that is normally removed again by release_resources().
        self._socket_path = self._make_socket_path(path)
        self._connection_string = "unix:" + self._socket_path

        try:
            casd_args = [utils.get_host_tool("buildbox-casd")]
            casd_args.append("--bind=" + self._connection_string)
            casd_args.append("--log-level=" + log_level.value)

            if cache_quota is not None:
                # The configured quota is the high watermark; half of it is the low watermark.
                casd_args.append("--quota-high={}".format(int(cache_quota)))
                casd_args.append("--quota-low={}".format(int(cache_quota / 2)))

            if protect_session_blobs:
                casd_args.append("--protect-session-blobs")

            casd_args.append(path)

            self._start_time = time.time()
            self._logfile = self._rotate_and_get_next_logfile()

            with open(self._logfile, "w") as logfile_fp:
                # Block SIGINT on buildbox-casd, we don't need to stop it
                # The frontend will take care of it if needed
                with _signals.blocked([signal.SIGINT], ignore=False):
                    self.process = subprocess.Popen(casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT)
        except BaseException:
            # A half-constructed manager is never handed to a caller, so
            # release_resources() will not run for it. Remove the socket
            # directory here instead of leaking it, then re-raise the
            # original error.
            shutil.rmtree(self._socket_tempdir, ignore_errors=True)
            raise

    # _make_socket_path()
    #
    # Create a path to the CASD socket, ensuring that we don't exceed
    # the socket path limit.
    #
    # Note that we *may* exceed the path limit if the python-chosen
    # tmpdir path is very long, though this should be /tmp.
    #
    # As a side effect this sets self._socket_tempdir to a freshly
    # created temporary directory. That directory contains a "cas"
    # symlink pointing at `path`.
    #
    # Args:
    #     path (str): The root directory for the CAS repository.
    #
    # Returns:
    #     (str) - The path to the CASD socket.
    #
    def _make_socket_path(self, path):
        self._socket_tempdir = tempfile.mkdtemp(prefix="buildstream")

        try:
            # mkdtemp will create this directory in the "most secure"
            # way. This translates to "u+rwx,go-rwx".
            #
            # This is a good thing, generally, since it prevents us
            # from leaking sensitive information to other users, but
            # it's a problem for the workflow for userchroot, since
            # the setuid casd binary will not share a uid with the
            # user creating the tempdir.
            #
            # Instead, we chmod the directory 755, and only place a
            # symlink to the CAS directory in here, which will allow the
            # CASD process RWX access to a directory without leaking build
            # information.
            os.chmod(
                self._socket_tempdir,
                stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH,
            )

            os.symlink(path, os.path.join(self._socket_tempdir, "cas"))
        except BaseException:
            # Don't leave a half-prepared temporary directory behind.
            shutil.rmtree(self._socket_tempdir, ignore_errors=True)
            raise

        # FIXME: There is a potential race condition here; if multiple
        # instances happen to create the same socket path, at least
        # one will try to talk to the same server as us.
        #
        # There's no real way to avoid this from our side; we'd need
        # buildbox-casd to tell us that it could not create a fresh
        # socket.
        #
        # We could probably make this even safer by including some
        # thread/process-specific information, but we're not really
        # supporting this use case anyway; it's mostly here for
        # testing, and to help more gracefully handle the situation.
        #
        # Note: this uses the same random string generation principle
        # as cpython, so this is probably a safe file name.
        random_name = "".join([random.choice("abcdefghijklmnopqrstuvwxyz0123456789_") for _ in range(8)])
        socket_name = "casserver-{}.sock".format(random_name)
        return os.path.join(self._socket_tempdir, "cas", socket_name)

    # _rotate_and_get_next_logfile()
    #
    # Get the logfile to use for casd
    #
    # This will ensure that we don't create too many casd log files by
    # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around.
    # The log directory is created if it does not exist yet.
    #
    # Returns:
    #     (str): the path to the log file to use
    #
    def _rotate_and_get_next_logfile(self):
        try:
            existing_logs = sorted(os.listdir(self._log_dir))
        except FileNotFoundError:
            # Another process sharing this log directory may create it
            # between our listdir() and makedirs(); that is fine.
            os.makedirs(self._log_dir, exist_ok=True)
        else:
            # Log names are start timestamps, so the lexically smallest names
            # are the oldest logs.
            while len(existing_logs) >= _CASD_MAX_LOGFILES:
                logfile_to_delete = existing_logs.pop(0)
                # Another instance rotating the same directory may already
                # have removed this file.
                with contextlib.suppress(FileNotFoundError):
                    os.remove(os.path.join(self._log_dir, logfile_to_delete))

        return os.path.join(self._log_dir, str(self._start_time) + ".log")

    # release_resources()
    #
    # Terminate the process and release related resources.
    #
    # The socket temporary directory is removed even if terminating
    # buildbox-casd raises.
    #
    # Args:
    #     messenger (Messenger|None): Used to report problems with casd, if given
    #
    def release_resources(self, messenger=None):
        try:
            self._terminate(messenger)
            self.process = None
        finally:
            shutil.rmtree(self._socket_tempdir)

    # _terminate()
    #
    # Terminate the buildbox casd process.
    #
    # Sends a terminate request first. If casd has not exited after a further
    # 15 seconds, it is killed. When a messenger is given, an early death,
    # a forced kill or a non-zero exit code is reported through it.
    #
    def _terminate(self, messenger=None):
        return_code = self.process.poll()

        if return_code is not None:
            # buildbox-casd is already dead

            if messenger:
                messenger.bug(
                    "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(return_code, self._logfile)
                )
            return

        self.process.terminate()

        try:
            # Don't print anything if buildbox-casd terminates quickly
            return_code = self.process.wait(timeout=0.5)
        except subprocess.TimeoutExpired:
            if messenger:
                cm = messenger.timed_activity("Terminating buildbox-casd")
            else:
                cm = contextlib.suppress()
            with cm:
                try:
                    return_code = self.process.wait(timeout=15)
                except subprocess.TimeoutExpired:
                    self.process.kill()
                    self.process.wait(timeout=15)

                    if messenger:
                        messenger.warn("Buildbox-casd didn't exit in time and has been killed")
                    return

        if return_code != 0 and messenger:
            messenger.bug(
                "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(return_code, self._logfile)
            )

    # create_channel():
    #
    # Return a CASDChannel, note that the actual connection is not necessarily
    # established until it is needed.
    #
    def create_channel(self):
        return CASDChannel(self._socket_path, self._connection_string, self._start_time, self.process.pid)


# CASDChannel
#
# A lazily established gRPC connection to a buildbox-casd process.
#
# Args:
#     socket_path (str): Path of the casd socket; the connection is only
#                        created once this path exists
#     connection_string (str): gRPC target passed to grpc.insecure_channel()
#     start_time (float): time.time() at which casd was started, used for
#                         the readiness timeout
#     casd_pid (int): PID of casd, used to detect casd dying before it
#                     becomes ready
#
class CASDChannel:
    def __init__(self, socket_path, connection_string, start_time, casd_pid):
        self._socket_path = socket_path
        self._connection_string = connection_string
        self._start_time = start_time
        # The channel and stubs below are created by _establish_connection()
        self._casd_channel = None
        self._bytestream = None
        self._casd_cas = None
        self._local_cas = None
        self._asset_fetch = None
        self._asset_push = None
        self._casd_pid = casd_pid
        self._shutdown_requested = False
        # Serializes connection setup and teardown
        self._lock = threading.Lock()

    # _establish_connection():
    #
    # Wait for the casd socket to appear, then create the gRPC channel
    # and all service stubs.
    #
    # Returns without connecting if a shutdown was requested while waiting.
    #
    # Raises:
    #     CASCacheError: if casd does not become ready within _CASD_TIMEOUT
    #                    seconds of its start, or if it dies before then
    #
    def _establish_connection(self):
        with self._lock:
            if self._casd_channel is not None:
                return

            while not os.path.exists(self._socket_path):
                # casd is not ready yet, try again after a 10ms delay,
                # but don't wait for more than specified timeout period
                if time.time() > self._start_time + _CASD_TIMEOUT:
                    raise CASCacheError("Timed out waiting for buildbox-casd to become ready")

                if self._shutdown_requested:
                    # Shutdown has been requested, we can exit
                    return

                # check that process is still alive
                try:
                    proc = psutil.Process(self._casd_pid)
                    # A zombie still reports as running; reap it first so
                    # that is_running() reflects that casd has exited.
                    if proc.status() == psutil.STATUS_ZOMBIE:
                        proc.wait()

                    if not proc.is_running():
                        if self._shutdown_requested:
                            return
                        raise CASCacheError("buildbox-casd process died before connection could be established")
                except psutil.NoSuchProcess:
                    if self._shutdown_requested:
                        return
                    raise CASCacheError("buildbox-casd process died before connection could be established")

                time.sleep(0.01)

            channel = grpc.insecure_channel(self._connection_string)
            self._bytestream = bytestream_pb2_grpc.ByteStreamStub(channel)
            self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)
            self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(channel)
            self._asset_fetch = remote_asset_pb2_grpc.FetchStub(channel)
            self._asset_push = remote_asset_pb2_grpc.PushStub(channel)

            # Publish the channel last. get_cas(), get_asset_fetch() and
            # get_asset_push() check only _casd_channel, without taking the
            # lock, before returning their stub. The stubs must therefore
            # already be set when another thread sees a non-None channel.
            self._casd_channel = channel

    # get_cas():
    #
    # Return ContentAddressableStorage stub for buildbox-casd channel.
    # This is None if a shutdown was requested before the connection
    # could be established.
    #
    def get_cas(self):
        if self._casd_channel is None:
            self._establish_connection()
        return self._casd_cas

    # get_local_cas():
    #
    # Return LocalCAS stub for buildbox-casd channel.
    # This is None if a shutdown was requested before the connection
    # could be established.
    #
    def get_local_cas(self):
        if self._local_cas is None:
            self._establish_connection()
        return self._local_cas

    # get_bytestream():
    #
    # Return ByteStream stub for buildbox-casd channel.
    # This is None if a shutdown was requested before the connection
    # could be established.
    #
    def get_bytestream(self):
        if self._bytestream is None:
            self._establish_connection()
        return self._bytestream

    # get_asset_fetch():
    #
    # Return Remote Asset Fetch stub for buildbox-casd channel.
    # This is None if a shutdown was requested before the connection
    # could be established.
    #
    def get_asset_fetch(self):
        if self._casd_channel is None:
            self._establish_connection()
        return self._asset_fetch

    # get_asset_push():
    #
    # Return Remote Asset Push stub for buildbox-casd channel.
    # This is None if a shutdown was requested before the connection
    # could be established.
    #
    def get_asset_push(self):
        if self._casd_channel is None:
            self._establish_connection()
        return self._asset_push

    # is_closed():
    #
    # Return whether this connection is closed or not.
    # A channel that has not been connected yet also counts as closed.
    #
    def is_closed(self):
        return self._casd_channel is None

    # request_shutdown():
    #
    # Notify the channel that a shutdown of casd was requested.
    #
    # Thus we know that not being able to establish a connection is expected
    # and no error will be reported in that case.
    def request_shutdown(self) -> None:
        self._shutdown_requested = True

    # close():
    #
    # Close the casd channel.
    #
    # request_shutdown() must have been called first. Closing an
    # unconnected or already closed channel does nothing.
    #
    def close(self):
        assert self._shutdown_requested, "Please request shutdown before closing"
        with self._lock:
            if self.is_closed():
                return
            self._asset_push = None
            self._asset_fetch = None
            self._local_cas = None
            self._casd_cas = None
            self._bytestream = None
            self._casd_channel.close()
            self._casd_channel = None