Diffstat (limited to 'buildstream/_artifactcache/artifactcache.py')
-rw-r--r--  buildstream/_artifactcache/artifactcache.py  |  244
 1 file changed, 186 insertions(+), 58 deletions(-)
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index ed5ef8262..38500a048 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -17,16 +17,21 @@
# Authors:
# Tristan Maat <tristan.maat@codethink.co.uk>
+import multiprocessing
import os
+import signal
import string
from collections import Mapping, namedtuple
from ..types import _KeyStrength
-from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
+from .._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
from .._message import Message, MessageType
+from .. import _signals
from .. import utils
from .. import _yaml
+from .cascache import BlobNotFound, CASCache, CASRemote
+
CACHE_SIZE_FILE = "cache_size"
@@ -125,7 +130,8 @@ class ArtifactCache():
def __init__(self, context):
self.context = context
self.extractdir = os.path.join(context.artifactdir, 'extract')
- self.tmpdir = os.path.join(context.artifactdir, 'tmp')
+
+ self.cas = CASCache(context.artifactdir)
self.global_remote_specs = []
self.project_remote_specs = {}
@@ -136,12 +142,15 @@ class ArtifactCache():
self._cache_quota_original = None # The cache quota as specified by the user, in bytes
self._cache_lower_threshold = None # The target cache size for a cleanup
+ # Per-project list of CASRemote instances.
+ self._remotes = {}
+
+ self._has_fetch_remotes = False
+ self._has_push_remotes = False
+
os.makedirs(self.extractdir, exist_ok=True)
- os.makedirs(self.tmpdir, exist_ok=True)
- ################################################
- # Methods implemented on the abstract class #
- ################################################
+ self._calculate_cache_quota()
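
Note: _calculate_cache_quota() itself lands in the local private methods section, outside the hunks shown here. As a rough sketch of the job it does, assuming the quota option accepts 'infinity', a percentage of the cache volume, or an absolute size with a binary suffix (the parser below is illustrative, not BuildStream's):

    # Illustrative sketch only: turn a user-facing quota setting into bytes.
    # The real _calculate_cache_quota() also reserves headroom and validates
    # the result against the space actually available on the volume.
    import os
    import re

    def parse_quota(config, volume):
        if config == 'infinity':
            return None                      # unlimited
        match = re.fullmatch(r'(\d+)%', config)
        if match:
            # Percentage of the filesystem holding the cache
            stat = os.statvfs(volume)
            return stat.f_blocks * stat.f_frsize * int(match.group(1)) // 100
        units = {'K': 2**10, 'M': 2**20, 'G': 2**30, 'T': 2**40}
        match = re.fullmatch(r'(\d+)([KMGT]?)', config)
        if not match:
            raise ValueError("invalid quota: {}".format(config))
        return int(match.group(1)) * units.get(match.group(2), 1)
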
# get_artifact_fullname()
#
@@ -268,8 +277,10 @@ class ArtifactCache():
for key in (strong_key, weak_key):
if key:
try:
- self.update_mtime(element, key)
- except ArtifactError:
+ ref = self.get_artifact_fullname(element, key)
+
+ self.cas.update_mtime(ref)
+ except CASError:
pass
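
Note: the mtime update now goes through CASCache against a full ref name, and a missing ref (CASError) is deliberately ignored. A plausible sketch of what update_mtime(ref) amounts to on the CAS side, assuming refs are stored as plain files whose mtime doubles as the LRU timestamp; the refs/heads path layout is an assumption:

    import os

    def update_mtime(casdir, ref):
        # Touch the ref file: its mtime is what the ref listing sorts by, so
        # a cache hit here keeps the artifact out of the LRU expiry window.
        ref_path = os.path.join(casdir, 'refs', 'heads', ref)
        os.utime(ref_path)   # FileNotFoundError is what surfaces as CASError
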
# clean():
@@ -392,7 +403,7 @@ class ArtifactCache():
#
def compute_cache_size(self):
old_cache_size = self._cache_size
- new_cache_size = self.calculate_cache_size()
+ new_cache_size = self.cas.calculate_cache_size()
if old_cache_size != new_cache_size:
self._cache_size = new_cache_size
@@ -467,28 +478,12 @@ class ArtifactCache():
def has_quota_exceeded(self):
return self.get_cache_size() > self._cache_quota
- ################################################
- # Abstract methods for subclasses to implement #
- ################################################
-
# preflight():
#
# Preflight check.
#
def preflight(self):
- pass
-
- # update_mtime()
- #
- # Update the mtime of an artifact.
- #
- # Args:
- # element (Element): The Element to update
- # key (str): The key of the artifact.
- #
- def update_mtime(self, element, key):
- raise ImplError("Cache '{kind}' does not implement update_mtime()"
- .format(kind=type(self).__name__))
+ self.cas.preflight()
# initialize_remotes():
#
@@ -498,7 +493,59 @@ class ArtifactCache():
# on_failure (callable): Called if we fail to contact one of the caches.
#
def initialize_remotes(self, *, on_failure=None):
- pass
+ # Copy the list: += below must not mutate self.global_remote_specs in place
+ remote_specs = list(self.global_remote_specs)
+
+ for project in self.project_remote_specs:
+ remote_specs += self.project_remote_specs[project]
+
+ remote_specs = list(utils._deduplicate(remote_specs))
+
+ remotes = {}
+ q = multiprocessing.Queue()
+ for remote_spec in remote_specs:
+ # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+ # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
+ p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q))
+
+ try:
+ # Keep SIGINT blocked in the child process
+ with _signals.blocked([signal.SIGINT], ignore=False):
+ p.start()
+
+ error = q.get()
+ p.join()
+ except KeyboardInterrupt:
+ utils._kill_process_tree(p.pid)
+ raise
+
+ if error and on_failure:
+ on_failure(remote_spec.url, error)
+ elif error:
+ raise ArtifactError(error)
+ else:
+ self._has_fetch_remotes = True
+ if remote_spec.push:
+ self._has_push_remotes = True
+
+ remotes[remote_spec.url] = CASRemote(remote_spec)
+
+ for project in self.context.get_projects():
+ remote_specs = self.global_remote_specs
+ if project in self.project_remote_specs:
+ remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
+
+ project_remotes = []
+
+ for remote_spec in remote_specs:
+ # Errors are already handled in the loop above,
+ # skip unreachable remotes here.
+ if remote_spec.url not in remotes:
+ continue
+
+ remote = remotes[remote_spec.url]
+ project_remotes.append(remote)
+
+ self._remotes[project] = project_remotes
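
Note: the comment in this hunk carries the key design constraint: gRPC spawns background threads as soon as a channel is created, and those threads do not survive fork(), so each remote is probed in a short-lived child process and the result read back over a queue. A self-contained sketch of the pattern, with a hypothetical probe() standing in for cas.initialize_remote(); the SIGINT blocking and process-tree cleanup from the hunk are elided:

    import multiprocessing

    def probe(url, queue):
        # Stand-in for cas.initialize_remote(): set up the connection here,
        # in the child, and report any failure to the parent as a string.
        try:
            if not url.startswith(("http://", "https://")):
                raise ValueError("unsupported URL scheme: {}".format(url))
            queue.put(None)              # success
        except Exception as e:
            queue.put(str(e))            # failure, serialized for the parent

    if __name__ == '__main__':
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=probe,
                                    args=("https://cache.example.com", q))
        p.start()
        error = q.get()                  # blocks until the child reports
        p.join()
        print("remote ok" if error is None
              else "remote failed: {}".format(error))
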
# contains():
#
@@ -512,8 +559,9 @@ class ArtifactCache():
# Returns: True if the artifact is in the cache, False otherwise
#
def contains(self, element, key):
- raise ImplError("Cache '{kind}' does not implement contains()"
- .format(kind=type(self).__name__))
+ ref = self.get_artifact_fullname(element, key)
+
+ return self.cas.contains(ref)
# list_artifacts():
#
@@ -524,8 +572,7 @@ class ArtifactCache():
# `ArtifactCache.get_artifact_fullname` in LRU order
#
def list_artifacts(self):
- raise ImplError("Cache '{kind}' does not implement list_artifacts()"
- .format(kind=type(self).__name__))
+ return self.cas.list_refs()
# remove():
#
@@ -537,9 +584,31 @@ class ArtifactCache():
# generated by
# `ArtifactCache.get_artifact_fullname`)
#
- def remove(self, artifact_name):
- raise ImplError("Cache '{kind}' does not implement remove()"
- .format(kind=type(self).__name__))
+ # Returns:
+ #     (int): The amount of space pruned from the repository, in bytes
+ #
+ def remove(self, ref):
+
+ # Remove extract if not used by other ref
+ tree = self.cas.resolve_ref(ref)
+ ref_name, ref_hash = os.path.split(ref)
+ extract = os.path.join(self.extractdir, ref_name, tree.hash)
+ keys_file = os.path.join(extract, 'meta', 'keys.yaml')
+ if os.path.exists(keys_file):
+ keys_meta = _yaml.load(keys_file)
+ keys = [keys_meta['strong'], keys_meta['weak']]
+ remove_extract = True
+ for other_hash in keys:
+ if other_hash == ref_hash:
+ continue
+ remove_extract = False
+ break
+
+ if remove_extract:
+ utils._force_rmtree(extract)
+
+ return self.cas.remove(ref)
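
Note: extracts are shared: the strong-keyed and weak-keyed refs of one element can resolve to the same tree, so the extract directory is keyed by tree hash and meta/keys.yaml records both cache keys. The loop above removes the extract only when no other key still points at the same tree. The same check as a self-contained predicate, assuming keys.yaml carries exactly a 'strong' and a 'weak' entry, as the code reads it:

    def extract_still_referenced(keys_meta, ref_hash):
        # keys_meta mirrors meta/keys.yaml: {'strong': ..., 'weak': ...}
        keys = [keys_meta['strong'], keys_meta['weak']]
        # Keep the extract if any key other than the one being removed remains
        return any(other_hash != ref_hash for other_hash in keys)

    assert extract_still_referenced({'strong': 'aaa', 'weak': 'bbb'}, 'aaa')
    assert not extract_still_referenced({'strong': 'aaa', 'weak': 'aaa'}, 'aaa')
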
# extract():
#
@@ -559,8 +628,11 @@ class ArtifactCache():
# Returns: path to extracted artifact
#
def extract(self, element, key):
- raise ImplError("Cache '{kind}' does not implement extract()"
- .format(kind=type(self).__name__))
+ ref = self.get_artifact_fullname(element, key)
+
+ path = os.path.join(self.extractdir, element._get_project().name, element.normal_name)
+
+ return self.cas.extract(ref, path)
# commit():
#
@@ -572,8 +644,9 @@ class ArtifactCache():
# keys (list): The cache keys to use
#
def commit(self, element, content, keys):
- raise ImplError("Cache '{kind}' does not implement commit()"
- .format(kind=type(self).__name__))
+ refs = [self.get_artifact_fullname(element, key) for key in keys]
+
+ self.cas.commit(refs, content)
# diff():
#
@@ -587,8 +660,10 @@ class ArtifactCache():
# subdir (str): A subdirectory to limit the comparison to
#
def diff(self, element, key_a, key_b, *, subdir=None):
- raise ImplError("Cache '{kind}' does not implement diff()"
- .format(kind=type(self).__name__))
+ ref_a = self.get_artifact_fullname(element, key_a)
+ ref_b = self.get_artifact_fullname(element, key_b)
+
+ return self.cas.diff(ref_a, ref_b, subdir=subdir)
# has_fetch_remotes():
#
@@ -600,7 +675,16 @@ class ArtifactCache():
# Returns: True if any remote repositories are configured, False otherwise
#
def has_fetch_remotes(self, *, element=None):
- return False
+ if not self._has_fetch_remotes:
+ # No project has fetch remotes
+ return False
+ elif element is None:
+ # At least one (sub)project has fetch remotes
+ return True
+ else:
+ # Check whether the specified element's project has fetch remotes
+ remotes_for_project = self._remotes[element._get_project()]
+ return bool(remotes_for_project)
# has_push_remotes():
#
@@ -612,7 +696,16 @@ class ArtifactCache():
# Returns: True if any remote repository is configured, False otherwise
#
def has_push_remotes(self, *, element=None):
- return False
+ if not self._has_push_remotes:
+ # No project has push remotes
+ return False
+ elif element is None:
+ # At least one (sub)project has push remotes
+ return True
+ else:
+ # Check whether the specified element's project has push remotes
+ remotes_for_project = self._remotes[element._get_project()]
+ return any(remote.spec.push for remote in remotes_for_project)
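
Note: both predicates now share a tri-state shape: a cheap global flag first, a blanket True when no element is given, and a per-project check otherwise. Condensed into one illustrative helper with stand-in types (none of these names are BuildStream's):

    from collections import namedtuple

    Remote = namedtuple('Remote', ['url', 'push'])

    def has_remotes(any_remotes, remotes_by_project, project=None, *, push=False):
        if not any_remotes:
            return False               # no project has any remotes at all
        if project is None:
            return True                # at least one (sub)project has some
        remotes = remotes_by_project[project]
        return any(r.push for r in remotes) if push else bool(remotes)

    remotes = {'core': [Remote('https://cache.example.com', push=False)]}
    assert has_remotes(True, remotes, 'core')                  # can fetch
    assert not has_remotes(True, remotes, 'core', push=True)   # nowhere to push
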
# push():
#
@@ -629,8 +722,28 @@ class ArtifactCache():
# (ArtifactError): if there was an error
#
def push(self, element, keys):
- raise ImplError("Cache '{kind}' does not implement push()"
- .format(kind=type(self).__name__))
+ refs = [self.get_artifact_fullname(element, key) for key in keys]
+
+ project = element._get_project()
+
+ push_remotes = [r for r in self._remotes[project] if r.spec.push]
+
+ pushed = False
+
+ for remote in push_remotes:
+ remote.init()
+ display_key = element._get_brief_display_key()
+ element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
+
+ if self.cas.push(refs, remote):
+ element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
+ pushed = True
+ else:
+ element.info("Remote ({}) already has {} cached".format(
+ remote.spec.url, element._get_brief_display_key()
+ ))
+
+ return pushed
# pull():
#
@@ -645,8 +758,32 @@ class ArtifactCache():
# (bool): True if pull was successful, False if artifact was not available
#
def pull(self, element, key, *, progress=None):
- raise ImplError("Cache '{kind}' does not implement pull()"
- .format(kind=type(self).__name__))
+ ref = self.get_artifact_fullname(element, key)
+ display_key = key[:self.context.log_key_length]
+
+ project = element._get_project()
+
+ for remote in self._remotes[project]:
+ try:
+ element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
+
+ if self.cas.pull(ref, remote, progress=progress):
+ element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
+ # no need to pull from additional remotes
+ return True
+ else:
+ element.info("Remote ({}) does not have {} cached".format(
+ remote.spec.url, display_key
+ ))
+ except BlobNotFound:
+ element.info("Remote ({}) does not have {} cached".format(
+ remote.spec.url, display_key
+ ))
+ except CASError as e:
+ raise ArtifactError("Failed to pull artifact {}: {}".format(
+ display_key, e)) from e
+
+ return False
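
Note: pull stops at the first remote that has the artifact, treats BlobNotFound (a ref whose blobs are partially missing on the remote) as an ordinary miss, and only escalates other CASErrors. An illustrative caller, roughly the shape of a pull-before-build policy; fetch_or_build() is hypothetical, not BuildStream's scheduler:

    def fetch_or_build(artifacts, element, key):
        # pull() returning False means no configured remote had the artifact
        if artifacts.has_fetch_remotes(element=element) \
                and artifacts.pull(element, key):
            return 'pulled'
        return 'build'   # schedule a local build instead
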
# link_key():
#
@@ -658,19 +795,10 @@ class ArtifactCache():
# newkey (str): A new cache key for the artifact
#
def link_key(self, element, oldkey, newkey):
- raise ImplError("Cache '{kind}' does not implement link_key()"
- .format(kind=type(self).__name__))
+ oldref = self.get_artifact_fullname(element, oldkey)
+ newref = self.get_artifact_fullname(element, newkey)
- # calculate_cache_size()
- #
- # Return the real artifact cache size.
- #
- # Returns:
- # (int): The size of the artifact cache.
- #
- def calculate_cache_size(self):
- raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
- .format(kind=type(self).__name__))
+ self.cas.link_ref(oldref, newref)
################################################
# Local Private Methods #