summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJürg Billeter <j@bitron.ch>2019-07-02 16:16:22 +0100
committerJürg Billeter <j@bitron.ch>2019-08-20 08:09:52 +0200
commitcfb8901e60a03def79cbceb85fbeaadd13aa5957 (patch)
tree05fbea31162ecadde8a26850bf3130bb24f4dd3a
parent4d1b04be71d7438b0453bffd009578f6a5b04331 (diff)
downloadbuildstream-cfb8901e60a03def79cbceb85fbeaadd13aa5957.tar.gz
casremote.py: Use UploadMissingBlobs in CASBatchUpdate
-rw-r--r--src/buildstream/_cas/cascache.py16
-rw-r--r--src/buildstream/_cas/casremote.py43
2 files changed, 9 insertions, 50 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index d56ba0f55..e5fc530a7 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -947,22 +947,8 @@ class CASCache():
batch = _CASBatchUpdate(remote)
for digest in digests:
- with open(self.objpath(digest), 'rb') as f:
- assert os.fstat(f.fileno()).st_size == digest.size_bytes
+ batch.add(digest)
- if (digest.size_bytes >= remote.max_batch_total_size_bytes or
- not remote.batch_update_supported):
- # Too large for batch request, upload in independent request.
- remote._send_blob(digest)
- else:
- if not batch.add(digest, f):
- # Not enough space left in batch request.
- # Complete pending batch first.
- batch.send()
- batch = _CASBatchUpdate(remote)
- batch.add(digest, f)
-
- # Send final batch
batch.send()
def _send_directory(self, remote, digest):
diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py
index 109d83da3..ab26d32c7 100644
--- a/src/buildstream/_cas/casremote.py
+++ b/src/buildstream/_cas/casremote.py
@@ -269,22 +269,6 @@ class CASRemote():
raise CASRemoteError("Failed to download blob {}: {}".format(
blob_response.digest.hash, blob_response.status.code))
- def _send_blob(self, digest):
- local_cas = self.cascache._get_local_cas()
- request = local_cas_pb2.UploadMissingBlobsRequest()
- request.instance_name = self.local_cas_instance_name
- request_digest = request.blob_digests.add()
- request_digest.CopyFrom(digest)
- response = local_cas.UploadMissingBlobs(request)
- for blob_response in response.responses:
- if blob_response.status.code == code_pb2.NOT_FOUND:
- raise BlobNotFound(blob_response.digest.hash, "Failed to upload blob {}: {}".format(
- blob_response.digest.hash, blob_response.status.code))
-
- if blob_response.status.code != code_pb2.OK:
- raise CASRemoteError("Failed to upload blob {}: {}".format(
- blob_response.digest.hash, blob_response.status.code))
-
# Represents a batch of blobs queued for fetching.
#
@@ -332,36 +316,25 @@ class _CASBatchRead():
class _CASBatchUpdate():
def __init__(self, remote):
self._remote = remote
- self._max_total_size_bytes = remote.max_batch_total_size_bytes
- self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
- if remote.instance_name:
- self._request.instance_name = remote.instance_name
- self._size = 0
+ self._request = local_cas_pb2.UploadMissingBlobsRequest()
+ self._request.instance_name = remote.local_cas_instance_name
self._sent = False
- def add(self, digest, stream):
+ def add(self, digest):
assert not self._sent
- new_batch_size = self._size + digest.size_bytes
- if new_batch_size > self._max_total_size_bytes:
- # Not enough space left in current batch
- return False
-
- blob_request = self._request.requests.add()
- blob_request.digest.hash = digest.hash
- blob_request.digest.size_bytes = digest.size_bytes
- blob_request.data = stream.read(digest.size_bytes)
- self._size = new_batch_size
- return True
+ request_digest = self._request.blob_digests.add()
+ request_digest.CopyFrom(digest)
def send(self):
assert not self._sent
self._sent = True
- if not self._request.requests:
+ if not self._request.blob_digests:
return
- batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
+ local_cas = self._remote.cascache._get_local_cas()
+ batch_response = local_cas.UploadMissingBlobs(self._request)
for response in batch_response.responses:
if response.status.code != code_pb2.OK: