summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Angelos Evripiotis <jevripiotis@bloomberg.net> 2019-10-14 13:53:00 +0100
committer bst-marge-bot <marge-bot@buildstream.build> 2019-11-22 16:52:26 +0000
commit 44c9f09a1f6743775ef4fe7cde0e6820be3f4254 (patch)
tree 41018b4fdc47e489fb2d1593cdd25277f250c183
parent 12de92f64d974ebd8d6c29f953796706305a6e69 (diff)
download buildstream-44c9f09a1f6743775ef4fe7cde0e6820be3f4254.tar.gz
Extract casd_channel logic to CASDChannel
Encapsulate the management of a connection to CASD, so we can hide the details of how it happens. This will make it easier to port to Windows, as we will have to take a different approach there. Remove the _LimitedCASDProcessManagerProxy, as CASDChannel does everything the child jobs will need. Also make get_local_cas() public, since it is already used outside of the CASCache class. Make get_cas() public to match.
-rw-r--r-- src/buildstream/_cas/cascache.py 89
-rw-r--r-- src/buildstream/_cas/casdprocessmanager.py 89
-rw-r--r-- src/buildstream/_cas/casremote.py 6
3 files changed, 113 insertions, 71 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 925c80bbc..e46396bc0 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -31,8 +31,8 @@ import time
import grpc
from .._protos.google.rpc import code_pb2
-from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
-from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc
+from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
+from .._protos.build.buildgrid import local_cas_pb2
from .. import _signals, utils
from ..types import FastEnum
@@ -73,29 +73,30 @@ class CASCache:
os.makedirs(os.path.join(self.casdir, "objects"), exist_ok=True)
os.makedirs(self.tmpdir, exist_ok=True)
- self._casd_channel = None
- self._casd_cas = None
- self._local_cas = None
self._cache_usage_monitor = None
self._cache_usage_monitor_forbidden = False
+ self._casd_process_manager = None
+ self._casd_channel = None
if casd:
log_dir = os.path.join(self.casdir, "logs")
self._casd_process_manager = CASDProcessManager(
path, log_dir, log_level, cache_quota, protect_session_blobs
)
+ self._casd_channel = self._casd_process_manager.create_channel()
self._cache_usage_monitor = _CASCacheUsageMonitor(self)
- else:
- self._casd_process_manager = None
def __getstate__(self):
state = self.__dict__.copy()
- # Popen objects are not pickle-able, however, child processes only
- # need some information from the manager, so we can use a proxy.
+ # Child jobs do not need to manage the CASD process, they only need a
+ # connection to CASD.
if state["_casd_process_manager"] is not None:
- state["_casd_process_manager"] = _LimitedCASDProcessManagerProxy(self._casd_process_manager)
+ state["_casd_process_manager"] = None
+ # In order to be pickle-able, the connection must be in the initial
+ # 'closed' state.
+ state["_casd_channel"] = self._casd_process_manager.create_channel()
# The usage monitor is not pickle-able, but we also don't need it in
# child processes currently. Make sure that if this changes, we get a
@@ -107,43 +108,21 @@ class CASCache:
return state
- def _init_casd(self):
- assert self._casd_process_manager, "CASCache was instantiated without buildbox-casd"
-
- if not self._casd_channel:
- while not os.path.exists(self._casd_process_manager.socket_path):
- # casd is not ready yet, try again after a 10ms delay,
- # but don't wait for more than 15s
- if time.time() > self._casd_process_manager.start_time + 15:
- raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
-
- time.sleep(0.01)
-
- self._casd_channel = grpc.insecure_channel("unix:" + self._casd_process_manager.socket_path)
- self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
- self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
-
- # Call GetCapabilities() to establish connection to casd
- capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self._casd_channel)
- capabilities.GetCapabilities(remote_execution_pb2.GetCapabilitiesRequest())
-
- # _get_cas():
+ # get_cas():
#
# Return ContentAddressableStorage stub for buildbox-casd channel.
#
- def _get_cas(self):
- if not self._casd_cas:
- self._init_casd()
- return self._casd_cas
+ def get_cas(self):
+ assert self._casd_channel, "CASCache was created without a channel"
+ return self._casd_channel.get_cas()
- # _get_local_cas():
+ # get_local_cas():
#
# Return LocalCAS stub for buildbox-casd channel.
#
- def _get_local_cas(self):
- if not self._local_cas:
- self._init_casd()
- return self._local_cas
+ def get_local_cas(self):
+ assert self._casd_channel, "CASCache was created without a channel"
+ return self._casd_channel.get_local_cas()
# preflight():
#
@@ -161,7 +140,7 @@ class CASCache:
# against fork() with open gRPC channels.
#
def has_open_grpc_channels(self):
- return bool(self._casd_channel)
+ return bool(self._casd_channel) and not self._casd_channel.is_closed()
# close_grpc_channels():
#
@@ -169,10 +148,7 @@ class CASCache:
#
def close_grpc_channels(self):
if self._casd_channel:
- self._local_cas = None
- self._casd_cas = None
self._casd_channel.close()
- self._casd_channel = None
# release_resources():
#
@@ -226,7 +202,7 @@ class CASCache:
# Returns: True if the directory is available in the local cache
#
def contains_directory(self, digest, *, with_files):
- local_cas = self._get_local_cas()
+ local_cas = self.get_local_cas()
request = local_cas_pb2.FetchTreeRequest()
request.root_digest.CopyFrom(digest)
@@ -385,7 +361,7 @@ class CASCache:
request.path.append(path)
- local_cas = self._get_local_cas()
+ local_cas = self.get_local_cas()
response = local_cas.CaptureFiles(request)
@@ -412,7 +388,7 @@ class CASCache:
# (Digest): The digest of the imported directory
#
def import_directory(self, path):
- local_cas = self._get_local_cas()
+ local_cas = self.get_local_cas()
request = local_cas_pb2.CaptureTreeRequest()
request.path.append(path)
@@ -532,7 +508,7 @@ class CASCache:
# Returns: List of missing Digest objects
#
def remote_missing_blobs(self, remote, blobs):
- cas = self._get_cas()
+ cas = self.get_cas()
instance_name = remote.local_cas_instance_name
missing_blobs = dict()
@@ -1042,7 +1018,7 @@ class _CASCacheUsageMonitor:
disk_usage = self._disk_usage
disk_quota = self._disk_quota
- local_cas = self.cas._get_local_cas()
+ local_cas = self.cas.get_local_cas()
while True:
try:
@@ -1068,18 +1044,3 @@ def _grouper(iterable, n):
except StopIteration:
return
yield itertools.chain([current], itertools.islice(iterable, n - 1))
-
-
-# _LimitedCASDProcessManagerProxy
-#
-# This can stand-in for an owning CASDProcessManager, for some functions. This
-# is useful when pickling objects that contain a CASDProcessManager - as long
-# as the lifetime of the original exceeds this proxy.
-#
-# Args:
-# casd_process_manager (CASDProcessManager): The manager to proxy
-#
-class _LimitedCASDProcessManagerProxy:
- def __init__(self, casd_process_manager):
- self.socket_path = casd_process_manager.socket_path
- self.start_time = casd_process_manager.start_time
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
index 27426c907..d6e89831a 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -24,7 +24,13 @@ import subprocess
import tempfile
import time
+import grpc
+
+from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from .._protos.build.buildgrid import local_cas_pb2_grpc
+
from .. import _signals, utils
+from .._exceptions import CASCacheError
from .._message import Message, MessageType
_CASD_MAX_LOGFILES = 10
@@ -48,10 +54,11 @@ class CASDProcessManager:
# Place socket in global/user temporary directory to avoid hitting
# the socket path length limit.
self._socket_tempdir = tempfile.mkdtemp(prefix="buildstream")
- self.socket_path = os.path.join(self._socket_tempdir, "casd.sock")
+ self._socket_path = os.path.join(self._socket_tempdir, "casd.sock")
+ self._connection_string = "unix:" + self._socket_path
casd_args = [utils.get_host_tool("buildbox-casd")]
- casd_args.append("--bind=unix:" + self.socket_path)
+ casd_args.append("--bind=" + self._connection_string)
casd_args.append("--log-level=" + log_level.value)
if cache_quota is not None:
@@ -63,7 +70,7 @@ class CASDProcessManager:
casd_args.append(path)
- self.start_time = time.time()
+ self._start_time = time.time()
self._logfile = self._rotate_and_get_next_logfile()
with open(self._logfile, "w") as logfile_fp:
@@ -92,7 +99,7 @@ class CASDProcessManager:
logfile_to_delete = existing_logs.pop(0)
os.remove(os.path.join(self._log_dir, logfile_to_delete))
- return os.path.join(self._log_dir, str(self.start_time) + ".log")
+ return os.path.join(self._log_dir, str(self._start_time) + ".log")
# release_resources()
#
@@ -154,3 +161,77 @@ class CASDProcessManager:
"Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(return_code, self._logfile),
)
)
+
+ # create_channel():
+ #
+ # Return a CASDChannel, note that the actual connection is not necessarily
+ # established until it is needed.
+ #
+ def create_channel(self):
+ return CASDChannel(self._socket_path, self._connection_string, self._start_time)
+
+
+class CASDChannel:
+ def __init__(self, socket_path, connection_string, start_time):
+ self._socket_path = socket_path
+ self._connection_string = connection_string
+ self._start_time = start_time
+ self._casd_channel = None
+ self._casd_cas = None
+ self._local_cas = None
+
+ def _establish_connection(self):
+ assert self._casd_channel is None
+
+ while not os.path.exists(self._socket_path):
+ # casd is not ready yet, try again after a 10ms delay,
+ # but don't wait for more than 15s
+ if time.time() > self._start_time + 15:
+ raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
+
+ time.sleep(0.01)
+
+ self._casd_channel = grpc.insecure_channel(self._connection_string)
+ self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
+ self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
+
+ # Call GetCapabilities() to establish connection to casd
+ capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self._casd_channel)
+ capabilities.GetCapabilities(remote_execution_pb2.GetCapabilitiesRequest())
+
+ # get_cas():
+ #
+ # Return ContentAddressableStorage stub for buildbox-casd channel.
+ #
+ def get_cas(self):
+ if self._casd_channel is None:
+ self._establish_connection()
+ return self._casd_cas
+
+ # get_local_cas():
+ #
+ # Return LocalCAS stub for buildbox-casd channel.
+ #
+ def get_local_cas(self):
+ if self._casd_channel is None:
+ self._establish_connection()
+ return self._local_cas
+
+ # is_closed():
+ #
+ # Return whether the channel is closed or not.
+ #
+ def is_closed(self):
+ return self._casd_channel is None
+
+ # close():
+ #
+ # Close the casd channel.
+ #
+ def close(self):
+ if self.is_closed():
+ return
+ self._local_cas = None
+ self._casd_cas = None
+ self._casd_channel.close()
+ self._casd_channel = None
diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py
index ee6f4679c..93f4e500c 100644
--- a/src/buildstream/_cas/casremote.py
+++ b/src/buildstream/_cas/casremote.py
@@ -53,7 +53,7 @@ class CASRemote(BaseRemote):
# be called outside of init().
#
def _configure_protocols(self):
- local_cas = self.cascache._get_local_cas()
+ local_cas = self.cascache.get_local_cas()
request = local_cas_pb2.GetInstanceNameForRemoteRequest()
request.url = self.spec.url
if self.spec.instance_name:
@@ -113,7 +113,7 @@ class _CASBatchRead:
if not self._requests:
return
- local_cas = self._remote.cascache._get_local_cas()
+ local_cas = self._remote.cascache.get_local_cas()
for request in self._requests:
batch_response = local_cas.FetchMissingBlobs(request)
@@ -167,7 +167,7 @@ class _CASBatchUpdate:
if not self._requests:
return
- local_cas = self._remote.cascache._get_local_cas()
+ local_cas = self._remote.cascache.get_local_cas()
for request in self._requests:
batch_response = local_cas.UploadMissingBlobs(request)