author    Jürg Billeter <j@bitron.ch>    2018-07-10 16:37:14 +0200
committer Jürg Billeter <j@bitron.ch>    2018-07-17 07:56:40 +0200
commit    971606aed477fbdbb5efe54b000ff8e587d7c53e (patch)
tree      7515cc83d4c03ef85929152fbb1847363b93a8e0
parent    ff8703c908552bd79c4e7d7ed0345b0c3fe0d126 (diff)
download  buildstream-971606aed477fbdbb5efe54b000ff8e587d7c53e.tar.gz
_artifactcache/casserver.py: Add cache cleanup based on pushreceive
-rw-r--r--  buildstream/_artifactcache/casserver.py | 66
1 file changed, 66 insertions(+), 0 deletions(-)
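The patch below sizes incoming artifacts against the filesystem backing the CAS directory using os.statvfs(). As a standalone illustration of that arithmetic (a sketch, not part of the patch; the path here is arbitrary and the 2 GB headroom mirrors the buffer_ constant introduced below):

import os

stats = os.statvfs("/")                                      # any mounted path
buffer_ = int(2e9)                                           # 2 GB headroom, as in the patch
free_disk_space = (stats.f_bfree * stats.f_bsize) - buffer_
total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
print("free: {} bytes, total: {} bytes".format(free_disk_space, total_disk_space))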
diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index 0f897ebd5..a7086202e 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -18,6 +18,7 @@
 #        Jürg Billeter <juerg.billeter@codethink.co.uk>
 
 from concurrent import futures
+import logging
 import os
 import signal
 import sys
@@ -36,6 +37,11 @@ from .._context import Context
 from .cascache import CASCache
 
 
+# Trying to push an artifact that is too large
+class ArtifactTooLargeException(Exception):
+    pass
+
+
 # create_server():
 #
 # Create gRPC CAS artifact server as specified in the Remote Execution API.
@@ -163,6 +169,12 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                     # First request
                     resource_name = request.resource_name
                     client_digest = _digest_from_resource_name(resource_name)
+                    try:
+                        _clean_up_cache(self.cas, client_digest.size_bytes)
+                    except ArtifactTooLargeException as e:
+                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
+                        context.set_details(str(e))
+                        return response
                 elif request.resource_name:
                     # If it is set on subsequent calls, it **must** match the value of the first request.
                     assert request.resource_name == resource_name
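From the client's side, a push rejected by this new check surfaces as a failed RPC. A minimal handling sketch, assuming a hypothetical send_blob callable that drives the ByteStream Write call (grpc.RpcError with .code() and .details() is standard grpc-python):

import grpc

def push_with_limit_handling(send_blob):
    # send_blob: hypothetical callable performing the ByteStream Write RPC
    try:
        send_blob()
    except grpc.RpcError as e:
        if e.code() == grpc.StatusCode.RESOURCE_EXHAUSTED:
            # The server could not make room for the artifact (this patch's new path)
            print("remote cache full: {}".format(e.details()))
        else:
            raise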
@@ -247,3 +259,57 @@ def _digest_from_resource_name(resource_name):
 def _has_object(cas, digest):
     objpath = cas.objpath(digest)
     return os.path.exists(objpath)
+
+
+# _clean_up_cache()
+#
+# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
+# is enough space for the incoming artifact
+#
+# Args:
+#   cas: CASCache object
+#   object_size: The size of the object being received in bytes
+#
+# Returns:
+#   int: The total bytes removed on the filesystem
+#
+def _clean_up_cache(cas, object_size):
+    # Determine the available disk space, in bytes, of the file system
+    # which mounts the repo
+    stats = os.statvfs(cas.casdir)
+    buffer_ = int(2e9)            # Add a 2 GB buffer
+    free_disk_space = (stats.f_bfree * stats.f_bsize) - buffer_
+    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
+
+    if object_size > total_disk_space:
+        raise ArtifactTooLargeException("Artifact of size {} is too large for "
+                                        "the filesystem which mounts the remote "
+                                        "cache".format(object_size))
+
+    if object_size <= free_disk_space:
+        # No need to clean up
+        return 0
+
+    # Obtain a list of LRP artifacts
+    LRP_artifacts = cas.list_artifacts()
+
+    removed_size = 0  # in bytes
+    while object_size - removed_size > free_disk_space:
+        try:
+            to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
+        except IndexError:
+            # This exception is raised when there are no more artifacts left in
+            # LRP_artifacts, which means the artifact is too large for the
+            # filesystem even after cleanup, so we abort
+            raise ArtifactTooLargeException("Artifact of size {} is too large for "
+                                            "the filesystem which mounts the remote "
+                                            "cache".format(object_size))
+
+        removed_size += cas.remove(to_remove, defer_prune=False)
+
+    if removed_size > 0:
+        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
+    else:
+        logging.info("No artifacts were removed from the cache.")
+
+    return removed_size
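To make the eviction arithmetic concrete, here is a small self-contained walk-through of the loop above with made-up sizes (purely illustrative; the refs and byte counts are hypothetical):

# An incoming 500-byte object with only 200 bytes free: artifacts are evicted
# in least-recently-pushed order until the shortfall is covered.
sizes = {"ref-a": 100, "ref-b": 300, "ref-c": 50}   # LRP order: ref-a first
LRP_artifacts = list(sizes)
free_disk_space = 200
object_size = 500

removed_size = 0
while object_size - removed_size > free_disk_space:
    to_remove = LRP_artifacts.pop(0)                # evict the LRP artifact first
    removed_size += sizes[to_remove]

print(removed_size)   # 400: "ref-a" and "ref-b" are evicted, "ref-c" survives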