diff options
author | Jürg Billeter <j@bitron.ch> | 2019-07-02 13:25:24 +0100 |
---|---|---|
committer | Darius Makovsky <traveltissues@protonmail.com> | 2019-08-19 12:27:54 +0100 |
commit | 77d33f5a44334694bad76adcc9f26cce81710357 (patch) | |
tree | 40ea3d72c3cd8e520917ae9ab1f0944b925682b2 | |
parent | f6dd7078b914a7cda214ab9746b53ac3035aff96 (diff) | |
download | buildstream-77d33f5a44334694bad76adcc9f26cce81710357.tar.gz |
casremote.py: Use FetchMissingBlobs in CASBatchRead
-rw-r--r-- | src/buildstream/_cas/cascache.py | 44 | ||||
-rw-r--r-- | src/buildstream/_cas/casremote.py | 25 |
2 files changed, 11 insertions, 58 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index c44de9482..55dac681a 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -795,18 +795,9 @@ class CASCache(): return objpath - def _batch_download_complete(self, batch, *, missing_blobs=None): - for digest, data in batch.send(missing_blobs=missing_blobs): - with self._temporary_object() as f: - f.write(data) - f.flush() - - added_digest = self.add_object(path=f.name, link_directly=True) - assert added_digest.hash == digest.hash - # Helper function for _fetch_directory(). def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue): - self._batch_download_complete(batch) + batch.send() # All previously scheduled directories are now locally available, # move them to the processing queue. @@ -821,17 +812,8 @@ class CASCache(): if in_local_cache: # Skip download, already in local cache. pass - elif (digest.size_bytes >= remote.max_batch_total_size_bytes or - not remote.batch_read_supported): - # Too large for batch request, download in independent request. - self._ensure_blob(remote, digest) - in_local_cache = True else: - if not batch.add(digest): - # Not enough space left in batch request. - # Complete pending batch first. - batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue) - batch.add(digest) + batch.add(digest) if recursive: if in_local_cache: @@ -915,27 +897,9 @@ class CASCache(): batch = _CASBatchRead(remote) for digest in digests: - if (digest.size_bytes >= remote.max_batch_total_size_bytes or - not remote.batch_read_supported): - # Too large for batch request, download in independent request. - try: - self._ensure_blob(remote, digest) - except grpc.RpcError as e: - if e.code() == grpc.StatusCode.NOT_FOUND: - missing_blobs.append(digest) - else: - raise CASCacheError("Failed to fetch blob: {}".format(e)) from e - else: - if not batch.add(digest): - # Not enough space left in batch request. - # Complete pending batch first. - self._batch_download_complete(batch, missing_blobs=missing_blobs) - - batch = _CASBatchRead(remote) - batch.add(digest) + batch.add(digest) - # Complete last pending batch - self._batch_download_complete(batch, missing_blobs=missing_blobs) + batch.send(missing_blobs=missing_blobs) return missing_blobs diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py index 2a6028bf8..8068881fc 100644 --- a/src/buildstream/_cas/casremote.py +++ b/src/buildstream/_cas/casremote.py @@ -316,35 +316,26 @@ class CASRemote(): class _CASBatchRead(): def __init__(self, remote): self._remote = remote - self._max_total_size_bytes = remote.max_batch_total_size_bytes - self._request = remote_execution_pb2.BatchReadBlobsRequest() - if remote.instance_name: - self._request.instance_name = remote.instance_name - self._size = 0 + self._request = local_cas_pb2.FetchMissingBlobsRequest() + self._request.instance_name = remote.local_cas_instance_name self._sent = False def add(self, digest): assert not self._sent - new_batch_size = self._size + digest.size_bytes - if new_batch_size > self._max_total_size_bytes: - # Not enough space left in current batch - return False - - request_digest = self._request.digests.add() - request_digest.hash = digest.hash - request_digest.size_bytes = digest.size_bytes - self._size = new_batch_size + request_digest = self._request.blob_digests.add() + request_digest.CopyFrom(digest) return True def send(self, *, missing_blobs=None): assert not self._sent self._sent = True - if not self._request.digests: + if not self._request.blob_digests: return - batch_response = self._remote.cas.BatchReadBlobs(self._request) + local_cas = self._remote.cascache._get_local_cas() + batch_response = local_cas.FetchMissingBlobs(self._request) for response in batch_response.responses: if response.status.code == code_pb2.NOT_FOUND: @@ -361,8 +352,6 @@ class _CASBatchRead(): raise CASRemoteError("Failed to download blob {}: expected {} bytes, received {} bytes".format( response.digest.hash, response.digest.size_bytes, len(response.data))) - yield (response.digest, response.data) - # Represents a batch of blobs queued for upload. # |