summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJürg Billeter <j@bitron.ch>2018-09-10 16:07:30 +0000
committerJürg Billeter <j@bitron.ch>2018-09-10 16:07:30 +0000
commitb3ffcdc8fcc8d150cf6d75acbc660379b3fc0fcb (patch)
treead38eda56b3237fc184390cc4cfb99f2869514be
parente0bb71b24c6e40a5dd2ee935352a301ae2113895 (diff)
parent1a7fb3cb325e396ff362a81997302adceef2a836 (diff)
downloadbuildstream-b3ffcdc8fcc8d150cf6d75acbc660379b3fc0fcb.tar.gz
Merge branch 'juerg/cas-batch' into 'master'
_artifactcache/casserver.py: Implement BatchReadBlobs Closes #632 See merge request BuildStream/buildstream!785
-rw-r--r--buildstream/_artifactcache/casserver.py49
1 file changed, 49 insertions, 0 deletions
diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index 0af65729b..8c3ece27d 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -38,6 +38,10 @@ from .._context import Context
from .cascache import CASCache
+# The default limit for gRPC messages is 4 MiB
+_MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024
+
+
class ArtifactTooLargeException(Exception):
    """Raised when a client tries to push an artifact that is too large."""
@@ -67,6 +71,9 @@ def create_server(repo, *, enable_push):
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
_ContentAddressableStorageServicer(artifactcache), server)
+ remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
+ _CapabilitiesServicer(), server)
+
buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
_ReferenceStorageServicer(artifactcache, enable_push=enable_push), server)
@@ -229,6 +236,48 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
d.size_bytes = digest.size_bytes
return response
+ def BatchReadBlobs(self, request, context):
+ response = remote_execution_pb2.BatchReadBlobsResponse()
+ batch_size = 0
+
+ for digest in request.digests:
+ batch_size += digest.size_bytes
+ if batch_size > _MAX_BATCH_TOTAL_SIZE_BYTES:
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+ return response
+
+ blob_response = response.responses.add()
+ blob_response.digest.hash = digest.hash
+ blob_response.digest.size_bytes = digest.size_bytes
+ try:
+ with open(self.cas.objpath(digest), 'rb') as f:
+ if os.fstat(f.fileno()).st_size != digest.size_bytes:
+ blob_response.status.code = grpc.StatusCode.NOT_FOUND
+ continue
+
+ blob_response.data = f.read(digest.size_bytes)
+ except FileNotFoundError:
+ blob_response.status.code = grpc.StatusCode.NOT_FOUND
+
+ return response
+
+
class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
    """Advertises this CAS server's capabilities to REAPI clients."""

    def GetCapabilities(self, request, context):
        """Return the server capabilities.

        Reports SHA-256 as the digest function, the batch total size limit,
        no action-cache update support, absolute symlinks allowed, and
        REAPI major version 2 as the only supported API version.
        """
        capabilities = remote_execution_pb2.ServerCapabilities()

        cache = capabilities.cache_capabilities
        cache.digest_function.append(remote_execution_pb2.SHA256)
        cache.action_cache_update_capabilities.update_enabled = False
        cache.max_batch_total_size_bytes = _MAX_BATCH_TOTAL_SIZE_BYTES
        cache.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED

        # Only REAPI v2 is supported, so all three version markers agree.
        for api_version in (capabilities.deprecated_api_version,
                            capabilities.low_api_version,
                            capabilities.high_api_version):
            api_version.major = 2

        return capabilities
+
class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
def __init__(self, cas, *, enable_push):