author     Valentin David <valentin.david@codethink.co.uk>  2018-11-16 15:25:12 +0100
committer  Valentin David <valentin.david@codethink.co.uk>  2018-11-28 15:29:52 +0100
commit     353b90dda760f320ec5b97c0bb56dce2ed7ea68f (patch)
tree       48d34cce0a7da082cb1aa1ca9061413046a15503
parent     a64f667db8cc8123b7a77c9871143fbe8d008aaf (diff)
download   buildstream-353b90dda760f320ec5b97c0bb56dce2ed7ea68f.tar.gz
Clean up cache in CAS server more aggressively
When there is less than 2 GB of disk space left, the server cleans up until 10 GB are available. Both values are configurable.
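As an illustration of that policy (not part of the commit itself), here is a minimal sketch of the trigger check, assuming only a CAS directory path and os.statvfs; the eviction itself is handled by the new _CacheCleaner class below:

    import os

    def needs_cleanup(casdir, object_size, min_head_size=int(2e9)):
        # Accepting the incoming object must leave at least min_head_size
        # bytes free on the filesystem that holds the CAS repository.
        stats = os.statvfs(casdir)
        free_bytes = stats.f_bavail * stats.f_bsize
        return object_size > free_bytes - min_head_size

Once triggered, the cleaner evicts least-recently-pushed objects until max_head_size bytes (10 GB by default) would be free after the upload lands.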
-rw-r--r--  buildstream/_artifactcache/casserver.py  149
-rw-r--r--  tests/frontend/push.py                      4
-rw-r--r--  tests/testutils/artifactshare.py           21
3 files changed, 107 insertions, 67 deletions
diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index 3a6481fb2..84d22cc51 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -57,18 +57,22 @@ class ArtifactTooLargeException(Exception):
# repo (str): Path to CAS repository
# enable_push (bool): Whether to allow blob uploads and artifact updates
#
-def create_server(repo, *, enable_push):
+def create_server(repo, *, enable_push,
+ max_head_size=int(10e9),
+ min_head_size=int(2e9)):
cas = CASCache(os.path.abspath(repo))
# Use max_workers default from Python 3.5+
max_workers = (os.cpu_count() or 1) * 5
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
+ cache_cleaner = _CacheCleaner(cas, max_head_size, min_head_size)
+
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
- _ByteStreamServicer(cas, enable_push=enable_push), server)
+ _ByteStreamServicer(cas, cache_cleaner, enable_push=enable_push), server)
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
- _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
+ _ContentAddressableStorageServicer(cas, cache_cleaner, enable_push=enable_push), server)
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
_CapabilitiesServicer(), server)
@@ -86,9 +90,19 @@ def create_server(repo, *, enable_push):
@click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
@click.option('--enable-push', default=False, is_flag=True,
help="Allow clients to upload blobs and update artifact cache")
+@click.option('--head-room-min', type=click.INT,
+ help="Disk head room minimum in bytes",
+ default=2e9)
+@click.option('--head-room-max', type=click.INT,
+ help="Disk head room maximum in bytes",
+ default=10e9)
@click.argument('repo')
-def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
- server = create_server(repo, enable_push=enable_push)
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
+ head_room_min, head_room_max):
+ server = create_server(repo,
+ max_head_size=head_room_max,
+ min_head_size=head_room_min,
+ enable_push=enable_push)
use_tls = bool(server_key)
@@ -130,10 +144,11 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
- def __init__(self, cas, *, enable_push):
+ def __init__(self, cas, cache_cleaner, *, enable_push):
super().__init__()
self.cas = cas
self.enable_push = enable_push
+ self.cache_cleaner = cache_cleaner
def Read(self, request, context):
resource_name = request.resource_name
@@ -195,7 +210,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
if client_digest.size_bytes == 0:
break
try:
- _clean_up_cache(self.cas, client_digest.size_bytes)
+ self.cache_cleaner.clean_up(client_digest.size_bytes)
except ArtifactTooLargeException as e:
context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
context.set_details(str(e))
@@ -239,10 +254,11 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
- def __init__(self, cas, *, enable_push):
+ def __init__(self, cas, cache_cleaner, *, enable_push):
super().__init__()
self.cas = cas
self.enable_push = enable_push
+ self.cache_cleaner = cache_cleaner
def FindMissingBlobs(self, request, context):
response = remote_execution_pb2.FindMissingBlobsResponse()
@@ -311,7 +327,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
continue
try:
- _clean_up_cache(self.cas, digest.size_bytes)
+ self.cache_cleaner.clean_up(digest.size_bytes)
with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
out.write(blob_request.data)
@@ -432,63 +448,70 @@ def _digest_from_upload_resource_name(resource_name):
return None
-# _clean_up_cache()
-#
-# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
-# is enough space for the incoming artifact
-#
-# Args:
-# cas: CASCache object
-# object_size: The size of the object being received in bytes
-#
-# Returns:
-# int: The total bytes removed on the filesystem
-#
-def _clean_up_cache(cas, object_size):
- # Determine the available disk space, in bytes, of the file system
- # which mounts the repo
- stats = os.statvfs(cas.casdir)
- buffer_ = int(2e9) # Add a 2 GB buffer
- free_disk_space = (stats.f_bavail * stats.f_bsize) - buffer_
- total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
-
- if object_size > total_disk_space:
- raise ArtifactTooLargeException("Artifact of size: {} is too large for "
- "the filesystem which mounts the remote "
- "cache".format(object_size))
-
- if object_size <= free_disk_space:
- # No need to clean up
- return 0
-
- # obtain a list of LRP artifacts
- LRP_objects = cas.list_objects()
-
- removed_size = 0 # in bytes
- last_mtime = 0
- while object_size - removed_size > free_disk_space:
- try:
- last_mtime, to_remove = LRP_objects.pop(0) # The first element in the list is the LRP objects
- except IndexError:
- # This exception is caught if there are no more artifacts in the list
- # LRP_artifacts. This means the the artifact is too large for the filesystem
- # so we abort the process
- raise ArtifactTooLargeException("Artifact of size {} is too large for "
+class _CacheCleaner:
+
+ def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
+ self.__cas = cas
+ self.__max_head_size = max_head_size
+ self.__min_head_size = min_head_size
+
+    # clean_up()
+ #
+ # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
+ # is enough space for the incoming artifact
+ #
+ # Args:
+ # object_size: The size of the object being received in bytes
+ #
+ # Returns:
+ # int: The total bytes removed on the filesystem
+ #
+ def clean_up(self, object_size):
+ stats = os.statvfs(self.__cas.casdir)
+ free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
+ total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
+
+ if object_size > total_disk_space:
+ raise ArtifactTooLargeException("Artifact of size: {} is too large for "
"the filesystem which mounts the remote "
"cache".format(object_size))
- try:
- size = os.stat(to_remove).st_size
- os.unlink(to_remove)
- removed_size += size
- except FileNotFoundError:
- pass
+ if object_size <= free_disk_space:
+ # No need to clean up
+ return 0
- cas.clean_up_refs_until(last_mtime)
+ stats = os.statvfs(self.__cas.casdir)
+ target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
- if removed_size > 0:
- logging.info("Successfully removed {} bytes from the cache".format(removed_size))
- else:
- logging.info("No artifacts were removed from the cache.")
+ # obtain a list of LRP artifacts
+ LRP_objects = self.__cas.list_objects()
+
+ removed_size = 0 # in bytes
+ last_mtime = 0
+
+ while object_size - removed_size > target_disk_space:
+ try:
+ last_mtime, to_remove = LRP_objects.pop(0) # The first element in the list is the LRP artifact
+ except IndexError:
+ # This exception is caught if there are no more artifacts in the list
+            # LRP_objects. This means the artifact is too large for the filesystem
+ # so we abort the process
+ raise ArtifactTooLargeException("Artifact of size {} is too large for "
+ "the filesystem which mounts the remote "
+ "cache".format(object_size))
+
+ try:
+ size = os.stat(to_remove).st_size
+ os.unlink(to_remove)
+ removed_size += size
+ except FileNotFoundError:
+ pass
+
+ self.__cas.clean_up_refs_until(last_mtime)
+
+ if removed_size > 0:
+ logging.info("Successfully removed {} bytes from the cache".format(removed_size))
+ else:
+ logging.info("No artifacts were removed from the cache.")
- return removed_size
+ return removed_size
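A usage sketch of the extended create_server() API, using only calls shown in this diff (the repository path is illustrative); the same limits are exposed on the command line as --head-room-min and --head-room-max:

    from buildstream._artifactcache.casserver import create_server

    server = create_server('/srv/cas-repo',           # illustrative path
                           enable_push=True,
                           min_head_size=int(2e9),    # start cleaning below 2 GB free
                           max_head_size=int(10e9))   # clean until 10 GB would be free
    port = server.add_insecure_port('localhost:0')
    server.start()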
diff --git a/tests/frontend/push.py b/tests/frontend/push.py
index f2d6814d6..153d43340 100644
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -230,6 +230,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
# Create an artifact share (remote artifact cache) in the tmpdir/artifactshare
# Mock a file system with 12 MB free disk space
with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
+ min_head_size=int(2e9),
+ max_head_size=int(2e9),
total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
# Configure bst to push to the cache
@@ -313,6 +315,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
# Create an artifact share (remote cache) in tmpdir/artifactshare
# Mock a file system with 12 MB free disk space
with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
+ min_head_size=int(2e9),
+ max_head_size=int(2e9),
total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
# Configure bst to push to the cache
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index c1ddc2c46..38c54a947 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -29,7 +29,11 @@ from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution
#
class ArtifactShare():
- def __init__(self, directory, *, total_space=None, free_space=None):
+ def __init__(self, directory, *,
+ total_space=None,
+ free_space=None,
+ min_head_size=int(2e9),
+ max_head_size=int(10e9)):
# The working directory for the artifact share (in case it
# needs to do something outside of its backend's storage folder).
@@ -50,6 +54,9 @@ class ArtifactShare():
self.total_space = total_space
self.free_space = free_space
+ self.max_head_size = max_head_size
+ self.min_head_size = min_head_size
+
q = Queue()
self.process = Process(target=self.run, args=(q,))
@@ -74,7 +81,10 @@ class ArtifactShare():
self.free_space = self.total_space
os.statvfs = self._mock_statvfs
- server = create_server(self.repodir, enable_push=True)
+ server = create_server(self.repodir,
+ max_head_size=self.max_head_size,
+ min_head_size=self.min_head_size,
+ enable_push=True)
port = server.add_insecure_port('localhost:0')
server.start()
@@ -176,8 +186,11 @@ class ArtifactShare():
# Create an ArtifactShare for use in a test case
#
@contextmanager
-def create_artifact_share(directory, *, total_space=None, free_space=None):
- share = ArtifactShare(directory, total_space=total_space, free_space=free_space)
+def create_artifact_share(directory, *, total_space=None, free_space=None,
+ min_head_size=int(2e9),
+ max_head_size=int(10e9)):
+ share = ArtifactShare(directory, total_space=total_space, free_space=free_space,
+ min_head_size=min_head_size, max_head_size=max_head_size)
try:
yield share
finally:
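For tests, the same knobs flow through the create_artifact_share() helper. A sketch mirroring the calls in tests/frontend/push.py above (the sizes and the tmpdir fixture are illustrative):

    import os

    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
                               total_space=int(10e9),
                               free_space=int(12e6) + int(2e9),
                               min_head_size=int(2e9),
                               max_head_size=int(2e9)) as share:
        # The mocked statvfs reports roughly 12 MB of usable head room, so
        # pushing anything larger forces the cleaner to evict LRP artifacts.
        ...

Setting max_head_size equal to min_head_size, as the updated tests do, keeps the cleaner's old behavior of freeing only as much space as the incoming artifact needs.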