author    Jürg Billeter <j@bitron.ch>  2019-06-06 15:40:37 +0200
committer Jürg Billeter <j@bitron.ch>  2019-08-20 07:41:23 +0200
commit    32191bab48d7b8c8965ddbcc2cae93802028f29b (patch)
tree      7cc4a04b05d3192f3ef54890d7840694c168bd0d
parent    e1204be7ece0b55536b9860643d901158769019c (diff)
download  buildstream-32191bab48d7b8c8965ddbcc2cae93802028f29b.tar.gz
casserver.py: Remove CacheCleaner
Cache expiry will be performed by buildbox-casd.
-rw-r--r--  src/buildstream/_cas/casserver.py  121
-rw-r--r--  tests/frontend/push.py               3
2 files changed, 13 insertions(+), 111 deletions(-)
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index 2ed51bded..0975023ef 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -18,14 +18,12 @@
 #        Jürg Billeter <juerg.billeter@codethink.co.uk>
 
 from concurrent import futures
-import logging
 import os
 import signal
 import sys
 import tempfile
 import uuid
 import errno
-import threading
 
 import grpc
 from google.protobuf.message import DecodeError
@@ -48,11 +46,6 @@ from .cascache import CASCache
 _MAX_PAYLOAD_BYTES = 1024 * 1024
 
 
-# Trying to push an artifact that is too large
-class ArtifactTooLargeException(Exception):
-    pass
-
-
 # create_server():
 #
 # Create gRPC CAS artifact server as specified in the Remote Execution API.
@@ -72,13 +65,11 @@ def create_server(repo, *, enable_push,
     max_workers = (os.cpu_count() or 1) * 5
     server = grpc.server(futures.ThreadPoolExecutor(max_workers))
 
-    cache_cleaner = _CacheCleaner(cas, max_head_size, min_head_size)
-
     bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
-        _ByteStreamServicer(cas, cache_cleaner, enable_push=enable_push), server)
+        _ByteStreamServicer(cas, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(cas, cache_cleaner, enable_push=enable_push), server)
+        _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
@@ -165,11 +156,10 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
 
 
 class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
-    def __init__(self, cas, cache_cleaner, *, enable_push):
+    def __init__(self, cas, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
-        self.cache_cleaner = cache_cleaner
 
     def Read(self, request, context):
         resource_name = request.resource_name
@@ -230,12 +220,6 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                     while True:
                         if client_digest.size_bytes == 0:
                             break
-                        try:
-                            self.cache_cleaner.clean_up(client_digest.size_bytes)
-                        except ArtifactTooLargeException as e:
-                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
-                            context.set_details(str(e))
-                            return response
 
                         try:
                             os.posix_fallocate(out.fileno(), 0, client_digest.size_bytes)
@@ -275,11 +259,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
 
 
 class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
-    def __init__(self, cas, cache_cleaner, *, enable_push):
+    def __init__(self, cas, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
-        self.cache_cleaner = cache_cleaner
 
     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
@@ -347,18 +330,12 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
                 blob_response.status.code = code_pb2.FAILED_PRECONDITION
                 continue
 
-            try:
-                self.cache_cleaner.clean_up(digest.size_bytes)
-
-                with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
-                    out.write(blob_request.data)
-                    out.flush()
-                    server_digest = self.cas.add_object(path=out.name)
-                    if server_digest.hash != digest.hash:
-                        blob_response.status.code = code_pb2.FAILED_PRECONDITION
-
-            except ArtifactTooLargeException:
-                blob_response.status.code = code_pb2.RESOURCE_EXHAUSTED
+            with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
+                out.write(blob_request.data)
+                out.flush()
+                server_digest = self.cas.add_object(path=out.name)
+                if server_digest.hash != digest.hash:
+                    blob_response.status.code = code_pb2.FAILED_PRECONDITION
 
         return response
@@ -602,81 +579,3 @@ def _digest_from_upload_resource_name(resource_name):
         return digest
     except ValueError:
         return None
-
-
-class _CacheCleaner:
-
-    __cleanup_cache_lock = threading.Lock()
-
-    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
-        self.__cas = cas
-        self.__max_head_size = max_head_size
-        self.__min_head_size = min_head_size
-
-    def __has_space(self, object_size):
-        stats = os.statvfs(self.__cas.casdir)
-        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
-        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
-
-        if object_size > total_disk_space:
-            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
-                                            "the filesystem which mounts the remote "
-                                            "cache".format(object_size))
-
-        return object_size <= free_disk_space
-
-    # clean_up()
-    #
-    # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
-    # is enough space for the incoming artifact
-    #
-    # Args:
-    #   object_size: The size of the object being received in bytes
-    #
-    # Returns:
-    #   int: The total bytes removed on the filesystem
-    #
-    def clean_up(self, object_size):
-        if self.__has_space(object_size):
-            return 0
-
-        with _CacheCleaner.__cleanup_cache_lock:
-            if self.__has_space(object_size):
-                # Another thread has done the cleanup for us
-                return 0
-
-            stats = os.statvfs(self.__cas.casdir)
-            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
-
-            # obtain a list of LRP artifacts
-            LRP_objects = self.__cas.list_objects()
-
-            removed_size = 0  # in bytes
-            last_mtime = 0
-
-            while object_size - removed_size > target_disk_space:
-                try:
-                    last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP artifact
-                except IndexError:
-                    # This exception is caught if there are no more artifacts in the list
-                    # LRP_objects. This means the artifact is too large for the filesystem
-                    # so we abort the process
-                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
-                                                    "the filesystem which mounts the remote "
-                                                    "cache".format(object_size))
-
-                try:
-                    size = os.stat(to_remove).st_size
-                    os.unlink(to_remove)
-                    removed_size += size
-                except FileNotFoundError:
-                    pass
-
-            self.__cas.clean_up_refs_until(last_mtime)
-
-            if removed_size > 0:
-                logging.info("Successfully removed %d bytes from the cache", removed_size)
-            else:
-                logging.info("No artifacts were removed from the cache.")
-
-            return removed_size
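
For readers skimming the removal above: condensed into a standalone function, the eviction policy that _CacheCleaner implemented amounts to roughly the sketch below. The names and the cas helper methods (casdir, list_objects, clean_up_refs_until) mirror the deleted code; the free-standing signature and error types are illustrative only and not part of this commit.

    import os
    import threading

    _cleanup_lock = threading.Lock()

    def clean_up(cas, object_size, max_head_size, min_head_size=int(2e9)):
        # Free/total space on the filesystem backing the CAS, always keeping
        # min_head_size bytes of headroom out of consideration.
        def has_space():
            stats = os.statvfs(cas.casdir)
            free = (stats.f_bavail * stats.f_bsize) - min_head_size
            total = (stats.f_blocks * stats.f_bsize) - min_head_size
            if object_size > total:
                raise RuntimeError("object is larger than the filesystem itself")
            return object_size <= free

        if has_space():
            return 0

        with _cleanup_lock:
            if has_space():  # another thread already made room
                return 0

            stats = os.statvfs(cas.casdir)
            target = (stats.f_bavail * stats.f_bsize) - max_head_size

            removed, last_mtime = 0, 0
            lrp_objects = cas.list_objects()  # oldest (least recently pushed) first

            while object_size - removed > target:
                # An IndexError here meant the object could never fit; the real
                # code translated that into ArtifactTooLargeException.
                last_mtime, path = lrp_objects.pop(0)
                try:
                    size = os.stat(path).st_size
                    os.unlink(path)
                    removed += size
                except FileNotFoundError:
                    pass

            cas.clean_up_refs_until(last_mtime)
            return removed
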
diff --git a/tests/frontend/push.py b/tests/frontend/push.py
index e92646154..e0a6c1e99 100644
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -287,6 +287,7 @@ def test_push_after_pull(cli, tmpdir, datafiles):
 # the least recently pushed artifact is deleted in order to make room for
 # the incoming artifact.
 @pytest.mark.datafiles(DATA_DIR)
+@pytest.mark.xfail()
 def test_artifact_expires(cli, datafiles, tmpdir):
     project = str(datafiles)
     element_path = 'elements'
@@ -342,6 +343,7 @@ def test_artifact_expires(cli, datafiles, tmpdir):
 # Test that a large artifact, whose size exceeds the quota, is not pushed
 # to the remote share
 @pytest.mark.datafiles(DATA_DIR)
+@pytest.mark.xfail()
 def test_artifact_too_large(cli, datafiles, tmpdir):
     project = str(datafiles)
     element_path = 'elements'
@@ -378,6 +380,7 @@ def test_artifact_too_large(cli, datafiles, tmpdir):
 
 # Test that when an element is pulled recently, it is not considered the LRU element.
 @pytest.mark.datafiles(DATA_DIR)
+@pytest.mark.xfail()
 def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
     project = str(datafiles)
     element_path = 'elements'
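
The three expiry tests above are now marked with a bare @pytest.mark.xfail(), since the artifact server no longer evicts anything on its own. As a reminder of how that marker behaves, a minimal illustration (the test name and reason string below are hypothetical, not part of this commit):

    import pytest

    @pytest.mark.xfail(reason="server-side cache expiry removed; handled by buildbox-casd")
    def test_hypothetical_expiry():
        # An xfail'd test still runs: a failure is reported as "xfailed" rather
        # than failing the suite, and an unexpected pass is reported as "xpassed".
        assert False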