From 1d88533b9c3f8e70ed009171ae4f444910bf0418 Mon Sep 17 00:00:00 2001 From: Abderrahim Kitouni Date: Thu, 16 Jul 2020 20:56:15 +0100 Subject: cascache.py: allow using Remote Asset for storing refs --- buildstream/_artifactcache/cascache.py | 124 +++++++++++++++++++++++++-------- 1 file changed, 95 insertions(+), 29 deletions(-) diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index 645f2dad7..27020a099 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -31,6 +31,7 @@ import grpc from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc +from .._protos.build.bazel.remote.asset.v1 import remote_asset_pb2, remote_asset_pb2_grpc from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc from .. import utils @@ -44,6 +45,8 @@ _MAX_PAYLOAD_BYTES = 1024 * 1024 # How often is a keepalive ping sent to the server to make sure the transport is still alive _KEEPALIVE_TIME_MS = 60000 +REMOTE_ASSET_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:v1:{}" + class _Attempt(): @@ -215,16 +218,23 @@ class CASCache(): remote = CASRemote(remote_spec) remote.init() - request = buildstream_pb2.StatusRequest() - for attempt in _retry(): - with attempt: - response = remote.ref_storage.Status(request) - - if remote_spec.push and not response.allow_updates: - q.put('CAS server does not allow push') + if remote.asset_fetch_supported: + if remote_spec.push and not remote.asset_push_supported: + q.put('Remote Asset server does not allow push') + else: + # No error + q.put(None) else: - # No error - q.put(None) + request = buildstream_pb2.StatusRequest() + for attempt in _retry(): + with attempt: + response = remote.ref_storage.Status(request) + + if remote_spec.push and not response.allow_updates: + q.put('CAS server does not allow push') + else: + # No error + q.put(None) except grpc.RpcError as e: # str(e) is too verbose for errors reported to the user @@ -251,15 +261,24 @@ class CASCache(): try: remote.init() - request = buildstream_pb2.GetReferenceRequest() - request.key = ref - for attempt in _retry(): - with attempt: - response = remote.ref_storage.GetReference(request) + if remote.asset_fetch_supported: + request = remote_asset_pb2.FetchDirectoryRequest() + request.uris.append(REMOTE_ASSET_URN_TEMPLATE.format(ref)) + for attempt in _retry(): + with attempt: + response = remote.remote_asset_fetch.FetchDirectory(request) + digest = response.root_directory_digest + else: + request = buildstream_pb2.GetReferenceRequest() + request.key = ref + for attempt in _retry(): + with attempt: + response = remote.ref_storage.GetReference(request) + digest = response.digest tree = remote_execution_pb2.Digest() - tree.hash = response.digest.hash - tree.size_bytes = response.digest.size_bytes + tree.hash = digest.hash + tree.size_bytes = digest.size_bytes self._fetch_directory(remote, tree) @@ -308,13 +327,23 @@ class CASCache(): # Check whether ref is already on the server in which case # there is no need to push the ref try: - request = buildstream_pb2.GetReferenceRequest() - request.key = ref - for attempt in _retry(): - with attempt: - response = remote.ref_storage.GetReference(request) + if remote.asset_fetch_supported: + request = remote_asset_pb2.FetchDirectoryRequest() + request.uris.append(REMOTE_ASSET_URN_TEMPLATE.format(ref)) + for attempt in _retry(): + with attempt: + response = remote.remote_asset_fetch.FetchDirectory(request) + digest = response.root_directory_digest - if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes: + else: + request = buildstream_pb2.GetReferenceRequest() + request.key = ref + for attempt in _retry(): + with attempt: + response = remote.ref_storage.GetReference(request) + digest = response.digest + + if digest.hash == tree.hash and digest.size_bytes == tree.size_bytes: # ref is already on the server with the same tree continue @@ -325,13 +354,22 @@ class CASCache(): self._send_directory(remote, tree) - request = buildstream_pb2.UpdateReferenceRequest() - request.keys.append(ref) - request.digest.hash = tree.hash - request.digest.size_bytes = tree.size_bytes - for attempt in _retry(): - with attempt: - remote.ref_storage.UpdateReference(request) + if remote.asset_push_supported: + request = remote_asset_pb2.PushDirectoryRequest() + request.uris.append(REMOTE_ASSET_URN_TEMPLATE.format(ref)) + request.root_directory_digest.hash = tree.hash + request.root_directory_digest.size_bytes = tree.size_bytes + for attempt in _retry(): + with attempt: + remote.remote_asset_push.PushDirectory(request) + else: + request = buildstream_pb2.UpdateReferenceRequest() + request.keys.append(ref) + request.digest.hash = tree.hash + request.digest.size_bytes = tree.size_bytes + for attempt in _retry(): + with attempt: + remote.ref_storage.UpdateReference(request) skipped_remote = False except grpc.RpcError as e: @@ -1050,6 +1088,8 @@ class CASRemote(): self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel) self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel) self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel) + self.remote_asset_fetch = remote_asset_pb2_grpc.FetchStub(self.channel) + self.remote_asset_push = remote_asset_pb2_grpc.PushStub(self.channel) self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES try: @@ -1090,6 +1130,32 @@ class CASRemote(): e.code() != grpc.StatusCode.PERMISSION_DENIED): raise + self.asset_fetch_supported = False + try: + request = remote_asset_pb2.FetchDirectoryRequest() + for attempt in _retry(): + with attempt: + response = self.remote_asset_fetch.FetchDirectory(request) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.INVALID_ARGUMENT: + # Expected error as the request doesn't specify any URIs. + self.asset_fetch_supported = True + elif e.code() != grpc.StatusCode.UNIMPLEMENTED: + raise + + self.asset_push_supported = False + try: + request = remote_asset_pb2.PushDirectoryRequest() + for attempt in _retry(): + with attempt: + response = self.remote_asset_push.PushDirectory(request) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.INVALID_ARGUMENT: + # Expected error as the request doesn't specify any URIs. + self.asset_push_supported = True + elif e.code() != grpc.StatusCode.UNIMPLEMENTED: + raise + self._initialized = True -- cgit v1.2.1