diff options
author | Jürg Billeter <j@bitron.ch> | 2020-07-01 12:58:11 +0200 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2020-08-13 09:24:43 +0000 |
commit | fce20bc02396071f3e77b64f45f16c8a2049571c (patch) | |
tree | 9d4dd422433cef2cbc7b65aee5ac63eee15d9ba6 /src | |
parent | fa255ad0629bd7f5977342ce8e3a7ddb3ac2166f (diff) | |
download | buildstream-fce20bc02396071f3e77b64f45f16c8a2049571c.tar.gz |
_sourcecache.py: Use AssetRemote
This migrates the source cache from the BuildStream Source protocol to
the Remote Asset API.
Diffstat (limited to 'src')
-rw-r--r-- | src/buildstream/_sourcecache.py | 127 |
1 files changed, 25 insertions, 102 deletions
diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py index 6ba7ec782..a05344de4 100644 --- a/src/buildstream/_sourcecache.py +++ b/src/buildstream/_sourcecache.py @@ -1,5 +1,5 @@ # -# Copyright (C) 2019 Bloomberg Finance LP +# Copyright (C) 2019-2020 Bloomberg Finance LP # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -20,98 +20,14 @@ import os import grpc -from ._remote import BaseRemote from ._cas.casremote import BlobNotFound from .storage._casbaseddirectory import CasBasedDirectory -from ._assetcache import AssetCache -from ._exceptions import CASError, CASRemoteError, SourceCacheError, RemoteError +from ._assetcache import AssetCache, AssetRemote +from ._exceptions import CASError, CASRemoteError, SourceCacheError from . import utils -from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, source_pb2, source_pb2_grpc +from ._protos.buildstream.v2 import source_pb2 - -class SourceRemote(BaseRemote): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.source_service = None - - def close(self): - self.source_service = None - super().close() - - def _configure_protocols(self): - # set up source service - self.source_service = source_pb2_grpc.SourceServiceStub(self.channel) - - # _check(): - # - # Check if this remote provides everything required for the - # particular kind of remote. This is expected to be called as part - # of check() - # - # Raises: - # RemoteError: If the upstream has a problem - # - def _check(self): - capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel) - - # check that the service supports sources - try: - request = buildstream_pb2.GetCapabilitiesRequest() - if self.instance_name: - request.instance_name = self.instance_name - response = capabilities_service.GetCapabilities(request) - except grpc.RpcError as e: - # Check if this remote has the artifact service - if e.code() == grpc.StatusCode.UNIMPLEMENTED: - raise RemoteError( - "Configured remote does not have the BuildStream " - "capabilities service. Please check remote configuration." - ) - raise RemoteError("Remote initialisation failed with status {}: {}".format(e.code().name, e.details())) - - if not response.source_capabilities: - raise RemoteError("Configured remote does not support source service") - - if self.spec.push and not response.source_capabilities.allow_updates: - raise RemoteError("Source server does not allow push") - - # get_source(): - # - # Get a source proto for a given source_ref from the remote. - # - # Args: - # source_ref (str): The source ref of the source to pull. - # - # Returns: - # (Source): The source proto - # - # Raises: - # grpc.RpcError: If something goes wrong during the request. - # - def get_source(self, source_ref): - request = source_pb2.GetSourceRequest() - request.cache_key = source_ref - return self.source_service.GetSource(request) - - # update_source(): - # - # Update the source on the remote. - # - # Args: - # source_ref (str): The source ref of the source to update. - # source (Source): The proto to update with. - # - # Returns: - # (bool): Whether the update was successful. - # - # Raises: - # grpc.RpcError: If something goes wrong during the request. - # - def update_source(self, source_ref, source): - request = source_pb2.UpdateSourceRequest() - request.cache_key = source_ref - request.source.CopyFrom(source) - return self.source_service.UpdateSource(request) +REMOTE_ASSET_SOURCE_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:source:{}" # Class that keeps config of remotes and deals with caching of sources. @@ -123,7 +39,7 @@ class SourceCache(AssetCache): spec_name = "source_cache_specs" config_node_name = "source-caches" - index_remote_class = SourceRemote + index_remote_class = AssetRemote def __init__(self, context): super().__init__(context) @@ -213,15 +129,15 @@ class SourceCache(AssetCache): index_remotes = self._index_remotes[project] storage_remotes = self._storage_remotes[project] - # First fetch the source proto so we know what to pull - source_proto = None + # First fetch the source directory digest so we know what to pull + source_digest = None for remote in index_remotes: try: remote.init() source.status("Pulling source {} <- {}".format(display_key, remote)) - source_proto = self._pull_source(ref, remote) - if source_proto is None: + source_digest = self._pull_source(ref, remote) + if source_digest is None: source.info( "Remote source service ({}) does not have source {} cached".format(remote, display_key) ) @@ -229,7 +145,7 @@ class SourceCache(AssetCache): except CASError as e: raise SourceCacheError("Failed to pull source {}: {}".format(display_key, e)) from e - if not source_proto: + if not source_digest: return False for remote in storage_remotes: @@ -238,8 +154,8 @@ class SourceCache(AssetCache): source.status("Pulling data for source {} <- {}".format(display_key, remote)) # Fetch source blobs - self.cas._fetch_directory(remote, source_proto.files) - required_blobs = self.cas.required_blobs_for_directory(source_proto.files) + self.cas._fetch_directory(remote, source_digest) + required_blobs = self.cas.required_blobs_for_directory(source_digest) missing_blobs = self.cas.local_missing_blobs(required_blobs) self.cas.fetch_blobs(remote, missing_blobs) @@ -336,11 +252,15 @@ class SourceCache(AssetCache): return os.path.join(self._basedir, ref) def _pull_source(self, source_ref, remote): + uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_ref) + try: remote.init() - response = remote.get_source(source_ref) - self._store_proto(response, source_ref) - return response + response = remote.fetch_directory([uri]) + if not response: + return None + self._store_source(source_ref, response.root_directory_digest) + return response.root_directory_digest except grpc.RpcError as e: if e.code() != grpc.StatusCode.NOT_FOUND: @@ -348,12 +268,15 @@ class SourceCache(AssetCache): return None def _push_source(self, source_ref, remote): + uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_ref) + try: remote.init() source_proto = self._get_source(source_ref) - return remote.update_source(source_ref, source_proto) + remote.push_directory([uri], source_proto.files) + return True except grpc.RpcError as e: if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: raise SourceCacheError("Failed to push source with status {}: {}".format(e.code().name, e.details())) - return None + return False |