author    Jim MacArthur <jim.macarthur@codethink.co.uk>  2018-08-02 15:49:33 +0100
committer Tristan Van Berkom <tristan.van.berkom@gmail.com>  2018-10-03 07:35:51 +0000
commit    9568824f3780be032b500694bd2c78d6dd526fa4 (patch)
tree      35b2a7ac3f69d51126dadfbafac565f020602976
parent    6e820362700e4083cb57a8b8603a20e373f30cee (diff)
download  buildstream-9568824f3780be032b500694bd2c78d6dd526fa4.tar.gz
cascache.py: Preparation for remote execution
Refactor the push() and pull() implementations so that the API additions needed for remote execution are made easier. https://gitlab.com/BuildStream/buildstream/issues/454
-rw-r--r--  buildstream/_artifactcache/cascache.py  205
1 file changed, 111 insertions(+), 94 deletions(-)
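
For orientation, here is a minimal, self-contained sketch (not BuildStream code; _FakeRemote, resolve_ref and the example URL are stand-ins) of the control flow this commit introduces: push() loops over the push-enabled remotes and delegates the per-remote work to a _push_refs_to_remote() helper, which reports whether anything actually had to be uploaded. In the real implementation the upload itself is further split into _send_directory() and _send_blob(), as the diff below shows.

    # Illustrative sketch only -- stands in for the gRPC calls in cascache.py.
    class _FakeRemote:
        """Stand-in for a CAS remote; records which refs it already holds."""
        def __init__(self, url):
            self.spec_url = url
            self.refs = {}

    def _push_refs_to_remote(refs, remote, resolve_ref):
        # Return True if at least one ref was actually pushed,
        # False if the remote already had everything (the "skipped" case).
        skipped_remote = True
        for ref in refs:
            tree = resolve_ref(ref)
            if remote.refs.get(ref) == tree:
                continue                 # already on the server with the same tree
            remote.refs[ref] = tree      # stands in for _send_directory() + UpdateReference()
            skipped_remote = False
        return not skipped_remote

    def push(refs, remotes, resolve_ref):
        pushed = False
        for remote in remotes:
            if _push_refs_to_remote(refs, remote, resolve_ref):
                print("Pushed {} -> {}".format(refs, remote.spec_url))
                pushed = True
            else:
                print("Remote ({}) already has {} cached".format(remote.spec_url, refs))
        return pushed

    if __name__ == "__main__":
        remote = _FakeRemote("https://cas.example.com")
        print(push(["ref-a"], [remote], lambda r: "digest-of-" + r))   # uploads, True
        print(push(["ref-a"], [remote], lambda r: "digest-of-" + r))   # cached, False

Splitting the helper out this way keeps the ByteStream upload logic in one place, which is what makes the remote-execution additions referenced in the commit message easier to layer on top.
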
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index e2c0d44b5..ec9d78026 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -81,6 +81,7 @@ class CASCache(ArtifactCache):
################################################
# Implementation of abstract methods #
################################################
+
def contains(self, element, key):
refpath = self._refpath(self.get_artifact_fullname(element, key))
@@ -156,6 +157,7 @@ class CASCache(ArtifactCache):
q = multiprocessing.Queue()
for remote_spec in remote_specs:
# Use subprocess to avoid creation of gRPC threads in main BuildStream process
+ # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
try:
@@ -268,109 +270,69 @@ class CASCache(ArtifactCache):
self.set_ref(newref, tree)
+ def _push_refs_to_remote(self, refs, remote):
+ skipped_remote = True
+ try:
+ for ref in refs:
+ tree = self.resolve_ref(ref)
+
+ # Check whether ref is already on the server in which case
+ # there is no need to push the artifact
+ try:
+ request = buildstream_pb2.GetReferenceRequest()
+ request.key = ref
+ response = remote.ref_storage.GetReference(request)
+
+ if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
+ # ref is already on the server with the same tree
+ continue
+
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ # Intentionally re-raise RpcError for outer except block.
+ raise
+
+ self._send_directory(remote, tree)
+
+ request = buildstream_pb2.UpdateReferenceRequest()
+ request.keys.append(ref)
+ request.digest.hash = tree.hash
+ request.digest.size_bytes = tree.size_bytes
+ remote.ref_storage.UpdateReference(request)
+
+ skipped_remote = False
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+ raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
+
+ return not skipped_remote
+
def push(self, element, keys):
- refs = [self.get_artifact_fullname(element, key) for key in keys]
+
+ refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
project = element._get_project()
push_remotes = [r for r in self._remotes[project] if r.spec.push]
pushed = False
- display_key = element._get_brief_display_key()
+
for remote in push_remotes:
remote.init()
- skipped_remote = True
+ display_key = element._get_brief_display_key()
element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
- try:
- for ref in refs:
- tree = self.resolve_ref(ref)
-
- # Check whether ref is already on the server in which case
- # there is no need to push the artifact
- try:
- request = buildstream_pb2.GetReferenceRequest()
- request.key = ref
- response = remote.ref_storage.GetReference(request)
-
- if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
- # ref is already on the server with the same tree
- continue
-
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.NOT_FOUND:
- # Intentionally re-raise RpcError for outer except block.
- raise
-
- missing_blobs = {}
- required_blobs = self._required_blobs(tree)
-
- # Limit size of FindMissingBlobs request
- for required_blobs_group in _grouper(required_blobs, 512):
- request = remote_execution_pb2.FindMissingBlobsRequest()
-
- for required_digest in required_blobs_group:
- d = request.blob_digests.add()
- d.hash = required_digest.hash
- d.size_bytes = required_digest.size_bytes
-
- response = remote.cas.FindMissingBlobs(request)
- for digest in response.missing_blob_digests:
- d = remote_execution_pb2.Digest()
- d.hash = digest.hash
- d.size_bytes = digest.size_bytes
- missing_blobs[d.hash] = d
-
- # Upload any blobs missing on the server
- skipped_remote = False
- for digest in missing_blobs.values():
- uuid_ = uuid.uuid4()
- resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
- digest.hash, str(digest.size_bytes)])
-
- def request_stream(resname):
- with open(self.objpath(digest), 'rb') as f:
- assert os.fstat(f.fileno()).st_size == digest.size_bytes
- offset = 0
- finished = False
- remaining = digest.size_bytes
- while not finished:
- chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
- remaining -= chunk_size
-
- request = bytestream_pb2.WriteRequest()
- request.write_offset = offset
- # max. _MAX_PAYLOAD_BYTES chunks
- request.data = f.read(chunk_size)
- request.resource_name = resname
- request.finish_write = remaining <= 0
- yield request
- offset += chunk_size
- finished = request.finish_write
- response = remote.bytestream.Write(request_stream(resource_name))
-
- request = buildstream_pb2.UpdateReferenceRequest()
- request.keys.append(ref)
- request.digest.hash = tree.hash
- request.digest.size_bytes = tree.size_bytes
- remote.ref_storage.UpdateReference(request)
-
- pushed = True
-
- if not skipped_remote:
- element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
-
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
- raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
-
- if skipped_remote:
+ if self._push_refs_to_remote(refs, remote):
+ element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
+ pushed = True
+ else:
self.context.message(Message(
None,
MessageType.INFO,
"Remote ({}) already has {} cached".format(
remote.spec.url, element._get_brief_display_key())
))
+
return pushed
################################################
@@ -599,6 +561,7 @@ class CASCache(ArtifactCache):
################################################
# Local Private Methods #
################################################
+
def _checkout(self, dest, tree):
os.makedirs(dest, exist_ok=True)
@@ -776,16 +739,16 @@ class CASCache(ArtifactCache):
#
q.put(str(e))
- def _required_blobs(self, tree):
+ def _required_blobs(self, directory_digest):
# parse directory, and recursively add blobs
d = remote_execution_pb2.Digest()
- d.hash = tree.hash
- d.size_bytes = tree.size_bytes
+ d.hash = directory_digest.hash
+ d.size_bytes = directory_digest.size_bytes
yield d
directory = remote_execution_pb2.Directory()
- with open(self.objpath(tree), 'rb') as f:
+ with open(self.objpath(directory_digest), 'rb') as f:
directory.ParseFromString(f.read())
for filenode in directory.files:
@@ -797,16 +760,16 @@ class CASCache(ArtifactCache):
for dirnode in directory.directories:
yield from self._required_blobs(dirnode.digest)
- def _fetch_blob(self, remote, digest, out):
+ def _fetch_blob(self, remote, digest, stream):
resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
request = bytestream_pb2.ReadRequest()
request.resource_name = resource_name
request.read_offset = 0
for response in remote.bytestream.Read(request):
- out.write(response.data)
+ stream.write(response.data)
+ stream.flush()
- out.flush()
- assert digest.size_bytes == os.fstat(out.fileno()).st_size
+ assert digest.size_bytes == os.fstat(stream.fileno()).st_size
# _ensure_blob():
#
@@ -922,6 +885,60 @@ class CASCache(ArtifactCache):
# Fetch final batch
self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
+ def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
+ resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
+ digest.hash, str(digest.size_bytes)])
+
+ def request_stream(resname, instream):
+ offset = 0
+ finished = False
+ remaining = digest.size_bytes
+ while not finished:
+ chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
+ remaining -= chunk_size
+
+ request = bytestream_pb2.WriteRequest()
+ request.write_offset = offset
+ # max. _MAX_PAYLOAD_BYTES chunks
+ request.data = instream.read(chunk_size)
+ request.resource_name = resname
+ request.finish_write = remaining <= 0
+
+ yield request
+
+ offset += chunk_size
+ finished = request.finish_write
+
+ response = remote.bytestream.Write(request_stream(resource_name, stream))
+
+ assert response.committed_size == digest.size_bytes
+
+ def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
+ required_blobs = self._required_blobs(digest)
+
+ missing_blobs = dict()
+ # Limit size of FindMissingBlobs request
+ for required_blobs_group in _grouper(required_blobs, 512):
+ request = remote_execution_pb2.FindMissingBlobsRequest()
+
+ for required_digest in required_blobs_group:
+ d = request.blob_digests.add()
+ d.hash = required_digest.hash
+ d.size_bytes = required_digest.size_bytes
+
+ response = remote.cas.FindMissingBlobs(request)
+ for missing_digest in response.missing_blob_digests:
+ d = remote_execution_pb2.Digest()
+ d.hash = missing_digest.hash
+ d.size_bytes = missing_digest.size_bytes
+ missing_blobs[d.hash] = d
+
+ # Upload any blobs missing on the server
+ for blob_digest in missing_blobs.values():
+ with open(self.objpath(blob_digest), 'rb') as f:
+ assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
+ self._send_blob(remote, blob_digest, f, u_uid=u_uid)
+
# Represents a single remote CAS cache.
#