author    Abderrahim Kitouni <akitouni@gnome.org>  2020-07-17 11:46:36 +0100
committer Javier Jardón <jjardon@gnome.org>        2020-07-30 10:22:53 +0000
commit    eaf6f692869a3b3e0a92defdf604e662d6f770e7 (patch)
tree      18d47a6ef8f77984d2460dab1cc135a16f05b465
parent    6706d3c64c9cef1f7ee43978aa84f484c55cea1e (diff)
Split up artifact cache and CAS cache
This changes CASCache from a subclass to a delegate object of ArtifactCache. As the lower layer, CASCache no longer deals with elements or projects.

Based on 626d20aefb52d25d987c61f377cc1ce3172da8c3

Fixes #659.
-rw-r--r--  buildstream/_artifactcache/artifactcache.py  | 244
-rw-r--r--  buildstream/_artifactcache/cascache.py        | 435
-rw-r--r--  buildstream/_artifactcache/casserver.py       |  21
-rw-r--r--  buildstream/_context.py                       |   3
-rw-r--r--  buildstream/_exceptions.py                    |  10
-rw-r--r--  tests/testutils/artifactshare.py              |   6
6 files changed, 380 insertions(+), 339 deletions(-)
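The essence of the change is that CASCache stops inheriting from ArtifactCache and instead becomes a member object: ArtifactCache keeps the element- and project-aware API, translates element/key pairs into refs, and delegates the ref-level work to self.cas. The following standalone sketch illustrates that delegation shape with deliberately simplified, hypothetical signatures and ref naming — it is not the real BuildStream API, only a picture of the layering the diff below introduces.

    # Minimal sketch of the subclass -> delegate refactoring in this commit.
    # Class names echo the real ones, but signatures and ref naming are
    # simplified stand-ins, not BuildStream's actual API.

    import os


    class CASCache:
        """Lower layer: knows only about refs and CAS objects."""

        def __init__(self, path):
            self.casdir = os.path.join(path, 'cas')

        def contains(self, ref):
            # Would check for refs/heads/<ref> inside the CAS repository.
            return os.path.exists(os.path.join(self.casdir, 'refs', 'heads', ref))

        def extract(self, ref, path):
            # Would check out the tree for `ref` under `path`; here it only
            # returns the destination it would have used.
            return os.path.join(path, ref.replace('/', '-'))


    class ArtifactCache:
        """Upper layer: maps elements and cache keys onto CAS refs."""

        def __init__(self, artifactdir):
            self.extractdir = os.path.join(artifactdir, 'extract')
            self.cas = CASCache(artifactdir)   # delegate, no longer a base class

        def get_artifact_fullname(self, project, element_name, key):
            # Simplified ref scheme for illustration only.
            return '{}/{}/{}'.format(project, element_name, key)

        def contains(self, project, element_name, key):
            ref = self.get_artifact_fullname(project, element_name, key)
            return self.cas.contains(ref)

        def extract(self, project, element_name, key):
            ref = self.get_artifact_fullname(project, element_name, key)
            path = os.path.join(self.extractdir, project, element_name)
            return self.cas.extract(ref, path)


    if __name__ == '__main__':
        cache = ArtifactCache('/tmp/bst-artifacts')
        print(cache.contains('myproject', 'base.bst', 'abc123'))

In the real patch the translation step is get_artifact_fullname(element, key), and every former abstract method of ArtifactCache (contains, extract, commit, diff, push, pull, link_key, remove) becomes a thin wrapper that resolves refs and forwards to the CASCache delegate, as the diff shows.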
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index ed5ef8262..38500a048 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -17,16 +17,21 @@
# Authors:
# Tristan Maat <tristan.maat@codethink.co.uk>
+import multiprocessing
import os
+import signal
import string
from collections import Mapping, namedtuple
from ..types import _KeyStrength
-from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
+from .._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
from .._message import Message, MessageType
+from .. import _signals
from .. import utils
from .. import _yaml
+from .cascache import CASCache, CASRemote
+
CACHE_SIZE_FILE = "cache_size"
@@ -125,7 +130,8 @@ class ArtifactCache():
def __init__(self, context):
self.context = context
self.extractdir = os.path.join(context.artifactdir, 'extract')
- self.tmpdir = os.path.join(context.artifactdir, 'tmp')
+
+ self.cas = CASCache(context.artifactdir)
self.global_remote_specs = []
self.project_remote_specs = {}
@@ -136,12 +142,15 @@ class ArtifactCache():
self._cache_quota_original = None # The cache quota as specified by the user, in bytes
self._cache_lower_threshold = None # The target cache size for a cleanup
+ # Per-project list of _CASRemote instances.
+ self._remotes = {}
+
+ self._has_fetch_remotes = False
+ self._has_push_remotes = False
+
os.makedirs(self.extractdir, exist_ok=True)
- os.makedirs(self.tmpdir, exist_ok=True)
- ################################################
- # Methods implemented on the abstract class #
- ################################################
+ self._calculate_cache_quota()
# get_artifact_fullname()
#
@@ -268,8 +277,10 @@ class ArtifactCache():
for key in (strong_key, weak_key):
if key:
try:
- self.update_mtime(element, key)
- except ArtifactError:
+ ref = self.get_artifact_fullname(element, key)
+
+ self.cas.update_mtime(ref)
+ except CASError:
pass
# clean():
@@ -392,7 +403,7 @@ class ArtifactCache():
#
def compute_cache_size(self):
old_cache_size = self._cache_size
- new_cache_size = self.calculate_cache_size()
+ new_cache_size = self.cas.calculate_cache_size()
if old_cache_size != new_cache_size:
self._cache_size = new_cache_size
@@ -467,28 +478,12 @@ class ArtifactCache():
def has_quota_exceeded(self):
return self.get_cache_size() > self._cache_quota
- ################################################
- # Abstract methods for subclasses to implement #
- ################################################
-
# preflight():
#
# Preflight check.
#
def preflight(self):
- pass
-
- # update_mtime()
- #
- # Update the mtime of an artifact.
- #
- # Args:
- # element (Element): The Element to update
- # key (str): The key of the artifact.
- #
- def update_mtime(self, element, key):
- raise ImplError("Cache '{kind}' does not implement update_mtime()"
- .format(kind=type(self).__name__))
+ self.cas.preflight()
# initialize_remotes():
#
@@ -498,7 +493,59 @@ class ArtifactCache():
# on_failure (callable): Called if we fail to contact one of the caches.
#
def initialize_remotes(self, *, on_failure=None):
- pass
+ remote_specs = self.global_remote_specs
+
+ for project in self.project_remote_specs:
+ remote_specs += self.project_remote_specs[project]
+
+ remote_specs = list(utils._deduplicate(remote_specs))
+
+ remotes = {}
+ q = multiprocessing.Queue()
+ for remote_spec in remote_specs:
+ # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+ # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
+ p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q))
+
+ try:
+ # Keep SIGINT blocked in the child process
+ with _signals.blocked([signal.SIGINT], ignore=False):
+ p.start()
+
+ error = q.get()
+ p.join()
+ except KeyboardInterrupt:
+ utils._kill_process_tree(p.pid)
+ raise
+
+ if error and on_failure:
+ on_failure(remote_spec.url, error)
+ elif error:
+ raise ArtifactError(error)
+ else:
+ self._has_fetch_remotes = True
+ if remote_spec.push:
+ self._has_push_remotes = True
+
+ remotes[remote_spec.url] = CASRemote(remote_spec)
+
+ for project in self.context.get_projects():
+ remote_specs = self.global_remote_specs
+ if project in self.project_remote_specs:
+ remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
+
+ project_remotes = []
+
+ for remote_spec in remote_specs:
+ # Errors are already handled in the loop above,
+ # skip unreachable remotes here.
+ if remote_spec.url not in remotes:
+ continue
+
+ remote = remotes[remote_spec.url]
+ project_remotes.append(remote)
+
+ self._remotes[project] = project_remotes
# contains():
#
@@ -512,8 +559,9 @@ class ArtifactCache():
# Returns: True if the artifact is in the cache, False otherwise
#
def contains(self, element, key):
- raise ImplError("Cache '{kind}' does not implement contains()"
- .format(kind=type(self).__name__))
+ ref = self.get_artifact_fullname(element, key)
+
+ return self.cas.contains(ref)
# list_artifacts():
#
@@ -524,8 +572,7 @@ class ArtifactCache():
# `ArtifactCache.get_artifact_fullname` in LRU order
#
def list_artifacts(self):
- raise ImplError("Cache '{kind}' does not implement list_artifacts()"
- .format(kind=type(self).__name__))
+ return self.cas.list_refs()
# remove():
#
@@ -537,9 +584,31 @@ class ArtifactCache():
# generated by
# `ArtifactCache.get_artifact_fullname`)
#
- def remove(self, artifact_name):
- raise ImplError("Cache '{kind}' does not implement remove()"
- .format(kind=type(self).__name__))
+ # Returns:
+ # (int|None) The amount of space pruned from the repository in
+ # Bytes, or None if defer_prune is True
+ #
+ def remove(self, ref):
+
+ # Remove extract if not used by other ref
+ tree = self.cas.resolve_ref(ref)
+ ref_name, ref_hash = os.path.split(ref)
+ extract = os.path.join(self.extractdir, ref_name, tree.hash)
+ keys_file = os.path.join(extract, 'meta', 'keys.yaml')
+ if os.path.exists(keys_file):
+ keys_meta = _yaml.load(keys_file)
+ keys = [keys_meta['strong'], keys_meta['weak']]
+ remove_extract = True
+ for other_hash in keys:
+ if other_hash == ref_hash:
+ continue
+ remove_extract = False
+ break
+
+ if remove_extract:
+ utils._force_rmtree(extract)
+
+ return self.cas.remove(ref)
# extract():
#
@@ -559,8 +628,11 @@ class ArtifactCache():
# Returns: path to extracted artifact
#
def extract(self, element, key):
- raise ImplError("Cache '{kind}' does not implement extract()"
- .format(kind=type(self).__name__))
+ ref = self.get_artifact_fullname(element, key)
+
+ path = os.path.join(self.extractdir, element._get_project().name, element.normal_name)
+
+ return self.cas.extract(ref, path)
# commit():
#
@@ -572,8 +644,9 @@ class ArtifactCache():
# keys (list): The cache keys to use
#
def commit(self, element, content, keys):
- raise ImplError("Cache '{kind}' does not implement commit()"
- .format(kind=type(self).__name__))
+ refs = [self.get_artifact_fullname(element, key) for key in keys]
+
+ self.cas.commit(refs, content)
# diff():
#
@@ -587,8 +660,10 @@ class ArtifactCache():
# subdir (str): A subdirectory to limit the comparison to
#
def diff(self, element, key_a, key_b, *, subdir=None):
- raise ImplError("Cache '{kind}' does not implement diff()"
- .format(kind=type(self).__name__))
+ ref_a = self.get_artifact_fullname(element, key_a)
+ ref_b = self.get_artifact_fullname(element, key_b)
+
+ return self.cas.diff(ref_a, ref_b, subdir=subdir)
# has_fetch_remotes():
#
@@ -600,7 +675,16 @@ class ArtifactCache():
# Returns: True if any remote repositories are configured, False otherwise
#
def has_fetch_remotes(self, *, element=None):
- return False
+ if not self._has_fetch_remotes:
+ # No project has fetch remotes
+ return False
+ elif element is None:
+ # At least one (sub)project has fetch remotes
+ return True
+ else:
+ # Check whether the specified element's project has fetch remotes
+ remotes_for_project = self._remotes[element._get_project()]
+ return bool(remotes_for_project)
# has_push_remotes():
#
@@ -612,7 +696,16 @@ class ArtifactCache():
# Returns: True if any remote repository is configured, False otherwise
#
def has_push_remotes(self, *, element=None):
- return False
+ if not self._has_push_remotes:
+ # No project has push remotes
+ return False
+ elif element is None:
+ # At least one (sub)project has push remotes
+ return True
+ else:
+ # Check whether the specified element's project has push remotes
+ remotes_for_project = self._remotes[element._get_project()]
+ return any(remote.spec.push for remote in remotes_for_project)
# push():
#
@@ -629,8 +722,28 @@ class ArtifactCache():
# (ArtifactError): if there was an error
#
def push(self, element, keys):
- raise ImplError("Cache '{kind}' does not implement push()"
- .format(kind=type(self).__name__))
+ refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
+
+ project = element._get_project()
+
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
+
+ pushed = False
+
+ for remote in push_remotes:
+ remote.init()
+ display_key = element._get_brief_display_key()
+ element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
+
+ if self.cas.push(refs, remote):
+ element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
+ pushed = True
+ else:
+ element.info("Remote ({}) already has {} cached".format(
+ remote.spec.url, element._get_brief_display_key()
+ ))
+
+ return pushed
# pull():
#
@@ -645,8 +758,32 @@ class ArtifactCache():
# (bool): True if pull was successful, False if artifact was not available
#
def pull(self, element, key, *, progress=None):
- raise ImplError("Cache '{kind}' does not implement pull()"
- .format(kind=type(self).__name__))
+ ref = self.get_artifact_fullname(element, key)
+ display_key = key[:self.context.log_key_length]
+
+ project = element._get_project()
+
+ for remote in self._remotes[project]:
+ try:
+ element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
+
+ if self.cas.pull(ref, remote, progress=progress):
+ element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
+ # no need to pull from additional remotes
+ return True
+ else:
+ element.info("Remote ({}) does not have {} cached".format(
+ remote.spec.url, display_key
+ ))
+ except BlobNotFound as e:
+ element.info("Remote ({}) does not have {} cached".format(
+ remote.spec.url, display_key
+ ))
+ except CASError as e:
+ raise ArtifactError("Failed to pull artifact {}: {}".format(
+ display_key, e)) from e
+
+ return False
# link_key():
#
@@ -658,19 +795,10 @@ class ArtifactCache():
# newkey (str): A new cache key for the artifact
#
def link_key(self, element, oldkey, newkey):
- raise ImplError("Cache '{kind}' does not implement link_key()"
- .format(kind=type(self).__name__))
+ oldref = self.get_artifact_fullname(element, oldkey)
+ newref = self.get_artifact_fullname(element, newkey)
- # calculate_cache_size()
- #
- # Return the real artifact cache size.
- #
- # Returns:
- # (int): The size of the artifact cache.
- #
- def calculate_cache_size(self):
- raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
- .format(kind=type(self).__name__))
+ self.cas.link_ref(oldref, newref)
################################################
# Local Private Methods #
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index a3d27c8d1..c6efcf508 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -19,9 +19,7 @@
import hashlib
import itertools
-import multiprocessing
import os
-import signal
import stat
import tempfile
import uuid
@@ -31,17 +29,12 @@ from urllib.parse import urlparse
import grpc
-from .. import _yaml
-
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
-from .._message import MessageType, Message
-from .. import _signals, utils
-from .._exceptions import ArtifactError
-
-from . import ArtifactCache
+from .. import utils
+from .._exceptions import CASError
# The default limit for gRPC messages is 4 MiB.
@@ -89,68 +82,74 @@ def _retry(tries=5):
break
-class BlobNotFound(ArtifactError):
+class BlobNotFound(CASError):
def __init__(self, blob, msg):
self.blob = blob
super().__init__(msg)
-# A CASCache manages artifacts in a CAS repository as specified in the
-# Remote Execution API.
+# A CASCache manages a CAS repository as specified in the Remote Execution API.
#
# Args:
-# context (Context): The BuildStream context
-#
-# Pushing is explicitly disabled by the platform in some cases,
-# like when we are falling back to functioning without using
-# user namespaces.
+# path (str): The root directory for the CAS repository
#
-class CASCache(ArtifactCache):
+class CASCache():
- def __init__(self, context):
- super().__init__(context)
-
- self.casdir = os.path.join(context.artifactdir, 'cas')
+ def __init__(self, path):
+ self.casdir = os.path.join(path, 'cas')
+ self.tmpdir = os.path.join(path, 'tmp')
os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
+ os.makedirs(self.tmpdir, exist_ok=True)
- self._calculate_cache_quota()
-
- # Per-project list of _CASRemote instances.
- self._remotes = {}
-
- self._has_fetch_remotes = False
- self._has_push_remotes = False
-
- ################################################
- # Implementation of abstract methods #
- ################################################
-
+ # preflight():
+ #
+ # Preflight check.
+ #
def preflight(self):
if (not os.path.isdir(os.path.join(self.casdir, 'refs', 'heads')) or
not os.path.isdir(os.path.join(self.casdir, 'objects'))):
- raise ArtifactError("CAS repository check failed for '{}'"
- .format(self.casdir))
+ raise CASError("CAS repository check failed for '{}'".format(self.casdir))
- def contains(self, element, key):
- refpath = self._refpath(self.get_artifact_fullname(element, key))
+ # contains():
+ #
+ # Check whether the specified ref is already available in the local CAS cache.
+ #
+ # Args:
+ # ref (str): The ref to check
+ #
+ # Returns: True if the ref is in the cache, False otherwise
+ #
+ def contains(self, ref):
+ refpath = self._refpath(ref)
# This assumes that the repository doesn't have any dangling pointers
return os.path.exists(refpath)
- def extract(self, element, key):
- ref = self.get_artifact_fullname(element, key)
-
+ # extract():
+ #
+ # Extract cached directory for the specified ref if it hasn't
+ # already been extracted.
+ #
+ # Args:
+ # ref (str): The ref whose directory to extract
+ # path (str): The destination path
+ #
+ # Raises:
+ # CASError: In cases there was an OSError, or if the ref did not exist.
+ #
+ # Returns: path to extracted directory
+ #
+ def extract(self, ref, path):
tree = self.resolve_ref(ref, update_mtime=True)
- dest = os.path.join(self.extractdir, element._get_project().name,
- element.normal_name, tree.hash)
+ dest = os.path.join(path, tree.hash)
if os.path.isdir(dest):
- # artifact has already been extracted
+ # directory has already been extracted
return dest
- with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir:
+ with tempfile.TemporaryDirectory(prefix='tmp', dir=self.tmpdir) as tmpdir:
checkoutdir = os.path.join(tmpdir, ref)
self._checkout(checkoutdir, tree)
@@ -164,23 +163,35 @@ class CASCache(ArtifactCache):
# If rename fails with these errors, another process beat
# us to it so just ignore.
if e.errno not in [errno.ENOTEMPTY, errno.EEXIST]:
- raise ArtifactError("Failed to extract artifact for ref '{}': {}"
- .format(ref, e)) from e
+ raise CASError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e
return dest
- def commit(self, element, content, keys):
- refs = [self.get_artifact_fullname(element, key) for key in keys]
-
- tree = self._commit_directory(content)
+ # commit():
+ #
+ # Commit directory to cache.
+ #
+ # Args:
+ # refs (list): The refs to set
+ # path (str): The directory to import
+ #
+ def commit(self, refs, path):
+ tree = self._commit_directory(path)
for ref in refs:
self.set_ref(ref, tree)
- def diff(self, element, key_a, key_b, *, subdir=None):
- ref_a = self.get_artifact_fullname(element, key_a)
- ref_b = self.get_artifact_fullname(element, key_b)
-
+ # diff():
+ #
+ # Return a list of files that have been added or modified between
+ # the refs described by ref_a and ref_b.
+ #
+ # Args:
+ # ref_a (str): The first ref
+ # ref_b (str): The second ref
+ # subdir (str): A subdirectory to limit the comparison to
+ #
+ def diff(self, ref_a, ref_b, *, subdir=None):
tree_a = self.resolve_ref(ref_a)
tree_b = self.resolve_ref(ref_b)
@@ -196,145 +207,103 @@ class CASCache(ArtifactCache):
return modified, removed, added
- def initialize_remotes(self, *, on_failure=None):
- remote_specs = self.global_remote_specs
-
- for project in self.project_remote_specs:
- remote_specs += self.project_remote_specs[project]
-
- remote_specs = list(utils._deduplicate(remote_specs))
+ def initialize_remote(self, remote_spec, q):
+ try:
+ remote = CASRemote(remote_spec)
+ remote.init()
- remotes = {}
- q = multiprocessing.Queue()
- for remote_spec in remote_specs:
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
- p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
+ request = buildstream_pb2.StatusRequest()
+ for attempt in _retry():
+ with attempt:
+ response = remote.ref_storage.Status(request)
- try:
- # Keep SIGINT blocked in the child process
- with _signals.blocked([signal.SIGINT], ignore=False):
- p.start()
-
- error = q.get()
- p.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(p.pid)
- raise
-
- if error and on_failure:
- on_failure(remote_spec.url, error)
- elif error:
- raise ArtifactError(error)
+ if remote_spec.push and not response.allow_updates:
+ q.put('CAS server does not allow push')
else:
- self._has_fetch_remotes = True
- if remote_spec.push:
- self._has_push_remotes = True
+ # No error
+ q.put(None)
- remotes[remote_spec.url] = _CASRemote(remote_spec)
+ except grpc.RpcError as e:
+ # str(e) is too verbose for errors reported to the user
+ q.put(e.details())
- for project in self.context.get_projects():
- remote_specs = self.global_remote_specs
- if project in self.project_remote_specs:
- remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
+ except Exception as e: # pylint: disable=broad-except
+ # Whatever happens, we need to return it to the calling process
+ #
+ q.put(str(e))
- project_remotes = []
+ # pull():
+ #
+ # Pull a ref from a remote repository.
+ #
+ # Args:
+ # ref (str): The ref to pull
+ # remote (CASRemote): The remote repository to pull from
+ # progress (callable): The progress callback, if any
+ #
+ # Returns:
+ # (bool): True if pull was successful, False if ref was not available
+ #
+ def pull(self, ref, remote, *, progress=None):
+ try:
+ remote.init()
- for remote_spec in remote_specs:
- # Errors are already handled in the loop above,
- # skip unreachable remotes here.
- if remote_spec.url not in remotes:
- continue
+ request = buildstream_pb2.GetReferenceRequest()
+ request.key = ref
+ for attempt in _retry():
+ with attempt:
+ response = remote.ref_storage.GetReference(request)
- remote = remotes[remote_spec.url]
- project_remotes.append(remote)
+ tree = remote_execution_pb2.Digest()
+ tree.hash = response.digest.hash
+ tree.size_bytes = response.digest.size_bytes
- self._remotes[project] = project_remotes
+ self._fetch_directory(remote, tree)
- def has_fetch_remotes(self, *, element=None):
- if not self._has_fetch_remotes:
- # No project has fetch remotes
- return False
- elif element is None:
- # At least one (sub)project has fetch remotes
- return True
- else:
- # Check whether the specified element's project has fetch remotes
- remotes_for_project = self._remotes[element._get_project()]
- return bool(remotes_for_project)
+ self.set_ref(ref, tree)
- def has_push_remotes(self, *, element=None):
- if not self._has_push_remotes:
- # No project has push remotes
- return False
- elif element is None:
- # At least one (sub)project has push remotes
return True
- else:
- # Check whether the specified element's project has push remotes
- remotes_for_project = self._remotes[element._get_project()]
- return any(remote.spec.push for remote in remotes_for_project)
-
- def pull(self, element, key, *, progress=None):
- ref = self.get_artifact_fullname(element, key)
- display_key = key[:self.context.log_key_length]
-
- project = element._get_project()
-
- for remote in self._remotes[project]:
- try:
- remote.init()
- element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
-
- request = buildstream_pb2.GetReferenceRequest()
- request.key = ref
- for attempt in _retry():
- with attempt:
- response = remote.ref_storage.GetReference(request)
-
- tree = remote_execution_pb2.Digest()
- tree.hash = response.digest.hash
- tree.size_bytes = response.digest.size_bytes
-
- self._fetch_directory(remote, tree)
-
- self.set_ref(ref, tree)
-
- element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
- # no need to pull from additional remotes
- return True
-
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.NOT_FOUND:
- raise ArtifactError("Failed to pull artifact {}: {}".format(
- display_key, e)) from e
- else:
- element.info("Remote ({}) does not have {} cached".format(
- remote.spec.url, display_key
- ))
- except BlobNotFound as e:
- element.info("Remote ({}) does not have {} cached".format(
- remote.spec.url, display_key
- ))
-
- return False
-
- def link_key(self, element, oldkey, newkey):
- oldref = self.get_artifact_fullname(element, oldkey)
- newref = self.get_artifact_fullname(element, newkey)
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
+ else:
+ return False
+ # link_ref():
+ #
+ # Add an alias for an existing ref.
+ #
+ # Args:
+ # oldref (str): An existing ref
+ # newref (str): A new ref for the same directory
+ #
+ def link_ref(self, oldref, newref):
tree = self.resolve_ref(oldref)
self.set_ref(newref, tree)
- def _push_refs_to_remote(self, refs, remote):
+ # push():
+ #
+ # Push committed refs to remote repository.
+ #
+ # Args:
+ # refs (list): The refs to push
+ # remote (CASRemote): The remote to push to
+ #
+ # Returns:
+ # (bool): True if any remote was updated, False if no pushes were required
+ #
+ # Raises:
+ # (CASError): if there was an error
+ #
+ def push(self, refs, remote):
skipped_remote = True
try:
for ref in refs:
tree = self.resolve_ref(ref)
# Check whether ref is already on the server in which case
- # there is no need to push the artifact
+ # there is no need to push the ref
try:
request = buildstream_pb2.GetReferenceRequest()
request.key = ref
@@ -364,42 +333,10 @@ class CASCache(ArtifactCache):
skipped_remote = False
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
- raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
+ raise CASError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e
return not skipped_remote
- def push(self, element, keys):
-
- refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
-
- project = element._get_project()
-
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
-
- pushed = False
-
- for remote in push_remotes:
- remote.init()
- display_key = element._get_brief_display_key()
- element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
-
- if self._push_refs_to_remote(refs, remote):
- element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
- pushed = True
- else:
- self.context.message(Message(
- None,
- MessageType.INFO,
- "Remote ({}) already has {} cached".format(
- remote.spec.url, element._get_brief_display_key())
- ))
-
- return pushed
-
- ################################################
- # API Private Methods #
- ################################################
-
# objpath():
#
# Return the path of an object based on its digest.
@@ -470,7 +407,7 @@ class CASCache(ArtifactCache):
pass
except OSError as e:
- raise ArtifactError("Failed to hash object: {}".format(e)) from e
+ raise CASError("Failed to hash object: {}".format(e)) from e
return digest
@@ -511,26 +448,39 @@ class CASCache(ArtifactCache):
return digest
except FileNotFoundError as e:
- raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
+ raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
- def update_mtime(self, element, key):
+ # update_mtime()
+ #
+ # Update the mtime of a ref.
+ #
+ # Args:
+ # ref (str): The ref to update
+ #
+ def update_mtime(self, ref):
try:
- ref = self.get_artifact_fullname(element, key)
os.utime(self._refpath(ref))
except FileNotFoundError as e:
- raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
+ raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
+ # calculate_cache_size()
+ #
+ # Return the real disk usage of the CAS cache.
+ #
+ # Returns:
+ # (int): The size of the cache.
+ #
def calculate_cache_size(self):
return utils._get_dir_size(self.casdir)
- # list_artifacts():
+ # list_refs():
#
- # List cached artifacts in Least Recently Modified (LRM) order.
+ # List refs in Least Recently Modified (LRM) order.
#
# Returns:
# (list) - A list of refs in LRM order
#
- def list_artifacts(self):
+ def list_refs(self):
# string of: /path/to/repo/refs/heads
ref_heads = os.path.join(self.casdir, 'refs', 'heads')
@@ -545,7 +495,7 @@ class CASCache(ArtifactCache):
mtimes.append(os.path.getmtime(ref_path))
# NOTE: Sorted will sort from earliest to latest, thus the
- # first element of this list will be the file modified earliest.
+ # first ref of this list will be the file modified earliest.
return [ref for _, ref in sorted(zip(mtimes, refs))]
# list_objects():
@@ -599,28 +549,10 @@ class CASCache(ArtifactCache):
#
def remove(self, ref, *, defer_prune=False):
- # Remove extract if not used by other ref
- tree = self.resolve_ref(ref)
- ref_name, ref_hash = os.path.split(ref)
- extract = os.path.join(self.extractdir, ref_name, tree.hash)
- keys_file = os.path.join(extract, 'meta', 'keys.yaml')
- if os.path.exists(keys_file):
- keys_meta = _yaml.load(keys_file)
- keys = [keys_meta['strong'], keys_meta['weak']]
- remove_extract = True
- for other_hash in keys:
- if other_hash == ref_hash:
- continue
- remove_extract = False
- break
-
- if remove_extract:
- utils._force_rmtree(extract)
-
# Remove cache ref
refpath = self._refpath(ref)
if not os.path.exists(refpath):
- raise ArtifactError("Could not find artifact for ref '{}'".format(ref))
+ raise CASError("Could not find ref '{}'".format(ref))
os.unlink(refpath)
@@ -731,7 +663,7 @@ class CASCache(ArtifactCache):
symlinknode.name = name
symlinknode.target = os.readlink(full_path)
else:
- raise ArtifactError("Unsupported file type for {}".format(full_path))
+ raise CASError("Unsupported file type for {}".format(full_path))
return self.add_object(digest=dir_digest,
buffer=directory.SerializeToString())
@@ -750,7 +682,7 @@ class CASCache(ArtifactCache):
if dirnode.name == name:
return dirnode.digest
- raise ArtifactError("Subdirectory {} not found".format(name))
+ raise CASError("Subdirectory {} not found".format(name))
def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""):
dir_a = remote_execution_pb2.Directory()
@@ -827,31 +759,6 @@ class CASCache(ArtifactCache):
for dirnode in directory.directories:
self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
- def _initialize_remote(self, remote_spec, q):
- try:
- remote = _CASRemote(remote_spec)
- remote.init()
-
- request = buildstream_pb2.StatusRequest()
- for attempt in _retry():
- with attempt:
- response = remote.ref_storage.Status(request)
-
- if remote_spec.push and not response.allow_updates:
- q.put('Artifact server does not allow push')
- else:
- # No error
- q.put(None)
-
- except grpc.RpcError as e:
- # str(e) is too verbose for errors reported to the user
- q.put(e.details())
-
- except Exception as e: # pylint: disable=broad-except
- # Whatever happens, we need to return it to the calling process
- #
- q.put(str(e))
-
def _required_blobs(self, directory_digest):
# parse directory, and recursively add blobs
d = remote_execution_pb2.Digest()
@@ -1091,7 +998,7 @@ class CASCache(ArtifactCache):
# Represents a single remote CAS cache.
#
-class _CASRemote():
+class CASRemote():
def __init__(self, spec):
self.spec = spec
self._initialized = False
@@ -1132,7 +1039,7 @@ class _CASRemote():
certificate_chain=client_cert_bytes)
self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
else:
- raise ArtifactError("Unsupported URL: {}".format(self.spec.url))
+ raise CASError("Unsupported URL: {}".format(self.spec.url))
self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
@@ -1221,10 +1128,10 @@ class _CASBatchRead():
raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
response.digest.hash, response.status.code))
if response.status.code != grpc.StatusCode.OK.value[0]:
- raise ArtifactError("Failed to download blob {}: {}".format(
+ raise CASError("Failed to download blob {}: {}".format(
response.digest.hash, response.status.code))
if response.digest.size_bytes != len(response.data):
- raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
+ raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
response.digest.hash, response.digest.size_bytes, len(response.data)))
yield (response.digest, response.data)
@@ -1268,7 +1175,7 @@ class _CASBatchUpdate():
for response in batch_response.responses:
if response.status.code != grpc.StatusCode.OK.value[0]:
- raise ArtifactError("Failed to upload blob {}: {}".format(
+ raise CASError("Failed to upload blob {}: {}".format(
response.digest.hash, response.status.code))
diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index 4a9d5191a..1fdab80a8 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -34,8 +34,9 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remo
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
-from .._exceptions import ArtifactError
-from .._context import Context
+from .._exceptions import CASError
+
+from .cascache import CASCache
# The default limit for gRPC messages is 4 MiB.
@@ -66,29 +67,25 @@ def message_handler(message, context):
def create_server(repo, *, enable_push,
max_head_size=int(10e9),
min_head_size=int(2e9)):
- context = Context()
- context.artifactdir = os.path.abspath(repo)
- context.set_message_handler(message_handler)
-
- artifactcache = context.artifactcache
+ cas = CASCache(os.path.abspath(repo))
# Use max_workers default from Python 3.5+
max_workers = (os.cpu_count() or 1) * 5
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
- cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size)
+ cache_cleaner = _CacheCleaner(cas, max_head_size, min_head_size)
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
- _ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
+ _ByteStreamServicer(cas, cache_cleaner, enable_push=enable_push), server)
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
- _ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
+ _ContentAddressableStorageServicer(cas, cache_cleaner, enable_push=enable_push), server)
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
_CapabilitiesServicer(), server)
buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
- _ReferenceStorageServicer(artifactcache, enable_push=enable_push), server)
+ _ReferenceStorageServicer(cas, enable_push=enable_push), server)
return server
@@ -389,7 +386,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
response.digest.hash = tree.hash
response.digest.size_bytes = tree.size_bytes
- except ArtifactError:
+ except CASError:
context.set_code(grpc.StatusCode.NOT_FOUND)
return response
diff --git a/buildstream/_context.py b/buildstream/_context.py
index f36bfd343..8ee45f787 100644
--- a/buildstream/_context.py
+++ b/buildstream/_context.py
@@ -30,7 +30,6 @@ from ._exceptions import LoadError, LoadErrorReason, BstError
from ._message import Message, MessageType
from ._profile import Topics, profile_start, profile_end
from ._artifactcache import ArtifactCache, ArtifactCacheUsage
-from ._artifactcache.cascache import CASCache
from ._workspaces import Workspaces
from .plugin import Plugin
@@ -246,7 +245,7 @@ class Context():
@property
def artifactcache(self):
if not self._artifactcache:
- self._artifactcache = CASCache(self)
+ self._artifactcache = ArtifactCache(self)
return self._artifactcache
diff --git a/buildstream/_exceptions.py b/buildstream/_exceptions.py
index aeacf8dcd..44d890ead 100644
--- a/buildstream/_exceptions.py
+++ b/buildstream/_exceptions.py
@@ -89,6 +89,7 @@ class ErrorDomain(Enum):
ELEMENT = 11
APP = 12
STREAM = 13
+ CAS = 15
# BstError is an internal base exception class for BuildSream
@@ -275,6 +276,15 @@ class ArtifactError(BstError):
super().__init__(message, detail=detail, domain=ErrorDomain.ARTIFACT, reason=reason, temporary=True)
+# CASError
+#
+# Raised when errors are encountered in the CAS
+#
+class CASError(BstError):
+ def __init__(self, message, *, detail=None, reason=None, temporary=False):
+ super().__init__(message, detail=detail, domain=ErrorDomain.CAS, reason=reason, temporary=True)
+
+
# PipelineError
#
# Raised from pipeline operations
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index c7987e02c..d929a7b91 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -13,7 +13,7 @@ from multiprocessing import Process, Queue
from buildstream import _yaml
from buildstream._artifactcache.casserver import create_server
from buildstream._context import Context
-from buildstream._exceptions import ArtifactError
+from buildstream._exceptions import CASError
# ArtifactShare()
@@ -52,7 +52,7 @@ class ArtifactShare():
context.artifactdir = self.repodir
context.set_message_handler(self._message_handler)
- self.cas = context.artifactcache
+ self.cas = context.artifactcache.cas
self.total_space = total_space
self.free_space = free_space
@@ -144,7 +144,7 @@ class ArtifactShare():
if not os.path.exists(object_name):
return False
return True
- except ArtifactError:
+ except CASError:
return False
# close():