diff options
Diffstat (limited to 'buildstream/_artifactcache/cascache.py')
-rw-r--r-- | buildstream/_artifactcache/cascache.py | 434 |
1 files changed, 171 insertions, 263 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index 4f0d10da5..c6efcf508 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -19,9 +19,7 @@ import hashlib import itertools -import multiprocessing import os -import signal import stat import tempfile import uuid @@ -31,17 +29,12 @@ from urllib.parse import urlparse import grpc -from .. import _yaml - from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc -from .._message import MessageType, Message -from .. import _signals, utils -from .._exceptions import ArtifactError - -from . import ArtifactCache +from .. import utils +from .._exceptions import CASError # The default limit for gRPC messages is 4 MiB. @@ -89,68 +82,74 @@ def _retry(tries=5): break -class BlobNotFound(ArtifactError): +class BlobNotFound(CASError): def __init__(self, blob, msg): self.blob = blob super().__init__(msg) -# A CASCache manages artifacts in a CAS repository as specified in the -# Remote Execution API. +# A CASCache manages a CAS repository as specified in the Remote Execution API. # # Args: -# context (Context): The BuildStream context -# -# Pushing is explicitly disabled by the platform in some cases, -# like when we are falling back to functioning without using -# user namespaces. +# path (str): The root directory for the CAS repository # -class CASCache(ArtifactCache): +class CASCache(): - def __init__(self, context): - super().__init__(context) - - self.casdir = os.path.join(context.artifactdir, 'cas') + def __init__(self, path): + self.casdir = os.path.join(path, 'cas') + self.tmpdir = os.path.join(path, 'tmp') os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True) os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True) + os.makedirs(self.tmpdir, exist_ok=True) - self._calculate_cache_quota() - - # Per-project list of _CASRemote instances. - self._remotes = {} - - self._has_fetch_remotes = False - self._has_push_remotes = False - - ################################################ - # Implementation of abstract methods # - ################################################ - + # preflight(): + # + # Preflight check. + # def preflight(self): if (not os.path.isdir(os.path.join(self.casdir, 'refs', 'heads')) or not os.path.isdir(os.path.join(self.casdir, 'objects'))): - raise ArtifactError("CAS repository check failed for '{}'" - .format(self.casdir)) + raise CASError("CAS repository check failed for '{}'".format(self.casdir)) - def contains(self, element, key): - refpath = self._refpath(self.get_artifact_fullname(element, key)) + # contains(): + # + # Check whether the specified ref is already available in the local CAS cache. + # + # Args: + # ref (str): The ref to check + # + # Returns: True if the ref is in the cache, False otherwise + # + def contains(self, ref): + refpath = self._refpath(ref) # This assumes that the repository doesn't have any dangling pointers return os.path.exists(refpath) - def extract(self, element, key): - ref = self.get_artifact_fullname(element, key) - + # extract(): + # + # Extract cached directory for the specified ref if it hasn't + # already been extracted. + # + # Args: + # ref (str): The ref whose directory to extract + # path (str): The destination path + # + # Raises: + # CASError: In cases there was an OSError, or if the ref did not exist. + # + # Returns: path to extracted directory + # + def extract(self, ref, path): tree = self.resolve_ref(ref, update_mtime=True) - dest = os.path.join(self.extractdir, element._get_project().name, - element.normal_name, tree.hash) + dest = os.path.join(path, tree.hash) if os.path.isdir(dest): - # artifact has already been extracted + # directory has already been extracted return dest - with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir: + with tempfile.TemporaryDirectory(prefix='tmp', dir=self.tmpdir) as tmpdir: checkoutdir = os.path.join(tmpdir, ref) self._checkout(checkoutdir, tree) @@ -164,23 +163,35 @@ class CASCache(ArtifactCache): # If rename fails with these errors, another process beat # us to it so just ignore. if e.errno not in [errno.ENOTEMPTY, errno.EEXIST]: - raise ArtifactError("Failed to extract artifact for ref '{}': {}" - .format(ref, e)) from e + raise CASError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e return dest - def commit(self, element, content, keys): - refs = [self.get_artifact_fullname(element, key) for key in keys] - - tree = self._commit_directory(content) + # commit(): + # + # Commit directory to cache. + # + # Args: + # refs (list): The refs to set + # path (str): The directory to import + # + def commit(self, refs, path): + tree = self._commit_directory(path) for ref in refs: self.set_ref(ref, tree) - def diff(self, element, key_a, key_b, *, subdir=None): - ref_a = self.get_artifact_fullname(element, key_a) - ref_b = self.get_artifact_fullname(element, key_b) - + # diff(): + # + # Return a list of files that have been added or modified between + # the refs described by ref_a and ref_b. + # + # Args: + # ref_a (str): The first ref + # ref_b (str): The second ref + # subdir (str): A subdirectory to limit the comparison to + # + def diff(self, ref_a, ref_b, *, subdir=None): tree_a = self.resolve_ref(ref_a) tree_b = self.resolve_ref(ref_b) @@ -196,145 +207,103 @@ class CASCache(ArtifactCache): return modified, removed, added - def initialize_remotes(self, *, on_failure=None): - remote_specs = self.global_remote_specs - - for project in self.project_remote_specs: - remote_specs += self.project_remote_specs[project] - - remote_specs = list(utils._deduplicate(remote_specs)) + def initialize_remote(self, remote_spec, q): + try: + remote = CASRemote(remote_spec) + remote.init() - remotes = {} - q = multiprocessing.Queue() - for remote_spec in remote_specs: - # Use subprocess to avoid creation of gRPC threads in main BuildStream process - # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details - p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q)) + request = buildstream_pb2.StatusRequest() + for attempt in _retry(): + with attempt: + response = remote.ref_storage.Status(request) - try: - # Keep SIGINT blocked in the child process - with _signals.blocked([signal.SIGINT], ignore=False): - p.start() - - error = q.get() - p.join() - except KeyboardInterrupt: - utils._kill_process_tree(p.pid) - raise - - if error and on_failure: - on_failure(remote_spec.url, error) - elif error: - raise ArtifactError(error) + if remote_spec.push and not response.allow_updates: + q.put('CAS server does not allow push') else: - self._has_fetch_remotes = True - if remote_spec.push: - self._has_push_remotes = True + # No error + q.put(None) - remotes[remote_spec.url] = _CASRemote(remote_spec) + except grpc.RpcError as e: + # str(e) is too verbose for errors reported to the user + q.put(e.details()) - for project in self.context.get_projects(): - remote_specs = self.global_remote_specs - if project in self.project_remote_specs: - remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project])) + except Exception as e: # pylint: disable=broad-except + # Whatever happens, we need to return it to the calling process + # + q.put(str(e)) - project_remotes = [] + # pull(): + # + # Pull a ref from a remote repository. + # + # Args: + # ref (str): The ref to pull + # remote (CASRemote): The remote repository to pull from + # progress (callable): The progress callback, if any + # + # Returns: + # (bool): True if pull was successful, False if ref was not available + # + def pull(self, ref, remote, *, progress=None): + try: + remote.init() - for remote_spec in remote_specs: - # Errors are already handled in the loop above, - # skip unreachable remotes here. - if remote_spec.url not in remotes: - continue + request = buildstream_pb2.GetReferenceRequest() + request.key = ref + for attempt in _retry(): + with attempt: + response = remote.ref_storage.GetReference(request) - remote = remotes[remote_spec.url] - project_remotes.append(remote) + tree = remote_execution_pb2.Digest() + tree.hash = response.digest.hash + tree.size_bytes = response.digest.size_bytes - self._remotes[project] = project_remotes + self._fetch_directory(remote, tree) - def has_fetch_remotes(self, *, element=None): - if not self._has_fetch_remotes: - # No project has fetch remotes - return False - elif element is None: - # At least one (sub)project has fetch remotes - return True - else: - # Check whether the specified element's project has fetch remotes - remotes_for_project = self._remotes[element._get_project()] - return bool(remotes_for_project) + self.set_ref(ref, tree) - def has_push_remotes(self, *, element=None): - if not self._has_push_remotes: - # No project has push remotes - return False - elif element is None: - # At least one (sub)project has push remotes return True - else: - # Check whether the specified element's project has push remotes - remotes_for_project = self._remotes[element._get_project()] - return any(remote.spec.push for remote in remotes_for_project) - - def pull(self, element, key, *, progress=None): - ref = self.get_artifact_fullname(element, key) - display_key = key[:self.context.log_key_length] - - project = element._get_project() - - for remote in self._remotes[project]: - try: - remote.init() - element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url)) - - request = buildstream_pb2.GetReferenceRequest() - request.key = ref - for attempt in _retry(): - with attempt: - response = remote.ref_storage.GetReference(request) - - tree = remote_execution_pb2.Digest() - tree.hash = response.digest.hash - tree.size_bytes = response.digest.size_bytes - - self._fetch_directory(remote, tree) - - self.set_ref(ref, tree) - - element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url)) - # no need to pull from additional remotes - return True - - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - raise ArtifactError("Failed to pull artifact {}: {}".format( - display_key, e)) from e - else: - element.info("Remote ({}) does not have {} cached".format( - remote.spec.url, display_key - )) - except BlobNotFound as e: - element.info("Remote ({}) does not have {} cached".format( - remote.spec.url, display_key - )) - - return False - - def link_key(self, element, oldkey, newkey): - oldref = self.get_artifact_fullname(element, oldkey) - newref = self.get_artifact_fullname(element, newkey) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e + else: + return False + # link_ref(): + # + # Add an alias for an existing ref. + # + # Args: + # oldref (str): An existing ref + # newref (str): A new ref for the same directory + # + def link_ref(self, oldref, newref): tree = self.resolve_ref(oldref) self.set_ref(newref, tree) - def _push_refs_to_remote(self, refs, remote): + # push(): + # + # Push committed refs to remote repository. + # + # Args: + # refs (list): The refs to push + # remote (CASRemote): The remote to push to + # + # Returns: + # (bool): True if any remote was updated, False if no pushes were required + # + # Raises: + # (CASError): if there was an error + # + def push(self, refs, remote): skipped_remote = True try: for ref in refs: tree = self.resolve_ref(ref) # Check whether ref is already on the server in which case - # there is no need to push the artifact + # there is no need to push the ref try: request = buildstream_pb2.GetReferenceRequest() request.key = ref @@ -364,42 +333,10 @@ class CASCache(ArtifactCache): skipped_remote = False except grpc.RpcError as e: if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: - raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e + raise CASError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e return not skipped_remote - def push(self, element, keys): - - refs = [self.get_artifact_fullname(element, key) for key in list(keys)] - - project = element._get_project() - - push_remotes = [r for r in self._remotes[project] if r.spec.push] - - pushed = False - - for remote in push_remotes: - remote.init() - display_key = element._get_brief_display_key() - element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url)) - - if self._push_refs_to_remote(refs, remote): - element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url)) - pushed = True - else: - self.context.message(Message( - None, - MessageType.INFO, - "Remote ({}) already has {} cached".format( - remote.spec.url, element._get_brief_display_key()) - )) - - return pushed - - ################################################ - # API Private Methods # - ################################################ - # objpath(): # # Return the path of an object based on its digest. @@ -470,7 +407,7 @@ class CASCache(ArtifactCache): pass except OSError as e: - raise ArtifactError("Failed to hash object: {}".format(e)) from e + raise CASError("Failed to hash object: {}".format(e)) from e return digest @@ -511,25 +448,39 @@ class CASCache(ArtifactCache): return digest except FileNotFoundError as e: - raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e + raise CASError("Attempt to access unavailable ref: {}".format(e)) from e - def update_atime(self, ref): + # update_mtime() + # + # Update the mtime of a ref. + # + # Args: + # ref (str): The ref to update + # + def update_mtime(self, ref): try: os.utime(self._refpath(ref)) except FileNotFoundError as e: - raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e + raise CASError("Attempt to access unavailable ref: {}".format(e)) from e + # calculate_cache_size() + # + # Return the real disk usage of the CAS cache. + # + # Returns: + # (int): The size of the cache. + # def calculate_cache_size(self): return utils._get_dir_size(self.casdir) - # list_artifacts(): + # list_refs(): # - # List cached artifacts in Least Recently Modified (LRM) order. + # List refs in Least Recently Modified (LRM) order. # # Returns: # (list) - A list of refs in LRM order # - def list_artifacts(self): + def list_refs(self): # string of: /path/to/repo/refs/heads ref_heads = os.path.join(self.casdir, 'refs', 'heads') @@ -544,7 +495,7 @@ class CASCache(ArtifactCache): mtimes.append(os.path.getmtime(ref_path)) # NOTE: Sorted will sort from earliest to latest, thus the - # first element of this list will be the file modified earliest. + # first ref of this list will be the file modified earliest. return [ref for _, ref in sorted(zip(mtimes, refs))] # list_objects(): @@ -598,28 +549,10 @@ class CASCache(ArtifactCache): # def remove(self, ref, *, defer_prune=False): - # Remove extract if not used by other ref - tree = self.resolve_ref(ref) - ref_name, ref_hash = os.path.split(ref) - extract = os.path.join(self.extractdir, ref_name, tree.hash) - keys_file = os.path.join(extract, 'meta', 'keys.yaml') - if os.path.exists(keys_file): - keys_meta = _yaml.load(keys_file) - keys = [keys_meta['strong'], keys_meta['weak']] - remove_extract = True - for other_hash in keys: - if other_hash == ref_hash: - continue - remove_extract = False - break - - if remove_extract: - utils._force_rmtree(extract) - # Remove cache ref refpath = self._refpath(ref) if not os.path.exists(refpath): - raise ArtifactError("Could not find artifact for ref '{}'".format(ref)) + raise CASError("Could not find ref '{}'".format(ref)) os.unlink(refpath) @@ -730,7 +663,7 @@ class CASCache(ArtifactCache): symlinknode.name = name symlinknode.target = os.readlink(full_path) else: - raise ArtifactError("Unsupported file type for {}".format(full_path)) + raise CASError("Unsupported file type for {}".format(full_path)) return self.add_object(digest=dir_digest, buffer=directory.SerializeToString()) @@ -749,7 +682,7 @@ class CASCache(ArtifactCache): if dirnode.name == name: return dirnode.digest - raise ArtifactError("Subdirectory {} not found".format(name)) + raise CASError("Subdirectory {} not found".format(name)) def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""): dir_a = remote_execution_pb2.Directory() @@ -826,31 +759,6 @@ class CASCache(ArtifactCache): for dirnode in directory.directories: self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime) - def _initialize_remote(self, remote_spec, q): - try: - remote = _CASRemote(remote_spec) - remote.init() - - request = buildstream_pb2.StatusRequest() - for attempt in _retry(): - with attempt: - response = remote.ref_storage.Status(request) - - if remote_spec.push and not response.allow_updates: - q.put('Artifact server does not allow push') - else: - # No error - q.put(None) - - except grpc.RpcError as e: - # str(e) is too verbose for errors reported to the user - q.put(e.details()) - - except Exception as e: # pylint: disable=broad-except - # Whatever happens, we need to return it to the calling process - # - q.put(str(e)) - def _required_blobs(self, directory_digest): # parse directory, and recursively add blobs d = remote_execution_pb2.Digest() @@ -1090,7 +998,7 @@ class CASCache(ArtifactCache): # Represents a single remote CAS cache. # -class _CASRemote(): +class CASRemote(): def __init__(self, spec): self.spec = spec self._initialized = False @@ -1131,7 +1039,7 @@ class _CASRemote(): certificate_chain=client_cert_bytes) self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials) else: - raise ArtifactError("Unsupported URL: {}".format(self.spec.url)) + raise CASError("Unsupported URL: {}".format(self.spec.url)) self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel) self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel) @@ -1220,10 +1128,10 @@ class _CASBatchRead(): raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format( response.digest.hash, response.status.code)) if response.status.code != grpc.StatusCode.OK.value[0]: - raise ArtifactError("Failed to download blob {}: {}".format( + raise CASError("Failed to download blob {}: {}".format( response.digest.hash, response.status.code)) if response.digest.size_bytes != len(response.data): - raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format( + raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format( response.digest.hash, response.digest.size_bytes, len(response.data))) yield (response.digest, response.data) @@ -1267,7 +1175,7 @@ class _CASBatchUpdate(): for response in batch_response.responses: if response.status.code != grpc.StatusCode.OK.value[0]: - raise ArtifactError("Failed to upload blob {}: {}".format( + raise CASError("Failed to upload blob {}: {}".format( response.digest.hash, response.status.code)) |