Diffstat (limited to 'buildstream/_artifactcache/cascache.py')
-rw-r--r--  buildstream/_artifactcache/cascache.py  435
1 file changed, 171 insertions(+), 264 deletions(-)
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index a3d27c8d1..c6efcf508 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -19,9 +19,7 @@
import hashlib
import itertools
-import multiprocessing
import os
-import signal
import stat
import tempfile
import uuid
@@ -31,17 +29,12 @@ from urllib.parse import urlparse
import grpc
-from .. import _yaml
-
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
-from .._message import MessageType, Message
-from .. import _signals, utils
-from .._exceptions import ArtifactError
-
-from . import ArtifactCache
+from .. import utils
+from .._exceptions import CASError
# The default limit for gRPC messages is 4 MiB.
@@ -89,68 +82,74 @@ def _retry(tries=5):
break
-class BlobNotFound(ArtifactError):
+class BlobNotFound(CASError):
def __init__(self, blob, msg):
self.blob = blob
super().__init__(msg)
-# A CASCache manages artifacts in a CAS repository as specified in the
-# Remote Execution API.
+# A CASCache manages a CAS repository as specified in the Remote Execution API.
#
# Args:
-# context (Context): The BuildStream context
-#
-# Pushing is explicitly disabled by the platform in some cases,
-# like when we are falling back to functioning without using
-# user namespaces.
+# path (str): The root directory for the CAS repository
#
-class CASCache(ArtifactCache):
+class CASCache():
- def __init__(self, context):
- super().__init__(context)
-
- self.casdir = os.path.join(context.artifactdir, 'cas')
+ def __init__(self, path):
+ self.casdir = os.path.join(path, 'cas')
+ self.tmpdir = os.path.join(path, 'tmp')
os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
+ os.makedirs(self.tmpdir, exist_ok=True)
- self._calculate_cache_quota()
-
- # Per-project list of _CASRemote instances.
- self._remotes = {}
-
- self._has_fetch_remotes = False
- self._has_push_remotes = False
-
- ################################################
- # Implementation of abstract methods #
- ################################################
-
+ # preflight():
+ #
+ # Preflight check.
+ #
def preflight(self):
if (not os.path.isdir(os.path.join(self.casdir, 'refs', 'heads')) or
not os.path.isdir(os.path.join(self.casdir, 'objects'))):
- raise ArtifactError("CAS repository check failed for '{}'"
- .format(self.casdir))
+ raise CASError("CAS repository check failed for '{}'".format(self.casdir))
- def contains(self, element, key):
- refpath = self._refpath(self.get_artifact_fullname(element, key))
+ # contains():
+ #
+ # Check whether the specified ref is already available in the local CAS cache.
+ #
+ # Args:
+ # ref (str): The ref to check
+ #
+ # Returns: True if the ref is in the cache, False otherwise
+ #
+ def contains(self, ref):
+ refpath = self._refpath(ref)
# This assumes that the repository doesn't have any dangling pointers
return os.path.exists(refpath)
- def extract(self, element, key):
- ref = self.get_artifact_fullname(element, key)
-
+ # extract():
+ #
+ # Extract cached directory for the specified ref if it hasn't
+ # already been extracted.
+ #
+ # Args:
+ # ref (str): The ref whose directory to extract
+ # path (str): The destination path
+ #
+ # Raises:
+    #     CASError: If there was an OSError, or if the ref did not exist.
+ #
+ # Returns: path to extracted directory
+ #
+ def extract(self, ref, path):
tree = self.resolve_ref(ref, update_mtime=True)
- dest = os.path.join(self.extractdir, element._get_project().name,
- element.normal_name, tree.hash)
+ dest = os.path.join(path, tree.hash)
if os.path.isdir(dest):
- # artifact has already been extracted
+ # directory has already been extracted
return dest
- with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir:
+ with tempfile.TemporaryDirectory(prefix='tmp', dir=self.tmpdir) as tmpdir:
checkoutdir = os.path.join(tmpdir, ref)
self._checkout(checkoutdir, tree)
@@ -164,23 +163,35 @@ class CASCache(ArtifactCache):
# If rename fails with these errors, another process beat
# us to it so just ignore.
if e.errno not in [errno.ENOTEMPTY, errno.EEXIST]:
- raise ArtifactError("Failed to extract artifact for ref '{}': {}"
- .format(ref, e)) from e
+ raise CASError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e
return dest
- def commit(self, element, content, keys):
- refs = [self.get_artifact_fullname(element, key) for key in keys]
-
- tree = self._commit_directory(content)
+ # commit():
+ #
+ # Commit directory to cache.
+ #
+ # Args:
+ # refs (list): The refs to set
+ # path (str): The directory to import
+ #
+ def commit(self, refs, path):
+ tree = self._commit_directory(path)
for ref in refs:
self.set_ref(ref, tree)
- def diff(self, element, key_a, key_b, *, subdir=None):
- ref_a = self.get_artifact_fullname(element, key_a)
- ref_b = self.get_artifact_fullname(element, key_b)
-
+ # diff():
+ #
+ # Return a list of files that have been added or modified between
+ # the refs described by ref_a and ref_b.
+ #
+ # Args:
+ # ref_a (str): The first ref
+ # ref_b (str): The second ref
+ # subdir (str): A subdirectory to limit the comparison to
+ #
+ def diff(self, ref_a, ref_b, *, subdir=None):
tree_a = self.resolve_ref(ref_a)
tree_b = self.resolve_ref(ref_b)
@@ -196,145 +207,103 @@ class CASCache(ArtifactCache):
return modified, removed, added
- def initialize_remotes(self, *, on_failure=None):
- remote_specs = self.global_remote_specs
-
- for project in self.project_remote_specs:
- remote_specs += self.project_remote_specs[project]
-
- remote_specs = list(utils._deduplicate(remote_specs))
+ def initialize_remote(self, remote_spec, q):
+ try:
+ remote = CASRemote(remote_spec)
+ remote.init()
- remotes = {}
- q = multiprocessing.Queue()
- for remote_spec in remote_specs:
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
- p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
+ request = buildstream_pb2.StatusRequest()
+ for attempt in _retry():
+ with attempt:
+ response = remote.ref_storage.Status(request)
- try:
- # Keep SIGINT blocked in the child process
- with _signals.blocked([signal.SIGINT], ignore=False):
- p.start()
-
- error = q.get()
- p.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(p.pid)
- raise
-
- if error and on_failure:
- on_failure(remote_spec.url, error)
- elif error:
- raise ArtifactError(error)
+ if remote_spec.push and not response.allow_updates:
+ q.put('CAS server does not allow push')
else:
- self._has_fetch_remotes = True
- if remote_spec.push:
- self._has_push_remotes = True
+ # No error
+ q.put(None)
- remotes[remote_spec.url] = _CASRemote(remote_spec)
+ except grpc.RpcError as e:
+ # str(e) is too verbose for errors reported to the user
+ q.put(e.details())
- for project in self.context.get_projects():
- remote_specs = self.global_remote_specs
- if project in self.project_remote_specs:
- remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
+ except Exception as e: # pylint: disable=broad-except
+ # Whatever happens, we need to return it to the calling process
+ #
+ q.put(str(e))
- project_remotes = []
+ # pull():
+ #
+ # Pull a ref from a remote repository.
+ #
+ # Args:
+ # ref (str): The ref to pull
+ # remote (CASRemote): The remote repository to pull from
+ # progress (callable): The progress callback, if any
+ #
+ # Returns:
+ # (bool): True if pull was successful, False if ref was not available
+ #
+ def pull(self, ref, remote, *, progress=None):
+ try:
+ remote.init()
- for remote_spec in remote_specs:
- # Errors are already handled in the loop above,
- # skip unreachable remotes here.
- if remote_spec.url not in remotes:
- continue
+ request = buildstream_pb2.GetReferenceRequest()
+ request.key = ref
+ for attempt in _retry():
+ with attempt:
+ response = remote.ref_storage.GetReference(request)
- remote = remotes[remote_spec.url]
- project_remotes.append(remote)
+ tree = remote_execution_pb2.Digest()
+ tree.hash = response.digest.hash
+ tree.size_bytes = response.digest.size_bytes
- self._remotes[project] = project_remotes
+ self._fetch_directory(remote, tree)
- def has_fetch_remotes(self, *, element=None):
- if not self._has_fetch_remotes:
- # No project has fetch remotes
- return False
- elif element is None:
- # At least one (sub)project has fetch remotes
- return True
- else:
- # Check whether the specified element's project has fetch remotes
- remotes_for_project = self._remotes[element._get_project()]
- return bool(remotes_for_project)
+ self.set_ref(ref, tree)
- def has_push_remotes(self, *, element=None):
- if not self._has_push_remotes:
- # No project has push remotes
- return False
- elif element is None:
- # At least one (sub)project has push remotes
return True
- else:
- # Check whether the specified element's project has push remotes
- remotes_for_project = self._remotes[element._get_project()]
- return any(remote.spec.push for remote in remotes_for_project)
-
- def pull(self, element, key, *, progress=None):
- ref = self.get_artifact_fullname(element, key)
- display_key = key[:self.context.log_key_length]
-
- project = element._get_project()
-
- for remote in self._remotes[project]:
- try:
- remote.init()
- element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
-
- request = buildstream_pb2.GetReferenceRequest()
- request.key = ref
- for attempt in _retry():
- with attempt:
- response = remote.ref_storage.GetReference(request)
-
- tree = remote_execution_pb2.Digest()
- tree.hash = response.digest.hash
- tree.size_bytes = response.digest.size_bytes
-
- self._fetch_directory(remote, tree)
-
- self.set_ref(ref, tree)
-
- element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
- # no need to pull from additional remotes
- return True
-
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.NOT_FOUND:
- raise ArtifactError("Failed to pull artifact {}: {}".format(
- display_key, e)) from e
- else:
- element.info("Remote ({}) does not have {} cached".format(
- remote.spec.url, display_key
- ))
- except BlobNotFound as e:
- element.info("Remote ({}) does not have {} cached".format(
- remote.spec.url, display_key
- ))
-
- return False
-
- def link_key(self, element, oldkey, newkey):
- oldref = self.get_artifact_fullname(element, oldkey)
- newref = self.get_artifact_fullname(element, newkey)
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
+ else:
+ return False
+ # link_ref():
+ #
+ # Add an alias for an existing ref.
+ #
+ # Args:
+ # oldref (str): An existing ref
+ # newref (str): A new ref for the same directory
+ #
+ def link_ref(self, oldref, newref):
tree = self.resolve_ref(oldref)
self.set_ref(newref, tree)
- def _push_refs_to_remote(self, refs, remote):
+ # push():
+ #
+ # Push committed refs to remote repository.
+ #
+ # Args:
+ # refs (list): The refs to push
+ # remote (CASRemote): The remote to push to
+ #
+ # Returns:
+ # (bool): True if any remote was updated, False if no pushes were required
+ #
+ # Raises:
+ # (CASError): if there was an error
+ #
+ def push(self, refs, remote):
skipped_remote = True
try:
for ref in refs:
tree = self.resolve_ref(ref)
# Check whether ref is already on the server in which case
- # there is no need to push the artifact
+ # there is no need to push the ref
try:
request = buildstream_pb2.GetReferenceRequest()
request.key = ref
@@ -364,42 +333,10 @@ class CASCache(ArtifactCache):
skipped_remote = False
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
- raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
+ raise CASError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e
return not skipped_remote
- def push(self, element, keys):
-
- refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
-
- project = element._get_project()
-
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
-
- pushed = False
-
- for remote in push_remotes:
- remote.init()
- display_key = element._get_brief_display_key()
- element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
-
- if self._push_refs_to_remote(refs, remote):
- element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
- pushed = True
- else:
- self.context.message(Message(
- None,
- MessageType.INFO,
- "Remote ({}) already has {} cached".format(
- remote.spec.url, element._get_brief_display_key())
- ))
-
- return pushed
-
- ################################################
- # API Private Methods #
- ################################################
-
# objpath():
#
# Return the path of an object based on its digest.
@@ -470,7 +407,7 @@ class CASCache(ArtifactCache):
pass
except OSError as e:
- raise ArtifactError("Failed to hash object: {}".format(e)) from e
+ raise CASError("Failed to hash object: {}".format(e)) from e
return digest
@@ -511,26 +448,39 @@ class CASCache(ArtifactCache):
return digest
except FileNotFoundError as e:
- raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
+ raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
- def update_mtime(self, element, key):
+ # update_mtime()
+ #
+ # Update the mtime of a ref.
+ #
+ # Args:
+ # ref (str): The ref to update
+ #
+ def update_mtime(self, ref):
try:
- ref = self.get_artifact_fullname(element, key)
os.utime(self._refpath(ref))
except FileNotFoundError as e:
- raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
+ raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
+ # calculate_cache_size()
+ #
+ # Return the real disk usage of the CAS cache.
+ #
+ # Returns:
+ # (int): The size of the cache.
+ #
def calculate_cache_size(self):
return utils._get_dir_size(self.casdir)
- # list_artifacts():
+ # list_refs():
#
- # List cached artifacts in Least Recently Modified (LRM) order.
+ # List refs in Least Recently Modified (LRM) order.
#
# Returns:
# (list) - A list of refs in LRM order
#
- def list_artifacts(self):
+ def list_refs(self):
# string of: /path/to/repo/refs/heads
ref_heads = os.path.join(self.casdir, 'refs', 'heads')
@@ -545,7 +495,7 @@ class CASCache(ArtifactCache):
mtimes.append(os.path.getmtime(ref_path))
# NOTE: Sorted will sort from earliest to latest, thus the
- # first element of this list will be the file modified earliest.
+ # first ref of this list will be the file modified earliest.
return [ref for _, ref in sorted(zip(mtimes, refs))]
# list_objects():
@@ -599,28 +549,10 @@ class CASCache(ArtifactCache):
#
def remove(self, ref, *, defer_prune=False):
- # Remove extract if not used by other ref
- tree = self.resolve_ref(ref)
- ref_name, ref_hash = os.path.split(ref)
- extract = os.path.join(self.extractdir, ref_name, tree.hash)
- keys_file = os.path.join(extract, 'meta', 'keys.yaml')
- if os.path.exists(keys_file):
- keys_meta = _yaml.load(keys_file)
- keys = [keys_meta['strong'], keys_meta['weak']]
- remove_extract = True
- for other_hash in keys:
- if other_hash == ref_hash:
- continue
- remove_extract = False
- break
-
- if remove_extract:
- utils._force_rmtree(extract)
-
# Remove cache ref
refpath = self._refpath(ref)
if not os.path.exists(refpath):
- raise ArtifactError("Could not find artifact for ref '{}'".format(ref))
+ raise CASError("Could not find ref '{}'".format(ref))
os.unlink(refpath)
@@ -731,7 +663,7 @@ class CASCache(ArtifactCache):
symlinknode.name = name
symlinknode.target = os.readlink(full_path)
else:
- raise ArtifactError("Unsupported file type for {}".format(full_path))
+ raise CASError("Unsupported file type for {}".format(full_path))
return self.add_object(digest=dir_digest,
buffer=directory.SerializeToString())
@@ -750,7 +682,7 @@ class CASCache(ArtifactCache):
if dirnode.name == name:
return dirnode.digest
- raise ArtifactError("Subdirectory {} not found".format(name))
+ raise CASError("Subdirectory {} not found".format(name))
def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""):
dir_a = remote_execution_pb2.Directory()
@@ -827,31 +759,6 @@ class CASCache(ArtifactCache):
for dirnode in directory.directories:
self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
- def _initialize_remote(self, remote_spec, q):
- try:
- remote = _CASRemote(remote_spec)
- remote.init()
-
- request = buildstream_pb2.StatusRequest()
- for attempt in _retry():
- with attempt:
- response = remote.ref_storage.Status(request)
-
- if remote_spec.push and not response.allow_updates:
- q.put('Artifact server does not allow push')
- else:
- # No error
- q.put(None)
-
- except grpc.RpcError as e:
- # str(e) is too verbose for errors reported to the user
- q.put(e.details())
-
- except Exception as e: # pylint: disable=broad-except
- # Whatever happens, we need to return it to the calling process
- #
- q.put(str(e))
-
def _required_blobs(self, directory_digest):
# parse directory, and recursively add blobs
d = remote_execution_pb2.Digest()
@@ -1091,7 +998,7 @@ class CASCache(ArtifactCache):
# Represents a single remote CAS cache.
#
-class _CASRemote():
+class CASRemote():
def __init__(self, spec):
self.spec = spec
self._initialized = False
@@ -1132,7 +1039,7 @@ class _CASRemote():
certificate_chain=client_cert_bytes)
self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
else:
- raise ArtifactError("Unsupported URL: {}".format(self.spec.url))
+ raise CASError("Unsupported URL: {}".format(self.spec.url))
self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
@@ -1221,10 +1128,10 @@ class _CASBatchRead():
raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
response.digest.hash, response.status.code))
if response.status.code != grpc.StatusCode.OK.value[0]:
- raise ArtifactError("Failed to download blob {}: {}".format(
+ raise CASError("Failed to download blob {}: {}".format(
response.digest.hash, response.status.code))
if response.digest.size_bytes != len(response.data):
- raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
+ raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
response.digest.hash, response.digest.size_bytes, len(response.data)))
yield (response.digest, response.data)
@@ -1268,7 +1175,7 @@ class _CASBatchUpdate():
for response in batch_response.responses:
if response.status.code != grpc.StatusCode.OK.value[0]:
- raise ArtifactError("Failed to upload blob {}: {}".format(
+ raise CASError("Failed to upload blob {}: {}".format(
response.digest.hash, response.status.code))