diff options
author | Jürg Billeter <j@bitron.ch> | 2018-10-01 15:32:30 +0000 |
---|---|---|
committer | Jürg Billeter <j@bitron.ch> | 2018-10-01 15:32:30 +0000 |
commit | cf00c0a1317dddbab08859dd02c41b38eb35b9b9 (patch) | |
tree | 620ace79f14883e0f586725226fed77807087970 | |
parent | fafa81367d505cebe564d8f2dbf6cbee5bfcc118 (diff) | |
parent | f47895c0818059859a2133ce98e1abdc0be00ad3 (diff) | |
download | buildstream-cf00c0a1317dddbab08859dd02c41b38eb35b9b9.tar.gz |
Merge branch 'juerg/cas-batch' into 'master'
CAS: Implement BatchUpdateBlobs support
Closes #677 and #676
See merge request BuildStream/buildstream!839
-rw-r--r-- | buildstream/_artifactcache/cascache.py | 78 | ||||
-rw-r--r-- | buildstream/_artifactcache/casserver.py | 45 |
2 files changed, 117 insertions, 6 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 936cb780b..3a3181bfb 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -1048,10 +1048,29 @@ class CASCache(ArtifactCache):
                 missing_blobs[d.hash] = d
 
         # Upload any blobs missing on the server
-        for blob_digest in missing_blobs.values():
-            with open(self.objpath(blob_digest), 'rb') as f:
-                assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
-                self._send_blob(remote, blob_digest, f, u_uid=u_uid)
+        self._send_blobs(remote, missing_blobs.values(), u_uid)
+
+    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
+        batch = _CASBatchUpdate(remote)
+
+        for digest in digests:
+            with open(self.objpath(digest), 'rb') as f:
+                assert os.fstat(f.fileno()).st_size == digest.size_bytes
+
+                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
+                        not remote.batch_update_supported):
+                    # Too large for batch request, upload in independent request.
+                    self._send_blob(remote, digest, f, u_uid=u_uid)
+                else:
+                    if not batch.add(digest, f):
+                        # Not enough space left in batch request.
+                        # Complete pending batch first.
+                        batch.send()
+                        batch = _CASBatchUpdate(remote)
+                        batch.add(digest, f)
+
+        # Send final batch
+        batch.send()
 
 
 # Represents a single remote CAS cache.
@@ -1126,6 +1145,17 @@ class _CASRemote():
                 if e.code() != grpc.StatusCode.UNIMPLEMENTED:
                     raise
 
+            # Check whether the server supports BatchUpdateBlobs()
+            self.batch_update_supported = False
+            try:
+                request = remote_execution_pb2.BatchUpdateBlobsRequest()
+                response = self.cas.BatchUpdateBlobs(request)
+                self.batch_update_supported = True
+            except grpc.RpcError as e:
+                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
+                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
+                    raise
+
             self._initialized = True
 
 
@@ -1173,6 +1203,46 @@ class _CASBatchRead():
             yield (response.digest, response.data)
 
 
+# Represents a batch of blobs queued for upload.
+#
+class _CASBatchUpdate():
+    def __init__(self, remote):
+        self._remote = remote
+        self._max_total_size_bytes = remote.max_batch_total_size_bytes
+        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
+        self._size = 0
+        self._sent = False
+
+    def add(self, digest, stream):
+        assert not self._sent
+
+        new_batch_size = self._size + digest.size_bytes
+        if new_batch_size > self._max_total_size_bytes:
+            # Not enough space left in current batch
+            return False
+
+        blob_request = self._request.requests.add()
+        blob_request.digest.hash = digest.hash
+        blob_request.digest.size_bytes = digest.size_bytes
+        blob_request.data = stream.read(digest.size_bytes)
+        self._size = new_batch_size
+        return True
+
+    def send(self):
+        assert not self._sent
+        self._sent = True
+
+        if len(self._request.requests) == 0:
+            return
+
+        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
+
+        for response in batch_response.responses:
+            if response.status.code != grpc.StatusCode.OK.value[0]:
+                raise ArtifactError("Failed to upload blob {}: {}".format(
+                    response.digest.hash, response.status.code))
+
+
 def _grouper(iterable, n):
     while True:
         try:
diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index b51572755..31b05ce0f 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -68,7 +68,7 @@ def create_server(repo, *, enable_push):
         _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(artifactcache), server)
+        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
@@ -222,9 +222,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
 
 
 class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
-    def __init__(self, cas):
+    def __init__(self, cas, *, enable_push):
         super().__init__()
         self.cas = cas
+        self.enable_push = enable_push
 
     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
@@ -260,6 +261,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
 
         return response
 
+    def BatchUpdateBlobs(self, request, context):
+        response = remote_execution_pb2.BatchUpdateBlobsResponse()
+
+        if not self.enable_push:
+            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
+            return response
+
+        batch_size = 0
+
+        for blob_request in request.requests:
+            digest = blob_request.digest
+
+            batch_size += digest.size_bytes
+            if batch_size > _MAX_PAYLOAD_BYTES:
+                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+                return response
+
+            blob_response = response.responses.add()
+            blob_response.digest.hash = digest.hash
+            blob_response.digest.size_bytes = digest.size_bytes
+
+            if len(blob_request.data) != digest.size_bytes:
+                blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
+                continue
+
+            try:
+                _clean_up_cache(self.cas, digest.size_bytes)
+
+                with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
+                    out.write(blob_request.data)
+                    out.flush()
+                    server_digest = self.cas.add_object(path=out.name)
+                    if server_digest.hash != digest.hash:
+                        blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
+
+            except ArtifactTooLargeException:
+                blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
+
+        return response
+
 
 class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
     def GetCapabilities(self, request, context):