diff options
Diffstat (limited to 'src/buildstream/_cas/casserver.py')
-rw-r--r-- | src/buildstream/_cas/casserver.py | 71 |
1 files changed, 18 insertions, 53 deletions
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py index 327b087c4..77f51256c 100644 --- a/src/buildstream/_cas/casserver.py +++ b/src/buildstream/_cas/casserver.py @@ -67,9 +67,7 @@ _MAX_PAYLOAD_BYTES = 1024 * 1024 # @contextmanager def create_server(repo, *, enable_push, quota, index_only): - cas = CASCache( - os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False - ) + cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False) try: artifactdir = os.path.join(os.path.abspath(repo), "artifacts", "refs") @@ -88,9 +86,7 @@ def create_server(repo, *, enable_push, quota, index_only): _ContentAddressableStorageServicer(cas, enable_push=enable_push), server ) - remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server( - _CapabilitiesServicer(), server - ) + remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(_CapabilitiesServicer(), server) buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server( _ReferenceStorageServicer(cas, enable_push=enable_push), server @@ -100,22 +96,13 @@ def create_server(repo, *, enable_push, quota, index_only): _ArtifactServicer(cas, artifactdir, update_cas=not index_only), server ) - source_pb2_grpc.add_SourceServiceServicer_to_server( - _SourceServicer(sourcedir), server - ) + source_pb2_grpc.add_SourceServiceServicer_to_server(_SourceServicer(sourcedir), server) # Create up reference storage and artifact capabilities - artifact_capabilities = buildstream_pb2.ArtifactCapabilities( - allow_updates=enable_push - ) - source_capabilities = buildstream_pb2.SourceCapabilities( - allow_updates=enable_push - ) + artifact_capabilities = buildstream_pb2.ArtifactCapabilities(allow_updates=enable_push) + source_capabilities = buildstream_pb2.SourceCapabilities(allow_updates=enable_push) buildstream_pb2_grpc.add_CapabilitiesServicer_to_server( - _BuildStreamCapabilitiesServicer( - artifact_capabilities, source_capabilities - ), - server, + _BuildStreamCapabilitiesServicer(artifact_capabilities, source_capabilities), server, ) yield server @@ -130,16 +117,10 @@ def create_server(repo, *, enable_push, quota, index_only): @click.option("--server-cert", help="Public server certificate for TLS (PEM-encoded)") @click.option("--client-certs", help="Public client certificates for TLS (PEM-encoded)") @click.option( - "--enable-push", - is_flag=True, - help="Allow clients to upload blobs and update artifact cache", + "--enable-push", is_flag=True, help="Allow clients to upload blobs and update artifact cache", ) @click.option( - "--quota", - type=click.INT, - default=10e9, - show_default=True, - help="Maximum disk usage in bytes", + "--quota", type=click.INT, default=10e9, show_default=True, help="Maximum disk usage in bytes", ) @click.option( "--index-only", @@ -147,31 +128,24 @@ def create_server(repo, *, enable_push, quota, index_only): help='Only provide the BuildStream artifact and source services ("index"), not the CAS ("storage")', ) @click.argument("repo") -def server_main( - repo, port, server_key, server_cert, client_certs, enable_push, quota, index_only -): +def server_main(repo, port, server_key, server_cert, client_certs, enable_push, quota, index_only): # Handle SIGTERM by calling sys.exit(0), which will raise a SystemExit exception, # properly executing cleanup code in `finally` clauses and context managers. # This is required to terminate buildbox-casd on SIGTERM. signal.signal(signal.SIGTERM, lambda signalnum, frame: sys.exit(0)) - with create_server( - repo, quota=quota, enable_push=enable_push, index_only=index_only - ) as server: + with create_server(repo, quota=quota, enable_push=enable_push, index_only=index_only) as server: use_tls = bool(server_key) if bool(server_cert) != use_tls: click.echo( - "ERROR: --server-key and --server-cert are both required for TLS", - err=True, + "ERROR: --server-key and --server-cert are both required for TLS", err=True, ) sys.exit(-1) if client_certs and not use_tls: - click.echo( - "ERROR: --client-certs can only be used with --server-key", err=True - ) + click.echo("ERROR: --client-certs can only be used with --server-key", err=True) sys.exit(-1) if use_tls: @@ -274,9 +248,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): break try: - os.posix_fallocate( - out.fileno(), 0, client_digest.size_bytes - ) + os.posix_fallocate(out.fileno(), 0, client_digest.size_bytes) break except OSError as e: # Multiple upload can happen in the same time @@ -322,9 +294,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): return response -class _ContentAddressableStorageServicer( - remote_execution_pb2_grpc.ContentAddressableStorageServicer -): +class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer): def __init__(self, cas, *, enable_push): super().__init__() self.cas = cas @@ -426,9 +396,7 @@ class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer): cache_capabilities.digest_function.append(remote_execution_pb2.SHA256) cache_capabilities.action_cache_update_capabilities.update_enabled = False cache_capabilities.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES - cache_capabilities.symlink_absolute_path_strategy = ( - remote_execution_pb2.CacheCapabilities.ALLOWED - ) + cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED response.deprecated_api_version.major = 2 response.low_api_version.major = 2 @@ -574,20 +542,17 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): directory.ParseFromString(f.read()) except FileNotFoundError: context.abort( - grpc.StatusCode.FAILED_PRECONDITION, - "Artifact {} specified but no files found".format(name), + grpc.StatusCode.FAILED_PRECONDITION, "Artifact {} specified but no files found".format(name), ) except DecodeError: context.abort( - grpc.StatusCode.FAILED_PRECONDITION, - "Artifact {} specified but directory not found".format(name), + grpc.StatusCode.FAILED_PRECONDITION, "Artifact {} specified but directory not found".format(name), ) def _check_file(self, name, digest, context): if not os.path.exists(self.cas.objpath(digest)): context.abort( - grpc.StatusCode.FAILED_PRECONDITION, - "Artifact {} specified but not found".format(name), + grpc.StatusCode.FAILED_PRECONDITION, "Artifact {} specified but not found".format(name), ) |