#
# Copyright (C) 2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Jürg Billeter
from concurrent import futures
import logging
import os
import signal
import sys
import tempfile
import uuid
import errno
import threading
import click
import grpc
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
from .._exceptions import ArtifactError
from .._context import Context
# gRPC rejects messages larger than 4 MiB by default, so the payload
# carried by a single message is capped at 1 MiB, which leaves plenty
# of headroom for metadata.
_MAX_PAYLOAD_BYTES = 1 << 20
# ArtifactTooLargeException
#
# Raised when an object being pushed is too large to be stored in the cache.
#
class ArtifactTooLargeException(Exception):
    """An incoming object is too large to be stored in the cache."""
# message_handler():
#
# The server owns an ArtifactCache, which can fire messages, so a handler
# has to be registered for them. Each message is simply forwarded to the
# logging module: first its summary, then its detail.
#
# Args:
#    message: The message to report; its `message` and `detail` attributes are logged
#    context: Unused
#
def message_handler(message, context):
    for text in (message.message, message.detail):
        logging.info(text)
# create_server():
#
# Create gRPC CAS artifact server as specified in the Remote Execution API.
#
# The returned server has the ByteStream, ContentAddressableStorage,
# Capabilities and ReferenceStorage services registered, but no port has
# been added and it has not been started.
#
# Args:
#    repo (str): Path to CAS repository
#    enable_push (bool): Whether to allow blob uploads and artifact updates
#    max_head_size (int): Disk head room maximum in bytes, handed to the cache cleaner
#    min_head_size (int): Disk head room minimum in bytes, handed to the cache cleaner
#
# Returns:
#    The configured gRPC server
#
def create_server(repo, *, enable_push,
                  max_head_size=int(10e9),
                  min_head_size=int(2e9)):
    context = Context()
    context.artifactdir = os.path.abspath(repo)
    context.set_message_handler(message_handler)
    cas = context.artifactcache

    # Same worker count that Python 3.5+ picks by default for a thread pool
    worker_count = 5 * (os.cpu_count() or 1)
    executor = futures.ThreadPoolExecutor(worker_count)
    server = grpc.server(executor)

    # A single cleaner is shared by every service that accepts uploads
    cleaner = _CacheCleaner(cas, max_head_size, min_head_size)

    bytestream = _ByteStreamServicer(cas, cleaner, enable_push=enable_push)
    bytestream_pb2_grpc.add_ByteStreamServicer_to_server(bytestream, server)

    storage = _ContentAddressableStorageServicer(cas, cleaner, enable_push=enable_push)
    remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(storage, server)

    capabilities = _CapabilitiesServicer()
    remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(capabilities, server)

    references = _ReferenceStorageServicer(cas, enable_push=enable_push)
    buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(references, server)

    return server
# server_main():
#
# Command line entry point: builds the artifact server for `repo`, binds it
# to `port` on all interfaces (with TLS when a server key and certificate
# are given) and serves until interrupted from the keyboard.
#
# NOTE: This is deliberately a comment and not a docstring, click would
#       otherwise display it as the help text of the command.
#
@click.command(short_help="CAS Artifact Server")
@click.option('--port', '-p', type=click.INT, required=True, help="Port number")
@click.option('--server-key', help="Private server key for TLS (PEM-encoded)")
@click.option('--server-cert', help="Public server certificate for TLS (PEM-encoded)")
@click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
@click.option('--enable-push', default=False, is_flag=True,
              help="Allow clients to upload blobs and update artifact cache")
@click.option('--head-room-min', type=click.INT,
              help="Disk head room minimum in bytes",
              default=2e9)
@click.option('--head-room-max', type=click.INT,
              help="Disk head room maximum in bytes",
              default=10e9)
@click.argument('repo')
def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
                head_room_min, head_room_max):
    server = create_server(repo,
                           enable_push=enable_push,
                           min_head_size=head_room_min,
                           max_head_size=head_room_max)

    # TLS is requested by passing a server key, which must come with a certificate
    use_tls = bool(server_key)

    if use_tls != bool(server_cert):
        click.echo("ERROR: --server-key and --server-cert are both required for TLS", err=True)
        sys.exit(-1)

    if client_certs and not use_tls:
        click.echo("ERROR: --client-certs can only be used with --server-key", err=True)
        sys.exit(-1)

    address = '[::]:{}'.format(port)

    if not use_tls:
        server.add_insecure_port(address)
    else:
        # Read public/private key pair
        with open(server_key, 'rb') as key_file:
            server_key_bytes = key_file.read()
        with open(server_cert, 'rb') as cert_file:
            server_cert_bytes = cert_file.read()

        # Client certificates are optional, clients are only
        # authenticated when they have been provided
        client_certs_bytes = None
        if client_certs:
            with open(client_certs, 'rb') as certs_file:
                client_certs_bytes = certs_file.read()

        credentials = grpc.ssl_server_credentials([(server_key_bytes, server_cert_bytes)],
                                                  root_certificates=client_certs_bytes,
                                                  require_client_auth=bool(client_certs))
        server.add_secure_port(address, credentials)

    # Run artifact server
    server.start()
    try:
        # The server runs in its own threads, sleep here until a signal arrives
        while True:
            signal.pause()
    except KeyboardInterrupt:
        server.stop(0)
# _ByteStreamServicer()
#
# ByteStream service, used to download and upload single CAS objects
# as a stream of chunks.
#
# Args:
#    cas: The object store; objpath(), tmpdir and add_object() are used
#    cache_cleaner (_CacheCleaner): Asked to make room before an upload is accepted
#    enable_push (bool): Whether uploads are allowed
#
class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
    def __init__(self, cas, cache_cleaner, *, enable_push):
        super().__init__()
        self.cas = cas
        self.enable_push = enable_push
        self.cache_cleaner = cache_cleaner

    # Read():
    #
    # Stream the object named by `request.resource_name`, starting at
    # `request.read_offset`, in chunks of at most _MAX_PAYLOAD_BYTES.
    #
    # Status codes set on failure:
    #    NOT_FOUND: the resource name cannot be parsed, the object does not
    #               exist, or its size on disk differs from the requested size
    #    OUT_OF_RANGE: the read offset lies beyond the end of the object
    #
    def Read(self, request, context):
        resource_name = request.resource_name
        client_digest = _digest_from_download_resource_name(resource_name)
        if client_digest is None:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            return

        if request.read_offset > client_digest.size_bytes:
            context.set_code(grpc.StatusCode.OUT_OF_RANGE)
            return

        try:
            with open(self.cas.objpath(client_digest), 'rb') as f:
                # An object whose size does not match the digest is not the
                # object the client asked for
                if os.fstat(f.fileno()).st_size != client_digest.size_bytes:
                    context.set_code(grpc.StatusCode.NOT_FOUND)
                    return

                if request.read_offset > 0:
                    f.seek(request.read_offset)

                remaining = client_digest.size_bytes - request.read_offset
                while remaining > 0:
                    # Chunks are limited to _MAX_PAYLOAD_BYTES each
                    chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
                    remaining -= chunk_size

                    response = bytestream_pb2.ReadResponse()
                    response.data = f.read(chunk_size)
                    yield response
        except FileNotFoundError:
            context.set_code(grpc.StatusCode.NOT_FOUND)

    # Write():
    #
    # Receive one object as a stream of chunks into a temporary file and
    # add it to the CAS once the client sets `finish_write`.
    #
    # The first request names the resource; the digest parsed from that
    # name fixes the expected size and hash of the upload.
    #
    # Status codes set on failure:
    #    PERMISSION_DENIED: pushing is disabled
    #    NOT_FOUND: the resource name of the first request cannot be parsed
    #    RESOURCE_EXHAUSTED: the cache cleaner could not make enough room
    #    FAILED_PRECONDITION: unexpected write offset, data after the final
    #                         chunk, changed resource name, size or hash
    #                         mismatch, or a stream that ended without
    #                         `finish_write`
    #
    # Returns:
    #    The WriteResponse; `committed_size` is only set on success
    #
    def Write(self, request_iterator, context):
        response = bytestream_pb2.WriteResponse()

        if not self.enable_push:
            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
            return response

        offset = 0
        finished = False
        resource_name = None
        with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
            for request in request_iterator:
                # Chunks must arrive in order, and none after the final one
                if finished or request.write_offset != offset:
                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                    return response

                if resource_name is None:
                    # First request
                    resource_name = request.resource_name
                    client_digest = _digest_from_upload_resource_name(resource_name)
                    if client_digest is None:
                        context.set_code(grpc.StatusCode.NOT_FOUND)
                        return response

                    # Reserve the space for the whole object up front,
                    # cleaning the cache again whenever the reservation
                    # fails for lack of space. Empty objects need none.
                    while True:
                        if client_digest.size_bytes == 0:
                            break
                        try:
                            self.cache_cleaner.clean_up(client_digest.size_bytes)
                        except ArtifactTooLargeException as e:
                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
                            context.set_details(str(e))
                            return response

                        try:
                            os.posix_fallocate(out.fileno(), 0, client_digest.size_bytes)
                            break
                        except OSError as e:
                            # Multiple uploads can happen at the same time,
                            # so the space may be gone again: retry
                            if e.errno != errno.ENOSPC:
                                raise

                elif request.resource_name:
                    # If it is set on subsequent calls, it **must** match the value of the first request.
                    if request.resource_name != resource_name:
                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                        return response

                # Never accept more data than the digest announced
                if (offset + len(request.data)) > client_digest.size_bytes:
                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                    return response

                out.write(request.data)
                offset += len(request.data)
                if request.finish_write:
                    if client_digest.size_bytes != offset:
                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                        return response
                    out.flush()
                    digest = self.cas.add_object(path=out.name, link_directly=True)
                    if digest.hash != client_digest.hash:
                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                        return response
                    finished = True

        if not finished:
            # The client closed the stream without ever setting finish_write
            # (this includes sending no request at all), nothing was added
            # to the CAS. This is client input, so it is reported as a
            # status rather than checked with an assertion.
            context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
            context.set_details("Write stream ended without finish_write")
            return response

        response.committed_size = offset
        return response
# _ContentAddressableStorageServicer()
#
# ContentAddressableStorage service of the Remote Execution API, used to
# query for missing objects and to transfer small objects in batches.
#
# Args:
#    cas: The object store; objpath(), tmpdir and add_object() are used
#    cache_cleaner (_CacheCleaner): Asked to make room before an upload is accepted
#    enable_push (bool): Whether uploads are allowed
#
class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
    def __init__(self, cas, cache_cleaner, *, enable_push):
        super().__init__()
        self.cas = cas
        self.enable_push = enable_push
        self.cache_cleaner = cache_cleaner

    # FindMissingBlobs():
    #
    # Report which of the requested digests are not present in the CAS.
    #
    # Objects which are present get their timestamps updated as a side
    # effect of the check.
    #
    def FindMissingBlobs(self, request, context):
        response = remote_execution_pb2.FindMissingBlobsResponse()
        for digest in request.blob_digests:
            objpath = self.cas.objpath(digest)
            try:
                # Touching the object doubles as the existence check
                os.utime(objpath)
            except FileNotFoundError:
                # Any other OSError is unexpected and propagates
                d = response.missing_blob_digests.add()
                d.hash = digest.hash
                d.size_bytes = digest.size_bytes

        return response

    # BatchReadBlobs():
    #
    # Return the contents of several objects in a single response.
    #
    # The sizes announced by the digests are summed up as they are
    # processed, INVALID_ARGUMENT is set as soon as the sum exceeds
    # _MAX_PAYLOAD_BYTES. Objects which are missing, or whose size on disk
    # differs from the digest, get a NOT_FOUND status in their own entry.
    #
    def BatchReadBlobs(self, request, context):
        response = remote_execution_pb2.BatchReadBlobsResponse()
        batch_size = 0

        for digest in request.digests:
            batch_size += digest.size_bytes
            if batch_size > _MAX_PAYLOAD_BYTES:
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                return response

            blob_response = response.responses.add()
            blob_response.digest.hash = digest.hash
            blob_response.digest.size_bytes = digest.size_bytes
            try:
                with open(self.cas.objpath(digest), 'rb') as f:
                    if os.fstat(f.fileno()).st_size != digest.size_bytes:
                        blob_response.status.code = grpc.StatusCode.NOT_FOUND.value[0]
                        continue

                    blob_response.data = f.read(digest.size_bytes)
            except FileNotFoundError:
                blob_response.status.code = grpc.StatusCode.NOT_FOUND.value[0]

        return response

    # BatchUpdateBlobs():
    #
    # Add several objects to the CAS in a single request.
    #
    # PERMISSION_DENIED is set when pushing is disabled, INVALID_ARGUMENT
    # as soon as the announced sizes sum up to more than _MAX_PAYLOAD_BYTES.
    # Every object gets its own entry in the response, its status is:
    #
    #    FAILED_PRECONDITION: the data does not match the size or hash of the digest
    #    RESOURCE_EXHAUSTED: the cache cleaner could not make enough room
    #
    def BatchUpdateBlobs(self, request, context):
        response = remote_execution_pb2.BatchUpdateBlobsResponse()

        if not self.enable_push:
            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
            return response

        batch_size = 0

        for blob_request in request.requests:
            digest = blob_request.digest

            batch_size += digest.size_bytes
            if batch_size > _MAX_PAYLOAD_BYTES:
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                return response

            blob_response = response.responses.add()
            blob_response.digest.hash = digest.hash
            blob_response.digest.size_bytes = digest.size_bytes

            # The per-blob status code is an integer field, so it needs the
            # numeric part of the grpc.StatusCode member and not the enum
            # member itself.
            if len(blob_request.data) != digest.size_bytes:
                blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION.value[0]
                continue

            try:
                self.cache_cleaner.clean_up(digest.size_bytes)

                with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
                    out.write(blob_request.data)
                    out.flush()
                    server_digest = self.cas.add_object(path=out.name)
                    if server_digest.hash != digest.hash:
                        blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION.value[0]

            except ArtifactTooLargeException:
                blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED.value[0]

        return response
# _CapabilitiesServicer()
#
# Capabilities service of the Remote Execution API, announcing what
# this server supports.
#
class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):

    # GetCapabilities():
    #
    # Announce SHA256 digests, batches of at most _MAX_PAYLOAD_BYTES,
    # absolute symlinks being allowed, action cache updates being disabled
    # and version 2 as the only API version.
    #
    def GetCapabilities(self, request, context):
        response = remote_execution_pb2.ServerCapabilities()

        cache = response.cache_capabilities
        cache.digest_function.append(remote_execution_pb2.SHA256)
        cache.action_cache_update_capabilities.update_enabled = False
        cache.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
        cache.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED

        # Deprecated, lowest and highest supported version are all 2
        api_versions = (response.deprecated_api_version,
                        response.low_api_version,
                        response.high_api_version)
        for api_version in api_versions:
            api_version.major = 2

        return response
# _ReferenceStorageServicer()
#
# BuildStream specific service mapping reference keys to digests.
#
# Args:
#    cas: The object store; resolve_ref(), update_tree_mtime(), remove()
#         and set_ref() are used
#    enable_push (bool): Whether references may be updated
#
class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
    def __init__(self, cas, *, enable_push):
        super().__init__()
        self.cas = cas
        self.enable_push = enable_push

    # GetReference():
    #
    # Look up the digest stored for `request.key`.
    #
    # NOT_FOUND is set when the lookup raises ArtifactError, or when
    # updating the mtime of the tree hits a missing file. In the latter
    # case the reference is removed as well.
    #
    def GetReference(self, request, context):
        response = buildstream_pb2.GetReferenceResponse()
        key = request.key

        try:
            tree = self.cas.resolve_ref(key, update_mtime=True)
            try:
                self.cas.update_tree_mtime(tree)
            except FileNotFoundError:
                # Part of the tree is gone, drop the reference
                self.cas.remove(key, defer_prune=True)
                context.set_code(grpc.StatusCode.NOT_FOUND)
            else:
                response.digest.hash = tree.hash
                response.digest.size_bytes = tree.size_bytes
        except ArtifactError:
            context.set_code(grpc.StatusCode.NOT_FOUND)

        return response

    # UpdateReference():
    #
    # Point every key in `request.keys` at `request.digest`, or set
    # PERMISSION_DENIED when pushing is disabled.
    #
    def UpdateReference(self, request, context):
        response = buildstream_pb2.UpdateReferenceResponse()

        if self.enable_push:
            new_digest = request.digest
            for ref_key in request.keys:
                self.cas.set_ref(ref_key, new_digest)
        else:
            context.set_code(grpc.StatusCode.PERMISSION_DENIED)

        return response

    # Status():
    #
    # Tell the client whether this server accepts updates.
    #
    def Status(self, request, context):
        status = buildstream_pb2.StatusResponse()
        status.allow_updates = self.enable_push
        return status
# _digest_from_download_resource_name():
#
# Parse the resource name of a download request, which is expected
# to look like `blobs/{hash}/{size}`.
#
# Args:
#    resource_name (str): The resource name sent by the client
#
# Returns:
#    The digest named by the resource, or None if the resource name is
#    malformed or names a negative size
#
def _digest_from_download_resource_name(resource_name):
    parts = resource_name.split('/')

    # Accept requests from non-conforming BuildStream 1.1.x clients
    if len(parts) == 2:
        parts.insert(0, 'blobs')

    if len(parts) != 3 or parts[0] != 'blobs':
        return None

    try:
        # The size is validated before the digest is built, no object
        # can ever have a negative size
        size_bytes = int(parts[2])
        if size_bytes < 0:
            return None

        digest = remote_execution_pb2.Digest()
        digest.hash = parts[1]
        digest.size_bytes = size_bytes
        return digest
    except ValueError:
        return None
# _digest_from_upload_resource_name():
#
# Parse the resource name of an upload request, which is expected to
# look like `uploads/{uuid}/blobs/{hash}/{size}`, optionally followed by
# further components which are ignored. The UUID must be a version 4 UUID.
#
# Args:
#    resource_name (str): The resource name sent by the client
#
# Returns:
#    The digest named by the resource, or None if the resource name is
#    malformed or names a negative size
#
def _digest_from_upload_resource_name(resource_name):
    parts = resource_name.split('/')

    # Accept requests from non-conforming BuildStream 1.1.x clients
    if len(parts) == 2:
        parts.insert(0, 'uploads')
        parts.insert(1, str(uuid.uuid4()))
        parts.insert(2, 'blobs')

    if len(parts) < 5 or parts[0] != 'uploads' or parts[2] != 'blobs':
        return None

    try:
        uuid_ = uuid.UUID(hex=parts[1])
        if uuid_.version != 4:
            return None

        # The size is later used to reserve disk space for the upload,
        # reject a negative one here instead of failing at that point
        size_bytes = int(parts[4])
        if size_bytes < 0:
            return None

        digest = remote_execution_pb2.Digest()
        digest.hash = parts[3]
        digest.size_bytes = size_bytes
        return digest
    except ValueError:
        return None
# _CacheCleaner()
#
# Makes room for incoming objects by removing the least recently
# pushed (LRP) objects from the CAS.
#
# Args:
#    cas: The object store; casdir, list_objects() and clean_up_refs_until() are used
#    max_head_size (int): Head room in bytes to aim for once a cleanup runs
#    min_head_size (int): Head room in bytes below which a cleanup is triggered
#
class _CacheCleaner:

    # Cleanups are serialized, the lock is shared by all instances
    __cleanup_cache_lock = threading.Lock()

    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
        self.__cas = cas
        self.__max_head_size = max_head_size
        self.__min_head_size = min_head_size

    # __has_space()
    #
    # Check whether an object fits on the filesystem holding the CAS
    # without eating into the minimum head room.
    #
    # Args:
    #    object_size (int): The size of the object being received in bytes
    #
    # Returns:
    #    bool: Whether the object fits into the space which is currently free
    #
    # Raises:
    #    ArtifactTooLargeException: If the object would not even fit on the
    #                               filesystem if it was empty
    #
    def __has_space(self, object_size):
        stats = os.statvfs(self.__cas.casdir)
        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size

        if object_size > total_disk_space:
            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
                                            "the filesystem which mounts the remote "
                                            "cache".format(object_size))

        return object_size <= free_disk_space

    # clean_up()
    #
    # Keep removing Least Recently Pushed (LRP) objects from the cache until
    # there is enough space for the incoming object.
    #
    # Nothing is removed if the object already fits. Otherwise, objects are
    # removed until the object fits with the maximum head room to spare, and
    # the references are cleaned up to the mtime of the last object considered.
    #
    # Args:
    #    object_size (int): The size of the object being received in bytes
    #
    # Returns:
    #    int: The total bytes removed on the filesystem
    #
    # Raises:
    #    ArtifactTooLargeException: If there is not enough room for the object,
    #                               not even after removing every listed object
    #
    def clean_up(self, object_size):
        if self.__has_space(object_size):
            return 0

        with _CacheCleaner.__cleanup_cache_lock:
            if self.__has_space(object_size):
                # Another thread has done the cleanup for us
                return 0

            stats = os.statvfs(self.__cas.casdir)
            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size

            # Obtain the LRP objects, the least recently pushed one first.
            #
            # They are consumed through an iterator: this leaves the list
            # untouched and avoids shifting the whole list for every
            # single object which gets removed.
            lrp_objects = iter(self.__cas.list_objects())

            removed_size = 0  # in bytes
            last_mtime = 0

            while object_size - removed_size > target_disk_space:
                entry = next(lrp_objects, None)
                if entry is None:
                    # There are no more objects left to remove. This means
                    # that the artifact is too large for the filesystem,
                    # so we abort the process
                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
                                                    "the filesystem which mounts the remote "
                                                    "cache".format(object_size))

                last_mtime, to_remove = entry

                try:
                    size = os.stat(to_remove).st_size
                    os.unlink(to_remove)
                    removed_size += size
                except FileNotFoundError:
                    # The object is gone already, it frees no space
                    pass

            self.__cas.clean_up_refs_until(last_mtime)

            if removed_size > 0:
                logging.info("Successfully removed {} bytes from the cache".format(removed_size))
            else:
                logging.info("No artifacts were removed from the cache.")

            return removed_size