summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan Maat <tristan.maat@codethink.co.uk>2019-10-15 11:38:29 +0100
committerTristan Maat <tristan.maat@codethink.co.uk>2019-11-12 11:19:00 +0000
commit7c6d27b5dcfd4434a759febeae9195995f44a304 (patch)
treee1e443624b697edf8a8a18de65948a5f8c2d8e2b
parentabaa1d1ec6cf858de0eac48271b0bff3d5b71139 (diff)
downloadbuildstream-7c6d27b5dcfd4434a759febeae9195995f44a304.tar.gz
casserver.py: Run buildbox-casd in `bst-artifact-server`
This is in preparation of a switch to directly calling buildbox-casd through the `bst-artifact-server`, rather than relying on BuildStream internals to do this. This should help untangle the codebase a little, since our CAS interfaces will all end up with a single goal, rather than trying to do both server-end and client-end things.
-rw-r--r--src/buildstream/_cas/casserver.py121
1 files changed, 120 insertions, 1 deletions
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index d5a29a33d..c0c62b033 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -18,13 +18,15 @@
# Jürg Billeter <juerg.billeter@codethink.co.uk>
from concurrent import futures
-from contextlib import contextmanager
from enum import Enum
+import contextlib
import logging
import os
import signal
+import subprocess
import sys
import tempfile
+import time
import uuid
import errno
@@ -78,6 +80,120 @@ class ClickLogLevel(click.Choice):
return LogLevel(super().convert(value, param, ctx))
+# CASdRunner():
+#
+# Manage a buildbox-casd process.
+#
+# FIXME: Probably better to replace this with the work from !1638
+#
+class CASdRunner:
+ def __init__(self, path: str, *, cache_quota: int = None, log_level: LogLevel = LogLevel.WARNING):
+ self.root = path
+ self.casdir = os.path.join(path, "cas")
+ self.tmpdir = os.path.join(path, "tmp")
+
+ self._casd_process = None
+ self._casd_socket_path = None
+ self._casd_socket_tempdir = None
+ self._log_level = log_level
+ self._quota = cache_quota
+
+ # start_casd():
+ #
+ # Start the CASd process.
+ #
+ def start_casd(self):
+ assert not self._casd_process, "CASd was already started"
+
+ os.makedirs(os.path.join(self.casdir, "refs", "heads"), exist_ok=True)
+ os.makedirs(os.path.join(self.casdir, "objects"), exist_ok=True)
+ os.makedirs(self.tmpdir, exist_ok=True)
+
+ # Place socket in global/user temporary directory to avoid hitting
+ # the socket path length limit.
+ self._casd_socket_tempdir = tempfile.mkdtemp(prefix="buildstream")
+ self._casd_socket_path = os.path.join(self._casd_socket_tempdir, "casd.sock")
+
+ casd_args = [utils.get_host_tool("buildbox-casd")]
+ casd_args.append("--bind=unix:" + self._casd_socket_path)
+ casd_args.append("--log-level=" + self._log_level.value)
+
+ if self._quota is not None:
+ casd_args.append("--quota-high={}".format(int(self._quota)))
+ casd_args.append("--quota-low={}".format(int(self._quota / 2)))
+
+ casd_args.append(self.root)
+
+ blocked_signals = signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT])
+
+ try:
+ self._casd_process = subprocess.Popen(
+ casd_args,
+ cwd=self.root,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ )
+ finally:
+ signal.pthread_sigmask(signal.SIG_SETMASK, blocked_signals)
+
+ # stop():
+ #
+ # Stop and tear down the CASd process.
+ #
+ def stop(self):
+ return_code = self._casd_process.poll()
+
+ if return_code is not None:
+ self._casd_process = None
+ logging.error(
+ "Buildbox-casd died during the run. Exit code: %s", return_code
+ )
+ logging.error(self._casd_process.stdout.read().decode())
+ return
+
+ self._casd_process.terminate()
+
+ try:
+ return_code = self._casd_process.wait(timeout=0.5)
+ except subprocess.TimeoutExpired:
+ with contextlib.suppress():
+ try:
+ return_code = self._casd_process.wait(timeout=15)
+ except subprocess.TimeoutExpired:
+ self._casd_process.kill()
+ self._casd_process.wait(timeout=15)
+ logging.warning(
+ "Buildbox-casd didn't exit in time and has been killed"
+ )
+ logging.error(self._casd_process.stdout.read().decode())
+ self._casd_process = None
+ return
+
+ if return_code != 0:
+ logging.error(
+ "Buildbox-casd didn't exit cleanly. Exit code: %d", return_code
+ )
+ logging.error(self._casd_process.stdout.read().decode())
+
+ self._casd_process = None
+
+ # get_socket_path():
+ #
+ # Get the path to the socket of the CASd process - None if the
+ # process has not been started yet.
+ #
+ def get_socket_path(self) -> str:
+ assert self._casd_socket_path is not None, "CASd has not been started"
+ return self._casd_socket_path
+
+ # get_casdir():
+ #
+ # Get the path to the directory managed by CASd.
+ #
+ def get_casdir(self) -> str:
+ return self.casdir
+
+
# create_server():
#
# Create gRPC CAS artifact server as specified in the Remote Execution API.
@@ -96,6 +212,8 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.WA
logger.addHandler(handler)
cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False)
+ cas_runner = CASdRunner(os.path.abspath(repo), cache_quota=quota)
+ cas_runner.start_casd()
try:
artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs')
@@ -137,6 +255,7 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.WA
finally:
cas.release_resources()
+ cas_runner.stop()
@click.command(short_help="CAS Artifact Server")