#
#  Copyright (C) 2018 Codethink Limited
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
#  Authors:
#        Jürg Billeter

from concurrent import futures
import logging
import os
import signal
import sys
import tempfile
import uuid
import errno
import threading

import click
import grpc

from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc

from .._exceptions import ArtifactError
from .._context import Context


# The default limit for gRPC messages is 4 MiB.
# Limit payload to 1 MiB to leave sufficient headroom for metadata.
_MAX_PAYLOAD_BYTES = 1024 * 1024


# Raised when a client tries to push an artifact that is too large
# for the filesystem hosting the cache.
class ArtifactTooLargeException(Exception):
    pass


# We need a message handler because this will own an ArtifactCache
# which can in turn fire messages.
def message_handler(message, context):
    logging.info(message.message)
    logging.info(message.detail)


# create_server():
#
# Create a gRPC CAS artifact server as specified in the Remote Execution API.
#
# Args:
#     repo (str): Path to CAS repository
#     enable_push (bool): Whether to allow blob uploads and artifact updates
#     max_head_size (int): Disk headroom to restore when cleaning up, in bytes
#     min_head_size (int): Minimum disk headroom to keep free, in bytes
#
def create_server(repo, *, enable_push, max_head_size=int(10e9), min_head_size=int(2e9)):
    context = Context()
    context.artifactdir = os.path.abspath(repo)
    context.set_message_handler(message_handler)

    artifactcache = context.artifactcache

    # Use the max_workers default from Python 3.5+
    max_workers = (os.cpu_count() or 1) * 5
    server = grpc.server(futures.ThreadPoolExecutor(max_workers))

    cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size)

    bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
        _ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)

    remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
        _ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)

    remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
        _CapabilitiesServicer(), server)

    buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
        _ReferenceStorageServicer(artifactcache, enable_push=enable_push), server)

    return server


@click.command(short_help="CAS Artifact Server")
@click.option('--port', '-p', type=click.INT, required=True, help="Port number")
@click.option('--server-key', help="Private server key for TLS (PEM-encoded)")
@click.option('--server-cert', help="Public server certificate for TLS (PEM-encoded)")
@click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
@click.option('--enable-push', default=False, is_flag=True,
              help="Allow clients to upload blobs and update artifact cache")
@click.option('--head-room-min', type=click.INT,
              help="Disk headroom minimum in bytes",
              default=2e9)
@click.option('--head-room-max', type=click.INT,
              help="Disk headroom maximum in bytes",
              default=10e9)
@click.argument('repo')
def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
                head_room_min, head_room_max):
    server = create_server(repo,
                           max_head_size=head_room_max,
                           min_head_size=head_room_min,
                           enable_push=enable_push)

    use_tls = bool(server_key)

    if bool(server_cert) != use_tls:
        click.echo("ERROR: --server-key and --server-cert are both required for TLS", err=True)
        sys.exit(-1)

    if client_certs and not use_tls:
        click.echo("ERROR: --client-certs can only be used with --server-key", err=True)
        sys.exit(-1)

    if use_tls:
        # Read public/private key pair
        with open(server_key, 'rb') as f:
            server_key_bytes = f.read()
        with open(server_cert, 'rb') as f:
            server_cert_bytes = f.read()

        if client_certs:
            with open(client_certs, 'rb') as f:
                client_certs_bytes = f.read()
        else:
            client_certs_bytes = None

        credentials = grpc.ssl_server_credentials([(server_key_bytes, server_cert_bytes)],
                                                  root_certificates=client_certs_bytes,
                                                  require_client_auth=bool(client_certs))
        server.add_secure_port('[::]:{}'.format(port), credentials)
    else:
        server.add_insecure_port('[::]:{}'.format(port))

    # Run artifact server
    server.start()
    try:
        while True:
            signal.pause()
    except KeyboardInterrupt:
        server.stop(0)
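
# Illustrative usage (a sketch, not part of the module): assuming this command
# is exposed as the `bst-artifact-server` entry point, a push-enabled TLS
# server could be started with something like:
#
#   bst-artifact-server --port 11001 \
#       --server-key server.key --server-cert server.crt \
#       --enable-push /srv/artifacts
#
# Without --server-key/--server-cert the server listens on an insecure port.
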

class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
    def __init__(self, cas, cache_cleaner, *, enable_push):
        super().__init__()
        self.cas = cas
        self.enable_push = enable_push
        self.cache_cleaner = cache_cleaner

    def Read(self, request, context):
        resource_name = request.resource_name
        client_digest = _digest_from_download_resource_name(resource_name)
        if client_digest is None:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            return

        if request.read_offset > client_digest.size_bytes:
            context.set_code(grpc.StatusCode.OUT_OF_RANGE)
            return

        try:
            with open(self.cas.objpath(client_digest), 'rb') as f:
                if os.fstat(f.fileno()).st_size != client_digest.size_bytes:
                    context.set_code(grpc.StatusCode.NOT_FOUND)
                    return

                if request.read_offset > 0:
                    f.seek(request.read_offset)

                remaining = client_digest.size_bytes - request.read_offset
                while remaining > 0:
                    chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
                    remaining -= chunk_size

                    response = bytestream_pb2.ReadResponse()
                    # Stream the blob back in chunks of at most _MAX_PAYLOAD_BYTES
                    response.data = f.read(chunk_size)
                    yield response
        except FileNotFoundError:
            context.set_code(grpc.StatusCode.NOT_FOUND)

    def Write(self, request_iterator, context):
        response = bytestream_pb2.WriteResponse()

        if not self.enable_push:
            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
            return response

        offset = 0
        finished = False
        resource_name = None
        with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
            for request in request_iterator:
                if finished or request.write_offset != offset:
                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                    return response

                if resource_name is None:
                    # First request
                    resource_name = request.resource_name
                    client_digest = _digest_from_upload_resource_name(resource_name)
                    if client_digest is None:
                        context.set_code(grpc.StatusCode.NOT_FOUND)
                        return response

                    while True:
                        if client_digest.size_bytes == 0:
                            break

                        try:
                            self.cache_cleaner.clean_up(client_digest.size_bytes)
                        except ArtifactTooLargeException as e:
                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
                            context.set_details(str(e))
                            return response

                        try:
                            os.posix_fallocate(out.fileno(), 0, client_digest.size_bytes)
                            break
                        except OSError as e:
                            # Multiple uploads can happen at the same time; if the
                            # disk filled up in the meantime, clean up and retry.
                            if e.errno != errno.ENOSPC:
                                raise
                elif request.resource_name:
                    # If the resource name is set on subsequent requests, it
                    # must match the value of the first request.
                    if request.resource_name != resource_name:
                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                        return response

                if (offset + len(request.data)) > client_digest.size_bytes:
                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                    return response

                out.write(request.data)
                offset += len(request.data)

                if request.finish_write:
                    if client_digest.size_bytes != offset:
                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                        return response

                    out.flush()
                    digest = self.cas.add_object(path=out.name, link_directly=True)
                    if digest.hash != client_digest.hash:
                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                        return response

                    finished = True

        assert finished

        response.committed_size = offset
        return response
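
# Illustrative client-side sketch (an assumption, not part of this module):
# a blob stored here can be fetched over ByteStream by composing the download
# resource name as "blobs/{hash}/{size_bytes}" and concatenating the chunks:
#
#   channel = grpc.insecure_channel('localhost:11001')
#   stub = bytestream_pb2_grpc.ByteStreamStub(channel)
#   request = bytestream_pb2.ReadRequest(
#       resource_name='blobs/{}/{}'.format(digest.hash, digest.size_bytes))
#   data = b''.join(response.data for response in stub.Read(request))
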

class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    def __init__(self, cas, cache_cleaner, *, enable_push):
        super().__init__()
        self.cas = cas
        self.enable_push = enable_push
        self.cache_cleaner = cache_cleaner

    def FindMissingBlobs(self, request, context):
        response = remote_execution_pb2.FindMissingBlobsResponse()
        for digest in request.blob_digests:
            objpath = self.cas.objpath(digest)
            try:
                os.utime(objpath)
            except OSError as e:
                if e.errno != errno.ENOENT:
                    raise
                else:
                    d = response.missing_blob_digests.add()
                    d.hash = digest.hash
                    d.size_bytes = digest.size_bytes

        return response

    def BatchReadBlobs(self, request, context):
        response = remote_execution_pb2.BatchReadBlobsResponse()
        batch_size = 0

        for digest in request.digests:
            batch_size += digest.size_bytes
            if batch_size > _MAX_PAYLOAD_BYTES:
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                return response

            blob_response = response.responses.add()
            blob_response.digest.hash = digest.hash
            blob_response.digest.size_bytes = digest.size_bytes
            try:
                with open(self.cas.objpath(digest), 'rb') as f:
                    if os.fstat(f.fileno()).st_size != digest.size_bytes:
                        blob_response.status.code = grpc.StatusCode.NOT_FOUND.value[0]
                        continue

                    blob_response.data = f.read(digest.size_bytes)
            except FileNotFoundError:
                blob_response.status.code = grpc.StatusCode.NOT_FOUND.value[0]

        return response

    def BatchUpdateBlobs(self, request, context):
        response = remote_execution_pb2.BatchUpdateBlobsResponse()

        if not self.enable_push:
            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
            return response

        batch_size = 0

        for blob_request in request.requests:
            digest = blob_request.digest

            batch_size += digest.size_bytes
            if batch_size > _MAX_PAYLOAD_BYTES:
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                return response

            blob_response = response.responses.add()
            blob_response.digest.hash = digest.hash
            blob_response.digest.size_bytes = digest.size_bytes

            if len(blob_request.data) != digest.size_bytes:
                # Status codes are plain integers in the response message,
                # following the same convention as BatchReadBlobs above.
                blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION.value[0]
                continue

            try:
                self.cache_cleaner.clean_up(digest.size_bytes)

                with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
                    out.write(blob_request.data)
                    out.flush()
                    server_digest = self.cas.add_object(path=out.name)
                    if server_digest.hash != digest.hash:
                        blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION.value[0]

            except ArtifactTooLargeException:
                blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED.value[0]

        return response
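
# Illustrative client-side sketch (an assumption, not part of this module):
# a client can ask which blobs still need to be uploaded before pushing an
# artifact, using the standard Remote Execution CAS stub:
#
#   stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)
#   request = remote_execution_pb2.FindMissingBlobsRequest(blob_digests=[digest])
#   missing = stub.FindMissingBlobs(request).missing_blob_digests
#
# Blobs that are already present get their mtime refreshed, which also keeps
# them away from the LRP cleanup in _CacheCleaner for a while longer.
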

class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
    def GetCapabilities(self, request, context):
        response = remote_execution_pb2.ServerCapabilities()

        cache_capabilities = response.cache_capabilities
        cache_capabilities.digest_function.append(remote_execution_pb2.SHA256)
        cache_capabilities.action_cache_update_capabilities.update_enabled = False
        cache_capabilities.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
        cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED

        response.deprecated_api_version.major = 2
        response.low_api_version.major = 2
        response.high_api_version.major = 2

        return response


class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
    def __init__(self, cas, *, enable_push):
        super().__init__()
        self.cas = cas
        self.enable_push = enable_push

    def GetReference(self, request, context):
        response = buildstream_pb2.GetReferenceResponse()

        try:
            tree = self.cas.resolve_ref(request.key, update_mtime=True)
            try:
                self.cas.update_tree_mtime(tree)
            except FileNotFoundError:
                self.cas.remove(request.key, defer_prune=True)
                context.set_code(grpc.StatusCode.NOT_FOUND)
                return response

            response.digest.hash = tree.hash
            response.digest.size_bytes = tree.size_bytes
        except ArtifactError:
            context.set_code(grpc.StatusCode.NOT_FOUND)

        return response

    def UpdateReference(self, request, context):
        response = buildstream_pb2.UpdateReferenceResponse()

        if not self.enable_push:
            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
            return response

        for key in request.keys:
            self.cas.set_ref(key, request.digest)

        return response

    def Status(self, request, context):
        response = buildstream_pb2.StatusResponse()

        response.allow_updates = self.enable_push

        return response


def _digest_from_download_resource_name(resource_name):
    parts = resource_name.split('/')

    # Accept requests from non-conforming BuildStream 1.1.x clients
    if len(parts) == 2:
        parts.insert(0, 'blobs')

    if len(parts) != 3 or parts[0] != 'blobs':
        return None

    try:
        digest = remote_execution_pb2.Digest()
        digest.hash = parts[1]
        digest.size_bytes = int(parts[2])
        return digest
    except ValueError:
        return None


def _digest_from_upload_resource_name(resource_name):
    parts = resource_name.split('/')

    # Accept requests from non-conforming BuildStream 1.1.x clients
    if len(parts) == 2:
        parts.insert(0, 'uploads')
        parts.insert(1, str(uuid.uuid4()))
        parts.insert(2, 'blobs')

    if len(parts) < 5 or parts[0] != 'uploads' or parts[2] != 'blobs':
        return None

    try:
        uuid_ = uuid.UUID(hex=parts[1])
        if uuid_.version != 4:
            return None

        digest = remote_execution_pb2.Digest()
        digest.hash = parts[3]
        digest.size_bytes = int(parts[4])
        return digest
    except ValueError:
        return None
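
# For reference, the resource name layouts accepted by the helpers above (the
# two-component forms are the legacy BuildStream 1.1.x client formats):
#
#   download: blobs/{hash}/{size_bytes}
#             {hash}/{size_bytes}                          (legacy)
#   upload:   uploads/{uuid4}/blobs/{hash}/{size_bytes}
#             {hash}/{size_bytes}                          (legacy)
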

class _CacheCleaner:

    __cleanup_cache_lock = threading.Lock()

    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
        self.__cas = cas
        self.__max_head_size = max_head_size
        self.__min_head_size = min_head_size

    def __has_space(self, object_size):
        stats = os.statvfs(self.__cas.casdir)
        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size

        if object_size > total_disk_space:
            raise ArtifactTooLargeException("Artifact of size {} is too large for "
                                            "the filesystem which mounts the remote "
                                            "cache".format(object_size))

        return object_size <= free_disk_space

    # clean_up():
    #
    # Keep removing Least Recently Pushed (LRP) artifacts from the cache until
    # there is enough space for the incoming artifact.
    #
    # Args:
    #   object_size: The size of the object being received in bytes
    #
    # Returns:
    #   int: The total bytes removed on the filesystem
    #
    def clean_up(self, object_size):
        if self.__has_space(object_size):
            return 0

        with _CacheCleaner.__cleanup_cache_lock:
            if self.__has_space(object_size):
                # Another thread has done the cleanup for us
                return 0

            stats = os.statvfs(self.__cas.casdir)
            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size

            # Obtain a list of LRP artifacts
            LRP_objects = self.__cas.list_objects()

            removed_size = 0  # in bytes
            last_mtime = 0

            while object_size - removed_size > target_disk_space:
                try:
                    # The first element in the list is the LRP artifact
                    last_mtime, to_remove = LRP_objects.pop(0)
                except IndexError:
                    # There are no more objects left in LRP_objects, so the
                    # artifact is too large for the filesystem and we abort.
                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
                                                    "the filesystem which mounts the remote "
                                                    "cache".format(object_size))

                try:
                    size = os.stat(to_remove).st_size
                    os.unlink(to_remove)
                    removed_size += size
                except FileNotFoundError:
                    pass

            self.__cas.clean_up_refs_until(last_mtime)

            if removed_size > 0:
                logging.info("Successfully removed {} bytes from the cache".format(removed_size))
            else:
                logging.info("No artifacts were removed from the cache.")

            return removed_size
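
# Worked example of the headroom arithmetic above (hypothetical numbers, just
# to illustrate the defaults): with min_head_size=2e9 and max_head_size=10e9,
# a 1e9-byte object on a filesystem with 2.5e9 bytes free does not fit, since
# the usable free space is 2.5e9 - 2e9 = 0.5e9 < 1e9.  clean_up() then evicts
# least-recently-pushed objects until enough has been removed that, once the
# object is written, roughly max_head_size of free space remains; here that
# means deleting about 1e9 + 10e9 - 2.5e9 = 8.5e9 bytes, assuming the cache
# holds that much.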