summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAngelos Evripiotis <jevripiotis@bloomberg.net>2019-10-08 17:14:32 +0100
committerAngelos Evripiotis <jevripiotis@bloomberg.net>2019-10-15 15:37:30 +0100
commit998942215c7a33227ead23bc182f379ef062c47a (patch)
treef11992cedec29aed14cc5295d0d75948cfb92209
parentd80a5f50975fa398749b172b4b729d163bde6f06 (diff)
downloadbuildstream-998942215c7a33227ead23bc182f379ef062c47a.tar.gz
cascache: extract CASDProcess in new module
Make it easier to specialize handling of the buildbox-casd process on Windows, by splitting it into its own class. This allows us to encapsulate some decisions, and decreases the complexity of the CASCache class. Take some of the complexity out of this file by splitting the responsibility of managing the process out to another file.
-rw-r--r--src/buildstream/_cas/cascache.py149
-rw-r--r--src/buildstream/_cas/casdprocessmanager.py220
-rw-r--r--src/buildstream/_scheduler/scheduler.py22
-rw-r--r--src/buildstream/_stream.py2
4 files changed, 245 insertions, 148 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 83b8e8539..7c37d0ee1 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -25,10 +25,7 @@ import errno
import contextlib
import ctypes
import multiprocessing
-import shutil
import signal
-import subprocess
-import tempfile
import time
import grpc
@@ -40,8 +37,8 @@ from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc
from .. import _signals, utils
from ..types import FastEnum
from .._exceptions import CASCacheError
-from .._message import Message, MessageType
+from .casdprocessmanager import CASDProcessManager
from .casremote import _CASBatchRead, _CASBatchUpdate
_BUFFER_SIZE = 65536
@@ -50,8 +47,6 @@ _BUFFER_SIZE = 65536
# Refresh interval for disk usage of local cache in seconds
_CACHE_USAGE_REFRESH = 5
-_CASD_MAX_LOGFILES = 10
-
class CASLogLevel(FastEnum):
WARNING = "warning"
@@ -80,35 +75,11 @@ class CASCache():
os.makedirs(self.tmpdir, exist_ok=True)
if casd:
- # Place socket in global/user temporary directory to avoid hitting
- # the socket path length limit.
- self._casd_socket_tempdir = tempfile.mkdtemp(prefix='buildstream')
- self._casd_socket_path = os.path.join(self._casd_socket_tempdir, 'casd.sock')
-
- casd_args = [utils.get_host_tool('buildbox-casd')]
- casd_args.append('--bind=unix:' + self._casd_socket_path)
- casd_args.append('--log-level=' + log_level.value)
-
- if cache_quota is not None:
- casd_args.append('--quota-high={}'.format(int(cache_quota)))
- casd_args.append('--quota-low={}'.format(int(cache_quota / 2)))
-
- if protect_session_blobs:
- casd_args.append('--protect-session-blobs')
-
- casd_args.append(path)
-
- self._casd_start_time = time.time()
- self.casd_logfile = self._rotate_and_get_next_logfile()
-
- with open(self.casd_logfile, "w") as logfile_fp:
- # Block SIGINT on buildbox-casd, we don't need to stop it
- # The frontend will take care of it if needed
- with _signals.blocked([signal.SIGINT], ignore=False):
- self._casd_process = subprocess.Popen(
- casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT)
+ log_dir = os.path.join(self.casdir, "logs")
+ self._casd_process_manager = CASDProcessManager(
+ path, log_dir, log_level, cache_quota, protect_session_blobs)
else:
- self._casd_process = None
+ self._casd_process_manager = None
self._casd_channel = None
self._casd_cas = None
@@ -120,16 +91,16 @@ class CASCache():
# Popen objects are not pickle-able, however, child processes only
# need the information whether a casd subprocess was started or not.
- assert '_casd_process' in state
- state['_casd_process'] = bool(self._casd_process)
+ assert '_casd_process_manager' in state
+ state['_casd_process_manager'] = bool(self._casd_process_manager)
return state
def _init_casd(self):
- assert self._casd_process, "CASCache was instantiated without buildbox-casd"
+ assert self._casd_process_manager, "CASCache was instantiated without buildbox-casd"
if not self._casd_channel:
- self._casd_channel = grpc.insecure_channel('unix:' + self._casd_socket_path)
+ self._casd_channel = grpc.insecure_channel('unix:' + self._casd_process_manager.socket_path)
self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
@@ -143,7 +114,7 @@ class CASCache():
if e.code() == grpc.StatusCode.UNAVAILABLE:
# casd is not ready yet, try again after a 10ms delay,
# but don't wait for more than 15s
- if time.time() < self._casd_start_time + 15:
+ if time.time() < self._casd_process_manager.start_time + 15:
time.sleep(1 / 100)
continue
@@ -204,10 +175,11 @@ class CASCache():
if self._cache_usage_monitor:
self._cache_usage_monitor.release_resources()
- if self._casd_process:
+ if self._casd_process_manager:
self.close_grpc_channels()
- self._terminate_casd_process(messenger)
- shutil.rmtree(self._casd_socket_tempdir)
+ self._casd_process_manager.terminate(messenger)
+ self._casd_process_manager.clean_up()
+ self._casd_process_manager = None
# contains():
#
@@ -684,30 +656,6 @@ class CASCache():
# Local Private Methods #
################################################
- # _rotate_and_get_next_logfile()
- #
- # Get the logfile to use for casd
- #
- # This will ensure that we don't create too many casd log files by
- # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around.
- #
- # Returns:
- # (str): the path to the log file to use
- #
- def _rotate_and_get_next_logfile(self):
- log_dir = os.path.join(self.casdir, "logs")
-
- try:
- existing_logs = sorted(os.listdir(log_dir))
- except FileNotFoundError:
- os.makedirs(log_dir)
- else:
- while len(existing_logs) >= _CASD_MAX_LOGFILES:
- logfile_to_delete = existing_logs.pop(0)
- os.remove(os.path.join(log_dir, logfile_to_delete))
-
- return os.path.join(log_dir, str(self._casd_start_time) + ".log")
-
def _refpath(self, ref):
return os.path.join(self.casdir, 'refs', 'heads', ref)
@@ -976,67 +924,6 @@ class CASCache():
# Upload any blobs missing on the server
self.send_blobs(remote, missing_blobs)
- # _terminate_casd_process()
- #
- # Terminate the buildbox casd process
- #
- # Args:
- # messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend
- #
- def _terminate_casd_process(self, messenger=None):
- return_code = self._casd_process.poll()
-
- if return_code is not None:
- # buildbox-casd is already dead
- self._casd_process = None
-
- if messenger:
- messenger.message(
- Message(
- MessageType.BUG,
- "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(
- return_code, self.casd_logfile
- ),
- )
- )
- return
-
- self._casd_process.terminate()
-
- try:
- # Don't print anything if buildbox-casd terminates quickly
- return_code = self._casd_process.wait(timeout=0.5)
- except subprocess.TimeoutExpired:
- if messenger:
- cm = messenger.timed_activity("Terminating buildbox-casd")
- else:
- cm = contextlib.suppress()
- with cm:
- try:
- return_code = self._casd_process.wait(timeout=15)
- except subprocess.TimeoutExpired:
- self._casd_process.kill()
- self._casd_process.wait(timeout=15)
-
- if messenger:
- messenger.message(
- Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
- )
- self._casd_process = None
- return
-
- if return_code != 0 and messenger:
- messenger.message(
- Message(
- MessageType.BUG,
- "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(
- return_code, self.casd_logfile
- ),
- )
- )
-
- self._casd_process = None
-
# get_cache_usage():
#
# Fetches the current usage of the CAS local cache.
@@ -1050,16 +937,16 @@ class CASCache():
return self._cache_usage_monitor.get_cache_usage()
- # get_casd_process()
+ # get_casd_process_manager()
#
# Get the underlying buildbox-casd process
#
# Returns:
# (subprocess.Process): The casd process that is used for the current cascache
#
- def get_casd_process(self):
- assert self._casd_process is not None, "This should only be called with a running buildbox-casd process"
- return self._casd_process
+ def get_casd_process_manager(self):
+ assert self._casd_process_manager is not None, "Only call this with a running buildbox-casd process"
+ return self._casd_process_manager
# _CASCacheUsage
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
new file mode 100644
index 000000000..1ae5e8e62
--- /dev/null
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -0,0 +1,220 @@
+#
+# Copyright (C) 2018 Codethink Limited
+# Copyright (C) 2018-2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import asyncio
+import contextlib
+import os
+import shutil
+import signal
+import subprocess
+import tempfile
+import time
+
+from .. import _signals, utils
+from .._message import Message, MessageType
+
+_CASD_MAX_LOGFILES = 10
+
+
+# CASDProcessManager
+#
+# This manages the subprocess that runs buildbox-casd.
+#
+# Args:
+# path (str): The root directory for the CAS repository
+# log_dir (str): The directory for the logs
+# log_level (LogLevel): Log level to give to buildbox-casd for logging
+# cache_quota (int): User configured cache quota
+# protect_session_blobs (bool): Disable expiry for blobs used in the current session
+#
+class CASDProcessManager:
+
+ def __init__(self, path, log_dir, log_level, cache_quota, protect_session_blobs):
+ self._log_dir = log_dir
+
+ # Place socket in global/user temporary directory to avoid hitting
+ # the socket path length limit.
+ self._socket_tempdir = tempfile.mkdtemp(prefix='buildstream')
+ self.socket_path = os.path.join(self._socket_tempdir, 'casd.sock')
+
+ casd_args = [utils.get_host_tool('buildbox-casd')]
+ casd_args.append('--bind=unix:' + self.socket_path)
+ casd_args.append('--log-level=' + log_level.value)
+
+ if cache_quota is not None:
+ casd_args.append('--quota-high={}'.format(int(cache_quota)))
+ casd_args.append('--quota-low={}'.format(int(cache_quota / 2)))
+
+ if protect_session_blobs:
+ casd_args.append('--protect-session-blobs')
+
+ casd_args.append(path)
+
+ self.start_time = time.time()
+ self.logfile = self._rotate_and_get_next_logfile()
+
+ with open(self.logfile, "w") as logfile_fp:
+ # Block SIGINT on buildbox-casd, we don't need to stop it
+ # The frontend will take care of it if needed
+ with _signals.blocked([signal.SIGINT], ignore=False):
+ self._process = subprocess.Popen(
+ casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT)
+
+ self._failure_callback = None
+ self._watcher = None
+
+ # _rotate_and_get_next_logfile()
+ #
+ # Get the logfile to use for casd
+ #
+ # This will ensure that we don't create too many casd log files by
+ # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around.
+ #
+ # Returns:
+ # (str): the path to the log file to use
+ #
+ def _rotate_and_get_next_logfile(self):
+ try:
+ existing_logs = sorted(os.listdir(self._log_dir))
+ except FileNotFoundError:
+ os.makedirs(self._log_dir)
+ else:
+ while len(existing_logs) >= _CASD_MAX_LOGFILES:
+ logfile_to_delete = existing_logs.pop(0)
+ os.remove(os.path.join(self._log_dir, logfile_to_delete))
+
+ return os.path.join(self._log_dir, str(self.start_time) + ".log")
+
+ # terminate()
+ #
+ # Terminate the buildbox casd process
+ #
+ # Args:
+ # messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend
+ #
+ def terminate(self, messenger=None):
+ assert self._watcher is None
+ assert self._failure_callback is None
+
+ return_code = self._process.poll()
+
+ if return_code is not None:
+ # buildbox-casd is already dead
+ self._process = None
+
+ if messenger:
+ messenger.message(
+ Message(
+ MessageType.BUG,
+ "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(
+ return_code, self.logfile
+ ),
+ )
+ )
+ return
+
+ self._process.terminate()
+
+ try:
+ # Don't print anything if buildbox-casd terminates quickly
+ return_code = self._process.wait(timeout=0.5)
+ except subprocess.TimeoutExpired:
+ if messenger:
+ cm = messenger.timed_activity("Terminating buildbox-casd")
+ else:
+ cm = contextlib.suppress()
+ with cm:
+ try:
+ return_code = self._process.wait(timeout=15)
+ except subprocess.TimeoutExpired:
+ self._process.kill()
+ self._process.wait(timeout=15)
+
+ if messenger:
+ messenger.message(
+ Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
+ )
+ self._process = None
+ return
+
+ if return_code != 0 and messenger:
+ messenger.message(
+ Message(
+ MessageType.BUG,
+ "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(
+ return_code, self.logfile
+ ),
+ )
+ )
+
+ # clean_up()
+ #
+ # After termination, clean up any additional resources
+ #
+ def clean_up(self):
+ shutil.rmtree(self._socket_tempdir)
+
+ # set_failure_callback()
+ #
+ # Register a function to be called if the CASD process stops unexpectedly.
+ #
+ # Note that we guarantee that the lifetime of any 'watcher' used is bound
+ # to the lifetime of the callback - we won't hang on to the asyncio loop
+ # longer than necessary.
+ #
+ # We won't be able to use watchers on win32, so we'll need to support
+ # another approach.
+ #
+ # Args:
+ # func (callable): a callable that takes no parameters
+ #
+ def set_failure_callback(self, func):
+ assert func is not None
+ assert self._watcher is None
+ assert self._failure_callback is None, "We only support one callback for now"
+ self._failure_callback = func
+ self._watcher = asyncio.get_child_watcher()
+ self._watcher.add_child_handler(self._process.pid, self._on_casd_failure)
+
+ # clear_failure_callback()
+ #
+ # No longer call this callable if the CASD process stops unexpectedly
+ #
+ # Args:
+ # func (callable): The callable that was provided to set_failure_callback().
+ # Supplying this again allows us to do error checking.
+ #
+ def clear_failure_callback(self, func):
+ assert func is not None
+ assert self._failure_callback == func, "We only support one callback for now"
+ self._watcher.remove_child_handler(self._process.pid)
+ self._failure_callback = None
+ self._watcher = None
+
+ # _on_casd_failure()
+ #
+ # Handler for casd process terminating unexpectedly
+ #
+ # Args:
+ # pid (int): the process id under which buildbox-casd was running
+ # returncode (int): the return code with which buildbox-casd exited
+ #
+ def _on_casd_failure(self, pid, returncode):
+ assert self._failure_callback is not None
+ self._process.returncode = returncode
+ self._failure_callback()
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 4c648d251..24086be83 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -138,7 +138,6 @@ class Scheduler():
self._suspendtime = None # Session time compensation for suspended state
self._queue_jobs = True # Whether we should continue to queue jobs
self._state = state
- self._casd_process = None # handle to the casd process for monitoring purpose
# Bidirectional queue to send notifications back to the Scheduler's owner
self._notification_queue = notification_queue
@@ -152,8 +151,8 @@ class Scheduler():
#
# Args:
# queues (list): A list of Queue objects
- # casd_processes (subprocess.Process): The subprocess which runs casd in order to be notified
- # of failures.
+ # casd_process_manager (casdprocessmanager.CASDProcessManager): The manager of the subprocess which runs casd,
+ # in order to be notified of failures.
#
# Returns:
# (SchedStatus): How the scheduling terminated
@@ -163,7 +162,7 @@ class Scheduler():
# elements have been processed by each queue or when
# an error arises
#
- def run(self, queues, casd_process):
+ def run(self, queues, casd_process_manager):
# Hold on to the queues to process
self.queues = queues
@@ -183,9 +182,7 @@ class Scheduler():
self._connect_signals()
# Watch casd while running to ensure it doesn't die
- self._casd_process = casd_process
- _watcher = asyncio.get_child_watcher()
- _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
+ casd_process_manager.set_failure_callback(self._abort_on_casd_failure)
# Start the profiler
with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
@@ -195,8 +192,7 @@ class Scheduler():
self.loop.close()
# Stop watching casd
- _watcher.remove_child_handler(casd_process.pid)
- self._casd_process = None
+ casd_process_manager.clear_failure_callback(self._abort_on_casd_failure)
# Stop handling unix signals
self._disconnect_signals()
@@ -338,15 +334,9 @@ class Scheduler():
# This will terminate immediately all jobs, since buildbox-casd is dead,
# we can't do anything with them anymore.
#
- # Args:
- # pid (int): the process id under which buildbox-casd was running
- # returncode (int): the return code with which buildbox-casd exited
- #
- def _abort_on_casd_failure(self, pid, returncode):
+ def _abort_on_casd_failure(self):
message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
self._notify(Notification(NotificationType.MESSAGE, message=message))
-
- self._casd_process.returncode = returncode
self.terminate_jobs()
# _start_job()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 6e4e5caec..500adb8e9 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1375,7 +1375,7 @@ class Stream():
if self._session_start_callback is not None:
self._session_start_callback()
- status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
+ status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process_manager())
if status == SchedStatus.ERROR:
raise StreamError()