diff options
author | Tristan Maat <tristan.maat@codethink.co.uk> | 2019-10-15 11:38:29 +0100 |
---|---|---|
committer | Tristan Maat <tristan.maat@codethink.co.uk> | 2019-11-12 11:19:00 +0000 |
commit | 7c6d27b5dcfd4434a759febeae9195995f44a304 (patch) | |
tree | e1e443624b697edf8a8a18de65948a5f8c2d8e2b | |
parent | abaa1d1ec6cf858de0eac48271b0bff3d5b71139 (diff) | |
download | buildstream-7c6d27b5dcfd4434a759febeae9195995f44a304.tar.gz |
casserver.py: Run buildbox-casd in `bst-artifact-server`
This is in preparation for a switch to directly calling buildbox-casd
through the `bst-artifact-server`, rather than relying on BuildStream
internals to do this.
This should help untangle the codebase a little, since our CAS
interfaces will all end up with a single goal, rather than trying to
do both server-end and client-end things.
-rw-r--r-- | src/buildstream/_cas/casserver.py | 121 |
1 files changed, 120 insertions, 1 deletions
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index d5a29a33d..c0c62b033 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -18,13 +18,15 @@
 # Jürg Billeter <juerg.billeter@codethink.co.uk>
 
 from concurrent import futures
-from contextlib import contextmanager
 from enum import Enum
+import contextlib
 import logging
 import os
 import signal
+import subprocess
 import sys
 import tempfile
+import time
 import uuid
 import errno
 
@@ -78,6 +80,120 @@ class ClickLogLevel(click.Choice):
         return LogLevel(super().convert(value, param, ctx))
 
 
+# CASdRunner():
+#
+# Manage a buildbox-casd process.
+#
+# FIXME: Probably better to replace this with the work from !1638
+#
+class CASdRunner:
+    def __init__(self, path: str, *, cache_quota: int = None, log_level: LogLevel = LogLevel.WARNING):
+        self.root = path
+        self.casdir = os.path.join(path, "cas")
+        self.tmpdir = os.path.join(path, "tmp")
+
+        self._casd_process = None
+        self._casd_socket_path = None
+        self._casd_socket_tempdir = None
+        self._log_level = log_level
+        self._quota = cache_quota
+
+    # start_casd():
+    #
+    # Start the CASd process.
+    #
+    def start_casd(self):
+        assert not self._casd_process, "CASd was already started"
+
+        os.makedirs(os.path.join(self.casdir, "refs", "heads"), exist_ok=True)
+        os.makedirs(os.path.join(self.casdir, "objects"), exist_ok=True)
+        os.makedirs(self.tmpdir, exist_ok=True)
+
+        # Place socket in global/user temporary directory to avoid hitting
+        # the socket path length limit.
+        self._casd_socket_tempdir = tempfile.mkdtemp(prefix="buildstream")
+        self._casd_socket_path = os.path.join(self._casd_socket_tempdir, "casd.sock")
+
+        casd_args = [utils.get_host_tool("buildbox-casd")]
+        casd_args.append("--bind=unix:" + self._casd_socket_path)
+        casd_args.append("--log-level=" + self._log_level.value)
+
+        if self._quota is not None:
+            casd_args.append("--quota-high={}".format(int(self._quota)))
+            casd_args.append("--quota-low={}".format(int(self._quota / 2)))
+
+        casd_args.append(self.root)
+
+        blocked_signals = signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT])
+
+        try:
+            self._casd_process = subprocess.Popen(
+                casd_args,
+                cwd=self.root,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.STDOUT,
+            )
+        finally:
+            signal.pthread_sigmask(signal.SIG_SETMASK, blocked_signals)
+
+    # stop():
+    #
+    # Stop and tear down the CASd process.
+    #
+    def stop(self):
+        return_code = self._casd_process.poll()
+
+        if return_code is not None:
+            logging.error(
+                "Buildbox-casd died during the run. Exit code: %s", return_code
+            )
+            logging.error(self._casd_process.stdout.read().decode())
+            self._casd_process = None  # clear only after its stdout was read
+            return
+
+        self._casd_process.terminate()
+
+        try:
+            return_code = self._casd_process.wait(timeout=0.5)
+        except subprocess.TimeoutExpired:
+            with contextlib.suppress():
+                try:
+                    return_code = self._casd_process.wait(timeout=15)
+                except subprocess.TimeoutExpired:
+                    self._casd_process.kill()
+                    self._casd_process.wait(timeout=15)
+                    logging.warning(
+                        "Buildbox-casd didn't exit in time and has been killed"
+                    )
+                    logging.error(self._casd_process.stdout.read().decode())
+                    self._casd_process = None
+                    return
+
+        if return_code != 0:
+            logging.error(
+                "Buildbox-casd didn't exit cleanly. Exit code: %d", return_code
+            )
+            logging.error(self._casd_process.stdout.read().decode())
+
+        self._casd_process = None
+
+    # get_socket_path():
+    #
+    # Get the path to the socket of the CASd process. The process must
+    # already have been started; this is enforced by an assertion.
+    #
+    def get_socket_path(self) -> str:
+        assert self._casd_socket_path is not None, "CASd has not been started"
+        return self._casd_socket_path
+
+    # get_casdir():
+    #
+    # Get the path to the directory managed by CASd.
+    #
+    def get_casdir(self) -> str:
+        return self.casdir
+
+
 # create_server():
 #
 # Create gRPC CAS artifact server as specified in the Remote Execution API.
@@ -96,6 +212,8 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.WA
     logger.addHandler(handler)
 
     cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False)
+    cas_runner = CASdRunner(os.path.abspath(repo), cache_quota=quota)
+    cas_runner.start_casd()
 
     try:
         artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs')
@@ -137,6 +255,7 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.WA
 
     finally:
         cas.release_resources()
+        cas_runner.stop()
 
 
 @click.command(short_help="CAS Artifact Server")