diff options
author | Jim MacArthur <jim.macarthur@codethink.co.uk> | 2018-06-15 14:26:21 +0100 |
---|---|---|
committer | Jim MacArthur <jim.macarthur@codethink.co.uk> | 2018-06-26 09:34:16 +0100 |
commit | c628be9f16cf79e3629809d38206feb60dc49b01 (patch) | |
tree | 5c6b42362b07ae83c5af3ffaff097a022b62c5db | |
parent | 33bf8397eb654f841ddf98cc93590d0ebe206f61 (diff) | |
download | buildstream-c628be9f16cf79e3629809d38206feb60dc49b01.tar.gz |
Copy/paste the push_key_only function from push(), for temporary experiments
-rw-r--r-- | buildstream/_artifactcache/cascache.py | 86 |
1 file changed, 85 insertions(+), 1 deletion(-)
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index 880d93ba4..8823f08af 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -252,9 +252,93 @@ class CASCache(ArtifactCache): self.set_ref(newref, tree) + def push_key_only(self, key, project): + + ref = 'worker-source/{}'.format(key) + + push_remotes = [r for r in self._remotes[project] if r.spec.push] + + pushed = False + + for remote in push_remotes: + print("push_key_only: Pushing {} to {}".format(ref, remote)) + remote.init() + + tree = self.resolve_ref(ref) + + # Check whether ref is already on the server in which case + # there is no need to push the artifact + try: + request = buildstream_pb2.GetArtifactRequest() + request.key = ref + response = remote.artifact_cache.GetArtifact(request) + + if response.artifact.hash == tree.hash and response.artifact.size_bytes == tree.size_bytes: + # ref is already on the server with the same tree + continue + + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise + + missing_blobs = {} + required_blobs = self._required_blobs(tree) + + # Limit size of FindMissingBlobs request + for required_blobs_group in _grouper(required_blobs, 512): + request = remote_execution_pb2.FindMissingBlobsRequest() + + for required_digest in required_blobs_group: + d = request.blob_digests.add() + d.hash = required_digest.hash + d.size_bytes = required_digest.size_bytes + + response = remote.cas.FindMissingBlobs(request) + for digest in response.missing_blob_digests: + d = remote_execution_pb2.Digest() + d.hash = digest.hash + d.size_bytes = digest.size_bytes + missing_blobs[d.hash] = d + + # Upload any blobs missing on the server + for digest in missing_blobs.values(): + def request_stream(): + resource_name = os.path.join(digest.hash, str(digest.size_bytes)) + with open(self.objpath(digest), 'rb') as f: + assert os.fstat(f.fileno()).st_size == digest.size_bytes + offset = 0 + 
finished = False + remaining = digest.size_bytes + while not finished: + chunk_size = min(remaining, 64 * 1024) + remaining -= chunk_size + + request = bytestream_pb2.WriteRequest() + request.write_offset = offset + # max. 64 kB chunks + request.data = f.read(chunk_size) + request.resource_name = resource_name + request.finish_write = remaining <= 0 + yield request + offset += chunk_size + finished = request.finish_write + response = remote.bytestream.Write(request_stream()) + + request = buildstream_pb2.UpdateArtifactRequest() + request.keys.append(ref) + request.artifact.hash = tree.hash + request.artifact.size_bytes = tree.size_bytes + remote.artifact_cache.UpdateArtifact(request) + + pushed = True + + return pushed + def push(self, element, keys): + keys = list(keys) refs = [self.get_artifact_fullname(element, key) for key in keys] - + element.info("Pushing keys ({}): {}".format(len(keys),",".join(["'{}'".format(k) for k in keys]))) + element.info("Pushing refs ({}): {}".format(len(refs),",".join(refs))) project = element._get_project() push_remotes = [r for r in self._remotes[project] if r.spec.push] |