| field | value | date |
|---|---|---|
| author | bst-marge-bot <marge-bot@buildstream.build> | 2020-12-09 18:03:26 +0000 |
| committer | bst-marge-bot <marge-bot@buildstream.build> | 2020-12-09 18:03:26 +0000 |
| commit | 270439458a2c34486d1907aec53844cd29bfe2d6 | |
| tree | e4c7679ded79f80c478a13a7a878ef00a772efe4 | |
| parent | 47ef74f0ba4868eeba94baea3a8beb2874ec6b24 | |
| parent | c836b7be6feaa0b9f86ab8c0cb5b913b0d8ed0e0 | |
Merge branch 'juerg/cas' into 'master'
CASCache improvements
See merge request BuildStream/buildstream!2112
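
In short: `add_object()` drops its `digest` and `link_directly` parameters and becomes a thin wrapper around a new batched `add_objects()`; a new `CASCache.open()` replaces direct `open(cas.objpath(digest))` calls at the call sites; and `local_missing_blobs()` / `remote_missing_blobs()` are folded into a single `missing_blobs()` with an optional `remote` argument. The sketch below is illustrative only (not part of the patch) and assumes an initialized `CASCache` instance `cas` and a configured `remote`:

```python
# Batched capture: add_object() now delegates to add_objects(), which
# captures several blobs in a single CaptureFiles request to buildbox-casd.
digest = cas.add_object(buffer=b"hello")
digests = cas.add_objects(buffers=[b"one", b"two"])

# Read a cached object by digest instead of open(cas.objpath(digest), ...).
with cas.open(digest) as f:
    data = f.read()

# One missing-blobs entry point: with no remote it checks the local cache;
# with remote= it queries the remote via FindMissingBlobs.
local_missing = cas.missing_blobs(digests)
remote_missing = cas.missing_blobs(digests, remote=remote)
```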
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | src/buildstream/_artifact.py | 11 |
| -rw-r--r-- | src/buildstream/_artifactcache.py | 15 |
| -rw-r--r-- | src/buildstream/_cas/cascache.py | 275 |
| -rw-r--r-- | src/buildstream/_elementsourcescache.py | 11 |
| -rw-r--r-- | src/buildstream/_sourcecache.py | 3 |
| -rw-r--r-- | src/buildstream/sandbox/_sandboxremote.py | 10 |
| -rw-r--r-- | src/buildstream/storage/_casbaseddirectory.py | 2 |
| -rw-r--r-- | tests/testutils/artifactshare.py | 21 |
8 files changed, 142 insertions(+), 206 deletions(-)
```diff
diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py
index 6e2bc9342..d4a716ff1 100644
--- a/src/buildstream/_artifact.py
+++ b/src/buildstream/_artifact.py
@@ -241,7 +241,7 @@ class Artifact:
         # Store public data
         with utils._tempnamedfile_name(dir=self._tmpdir) as tmpname:
             _yaml.roundtrip_dump(publicdata, tmpname)
-            public_data_digest = self._cas.add_object(path=tmpname, link_directly=True)
+            public_data_digest = self._cas.add_object(path=tmpname)
             artifact.public_data.CopyFrom(public_data_digest)
             size += public_data_digest.size_bytes
 
@@ -255,7 +255,7 @@ class Artifact:
 
             low_diversity_node = Node.from_dict(low_diversity_dict)
             _yaml.roundtrip_dump(low_diversity_node, tmpname)
-            low_diversity_meta_digest = self._cas.add_object(path=tmpname, link_directly=True)
+            low_diversity_meta_digest = self._cas.add_object(path=tmpname)
             artifact.low_diversity_meta.CopyFrom(low_diversity_meta_digest)
             size += low_diversity_meta_digest.size_bytes
 
@@ -269,7 +269,7 @@ class Artifact:
 
             high_diversity_node = Node.from_dict(high_diversity_dict)
             _yaml.roundtrip_dump(high_diversity_node, tmpname)
-            high_diversity_meta_digest = self._cas.add_object(path=tmpname, link_directly=True)
+            high_diversity_meta_digest = self._cas.add_object(path=tmpname)
             artifact.high_diversity_meta.CopyFrom(high_diversity_meta_digest)
             size += high_diversity_meta_digest.size_bytes
 
@@ -370,8 +370,9 @@ class Artifact:
 
         # Load the public data from the artifact
         artifact = self._get_proto()
-        meta_file = self._cas.objpath(artifact.public_data)
-        data = _yaml.load(meta_file, shortname="public.yaml")
+        with self._cas.open(artifact.public_data) as meta_file:
+            meta_str = meta_file.read()
+            data = _yaml.load_data(meta_str, file_name="public.yaml")
 
         return data
diff --git a/src/buildstream/_artifactcache.py b/src/buildstream/_artifactcache.py
index 09804fe01..ded87679a 100644
--- a/src/buildstream/_artifactcache.py
+++ b/src/buildstream/_artifactcache.py
@@ -339,7 +339,7 @@ class ArtifactCache(AssetCache):
         for remote in push_remotes:
             remote.init()
 
-            remote_missing_blobs = self.cas.remote_missing_blobs(remote, missing_blobs)
+            remote_missing_blobs = self.cas.missing_blobs(missing_blobs, remote=remote)
 
             for blob in remote_missing_blobs:
                 if blob not in remote_missing_blobs_list:
@@ -504,20 +504,13 @@ class ArtifactCache(AssetCache):
     #     blobs not existing on the server.
     #
     def _pull_artifact_storage(self, element, key, artifact_digest, remote, pull_buildtrees=False):
-        def __pull_digest(digest):
-            self.cas._fetch_directory(remote, digest)
-            required_blobs = self.cas.required_blobs_for_directory(digest)
-            missing_blobs = self.cas.local_missing_blobs(required_blobs)
-            if missing_blobs:
-                self.cas.fetch_blobs(remote, missing_blobs)
-
         artifact_name = element.get_artifact_name(key=key)
 
         try:
             # Fetch and parse artifact proto
             self.cas.fetch_blobs(remote, [artifact_digest])
             artifact = artifact_pb2.Artifact()
-            with open(self.cas.objpath(artifact_digest), "rb") as f:
+            with self.cas.open(artifact_digest, "rb") as f:
                 artifact.ParseFromString(f.read())
 
             # Write the artifact proto to cache
@@ -527,10 +520,10 @@ class ArtifactCache(AssetCache):
                 f.write(artifact.SerializeToString())
 
             if str(artifact.files):
-                __pull_digest(artifact.files)
+                self.cas._fetch_directory(remote, artifact.files)
 
             if pull_buildtrees and str(artifact.buildtree):
-                __pull_digest(artifact.buildtree)
+                self.cas._fetch_directory(remote, artifact.buildtree)
 
             digests = [artifact.low_diversity_meta, artifact.high_diversity_meta]
             if str(artifact.public_data):
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index d13531c6c..b80460abf 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -37,7 +37,7 @@
 from ..types import FastEnum, SourceRef
 from .._exceptions import CASCacheError
 from .casdprocessmanager import CASDProcessManager
-from .casremote import _CASBatchRead, _CASBatchUpdate
+from .casremote import _CASBatchRead, _CASBatchUpdate, BlobNotFound
 
 _BUFFER_SIZE = 65536
 
@@ -152,13 +152,7 @@
     # Returns: True if the files are in the cache, False otherwise
     #
     def contains_files(self, digests):
-        cas = self.get_cas()
-
-        request = remote_execution_pb2.FindMissingBlobsRequest()
-        request.blob_digests.extend(digests)
-
-        response = cas.FindMissingBlobs(request)
-        return len(response.missing_blob_digests) == 0
+        return len(self.missing_blobs(digests)) == 0
 
     # contains_directory():
     #
@@ -278,15 +272,29 @@
     def objpath(self, digest):
         return os.path.join(self.casdir, "objects", digest.hash[:2], digest.hash[2:])
 
+    # open():
+    #
+    # Open file read-only by CAS digest and return a corresponding file object.
+    #
+    # Args:
+    #     digest (Digest): The digest of the object
+    #     mode (str): An optional string that specifies the mode in which the file is opened.
+    #
+    def open(self, digest, mode="r"):
+        if mode not in ["r", "rb"]:
+            raise ValueError("Unsupported mode: `{}`".format(mode))
+
+        objpath = self.objpath(digest)
+
+        return open(objpath, mode=mode)
+
     # add_object():
     #
     # Hash and write object to CAS.
     #
     # Args:
-    #     digest (Digest): An optional Digest object to populate
     #     path (str): Path to file to add
     #     buffer (bytes): Byte buffer to add
-    #     link_directly (bool): Whether file given by path can be linked
     #     instance_name (str): casd instance_name for remote CAS
     #
     # Returns:
@@ -294,44 +302,72 @@
     #
     # Either `path` or `buffer` must be passed, but not both.
     #
-    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False, instance_name=None):
+    def add_object(self, *, path=None, buffer=None, instance_name=None):
         # Exactly one of the two parameters has to be specified
         assert (path is None) != (buffer is None)
 
+        if path is None:
+            digests = self.add_objects(buffers=[buffer], instance_name=instance_name)
+        else:
+            digests = self.add_objects(paths=[path], instance_name=instance_name)
+        assert len(digests) == 1
+        return digests[0]
+
-        # If we're linking directly, then path must be specified.
-        assert (not link_directly) or (link_directly and path)
+    # add_objects():
+    #
+    # Hash and write objects to CAS.
+    #
+    # Args:
+    #     paths (List[str]): Paths to files to add
+    #     buffers (List[bytes]): Byte buffers to add
+    #     instance_name (str): casd instance_name for remote CAS
+    #
+    # Returns:
+    #     (List[Digest]): The digests of the added objects
+    #
+    # Either `paths` or `buffers` must be passed, but not both.
+    #
+    def add_objects(self, *, paths=None, buffers=None, instance_name=None):
+        # Exactly one of the two parameters has to be specified
+        assert (paths is None) != (buffers is None)
 
-        if digest is None:
-            digest = remote_execution_pb2.Digest()
+        digests = []
 
         with contextlib.ExitStack() as stack:
-            if path is None:
-                tmp = stack.enter_context(self._temporary_object())
-                tmp.write(buffer)
-                tmp.flush()
-                path = tmp.name
+            if paths is None:
+                paths = []
+                for buffer in buffers:
+                    tmp = stack.enter_context(self._temporary_object())
+                    tmp.write(buffer)
+                    tmp.flush()
+                    paths.append(tmp.name)
 
             request = local_cas_pb2.CaptureFilesRequest()
             if instance_name:
                 request.instance_name = instance_name
-            request.path.append(path)
+            for path in paths:
+                request.path.append(path)
 
             local_cas = self.get_local_cas()
             response = local_cas.CaptureFiles(request)
 
-            if len(response.responses) != 1:
-                raise CASCacheError("Expected 1 response from CaptureFiles, got {}".format(len(response.responses)))
+            if len(response.responses) != len(paths):
+                raise CASCacheError(
+                    "Expected {} responses from CaptureFiles, got {}".format(len(paths), len(response.responses))
+                )
 
-            blob_response = response.responses[0]
-            if blob_response.status.code == code_pb2.RESOURCE_EXHAUSTED:
-                raise CASCacheError("Cache too full", reason="cache-too-full")
-            if blob_response.status.code != code_pb2.OK:
-                raise CASCacheError("Failed to capture blob {}: {}".format(path, blob_response.status.code))
-            digest.CopyFrom(blob_response.digest)
+            for path, blob_response in zip(paths, response.responses):
+                if blob_response.status.code == code_pb2.RESOURCE_EXHAUSTED:
+                    raise CASCacheError("Cache too full", reason="cache-too-full")
+                if blob_response.status.code != code_pb2.OK:
+                    raise CASCacheError("Failed to capture blob {}: {}".format(path, blob_response.status.code))
 
-        return digest
+                digest = remote_execution_pb2.Digest()
+                digest.CopyFrom(blob_response.digest)
+                digests.append(digest)
+
+        return digests
 
     # import_directory():
     #
@@ -374,7 +410,7 @@
 
         return utils._message_digest(root_directory)
 
-    # remote_missing_blobs_for_directory():
+    # missing_blobs_for_directory():
     #
     # Determine which blobs of a directory tree are missing on the remote.
     #
@@ -383,23 +419,27 @@
     #
     # Returns: List of missing Digest objects
     #
-    def remote_missing_blobs_for_directory(self, remote, digest):
+    def missing_blobs_for_directory(self, digest, *, remote=None):
         required_blobs = self.required_blobs_for_directory(digest)
-        return self.remote_missing_blobs(remote, required_blobs)
+        return self.missing_blobs(required_blobs, remote=remote)
 
-    # remote_missing_blobs():
+    # missing_blobs():
     #
-    # Determine which blobs are missing on the remote.
+    # Determine which blobs are missing locally or on the remote.
     #
     # Args:
     #     blobs ([Digest]): List of directory digests to check
     #
     # Returns: List of missing Digest objects
     #
-    def remote_missing_blobs(self, remote, blobs):
+    def missing_blobs(self, blobs, *, remote=None):
         cas = self.get_cas()
-        instance_name = remote.local_cas_instance_name
+
+        if remote:
+            instance_name = remote.local_cas_instance_name
+        else:
+            instance_name = ""
 
         missing_blobs = dict()
         # Limit size of FindMissingBlobs request
@@ -424,23 +464,6 @@
 
         return missing_blobs.values()
 
-    # local_missing_blobs():
-    #
-    # Check local cache for missing blobs.
-    #
-    # Args:
-    #     digests (list): The Digests of blobs to check
-    #
-    # Returns: Missing Digest objects
-    #
-    def local_missing_blobs(self, digests):
-        missing_blobs = []
-        for digest in digests:
-            objpath = self.objpath(digest)
-            if not os.path.exists(objpath):
-                missing_blobs.append(digest)
-        return missing_blobs
-
     # required_blobs_for_directory():
     #
     # Generator that returns the Digests of all blobs in the tree specified by
@@ -470,38 +493,6 @@
     ################################################
     #             Local Private Methods            #
     ################################################
 
-    def _reachable_refs_dir(self, reachable, tree, update_mtime=False, check_exists=False):
-        if tree.hash in reachable:
-            return
-        try:
-            if update_mtime:
-                os.utime(self.objpath(tree))
-
-            reachable.add(tree.hash)
-
-            directory = remote_execution_pb2.Directory()
-
-            with open(self.objpath(tree), "rb") as f:
-                directory.ParseFromString(f.read())
-
-        except FileNotFoundError:
-            if check_exists:
-                raise
-
-            # Just exit early if the file doesn't exist
-            return
-
-        for filenode in directory.files:
-            if update_mtime:
-                os.utime(self.objpath(filenode.digest))
-            if check_exists:
-                if not os.path.exists(self.objpath(filenode.digest)):
-                    raise FileNotFoundError
-            reachable.add(filenode.digest.hash)
-
-        for dirnode in directory.directories:
-            self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime, check_exists=check_exists)
-
     # _temporary_object():
     #
     # Returns:
@@ -514,113 +505,55 @@
             os.chmod(f.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
             yield f
 
-    # _ensure_blob():
-    #
-    # Fetch and add blob if it's not already local.
-    #
-    # Args:
-    #     remote (Remote): The remote to use.
-    #     digest (Digest): Digest object for the blob to fetch.
-    #
-    # Returns:
-    #     (str): The path of the object
-    #
-    def _ensure_blob(self, remote, digest):
-        objpath = self.objpath(digest)
-        if os.path.exists(objpath):
-            # already in local repository
-            return objpath
-
-        batch = _CASBatchRead(remote)
-        batch.add(digest)
-        batch.send()
-
-        return objpath
-
-    # Helper function for _fetch_directory().
-    def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
-        batch.send()
-
-        # All previously scheduled directories are now locally available,
-        # move them to the processing queue.
-        fetch_queue.extend(fetch_next_queue)
-        fetch_next_queue.clear()
-        return _CASBatchRead(remote)
-
-    # Helper function for _fetch_directory().
-    def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
-        in_local_cache = os.path.exists(self.objpath(digest))
-
-        if in_local_cache:
-            # Skip download, already in local cache.
-            pass
-        else:
-            batch.add(digest)
-
-        if recursive:
-            if in_local_cache:
-                # Add directory to processing queue.
-                fetch_queue.append(digest)
-            else:
-                # Directory will be available after completing pending batch.
-                # Add directory to deferred processing queue.
-                fetch_next_queue.append(digest)
-
-        return batch
-
     # _fetch_directory():
     #
     # Fetches remote directory and adds it to content addressable store.
     #
-    # This recursively fetches directory objects but doesn't fetch any
-    # files.
+    # This recursively fetches directory objects and files.
     #
     # Args:
     #     remote (Remote): The remote to use.
     #     dir_digest (Digest): Digest object for the directory to fetch.
     #
     def _fetch_directory(self, remote, dir_digest):
-        # TODO Use GetTree() if the server supports it
-
-        fetch_queue = [dir_digest]
-        fetch_next_queue = []
-        batch = _CASBatchRead(remote)
-
-        while len(fetch_queue) + len(fetch_next_queue) > 0:
-            if not fetch_queue:
-                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
-
-            dir_digest = fetch_queue.pop(0)
-
-            objpath = self._ensure_blob(remote, dir_digest)
+        local_cas = self.get_local_cas()
 
-            directory = remote_execution_pb2.Directory()
-            with open(objpath, "rb") as f:
-                directory.ParseFromString(f.read())
+        request = local_cas_pb2.FetchTreeRequest()
+        request.instance_name = remote.local_cas_instance_name
+        request.root_digest.CopyFrom(dir_digest)
+        request.fetch_file_blobs = False
 
-            for dirnode in directory.directories:
-                batch = self._fetch_directory_node(
-                    remote, dirnode.digest, batch, fetch_queue, fetch_next_queue, recursive=True
-                )
+        try:
+            local_cas.FetchTree(request)
+        except grpc.RpcError as e:
+            if e.code() == grpc.StatusCode.NOT_FOUND:
+                raise BlobNotFound(
+                    dir_digest.hash,
+                    "Failed to fetch directory tree {}: {}: {}".format(dir_digest.hash, e.code().name, e.details()),
+                ) from e
+            raise CASCacheError(
+                "Failed to fetch directory tree {}: {}: {}".format(dir_digest.hash, e.code().name, e.details())
            ) from e
 
-        # Fetch final batch
-        self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
+        required_blobs = self.required_blobs_for_directory(dir_digest)
+        self.fetch_blobs(remote, required_blobs)
 
     def _fetch_tree(self, remote, digest):
-        objpath = self._ensure_blob(remote, digest)
+        self.fetch_blobs(remote, [digest])
 
         tree = remote_execution_pb2.Tree()
-        with open(objpath, "rb") as f:
+        with self.open(digest, "rb") as f:
             tree.ParseFromString(f.read())
 
-        tree.children.extend([tree.root])
+        dirbuffers = [tree.root.SerializeToString()]
         for directory in tree.children:
-            dirbuffer = directory.SerializeToString()
-            dirdigest = self.add_object(buffer=dirbuffer)
-            assert dirdigest.size_bytes == len(dirbuffer)
+            dirbuffers.append(directory.SerializeToString())
+
+        dirdigests = self.add_objects(buffers=dirbuffers)
 
-        return dirdigest
+        # The digest of the root directory
+        return dirdigests[0]
 
     # fetch_blobs():
     #
diff --git a/src/buildstream/_elementsourcescache.py b/src/buildstream/_elementsourcescache.py
index 84e7633e5..194f3fd4a 100644
--- a/src/buildstream/_elementsourcescache.py
+++ b/src/buildstream/_elementsourcescache.py
@@ -295,18 +295,11 @@ class ElementSourcesCache(AssetCache):
     #     blobs not existing on the server.
     #
     def _pull_source_storage(self, key, source_digest, remote):
-        def __pull_digest(digest):
-            self.cas._fetch_directory(remote, digest)
-            required_blobs = self.cas.required_blobs_for_directory(digest)
-            missing_blobs = self.cas.local_missing_blobs(required_blobs)
-            if missing_blobs:
-                self.cas.fetch_blobs(remote, missing_blobs)
-
         try:
             # Fetch and parse source proto
             self.cas.fetch_blobs(remote, [source_digest])
             source = source_pb2.Source()
-            with open(self.cas.objpath(source_digest), "rb") as f:
+            with self.cas.open(source_digest, "rb") as f:
                 source.ParseFromString(f.read())
 
             # Write the source proto to cache
@@ -314,7 +307,7 @@ class ElementSourcesCache(AssetCache):
             with utils.save_file_atomic(source_path, mode="wb") as f:
                 f.write(source.SerializeToString())
 
-            __pull_digest(source.files)
+            self.cas._fetch_directory(remote, source.files)
 
         except grpc.RpcError as e:
             if e.code() != grpc.StatusCode.NOT_FOUND:
                 raise SourceCacheError("Failed to pull source with status {}: {}".format(e.code().name, e.details()))
diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py
index 76c22efbd..37d990b4f 100644
--- a/src/buildstream/_sourcecache.py
+++ b/src/buildstream/_sourcecache.py
@@ -146,9 +146,6 @@ class SourceCache(AssetCache):
 
             # Fetch source blobs
             self.cas._fetch_directory(remote, source_digest)
-            required_blobs = self.cas.required_blobs_for_directory(source_digest)
-            missing_blobs = self.cas.local_missing_blobs(required_blobs)
-            self.cas.fetch_blobs(remote, missing_blobs)
 
             source.info("Pulled source {} <- {}".format(display_key, remote))
             return True
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index 2ac159337..ff314adba 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -276,7 +276,7 @@ class SandboxRemote(SandboxREAPI):
         dir_digest = vdir._get_digest()
         required_blobs = cascache.required_blobs_for_directory(dir_digest)
 
-        local_missing_blobs = cascache.local_missing_blobs(required_blobs)
+        local_missing_blobs = cascache.missing_blobs(required_blobs)
         if local_missing_blobs:
             if self._output_files_required:
                 # Fetch all blobs from Remote Execution CAS server
@@ -319,14 +319,14 @@ class SandboxRemote(SandboxREAPI):
         # Determine blobs missing on remote
         try:
             input_root_digest = action.input_root_digest
-            missing_blobs = list(cascache.remote_missing_blobs_for_directory(casremote, input_root_digest))
+            missing_blobs = list(cascache.missing_blobs_for_directory(input_root_digest, remote=casremote))
         except grpc.RpcError as e:
             raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
 
         # Check if any blobs are also missing locally (partial artifact)
         # and pull them from the artifact cache.
         try:
-            local_missing_blobs = cascache.local_missing_blobs(missing_blobs)
+            local_missing_blobs = cascache.missing_blobs(missing_blobs)
             if local_missing_blobs:
                 artifactcache.fetch_missing_blobs(project, local_missing_blobs)
         except (grpc.RpcError, BstError) as e:
@@ -380,13 +380,13 @@ class SandboxRemote(SandboxREAPI):
         # Forward remote stdout and stderr
         if stdout:
             if action_result.stdout_digest.hash:
-                with open(cascache.objpath(action_result.stdout_digest), "r") as f:
+                with cascache.open(action_result.stdout_digest, "r") as f:
                     shutil.copyfileobj(f, stdout)
             elif action_result.stdout_raw:
                 stdout.write(str(action_result.stdout_raw, "utf-8", errors="ignore"))
         if stderr:
             if action_result.stderr_digest.hash:
-                with open(cascache.objpath(action_result.stderr_digest), "r") as f:
+                with cascache.open(action_result.stderr_digest, "r") as f:
                     shutil.copyfileobj(f, stderr)
             elif action_result.stderr_raw:
                 stderr.write(str(action_result.stderr_raw, "utf-8", errors="ignore"))
diff --git a/src/buildstream/storage/_casbaseddirectory.py b/src/buildstream/storage/_casbaseddirectory.py
index 75399953e..c061a28e4 100644
--- a/src/buildstream/storage/_casbaseddirectory.py
+++ b/src/buildstream/storage/_casbaseddirectory.py
@@ -220,7 +220,7 @@ class CasBasedDirectory(Directory):
         return newdir
 
     def _add_file(self, name, path, modified=False, can_link=False, properties=None):
-        digest = self.cas_cache.add_object(path=path, link_directly=can_link)
+        digest = self.cas_cache.add_object(path=path)
         is_executable = os.access(path, os.X_OK)
         mtime = None
         if properties and "mtime" in properties:
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index bd9c97c61..8ce8a4198 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -209,7 +209,7 @@ class ArtifactShare(BaseArtifactShare):
         reachable = set()
 
         def reachable_dir(digest):
-            self.cas._reachable_refs_dir(reachable, digest, update_mtime=False, check_exists=True)
+            self._reachable_refs_dir(reachable, digest)
 
         try:
             artifact_proto_path = self.cas.objpath(artifact_proto_digest)
@@ -271,6 +271,25 @@ class ArtifactShare(BaseArtifactShare):
 
         shutil.rmtree(self.directory)
 
+    def _reachable_refs_dir(self, reachable, tree):
+        if tree.hash in reachable:
+            return
+
+        reachable.add(tree.hash)
+
+        directory = remote_execution_pb2.Directory()
+
+        with open(self.cas.objpath(tree), "rb") as f:
+            directory.ParseFromString(f.read())
+
+        for filenode in directory.files:
+            if not os.path.exists(self.cas.objpath(filenode.digest)):
+                raise FileNotFoundError
+            reachable.add(filenode.digest.hash)
+
+        for dirnode in directory.directories:
+            self._reachable_refs_dir(reachable, dirnode.digest)
+
 
 # create_artifact_share()
 #
```
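
A note on the `_fetch_directory()` rewrite: instead of walking the tree manually with `_CASBatchRead` batches, it now delegates tree traversal to buildbox-casd's `FetchTree` RPC with `fetch_file_blobs` disabled, then pulls the file blobs through `fetch_blobs()`, keeping missing-blob handling in one code path. The request shape, extracted from the patch into a standalone sketch (the `local_cas` stub, `remote`, and `dir_digest` are assumed to come from an initialized `CASCache` as in the patch; the import path follows the BuildStream source tree):

```python
from buildstream._protos.build.buildgrid import local_cas_pb2

# Ask buildbox-casd to fetch the directory tree, but not the file blobs;
# those are fetched separately via fetch_blobs() so that missing-blob
# errors surface through the same path as all other blob downloads.
request = local_cas_pb2.FetchTreeRequest()
request.instance_name = remote.local_cas_instance_name
request.root_digest.CopyFrom(dir_digest)
request.fetch_file_blobs = False
local_cas.FetchTree(request)
```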