diff options
author | Jürg Billeter <j@bitron.ch> | 2019-07-02 16:16:22 +0100 |
---|---|---|
committer | Jürg Billeter <j@bitron.ch> | 2019-08-20 08:09:52 +0200 |
commit | cfb8901e60a03def79cbceb85fbeaadd13aa5957 (patch) | |
tree | 05fbea31162ecadde8a26850bf3130bb24f4dd3a | |
parent | 4d1b04be71d7438b0453bffd009578f6a5b04331 (diff) | |
download | buildstream-cfb8901e60a03def79cbceb85fbeaadd13aa5957.tar.gz |
casremote.py: Use UploadMissingBlobs in CASBatchUpdate
-rw-r--r-- | src/buildstream/_cas/cascache.py | 16 | ||||
-rw-r--r-- | src/buildstream/_cas/casremote.py | 43 |
2 files changed, 9 insertions, 50 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index d56ba0f55..e5fc530a7 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -947,22 +947,8 @@ class CASCache(): batch = _CASBatchUpdate(remote) for digest in digests: - with open(self.objpath(digest), 'rb') as f: - assert os.fstat(f.fileno()).st_size == digest.size_bytes + batch.add(digest) - if (digest.size_bytes >= remote.max_batch_total_size_bytes or - not remote.batch_update_supported): - # Too large for batch request, upload in independent request. - remote._send_blob(digest) - else: - if not batch.add(digest, f): - # Not enough space left in batch request. - # Complete pending batch first. - batch.send() - batch = _CASBatchUpdate(remote) - batch.add(digest, f) - - # Send final batch batch.send() def _send_directory(self, remote, digest): diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py index 109d83da3..ab26d32c7 100644 --- a/src/buildstream/_cas/casremote.py +++ b/src/buildstream/_cas/casremote.py @@ -269,22 +269,6 @@ class CASRemote(): raise CASRemoteError("Failed to download blob {}: {}".format( blob_response.digest.hash, blob_response.status.code)) - def _send_blob(self, digest): - local_cas = self.cascache._get_local_cas() - request = local_cas_pb2.UploadMissingBlobsRequest() - request.instance_name = self.local_cas_instance_name - request_digest = request.blob_digests.add() - request_digest.CopyFrom(digest) - response = local_cas.UploadMissingBlobs(request) - for blob_response in response.responses: - if blob_response.status.code == code_pb2.NOT_FOUND: - raise BlobNotFound(blob_response.digest.hash, "Failed to upload blob {}: {}".format( - blob_response.digest.hash, blob_response.status.code)) - - if blob_response.status.code != code_pb2.OK: - raise CASRemoteError("Failed to upload blob {}: {}".format( - blob_response.digest.hash, blob_response.status.code)) - # Represents a batch of blobs queued for fetching. # @@ -332,36 +316,25 @@ class _CASBatchRead(): class _CASBatchUpdate(): def __init__(self, remote): self._remote = remote - self._max_total_size_bytes = remote.max_batch_total_size_bytes - self._request = remote_execution_pb2.BatchUpdateBlobsRequest() - if remote.instance_name: - self._request.instance_name = remote.instance_name - self._size = 0 + self._request = local_cas_pb2.UploadMissingBlobsRequest() + self._request.instance_name = remote.local_cas_instance_name self._sent = False - def add(self, digest, stream): + def add(self, digest): assert not self._sent - new_batch_size = self._size + digest.size_bytes - if new_batch_size > self._max_total_size_bytes: - # Not enough space left in current batch - return False - - blob_request = self._request.requests.add() - blob_request.digest.hash = digest.hash - blob_request.digest.size_bytes = digest.size_bytes - blob_request.data = stream.read(digest.size_bytes) - self._size = new_batch_size - return True + request_digest = self._request.blob_digests.add() + request_digest.CopyFrom(digest) def send(self): assert not self._sent self._sent = True - if not self._request.requests: + if not self._request.blob_digests: return - batch_response = self._remote.cas.BatchUpdateBlobs(self._request) + local_cas = self._remote.cascache._get_local_cas() + batch_response = local_cas.UploadMissingBlobs(self._request) for response in batch_response.responses: if response.status.code != code_pb2.OK: |