Diffstat (limited to 'src/buildstream/_cas/cascache.py')
-rw-r--r--  src/buildstream/_cas/cascache.py | 275
1 file changed, 104 insertions(+), 171 deletions(-)
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index d13531c6c..b80460abf 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -37,7 +37,7 @@ from ..types import FastEnum, SourceRef
from .._exceptions import CASCacheError
from .casdprocessmanager import CASDProcessManager
-from .casremote import _CASBatchRead, _CASBatchUpdate
+from .casremote import _CASBatchRead, _CASBatchUpdate, BlobNotFound
_BUFFER_SIZE = 65536
@@ -152,13 +152,7 @@ class CASCache:
# Returns: True if the files are in the cache, False otherwise
#
def contains_files(self, digests):
- cas = self.get_cas()
-
- request = remote_execution_pb2.FindMissingBlobsRequest()
- request.blob_digests.extend(digests)
-
- response = cas.FindMissingBlobs(request)
- return len(response.missing_blob_digests) == 0
+ return len(self.missing_blobs(digests)) == 0
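A minimal usage sketch of the refactored check (hypothetical names: `cascache` is an initialized CASCache, `digests` a list of remote_execution_pb2.Digest objects):

    # Hypothetical usage sketch, not part of this commit.
    if cascache.contains_files(digests):
        print("all blobs are in the local cache")
    else:
        for missing in cascache.missing_blobs(digests):
            print("missing: {} ({} bytes)".format(missing.hash, missing.size_bytes))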
# contains_directory():
#
@@ -278,15 +272,29 @@ class CASCache:
def objpath(self, digest):
return os.path.join(self.casdir, "objects", digest.hash[:2], digest.hash[2:])
+ # open():
+ #
+ # Open file read-only by CAS digest and return a corresponding file object.
+ #
+ # Args:
+ # digest (Digest): The digest of the object
+ # mode (str): An optional string that specifies the mode in which the file is opened; must be "r" or "rb".
+ #
+ def open(self, digest, mode="r"):
+ if mode not in ["r", "rb"]:
+ raise ValueError("Unsupported mode: `{}`".format(mode))
+
+ objpath = self.objpath(digest)
+
+ return open(objpath, mode=mode)
+
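A sketch of the new accessor, assuming `cascache` is an initialized CASCache and `digest` names a blob already present in the cache (import path as laid out in the BuildStream source tree):

    from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    # Hypothetical usage: parse a cached Directory message back by digest.
    directory = remote_execution_pb2.Directory()
    with cascache.open(digest, mode="rb") as f:
        directory.ParseFromString(f.read())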
# add_object():
#
# Hash and write object to CAS.
#
# Args:
- # digest (Digest): An optional Digest object to populate
# path (str): Path to file to add
# buffer (bytes): Byte buffer to add
- # link_directly (bool): Whether file given by path can be linked
# instance_name (str): casd instance_name for remote CAS
#
# Returns:
@@ -294,44 +302,72 @@ class CASCache:
#
# Either `path` or `buffer` must be passed, but not both.
#
- def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False, instance_name=None):
+ def add_object(self, *, path=None, buffer=None, instance_name=None):
# Exactly one of the two parameters has to be specified
assert (path is None) != (buffer is None)
+ if path is None:
+ digests = self.add_objects(buffers=[buffer], instance_name=instance_name)
+ else:
+ digests = self.add_objects(paths=[path], instance_name=instance_name)
+ assert len(digests) == 1
+ return digests[0]
- # If we're linking directly, then path must be specified.
- assert (not link_directly) or (link_directly and path)
+ # add_objects():
+ #
+ # Hash and write objects to CAS.
+ #
+ # Args:
+ # paths (List[str]): Paths to files to add
+ # buffers (List[bytes]): Byte buffers to add
+ # instance_name (str): casd instance_name for remote CAS
+ #
+ # Returns:
+ # (List[Digest]): The digests of the added objects
+ #
+ # Either `paths` or `buffers` must be passed, but not both.
+ #
+ def add_objects(self, *, paths=None, buffers=None, instance_name=None):
+ # Exactly one of the two parameters has to be specified
+ assert (paths is None) != (buffers is None)
- if digest is None:
- digest = remote_execution_pb2.Digest()
+ digests = []
with contextlib.ExitStack() as stack:
- if path is None:
- tmp = stack.enter_context(self._temporary_object())
- tmp.write(buffer)
- tmp.flush()
- path = tmp.name
+ if paths is None:
+ paths = []
+ for buffer in buffers:
+ tmp = stack.enter_context(self._temporary_object())
+ tmp.write(buffer)
+ tmp.flush()
+ paths.append(tmp.name)
request = local_cas_pb2.CaptureFilesRequest()
if instance_name:
request.instance_name = instance_name
- request.path.append(path)
+ for path in paths:
+ request.path.append(path)
local_cas = self.get_local_cas()
response = local_cas.CaptureFiles(request)
- if len(response.responses) != 1:
- raise CASCacheError("Expected 1 response from CaptureFiles, got {}".format(len(response.responses)))
+ if len(response.responses) != len(paths):
+ raise CASCacheError(
+ "Expected {} responses from CaptureFiles, got {}".format(len(paths), len(response.responses))
+ )
+
+ for path, blob_response in zip(paths, response.responses):
+ if blob_response.status.code == code_pb2.RESOURCE_EXHAUSTED:
+ raise CASCacheError("Cache too full", reason="cache-too-full")
+ if blob_response.status.code != code_pb2.OK:
+ raise CASCacheError("Failed to capture blob {}: {}".format(path, blob_response.status.code))
- blob_response = response.responses[0]
- if blob_response.status.code == code_pb2.RESOURCE_EXHAUSTED:
- raise CASCacheError("Cache too full", reason="cache-too-full")
- if blob_response.status.code != code_pb2.OK:
- raise CASCacheError("Failed to capture blob {}: {}".format(path, blob_response.status.code))
- digest.CopyFrom(blob_response.digest)
+ digest = remote_execution_pb2.Digest()
+ digest.CopyFrom(blob_response.digest)
+ digests.append(digest)
- return digest
+ return digests
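A usage sketch of the batched API next to its single-object wrapper (`cascache` and `payloads` are hypothetical):

    # Hypothetical usage: capture several in-memory buffers in one
    # CaptureFiles round trip; digests come back in input order, as the
    # zip over paths and responses above assumes.
    payloads = [b"first blob", b"second blob"]
    digests = cascache.add_objects(buffers=payloads)
    for payload, digest in zip(payloads, digests):
        assert digest.size_bytes == len(payload)

    # The single-object form now delegates to add_objects().
    digest = cascache.add_object(buffer=b"one more blob")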
# import_directory():
#
@@ -374,7 +410,7 @@ class CASCache:
return utils._message_digest(root_directory)
- # remote_missing_blobs_for_directory():
+ # missing_blobs_for_directory():
#
# Determine which blobs of a directory tree are missing locally or on the remote.
#
@@ -383,23 +419,27 @@ class CASCache:
#
# Returns: List of missing Digest objects
#
- def remote_missing_blobs_for_directory(self, remote, digest):
+ def missing_blobs_for_directory(self, digest, *, remote=None):
required_blobs = self.required_blobs_for_directory(digest)
- return self.remote_missing_blobs(remote, required_blobs)
+ return self.missing_blobs(required_blobs, remote=remote)
- # remote_missing_blobs():
+ # missing_blobs():
#
- # Determine which blobs are missing on the remote.
+ # Determine which blobs are missing locally or on the remote.
#
# Args:
# blobs ([Digest]): List of blob Digests to check
# remote (Remote): Optional remote to check against; when None, only the local cache is checked
#
# Returns: List of missing Digest objects
#
- def remote_missing_blobs(self, remote, blobs):
+ def missing_blobs(self, blobs, *, remote=None):
cas = self.get_cas()
- instance_name = remote.local_cas_instance_name
+
+ if remote:
+ instance_name = remote.local_cas_instance_name
+ else:
+ instance_name = ""
missing_blobs = dict()
# Limit size of FindMissingBlobs request
@@ -424,23 +464,6 @@ class CASCache:
return missing_blobs.values()
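With local_missing_blobs() folded in below, one entry point covers both checks; a sketch assuming `cascache`, `digests`, and a connected remote `remote`:

    # Hypothetical usage: local-only check by default, remote check on demand.
    local_misses = cascache.missing_blobs(digests)
    remote_misses = cascache.missing_blobs(digests, remote=remote)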
- # local_missing_blobs():
- #
- # Check local cache for missing blobs.
- #
- # Args:
- # digests (list): The Digests of blobs to check
- #
- # Returns: Missing Digest objects
- #
- def local_missing_blobs(self, digests):
- missing_blobs = []
- for digest in digests:
- objpath = self.objpath(digest)
- if not os.path.exists(objpath):
- missing_blobs.append(digest)
- return missing_blobs
-
# required_blobs_for_directory():
#
# Generator that returns the Digests of all blobs in the tree specified by
@@ -470,38 +493,6 @@ class CASCache:
# Local Private Methods #
################################################
- def _reachable_refs_dir(self, reachable, tree, update_mtime=False, check_exists=False):
- if tree.hash in reachable:
- return
- try:
- if update_mtime:
- os.utime(self.objpath(tree))
-
- reachable.add(tree.hash)
-
- directory = remote_execution_pb2.Directory()
-
- with open(self.objpath(tree), "rb") as f:
- directory.ParseFromString(f.read())
-
- except FileNotFoundError:
- if check_exists:
- raise
-
- # Just exit early if the file doesn't exist
- return
-
- for filenode in directory.files:
- if update_mtime:
- os.utime(self.objpath(filenode.digest))
- if check_exists:
- if not os.path.exists(self.objpath(filenode.digest)):
- raise FileNotFoundError
- reachable.add(filenode.digest.hash)
-
- for dirnode in directory.directories:
- self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime, check_exists=check_exists)
-
# _temporary_object():
#
# Returns:
@@ -514,113 +505,55 @@ class CASCache:
os.chmod(f.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
yield f
- # _ensure_blob():
- #
- # Fetch and add blob if it's not already local.
- #
- # Args:
- # remote (Remote): The remote to use.
- # digest (Digest): Digest object for the blob to fetch.
- #
- # Returns:
- # (str): The path of the object
- #
- def _ensure_blob(self, remote, digest):
- objpath = self.objpath(digest)
- if os.path.exists(objpath):
- # already in local repository
- return objpath
-
- batch = _CASBatchRead(remote)
- batch.add(digest)
- batch.send()
-
- return objpath
-
- # Helper function for _fetch_directory().
- def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
- batch.send()
-
- # All previously scheduled directories are now locally available,
- # move them to the processing queue.
- fetch_queue.extend(fetch_next_queue)
- fetch_next_queue.clear()
- return _CASBatchRead(remote)
-
- # Helper function for _fetch_directory().
- def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
- in_local_cache = os.path.exists(self.objpath(digest))
-
- if in_local_cache:
- # Skip download, already in local cache.
- pass
- else:
- batch.add(digest)
-
- if recursive:
- if in_local_cache:
- # Add directory to processing queue.
- fetch_queue.append(digest)
- else:
- # Directory will be available after completing pending batch.
- # Add directory to deferred processing queue.
- fetch_next_queue.append(digest)
-
- return batch
-
# _fetch_directory():
#
# Fetches remote directory and adds it to content addressable store.
#
- # This recursively fetches directory objects but doesn't fetch any
- # files.
+ # This recursively fetches directory objects and files.
#
# Args:
# remote (Remote): The remote to use.
# dir_digest (Digest): Digest object for the directory to fetch.
#
def _fetch_directory(self, remote, dir_digest):
- # TODO Use GetTree() if the server supports it
-
- fetch_queue = [dir_digest]
- fetch_next_queue = []
- batch = _CASBatchRead(remote)
-
- while len(fetch_queue) + len(fetch_next_queue) > 0:
- if not fetch_queue:
- batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
-
- dir_digest = fetch_queue.pop(0)
-
- objpath = self._ensure_blob(remote, dir_digest)
+ local_cas = self.get_local_cas()
- directory = remote_execution_pb2.Directory()
- with open(objpath, "rb") as f:
- directory.ParseFromString(f.read())
+ request = local_cas_pb2.FetchTreeRequest()
+ request.instance_name = remote.local_cas_instance_name
+ request.root_digest.CopyFrom(dir_digest)
+ request.fetch_file_blobs = False
- for dirnode in directory.directories:
- batch = self._fetch_directory_node(
- remote, dirnode.digest, batch, fetch_queue, fetch_next_queue, recursive=True
- )
+ try:
+ local_cas.FetchTree(request)
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.NOT_FOUND:
+ raise BlobNotFound(
+ dir_digest.hash,
+ "Failed to fetch directory tree {}: {}: {}".format(dir_digest.hash, e.code().name, e.details()),
+ ) from e
+ raise CASCacheError(
+ "Failed to fetch directory tree {}: {}: {}".format(dir_digest.hash, e.code().name, e.details())
+ ) from e
- # Fetch final batch
- self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
+ required_blobs = self.required_blobs_for_directory(dir_digest)
+ self.fetch_blobs(remote, required_blobs)
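A caller-side sketch of the error contract established here, assuming `cascache`, `remote`, and `dir_digest` exist (exception import paths follow the BuildStream source tree):

    from buildstream._cas.casremote import BlobNotFound
    from buildstream._exceptions import CASCacheError

    # Hypothetical usage: a missing tree surfaces as BlobNotFound, any
    # other gRPC failure as CASCacheError.
    try:
        cascache._fetch_directory(remote, dir_digest)
    except BlobNotFound:
        print("directory tree not available on the remote")
    except CASCacheError as e:
        print("fetch failed: {}".format(e))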
def _fetch_tree(self, remote, digest):
- objpath = self._ensure_blob(remote, digest)
+ self.fetch_blobs(remote, [digest])
tree = remote_execution_pb2.Tree()
- with open(objpath, "rb") as f:
+ with self.open(digest, "rb") as f:
tree.ParseFromString(f.read())
- tree.children.extend([tree.root])
+ dirbuffers = [tree.root.SerializeToString()]
for directory in tree.children:
- dirbuffer = directory.SerializeToString()
- dirdigest = self.add_object(buffer=dirbuffer)
- assert dirdigest.size_bytes == len(dirbuffer)
+ dirbuffers.append(directory.SerializeToString())
+
+ dirdigests = self.add_objects(buffers=dirbuffers)
- return dirdigest
+ # The digest of the root directory
+ return dirdigests[0]
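The rewrite leans on add_objects() returning digests in input order; a compact sketch of the invariant (hypothetical `cascache`, with an empty Tree for illustration):

    from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    # Hypothetical sketch: serializing the root first means the first
    # returned digest is the digest of the root directory.
    tree = remote_execution_pb2.Tree()
    buffers = [tree.root.SerializeToString()]
    buffers.extend(child.SerializeToString() for child in tree.children)
    root_digest = cascache.add_objects(buffers=buffers)[0]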
# fetch_blobs():
#