summary refs log tree commit diff
diff options
context:
space:
mode:
author Jürg Billeter <j@bitron.ch> 2019-09-10 08:03:07 +0200
committer Jürg Billeter <j@bitron.ch> 2019-09-10 08:12:05 +0200
commit 0fac0b4416d1becb7c5bddf076c02773d1ce05c4 (patch)
tree bb5135d305f15a914e5def80a12fd0ce6a9422bb
parent d7dca0d13f4ce6e3f3922e47a50df8cbd6298496 (diff)
download buildstream-0fac0b4416d1becb7c5bddf076c02773d1ce05c4.tar.gz
casremote.py: Limit request size for batch download and upload
Fixes #1129.
-rw-r--r--  src/buildstream/_cas/casremote.py  75
1 file changed, 47 insertions(+), 28 deletions(-)
diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py
index 1efed22e6..43e215c63 100644
--- a/src/buildstream/_cas/casremote.py
+++ b/src/buildstream/_cas/casremote.py
@@ -12,6 +12,11 @@ from .._exceptions import CASRemoteError
# Limit payload to 1 MiB to leave sufficient headroom for metadata.
_MAX_PAYLOAD_BYTES = 1024 * 1024
+# How many digests to put in a single gRPC message.
+# A 256-bit hash requires 64 bytes of space (hexadecimal encoding).
+# 80 bytes provide sufficient space for hash, size, and protobuf overhead.
+_MAX_DIGESTS = _MAX_PAYLOAD_BYTES / 80
+
class BlobNotFound(CASRemoteError):
@@ -157,13 +162,18 @@ class CASRemote(BaseRemote):
class _CASBatchRead():
def __init__(self, remote):
self._remote = remote
- self._request = local_cas_pb2.FetchMissingBlobsRequest()
- self._request.instance_name = remote.local_cas_instance_name
+ self._requests = []
+ self._request = None
self._sent = False
def add(self, digest):
assert not self._sent
+ if not self._request or len(self._request.blob_digests) >= _MAX_DIGESTS:
+ self._request = local_cas_pb2.FetchMissingBlobsRequest()
+ self._request.instance_name = self._remote.local_cas_instance_name
+ self._requests.append(self._request)
+
request_digest = self._request.blob_digests.add()
request_digest.CopyFrom(digest)
@@ -171,26 +181,28 @@ class _CASBatchRead():
assert not self._sent
self._sent = True
- if not self._request.blob_digests:
+ if not self._requests:
return
local_cas = self._remote.cascache._get_local_cas()
- batch_response = local_cas.FetchMissingBlobs(self._request)
- for response in batch_response.responses:
- if response.status.code == code_pb2.NOT_FOUND:
- if missing_blobs is None:
- raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
- response.digest.hash, response.status.code))
+ for request in self._requests:
+ batch_response = local_cas.FetchMissingBlobs(request)
- missing_blobs.append(response.digest)
+ for response in batch_response.responses:
+ if response.status.code == code_pb2.NOT_FOUND:
+ if missing_blobs is None:
+ raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
+ response.digest.hash, response.status.code))
- if response.status.code != code_pb2.OK:
- raise CASRemoteError("Failed to download blob {}: {}".format(
- response.digest.hash, response.status.code))
- if response.digest.size_bytes != len(response.data):
- raise CASRemoteError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
- response.digest.hash, response.digest.size_bytes, len(response.data)))
+ missing_blobs.append(response.digest)
+
+ if response.status.code != code_pb2.OK:
+ raise CASRemoteError("Failed to download blob {}: {}".format(
+ response.digest.hash, response.status.code))
+ if response.digest.size_bytes != len(response.data):
+ raise CASRemoteError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
+ response.digest.hash, response.digest.size_bytes, len(response.data)))
# Represents a batch of blobs queued for upload.
@@ -198,13 +210,18 @@ class _CASBatchRead():
class _CASBatchUpdate():
def __init__(self, remote):
self._remote = remote
- self._request = local_cas_pb2.UploadMissingBlobsRequest()
- self._request.instance_name = remote.local_cas_instance_name
+ self._requests = []
+ self._request = None
self._sent = False
def add(self, digest):
assert not self._sent
+ if not self._request or len(self._request.blob_digests) >= _MAX_DIGESTS:
+ self._request = local_cas_pb2.UploadMissingBlobsRequest()
+ self._request.instance_name = self._remote.local_cas_instance_name
+ self._requests.append(self._request)
+
request_digest = self._request.blob_digests.add()
request_digest.CopyFrom(digest)
@@ -212,18 +229,20 @@ class _CASBatchUpdate():
assert not self._sent
self._sent = True
- if not self._request.blob_digests:
+ if not self._requests:
return
local_cas = self._remote.cascache._get_local_cas()
- batch_response = local_cas.UploadMissingBlobs(self._request)
- for response in batch_response.responses:
- if response.status.code != code_pb2.OK:
- if response.status.code == code_pb2.RESOURCE_EXHAUSTED:
- reason = "cache-too-full"
- else:
- reason = None
+ for request in self._requests:
+ batch_response = local_cas.UploadMissingBlobs(request)
+
+ for response in batch_response.responses:
+ if response.status.code != code_pb2.OK:
+ if response.status.code == code_pb2.RESOURCE_EXHAUSTED:
+ reason = "cache-too-full"
+ else:
+ reason = None
- raise CASRemoteError("Failed to upload blob {}: {}".format(
- response.digest.hash, response.status.code), reason=reason)
+ raise CASRemoteError("Failed to upload blob {}: {}".format(
+ response.digest.hash, response.status.code), reason=reason)