summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAngelos Evripiotis <jevripiotis@bloomberg.net>2019-10-08 17:14:32 +0100
committerbst-marge-bot <marge-bot@buildstream.build>2019-11-22 16:52:26 +0000
commit4694dfadaf4d2f718d6b4148a000f5ffa7a69509 (patch)
tree8a172bc871846ddbea56b590109ba6360a83fa32
parentcdf15d0658cd57c2666963119b4e05c2ff815ff7 (diff)
downloadbuildstream-4694dfadaf4d2f718d6b4148a000f5ffa7a69509.tar.gz
cascache: extract CASDProcess in new module
Make it easier to specialize handling of the buildbox-casd process on Windows, by splitting it into its own class. This allows us to encapsulate some decisions, and decreases the complexity of the CASCache class. Take some of the complexity out of this file by splitting the responsibility of managing the process out to another file.
-rw-r--r--src/buildstream/_cas/cascache.py170
-rw-r--r--src/buildstream/_cas/casdprocessmanager.py159
-rw-r--r--src/buildstream/_scheduler/scheduler.py12
-rw-r--r--src/buildstream/_stream.py2
4 files changed, 202 insertions, 141 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index c1f2b30b0..9b7dcffa7 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -25,10 +25,7 @@ import errno
import contextlib
import ctypes
import multiprocessing
-import shutil
import signal
-import subprocess
-import tempfile
import time
import grpc
@@ -40,8 +37,8 @@ from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc
from .. import _signals, utils
from ..types import FastEnum
from .._exceptions import CASCacheError
-from .._message import Message, MessageType
+from .casdprocessmanager import CASDProcessManager
from .casremote import _CASBatchRead, _CASBatchUpdate
_BUFFER_SIZE = 65536
@@ -50,8 +47,6 @@ _BUFFER_SIZE = 65536
# Refresh interval for disk usage of local cache in seconds
_CACHE_USAGE_REFRESH = 5
-_CASD_MAX_LOGFILES = 10
-
class CASLogLevel(FastEnum):
WARNING = "warning"
@@ -85,46 +80,22 @@ class CASCache:
self._cache_usage_monitor_forbidden = False
if casd:
- # Place socket in global/user temporary directory to avoid hitting
- # the socket path length limit.
- self._casd_socket_tempdir = tempfile.mkdtemp(prefix="buildstream")
- self._casd_socket_path = os.path.join(self._casd_socket_tempdir, "casd.sock")
-
- casd_args = [utils.get_host_tool("buildbox-casd")]
- casd_args.append("--bind=unix:" + self._casd_socket_path)
- casd_args.append("--log-level=" + log_level.value)
-
- if cache_quota is not None:
- casd_args.append("--quota-high={}".format(int(cache_quota)))
- casd_args.append("--quota-low={}".format(int(cache_quota / 2)))
-
- if protect_session_blobs:
- casd_args.append("--protect-session-blobs")
-
- casd_args.append(path)
-
- self._casd_start_time = time.time()
- self.casd_logfile = self._rotate_and_get_next_logfile()
-
- with open(self.casd_logfile, "w") as logfile_fp:
- # Block SIGINT on buildbox-casd, we don't need to stop it
- # The frontend will take care of it if needed
- with _signals.blocked([signal.SIGINT], ignore=False):
- self._casd_process = subprocess.Popen(
- casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT
- )
+ log_dir = os.path.join(self.casdir, "logs")
+ self._casd_process_manager = CASDProcessManager(
+ path, log_dir, log_level, cache_quota, protect_session_blobs
+ )
self._cache_usage_monitor = _CASCacheUsageMonitor(self)
else:
- self._casd_process = None
+ self._casd_process_manager = None
def __getstate__(self):
state = self.__dict__.copy()
# Popen objects are not pickle-able, however, child processes only
- # need the information whether a casd subprocess was started or not.
- assert "_casd_process" in state
- state["_casd_process"] = bool(self._casd_process)
+ # need some information from the manager, so we can use a proxy.
+ if state["_casd_process_manager"] is not None:
+ state["_casd_process_manager"] = _LimitedCASDProcessManagerProxy(self._casd_process_manager)
# The usage monitor is not pickle-able, but we also don't need it in
# child processes currently. Make sure that if this changes, we get a
@@ -137,18 +108,18 @@ class CASCache:
return state
def _init_casd(self):
- assert self._casd_process, "CASCache was instantiated without buildbox-casd"
+ assert self._casd_process_manager, "CASCache was instantiated without buildbox-casd"
if not self._casd_channel:
- while not os.path.exists(self._casd_socket_path):
+ while not os.path.exists(self._casd_process_manager.socket_path):
# casd is not ready yet, try again after a 10ms delay,
# but don't wait for more than 15s
- if time.time() > self._casd_start_time + 15:
+ if time.time() > self._casd_process_manager.start_time + 15:
raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
time.sleep(0.01)
- self._casd_channel = grpc.insecure_channel("unix:" + self._casd_socket_path)
+ self._casd_channel = grpc.insecure_channel("unix:" + self._casd_process_manager.socket_path)
self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
@@ -211,10 +182,11 @@ class CASCache:
if self._cache_usage_monitor:
self._cache_usage_monitor.release_resources()
- if self._casd_process:
+ if self._casd_process_manager:
self.close_grpc_channels()
- self._terminate_casd_process(messenger)
- shutil.rmtree(self._casd_socket_tempdir)
+ self._casd_process_manager.terminate(messenger)
+ self._casd_process_manager.clean_up()
+ self._casd_process_manager = None
# contains():
#
@@ -701,30 +673,6 @@ class CASCache:
# Local Private Methods #
################################################
- # _rotate_and_get_next_logfile()
- #
- # Get the logfile to use for casd
- #
- # This will ensure that we don't create too many casd log files by
- # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around.
- #
- # Returns:
- # (str): the path to the log file to use
- #
- def _rotate_and_get_next_logfile(self):
- log_dir = os.path.join(self.casdir, "logs")
-
- try:
- existing_logs = sorted(os.listdir(log_dir))
- except FileNotFoundError:
- os.makedirs(log_dir)
- else:
- while len(existing_logs) >= _CASD_MAX_LOGFILES:
- logfile_to_delete = existing_logs.pop(0)
- os.remove(os.path.join(log_dir, logfile_to_delete))
-
- return os.path.join(log_dir, str(self._casd_start_time) + ".log")
-
def _refpath(self, ref):
return os.path.join(self.casdir, "refs", "heads", ref)
@@ -993,67 +941,6 @@ class CASCache:
# Upload any blobs missing on the server
self.send_blobs(remote, missing_blobs)
- # _terminate_casd_process()
- #
- # Terminate the buildbox casd process
- #
- # Args:
- # messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend
- #
- def _terminate_casd_process(self, messenger=None):
- return_code = self._casd_process.poll()
-
- if return_code is not None:
- # buildbox-casd is already dead
- self._casd_process = None
-
- if messenger:
- messenger.message(
- Message(
- MessageType.BUG,
- "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(
- return_code, self.casd_logfile
- ),
- )
- )
- return
-
- self._casd_process.terminate()
-
- try:
- # Don't print anything if buildbox-casd terminates quickly
- return_code = self._casd_process.wait(timeout=0.5)
- except subprocess.TimeoutExpired:
- if messenger:
- cm = messenger.timed_activity("Terminating buildbox-casd")
- else:
- cm = contextlib.suppress()
- with cm:
- try:
- return_code = self._casd_process.wait(timeout=15)
- except subprocess.TimeoutExpired:
- self._casd_process.kill()
- self._casd_process.wait(timeout=15)
-
- if messenger:
- messenger.message(
- Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
- )
- self._casd_process = None
- return
-
- if return_code != 0 and messenger:
- messenger.message(
- Message(
- MessageType.BUG,
- "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(
- return_code, self.casd_logfile
- ),
- )
- )
-
- self._casd_process = None
-
# get_cache_usage():
#
# Fetches the current usage of the CAS local cache.
@@ -1065,16 +952,16 @@ class CASCache:
assert not self._cache_usage_monitor_forbidden
return self._cache_usage_monitor.get_cache_usage()
- # get_casd_process()
+ # get_casd_process_manager()
#
# Get the underlying buildbox-casd process
#
# Returns:
# (subprocess.Process): The casd process that is used for the current cascache
#
- def get_casd_process(self):
- assert self._casd_process is not None, "This should only be called with a running buildbox-casd process"
- return self._casd_process
+ def get_casd_process_manager(self):
+ assert self._casd_process_manager is not None, "Only call this with a running buildbox-casd process"
+ return self._casd_process_manager
# _CASCacheUsage
@@ -1182,3 +1069,18 @@ def _grouper(iterable, n):
except StopIteration:
return
yield itertools.chain([current], itertools.islice(iterable, n - 1))
+
+
+# _LimitedCASDProcessManagerProxy
+#
+# This can stand-in for an owning CASDProcessManager, for some functions. This
+# is useful when pickling objects that contain a CASDProcessManager - as long
+# as the lifetime of the original exceeds this proxy.
+#
+# Args:
+# casd_process_manager (CASDProcessManager): The manager to proxy
+#
+class _LimitedCASDProcessManagerProxy:
+ def __init__(self, casd_process_manager):
+ self.socket_path = casd_process_manager.socket_path
+ self.start_time = casd_process_manager.start_time
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
new file mode 100644
index 000000000..90857abd1
--- /dev/null
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -0,0 +1,159 @@
+#
+# Copyright (C) 2018 Codethink Limited
+# Copyright (C) 2018-2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import contextlib
+import os
+import shutil
+import signal
+import subprocess
+import tempfile
+import time
+
+from .. import _signals, utils
+from .._message import Message, MessageType
+
+_CASD_MAX_LOGFILES = 10
+
+
+# CASDProcessManager
+#
+# This manages the subprocess that runs buildbox-casd.
+#
+# Args:
+# path (str): The root directory for the CAS repository
+# log_dir (str): The directory for the logs
+# log_level (CASLogLevel): Log level to give to buildbox-casd for logging
+# cache_quota (int): User configured cache quota
+# protect_session_blobs (bool): Disable expiry for blobs used in the current session
+#
+class CASDProcessManager:
+ def __init__(self, path, log_dir, log_level, cache_quota, protect_session_blobs):
+ self._log_dir = log_dir
+
+ # Place socket in global/user temporary directory to avoid hitting
+ # the socket path length limit.
+ self._socket_tempdir = tempfile.mkdtemp(prefix="buildstream")
+ self.socket_path = os.path.join(self._socket_tempdir, "casd.sock")
+
+ casd_args = [utils.get_host_tool("buildbox-casd")]
+ casd_args.append("--bind=unix:" + self.socket_path)
+ casd_args.append("--log-level=" + log_level.value)
+
+ if cache_quota is not None:
+ casd_args.append("--quota-high={}".format(int(cache_quota)))
+ casd_args.append("--quota-low={}".format(int(cache_quota / 2)))
+
+ if protect_session_blobs:
+ casd_args.append("--protect-session-blobs")
+
+ casd_args.append(path)
+
+ self.start_time = time.time()
+ self._logfile = self._rotate_and_get_next_logfile()
+
+ with open(self._logfile, "w") as logfile_fp:
+ # Block SIGINT on buildbox-casd, we don't need to stop it
+ # The frontend will take care of it if needed
+ with _signals.blocked([signal.SIGINT], ignore=False):
+ self.process = subprocess.Popen(casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT)
+
+ # _rotate_and_get_next_logfile()
+ #
+ # Get the logfile to use for casd
+ #
+ # This will ensure that we don't create too many casd log files by
+ # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around.
+ #
+ # Returns:
+ # (str): the path to the log file to use
+ #
+ def _rotate_and_get_next_logfile(self):
+ try:
+ existing_logs = sorted(os.listdir(self._log_dir))
+ except FileNotFoundError:
+ os.makedirs(self._log_dir)
+ else:
+ while len(existing_logs) >= _CASD_MAX_LOGFILES:
+ logfile_to_delete = existing_logs.pop(0)
+ os.remove(os.path.join(self._log_dir, logfile_to_delete))
+
+ return os.path.join(self._log_dir, str(self.start_time) + ".log")
+
+ # terminate()
+ #
+ # Terminate the buildbox casd process
+ #
+ # Args:
+ # messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend
+ #
+ def terminate(self, messenger=None):
+ return_code = self.process.poll()
+
+ if return_code is not None:
+ # buildbox-casd is already dead
+ self.process = None
+
+ if messenger:
+ messenger.message(
+ Message(
+ MessageType.BUG,
+ "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(
+ return_code, self._logfile
+ ),
+ )
+ )
+ return
+
+ self.process.terminate()
+
+ try:
+ # Don't print anything if buildbox-casd terminates quickly
+ return_code = self.process.wait(timeout=0.5)
+ except subprocess.TimeoutExpired:
+ if messenger:
+ cm = messenger.timed_activity("Terminating buildbox-casd")
+ else:
+ cm = contextlib.suppress()
+ with cm:
+ try:
+ return_code = self.process.wait(timeout=15)
+ except subprocess.TimeoutExpired:
+ self.process.kill()
+ self.process.wait(timeout=15)
+
+ if messenger:
+ messenger.message(
+ Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
+ )
+ self.process = None
+ return
+
+ if return_code != 0 and messenger:
+ messenger.message(
+ Message(
+ MessageType.BUG,
+ "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(return_code, self._logfile),
+ )
+ )
+
+ # clean_up()
+ #
+ # After termination, clean up any additional resources
+ #
+ def clean_up(self):
+ shutil.rmtree(self._socket_tempdir)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 171281bd9..80bd9fb50 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -152,8 +152,8 @@ class Scheduler:
#
# Args:
# queues (list): A list of Queue objects
- # casd_processes (subprocess.Process): The subprocess which runs casd in order to be notified
- # of failures.
+ # casd_process_manager (cascache.CASDProcessManager): The manager of the subprocess which runs casd; its process
+ # is watched in order to be notified of failures.
#
# Returns:
# (SchedStatus): How the scheduling terminated
@@ -163,7 +163,7 @@ class Scheduler:
# elements have been processed by each queue or when
# an error arises
#
- def run(self, queues, casd_process):
+ def run(self, queues, casd_process_manager):
# Hold on to the queues to process
self.queues = queues
@@ -183,9 +183,9 @@ class Scheduler:
self._connect_signals()
# Watch casd while running to ensure it doesn't die
- self._casd_process = casd_process
+ self._casd_process = casd_process_manager.process
_watcher = asyncio.get_child_watcher()
- _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
+ _watcher.add_child_handler(self._casd_process.pid, self._abort_on_casd_failure)
# Start the profiler
with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
@@ -195,7 +195,7 @@ class Scheduler:
self.loop.close()
# Stop watching casd
- _watcher.remove_child_handler(casd_process.pid)
+ _watcher.remove_child_handler(self._casd_process.pid)
self._casd_process = None
# Stop handling unix signals
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 0a3495ce5..bab5cb100 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1373,7 +1373,7 @@ class Stream:
if self._session_start_callback is not None:
self._session_start_callback()
- status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
+ status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process_manager())
if status == SchedStatus.ERROR:
raise StreamError()