author     Jim MacArthur <jim.macarthur@codethink.co.uk>  2018-08-02 15:49:33 +0100
committer  Martin Blanchard <martin.blanchard@codethink.co.uk>  2018-09-07 13:57:28 +0100
commit     936bb93af4c7e416f67786783d8d0b1bd82bbde5 (patch)
tree       cc5c899f3a50c65bdd4eca480e97363ae77daec6
parent     7b32e1ec903d658dfa75c754b0dd45a3e9331638 (diff)
download   buildstream-936bb93af4c7e416f67786783d8d0b1bd82bbde5.tar.gz
cascache.py: Preparation for remote execution
Refactor the push() and pull() implementations so that the API additions needed for remote execution are easier to make.

https://gitlab.com/BuildStream/buildstream/issues/454
-rw-r--r--  buildstream/_artifactcache/cascache.py | 199
1 file changed, 109 insertions(+), 90 deletions(-)
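
For readers skimming the diff: the inline upload loop that used to live in
push() is split out into reusable helpers (_push_refs_to_remote(),
_send_directory() and _send_blob()) so the remote-execution work can reuse
the same upload path. The snippet below is only a simplified,
dependency-free sketch of the 64 kB chunking that _send_blob() performs;
the dict stands in for a bytestream WriteRequest and is not the real gRPC
message, and the resource name is a placeholder.

    import io

    CHUNK_SIZE = 64 * 1024  # _send_blob() streams blobs in 64 kB chunks

    def chunk_writes(payload, resource_name):
        # Yield dicts shaped like bytestream WriteRequests, mimicking the
        # request_stream() generator inside _send_blob().
        stream = io.BytesIO(payload)
        offset = 0
        remaining = len(payload)
        finished = False
        while not finished:
            chunk_size = min(remaining, CHUNK_SIZE)
            remaining -= chunk_size
            data = stream.read(chunk_size)
            finished = remaining <= 0
            yield {
                'resource_name': resource_name,
                'write_offset': offset,
                'data': data,
                'finish_write': finished,
            }
            offset += chunk_size

    if __name__ == '__main__':
        blob = b'x' * (150 * 1024)
        requests = list(chunk_writes(blob, 'uploads/<uuid>/blobs/<hash>/<size>'))
        assert sum(len(r['data']) for r in requests) == len(blob)
        assert requests[-1]['finish_write'] is True
        print("{} WriteRequest chunks".format(len(requests)))
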
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 9a9f7024f..9c9d97370 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -76,6 +76,7 @@ class CASCache(ArtifactCache):
################################################
# Implementation of abstract methods #
################################################
+
def contains(self, element, key):
refpath = self._refpath(self.get_artifact_fullname(element, key))
@@ -153,6 +154,7 @@ class CASCache(ArtifactCache):
q = multiprocessing.Queue()
for remote_spec in remote_specs:
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
+ # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
try:
@@ -267,8 +269,46 @@ class CASCache(ArtifactCache):
self.set_ref(newref, tree)
+ def _push_refs_to_remote(self, refs, remote):
+ skipped_remote = True
+ try:
+ for ref in refs:
+ tree = self.resolve_ref(ref)
+
+ # Check whether ref is already on the server in which case
+ # there is no need to push the artifact
+ try:
+ request = buildstream_pb2.GetReferenceRequest()
+ request.key = ref
+ response = remote.ref_storage.GetReference(request)
+
+ if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
+ # ref is already on the server with the same tree
+ continue
+
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ # Intentionally re-raise RpcError for outer except block.
+ raise
+
+ self._send_directory(remote, tree)
+
+ request = buildstream_pb2.UpdateReferenceRequest()
+ request.keys.append(ref)
+ request.digest.hash = tree.hash
+ request.digest.size_bytes = tree.size_bytes
+ remote.ref_storage.UpdateReference(request)
+
+ skipped_remote = False
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+ raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
+
+ return not skipped_remote
+
def push(self, element, keys):
- refs = [self.get_artifact_fullname(element, key) for key in keys]
+
+ refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
project = element._get_project()
@@ -278,95 +318,19 @@ class CASCache(ArtifactCache):
for remote in push_remotes:
remote.init()
- skipped_remote = True
- element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
- try:
- for ref in refs:
- tree = self.resolve_ref(ref)
-
- # Check whether ref is already on the server in which case
- # there is no need to push the artifact
- try:
- request = buildstream_pb2.GetReferenceRequest()
- request.key = ref
- response = remote.ref_storage.GetReference(request)
-
- if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
- # ref is already on the server with the same tree
- continue
-
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.NOT_FOUND:
- # Intentionally re-raise RpcError for outer except block.
- raise
-
- missing_blobs = {}
- required_blobs = self._required_blobs(tree)
-
- # Limit size of FindMissingBlobs request
- for required_blobs_group in _grouper(required_blobs, 512):
- request = remote_execution_pb2.FindMissingBlobsRequest()
-
- for required_digest in required_blobs_group:
- d = request.blob_digests.add()
- d.hash = required_digest.hash
- d.size_bytes = required_digest.size_bytes
-
- response = remote.cas.FindMissingBlobs(request)
- for digest in response.missing_blob_digests:
- d = remote_execution_pb2.Digest()
- d.hash = digest.hash
- d.size_bytes = digest.size_bytes
- missing_blobs[d.hash] = d
-
- # Upload any blobs missing on the server
- skipped_remote = False
- for digest in missing_blobs.values():
- uuid_ = uuid.uuid4()
- resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
- digest.hash, str(digest.size_bytes)])
-
- def request_stream(resname):
- with open(self.objpath(digest), 'rb') as f:
- assert os.fstat(f.fileno()).st_size == digest.size_bytes
- offset = 0
- finished = False
- remaining = digest.size_bytes
- while not finished:
- chunk_size = min(remaining, 64 * 1024)
- remaining -= chunk_size
-
- request = bytestream_pb2.WriteRequest()
- request.write_offset = offset
- # max. 64 kB chunks
- request.data = f.read(chunk_size)
- request.resource_name = resname
- request.finish_write = remaining <= 0
- yield request
- offset += chunk_size
- finished = request.finish_write
- response = remote.bytestream.Write(request_stream(resource_name))
-
- request = buildstream_pb2.UpdateReferenceRequest()
- request.keys.append(ref)
- request.digest.hash = tree.hash
- request.digest.size_bytes = tree.size_bytes
- remote.ref_storage.UpdateReference(request)
-
- pushed = True
-
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
- raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
+ element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
- if skipped_remote:
+ if self._push_refs_to_remote(refs, remote):
+ pushed = True
+ else:
self.context.message(Message(
None,
MessageType.SKIPPED,
"Remote ({}) already has {} cached".format(
remote.spec.url, element._get_brief_display_key())
))
+
return pushed
################################################
@@ -599,6 +563,7 @@ class CASCache(ArtifactCache):
################################################
# Local Private Methods #
################################################
+
def _checkout(self, dest, tree):
os.makedirs(dest, exist_ok=True)
@@ -761,16 +726,16 @@ class CASCache(ArtifactCache):
#
q.put(str(e))
- def _required_blobs(self, tree):
+ def _required_blobs(self, directory_digest):
# parse directory, and recursively add blobs
d = remote_execution_pb2.Digest()
- d.hash = tree.hash
- d.size_bytes = tree.size_bytes
+ d.hash = directory_digest.hash
+ d.size_bytes = directory_digest.size_bytes
yield d
directory = remote_execution_pb2.Directory()
- with open(self.objpath(tree), 'rb') as f:
+ with open(self.objpath(directory_digest), 'rb') as f:
directory.ParseFromString(f.read())
for filenode in directory.files:
@@ -782,16 +747,16 @@ class CASCache(ArtifactCache):
for dirnode in directory.directories:
yield from self._required_blobs(dirnode.digest)
- def _fetch_blob(self, remote, digest, out):
+ def _fetch_blob(self, remote, digest, stream):
resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
request = bytestream_pb2.ReadRequest()
request.resource_name = resource_name
request.read_offset = 0
for response in remote.bytestream.Read(request):
- out.write(response.data)
+ stream.write(response.data)
+ stream.flush()
- out.flush()
- assert digest.size_bytes == os.fstat(out.fileno()).st_size
+ assert digest.size_bytes == os.fstat(stream.fileno()).st_size
def _fetch_directory(self, remote, tree):
objpath = self.objpath(tree)
@@ -827,6 +792,60 @@ class CASCache(ArtifactCache):
digest = self.add_object(path=out.name)
assert digest.hash == tree.hash
+ def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
+ resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
+ digest.hash, str(digest.size_bytes)])
+
+ def request_stream(resname, instream):
+ offset = 0
+ finished = False
+ remaining = digest.size_bytes
+ while not finished:
+ chunk_size = min(remaining, 64 * 1024)
+ remaining -= chunk_size
+
+ request = bytestream_pb2.WriteRequest()
+ request.write_offset = offset
+ # max. 64 kB chunks
+ request.data = instream.read(chunk_size)
+ request.resource_name = resname
+ request.finish_write = remaining <= 0
+
+ yield request
+
+ offset += chunk_size
+ finished = request.finish_write
+
+ response = remote.bytestream.Write(request_stream(resource_name, stream))
+
+ assert response.committed_size == digest.size_bytes
+
+ def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
+ required_blobs = self._required_blobs(digest)
+
+ missing_blobs = dict()
+ # Limit size of FindMissingBlobs request
+ for required_blobs_group in _grouper(required_blobs, 512):
+ request = remote_execution_pb2.FindMissingBlobsRequest()
+
+ for required_digest in required_blobs_group:
+ d = request.blob_digests.add()
+ d.hash = required_digest.hash
+ d.size_bytes = required_digest.size_bytes
+
+ response = remote.cas.FindMissingBlobs(request)
+ for missing_digest in response.missing_blob_digests:
+ d = remote_execution_pb2.Digest()
+ d.hash = missing_digest.hash
+ d.size_bytes = missing_digest.size_bytes
+ missing_blobs[d.hash] = d
+
+ # Upload any blobs missing on the server
+ for blob_digest in missing_blobs.values():
+ with open(self.objpath(blob_digest), 'rb') as f:
+ assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
+ self._send_blob(remote, blob_digest, f, u_uid=u_uid)
+
# Represents a single remote CAS cache.
#