diff options
author | Jim MacArthur <jim.macarthur@codethink.co.uk> | 2018-08-02 15:49:33 +0100 |
---|---|---|
committer | Martin Blanchard <martin.blanchard@codethink.co.uk> | 2018-09-07 13:57:28 +0100 |
commit | 936bb93af4c7e416f67786783d8d0b1bd82bbde5 (patch) | |
tree | cc5c899f3a50c65bdd4eca480e97363ae77daec6 | |
parent | 7b32e1ec903d658dfa75c754b0dd45a3e9331638 (diff) | |
download | buildstream-936bb93af4c7e416f67786783d8d0b1bd82bbde5.tar.gz |
cascache.py: Preparation for remote execution
Refactor the push() and pull() implementations so that API additions
needed for remote execution are made easier.
https://gitlab.com/BuildStream/buildstream/issues/454
-rw-r--r-- | buildstream/_artifactcache/cascache.py | 199 |
1 file changed, 109 insertions, 90 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index 9a9f7024f..9c9d97370 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -76,6 +76,7 @@ class CASCache(ArtifactCache): ################################################ # Implementation of abstract methods # ################################################ + def contains(self, element, key): refpath = self._refpath(self.get_artifact_fullname(element, key)) @@ -153,6 +154,7 @@ class CASCache(ArtifactCache): q = multiprocessing.Queue() for remote_spec in remote_specs: # Use subprocess to avoid creation of gRPC threads in main BuildStream process + # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q)) try: @@ -267,8 +269,46 @@ class CASCache(ArtifactCache): self.set_ref(newref, tree) + def _push_refs_to_remote(self, refs, remote): + skipped_remote = True + try: + for ref in refs: + tree = self.resolve_ref(ref) + + # Check whether ref is already on the server in which case + # there is no need to push the artifact + try: + request = buildstream_pb2.GetReferenceRequest() + request.key = ref + response = remote.ref_storage.GetReference(request) + + if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes: + # ref is already on the server with the same tree + continue + + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + # Intentionally re-raise RpcError for outer except block. 
+ raise + + self._send_directory(remote, tree) + + request = buildstream_pb2.UpdateReferenceRequest() + request.keys.append(ref) + request.digest.hash = tree.hash + request.digest.size_bytes = tree.size_bytes + remote.ref_storage.UpdateReference(request) + + skipped_remote = False + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: + raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e + + return not skipped_remote + def push(self, element, keys): - refs = [self.get_artifact_fullname(element, key) for key in keys] + + refs = [self.get_artifact_fullname(element, key) for key in list(keys)] project = element._get_project() @@ -278,95 +318,19 @@ class CASCache(ArtifactCache): for remote in push_remotes: remote.init() - skipped_remote = True - element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url)) - try: - for ref in refs: - tree = self.resolve_ref(ref) - - # Check whether ref is already on the server in which case - # there is no need to push the artifact - try: - request = buildstream_pb2.GetReferenceRequest() - request.key = ref - response = remote.ref_storage.GetReference(request) - - if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes: - # ref is already on the server with the same tree - continue - - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - # Intentionally re-raise RpcError for outer except block. 
- raise - - missing_blobs = {} - required_blobs = self._required_blobs(tree) - - # Limit size of FindMissingBlobs request - for required_blobs_group in _grouper(required_blobs, 512): - request = remote_execution_pb2.FindMissingBlobsRequest() - - for required_digest in required_blobs_group: - d = request.blob_digests.add() - d.hash = required_digest.hash - d.size_bytes = required_digest.size_bytes - - response = remote.cas.FindMissingBlobs(request) - for digest in response.missing_blob_digests: - d = remote_execution_pb2.Digest() - d.hash = digest.hash - d.size_bytes = digest.size_bytes - missing_blobs[d.hash] = d - - # Upload any blobs missing on the server - skipped_remote = False - for digest in missing_blobs.values(): - uuid_ = uuid.uuid4() - resource_name = '/'.join(['uploads', str(uuid_), 'blobs', - digest.hash, str(digest.size_bytes)]) - - def request_stream(resname): - with open(self.objpath(digest), 'rb') as f: - assert os.fstat(f.fileno()).st_size == digest.size_bytes - offset = 0 - finished = False - remaining = digest.size_bytes - while not finished: - chunk_size = min(remaining, 64 * 1024) - remaining -= chunk_size - - request = bytestream_pb2.WriteRequest() - request.write_offset = offset - # max. 
64 kB chunks - request.data = f.read(chunk_size) - request.resource_name = resname - request.finish_write = remaining <= 0 - yield request - offset += chunk_size - finished = request.finish_write - response = remote.bytestream.Write(request_stream(resource_name)) - - request = buildstream_pb2.UpdateReferenceRequest() - request.keys.append(ref) - request.digest.hash = tree.hash - request.digest.size_bytes = tree.size_bytes - remote.ref_storage.UpdateReference(request) - - pushed = True - - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: - raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e + element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url)) - if skipped_remote: + if self._push_refs_to_remote(refs, remote): + pushed = True + else: self.context.message(Message( None, MessageType.SKIPPED, "Remote ({}) already has {} cached".format( remote.spec.url, element._get_brief_display_key()) )) + return pushed ################################################ @@ -599,6 +563,7 @@ class CASCache(ArtifactCache): ################################################ # Local Private Methods # ################################################ + def _checkout(self, dest, tree): os.makedirs(dest, exist_ok=True) @@ -761,16 +726,16 @@ class CASCache(ArtifactCache): # q.put(str(e)) - def _required_blobs(self, tree): + def _required_blobs(self, directory_digest): # parse directory, and recursively add blobs d = remote_execution_pb2.Digest() - d.hash = tree.hash - d.size_bytes = tree.size_bytes + d.hash = directory_digest.hash + d.size_bytes = directory_digest.size_bytes yield d directory = remote_execution_pb2.Directory() - with open(self.objpath(tree), 'rb') as f: + with open(self.objpath(directory_digest), 'rb') as f: directory.ParseFromString(f.read()) for filenode in directory.files: @@ -782,16 +747,16 @@ class CASCache(ArtifactCache): for dirnode in directory.directories: yield 
from self._required_blobs(dirnode.digest) - def _fetch_blob(self, remote, digest, out): + def _fetch_blob(self, remote, digest, stream): resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)]) request = bytestream_pb2.ReadRequest() request.resource_name = resource_name request.read_offset = 0 for response in remote.bytestream.Read(request): - out.write(response.data) + stream.write(response.data) + stream.flush() - out.flush() - assert digest.size_bytes == os.fstat(out.fileno()).st_size + assert digest.size_bytes == os.fstat(stream.fileno()).st_size def _fetch_directory(self, remote, tree): objpath = self.objpath(tree) @@ -827,6 +792,60 @@ class CASCache(ArtifactCache): digest = self.add_object(path=out.name) assert digest.hash == tree.hash + def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()): + resource_name = '/'.join(['uploads', str(u_uid), 'blobs', + digest.hash, str(digest.size_bytes)]) + + def request_stream(resname, instream): + offset = 0 + finished = False + remaining = digest.size_bytes + while not finished: + chunk_size = min(remaining, 64 * 1024) + remaining -= chunk_size + + request = bytestream_pb2.WriteRequest() + request.write_offset = offset + # max. 
64 kB chunks + request.data = instream.read(chunk_size) + request.resource_name = resname + request.finish_write = remaining <= 0 + + yield request + + offset += chunk_size + finished = request.finish_write + + response = remote.bytestream.Write(request_stream(resource_name, stream)) + + assert response.committed_size == digest.size_bytes + + def _send_directory(self, remote, digest, u_uid=uuid.uuid4()): + required_blobs = self._required_blobs(digest) + + missing_blobs = dict() + # Limit size of FindMissingBlobs request + for required_blobs_group in _grouper(required_blobs, 512): + request = remote_execution_pb2.FindMissingBlobsRequest() + + for required_digest in required_blobs_group: + d = request.blob_digests.add() + d.hash = required_digest.hash + d.size_bytes = required_digest.size_bytes + + response = remote.cas.FindMissingBlobs(request) + for missing_digest in response.missing_blob_digests: + d = remote_execution_pb2.Digest() + d.hash = missing_digest.hash + d.size_bytes = missing_digest.size_bytes + missing_blobs[d.hash] = d + + # Upload any blobs missing on the server + for blob_digest in missing_blobs.values(): + with open(self.objpath(blob_digest), 'rb') as f: + assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes + self._send_blob(remote, blob_digest, f, u_uid=u_uid) + # Represents a single remote CAS cache. # |