author:    bst-marge-bot <marge-bot@buildstream.build>  2019-09-06 16:03:09 +0000
committer: bst-marge-bot <marge-bot@buildstream.build>  2019-09-06 16:03:09 +0000
commit:    20a920f4fd3641b43e714b6c9a9d57a7095c66c5
tree:      3b2cab77021916fb37fdb7eb60001bc8f8bd2608
parent:    92fedf8b7fffefb76fe9648d99ac684477b3dbd9
parent:    f40212206bb3ee3f772f9a816476d9cb10c46fca
download:  buildstream-20a920f4fd3641b43e714b6c9a9d57a7095c66c5.tar.gz
Merge branch 'tlater/cache-endpoints' into 'master'
Support separate endpoints for artifact caches
Closes #1041
See merge request BuildStream/buildstream!1540
25 files changed, 1329 insertions, 563 deletions
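The changes below let each configured cache remote act as an "index" (artifact/source protos), a "storage" (CAS blobs), or both, selected by a new `type` key on the remote spec. Before the diff itself, here is a minimal sketch of that idea, assuming stand-in `IndexRemote`/`StorageRemote` classes and a simplified spec; it mirrors the `_instantiate_remote()` logic added to `_basecache.py` but is not the BuildStream implementation itself:

```python
# Sketch only: RemoteSpec here is a simplified stand-in for the real
# namedtuple defined in _remote.py, and the two Remote classes are dummies.
from collections import namedtuple
from enum import Enum


class RemoteType(Enum):
    INDEX = "index"      # serves artifact/source metadata only
    STORAGE = "storage"  # serves CAS blobs only
    ALL = "all"          # serves both


RemoteSpec = namedtuple("RemoteSpec", "url push type")


class IndexRemote:
    def __init__(self, spec):
        self.spec = spec


class StorageRemote:
    def __init__(self, spec):
        self.spec = spec


def instantiate_remote(spec):
    # A spec of type "index" or "all" yields an index remote;
    # "storage" or "all" yields a storage remote. Either may be None.
    index = IndexRemote(spec) if spec.type in (RemoteType.INDEX, RemoteType.ALL) else None
    storage = StorageRemote(spec) if spec.type in (RemoteType.STORAGE, RemoteType.ALL) else None
    return index, storage


if __name__ == "__main__":
    spec = RemoteSpec(url="https://index.foo.com:11001", push=False, type=RemoteType.INDEX)
    index, storage = instantiate_remote(spec)
    assert index is not None and storage is None
```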
diff --git a/doc/source/format_project.rst b/doc/source/format_project.rst
index d33a4974f..c4988527a 100644
--- a/doc/source/format_project.rst
+++ b/doc/source/format_project.rst
@@ -225,6 +225,44 @@ and keys, please see: :ref:`Key pair for the server <server_authentication>`.
    new server API. As a result newer buildstream clients won't work with older
    servers.
 
+.. _project_essentials_split_artifacts:
+
+Split cache servers
+~~~~~~~~~~~~~~~~~~~
+
+Should you need to configure an artifact cache to work with a CAS
+server that does not support BuildStream's artifact format, you can
+"split" that cache and run an artifacts-only server separately. The
+format for that is as such:
+
+.. code:: yaml
+
+   #
+   # Artifacts
+   #
+   artifacts:
+     # A remote cache from which to download prebuilt artifacts
+     - url: https://storage.foo.com:11001
+       server-cert: server.crt
+       # "storage" remotes store the artifact contents only - this can
+       # be a normal CAS implementation such as those provided by
+       # Buildbarn or Bazel Buildfarm
+       type: storage
+     - url: https://index.foo.com:11001
+       server-cert: server.crt
+       # "index" remotes store only artifact metadata. This is
+       # currently only provided by the bst-artifact-server and BuildGrid
+       type: index
+     # A remote cache from which to upload/download built/prebuilt artifacts
+     - url: https://foo.com:11002
+       push: true
+       server-cert: server.crt
+       client-cert: client.crt
+       client-key: client.key
+       # Caches that support both can omit the type, or set it to "both" -
+       # currently, also only supported by bst-artifact-server and BuildGrid
+       type: both
+
 .. _project_source_cache:
 
 Source cache server
@@ -247,6 +285,11 @@ Exactly the same as artifact servers, source cache servers can be specified.
          client-cert: client.crt
          client-key: client.key
 
+.. note::
+
+   Source caches also support "splitting" like :ref:`artifact servers
+   <project_essentials_split_artifacts>`.
+
 .. _project_remote_execution:
 
 Remote execution
diff --git a/doc/source/using_configuring_cache_server.rst b/doc/source/using_configuring_cache_server.rst
index 856046f35..d31a6661c 100644
--- a/doc/source/using_configuring_cache_server.rst
+++ b/doc/source/using_configuring_cache_server.rst
@@ -159,6 +159,22 @@ Instance with push and requiring client authentication:
 
     bst-artifact-server --port 11002 --server-key server.key --server-cert server.crt --client-certs authorized.crt --enable-push /home/artifacts/artifacts
 
+.. note::
+
+   BuildStream's artifact cache is an extension of `Google's Remote
+   Execution CAS server
+   <https://github.com/bazelbuild/remote-apis/>`_.
+
+   Sometimes, when using Remote Execution, it is useful to run
+   BuildStream with just a basic CAS server, without using the
+   artifact extensions, but BuildStream still needs to store these to
+   work correctly.
+
+   For this scenario, you can add the `--index-only` flag to the above
+   commands, and configure BuildStream to store artifact metadata and
+   files in separate caches (e.g. bst-artifact-server and Buildbarn)
+   using :ref:`"types" <project_essentials_split_artifacts>`.
+ Managing the cache with systemd ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/src/buildstream/_artifactcache.py b/src/buildstream/_artifactcache.py index 3357f986a..0e2eb1091 100644 --- a/src/buildstream/_artifactcache.py +++ b/src/buildstream/_artifactcache.py @@ -25,58 +25,89 @@ from ._exceptions import ArtifactError, CASError, CASCacheError, CASRemoteError from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \ artifact_pb2, artifact_pb2_grpc -from ._cas import CASRemoteSpec, CASRemote +from ._remote import BaseRemote from .storage._casbaseddirectory import CasBasedDirectory from ._artifact import Artifact from . import utils -# An ArtifactCacheSpec holds the user configuration for a single remote -# artifact cache. +# ArtifactRemote(): # -# Args: -# url (str): Location of the remote artifact cache -# push (bool): Whether we should attempt to push artifacts to this cache, -# in addition to pulling from it. +# Facilitates communication with the BuildStream-specific part of +# artifact remotes. # -class ArtifactCacheSpec(CASRemoteSpec): - pass +class ArtifactRemote(BaseRemote): + # _configure_protocols(): + # + # Configure the protocols used by this remote as part of the + # remote initialization; Note that this should only be used in + # Remote.init(), and is expected to fail when called by itself. + # + def _configure_protocols(self): + # Add artifact stub + capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel) + # Check whether the server supports newer proto based artifact. + try: + request = buildstream_pb2.GetCapabilitiesRequest() + if self.instance_name: + request.instance_name = self.instance_name + response = capabilities_service.GetCapabilities(request) + except grpc.RpcError as e: + # Check if this remote has the artifact service + if e.code() == grpc.StatusCode.UNIMPLEMENTED: + raise ArtifactError( + "Configured remote does not have the BuildStream " + "capabilities service. Please check remote configuration.") + # Else raise exception with details + raise ArtifactError( + "Remote initialisation failed: {}".format(e.details())) -# ArtifactRemote extends CASRemote to check during initialisation that there is -# an artifact service -class ArtifactRemote(CASRemote): - def __init__(self, *args): - super().__init__(*args) - self.capabilities_service = None + if not response.artifact_capabilities: + raise ArtifactError( + "Configured remote does not support artifact service") - def init(self): - if not self._initialized: - # do default initialisation - super().init() + # get_artifact(): + # + # Get an artifact proto for a given cache key from the remote. + # + # Args: + # cache_key (str): The artifact cache key. NOTE: This "key" + # is actually the ref/name and its name in + # the protocol is inaccurate. You have been warned. + # + # Returns: + # (Artifact): The artifact proto + # + # Raises: + # grpc.RpcError: If someting goes wrong during the request. + # + def get_artifact(self, cache_key): + artifact_request = artifact_pb2.GetArtifactRequest() + artifact_request.cache_key = cache_key - # Add artifact stub - self.capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel) + artifact_service = artifact_pb2_grpc.ArtifactServiceStub(self.channel) + return artifact_service.GetArtifact(artifact_request) - # Check whether the server supports newer proto based artifact. 
- try: - request = buildstream_pb2.GetCapabilitiesRequest() - if self.instance_name: - request.instance_name = self.instance_name - response = self.capabilities_service.GetCapabilities(request) - except grpc.RpcError as e: - # Check if this remote has the artifact service - if e.code() == grpc.StatusCode.UNIMPLEMENTED: - raise ArtifactError( - "Configured remote does not have the BuildStream " - "capabilities service. Please check remote configuration.") - # Else raise exception with details - raise ArtifactError( - "Remote initialisation failed: {}".format(e.details())) + # update_artifact(): + # + # Update an artifact with the given cache key on the remote with + # the given proto. + # + # Args: + # cache_key (str): The artifact cache key of the artifact to update. + # artifact (ArtifactProto): The artifact proto to send. + # + # Raises: + # grpc.RpcError: If someting goes wrong during the request. + # + def update_artifact(self, cache_key, artifact): + update_request = artifact_pb2.UpdateArtifactRequest() + update_request.cache_key = cache_key + update_request.artifact.CopyFrom(artifact) - if not response.artifact_capabilities: - raise ArtifactError( - "Configured remote does not support artifact service") + artifact_service = artifact_pb2_grpc.ArtifactServiceStub(self.channel) + artifact_service.UpdateArtifact(update_request) # An ArtifactCache manages artifacts. @@ -86,11 +117,10 @@ class ArtifactRemote(CASRemote): # class ArtifactCache(BaseCache): - spec_class = ArtifactCacheSpec spec_name = "artifact_cache_specs" spec_error = ArtifactError config_node_name = "artifacts" - remote_class = ArtifactRemote + index_remote_class = ArtifactRemote def __init__(self, context): super().__init__(context) @@ -198,22 +228,35 @@ class ArtifactCache(BaseCache): # def push(self, element, artifact): project = element._get_project() + display_key = element._get_brief_display_key() - push_remotes = [r for r in self._remotes[project] if r.spec.push] + index_remotes = [r for r in self._index_remotes[project] if r.push] + storage_remotes = [r for r in self._storage_remotes[project] if r.push] pushed = False + # First push our files to all storage remotes, so that they + # can perform file checks on their end + for remote in storage_remotes: + remote.init() + element.status("Pushing data from artifact {} -> {}".format(display_key, remote)) - for remote in push_remotes: + if self._push_artifact_blobs(artifact, remote): + element.info("Pushed data from artifact {} -> {}".format(display_key, remote)) + else: + element.info("Remote ({}) already has all data of artifact {} cached".format( + remote, element._get_brief_display_key() + )) + + for remote in index_remotes: remote.init() - display_key = element._get_brief_display_key() - element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url)) + element.status("Pushing artifact {} -> {}".format(display_key, remote)) - if self._push_artifact(element, artifact, remote): - element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url)) + if self._push_artifact_proto(element, artifact, remote): + element.info("Pushed artifact {} -> {}".format(display_key, remote)) pushed = True else: element.info("Remote ({}) already has artifact {} cached".format( - remote.spec.url, element._get_brief_display_key() + remote, element._get_brief_display_key() )) return pushed @@ -231,26 +274,59 @@ class ArtifactCache(BaseCache): # (bool): True if pull was successful, False if artifact was not available # def pull(self, element, key, *, 
pull_buildtrees=False): + artifact = None display_key = key[:self.context.log_key_length] project = element._get_project() - for remote in self._remotes[project]: + errors = [] + # Start by pulling our artifact proto, so that we know which + # blobs to pull + for remote in self._index_remotes[project]: remote.init() try: - element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url)) - - if self._pull_artifact(element, key, remote, pull_buildtrees=pull_buildtrees): - element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url)) - # no need to pull from additional remotes - return True + element.status("Pulling artifact {} <- {}".format(display_key, remote)) + artifact = self._pull_artifact_proto(element, key, remote) + if artifact: + element.info("Pulled artifact {} <- {}".format(display_key, remote)) + break else: element.info("Remote ({}) does not have artifact {} cached".format( - remote.spec.url, display_key + remote, display_key )) + except CASError as e: + element.warn("Could not pull from remote {}: {}".format(remote, e)) + errors.append(e) + + if errors and not artifact: + raise ArtifactError("Failed to pull artifact {}".format(display_key), + detail="\n".join(str(e) for e in errors)) + + # If we don't have an artifact, we can't exactly pull our + # artifact + if not artifact: + return False + + errors = [] + # If we do, we can pull it! + for remote in self._storage_remotes[project]: + remote.init() + try: + element.status("Pulling data for artifact {} <- {}".format(display_key, remote)) + + if self._pull_artifact_storage(element, artifact, remote, pull_buildtrees=pull_buildtrees): + element.info("Pulled data for artifact {} <- {}".format(display_key, remote)) + return True + element.info("Remote ({}) does not have artifact {} cached".format( + remote, display_key + )) except CASError as e: - raise ArtifactError("Failed to pull artifact {}: {}".format( - display_key, e)) from e + element.warn("Could not pull from remote {}: {}".format(remote, e)) + errors.append(e) + + if errors: + raise ArtifactError("Failed to pull artifact {}".format(display_key), + detail="\n".join(str(e) for e in errors)) return False @@ -264,7 +340,7 @@ class ArtifactCache(BaseCache): # digest (Digest): The digest of the tree # def pull_tree(self, project, digest): - for remote in self._remotes[project]: + for remote in self._storage_remotes[project]: digest = self.cas.pull_tree(remote, digest) if digest: @@ -287,7 +363,7 @@ class ArtifactCache(BaseCache): def push_message(self, project, message): if self._has_push_remotes: - push_remotes = [r for r in self._remotes[project] if r.spec.push] + push_remotes = [r for r in self._storage_remotes[project] if r.spec.push] else: push_remotes = [] @@ -341,7 +417,7 @@ class ArtifactCache(BaseCache): # missing_blobs (list): The Digests of the blobs to fetch # def fetch_missing_blobs(self, project, missing_blobs): - for remote in self._remotes[project]: + for remote in self._index_remotes[project]: if not missing_blobs: break @@ -368,7 +444,7 @@ class ArtifactCache(BaseCache): if not missing_blobs: return [] - push_remotes = [r for r in self._remotes[project] if r.spec.push] + push_remotes = [r for r in self._storage_remotes[project] if r.spec.push] remote_missing_blobs_list = [] @@ -395,12 +471,12 @@ class ArtifactCache(BaseCache): # def check_remotes_for_element(self, element): # If there are no remotes - if not self._remotes: + if not self._index_remotes: return False project = element._get_project() ref = element.get_artifact_name() 
- for remote in self._remotes[project]: + for remote in self._index_remotes[project]: remote.init() if self._query_remote(ref, remote): @@ -412,40 +488,59 @@ class ArtifactCache(BaseCache): # Local Private Methods # ################################################ - # _push_artifact() + # _reachable_directories() # - # Pushes relevant directories and then artifact proto to remote. + # Returns: + # (iter): Iterator over directories digests available from artifacts. # - # Args: - # element (Element): The element - # artifact (Artifact): The related artifact being pushed - # remote (CASRemote): Remote to push to + def _reachable_directories(self): + for root, _, files in os.walk(self.artifactdir): + for artifact_file in files: + artifact = artifact_pb2.Artifact() + with open(os.path.join(root, artifact_file), 'r+b') as f: + artifact.ParseFromString(f.read()) + + if str(artifact.files): + yield artifact.files + + if str(artifact.buildtree): + yield artifact.buildtree + + # _reachable_digests() # # Returns: - # (bool): whether the push was successful + # (iter): Iterator over single file digests in artifacts # - def _push_artifact(self, element, artifact, remote): + def _reachable_digests(self): + for root, _, files in os.walk(self.artifactdir): + for artifact_file in files: + artifact = artifact_pb2.Artifact() + with open(os.path.join(root, artifact_file), 'r+b') as f: + artifact.ParseFromString(f.read()) - artifact_proto = artifact._get_proto() + if str(artifact.public_data): + yield artifact.public_data - keys = list(utils._deduplicate([artifact_proto.strong_key, artifact_proto.weak_key])) + for log_file in artifact.logs: + yield log_file.digest - # Check whether the artifact is on the server - present = False - for key in keys: - get_artifact = artifact_pb2.GetArtifactRequest() - get_artifact.cache_key = element.get_artifact_name(key) - try: - artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel) - artifact_service.GetArtifact(get_artifact) - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - raise ArtifactError("Error checking artifact cache: {}" - .format(e.details())) - else: - present = True - if present: - return False + # _push_artifact_blobs() + # + # Push the blobs that make up an artifact to the remote server. + # + # Args: + # artifact (Artifact): The artifact whose blobs to push. + # remote (CASRemote): The remote to push the blobs to. + # + # Returns: + # (bool) - True if we uploaded anything, False otherwise. + # + # Raises: + # ArtifactError: If we fail to push blobs (*unless* they're + # already there or we run out of space on the server). + # + def _push_artifact_blobs(self, artifact, remote): + artifact_proto = artifact._get_proto() try: self.cas._send_directory(remote, artifact_proto.files) @@ -474,33 +569,68 @@ class ArtifactCache(BaseCache): raise ArtifactError("Failed to push artifact blobs: {}".format(e.details())) return False - # finally need to send the artifact proto + return True + + # _push_artifact_proto() + # + # Pushes the artifact proto to remote. + # + # Args: + # element (Element): The element + # artifact (Artifact): The related artifact being pushed + # remote (ArtifactRemote): Remote to push to + # + # Returns: + # (bool): Whether we pushed the artifact. + # + # Raises: + # ArtifactError: If the push fails for any reason except the + # artifact already existing. 
+ # + def _push_artifact_proto(self, element, artifact, remote): + + artifact_proto = artifact._get_proto() + + keys = list(utils._deduplicate([artifact_proto.strong_key, artifact_proto.weak_key])) + + # Check whether the artifact is on the server for key in keys: - update_artifact = artifact_pb2.UpdateArtifactRequest() - update_artifact.cache_key = element.get_artifact_name(key) - update_artifact.artifact.CopyFrom(artifact_proto) + try: + remote.get_artifact(element.get_artifact_name(key=key)) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise ArtifactError("Error checking artifact cache: {}" + .format(e.details())) + else: + return False + # If not, we send the artifact proto + for key in keys: try: - artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel) - artifact_service.UpdateArtifact(update_artifact) + remote.update_artifact(element.get_artifact_name(key=key), artifact_proto) except grpc.RpcError as e: raise ArtifactError("Failed to push artifact: {}".format(e.details())) return True - # _pull_artifact() + # _pull_artifact_storage(): + # + # Pull artifact blobs from the given remote. # # Args: - # element (Element): element to pull - # key (str): specific key of element to pull - # remote (CASRemote): remote to pull from - # pull_buildtree (bool): whether to pull buildtrees or not + # element (Element): element to pull + # key (str): The specific key for the artifact to pull + # remote (CASRemote): remote to pull from + # pull_buildtree (bool): whether to pull buildtrees or not # # Returns: - # (bool): whether the pull was successful + # (bool): True if we pulled any blobs. # - def _pull_artifact(self, element, key, remote, pull_buildtrees=False): - + # Raises: + # ArtifactError: If the pull failed for any reason except the + # blobs not existing on the server. + # + def _pull_artifact_storage(self, element, artifact, remote, pull_buildtrees=False): def __pull_digest(digest): self.cas._fetch_directory(remote, digest) required_blobs = self.cas.required_blobs_for_directory(digest) @@ -508,16 +638,6 @@ class ArtifactCache(BaseCache): if missing_blobs: self.cas.fetch_blobs(remote, missing_blobs) - request = artifact_pb2.GetArtifactRequest() - request.cache_key = element.get_artifact_name(key=key) - try: - artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel) - artifact = artifact_service.GetArtifact(request) - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - raise ArtifactError("Failed to pull artifact: {}".format(e.details())) - return False - try: if str(artifact.files): __pull_digest(artifact.files) @@ -538,13 +658,41 @@ class ArtifactCache(BaseCache): raise ArtifactError("Failed to pull artifact: {}".format(e.details())) return False + return True + + # _pull_artifact_proto(): + # + # Pull an artifact proto from a remote server. + # + # Args: + # element (Element): The element whose artifact to pull. + # key (str): The specific key for the artifact to pull. + # remote (ArtifactRemote): The remote to pull from. + # + # Returns: + # (Artifact|None): The artifact proto, or None if the server + # doesn't have it. + # + # Raises: + # ArtifactError: If the pull fails. 
+ # + def _pull_artifact_proto(self, element, key, remote): + artifact_name = element.get_artifact_name(key=key) + + try: + artifact = remote.get_artifact(artifact_name) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise ArtifactError("Failed to pull artifact: {}".format(e.details())) + return None + # Write the artifact proto to cache - artifact_path = os.path.join(self.artifactdir, request.cache_key) + artifact_path = os.path.join(self.artifactdir, artifact_name) os.makedirs(os.path.dirname(artifact_path), exist_ok=True) with utils.save_file_atomic(artifact_path, mode='wb') as f: f.write(artifact.SerializeToString()) - return True + return artifact # _query_remote() # diff --git a/src/buildstream/_basecache.py b/src/buildstream/_basecache.py index 9ad6c1277..df50bfb62 100644 --- a/src/buildstream/_basecache.py +++ b/src/buildstream/_basecache.py @@ -16,21 +16,23 @@ # Authors: # Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk> # -import multiprocessing import os from fnmatch import fnmatch +from itertools import chain from typing import TYPE_CHECKING from . import utils from . import _yaml from ._cas import CASRemote from ._message import Message, MessageType -from ._exceptions import LoadError +from ._exceptions import LoadError, RemoteError +from ._remote import RemoteSpec, RemoteType + if TYPE_CHECKING: from typing import Optional, Type from ._exceptions import BstError - from ._cas import CASRemoteSpec + from ._remote import BaseRemote # Base Cache for Caches to derive from @@ -39,19 +41,20 @@ class BaseCache(): # None of these should ever be called in the base class, but this appeases # pylint to some degree - spec_class = None # type: Type[CASRemoteSpec] - spec_name = None # type: str - spec_error = None # type: Type[BstError] - config_node_name = None # type: str - remote_class = CASRemote # type: Type[CASRemote] + spec_name = None # type: str + spec_error = None # type: Type[BstError] + config_node_name = None # type: str + index_remote_class = None # type: Type[BaseRemote] + storage_remote_class = CASRemote # type: Type[BaseRemote] def __init__(self, context): self.context = context self.cas = context.get_cascache() self._remotes_setup = False # Check to prevent double-setup of remotes - # Per-project list of _CASRemote instances. - self._remotes = {} + # Per-project list of Remote instances. + self._storage_remotes = {} + self._index_remotes = {} self.global_remote_specs = [] self.project_remote_specs = {} @@ -65,7 +68,7 @@ class BaseCache(): # against fork() with open gRPC channels. # def has_open_grpc_channels(self): - for project_remotes in self._remotes.values(): + for project_remotes in chain(self._index_remotes.values(), self._storage_remotes.values()): for remote in project_remotes: if remote.channel: return True @@ -77,7 +80,7 @@ class BaseCache(): # def release_resources(self): # Close all remotes and their gRPC channels - for project_remotes in self._remotes.values(): + for project_remotes in chain(self._index_remotes.values(), self._storage_remotes.values()): for remote in project_remotes: remote.close() @@ -86,11 +89,11 @@ class BaseCache(): # Parses the configuration of remote artifact caches from a config block. # # Args: - # config_node (dict): The config block, which may contain the 'artifacts' key + # config_node (dict): The config block, which may contain a key defined by cls.config_node_name # basedir (str): The base directory for relative paths # # Returns: - # A list of ArtifactCacheSpec instances. 
+ # A list of RemoteSpec instances. # # Raises: # LoadError, if the config block contains invalid keys. @@ -106,11 +109,11 @@ class BaseCache(): artifacts = config_node.get_sequence(cls.config_node_name, default=[]) except LoadError: provenance = config_node.get_node(cls.config_node_name).get_provenance() - raise _yaml.LoadError("{}: 'artifacts' must be a single 'url:' mapping, or a list of mappings" - .format(provenance), _yaml.LoadErrorReason.INVALID_DATA) + raise _yaml.LoadError("{}: '{}' must be a single remote mapping, or a list of mappings" + .format(provenance, cls.config_node_name), _yaml.LoadErrorReason.INVALID_DATA) for spec_node in artifacts: - cache_specs.append(cls.spec_class._new_from_config_node(spec_node, basedir)) + cache_specs.append(RemoteSpec.new_from_config_node(spec_node)) return cache_specs @@ -124,7 +127,7 @@ class BaseCache(): # project (Project): The BuildStream project # # Returns: - # A list of ArtifactCacheSpec instances describing the remote artifact caches. + # A list of RemoteSpec instances describing the remote caches. # @classmethod def _configured_remote_cache_specs(cls, context, project): @@ -158,18 +161,26 @@ class BaseCache(): # the user config in some cases (for example `bst artifact push --remote=...`). has_remote_caches = False if remote_url: - # pylint: disable=not-callable - self._set_remotes([self.spec_class(remote_url, push=True)]) + self._set_remotes([RemoteSpec(remote_url, push=True)]) has_remote_caches = True if use_config: for project in self.context.get_projects(): caches = self._configured_remote_cache_specs(self.context, project) - if caches: # caches is a list of spec_class instances + if caches: # caches is a list of RemoteSpec instances self._set_remotes(caches, project=project) has_remote_caches = True if has_remote_caches: self._initialize_remotes() + # Notify remotes that forking is disabled + def notify_fork_disabled(self): + for project in self._index_remotes: + for remote in self._index_remotes[project]: + remote.notify_fork_disabled() + for project in self._storage_remotes: + for remote in self._storage_remotes[project]: + remote.notify_fork_disabled() + # initialize_remotes(): # # This will contact each remote cache. @@ -178,48 +189,31 @@ class BaseCache(): # on_failure (callable): Called if we fail to contact one of the caches. 
# def initialize_remotes(self, *, on_failure=None): - remote_specs = self.global_remote_specs.copy() - - for project in self.project_remote_specs: - remote_specs.extend(self.project_remote_specs[project]) - - remote_specs = list(utils._deduplicate(remote_specs)) - - remotes = {} - q = multiprocessing.Queue() - for remote_spec in remote_specs: - - error = self.remote_class.check_remote(remote_spec, self.cas, q) - - if error and on_failure: - on_failure(remote_spec.url, error) - continue - elif error: - raise self.spec_error(error) # pylint: disable=not-callable - - self._has_fetch_remotes = True - if remote_spec.push: - self._has_push_remotes = True - - remotes[remote_spec.url] = self.remote_class(remote_spec, self.cas) + index_remotes, storage_remotes = self._create_remote_instances(on_failure=on_failure) + # Assign remote instances to their respective projects for project in self.context.get_projects(): - remote_specs = self.global_remote_specs + # Get the list of specs that should be considered for this + # project + remote_specs = self.global_remote_specs.copy() if project in self.project_remote_specs: - remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project])) + remote_specs.extend(self.project_remote_specs[project]) - project_remotes = [] + # De-duplicate the list + remote_specs = list(utils._deduplicate(remote_specs)) - for remote_spec in remote_specs: - # Errors are already handled in the loop above, - # skip unreachable remotes here. - if remote_spec.url not in remotes: - continue + def get_remotes(remote_list, remote_specs): + for remote_spec in remote_specs: + # If a remote_spec didn't make it into the remotes + # dict, that means we can't access it, and it has been + # disabled for this session. + if remote_spec not in remote_list: + continue - remote = remotes[remote_spec.url] - project_remotes.append(remote) + yield remote_list[remote_spec] - self._remotes[project] = project_remotes + self._index_remotes[project] = list(get_remotes(index_remotes, remote_specs)) + self._storage_remotes[project] = list(get_remotes(storage_remotes, remote_specs)) # has_fetch_remotes(): # @@ -239,8 +233,9 @@ class BaseCache(): return True else: # Check whether the specified element's project has fetch remotes - remotes_for_project = self._remotes[plugin._get_project()] - return bool(remotes_for_project) + index_remotes = self._index_remotes[plugin._get_project()] + storage_remotes = self._storage_remotes[plugin._get_project()] + return index_remotes and storage_remotes # has_push_remotes(): # @@ -260,13 +255,102 @@ class BaseCache(): return True else: # Check whether the specified element's project has push remotes - remotes_for_project = self._remotes[plugin._get_project()] - return any(remote.spec.push for remote in remotes_for_project) + index_remotes = self._index_remotes[plugin._get_project()] + storage_remotes = self._storage_remotes[plugin._get_project()] + return (any(remote.spec.push for remote in index_remotes) and + any(remote.spec.push for remote in storage_remotes)) ################################################ # Local Private Methods # ################################################ + # _create_remote_instances(): + # + # Create the global set of Remote instances, including + # project-specific and global instances, ensuring that all of them + # are accessible. + # + # Args: + # on_failure (Callable[[self.remote_class,Exception],None]): + # What do do when a remote doesn't respond. 
+ # + # Returns: + # (Dict[RemoteSpec, self.remote_class], Dict[RemoteSpec, + # self.remote_class]) - + # The created remote instances, index first, storage last. + # + def _create_remote_instances(self, *, on_failure=None): + # Create a flat list of all remote specs, global or + # project-specific + remote_specs = self.global_remote_specs.copy() + for project in self.project_remote_specs: + remote_specs.extend(self.project_remote_specs[project]) + + # By de-duplicating it after we flattened the list, we ensure + # that we never instantiate the same remote twice. This + # de-duplication also preserves their order. + remote_specs = list(utils._deduplicate(remote_specs)) + + # Now let's create a dict of this, indexed by their specs, so + # that we can later assign them to the right projects. + index_remotes = {} + storage_remotes = {} + for remote_spec in remote_specs: + try: + index, storage = self._instantiate_remote(remote_spec) + except RemoteError as err: + if on_failure: + on_failure(remote_spec, str(err)) + continue + else: + raise + + # Finally, we can instantiate the remote. Note that + # NamedTuples are hashable, so we can use them as pretty + # low-overhead keys. + if index: + index_remotes[remote_spec] = index + if storage: + storage_remotes[remote_spec] = storage + + self._has_fetch_remotes = storage_remotes and index_remotes + self._has_push_remotes = (any(spec.push for spec in storage_remotes) and + any(spec.push for spec in index_remotes)) + + return index_remotes, storage_remotes + + # _instantiate_remote() + # + # Instantiate a remote given its spec, asserting that it is + # reachable - this may produce two remote instances (a storage and + # an index remote as specified by the class variables). + # + # Args: + # + # remote_spec (RemoteSpec): The spec of the remote to + # instantiate. + # + # Returns: + # + # (Tuple[Remote|None, Remote|None]) - The remotes, index remote + # first, storage remote second. One must always be specified, + # the other may be None. + # + def _instantiate_remote(self, remote_spec): + # Our remotes can be index, storage or both. 
In either case, + # we need to use a different type of Remote for our calls, so + # we create two objects here + index = None + storage = None + if remote_spec.type in [RemoteType.INDEX, RemoteType.ALL]: + index = self.index_remote_class(remote_spec) # pylint: disable=not-callable + index.check() + if remote_spec.type in [RemoteType.STORAGE, RemoteType.ALL]: + storage = self.storage_remote_class(remote_spec, self.cas) + storage.check() + + return (index, storage) + # _message() # # Local message propagator @@ -298,8 +382,8 @@ class BaseCache(): # reports takes care of messaging # def _initialize_remotes(self): - def remote_failed(url, error): - self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(url, error)) + def remote_failed(remote, error): + self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(remote.url, error)) with self.context.messenger.timed_activity("Initializing remote caches", silent_nested=True): self.initialize_remotes(on_failure=remote_failed) diff --git a/src/buildstream/_cas/__init__.py b/src/buildstream/_cas/__init__.py index a88e41371..17e3a3fd9 100644 --- a/src/buildstream/_cas/__init__.py +++ b/src/buildstream/_cas/__init__.py @@ -18,4 +18,4 @@ # Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> from .cascache import CASCache -from .casremote import CASRemote, CASRemoteSpec +from .casremote import CASRemote diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py index ba0477550..1efed22e6 100644 --- a/src/buildstream/_cas/casremote.py +++ b/src/buildstream/_cas/casremote.py @@ -1,9 +1,3 @@ -from collections import namedtuple -import os -import multiprocessing -import signal -from urllib.parse import urlparse - import grpc from .._protos.google.rpc import code_pb2 @@ -11,61 +5,14 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remo from .._protos.build.buildgrid import local_cas_pb2 from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc -from .._exceptions import CASRemoteError, LoadError, LoadErrorReason -from .. import _signals -from .. import utils +from .._remote import BaseRemote +from .._exceptions import CASRemoteError # The default limit for gRPC messages is 4 MiB. # Limit payload to 1 MiB to leave sufficient headroom for metadata. 
_MAX_PAYLOAD_BYTES = 1024 * 1024 -class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert instance_name')): - - # _new_from_config_node - # - # Creates an CASRemoteSpec() from a YAML loaded node - # - @staticmethod - def _new_from_config_node(spec_node, basedir=None): - spec_node.validate_keys(['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance-name']) - url = spec_node.get_str('url') - push = spec_node.get_bool('push', default=False) - if not url: - provenance = spec_node.get_node('url').get_provenance() - raise LoadError("{}: empty artifact cache URL".format(provenance), LoadErrorReason.INVALID_DATA) - - instance_name = spec_node.get_str('instance-name', default=None) - - server_cert = spec_node.get_str('server-cert', default=None) - if server_cert and basedir: - server_cert = os.path.join(basedir, server_cert) - - client_key = spec_node.get_str('client-key', default=None) - if client_key and basedir: - client_key = os.path.join(basedir, client_key) - - client_cert = spec_node.get_str('client-cert', default=None) - if client_cert and basedir: - client_cert = os.path.join(basedir, client_cert) - - if client_key and not client_cert: - provenance = spec_node.get_node('client-key').get_provenance() - raise LoadError("{}: 'client-key' was specified without 'client-cert'".format(provenance), - LoadErrorReason.INVALID_DATA) - - if client_cert and not client_key: - provenance = spec_node.get_node('client-cert').get_provenance() - raise LoadError("{}: 'client-cert' was specified without 'client-key'".format(provenance), - LoadErrorReason.INVALID_DATA) - - return CASRemoteSpec(url, push, server_cert, client_key, client_cert, instance_name) - - -# Disable type-checking since "Callable[...] has no attributes __defaults__" -CASRemoteSpec.__new__.__defaults__ = (None, None, None, None) # type: ignore - - class BlobNotFound(CASRemoteError): def __init__(self, blob, msg): @@ -75,13 +22,12 @@ class BlobNotFound(CASRemoteError): # Represents a single remote CAS cache. 
# -class CASRemote(): - def __init__(self, spec, cascache): - self.spec = spec - self._initialized = False +class CASRemote(BaseRemote): + + def __init__(self, spec, cascache, **kwargs): + super().__init__(spec, **kwargs) + self.cascache = cascache - self.channel = None - self.instance_name = None self.cas = None self.ref_storage = None self.batch_update_supported = None @@ -90,157 +36,102 @@ class CASRemote(): self.max_batch_total_size_bytes = None self.local_cas_instance_name = None - def init(self): - if not self._initialized: - server_cert_bytes = None - client_key_bytes = None - client_cert_bytes = None - - url = urlparse(self.spec.url) - if url.scheme == 'http': - port = url.port or 80 - self.channel = grpc.insecure_channel('{}:{}'.format(url.hostname, port)) - elif url.scheme == 'https': - port = url.port or 443 - - if self.spec.server_cert: - with open(self.spec.server_cert, 'rb') as f: - server_cert_bytes = f.read() - - if self.spec.client_key: - with open(self.spec.client_key, 'rb') as f: - client_key_bytes = f.read() - - if self.spec.client_cert: - with open(self.spec.client_cert, 'rb') as f: - client_cert_bytes = f.read() - - credentials = grpc.ssl_channel_credentials(root_certificates=server_cert_bytes, - private_key=client_key_bytes, - certificate_chain=client_cert_bytes) - self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials) - else: - raise CASRemoteError("Unsupported URL: {}".format(self.spec.url)) - - self.instance_name = self.spec.instance_name or None - - self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel) - self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel) - self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel) - - self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES - try: - request = remote_execution_pb2.GetCapabilitiesRequest() - if self.instance_name: - request.instance_name = self.instance_name - response = self.capabilities.GetCapabilities(request) - server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes - if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes: - self.max_batch_total_size_bytes = server_max_batch_total_size_bytes - except grpc.RpcError as e: - # Simply use the defaults for servers that don't implement GetCapabilities() - if e.code() != grpc.StatusCode.UNIMPLEMENTED: - raise - - # Check whether the server supports BatchReadBlobs() - self.batch_read_supported = False - try: - request = remote_execution_pb2.BatchReadBlobsRequest() - if self.instance_name: - request.instance_name = self.instance_name - response = self.cas.BatchReadBlobs(request) - self.batch_read_supported = True - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.UNIMPLEMENTED: - raise - - # Check whether the server supports BatchUpdateBlobs() - self.batch_update_supported = False - try: - request = remote_execution_pb2.BatchUpdateBlobsRequest() - if self.instance_name: - request.instance_name = self.instance_name - response = self.cas.BatchUpdateBlobs(request) - self.batch_update_supported = True - except grpc.RpcError as e: - if (e.code() != grpc.StatusCode.UNIMPLEMENTED and - e.code() != grpc.StatusCode.PERMISSION_DENIED): - raise - - local_cas = self.cascache._get_local_cas() - request = local_cas_pb2.GetInstanceNameForRemoteRequest() - request.url = self.spec.url - if self.spec.instance_name: - request.instance_name = self.spec.instance_name - if server_cert_bytes: - request.server_cert = server_cert_bytes - 
if client_key_bytes: - request.client_key = client_key_bytes - if client_cert_bytes: - request.client_cert = client_cert_bytes - response = local_cas.GetInstanceNameForRemote(request) - self.local_cas_instance_name = response.instance_name - - self._initialized = True - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - self.close() - return False - - def close(self): - if self.channel: - self.channel.close() - self.channel = None - # check_remote + # _configure_protocols(): # - # Used when checking whether remote_specs work in the buildstream main - # thread, runs this in a seperate process to avoid creation of gRPC threads - # in the main BuildStream process - # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details - @classmethod - def check_remote(cls, remote_spec, cascache, q): - - def __check_remote(): - try: - remote = cls(remote_spec, cascache) - remote.init() - - request = buildstream_pb2.StatusRequest() - response = remote.ref_storage.Status(request) - - if remote_spec.push and not response.allow_updates: - q.put('CAS server does not allow push') - else: - # No error - q.put(None) - - except grpc.RpcError as e: - # str(e) is too verbose for errors reported to the user - q.put(e.details()) + # Configure remote-specific protocols. This method should *never* + # be called outside of init(). + # + def _configure_protocols(self): + self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel) + self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel) + self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel) + + # Figure out what batch sizes the server will accept, falling + # back to our _MAX_PAYLOAD_BYTES + self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES + try: + request = remote_execution_pb2.GetCapabilitiesRequest() + if self.instance_name: + request.instance_name = self.instance_name + response = self.capabilities.GetCapabilities(request) + server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes + if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes: + self.max_batch_total_size_bytes = server_max_batch_total_size_bytes + except grpc.RpcError as e: + # Simply use the defaults for servers that don't implement + # GetCapabilities() + if e.code() != grpc.StatusCode.UNIMPLEMENTED: + raise + + # Check whether the server supports BatchReadBlobs() + self.batch_read_supported = self._check_support( + remote_execution_pb2.BatchReadBlobsRequest, + self.cas.BatchReadBlobs + ) + + # Check whether the server supports BatchUpdateBlobs() + self.batch_update_supported = self._check_support( + remote_execution_pb2.BatchUpdateBlobsRequest, + self.cas.BatchUpdateBlobs + ) + + local_cas = self.cascache._get_local_cas() + request = local_cas_pb2.GetInstanceNameForRemoteRequest() + request.url = self.spec.url + if self.spec.instance_name: + request.instance_name = self.spec.instance_name + if self.server_cert: + request.server_cert = self.server_cert + if self.client_key: + request.client_key = self.client_key + if self.client_cert: + request.client_cert = self.client_cert + response = local_cas.GetInstanceNameForRemote(request) + self.local_cas_instance_name = response.instance_name + + # _check(): + # + # Check if this remote provides everything required for the + # particular kind of remote. This is expected to be called as part + # of check(), and must be called in a non-main process. 
+ # + # Returns: + # (str|None): An error message, or None if no error message. + # + def _check(self): + request = buildstream_pb2.StatusRequest() + response = self.ref_storage.Status(request) - except Exception as e: # pylint: disable=broad-except - # Whatever happens, we need to return it to the calling process - # - q.put(str(e)) + if self.spec.push and not response.allow_updates: + return 'CAS server does not allow push' - p = multiprocessing.Process(target=__check_remote) + return None + # _check_support(): + # + # Figure out if a remote server supports a given method based on + # grpc.StatusCode.UNIMPLEMENTED and grpc.StatusCode.PERMISSION_DENIED. + # + # Args: + # request_type (callable): The type of request to check. + # invoker (callable): The remote method that will be invoked. + # + # Returns: + # (bool) - Whether the request is supported. + # + def _check_support(self, request_type, invoker): try: - # Keep SIGINT blocked in the child process - with _signals.blocked([signal.SIGINT], ignore=False): - p.start() + request = request_type() + if self.instance_name: + request.instance_name = self.instance_name + invoker(request) + return True + except grpc.RpcError as e: + if not e.code() in (grpc.StatusCode.UNIMPLEMENTED, grpc.StatusCode.PERMISSION_DENIED): + raise - error = q.get() - p.join() - except KeyboardInterrupt: - utils._kill_process_tree(p.pid) - raise - - return error + return False # push_message(): # diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py index ca7a21955..bb011146e 100644 --- a/src/buildstream/_cas/casserver.py +++ b/src/buildstream/_cas/casserver.py @@ -54,9 +54,10 @@ _MAX_PAYLOAD_BYTES = 1024 * 1024 # Args: # repo (str): Path to CAS repository # enable_push (bool): Whether to allow blob uploads and artifact updates +# index_only (bool): Whether to store CAS blobs or only artifacts # @contextmanager -def create_server(repo, *, enable_push, quota): +def create_server(repo, *, enable_push, quota, index_only): cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False) try: @@ -67,11 +68,12 @@ def create_server(repo, *, enable_push, quota): max_workers = (os.cpu_count() or 1) * 5 server = grpc.server(futures.ThreadPoolExecutor(max_workers)) - bytestream_pb2_grpc.add_ByteStreamServicer_to_server( - _ByteStreamServicer(cas, enable_push=enable_push), server) + if not index_only: + bytestream_pb2_grpc.add_ByteStreamServicer_to_server( + _ByteStreamServicer(cas, enable_push=enable_push), server) - remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server( - _ContentAddressableStorageServicer(cas, enable_push=enable_push), server) + remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server( + _ContentAddressableStorageServicer(cas, enable_push=enable_push), server) remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server( _CapabilitiesServicer(), server) @@ -80,7 +82,7 @@ def create_server(repo, *, enable_push, quota): _ReferenceStorageServicer(cas, enable_push=enable_push), server) artifact_pb2_grpc.add_ArtifactServiceServicer_to_server( - _ArtifactServicer(cas, artifactdir), server) + _ArtifactServicer(cas, artifactdir, update_cas=not index_only), server) source_pb2_grpc.add_SourceServiceServicer_to_server( _SourceServicer(sourcedir), server) @@ -110,9 +112,12 @@ def create_server(repo, *, enable_push, quota): @click.option('--quota', type=click.INT, help="Maximum disk usage in bytes", default=10e9) +@click.option('--index-only', type=click.BOOL, + help="Only provide the 
BuildStream artifact and source services (\"index\"), not the CAS (\"storage\")", + default=False) @click.argument('repo') def server_main(repo, port, server_key, server_cert, client_certs, enable_push, - quota): + quota, index_only): # Handle SIGTERM by calling sys.exit(0), which will raise a SystemExit exception, # properly executing cleanup code in `finally` clauses and context managers. # This is required to terminate buildbox-casd on SIGTERM. @@ -120,7 +125,8 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push, with create_server(repo, quota=quota, - enable_push=enable_push) as server: + enable_push=enable_push, + index_only=index_only) as server: use_tls = bool(server_key) @@ -434,10 +440,11 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): - def __init__(self, cas, artifactdir): + def __init__(self, cas, artifactdir, *, update_cas=True): super().__init__() self.cas = cas self.artifactdir = artifactdir + self.update_cas = update_cas os.makedirs(artifactdir, exist_ok=True) def GetArtifact(self, request, context): @@ -449,6 +456,20 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): with open(artifact_path, 'rb') as f: artifact.ParseFromString(f.read()) + # Artifact-only servers will not have blobs on their system, + # so we can't reasonably update their mtimes. Instead, we exit + # early, and let the CAS server deal with its blobs. + # + # FIXME: We could try to run FindMissingBlobs on the other + # server. This is tricky to do from here, of course, + # because we don't know who the other server is, but + # the client could be smart about it - but this might + # make things slower. + # + # It needs some more thought... + if not self.update_cas: + return artifact + # Now update mtimes of files present. 
try: @@ -481,16 +502,17 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): def UpdateArtifact(self, request, context): artifact = request.artifact - # Check that the files specified are in the CAS - self._check_directory("files", artifact.files, context) + if self.update_cas: + # Check that the files specified are in the CAS + self._check_directory("files", artifact.files, context) - # Unset protocol buffers don't evaluated to False but do return empty - # strings, hence str() - if str(artifact.public_data): - self._check_file("public data", artifact.public_data, context) + # Unset protocol buffers don't evaluated to False but do return empty + # strings, hence str() + if str(artifact.public_data): + self._check_file("public data", artifact.public_data, context) - for log_file in artifact.logs: - self._check_file("log digest", log_file.digest, context) + for log_file in artifact.logs: + self._check_file("log digest", log_file.digest, context) # Add the artifact proto to the cas artifact_path = os.path.join(self.artifactdir, request.cache_key) diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py index 648742dbb..947b83149 100644 --- a/src/buildstream/_exceptions.py +++ b/src/buildstream/_exceptions.py @@ -96,6 +96,7 @@ class ErrorDomain(Enum): VIRTUAL_FS = 13 CAS = 14 PROG_NOT_FOUND = 15 + REMOTE = 16 # BstError is an internal base exception class for BuildStream @@ -290,6 +291,15 @@ class ArtifactError(BstError): super().__init__(message, detail=detail, domain=ErrorDomain.ARTIFACT, reason=reason, temporary=True) +# RemoteError +# +# Raised when errors are encountered in Remotes +# +class RemoteError(BstError): + def __init__(self, message, *, detail=None, reason=None): + super().__init__(message, detail=detail, domain=ErrorDomain.REMOTE, reason=reason) + + # CASError # # Raised when errors are encountered in the CAS diff --git a/src/buildstream/_remote.py b/src/buildstream/_remote.py new file mode 100644 index 000000000..75c626c47 --- /dev/null +++ b/src/buildstream/_remote.py @@ -0,0 +1,294 @@ +# +# Copyright (C) 2019 Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# + +import multiprocessing +import os +import signal +from collections import namedtuple +from urllib.parse import urlparse + +import grpc + +from . import _signals +from . import utils +from ._exceptions import LoadError, LoadErrorReason, ImplError, RemoteError +from ._protos.google.bytestream import bytestream_pb2_grpc +from .types import FastEnum + + +# RemoteType(): +# +# Defines the different types of remote. +# +class RemoteType(FastEnum): + INDEX = "index" + STORAGE = "storage" + ALL = "all" + + def __str__(self): + return self.name.lower().replace('_', '-') + + +# RemoteSpec(): +# +# Defines the basic structure of a remote specification. 
+# +class RemoteSpec(namedtuple('RemoteSpec', 'url push server_cert client_key client_cert instance_name type')): + + # new_from_config_node + # + # Creates a RemoteSpec() from a YAML loaded node. + # + # Args: + # spec_node (MappingNode): The configuration node describing the spec. + # basedir (str): The base directory from which to find certificates. + # + # Returns: + # (RemoteSpec) - The described RemoteSpec instance. + # + # Raises: + # LoadError: If the node is malformed. + # + @classmethod + def new_from_config_node(cls, spec_node, basedir=None): + spec_node.validate_keys(['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance-name', 'type']) + + url = spec_node.get_str('url') + if not url: + provenance = spec_node.get_node('url').get_provenance() + raise LoadError("{}: empty artifact cache URL".format(provenance), LoadErrorReason.INVALID_DATA) + + push = spec_node.get_bool('push', default=False) + instance_name = spec_node.get_str('instance-name', default=None) + + def parse_cert(key): + cert = spec_node.get_str(key, default=None) + if cert and basedir: + cert = os.path.join(basedir, cert) + return cert + + cert_keys = ('server-cert', 'client-key', 'client-cert') + server_cert, client_key, client_cert = tuple(parse_cert(key) for key in cert_keys) + + if client_key and not client_cert: + provenance = spec_node.get_node('client-key').get_provenance() + raise LoadError("{}: 'client-key' was specified without 'client-cert'".format(provenance), + LoadErrorReason.INVALID_DATA) + + if client_cert and not client_key: + provenance = spec_node.get_node('client-cert').get_provenance() + raise LoadError("{}: 'client-cert' was specified without 'client-key'".format(provenance), + LoadErrorReason.INVALID_DATA) + + type_ = spec_node.get_enum('type', RemoteType, default=RemoteType.ALL) + + return cls(url, push, server_cert, client_key, client_cert, instance_name, type_) + + +# FIXME: This can be made much nicer in python 3.7 through the use of +# defaults - or hell, by replacing it all with a typing.NamedTuple +# +# Note that defaults are specified from the right, and ommitted values +# are considered mandatory. +# +# Disable type-checking since "Callable[...] has no attributes __defaults__" +RemoteSpec.__new__.__defaults__ = ( # type: ignore + # mandatory # url - The url of the remote + # mandatory # push - Whether the remote should be used for pushing + None, # server_cert - The server certificate + None, # client_key - The (private) client key + None, # client_cert - The (public) client certificate + None, # instance_name - The (grpc) instance name of the remote + RemoteType.ALL # type - The type of the remote (index, storage, both) +) + + +# BaseRemote(): +# +# Provides the basic functionality required to set up remote +# interaction via GRPC. In particular, this will set up a +# grpc.insecure_channel, or a grpc.secure_channel, based on the given +# spec. +# +# Customization for the particular protocol is expected to be +# performed in children. +# +class BaseRemote(): + key_name = None + + def __init__(self, spec): + self.spec = spec + self._initialized = False + + self.bytestream = None + self.channel = None + + self.server_cert = None + self.client_key = None + self.client_cert = None + + self.instance_name = spec.instance_name + self.push = spec.push + self.url = spec.url + + # init(): + # + # Initialize the given remote. This function must be called before + # any communication is performed, since such will otherwise fail. 
+ # + def init(self): + if self._initialized: + return + + # Set up the communcation channel + url = urlparse(self.spec.url) + if url.scheme == 'http': + port = url.port or 80 + self.channel = grpc.insecure_channel('{}:{}'.format(url.hostname, port)) + elif url.scheme == 'https': + port = url.port or 443 + try: + server_cert, client_key, client_cert = _read_files( + self.spec.server_cert, + self.spec.client_key, + self.spec.client_cert) + except FileNotFoundError as e: + raise RemoteError("Could not read certificates: {}".format(e)) from e + self.server_cert = server_cert + self.client_key = client_key + self.client_cert = client_cert + credentials = grpc.ssl_channel_credentials(root_certificates=self.server_cert, + private_key=self.client_key, + certificate_chain=self.client_cert) + self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials) + else: + raise RemoteError("Unsupported URL: {}".format(self.spec.url)) + + # Set up the bytestream on our channel + self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel) + + self._configure_protocols() + + self._initialized = True + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + return False + + def close(self): + if self.channel: + self.channel.close() + self.channel = None + + # _configure_protocols(): + # + # An abstract method to configure remote-specific protocols. This + # is *not* done as super().init() because we want to be able to + # set self._initialized *after* initialization completes in the + # parent class. + # + # This method should *never* be called outside of init(). + # + def _configure_protocols(self): + raise ImplError("An implementation of a Remote must configure its protocols.") + + # check(): + # + # Check if the remote is functional and has all the required + # capabilities. This should be used somewhat like an assertion, + # expecting a RemoteError. + # + # Note that this method runs the calls on a separate process, so + # that we can use grpc calls even if we are on the main process. + # + # Raises: + # RemoteError: If the grpc call fails. + # + def check(self): + queue = multiprocessing.Queue() + + def __check_remote(): + try: + self.init() + queue.put(self._check()) + + except grpc.RpcError as e: + # str(e) is too verbose for errors reported to the user + queue.put(e.details()) + + except Exception as e: # pylint: disable=broad-except + # Whatever happens, we need to return it to the calling process + # + queue.put(str(e)) + + process = multiprocessing.Process(target=__check_remote) + + try: + # Keep SIGINT blocked in the child process + with _signals.blocked([signal.SIGINT], ignore=False): + process.start() + + error = queue.get() + process.join() + except KeyboardInterrupt: + utils._kill_process_tree(process.pid) + raise + finally: + # Should not be necessary, but let's avoid keeping them + # alive too long + queue.close() + + if error: + raise RemoteError(error) + + # _check(): + # + # Check if this remote provides everything required for the + # particular kind of remote. This is expected to be called as part + # of check(), and must be called in a non-main process. + # + # Returns: + # (str|None): An error message, or None if no error message. + # + def _check(self): + return None + + def __str__(self): + return self.url + + +# _read_files(): +# +# A helper method to read a bunch of files, ignoring any input +# arguments that are None. +# +# Args: +# files (Iterable[str|None]): A list of files to read. 
Nones are passed back. +# +# Returns: +# Generator[str|None, None, None] - Strings read from those files. +# +def _read_files(*files): + def read_file(f): + if f: + with open(f, 'rb') as data: + return data.read() + return None + return (read_file(f) for f in files) diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py index 64498ba32..76a2e4f39 100644 --- a/src/buildstream/_sourcecache.py +++ b/src/buildstream/_sourcecache.py @@ -20,7 +20,7 @@ import os import grpc -from ._cas import CASRemote, CASRemoteSpec +from ._remote import BaseRemote from .storage._casbaseddirectory import CasBasedDirectory from ._basecache import BaseCache from ._exceptions import CASError, CASRemoteError, SourceCacheError @@ -29,53 +29,75 @@ from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \ source_pb2, source_pb2_grpc -# Holds configuration for a remote used for the source cache. -# -# Args: -# url (str): Location of the remote source cache -# push (bool): Whether we should attempt to push sources to this cache, -# in addition to pulling from it. -# instance-name (str): Name if any, of instance of server -# -class SourceCacheSpec(CASRemoteSpec): - pass - +class SourceRemote(BaseRemote): -class SourceRemote(CASRemote): - def __init__(self, *args): - super().__init__(*args) - self.capabilities_service = None + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) self.source_service = None - def init(self): - if not self._initialized: - super().init() - - self.capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel) + def _configure_protocols(self): + capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel) + # check that the service supports sources + try: + request = buildstream_pb2.GetCapabilitiesRequest() + if self.instance_name: + request.instance_name = self.instance_name - # check that the service supports sources - try: - request = buildstream_pb2.GetCapabilitiesRequest() - if self.instance_name: - request.instance_name = self.instance_name - - response = self.capabilities_service.GetCapabilities(request) - except grpc.RpcError as e: - # Check if this remote has the artifact service - if e.code() == grpc.StatusCode.UNIMPLEMENTED: - raise SourceCacheError( - "Configured remote does not have the BuildStream " - "capabilities service. Please check remote configuration.") - # Else raise exception with details + response = capabilities_service.GetCapabilities(request) + except grpc.RpcError as e: + # Check if this remote has the artifact service + if e.code() == grpc.StatusCode.UNIMPLEMENTED: raise SourceCacheError( - "Remote initialisation failed: {}".format(e.details())) + "Configured remote does not have the BuildStream " + "capabilities service. Please check remote configuration.") + # Else raise exception with details + raise SourceCacheError( + "Remote initialisation failed: {}".format(e.details())) - if not response.source_capabilities: - raise SourceCacheError( - "Configured remote does not support source service") + if not response.source_capabilities: + raise SourceCacheError( + "Configured remote does not support source service") - # set up source service - self.source_service = source_pb2_grpc.SourceServiceStub(self.channel) + # set up source service + self.source_service = source_pb2_grpc.SourceServiceStub(self.channel) + + # get_source(): + # + # Get a source proto for a given source_ref from the remote. + # + # Args: + # source_ref (str): The source ref of the source to pull. 
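Both get_source() here and the update_source() that follows are thin wrappers over the source service; a hypothetical helper (refresh_source, remote and source_ref are placeholder names) showing the round-trip against an initialised SourceRemote:

    def refresh_source(remote, source_ref):
        # 'remote' is an init()-ed SourceRemote, 'source_ref' a cache-key
        # string; both calls may raise grpc.RpcError, which the
        # SourceCache._pull_source()/_push_source() wrappers below catch.
        source_proto = remote.get_source(source_ref)
        # ...refresh or rebuild the proto locally if needed...
        remote.update_source(source_ref, source_proto)
        return source_proto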
+ # + # Returns: + # (Source): The source proto + # + # Raises: + # grpc.RpcError: If something goes wrong during the request. + # + def get_source(self, source_ref): + request = source_pb2.GetSourceRequest() + request.cache_key = source_ref + return self.source_service.GetSource(request) + + # update_source(): + # + # Update the source on the remote. + # + # Args: + # source_ref (str): The source ref of the source to update. + # source (Source): The proto to update with. + # + # Returns: + # (bool): Whether the update was successful. + # + # Raises: + # grpc.RpcError: If something goes wrong during the request. + # + def update_source(self, source_ref, source): + request = source_pb2.UpdateSourceRequest() + request.cache_key = source_ref + request.source.CopyFrom(source) + return self.source_service.UpdateSource(request) # Class that keeps config of remotes and deals with caching of sources. @@ -85,11 +107,10 @@ class SourceRemote(CASRemote): # class SourceCache(BaseCache): - spec_class = SourceCacheSpec spec_name = "source_cache_specs" spec_error = SourceCacheError config_node_name = "source-caches" - remote_class = SourceRemote + index_remote_class = SourceRemote def __init__(self, context): super().__init__(context) @@ -183,39 +204,53 @@ class SourceCache(BaseCache): # (bool): True if pull successful, False if not def pull(self, source): ref = source._get_source_name() - project = source._get_project() - display_key = source._get_brief_display_key() - for remote in self._remotes[project]: + index_remotes = self._index_remotes[project] + storage_remotes = self._storage_remotes[project] + + # First fetch the source proto so we know what to pull + source_proto = None + for remote in index_remotes: try: - source.status("Pulling source {} <- {}".format(display_key, remote.spec.url)) + remote.init() + source.status("Pulling source {} <- {}".format(display_key, remote)) - # fetch source proto - response = self._pull_source(ref, remote) - if response is None: + source_proto = self._pull_source(ref, remote) + if source_proto is None: source.info("Remote source service ({}) does not have source {} cached".format( - remote.spec.url, display_key)) + remote, display_key)) continue + except CASError as e: + raise SourceCacheError("Failed to pull source {}: {}".format( + display_key, e)) from e + + if not source_proto: + return False + + for remote in storage_remotes: + try: + remote.init() + source.status("Pulling data for source {} <- {}".format(display_key, remote)) # Fetch source blobs - self.cas._fetch_directory(remote, response.files) - required_blobs = self.cas.required_blobs_for_directory(response.files) + self.cas._fetch_directory(remote, source_proto.files) + required_blobs = self.cas.required_blobs_for_directory(source_proto.files) missing_blobs = self.cas.local_missing_blobs(required_blobs) missing_blobs = self.cas.fetch_blobs(remote, missing_blobs) if missing_blobs: source.info("Remote cas ({}) does not have source {} cached".format( - remote.spec.url, display_key)) + remote, display_key)) continue - source.info("Pulled source {} <- {}".format(display_key, remote.spec.url)) + source.info("Pulled source {} <- {}".format(display_key, remote)) return True - except CASError as e: raise SourceCacheError("Failed to pull source {}: {}".format( display_key, e)) from e + return False # push() @@ -232,41 +267,48 @@ class SourceCache(BaseCache): ref = source._get_source_name() project = source._get_project() + index_remotes = [] + storage_remotes = [] + # find configured push remotes for this 
source if self._has_push_remotes: - push_remotes = [r for r in self._remotes[project] if r.spec.push] - else: - push_remotes = [] + index_remotes = [r for r in self._index_remotes[project] if r.push] + storage_remotes = [r for r in self._storage_remotes[project] if r.push] - pushed = False + pushed_storage = False + pushed_index = False display_key = source._get_brief_display_key() - for remote in push_remotes: + for remote in storage_remotes: remote.init() - source.status("Pushing source {} -> {}".format(display_key, remote.spec.url)) - - # check whether cache has files already - if self._pull_source(ref, remote) is not None: - source.info("Remote ({}) already has source {} cached" - .format(remote.spec.url, display_key)) - continue + source.status("Pushing data for source {} -> {}".format(display_key, remote)) - # push files to storage source_proto = self._get_source(ref) try: self.cas._send_directory(remote, source_proto.files) + pushed_storage = True except CASRemoteError: - source.info("Failed to push source files {} -> {}".format(display_key, remote.spec.url)) + source.info("Failed to push source files {} -> {}".format(display_key, remote)) + continue + + for remote in index_remotes: + remote.init() + source.status("Pushing source {} -> {}".format(display_key, remote)) + + # check whether cache has files already + if self._pull_source(ref, remote) is not None: + source.info("Remote ({}) already has source {} cached" + .format(remote, display_key)) continue if not self._push_source(ref, remote): - source.info("Failed to push source metadata {} -> {}".format(display_key, remote.spec.url)) + source.info("Failed to push source metadata {} -> {}".format(display_key, remote)) continue - source.info("Pushed source {} -> {}".format(display_key, remote.spec.url)) - pushed = True + source.info("Pushed source {} -> {}".format(display_key, remote)) + pushed_index = True - return pushed + return pushed_index and pushed_storage def _remove_source(self, ref, *, defer_prune=False): return self.cas.remove(ref, basedir=self.sourcerefdir, defer_prune=defer_prune) @@ -315,14 +357,8 @@ class SourceCache(BaseCache): def _pull_source(self, source_ref, remote): try: remote.init() - - request = source_pb2.GetSourceRequest() - request.cache_key = source_ref - - response = remote.source_service.GetSource(request) - + response = remote.get_source(source_ref) self._store_proto(response, source_ref) - return response except grpc.RpcError as e: @@ -333,14 +369,8 @@ class SourceCache(BaseCache): def _push_source(self, source_ref, remote): try: remote.init() - source_proto = self._get_source(source_ref) - - request = source_pb2.UpdateSourceRequest() - request.cache_key = source_ref - request.source.CopyFrom(source_proto) - - return remote.source_service.UpdateSource(request) + return remote.update_source(source_ref, source_proto) except grpc.RpcError as e: if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index cf034dbf9..678b11c32 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -37,7 +37,8 @@ from .._protos.google.rpc import code_pb2 from .._exceptions import BstError, SandboxError from .. 
import _yaml from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc -from .._cas import CASRemote, CASRemoteSpec +from .._cas import CASRemote +from .._remote import RemoteSpec class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service action_service')): @@ -98,11 +99,11 @@ class SandboxRemote(Sandbox): self.exec_instance = config.exec_service.get('instance-name', None) self.storage_instance = config.storage_service.get('instance-name', None) - self.storage_remote_spec = CASRemoteSpec(self.storage_url, push=True, - server_cert=config.storage_service.get('server-cert'), - client_key=config.storage_service.get('client-key'), - client_cert=config.storage_service.get('client-cert'), - instance_name=self.storage_instance) + self.storage_remote_spec = RemoteSpec(self.storage_url, push=True, + server_cert=config.storage_service.get('server-cert'), + client_key=config.storage_service.get('client-key'), + client_cert=config.storage_service.get('client-cert'), + instance_name=self.storage_instance) self.operation_name = None def info(self, msg): diff --git a/tests/artifactcache/config.py b/tests/artifactcache/config.py index 08d6f74bb..8b01a9ebe 100644 --- a/tests/artifactcache/config.py +++ b/tests/artifactcache/config.py @@ -6,7 +6,8 @@ import os import pytest -from buildstream._artifactcache import ArtifactCacheSpec, ArtifactCache +from buildstream._remote import RemoteSpec, RemoteType +from buildstream._artifactcache import ArtifactCache from buildstream._project import Project from buildstream.utils import _deduplicate from buildstream import _yaml @@ -18,17 +19,33 @@ from tests.testutils import dummy_context DATA_DIR = os.path.dirname(os.path.realpath(__file__)) -cache1 = ArtifactCacheSpec(url='https://example.com/cache1', push=True) -cache2 = ArtifactCacheSpec(url='https://example.com/cache2', push=False) -cache3 = ArtifactCacheSpec(url='https://example.com/cache3', push=False) -cache4 = ArtifactCacheSpec(url='https://example.com/cache4', push=False) -cache5 = ArtifactCacheSpec(url='https://example.com/cache5', push=False) -cache6 = ArtifactCacheSpec(url='https://example.com/cache6', push=True) +cache1 = RemoteSpec(url='https://example.com/cache1', push=True) +cache2 = RemoteSpec(url='https://example.com/cache2', push=False) +cache3 = RemoteSpec(url='https://example.com/cache3', push=False) +cache4 = RemoteSpec(url='https://example.com/cache4', push=False) +cache5 = RemoteSpec(url='https://example.com/cache5', push=False) +cache6 = RemoteSpec(url='https://example.com/cache6', + push=True, + type=RemoteType.ALL) +cache7 = RemoteSpec(url='https://index.example.com/cache1', + push=True, + type=RemoteType.INDEX) +cache8 = RemoteSpec(url='https://storage.example.com/cache1', + push=True, + type=RemoteType.STORAGE) # Generate cache configuration fragments for the user config and project config files. 
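For the new split fixtures (cache7 and cache8 above), the configure_remote_caches() helper below emits artifact entries that carry an explicit type field; the split-caches override case, for example, produces a project override fragment along these lines:

    user_config = {
        'projects': {
            'test': {
                'artifacts': [
                    {'url': 'https://index.example.com/cache1',
                     'push': True,
                     'type': 'index'},
                    {'url': 'https://storage.example.com/cache1',
                     'push': True,
                     'type': 'storage'},
                ]
            }
        }
    }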
# -def configure_remote_caches(override_caches, project_caches=None, user_caches=None): +def configure_remote_caches(override_caches, + project_caches=None, + user_caches=None): + type_strings = { + RemoteType.INDEX: 'index', + RemoteType.STORAGE: 'storage', + RemoteType.ALL: 'all' + } + if project_caches is None: project_caches = [] @@ -40,10 +57,15 @@ def configure_remote_caches(override_caches, project_caches=None, user_caches=No user_config['artifacts'] = { 'url': user_caches[0].url, 'push': user_caches[0].push, + 'type': type_strings[user_caches[0].type] } elif len(user_caches) > 1: user_config['artifacts'] = [ - {'url': cache.url, 'push': cache.push} for cache in user_caches + { + 'url': cache.url, + 'push': cache.push, + 'type': type_strings[cache.type] + } for cache in user_caches ] if len(override_caches) == 1: @@ -52,6 +74,7 @@ def configure_remote_caches(override_caches, project_caches=None, user_caches=No 'artifacts': { 'url': override_caches[0].url, 'push': override_caches[0].push, + 'type': type_strings[override_caches[0].type] } } } @@ -59,7 +82,11 @@ def configure_remote_caches(override_caches, project_caches=None, user_caches=No user_config['projects'] = { 'test': { 'artifacts': [ - {'url': cache.url, 'push': cache.push} for cache in override_caches + { + 'url': cache.url, + 'push': cache.push, + 'type': type_strings[cache.type] + } for cache in override_caches ] } } @@ -71,12 +98,17 @@ def configure_remote_caches(override_caches, project_caches=None, user_caches=No 'artifacts': { 'url': project_caches[0].url, 'push': project_caches[0].push, + 'type': type_strings[project_caches[0].type], } }) elif len(project_caches) > 1: project_config.update({ 'artifacts': [ - {'url': cache.url, 'push': cache.push} for cache in project_caches + { + 'url': cache.url, + 'push': cache.push, + 'type': type_strings[cache.type] + } for cache in project_caches ] }) @@ -95,6 +127,7 @@ def configure_remote_caches(override_caches, project_caches=None, user_caches=No pytest.param([cache1], [cache2], [cache3], id='project-override-in-user-config'), pytest.param([cache1, cache2], [cache3, cache4], [cache5, cache6], id='list-order'), pytest.param([cache1, cache2, cache1], [cache2], [cache2, cache1], id='duplicates'), + pytest.param([cache7, cache8], [], [cache1], id='split-caches') ]) def test_artifact_cache_precedence(tmpdir, override_caches, project_caches, user_caches): # Produce a fake user and project config with the cache configuration. @@ -148,3 +181,36 @@ def test_missing_certs(cli, datafiles, config_key, config_value): # This does not happen for a simple `bst show`. result = cli.run(project=project, args=['artifact', 'pull', 'element.bst']) result.assert_main_error(ErrorDomain.LOAD, LoadErrorReason.INVALID_DATA) + + +# Assert that BuildStream complains when someone attempts to define +# only one type of storage. +@pytest.mark.datafiles(DATA_DIR) +@pytest.mark.parametrize( + 'override_caches, project_caches, user_caches', + [ + # The leftmost cache is the highest priority one in all cases here. 
+ pytest.param([], [], [cache7], id='index-user'), + pytest.param([], [], [cache8], id='storage-user'), + pytest.param([], [cache7], [], id='index-project'), + pytest.param([], [cache8], [], id='storage-project'), + pytest.param([cache7], [], [], id='index-override'), + pytest.param([cache8], [], [], id='storage-override'), + ]) +def test_only_one(cli, datafiles, override_caches, project_caches, user_caches): + project = os.path.join(datafiles.dirname, datafiles.basename, 'only-one') + + # Produce a fake user and project config with the cache configuration. + user_config, project_config = configure_remote_caches(override_caches, project_caches, user_caches) + project_config['name'] = 'test' + + cli.configure(user_config) + + project_config_file = os.path.join(project, 'project.conf') + _yaml.roundtrip_dump(project_config, file=project_config_file) + + # Use `pull` here to ensure we try to initialize the remotes, triggering the error + # + # This does not happen for a simple `bst show`. + result = cli.run(project=project, args=['artifact', 'pull', 'element.bst']) + result.assert_main_error(ErrorDomain.STREAM, None) diff --git a/tests/artifactcache/only-one/element.bst b/tests/artifactcache/only-one/element.bst new file mode 100644 index 000000000..3c29b4ea1 --- /dev/null +++ b/tests/artifactcache/only-one/element.bst @@ -0,0 +1 @@ +kind: autotools diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py index 71db3e338..2e33af3ac 100644 --- a/tests/artifactcache/pull.py +++ b/tests/artifactcache/pull.py @@ -66,7 +66,7 @@ def test_pull(cli, tmpdir, datafiles): # Assert that we are now cached locally assert cli.get_element_state(project_dir, 'target.bst') == 'cached' # Assert that we shared/pushed the cached artifact - assert share.has_artifact(cli.get_artifact_name(project_dir, 'test', 'target.bst')) + assert share.get_artifact(cli.get_artifact_name(project_dir, 'test', 'target.bst')) # Delete the artifact locally cli.remove_artifact_from_cache(project_dir, 'target.bst') @@ -138,7 +138,7 @@ def test_pull_tree(cli, tmpdir, datafiles): # Assert that we are now cached locally assert cli.get_element_state(project_dir, 'target.bst') == 'cached' # Assert that we shared/pushed the cached artifact - assert share.has_artifact(cli.get_artifact_name(project_dir, 'test', 'target.bst')) + assert share.get_artifact(cli.get_artifact_name(project_dir, 'test', 'target.bst')) with dummy_context(config=user_config_file) as context: # Load the project and CAS cache diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py index 20d9ccfec..8b00d0fb7 100644 --- a/tests/artifactcache/push.py +++ b/tests/artifactcache/push.py @@ -10,7 +10,7 @@ from buildstream._project import Project from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 from buildstream.testing import cli # pylint: disable=unused-import -from tests.testutils import create_artifact_share, dummy_context +from tests.testutils import create_artifact_share, create_split_share, dummy_context # Project directory @@ -20,6 +20,39 @@ DATA_DIR = os.path.join( ) +# Push the given element and return its artifact key for assertions. 
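Back in tests/artifactcache/config.py, test_only_one asserts that declaring only one half of a split cache is rejected when the remotes are initialised; in user-configuration terms the failing shape is simply:

    # Only an index endpoint, with no matching 'storage' (or 'all') remote:
    # remote initialisation fails, so e.g. `bst artifact pull element.bst`
    # aborts with an ErrorDomain.STREAM error.
    user_config = {
        'artifacts': [
            {'url': 'https://index.example.com/cache1',
             'push': True,
             'type': 'index'},
        ]
    }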
+def _push(cli, cache_dir, project_dir, config_file, target): + with dummy_context(config=config_file) as context: + # Load the project manually + project = Project(project_dir, context) + project.ensure_fully_loaded() + + # Assert that the element's artifact is cached + element = project.load_elements(['target.bst'])[0] + element_key = cli.get_element_key(project_dir, 'target.bst') + assert cli.artifact.is_cached(cache_dir, element, element_key) + + # Create a local artifact cache handle + artifactcache = context.artifactcache + + # Ensure the element's artifact memeber is initialised + # This is duplicated from Pipeline.resolve_elements() + # as this test does not use the cli frontend. + for e in element.dependencies(Scope.ALL): + # Determine initial element state. + e._update_state() + + # Manually setup the CAS remotes + artifactcache.setup_remotes(use_config=True) + artifactcache.initialize_remotes() + + assert artifactcache.has_push_remotes(plugin=element), \ + "No remote configured for element target.bst" + assert element._push(), "Push operation failed" + + return element_key + + @pytest.mark.in_subprocess @pytest.mark.datafiles(DATA_DIR) def test_push(cli, tmpdir, datafiles): @@ -50,36 +83,52 @@ def test_push(cli, tmpdir, datafiles): # Write down the user configuration file _yaml.roundtrip_dump(user_config, file=user_config_file) + element_key = _push(cli, rootcache_dir, project_dir, user_config_file, 'target.bst') + assert share.get_artifact(cli.get_artifact_name(project_dir, 'test', 'target.bst', cache_key=element_key)) - with dummy_context(config=user_config_file) as context: - # Load the project manually - project = Project(project_dir, context) - project.ensure_fully_loaded() - # Assert that the element's artifact is cached - element = project.load_elements(['target.bst'])[0] - element_key = cli.get_element_key(project_dir, 'target.bst') - assert cli.artifact.is_cached(rootcache_dir, element, element_key) +@pytest.mark.in_subprocess +@pytest.mark.datafiles(DATA_DIR) +def test_push_split(cli, tmpdir, datafiles): + project_dir = str(datafiles) - # Create a local artifact cache handle - artifactcache = context.artifactcache + # First build the project without the artifact cache configured + result = cli.run(project=project_dir, args=['build', 'target.bst']) + result.assert_success() - # Ensure the element's artifact memeber is initialised - # This is duplicated from Pipeline.resolve_elements() - # as this test does not use the cli frontend. - for e in element.dependencies(Scope.ALL): - # Determine initial element state. - e._update_state() + # Assert that we are now cached locally + assert cli.get_element_state(project_dir, 'target.bst') == 'cached' - # Manually setup the CAS remotes - artifactcache.setup_remotes(use_config=True) - artifactcache.initialize_remotes() + indexshare = os.path.join(str(tmpdir), 'indexshare') + storageshare = os.path.join(str(tmpdir), 'storageshare') - assert artifactcache.has_push_remotes(plugin=element), \ - "No remote configured for element target.bst" - assert element._push(), "Push operation failed" + # Set up an artifact cache. 
+ with create_split_share(indexshare, storageshare) as (index, storage): + rootcache_dir = os.path.join(str(tmpdir), 'cache') + user_config = { + 'scheduler': { + 'pushers': 1 + }, + 'artifacts': [{ + 'url': index.repo, + 'push': True, + 'type': 'index' + }, { + 'url': storage.repo, + 'push': True, + 'type': 'storage' + }], + 'cachedir': rootcache_dir + } + config_path = str(tmpdir.join('buildstream.conf')) + _yaml.roundtrip_dump(user_config, file=config_path) - assert share.has_artifact(cli.get_artifact_name(project_dir, 'test', 'target.bst', cache_key=element_key)) + element_key = _push(cli, rootcache_dir, project_dir, config_path, 'target.bst') + proto = index.get_artifact_proto(cli.get_artifact_name(project_dir, + 'test', + 'target.bst', + cache_key=element_key)) + assert storage.get_cas_files(proto) is not None @pytest.mark.in_subprocess @@ -88,7 +137,8 @@ def test_push_message(tmpdir, datafiles): project_dir = str(datafiles) # Set up an artifact cache. - with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share: + artifactshare = os.path.join(str(tmpdir), 'artifactshare') + with create_artifact_share(artifactshare) as share: # Configure artifact share rootcache_dir = os.path.join(str(tmpdir), 'cache') user_config_file = str(tmpdir.join('buildstream.conf')) diff --git a/tests/frontend/artifact_delete.py b/tests/frontend/artifact_delete.py index 80870c81a..a9f5ec6da 100644 --- a/tests/frontend/artifact_delete.py +++ b/tests/frontend/artifact_delete.py @@ -159,7 +159,7 @@ def test_artifact_delete_pulled_artifact_without_buildtree(cli, tmpdir, datafile result.assert_success() # Make sure it's in the share - assert remote.has_artifact(cli.get_artifact_name(project, 'test', element)) + assert remote.get_artifact(cli.get_artifact_name(project, 'test', element)) # Delete and then pull the artifact (without its buildtree) result = cli.run(project=project, args=['artifact', 'delete', element]) diff --git a/tests/frontend/artifact_show.py b/tests/frontend/artifact_show.py index 913dde9c8..76ea93d63 100644 --- a/tests/frontend/artifact_show.py +++ b/tests/frontend/artifact_show.py @@ -124,7 +124,7 @@ def test_artifact_show_element_available_remotely(cli, tmpdir, datafiles): result.assert_success() # Make sure it's in the share - assert remote.has_artifact(cli.get_artifact_name(project, 'test', element)) + assert remote.get_artifact(cli.get_artifact_name(project, 'test', element)) # Delete the artifact from the local cache result = cli.run(project=project, args=['artifact', 'delete', element]) diff --git a/tests/frontend/push.py b/tests/frontend/push.py index 4f0fa3c19..31f96cbdf 100644 --- a/tests/frontend/push.py +++ b/tests/frontend/push.py @@ -464,7 +464,16 @@ def test_artifact_too_large(cli, datafiles, tmpdir): # Create and try to push a 6MB element. create_element_size('large_element.bst', project, element_path, [], int(6e6)) result = cli.run(project=project, args=['build', 'large_element.bst']) - result.assert_success() + # This should fail; the server will refuse to store the CAS + # blobs for the artifact, and then fail to find the files for + # the uploaded artifact proto. + # + # FIXME: This should be extremely uncommon in practice, since + # the artifact needs to be at least half the cache size for + # this to happen. Nonetheless, a nicer error message would be + # nice (perhaps we should just disallow uploading artifacts + # that large). 
+ result.assert_main_error(ErrorDomain.STREAM, None) # Ensure that the small artifact is still in the share states = cli.get_element_states(project, ['small_element.bst', 'large_element.bst']) @@ -560,7 +569,7 @@ def test_push_cross_junction(cli, tmpdir, datafiles): cli.run(project=project, args=['artifact', 'push', 'junction.bst:import-etc.bst']) cache_key = cli.get_element_key(project, 'junction.bst:import-etc.bst') - assert share.has_artifact(cli.get_artifact_name(project, 'subtest', 'import-etc.bst', cache_key=cache_key)) + assert share.get_artifact(cli.get_artifact_name(project, 'subtest', 'import-etc.bst', cache_key=cache_key)) @pytest.mark.datafiles(DATA_DIR) diff --git a/tests/integration/artifact.py b/tests/integration/artifact.py index 4180bf6fd..59b7bfaad 100644 --- a/tests/integration/artifact.py +++ b/tests/integration/artifact.py @@ -70,12 +70,12 @@ def test_cache_buildtrees(cli, tmpdir, datafiles): result = cli.run(project=project, args=['build', element_name]) assert result.exit_code == 0 assert cli.get_element_state(project, element_name) == 'cached' - assert share1.has_artifact(cli.get_artifact_name(project, 'test', element_name)) + assert share1.get_artifact(cli.get_artifact_name(project, 'test', element_name)) # The buildtree dir should not exist, as we set the config to not cache buildtrees. artifact_name = cli.get_artifact_name(project, 'test', element_name) - assert share1.has_artifact(artifact_name) + assert share1.get_artifact(artifact_name) with cli.artifact.extract_buildtree(cwd, cwd, artifact_name) as buildtreedir: assert not buildtreedir @@ -111,7 +111,7 @@ def test_cache_buildtrees(cli, tmpdir, datafiles): result = cli.run(project=project, args=['--cache-buildtrees', 'always', 'build', element_name]) assert result.exit_code == 0 assert cli.get_element_state(project, element_name) == 'cached' - assert share2.has_artifact(cli.get_artifact_name(project, 'test', element_name)) + assert share2.get_artifact(cli.get_artifact_name(project, 'test', element_name)) # Cache key will be the same however the digest hash will have changed as expected, so reconstruct paths with cli.artifact.extract_buildtree(cwd, cwd, artifact_name) as buildtreedir: diff --git a/tests/integration/cachedfail.py b/tests/integration/cachedfail.py index f8dd52aa6..08733ab71 100644 --- a/tests/integration/cachedfail.py +++ b/tests/integration/cachedfail.py @@ -180,7 +180,7 @@ def test_push_cached_fail(cli, tmpdir, datafiles, on_error): # This element should have failed assert cli.get_element_state(project, 'element.bst') == 'failed' # This element should have been pushed to the remote - assert share.has_artifact(cli.get_artifact_name(project, 'test', 'element.bst')) + assert share.get_artifact(cli.get_artifact_name(project, 'test', 'element.bst')) @pytest.mark.skipif(HAVE_SANDBOX != 'bwrap', reason='Only available with bubblewrap on Linux') diff --git a/tests/integration/pullbuildtrees.py b/tests/integration/pullbuildtrees.py index af9186b1b..4251aa6d0 100644 --- a/tests/integration/pullbuildtrees.py +++ b/tests/integration/pullbuildtrees.py @@ -56,7 +56,7 @@ def test_pullbuildtrees(cli2, tmpdir, datafiles): result = cli2.run(project=project, args=['build', element_name]) assert result.exit_code == 0 assert cli2.get_element_state(project, element_name) == 'cached' - assert share1.has_artifact(cli2.get_artifact_name(project, 'test', element_name)) + assert share1.get_artifact(cli2.get_artifact_name(project, 'test', element_name)) default_state(cli2, tmpdir, share1) # Pull artifact with default 
config, assert that pulling again @@ -116,7 +116,7 @@ def test_pullbuildtrees(cli2, tmpdir, datafiles): cli2.configure({'artifacts': {'url': share2.repo, 'push': True}}) result = cli2.run(project=project, args=['artifact', 'push', element_name]) assert element_name not in result.get_pushed_elements() - assert not share2.has_artifact(cli2.get_artifact_name(project, 'test', element_name)) + assert not share2.get_artifact(cli2.get_artifact_name(project, 'test', element_name)) # Assert that after pulling the missing buildtree the element artifact can be # successfully pushed to the remote. This will attempt to pull the buildtree @@ -127,7 +127,7 @@ def test_pullbuildtrees(cli2, tmpdir, datafiles): cli2.configure({'artifacts': {'url': share2.repo, 'push': True}}) result = cli2.run(project=project, args=['artifact', 'push', element_name]) assert element_name in result.get_pushed_elements() - assert share2.has_artifact(cli2.get_artifact_name(project, 'test', element_name)) + assert share2.get_artifact(cli2.get_artifact_name(project, 'test', element_name)) default_state(cli2, tmpdir, share1) # Assert that bst artifact push will automatically attempt to pull a missing buildtree @@ -143,7 +143,7 @@ def test_pullbuildtrees(cli2, tmpdir, datafiles): with cli2.artifact.extract_buildtree(cwd, cwd, artifact_name) as buildtreedir: assert not buildtreedir assert element_name not in result.get_pushed_elements() - assert not share3.has_artifact(cli2.get_artifact_name(project, 'test', element_name)) + assert not share3.get_artifact(cli2.get_artifact_name(project, 'test', element_name)) # Assert that if we add an extra remote that has the buildtree artfact cached, bst artifact push will # automatically attempt to pull it and will be successful, leading to the full artifact being pushed @@ -156,7 +156,7 @@ def test_pullbuildtrees(cli2, tmpdir, datafiles): with cli2.artifact.extract_buildtree(cwd, cwd, artifact_name) as buildtreedir: assert os.path.isdir(buildtreedir) assert element_name in result.get_pushed_elements() - assert share3.has_artifact(cli2.get_artifact_name(project, 'test', element_name)) + assert share3.get_artifact(cli2.get_artifact_name(project, 'test', element_name)) # Ensure that only valid pull-buildtrees boolean options make it through the loading diff --git a/tests/integration/shellbuildtrees.py b/tests/integration/shellbuildtrees.py index 81a279479..146bc6062 100644 --- a/tests/integration/shellbuildtrees.py +++ b/tests/integration/shellbuildtrees.py @@ -239,7 +239,7 @@ def test_buildtree_options(cli, tmpdir, datafiles): result = cli.run(project=project, args=['--cache-buildtrees', 'always', 'build', element_name]) result.assert_success() assert cli.get_element_state(project, element_name) == 'cached' - assert share.has_artifact(cli.get_artifact_name(project, 'test', element_name)) + assert share.get_artifact(cli.get_artifact_name(project, 'test', element_name)) # Discard the cache shutil.rmtree(str(os.path.join(str(tmpdir), 'cache', 'cas'))) diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py index ad9653f9d..1be2d40cd 100644 --- a/tests/sourcecache/push.py +++ b/tests/sourcecache/push.py @@ -21,6 +21,8 @@ # pylint: disable=redefined-outer-name import os import shutil +from contextlib import contextmanager, ExitStack + import pytest from buildstream._exceptions import ErrorDomain @@ -38,6 +40,82 @@ def message_handler(message, is_silenced): pass +# Args: +# tmpdir: A temporary directory to use as root. +# directories: Directory names to use as cache directories. 
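Because _configure_caches() below takes any number of directory names, the same helper scales beyond the two-share case used in test_source_push_split; a sketch with three shares (names arbitrary):

    with _configure_caches(tmpdir, 'share1', 'share2', 'share3') as (a, b, c):
        # one ArtifactShare per directory name, all rooted under tmpdir and
        # all torn down together when the block exits
        ...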
+# +@contextmanager +def _configure_caches(tmpdir, *directories): + with ExitStack() as stack: + def create_share(directory): + return create_artifact_share(os.path.join(str(tmpdir), directory)) + + yield (stack.enter_context(create_share(remote)) for remote in directories) + + +@pytest.mark.datafiles(DATA_DIR) +def test_source_push_split(cli, tmpdir, datafiles): + cache_dir = os.path.join(str(tmpdir), 'cache') + project_dir = str(datafiles) + + with _configure_caches(tmpdir, 'indexshare', 'storageshare') as (index, storage): + user_config_file = str(tmpdir.join('buildstream.conf')) + user_config = { + 'scheduler': { + 'pushers': 1 + }, + 'source-caches': [{ + 'url': index.repo, + 'push': True, + 'type': 'index' + }, { + 'url': storage.repo, + 'push': True, + 'type': 'storage' + }], + 'cachedir': cache_dir + } + _yaml.roundtrip_dump(user_config, file=user_config_file) + cli.configure(user_config) + + repo = create_repo('git', str(tmpdir)) + ref = repo.create(os.path.join(project_dir, 'files')) + element_path = os.path.join(project_dir, 'elements') + element_name = 'push.bst' + element = { + 'kind': 'import', + 'sources': [repo.source_config(ref=ref)] + } + _yaml.roundtrip_dump(element, os.path.join(element_path, element_name)) + + # get the source object + with dummy_context(config=user_config_file) as context: + project = Project(project_dir, context) + project.ensure_fully_loaded() + + element = project.load_elements(['push.bst'])[0] + assert not element._source_cached() + source = list(element.sources())[0] + + # check we don't have it in the current cache + cas = context.get_cascache() + assert not cas.contains(source._get_source_name()) + + # build the element, this should fetch and then push the source to the + # remote + res = cli.run(project=project_dir, args=['build', 'push.bst']) + res.assert_success() + assert "Pushed source" in res.stderr + + # check that we've got the remote locally now + sourcecache = context.sourcecache + assert sourcecache.contains(source) + + # check that the remote CAS now has it + digest = sourcecache.export(source)._get_digest() + assert storage.has_object(digest) + + @pytest.mark.datafiles(DATA_DIR) def test_source_push(cli, tmpdir, datafiles): cache_dir = os.path.join(str(tmpdir), 'cache') diff --git a/tests/testutils/__init__.py b/tests/testutils/__init__.py index 25fa6d763..9642ddf47 100644 --- a/tests/testutils/__init__.py +++ b/tests/testutils/__init__.py @@ -23,7 +23,7 @@ # William Salmon <will.salmon@codethink.co.uk> # -from .artifactshare import create_artifact_share, assert_shared, assert_not_shared +from .artifactshare import create_artifact_share, create_split_share, assert_shared, assert_not_shared from .context import dummy_context from .element_generators import create_element_size, update_element_size from .junction import generate_junction diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py index 7d5faeb66..f883b3d59 100644 --- a/tests/testutils/artifactshare.py +++ b/tests/testutils/artifactshare.py @@ -25,7 +25,7 @@ from buildstream._protos.buildstream.v2 import artifact_pb2 # class ArtifactShare(): - def __init__(self, directory, *, quota=None, casd=False): + def __init__(self, directory, *, quota=None, casd=False, index_only=False): # The working directory for the artifact share (in case it # needs to do something outside of its backend's storage folder). 
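The new index_only flag is what create_split_share() (added further down) builds on: it pairs an index-only share with a plain storage share. Constructed by hand, that is roughly (paths are placeholders):

    index = ArtifactShare('/path/to/indexshare', index_only=True)   # artifact protos only
    storage = ArtifactShare('/path/to/storageshare')                 # CAS blobs as usual
    try:
        ...  # exercise both endpoints
    finally:
        index.close()
        storage.close()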
@@ -45,6 +45,7 @@ class ArtifactShare(): self.cas = CASCache(self.repodir, casd=casd) self.quota = quota + self.index_only = index_only q = Queue() @@ -72,7 +73,8 @@ class ArtifactShare(): try: with create_server(self.repodir, quota=self.quota, - enable_push=True) as server: + enable_push=True, + index_only=self.index_only) as server: port = server.add_insecure_port('localhost:0') server.start() @@ -104,17 +106,7 @@ class ArtifactShare(): return os.path.exists(object_path) - # has_artifact(): - # - # Checks whether the artifact is present in the share - # - # Args: - # artifact_name (str): The composed complete artifact name - # - # Returns: - # (str): artifact digest if the artifact exists in the share, otherwise None. - def has_artifact(self, artifact_name): - + def get_artifact_proto(self, artifact_name): artifact_proto = artifact_pb2.Artifact() artifact_path = os.path.join(self.artifactdir, artifact_name) @@ -124,6 +116,10 @@ class ArtifactShare(): except FileNotFoundError: return None + return artifact_proto + + def get_cas_files(self, artifact_proto): + reachable = set() def reachable_dir(digest): @@ -153,6 +149,21 @@ class ArtifactShare(): except FileNotFoundError: return None + # has_artifact(): + # + # Checks whether the artifact is present in the share + # + # Args: + # artifact_name (str): The composed complete artifact name + # + # Returns: + # (ArtifactProto): artifact digest if the artifact exists in the share, otherwise None. + def get_artifact(self, artifact_name): + artifact_proto = self.get_artifact_proto(artifact_name) + if not artifact_proto: + return None + return self.get_cas_files(artifact_proto) + # close(): # # Remove the artifact share. @@ -179,13 +190,25 @@ def create_artifact_share(directory, *, quota=None, casd=False): share.close() +@contextmanager +def create_split_share(directory1, directory2, *, quota=None, casd=False): + index = ArtifactShare(directory1, quota=quota, casd=casd, index_only=True) + storage = ArtifactShare(directory2, quota=quota, casd=casd) + + try: + yield index, storage + finally: + index.close() + storage.close() + + statvfs_result = namedtuple('statvfs_result', 'f_blocks f_bfree f_bsize f_bavail') # Assert that a given artifact is in the share # def assert_shared(cli, share, project, element_name, *, project_name='test'): - if not share.has_artifact(cli.get_artifact_name(project, project_name, element_name)): + if not share.get_artifact(cli.get_artifact_name(project, project_name, element_name)): raise AssertionError("Artifact share at {} does not contain the expected element {}" .format(share.repo, element_name)) @@ -193,6 +216,6 @@ def assert_shared(cli, share, project, element_name, *, project_name='test'): # Assert that a given artifact is not in the share # def assert_not_shared(cli, share, project, element_name, *, project_name='test'): - if share.has_artifact(cli.get_artifact_name(project, project_name, element_name)): + if share.get_artifact(cli.get_artifact_name(project, project_name, element_name)): raise AssertionError("Artifact share at {} unexpectedly contains the element {}" .format(share.repo, element_name)) |
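With has_artifact() reworked into get_artifact_proto() plus get_cas_files(), tests can either keep the old combined truthiness check or assert on the two halves against separate shares; roughly (share, index, storage and artifact_name stand in for ArtifactShare instances and a name obtained from cli.get_artifact_name()):

    # Combined check - behaves like the old has_artifact():
    assert share.get_artifact(artifact_name)

    # Split check, as in test_push_split above: the artifact proto lives on
    # the index share, the files it references on the storage share.
    proto = index.get_artifact_proto(artifact_name)
    assert proto is not None
    assert storage.get_cas_files(proto) is not None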