diff options
Diffstat (limited to 'buildstream/_artifactcache/artifactcache.py')
-rw-r--r-- | buildstream/_artifactcache/artifactcache.py | 244 |
1 files changed, 186 insertions, 58 deletions
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py index ed5ef8262..38500a048 100644 --- a/buildstream/_artifactcache/artifactcache.py +++ b/buildstream/_artifactcache/artifactcache.py @@ -17,16 +17,21 @@ # Authors: # Tristan Maat <tristan.maat@codethink.co.uk> +import multiprocessing import os +import signal import string from collections import Mapping, namedtuple from ..types import _KeyStrength -from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason +from .._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason from .._message import Message, MessageType +from .. import _signals from .. import utils from .. import _yaml +from .cascache import CASCache, CASRemote + CACHE_SIZE_FILE = "cache_size" @@ -125,7 +130,8 @@ class ArtifactCache(): def __init__(self, context): self.context = context self.extractdir = os.path.join(context.artifactdir, 'extract') - self.tmpdir = os.path.join(context.artifactdir, 'tmp') + + self.cas = CASCache(context.artifactdir) self.global_remote_specs = [] self.project_remote_specs = {} @@ -136,12 +142,15 @@ class ArtifactCache(): self._cache_quota_original = None # The cache quota as specified by the user, in bytes self._cache_lower_threshold = None # The target cache size for a cleanup + # Per-project list of _CASRemote instances. + self._remotes = {} + + self._has_fetch_remotes = False + self._has_push_remotes = False + os.makedirs(self.extractdir, exist_ok=True) - os.makedirs(self.tmpdir, exist_ok=True) - ################################################ - # Methods implemented on the abstract class # - ################################################ + self._calculate_cache_quota() # get_artifact_fullname() # @@ -268,8 +277,10 @@ class ArtifactCache(): for key in (strong_key, weak_key): if key: try: - self.update_mtime(element, key) - except ArtifactError: + ref = self.get_artifact_fullname(element, key) + + self.cas.update_mtime(ref) + except CASError: pass # clean(): @@ -392,7 +403,7 @@ class ArtifactCache(): # def compute_cache_size(self): old_cache_size = self._cache_size - new_cache_size = self.calculate_cache_size() + new_cache_size = self.cas.calculate_cache_size() if old_cache_size != new_cache_size: self._cache_size = new_cache_size @@ -467,28 +478,12 @@ class ArtifactCache(): def has_quota_exceeded(self): return self.get_cache_size() > self._cache_quota - ################################################ - # Abstract methods for subclasses to implement # - ################################################ - # preflight(): # # Preflight check. # def preflight(self): - pass - - # update_mtime() - # - # Update the mtime of an artifact. - # - # Args: - # element (Element): The Element to update - # key (str): The key of the artifact. - # - def update_mtime(self, element, key): - raise ImplError("Cache '{kind}' does not implement update_mtime()" - .format(kind=type(self).__name__)) + self.cas.preflight() # initialize_remotes(): # @@ -498,7 +493,59 @@ class ArtifactCache(): # on_failure (callable): Called if we fail to contact one of the caches. # def initialize_remotes(self, *, on_failure=None): - pass + remote_specs = self.global_remote_specs + + for project in self.project_remote_specs: + remote_specs += self.project_remote_specs[project] + + remote_specs = list(utils._deduplicate(remote_specs)) + + remotes = {} + q = multiprocessing.Queue() + for remote_spec in remote_specs: + # Use subprocess to avoid creation of gRPC threads in main BuildStream process + # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details + p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q)) + + try: + # Keep SIGINT blocked in the child process + with _signals.blocked([signal.SIGINT], ignore=False): + p.start() + + error = q.get() + p.join() + except KeyboardInterrupt: + utils._kill_process_tree(p.pid) + raise + + if error and on_failure: + on_failure(remote_spec.url, error) + elif error: + raise ArtifactError(error) + else: + self._has_fetch_remotes = True + if remote_spec.push: + self._has_push_remotes = True + + remotes[remote_spec.url] = CASRemote(remote_spec) + + for project in self.context.get_projects(): + remote_specs = self.global_remote_specs + if project in self.project_remote_specs: + remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project])) + + project_remotes = [] + + for remote_spec in remote_specs: + # Errors are already handled in the loop above, + # skip unreachable remotes here. + if remote_spec.url not in remotes: + continue + + remote = remotes[remote_spec.url] + project_remotes.append(remote) + + self._remotes[project] = project_remotes # contains(): # @@ -512,8 +559,9 @@ class ArtifactCache(): # Returns: True if the artifact is in the cache, False otherwise # def contains(self, element, key): - raise ImplError("Cache '{kind}' does not implement contains()" - .format(kind=type(self).__name__)) + ref = self.get_artifact_fullname(element, key) + + return self.cas.contains(ref) # list_artifacts(): # @@ -524,8 +572,7 @@ class ArtifactCache(): # `ArtifactCache.get_artifact_fullname` in LRU order # def list_artifacts(self): - raise ImplError("Cache '{kind}' does not implement list_artifacts()" - .format(kind=type(self).__name__)) + return self.cas.list_refs() # remove(): # @@ -537,9 +584,31 @@ class ArtifactCache(): # generated by # `ArtifactCache.get_artifact_fullname`) # - def remove(self, artifact_name): - raise ImplError("Cache '{kind}' does not implement remove()" - .format(kind=type(self).__name__)) + # Returns: + # (int|None) The amount of space pruned from the repository in + # Bytes, or None if defer_prune is True + # + def remove(self, ref): + + # Remove extract if not used by other ref + tree = self.cas.resolve_ref(ref) + ref_name, ref_hash = os.path.split(ref) + extract = os.path.join(self.extractdir, ref_name, tree.hash) + keys_file = os.path.join(extract, 'meta', 'keys.yaml') + if os.path.exists(keys_file): + keys_meta = _yaml.load(keys_file) + keys = [keys_meta['strong'], keys_meta['weak']] + remove_extract = True + for other_hash in keys: + if other_hash == ref_hash: + continue + remove_extract = False + break + + if remove_extract: + utils._force_rmtree(extract) + + return self.cas.remove(ref) # extract(): # @@ -559,8 +628,11 @@ class ArtifactCache(): # Returns: path to extracted artifact # def extract(self, element, key): - raise ImplError("Cache '{kind}' does not implement extract()" - .format(kind=type(self).__name__)) + ref = self.get_artifact_fullname(element, key) + + path = os.path.join(self.extractdir, element._get_project().name, element.normal_name) + + return self.cas.extract(ref, path) # commit(): # @@ -572,8 +644,9 @@ class ArtifactCache(): # keys (list): The cache keys to use # def commit(self, element, content, keys): - raise ImplError("Cache '{kind}' does not implement commit()" - .format(kind=type(self).__name__)) + refs = [self.get_artifact_fullname(element, key) for key in keys] + + self.cas.commit(refs, content) # diff(): # @@ -587,8 +660,10 @@ class ArtifactCache(): # subdir (str): A subdirectory to limit the comparison to # def diff(self, element, key_a, key_b, *, subdir=None): - raise ImplError("Cache '{kind}' does not implement diff()" - .format(kind=type(self).__name__)) + ref_a = self.get_artifact_fullname(element, key_a) + ref_b = self.get_artifact_fullname(element, key_b) + + return self.cas.diff(ref_a, ref_b, subdir=subdir) # has_fetch_remotes(): # @@ -600,7 +675,16 @@ class ArtifactCache(): # Returns: True if any remote repositories are configured, False otherwise # def has_fetch_remotes(self, *, element=None): - return False + if not self._has_fetch_remotes: + # No project has fetch remotes + return False + elif element is None: + # At least one (sub)project has fetch remotes + return True + else: + # Check whether the specified element's project has fetch remotes + remotes_for_project = self._remotes[element._get_project()] + return bool(remotes_for_project) # has_push_remotes(): # @@ -612,7 +696,16 @@ class ArtifactCache(): # Returns: True if any remote repository is configured, False otherwise # def has_push_remotes(self, *, element=None): - return False + if not self._has_push_remotes: + # No project has push remotes + return False + elif element is None: + # At least one (sub)project has push remotes + return True + else: + # Check whether the specified element's project has push remotes + remotes_for_project = self._remotes[element._get_project()] + return any(remote.spec.push for remote in remotes_for_project) # push(): # @@ -629,8 +722,28 @@ class ArtifactCache(): # (ArtifactError): if there was an error # def push(self, element, keys): - raise ImplError("Cache '{kind}' does not implement push()" - .format(kind=type(self).__name__)) + refs = [self.get_artifact_fullname(element, key) for key in list(keys)] + + project = element._get_project() + + push_remotes = [r for r in self._remotes[project] if r.spec.push] + + pushed = False + + for remote in push_remotes: + remote.init() + display_key = element._get_brief_display_key() + element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url)) + + if self.cas.push(refs, remote): + element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url)) + pushed = True + else: + element.info("Remote ({}) already has {} cached".format( + remote.spec.url, element._get_brief_display_key() + )) + + return pushed # pull(): # @@ -645,8 +758,32 @@ class ArtifactCache(): # (bool): True if pull was successful, False if artifact was not available # def pull(self, element, key, *, progress=None): - raise ImplError("Cache '{kind}' does not implement pull()" - .format(kind=type(self).__name__)) + ref = self.get_artifact_fullname(element, key) + display_key = key[:self.context.log_key_length] + + project = element._get_project() + + for remote in self._remotes[project]: + try: + element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url)) + + if self.cas.pull(ref, remote, progress=progress): + element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url)) + # no need to pull from additional remotes + return True + else: + element.info("Remote ({}) does not have {} cached".format( + remote.spec.url, display_key + )) + except BlobNotFound as e: + element.info("Remote ({}) does not have {} cached".format( + remote.spec.url, display_key + )) + except CASError as e: + raise ArtifactError("Failed to pull artifact {}: {}".format( + display_key, e)) from e + + return False # link_key(): # @@ -658,19 +795,10 @@ class ArtifactCache(): # newkey (str): A new cache key for the artifact # def link_key(self, element, oldkey, newkey): - raise ImplError("Cache '{kind}' does not implement link_key()" - .format(kind=type(self).__name__)) + oldref = self.get_artifact_fullname(element, oldkey) + newref = self.get_artifact_fullname(element, newkey) - # calculate_cache_size() - # - # Return the real artifact cache size. - # - # Returns: - # (int): The size of the artifact cache. - # - def calculate_cache_size(self): - raise ImplError("Cache '{kind}' does not implement calculate_cache_size()" - .format(kind=type(self).__name__)) + self.cas.link_ref(oldref, newref) ################################################ # Local Private Methods # |