#
# Copyright (C) 2019 Codethink Limited
# Copyright (C) 2019 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
#     Tom Pollard
#     Tristan Van Berkom
"""
Artifact
=========

Implementation of the Artifact class, which aims to abstract direct
artifact composite interaction away from the Element class.

"""

import os

from ._exceptions import ArtifactError
from ._protos.buildstream.v2.artifact_pb2 import Artifact as ArtifactProto
from . import _yaml
from . import utils
from .types import Scope
from .storage._casbaseddirectory import CasBasedDirectory


# An Artifact class to abstract artifact operations
# from the Element class
#
# Args:
#     element (Element): The Element object
#     context (Context): The BuildStream context
#     strong_key (str): The element's strong cache key, dependent on context
#     weak_key (str): The element's weak cache key
#
class Artifact:

    version = 0

    def __init__(self, element, context, *, strong_key=None, weak_key=None):
        self._element = element
        self._context = context
        self._cache_key = strong_key
        self._weak_cache_key = weak_key
        self._artifactdir = context.artifactdir
        self._cas = context.get_cascache()
        self._tmpdir = context.tmpdir
        self._proto = None

        self._metadata_keys = None  # Strong and weak key tuple extracted from the artifact
        self._metadata_dependencies = None  # Dictionary of dependency strong keys from the artifact
        self._metadata_workspaced = None  # Boolean of whether it's a workspaced artifact
        self._metadata_workspaced_dependencies = None  # List of which dependencies are workspaced from the artifact
        self._cached = None  # Boolean of whether the artifact is cached

    # get_files():
    #
    # Get a virtual directory for the artifact files content
    #
    # Returns:
    #     (Directory): The virtual directory object
    #
    def get_files(self):
        files_digest = self._get_field_digest("files")
        return CasBasedDirectory(self._cas, digest=files_digest)

    # get_buildtree():
    #
    # Get a virtual directory for the artifact buildtree content
    #
    # Returns:
    #     (Directory): The virtual directory object
    #
    def get_buildtree(self):
        buildtree_digest = self._get_field_digest("buildtree")
        return CasBasedDirectory(self._cas, digest=buildtree_digest)

    # get_sources():
    #
    # Get a virtual directory for the artifact sources
    #
    # Returns:
    #     (Directory): The virtual directory object
    #
    def get_sources(self):
        sources_digest = self._get_field_digest("sources")
        return CasBasedDirectory(self._cas, digest=sources_digest)

    # get_logs():
    #
    # Get the paths of the artifact's logs
    #
    # Returns:
    #     (list): A list of object paths
    #
    def get_logs(self):
        artifact = self._get_proto()

        logfile_paths = []
        for logfile in artifact.logs:
            logfile_paths.append(self._cas.objpath(logfile.digest))

        return logfile_paths

    # get_extract_key():
    #
    # Get the key used to extract the artifact
    #
    # Returns:
    #     (str): The key
    #
    def get_extract_key(self):
        return self._cache_key or self._weak_cache_key
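
    # A minimal usage sketch (hypothetical caller code, not part of this
    # class): the Element is assumed to construct an Artifact with its cache
    # keys and read content back through the accessors above.
    #
    #     artifact = Artifact(element, context, strong_key=strong, weak_key=weak)
    #     if artifact.cached():
    #         files = artifact.get_files()   # CasBasedDirectory of the output files
    #         logs = artifact.get_logs()     # list of log object paths in the local CAS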
    # cache():
    #
    # Create the artifact and commit it to the cache
    #
    # Args:
    #     sandbox_build_dir (Directory): Virtual Directory object for the sandbox build-root
    #     collectvdir (Directory): Virtual Directory object from within the sandbox for collection
    #     sourcesvdir (Directory): Virtual Directory object for the staged sources
    #     buildresult (tuple): bool, short desc and detailed desc of result
    #     publicdata (dict): dict of public data to commit to artifact metadata
    #
    # Returns:
    #     (int): The size of the newly cached artifact
    #
    def cache(self, sandbox_build_dir, collectvdir, sourcesvdir, buildresult, publicdata):

        context = self._context
        element = self._element
        size = 0

        filesvdir = None
        buildtreevdir = None

        artifact = ArtifactProto()

        artifact.version = self.version

        # Store result
        artifact.build_success = buildresult[0]
        artifact.build_error = buildresult[1]
        artifact.build_error_details = "" if not buildresult[2] else buildresult[2]

        # Store keys
        artifact.strong_key = self._cache_key
        artifact.weak_key = self._weak_cache_key

        artifact.was_workspaced = bool(element._get_workspace())
        properties = ["MTime"] if artifact.was_workspaced else []

        # Store files
        if collectvdir:
            filesvdir = CasBasedDirectory(cas_cache=self._cas)
            filesvdir.import_files(collectvdir, properties=properties)
            artifact.files.CopyFrom(filesvdir._get_digest())
            size += filesvdir.get_size()

        # Store public data
        with utils._tempnamedfile_name(dir=self._tmpdir) as tmpname:
            _yaml.roundtrip_dump(publicdata, tmpname)
            public_data_digest = self._cas.add_object(path=tmpname, link_directly=True)
            artifact.public_data.CopyFrom(public_data_digest)
            size += public_data_digest.size_bytes

        # Store build dependencies
        for e in element.dependencies(Scope.BUILD):
            new_build = artifact.build_deps.add()
            new_build.project_name = e.project_name
            new_build.element_name = e.name
            new_build.cache_key = e._get_cache_key()
            new_build.was_workspaced = bool(e._get_workspace())

        # Store log file
        log_filename = context.messenger.get_log_filename()
        if log_filename:
            digest = self._cas.add_object(path=log_filename)
            element._build_log_path = self._cas.objpath(digest)
            log = artifact.logs.add()
            log.name = os.path.basename(log_filename)
            log.digest.CopyFrom(digest)
            size += log.digest.size_bytes

        # Store build tree
        if sandbox_build_dir:
            buildtreevdir = CasBasedDirectory(cas_cache=self._cas)
            buildtreevdir.import_files(sandbox_build_dir, properties=properties)
            artifact.buildtree.CopyFrom(buildtreevdir._get_digest())
            size += buildtreevdir.get_size()

        # Store sources
        if sourcesvdir:
            artifact.sources.CopyFrom(sourcesvdir._get_digest())
            size += sourcesvdir.get_size()

        os.makedirs(os.path.dirname(os.path.join(self._artifactdir, element.get_artifact_name())), exist_ok=True)
        keys = utils._deduplicate([self._cache_key, self._weak_cache_key])
        for key in keys:
            path = os.path.join(self._artifactdir, element.get_artifact_name(key=key))
            with utils.save_file_atomic(path, mode="wb") as f:
                f.write(artifact.SerializeToString())

        return size

    # cached_buildtree()
    #
    # Check if the artifact is cached with the expected buildtree. A
    # buildtree will not be present if the rest of the partial artifact
    # is not cached.
    #
    # Returns:
    #     (bool): True if artifact cached with buildtree, False if
    #             missing expected buildtree. Note this only confirms
    #             if a buildtree is present, not its contents.
    #
    def cached_buildtree(self):

        buildtree_digest = self._get_field_digest("buildtree")
        if buildtree_digest:
            return self._cas.contains_directory(buildtree_digest, with_files=True)
        else:
            return False
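
    # A sketch of the on-disk result of cache() (the ref layout is inferred
    # from get_artifact_name() usage here and in get_dependency_refs(); the
    # concrete paths are illustrative only):
    #
    #     <artifactdir>/<project-name>/<element-normal-name>/<strong-key>
    #     <artifactdir>/<project-name>/<element-normal-name>/<weak-key>
    #
    # Both refs are small serialized ArtifactProto files; the files, buildtree,
    # sources, logs and public data they reference live in the shared CAS.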
    # buildtree_exists()
    #
    # Check if the artifact was created with a buildtree. This does not check
    # whether the buildtree is present in the local cache.
    #
    # Returns:
    #     (bool): True if artifact was created with buildtree
    #
    def buildtree_exists(self):

        artifact = self._get_proto()
        return bool(str(artifact.buildtree))

    # cached_sources()
    #
    # Check if the artifact is cached with sources.
    #
    # Returns:
    #     (bool): True if artifact is cached with sources, False if sources
    #             are not available.
    #
    def cached_sources(self):

        sources_digest = self._get_field_digest("sources")
        if sources_digest:
            return self._cas.contains_directory(sources_digest, with_files=True)
        else:
            return False

    # load_public_data():
    #
    # Loads the public data from the cached artifact
    #
    # Returns:
    #     (dict): The artifact's cached public data
    #
    def load_public_data(self):

        # Load the public data from the artifact
        artifact = self._get_proto()
        meta_file = self._cas.objpath(artifact.public_data)
        data = _yaml.load(meta_file, shortname="public.yaml")

        return data

    # load_build_result():
    #
    # Load the build result from the cached artifact
    #
    # Returns:
    #     (bool): Whether the cached artifact of this element represents a successful build
    #     (str): Short description of the result
    #     (str): Detailed description of the result
    #
    def load_build_result(self):

        artifact = self._get_proto()
        build_result = (artifact.build_success, artifact.build_error, artifact.build_error_details)

        return build_result

    # get_metadata_keys():
    #
    # Retrieve the strong and weak keys from the given artifact.
    #
    # Returns:
    #     (str): The strong key
    #     (str): The weak key
    #
    def get_metadata_keys(self):

        if self._metadata_keys is not None:
            return self._metadata_keys

        # Extract proto
        artifact = self._get_proto()

        strong_key = artifact.strong_key
        weak_key = artifact.weak_key

        self._metadata_keys = (strong_key, weak_key)

        return self._metadata_keys

    # get_metadata_workspaced():
    #
    # Retrieve whether the given artifact was built from a workspace.
    #
    # Returns:
    #     (bool): Whether the given artifact was workspaced
    #
    def get_metadata_workspaced(self):

        if self._metadata_workspaced is not None:
            return self._metadata_workspaced

        # Extract proto
        artifact = self._get_proto()

        self._metadata_workspaced = artifact.was_workspaced

        return self._metadata_workspaced

    # get_metadata_workspaced_dependencies():
    #
    # Retrieve the names of the workspaced dependencies from the given artifact.
    #
    # Returns:
    #     (list): List of which dependencies are workspaced
    #
    def get_metadata_workspaced_dependencies(self):

        if self._metadata_workspaced_dependencies is not None:
            return self._metadata_workspaced_dependencies

        # Extract proto
        artifact = self._get_proto()

        self._metadata_workspaced_dependencies = [
            dep.element_name for dep in artifact.build_deps if dep.was_workspaced
        ]

        return self._metadata_workspaced_dependencies
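
    # A minimal sketch of reading cached metadata back (hypothetical caller
    # code; the real call sites live in the Element class):
    #
    #     success, error, details = artifact.load_build_result()
    #     strong, weak = artifact.get_metadata_keys()
    #     workspaced_deps = artifact.get_metadata_workspaced_dependencies()
    #
    # The _metadata_* members set in __init__ memoize these lookups, so the
    # proto fields are only extracted once per Artifact instance.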
    # get_dependency_refs()
    #
    # Retrieve the artifact refs of the artifact's dependencies
    #
    # Args:
    #     deps (Scope): The scope of dependencies
    #
    # Returns:
    #     (list [str]): A list of refs of all build dependencies in staging order.
    #
    def get_dependency_refs(self, deps=Scope.BUILD):

        # XXX: The pylint disable is necessary due to upstream issue:
        # https://github.com/PyCQA/pylint/issues/850
        from .element import _get_normal_name  # pylint: disable=cyclic-import

        # Extract the proto
        artifact = self._get_proto()

        if deps == Scope.BUILD:
            try:
                dependency_refs = [
                    os.path.join(dep.project_name, _get_normal_name(dep.element_name), dep.cache_key)
                    for dep in artifact.build_deps
                ]
            except AttributeError:
                # If the artifact has no dependencies
                dependency_refs = []
        elif deps == Scope.NONE:
            dependency_refs = [self._element.get_artifact_name()]
        else:
            # XXX: We can only support obtaining the build dependencies of
            # an artifact. This is because this is the only information we store
            # in the proto. If we were to add runtime deps to the proto, we'd need
            # to include these in cache key calculation.
            #
            # This would have some undesirable side effects:
            #    1. It might trigger unnecessary rebuilds.
            #    2. It would be impossible to support cyclic runtime dependencies
            #       in the future
            raise ArtifactError("Dependency scope: {} is not supported for artifacts".format(deps))

        return dependency_refs

    # cached():
    #
    # Check whether the artifact corresponding to the stored cache key is
    # available. This also checks whether all required parts of the artifact
    # are available, which may depend on command and configuration. The cache
    # key used for querying is dependent on the current context.
    #
    # Returns:
    #     (bool): Whether artifact is in local cache
    #
    def cached(self):

        if self._cached is not None:
            return self._cached

        context = self._context

        artifact = self._load_proto()
        if not artifact:
            self._cached = False
            return False

        # Determine whether directories are required
        require_directories = context.require_artifact_directories
        # Determine whether file contents are required as well
        require_files = context.require_artifact_files or self._element._artifact_files_required()

        # Check whether the 'files' subdirectory is available, with or without file contents
        if (
            require_directories
            and str(artifact.files)
            and not self._cas.contains_directory(artifact.files, with_files=require_files)
        ):
            self._cached = False
            return False

        # Check whether public data and logs are available
        logfile_digests = [logfile.digest for logfile in artifact.logs]
        digests = [artifact.public_data] + logfile_digests
        if not self._cas.contains_files(digests):
            self._cached = False
            return False

        self._proto = artifact
        self._cached = True
        return True

    # cached_logs()
    #
    # Check if the artifact is cached with log files.
    #
    # Returns:
    #     (bool): True if artifact is cached with logs, False if
    #             element not cached or missing logs.
    #
    def cached_logs(self):
        # Log files are currently considered an essential part of an artifact.
        # If the artifact is cached, its log files are available as well.
        return self._element._cached()

    # reset_cached()
    #
    # Allow the Artifact to query the filesystem to determine whether it
    # is cached or not.
    #
    def reset_cached(self):
        self._proto = None
        self._cached = None
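
    # A sketch of the intended cache-state lifecycle (assumed caller
    # behaviour, based on the methods above and below):
    #
    #     artifact.cached()        # queries the filesystem once, memoizes the result
    #     artifact.reset_cached()  # forget the memoized state and force a fresh query
    #     artifact.set_cached()    # mark as cached without re-querying, e.g. right
    #                              # after a successful cache()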
    # set_cached()
    #
    # Mark the artifact as cached without querying the filesystem.
    # This is used as an optimization when we know the artifact is available.
    #
    def set_cached(self):
        self._proto = self._load_proto()
        self._cached = True

    # _load_proto()
    #
    # Returns:
    #     (Artifact): Artifact proto
    #
    def _load_proto(self):
        key = self.get_extract_key()

        proto_path = os.path.join(self._artifactdir, self._element.get_artifact_name(key=key))
        artifact = ArtifactProto()
        try:
            with open(proto_path, mode="r+b") as f:
                artifact.ParseFromString(f.read())
        except FileNotFoundError:
            return None

        os.utime(proto_path)
        return artifact

    # _get_proto()
    #
    # Returns:
    #     (Artifact): Artifact proto
    #
    def _get_proto(self):
        return self._proto

    # _get_field_digest()
    #
    # Returns:
    #     (Digest): Digest of the specified field
    #
    def _get_field_digest(self, field):
        artifact_proto = self._get_proto()
        digest = getattr(artifact_proto, field)
        if not str(digest):
            return None
        return digest
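
# A standalone debugging sketch for inspecting a cached artifact ref outside
# of this module (the import path below assumes an installed `buildstream`
# package and an illustrative ref path; adjust both for your environment):
#
#     from buildstream._protos.buildstream.v2.artifact_pb2 import Artifact as ArtifactProto
#
#     proto = ArtifactProto()
#     with open("/path/to/artifact/ref", "rb") as f:
#         proto.ParseFromString(f.read())
#     print(proto.strong_key, proto.build_success)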