-rw-r--r--   src/buildstream/_artifactcache.py | 255
-rw-r--r--   tests/testutils/artifactshare.py  |  49
2 files changed, 117 insertions, 187 deletions
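
This change drops BuildStream's bespoke ArtifactService protocol (the ArtifactRemote class removed below) in favour of the standardized Remote Asset API: the artifact proto is serialized into CAS as an ordinary blob, and index remotes map URNs to that blob's digest via the Fetch and Push services. The URN is derived from the artifact name. A minimal sketch of the mapping, where the artifact name shown is hypothetical (real names come from Element.get_artifact_name()):

    REMOTE_ASSET_ARTIFACT_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:artifact:{}"

    # Hypothetical artifact name, for illustration only
    artifact_name = "myproject/hello/0f21c26a"
    uri = REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(artifact_name)
    # -> "urn:fdc:buildstream.build:2020:artifact:myproject/hello/0f21c26a"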
diff --git a/src/buildstream/_artifactcache.py b/src/buildstream/_artifactcache.py
index ad70fad94..932db93ff 100644
--- a/src/buildstream/_artifactcache.py
+++ b/src/buildstream/_artifactcache.py
@@ -1,6 +1,6 @@
 #
 #  Copyright (C) 2017-2018 Codethink Limited
-#  Copyright (C) 2019 Bloomberg Finance LP
+#  Copyright (C) 2019-2020 Bloomberg Finance LP
 #
 #  This program is free software; you can redistribute it and/or
 #  modify it under the terms of the GNU Lesser General Public
@@ -21,112 +21,14 @@ import os
 
 import grpc
 
-from ._assetcache import AssetCache
+from ._assetcache import AssetCache, AssetRemote
 from ._cas.casremote import BlobNotFound
-from ._exceptions import ArtifactError, AssetCacheError, CASError, CASRemoteError, RemoteError
-from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, artifact_pb2, artifact_pb2_grpc
+from ._exceptions import ArtifactError, AssetCacheError, CASError, CASRemoteError
+from ._protos.buildstream.v2 import artifact_pb2
 
-from ._remote import BaseRemote
 from . import utils
 
-
-# ArtifactRemote():
-#
-# Facilitates communication with the BuildStream-specific part of
-# artifact remotes.
-#
-class ArtifactRemote(BaseRemote):
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.artifact_service = None
-
-    def close(self):
-        self.artifact_service = None
-        super().close()
-
-    # _configure_protocols():
-    #
-    # Configure the protocols used by this remote as part of the
-    # remote initialization; Note that this should only be used in
-    # Remote.init(), and is expected to fail when called by itself.
-    #
-    def _configure_protocols(self):
-        # Set up artifact stub
-        self.artifact_service = artifact_pb2_grpc.ArtifactServiceStub(self.channel)
-
-    # _check():
-    #
-    # Check if this remote provides everything required for the
-    # particular kind of remote. This is expected to be called as part
-    # of check()
-    #
-    # Raises:
-    #     RemoteError: If the upstream has a problem
-    #
-    def _check(self):
-        capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
-
-        # Check whether the server supports newer proto based artifact.
-        try:
-            request = buildstream_pb2.GetCapabilitiesRequest()
-            if self.instance_name:
-                request.instance_name = self.instance_name
-            response = capabilities_service.GetCapabilities(request)
-        except grpc.RpcError as e:
-            # Check if this remote has the artifact service
-            if e.code() == grpc.StatusCode.UNIMPLEMENTED:
-                raise RemoteError(
-                    "Configured remote does not have the BuildStream "
-                    "capabilities service. Please check remote configuration."
-                )
-            # Else raise exception with details
-            raise RemoteError("Remote initialisation failed with status {}: {}".format(e.code().name, e.details()))
-
-        if not response.artifact_capabilities:
-            raise RemoteError("Configured remote does not support artifact service")
-
-        if self.spec.push and not response.artifact_capabilities.allow_updates:
-            raise RemoteError("Artifact server does not allow push")
-
-    # get_artifact():
-    #
-    # Get an artifact proto for a given cache key from the remote.
-    #
-    # Args:
-    #     cache_key (str): The artifact cache key. NOTE: This "key"
-    #                      is actually the ref/name and its name in
-    #                      the protocol is inaccurate. You have been warned.
-    #
-    # Returns:
-    #     (Artifact): The artifact proto
-    #
-    # Raises:
-    #     grpc.RpcError: If someting goes wrong during the request.
-    #
-    def get_artifact(self, cache_key):
-        artifact_request = artifact_pb2.GetArtifactRequest()
-        artifact_request.cache_key = cache_key
-
-        return self.artifact_service.GetArtifact(artifact_request)
-
-    # update_artifact():
-    #
-    # Update an artifact with the given cache key on the remote with
-    # the given proto.
-    #
-    # Args:
-    #     cache_key (str): The artifact cache key of the artifact to update.
-    #     artifact (ArtifactProto): The artifact proto to send.
-    #
-    # Raises:
-    #     grpc.RpcError: If someting goes wrong during the request.
-    #
-    def update_artifact(self, cache_key, artifact):
-        update_request = artifact_pb2.UpdateArtifactRequest()
-        update_request.cache_key = cache_key
-        update_request.artifact.CopyFrom(artifact)
-
-        self.artifact_service.UpdateArtifact(update_request)
+REMOTE_ASSET_ARTIFACT_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:artifact:{}"
 
 
 # An ArtifactCache manages artifacts.
@@ -138,7 +40,7 @@ class ArtifactCache(AssetCache):
     spec_name = "artifact_cache_specs"
     config_node_name = "artifacts"
 
-    index_remote_class = ArtifactRemote
+    index_remote_class = AssetRemote
 
     def __init__(self, context):
         super().__init__(context)
@@ -225,14 +127,18 @@ class ArtifactCache(AssetCache):
         index_remotes = [r for r in self._index_remotes[project] if r.push]
         storage_remotes = [r for r in self._storage_remotes[project] if r.push]
 
+        artifact_proto = artifact._get_proto()
+        artifact_digest = self.cas.add_object(buffer=artifact_proto.SerializeToString())
+
         pushed = False
+
         # First push our files to all storage remotes, so that they
         # can perform file checks on their end
         for remote in storage_remotes:
             remote.init()
             element.status("Pushing data from artifact {} -> {}".format(display_key, remote))
 
-            if self._push_artifact_blobs(artifact, remote):
+            if self._push_artifact_blobs(artifact, artifact_digest, remote):
                 element.info("Pushed data from artifact {} -> {}".format(display_key, remote))
             else:
                 element.info(
@@ -245,7 +151,7 @@ class ArtifactCache(AssetCache):
             remote.init()
             element.status("Pushing artifact {} -> {}".format(display_key, remote))
 
-            if self._push_artifact_proto(element, artifact, remote):
+            if self._push_artifact_proto(element, artifact, artifact_digest, remote):
                 element.info("Pushed artifact {} -> {}".format(display_key, remote))
                 pushed = True
             else:
@@ -268,10 +174,13 @@ class ArtifactCache(AssetCache):
    #     (bool): True if pull was successful, False if artifact was not available
    #
    def pull(self, element, key, *, pull_buildtrees=False):
-        artifact = None
+        artifact_digest = None
        display_key = key[: self.context.log_key_length]
        project = element._get_project()
 
+        artifact_name = element.get_artifact_name(key=key)
+        uri = REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(artifact_name)
+
        errors = []
        # Start by pulling our artifact proto, so that we know which
        # blobs to pull
@@ -279,23 +188,24 @@ class ArtifactCache(AssetCache):
             remote.init()
             try:
                 element.status("Pulling artifact {} <- {}".format(display_key, remote))
-                artifact = self._pull_artifact_proto(element, key, remote)
-                if artifact:
+                response = remote.fetch_blob([uri])
+                if response:
+                    artifact_digest = response.blob_digest
                     break
 
                 element.info("Remote ({}) does not have artifact {} cached".format(remote, display_key))
-            except CASError as e:
+            except AssetCacheError as e:
                 element.warn("Could not pull from remote {}: {}".format(remote, e))
                 errors.append(e)
 
-        if errors and not artifact:
+        if errors and not artifact_digest:
            raise ArtifactError(
                "Failed to pull artifact {}".format(display_key), detail="\n".join(str(e) for e in errors)
            )
 
        # If we don't have an artifact, we can't exactly pull our
        # artifact
-        if not artifact:
+        if not artifact_digest:
            return False
 
        errors = []
@@ -305,7 +215,7 @@ class ArtifactCache(AssetCache):
             try:
                 element.status("Pulling data for artifact {} <- {}".format(display_key, remote))
 
-                if self._pull_artifact_storage(element, artifact, remote, pull_buildtrees=pull_buildtrees):
+                if self._pull_artifact_storage(element, key, artifact_digest, remote, pull_buildtrees=pull_buildtrees):
                     element.info("Pulled artifact {} <- {}".format(display_key, remote))
                     return True
 
@@ -483,7 +393,7 @@ class ArtifactCache(AssetCache):
    #     ArtifactError: If we fail to push blobs (*unless* they're
    #                    already there or we run out of space on the server).
    #
-    def _push_artifact_blobs(self, artifact, remote):
+    def _push_artifact_blobs(self, artifact, artifact_digest, remote):
        artifact_proto = artifact._get_proto()
 
        try:
@@ -496,7 +406,8 @@ class ArtifactCache(AssetCache):
        except FileNotFoundError:
            pass
 
-        digests = []
+        digests = [artifact_digest]
+
        if str(artifact_proto.public_data):
            digests.append(artifact_proto.public_data)
 
@@ -525,7 +436,7 @@ class ArtifactCache(AssetCache):
    # Args:
    #     element (Element): The element
    #     artifact (Artifact): The related artifact being pushed
-    #     remote (ArtifactRemote): Remote to push to
+    #     remote (AssetRemote): Remote to push to
    #
    # Returns:
    #     (bool): Whether we pushed the artifact.
@@ -534,33 +445,46 @@ class ArtifactCache(AssetCache):
    #     ArtifactError: If the push fails for any reason except the
    #                    artifact already existing.
    #
-    def _push_artifact_proto(self, element, artifact, remote):
+    def _push_artifact_proto(self, element, artifact, artifact_digest, remote):
 
        artifact_proto = artifact._get_proto()
 
        keys = list(utils._deduplicate([artifact_proto.strong_key, artifact_proto.weak_key]))
+        artifact_names = [element.get_artifact_name(key=key) for key in keys]
+        uris = [REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(artifact_name) for artifact_name in artifact_names]
 
-        pushed = False
+        try:
+            response = remote.fetch_blob(uris)
+            # Skip push if artifact is already on the server
+            if response and response.blob_digest == artifact_digest:
+                return False
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.NOT_FOUND:
+                raise ArtifactError(
+                    "Error checking artifact cache with status {}: {}".format(e.code().name, e.details())
+                )
 
-        for key in keys:
-            try:
-                remote_artifact = remote.get_artifact(element.get_artifact_name(key=key))
-                # Skip push if artifact is already on the server
-                if remote_artifact == artifact_proto:
-                    continue
-            except grpc.RpcError as e:
-                if e.code() != grpc.StatusCode.NOT_FOUND:
-                    raise ArtifactError(
-                        "Error checking artifact cache with status {}: {}".format(e.code().name, e.details())
-                    )
+        referenced_directories = []
+        if artifact_proto.files:
+            referenced_directories.append(artifact_proto.files)
+        if artifact_proto.buildtree:
+            referenced_directories.append(artifact_proto.buildtree)
+        if artifact_proto.sources:
+            referenced_directories.append(artifact_proto.sources)
 
-            try:
-                remote.update_artifact(element.get_artifact_name(key=key), artifact_proto)
-                pushed = True
-            except grpc.RpcError as e:
-                raise ArtifactError("Failed to push artifact with status {}: {}".format(e.code().name, e.details()))
+        referenced_blobs = [log_file.digest for log_file in artifact_proto.logs]
 
-        return pushed
+        try:
+            remote.push_blob(
+                uris,
+                artifact_digest,
+                references_blobs=referenced_blobs,
+                references_directories=referenced_directories,
+            )
+        except grpc.RpcError as e:
+            raise ArtifactError("Failed to push artifact with status {}: {}".format(e.code().name, e.details()))
+
+        return True
 
    # _pull_artifact_storage():
    #
@@ -579,7 +503,7 @@ class ArtifactCache(AssetCache):
    #     ArtifactError: If the pull failed for any reason except the
    #                    blobs not existing on the server.
    #
-    def _pull_artifact_storage(self, element, artifact, remote, pull_buildtrees=False):
+    def _pull_artifact_storage(self, element, key, artifact_digest, remote, pull_buildtrees=False):
        def __pull_digest(digest):
            self.cas._fetch_directory(remote, digest)
            required_blobs = self.cas.required_blobs_for_directory(digest)
@@ -587,7 +511,21 @@ class ArtifactCache(AssetCache):
             if missing_blobs:
                 self.cas.fetch_blobs(remote, missing_blobs)
 
+        artifact_name = element.get_artifact_name(key=key)
+
        try:
+            # Fetch and parse artifact proto
+            self.cas.fetch_blobs(remote, [artifact_digest])
+            artifact = artifact_pb2.Artifact()
+            with open(self.cas.objpath(artifact_digest), "rb") as f:
+                artifact.ParseFromString(f.read())
+
+            # Write the artifact proto to cache
+            artifact_path = os.path.join(self._basedir, artifact_name)
+            os.makedirs(os.path.dirname(artifact_path), exist_ok=True)
+            with utils.save_file_atomic(artifact_path, mode="wb") as f:
+                f.write(artifact.SerializeToString())
+
            if str(artifact.files):
                __pull_digest(artifact.files)
 
@@ -609,57 +547,22 @@ class ArtifactCache(AssetCache):
 
        return True
 
-    # _pull_artifact_proto():
-    #
-    # Pull an artifact proto from a remote server.
-    #
-    # Args:
-    #    element (Element): The element whose artifact to pull.
-    #    key (str): The specific key for the artifact to pull.
-    #    remote (ArtifactRemote): The remote to pull from.
-    #
-    # Returns:
-    #    (Artifact|None): The artifact proto, or None if the server
-    #                     doesn't have it.
-    #
-    # Raises:
-    #    ArtifactError: If the pull fails.
-    #
-    def _pull_artifact_proto(self, element, key, remote):
-        artifact_name = element.get_artifact_name(key=key)
-
-        try:
-            artifact = remote.get_artifact(artifact_name)
-        except grpc.RpcError as e:
-            if e.code() != grpc.StatusCode.NOT_FOUND:
-                raise ArtifactError("Failed to pull artifact with status {}: {}".format(e.code().name, e.details()))
-            return None
-
-        # Write the artifact proto to cache
-        artifact_path = os.path.join(self._basedir, artifact_name)
-        os.makedirs(os.path.dirname(artifact_path), exist_ok=True)
-        with utils.save_file_atomic(artifact_path, mode="wb") as f:
-            f.write(artifact.SerializeToString())
-
-        return artifact
-
    # _query_remote()
    #
    # Args:
    #     ref (str): The artifact ref
-    #     remote (ArtifactRemote): The remote we want to check
+    #     remote (AssetRemote): The remote we want to check
    #
    # Returns:
    #     (bool): True if the ref exists in the remote, False otherwise.
    #
    def _query_remote(self, ref, remote):
-        request = artifact_pb2.GetArtifactRequest()
-        request.cache_key = ref
+        uri = REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(ref)
+
        try:
-            remote.artifact_service.GetArtifact(request)
+            response = remote.fetch_blob([uri])
+            return bool(response)
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.NOT_FOUND:
                raise ArtifactError("Error when querying with status {}: {}".format(e.code().name, e.details()))
            return False
-
-        return True
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index e471d7989..07def5c86 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -6,14 +6,19 @@ from collections import namedtuple
 from contextlib import ExitStack, contextmanager
 from concurrent import futures
 from multiprocessing import Process, Queue
+from urllib.parse import urlparse
 
 import grpc
 
 from buildstream._cas import CASCache
 from buildstream._cas.casserver import create_server
 from buildstream._exceptions import CASError
+from buildstream._protos.build.bazel.remote.asset.v1 import remote_asset_pb2, remote_asset_pb2_grpc
 from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 from buildstream._protos.buildstream.v2 import artifact_pb2, source_pb2
+from buildstream._protos.google.rpc import code_pb2
+
+REMOTE_ASSET_ARTIFACT_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:artifact:{}"
 
 
 class BaseArtifactShare:
@@ -118,8 +123,6 @@ class ArtifactShare(BaseArtifactShare):
        #
        self.repodir = os.path.join(self.directory, "repo")
        os.makedirs(self.repodir)
-        self.artifactdir = os.path.join(self.repodir, "artifacts", "refs")
-        os.makedirs(self.artifactdir)
        self.sourcedir = os.path.join(self.repodir, "source_protos")
        os.makedirs(self.sourcedir)
 
@@ -153,16 +156,29 @@ class ArtifactShare(BaseArtifactShare):
        return os.path.exists(object_path)
 
    def get_artifact_proto(self, artifact_name):
-        artifact_proto = artifact_pb2.Artifact()
-        artifact_path = os.path.join(self.artifactdir, artifact_name)
-
+        url = urlparse(self.repo)
+        channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
        try:
-            with open(artifact_path, "rb") as f:
-                artifact_proto.ParseFromString(f.read())
-        except FileNotFoundError:
-            return None
+            fetch_service = remote_asset_pb2_grpc.FetchStub(channel)
+
+            uri = REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(artifact_name)
+
+            request = remote_asset_pb2.FetchBlobRequest()
+            request.uris.append(uri)
+
+            try:
+                response = fetch_service.FetchBlob(request)
+            except grpc.RpcError as e:
+                if e.code() == grpc.StatusCode.NOT_FOUND:
+                    return None
+                raise
+
+            if response.status.code != code_pb2.OK:
+                return None
 
-        return artifact_proto
+            return response.blob_digest
+        finally:
+            channel.close()
 
    def get_source_proto(self, source_name):
        source_proto = source_pb2.Source()
@@ -176,7 +192,7 @@ class ArtifactShare(BaseArtifactShare):
 
        return source_proto
 
-    def get_cas_files(self, artifact_proto):
+    def get_cas_files(self, artifact_proto_digest):
 
        reachable = set()
 
@@ -184,6 +200,17 @@ class ArtifactShare(BaseArtifactShare):
            self.cas._reachable_refs_dir(reachable, digest, update_mtime=False, check_exists=True)
 
        try:
+            artifact_proto_path = self.cas.objpath(artifact_proto_digest)
+            if not os.path.exists(artifact_proto_path):
+                return None
+
+            artifact_proto = artifact_pb2.Artifact()
+            try:
+                with open(artifact_proto_path, "rb") as f:
+                    artifact_proto.ParseFromString(f.read())
+            except FileNotFoundError:
+                return None
+
            if str(artifact_proto.files):
                reachable_dir(artifact_proto.files)
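
For reference, a minimal client-side sketch of the Fetch round trip used above, mirroring get_artifact_proto() in the test helper; the server address and the helper name query_artifact_digest are assumptions for illustration:

    import grpc

    from buildstream._protos.build.bazel.remote.asset.v1 import remote_asset_pb2, remote_asset_pb2_grpc
    from buildstream._protos.google.rpc import code_pb2

    REMOTE_ASSET_ARTIFACT_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:artifact:{}"

    def query_artifact_digest(server_address, artifact_name):
        # Plaintext channel for illustration; a real deployment may require TLS
        channel = grpc.insecure_channel(server_address)
        try:
            fetch_service = remote_asset_pb2_grpc.FetchStub(channel)

            request = remote_asset_pb2.FetchBlobRequest()
            request.uris.append(REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(artifact_name))

            try:
                response = fetch_service.FetchBlob(request)
            except grpc.RpcError as e:
                # A server may report a miss as NOT_FOUND on the RPC itself...
                if e.code() == grpc.StatusCode.NOT_FOUND:
                    return None
                raise

            # ...or in the status field of the response
            if response.status.code != code_pb2.OK:
                return None

            # The digest of the serialized Artifact proto in CAS
            return response.blob_digest
        finally:
            channel.close()

On the push side, _push_artifact_proto() registers the same URNs with push_blob(), passing the proto's files, buildtree and sources directory digests as references_directories and the log file digests as references_blobs, so the server can associate and retain all of the CAS content the artifact refers to.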