author     bst-marge-bot <marge-bot@buildstream.build>  2020-12-09 18:03:26 +0000
committer  bst-marge-bot <marge-bot@buildstream.build>  2020-12-09 18:03:26 +0000
commit     270439458a2c34486d1907aec53844cd29bfe2d6 (patch)
tree       e4c7679ded79f80c478a13a7a878ef00a772efe4
parent     47ef74f0ba4868eeba94baea3a8beb2874ec6b24 (diff)
parent     c836b7be6feaa0b9f86ab8c0cb5b913b0d8ed0e0 (diff)
download   buildstream-270439458a2c34486d1907aec53844cd29bfe2d6.tar.gz
Merge branch 'juerg/cas' into 'master'
CASCache improvements

See merge request BuildStream/buildstream!2112
-rw-r--r--  src/buildstream/_artifact.py                   |  11
-rw-r--r--  src/buildstream/_artifactcache.py              |  15
-rw-r--r--  src/buildstream/_cas/cascache.py               | 275
-rw-r--r--  src/buildstream/_elementsourcescache.py        |  11
-rw-r--r--  src/buildstream/_sourcecache.py                |   3
-rw-r--r--  src/buildstream/sandbox/_sandboxremote.py      |  10
-rw-r--r--  src/buildstream/storage/_casbaseddirectory.py  |   2
-rw-r--r--  tests/testutils/artifactshare.py               |  21
8 files changed, 142 insertions(+), 206 deletions(-)
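
The merge consolidates several CASCache entry points: blobs are read back through a new open() helper instead of objpath(), add_object() gains a batched add_objects() variant, and the separate local/remote missing-blob checks collapse into a single missing_blobs() that takes an optional remote. A minimal usage sketch of that surface, assuming `cas` is an already-initialized CASCache instance and `remote` an initialized CAS remote (neither is constructed here):

    # Hash and store several byte buffers in one CaptureFiles round trip.
    digests = cas.add_objects(buffers=[b"first blob", b"second blob"])

    # Read one of them back without going through cas.objpath().
    with cas.open(digests[0], "rb") as f:
        data = f.read()

    # One call now answers both questions: which blobs are missing locally,
    # and which are missing on a given remote.
    local_missing = cas.missing_blobs(digests)
    remote_missing = cas.missing_blobs(digests, remote=remote)
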
diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py
index 6e2bc9342..d4a716ff1 100644
--- a/src/buildstream/_artifact.py
+++ b/src/buildstream/_artifact.py
@@ -241,7 +241,7 @@ class Artifact:
# Store public data
with utils._tempnamedfile_name(dir=self._tmpdir) as tmpname:
_yaml.roundtrip_dump(publicdata, tmpname)
- public_data_digest = self._cas.add_object(path=tmpname, link_directly=True)
+ public_data_digest = self._cas.add_object(path=tmpname)
artifact.public_data.CopyFrom(public_data_digest)
size += public_data_digest.size_bytes
@@ -255,7 +255,7 @@ class Artifact:
low_diversity_node = Node.from_dict(low_diversity_dict)
_yaml.roundtrip_dump(low_diversity_node, tmpname)
- low_diversity_meta_digest = self._cas.add_object(path=tmpname, link_directly=True)
+ low_diversity_meta_digest = self._cas.add_object(path=tmpname)
artifact.low_diversity_meta.CopyFrom(low_diversity_meta_digest)
size += low_diversity_meta_digest.size_bytes
@@ -269,7 +269,7 @@ class Artifact:
high_diversity_node = Node.from_dict(high_diversity_dict)
_yaml.roundtrip_dump(high_diversity_node, tmpname)
- high_diversity_meta_digest = self._cas.add_object(path=tmpname, link_directly=True)
+ high_diversity_meta_digest = self._cas.add_object(path=tmpname)
artifact.high_diversity_meta.CopyFrom(high_diversity_meta_digest)
size += high_diversity_meta_digest.size_bytes
@@ -370,8 +370,9 @@ class Artifact:
# Load the public data from the artifact
artifact = self._get_proto()
- meta_file = self._cas.objpath(artifact.public_data)
- data = _yaml.load(meta_file, shortname="public.yaml")
+ with self._cas.open(artifact.public_data) as meta_file:
+ meta_str = meta_file.read()
+ data = _yaml.load_data(meta_str, file_name="public.yaml")
return data
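
With the change above, public data is parsed straight from the CAS blob rather than from a filesystem path returned by objpath(). A sketch of the same pattern for any YAML blob stored in CAS, assuming `cas` is an initialized CASCache, `digest` refers to a YAML document, and using the absolute form of the _yaml import that the hunk uses relatively:

    from buildstream import _yaml

    # open() defaults to text mode ("r"), so the YAML loader receives a str.
    with cas.open(digest) as f:
        node = _yaml.load_data(f.read(), file_name="public.yaml")
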
diff --git a/src/buildstream/_artifactcache.py b/src/buildstream/_artifactcache.py
index 09804fe01..ded87679a 100644
--- a/src/buildstream/_artifactcache.py
+++ b/src/buildstream/_artifactcache.py
@@ -339,7 +339,7 @@ class ArtifactCache(AssetCache):
for remote in push_remotes:
remote.init()
- remote_missing_blobs = self.cas.remote_missing_blobs(remote, missing_blobs)
+ remote_missing_blobs = self.cas.missing_blobs(missing_blobs, remote=remote)
for blob in remote_missing_blobs:
if blob not in remote_missing_blobs_list:
@@ -504,20 +504,13 @@ class ArtifactCache(AssetCache):
# blobs not existing on the server.
#
def _pull_artifact_storage(self, element, key, artifact_digest, remote, pull_buildtrees=False):
- def __pull_digest(digest):
- self.cas._fetch_directory(remote, digest)
- required_blobs = self.cas.required_blobs_for_directory(digest)
- missing_blobs = self.cas.local_missing_blobs(required_blobs)
- if missing_blobs:
- self.cas.fetch_blobs(remote, missing_blobs)
-
artifact_name = element.get_artifact_name(key=key)
try:
# Fetch and parse artifact proto
self.cas.fetch_blobs(remote, [artifact_digest])
artifact = artifact_pb2.Artifact()
- with open(self.cas.objpath(artifact_digest), "rb") as f:
+ with self.cas.open(artifact_digest, "rb") as f:
artifact.ParseFromString(f.read())
# Write the artifact proto to cache
@@ -527,10 +520,10 @@ class ArtifactCache(AssetCache):
f.write(artifact.SerializeToString())
if str(artifact.files):
- __pull_digest(artifact.files)
+ self.cas._fetch_directory(remote, artifact.files)
if pull_buildtrees and str(artifact.buildtree):
- __pull_digest(artifact.buildtree)
+ self.cas._fetch_directory(remote, artifact.buildtree)
digests = [artifact.low_diversity_meta, artifact.high_diversity_meta]
if str(artifact.public_data):
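
Because _fetch_directory() now fetches file blobs as well as directory nodes (see the cascache.py hunks below), the nested __pull_digest() helper and its local_missing_blobs() follow-up are no longer needed. A sketch of the resulting pull sequence, assuming `cas`, `remote` and `artifact_digest` are set up as in the hunk above and that the proto import path follows BuildStream's generated _protos package:

    from buildstream._protos.buildstream.v2 import artifact_pb2

    # Fetch and parse the artifact proto ...
    cas.fetch_blobs(remote, [artifact_digest])
    artifact = artifact_pb2.Artifact()
    with cas.open(artifact_digest, "rb") as f:
        artifact.ParseFromString(f.read())

    # ... then a single call pulls the whole file tree, file blobs included.
    if str(artifact.files):
        cas._fetch_directory(remote, artifact.files)
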
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index d13531c6c..b80460abf 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -37,7 +37,7 @@ from ..types import FastEnum, SourceRef
from .._exceptions import CASCacheError
from .casdprocessmanager import CASDProcessManager
-from .casremote import _CASBatchRead, _CASBatchUpdate
+from .casremote import _CASBatchRead, _CASBatchUpdate, BlobNotFound
_BUFFER_SIZE = 65536
@@ -152,13 +152,7 @@ class CASCache:
# Returns: True if the files are in the cache, False otherwise
#
def contains_files(self, digests):
- cas = self.get_cas()
-
- request = remote_execution_pb2.FindMissingBlobsRequest()
- request.blob_digests.extend(digests)
-
- response = cas.FindMissingBlobs(request)
- return len(response.missing_blob_digests) == 0
+ return len(self.missing_blobs(digests)) == 0
# contains_directory():
#
@@ -278,15 +272,29 @@ class CASCache:
def objpath(self, digest):
return os.path.join(self.casdir, "objects", digest.hash[:2], digest.hash[2:])
+ # open():
+ #
+ # Open file read-only by CAS digest and return a corresponding file object.
+ #
+ # Args:
+ # digest (Digest): The digest of the object
+ # mode (str): An optional string that specifies the mode in which the file is opened.
+ #
+ def open(self, digest, mode="r"):
+ if mode not in ["r", "rb"]:
+ raise ValueError("Unsupported mode: `{}`".format(mode))
+
+ objpath = self.objpath(digest)
+
+ return open(objpath, mode=mode)
+
# add_object():
#
# Hash and write object to CAS.
#
# Args:
- # digest (Digest): An optional Digest object to populate
# path (str): Path to file to add
# buffer (bytes): Byte buffer to add
- # link_directly (bool): Whether file given by path can be linked
# instance_name (str): casd instance_name for remote CAS
#
# Returns:
@@ -294,44 +302,72 @@ class CASCache:
#
# Either `path` or `buffer` must be passed, but not both.
#
- def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False, instance_name=None):
+ def add_object(self, *, path=None, buffer=None, instance_name=None):
# Exactly one of the two parameters has to be specified
assert (path is None) != (buffer is None)
+ if path is None:
+ digests = self.add_objects(buffers=[buffer], instance_name=instance_name)
+ else:
+ digests = self.add_objects(paths=[path], instance_name=instance_name)
+ assert len(digests) == 1
+ return digests[0]
- # If we're linking directly, then path must be specified.
- assert (not link_directly) or (link_directly and path)
+ # add_objects():
+ #
+ # Hash and write objects to CAS.
+ #
+ # Args:
+ # paths (List[str]): Paths to files to add
+ # buffers (List[bytes]): Byte buffers to add
+ # instance_name (str): casd instance_name for remote CAS
+ #
+ # Returns:
+ # (List[Digest]): The digests of the added objects
+ #
+ # Either `paths` or `buffers` must be passed, but not both.
+ #
+ def add_objects(self, *, paths=None, buffers=None, instance_name=None):
+ # Exactly one of the two parameters has to be specified
+ assert (paths is None) != (buffers is None)
- if digest is None:
- digest = remote_execution_pb2.Digest()
+ digests = []
with contextlib.ExitStack() as stack:
- if path is None:
- tmp = stack.enter_context(self._temporary_object())
- tmp.write(buffer)
- tmp.flush()
- path = tmp.name
+ if paths is None:
+ paths = []
+ for buffer in buffers:
+ tmp = stack.enter_context(self._temporary_object())
+ tmp.write(buffer)
+ tmp.flush()
+ paths.append(tmp.name)
request = local_cas_pb2.CaptureFilesRequest()
if instance_name:
request.instance_name = instance_name
- request.path.append(path)
+ for path in paths:
+ request.path.append(path)
local_cas = self.get_local_cas()
response = local_cas.CaptureFiles(request)
- if len(response.responses) != 1:
- raise CASCacheError("Expected 1 response from CaptureFiles, got {}".format(len(response.responses)))
+ if len(response.responses) != len(paths):
+ raise CASCacheError(
+ "Expected {} responses from CaptureFiles, got {}".format(len(paths), len(response.responses))
+ )
+
+ for path, blob_response in zip(paths, response.responses):
+ if blob_response.status.code == code_pb2.RESOURCE_EXHAUSTED:
+ raise CASCacheError("Cache too full", reason="cache-too-full")
+ if blob_response.status.code != code_pb2.OK:
+ raise CASCacheError("Failed to capture blob {}: {}".format(path, blob_response.status.code))
- blob_response = response.responses[0]
- if blob_response.status.code == code_pb2.RESOURCE_EXHAUSTED:
- raise CASCacheError("Cache too full", reason="cache-too-full")
- if blob_response.status.code != code_pb2.OK:
- raise CASCacheError("Failed to capture blob {}: {}".format(path, blob_response.status.code))
- digest.CopyFrom(blob_response.digest)
+ digest = remote_execution_pb2.Digest()
+ digest.CopyFrom(blob_response.digest)
+ digests.append(digest)
- return digest
+ return digests
# import_directory():
#
@@ -374,7 +410,7 @@ class CASCache:
return utils._message_digest(root_directory)
- # remote_missing_blobs_for_directory():
+ # missing_blobs_for_directory():
#
# Determine which blobs of a directory tree are missing on the remote.
#
@@ -383,23 +419,27 @@ class CASCache:
#
# Returns: List of missing Digest objects
#
- def remote_missing_blobs_for_directory(self, remote, digest):
+ def missing_blobs_for_directory(self, digest, *, remote=None):
required_blobs = self.required_blobs_for_directory(digest)
- return self.remote_missing_blobs(remote, required_blobs)
+ return self.missing_blobs(required_blobs, remote=remote)
- # remote_missing_blobs():
+ # missing_blobs():
#
- # Determine which blobs are missing on the remote.
+ # Determine which blobs are missing locally or on the remote.
#
# Args:
# blobs ([Digest]): List of directory digests to check
#
# Returns: List of missing Digest objects
#
- def remote_missing_blobs(self, remote, blobs):
+ def missing_blobs(self, blobs, *, remote=None):
cas = self.get_cas()
- instance_name = remote.local_cas_instance_name
+
+ if remote:
+ instance_name = remote.local_cas_instance_name
+ else:
+ instance_name = ""
missing_blobs = dict()
# Limit size of FindMissingBlobs request
@@ -424,23 +464,6 @@ class CASCache:
return missing_blobs.values()
- # local_missing_blobs():
- #
- # Check local cache for missing blobs.
- #
- # Args:
- # digests (list): The Digests of blobs to check
- #
- # Returns: Missing Digest objects
- #
- def local_missing_blobs(self, digests):
- missing_blobs = []
- for digest in digests:
- objpath = self.objpath(digest)
- if not os.path.exists(objpath):
- missing_blobs.append(digest)
- return missing_blobs
-
# required_blobs_for_directory():
#
# Generator that returns the Digests of all blobs in the tree specified by
@@ -470,38 +493,6 @@ class CASCache:
# Local Private Methods #
################################################
- def _reachable_refs_dir(self, reachable, tree, update_mtime=False, check_exists=False):
- if tree.hash in reachable:
- return
- try:
- if update_mtime:
- os.utime(self.objpath(tree))
-
- reachable.add(tree.hash)
-
- directory = remote_execution_pb2.Directory()
-
- with open(self.objpath(tree), "rb") as f:
- directory.ParseFromString(f.read())
-
- except FileNotFoundError:
- if check_exists:
- raise
-
- # Just exit early if the file doesn't exist
- return
-
- for filenode in directory.files:
- if update_mtime:
- os.utime(self.objpath(filenode.digest))
- if check_exists:
- if not os.path.exists(self.objpath(filenode.digest)):
- raise FileNotFoundError
- reachable.add(filenode.digest.hash)
-
- for dirnode in directory.directories:
- self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime, check_exists=check_exists)
-
# _temporary_object():
#
# Returns:
@@ -514,113 +505,55 @@ class CASCache:
os.chmod(f.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
yield f
- # _ensure_blob():
- #
- # Fetch and add blob if it's not already local.
- #
- # Args:
- # remote (Remote): The remote to use.
- # digest (Digest): Digest object for the blob to fetch.
- #
- # Returns:
- # (str): The path of the object
- #
- def _ensure_blob(self, remote, digest):
- objpath = self.objpath(digest)
- if os.path.exists(objpath):
- # already in local repository
- return objpath
-
- batch = _CASBatchRead(remote)
- batch.add(digest)
- batch.send()
-
- return objpath
-
- # Helper function for _fetch_directory().
- def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
- batch.send()
-
- # All previously scheduled directories are now locally available,
- # move them to the processing queue.
- fetch_queue.extend(fetch_next_queue)
- fetch_next_queue.clear()
- return _CASBatchRead(remote)
-
- # Helper function for _fetch_directory().
- def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
- in_local_cache = os.path.exists(self.objpath(digest))
-
- if in_local_cache:
- # Skip download, already in local cache.
- pass
- else:
- batch.add(digest)
-
- if recursive:
- if in_local_cache:
- # Add directory to processing queue.
- fetch_queue.append(digest)
- else:
- # Directory will be available after completing pending batch.
- # Add directory to deferred processing queue.
- fetch_next_queue.append(digest)
-
- return batch
-
# _fetch_directory():
#
# Fetches remote directory and adds it to content addressable store.
#
- # This recursively fetches directory objects but doesn't fetch any
- # files.
+ # This recursively fetches directory objects and files.
#
# Args:
# remote (Remote): The remote to use.
# dir_digest (Digest): Digest object for the directory to fetch.
#
def _fetch_directory(self, remote, dir_digest):
- # TODO Use GetTree() if the server supports it
-
- fetch_queue = [dir_digest]
- fetch_next_queue = []
- batch = _CASBatchRead(remote)
-
- while len(fetch_queue) + len(fetch_next_queue) > 0:
- if not fetch_queue:
- batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
-
- dir_digest = fetch_queue.pop(0)
-
- objpath = self._ensure_blob(remote, dir_digest)
+ local_cas = self.get_local_cas()
- directory = remote_execution_pb2.Directory()
- with open(objpath, "rb") as f:
- directory.ParseFromString(f.read())
+ request = local_cas_pb2.FetchTreeRequest()
+ request.instance_name = remote.local_cas_instance_name
+ request.root_digest.CopyFrom(dir_digest)
+ request.fetch_file_blobs = False
- for dirnode in directory.directories:
- batch = self._fetch_directory_node(
- remote, dirnode.digest, batch, fetch_queue, fetch_next_queue, recursive=True
- )
+ try:
+ local_cas.FetchTree(request)
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.NOT_FOUND:
+ raise BlobNotFound(
+ dir_digest.hash,
+ "Failed to fetch directory tree {}: {}: {}".format(dir_digest.hash, e.code().name, e.details()),
+ ) from e
+ raise CASCacheError(
+ "Failed to fetch directory tree {}: {}: {}".format(dir_digest.hash, e.code().name, e.details())
+ ) from e
- # Fetch final batch
- self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
+ required_blobs = self.required_blobs_for_directory(dir_digest)
+ self.fetch_blobs(remote, required_blobs)
def _fetch_tree(self, remote, digest):
- objpath = self._ensure_blob(remote, digest)
+ self.fetch_blobs(remote, [digest])
tree = remote_execution_pb2.Tree()
- with open(objpath, "rb") as f:
+ with self.open(digest, "rb") as f:
tree.ParseFromString(f.read())
- tree.children.extend([tree.root])
+ dirbuffers = [tree.root.SerializeToString()]
for directory in tree.children:
- dirbuffer = directory.SerializeToString()
- dirdigest = self.add_object(buffer=dirbuffer)
- assert dirdigest.size_bytes == len(dirbuffer)
+ dirbuffers.append(directory.SerializeToString())
+
+ dirdigests = self.add_objects(buffers=dirbuffers)
- return dirdigest
+ # The digest of the root directory
+ return dirdigests[0]
# fetch_blobs():
#
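
add_objects() reports capture failures per blob: a RESOURCE_EXHAUSTED status becomes a CASCacheError with reason "cache-too-full", and any other non-OK status raises a generic CASCacheError naming the failed path. A sketch of handling that at a call site, assuming `cas` is an initialized CASCache, the input paths are placeholders, and CASCacheError exposes the reason string the way BstError-derived exceptions do:

    from buildstream._exceptions import CASCacheError

    try:
        digests = cas.add_objects(paths=["/tmp/a.txt", "/tmp/b.txt"])  # hypothetical input paths
    except CASCacheError as e:
        if e.reason == "cache-too-full":
            # Quota exhausted in the local cache: prune before retrying.
            raise
        # Any other capture failure is reported per path in the message.
        raise
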
diff --git a/src/buildstream/_elementsourcescache.py b/src/buildstream/_elementsourcescache.py
index 84e7633e5..194f3fd4a 100644
--- a/src/buildstream/_elementsourcescache.py
+++ b/src/buildstream/_elementsourcescache.py
@@ -295,18 +295,11 @@ class ElementSourcesCache(AssetCache):
# blobs not existing on the server.
#
def _pull_source_storage(self, key, source_digest, remote):
- def __pull_digest(digest):
- self.cas._fetch_directory(remote, digest)
- required_blobs = self.cas.required_blobs_for_directory(digest)
- missing_blobs = self.cas.local_missing_blobs(required_blobs)
- if missing_blobs:
- self.cas.fetch_blobs(remote, missing_blobs)
-
try:
# Fetch and parse source proto
self.cas.fetch_blobs(remote, [source_digest])
source = source_pb2.Source()
- with open(self.cas.objpath(source_digest), "rb") as f:
+ with self.cas.open(source_digest, "rb") as f:
source.ParseFromString(f.read())
# Write the source proto to cache
@@ -314,7 +307,7 @@ class ElementSourcesCache(AssetCache):
with utils.save_file_atomic(source_path, mode="wb") as f:
f.write(source.SerializeToString())
- __pull_digest(source.files)
+ self.cas._fetch_directory(remote, source.files)
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
raise SourceCacheError("Failed to pull source with status {}: {}".format(e.code().name, e.details()))
diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py
index 76c22efbd..37d990b4f 100644
--- a/src/buildstream/_sourcecache.py
+++ b/src/buildstream/_sourcecache.py
@@ -146,9 +146,6 @@ class SourceCache(AssetCache):
# Fetch source blobs
self.cas._fetch_directory(remote, source_digest)
- required_blobs = self.cas.required_blobs_for_directory(source_digest)
- missing_blobs = self.cas.local_missing_blobs(required_blobs)
- self.cas.fetch_blobs(remote, missing_blobs)
source.info("Pulled source {} <- {}".format(display_key, remote))
return True
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index 2ac159337..ff314adba 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -276,7 +276,7 @@ class SandboxRemote(SandboxREAPI):
dir_digest = vdir._get_digest()
required_blobs = cascache.required_blobs_for_directory(dir_digest)
- local_missing_blobs = cascache.local_missing_blobs(required_blobs)
+ local_missing_blobs = cascache.missing_blobs(required_blobs)
if local_missing_blobs:
if self._output_files_required:
# Fetch all blobs from Remote Execution CAS server
@@ -319,14 +319,14 @@ class SandboxRemote(SandboxREAPI):
# Determine blobs missing on remote
try:
input_root_digest = action.input_root_digest
- missing_blobs = list(cascache.remote_missing_blobs_for_directory(casremote, input_root_digest))
+ missing_blobs = list(cascache.missing_blobs_for_directory(input_root_digest, remote=casremote))
except grpc.RpcError as e:
raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
# Check if any blobs are also missing locally (partial artifact)
# and pull them from the artifact cache.
try:
- local_missing_blobs = cascache.local_missing_blobs(missing_blobs)
+ local_missing_blobs = cascache.missing_blobs(missing_blobs)
if local_missing_blobs:
artifactcache.fetch_missing_blobs(project, local_missing_blobs)
except (grpc.RpcError, BstError) as e:
@@ -380,13 +380,13 @@ class SandboxRemote(SandboxREAPI):
# Forward remote stdout and stderr
if stdout:
if action_result.stdout_digest.hash:
- with open(cascache.objpath(action_result.stdout_digest), "r") as f:
+ with cascache.open(action_result.stdout_digest, "r") as f:
shutil.copyfileobj(f, stdout)
elif action_result.stdout_raw:
stdout.write(str(action_result.stdout_raw, "utf-8", errors="ignore"))
if stderr:
if action_result.stderr_digest.hash:
- with open(cascache.objpath(action_result.stderr_digest), "r") as f:
+ with cascache.open(action_result.stderr_digest, "r") as f:
shutil.copyfileobj(f, stderr)
elif action_result.stderr_raw:
stderr.write(str(action_result.stderr_raw, "utf-8", errors="ignore"))
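
The stdout/stderr forwarding above now streams captured logs through cascache.open() instead of opening the object path directly. The same pattern in isolation, as a sketch assuming `cascache` is an initialized CASCache and `digest` is an ActionResult stdout or stderr digest:

    import shutil
    import sys

    # Text-mode read of the log blob, copied straight to this process' stdout.
    with cascache.open(digest, "r") as f:
        shutil.copyfileobj(f, sys.stdout)
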
diff --git a/src/buildstream/storage/_casbaseddirectory.py b/src/buildstream/storage/_casbaseddirectory.py
index 75399953e..c061a28e4 100644
--- a/src/buildstream/storage/_casbaseddirectory.py
+++ b/src/buildstream/storage/_casbaseddirectory.py
@@ -220,7 +220,7 @@ class CasBasedDirectory(Directory):
return newdir
def _add_file(self, name, path, modified=False, can_link=False, properties=None):
- digest = self.cas_cache.add_object(path=path, link_directly=can_link)
+ digest = self.cas_cache.add_object(path=path)
is_executable = os.access(path, os.X_OK)
mtime = None
if properties and "mtime" in properties:
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index bd9c97c61..8ce8a4198 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -209,7 +209,7 @@ class ArtifactShare(BaseArtifactShare):
reachable = set()
def reachable_dir(digest):
- self.cas._reachable_refs_dir(reachable, digest, update_mtime=False, check_exists=True)
+ self._reachable_refs_dir(reachable, digest)
try:
artifact_proto_path = self.cas.objpath(artifact_proto_digest)
@@ -271,6 +271,25 @@ class ArtifactShare(BaseArtifactShare):
shutil.rmtree(self.directory)
+ def _reachable_refs_dir(self, reachable, tree):
+ if tree.hash in reachable:
+ return
+
+ reachable.add(tree.hash)
+
+ directory = remote_execution_pb2.Directory()
+
+ with open(self.cas.objpath(tree), "rb") as f:
+ directory.ParseFromString(f.read())
+
+ for filenode in directory.files:
+ if not os.path.exists(self.cas.objpath(filenode.digest)):
+ raise FileNotFoundError
+ reachable.add(filenode.digest.hash)
+
+ for dirnode in directory.directories:
+ self._reachable_refs_dir(reachable, dirnode.digest)
+
# create_artifact_share()
#