diff options
author | Angelos Evripiotis <jevripiotis@bloomberg.net> | 2019-10-14 13:53:00 +0100 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-11-22 16:52:26 +0000 |
commit | 44c9f09a1f6743775ef4fe7cde0e6820be3f4254 (patch) | |
tree | 41018b4fdc47e489fb2d1593cdd25277f250c183 | |
parent | 12de92f64d974ebd8d6c29f953796706305a6e69 (diff) | |
download | buildstream-44c9f09a1f6743775ef4fe7cde0e6820be3f4254.tar.gz |
Extract casd_channel logic to CASDChannel
Encapsulate the management of a connection to CASD, so we can hide the
details of how it happens. This will make it easier to port to Windows,
as we will have to take a different approach there.
Remove the _LimitedCASDProcessManagerProxy, as CASDChannel does
everything the child jobs will need.
Also make get_local_cas() public, since it is already used outside of
the CASCache class. Make get_cas() public to match.
-rw-r--r-- | src/buildstream/_cas/cascache.py | 89 | ||||
-rw-r--r-- | src/buildstream/_cas/casdprocessmanager.py | 89 | ||||
-rw-r--r-- | src/buildstream/_cas/casremote.py | 6 |
3 files changed, 113 insertions, 71 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index 925c80bbc..e46396bc0 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -31,8 +31,8 @@ import time import grpc from .._protos.google.rpc import code_pb2 -from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc -from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 +from .._protos.build.buildgrid import local_cas_pb2 from .. import _signals, utils from ..types import FastEnum @@ -73,29 +73,30 @@ class CASCache: os.makedirs(os.path.join(self.casdir, "objects"), exist_ok=True) os.makedirs(self.tmpdir, exist_ok=True) - self._casd_channel = None - self._casd_cas = None - self._local_cas = None self._cache_usage_monitor = None self._cache_usage_monitor_forbidden = False + self._casd_process_manager = None + self._casd_channel = None if casd: log_dir = os.path.join(self.casdir, "logs") self._casd_process_manager = CASDProcessManager( path, log_dir, log_level, cache_quota, protect_session_blobs ) + self._casd_channel = self._casd_process_manager.create_channel() self._cache_usage_monitor = _CASCacheUsageMonitor(self) - else: - self._casd_process_manager = None def __getstate__(self): state = self.__dict__.copy() - # Popen objects are not pickle-able, however, child processes only - # need some information from the manager, so we can use a proxy. + # Child jobs do not need to manage the CASD process, they only need a + # connection to CASD. if state["_casd_process_manager"] is not None: - state["_casd_process_manager"] = _LimitedCASDProcessManagerProxy(self._casd_process_manager) + state["_casd_process_manager"] = None + # In order to be pickle-able, the connection must be in the initial + # 'closed' state. + state["_casd_channel"] = self._casd_process_manager.create_channel() # The usage monitor is not pickle-able, but we also don't need it in # child processes currently. Make sure that if this changes, we get a @@ -107,43 +108,21 @@ class CASCache: return state - def _init_casd(self): - assert self._casd_process_manager, "CASCache was instantiated without buildbox-casd" - - if not self._casd_channel: - while not os.path.exists(self._casd_process_manager.socket_path): - # casd is not ready yet, try again after a 10ms delay, - # but don't wait for more than 15s - if time.time() > self._casd_process_manager.start_time + 15: - raise CASCacheError("Timed out waiting for buildbox-casd to become ready") - - time.sleep(0.01) - - self._casd_channel = grpc.insecure_channel("unix:" + self._casd_process_manager.socket_path) - self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel) - self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel) - - # Call GetCapabilities() to establish connection to casd - capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self._casd_channel) - capabilities.GetCapabilities(remote_execution_pb2.GetCapabilitiesRequest()) - - # _get_cas(): + # get_cas(): # # Return ContentAddressableStorage stub for buildbox-casd channel. # - def _get_cas(self): - if not self._casd_cas: - self._init_casd() - return self._casd_cas + def get_cas(self): + assert self._casd_channel, "CASCache was created without a channel" + return self._casd_channel.get_cas() - # _get_local_cas(): + # get_local_cas(): # # Return LocalCAS stub for buildbox-casd channel. # - def _get_local_cas(self): - if not self._local_cas: - self._init_casd() - return self._local_cas + def get_local_cas(self): + assert self._casd_channel, "CASCache was created without a channel" + return self._casd_channel.get_local_cas() # preflight(): # @@ -161,7 +140,7 @@ class CASCache: # against fork() with open gRPC channels. # def has_open_grpc_channels(self): - return bool(self._casd_channel) + return self._casd_lazy_connection and not self._casd_lazy_connection.is_closed() # close_grpc_channels(): # @@ -169,10 +148,7 @@ class CASCache: # def close_grpc_channels(self): if self._casd_channel: - self._local_cas = None - self._casd_cas = None self._casd_channel.close() - self._casd_channel = None # release_resources(): # @@ -226,7 +202,7 @@ class CASCache: # Returns: True if the directory is available in the local cache # def contains_directory(self, digest, *, with_files): - local_cas = self._get_local_cas() + local_cas = self.get_local_cas() request = local_cas_pb2.FetchTreeRequest() request.root_digest.CopyFrom(digest) @@ -385,7 +361,7 @@ class CASCache: request.path.append(path) - local_cas = self._get_local_cas() + local_cas = self.get_local_cas() response = local_cas.CaptureFiles(request) @@ -412,7 +388,7 @@ class CASCache: # (Digest): The digest of the imported directory # def import_directory(self, path): - local_cas = self._get_local_cas() + local_cas = self.get_local_cas() request = local_cas_pb2.CaptureTreeRequest() request.path.append(path) @@ -532,7 +508,7 @@ class CASCache: # Returns: List of missing Digest objects # def remote_missing_blobs(self, remote, blobs): - cas = self._get_cas() + cas = self.get_cas() instance_name = remote.local_cas_instance_name missing_blobs = dict() @@ -1042,7 +1018,7 @@ class _CASCacheUsageMonitor: disk_usage = self._disk_usage disk_quota = self._disk_quota - local_cas = self.cas._get_local_cas() + local_cas = self.cas.get_local_cas() while True: try: @@ -1068,18 +1044,3 @@ def _grouper(iterable, n): except StopIteration: return yield itertools.chain([current], itertools.islice(iterable, n - 1)) - - -# _LimitedCASDProcessManagerProxy -# -# This can stand-in for an owning CASDProcessManager, for some functions. This -# is useful when pickling objects that contain a CASDProcessManager - as long -# as the lifetime of the original exceeds this proxy. -# -# Args: -# casd_process_manager (CASDProcessManager): The manager to proxy -# -class _LimitedCASDProcessManagerProxy: - def __init__(self, casd_process_manager): - self.socket_path = casd_process_manager.socket_path - self.start_time = casd_process_manager.start_time diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py index 27426c907..d6e89831a 100644 --- a/src/buildstream/_cas/casdprocessmanager.py +++ b/src/buildstream/_cas/casdprocessmanager.py @@ -24,7 +24,13 @@ import subprocess import tempfile import time +import grpc + +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc +from .._protos.build.buildgrid import local_cas_pb2_grpc + from .. import _signals, utils +from .._exceptions import CASCacheError from .._message import Message, MessageType _CASD_MAX_LOGFILES = 10 @@ -48,10 +54,11 @@ class CASDProcessManager: # Place socket in global/user temporary directory to avoid hitting # the socket path length limit. self._socket_tempdir = tempfile.mkdtemp(prefix="buildstream") - self.socket_path = os.path.join(self._socket_tempdir, "casd.sock") + self._socket_path = os.path.join(self._socket_tempdir, "casd.sock") + self._connection_string = "unix:" + self._socket_path casd_args = [utils.get_host_tool("buildbox-casd")] - casd_args.append("--bind=unix:" + self.socket_path) + casd_args.append("--bind=" + self._connection_string) casd_args.append("--log-level=" + log_level.value) if cache_quota is not None: @@ -63,7 +70,7 @@ class CASDProcessManager: casd_args.append(path) - self.start_time = time.time() + self._start_time = time.time() self._logfile = self._rotate_and_get_next_logfile() with open(self._logfile, "w") as logfile_fp: @@ -92,7 +99,7 @@ class CASDProcessManager: logfile_to_delete = existing_logs.pop(0) os.remove(os.path.join(self._log_dir, logfile_to_delete)) - return os.path.join(self._log_dir, str(self.start_time) + ".log") + return os.path.join(self._log_dir, str(self._start_time) + ".log") # release_resources() # @@ -154,3 +161,77 @@ class CASDProcessManager: "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(return_code, self._logfile), ) ) + + # create_channel(): + # + # Return a CASDChannel, note that the actual connection is not necessarily + # established until it is needed. + # + def create_channel(self): + return CASDChannel(self._socket_path, self._connection_string, self._start_time) + + +class CASDChannel: + def __init__(self, socket_path, connection_string, start_time): + self._socket_path = socket_path + self._connection_string = connection_string + self._start_time = start_time + self._casd_channel = None + self._casd_cas = None + self._local_cas = None + + def _establish_connection(self): + assert self._casd_channel is None + + while not os.path.exists(self._socket_path): + # casd is not ready yet, try again after a 10ms delay, + # but don't wait for more than 15s + if time.time() > self._start_time + 15: + raise CASCacheError("Timed out waiting for buildbox-casd to become ready") + + time.sleep(0.01) + + self._casd_channel = grpc.insecure_channel(self._connection_string) + self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel) + self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel) + + # Call GetCapabilities() to establish connection to casd + capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self._casd_channel) + capabilities.GetCapabilities(remote_execution_pb2.GetCapabilitiesRequest()) + + # get_cas(): + # + # Return ContentAddressableStorage stub for buildbox-casd channel. + # + def get_cas(self): + if self._casd_channel is None: + self._establish_connection() + return self._casd_cas + + # get_local_cas(): + # + # Return LocalCAS stub for buildbox-casd channel. + # + def get_local_cas(self): + if self._casd_channel is None: + self._establish_connection() + return self._local_cas + + # is_closed(): + # + # Return whether the channel is closed or not. + # + def is_closed(self): + return self._casd_channel is None + + # close(): + # + # Close the casd channel. + # + def close(self): + if self.is_closed(): + return + self._local_cas = None + self._casd_cas = None + self._casd_channel.close() + self._casd_channel = None diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py index ee6f4679c..93f4e500c 100644 --- a/src/buildstream/_cas/casremote.py +++ b/src/buildstream/_cas/casremote.py @@ -53,7 +53,7 @@ class CASRemote(BaseRemote): # be called outside of init(). # def _configure_protocols(self): - local_cas = self.cascache._get_local_cas() + local_cas = self.cascache.get_local_cas() request = local_cas_pb2.GetInstanceNameForRemoteRequest() request.url = self.spec.url if self.spec.instance_name: @@ -113,7 +113,7 @@ class _CASBatchRead: if not self._requests: return - local_cas = self._remote.cascache._get_local_cas() + local_cas = self._remote.cascache.get_local_cas() for request in self._requests: batch_response = local_cas.FetchMissingBlobs(request) @@ -167,7 +167,7 @@ class _CASBatchUpdate: if not self._requests: return - local_cas = self._remote.cascache._get_local_cas() + local_cas = self._remote.cascache.get_local_cas() for request in self._requests: batch_response = local_cas.UploadMissingBlobs(request) |