| author | Jürg Billeter <j@bitron.ch> | 2018-07-10 16:37:14 +0200 |
|---|---|---|
| committer | Jürg Billeter <j@bitron.ch> | 2018-07-17 07:56:40 +0200 |
| commit | 971606aed477fbdbb5efe54b000ff8e587d7c53e (patch) | |
| tree | 7515cc83d4c03ef85929152fbb1847363b93a8e0 | |
| parent | ff8703c908552bd79c4e7d7ed0345b0c3fe0d126 (diff) | |
| download | buildstream-971606aed477fbdbb5efe54b000ff8e587d7c53e.tar.gz | |
_artifactcache/casserver.py: Add cache cleanup based on pushreceive
-rw-r--r-- | buildstream/_artifactcache/casserver.py | 66 |
1 file changed, 66 insertions, 0 deletions
```diff
diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index 0f897ebd5..a7086202e 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -18,6 +18,7 @@
 #        Jürg Billeter <juerg.billeter@codethink.co.uk>
 
 from concurrent import futures
+import logging
 import os
 import signal
 import sys
@@ -36,6 +37,11 @@ from .._context import Context
 from .cascache import CASCache
 
 
+# Trying to push an artifact that is too large
+class ArtifactTooLargeException(Exception):
+    pass
+
+
 # create_server():
 #
 # Create gRPC CAS artifact server as specified in the Remote Execution API.
@@ -163,6 +169,12 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                     # First request
                     resource_name = request.resource_name
                     client_digest = _digest_from_resource_name(resource_name)
+                    try:
+                        _clean_up_cache(self.cas, client_digest.size_bytes)
+                    except ArtifactTooLargeException as e:
+                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
+                        context.set_details(str(e))
+                        return response
                 elif request.resource_name:
                     # If it is set on subsequent calls, it **must** match the value of the first request.
                     assert request.resource_name == resource_name
@@ -247,3 +259,57 @@ def _digest_from_resource_name(resource_name):
 def _has_object(cas, digest):
     objpath = cas.objpath(digest)
     return os.path.exists(objpath)
+
+
+# _clean_up_cache()
+#
+# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
+# is enough space for the incoming artifact
+#
+# Args:
+#   cas: CASCache object
+#   object_size: The size of the object being received in bytes
+#
+# Returns:
+#   int: The total bytes removed on the filesystem
+#
+def _clean_up_cache(cas, object_size):
+    # Determine the available disk space, in bytes, of the file system
+    # which mounts the repo
+    stats = os.statvfs(cas.casdir)
+    buffer_ = int(2e9)          # Add a 2 GB buffer
+    free_disk_space = (stats.f_bfree * stats.f_bsize) - buffer_
+    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
+
+    if object_size > total_disk_space:
+        raise ArtifactTooLargeException("Artifact of size: {} is too large for "
+                                        "the filesystem which mounts the remote "
+                                        "cache".format(object_size))
+
+    if object_size <= free_disk_space:
+        # No need to clean up
+        return 0
+
+    # obtain a list of LRP artifacts
+    LRP_artifacts = cas.list_artifacts()
+
+    removed_size = 0  # in bytes
+    while object_size - removed_size > free_disk_space:
+        try:
+            to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
+        except IndexError:
+            # This exception is caught if there are no more artifacts in the list
+            # LRP_artifacts. This means the artifact is too large for the filesystem
+            # so we abort the process
+            raise ArtifactTooLargeException("Artifact of size {} is too large for "
+                                            "the filesystem which mounts the remote "
+                                            "cache".format(object_size))
+
+        removed_size += cas.remove(to_remove, defer_prune=False)
+
+    if removed_size > 0:
+        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
+    else:
+        logging.info("No artifacts were removed from the cache.")
+
+    return removed_size
```
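
For context, here is a minimal, runnable sketch of the same Least Recently Pushed (LRP) eviction idea in isolation. The `FakeCache` class, its `capacity`/`free()` members and the example artifact list are hypothetical stand-ins introduced purely for illustration; the real server works against `os.statvfs()` on `cas.casdir` and the `CASCache.list_artifacts()` / `CASCache.remove()` methods shown in the diff above, and also subtracts a 2 GB safety buffer so the filesystem is never filled completely.

```python
# A self-contained sketch of LRP eviction against a hypothetical in-memory cache.
class ArtifactTooLargeException(Exception):
    pass


class FakeCache:
    # Artifacts are kept oldest-push-first, mirroring cas.list_artifacts().
    def __init__(self, capacity, artifacts):
        self.capacity = capacity              # total bytes the volume can hold
        self.artifacts = list(artifacts)      # [(ref, size_bytes), ...], oldest first

    def free(self):
        return self.capacity - sum(size for _, size in self.artifacts)

    def list_artifacts(self):
        return [ref for ref, _ in self.artifacts]

    def remove(self, ref):
        # Delete one artifact and report how many bytes were freed.
        for i, (name, size) in enumerate(self.artifacts):
            if name == ref:
                del self.artifacts[i]
                return size
        return 0


def clean_up_cache(cache, object_size):
    # Same policy as _clean_up_cache(): refuse objects larger than the whole
    # volume, otherwise evict least recently pushed artifacts until there is room.
    if object_size > cache.capacity:
        raise ArtifactTooLargeException(
            "Artifact of size {} is too large for the cache".format(object_size))

    removed_size = 0
    lrp_artifacts = cache.list_artifacts()
    while object_size > cache.free():
        if not lrp_artifacts:
            raise ArtifactTooLargeException(
                "Artifact of size {} is too large for the cache".format(object_size))
        removed_size += cache.remove(lrp_artifacts.pop(0))
    return removed_size


if __name__ == "__main__":
    cache = FakeCache(capacity=100, artifacts=[("old", 40), ("mid", 30), ("new", 20)])
    # 90 of 100 bytes are used; a 35-byte push forces the oldest artifact out.
    freed = clean_up_cache(cache, 35)
    print(freed, cache.list_artifacts())      # 40 ['mid', 'new']
```

The main simplification is that the sketch recomputes free space from the fake cache on every iteration, whereas the patched server samples `os.statvfs()` once and tracks `removed_size` against that snapshot while receiving the write stream.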