#
# Copyright (C) 2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
#     Jürg Billeter

import hashlib
import itertools
import os
import stat
import errno
import uuid
import contextlib
from fnmatch import fnmatch

import grpc

from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from .._protos.buildstream.v2 import buildstream_pb2
from .. import utils
from .._exceptions import CASCacheError, LoadError, LoadErrorReason
from .._message import Message, MessageType
from .casremote import BlobNotFound, _CASBatchRead, _CASBatchUpdate

_BUFFER_SIZE = 65536

CACHE_SIZE_FILE = "cache_size"


# CASCacheUsage
#
# A simple object to report the current CAS cache usage details.
#
# Note that this uses the user configured cache quota
# rather than the internal quota with protective headroom
# removed, to provide a more sensible value to display to
# the user.
#
# Args:
#     casquota (CASQuota): The CAS cache quota to get the status of
#
class CASCacheUsage():

    def __init__(self, casquota):
        self.quota_config = casquota._config_cache_quota          # Configured quota
        self.quota_size = casquota._cache_quota_original          # Resolved cache quota in bytes
        self.used_size = casquota.get_cache_size()                # Size used by artifacts in bytes
        self.used_percent = 0                                     # Percentage of the quota used
        if self.quota_size is not None:
            self.used_percent = int(self.used_size * 100 / self.quota_size)

    # Formattable into a human readable string
    #
    def __str__(self):
        return "{} / {} ({}%)" \
            .format(utils._pretty_size(self.used_size, dec_places=1),
                    self.quota_config,
                    self.used_percent)


# A CASCache manages a CAS repository as specified in the Remote Execution API.
#
# Args:
#     path (str): The root directory for the CAS repository
#
class CASCache():

    def __init__(self, path):
        self.casdir = os.path.join(path, 'cas')
        self.tmpdir = os.path.join(path, 'tmp')
        os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
        os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
        os.makedirs(self.tmpdir, exist_ok=True)

    # preflight():
    #
    # Preflight check.
    #
    def preflight(self):
        headdir = os.path.join(self.casdir, 'refs', 'heads')
        objdir = os.path.join(self.casdir, 'objects')
        if not (os.path.isdir(headdir) and os.path.isdir(objdir)):
            raise CASCacheError("CAS repository check failed for '{}'".format(self.casdir))

    # contains():
    #
    # Check whether the specified ref is already available in the local CAS cache.
    #
    # Args:
    #     ref (str): The ref to check
    #
    # Returns: True if the ref is in the cache, False otherwise
    #
    def contains(self, ref):
        refpath = self._refpath(ref)

        # This assumes that the repository doesn't have any dangling pointers
        return os.path.exists(refpath)
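    # Example (illustrative only; the cache directory and ref name below are
    # hypothetical placeholders, not values defined by this module):
    #
    #     cas = CASCache("/path/to/cachedir")
    #     cas.preflight()
    #     if cas.contains("project/element/cachekey"):
    #         digest = cas.resolve_ref("project/element/cachekey")
    #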
    # contains_subdir_artifact():
    #
    # Check whether the specified artifact element tree has a digest for a subdir
    # which is populated in the cache, i.e. non-dangling.
    #
    # Args:
    #     ref (str): The ref to check
    #     subdir (str): The subdir to check
    #     with_files (bool): Whether to check files as well
    #
    # Returns: True if the subdir exists & is populated in the cache, False otherwise
    #
    def contains_subdir_artifact(self, ref, subdir, *, with_files=True):
        tree = self.resolve_ref(ref)
        try:
            subdirdigest = self._get_subdir(tree, subdir)
            return self.contains_directory(subdirdigest, with_files=with_files)
        except (CASCacheError, FileNotFoundError):
            return False

    # contains_directory():
    #
    # Check whether the specified directory and subdirectories are in the cache,
    # i.e. non-dangling.
    #
    # Args:
    #     digest (Digest): The directory digest to check
    #     with_files (bool): Whether to check files as well
    #
    # Returns: True if the directory is available in the local cache
    #
    def contains_directory(self, digest, *, with_files):
        try:
            directory = remote_execution_pb2.Directory()
            with open(self.objpath(digest), 'rb') as f:
                directory.ParseFromString(f.read())

            # Optionally check presence of files
            if with_files:
                for filenode in directory.files:
                    if not os.path.exists(self.objpath(filenode.digest)):
                        return False

            # Check subdirectories
            for dirnode in directory.directories:
                if not self.contains_directory(dirnode.digest, with_files=with_files):
                    return False

            return True
        except FileNotFoundError:
            return False

    # checkout():
    #
    # Checkout the specified directory digest.
    #
    # Args:
    #     dest (str): The destination path
    #     tree (Digest): The directory digest to extract
    #     can_link (bool): Whether we can create hard links in the destination
    #
    def checkout(self, dest, tree, *, can_link=False):
        os.makedirs(dest, exist_ok=True)

        directory = remote_execution_pb2.Directory()

        with open(self.objpath(tree), 'rb') as f:
            directory.ParseFromString(f.read())

        for filenode in directory.files:
            # regular file, create hardlink
            fullpath = os.path.join(dest, filenode.name)
            if can_link:
                utils.safe_link(self.objpath(filenode.digest), fullpath)
            else:
                utils.safe_copy(self.objpath(filenode.digest), fullpath)

            if filenode.is_executable:
                os.chmod(fullpath, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR |
                         stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)

        for dirnode in directory.directories:
            fullpath = os.path.join(dest, dirnode.name)
            self.checkout(fullpath, dirnode.digest, can_link=can_link)

        for symlinknode in directory.symlinks:
            # symlink
            fullpath = os.path.join(dest, symlinknode.name)
            os.symlink(symlinknode.target, fullpath)

    # commit():
    #
    # Commit directory to cache.
    #
    # Args:
    #     refs (list): The refs to set
    #     path (str): The directory to import
    #
    def commit(self, refs, path):
        tree = self._commit_directory(path)

        for ref in refs:
            self.set_ref(ref, tree)

    # diff():
    #
    # Return a list of files that have been added or modified between
    # the refs described by ref_a and ref_b.
    #
    # Args:
    #     ref_a (str): The first ref
    #     ref_b (str): The second ref
    #     subdir (str): A subdirectory to limit the comparison to
    #
    def diff(self, ref_a, ref_b, *, subdir=None):
        tree_a = self.resolve_ref(ref_a)
        tree_b = self.resolve_ref(ref_b)

        if subdir:
            tree_a = self._get_subdir(tree_a, subdir)
            tree_b = self._get_subdir(tree_b, subdir)

        added = []
        removed = []
        modified = []

        self._diff_trees(tree_a, tree_b,
                         added=added, removed=removed, modified=modified)

        return modified, removed, added

    # pull():
    #
    # Pull a ref from a remote repository.
    #
    # Args:
    #     ref (str): The ref to pull
    #     remote (CASRemote): The remote repository to pull from
    #     progress (callable): The progress callback, if any
    #     subdir (str): The optional specific subdir to pull
    #     excluded_subdirs (list): The optional list of subdirs to not pull
    #
    # Returns:
    #     (bool): True if pull was successful, False if ref was not available
    #
    def pull(self, ref, remote, *, progress=None, subdir=None, excluded_subdirs=None):
        try:
            remote.init()

            request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
            request.key = ref
            response = remote.ref_storage.GetReference(request)

            tree = remote_execution_pb2.Digest()
            tree.hash = response.digest.hash
            tree.size_bytes = response.digest.size_bytes

            # Fetch Directory objects
            self._fetch_directory(remote, tree)

            # Fetch files, excluded_subdirs determined in pullqueue
            required_blobs = self._required_blobs(tree, excluded_subdirs=excluded_subdirs)
            missing_blobs = self.local_missing_blobs(required_blobs)
            if missing_blobs:
                self.fetch_blobs(remote, missing_blobs)

            self.set_ref(ref, tree)

            return True
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.NOT_FOUND:
                raise CASCacheError("Failed to pull ref {}: {}".format(ref, e)) from e
            else:
                return False
        except BlobNotFound:
            return False

    # pull_tree():
    #
    # Pull a single Tree rather than a ref.
    # Does not update local refs.
    #
    # Args:
    #     remote (CASRemote): The remote to pull from
    #     digest (Digest): The digest of the tree
    #
    def pull_tree(self, remote, digest):
        try:
            remote.init()

            digest = self._fetch_tree(remote, digest)

            return digest
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.NOT_FOUND:
                raise

        return None

    # link_ref():
    #
    # Add an alias for an existing ref.
    #
    # Args:
    #     oldref (str): An existing ref
    #     newref (str): A new ref for the same directory
    #
    def link_ref(self, oldref, newref):
        tree = self.resolve_ref(oldref)

        self.set_ref(newref, tree)

    # push():
    #
    # Push committed refs to remote repository.
    #
    # Args:
    #     refs (list): The refs to push
    #     remote (CASRemote): The remote to push to
    #
    # Returns:
    #     (bool): True if any remote was updated, False if no pushes were required
    #
    # Raises:
    #     (CASCacheError): if there was an error
    #
    def push(self, refs, remote):
        skipped_remote = True
        try:
            for ref in refs:
                tree = self.resolve_ref(ref)

                # Check whether ref is already on the server in which case
                # there is no need to push the ref
                try:
                    request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
                    request.key = ref
                    response = remote.ref_storage.GetReference(request)

                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
                        # ref is already on the server with the same tree
                        continue

                except grpc.RpcError as e:
                    if e.code() != grpc.StatusCode.NOT_FOUND:
                        # Intentionally re-raise RpcError for outer except block.
                        raise

                self._send_directory(remote, tree)

                request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name)
                request.keys.append(ref)
                request.digest.hash = tree.hash
                request.digest.size_bytes = tree.size_bytes
                remote.ref_storage.UpdateReference(request)

                skipped_remote = False
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
                raise CASCacheError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e

        return not skipped_remote
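    # Example (illustrative only; the ref name and the remote object are
    # hypothetical placeholders provided by the caller):
    #
    #     if not cas.pull("project/element/cachekey", remote):
    #         # Build locally, commit the result, then share it
    #         cas.commit(["project/element/cachekey"], "/path/to/output")
    #         cas.push(["project/element/cachekey"], remote)
    #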
    # push_directory():
    #
    # Push the given virtual directory to a remote.
    #
    # Args:
    #     remote (CASRemote): The remote to push to
    #     directory (Directory): A virtual directory object to push.
    #
    # Raises:
    #     (CASCacheError): if there was an error
    #
    def push_directory(self, remote, directory):
        remote.init()

        digest = directory._get_digest()

        self._send_directory(remote, digest)

    # objpath():
    #
    # Return the path of an object based on its digest.
    #
    # Args:
    #     digest (Digest): The digest of the object
    #
    # Returns:
    #     (str): The path of the object
    #
    def objpath(self, digest):
        return os.path.join(self.casdir, 'objects', digest.hash[:2], digest.hash[2:])

    # add_object():
    #
    # Hash and write object to CAS.
    #
    # Args:
    #     digest (Digest): An optional Digest object to populate
    #     path (str): Path to file to add
    #     buffer (bytes): Byte buffer to add
    #     link_directly (bool): Whether file given by path can be linked
    #
    # Returns:
    #     (Digest): The digest of the added object
    #
    # Either `path` or `buffer` must be passed, but not both.
    #
    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
        # Exactly one of the two parameters has to be specified
        assert (path is None) != (buffer is None)

        if digest is None:
            digest = remote_execution_pb2.Digest()

        try:
            h = hashlib.sha256()
            # Always write out new file to avoid corruption if input file is modified
            with contextlib.ExitStack() as stack:
                if path is not None and link_directly:
                    tmp = stack.enter_context(open(path, 'rb'))
                    for chunk in iter(lambda: tmp.read(_BUFFER_SIZE), b""):
                        h.update(chunk)
                else:
                    tmp = stack.enter_context(self._temporary_object())

                    if path:
                        with open(path, 'rb') as f:
                            for chunk in iter(lambda: f.read(_BUFFER_SIZE), b""):
                                h.update(chunk)
                                tmp.write(chunk)
                    else:
                        h.update(buffer)
                        tmp.write(buffer)

                    tmp.flush()

                digest.hash = h.hexdigest()
                digest.size_bytes = os.fstat(tmp.fileno()).st_size

                # Place file at final location
                objpath = self.objpath(digest)
                os.makedirs(os.path.dirname(objpath), exist_ok=True)
                os.link(tmp.name, objpath)

        except FileExistsError:
            # We can ignore the failed link() if the object is already in the repo.
            pass

        except OSError as e:
            raise CASCacheError("Failed to hash object: {}".format(e)) from e

        return digest

    # set_ref():
    #
    # Create or replace a ref.
    #
    # Args:
    #     ref (str): The name of the ref
    #     tree (Digest): The digest to store in the ref
    #
    def set_ref(self, ref, tree):
        refpath = self._refpath(ref)
        os.makedirs(os.path.dirname(refpath), exist_ok=True)
        with utils.save_file_atomic(refpath, 'wb', tempdir=self.tmpdir) as f:
            f.write(tree.SerializeToString())

    # resolve_ref():
    #
    # Resolve a ref to a digest.
    #
    # Args:
    #     ref (str): The name of the ref
    #     update_mtime (bool): Whether to update the mtime of the ref
    #
    # Returns:
    #     (Digest): The digest stored in the ref
    #
    def resolve_ref(self, ref, *, update_mtime=False):
        refpath = self._refpath(ref)

        try:
            with open(refpath, 'rb') as f:
                if update_mtime:
                    os.utime(refpath)

                digest = remote_execution_pb2.Digest()
                digest.ParseFromString(f.read())

                return digest

        except FileNotFoundError as e:
            raise CASCacheError("Attempt to access unavailable ref: {}".format(e)) from e

    # update_mtime()
    #
    # Update the mtime of a ref.
    #
    # Args:
    #     ref (str): The ref to update
    #
    def update_mtime(self, ref):
        try:
            os.utime(self._refpath(ref))
        except FileNotFoundError as e:
            raise CASCacheError("Attempt to access unavailable ref: {}".format(e)) from e
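    # Example (illustrative only; the printed path is a hypothetical value):
    #
    #     digest = cas.add_object(buffer=b"hello")
    #     # Objects are stored under objects/<first two hex chars>/<remaining chars>,
    #     # e.g. .../cas/objects/2c/f24dba5fb0a3... for the sha256 of b"hello"
    #     print(cas.objpath(digest))
    #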
    # list_refs():
    #
    # List refs in Least Recently Modified (LRM) order.
    #
    # Args:
    #     glob (str) - An optional glob expression to be used to list refs satisfying the glob
    #
    # Returns:
    #     (list) - A list of refs in LRM order
    #
    def list_refs(self, *, glob=None):
        # string of: /path/to/repo/refs/heads
        ref_heads = os.path.join(self.casdir, 'refs', 'heads')
        path = ref_heads

        if glob is not None:
            globdir = os.path.dirname(glob)
            if not any(c in "*?[" for c in globdir):
                # path prefix contains no globbing characters so
                # append the glob to optimise the os.walk()
                path = os.path.join(ref_heads, globdir)

        refs = []
        mtimes = []

        for root, _, files in os.walk(path):
            for filename in files:
                ref_path = os.path.join(root, filename)
                relative_path = os.path.relpath(ref_path, ref_heads)  # Relative to refs head
                if not glob or fnmatch(relative_path, glob):
                    refs.append(relative_path)
                    # Obtain the mtime (the time a file was last modified)
                    mtimes.append(os.path.getmtime(ref_path))

        # NOTE: Sorted will sort from earliest to latest, thus the
        # first ref of this list will be the file modified earliest.
        return [ref for _, ref in sorted(zip(mtimes, refs))]

    # list_objects():
    #
    # List cached objects in Least Recently Modified (LRM) order.
    #
    # Returns:
    #     (list) - A list of objects and timestamps in LRM order
    #
    def list_objects(self):
        objs = []
        mtimes = []

        for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
            for filename in files:
                obj_path = os.path.join(root, filename)
                try:
                    mtimes.append(os.path.getmtime(obj_path))
                except FileNotFoundError:
                    pass
                else:
                    objs.append(obj_path)

        # NOTE: Sorted will sort from earliest to latest, thus the
        # first element of this list will be the file modified earliest.
        return sorted(zip(mtimes, objs))

    def clean_up_refs_until(self, time):
        ref_heads = os.path.join(self.casdir, 'refs', 'heads')

        for root, _, files in os.walk(ref_heads):
            for filename in files:
                ref_path = os.path.join(root, filename)
                # Obtain the mtime (the time a file was last modified)
                if os.path.getmtime(ref_path) < time:
                    os.unlink(ref_path)

    # remove():
    #
    # Removes the given symbolic ref from the repo.
    #
    # Args:
    #     ref (str): A symbolic ref
    #     defer_prune (bool): Whether to defer pruning to the caller. NOTE:
    #                         The space won't be freed until you manually
    #                         call prune.
    #
    # Returns:
    #     (int|None) The amount of space pruned from the repository in
    #                Bytes, or None if defer_prune is True
    #
    def remove(self, ref, *, defer_prune=False):
        # Remove cache ref
        self._remove_ref(ref)

        if not defer_prune:
            pruned = self.prune()
            return pruned

        return None

    # prune():
    #
    # Prune unreachable objects from the repo.
    #
    def prune(self):
        ref_heads = os.path.join(self.casdir, 'refs', 'heads')

        pruned = 0
        reachable = set()

        # Check which objects are reachable
        for root, _, files in os.walk(ref_heads):
            for filename in files:
                ref_path = os.path.join(root, filename)
                ref = os.path.relpath(ref_path, ref_heads)

                tree = self.resolve_ref(ref)
                self._reachable_refs_dir(reachable, tree)

        # Prune unreachable objects
        for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
            for filename in files:
                objhash = os.path.basename(root) + filename
                if objhash not in reachable:
                    obj_path = os.path.join(root, filename)
                    pruned += os.stat(obj_path).st_size
                    os.unlink(obj_path)

        return pruned

    def update_tree_mtime(self, tree):
        reachable = set()
        self._reachable_refs_dir(reachable, tree, update_mtime=True)
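    # Example (illustrative only; the ref name is a hypothetical placeholder):
    #
    #     # Drop a ref but defer reclaiming disk space until several refs
    #     # have been removed, then prune unreachable objects once.
    #     cas.remove("project/element/old-cachekey", defer_prune=True)
    #     freed_bytes = cas.prune()
    #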
    # remote_missing_blobs_for_directory():
    #
    # Determine which blobs of a directory tree are missing on the remote.
    #
    # Args:
    #     digest (Digest): The directory digest
    #
    # Returns: List of missing Digest objects
    #
    def remote_missing_blobs_for_directory(self, remote, digest):
        required_blobs = self._required_blobs(digest)

        missing_blobs = dict()
        # Limit size of FindMissingBlobs request
        for required_blobs_group in _grouper(required_blobs, 512):
            request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)

            for required_digest in required_blobs_group:
                d = request.blob_digests.add()
                d.hash = required_digest.hash
                d.size_bytes = required_digest.size_bytes

            response = remote.cas.FindMissingBlobs(request)
            for missing_digest in response.missing_blob_digests:
                d = remote_execution_pb2.Digest()
                d.hash = missing_digest.hash
                d.size_bytes = missing_digest.size_bytes
                missing_blobs[d.hash] = d

        return missing_blobs.values()

    # local_missing_blobs():
    #
    # Check local cache for missing blobs.
    #
    # Args:
    #     digests (list): The Digests of blobs to check
    #
    # Returns: Missing Digest objects
    #
    def local_missing_blobs(self, digests):
        missing_blobs = []
        for digest in digests:
            objpath = self.objpath(digest)
            if not os.path.exists(objpath):
                missing_blobs.append(digest)
        return missing_blobs

    ################################################
    #             Local Private Methods            #
    ################################################

    def _refpath(self, ref):
        return os.path.join(self.casdir, 'refs', 'heads', ref)

    # _remove_ref()
    #
    # Removes a ref.
    #
    # This also takes care of pruning away directories which can
    # be removed after having removed the given ref.
    #
    # Args:
    #     ref (str): The ref to remove
    #
    # Raises:
    #     (CASCacheError): If the ref didn't exist, or a system error
    #                      occurred while removing it
    #
    def _remove_ref(self, ref):

        # Remove the ref itself
        refpath = self._refpath(ref)
        try:
            os.unlink(refpath)
        except FileNotFoundError as e:
            raise CASCacheError("Could not find ref '{}'".format(ref)) from e

        # Now remove any leading directories
        basedir = os.path.join(self.casdir, 'refs', 'heads')
        components = list(os.path.split(ref))
        while components:
            components.pop()
            refdir = os.path.join(basedir, *components)

            # Break out once we reach the base
            if refdir == basedir:
                break

            try:
                os.rmdir(refdir)
            except FileNotFoundError:
                # The parent directory did not exist, but its
                # parent directory might still be ready to prune
                pass
            except OSError as e:
                if e.errno == errno.ENOTEMPTY:
                    # The parent directory was not empty, so we
                    # cannot prune directories beyond this point
                    break

                # Something went wrong here
                raise CASCacheError("System error while removing ref '{}': {}".format(ref, e)) from e
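    # Sketch (illustrative only) of how a push avoids re-uploading blobs the
    # server already has: the directory tree is walked for required blobs,
    # FindMissingBlobs is queried in groups of 512, and only the digests the
    # server reports as missing are uploaded. This is what _send_directory()
    # does internally:
    #
    #     missing = cas.remote_missing_blobs_for_directory(remote, tree_digest)
    #     cas.send_blobs(remote, missing)
    #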
    # _commit_directory():
    #
    # Adds local directory to content addressable store.
    #
    # Adds files, symbolic links and recursively other directories in
    # a local directory to the content addressable store.
    #
    # Args:
    #     path (str): Path to the directory to add.
    #     dir_digest (Digest): An optional Digest object to use.
    #
    # Returns:
    #     (Digest): Digest object for the directory added.
    #
    def _commit_directory(self, path, *, dir_digest=None):
        directory = remote_execution_pb2.Directory()

        for name in sorted(os.listdir(path)):
            full_path = os.path.join(path, name)
            mode = os.lstat(full_path).st_mode
            if stat.S_ISDIR(mode):
                dirnode = directory.directories.add()
                dirnode.name = name
                self._commit_directory(full_path, dir_digest=dirnode.digest)
            elif stat.S_ISREG(mode):
                filenode = directory.files.add()
                filenode.name = name
                self.add_object(path=full_path, digest=filenode.digest)
                filenode.is_executable = (mode & stat.S_IXUSR) == stat.S_IXUSR
            elif stat.S_ISLNK(mode):
                symlinknode = directory.symlinks.add()
                symlinknode.name = name
                symlinknode.target = os.readlink(full_path)
            elif stat.S_ISSOCK(mode):
                # The process serving the socket can't be cached anyway
                pass
            else:
                raise CASCacheError("Unsupported file type for {}".format(full_path))

        return self.add_object(digest=dir_digest,
                               buffer=directory.SerializeToString())

    def _get_subdir(self, tree, subdir):
        head, name = os.path.split(subdir)
        if head:
            tree = self._get_subdir(tree, head)

        directory = remote_execution_pb2.Directory()

        with open(self.objpath(tree), 'rb') as f:
            directory.ParseFromString(f.read())

        for dirnode in directory.directories:
            if dirnode.name == name:
                return dirnode.digest

        raise CASCacheError("Subdirectory {} not found".format(name))

    def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""):
        dir_a = remote_execution_pb2.Directory()
        dir_b = remote_execution_pb2.Directory()

        if tree_a:
            with open(self.objpath(tree_a), 'rb') as f:
                dir_a.ParseFromString(f.read())
        if tree_b:
            with open(self.objpath(tree_b), 'rb') as f:
                dir_b.ParseFromString(f.read())

        a = 0
        b = 0
        while a < len(dir_a.files) or b < len(dir_b.files):
            if b < len(dir_b.files) and (a >= len(dir_a.files) or
                                         dir_a.files[a].name > dir_b.files[b].name):
                added.append(os.path.join(path, dir_b.files[b].name))
                b += 1
            elif a < len(dir_a.files) and (b >= len(dir_b.files) or
                                           dir_b.files[b].name > dir_a.files[a].name):
                removed.append(os.path.join(path, dir_a.files[a].name))
                a += 1
            else:
                # File exists in both directories
                if dir_a.files[a].digest.hash != dir_b.files[b].digest.hash:
                    modified.append(os.path.join(path, dir_a.files[a].name))
                a += 1
                b += 1

        a = 0
        b = 0
        while a < len(dir_a.directories) or b < len(dir_b.directories):
            if b < len(dir_b.directories) and (a >= len(dir_a.directories) or
                                               dir_a.directories[a].name > dir_b.directories[b].name):
                self._diff_trees(None, dir_b.directories[b].digest,
                                 added=added, removed=removed, modified=modified,
                                 path=os.path.join(path, dir_b.directories[b].name))
                b += 1
            elif a < len(dir_a.directories) and (b >= len(dir_b.directories) or
                                                 dir_b.directories[b].name > dir_a.directories[a].name):
                self._diff_trees(dir_a.directories[a].digest, None,
                                 added=added, removed=removed, modified=modified,
                                 path=os.path.join(path, dir_a.directories[a].name))
                a += 1
            else:
                # Subdirectory exists in both directories
                if dir_a.directories[a].digest.hash != dir_b.directories[b].digest.hash:
                    self._diff_trees(dir_a.directories[a].digest, dir_b.directories[b].digest,
                                     added=added, removed=removed, modified=modified,
                                     path=os.path.join(path, dir_a.directories[a].name))
                a += 1
                b += 1

    def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
        if tree.hash in reachable:
            return

        try:
            if update_mtime:
                os.utime(self.objpath(tree))

            reachable.add(tree.hash)

            directory = remote_execution_pb2.Directory()

            with open(self.objpath(tree), 'rb') as f:
                directory.ParseFromString(f.read())

        except FileNotFoundError:
            # Just exit early if the file doesn't exist
            return

        for filenode in directory.files:
            if update_mtime:
                os.utime(self.objpath(filenode.digest))
            reachable.add(filenode.digest.hash)

        for dirnode in directory.directories:
            self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)

    def _required_blobs(self, directory_digest, *, excluded_subdirs=None):
        if not excluded_subdirs:
            excluded_subdirs = []

        # parse directory, and recursively add blobs
        d = remote_execution_pb2.Digest()
        d.hash = directory_digest.hash
        d.size_bytes = directory_digest.size_bytes
        yield d

        directory = remote_execution_pb2.Directory()

        with open(self.objpath(directory_digest), 'rb') as f:
            directory.ParseFromString(f.read())

        for filenode in directory.files:
            d = remote_execution_pb2.Digest()
            d.hash = filenode.digest.hash
            d.size_bytes = filenode.digest.size_bytes
            yield d

        for dirnode in directory.directories:
            if dirnode.name not in excluded_subdirs:
                yield from self._required_blobs(dirnode.digest)

    # _temporary_object():
    #
    # Returns:
    #     (file): A file object to a named temporary file.
    #
    # Create a named temporary file with 0o0644 access rights.
    @contextlib.contextmanager
    def _temporary_object(self):
        with utils._tempnamedfile(dir=self.tmpdir) as f:
            os.chmod(f.name,
                     stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
            yield f

    # _ensure_blob():
    #
    # Fetch and add blob if it's not already local.
    #
    # Args:
    #     remote (Remote): The remote to use.
    #     digest (Digest): Digest object for the blob to fetch.
    #
    # Returns:
    #     (str): The path of the object
    #
    def _ensure_blob(self, remote, digest):
        objpath = self.objpath(digest)
        if os.path.exists(objpath):
            # already in local repository
            return objpath

        with self._temporary_object() as f:
            remote._fetch_blob(digest, f)

            added_digest = self.add_object(path=f.name, link_directly=True)
            assert added_digest.hash == digest.hash

        return objpath

    def _batch_download_complete(self, batch, *, missing_blobs=None):
        for digest, data in batch.send(missing_blobs=missing_blobs):
            with self._temporary_object() as f:
                f.write(data)
                f.flush()

                added_digest = self.add_object(path=f.name, link_directly=True)
                assert added_digest.hash == digest.hash

    # Helper function for _fetch_directory().
    def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
        self._batch_download_complete(batch)

        # All previously scheduled directories are now locally available,
        # move them to the processing queue.
        fetch_queue.extend(fetch_next_queue)
        fetch_next_queue.clear()
        return _CASBatchRead(remote)

    # Helper function for _fetch_directory().
    def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
        in_local_cache = os.path.exists(self.objpath(digest))

        if in_local_cache:
            # Skip download, already in local cache.
            pass
        elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
              not remote.batch_read_supported):
            # Too large for batch request, download in independent request.
            self._ensure_blob(remote, digest)
            in_local_cache = True
        else:
            if not batch.add(digest):
                # Not enough space left in batch request.
                # Complete pending batch first.
                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
                batch.add(digest)

        if recursive:
            if in_local_cache:
                # Add directory to processing queue.
                fetch_queue.append(digest)
            else:
                # Directory will be available after completing pending batch.
                # Add directory to deferred processing queue.
                fetch_next_queue.append(digest)

        return batch

    # _fetch_directory():
    #
    # Fetches remote directory and adds it to content addressable store.
    #
    # This recursively fetches directory objects but doesn't fetch any
    # files.
    #
    # Args:
    #     remote (Remote): The remote to use.
    #     dir_digest (Digest): Digest object for the directory to fetch.
    #
    def _fetch_directory(self, remote, dir_digest):
        # TODO Use GetTree() if the server supports it
        fetch_queue = [dir_digest]
        fetch_next_queue = []
        batch = _CASBatchRead(remote)

        while len(fetch_queue) + len(fetch_next_queue) > 0:
            if not fetch_queue:
                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)

            dir_digest = fetch_queue.pop(0)

            objpath = self._ensure_blob(remote, dir_digest)

            directory = remote_execution_pb2.Directory()
            with open(objpath, 'rb') as f:
                directory.ParseFromString(f.read())

            for dirnode in directory.directories:
                batch = self._fetch_directory_node(remote, dirnode.digest, batch,
                                                   fetch_queue, fetch_next_queue, recursive=True)

        # Fetch final batch
        self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)

    def _fetch_tree(self, remote, digest):
        # download but do not store the Tree object
        with utils._tempnamedfile(dir=self.tmpdir) as out:
            remote._fetch_blob(digest, out)

            tree = remote_execution_pb2.Tree()

            with open(out.name, 'rb') as f:
                tree.ParseFromString(f.read())

            tree.children.extend([tree.root])
            for directory in tree.children:
                for filenode in directory.files:
                    self._ensure_blob(remote, filenode.digest)

                # place directory blob only in final location when we've downloaded
                # all referenced blobs to avoid dangling references in the repository
                dirbuffer = directory.SerializeToString()
                dirdigest = self.add_object(buffer=dirbuffer)
                assert dirdigest.size_bytes == len(dirbuffer)

        return dirdigest

    # fetch_blobs():
    #
    # Fetch blobs from remote CAS. Returns missing blobs that could not be fetched.
    #
    # Args:
    #     remote (CASRemote): The remote repository to fetch from
    #     digests (list): The Digests of blobs to fetch
    #
    # Returns: The Digests of the blobs that were not available on the remote CAS
    #
    def fetch_blobs(self, remote, digests):
        missing_blobs = []

        batch = _CASBatchRead(remote)

        for digest in digests:
            if (digest.size_bytes >= remote.max_batch_total_size_bytes or
                    not remote.batch_read_supported):
                # Too large for batch request, download in independent request.
                try:
                    self._ensure_blob(remote, digest)
                except grpc.RpcError as e:
                    if e.code() == grpc.StatusCode.NOT_FOUND:
                        missing_blobs.append(digest)
                    else:
                        raise CASCacheError("Failed to fetch blob: {}".format(e)) from e
            else:
                if not batch.add(digest):
                    # Not enough space left in batch request.
                    # Complete pending batch first.
                    self._batch_download_complete(batch, missing_blobs=missing_blobs)

                    batch = _CASBatchRead(remote)
                    batch.add(digest)

        # Complete last pending batch
        self._batch_download_complete(batch, missing_blobs=missing_blobs)

        return missing_blobs

    # send_blobs():
    #
    # Upload blobs to remote CAS.
    #
    # Args:
    #     remote (CASRemote): The remote repository to upload to
    #     digests (list): The Digests of Blobs to upload
    #
    def send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
        batch = _CASBatchUpdate(remote)

        for digest in digests:
            with open(self.objpath(digest), 'rb') as f:
                assert os.fstat(f.fileno()).st_size == digest.size_bytes

                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
                        not remote.batch_update_supported):
                    # Too large for batch request, upload in independent request.
                    remote._send_blob(digest, f, u_uid=u_uid)
                else:
                    if not batch.add(digest, f):
                        # Not enough space left in batch request.
                        # Complete pending batch first.
                        batch.send()
                        batch = _CASBatchUpdate(remote)

                        batch.add(digest, f)

        # Send final batch
        batch.send()

    def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
        missing_blobs = self.remote_missing_blobs_for_directory(remote, digest)

        # Upload any blobs missing on the server
        self.send_blobs(remote, missing_blobs, u_uid)


class CASQuota:
    def __init__(self, context):
        self.context = context
        self.cas = context.get_cascache()
        self.casdir = self.cas.casdir
        self._config_cache_quota = context.config_cache_quota
        self._config_cache_quota_string = context.config_cache_quota_string

        self._cache_size = None               # The current cache size, sometimes it's an estimate
        self._cache_quota = None              # The cache quota
        self._cache_quota_original = None     # The cache quota as specified by the user, in bytes
        self._cache_quota_headroom = None     # The headroom in bytes before reaching the quota or full disk
        self._cache_lower_threshold = None    # The target cache size for a cleanup
        self.available_space = None

        self._message = context.message

        self._ref_callbacks = []              # Callbacks to get required refs
        self._remove_callbacks = []           # Callbacks to remove refs

        self._calculate_cache_quota()

    # compute_cache_size()
    #
    # Computes the real artifact cache size.
    #
    # Returns:
    #     (int): The size of the artifact cache.
    #
    def compute_cache_size(self):
        self._cache_size = utils._get_dir_size(self.casdir)
        return self._cache_size

    # get_cache_size()
    #
    # Fetches the cached size of the cache, this is sometimes
    # an estimate and periodically adjusted to the real size
    # when a cache size calculation job runs.
    #
    # When it is an estimate, the value is either correct, or
    # it is greater than the actual cache size.
    #
    # Returns:
    #     (int) An approximation of the artifact cache size, in bytes.
    #
    def get_cache_size(self):

        # If we don't currently have an estimate, figure out the real cache size.
        if self._cache_size is None:
            stored_size = self._read_cache_size()
            if stored_size is not None:
                self._cache_size = stored_size
            else:
                self.compute_cache_size()

        return self._cache_size

    # set_cache_size()
    #
    # Forcefully set the overall cache size.
    #
    # This is used to update the size in the main process after
    # having calculated in a cleanup or a cache size calculation job.
    #
    # Args:
    #     cache_size (int): The size to set.
    #     write_to_disk (bool): Whether to write the value to disk.
    #
    def set_cache_size(self, cache_size, *, write_to_disk=True):

        assert cache_size is not None

        self._cache_size = cache_size
        if write_to_disk:
            self._write_cache_size(self._cache_size)
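    # Example (illustrative only): a cleanup or cache-size job recomputes the
    # real size and the main process records it, so that later calls to
    # get_cache_size() return the corrected value without re-scanning the
    # cache directory:
    #
    #     real_size = casquota.compute_cache_size()
    #     casquota.set_cache_size(real_size)
    #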
    # full()
    #
    # Checks if the artifact cache is full, either
    # because the user configured quota has been exceeded
    # or because the underlying disk is almost full.
    #
    # Returns:
    #     (bool): True if the artifact cache is full
    #
    def full(self):

        if self.get_cache_size() > self._cache_quota:
            return True

        _, volume_avail = self._get_cache_volume_size()
        if volume_avail < self._cache_quota_headroom:
            return True

        return False

    ################################################
    #             Local Private Methods            #
    ################################################

    # _read_cache_size()
    #
    # Reads and returns the size of the artifact cache that's stored in the
    # cache's size file
    #
    # Returns:
    #     (int): The size of the artifact cache, as recorded in the file
    #
    def _read_cache_size(self):
        size_file_path = os.path.join(self.casdir, CACHE_SIZE_FILE)

        if not os.path.exists(size_file_path):
            return None

        with open(size_file_path, "r") as f:
            size = f.read()

        try:
            num_size = int(size)
        except ValueError as e:
            raise CASCacheError("Size '{}' parsed from '{}' was not an integer".format(
                size, size_file_path)) from e

        return num_size

    # _write_cache_size()
    #
    # Writes the given size of the artifact to the cache's size file
    #
    # Args:
    #     size (int): The size of the artifact cache to record
    #
    def _write_cache_size(self, size):
        assert isinstance(size, int)
        size_file_path = os.path.join(self.casdir, CACHE_SIZE_FILE)
        with utils.save_file_atomic(size_file_path, "w", tempdir=self.cas.tmpdir) as f:
            f.write(str(size))

    # _get_cache_volume_size()
    #
    # Get the available space and total space for the volume on
    # which the artifact cache is located.
    #
    # Returns:
    #     (int): The total number of bytes on the volume
    #     (int): The number of available bytes on the volume
    #
    # NOTE: We use this stub to allow the test cases
    #       to override what an artifact cache thinks
    #       about its disk size and available bytes.
    #
    def _get_cache_volume_size(self):
        return utils._get_volume_size(self.casdir)

    # _calculate_cache_quota()
    #
    # Calculates and sets the cache quota and lower threshold based on the
    # quota set in Context.
    # It checks that the quota is both a valid expression, and that there is
    # enough disk space to satisfy that quota
    #
    def _calculate_cache_quota(self):
        # Headroom intended to give BuildStream a bit of leeway.
        # This acts as the minimum size of cache_quota and also
        # is taken from the user requested cache_quota.
        #
        if 'BST_TEST_SUITE' in os.environ:
            self._cache_quota_headroom = 0
        else:
            self._cache_quota_headroom = 2e9

        total_size, available_space = self._get_cache_volume_size()
        cache_size = self.get_cache_size()
        self.available_space = available_space

        # Ensure system has enough storage for the cache_quota
        #
        # If cache_quota is none, set it to the maximum it could possibly be.
        #
        # Also check that cache_quota is at least as large as our headroom.
        #
        cache_quota = self._config_cache_quota
        if cache_quota is None:
            # The user has set no limit, so we may take all the space.
            cache_quota = min(cache_size + available_space, total_size)

        if cache_quota < self._cache_quota_headroom:  # Check minimum
            raise LoadError(
                LoadErrorReason.INVALID_DATA,
                "Invalid cache quota ({}): BuildStream requires a minimum cache quota of {}.".format(
                    utils._pretty_size(cache_quota), utils._pretty_size(self._cache_quota_headroom)))
        elif cache_quota > total_size:
            # A quota greater than the total disk size is certainly an error
            raise CASCacheError("Your system does not have enough available " +
                                "space to support the cache quota specified.",
                                detail=("You have specified a quota of {quota} total disk space.\n" +
                                        "The filesystem containing {local_cache_path} only " +
                                        "has {total_size} total disk space.")
                                .format(
                                    quota=self._config_cache_quota,
                                    local_cache_path=self.casdir,
                                    total_size=utils._pretty_size(total_size)),
                                reason='insufficient-storage-for-quota')
        elif cache_quota > cache_size + available_space:
            # The quota does not fit in the available space, this is a warning
            if '%' in self._config_cache_quota_string:
                available = (available_space / total_size) * 100
                available = '{}% of total disk space'.format(round(available, 1))
            else:
                available = utils._pretty_size(available_space)

            self._message(Message(
                None,
                MessageType.WARN,
                "Your system does not have enough available " +
                "space to support the cache quota specified.",
                detail=("You have specified a quota of {quota} total disk space.\n" +
                        "The filesystem containing {local_cache_path} only " +
                        "has {available_size} available.")
                .format(quota=self._config_cache_quota,
                        local_cache_path=self.casdir,
                        available_size=available)))

        # Place a slight headroom (2e9 (2GB) on the cache_quota) into
        # cache_quota to try and avoid exceptions.
        #
        # Of course, we might still end up running out during a build
        # if we end up writing more than 2G, but hey, this stuff is
        # already really fuzzy.
        #
        self._cache_quota_original = cache_quota
        self._cache_quota = cache_quota - self._cache_quota_headroom
        self._cache_lower_threshold = self._cache_quota / 2

    # clean():
    #
    # Clean the artifact cache as much as possible.
    #
    # Args:
    #     progress (callable): A callback to call when a ref is removed
    #
    # Returns:
    #     (int): The size of the cache after having cleaned up
    #
    def clean(self, progress=None):
        context = self.context

        # Some accumulative statistics
        removed_ref_count = 0
        space_saved = 0

        # get required refs
        refs = self.cas.list_refs()
        required_refs = set(itertools.chain.from_iterable(self._ref_callbacks))

        # Start off with an announcement with as much info as possible
        volume_size, volume_avail = self._get_cache_volume_size()
        self._message(Message(
            None, MessageType.STATUS, "Starting cache cleanup",
            detail=("Elements required by the current build plan: {}\n" +
                    "User specified quota: {} ({})\n" +
                    "Cache usage: {}\n" +
                    "Cache volume: {} total, {} available")
            .format(len(required_refs),
                    context.config_cache_quota,
                    utils._pretty_size(self._cache_quota, dec_places=2),
                    utils._pretty_size(self.get_cache_size(), dec_places=2),
                    utils._pretty_size(volume_size, dec_places=2),
                    utils._pretty_size(volume_avail, dec_places=2))))

        # Do a real computation of the cache size once, just in case
        self.compute_cache_size()

        usage = CASCacheUsage(self)
        self._message(Message(None, MessageType.STATUS,
                              "Cache usage recomputed: {}".format(usage)))

        while self.get_cache_size() >= self._cache_lower_threshold:
            try:
                to_remove = refs.pop(0)
            except IndexError:
                # If too many artifacts are required, and we therefore
                # can't remove them, we have to abort the build.
                #
                # FIXME: Asking the user what to do may be neater
                #
                default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
                                            'buildstream.conf')
                detail = ("Aborted after removing {} refs and saving {} disk space.\n"
                          "The remaining {} in the cache is required by the {} references in your build plan\n\n"
                          "There is not enough space to complete the build.\n"
                          "Please increase the cache-quota in {} and/or make more disk space."
                          .format(removed_ref_count,
                                  utils._pretty_size(space_saved, dec_places=2),
                                  utils._pretty_size(self.get_cache_size(), dec_places=2),
                                  len(required_refs),
                                  (context.config_origin or default_conf)))

                if self.full():
                    raise CASCacheError("Cache too full. Aborting.",
                                        detail=detail,
                                        reason="cache-too-full")
                else:
                    break

            key = to_remove.rpartition('/')[2]
            if key not in required_refs:

                # Remove the actual artifact, if it's not required.
                size = 0
                removed_ref = False
                for (pred, remove) in self._remove_callbacks:
                    if pred(to_remove):
                        size = remove(to_remove)
                        removed_ref = True
                        break

                if not removed_ref:
                    continue

                removed_ref_count += 1
                space_saved += size

                self._message(Message(
                    None, MessageType.STATUS,
                    "Freed {: <7} {}".format(
                        utils._pretty_size(size, dec_places=2),
                        to_remove)))

                self.set_cache_size(self._cache_size - size)

                # User callback
                #
                # Currently this process is fairly slow, but we should
                # think about throttling this progress() callback if this
                # becomes too intense.
                if progress:
                    progress()

        # Informational message about the side effects of the cleanup
        self._message(Message(
            None, MessageType.INFO, "Cleanup completed",
            detail=("Removed {} refs and saved {} disk space.\n" +
                    "Cache usage is now: {}")
            .format(removed_ref_count,
                    utils._pretty_size(space_saved, dec_places=2),
                    utils._pretty_size(self.get_cache_size(), dec_places=2))))

        return self.get_cache_size()

    # add_ref_callbacks()
    #
    # Args:
    #     callback (Iterator): function that gives list of required refs
    def add_ref_callbacks(self, callback):
        self._ref_callbacks.append(callback)

    # add_remove_callbacks()
    #
    # Args:
    #     callback (predicate, callback): The predicate says whether this is the
    #         correct type to remove given a ref and the callback does the actual
    #         removing.
    def add_remove_callbacks(self, callback):
        self._remove_callbacks.append(callback)


def _grouper(iterable, n):
    while True:
        try:
            current = next(iterable)
        except StopIteration:
            return
        yield itertools.chain([current], itertools.islice(iterable, n - 1))
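# Example (illustrative only): _grouper() lazily splits an iterator into
# chunks of at most n items, which is how FindMissingBlobs requests are kept
# bounded above. Note that it requires an iterator, not a plain list:
#
#     groups = _grouper(iter(range(5)), 2)
#     [list(g) for g in groups]   # -> [[0, 1], [2, 3], [4]]
#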