summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/buildstream/_cas/cascache.py72
1 files changed, 17 insertions, 55 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 3182ec050..42e2244c5 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -37,7 +37,7 @@ from ..types import FastEnum, SourceRef
from .._exceptions import CASCacheError
from .casdprocessmanager import CASDProcessManager
-from .casremote import _CASBatchRead, _CASBatchUpdate
+from .casremote import _CASBatchRead, _CASBatchUpdate, BlobNotFound
_BUFFER_SIZE = 65536
@@ -505,37 +505,6 @@ class CASCache:
return objpath
- # Helper function for _fetch_directory().
- def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
- batch.send()
-
- # All previously scheduled directories are now locally available,
- # move them to the processing queue.
- fetch_queue.extend(fetch_next_queue)
- fetch_next_queue.clear()
- return _CASBatchRead(remote)
-
- # Helper function for _fetch_directory().
- def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
- in_local_cache = os.path.exists(self.objpath(digest))
-
- if in_local_cache:
- # Skip download, already in local cache.
- pass
- else:
- batch.add(digest)
-
- if recursive:
- if in_local_cache:
- # Add directory to processing queue.
- fetch_queue.append(digest)
- else:
- # Directory will be available after completing pending batch.
- # Add directory to deferred processing queue.
- fetch_next_queue.append(digest)
-
- return batch
-
# _fetch_directory():
#
# Fetches remote directory and adds it to content addressable store.
@@ -548,31 +517,24 @@ class CASCache:
# dir_digest (Digest): Digest object for the directory to fetch.
#
def _fetch_directory(self, remote, dir_digest):
- # TODO Use GetTree() if the server supports it
-
- fetch_queue = [dir_digest]
- fetch_next_queue = []
- batch = _CASBatchRead(remote)
-
- while len(fetch_queue) + len(fetch_next_queue) > 0:
- if not fetch_queue:
- batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
-
- dir_digest = fetch_queue.pop(0)
-
- objpath = self._ensure_blob(remote, dir_digest)
-
- directory = remote_execution_pb2.Directory()
- with open(objpath, "rb") as f:
- directory.ParseFromString(f.read())
+ local_cas = self.get_local_cas()
- for dirnode in directory.directories:
- batch = self._fetch_directory_node(
- remote, dirnode.digest, batch, fetch_queue, fetch_next_queue, recursive=True
- )
+ request = local_cas_pb2.FetchTreeRequest()
+ request.instance_name = remote.local_cas_instance_name
+ request.root_digest.CopyFrom(dir_digest)
+ request.fetch_file_blobs = False
- # Fetch final batch
- self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
+ try:
+ local_cas.FetchTree(request)
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.NOT_FOUND:
+ raise BlobNotFound(
+ dir_digest.hash,
+ "Failed to fetch directory tree {}: {}: {}".format(dir_digest.hash, e.code().name, e.details()),
+ ) from e
+ raise CASCacheError(
+ "Failed to fetch directory tree {}: {}: {}".format(dir_digest.hash, e.code().name, e.details())
+ ) from e
def _fetch_tree(self, remote, digest):
objpath = self._ensure_blob(remote, digest)