Diffstat (limited to 'src/buildstream/_cas/cascache.py')
-rw-r--r-- | src/buildstream/_cas/cascache.py | 275
1 file changed, 104 insertions, 171 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index d13531c6c..b80460abf 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -37,7 +37,7 @@
 from ..types import FastEnum, SourceRef
 from .._exceptions import CASCacheError
 from .casdprocessmanager import CASDProcessManager
-from .casremote import _CASBatchRead, _CASBatchUpdate
+from .casremote import _CASBatchRead, _CASBatchUpdate, BlobNotFound
 
 _BUFFER_SIZE = 65536
@@ -152,13 +152,7 @@ class CASCache:
     # Returns: True if the files are in the cache, False otherwise
     #
     def contains_files(self, digests):
-        cas = self.get_cas()
-
-        request = remote_execution_pb2.FindMissingBlobsRequest()
-        request.blob_digests.extend(digests)
-
-        response = cas.FindMissingBlobs(request)
-        return len(response.missing_blob_digests) == 0
+        return len(self.missing_blobs(digests)) == 0
 
     # contains_directory():
     #
@@ -278,15 +272,29 @@ class CASCache:
     def objpath(self, digest):
         return os.path.join(self.casdir, "objects", digest.hash[:2], digest.hash[2:])
 
+    # open():
+    #
+    # Open file read-only by CAS digest and return a corresponding file object.
+    #
+    # Args:
+    #     digest (Digest): The digest of the object
+    #     mode (str): An optional string that specifies the mode in which the file is opened.
+    #
+    def open(self, digest, mode="r"):
+        if mode not in ["r", "rb"]:
+            raise ValueError("Unsupported mode: `{}`".format(mode))
+
+        objpath = self.objpath(digest)
+
+        return open(objpath, mode=mode)
+
     # add_object():
     #
     # Hash and write object to CAS.
     #
     # Args:
-    #     digest (Digest): An optional Digest object to populate
     #     path (str): Path to file to add
     #     buffer (bytes): Byte buffer to add
-    #     link_directly (bool): Whether file given by path can be linked
     #     instance_name (str): casd instance_name for remote CAS
     #
     # Returns:
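The new `open()` helper spares callers from composing object paths by hand via `objpath()`. A minimal usage sketch, assuming a `CASCache` instance named `cascache` and a `digest` naming a Directory message already in the local cache (both names are illustrative, not part of this commit):

    from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    # Previously: with open(cascache.objpath(digest), "rb") as f: ...
    directory = remote_execution_pb2.Directory()
    with cascache.open(digest, "rb") as f:
        directory.ParseFromString(f.read())

Only "r" and "rb" are accepted; any other mode raises ValueError.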
@@ -294,44 +302,72 @@ class CASCache:
     #
     # Either `path` or `buffer` must be passed, but not both.
     #
-    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False, instance_name=None):
+    def add_object(self, *, path=None, buffer=None, instance_name=None):
         # Exactly one of the two parameters has to be specified
         assert (path is None) != (buffer is None)
+        if path is None:
+            digests = self.add_objects(buffers=[buffer], instance_name=instance_name)
+        else:
+            digests = self.add_objects(paths=[path], instance_name=instance_name)
+        assert len(digests) == 1
+        return digests[0]
 
-        # If we're linking directly, then path must be specified.
-        assert (not link_directly) or (link_directly and path)
+    # add_objects():
+    #
+    # Hash and write objects to CAS.
+    #
+    # Args:
+    #     paths (List[str]): Paths to files to add
+    #     buffers (List[bytes]): Byte buffers to add
+    #     instance_name (str): casd instance_name for remote CAS
+    #
+    # Returns:
+    #     (List[Digest]): The digests of the added objects
+    #
+    # Either `paths` or `buffers` must be passed, but not both.
+    #
+    def add_objects(self, *, paths=None, buffers=None, instance_name=None):
+        # Exactly one of the two parameters has to be specified
+        assert (paths is None) != (buffers is None)
 
-        if digest is None:
-            digest = remote_execution_pb2.Digest()
+        digests = []
 
         with contextlib.ExitStack() as stack:
-            if path is None:
-                tmp = stack.enter_context(self._temporary_object())
-                tmp.write(buffer)
-                tmp.flush()
-                path = tmp.name
+            if paths is None:
+                paths = []
+                for buffer in buffers:
+                    tmp = stack.enter_context(self._temporary_object())
+                    tmp.write(buffer)
+                    tmp.flush()
+                    paths.append(tmp.name)
 
             request = local_cas_pb2.CaptureFilesRequest()
             if instance_name:
                 request.instance_name = instance_name
-            request.path.append(path)
+            for path in paths:
+                request.path.append(path)
 
             local_cas = self.get_local_cas()
             response = local_cas.CaptureFiles(request)
 
-            if len(response.responses) != 1:
-                raise CASCacheError("Expected 1 response from CaptureFiles, got {}".format(len(response.responses)))
+            if len(response.responses) != len(paths):
+                raise CASCacheError(
+                    "Expected {} responses from CaptureFiles, got {}".format(len(paths), len(response.responses))
+                )
+
+            for path, blob_response in zip(paths, response.responses):
+                if blob_response.status.code == code_pb2.RESOURCE_EXHAUSTED:
+                    raise CASCacheError("Cache too full", reason="cache-too-full")
+                if blob_response.status.code != code_pb2.OK:
+                    raise CASCacheError("Failed to capture blob {}: {}".format(path, blob_response.status.code))
 
-            blob_response = response.responses[0]
-            if blob_response.status.code == code_pb2.RESOURCE_EXHAUSTED:
-                raise CASCacheError("Cache too full", reason="cache-too-full")
-            if blob_response.status.code != code_pb2.OK:
-                raise CASCacheError("Failed to capture blob {}: {}".format(path, blob_response.status.code))
-            digest.CopyFrom(blob_response.digest)
+                digest = remote_execution_pb2.Digest()
+                digest.CopyFrom(blob_response.digest)
+                digests.append(digest)
 
-        return digest
+        return digests
 
     # import_directory():
     #
@@ -374,7 +410,7 @@
 
         return utils._message_digest(root_directory)
 
-    # remote_missing_blobs_for_directory():
+    # missing_blobs_for_directory():
     #
     # Determine which blobs of a directory tree are missing on the remote.
     #
@@ -383,23 +419,27 @@
     #
     # Returns: List of missing Digest objects
     #
-    def remote_missing_blobs_for_directory(self, remote, digest):
+    def missing_blobs_for_directory(self, digest, *, remote=None):
         required_blobs = self.required_blobs_for_directory(digest)
-        return self.remote_missing_blobs(remote, required_blobs)
+        return self.missing_blobs(required_blobs, remote=remote)
 
-    # remote_missing_blobs():
+    # missing_blobs():
     #
-    # Determine which blobs are missing on the remote.
+    # Determine which blobs are missing locally or on the remote.
     #
     # Args:
     #     blobs ([Digest]): List of directory digests to check
     #
     # Returns: List of missing Digest objects
     #
-    def remote_missing_blobs(self, remote, blobs):
+    def missing_blobs(self, blobs, *, remote=None):
         cas = self.get_cas()
-        instance_name = remote.local_cas_instance_name
+
+        if remote:
+            instance_name = remote.local_cas_instance_name
+        else:
+            instance_name = ""
 
         missing_blobs = dict()
         # Limit size of FindMissingBlobs request
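Each `CaptureFiles` call is a round trip to buildbox-casd, so batching through `add_objects()` turns N blob additions into a single request, and the unified `missing_blobs()` now answers both the local and the remote question. A rough usage sketch, assuming a `cascache` instance and a configured `remote` (both illustrative):

    # One CaptureFiles round trip for all three buffers
    digests = cascache.add_objects(buffers=[b"one", b"two", b"three"])

    # Which of these blobs are absent from the local cache?
    local_missing = cascache.missing_blobs(digests)

    # Which are absent from the remote, e.g. ahead of a push?
    remote_missing = cascache.missing_blobs(digests, remote=remote)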
@@ -424,23 +464,6 @@
 
         return missing_blobs.values()
 
-    # local_missing_blobs():
-    #
-    # Check local cache for missing blobs.
-    #
-    # Args:
-    #     digests (list): The Digests of blobs to check
-    #
-    # Returns: Missing Digest objects
-    #
-    def local_missing_blobs(self, digests):
-        missing_blobs = []
-        for digest in digests:
-            objpath = self.objpath(digest)
-            if not os.path.exists(objpath):
-                missing_blobs.append(digest)
-        return missing_blobs
-
     # required_blobs_for_directory():
     #
     # Generator that returns the Digests of all blobs in the tree specified by
@@ -470,38 +493,6 @@
     #
     ################################################
     #             Local Private Methods            #
     ################################################
 
-    def _reachable_refs_dir(self, reachable, tree, update_mtime=False, check_exists=False):
-        if tree.hash in reachable:
-            return
-        try:
-            if update_mtime:
-                os.utime(self.objpath(tree))
-
-            reachable.add(tree.hash)
-
-            directory = remote_execution_pb2.Directory()
-
-            with open(self.objpath(tree), "rb") as f:
-                directory.ParseFromString(f.read())
-
-        except FileNotFoundError:
-            if check_exists:
-                raise
-
-            # Just exit early if the file doesn't exist
-            return
-
-        for filenode in directory.files:
-            if update_mtime:
-                os.utime(self.objpath(filenode.digest))
-            if check_exists:
-                if not os.path.exists(self.objpath(filenode.digest)):
-                    raise FileNotFoundError
-            reachable.add(filenode.digest.hash)
-
-        for dirnode in directory.directories:
-            self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime, check_exists=check_exists)
-
     # _temporary_object():
     #
     # Returns:
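For comparison, the deleted `local_missing_blobs()` answered the local question with direct filesystem checks; the same result now comes from `missing_blobs()` with no `remote`, which routes through casd's `FindMissingBlobs` with an empty instance name. The removed behaviour amounted to this (the `cascache` parameter is illustrative):

    import os

    def local_missing_blobs(cascache, digests):
        # Filesystem-level check against the object store, bypassing casd
        return [d for d in digests if not os.path.exists(cascache.objpath(d))]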
@@ -514,113 +505,55 @@
             os.chmod(f.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
             yield f
 
-    # _ensure_blob():
-    #
-    # Fetch and add blob if it's not already local.
-    #
-    # Args:
-    #     remote (Remote): The remote to use.
-    #     digest (Digest): Digest object for the blob to fetch.
-    #
-    # Returns:
-    #     (str): The path of the object
-    #
-    def _ensure_blob(self, remote, digest):
-        objpath = self.objpath(digest)
-        if os.path.exists(objpath):
-            # already in local repository
-            return objpath
-
-        batch = _CASBatchRead(remote)
-        batch.add(digest)
-        batch.send()
-
-        return objpath
-
-    # Helper function for _fetch_directory().
-    def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
-        batch.send()
-
-        # All previously scheduled directories are now locally available,
-        # move them to the processing queue.
-        fetch_queue.extend(fetch_next_queue)
-        fetch_next_queue.clear()
-        return _CASBatchRead(remote)
-
-    # Helper function for _fetch_directory().
-    def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
-        in_local_cache = os.path.exists(self.objpath(digest))
-
-        if in_local_cache:
-            # Skip download, already in local cache.
-            pass
-        else:
-            batch.add(digest)
-
-        if recursive:
-            if in_local_cache:
-                # Add directory to processing queue.
-                fetch_queue.append(digest)
-            else:
-                # Directory will be available after completing pending batch.
-                # Add directory to deferred processing queue.
-                fetch_next_queue.append(digest)
-
-        return batch
-
     # _fetch_directory():
     #
     # Fetches remote directory and adds it to content addressable store.
     #
-    # This recursively fetches directory objects but doesn't fetch any
-    # files.
+    # This recursively fetches directory objects and files.
     #
     # Args:
     #     remote (Remote): The remote to use.
     #     dir_digest (Digest): Digest object for the directory to fetch.
     #
     def _fetch_directory(self, remote, dir_digest):
-        # TODO Use GetTree() if the server supports it
-
-        fetch_queue = [dir_digest]
-        fetch_next_queue = []
-        batch = _CASBatchRead(remote)
-
-        while len(fetch_queue) + len(fetch_next_queue) > 0:
-            if not fetch_queue:
-                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
-
-            dir_digest = fetch_queue.pop(0)
-
-            objpath = self._ensure_blob(remote, dir_digest)
+        local_cas = self.get_local_cas()
 
-            directory = remote_execution_pb2.Directory()
-            with open(objpath, "rb") as f:
-                directory.ParseFromString(f.read())
+        request = local_cas_pb2.FetchTreeRequest()
+        request.instance_name = remote.local_cas_instance_name
+        request.root_digest.CopyFrom(dir_digest)
+        request.fetch_file_blobs = False
 
-            for dirnode in directory.directories:
-                batch = self._fetch_directory_node(
-                    remote, dirnode.digest, batch, fetch_queue, fetch_next_queue, recursive=True
-                )
+        try:
+            local_cas.FetchTree(request)
+        except grpc.RpcError as e:
+            if e.code() == grpc.StatusCode.NOT_FOUND:
+                raise BlobNotFound(
+                    dir_digest.hash,
+                    "Failed to fetch directory tree {}: {}: {}".format(dir_digest.hash, e.code().name, e.details()),
+                ) from e
+            raise CASCacheError(
+                "Failed to fetch directory tree {}: {}: {}".format(dir_digest.hash, e.code().name, e.details())
+            ) from e
 
-        # Fetch final batch
-        self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
+        required_blobs = self.required_blobs_for_directory(dir_digest)
+        self.fetch_blobs(remote, required_blobs)
 
     def _fetch_tree(self, remote, digest):
-        objpath = self._ensure_blob(remote, digest)
+        self.fetch_blobs(remote, [digest])
 
         tree = remote_execution_pb2.Tree()
-        with open(objpath, "rb") as f:
+        with self.open(digest, "rb") as f:
             tree.ParseFromString(f.read())
 
-        tree.children.extend([tree.root])
+        dirbuffers = [tree.root.SerializeToString()]
         for directory in tree.children:
-            dirbuffer = directory.SerializeToString()
-            dirdigest = self.add_object(buffer=dirbuffer)
-            assert dirdigest.size_bytes == len(dirbuffer)
+            dirbuffers.append(directory.SerializeToString())
+
+        dirdigests = self.add_objects(buffers=dirbuffers)
 
-        return dirdigest
+        # The digest of the root directory
+        return dirdigests[0]
 
     # fetch_blobs():
     #
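With tree traversal delegated to casd's `FetchTree`, the client keeps no fetch queues of its own. A sketch of how the reworked `_fetch_tree()` behaves, assuming a `remote` and the `tree_digest` of a REAPI Tree message (both names are illustrative):

    from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    # Fetches the Tree blob, stores each contained Directory message as a
    # separate CAS object, and returns the digest of the root directory.
    root_digest = cascache._fetch_tree(remote, tree_digest)

    root = remote_execution_pb2.Directory()
    with cascache.open(root_digest, "rb") as f:
        root.ParseFromString(f.read())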