author     Jim MacArthur <jim.macarthur@codethink.co.uk>  2018-06-15 14:26:21 +0100
committer  Jim MacArthur <jim.macarthur@codethink.co.uk>  2018-06-26 09:34:16 +0100
commit     c628be9f16cf79e3629809d38206feb60dc49b01 (patch)
tree       5c6b42362b07ae83c5af3ffaff097a022b62c5db
parent     33bf8397eb654f841ddf98cc93590d0ebe206f61 (diff)
download   buildstream-c628be9f16cf79e3629809d38206feb60dc49b01.tar.gz
Add push_key_only(), copy/pasted from push(), for temporary experiments
-rw-r--r--  buildstream/_artifactcache/cascache.py  |  86
1 file changed, 85 insertions(+), 1 deletion(-)
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 880d93ba4..8823f08af 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -252,9 +252,93 @@ class CASCache(ArtifactCache):
self.set_ref(newref, tree)
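+
+    # push_key_only():
+    #
+    # Push the tree referred to by 'key' to any configured push remotes,
+    # uploading whichever blobs each remote is missing first.
+    #
+    # Args:
+    #     key (str): The cache key whose tree should be pushed
+    #     project (Project): The project the remotes belong to
+    #
+    # Returns:
+    #     (bool): True if the ref was pushed to at least one remote
+    #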
+ def push_key_only(self, key, project):
+
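+        # Experimental refs are stored under the 'worker-source/' namespace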
+ ref = 'worker-source/{}'.format(key)
+
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
+
+ pushed = False
+
+ for remote in push_remotes:
+ print("push_key_only: Pushing {} to {}".format(ref, remote))
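+            # Establish the connection to this remote if not already done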
+ remote.init()
+
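+            # Look up the digest of the tree this ref points to locally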
+ tree = self.resolve_ref(ref)
+
+ # Check whether ref is already on the server in which case
+ # there is no need to push the artifact
+ try:
+ request = buildstream_pb2.GetArtifactRequest()
+ request.key = ref
+ response = remote.artifact_cache.GetArtifact(request)
+
+ if response.artifact.hash == tree.hash and response.artifact.size_bytes == tree.size_bytes:
+ # ref is already on the server with the same tree
+ continue
+
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise
+
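+            # Gather every blob digest reachable from the tree so we can
+            # determine which ones the remote is missing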
+ missing_blobs = {}
+ required_blobs = self._required_blobs(tree)
+
+ # Limit size of FindMissingBlobs request
+ for required_blobs_group in _grouper(required_blobs, 512):
+ request = remote_execution_pb2.FindMissingBlobsRequest()
+
+ for required_digest in required_blobs_group:
+ d = request.blob_digests.add()
+ d.hash = required_digest.hash
+ d.size_bytes = required_digest.size_bytes
+
+ response = remote.cas.FindMissingBlobs(request)
+ for digest in response.missing_blob_digests:
+ d = remote_execution_pb2.Digest()
+ d.hash = digest.hash
+ d.size_bytes = digest.size_bytes
+ missing_blobs[d.hash] = d
+
+ # Upload any blobs missing on the server
+ for digest in missing_blobs.values():
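+                # Yields the blob's contents as a stream of ByteStream
+                # WriteRequest messages, in chunks of at most 64 kB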
+ def request_stream():
+ resource_name = os.path.join(digest.hash, str(digest.size_bytes))
+ with open(self.objpath(digest), 'rb') as f:
+ assert os.fstat(f.fileno()).st_size == digest.size_bytes
+ offset = 0
+ finished = False
+ remaining = digest.size_bytes
+ while not finished:
+ chunk_size = min(remaining, 64 * 1024)
+ remaining -= chunk_size
+
+ request = bytestream_pb2.WriteRequest()
+ request.write_offset = offset
+ # max. 64 kB chunks
+ request.data = f.read(chunk_size)
+ request.resource_name = resource_name
+ request.finish_write = remaining <= 0
+ yield request
+ offset += chunk_size
+ finished = request.finish_write
+            remote.bytestream.Write(request_stream())
+
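+            # All blobs are now present on the remote; update the ref
+            # there to point at the tree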
+ request = buildstream_pb2.UpdateArtifactRequest()
+ request.keys.append(ref)
+ request.artifact.hash = tree.hash
+ request.artifact.size_bytes = tree.size_bytes
+ remote.artifact_cache.UpdateArtifact(request)
+
+ pushed = True
+
+ return pushed
+
def push(self, element, keys):
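+        # Materialize 'keys' so it can be iterated more than once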
+ keys = list(keys)
refs = [self.get_artifact_fullname(element, key) for key in keys]
-
+        element.info("Pushing keys ({}): {}".format(
+            len(keys), ",".join("'{}'".format(k) for k in keys)))
+        element.info("Pushing refs ({}): {}".format(len(refs), ",".join(refs)))
project = element._get_project()
push_remotes = [r for r in self._remotes[project] if r.spec.push]