diff options
Diffstat (limited to 'buildstream/_cas/cascache.py')
-rw-r--r-- | buildstream/_cas/cascache.py | 86 |
1 files changed, 44 insertions, 42 deletions
diff --git a/buildstream/_cas/cascache.py b/buildstream/_cas/cascache.py index eae3ef04d..5f67dc0c1 100644 --- a/buildstream/_cas/cascache.py +++ b/buildstream/_cas/cascache.py @@ -268,15 +268,13 @@ class CASCache(): request.key = ref response = remote.ref_storage.GetReference(request) - tree = remote_execution_pb2.Digest() - tree.hash = response.digest.hash - tree.size_bytes = response.digest.size_bytes + tree = response.digest # Fetch Directory objects self._fetch_directory(remote, tree) # Fetch files, excluded_subdirs determined in pullqueue - required_blobs = self._required_blobs(tree, excluded_subdirs=excluded_subdirs) + required_blobs = self.required_blobs_for_directory(tree, excluded_subdirs=excluded_subdirs) missing_blobs = self.local_missing_blobs(required_blobs) if missing_blobs: self.fetch_blobs(remote, missing_blobs) @@ -368,8 +366,7 @@ class CASCache(): request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name) request.keys.append(ref) - request.digest.hash = tree.hash - request.digest.size_bytes = tree.size_bytes + request.digest.CopyFrom(tree) remote.ref_storage.UpdateReference(request) skipped_remote = False @@ -647,23 +644,33 @@ class CASCache(): # Returns: List of missing Digest objects # def remote_missing_blobs_for_directory(self, remote, digest): - required_blobs = self._required_blobs(digest) + required_blobs = self.required_blobs_for_directory(digest) + return self.remote_missing_blobs(remote, required_blobs) + + # remote_missing_blobs(): + # + # Determine which blobs are missing on the remote. 
+ # + # Args: + # blobs (iterable of Digest): The digests of the blobs to check + # + # Returns: List of missing Digest objects + # + def remote_missing_blobs(self, remote, blobs): missing_blobs = dict() # Limit size of FindMissingBlobs request - for required_blobs_group in _grouper(required_blobs, 512): + for required_blobs_group in _grouper(blobs, 512): request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name) for required_digest in required_blobs_group: d = request.blob_digests.add() - d.hash = required_digest.hash - d.size_bytes = required_digest.size_bytes + d.CopyFrom(required_digest) response = remote.cas.FindMissingBlobs(request) for missing_digest in response.missing_blob_digests: d = remote_execution_pb2.Digest() - d.hash = missing_digest.hash - d.size_bytes = missing_digest.size_bytes + d.CopyFrom(missing_digest) missing_blobs[d.hash] = d return missing_blobs.values() @@ -685,6 +692,31 @@ class CASCache(): missing_blobs.append(digest) return missing_blobs + # required_blobs_for_directory(): + # + # Generator that returns the Digests of all blobs in the tree specified by + # the Digest of the toplevel Directory object. 
+ # + def required_blobs_for_directory(self, directory_digest, *, excluded_subdirs=None): + if not excluded_subdirs: + excluded_subdirs = [] + + # parse directory, and recursively add blobs + + yield directory_digest + + directory = remote_execution_pb2.Directory() + + with open(self.objpath(directory_digest), 'rb') as f: + directory.ParseFromString(f.read()) + + for filenode in directory.files: + yield filenode.digest + + for dirnode in directory.directories: + if dirnode.name not in excluded_subdirs: + yield from self.required_blobs_for_directory(dirnode.digest) + ################################################ # Local Private Methods # ################################################ @@ -881,31 +913,6 @@ class CASCache(): for dirnode in directory.directories: self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime, check_exists=check_exists) - def _required_blobs(self, directory_digest, *, excluded_subdirs=None): - if not excluded_subdirs: - excluded_subdirs = [] - - # parse directory, and recursively add blobs - d = remote_execution_pb2.Digest() - d.hash = directory_digest.hash - d.size_bytes = directory_digest.size_bytes - yield d - - directory = remote_execution_pb2.Directory() - - with open(self.objpath(directory_digest), 'rb') as f: - directory.ParseFromString(f.read()) - - for filenode in directory.files: - d = remote_execution_pb2.Digest() - d.hash = filenode.digest.hash - d.size_bytes = filenode.digest.size_bytes - yield d - - for dirnode in directory.directories: - if dirnode.name not in excluded_subdirs: - yield from self._required_blobs(dirnode.digest) - # _temporary_object(): # # Returns: @@ -1042,11 +1049,6 @@ class CASCache(): tree.children.extend([tree.root]) for directory in tree.children: - for filenode in directory.files: - self._ensure_blob(remote, filenode.digest) - - # place directory blob only in final location when we've downloaded - # all referenced blobs to avoid dangling references in the repository dirbuffer = 
directory.SerializeToString() dirdigest = self.add_object(buffer=dirbuffer) assert dirdigest.size_bytes == len(dirbuffer) |