author     bst-marge-bot <marge-bot@buildstream.build>    2019-09-06 16:03:09 +0000
committer  bst-marge-bot <marge-bot@buildstream.build>    2019-09-06 16:03:09 +0000
commit     20a920f4fd3641b43e714b6c9a9d57a7095c66c5 (patch)
tree       3b2cab77021916fb37fdb7eb60001bc8f8bd2608
parent     92fedf8b7fffefb76fe9648d99ac684477b3dbd9 (diff)
parent     f40212206bb3ee3f772f9a816476d9cb10c46fca (diff)
download   buildstream-20a920f4fd3641b43e714b6c9a9d57a7095c66c5.tar.gz

Merge branch 'tlater/cache-endpoints' into 'master'

Support separate end points for artifact caches

Closes #1041

See merge request BuildStream/buildstream!1540
Diffstat:
-rw-r--r--  doc/source/format_project.rst                  |  43
-rw-r--r--  doc/source/using_configuring_cache_server.rst  |  16
-rw-r--r--  src/buildstream/_artifactcache.py              | 380
-rw-r--r--  src/buildstream/_basecache.py                  | 208
-rw-r--r--  src/buildstream/_cas/__init__.py               |   2
-rw-r--r--  src/buildstream/_cas/casremote.py              | 301
-rw-r--r--  src/buildstream/_cas/casserver.py              |  56
-rw-r--r--  src/buildstream/_exceptions.py                 |  10
-rw-r--r--  src/buildstream/_remote.py                     | 294
-rw-r--r--  src/buildstream/_sourcecache.py                | 206
-rw-r--r--  src/buildstream/sandbox/_sandboxremote.py      |  13
-rw-r--r--  tests/artifactcache/config.py                  |  88
-rw-r--r--  tests/artifactcache/only-one/element.bst       |   1
-rw-r--r--  tests/artifactcache/pull.py                    |   4
-rw-r--r--  tests/artifactcache/push.py                    | 100
-rw-r--r--  tests/frontend/artifact_delete.py              |   2
-rw-r--r--  tests/frontend/artifact_show.py                |   2
-rw-r--r--  tests/frontend/push.py                         |  13
-rw-r--r--  tests/integration/artifact.py                  |   6
-rw-r--r--  tests/integration/cachedfail.py                |   2
-rw-r--r--  tests/integration/pullbuildtrees.py            |  10
-rw-r--r--  tests/integration/shellbuildtrees.py           |   2
-rw-r--r--  tests/sourcecache/push.py                      |  78
-rw-r--r--  tests/testutils/__init__.py                    |   2
-rw-r--r--  tests/testutils/artifactshare.py               |  53

25 files changed, 1329 insertions, 563 deletions
diff --git a/doc/source/format_project.rst b/doc/source/format_project.rst
index d33a4974f..c4988527a 100644
--- a/doc/source/format_project.rst
+++ b/doc/source/format_project.rst
@@ -225,6 +225,44 @@ and keys, please see: :ref:`Key pair for the server <server_authentication>`.
new server API. As a result newer buildstream clients won't work with older
servers.
+.. _project_essentials_split_artifacts:
+
+Split cache servers
+~~~~~~~~~~~~~~~~~~~
+
+Should you need to configure an artifact cache to work with a CAS
+server that does not support BuildStream's artifact format, you can
+"split" that cache and run an artifacts-only server separately. The
+format for this is as follows:
+
+.. code:: yaml
+
+ #
+ # Artifacts
+ #
+ artifacts:
+ # A remote cache from which to download prebuilt artifacts
+ - url: https://storage.foo.com:11001
+ server-cert: server.crt
+ # "storage" remotes store the artifact contents only - this can
+ # be a normal CAS implementation such as those provided by
+ # Buildbarn or Bazel Buildfarm
+ type: storage
+ - url: https://index.foo.com:11001
+ server-cert: server.crt
+ # "index" remotes store only artifact metadata. This is
+ # currently only provided by the bst-artifact-server and BuildGrid
+ type: index
+ # A remote cache from which to upload/download built/prebuilt artifacts
+ - url: https://foo.com:11002
+ push: true
+ server-cert: server.crt
+ client-cert: client.crt
+ client-key: client.key
+ # Caches that support both can omit the type, or set it to "all" -
+ # currently, also only supported by bst-artifact-server and BuildGrid
+ type: all
+
.. _project_source_cache:
Source cache server
@@ -247,6 +285,11 @@ Exactly the same as artifact servers, source cache servers can be specified.
client-cert: client.crt
client-key: client.key
+.. note::
+
+ Source caches also support "splitting" like :ref:`artifact servers
+ <project_essentials_split_artifacts>`.
+
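
As a rough illustration, a split source-cache configuration could look like the sketch below, assuming the same ``url``/``type`` keys apply under the project's source-cache configuration key (shown here as ``source-caches``); the host names are placeholders:

.. code:: yaml

   #
   # Source caches (illustrative sketch only)
   #
   source-caches:
     # A plain CAS remote holding the source contents
     - url: https://storage.example.com:11001
       server-cert: server.crt
       type: storage
     # An index-only remote holding the source metadata
     - url: https://index.example.com:11001
       server-cert: server.crt
       type: index
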
.. _project_remote_execution:
Remote execution
diff --git a/doc/source/using_configuring_cache_server.rst b/doc/source/using_configuring_cache_server.rst
index 856046f35..d31a6661c 100644
--- a/doc/source/using_configuring_cache_server.rst
+++ b/doc/source/using_configuring_cache_server.rst
@@ -159,6 +159,22 @@ Instance with push and requiring client authentication:
bst-artifact-server --port 11002 --server-key server.key --server-cert server.crt --client-certs authorized.crt --enable-push /home/artifacts/artifacts
+.. note::
+
+ BuildStream's artifact cache is an extension of `Google's Remote
+ Execution CAS server
+ <https://github.com/bazelbuild/remote-apis/>`_.
+
+ Sometimes, when using Remote Execution, it is useful to run
+ BuildStream with just a basic CAS server, without the artifact
+ extensions, but BuildStream still needs somewhere to store its
+ artifact metadata to work correctly.
+
+ For this scenario, you can add the `--index-only` flag to the above
+ commands, and configure BuildStream to store artifact metadata and
+ files in separate caches (e.g. bst-artifact-server and Buildbarn)
+ using :ref:`"types" <project_essentials_split_artifacts>`.
+
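
As a sketch of that setup, the client-side configuration might pair an index-only ``bst-artifact-server`` with a separate CAS storage service roughly as follows (host names and ports are placeholders):

.. code:: yaml

   artifacts:
     # bst-artifact-server started with --index-only, holding only
     # the artifact metadata
     - url: https://index.example.com:11001
       server-cert: server.crt
       type: index
     # A plain CAS server (e.g. Buildbarn) holding the blobs themselves
     - url: https://cas.example.com:11002
       server-cert: server.crt
       type: storage
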
Managing the cache with systemd
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/src/buildstream/_artifactcache.py b/src/buildstream/_artifactcache.py
index 3357f986a..0e2eb1091 100644
--- a/src/buildstream/_artifactcache.py
+++ b/src/buildstream/_artifactcache.py
@@ -25,58 +25,89 @@ from ._exceptions import ArtifactError, CASError, CASCacheError, CASRemoteError
from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
artifact_pb2, artifact_pb2_grpc
-from ._cas import CASRemoteSpec, CASRemote
+from ._remote import BaseRemote
from .storage._casbaseddirectory import CasBasedDirectory
from ._artifact import Artifact
from . import utils
-# An ArtifactCacheSpec holds the user configuration for a single remote
-# artifact cache.
+# ArtifactRemote():
#
-# Args:
-# url (str): Location of the remote artifact cache
-# push (bool): Whether we should attempt to push artifacts to this cache,
-# in addition to pulling from it.
+# Facilitates communication with the BuildStream-specific part of
+# artifact remotes.
#
-class ArtifactCacheSpec(CASRemoteSpec):
- pass
+class ArtifactRemote(BaseRemote):
+ # _configure_protocols():
+ #
+ # Configure the protocols used by this remote as part of the
+ # remote initialization; Note that this should only be used in
+ # Remote.init(), and is expected to fail when called by itself.
+ #
+ def _configure_protocols(self):
+ # Add artifact stub
+ capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
+ # Check whether the server supports the newer proto-based artifact service.
+ try:
+ request = buildstream_pb2.GetCapabilitiesRequest()
+ if self.instance_name:
+ request.instance_name = self.instance_name
+ response = capabilities_service.GetCapabilities(request)
+ except grpc.RpcError as e:
+ # Check if this remote has the artifact service
+ if e.code() == grpc.StatusCode.UNIMPLEMENTED:
+ raise ArtifactError(
+ "Configured remote does not have the BuildStream "
+ "capabilities service. Please check remote configuration.")
+ # Else raise exception with details
+ raise ArtifactError(
+ "Remote initialisation failed: {}".format(e.details()))
-# ArtifactRemote extends CASRemote to check during initialisation that there is
-# an artifact service
-class ArtifactRemote(CASRemote):
- def __init__(self, *args):
- super().__init__(*args)
- self.capabilities_service = None
+ if not response.artifact_capabilities:
+ raise ArtifactError(
+ "Configured remote does not support artifact service")
- def init(self):
- if not self._initialized:
- # do default initialisation
- super().init()
+ # get_artifact():
+ #
+ # Get an artifact proto for a given cache key from the remote.
+ #
+ # Args:
+ # cache_key (str): The artifact cache key. NOTE: This "key"
+ # is actually the ref/name and its name in
+ # the protocol is inaccurate. You have been warned.
+ #
+ # Returns:
+ # (Artifact): The artifact proto
+ #
+ # Raises:
+ # grpc.RpcError: If something goes wrong during the request.
+ #
+ def get_artifact(self, cache_key):
+ artifact_request = artifact_pb2.GetArtifactRequest()
+ artifact_request.cache_key = cache_key
- # Add artifact stub
- self.capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
+ artifact_service = artifact_pb2_grpc.ArtifactServiceStub(self.channel)
+ return artifact_service.GetArtifact(artifact_request)
- # Check whether the server supports newer proto based artifact.
- try:
- request = buildstream_pb2.GetCapabilitiesRequest()
- if self.instance_name:
- request.instance_name = self.instance_name
- response = self.capabilities_service.GetCapabilities(request)
- except grpc.RpcError as e:
- # Check if this remote has the artifact service
- if e.code() == grpc.StatusCode.UNIMPLEMENTED:
- raise ArtifactError(
- "Configured remote does not have the BuildStream "
- "capabilities service. Please check remote configuration.")
- # Else raise exception with details
- raise ArtifactError(
- "Remote initialisation failed: {}".format(e.details()))
+ # update_artifact():
+ #
+ # Update an artifact with the given cache key on the remote with
+ # the given proto.
+ #
+ # Args:
+ # cache_key (str): The artifact cache key of the artifact to update.
+ # artifact (ArtifactProto): The artifact proto to send.
+ #
+ # Raises:
+ # grpc.RpcError: If something goes wrong during the request.
+ #
+ def update_artifact(self, cache_key, artifact):
+ update_request = artifact_pb2.UpdateArtifactRequest()
+ update_request.cache_key = cache_key
+ update_request.artifact.CopyFrom(artifact)
- if not response.artifact_capabilities:
- raise ArtifactError(
- "Configured remote does not support artifact service")
+ artifact_service = artifact_pb2_grpc.ArtifactServiceStub(self.channel)
+ artifact_service.UpdateArtifact(update_request)
# An ArtifactCache manages artifacts.
@@ -86,11 +117,10 @@ class ArtifactRemote(CASRemote):
#
class ArtifactCache(BaseCache):
- spec_class = ArtifactCacheSpec
spec_name = "artifact_cache_specs"
spec_error = ArtifactError
config_node_name = "artifacts"
- remote_class = ArtifactRemote
+ index_remote_class = ArtifactRemote
def __init__(self, context):
super().__init__(context)
@@ -198,22 +228,35 @@ class ArtifactCache(BaseCache):
#
def push(self, element, artifact):
project = element._get_project()
+ display_key = element._get_brief_display_key()
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
+ index_remotes = [r for r in self._index_remotes[project] if r.push]
+ storage_remotes = [r for r in self._storage_remotes[project] if r.push]
pushed = False
+ # First push our files to all storage remotes, so that they
+ # can perform file checks on their end
+ for remote in storage_remotes:
+ remote.init()
+ element.status("Pushing data from artifact {} -> {}".format(display_key, remote))
- for remote in push_remotes:
+ if self._push_artifact_blobs(artifact, remote):
+ element.info("Pushed data from artifact {} -> {}".format(display_key, remote))
+ else:
+ element.info("Remote ({}) already has all data of artifact {} cached".format(
+ remote, element._get_brief_display_key()
+ ))
+
+ for remote in index_remotes:
remote.init()
- display_key = element._get_brief_display_key()
- element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
+ element.status("Pushing artifact {} -> {}".format(display_key, remote))
- if self._push_artifact(element, artifact, remote):
- element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
+ if self._push_artifact_proto(element, artifact, remote):
+ element.info("Pushed artifact {} -> {}".format(display_key, remote))
pushed = True
else:
element.info("Remote ({}) already has artifact {} cached".format(
- remote.spec.url, element._get_brief_display_key()
+ remote, element._get_brief_display_key()
))
return pushed
@@ -231,26 +274,59 @@ class ArtifactCache(BaseCache):
# (bool): True if pull was successful, False if artifact was not available
#
def pull(self, element, key, *, pull_buildtrees=False):
+ artifact = None
display_key = key[:self.context.log_key_length]
project = element._get_project()
- for remote in self._remotes[project]:
+ errors = []
+ # Start by pulling our artifact proto, so that we know which
+ # blobs to pull
+ for remote in self._index_remotes[project]:
remote.init()
try:
- element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
-
- if self._pull_artifact(element, key, remote, pull_buildtrees=pull_buildtrees):
- element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
- # no need to pull from additional remotes
- return True
+ element.status("Pulling artifact {} <- {}".format(display_key, remote))
+ artifact = self._pull_artifact_proto(element, key, remote)
+ if artifact:
+ element.info("Pulled artifact {} <- {}".format(display_key, remote))
+ break
else:
element.info("Remote ({}) does not have artifact {} cached".format(
- remote.spec.url, display_key
+ remote, display_key
))
+ except CASError as e:
+ element.warn("Could not pull from remote {}: {}".format(remote, e))
+ errors.append(e)
+
+ if errors and not artifact:
+ raise ArtifactError("Failed to pull artifact {}".format(display_key),
+ detail="\n".join(str(e) for e in errors))
+
+ # If we don't have an artifact, we can't exactly pull our
+ # artifact
+ if not artifact:
+ return False
+
+ errors = []
+ # If we do, we can pull it!
+ for remote in self._storage_remotes[project]:
+ remote.init()
+ try:
+ element.status("Pulling data for artifact {} <- {}".format(display_key, remote))
+
+ if self._pull_artifact_storage(element, artifact, remote, pull_buildtrees=pull_buildtrees):
+ element.info("Pulled data for artifact {} <- {}".format(display_key, remote))
+ return True
+ element.info("Remote ({}) does not have artifact {} cached".format(
+ remote, display_key
+ ))
except CASError as e:
- raise ArtifactError("Failed to pull artifact {}: {}".format(
- display_key, e)) from e
+ element.warn("Could not pull from remote {}: {}".format(remote, e))
+ errors.append(e)
+
+ if errors:
+ raise ArtifactError("Failed to pull artifact {}".format(display_key),
+ detail="\n".join(str(e) for e in errors))
return False
@@ -264,7 +340,7 @@ class ArtifactCache(BaseCache):
# digest (Digest): The digest of the tree
#
def pull_tree(self, project, digest):
- for remote in self._remotes[project]:
+ for remote in self._storage_remotes[project]:
digest = self.cas.pull_tree(remote, digest)
if digest:
@@ -287,7 +363,7 @@ class ArtifactCache(BaseCache):
def push_message(self, project, message):
if self._has_push_remotes:
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
+ push_remotes = [r for r in self._storage_remotes[project] if r.spec.push]
else:
push_remotes = []
@@ -341,7 +417,7 @@ class ArtifactCache(BaseCache):
# missing_blobs (list): The Digests of the blobs to fetch
#
def fetch_missing_blobs(self, project, missing_blobs):
- for remote in self._remotes[project]:
+ for remote in self._index_remotes[project]:
if not missing_blobs:
break
@@ -368,7 +444,7 @@ class ArtifactCache(BaseCache):
if not missing_blobs:
return []
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
+ push_remotes = [r for r in self._storage_remotes[project] if r.spec.push]
remote_missing_blobs_list = []
@@ -395,12 +471,12 @@ class ArtifactCache(BaseCache):
#
def check_remotes_for_element(self, element):
# If there are no remotes
- if not self._remotes:
+ if not self._index_remotes:
return False
project = element._get_project()
ref = element.get_artifact_name()
- for remote in self._remotes[project]:
+ for remote in self._index_remotes[project]:
remote.init()
if self._query_remote(ref, remote):
@@ -412,40 +488,59 @@ class ArtifactCache(BaseCache):
# Local Private Methods #
################################################
- # _push_artifact()
+ # _reachable_directories()
#
- # Pushes relevant directories and then artifact proto to remote.
+ # Returns:
+ # (iter): Iterator over directory digests available from artifacts.
#
- # Args:
- # element (Element): The element
- # artifact (Artifact): The related artifact being pushed
- # remote (CASRemote): Remote to push to
+ def _reachable_directories(self):
+ for root, _, files in os.walk(self.artifactdir):
+ for artifact_file in files:
+ artifact = artifact_pb2.Artifact()
+ with open(os.path.join(root, artifact_file), 'r+b') as f:
+ artifact.ParseFromString(f.read())
+
+ if str(artifact.files):
+ yield artifact.files
+
+ if str(artifact.buildtree):
+ yield artifact.buildtree
+
+ # _reachable_digests()
#
# Returns:
- # (bool): whether the push was successful
+ # (iter): Iterator over single file digests in artifacts
#
- def _push_artifact(self, element, artifact, remote):
+ def _reachable_digests(self):
+ for root, _, files in os.walk(self.artifactdir):
+ for artifact_file in files:
+ artifact = artifact_pb2.Artifact()
+ with open(os.path.join(root, artifact_file), 'r+b') as f:
+ artifact.ParseFromString(f.read())
- artifact_proto = artifact._get_proto()
+ if str(artifact.public_data):
+ yield artifact.public_data
- keys = list(utils._deduplicate([artifact_proto.strong_key, artifact_proto.weak_key]))
+ for log_file in artifact.logs:
+ yield log_file.digest
- # Check whether the artifact is on the server
- present = False
- for key in keys:
- get_artifact = artifact_pb2.GetArtifactRequest()
- get_artifact.cache_key = element.get_artifact_name(key)
- try:
- artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
- artifact_service.GetArtifact(get_artifact)
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.NOT_FOUND:
- raise ArtifactError("Error checking artifact cache: {}"
- .format(e.details()))
- else:
- present = True
- if present:
- return False
+ # _push_artifact_blobs()
+ #
+ # Push the blobs that make up an artifact to the remote server.
+ #
+ # Args:
+ # artifact (Artifact): The artifact whose blobs to push.
+ # remote (CASRemote): The remote to push the blobs to.
+ #
+ # Returns:
+ # (bool) - True if we uploaded anything, False otherwise.
+ #
+ # Raises:
+ # ArtifactError: If we fail to push blobs (*unless* they're
+ # already there or we run out of space on the server).
+ #
+ def _push_artifact_blobs(self, artifact, remote):
+ artifact_proto = artifact._get_proto()
try:
self.cas._send_directory(remote, artifact_proto.files)
@@ -474,33 +569,68 @@ class ArtifactCache(BaseCache):
raise ArtifactError("Failed to push artifact blobs: {}".format(e.details()))
return False
- # finally need to send the artifact proto
+ return True
+
+ # _push_artifact_proto()
+ #
+ # Pushes the artifact proto to remote.
+ #
+ # Args:
+ # element (Element): The element
+ # artifact (Artifact): The related artifact being pushed
+ # remote (ArtifactRemote): Remote to push to
+ #
+ # Returns:
+ # (bool): Whether we pushed the artifact.
+ #
+ # Raises:
+ # ArtifactError: If the push fails for any reason except the
+ # artifact already existing.
+ #
+ def _push_artifact_proto(self, element, artifact, remote):
+
+ artifact_proto = artifact._get_proto()
+
+ keys = list(utils._deduplicate([artifact_proto.strong_key, artifact_proto.weak_key]))
+
+ # Check whether the artifact is on the server
for key in keys:
- update_artifact = artifact_pb2.UpdateArtifactRequest()
- update_artifact.cache_key = element.get_artifact_name(key)
- update_artifact.artifact.CopyFrom(artifact_proto)
+ try:
+ remote.get_artifact(element.get_artifact_name(key=key))
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise ArtifactError("Error checking artifact cache: {}"
+ .format(e.details()))
+ else:
+ return False
+ # If not, we send the artifact proto
+ for key in keys:
try:
- artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
- artifact_service.UpdateArtifact(update_artifact)
+ remote.update_artifact(element.get_artifact_name(key=key), artifact_proto)
except grpc.RpcError as e:
raise ArtifactError("Failed to push artifact: {}".format(e.details()))
return True
- # _pull_artifact()
+ # _pull_artifact_storage():
+ #
+ # Pull artifact blobs from the given remote.
#
# Args:
- # element (Element): element to pull
- # key (str): specific key of element to pull
- # remote (CASRemote): remote to pull from
- # pull_buildtree (bool): whether to pull buildtrees or not
+ # element (Element): element to pull
+ # artifact (Artifact): The artifact proto whose blobs to pull
+ # remote (CASRemote): remote to pull from
+ # pull_buildtrees (bool): whether to pull buildtrees or not
#
# Returns:
- # (bool): whether the pull was successful
+ # (bool): True if we pulled any blobs.
#
- def _pull_artifact(self, element, key, remote, pull_buildtrees=False):
-
+ # Raises:
+ # ArtifactError: If the pull failed for any reason except the
+ # blobs not existing on the server.
+ #
+ def _pull_artifact_storage(self, element, artifact, remote, pull_buildtrees=False):
def __pull_digest(digest):
self.cas._fetch_directory(remote, digest)
required_blobs = self.cas.required_blobs_for_directory(digest)
@@ -508,16 +638,6 @@ class ArtifactCache(BaseCache):
if missing_blobs:
self.cas.fetch_blobs(remote, missing_blobs)
- request = artifact_pb2.GetArtifactRequest()
- request.cache_key = element.get_artifact_name(key=key)
- try:
- artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
- artifact = artifact_service.GetArtifact(request)
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.NOT_FOUND:
- raise ArtifactError("Failed to pull artifact: {}".format(e.details()))
- return False
-
try:
if str(artifact.files):
__pull_digest(artifact.files)
@@ -538,13 +658,41 @@ class ArtifactCache(BaseCache):
raise ArtifactError("Failed to pull artifact: {}".format(e.details()))
return False
+ return True
+
+ # _pull_artifact_proto():
+ #
+ # Pull an artifact proto from a remote server.
+ #
+ # Args:
+ # element (Element): The element whose artifact to pull.
+ # key (str): The specific key for the artifact to pull.
+ # remote (ArtifactRemote): The remote to pull from.
+ #
+ # Returns:
+ # (Artifact|None): The artifact proto, or None if the server
+ # doesn't have it.
+ #
+ # Raises:
+ # ArtifactError: If the pull fails.
+ #
+ def _pull_artifact_proto(self, element, key, remote):
+ artifact_name = element.get_artifact_name(key=key)
+
+ try:
+ artifact = remote.get_artifact(artifact_name)
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise ArtifactError("Failed to pull artifact: {}".format(e.details()))
+ return None
+
# Write the artifact proto to cache
- artifact_path = os.path.join(self.artifactdir, request.cache_key)
+ artifact_path = os.path.join(self.artifactdir, artifact_name)
os.makedirs(os.path.dirname(artifact_path), exist_ok=True)
with utils.save_file_atomic(artifact_path, mode='wb') as f:
f.write(artifact.SerializeToString())
- return True
+ return artifact
# _query_remote()
#
diff --git a/src/buildstream/_basecache.py b/src/buildstream/_basecache.py
index 9ad6c1277..df50bfb62 100644
--- a/src/buildstream/_basecache.py
+++ b/src/buildstream/_basecache.py
@@ -16,21 +16,23 @@
# Authors:
# Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
#
-import multiprocessing
import os
from fnmatch import fnmatch
+from itertools import chain
from typing import TYPE_CHECKING
from . import utils
from . import _yaml
from ._cas import CASRemote
from ._message import Message, MessageType
-from ._exceptions import LoadError
+from ._exceptions import LoadError, RemoteError
+from ._remote import RemoteSpec, RemoteType
+
if TYPE_CHECKING:
from typing import Optional, Type
from ._exceptions import BstError
- from ._cas import CASRemoteSpec
+ from ._remote import BaseRemote
# Base Cache for Caches to derive from
@@ -39,19 +41,20 @@ class BaseCache():
# None of these should ever be called in the base class, but this appeases
# pylint to some degree
- spec_class = None # type: Type[CASRemoteSpec]
- spec_name = None # type: str
- spec_error = None # type: Type[BstError]
- config_node_name = None # type: str
- remote_class = CASRemote # type: Type[CASRemote]
+ spec_name = None # type: str
+ spec_error = None # type: Type[BstError]
+ config_node_name = None # type: str
+ index_remote_class = None # type: Type[BaseRemote]
+ storage_remote_class = CASRemote # type: Type[BaseRemote]
def __init__(self, context):
self.context = context
self.cas = context.get_cascache()
self._remotes_setup = False # Check to prevent double-setup of remotes
- # Per-project list of _CASRemote instances.
- self._remotes = {}
+ # Per-project list of Remote instances.
+ self._storage_remotes = {}
+ self._index_remotes = {}
self.global_remote_specs = []
self.project_remote_specs = {}
@@ -65,7 +68,7 @@ class BaseCache():
# against fork() with open gRPC channels.
#
def has_open_grpc_channels(self):
- for project_remotes in self._remotes.values():
+ for project_remotes in chain(self._index_remotes.values(), self._storage_remotes.values()):
for remote in project_remotes:
if remote.channel:
return True
@@ -77,7 +80,7 @@ class BaseCache():
#
def release_resources(self):
# Close all remotes and their gRPC channels
- for project_remotes in self._remotes.values():
+ for project_remotes in chain(self._index_remotes.values(), self._storage_remotes.values()):
for remote in project_remotes:
remote.close()
@@ -86,11 +89,11 @@ class BaseCache():
# Parses the configuration of remote artifact caches from a config block.
#
# Args:
- # config_node (dict): The config block, which may contain the 'artifacts' key
+ # config_node (dict): The config block, which may contain a key defined by cls.config_node_name
# basedir (str): The base directory for relative paths
#
# Returns:
- # A list of ArtifactCacheSpec instances.
+ # A list of RemoteSpec instances.
#
# Raises:
# LoadError, if the config block contains invalid keys.
@@ -106,11 +109,11 @@ class BaseCache():
artifacts = config_node.get_sequence(cls.config_node_name, default=[])
except LoadError:
provenance = config_node.get_node(cls.config_node_name).get_provenance()
- raise _yaml.LoadError("{}: 'artifacts' must be a single 'url:' mapping, or a list of mappings"
- .format(provenance), _yaml.LoadErrorReason.INVALID_DATA)
+ raise _yaml.LoadError("{}: '{}' must be a single remote mapping, or a list of mappings"
+ .format(provenance, cls.config_node_name), _yaml.LoadErrorReason.INVALID_DATA)
for spec_node in artifacts:
- cache_specs.append(cls.spec_class._new_from_config_node(spec_node, basedir))
+ cache_specs.append(RemoteSpec.new_from_config_node(spec_node))
return cache_specs
@@ -124,7 +127,7 @@ class BaseCache():
# project (Project): The BuildStream project
#
# Returns:
- # A list of ArtifactCacheSpec instances describing the remote artifact caches.
+ # A list of RemoteSpec instances describing the remote caches.
#
@classmethod
def _configured_remote_cache_specs(cls, context, project):
@@ -158,18 +161,26 @@ class BaseCache():
# the user config in some cases (for example `bst artifact push --remote=...`).
has_remote_caches = False
if remote_url:
- # pylint: disable=not-callable
- self._set_remotes([self.spec_class(remote_url, push=True)])
+ self._set_remotes([RemoteSpec(remote_url, push=True)])
has_remote_caches = True
if use_config:
for project in self.context.get_projects():
caches = self._configured_remote_cache_specs(self.context, project)
- if caches: # caches is a list of spec_class instances
+ if caches: # caches is a list of RemoteSpec instances
self._set_remotes(caches, project=project)
has_remote_caches = True
if has_remote_caches:
self._initialize_remotes()
+ # Notify remotes that forking is disabled
+ def notify_fork_disabled(self):
+ for project in self._index_remotes:
+ for remote in self._index_remotes[project]:
+ remote.notify_fork_disabled()
+ for project in self._storage_remotes:
+ for remote in self._storage_remotes[project]:
+ remote.notify_fork_disabled()
+
# initialize_remotes():
#
# This will contact each remote cache.
@@ -178,48 +189,31 @@ class BaseCache():
# on_failure (callable): Called if we fail to contact one of the caches.
#
def initialize_remotes(self, *, on_failure=None):
- remote_specs = self.global_remote_specs.copy()
-
- for project in self.project_remote_specs:
- remote_specs.extend(self.project_remote_specs[project])
-
- remote_specs = list(utils._deduplicate(remote_specs))
-
- remotes = {}
- q = multiprocessing.Queue()
- for remote_spec in remote_specs:
-
- error = self.remote_class.check_remote(remote_spec, self.cas, q)
-
- if error and on_failure:
- on_failure(remote_spec.url, error)
- continue
- elif error:
- raise self.spec_error(error) # pylint: disable=not-callable
-
- self._has_fetch_remotes = True
- if remote_spec.push:
- self._has_push_remotes = True
-
- remotes[remote_spec.url] = self.remote_class(remote_spec, self.cas)
+ index_remotes, storage_remotes = self._create_remote_instances(on_failure=on_failure)
+ # Assign remote instances to their respective projects
for project in self.context.get_projects():
- remote_specs = self.global_remote_specs
+ # Get the list of specs that should be considered for this
+ # project
+ remote_specs = self.global_remote_specs.copy()
if project in self.project_remote_specs:
- remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
+ remote_specs.extend(self.project_remote_specs[project])
- project_remotes = []
+ # De-duplicate the list
+ remote_specs = list(utils._deduplicate(remote_specs))
- for remote_spec in remote_specs:
- # Errors are already handled in the loop above,
- # skip unreachable remotes here.
- if remote_spec.url not in remotes:
- continue
+ def get_remotes(remote_list, remote_specs):
+ for remote_spec in remote_specs:
+ # If a remote_spec didn't make it into the remotes
+ # dict, that means we can't access it, and it has been
+ # disabled for this session.
+ if remote_spec not in remote_list:
+ continue
- remote = remotes[remote_spec.url]
- project_remotes.append(remote)
+ yield remote_list[remote_spec]
- self._remotes[project] = project_remotes
+ self._index_remotes[project] = list(get_remotes(index_remotes, remote_specs))
+ self._storage_remotes[project] = list(get_remotes(storage_remotes, remote_specs))
# has_fetch_remotes():
#
@@ -239,8 +233,9 @@ class BaseCache():
return True
else:
# Check whether the specified element's project has fetch remotes
- remotes_for_project = self._remotes[plugin._get_project()]
- return bool(remotes_for_project)
+ index_remotes = self._index_remotes[plugin._get_project()]
+ storage_remotes = self._storage_remotes[plugin._get_project()]
+ return index_remotes and storage_remotes
# has_push_remotes():
#
@@ -260,13 +255,102 @@ class BaseCache():
return True
else:
# Check whether the specified element's project has push remotes
- remotes_for_project = self._remotes[plugin._get_project()]
- return any(remote.spec.push for remote in remotes_for_project)
+ index_remotes = self._index_remotes[plugin._get_project()]
+ storage_remotes = self._storage_remotes[plugin._get_project()]
+ return (any(remote.spec.push for remote in index_remotes) and
+ any(remote.spec.push for remote in storage_remotes))
################################################
# Local Private Methods #
################################################
+ # _create_remote_instances():
+ #
+ # Create the global set of Remote instances, including
+ # project-specific and global instances, ensuring that all of them
+ # are accessible.
+ #
+ # Args:
+ # on_failure (Callable[[self.remote_class,Exception],None]):
+ # What to do when a remote doesn't respond.
+ #
+ # Returns:
+ # (Dict[RemoteSpec, self.remote_class], Dict[RemoteSpec,
+ # self.remote_class]) -
+ # The created remote instances, index first, storage last.
+ #
+ def _create_remote_instances(self, *, on_failure=None):
+ # Create a flat list of all remote specs, global or
+ # project-specific
+ remote_specs = self.global_remote_specs.copy()
+ for project in self.project_remote_specs:
+ remote_specs.extend(self.project_remote_specs[project])
+
+ # By de-duplicating it after we flattened the list, we ensure
+ # that we never instantiate the same remote twice. This
+ # de-duplication also preserves their order.
+ remote_specs = list(utils._deduplicate(remote_specs))
+
+ # Now let's create a dict of this, indexed by their specs, so
+ # that we can later assign them to the right projects.
+ index_remotes = {}
+ storage_remotes = {}
+ for remote_spec in remote_specs:
+ try:
+ index, storage = self._instantiate_remote(remote_spec)
+ except RemoteError as err:
+ if on_failure:
+ on_failure(remote_spec, str(err))
+ continue
+ else:
+ raise
+
+ # Finally, we can instantiate the remote. Note that
+ # NamedTuples are hashable, so we can use them as pretty
+ # low-overhead keys.
+ if index:
+ index_remotes[remote_spec] = index
+ if storage:
+ storage_remotes[remote_spec] = storage
+
+ self._has_fetch_remotes = storage_remotes and index_remotes
+ self._has_push_remotes = (any(spec.push for spec in storage_remotes) and
+ any(spec.push for spec in index_remotes))
+
+ return index_remotes, storage_remotes
+
+ # _instantiate_remote()
+ #
+ # Instantiate a remote given its spec, asserting that it is
+ # reachable - this may produce two remote instances (a storage and
+ # an index remote as specified by the class variables).
+ #
+ # Args:
+ #
+ # remote_spec (RemoteSpec): The spec of the remote to
+ # instantiate.
+ #
+ # Returns:
+ #
+ # (Tuple[Remote|None, Remote|None]) - The remotes, index remote
+ # first, storage remote second. One must always be specified,
+ # the other may be None.
+ #
+ def _instantiate_remote(self, remote_spec):
+ # Our remotes can be index, storage or both. In any case,
+ # we need to use a different type of Remote for our calls, so
+ # we create two objects here
+ index = None
+ storage = None
+ if remote_spec.type in [RemoteType.INDEX, RemoteType.ALL]:
+ index = self.index_remote_class(remote_spec) # pylint: disable=not-callable
+ index.check()
+ if remote_spec.type in [RemoteType.STORAGE, RemoteType.ALL]:
+ storage = self.storage_remote_class(remote_spec, self.cas)
+ storage.check()
+
+ return (index, storage)
+
# _message()
#
# Local message propagator
@@ -298,8 +382,8 @@ class BaseCache():
# reports takes care of messaging
#
def _initialize_remotes(self):
- def remote_failed(url, error):
- self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(url, error))
+ def remote_failed(remote, error):
+ self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(remote.url, error))
with self.context.messenger.timed_activity("Initializing remote caches", silent_nested=True):
self.initialize_remotes(on_failure=remote_failed)
diff --git a/src/buildstream/_cas/__init__.py b/src/buildstream/_cas/__init__.py
index a88e41371..17e3a3fd9 100644
--- a/src/buildstream/_cas/__init__.py
+++ b/src/buildstream/_cas/__init__.py
@@ -18,4 +18,4 @@
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
from .cascache import CASCache
-from .casremote import CASRemote, CASRemoteSpec
+from .casremote import CASRemote
diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py
index ba0477550..1efed22e6 100644
--- a/src/buildstream/_cas/casremote.py
+++ b/src/buildstream/_cas/casremote.py
@@ -1,9 +1,3 @@
-from collections import namedtuple
-import os
-import multiprocessing
-import signal
-from urllib.parse import urlparse
-
import grpc
from .._protos.google.rpc import code_pb2
@@ -11,61 +5,14 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remo
from .._protos.build.buildgrid import local_cas_pb2
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
-from .._exceptions import CASRemoteError, LoadError, LoadErrorReason
-from .. import _signals
-from .. import utils
+from .._remote import BaseRemote
+from .._exceptions import CASRemoteError
# The default limit for gRPC messages is 4 MiB.
# Limit payload to 1 MiB to leave sufficient headroom for metadata.
_MAX_PAYLOAD_BYTES = 1024 * 1024
-class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert instance_name')):
-
- # _new_from_config_node
- #
- # Creates an CASRemoteSpec() from a YAML loaded node
- #
- @staticmethod
- def _new_from_config_node(spec_node, basedir=None):
- spec_node.validate_keys(['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance-name'])
- url = spec_node.get_str('url')
- push = spec_node.get_bool('push', default=False)
- if not url:
- provenance = spec_node.get_node('url').get_provenance()
- raise LoadError("{}: empty artifact cache URL".format(provenance), LoadErrorReason.INVALID_DATA)
-
- instance_name = spec_node.get_str('instance-name', default=None)
-
- server_cert = spec_node.get_str('server-cert', default=None)
- if server_cert and basedir:
- server_cert = os.path.join(basedir, server_cert)
-
- client_key = spec_node.get_str('client-key', default=None)
- if client_key and basedir:
- client_key = os.path.join(basedir, client_key)
-
- client_cert = spec_node.get_str('client-cert', default=None)
- if client_cert and basedir:
- client_cert = os.path.join(basedir, client_cert)
-
- if client_key and not client_cert:
- provenance = spec_node.get_node('client-key').get_provenance()
- raise LoadError("{}: 'client-key' was specified without 'client-cert'".format(provenance),
- LoadErrorReason.INVALID_DATA)
-
- if client_cert and not client_key:
- provenance = spec_node.get_node('client-cert').get_provenance()
- raise LoadError("{}: 'client-cert' was specified without 'client-key'".format(provenance),
- LoadErrorReason.INVALID_DATA)
-
- return CASRemoteSpec(url, push, server_cert, client_key, client_cert, instance_name)
-
-
-# Disable type-checking since "Callable[...] has no attributes __defaults__"
-CASRemoteSpec.__new__.__defaults__ = (None, None, None, None) # type: ignore
-
-
class BlobNotFound(CASRemoteError):
def __init__(self, blob, msg):
@@ -75,13 +22,12 @@ class BlobNotFound(CASRemoteError):
# Represents a single remote CAS cache.
#
-class CASRemote():
- def __init__(self, spec, cascache):
- self.spec = spec
- self._initialized = False
+class CASRemote(BaseRemote):
+
+ def __init__(self, spec, cascache, **kwargs):
+ super().__init__(spec, **kwargs)
+
self.cascache = cascache
- self.channel = None
- self.instance_name = None
self.cas = None
self.ref_storage = None
self.batch_update_supported = None
@@ -90,157 +36,102 @@ class CASRemote():
self.max_batch_total_size_bytes = None
self.local_cas_instance_name = None
- def init(self):
- if not self._initialized:
- server_cert_bytes = None
- client_key_bytes = None
- client_cert_bytes = None
-
- url = urlparse(self.spec.url)
- if url.scheme == 'http':
- port = url.port or 80
- self.channel = grpc.insecure_channel('{}:{}'.format(url.hostname, port))
- elif url.scheme == 'https':
- port = url.port or 443
-
- if self.spec.server_cert:
- with open(self.spec.server_cert, 'rb') as f:
- server_cert_bytes = f.read()
-
- if self.spec.client_key:
- with open(self.spec.client_key, 'rb') as f:
- client_key_bytes = f.read()
-
- if self.spec.client_cert:
- with open(self.spec.client_cert, 'rb') as f:
- client_cert_bytes = f.read()
-
- credentials = grpc.ssl_channel_credentials(root_certificates=server_cert_bytes,
- private_key=client_key_bytes,
- certificate_chain=client_cert_bytes)
- self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
- else:
- raise CASRemoteError("Unsupported URL: {}".format(self.spec.url))
-
- self.instance_name = self.spec.instance_name or None
-
- self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
- self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
- self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
-
- self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
- try:
- request = remote_execution_pb2.GetCapabilitiesRequest()
- if self.instance_name:
- request.instance_name = self.instance_name
- response = self.capabilities.GetCapabilities(request)
- server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
- if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
- self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
- except grpc.RpcError as e:
- # Simply use the defaults for servers that don't implement GetCapabilities()
- if e.code() != grpc.StatusCode.UNIMPLEMENTED:
- raise
-
- # Check whether the server supports BatchReadBlobs()
- self.batch_read_supported = False
- try:
- request = remote_execution_pb2.BatchReadBlobsRequest()
- if self.instance_name:
- request.instance_name = self.instance_name
- response = self.cas.BatchReadBlobs(request)
- self.batch_read_supported = True
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.UNIMPLEMENTED:
- raise
-
- # Check whether the server supports BatchUpdateBlobs()
- self.batch_update_supported = False
- try:
- request = remote_execution_pb2.BatchUpdateBlobsRequest()
- if self.instance_name:
- request.instance_name = self.instance_name
- response = self.cas.BatchUpdateBlobs(request)
- self.batch_update_supported = True
- except grpc.RpcError as e:
- if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
- e.code() != grpc.StatusCode.PERMISSION_DENIED):
- raise
-
- local_cas = self.cascache._get_local_cas()
- request = local_cas_pb2.GetInstanceNameForRemoteRequest()
- request.url = self.spec.url
- if self.spec.instance_name:
- request.instance_name = self.spec.instance_name
- if server_cert_bytes:
- request.server_cert = server_cert_bytes
- if client_key_bytes:
- request.client_key = client_key_bytes
- if client_cert_bytes:
- request.client_cert = client_cert_bytes
- response = local_cas.GetInstanceNameForRemote(request)
- self.local_cas_instance_name = response.instance_name
-
- self._initialized = True
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, traceback):
- self.close()
- return False
-
- def close(self):
- if self.channel:
- self.channel.close()
- self.channel = None
-
# check_remote
+ # _configure_protocols():
#
- # Used when checking whether remote_specs work in the buildstream main
- # thread, runs this in a seperate process to avoid creation of gRPC threads
- # in the main BuildStream process
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
- @classmethod
- def check_remote(cls, remote_spec, cascache, q):
-
- def __check_remote():
- try:
- remote = cls(remote_spec, cascache)
- remote.init()
-
- request = buildstream_pb2.StatusRequest()
- response = remote.ref_storage.Status(request)
-
- if remote_spec.push and not response.allow_updates:
- q.put('CAS server does not allow push')
- else:
- # No error
- q.put(None)
-
- except grpc.RpcError as e:
- # str(e) is too verbose for errors reported to the user
- q.put(e.details())
+ # Configure remote-specific protocols. This method should *never*
+ # be called outside of init().
+ #
+ def _configure_protocols(self):
+ self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
+ self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
+ self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
+
+ # Figure out what batch sizes the server will accept, falling
+ # back to our _MAX_PAYLOAD_BYTES
+ self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
+ try:
+ request = remote_execution_pb2.GetCapabilitiesRequest()
+ if self.instance_name:
+ request.instance_name = self.instance_name
+ response = self.capabilities.GetCapabilities(request)
+ server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
+ if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
+ self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
+ except grpc.RpcError as e:
+ # Simply use the defaults for servers that don't implement
+ # GetCapabilities()
+ if e.code() != grpc.StatusCode.UNIMPLEMENTED:
+ raise
+
+ # Check whether the server supports BatchReadBlobs()
+ self.batch_read_supported = self._check_support(
+ remote_execution_pb2.BatchReadBlobsRequest,
+ self.cas.BatchReadBlobs
+ )
+
+ # Check whether the server supports BatchUpdateBlobs()
+ self.batch_update_supported = self._check_support(
+ remote_execution_pb2.BatchUpdateBlobsRequest,
+ self.cas.BatchUpdateBlobs
+ )
+
+ local_cas = self.cascache._get_local_cas()
+ request = local_cas_pb2.GetInstanceNameForRemoteRequest()
+ request.url = self.spec.url
+ if self.spec.instance_name:
+ request.instance_name = self.spec.instance_name
+ if self.server_cert:
+ request.server_cert = self.server_cert
+ if self.client_key:
+ request.client_key = self.client_key
+ if self.client_cert:
+ request.client_cert = self.client_cert
+ response = local_cas.GetInstanceNameForRemote(request)
+ self.local_cas_instance_name = response.instance_name
+
+ # _check():
+ #
+ # Check if this remote provides everything required for the
+ # particular kind of remote. This is expected to be called as part
+ # of check(), and must be called in a non-main process.
+ #
+ # Returns:
+ # (str|None): An error message, or None if no error message.
+ #
+ def _check(self):
+ request = buildstream_pb2.StatusRequest()
+ response = self.ref_storage.Status(request)
- except Exception as e: # pylint: disable=broad-except
- # Whatever happens, we need to return it to the calling process
- #
- q.put(str(e))
+ if self.spec.push and not response.allow_updates:
+ return 'CAS server does not allow push'
- p = multiprocessing.Process(target=__check_remote)
+ return None
+ # _check_support():
+ #
+ # Figure out if a remote server supports a given method based on
+ # grpc.StatusCode.UNIMPLEMENTED and grpc.StatusCode.PERMISSION_DENIED.
+ #
+ # Args:
+ # request_type (callable): The type of request to check.
+ # invoker (callable): The remote method that will be invoked.
+ #
+ # Returns:
+ # (bool) - Whether the request is supported.
+ #
+ def _check_support(self, request_type, invoker):
try:
- # Keep SIGINT blocked in the child process
- with _signals.blocked([signal.SIGINT], ignore=False):
- p.start()
+ request = request_type()
+ if self.instance_name:
+ request.instance_name = self.instance_name
+ invoker(request)
+ return True
+ except grpc.RpcError as e:
+ if not e.code() in (grpc.StatusCode.UNIMPLEMENTED, grpc.StatusCode.PERMISSION_DENIED):
+ raise
- error = q.get()
- p.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(p.pid)
- raise
-
- return error
+ return False
# push_message():
#
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index ca7a21955..bb011146e 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -54,9 +54,10 @@ _MAX_PAYLOAD_BYTES = 1024 * 1024
# Args:
# repo (str): Path to CAS repository
# enable_push (bool): Whether to allow blob uploads and artifact updates
+# index_only (bool): Whether to store CAS blobs or only artifacts
#
@contextmanager
-def create_server(repo, *, enable_push, quota):
+def create_server(repo, *, enable_push, quota, index_only):
cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False)
try:
@@ -67,11 +68,12 @@ def create_server(repo, *, enable_push, quota):
max_workers = (os.cpu_count() or 1) * 5
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
- bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
- _ByteStreamServicer(cas, enable_push=enable_push), server)
+ if not index_only:
+ bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
+ _ByteStreamServicer(cas, enable_push=enable_push), server)
- remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
- _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
+ remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
+ _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
_CapabilitiesServicer(), server)
@@ -80,7 +82,7 @@ def create_server(repo, *, enable_push, quota):
_ReferenceStorageServicer(cas, enable_push=enable_push), server)
artifact_pb2_grpc.add_ArtifactServiceServicer_to_server(
- _ArtifactServicer(cas, artifactdir), server)
+ _ArtifactServicer(cas, artifactdir, update_cas=not index_only), server)
source_pb2_grpc.add_SourceServiceServicer_to_server(
_SourceServicer(sourcedir), server)
@@ -110,9 +112,12 @@ def create_server(repo, *, enable_push, quota):
@click.option('--quota', type=click.INT,
help="Maximum disk usage in bytes",
default=10e9)
+@click.option('--index-only', type=click.BOOL,
+ help="Only provide the BuildStream artifact and source services (\"index\"), not the CAS (\"storage\")",
+ default=False)
@click.argument('repo')
def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
- quota):
+ quota, index_only):
# Handle SIGTERM by calling sys.exit(0), which will raise a SystemExit exception,
# properly executing cleanup code in `finally` clauses and context managers.
# This is required to terminate buildbox-casd on SIGTERM.
@@ -120,7 +125,8 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
with create_server(repo,
quota=quota,
- enable_push=enable_push) as server:
+ enable_push=enable_push,
+ index_only=index_only) as server:
use_tls = bool(server_key)
@@ -434,10 +440,11 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
- def __init__(self, cas, artifactdir):
+ def __init__(self, cas, artifactdir, *, update_cas=True):
super().__init__()
self.cas = cas
self.artifactdir = artifactdir
+ self.update_cas = update_cas
os.makedirs(artifactdir, exist_ok=True)
def GetArtifact(self, request, context):
@@ -449,6 +456,20 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
with open(artifact_path, 'rb') as f:
artifact.ParseFromString(f.read())
+ # Artifact-only servers will not have blobs on their system,
+ # so we can't reasonably update their mtimes. Instead, we exit
+ # early, and let the CAS server deal with its blobs.
+ #
+ # FIXME: We could try to run FindMissingBlobs on the other
+ # server. This is tricky to do from here, of course,
+ # because we don't know who the other server is, but
+ # the client could be smart about it - but this might
+ # make things slower.
+ #
+ # It needs some more thought...
+ if not self.update_cas:
+ return artifact
+
# Now update mtimes of files present.
try:
@@ -481,16 +502,17 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
def UpdateArtifact(self, request, context):
artifact = request.artifact
- # Check that the files specified are in the CAS
- self._check_directory("files", artifact.files, context)
+ if self.update_cas:
+ # Check that the files specified are in the CAS
+ self._check_directory("files", artifact.files, context)
- # Unset protocol buffers don't evaluated to False but do return empty
- # strings, hence str()
- if str(artifact.public_data):
- self._check_file("public data", artifact.public_data, context)
+ # Unset protocol buffers don't evaluate to False but do return empty
+ # strings, hence str()
+ if str(artifact.public_data):
+ self._check_file("public data", artifact.public_data, context)
- for log_file in artifact.logs:
- self._check_file("log digest", log_file.digest, context)
+ for log_file in artifact.logs:
+ self._check_file("log digest", log_file.digest, context)
# Add the artifact proto to the cas
artifact_path = os.path.join(self.artifactdir, request.cache_key)
diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py
index 648742dbb..947b83149 100644
--- a/src/buildstream/_exceptions.py
+++ b/src/buildstream/_exceptions.py
@@ -96,6 +96,7 @@ class ErrorDomain(Enum):
VIRTUAL_FS = 13
CAS = 14
PROG_NOT_FOUND = 15
+ REMOTE = 16
# BstError is an internal base exception class for BuildStream
@@ -290,6 +291,15 @@ class ArtifactError(BstError):
super().__init__(message, detail=detail, domain=ErrorDomain.ARTIFACT, reason=reason, temporary=True)
+# RemoteError
+#
+# Raised when errors are encountered in Remotes
+#
+class RemoteError(BstError):
+ def __init__(self, message, *, detail=None, reason=None):
+ super().__init__(message, detail=detail, domain=ErrorDomain.REMOTE, reason=reason)
+
+
# CASError
#
# Raised when errors are encountered in the CAS
diff --git a/src/buildstream/_remote.py b/src/buildstream/_remote.py
new file mode 100644
index 000000000..75c626c47
--- /dev/null
+++ b/src/buildstream/_remote.py
@@ -0,0 +1,294 @@
+#
+# Copyright (C) 2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import multiprocessing
+import os
+import signal
+from collections import namedtuple
+from urllib.parse import urlparse
+
+import grpc
+
+from . import _signals
+from . import utils
+from ._exceptions import LoadError, LoadErrorReason, ImplError, RemoteError
+from ._protos.google.bytestream import bytestream_pb2_grpc
+from .types import FastEnum
+
+
+# RemoteType():
+#
+# Defines the different types of remote.
+#
+class RemoteType(FastEnum):
+ INDEX = "index"
+ STORAGE = "storage"
+ ALL = "all"
+
+ def __str__(self):
+ return self.name.lower().replace('_', '-')
+
+
+# RemoteSpec():
+#
+# Defines the basic structure of a remote specification.
+#
+class RemoteSpec(namedtuple('RemoteSpec', 'url push server_cert client_key client_cert instance_name type')):
+
+ # new_from_config_node
+ #
+ # Creates a RemoteSpec() from a YAML loaded node.
+ #
+ # Args:
+ # spec_node (MappingNode): The configuration node describing the spec.
+ # basedir (str): The base directory from which to find certificates.
+ #
+ # Returns:
+ # (RemoteSpec) - The described RemoteSpec instance.
+ #
+ # Raises:
+ # LoadError: If the node is malformed.
+ #
+ @classmethod
+ def new_from_config_node(cls, spec_node, basedir=None):
+ spec_node.validate_keys(['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance-name', 'type'])
+
+ url = spec_node.get_str('url')
+ if not url:
+ provenance = spec_node.get_node('url').get_provenance()
+ raise LoadError("{}: empty artifact cache URL".format(provenance), LoadErrorReason.INVALID_DATA)
+
+ push = spec_node.get_bool('push', default=False)
+ instance_name = spec_node.get_str('instance-name', default=None)
+
+ def parse_cert(key):
+ cert = spec_node.get_str(key, default=None)
+ if cert and basedir:
+ cert = os.path.join(basedir, cert)
+ return cert
+
+ cert_keys = ('server-cert', 'client-key', 'client-cert')
+ server_cert, client_key, client_cert = tuple(parse_cert(key) for key in cert_keys)
+
+ if client_key and not client_cert:
+ provenance = spec_node.get_node('client-key').get_provenance()
+ raise LoadError("{}: 'client-key' was specified without 'client-cert'".format(provenance),
+ LoadErrorReason.INVALID_DATA)
+
+ if client_cert and not client_key:
+ provenance = spec_node.get_node('client-cert').get_provenance()
+ raise LoadError("{}: 'client-cert' was specified without 'client-key'".format(provenance),
+ LoadErrorReason.INVALID_DATA)
+
+ type_ = spec_node.get_enum('type', RemoteType, default=RemoteType.ALL)
+
+ return cls(url, push, server_cert, client_key, client_cert, instance_name, type_)
+
+
+# FIXME: This can be made much nicer in python 3.7 through the use of
+# defaults - or hell, by replacing it all with a typing.NamedTuple
+#
+# Note that defaults are specified from the right, and omitted values
+# are considered mandatory.
+#
+# Disable type-checking since "Callable[...] has no attributes __defaults__"
+RemoteSpec.__new__.__defaults__ = ( # type: ignore
+ # mandatory # url - The url of the remote
+ # mandatory # push - Whether the remote should be used for pushing
+ None, # server_cert - The server certificate
+ None, # client_key - The (private) client key
+ None, # client_cert - The (public) client certificate
+ None, # instance_name - The (grpc) instance name of the remote
+ RemoteType.ALL # type - The type of the remote (index, storage, both)
+)
+
+
+# BaseRemote():
+#
+# Provides the basic functionality required to set up remote
+# interaction via GRPC. In particular, this will set up a
+# grpc.insecure_channel, or a grpc.secure_channel, based on the given
+# spec.
+#
+# Customization for the particular protocol is expected to be
+# performed in children.
+#
+class BaseRemote():
+ key_name = None
+
+ def __init__(self, spec):
+ self.spec = spec
+ self._initialized = False
+
+ self.bytestream = None
+ self.channel = None
+
+ self.server_cert = None
+ self.client_key = None
+ self.client_cert = None
+
+ self.instance_name = spec.instance_name
+ self.push = spec.push
+ self.url = spec.url
+
+ # init():
+ #
+ # Initialize the given remote. This function must be called before
+ # any communication is performed, since such will otherwise fail.
+ #
+ def init(self):
+ if self._initialized:
+ return
+
+        # Set up the communication channel
+ url = urlparse(self.spec.url)
+ if url.scheme == 'http':
+ port = url.port or 80
+ self.channel = grpc.insecure_channel('{}:{}'.format(url.hostname, port))
+ elif url.scheme == 'https':
+ port = url.port or 443
+ try:
+ server_cert, client_key, client_cert = _read_files(
+ self.spec.server_cert,
+ self.spec.client_key,
+ self.spec.client_cert)
+ except FileNotFoundError as e:
+ raise RemoteError("Could not read certificates: {}".format(e)) from e
+ self.server_cert = server_cert
+ self.client_key = client_key
+ self.client_cert = client_cert
+ credentials = grpc.ssl_channel_credentials(root_certificates=self.server_cert,
+ private_key=self.client_key,
+ certificate_chain=self.client_cert)
+ self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
+ else:
+ raise RemoteError("Unsupported URL: {}".format(self.spec.url))
+
+ # Set up the bytestream on our channel
+ self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
+
+ self._configure_protocols()
+
+ self._initialized = True
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.close()
+ return False
+
+ def close(self):
+ if self.channel:
+ self.channel.close()
+ self.channel = None
+
+ # _configure_protocols():
+ #
+ # An abstract method to configure remote-specific protocols. This
+ # is *not* done as super().init() because we want to be able to
+ # set self._initialized *after* initialization completes in the
+ # parent class.
+ #
+ # This method should *never* be called outside of init().
+ #
+ def _configure_protocols(self):
+ raise ImplError("An implementation of a Remote must configure its protocols.")
+
+ # check():
+ #
+ # Check if the remote is functional and has all the required
+    # capabilities. This should be used somewhat like an assertion;
+    # it raises a RemoteError on failure.
+    #
+    # Note that this method runs the calls in a separate process, so
+    # that we can use grpc calls even if we are in the main process.
+ #
+ # Raises:
+ # RemoteError: If the grpc call fails.
+ #
+ def check(self):
+ queue = multiprocessing.Queue()
+
+ def __check_remote():
+ try:
+ self.init()
+ queue.put(self._check())
+
+ except grpc.RpcError as e:
+ # str(e) is too verbose for errors reported to the user
+ queue.put(e.details())
+
+ except Exception as e: # pylint: disable=broad-except
+ # Whatever happens, we need to return it to the calling process
+ #
+ queue.put(str(e))
+
+ process = multiprocessing.Process(target=__check_remote)
+
+ try:
+ # Keep SIGINT blocked in the child process
+ with _signals.blocked([signal.SIGINT], ignore=False):
+ process.start()
+
+ error = queue.get()
+ process.join()
+ except KeyboardInterrupt:
+ utils._kill_process_tree(process.pid)
+ raise
+ finally:
+ # Should not be necessary, but let's avoid keeping them
+ # alive too long
+ queue.close()
+
+ if error:
+ raise RemoteError(error)
+
+ # _check():
+ #
+ # Check if this remote provides everything required for the
+ # particular kind of remote. This is expected to be called as part
+ # of check(), and must be called in a non-main process.
+ #
+ # Returns:
+ # (str|None): An error message, or None if no error message.
+ #
+ def _check(self):
+ return None
+
+ def __str__(self):
+ return self.url
+
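+# A minimal subclass is expected to look roughly like the following
+# sketch (hypothetical names, for illustration only):
+#
+#     class MyRemote(BaseRemote):
+#
+#         def _configure_protocols(self):
+#             # Attach protocol-specific stubs to the open channel.
+#             self.my_service = my_service_pb2_grpc.MyServiceStub(self.channel)
+#
+#         def _check(self):
+#             # Return an error string, or None if the remote is usable.
+#             return None
+#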
+
+# _read_files():
+#
+# A helper function to read a bunch of files, ignoring any input
+# arguments that are None.
+#
+# Args:
+#    files (Iterable[str|None]): The files to read. Nones are passed back.
+#
+# Returns:
+#    Generator[bytes|None, None, None] - The file contents, read as bytes.
+#
+def _read_files(*files):
+ def read_file(f):
+ if f:
+ with open(f, 'rb') as data:
+ return data.read()
+ return None
+ return (read_file(f) for f in files)
diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py
index 64498ba32..76a2e4f39 100644
--- a/src/buildstream/_sourcecache.py
+++ b/src/buildstream/_sourcecache.py
@@ -20,7 +20,7 @@
import os
import grpc
-from ._cas import CASRemote, CASRemoteSpec
+from ._remote import BaseRemote
from .storage._casbaseddirectory import CasBasedDirectory
from ._basecache import BaseCache
from ._exceptions import CASError, CASRemoteError, SourceCacheError
@@ -29,53 +29,75 @@ from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
source_pb2, source_pb2_grpc
-# Holds configuration for a remote used for the source cache.
-#
-# Args:
-# url (str): Location of the remote source cache
-# push (bool): Whether we should attempt to push sources to this cache,
-# in addition to pulling from it.
-# instance-name (str): Name if any, of instance of server
-#
-class SourceCacheSpec(CASRemoteSpec):
- pass
-
+class SourceRemote(BaseRemote):
-class SourceRemote(CASRemote):
- def __init__(self, *args):
- super().__init__(*args)
- self.capabilities_service = None
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
self.source_service = None
- def init(self):
- if not self._initialized:
- super().init()
-
- self.capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
+ def _configure_protocols(self):
+ capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
+ # check that the service supports sources
+ try:
+ request = buildstream_pb2.GetCapabilitiesRequest()
+ if self.instance_name:
+ request.instance_name = self.instance_name
- # check that the service supports sources
- try:
- request = buildstream_pb2.GetCapabilitiesRequest()
- if self.instance_name:
- request.instance_name = self.instance_name
-
- response = self.capabilities_service.GetCapabilities(request)
- except grpc.RpcError as e:
- # Check if this remote has the artifact service
- if e.code() == grpc.StatusCode.UNIMPLEMENTED:
- raise SourceCacheError(
- "Configured remote does not have the BuildStream "
- "capabilities service. Please check remote configuration.")
- # Else raise exception with details
+ response = capabilities_service.GetCapabilities(request)
+ except grpc.RpcError as e:
+ # Check if this remote has the artifact service
+ if e.code() == grpc.StatusCode.UNIMPLEMENTED:
raise SourceCacheError(
- "Remote initialisation failed: {}".format(e.details()))
+ "Configured remote does not have the BuildStream "
+ "capabilities service. Please check remote configuration.")
+ # Else raise exception with details
+ raise SourceCacheError(
+ "Remote initialisation failed: {}".format(e.details()))
- if not response.source_capabilities:
- raise SourceCacheError(
- "Configured remote does not support source service")
+ if not response.source_capabilities:
+ raise SourceCacheError(
+ "Configured remote does not support source service")
- # set up source service
- self.source_service = source_pb2_grpc.SourceServiceStub(self.channel)
+ # set up source service
+ self.source_service = source_pb2_grpc.SourceServiceStub(self.channel)
+
+ # get_source():
+ #
+ # Get a source proto for a given source_ref from the remote.
+ #
+ # Args:
+ # source_ref (str): The source ref of the source to pull.
+ #
+ # Returns:
+ # (Source): The source proto
+ #
+ # Raises:
+ # grpc.RpcError: If something goes wrong during the request.
+ #
+ def get_source(self, source_ref):
+ request = source_pb2.GetSourceRequest()
+ request.cache_key = source_ref
+ return self.source_service.GetSource(request)
+
+ # update_source():
+ #
+ # Update the source on the remote.
+ #
+ # Args:
+ # source_ref (str): The source ref of the source to update.
+ # source (Source): The proto to update with.
+ #
+ # Returns:
+ # (bool): Whether the update was successful.
+ #
+ # Raises:
+ # grpc.RpcError: If something goes wrong during the request.
+ #
+ def update_source(self, source_ref, source):
+ request = source_pb2.UpdateSourceRequest()
+ request.cache_key = source_ref
+ request.source.CopyFrom(source)
+ return self.source_service.UpdateSource(request)
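+
+    # A sketch of how these helpers are expected to be used (see
+    # SourceCache._pull_source() and SourceCache._push_source() below):
+    #
+    #     remote.init()
+    #     source_proto = remote.get_source(source_ref)
+    #     remote.update_source(source_ref, source_proto)
+    #
+    # Both calls may raise grpc.RpcError, which callers handle themselves.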
# Class that keeps config of remotes and deals with caching of sources.
@@ -85,11 +107,10 @@ class SourceRemote(CASRemote):
#
class SourceCache(BaseCache):
- spec_class = SourceCacheSpec
spec_name = "source_cache_specs"
spec_error = SourceCacheError
config_node_name = "source-caches"
- remote_class = SourceRemote
+ index_remote_class = SourceRemote
def __init__(self, context):
super().__init__(context)
@@ -183,39 +204,53 @@ class SourceCache(BaseCache):
# (bool): True if pull successful, False if not
def pull(self, source):
ref = source._get_source_name()
-
project = source._get_project()
-
display_key = source._get_brief_display_key()
- for remote in self._remotes[project]:
+ index_remotes = self._index_remotes[project]
+ storage_remotes = self._storage_remotes[project]
+
+ # First fetch the source proto so we know what to pull
+ source_proto = None
+ for remote in index_remotes:
try:
- source.status("Pulling source {} <- {}".format(display_key, remote.spec.url))
+ remote.init()
+ source.status("Pulling source {} <- {}".format(display_key, remote))
- # fetch source proto
- response = self._pull_source(ref, remote)
- if response is None:
+ source_proto = self._pull_source(ref, remote)
+ if source_proto is None:
source.info("Remote source service ({}) does not have source {} cached".format(
- remote.spec.url, display_key))
+ remote, display_key))
continue
+ except CASError as e:
+ raise SourceCacheError("Failed to pull source {}: {}".format(
+ display_key, e)) from e
+
+ if not source_proto:
+ return False
+
+ for remote in storage_remotes:
+ try:
+ remote.init()
+ source.status("Pulling data for source {} <- {}".format(display_key, remote))
# Fetch source blobs
- self.cas._fetch_directory(remote, response.files)
- required_blobs = self.cas.required_blobs_for_directory(response.files)
+ self.cas._fetch_directory(remote, source_proto.files)
+ required_blobs = self.cas.required_blobs_for_directory(source_proto.files)
missing_blobs = self.cas.local_missing_blobs(required_blobs)
missing_blobs = self.cas.fetch_blobs(remote, missing_blobs)
if missing_blobs:
source.info("Remote cas ({}) does not have source {} cached".format(
- remote.spec.url, display_key))
+ remote, display_key))
continue
- source.info("Pulled source {} <- {}".format(display_key, remote.spec.url))
+ source.info("Pulled source {} <- {}".format(display_key, remote))
return True
-
except CASError as e:
raise SourceCacheError("Failed to pull source {}: {}".format(
display_key, e)) from e
+
return False
# push()
@@ -232,41 +267,48 @@ class SourceCache(BaseCache):
ref = source._get_source_name()
project = source._get_project()
+ index_remotes = []
+ storage_remotes = []
+
# find configured push remotes for this source
if self._has_push_remotes:
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
- else:
- push_remotes = []
+ index_remotes = [r for r in self._index_remotes[project] if r.push]
+ storage_remotes = [r for r in self._storage_remotes[project] if r.push]
- pushed = False
+ pushed_storage = False
+ pushed_index = False
display_key = source._get_brief_display_key()
- for remote in push_remotes:
+ for remote in storage_remotes:
remote.init()
- source.status("Pushing source {} -> {}".format(display_key, remote.spec.url))
-
- # check whether cache has files already
- if self._pull_source(ref, remote) is not None:
- source.info("Remote ({}) already has source {} cached"
- .format(remote.spec.url, display_key))
- continue
+ source.status("Pushing data for source {} -> {}".format(display_key, remote))
- # push files to storage
source_proto = self._get_source(ref)
try:
self.cas._send_directory(remote, source_proto.files)
+ pushed_storage = True
except CASRemoteError:
- source.info("Failed to push source files {} -> {}".format(display_key, remote.spec.url))
+ source.info("Failed to push source files {} -> {}".format(display_key, remote))
+ continue
+
+ for remote in index_remotes:
+ remote.init()
+ source.status("Pushing source {} -> {}".format(display_key, remote))
+
+ # check whether cache has files already
+ if self._pull_source(ref, remote) is not None:
+ source.info("Remote ({}) already has source {} cached"
+ .format(remote, display_key))
continue
if not self._push_source(ref, remote):
- source.info("Failed to push source metadata {} -> {}".format(display_key, remote.spec.url))
+ source.info("Failed to push source metadata {} -> {}".format(display_key, remote))
continue
- source.info("Pushed source {} -> {}".format(display_key, remote.spec.url))
- pushed = True
+ source.info("Pushed source {} -> {}".format(display_key, remote))
+ pushed_index = True
- return pushed
+ return pushed_index and pushed_storage
def _remove_source(self, ref, *, defer_prune=False):
return self.cas.remove(ref, basedir=self.sourcerefdir, defer_prune=defer_prune)
@@ -315,14 +357,8 @@ class SourceCache(BaseCache):
def _pull_source(self, source_ref, remote):
try:
remote.init()
-
- request = source_pb2.GetSourceRequest()
- request.cache_key = source_ref
-
- response = remote.source_service.GetSource(request)
-
+ response = remote.get_source(source_ref)
self._store_proto(response, source_ref)
-
return response
except grpc.RpcError as e:
@@ -333,14 +369,8 @@ class SourceCache(BaseCache):
def _push_source(self, source_ref, remote):
try:
remote.init()
-
source_proto = self._get_source(source_ref)
-
- request = source_pb2.UpdateSourceRequest()
- request.cache_key = source_ref
- request.source.CopyFrom(source_proto)
-
- return remote.source_service.UpdateSource(request)
+ return remote.update_source(source_ref, source_proto)
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index cf034dbf9..678b11c32 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -37,7 +37,8 @@ from .._protos.google.rpc import code_pb2
from .._exceptions import BstError, SandboxError
from .. import _yaml
from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
-from .._cas import CASRemote, CASRemoteSpec
+from .._cas import CASRemote
+from .._remote import RemoteSpec
class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service action_service')):
@@ -98,11 +99,11 @@ class SandboxRemote(Sandbox):
self.exec_instance = config.exec_service.get('instance-name', None)
self.storage_instance = config.storage_service.get('instance-name', None)
- self.storage_remote_spec = CASRemoteSpec(self.storage_url, push=True,
- server_cert=config.storage_service.get('server-cert'),
- client_key=config.storage_service.get('client-key'),
- client_cert=config.storage_service.get('client-cert'),
- instance_name=self.storage_instance)
+ self.storage_remote_spec = RemoteSpec(self.storage_url, push=True,
+ server_cert=config.storage_service.get('server-cert'),
+ client_key=config.storage_service.get('client-key'),
+ client_cert=config.storage_service.get('client-cert'),
+ instance_name=self.storage_instance)
self.operation_name = None
def info(self, msg):
diff --git a/tests/artifactcache/config.py b/tests/artifactcache/config.py
index 08d6f74bb..8b01a9ebe 100644
--- a/tests/artifactcache/config.py
+++ b/tests/artifactcache/config.py
@@ -6,7 +6,8 @@ import os
import pytest
-from buildstream._artifactcache import ArtifactCacheSpec, ArtifactCache
+from buildstream._remote import RemoteSpec, RemoteType
+from buildstream._artifactcache import ArtifactCache
from buildstream._project import Project
from buildstream.utils import _deduplicate
from buildstream import _yaml
@@ -18,17 +19,33 @@ from tests.testutils import dummy_context
DATA_DIR = os.path.dirname(os.path.realpath(__file__))
-cache1 = ArtifactCacheSpec(url='https://example.com/cache1', push=True)
-cache2 = ArtifactCacheSpec(url='https://example.com/cache2', push=False)
-cache3 = ArtifactCacheSpec(url='https://example.com/cache3', push=False)
-cache4 = ArtifactCacheSpec(url='https://example.com/cache4', push=False)
-cache5 = ArtifactCacheSpec(url='https://example.com/cache5', push=False)
-cache6 = ArtifactCacheSpec(url='https://example.com/cache6', push=True)
+cache1 = RemoteSpec(url='https://example.com/cache1', push=True)
+cache2 = RemoteSpec(url='https://example.com/cache2', push=False)
+cache3 = RemoteSpec(url='https://example.com/cache3', push=False)
+cache4 = RemoteSpec(url='https://example.com/cache4', push=False)
+cache5 = RemoteSpec(url='https://example.com/cache5', push=False)
+cache6 = RemoteSpec(url='https://example.com/cache6',
+ push=True,
+ type=RemoteType.ALL)
+cache7 = RemoteSpec(url='https://index.example.com/cache1',
+ push=True,
+ type=RemoteType.INDEX)
+cache8 = RemoteSpec(url='https://storage.example.com/cache1',
+ push=True,
+ type=RemoteType.STORAGE)
# Generate cache configuration fragments for the user config and project config files.
#
-def configure_remote_caches(override_caches, project_caches=None, user_caches=None):
+def configure_remote_caches(override_caches,
+ project_caches=None,
+ user_caches=None):
+ type_strings = {
+ RemoteType.INDEX: 'index',
+ RemoteType.STORAGE: 'storage',
+ RemoteType.ALL: 'all'
+ }
+
if project_caches is None:
project_caches = []
@@ -40,10 +57,15 @@ def configure_remote_caches(override_caches, project_caches=None, user_caches=No
user_config['artifacts'] = {
'url': user_caches[0].url,
'push': user_caches[0].push,
+ 'type': type_strings[user_caches[0].type]
}
elif len(user_caches) > 1:
user_config['artifacts'] = [
- {'url': cache.url, 'push': cache.push} for cache in user_caches
+ {
+ 'url': cache.url,
+ 'push': cache.push,
+ 'type': type_strings[cache.type]
+ } for cache in user_caches
]
if len(override_caches) == 1:
@@ -52,6 +74,7 @@ def configure_remote_caches(override_caches, project_caches=None, user_caches=No
'artifacts': {
'url': override_caches[0].url,
'push': override_caches[0].push,
+ 'type': type_strings[override_caches[0].type]
}
}
}
@@ -59,7 +82,11 @@ def configure_remote_caches(override_caches, project_caches=None, user_caches=No
user_config['projects'] = {
'test': {
'artifacts': [
- {'url': cache.url, 'push': cache.push} for cache in override_caches
+ {
+ 'url': cache.url,
+ 'push': cache.push,
+ 'type': type_strings[cache.type]
+ } for cache in override_caches
]
}
}
@@ -71,12 +98,17 @@ def configure_remote_caches(override_caches, project_caches=None, user_caches=No
'artifacts': {
'url': project_caches[0].url,
'push': project_caches[0].push,
+ 'type': type_strings[project_caches[0].type],
}
})
elif len(project_caches) > 1:
project_config.update({
'artifacts': [
- {'url': cache.url, 'push': cache.push} for cache in project_caches
+ {
+ 'url': cache.url,
+ 'push': cache.push,
+ 'type': type_strings[cache.type]
+ } for cache in project_caches
]
})
@@ -95,6 +127,7 @@ def configure_remote_caches(override_caches, project_caches=None, user_caches=No
pytest.param([cache1], [cache2], [cache3], id='project-override-in-user-config'),
pytest.param([cache1, cache2], [cache3, cache4], [cache5, cache6], id='list-order'),
pytest.param([cache1, cache2, cache1], [cache2], [cache2, cache1], id='duplicates'),
+ pytest.param([cache7, cache8], [], [cache1], id='split-caches')
])
def test_artifact_cache_precedence(tmpdir, override_caches, project_caches, user_caches):
# Produce a fake user and project config with the cache configuration.
@@ -148,3 +181,36 @@ def test_missing_certs(cli, datafiles, config_key, config_value):
# This does not happen for a simple `bst show`.
result = cli.run(project=project, args=['artifact', 'pull', 'element.bst'])
result.assert_main_error(ErrorDomain.LOAD, LoadErrorReason.INVALID_DATA)
+
+
+# Assert that BuildStream complains when someone attempts to define
+# only one type of storage.
+@pytest.mark.datafiles(DATA_DIR)
+@pytest.mark.parametrize(
+ 'override_caches, project_caches, user_caches',
+ [
+ # The leftmost cache is the highest priority one in all cases here.
+ pytest.param([], [], [cache7], id='index-user'),
+ pytest.param([], [], [cache8], id='storage-user'),
+ pytest.param([], [cache7], [], id='index-project'),
+ pytest.param([], [cache8], [], id='storage-project'),
+ pytest.param([cache7], [], [], id='index-override'),
+ pytest.param([cache8], [], [], id='storage-override'),
+ ])
+def test_only_one(cli, datafiles, override_caches, project_caches, user_caches):
+ project = os.path.join(datafiles.dirname, datafiles.basename, 'only-one')
+
+ # Produce a fake user and project config with the cache configuration.
+ user_config, project_config = configure_remote_caches(override_caches, project_caches, user_caches)
+ project_config['name'] = 'test'
+
+ cli.configure(user_config)
+
+ project_config_file = os.path.join(project, 'project.conf')
+ _yaml.roundtrip_dump(project_config, file=project_config_file)
+
+ # Use `pull` here to ensure we try to initialize the remotes, triggering the error
+ #
+ # This does not happen for a simple `bst show`.
+ result = cli.run(project=project, args=['artifact', 'pull', 'element.bst'])
+ result.assert_main_error(ErrorDomain.STREAM, None)
diff --git a/tests/artifactcache/only-one/element.bst b/tests/artifactcache/only-one/element.bst
new file mode 100644
index 000000000..3c29b4ea1
--- /dev/null
+++ b/tests/artifactcache/only-one/element.bst
@@ -0,0 +1 @@
+kind: autotools
diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py
index 71db3e338..2e33af3ac 100644
--- a/tests/artifactcache/pull.py
+++ b/tests/artifactcache/pull.py
@@ -66,7 +66,7 @@ def test_pull(cli, tmpdir, datafiles):
# Assert that we are now cached locally
assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
# Assert that we shared/pushed the cached artifact
- assert share.has_artifact(cli.get_artifact_name(project_dir, 'test', 'target.bst'))
+ assert share.get_artifact(cli.get_artifact_name(project_dir, 'test', 'target.bst'))
# Delete the artifact locally
cli.remove_artifact_from_cache(project_dir, 'target.bst')
@@ -138,7 +138,7 @@ def test_pull_tree(cli, tmpdir, datafiles):
# Assert that we are now cached locally
assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
# Assert that we shared/pushed the cached artifact
- assert share.has_artifact(cli.get_artifact_name(project_dir, 'test', 'target.bst'))
+ assert share.get_artifact(cli.get_artifact_name(project_dir, 'test', 'target.bst'))
with dummy_context(config=user_config_file) as context:
# Load the project and CAS cache
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 20d9ccfec..8b00d0fb7 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -10,7 +10,7 @@ from buildstream._project import Project
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildstream.testing import cli # pylint: disable=unused-import
-from tests.testutils import create_artifact_share, dummy_context
+from tests.testutils import create_artifact_share, create_split_share, dummy_context
# Project directory
@@ -20,6 +20,39 @@ DATA_DIR = os.path.join(
)
+# Push the given element and return its artifact key for assertions.
+def _push(cli, cache_dir, project_dir, config_file, target):
+ with dummy_context(config=config_file) as context:
+ # Load the project manually
+ project = Project(project_dir, context)
+ project.ensure_fully_loaded()
+
+ # Assert that the element's artifact is cached
+        element = project.load_elements([target])[0]
+        element_key = cli.get_element_key(project_dir, target)
+ assert cli.artifact.is_cached(cache_dir, element, element_key)
+
+ # Create a local artifact cache handle
+ artifactcache = context.artifactcache
+
+        # Ensure the element's artifact member is initialised
+ # This is duplicated from Pipeline.resolve_elements()
+ # as this test does not use the cli frontend.
+ for e in element.dependencies(Scope.ALL):
+ # Determine initial element state.
+ e._update_state()
+
+ # Manually setup the CAS remotes
+ artifactcache.setup_remotes(use_config=True)
+ artifactcache.initialize_remotes()
+
+ assert artifactcache.has_push_remotes(plugin=element), \
+ "No remote configured for element target.bst"
+ assert element._push(), "Push operation failed"
+
+ return element_key
+
+
@pytest.mark.in_subprocess
@pytest.mark.datafiles(DATA_DIR)
def test_push(cli, tmpdir, datafiles):
@@ -50,36 +83,52 @@ def test_push(cli, tmpdir, datafiles):
# Write down the user configuration file
_yaml.roundtrip_dump(user_config, file=user_config_file)
+ element_key = _push(cli, rootcache_dir, project_dir, user_config_file, 'target.bst')
+ assert share.get_artifact(cli.get_artifact_name(project_dir, 'test', 'target.bst', cache_key=element_key))
- with dummy_context(config=user_config_file) as context:
- # Load the project manually
- project = Project(project_dir, context)
- project.ensure_fully_loaded()
- # Assert that the element's artifact is cached
- element = project.load_elements(['target.bst'])[0]
- element_key = cli.get_element_key(project_dir, 'target.bst')
- assert cli.artifact.is_cached(rootcache_dir, element, element_key)
+@pytest.mark.in_subprocess
+@pytest.mark.datafiles(DATA_DIR)
+def test_push_split(cli, tmpdir, datafiles):
+ project_dir = str(datafiles)
- # Create a local artifact cache handle
- artifactcache = context.artifactcache
+ # First build the project without the artifact cache configured
+ result = cli.run(project=project_dir, args=['build', 'target.bst'])
+ result.assert_success()
- # Ensure the element's artifact memeber is initialised
- # This is duplicated from Pipeline.resolve_elements()
- # as this test does not use the cli frontend.
- for e in element.dependencies(Scope.ALL):
- # Determine initial element state.
- e._update_state()
+ # Assert that we are now cached locally
+ assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
- # Manually setup the CAS remotes
- artifactcache.setup_remotes(use_config=True)
- artifactcache.initialize_remotes()
+ indexshare = os.path.join(str(tmpdir), 'indexshare')
+ storageshare = os.path.join(str(tmpdir), 'storageshare')
- assert artifactcache.has_push_remotes(plugin=element), \
- "No remote configured for element target.bst"
- assert element._push(), "Push operation failed"
+ # Set up an artifact cache.
+ with create_split_share(indexshare, storageshare) as (index, storage):
+ rootcache_dir = os.path.join(str(tmpdir), 'cache')
+ user_config = {
+ 'scheduler': {
+ 'pushers': 1
+ },
+ 'artifacts': [{
+ 'url': index.repo,
+ 'push': True,
+ 'type': 'index'
+ }, {
+ 'url': storage.repo,
+ 'push': True,
+ 'type': 'storage'
+ }],
+ 'cachedir': rootcache_dir
+ }
+ config_path = str(tmpdir.join('buildstream.conf'))
+ _yaml.roundtrip_dump(user_config, file=config_path)
- assert share.has_artifact(cli.get_artifact_name(project_dir, 'test', 'target.bst', cache_key=element_key))
+ element_key = _push(cli, rootcache_dir, project_dir, config_path, 'target.bst')
+ proto = index.get_artifact_proto(cli.get_artifact_name(project_dir,
+ 'test',
+ 'target.bst',
+ cache_key=element_key))
+ assert storage.get_cas_files(proto) is not None
@pytest.mark.in_subprocess
@@ -88,7 +137,8 @@ def test_push_message(tmpdir, datafiles):
project_dir = str(datafiles)
# Set up an artifact cache.
- with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
+ artifactshare = os.path.join(str(tmpdir), 'artifactshare')
+ with create_artifact_share(artifactshare) as share:
# Configure artifact share
rootcache_dir = os.path.join(str(tmpdir), 'cache')
user_config_file = str(tmpdir.join('buildstream.conf'))
diff --git a/tests/frontend/artifact_delete.py b/tests/frontend/artifact_delete.py
index 80870c81a..a9f5ec6da 100644
--- a/tests/frontend/artifact_delete.py
+++ b/tests/frontend/artifact_delete.py
@@ -159,7 +159,7 @@ def test_artifact_delete_pulled_artifact_without_buildtree(cli, tmpdir, datafile
result.assert_success()
# Make sure it's in the share
- assert remote.has_artifact(cli.get_artifact_name(project, 'test', element))
+ assert remote.get_artifact(cli.get_artifact_name(project, 'test', element))
# Delete and then pull the artifact (without its buildtree)
result = cli.run(project=project, args=['artifact', 'delete', element])
diff --git a/tests/frontend/artifact_show.py b/tests/frontend/artifact_show.py
index 913dde9c8..76ea93d63 100644
--- a/tests/frontend/artifact_show.py
+++ b/tests/frontend/artifact_show.py
@@ -124,7 +124,7 @@ def test_artifact_show_element_available_remotely(cli, tmpdir, datafiles):
result.assert_success()
# Make sure it's in the share
- assert remote.has_artifact(cli.get_artifact_name(project, 'test', element))
+ assert remote.get_artifact(cli.get_artifact_name(project, 'test', element))
# Delete the artifact from the local cache
result = cli.run(project=project, args=['artifact', 'delete', element])
diff --git a/tests/frontend/push.py b/tests/frontend/push.py
index 4f0fa3c19..31f96cbdf 100644
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -464,7 +464,16 @@ def test_artifact_too_large(cli, datafiles, tmpdir):
# Create and try to push a 6MB element.
create_element_size('large_element.bst', project, element_path, [], int(6e6))
result = cli.run(project=project, args=['build', 'large_element.bst'])
- result.assert_success()
+ # This should fail; the server will refuse to store the CAS
+ # blobs for the artifact, and then fail to find the files for
+ # the uploaded artifact proto.
+ #
+ # FIXME: This should be extremely uncommon in practice, since
+ # the artifact needs to be at least half the cache size for
+    # this to happen. Nonetheless, a clearer error message would be
+    # helpful (perhaps we should just disallow uploading artifacts
+    # that large).
+ result.assert_main_error(ErrorDomain.STREAM, None)
# Ensure that the small artifact is still in the share
states = cli.get_element_states(project, ['small_element.bst', 'large_element.bst'])
@@ -560,7 +569,7 @@ def test_push_cross_junction(cli, tmpdir, datafiles):
cli.run(project=project, args=['artifact', 'push', 'junction.bst:import-etc.bst'])
cache_key = cli.get_element_key(project, 'junction.bst:import-etc.bst')
- assert share.has_artifact(cli.get_artifact_name(project, 'subtest', 'import-etc.bst', cache_key=cache_key))
+ assert share.get_artifact(cli.get_artifact_name(project, 'subtest', 'import-etc.bst', cache_key=cache_key))
@pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/integration/artifact.py b/tests/integration/artifact.py
index 4180bf6fd..59b7bfaad 100644
--- a/tests/integration/artifact.py
+++ b/tests/integration/artifact.py
@@ -70,12 +70,12 @@ def test_cache_buildtrees(cli, tmpdir, datafiles):
result = cli.run(project=project, args=['build', element_name])
assert result.exit_code == 0
assert cli.get_element_state(project, element_name) == 'cached'
- assert share1.has_artifact(cli.get_artifact_name(project, 'test', element_name))
+ assert share1.get_artifact(cli.get_artifact_name(project, 'test', element_name))
# The buildtree dir should not exist, as we set the config to not cache buildtrees.
artifact_name = cli.get_artifact_name(project, 'test', element_name)
- assert share1.has_artifact(artifact_name)
+ assert share1.get_artifact(artifact_name)
with cli.artifact.extract_buildtree(cwd, cwd, artifact_name) as buildtreedir:
assert not buildtreedir
@@ -111,7 +111,7 @@ def test_cache_buildtrees(cli, tmpdir, datafiles):
result = cli.run(project=project, args=['--cache-buildtrees', 'always', 'build', element_name])
assert result.exit_code == 0
assert cli.get_element_state(project, element_name) == 'cached'
- assert share2.has_artifact(cli.get_artifact_name(project, 'test', element_name))
+ assert share2.get_artifact(cli.get_artifact_name(project, 'test', element_name))
# Cache key will be the same however the digest hash will have changed as expected, so reconstruct paths
with cli.artifact.extract_buildtree(cwd, cwd, artifact_name) as buildtreedir:
diff --git a/tests/integration/cachedfail.py b/tests/integration/cachedfail.py
index f8dd52aa6..08733ab71 100644
--- a/tests/integration/cachedfail.py
+++ b/tests/integration/cachedfail.py
@@ -180,7 +180,7 @@ def test_push_cached_fail(cli, tmpdir, datafiles, on_error):
# This element should have failed
assert cli.get_element_state(project, 'element.bst') == 'failed'
# This element should have been pushed to the remote
- assert share.has_artifact(cli.get_artifact_name(project, 'test', 'element.bst'))
+ assert share.get_artifact(cli.get_artifact_name(project, 'test', 'element.bst'))
@pytest.mark.skipif(HAVE_SANDBOX != 'bwrap', reason='Only available with bubblewrap on Linux')
diff --git a/tests/integration/pullbuildtrees.py b/tests/integration/pullbuildtrees.py
index af9186b1b..4251aa6d0 100644
--- a/tests/integration/pullbuildtrees.py
+++ b/tests/integration/pullbuildtrees.py
@@ -56,7 +56,7 @@ def test_pullbuildtrees(cli2, tmpdir, datafiles):
result = cli2.run(project=project, args=['build', element_name])
assert result.exit_code == 0
assert cli2.get_element_state(project, element_name) == 'cached'
- assert share1.has_artifact(cli2.get_artifact_name(project, 'test', element_name))
+ assert share1.get_artifact(cli2.get_artifact_name(project, 'test', element_name))
default_state(cli2, tmpdir, share1)
# Pull artifact with default config, assert that pulling again
@@ -116,7 +116,7 @@ def test_pullbuildtrees(cli2, tmpdir, datafiles):
cli2.configure({'artifacts': {'url': share2.repo, 'push': True}})
result = cli2.run(project=project, args=['artifact', 'push', element_name])
assert element_name not in result.get_pushed_elements()
- assert not share2.has_artifact(cli2.get_artifact_name(project, 'test', element_name))
+ assert not share2.get_artifact(cli2.get_artifact_name(project, 'test', element_name))
# Assert that after pulling the missing buildtree the element artifact can be
# successfully pushed to the remote. This will attempt to pull the buildtree
@@ -127,7 +127,7 @@ def test_pullbuildtrees(cli2, tmpdir, datafiles):
cli2.configure({'artifacts': {'url': share2.repo, 'push': True}})
result = cli2.run(project=project, args=['artifact', 'push', element_name])
assert element_name in result.get_pushed_elements()
- assert share2.has_artifact(cli2.get_artifact_name(project, 'test', element_name))
+ assert share2.get_artifact(cli2.get_artifact_name(project, 'test', element_name))
default_state(cli2, tmpdir, share1)
# Assert that bst artifact push will automatically attempt to pull a missing buildtree
@@ -143,7 +143,7 @@ def test_pullbuildtrees(cli2, tmpdir, datafiles):
with cli2.artifact.extract_buildtree(cwd, cwd, artifact_name) as buildtreedir:
assert not buildtreedir
assert element_name not in result.get_pushed_elements()
- assert not share3.has_artifact(cli2.get_artifact_name(project, 'test', element_name))
+ assert not share3.get_artifact(cli2.get_artifact_name(project, 'test', element_name))
# Assert that if we add an extra remote that has the buildtree artfact cached, bst artifact push will
# automatically attempt to pull it and will be successful, leading to the full artifact being pushed
@@ -156,7 +156,7 @@ def test_pullbuildtrees(cli2, tmpdir, datafiles):
with cli2.artifact.extract_buildtree(cwd, cwd, artifact_name) as buildtreedir:
assert os.path.isdir(buildtreedir)
assert element_name in result.get_pushed_elements()
- assert share3.has_artifact(cli2.get_artifact_name(project, 'test', element_name))
+ assert share3.get_artifact(cli2.get_artifact_name(project, 'test', element_name))
# Ensure that only valid pull-buildtrees boolean options make it through the loading
diff --git a/tests/integration/shellbuildtrees.py b/tests/integration/shellbuildtrees.py
index 81a279479..146bc6062 100644
--- a/tests/integration/shellbuildtrees.py
+++ b/tests/integration/shellbuildtrees.py
@@ -239,7 +239,7 @@ def test_buildtree_options(cli, tmpdir, datafiles):
result = cli.run(project=project, args=['--cache-buildtrees', 'always', 'build', element_name])
result.assert_success()
assert cli.get_element_state(project, element_name) == 'cached'
- assert share.has_artifact(cli.get_artifact_name(project, 'test', element_name))
+ assert share.get_artifact(cli.get_artifact_name(project, 'test', element_name))
# Discard the cache
shutil.rmtree(str(os.path.join(str(tmpdir), 'cache', 'cas')))
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index ad9653f9d..1be2d40cd 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -21,6 +21,8 @@
# pylint: disable=redefined-outer-name
import os
import shutil
+from contextlib import contextmanager, ExitStack
+
import pytest
from buildstream._exceptions import ErrorDomain
@@ -38,6 +40,82 @@ def message_handler(message, is_silenced):
pass
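+
+
+# _configure_caches():
+#
+# Create one ArtifactShare per given directory name, rooted under tmpdir,
+# closing them all when the context exits.
+#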
+# Args:
+# tmpdir: A temporary directory to use as root.
+# directories: Directory names to use as cache directories.
+#
+@contextmanager
+def _configure_caches(tmpdir, *directories):
+ with ExitStack() as stack:
+ def create_share(directory):
+ return create_artifact_share(os.path.join(str(tmpdir), directory))
+
+        yield (stack.enter_context(create_share(directory)) for directory in directories)
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_source_push_split(cli, tmpdir, datafiles):
+ cache_dir = os.path.join(str(tmpdir), 'cache')
+ project_dir = str(datafiles)
+
+ with _configure_caches(tmpdir, 'indexshare', 'storageshare') as (index, storage):
+ user_config_file = str(tmpdir.join('buildstream.conf'))
+ user_config = {
+ 'scheduler': {
+ 'pushers': 1
+ },
+ 'source-caches': [{
+ 'url': index.repo,
+ 'push': True,
+ 'type': 'index'
+ }, {
+ 'url': storage.repo,
+ 'push': True,
+ 'type': 'storage'
+ }],
+ 'cachedir': cache_dir
+ }
+ _yaml.roundtrip_dump(user_config, file=user_config_file)
+ cli.configure(user_config)
+
+ repo = create_repo('git', str(tmpdir))
+ ref = repo.create(os.path.join(project_dir, 'files'))
+ element_path = os.path.join(project_dir, 'elements')
+ element_name = 'push.bst'
+ element = {
+ 'kind': 'import',
+ 'sources': [repo.source_config(ref=ref)]
+ }
+ _yaml.roundtrip_dump(element, os.path.join(element_path, element_name))
+
+ # get the source object
+ with dummy_context(config=user_config_file) as context:
+ project = Project(project_dir, context)
+ project.ensure_fully_loaded()
+
+ element = project.load_elements(['push.bst'])[0]
+ assert not element._source_cached()
+ source = list(element.sources())[0]
+
+ # check we don't have it in the current cache
+ cas = context.get_cascache()
+ assert not cas.contains(source._get_source_name())
+
+ # build the element, this should fetch and then push the source to the
+ # remote
+ res = cli.run(project=project_dir, args=['build', 'push.bst'])
+ res.assert_success()
+ assert "Pushed source" in res.stderr
+
+ # check that we've got the remote locally now
+ sourcecache = context.sourcecache
+ assert sourcecache.contains(source)
+
+ # check that the remote CAS now has it
+ digest = sourcecache.export(source)._get_digest()
+ assert storage.has_object(digest)
+
+
@pytest.mark.datafiles(DATA_DIR)
def test_source_push(cli, tmpdir, datafiles):
cache_dir = os.path.join(str(tmpdir), 'cache')
diff --git a/tests/testutils/__init__.py b/tests/testutils/__init__.py
index 25fa6d763..9642ddf47 100644
--- a/tests/testutils/__init__.py
+++ b/tests/testutils/__init__.py
@@ -23,7 +23,7 @@
# William Salmon <will.salmon@codethink.co.uk>
#
-from .artifactshare import create_artifact_share, assert_shared, assert_not_shared
+from .artifactshare import create_artifact_share, create_split_share, assert_shared, assert_not_shared
from .context import dummy_context
from .element_generators import create_element_size, update_element_size
from .junction import generate_junction
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index 7d5faeb66..f883b3d59 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -25,7 +25,7 @@ from buildstream._protos.buildstream.v2 import artifact_pb2
#
class ArtifactShare():
- def __init__(self, directory, *, quota=None, casd=False):
+ def __init__(self, directory, *, quota=None, casd=False, index_only=False):
# The working directory for the artifact share (in case it
# needs to do something outside of its backend's storage folder).
@@ -45,6 +45,7 @@ class ArtifactShare():
self.cas = CASCache(self.repodir, casd=casd)
self.quota = quota
+ self.index_only = index_only
q = Queue()
@@ -72,7 +73,8 @@ class ArtifactShare():
try:
with create_server(self.repodir,
quota=self.quota,
- enable_push=True) as server:
+ enable_push=True,
+ index_only=self.index_only) as server:
port = server.add_insecure_port('localhost:0')
server.start()
@@ -104,17 +106,7 @@ class ArtifactShare():
return os.path.exists(object_path)
- # has_artifact():
- #
- # Checks whether the artifact is present in the share
- #
- # Args:
- # artifact_name (str): The composed complete artifact name
- #
- # Returns:
- # (str): artifact digest if the artifact exists in the share, otherwise None.
- def has_artifact(self, artifact_name):
-
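+    # get_artifact_proto():
+    #
+    # Read the artifact proto stored in the share under the given
+    # artifact name, or return None if there is no such artifact.
+    #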
+ def get_artifact_proto(self, artifact_name):
artifact_proto = artifact_pb2.Artifact()
artifact_path = os.path.join(self.artifactdir, artifact_name)
@@ -124,6 +116,10 @@ class ArtifactShare():
except FileNotFoundError:
return None
+ return artifact_proto
+
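+    # get_cas_files():
+    #
+    # Check that the CAS objects referenced by an artifact proto are
+    # present in the share, returning None if any of them are missing.
+    #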
+ def get_cas_files(self, artifact_proto):
+
reachable = set()
def reachable_dir(digest):
@@ -153,6 +149,21 @@ class ArtifactShare():
except FileNotFoundError:
return None
+    # get_artifact():
+    #
+    # Checks whether the artifact, and all of the files it refers to,
+    # are present in the share
+    #
+    # Args:
+    #     artifact_name (str): The composed complete artifact name
+    #
+    # Returns:
+    #     The result of get_cas_files() for the named artifact, or None
+    #     if the artifact proto is not in the share.
+ def get_artifact(self, artifact_name):
+ artifact_proto = self.get_artifact_proto(artifact_name)
+ if not artifact_proto:
+ return None
+ return self.get_cas_files(artifact_proto)
+
# close():
#
# Remove the artifact share.
@@ -179,13 +190,25 @@ def create_artifact_share(directory, *, quota=None, casd=False):
share.close()
+@contextmanager
+def create_split_share(directory1, directory2, *, quota=None, casd=False):
+ index = ArtifactShare(directory1, quota=quota, casd=casd, index_only=True)
+ storage = ArtifactShare(directory2, quota=quota, casd=casd)
+
+ try:
+ yield index, storage
+ finally:
+ index.close()
+ storage.close()
+
+
statvfs_result = namedtuple('statvfs_result', 'f_blocks f_bfree f_bsize f_bavail')
# Assert that a given artifact is in the share
#
def assert_shared(cli, share, project, element_name, *, project_name='test'):
- if not share.has_artifact(cli.get_artifact_name(project, project_name, element_name)):
+ if not share.get_artifact(cli.get_artifact_name(project, project_name, element_name)):
raise AssertionError("Artifact share at {} does not contain the expected element {}"
.format(share.repo, element_name))
@@ -193,6 +216,6 @@ def assert_shared(cli, share, project, element_name, *, project_name='test'):
# Assert that a given artifact is not in the share
#
def assert_not_shared(cli, share, project, element_name, *, project_name='test'):
- if share.has_artifact(cli.get_artifact_name(project, project_name, element_name)):
+ if share.get_artifact(cli.get_artifact_name(project, project_name, element_name)):
raise AssertionError("Artifact share at {} unexpectedly contains the element {}"
.format(share.repo, element_name))