author     bst-marge-bot <marge-bot@buildstream.build>  2019-08-20 07:12:05 +0000
committer  bst-marge-bot <marge-bot@buildstream.build>  2019-08-20 07:12:05 +0000
commit     56ff33fbd7c5af1518f27a040da37520b1a3e247 (patch)
tree       2a4e3468ab091770267970804e03f8518392b5b0
parent     e92781bdb7e76b91195fef84039fe7ff51cd02bf (diff)
parent     4e867691dbebf91ceb24e7dabea6cbc93399222d (diff)
download   buildstream-56ff33fbd7c5af1518f27a040da37520b1a3e247.tar.gz
Merge branch 'juerg/casd' into 'master'

Use buildbox-casd for CAS access

See merge request BuildStream/buildstream!1499
-rw-r--r--  .gitlab-ci.yml                                                  |   34
-rw-r--r--  src/buildstream/_artifactcache.py                               |  150
-rw-r--r--  src/buildstream/_basecache.py                                   |    6
-rw-r--r--  src/buildstream/_cas/__init__.py                                |    2
-rw-r--r--  src/buildstream/_cas/cascache.py                                |  746
-rw-r--r--  src/buildstream/_cas/casremote.py                               |  167
-rw-r--r--  src/buildstream/_cas/casserver.py                               |  299
-rw-r--r--  src/buildstream/_context.py                                     |   33
-rw-r--r--  src/buildstream/_frontend/cli.py                                |    6
-rw-r--r--  src/buildstream/_frontend/status.py                             |   28
-rw-r--r--  src/buildstream/_frontend/widget.py                             |    1
-rw-r--r--  src/buildstream/_protos/build/buildgrid/__init__.py             |    0
-rw-r--r--  src/buildstream/_protos/build/buildgrid/local_cas.proto         |  373
-rw-r--r--  src/buildstream/_protos/build/buildgrid/local_cas_pb2.py        | 1052
-rw-r--r--  src/buildstream/_protos/build/buildgrid/local_cas_pb2_grpc.py   |  238
-rw-r--r--  src/buildstream/_scheduler/jobs/__init__.py                     |    2
-rw-r--r--  src/buildstream/_scheduler/jobs/cachesizejob.py                 |   48
-rw-r--r--  src/buildstream/_scheduler/jobs/cleanupjob.py                   |   55
-rw-r--r--  src/buildstream/_scheduler/queues/buildqueue.py                 |   29
-rw-r--r--  src/buildstream/_scheduler/queues/pullqueue.py                  |    6
-rw-r--r--  src/buildstream/_scheduler/scheduler.py                         |  168
-rw-r--r--  src/buildstream/_sourcecache.py                                 |   43
-rw-r--r--  src/buildstream/_stream.py                                      |   30
-rw-r--r--  src/buildstream/element.py                                      |   15
-rw-r--r--  src/buildstream/sandbox/_sandboxremote.py                       |    6
-rw-r--r--  src/buildstream/testing/runcli.py                               |    2
-rw-r--r--  tests/artifactcache/artifactservice.py                          |   27
-rw-r--r--  tests/artifactcache/cache_size.py                               |   90
-rw-r--r--  tests/artifactcache/expiry.py                                   |   37
-rw-r--r--  tests/frontend/push.py                                          |   22
-rw-r--r--  tests/integration/cachedfail.py                                 |   15
-rw-r--r--  tests/internals/storage.py                                      |   93
-rw-r--r--  tests/internals/storage_vdir_import.py                          |  353
-rw-r--r--  tests/sandboxes/missing_dependencies.py                         |   17
-rw-r--r--  tests/sandboxes/selection.py                                    |   15
-rw-r--r--  tests/testutils/artifactshare.py                                |   63
36 files changed, 2426 insertions(+), 1845 deletions(-)
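
For orientation before the per-file diffs: the core of this merge is that CAS access now goes through a buildbox-casd subprocess instead of BuildStream manipulating the object store directly. The sketch below is an editorial illustration of that startup path, condensed from the CASCache changes further down; it assumes buildbox-casd is on PATH and that BuildStream's generated protos (including the local_cas stubs added by this merge) are importable. It is not code from the merge itself.

import os
import subprocess
import tempfile
import time

import grpc

from buildstream._protos.build.bazel.remote.execution.v2 import (
    remote_execution_pb2, remote_execution_pb2_grpc)
from buildstream._protos.build.buildgrid import local_cas_pb2_grpc


def start_casd(cas_root, cache_quota=None, timeout=15):
    # Spawn buildbox-casd bound to a unix socket in a short temporary path,
    # mirroring CASCache.__init__() in this merge.
    socket_dir = tempfile.mkdtemp(prefix='buildstream')
    socket_path = os.path.join(socket_dir, 'casd.sock')

    args = ['buildbox-casd', '--bind=unix:' + socket_path]
    if cache_quota is not None:
        # Quota handling moves from BuildStream's CASQuota into casd itself.
        args.append('--quota-high={}'.format(int(cache_quota)))
        args.append('--quota-low={}'.format(int(cache_quota / 2)))
    args.append(cas_root)
    process = subprocess.Popen(args, cwd=cas_root)

    # Connect over the unix socket and poll GetCapabilities() until casd
    # answers, giving up after `timeout` seconds.
    channel = grpc.insecure_channel('unix:' + socket_path)
    capabilities = remote_execution_pb2_grpc.CapabilitiesStub(channel)
    deadline = time.time() + timeout
    while True:
        try:
            capabilities.GetCapabilities(remote_execution_pb2.GetCapabilitiesRequest())
            break
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.UNAVAILABLE and time.time() < deadline:
                time.sleep(0.01)
                continue
            raise

    local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(channel)
    return process, local_cas

All CAS operations (capture, fetch, upload, expiry) are then issued against the returned LocalContentAddressableStorage stub, as the diffs below show.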
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 2bdaab0fb..9b7db92cf 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -1,7 +1,7 @@
include:
- template: Code-Quality.gitlab-ci.yml
-image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-debian:9-master-46405991
+image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-debian:9-master-75925678
cache:
key: "$CI_JOB_NAME-"
@@ -54,24 +54,24 @@ variables:
- .coverage-reports
tests-debian-9:
- image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-debian:9-master-46405991
+ image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-debian:9-master-75925678
<<: *tests
tests-fedora-29:
- image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-47052095
+ image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-75925678
<<: *tests
tests-fedora-30:
- image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:30-master-59168197
+ image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:30-master-75925678
<<: *tests
tests-ubuntu-18.04:
- image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-ubuntu:18.04-master-46405991
+ image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-ubuntu:18.04-master-75925678
<<: *tests
tests-centos-7.6:
<<: *tests
- image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-centos:7.6.1810-master-46405991
+ image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-centos:7.6.1810-master-75925678
overnight-fedora-30-aarch64:
image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:aarch64-30-master-59168197
@@ -87,7 +87,7 @@ overnight-fedora-30-aarch64:
tests-unix:
# Use fedora here, to a) run a test on fedora and b) ensure that we
# can get rid of ostree - this is not possible with debian-8
- image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-47052095
+ image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-75925678
<<: *tests
variables:
BST_FORCE_SANDBOX: "chroot"
@@ -104,7 +104,7 @@ tests-unix:
- ${TEST_COMMAND}
tests-buildbox:
- image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-47052095
+ image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-75925678
<<: *tests
variables:
BST_FORCE_SANDBOX: "buildbox"
@@ -134,7 +134,7 @@ tests-buildbox:
tests-fedora-missing-deps:
# Ensure that tests behave nicely while missing bwrap and ostree
- image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-47052095
+ image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-75925678
<<: *tests
script:
@@ -153,7 +153,7 @@ tests-fedora-update-deps:
# Check if the tests pass after updating requirements to their latest
# allowed version.
allow_failure: true
- image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-47052095
+ image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-75925678
<<: *tests
script:
@@ -167,7 +167,7 @@ tests-fedora-update-deps:
tests-remote-execution:
allow_failure: true
- image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-47052095
+ image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-75925678
<<: *tests
before_script:
- dnf install -y docker docker-compose
@@ -190,7 +190,7 @@ tests-remote-execution:
PYTEST_ARGS: "--color=yes --remote-execution"
tests-spawn-multiprocessing-start-method:
- image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-47052095
+ image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-75925678
<<: *tests
variables:
BST_FORCE_START_METHOD: "spawn"
@@ -221,6 +221,10 @@ tests-wsl-master:
- df -h
- PATH=/root/.local/bin:$PATH tox --version
script:
+ # Install static buildbox-casd binary
+ - wget https://buildbox-casd-binaries.nyc3.cdn.digitaloceanspaces.com/buildbox-casd-x86_64-linux-20190813-20d41af4.tar.xz
+ - tar -C /root/.local/bin -xf buildbox-casd-x86_64-linux-20190813-20d41af4.tar.xz
+
- PATH=/root/.local/bin:$PATH ${TEST_COMMAND}
only:
- master
@@ -237,6 +241,10 @@ tests-wsl-non-master:
- df -h
- PATH=/root/.local/bin:$PATH tox --version
script:
+ # Install static buildbox-casd binary
+ - wget https://buildbox-casd-binaries.nyc3.cdn.digitaloceanspaces.com/buildbox-casd-x86_64-linux-20190813-20d41af4.tar.xz
+ - tar -C /root/.local/bin -xf buildbox-casd-x86_64-linux-20190813-20d41af4.tar.xz
+
- PATH=/root/.local/bin:$PATH ${TEST_COMMAND}
when: manual
except:
@@ -259,7 +267,7 @@ docs:
.overnight-tests: &overnight-tests-template
stage: test
- image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:30-master-59168197
+ image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:30-master-75925678
variables:
BST_EXT_URL: git+https://gitlab.com/BuildStream/bst-plugins-experimental.git
BST_EXT_REF: 0.12.0-40-g7aa1423377629281decc455d1090964417c38f2e
diff --git a/src/buildstream/_artifactcache.py b/src/buildstream/_artifactcache.py
index d62e7f500..f92a7c84f 100644
--- a/src/buildstream/_artifactcache.py
+++ b/src/buildstream/_artifactcache.py
@@ -21,8 +21,7 @@ import os
import grpc
from ._basecache import BaseCache
-from .types import _KeyStrength
-from ._exceptions import ArtifactError, CASError, CASCacheError
+from ._exceptions import ArtifactError, CASError, CASCacheError, CASRemoteError
from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
artifact_pb2, artifact_pb2_grpc
@@ -96,107 +95,16 @@ class ArtifactCache(BaseCache):
def __init__(self, context):
super().__init__(context)
- self._required_elements = set() # The elements required for this session
-
# create artifact directory
self.artifactdir = context.artifactdir
os.makedirs(self.artifactdir, exist_ok=True)
- self.casquota.add_remove_callbacks(self.unrequired_artifacts, self.remove)
- self.casquota.add_list_refs_callback(self.list_artifacts)
-
- self.cas.add_reachable_directories_callback(self._reachable_directories)
- self.cas.add_reachable_digests_callback(self._reachable_digests)
-
- # mark_required_elements():
- #
- # Mark elements whose artifacts are required for the current run.
- #
- # Artifacts whose elements are in this list will be locked by the artifact
- # cache and not touched for the duration of the current pipeline.
- #
- # Args:
- # elements (iterable): A set of elements to mark as required
- #
- def mark_required_elements(self, elements):
-
- # We risk calling this function with a generator, so we
- # better consume it first.
- #
- elements = list(elements)
-
- # Mark the elements as required. We cannot know that we know the
- # cache keys yet, so we only check that later when deleting.
- #
- self._required_elements.update(elements)
-
- # For the cache keys which were resolved so far, we bump
- # the mtime of them.
- #
- # This is just in case we have concurrent instances of
- # BuildStream running with the same artifact cache, it will
- # reduce the likelyhood of one instance deleting artifacts
- # which are required by the other.
- for element in elements:
- strong_key = element._get_cache_key(strength=_KeyStrength.STRONG)
- weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)
- for key in (strong_key, weak_key):
- if key:
- ref = element.get_artifact_name(key)
-
- try:
- self.update_mtime(ref)
- except ArtifactError:
- pass
-
def update_mtime(self, ref):
try:
os.utime(os.path.join(self.artifactdir, ref))
except FileNotFoundError as e:
raise ArtifactError("Couldn't find artifact: {}".format(ref)) from e
- # unrequired_artifacts()
- #
- # Returns iterator over artifacts that are not required in the build plan
- #
- # Returns:
- # (iter): Iterator over tuples of (float, str) where float is the time
- # and str is the artifact ref
- #
- def unrequired_artifacts(self):
- required_artifacts = set(map(lambda x: x.get_artifact_name(),
- self._required_elements))
- for (mtime, artifact) in self._list_refs_mtimes(self.artifactdir):
- if artifact not in required_artifacts:
- yield (mtime, artifact)
-
- def required_artifacts(self):
- # Build a set of the cache keys which are required
- # based on the required elements at cleanup time
- #
- # We lock both strong and weak keys - deleting one but not the
- # other won't save space, but would be a user inconvenience.
- for element in self._required_elements:
- yield element._get_cache_key(strength=_KeyStrength.STRONG)
- yield element._get_cache_key(strength=_KeyStrength.WEAK)
-
- def full(self):
- return self.casquota.full()
-
- # add_artifact_size()
- #
- # Adds the reported size of a newly cached artifact to the
- # overall estimated size.
- #
- # Args:
- # artifact_size (int): The size to add.
- #
- def add_artifact_size(self, artifact_size):
- cache_size = self.casquota.get_cache_size()
- cache_size += artifact_size
-
- self.casquota.set_cache_size(cache_size)
-
# preflight():
#
# Preflight check.
@@ -241,25 +149,13 @@ class ArtifactCache(BaseCache):
# Args:
# ref (artifact_name): The name of the artifact to remove (as
# generated by `Element.get_artifact_name`)
- # defer_prune (bool): Optionally declare whether pruning should
- # occur immediately after the ref is removed.
- #
- # Returns:
- # (int): The amount of space recovered in the cache, in bytes
#
- def remove(self, ref, *, defer_prune=False):
+ def remove(self, ref):
try:
- return self.cas.remove(ref, basedir=self.artifactdir, defer_prune=defer_prune)
+ self.cas.remove(ref, basedir=self.artifactdir)
except CASCacheError as e:
raise ArtifactError("{}".format(e)) from e
- # prune():
- #
- # Prune the artifact cache of unreachable refs
- #
- def prune(self):
- return self.cas.prune()
-
# diff():
#
# Return a list of files that have been added or modified between
@@ -491,42 +387,6 @@ class ArtifactCache(BaseCache):
# Local Private Methods #
################################################
- # _reachable_directories()
- #
- # Returns:
- # (iter): Iterator over directories digests available from artifacts.
- #
- def _reachable_directories(self):
- for root, _, files in os.walk(self.artifactdir):
- for artifact_file in files:
- artifact = artifact_pb2.Artifact()
- with open(os.path.join(root, artifact_file), 'r+b') as f:
- artifact.ParseFromString(f.read())
-
- if str(artifact.files):
- yield artifact.files
-
- if str(artifact.buildtree):
- yield artifact.buildtree
-
- # _reachable_digests()
- #
- # Returns:
- # (iter): Iterator over single file digests in artifacts
- #
- def _reachable_digests(self):
- for root, _, files in os.walk(self.artifactdir):
- for artifact_file in files:
- artifact = artifact_pb2.Artifact()
- with open(os.path.join(root, artifact_file), 'r+b') as f:
- artifact.ParseFromString(f.read())
-
- if str(artifact.public_data):
- yield artifact.public_data
-
- for log_file in artifact.logs:
- yield log_file.digest
-
# _push_artifact()
#
# Pushes relevant directories and then artifact proto to remote.
@@ -580,6 +440,10 @@ class ArtifactCache(BaseCache):
self.cas.send_blobs(remote, digests)
+ except CASRemoteError as cas_error:
+ if cas_error.reason != "cache-too-full":
+ raise ArtifactError("Failed to push artifact blobs: {}".format(cas_error))
+ return False
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
raise ArtifactError("Failed to push artifact blobs: {}".format(e.details()))
diff --git a/src/buildstream/_basecache.py b/src/buildstream/_basecache.py
index 739d14b6a..fc6087bf8 100644
--- a/src/buildstream/_basecache.py
+++ b/src/buildstream/_basecache.py
@@ -42,8 +42,6 @@ class BaseCache():
def __init__(self, context):
self.context = context
self.cas = context.get_cascache()
- self.casquota = context.get_casquota()
- self.casquota._calculate_cache_quota()
self._remotes_setup = False # Check to prevent double-setup of remotes
# Per-project list of _CASRemote instances.
@@ -163,7 +161,7 @@ class BaseCache():
q = multiprocessing.Queue()
for remote_spec in remote_specs:
- error = self.remote_class.check_remote(remote_spec, q)
+ error = self.remote_class.check_remote(remote_spec, self.cas, q)
if error and on_failure:
on_failure(remote_spec.url, error)
@@ -175,7 +173,7 @@ class BaseCache():
if remote_spec.push:
self._has_push_remotes = True
- remotes[remote_spec.url] = self.remote_class(remote_spec)
+ remotes[remote_spec.url] = self.remote_class(remote_spec, self.cas)
for project in self.context.get_projects():
remote_specs = self.global_remote_specs
diff --git a/src/buildstream/_cas/__init__.py b/src/buildstream/_cas/__init__.py
index 46bd9567f..a88e41371 100644
--- a/src/buildstream/_cas/__init__.py
+++ b/src/buildstream/_cas/__init__.py
@@ -17,5 +17,5 @@
# Authors:
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
-from .cascache import CASCache, CASQuota, CASCacheUsage
+from .cascache import CASCache
from .casremote import CASRemote, CASRemoteSpec
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index ff16480b1..e5fc530a7 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -17,22 +17,25 @@
# Authors:
# Jürg Billeter <juerg.billeter@codethink.co.uk>
-import hashlib
import itertools
import os
import stat
import errno
-import uuid
import contextlib
+import shutil
+import subprocess
+import tempfile
+import time
import grpc
-from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
+from .._protos.google.rpc import code_pb2
+from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc
from .._protos.buildstream.v2 import buildstream_pb2
from .. import utils
-from .._exceptions import CASCacheError, LoadError, LoadErrorReason
-from .._message import Message, MessageType
+from .._exceptions import CASCacheError
from .casremote import BlobNotFound, _CASBatchRead, _CASBatchUpdate
@@ -42,54 +45,86 @@ _BUFFER_SIZE = 65536
CACHE_SIZE_FILE = "cache_size"
-# CASCacheUsage
-#
-# A simple object to report the current CAS cache usage details.
-#
-# Note that this uses the user configured cache quota
-# rather than the internal quota with protective headroom
-# removed, to provide a more sensible value to display to
-# the user.
-#
-# Args:
-# cas (CASQuota): The CAS cache to get the status of
-#
-class CASCacheUsage():
-
- def __init__(self, casquota):
- self.quota_config = casquota._config_cache_quota # Configured quota
- self.quota_size = casquota._cache_quota_original # Resolved cache quota in bytes
- self.used_size = casquota.get_cache_size() # Size used by artifacts in bytes
- self.used_percent = 0 # Percentage of the quota used
- if self.quota_size is not None:
- self.used_percent = int(self.used_size * 100 / self.quota_size)
-
- # Formattable into a human readable string
- #
- def __str__(self):
- return "{} / {} ({}%)" \
- .format(utils._pretty_size(self.used_size, dec_places=1),
- self.quota_config,
- self.used_percent)
-
-
# A CASCache manages a CAS repository as specified in the Remote Execution API.
#
# Args:
# path (str): The root directory for the CAS repository
+# casd (bool): True to spawn buildbox-casd (default) or False (testing only)
# cache_quota (int): User configured cache quota
+# protect_session_blobs (bool): Disable expiry for blobs used in the current session
#
class CASCache():
- def __init__(self, path):
+ def __init__(self, path, *, casd=True, cache_quota=None, protect_session_blobs=True):
self.casdir = os.path.join(path, 'cas')
self.tmpdir = os.path.join(path, 'tmp')
os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
os.makedirs(self.tmpdir, exist_ok=True)
- self.__reachable_directory_callbacks = []
- self.__reachable_digest_callbacks = []
+ if casd:
+ # Place socket in global/user temporary directory to avoid hitting
+ # the socket path length limit.
+ self._casd_socket_tempdir = tempfile.mkdtemp(prefix='buildstream')
+ self._casd_socket_path = os.path.join(self._casd_socket_tempdir, 'casd.sock')
+
+ casd_args = [utils.get_host_tool('buildbox-casd')]
+ casd_args.append('--bind=unix:' + self._casd_socket_path)
+
+ if cache_quota is not None:
+ casd_args.append('--quota-high={}'.format(int(cache_quota)))
+ casd_args.append('--quota-low={}'.format(int(cache_quota / 2)))
+
+ if protect_session_blobs:
+ casd_args.append('--protect-session-blobs')
+
+ casd_args.append(path)
+ self._casd_process = subprocess.Popen(casd_args, cwd=path)
+ self._casd_start_time = time.time()
+ else:
+ self._casd_process = None
+
+ self._casd_channel = None
+ self._local_cas = None
+ self._fork_disabled = False
+
+ def __getstate__(self):
+ state = self.__dict__.copy()
+
+ # Popen objects are not pickle-able, however, child processes only
+ # need the information whether a casd subprocess was started or not.
+ assert '_casd_process' in state
+ state['_casd_process'] = bool(self._casd_process)
+
+ return state
+
+ def _get_local_cas(self):
+ assert self._casd_process, "CASCache was instantiated without buildbox-casd"
+
+ if not self._local_cas:
+ # gRPC doesn't support fork without exec, which is used in the main process.
+ assert self._fork_disabled or not utils._is_main_process()
+
+ self._casd_channel = grpc.insecure_channel('unix:' + self._casd_socket_path)
+ self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
+
+ # Call GetCapabilities() to establish connection to casd
+ capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self._casd_channel)
+ while True:
+ try:
+ capabilities.GetCapabilities(remote_execution_pb2.GetCapabilitiesRequest())
+ break
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.UNAVAILABLE:
+ # casd is not ready yet, try again after a 10ms delay,
+ # but don't wait for more than 15s
+ if time.time() < self._casd_start_time + 15:
+ time.sleep(1 / 100)
+ continue
+
+ raise
+
+ return self._local_cas
# preflight():
#
@@ -101,6 +136,39 @@ class CASCache():
if not (os.path.isdir(headdir) and os.path.isdir(objdir)):
raise CASCacheError("CAS repository check failed for '{}'".format(self.casdir))
+ # notify_fork_disabled():
+ #
+ # Called by Context when fork() is disabled. This will enable communication
+ # with casd via gRPC in the main process.
+ #
+ def notify_fork_disabled(self):
+ self._fork_disabled = True
+
+ # release_resources():
+ #
+ # Release resources used by CASCache.
+ #
+ def release_resources(self, messenger=None):
+ if self._casd_process:
+ self._casd_process.terminate()
+ try:
+ # Don't print anything if buildbox-casd terminates quickly
+ self._casd_process.wait(timeout=0.5)
+ except subprocess.TimeoutExpired:
+ if messenger:
+ cm = messenger.timed_activity("Terminating buildbox-casd")
+ else:
+ cm = contextlib.suppress()
+ with cm:
+ try:
+ self._casd_process.wait(timeout=15)
+ except subprocess.TimeoutExpired:
+ self._casd_process.kill()
+ self._casd_process.wait(timeout=15)
+ self._casd_process = None
+
+ shutil.rmtree(self._casd_socket_tempdir)
+
# contains():
#
# Check whether the specified ref is already available in the local CAS cache.
@@ -365,13 +433,14 @@ class CASCache():
# path (str): Path to file to add
# buffer (bytes): Byte buffer to add
# link_directly (bool): Whether file given by path can be linked
+ # instance_name (str): casd instance_name for remote CAS
#
# Returns:
# (Digest): The digest of the added object
#
# Either `path` or `buffer` must be passed, but not both.
#
- def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
+ def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False, instance_name=None):
# Exactly one of the two parameters has to be specified
assert (path is None) != (buffer is None)
@@ -381,42 +450,32 @@ class CASCache():
if digest is None:
digest = remote_execution_pb2.Digest()
- try:
- h = hashlib.sha256()
- # Always write out new file to avoid corruption if input file is modified
- with contextlib.ExitStack() as stack:
- if path is not None and link_directly:
- tmp = stack.enter_context(open(path, 'rb'))
- for chunk in iter(lambda: tmp.read(_BUFFER_SIZE), b""):
- h.update(chunk)
- else:
- tmp = stack.enter_context(self._temporary_object())
+ with contextlib.ExitStack() as stack:
+ if path is None:
+ tmp = stack.enter_context(self._temporary_object())
+ tmp.write(buffer)
+ tmp.flush()
+ path = tmp.name
- if path:
- with open(path, 'rb') as f:
- for chunk in iter(lambda: f.read(_BUFFER_SIZE), b""):
- h.update(chunk)
- tmp.write(chunk)
- else:
- h.update(buffer)
- tmp.write(buffer)
+ request = local_cas_pb2.CaptureFilesRequest()
+ if instance_name:
+ request.instance_name = instance_name
- tmp.flush()
+ request.path.append(path)
- digest.hash = h.hexdigest()
- digest.size_bytes = os.fstat(tmp.fileno()).st_size
+ local_cas = self._get_local_cas()
- # Place file at final location
- objpath = self.objpath(digest)
- os.makedirs(os.path.dirname(objpath), exist_ok=True)
- os.link(tmp.name, objpath)
+ response = local_cas.CaptureFiles(request)
- except FileExistsError:
- # We can ignore the failed link() if the object is already in the repo.
- pass
+ if len(response.responses) != 1:
+ raise CASCacheError("Expected 1 response from CaptureFiles, got {}".format(len(response.responses)))
- except OSError as e:
- raise CASCacheError("Failed to hash object: {}".format(e)) from e
+ blob_response = response.responses[0]
+ if blob_response.status.code == code_pb2.RESOURCE_EXHAUSTED:
+ raise CASCacheError("Cache too full", reason="cache-too-full")
+ elif blob_response.status.code != code_pb2.OK:
+ raise CASCacheError("Failed to capture blob {}: {}".format(path, blob_response.status.code))
+ digest.CopyFrom(blob_response.digest)
return digest
@@ -472,41 +531,6 @@ class CASCache():
except FileNotFoundError as e:
raise CASCacheError("Attempt to access unavailable ref: {}".format(e)) from e
- # list_objects():
- #
- # List cached objects in Least Recently Modified (LRM) order.
- #
- # Returns:
- # (list) - A list of objects and timestamps in LRM order
- #
- def list_objects(self):
- objs = []
- mtimes = []
-
- for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
- for filename in files:
- obj_path = os.path.join(root, filename)
- try:
- mtimes.append(os.path.getmtime(obj_path))
- except FileNotFoundError:
- pass
- else:
- objs.append(obj_path)
-
- # NOTE: Sorted will sort from earliest to latest, thus the
- # first element of this list will be the file modified earliest.
- return sorted(zip(mtimes, objs))
-
- def clean_up_refs_until(self, time):
- ref_heads = os.path.join(self.casdir, 'refs', 'heads')
-
- for root, _, files in os.walk(ref_heads):
- for filename in files:
- ref_path = os.path.join(root, filename)
- # Obtain the mtime (the time a file was last modified)
- if os.path.getmtime(ref_path) < time:
- os.unlink(ref_path)
-
# remove():
#
# Removes the given symbolic ref from the repo.
@@ -515,75 +539,14 @@ class CASCache():
# ref (str): A symbolic ref
# basedir (str): Path of base directory the ref is in, defaults to
# CAS refs heads
- # defer_prune (bool): Whether to defer pruning to the caller. NOTE:
- # The space won't be freed until you manually
- # call prune.
#
- # Returns:
- # (int|None) The amount of space pruned from the repository in
- # Bytes, or None if defer_prune is True
- #
- def remove(self, ref, *, basedir=None, defer_prune=False):
+ def remove(self, ref, *, basedir=None):
if basedir is None:
basedir = os.path.join(self.casdir, 'refs', 'heads')
# Remove cache ref
self._remove_ref(ref, basedir)
- if not defer_prune:
- pruned = self.prune()
- return pruned
-
- return None
-
- # adds callback of iterator over reachable directory digests
- def add_reachable_directories_callback(self, callback):
- self.__reachable_directory_callbacks.append(callback)
-
- # adds callbacks of iterator over reachable file digests
- def add_reachable_digests_callback(self, callback):
- self.__reachable_digest_callbacks.append(callback)
-
- # prune():
- #
- # Prune unreachable objects from the repo.
- #
- def prune(self):
- ref_heads = os.path.join(self.casdir, 'refs', 'heads')
-
- pruned = 0
- reachable = set()
-
- # Check which objects are reachable
- for root, _, files in os.walk(ref_heads):
- for filename in files:
- ref_path = os.path.join(root, filename)
- ref = os.path.relpath(ref_path, ref_heads)
-
- tree = self.resolve_ref(ref)
- self._reachable_refs_dir(reachable, tree)
-
- # check callback directory digests that are reachable
- for digest_callback in self.__reachable_directory_callbacks:
- for digest in digest_callback():
- self._reachable_refs_dir(reachable, digest)
-
- # check callback file digests that are reachable
- for digest_callback in self.__reachable_digest_callbacks:
- for digest in digest_callback():
- reachable.add(digest.hash)
-
- # Prune unreachable objects
- for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
- for filename in files:
- objhash = os.path.basename(root) + filename
- if objhash not in reachable:
- obj_path = os.path.join(root, filename)
- pruned += os.stat(obj_path).st_size
- os.unlink(obj_path)
-
- return pruned
-
def update_tree_mtime(self, tree):
reachable = set()
self._reachable_refs_dir(reachable, tree, update_mtime=True)
@@ -860,26 +823,13 @@ class CASCache():
# already in local repository
return objpath
- with self._temporary_object() as f:
- remote._fetch_blob(digest, f)
-
- added_digest = self.add_object(path=f.name, link_directly=True)
- assert added_digest.hash == digest.hash
+ remote._fetch_blob(digest)
return objpath
- def _batch_download_complete(self, batch, *, missing_blobs=None):
- for digest, data in batch.send(missing_blobs=missing_blobs):
- with self._temporary_object() as f:
- f.write(data)
- f.flush()
-
- added_digest = self.add_object(path=f.name, link_directly=True)
- assert added_digest.hash == digest.hash
-
# Helper function for _fetch_directory().
def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
- self._batch_download_complete(batch)
+ batch.send()
# All previously scheduled directories are now locally available,
# move them to the processing queue.
@@ -894,17 +844,8 @@ class CASCache():
if in_local_cache:
# Skip download, already in local cache.
pass
- elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
- not remote.batch_read_supported):
- # Too large for batch request, download in independent request.
- self._ensure_blob(remote, digest)
- in_local_cache = True
else:
- if not batch.add(digest):
- # Not enough space left in batch request.
- # Complete pending batch first.
- batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
- batch.add(digest)
+ batch.add(digest)
if recursive:
if in_local_cache:
@@ -955,20 +896,18 @@ class CASCache():
self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
def _fetch_tree(self, remote, digest):
- # download but do not store the Tree object
- with utils._tempnamedfile(dir=self.tmpdir) as out:
- remote._fetch_blob(digest, out)
+ objpath = self._ensure_blob(remote, digest)
- tree = remote_execution_pb2.Tree()
+ tree = remote_execution_pb2.Tree()
- with open(out.name, 'rb') as f:
- tree.ParseFromString(f.read())
+ with open(objpath, 'rb') as f:
+ tree.ParseFromString(f.read())
- tree.children.extend([tree.root])
- for directory in tree.children:
- dirbuffer = directory.SerializeToString()
- dirdigest = self.add_object(buffer=dirbuffer)
- assert dirdigest.size_bytes == len(dirbuffer)
+ tree.children.extend([tree.root])
+ for directory in tree.children:
+ dirbuffer = directory.SerializeToString()
+ dirdigest = self.add_object(buffer=dirbuffer)
+ assert dirdigest.size_bytes == len(dirbuffer)
return dirdigest
@@ -990,27 +929,9 @@ class CASCache():
batch = _CASBatchRead(remote)
for digest in digests:
- if (digest.size_bytes >= remote.max_batch_total_size_bytes or
- not remote.batch_read_supported):
- # Too large for batch request, download in independent request.
- try:
- self._ensure_blob(remote, digest)
- except grpc.RpcError as e:
- if e.code() == grpc.StatusCode.NOT_FOUND:
- missing_blobs.append(digest)
- else:
- raise CASCacheError("Failed to fetch blob: {}".format(e)) from e
- else:
- if not batch.add(digest):
- # Not enough space left in batch request.
- # Complete pending batch first.
- self._batch_download_complete(batch, missing_blobs=missing_blobs)
-
- batch = _CASBatchRead(remote)
- batch.add(digest)
+ batch.add(digest)
- # Complete last pending batch
- self._batch_download_complete(batch, missing_blobs=missing_blobs)
+ batch.send(missing_blobs=missing_blobs)
return missing_blobs
@@ -1022,390 +943,19 @@ class CASCache():
# remote (CASRemote): The remote repository to upload to
# digests (list): The Digests of Blobs to upload
#
- def send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
+ def send_blobs(self, remote, digests):
batch = _CASBatchUpdate(remote)
for digest in digests:
- with open(self.objpath(digest), 'rb') as f:
- assert os.fstat(f.fileno()).st_size == digest.size_bytes
+ batch.add(digest)
- if (digest.size_bytes >= remote.max_batch_total_size_bytes or
- not remote.batch_update_supported):
- # Too large for batch request, upload in independent request.
- remote._send_blob(digest, f, u_uid=u_uid)
- else:
- if not batch.add(digest, f):
- # Not enough space left in batch request.
- # Complete pending batch first.
- batch.send()
- batch = _CASBatchUpdate(remote)
- batch.add(digest, f)
-
- # Send final batch
batch.send()
- def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
+ def _send_directory(self, remote, digest):
missing_blobs = self.remote_missing_blobs_for_directory(remote, digest)
# Upload any blobs missing on the server
- self.send_blobs(remote, missing_blobs, u_uid)
-
-
-class CASQuota:
- def __init__(self, context):
- self.context = context
- self.cas = context.get_cascache()
- self.casdir = self.cas.casdir
- self._config_cache_quota = context.config_cache_quota
- self._config_cache_quota_string = context.config_cache_quota_string
- self._cache_size = None # The current cache size, sometimes it's an estimate
- self._cache_quota = None # The cache quota
- self._cache_quota_original = None # The cache quota as specified by the user, in bytes
- self._cache_quota_headroom = None # The headroom in bytes before reaching the quota or full disk
- self._cache_lower_threshold = None # The target cache size for a cleanup
-
- self._message = context.messenger.message
-
- self._remove_callbacks = [] # Callbacks to remove unrequired refs and their remove method
- self._list_refs_callbacks = [] # Callbacks to all refs
-
- self._calculate_cache_quota()
-
- # compute_cache_size()
- #
- # Computes the real artifact cache size.
- #
- # Returns:
- # (int): The size of the artifact cache.
- #
- def compute_cache_size(self):
- self._cache_size = utils._get_dir_size(self.casdir)
- return self._cache_size
-
- # get_cache_size()
- #
- # Fetches the cached size of the cache, this is sometimes
- # an estimate and periodically adjusted to the real size
- # when a cache size calculation job runs.
- #
- # When it is an estimate, the value is either correct, or
- # it is greater than the actual cache size.
- #
- # Returns:
- # (int) An approximation of the artifact cache size, in bytes.
- #
- def get_cache_size(self):
-
- # If we don't currently have an estimate, figure out the real cache size.
- if self._cache_size is None:
- stored_size = self._read_cache_size()
- if stored_size is not None:
- self._cache_size = stored_size
- else:
- self.compute_cache_size()
-
- return self._cache_size
-
- # set_cache_size()
- #
- # Forcefully set the overall cache size.
- #
- # This is used to update the size in the main process after
- # having calculated in a cleanup or a cache size calculation job.
- #
- # Args:
- # cache_size (int): The size to set.
- # write_to_disk (bool): Whether to write the value to disk.
- #
- def set_cache_size(self, cache_size, *, write_to_disk=True):
-
- assert cache_size is not None
-
- self._cache_size = cache_size
- if write_to_disk:
- self._write_cache_size(self._cache_size)
-
- # full()
- #
- # Checks if the artifact cache is full, either
- # because the user configured quota has been exceeded
- # or because the underlying disk is almost full.
- #
- # Returns:
- # (bool): True if the artifact cache is full
- #
- def full(self):
-
- if self.get_cache_size() > self._cache_quota:
- return True
-
- _, volume_avail = self._get_cache_volume_size()
- if volume_avail < self._cache_quota_headroom:
- return True
-
- return False
-
- # add_remove_callbacks()
- #
- # This adds tuples of iterators over unrequired objects (currently
- # artifacts and source refs), and a callback to remove them.
- #
- # Args:
- # callback (iter(unrequired), remove): tuple of iterator and remove
- # method associated.
- #
- def add_remove_callbacks(self, list_unrequired, remove_method):
- self._remove_callbacks.append((list_unrequired, remove_method))
-
- def add_list_refs_callback(self, list_callback):
- self._list_refs_callbacks.append(list_callback)
-
- ################################################
- # Local Private Methods #
- ################################################
-
- # _read_cache_size()
- #
- # Reads and returns the size of the artifact cache that's stored in the
- # cache's size file
- #
- # Returns:
- # (int): The size of the artifact cache, as recorded in the file
- #
- def _read_cache_size(self):
- size_file_path = os.path.join(self.casdir, CACHE_SIZE_FILE)
-
- if not os.path.exists(size_file_path):
- return None
-
- with open(size_file_path, "r") as f:
- size = f.read()
-
- try:
- num_size = int(size)
- except ValueError as e:
- raise CASCacheError("Size '{}' parsed from '{}' was not an integer".format(
- size, size_file_path)) from e
-
- return num_size
-
- # _write_cache_size()
- #
- # Writes the given size of the artifact to the cache's size file
- #
- # Args:
- # size (int): The size of the artifact cache to record
- #
- def _write_cache_size(self, size):
- assert isinstance(size, int)
- size_file_path = os.path.join(self.casdir, CACHE_SIZE_FILE)
- with utils.save_file_atomic(size_file_path, "w", tempdir=self.cas.tmpdir) as f:
- f.write(str(size))
-
- # _get_cache_volume_size()
- #
- # Get the available space and total space for the volume on
- # which the artifact cache is located.
- #
- # Returns:
- # (int): The total number of bytes on the volume
- # (int): The number of available bytes on the volume
- #
- # NOTE: We use this stub to allow the test cases
- # to override what an artifact cache thinks
- # about it's disk size and available bytes.
- #
- def _get_cache_volume_size(self):
- return utils._get_volume_size(self.casdir)
-
- # _calculate_cache_quota()
- #
- # Calculates and sets the cache quota and lower threshold based on the
- # quota set in Context.
- # It checks that the quota is both a valid expression, and that there is
- # enough disk space to satisfy that quota
- #
- def _calculate_cache_quota(self):
- # Headroom intended to give BuildStream a bit of leeway.
- # This acts as the minimum size of cache_quota and also
- # is taken from the user requested cache_quota.
- #
- if self.context.is_running_in_test_suite:
- self._cache_quota_headroom = 0
- else:
- self._cache_quota_headroom = 2e9
-
- total_size, available_space = self._get_cache_volume_size()
- cache_size = self.get_cache_size()
-
- # Ensure system has enough storage for the cache_quota
- #
- # If cache_quota is none, set it to the maximum it could possibly be.
- #
- # Also check that cache_quota is at least as large as our headroom.
- #
- cache_quota = self._config_cache_quota
- if cache_quota is None:
- # The user has set no limit, so we may take all the space.
- cache_quota = min(cache_size + available_space, total_size)
- if cache_quota < self._cache_quota_headroom: # Check minimum
- raise LoadError("Invalid cache quota ({}): BuildStream requires a minimum cache quota of {}."
- .format(utils._pretty_size(cache_quota), utils._pretty_size(self._cache_quota_headroom)),
- LoadErrorReason.INVALID_DATA)
- elif cache_quota > total_size:
- # A quota greater than the total disk size is certianly an error
- raise CASCacheError("Your system does not have enough available " +
- "space to support the cache quota specified.",
- detail=("You have specified a quota of {quota} total disk space.\n" +
- "The filesystem containing {local_cache_path} only " +
- "has {total_size} total disk space.")
- .format(
- quota=self._config_cache_quota,
- local_cache_path=self.casdir,
- total_size=utils._pretty_size(total_size)),
- reason='insufficient-storage-for-quota')
-
- elif cache_quota > cache_size + available_space:
- # The quota does not fit in the available space, this is a warning
- if '%' in self._config_cache_quota_string:
- available = (available_space / total_size) * 100
- available = '{}% of total disk space'.format(round(available, 1))
- else:
- available = utils._pretty_size(available_space)
-
- self._message(Message(
- MessageType.WARN,
- "Your system does not have enough available " +
- "space to support the cache quota specified.",
- detail=("You have specified a quota of {quota} total disk space.\n" +
- "The filesystem containing {local_cache_path} only " +
- "has {available_size} available.")
- .format(quota=self._config_cache_quota,
- local_cache_path=self.casdir,
- available_size=available)))
-
- # Place a slight headroom (2e9 (2GB) on the cache_quota) into
- # cache_quota to try and avoid exceptions.
- #
- # Of course, we might still end up running out during a build
- # if we end up writing more than 2G, but hey, this stuff is
- # already really fuzzy.
- #
- self._cache_quota_original = cache_quota
- self._cache_quota = cache_quota - self._cache_quota_headroom
- self._cache_lower_threshold = self._cache_quota / 2
-
- # clean():
- #
- # Clean the artifact cache as much as possible.
- #
- # Args:
- # progress (callable): A callback to call when a ref is removed
- #
- # Returns:
- # (int): The size of the cache after having cleaned up
- #
- def clean(self, progress=None):
- context = self.context
-
- # Some accumulative statistics
- removed_ref_count = 0
- space_saved = 0
-
- total_refs = 0
- for refs in self._list_refs_callbacks:
- total_refs += len(list(refs()))
-
- # Start off with an announcement with as much info as possible
- volume_size, volume_avail = self._get_cache_volume_size()
- self._message(Message(
- MessageType.STATUS, "Starting cache cleanup",
- detail=("Elements required by the current build plan:\n" + "{}\n" +
- "User specified quota: {} ({})\n" +
- "Cache usage: {}\n" +
- "Cache volume: {} total, {} available")
- .format(
- total_refs,
- context.config_cache_quota,
- utils._pretty_size(self._cache_quota, dec_places=2),
- utils._pretty_size(self.get_cache_size(), dec_places=2),
- utils._pretty_size(volume_size, dec_places=2),
- utils._pretty_size(volume_avail, dec_places=2))))
-
- # Do a real computation of the cache size once, just in case
- self.compute_cache_size()
- usage = CASCacheUsage(self)
- self._message(Message(MessageType.STATUS,
- "Cache usage recomputed: {}".format(usage)))
-
- # Collect digests and their remove method
- all_unrequired_refs = []
- for (unrequired_refs, remove) in self._remove_callbacks:
- for (mtime, ref) in unrequired_refs():
- all_unrequired_refs.append((mtime, ref, remove))
-
- # Pair refs and their remove method sorted in time order
- all_unrequired_refs = [(ref, remove) for (_, ref, remove) in sorted(all_unrequired_refs)]
-
- # Go through unrequired refs and remove them, oldest first
- made_space = False
- for (ref, remove) in all_unrequired_refs:
- size = remove(ref)
- removed_ref_count += 1
- space_saved += size
-
- self._message(Message(
- MessageType.STATUS,
- "Freed {: <7} {}".format(
- utils._pretty_size(size, dec_places=2),
- ref)))
-
- self.set_cache_size(self._cache_size - size)
-
- # User callback
- #
- # Currently this process is fairly slow, but we should
- # think about throttling this progress() callback if this
- # becomes too intense.
- if progress:
- progress()
-
- if self.get_cache_size() < self._cache_lower_threshold:
- made_space = True
- break
-
- if not made_space and self.full():
- # If too many artifacts are required, and we therefore
- # can't remove them, we have to abort the build.
- #
- # FIXME: Asking the user what to do may be neater
- #
- default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
- 'buildstream.conf')
- detail = ("Aborted after removing {} refs and saving {} disk space.\n"
- "The remaining {} in the cache is required by the {} references in your build plan\n\n"
- "There is not enough space to complete the build.\n"
- "Please increase the cache-quota in {} and/or make more disk space."
- .format(removed_ref_count,
- utils._pretty_size(space_saved, dec_places=2),
- utils._pretty_size(self.get_cache_size(), dec_places=2),
- total_refs,
- (context.config_origin or default_conf)))
-
- raise CASCacheError("Cache too full. Aborting.",
- detail=detail,
- reason="cache-too-full")
-
- # Informational message about the side effects of the cleanup
- self._message(Message(
- MessageType.INFO, "Cleanup completed",
- detail=("Removed {} refs and saving {} disk space.\n" +
- "Cache usage is now: {}")
- .format(removed_ref_count,
- utils._pretty_size(space_saved, dec_places=2),
- utils._pretty_size(self.get_cache_size(), dec_places=2))))
-
- return self.get_cache_size()
+ self.send_blobs(remote, missing_blobs)
def _grouper(iterable, n):
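
The cascache.py changes above drop the local SHA-256 hashing and hard-linking in add_object() in favour of a CaptureFiles request to buildbox-casd, which hashes and imports the file itself. A condensed, hypothetical helper showing that round trip (field names as in the local_cas.proto added by this merge; error handling mirrors the new add_object()):

from buildstream._protos.build.buildgrid import local_cas_pb2
from buildstream._protos.google.rpc import code_pb2


def capture_file(local_cas, path, instance_name=None):
    # Ask buildbox-casd to hash `path` and store it in the local CAS.
    request = local_cas_pb2.CaptureFilesRequest()
    if instance_name:
        request.instance_name = instance_name
    request.path.append(path)

    response = local_cas.CaptureFiles(request)
    if len(response.responses) != 1:
        raise RuntimeError("Expected 1 response, got {}".format(len(response.responses)))

    blob_response = response.responses[0]
    if blob_response.status.code == code_pb2.RESOURCE_EXHAUSTED:
        raise RuntimeError("Cache too full")
    if blob_response.status.code != code_pb2.OK:
        raise RuntimeError("Failed to capture {}: {}".format(path, blob_response.status.code))

    return blob_response.digest  # a remote_execution Digest (hash + size_bytes)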
diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py
index 1c7e3152d..ab26d32c7 100644
--- a/src/buildstream/_cas/casremote.py
+++ b/src/buildstream/_cas/casremote.py
@@ -1,16 +1,14 @@
from collections import namedtuple
-import io
import os
import multiprocessing
import signal
from urllib.parse import urlparse
-import uuid
import grpc
from .._protos.google.rpc import code_pb2
-from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from .._protos.build.buildgrid import local_cas_pb2
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
from .._exceptions import CASRemoteError, LoadError, LoadErrorReason
@@ -77,24 +75,29 @@ class BlobNotFound(CASRemoteError):
# Represents a single remote CAS cache.
#
class CASRemote():
- def __init__(self, spec):
+ def __init__(self, spec, cascache):
self.spec = spec
self._initialized = False
+ self.cascache = cascache
self.channel = None
self.instance_name = None
- self.bytestream = None
self.cas = None
self.ref_storage = None
self.batch_update_supported = None
self.batch_read_supported = None
self.capabilities = None
self.max_batch_total_size_bytes = None
+ self.local_cas_instance_name = None
def init(self):
if not self._initialized:
# gRPC doesn't support fork without exec, which is used in the main process.
assert not utils._is_main_process()
+ server_cert_bytes = None
+ client_key_bytes = None
+ client_cert_bytes = None
+
url = urlparse(self.spec.url)
if url.scheme == 'http':
port = url.port or 80
@@ -105,20 +108,14 @@ class CASRemote():
if self.spec.server_cert:
with open(self.spec.server_cert, 'rb') as f:
server_cert_bytes = f.read()
- else:
- server_cert_bytes = None
if self.spec.client_key:
with open(self.spec.client_key, 'rb') as f:
client_key_bytes = f.read()
- else:
- client_key_bytes = None
if self.spec.client_cert:
with open(self.spec.client_cert, 'rb') as f:
client_cert_bytes = f.read()
- else:
- client_cert_bytes = None
credentials = grpc.ssl_channel_credentials(root_certificates=server_cert_bytes,
private_key=client_key_bytes,
@@ -129,7 +126,6 @@ class CASRemote():
self.instance_name = self.spec.instance_name or None
- self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
@@ -173,6 +169,20 @@ class CASRemote():
e.code() != grpc.StatusCode.PERMISSION_DENIED):
raise
+ local_cas = self.cascache._get_local_cas()
+ request = local_cas_pb2.GetInstanceNameForRemoteRequest()
+ request.url = self.spec.url
+ if self.spec.instance_name:
+ request.instance_name = self.spec.instance_name
+ if server_cert_bytes:
+ request.server_cert = server_cert_bytes
+ if client_key_bytes:
+ request.client_key = client_key_bytes
+ if client_cert_bytes:
+ request.client_cert = client_cert_bytes
+ response = local_cas.GetInstanceNameForRemote(request)
+ self.local_cas_instance_name = response.instance_name
+
self._initialized = True
# check_remote
@@ -182,11 +192,11 @@ class CASRemote():
# in the main BuildStream process
# See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
@classmethod
- def check_remote(cls, remote_spec, q):
+ def check_remote(cls, remote_spec, cascache, q):
def __check_remote():
try:
- remote = cls(remote_spec)
+ remote = cls(remote_spec, cascache)
remote.init()
request = buildstream_pb2.StatusRequest()
@@ -235,66 +245,29 @@ class CASRemote():
def push_message(self, message):
message_buffer = message.SerializeToString()
- message_digest = utils._message_digest(message_buffer)
self.init()
- with io.BytesIO(message_buffer) as b:
- self._send_blob(message_digest, b)
-
- return message_digest
+ return self.cascache.add_object(buffer=message_buffer, instance_name=self.local_cas_instance_name)
################################################
# Local Private Methods #
################################################
- def _fetch_blob(self, digest, stream):
- if self.instance_name:
- resource_name = '/'.join([self.instance_name, 'blobs',
- digest.hash, str(digest.size_bytes)])
- else:
- resource_name = '/'.join(['blobs',
- digest.hash, str(digest.size_bytes)])
-
- request = bytestream_pb2.ReadRequest()
- request.resource_name = resource_name
- request.read_offset = 0
- for response in self.bytestream.Read(request):
- stream.write(response.data)
- stream.flush()
-
- assert digest.size_bytes == os.fstat(stream.fileno()).st_size
-
- def _send_blob(self, digest, stream, u_uid=uuid.uuid4()):
- if self.instance_name:
- resource_name = '/'.join([self.instance_name, 'uploads', str(u_uid), 'blobs',
- digest.hash, str(digest.size_bytes)])
- else:
- resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
- digest.hash, str(digest.size_bytes)])
-
- def request_stream(resname, instream):
- offset = 0
- finished = False
- remaining = digest.size_bytes
- while not finished:
- chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
- remaining -= chunk_size
-
- request = bytestream_pb2.WriteRequest()
- request.write_offset = offset
- # max. _MAX_PAYLOAD_BYTES chunks
- request.data = instream.read(chunk_size)
- request.resource_name = resname
- request.finish_write = remaining <= 0
-
- yield request
-
- offset += chunk_size
- finished = request.finish_write
-
- response = self.bytestream.Write(request_stream(resource_name, stream))
-
- assert response.committed_size == digest.size_bytes
+ def _fetch_blob(self, digest):
+ local_cas = self.cascache._get_local_cas()
+ request = local_cas_pb2.FetchMissingBlobsRequest()
+ request.instance_name = self.local_cas_instance_name
+ request_digest = request.blob_digests.add()
+ request_digest.CopyFrom(digest)
+ response = local_cas.FetchMissingBlobs(request)
+ for blob_response in response.responses:
+ if blob_response.status.code == code_pb2.NOT_FOUND:
+ raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
+ blob_response.digest.hash, blob_response.status.code))
+
+ if blob_response.status.code != code_pb2.OK:
+ raise CASRemoteError("Failed to download blob {}: {}".format(
+ blob_response.digest.hash, blob_response.status.code))
# Represents a batch of blobs queued for fetching.
@@ -302,35 +275,25 @@ class CASRemote():
class _CASBatchRead():
def __init__(self, remote):
self._remote = remote
- self._max_total_size_bytes = remote.max_batch_total_size_bytes
- self._request = remote_execution_pb2.BatchReadBlobsRequest()
- if remote.instance_name:
- self._request.instance_name = remote.instance_name
- self._size = 0
+ self._request = local_cas_pb2.FetchMissingBlobsRequest()
+ self._request.instance_name = remote.local_cas_instance_name
self._sent = False
def add(self, digest):
assert not self._sent
- new_batch_size = self._size + digest.size_bytes
- if new_batch_size > self._max_total_size_bytes:
- # Not enough space left in current batch
- return False
-
- request_digest = self._request.digests.add()
- request_digest.hash = digest.hash
- request_digest.size_bytes = digest.size_bytes
- self._size = new_batch_size
- return True
+ request_digest = self._request.blob_digests.add()
+ request_digest.CopyFrom(digest)
def send(self, *, missing_blobs=None):
assert not self._sent
self._sent = True
- if not self._request.digests:
+ if not self._request.blob_digests:
return
- batch_response = self._remote.cas.BatchReadBlobs(self._request)
+ local_cas = self._remote.cascache._get_local_cas()
+ batch_response = local_cas.FetchMissingBlobs(self._request)
for response in batch_response.responses:
if response.status.code == code_pb2.NOT_FOUND:
@@ -347,46 +310,38 @@ class _CASBatchRead():
raise CASRemoteError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
response.digest.hash, response.digest.size_bytes, len(response.data)))
- yield (response.digest, response.data)
-
# Represents a batch of blobs queued for upload.
#
class _CASBatchUpdate():
def __init__(self, remote):
self._remote = remote
- self._max_total_size_bytes = remote.max_batch_total_size_bytes
- self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
- if remote.instance_name:
- self._request.instance_name = remote.instance_name
- self._size = 0
+ self._request = local_cas_pb2.UploadMissingBlobsRequest()
+ self._request.instance_name = remote.local_cas_instance_name
self._sent = False
- def add(self, digest, stream):
+ def add(self, digest):
assert not self._sent
- new_batch_size = self._size + digest.size_bytes
- if new_batch_size > self._max_total_size_bytes:
- # Not enough space left in current batch
- return False
-
- blob_request = self._request.requests.add()
- blob_request.digest.hash = digest.hash
- blob_request.digest.size_bytes = digest.size_bytes
- blob_request.data = stream.read(digest.size_bytes)
- self._size = new_batch_size
- return True
+ request_digest = self._request.blob_digests.add()
+ request_digest.CopyFrom(digest)
def send(self):
assert not self._sent
self._sent = True
- if not self._request.requests:
+ if not self._request.blob_digests:
return
- batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
+ local_cas = self._remote.cascache._get_local_cas()
+ batch_response = local_cas.UploadMissingBlobs(self._request)
for response in batch_response.responses:
if response.status.code != code_pb2.OK:
+ if response.status.code == code_pb2.RESOURCE_EXHAUSTED:
+ reason = "cache-too-full"
+ else:
+ reason = None
+
raise CASRemoteError("Failed to upload blob {}: {}".format(
- response.digest.hash, response.status.code))
+ response.digest.hash, response.status.code), reason=reason)
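
With the casremote.py changes above, batch transfers no longer stream bytes through BuildStream's BatchReadBlobs/BatchUpdateBlobs or ByteStream code paths: the client hands digest lists to buildbox-casd, which performs the network transfer against the remote registered through GetInstanceNameForRemote(). A rough, hypothetical sketch of the fetch side (the upload side is symmetrical via UploadMissingBlobsRequest):

from buildstream._protos.build.buildgrid import local_cas_pb2
from buildstream._protos.google.rpc import code_pb2


def fetch_blobs(local_cas, instance_name, digests):
    # Ask buildbox-casd to download any of `digests` missing from the
    # local CAS, using the remote bound to `instance_name`.
    request = local_cas_pb2.FetchMissingBlobsRequest()
    request.instance_name = instance_name
    for digest in digests:
        request.blob_digests.add().CopyFrom(digest)

    missing = []
    for blob_response in local_cas.FetchMissingBlobs(request).responses:
        if blob_response.status.code == code_pb2.NOT_FOUND:
            missing.append(blob_response.digest)
        elif blob_response.status.code != code_pb2.OK:
            raise RuntimeError("Failed to download blob {}: {}".format(
                blob_response.digest.hash, blob_response.status.code))
    return missing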
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index 9606c263b..5a4c2b7ac 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -18,14 +18,13 @@
# Jürg Billeter <juerg.billeter@codethink.co.uk>
from concurrent import futures
-import logging
+from contextlib import contextmanager
import os
import signal
import sys
import tempfile
import uuid
import errno
-import threading
import grpc
from google.protobuf.message import DecodeError
@@ -38,7 +37,7 @@ from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
artifact_pb2, artifact_pb2_grpc, source_pb2, source_pb2_grpc
from .. import utils
-from .._exceptions import CASError
+from .._exceptions import CASError, CASCacheError
from .cascache import CASCache
@@ -48,11 +47,6 @@ from .cascache import CASCache
_MAX_PAYLOAD_BYTES = 1024 * 1024
-# Trying to push an artifact that is too large
-class ArtifactTooLargeException(Exception):
- pass
-
-
# create_server():
#
# Create gRPC CAS artifact server as specified in the Remote Execution API.
@@ -61,47 +55,49 @@ class ArtifactTooLargeException(Exception):
# repo (str): Path to CAS repository
# enable_push (bool): Whether to allow blob uploads and artifact updates
#
-def create_server(repo, *, enable_push,
- max_head_size=int(10e9),
- min_head_size=int(2e9)):
- cas = CASCache(os.path.abspath(repo))
- artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs')
- sourcedir = os.path.join(os.path.abspath(repo), 'source_protos')
+@contextmanager
+def create_server(repo, *, enable_push, quota):
+ cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False)
+
+ try:
+ artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs')
+ sourcedir = os.path.join(os.path.abspath(repo), 'source_protos')
- # Use max_workers default from Python 3.5+
- max_workers = (os.cpu_count() or 1) * 5
- server = grpc.server(futures.ThreadPoolExecutor(max_workers))
+ # Use max_workers default from Python 3.5+
+ max_workers = (os.cpu_count() or 1) * 5
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers))
- cache_cleaner = _CacheCleaner(cas, max_head_size, min_head_size)
+ bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
+ _ByteStreamServicer(cas, enable_push=enable_push), server)
- bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
- _ByteStreamServicer(cas, cache_cleaner, enable_push=enable_push), server)
+ remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
+ _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
- remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
- _ContentAddressableStorageServicer(cas, cache_cleaner, enable_push=enable_push), server)
+ remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
+ _CapabilitiesServicer(), server)
- remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
- _CapabilitiesServicer(), server)
+ buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
+ _ReferenceStorageServicer(cas, enable_push=enable_push), server)
- buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
- _ReferenceStorageServicer(cas, enable_push=enable_push), server)
+ artifact_pb2_grpc.add_ArtifactServiceServicer_to_server(
+ _ArtifactServicer(cas, artifactdir), server)
- artifact_pb2_grpc.add_ArtifactServiceServicer_to_server(
- _ArtifactServicer(cas, artifactdir), server)
+ source_pb2_grpc.add_SourceServiceServicer_to_server(
+ _SourceServicer(sourcedir), server)
- source_pb2_grpc.add_SourceServiceServicer_to_server(
- _SourceServicer(sourcedir), server)
+ # Set up reference storage and artifact capabilities
+ artifact_capabilities = buildstream_pb2.ArtifactCapabilities(
+ allow_updates=enable_push)
+ source_capabilities = buildstream_pb2.SourceCapabilities(
+ allow_updates=enable_push)
+ buildstream_pb2_grpc.add_CapabilitiesServicer_to_server(
+ _BuildStreamCapabilitiesServicer(artifact_capabilities, source_capabilities),
+ server)
- # Set up reference storage and artifact capabilities
- artifact_capabilities = buildstream_pb2.ArtifactCapabilities(
- allow_updates=enable_push)
- source_capabilities = buildstream_pb2.SourceCapabilities(
- allow_updates=enable_push)
- buildstream_pb2_grpc.add_CapabilitiesServicer_to_server(
- _BuildStreamCapabilitiesServicer(artifact_capabilities, source_capabilities),
- server)
+ yield server
- return server
+ finally:
+ cas.release_resources()
@click.command(short_help="CAS Artifact Server")
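
With create_server() rewritten as a context manager, callers are expected to keep the server inside a with-block so that the CASCache (and the buildbox-casd process behind it) is released via release_resources() on exit. A minimal usage sketch, with a placeholder repo path and port and the default 10 GB quota:

    # Hedged sketch mirroring server_main() below; repo path and port are placeholders.
    import signal
    from buildstream._cas.casserver import create_server

    with create_server('/srv/cas-repo', enable_push=True, quota=int(10e9)) as server:
        server.add_insecure_port('[::]:50052')
        server.start()
        try:
            while True:
                signal.pause()      # serve until interrupted
        finally:
            server.stop(0)          # leaving the with-block then shuts down casd
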
@@ -111,65 +107,65 @@ def create_server(repo, *, enable_push,
@click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
@click.option('--enable-push', default=False, is_flag=True,
help="Allow clients to upload blobs and update artifact cache")
-@click.option('--head-room-min', type=click.INT,
- help="Disk head room minimum in bytes",
- default=2e9)
-@click.option('--head-room-max', type=click.INT,
- help="Disk head room maximum in bytes",
+@click.option('--quota', type=click.INT,
+ help="Maximum disk usage in bytes",
default=10e9)
@click.argument('repo')
def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
- head_room_min, head_room_max):
- server = create_server(repo,
- max_head_size=head_room_max,
- min_head_size=head_room_min,
- enable_push=enable_push)
-
- use_tls = bool(server_key)
-
- if bool(server_cert) != use_tls:
- click.echo("ERROR: --server-key and --server-cert are both required for TLS", err=True)
- sys.exit(-1)
-
- if client_certs and not use_tls:
- click.echo("ERROR: --client-certs can only be used with --server-key", err=True)
- sys.exit(-1)
-
- if use_tls:
- # Read public/private key pair
- with open(server_key, 'rb') as f:
- server_key_bytes = f.read()
- with open(server_cert, 'rb') as f:
- server_cert_bytes = f.read()
-
- if client_certs:
- with open(client_certs, 'rb') as f:
- client_certs_bytes = f.read()
- else:
- client_certs_bytes = None
+ quota):
+ # Handle SIGTERM by calling sys.exit(0), which will raise a SystemExit exception,
+ # properly executing cleanup code in `finally` clauses and context managers.
+ # This is required to terminate buildbox-casd on SIGTERM.
+ signal.signal(signal.SIGTERM, lambda signalnum, frame: sys.exit(0))
+
+ with create_server(repo,
+ quota=quota,
+ enable_push=enable_push) as server:
+
+ use_tls = bool(server_key)
+
+ if bool(server_cert) != use_tls:
+ click.echo("ERROR: --server-key and --server-cert are both required for TLS", err=True)
+ sys.exit(-1)
+
+ if client_certs and not use_tls:
+ click.echo("ERROR: --client-certs can only be used with --server-key", err=True)
+ sys.exit(-1)
+
+ if use_tls:
+ # Read public/private key pair
+ with open(server_key, 'rb') as f:
+ server_key_bytes = f.read()
+ with open(server_cert, 'rb') as f:
+ server_cert_bytes = f.read()
+
+ if client_certs:
+ with open(client_certs, 'rb') as f:
+ client_certs_bytes = f.read()
+ else:
+ client_certs_bytes = None
- credentials = grpc.ssl_server_credentials([(server_key_bytes, server_cert_bytes)],
- root_certificates=client_certs_bytes,
- require_client_auth=bool(client_certs))
- server.add_secure_port('[::]:{}'.format(port), credentials)
- else:
- server.add_insecure_port('[::]:{}'.format(port))
+ credentials = grpc.ssl_server_credentials([(server_key_bytes, server_cert_bytes)],
+ root_certificates=client_certs_bytes,
+ require_client_auth=bool(client_certs))
+ server.add_secure_port('[::]:{}'.format(port), credentials)
+ else:
+ server.add_insecure_port('[::]:{}'.format(port))
- # Run artifact server
- server.start()
- try:
- while True:
- signal.pause()
- except KeyboardInterrupt:
- server.stop(0)
+ # Run artifact server
+ server.start()
+ try:
+ while True:
+ signal.pause()
+ finally:
+ server.stop(0)
class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
- def __init__(self, cas, cache_cleaner, *, enable_push):
+ def __init__(self, cas, *, enable_push):
super().__init__()
self.cas = cas
self.enable_push = enable_push
- self.cache_cleaner = cache_cleaner
def Read(self, request, context):
resource_name = request.resource_name
@@ -188,6 +184,8 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
context.set_code(grpc.StatusCode.NOT_FOUND)
return
+ os.utime(f.fileno())
+
if request.read_offset > 0:
f.seek(request.read_offset)
@@ -230,12 +228,6 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
while True:
if client_digest.size_bytes == 0:
break
- try:
- self.cache_cleaner.clean_up(client_digest.size_bytes)
- except ArtifactTooLargeException as e:
- context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
- context.set_details(str(e))
- return response
try:
os.posix_fallocate(out.fileno(), 0, client_digest.size_bytes)
@@ -262,10 +254,20 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
return response
out.flush()
- digest = self.cas.add_object(path=out.name, link_directly=True)
+
+ try:
+ digest = self.cas.add_object(path=out.name, link_directly=True)
+ except CASCacheError as e:
+ if e.reason == "cache-too-full":
+ context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
+ else:
+ context.set_code(grpc.StatusCode.INTERNAL)
+ return response
+
if digest.hash != client_digest.hash:
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
return response
+
finished = True
assert finished
@@ -275,11 +277,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
- def __init__(self, cas, cache_cleaner, *, enable_push):
+ def __init__(self, cas, *, enable_push):
super().__init__()
self.cas = cas
self.enable_push = enable_push
- self.cache_cleaner = cache_cleaner
def FindMissingBlobs(self, request, context):
response = remote_execution_pb2.FindMissingBlobsResponse()
@@ -311,11 +312,14 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
blob_response.digest.hash = digest.hash
blob_response.digest.size_bytes = digest.size_bytes
try:
- with open(self.cas.objpath(digest), 'rb') as f:
+ objpath = self.cas.objpath(digest)
+ with open(objpath, 'rb') as f:
if os.fstat(f.fileno()).st_size != digest.size_bytes:
blob_response.status.code = code_pb2.NOT_FOUND
continue
+ os.utime(f.fileno())
+
blob_response.data = f.read(digest.size_bytes)
except FileNotFoundError:
blob_response.status.code = code_pb2.NOT_FOUND
@@ -347,18 +351,21 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
blob_response.status.code = code_pb2.FAILED_PRECONDITION
continue
- try:
- self.cache_cleaner.clean_up(digest.size_bytes)
+ with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
+ out.write(blob_request.data)
+ out.flush()
- with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
- out.write(blob_request.data)
- out.flush()
+ try:
server_digest = self.cas.add_object(path=out.name)
- if server_digest.hash != digest.hash:
- blob_response.status.code = code_pb2.FAILED_PRECONDITION
+ except CASCacheError as e:
+ if e.reason == "cache-too-full":
+ blob_response.status.code = code_pb2.RESOURCE_EXHAUSTED
+ else:
+ blob_response.status.code = code_pb2.INTERNAL
+ continue
- except ArtifactTooLargeException:
- blob_response.status.code = code_pb2.RESOURCE_EXHAUSTED
+ if server_digest.hash != digest.hash:
+ blob_response.status.code = code_pb2.FAILED_PRECONDITION
return response
@@ -394,7 +401,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
try:
self.cas.update_tree_mtime(tree)
except FileNotFoundError:
- self.cas.remove(request.key, defer_prune=True)
+ self.cas.remove(request.key)
context.set_code(grpc.StatusCode.NOT_FOUND)
return response
@@ -602,81 +609,3 @@ def _digest_from_upload_resource_name(resource_name):
return digest
except ValueError:
return None
-
-
-class _CacheCleaner:
-
- __cleanup_cache_lock = threading.Lock()
-
- def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
- self.__cas = cas
- self.__max_head_size = max_head_size
- self.__min_head_size = min_head_size
-
- def __has_space(self, object_size):
- stats = os.statvfs(self.__cas.casdir)
- free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
- total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
-
- if object_size > total_disk_space:
- raise ArtifactTooLargeException("Artifact of size: {} is too large for "
- "the filesystem which mounts the remote "
- "cache".format(object_size))
-
- return object_size <= free_disk_space
-
- # _clean_up_cache()
- #
- # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
- # is enough space for the incoming artifact
- #
- # Args:
- # object_size: The size of the object being received in bytes
- #
- # Returns:
- # int: The total bytes removed on the filesystem
- #
- def clean_up(self, object_size):
- if self.__has_space(object_size):
- return 0
-
- with _CacheCleaner.__cleanup_cache_lock:
- if self.__has_space(object_size):
- # Another thread has done the cleanup for us
- return 0
-
- stats = os.statvfs(self.__cas.casdir)
- target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
-
- # obtain a list of LRP artifacts
- LRP_objects = self.__cas.list_objects()
-
- removed_size = 0 # in bytes
- last_mtime = 0
-
- while object_size - removed_size > target_disk_space:
- try:
- last_mtime, to_remove = LRP_objects.pop(0) # The first element in the list is the LRP artifact
- except IndexError:
- # This exception is caught if there are no more artifacts in the list
- # LRP_artifacts. This means the artifact is too large for the filesystem
- # so we abort the process
- raise ArtifactTooLargeException("Artifact of size {} is too large for "
- "the filesystem which mounts the remote "
- "cache".format(object_size))
-
- try:
- size = os.stat(to_remove).st_size
- os.unlink(to_remove)
- removed_size += size
- except FileNotFoundError:
- pass
-
- self.__cas.clean_up_refs_until(last_mtime)
-
- if removed_size > 0:
- logging.info("Successfully removed %d bytes from the cache", removed_size)
- else:
- logging.info("No artifacts were removed from the cache.")
-
- return removed_size
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index c1a3b0619..c6cde4003 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -28,7 +28,7 @@ from ._profile import Topics, PROFILER
from ._platform import Platform
from ._artifactcache import ArtifactCache
from ._sourcecache import SourceCache
-from ._cas import CASCache, CASQuota, CASCacheUsage
+from ._cas import CASCache
from .types import _CacheBuildTrees, _SchedulerErrorAction
from ._workspaces import Workspaces, WorkspaceProjectCache
from .node import Node
@@ -150,6 +150,8 @@ class Context():
# Whether file contents are required for all artifacts in the local cache
self.require_artifact_files = True
+ self.fork_allowed = True
+
# Whether elements must be rebuilt when their dependencies have changed
self._strict_build_plan = None
@@ -167,7 +169,6 @@ class Context():
self._workspaces = None
self._workspace_project_cache = WorkspaceProjectCache()
self._cascache = None
- self._casquota = None
# __enter__()
#
@@ -181,7 +182,8 @@ class Context():
# Called when exiting the with-statement context.
#
def __exit__(self, exc_type, exc_value, traceback):
- return None
+ if self._cascache:
+ self._cascache.release_resources(self.messenger)
# load()
#
@@ -358,16 +360,6 @@ class Context():
return self._artifactcache
- # get_cache_usage()
- #
- # Fetches the current usage of the artifact cache
- #
- # Returns:
- # (CASCacheUsage): The current status
- #
- def get_cache_usage(self):
- return CASCacheUsage(self.get_casquota())
-
@property
def sourcecache(self):
if not self._sourcecache:
@@ -495,10 +487,15 @@ class Context():
def get_cascache(self):
if self._cascache is None:
- self._cascache = CASCache(self.cachedir)
+ self._cascache = CASCache(self.cachedir, cache_quota=self.config_cache_quota)
return self._cascache
- def get_casquota(self):
- if self._casquota is None:
- self._casquota = CASQuota(self)
- return self._casquota
+ # disable_fork():
+ #
+ # This will prevent the scheduler from running but will allow communication
+ # with casd in the main process.
+ #
+ def disable_fork(self):
+ self.fork_allowed = False
+ cascache = self.get_cascache()
+ cascache.notify_fork_disabled()
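
Context now owns the connection to buildbox-casd: the CASCache is created lazily with the configured quota, disable_fork() must be called before the main process talks to casd, and __exit__ releases the casd resources. A minimal lifecycle sketch, with configuration and load() arguments elided:

    # Hedged sketch of the Context lifecycle; exact constructor/load() arguments
    # depend on the caller and are omitted here.
    from buildstream._context import Context

    with Context() as context:
        context.load()                      # reads user config, including the cache quota
        cascache = context.get_cascache()   # lazily connects to buildbox-casd
        context.disable_fork()              # required before using casd in this process
        ...
    # __exit__ calls cascache.release_resources(), terminating buildbox-casd
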
diff --git a/src/buildstream/_frontend/cli.py b/src/buildstream/_frontend/cli.py
index a77bd80e8..ecfea05bb 100644
--- a/src/buildstream/_frontend/cli.py
+++ b/src/buildstream/_frontend/cli.py
@@ -1242,14 +1242,12 @@ def artifact_list_contents(app, artifacts):
# Artifact Delete Command #
###################################################################
@artifact.command(name='delete', short_help="Remove artifacts from the local cache")
-@click.option('--no-prune', 'no_prune', default=False, is_flag=True,
- help="Do not prune the local cache of unreachable refs")
@click.argument('artifacts', type=click.Path(), nargs=-1)
@click.pass_obj
-def artifact_delete(app, artifacts, no_prune):
+def artifact_delete(app, artifacts):
"""Remove artifacts from the local cache"""
with app.initialized():
- app.stream.artifact_delete(artifacts, no_prune)
+ app.stream.artifact_delete(artifacts)
##################################################################
diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py
index 38e388818..d0070cef0 100644
--- a/src/buildstream/_frontend/status.py
+++ b/src/buildstream/_frontend/status.py
@@ -350,7 +350,7 @@ class _StatusHeader():
#
# Public members
#
- self.lines = 3
+ self.lines = 2
#
# Private members
@@ -413,31 +413,7 @@ class _StatusHeader():
line2 = self._centered(text, size, line_length, ' ')
- #
- # Line 3: Cache usage percentage report
- #
- # ~~~~~~ cache: 69% ~~~~~~
- #
- usage = self._context.get_cache_usage()
- usage_percent = '{}%'.format(usage.used_percent)
-
- size = 21
- size += len(usage_percent)
- if usage.used_percent >= 95:
- formatted_usage_percent = self._error_profile.fmt(usage_percent)
- elif usage.used_percent >= 80:
- formatted_usage_percent = self._content_profile.fmt(usage_percent)
- else:
- formatted_usage_percent = self._success_profile.fmt(usage_percent)
-
- text = self._format_profile.fmt("~~~~~~ ") + \
- self._content_profile.fmt('cache') + \
- self._format_profile.fmt(': ') + \
- formatted_usage_percent + \
- self._format_profile.fmt(' ~~~~~~')
- line3 = self._centered(text, size, line_length, ' ')
-
- return line1 + '\n' + line2 + '\n' + line3
+ return line1 + '\n' + line2
###################################################
# Private Methods #
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index 20f5d1767..955680f9b 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -475,7 +475,6 @@ class LogLine(Widget):
values["Session Start"] = starttime.strftime('%A, %d-%m-%Y at %H:%M:%S')
values["Project"] = "{} ({})".format(project.name, project.directory)
values["Targets"] = ", ".join([t.name for t in stream.targets])
- values["Cache Usage"] = str(context.get_cache_usage())
text += self._format_values(values)
# User configurations
diff --git a/src/buildstream/_protos/build/buildgrid/__init__.py b/src/buildstream/_protos/build/buildgrid/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/buildstream/_protos/build/buildgrid/__init__.py
diff --git a/src/buildstream/_protos/build/buildgrid/local_cas.proto b/src/buildstream/_protos/build/buildgrid/local_cas.proto
new file mode 100644
index 000000000..671caef0e
--- /dev/null
+++ b/src/buildstream/_protos/build/buildgrid/local_cas.proto
@@ -0,0 +1,373 @@
+// Copyright (C) 2018-2019 Bloomberg LP
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package build.buildgrid;
+
+import "build/bazel/remote/execution/v2/remote_execution.proto";
+import "google/api/annotations.proto";
+import "google/rpc/status.proto";
+
+service LocalContentAddressableStorage {
+ // Fetch blobs from a remote CAS to the local cache.
+ //
+ // This request is equivalent to ByteStream `Read` or `BatchReadBlobs`
+ // requests, storing the downloaded blobs in the local cache.
+ //
+ // Requested blobs that failed to be downloaded will be listed in the
+ // response.
+ //
+ // Errors:
+ // * `INVALID_ARGUMENT`: The client attempted to download more than the
+ // server supported limit.
+ //
+ // Individual requests may return the following error, additionally:
+ // * `NOT_FOUND`: The requested blob is not present in the remote CAS.
+ rpc FetchMissingBlobs(FetchMissingBlobsRequest) returns (FetchMissingBlobsResponse) {}
+
+ // Upload blobs from the local cache to a remote CAS.
+ //
+ // This request is equivalent to `FindMissingBlobs` followed by
+ // ByteStream `Write` or `BatchUpdateBlobs` requests.
+ //
+ // Blobs that failed to be uploaded will be listed in the response.
+ //
+ // Errors:
+ // * `INVALID_ARGUMENT`: The client attempted to upload more than the
+ // server supported limit.
+ //
+ // Individual requests may return the following error, additionally:
+ // * `NOT_FOUND`: The requested blob is not present in the local cache.
+ // * `RESOURCE_EXHAUSTED`: There is insufficient disk quota to store the blob.
+ rpc UploadMissingBlobs(UploadMissingBlobsRequest) returns (UploadMissingBlobsResponse) {}
+
+ // Fetch the entire directory tree rooted at a node from a remote CAS to the
+ // local cache.
+ //
+ // This request is equivalent to `GetTree`, storing the `Directory` objects
+ // in the local cache. Optionally, this will also fetch all blobs referenced
+ // by the `Directory` objects, equivalent to `FetchMissingBlobs`.
+ //
+ // If part of the tree is missing from the CAS, the server will return the
+ // portion present and omit the rest.
+ //
+ // * `NOT_FOUND`: The requested tree root is not present in the CAS.
+ rpc FetchTree(FetchTreeRequest) returns (FetchTreeResponse) {}
+
+ // Upload the entire directory tree from the local cache to a remote CAS.
+ //
+ // This request is equivalent to `UploadMissingBlobs` for all blobs
+ // referenced by the specified tree (recursively).
+ //
+ // Errors:
+ // * `NOT_FOUND`: The requested tree root is not present in the local cache.
+ // * `RESOURCE_EXHAUSTED`: There is insufficient disk quota to store the tree.
+ rpc UploadTree(UploadTreeRequest) returns (UploadTreeResponse) {}
+
+ // Stage a directory tree in the local filesystem.
+ //
+ // This makes the specified directory tree temporarily available for local
+ // filesystem access. It is implementation-defined whether this uses a
+ // userspace filesystem such as FUSE, hardlinking or a full copy.
+ //
+ // Missing blobs are fetched, if a CAS remote is configured.
+ //
+ // The staging starts when the server receives the initial request and
+ // it is ready to be used on the initial (non-error) response from the
+ // server.
+ //
+ // The server will clean up the staged directory when it either
+ // receives an additional request (with all fields unset) or when the
+ // stream is closed. The server will send an additional response after
+ // cleanup is complete.
+ rpc StageTree(stream StageTreeRequest) returns (stream StageTreeResponse) {}
+
+ // Capture a directory tree from the local filesystem.
+ //
+ // This imports the specified path from the local filesystem into CAS.
+ //
+ // If a CAS remote is configured, the blobs are uploaded.
+ // The `bypass_local_cache` parameter is a hint to indicate whether the blobs
+ // shall be uploaded without first storing them in the local cache.
+ rpc CaptureTree(CaptureTreeRequest) returns (CaptureTreeResponse) {}
+
+ // Capture files from the local filesystem.
+ //
+ // This imports the specified paths from the local filesystem into CAS.
+ //
+ // If a CAS remote is configured, the blobs are uploaded.
+ // The `bypass_local_cache` parameter is a hint to indicate whether the blobs
+ // shall be uploaded without first storing them in the local cache.
+ rpc CaptureFiles(CaptureFilesRequest) returns (CaptureFilesResponse) {}
+
+ // Configure remote CAS endpoint.
+ //
+ // This returns a string that can be used as instance_name to access the
+ // specified endpoint in further requests.
+ rpc GetInstanceNameForRemote(GetInstanceNameForRemoteRequest) returns (GetInstanceNameForRemoteResponse) {}
+}
+
+// A request message for
+// [LocalContentAddressableStorage.FetchMissingBlobs][build.buildgrid.v2.LocalContentAddressableStorage.FetchMissingBlobs].
+message FetchMissingBlobsRequest {
+ // The instance of the execution system to operate against. A server may
+ // support multiple instances of the execution system (with their own workers,
+ // storage, caches, etc.). The server MAY require use of this field to select
+ // between them in an implementation-defined fashion, otherwise it can be
+ // omitted.
+ string instance_name = 1;
+
+ // A list of the blobs to fetch.
+ repeated build.bazel.remote.execution.v2.Digest blob_digests = 2;
+}
+
+// A response message for
+// [LocalContentAddressableStorage.FetchMissingBlobs][build.buildgrid.v2.LocalContentAddressableStorage.FetchMissingBlobs].
+message FetchMissingBlobsResponse {
+ // A response corresponding to a single blob that the client tried to download.
+ message Response {
+ // The digest to which this response corresponds.
+ build.bazel.remote.execution.v2.Digest digest = 1;
+
+ // The result of attempting to download that blob.
+ google.rpc.Status status = 2;
+ }
+
+ // The responses to the failed requests.
+ repeated Response responses = 1;
+}
+
+// A request message for
+// [LocalContentAddressableStorage.UploadMissingBlobs][build.buildgrid.v2.LocalContentAddressableStorage.UploadMissingBlobs].
+message UploadMissingBlobsRequest {
+ // The instance of the execution system to operate against. A server may
+ // support multiple instances of the execution system (with their own workers,
+ // storage, caches, etc.). The server MAY require use of this field to select
+ // between them in an implementation-defined fashion, otherwise it can be
+ // omitted.
+ string instance_name = 1;
+
+ // A list of the blobs to upload.
+ repeated build.bazel.remote.execution.v2.Digest blob_digests = 2;
+}
+
+// A response message for
+// [LocalContentAddressableStorage.UploadMissingBlobs][build.buildgrid.v2.LocalContentAddressableStorage.UploadMissingBlobs].
+message UploadMissingBlobsResponse {
+ // A response corresponding to a single blob that the client tried to upload.
+ message Response {
+ // The digest to which this response corresponds.
+ build.bazel.remote.execution.v2.Digest digest = 1;
+
+ // The result of attempting to upload that blob.
+ google.rpc.Status status = 2;
+ }
+
+ // The responses to the failed requests.
+ repeated Response responses = 1;
+}
+
+// A request message for
+// [LocalContentAddressableStorage.FetchTree][build.buildgrid.v2.LocalContentAddressableStorage.FetchTree].
+message FetchTreeRequest {
+ // The instance of the execution system to operate against. A server may
+ // support multiple instances of the execution system (with their own workers,
+ // storage, caches, etc.). The server MAY require use of this field to select
+ // between them in an implementation-defined fashion, otherwise it can be
+ // omitted.
+ string instance_name = 1;
+
+ // The digest of the root, which must be an encoded
+ // [Directory][build.bazel.remote.execution.v2.Directory] message
+ // stored in the
+ // [ContentAddressableStorage][build.bazel.remote.execution.v2.ContentAddressableStorage].
+ build.bazel.remote.execution.v2.Digest root_digest = 2;
+
+ // Whether to fetch blobs of files in the directory tree.
+ bool fetch_file_blobs = 3;
+}
+
+// A response message for
+// [LocalContentAddressableStorage.FetchTree][build.buildgrid.v2.LocalContentAddressableStorage.FetchTree].
+message FetchTreeResponse {
+}
+
+// A request message for
+// [LocalContentAddressableStorage.UploadTree][build.buildgrid.v2.LocalContentAddressableStorage.UploadTree].
+message UploadTreeRequest {
+ // The instance of the execution system to operate against. A server may
+ // support multiple instances of the execution system (with their own workers,
+ // storage, caches, etc.). The server MAY require use of this field to select
+ // between them in an implementation-defined fashion, otherwise it can be
+ // omitted.
+ string instance_name = 1;
+
+ // The digest of the root, which must be an encoded
+ // [Directory][build.bazel.remote.execution.v2.Directory] message
+ // stored in the
+ // [ContentAddressableStorage][build.bazel.remote.execution.v2.ContentAddressableStorage].
+ build.bazel.remote.execution.v2.Digest root_digest = 2;
+}
+
+// A response message for
+// [LocalContentAddressableStorage.UploadTree][build.buildgrid.v2.LocalContentAddressableStorage.UploadTree].
+message UploadTreeResponse {
+}
+
+// A request message for
+// [LocalContentAddressableStorage.StageTree][build.buildgrid.v2.LocalContentAddressableStorage.StageTree].
+message StageTreeRequest {
+ // The instance of the execution system to operate against. A server may
+ // support multiple instances of the execution system (with their own workers,
+ // storage, caches, etc.). The server MAY require use of this field to select
+ // between them in an implementation-defined fashion, otherwise it can be
+ // omitted.
+ string instance_name = 1;
+
+ // The digest of the root, which must be an encoded
+ // [Directory][build.bazel.remote.execution.v2.Directory] message
+ // stored in the
+ // [ContentAddressableStorage][build.bazel.remote.execution.v2.ContentAddressableStorage].
+ build.bazel.remote.execution.v2.Digest root_digest = 2;
+
+ // The path in the local filesystem where the specified tree should be
+ // staged. It must either point to an empty directory or not exist yet.
+ //
+ // This is optional. If not specified, the server will choose a location.
+ // The server may require the path to be on the same filesystem as the local
+ // cache to support hardlinking.
+ //
+ // The path may be a subdirectory of another staged tree. The lifetime of
+ // this staged tree will in that case be limited to the lifetime of the
+ // parent.
+ string path = 3;
+}
+
+// A response message for
+// [LocalContentAddressableStorage.StageTree][build.buildgrid.v2.LocalContentAddressableStorage.StageTree].
+message StageTreeResponse {
+ // The path in the local filesystem where the specified tree has been staged.
+ string path = 1;
+}
+
+// A request message for
+// [LocalContentAddressableStorage.CaptureTree][build.buildgrid.v2.LocalContentAddressableStorage.CaptureTree].
+message CaptureTreeRequest {
+ // The instance of the execution system to operate against. A server may
+ // support multiple instances of the execution system (with their own workers,
+ // storage, caches, etc.). The server MAY require use of this field to select
+ // between them in an implementation-defined fashion, otherwise it can be
+ // omitted.
+ string instance_name = 1;
+
+ // The path(s) in the local filesystem to capture.
+ repeated string path = 2;
+
+ // This is a hint whether the blobs shall be uploaded to the remote CAS
+ // without first storing them in the local cache.
+ bool bypass_local_cache = 3;
+}
+
+// A response message for
+// [LocalContentAddressableStorage.CaptureTree][build.buildgrid.v2.LocalContentAddressableStorage.CaptureTree].
+message CaptureTreeResponse {
+ // A response corresponding to a single path that the client tried to capture.
+ message Response {
+ // The path to which this response corresponds.
+ string path = 1;
+
+ // The digest of the captured tree as an encoded
+ // [Tree][build.bazel.remote.execution.v2.Tree] proto containing the
+ // directory's contents, if successful.
+ build.bazel.remote.execution.v2.Digest tree_digest = 2;
+
+ // The result of attempting to capture and upload the tree.
+ google.rpc.Status status = 3;
+ }
+
+ // The responses to the requests.
+ repeated Response responses = 1;
+}
+
+// A request message for
+// [LocalContentAddressableStorage.CaptureFiles][build.buildgrid.v2.LocalContentAddressableStorage.CaptureFiles].
+message CaptureFilesRequest {
+ // The instance of the execution system to operate against. A server may
+ // support multiple instances of the execution system (with their own workers,
+ // storage, caches, etc.). The server MAY require use of this field to select
+ // between them in an implementation-defined fashion, otherwise it can be
+ // omitted.
+ string instance_name = 1;
+
+ // The path(s) in the local filesystem to capture.
+ repeated string path = 2;
+
+ // This is a hint whether the blobs shall be uploaded to the remote CAS
+ // without first storing them in the local cache.
+ bool bypass_local_cache = 3;
+}
+
+// A response message for
+// [LocalContentAddressableStorage.CaptureFiles][build.buildgrid.v2.LocalContentAddressableStorage.CaptureFiles].
+message CaptureFilesResponse {
+ // A response corresponding to a single path that the client tried to capture.
+ message Response {
+ // The path to which this response corresponds.
+ string path = 1;
+
+ // The digest of the captured file's content, if successful.
+ build.bazel.remote.execution.v2.Digest digest = 2;
+
+ // The result of attempting to capture and upload the file.
+ google.rpc.Status status = 3;
+ }
+
+ // The responses to the requests.
+ repeated Response responses = 1;
+}
+
+// A request message for
+// [LocalContentAddressableStorage.GetInstanceNameForRemote][build.buildgrid.v2.LocalContentAddressableStorage.GetInstanceNameForRemote].
+message GetInstanceNameForRemoteRequest {
+ // The URL for the remote CAS server.
+ string url = 1;
+
+ // The instance of the execution system to operate against. A server may
+ // support multiple instances of the execution system (with their own workers,
+ // storage, caches, etc.). The server MAY require use of this field to select
+ // between them in an implementation-defined fashion, otherwise it can be
+ // omitted.
+ string instance_name = 2;
+
+ // PEM-encoded public server certificate for https connections to the remote
+ // CAS server.
+ bytes server_cert = 3;
+
+ // PEM-encoded private client key for https with certificate-based client
+ // authentication. If this is specified, `client_cert` must be specified
+ // as well.
+ bytes client_key = 4;
+
+ // PEM-encoded public client certificate for https with certificate-based
+ // client authentication. If this is specified, `client_key` must be
+ // specified as well.
+ bytes client_cert = 5;
+}
+
+// A response message for
+// [LocalContentAddressableStorage.GetInstanceNameForRemote][build.buildgrid.v2.LocalContentAddressableStorage.GetInstanceNameForRemote].
+message GetInstanceNameForRemoteResponse {
+ string instance_name = 1;
+}
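
This LocalContentAddressableStorage service is the protocol BuildStream now uses to talk to buildbox-casd over a local socket, via the generated Python stubs added below. A minimal client sketch, assuming the accompanying local_cas_pb2_grpc module and using placeholder values for the socket path and digest:

    # Hedged sketch: the socket path and digest are placeholders, not real values.
    import grpc

    from buildstream._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc

    channel = grpc.insecure_channel('unix:///path/to/casd.sock')
    local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(channel)

    request = local_cas_pb2.UploadMissingBlobsRequest(instance_name='')
    request.blob_digests.add(hash='0' * 64, size_bytes=0)

    response = local_cas.UploadMissingBlobs(request)
    for blob_response in response.responses:
        # Only blobs that failed to upload are listed in the response
        print(blob_response.digest.hash, blob_response.status.code)
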
diff --git a/src/buildstream/_protos/build/buildgrid/local_cas_pb2.py b/src/buildstream/_protos/build/buildgrid/local_cas_pb2.py
new file mode 100644
index 000000000..d1ef586f8
--- /dev/null
+++ b/src/buildstream/_protos/build/buildgrid/local_cas_pb2.py
@@ -0,0 +1,1052 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# source: build/buildgrid/local_cas.proto
+
+import sys
+_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2
+from buildstream._protos.google.api import annotations_pb2 as google_dot_api_dot_annotations__pb2
+from buildstream._protos.google.rpc import status_pb2 as google_dot_rpc_dot_status__pb2
+
+
+DESCRIPTOR = _descriptor.FileDescriptor(
+ name='build/buildgrid/local_cas.proto',
+ package='build.buildgrid',
+ syntax='proto3',
+ serialized_options=None,
+ serialized_pb=_b('\n\x1f\x62uild/buildgrid/local_cas.proto\x12\x0f\x62uild.buildgrid\x1a\x36\x62uild/bazel/remote/execution/v2/remote_execution.proto\x1a\x1cgoogle/api/annotations.proto\x1a\x17google/rpc/status.proto\"p\n\x18\x46\x65tchMissingBlobsRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12=\n\x0c\x62lob_digests\x18\x02 \x03(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\"\xcc\x01\n\x19\x46\x65tchMissingBlobsResponse\x12\x46\n\tresponses\x18\x01 \x03(\x0b\x32\x33.build.buildgrid.FetchMissingBlobsResponse.Response\x1ag\n\x08Response\x12\x37\n\x06\x64igest\x18\x01 \x01(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\x12\"\n\x06status\x18\x02 \x01(\x0b\x32\x12.google.rpc.Status\"q\n\x19UploadMissingBlobsRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12=\n\x0c\x62lob_digests\x18\x02 \x03(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\"\xce\x01\n\x1aUploadMissingBlobsResponse\x12G\n\tresponses\x18\x01 \x03(\x0b\x32\x34.build.buildgrid.UploadMissingBlobsResponse.Response\x1ag\n\x08Response\x12\x37\n\x06\x64igest\x18\x01 \x01(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\x12\"\n\x06status\x18\x02 \x01(\x0b\x32\x12.google.rpc.Status\"\x81\x01\n\x10\x46\x65tchTreeRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12<\n\x0broot_digest\x18\x02 \x01(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\x12\x18\n\x10\x66\x65tch_file_blobs\x18\x03 \x01(\x08\"\x13\n\x11\x46\x65tchTreeResponse\"h\n\x11UploadTreeRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12<\n\x0broot_digest\x18\x02 \x01(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\"\x14\n\x12UploadTreeResponse\"u\n\x10StageTreeRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12<\n\x0broot_digest\x18\x02 \x01(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\x12\x0c\n\x04path\x18\x03 \x01(\t\"!\n\x11StageTreeResponse\x12\x0c\n\x04path\x18\x01 \x01(\t\"U\n\x12\x43\x61ptureTreeRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12\x0c\n\x04path\x18\x02 \x03(\t\x12\x1a\n\x12\x62ypass_local_cache\x18\x03 \x01(\x08\"\xd3\x01\n\x13\x43\x61ptureTreeResponse\x12@\n\tresponses\x18\x01 \x03(\x0b\x32-.build.buildgrid.CaptureTreeResponse.Response\x1az\n\x08Response\x12\x0c\n\x04path\x18\x01 \x01(\t\x12<\n\x0btree_digest\x18\x02 \x01(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\x12\"\n\x06status\x18\x03 \x01(\x0b\x32\x12.google.rpc.Status\"V\n\x13\x43\x61ptureFilesRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12\x0c\n\x04path\x18\x02 \x03(\t\x12\x1a\n\x12\x62ypass_local_cache\x18\x03 \x01(\x08\"\xd0\x01\n\x14\x43\x61ptureFilesResponse\x12\x41\n\tresponses\x18\x01 \x03(\x0b\x32..build.buildgrid.CaptureFilesResponse.Response\x1au\n\x08Response\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x37\n\x06\x64igest\x18\x02 \x01(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\x12\"\n\x06status\x18\x03 \x01(\x0b\x32\x12.google.rpc.Status\"\x83\x01\n\x1fGetInstanceNameForRemoteRequest\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\x15\n\rinstance_name\x18\x02 \x01(\t\x12\x13\n\x0bserver_cert\x18\x03 \x01(\x0c\x12\x12\n\nclient_key\x18\x04 \x01(\x0c\x12\x13\n\x0b\x63lient_cert\x18\x05 \x01(\x0c\"9\n GetInstanceNameForRemoteResponse\x12\x15\n\rinstance_name\x18\x01 
\x01(\t2\xc7\x06\n\x1eLocalContentAddressableStorage\x12l\n\x11\x46\x65tchMissingBlobs\x12).build.buildgrid.FetchMissingBlobsRequest\x1a*.build.buildgrid.FetchMissingBlobsResponse\"\x00\x12o\n\x12UploadMissingBlobs\x12*.build.buildgrid.UploadMissingBlobsRequest\x1a+.build.buildgrid.UploadMissingBlobsResponse\"\x00\x12T\n\tFetchTree\x12!.build.buildgrid.FetchTreeRequest\x1a\".build.buildgrid.FetchTreeResponse\"\x00\x12W\n\nUploadTree\x12\".build.buildgrid.UploadTreeRequest\x1a#.build.buildgrid.UploadTreeResponse\"\x00\x12X\n\tStageTree\x12!.build.buildgrid.StageTreeRequest\x1a\".build.buildgrid.StageTreeResponse\"\x00(\x01\x30\x01\x12Z\n\x0b\x43\x61ptureTree\x12#.build.buildgrid.CaptureTreeRequest\x1a$.build.buildgrid.CaptureTreeResponse\"\x00\x12]\n\x0c\x43\x61ptureFiles\x12$.build.buildgrid.CaptureFilesRequest\x1a%.build.buildgrid.CaptureFilesResponse\"\x00\x12\x81\x01\n\x18GetInstanceNameForRemote\x12\x30.build.buildgrid.GetInstanceNameForRemoteRequest\x1a\x31.build.buildgrid.GetInstanceNameForRemoteResponse\"\x00\x62\x06proto3')
+ ,
+ dependencies=[build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2.DESCRIPTOR,google_dot_api_dot_annotations__pb2.DESCRIPTOR,google_dot_rpc_dot_status__pb2.DESCRIPTOR,])
+
+
+
+
+_FETCHMISSINGBLOBSREQUEST = _descriptor.Descriptor(
+ name='FetchMissingBlobsRequest',
+ full_name='build.buildgrid.FetchMissingBlobsRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='instance_name', full_name='build.buildgrid.FetchMissingBlobsRequest.instance_name', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='blob_digests', full_name='build.buildgrid.FetchMissingBlobsRequest.blob_digests', index=1,
+ number=2, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=163,
+ serialized_end=275,
+)
+
+
+_FETCHMISSINGBLOBSRESPONSE_RESPONSE = _descriptor.Descriptor(
+ name='Response',
+ full_name='build.buildgrid.FetchMissingBlobsResponse.Response',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='digest', full_name='build.buildgrid.FetchMissingBlobsResponse.Response.digest', index=0,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='status', full_name='build.buildgrid.FetchMissingBlobsResponse.Response.status', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=379,
+ serialized_end=482,
+)
+
+_FETCHMISSINGBLOBSRESPONSE = _descriptor.Descriptor(
+ name='FetchMissingBlobsResponse',
+ full_name='build.buildgrid.FetchMissingBlobsResponse',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='responses', full_name='build.buildgrid.FetchMissingBlobsResponse.responses', index=0,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[_FETCHMISSINGBLOBSRESPONSE_RESPONSE, ],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=278,
+ serialized_end=482,
+)
+
+
+_UPLOADMISSINGBLOBSREQUEST = _descriptor.Descriptor(
+ name='UploadMissingBlobsRequest',
+ full_name='build.buildgrid.UploadMissingBlobsRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='instance_name', full_name='build.buildgrid.UploadMissingBlobsRequest.instance_name', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='blob_digests', full_name='build.buildgrid.UploadMissingBlobsRequest.blob_digests', index=1,
+ number=2, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=484,
+ serialized_end=597,
+)
+
+
+_UPLOADMISSINGBLOBSRESPONSE_RESPONSE = _descriptor.Descriptor(
+ name='Response',
+ full_name='build.buildgrid.UploadMissingBlobsResponse.Response',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='digest', full_name='build.buildgrid.UploadMissingBlobsResponse.Response.digest', index=0,
+ number=1, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='status', full_name='build.buildgrid.UploadMissingBlobsResponse.Response.status', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=379,
+ serialized_end=482,
+)
+
+_UPLOADMISSINGBLOBSRESPONSE = _descriptor.Descriptor(
+ name='UploadMissingBlobsResponse',
+ full_name='build.buildgrid.UploadMissingBlobsResponse',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='responses', full_name='build.buildgrid.UploadMissingBlobsResponse.responses', index=0,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[_UPLOADMISSINGBLOBSRESPONSE_RESPONSE, ],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=600,
+ serialized_end=806,
+)
+
+
+_FETCHTREEREQUEST = _descriptor.Descriptor(
+ name='FetchTreeRequest',
+ full_name='build.buildgrid.FetchTreeRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='instance_name', full_name='build.buildgrid.FetchTreeRequest.instance_name', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='root_digest', full_name='build.buildgrid.FetchTreeRequest.root_digest', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='fetch_file_blobs', full_name='build.buildgrid.FetchTreeRequest.fetch_file_blobs', index=2,
+ number=3, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=809,
+ serialized_end=938,
+)
+
+
+_FETCHTREERESPONSE = _descriptor.Descriptor(
+ name='FetchTreeResponse',
+ full_name='build.buildgrid.FetchTreeResponse',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=940,
+ serialized_end=959,
+)
+
+
+_UPLOADTREEREQUEST = _descriptor.Descriptor(
+ name='UploadTreeRequest',
+ full_name='build.buildgrid.UploadTreeRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='instance_name', full_name='build.buildgrid.UploadTreeRequest.instance_name', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='root_digest', full_name='build.buildgrid.UploadTreeRequest.root_digest', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=961,
+ serialized_end=1065,
+)
+
+
+_UPLOADTREERESPONSE = _descriptor.Descriptor(
+ name='UploadTreeResponse',
+ full_name='build.buildgrid.UploadTreeResponse',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1067,
+ serialized_end=1087,
+)
+
+
+_STAGETREEREQUEST = _descriptor.Descriptor(
+ name='StageTreeRequest',
+ full_name='build.buildgrid.StageTreeRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='instance_name', full_name='build.buildgrid.StageTreeRequest.instance_name', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='root_digest', full_name='build.buildgrid.StageTreeRequest.root_digest', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='path', full_name='build.buildgrid.StageTreeRequest.path', index=2,
+ number=3, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1089,
+ serialized_end=1206,
+)
+
+
+_STAGETREERESPONSE = _descriptor.Descriptor(
+ name='StageTreeResponse',
+ full_name='build.buildgrid.StageTreeResponse',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='path', full_name='build.buildgrid.StageTreeResponse.path', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1208,
+ serialized_end=1241,
+)
+
+
+_CAPTURETREEREQUEST = _descriptor.Descriptor(
+ name='CaptureTreeRequest',
+ full_name='build.buildgrid.CaptureTreeRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='instance_name', full_name='build.buildgrid.CaptureTreeRequest.instance_name', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='path', full_name='build.buildgrid.CaptureTreeRequest.path', index=1,
+ number=2, type=9, cpp_type=9, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='bypass_local_cache', full_name='build.buildgrid.CaptureTreeRequest.bypass_local_cache', index=2,
+ number=3, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1243,
+ serialized_end=1328,
+)
+
+
+_CAPTURETREERESPONSE_RESPONSE = _descriptor.Descriptor(
+ name='Response',
+ full_name='build.buildgrid.CaptureTreeResponse.Response',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='path', full_name='build.buildgrid.CaptureTreeResponse.Response.path', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='tree_digest', full_name='build.buildgrid.CaptureTreeResponse.Response.tree_digest', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='status', full_name='build.buildgrid.CaptureTreeResponse.Response.status', index=2,
+ number=3, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1420,
+ serialized_end=1542,
+)
+
+_CAPTURETREERESPONSE = _descriptor.Descriptor(
+ name='CaptureTreeResponse',
+ full_name='build.buildgrid.CaptureTreeResponse',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='responses', full_name='build.buildgrid.CaptureTreeResponse.responses', index=0,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[_CAPTURETREERESPONSE_RESPONSE, ],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1331,
+ serialized_end=1542,
+)
+
+
+_CAPTUREFILESREQUEST = _descriptor.Descriptor(
+ name='CaptureFilesRequest',
+ full_name='build.buildgrid.CaptureFilesRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='instance_name', full_name='build.buildgrid.CaptureFilesRequest.instance_name', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='path', full_name='build.buildgrid.CaptureFilesRequest.path', index=1,
+ number=2, type=9, cpp_type=9, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='bypass_local_cache', full_name='build.buildgrid.CaptureFilesRequest.bypass_local_cache', index=2,
+ number=3, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1544,
+ serialized_end=1630,
+)
+
+
+_CAPTUREFILESRESPONSE_RESPONSE = _descriptor.Descriptor(
+ name='Response',
+ full_name='build.buildgrid.CaptureFilesResponse.Response',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='path', full_name='build.buildgrid.CaptureFilesResponse.Response.path', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='digest', full_name='build.buildgrid.CaptureFilesResponse.Response.digest', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='status', full_name='build.buildgrid.CaptureFilesResponse.Response.status', index=2,
+ number=3, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1724,
+ serialized_end=1841,
+)
+
+_CAPTUREFILESRESPONSE = _descriptor.Descriptor(
+ name='CaptureFilesResponse',
+ full_name='build.buildgrid.CaptureFilesResponse',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='responses', full_name='build.buildgrid.CaptureFilesResponse.responses', index=0,
+ number=1, type=11, cpp_type=10, label=3,
+ has_default_value=False, default_value=[],
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[_CAPTUREFILESRESPONSE_RESPONSE, ],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1633,
+ serialized_end=1841,
+)
+
+
+_GETINSTANCENAMEFORREMOTEREQUEST = _descriptor.Descriptor(
+ name='GetInstanceNameForRemoteRequest',
+ full_name='build.buildgrid.GetInstanceNameForRemoteRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='url', full_name='build.buildgrid.GetInstanceNameForRemoteRequest.url', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='instance_name', full_name='build.buildgrid.GetInstanceNameForRemoteRequest.instance_name', index=1,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='server_cert', full_name='build.buildgrid.GetInstanceNameForRemoteRequest.server_cert', index=2,
+ number=3, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b(""),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='client_key', full_name='build.buildgrid.GetInstanceNameForRemoteRequest.client_key', index=3,
+ number=4, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b(""),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='client_cert', full_name='build.buildgrid.GetInstanceNameForRemoteRequest.client_cert', index=4,
+ number=5, type=12, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b(""),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1844,
+ serialized_end=1975,
+)
+
+
+_GETINSTANCENAMEFORREMOTERESPONSE = _descriptor.Descriptor(
+ name='GetInstanceNameForRemoteResponse',
+ full_name='build.buildgrid.GetInstanceNameForRemoteResponse',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='instance_name', full_name='build.buildgrid.GetInstanceNameForRemoteResponse.instance_name', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=1977,
+ serialized_end=2034,
+)
+
+_FETCHMISSINGBLOBSREQUEST.fields_by_name['blob_digests'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST
+_FETCHMISSINGBLOBSRESPONSE_RESPONSE.fields_by_name['digest'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST
+_FETCHMISSINGBLOBSRESPONSE_RESPONSE.fields_by_name['status'].message_type = google_dot_rpc_dot_status__pb2._STATUS
+_FETCHMISSINGBLOBSRESPONSE_RESPONSE.containing_type = _FETCHMISSINGBLOBSRESPONSE
+_FETCHMISSINGBLOBSRESPONSE.fields_by_name['responses'].message_type = _FETCHMISSINGBLOBSRESPONSE_RESPONSE
+_UPLOADMISSINGBLOBSREQUEST.fields_by_name['blob_digests'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST
+_UPLOADMISSINGBLOBSRESPONSE_RESPONSE.fields_by_name['digest'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST
+_UPLOADMISSINGBLOBSRESPONSE_RESPONSE.fields_by_name['status'].message_type = google_dot_rpc_dot_status__pb2._STATUS
+_UPLOADMISSINGBLOBSRESPONSE_RESPONSE.containing_type = _UPLOADMISSINGBLOBSRESPONSE
+_UPLOADMISSINGBLOBSRESPONSE.fields_by_name['responses'].message_type = _UPLOADMISSINGBLOBSRESPONSE_RESPONSE
+_FETCHTREEREQUEST.fields_by_name['root_digest'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST
+_UPLOADTREEREQUEST.fields_by_name['root_digest'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST
+_STAGETREEREQUEST.fields_by_name['root_digest'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST
+_CAPTURETREERESPONSE_RESPONSE.fields_by_name['tree_digest'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST
+_CAPTURETREERESPONSE_RESPONSE.fields_by_name['status'].message_type = google_dot_rpc_dot_status__pb2._STATUS
+_CAPTURETREERESPONSE_RESPONSE.containing_type = _CAPTURETREERESPONSE
+_CAPTURETREERESPONSE.fields_by_name['responses'].message_type = _CAPTURETREERESPONSE_RESPONSE
+_CAPTUREFILESRESPONSE_RESPONSE.fields_by_name['digest'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST
+_CAPTUREFILESRESPONSE_RESPONSE.fields_by_name['status'].message_type = google_dot_rpc_dot_status__pb2._STATUS
+_CAPTUREFILESRESPONSE_RESPONSE.containing_type = _CAPTUREFILESRESPONSE
+_CAPTUREFILESRESPONSE.fields_by_name['responses'].message_type = _CAPTUREFILESRESPONSE_RESPONSE
+DESCRIPTOR.message_types_by_name['FetchMissingBlobsRequest'] = _FETCHMISSINGBLOBSREQUEST
+DESCRIPTOR.message_types_by_name['FetchMissingBlobsResponse'] = _FETCHMISSINGBLOBSRESPONSE
+DESCRIPTOR.message_types_by_name['UploadMissingBlobsRequest'] = _UPLOADMISSINGBLOBSREQUEST
+DESCRIPTOR.message_types_by_name['UploadMissingBlobsResponse'] = _UPLOADMISSINGBLOBSRESPONSE
+DESCRIPTOR.message_types_by_name['FetchTreeRequest'] = _FETCHTREEREQUEST
+DESCRIPTOR.message_types_by_name['FetchTreeResponse'] = _FETCHTREERESPONSE
+DESCRIPTOR.message_types_by_name['UploadTreeRequest'] = _UPLOADTREEREQUEST
+DESCRIPTOR.message_types_by_name['UploadTreeResponse'] = _UPLOADTREERESPONSE
+DESCRIPTOR.message_types_by_name['StageTreeRequest'] = _STAGETREEREQUEST
+DESCRIPTOR.message_types_by_name['StageTreeResponse'] = _STAGETREERESPONSE
+DESCRIPTOR.message_types_by_name['CaptureTreeRequest'] = _CAPTURETREEREQUEST
+DESCRIPTOR.message_types_by_name['CaptureTreeResponse'] = _CAPTURETREERESPONSE
+DESCRIPTOR.message_types_by_name['CaptureFilesRequest'] = _CAPTUREFILESREQUEST
+DESCRIPTOR.message_types_by_name['CaptureFilesResponse'] = _CAPTUREFILESRESPONSE
+DESCRIPTOR.message_types_by_name['GetInstanceNameForRemoteRequest'] = _GETINSTANCENAMEFORREMOTEREQUEST
+DESCRIPTOR.message_types_by_name['GetInstanceNameForRemoteResponse'] = _GETINSTANCENAMEFORREMOTERESPONSE
+_sym_db.RegisterFileDescriptor(DESCRIPTOR)
+
+FetchMissingBlobsRequest = _reflection.GeneratedProtocolMessageType('FetchMissingBlobsRequest', (_message.Message,), dict(
+ DESCRIPTOR = _FETCHMISSINGBLOBSREQUEST,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.FetchMissingBlobsRequest)
+ ))
+_sym_db.RegisterMessage(FetchMissingBlobsRequest)
+
+FetchMissingBlobsResponse = _reflection.GeneratedProtocolMessageType('FetchMissingBlobsResponse', (_message.Message,), dict(
+
+ Response = _reflection.GeneratedProtocolMessageType('Response', (_message.Message,), dict(
+ DESCRIPTOR = _FETCHMISSINGBLOBSRESPONSE_RESPONSE,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.FetchMissingBlobsResponse.Response)
+ ))
+ ,
+ DESCRIPTOR = _FETCHMISSINGBLOBSRESPONSE,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.FetchMissingBlobsResponse)
+ ))
+_sym_db.RegisterMessage(FetchMissingBlobsResponse)
+_sym_db.RegisterMessage(FetchMissingBlobsResponse.Response)
+
+UploadMissingBlobsRequest = _reflection.GeneratedProtocolMessageType('UploadMissingBlobsRequest', (_message.Message,), dict(
+ DESCRIPTOR = _UPLOADMISSINGBLOBSREQUEST,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.UploadMissingBlobsRequest)
+ ))
+_sym_db.RegisterMessage(UploadMissingBlobsRequest)
+
+UploadMissingBlobsResponse = _reflection.GeneratedProtocolMessageType('UploadMissingBlobsResponse', (_message.Message,), dict(
+
+ Response = _reflection.GeneratedProtocolMessageType('Response', (_message.Message,), dict(
+ DESCRIPTOR = _UPLOADMISSINGBLOBSRESPONSE_RESPONSE,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.UploadMissingBlobsResponse.Response)
+ ))
+ ,
+ DESCRIPTOR = _UPLOADMISSINGBLOBSRESPONSE,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.UploadMissingBlobsResponse)
+ ))
+_sym_db.RegisterMessage(UploadMissingBlobsResponse)
+_sym_db.RegisterMessage(UploadMissingBlobsResponse.Response)
+
+FetchTreeRequest = _reflection.GeneratedProtocolMessageType('FetchTreeRequest', (_message.Message,), dict(
+ DESCRIPTOR = _FETCHTREEREQUEST,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.FetchTreeRequest)
+ ))
+_sym_db.RegisterMessage(FetchTreeRequest)
+
+FetchTreeResponse = _reflection.GeneratedProtocolMessageType('FetchTreeResponse', (_message.Message,), dict(
+ DESCRIPTOR = _FETCHTREERESPONSE,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.FetchTreeResponse)
+ ))
+_sym_db.RegisterMessage(FetchTreeResponse)
+
+UploadTreeRequest = _reflection.GeneratedProtocolMessageType('UploadTreeRequest', (_message.Message,), dict(
+ DESCRIPTOR = _UPLOADTREEREQUEST,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.UploadTreeRequest)
+ ))
+_sym_db.RegisterMessage(UploadTreeRequest)
+
+UploadTreeResponse = _reflection.GeneratedProtocolMessageType('UploadTreeResponse', (_message.Message,), dict(
+ DESCRIPTOR = _UPLOADTREERESPONSE,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.UploadTreeResponse)
+ ))
+_sym_db.RegisterMessage(UploadTreeResponse)
+
+StageTreeRequest = _reflection.GeneratedProtocolMessageType('StageTreeRequest', (_message.Message,), dict(
+ DESCRIPTOR = _STAGETREEREQUEST,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.StageTreeRequest)
+ ))
+_sym_db.RegisterMessage(StageTreeRequest)
+
+StageTreeResponse = _reflection.GeneratedProtocolMessageType('StageTreeResponse', (_message.Message,), dict(
+ DESCRIPTOR = _STAGETREERESPONSE,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.StageTreeResponse)
+ ))
+_sym_db.RegisterMessage(StageTreeResponse)
+
+CaptureTreeRequest = _reflection.GeneratedProtocolMessageType('CaptureTreeRequest', (_message.Message,), dict(
+ DESCRIPTOR = _CAPTURETREEREQUEST,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.CaptureTreeRequest)
+ ))
+_sym_db.RegisterMessage(CaptureTreeRequest)
+
+CaptureTreeResponse = _reflection.GeneratedProtocolMessageType('CaptureTreeResponse', (_message.Message,), dict(
+
+ Response = _reflection.GeneratedProtocolMessageType('Response', (_message.Message,), dict(
+ DESCRIPTOR = _CAPTURETREERESPONSE_RESPONSE,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.CaptureTreeResponse.Response)
+ ))
+ ,
+ DESCRIPTOR = _CAPTURETREERESPONSE,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.CaptureTreeResponse)
+ ))
+_sym_db.RegisterMessage(CaptureTreeResponse)
+_sym_db.RegisterMessage(CaptureTreeResponse.Response)
+
+CaptureFilesRequest = _reflection.GeneratedProtocolMessageType('CaptureFilesRequest', (_message.Message,), dict(
+ DESCRIPTOR = _CAPTUREFILESREQUEST,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.CaptureFilesRequest)
+ ))
+_sym_db.RegisterMessage(CaptureFilesRequest)
+
+CaptureFilesResponse = _reflection.GeneratedProtocolMessageType('CaptureFilesResponse', (_message.Message,), dict(
+
+ Response = _reflection.GeneratedProtocolMessageType('Response', (_message.Message,), dict(
+ DESCRIPTOR = _CAPTUREFILESRESPONSE_RESPONSE,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.CaptureFilesResponse.Response)
+ ))
+ ,
+ DESCRIPTOR = _CAPTUREFILESRESPONSE,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.CaptureFilesResponse)
+ ))
+_sym_db.RegisterMessage(CaptureFilesResponse)
+_sym_db.RegisterMessage(CaptureFilesResponse.Response)
+
+GetInstanceNameForRemoteRequest = _reflection.GeneratedProtocolMessageType('GetInstanceNameForRemoteRequest', (_message.Message,), dict(
+ DESCRIPTOR = _GETINSTANCENAMEFORREMOTEREQUEST,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.GetInstanceNameForRemoteRequest)
+ ))
+_sym_db.RegisterMessage(GetInstanceNameForRemoteRequest)
+
+GetInstanceNameForRemoteResponse = _reflection.GeneratedProtocolMessageType('GetInstanceNameForRemoteResponse', (_message.Message,), dict(
+ DESCRIPTOR = _GETINSTANCENAMEFORREMOTERESPONSE,
+ __module__ = 'build.buildgrid.local_cas_pb2'
+ # @@protoc_insertion_point(class_scope:build.buildgrid.GetInstanceNameForRemoteResponse)
+ ))
+_sym_db.RegisterMessage(GetInstanceNameForRemoteResponse)
+
+
+
+_LOCALCONTENTADDRESSABLESTORAGE = _descriptor.ServiceDescriptor(
+ name='LocalContentAddressableStorage',
+ full_name='build.buildgrid.LocalContentAddressableStorage',
+ file=DESCRIPTOR,
+ index=0,
+ serialized_options=None,
+ serialized_start=2037,
+ serialized_end=2876,
+ methods=[
+ _descriptor.MethodDescriptor(
+ name='FetchMissingBlobs',
+ full_name='build.buildgrid.LocalContentAddressableStorage.FetchMissingBlobs',
+ index=0,
+ containing_service=None,
+ input_type=_FETCHMISSINGBLOBSREQUEST,
+ output_type=_FETCHMISSINGBLOBSRESPONSE,
+ serialized_options=None,
+ ),
+ _descriptor.MethodDescriptor(
+ name='UploadMissingBlobs',
+ full_name='build.buildgrid.LocalContentAddressableStorage.UploadMissingBlobs',
+ index=1,
+ containing_service=None,
+ input_type=_UPLOADMISSINGBLOBSREQUEST,
+ output_type=_UPLOADMISSINGBLOBSRESPONSE,
+ serialized_options=None,
+ ),
+ _descriptor.MethodDescriptor(
+ name='FetchTree',
+ full_name='build.buildgrid.LocalContentAddressableStorage.FetchTree',
+ index=2,
+ containing_service=None,
+ input_type=_FETCHTREEREQUEST,
+ output_type=_FETCHTREERESPONSE,
+ serialized_options=None,
+ ),
+ _descriptor.MethodDescriptor(
+ name='UploadTree',
+ full_name='build.buildgrid.LocalContentAddressableStorage.UploadTree',
+ index=3,
+ containing_service=None,
+ input_type=_UPLOADTREEREQUEST,
+ output_type=_UPLOADTREERESPONSE,
+ serialized_options=None,
+ ),
+ _descriptor.MethodDescriptor(
+ name='StageTree',
+ full_name='build.buildgrid.LocalContentAddressableStorage.StageTree',
+ index=4,
+ containing_service=None,
+ input_type=_STAGETREEREQUEST,
+ output_type=_STAGETREERESPONSE,
+ serialized_options=None,
+ ),
+ _descriptor.MethodDescriptor(
+ name='CaptureTree',
+ full_name='build.buildgrid.LocalContentAddressableStorage.CaptureTree',
+ index=5,
+ containing_service=None,
+ input_type=_CAPTURETREEREQUEST,
+ output_type=_CAPTURETREERESPONSE,
+ serialized_options=None,
+ ),
+ _descriptor.MethodDescriptor(
+ name='CaptureFiles',
+ full_name='build.buildgrid.LocalContentAddressableStorage.CaptureFiles',
+ index=6,
+ containing_service=None,
+ input_type=_CAPTUREFILESREQUEST,
+ output_type=_CAPTUREFILESRESPONSE,
+ serialized_options=None,
+ ),
+ _descriptor.MethodDescriptor(
+ name='GetInstanceNameForRemote',
+ full_name='build.buildgrid.LocalContentAddressableStorage.GetInstanceNameForRemote',
+ index=7,
+ containing_service=None,
+ input_type=_GETINSTANCENAMEFORREMOTEREQUEST,
+ output_type=_GETINSTANCENAMEFORREMOTERESPONSE,
+ serialized_options=None,
+ ),
+])
+_sym_db.RegisterServiceDescriptor(_LOCALCONTENTADDRESSABLESTORAGE)
+
+DESCRIPTOR.services_by_name['LocalContentAddressableStorage'] = _LOCALCONTENTADDRESSABLESTORAGE
+
+# @@protoc_insertion_point(module_scope)
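
The descriptors above register plain protobuf message classes for each LocalCAS request and response type. As a rough sketch of how the generated classes are constructed in client code (the digest hash and file path below are illustrative placeholders, not values taken from this change):

# Sketch only: building LocalCAS request messages from the generated classes.
# The hash, size and path values are placeholders.
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildstream._protos.build.buildgrid import local_cas_pb2

# CaptureFilesRequest carries an instance name, a repeated list of paths and a
# bypass_local_cache hint, matching the field descriptors above.
capture_request = local_cas_pb2.CaptureFilesRequest(
    instance_name='',
    path=['/tmp/example-output/file.txt'],
    bypass_local_cache=False,
)

# FetchMissingBlobsRequest references Digest messages from the remote execution API.
digest = remote_execution_pb2.Digest(hash='0' * 64, size_bytes=0)
fetch_request = local_cas_pb2.FetchMissingBlobsRequest(blob_digests=[digest])

print(capture_request)
print(fetch_request)
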
diff --git a/src/buildstream/_protos/build/buildgrid/local_cas_pb2_grpc.py b/src/buildstream/_protos/build/buildgrid/local_cas_pb2_grpc.py
new file mode 100644
index 000000000..7ea06a3a7
--- /dev/null
+++ b/src/buildstream/_protos/build/buildgrid/local_cas_pb2_grpc.py
@@ -0,0 +1,238 @@
+# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
+import grpc
+
+from buildstream._protos.build.buildgrid import local_cas_pb2 as build_dot_buildgrid_dot_local__cas__pb2
+
+
+class LocalContentAddressableStorageStub(object):
+ # missing associated documentation comment in .proto file
+ pass
+
+ def __init__(self, channel):
+ """Constructor.
+
+ Args:
+ channel: A grpc.Channel.
+ """
+ self.FetchMissingBlobs = channel.unary_unary(
+ '/build.buildgrid.LocalContentAddressableStorage/FetchMissingBlobs',
+ request_serializer=build_dot_buildgrid_dot_local__cas__pb2.FetchMissingBlobsRequest.SerializeToString,
+ response_deserializer=build_dot_buildgrid_dot_local__cas__pb2.FetchMissingBlobsResponse.FromString,
+ )
+ self.UploadMissingBlobs = channel.unary_unary(
+ '/build.buildgrid.LocalContentAddressableStorage/UploadMissingBlobs',
+ request_serializer=build_dot_buildgrid_dot_local__cas__pb2.UploadMissingBlobsRequest.SerializeToString,
+ response_deserializer=build_dot_buildgrid_dot_local__cas__pb2.UploadMissingBlobsResponse.FromString,
+ )
+ self.FetchTree = channel.unary_unary(
+ '/build.buildgrid.LocalContentAddressableStorage/FetchTree',
+ request_serializer=build_dot_buildgrid_dot_local__cas__pb2.FetchTreeRequest.SerializeToString,
+ response_deserializer=build_dot_buildgrid_dot_local__cas__pb2.FetchTreeResponse.FromString,
+ )
+ self.UploadTree = channel.unary_unary(
+ '/build.buildgrid.LocalContentAddressableStorage/UploadTree',
+ request_serializer=build_dot_buildgrid_dot_local__cas__pb2.UploadTreeRequest.SerializeToString,
+ response_deserializer=build_dot_buildgrid_dot_local__cas__pb2.UploadTreeResponse.FromString,
+ )
+ self.StageTree = channel.stream_stream(
+ '/build.buildgrid.LocalContentAddressableStorage/StageTree',
+ request_serializer=build_dot_buildgrid_dot_local__cas__pb2.StageTreeRequest.SerializeToString,
+ response_deserializer=build_dot_buildgrid_dot_local__cas__pb2.StageTreeResponse.FromString,
+ )
+ self.CaptureTree = channel.unary_unary(
+ '/build.buildgrid.LocalContentAddressableStorage/CaptureTree',
+ request_serializer=build_dot_buildgrid_dot_local__cas__pb2.CaptureTreeRequest.SerializeToString,
+ response_deserializer=build_dot_buildgrid_dot_local__cas__pb2.CaptureTreeResponse.FromString,
+ )
+ self.CaptureFiles = channel.unary_unary(
+ '/build.buildgrid.LocalContentAddressableStorage/CaptureFiles',
+ request_serializer=build_dot_buildgrid_dot_local__cas__pb2.CaptureFilesRequest.SerializeToString,
+ response_deserializer=build_dot_buildgrid_dot_local__cas__pb2.CaptureFilesResponse.FromString,
+ )
+ self.GetInstanceNameForRemote = channel.unary_unary(
+ '/build.buildgrid.LocalContentAddressableStorage/GetInstanceNameForRemote',
+ request_serializer=build_dot_buildgrid_dot_local__cas__pb2.GetInstanceNameForRemoteRequest.SerializeToString,
+ response_deserializer=build_dot_buildgrid_dot_local__cas__pb2.GetInstanceNameForRemoteResponse.FromString,
+ )
+
+
+class LocalContentAddressableStorageServicer(object):
+ # missing associated documentation comment in .proto file
+ pass
+
+ def FetchMissingBlobs(self, request, context):
+ """Fetch blobs from a remote CAS to the local cache.
+
+ This request is equivalent to ByteStream `Read` or `BatchReadBlobs`
+ requests, storing the downloaded blobs in the local cache.
+
+ Requested blobs that failed to be downloaded will be listed in the
+ response.
+
+ Errors:
+ * `INVALID_ARGUMENT`: The client attempted to download more than the
+ server supported limit.
+
+ Individual requests may return the following error, additionally:
+ * `NOT_FOUND`: The requested blob is not present in the remote CAS.
+ """
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def UploadMissingBlobs(self, request, context):
+ """Upload blobs from the local cache to a remote CAS.
+
+ This request is equivalent to `FindMissingBlobs` followed by
+ ByteStream `Write` or `BatchUpdateBlobs` requests.
+
+ Blobs that failed to be uploaded will be listed in the response.
+
+ Errors:
+ * `INVALID_ARGUMENT`: The client attempted to upload more than the
+ server supported limit.
+
+ Individual requests may return the following error, additionally:
+ * `NOT_FOUND`: The requested blob is not present in the local cache.
+ * `RESOURCE_EXHAUSTED`: There is insufficient disk quota to store the blob.
+ """
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def FetchTree(self, request, context):
+ """Fetch the entire directory tree rooted at a node from a remote CAS to the
+ local cache.
+
+ This request is equivalent to `GetTree`, storing the `Directory` objects
+ in the local cache. Optionally, this will also fetch all blobs referenced
+ by the `Directory` objects, equivalent to `FetchMissingBlobs`.
+
+ If part of the tree is missing from the CAS, the server will return the
+ portion present and omit the rest.
+
+ * `NOT_FOUND`: The requested tree root is not present in the CAS.
+ """
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def UploadTree(self, request, context):
+ """Upload the entire directory tree from the local cache to a remote CAS.
+
+ This request is equivalent to `UploadMissingBlobs` for all blobs
+ referenced by the specified tree (recursively).
+
+ Errors:
+ * `NOT_FOUND`: The requested tree root is not present in the local cache.
+ * `RESOURCE_EXHAUSTED`: There is insufficient disk quota to store the tree.
+ """
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def StageTree(self, request_iterator, context):
+ """Stage a directory tree in the local filesystem.
+
+ This makes the specified directory tree temporarily available for local
+ filesystem access. It is implementation-defined whether this uses a
+ userspace filesystem such as FUSE, hardlinking or a full copy.
+
+ Missing blobs are fetched, if a CAS remote is configured.
+
+ The staging starts when the server receives the initial request and
+ it is ready to be used on the initial (non-error) response from the
+ server.
+
+ The server will clean up the staged directory when it either
+ receives an additional request (with all fields unset) or when the
+ stream is closed. The server will send an additional response after
+ cleanup is complete.
+ """
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def CaptureTree(self, request, context):
+ """Capture a directory tree from the local filesystem.
+
+ This imports the specified path from the local filesystem into CAS.
+
+ If a CAS remote is configured, the blobs are uploaded.
+ The `bypass_local_cache` parameter is a hint to indicate whether the blobs
+ shall be uploaded without first storing them in the local cache.
+ """
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def CaptureFiles(self, request, context):
+ """Capture files from the local filesystem.
+
+ This imports the specified paths from the local filesystem into CAS.
+
+ If a CAS remote is configured, the blobs are uploaded.
+ The `bypass_local_cache` parameter is a hint to indicate whether the blobs
+ shall be uploaded without first storing them in the local cache.
+ """
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def GetInstanceNameForRemote(self, request, context):
+ """Configure remote CAS endpoint.
+
+ This returns a string that can be used as instance_name to access the
+ specified endpoint in further requests.
+ """
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+
+def add_LocalContentAddressableStorageServicer_to_server(servicer, server):
+ rpc_method_handlers = {
+ 'FetchMissingBlobs': grpc.unary_unary_rpc_method_handler(
+ servicer.FetchMissingBlobs,
+ request_deserializer=build_dot_buildgrid_dot_local__cas__pb2.FetchMissingBlobsRequest.FromString,
+ response_serializer=build_dot_buildgrid_dot_local__cas__pb2.FetchMissingBlobsResponse.SerializeToString,
+ ),
+ 'UploadMissingBlobs': grpc.unary_unary_rpc_method_handler(
+ servicer.UploadMissingBlobs,
+ request_deserializer=build_dot_buildgrid_dot_local__cas__pb2.UploadMissingBlobsRequest.FromString,
+ response_serializer=build_dot_buildgrid_dot_local__cas__pb2.UploadMissingBlobsResponse.SerializeToString,
+ ),
+ 'FetchTree': grpc.unary_unary_rpc_method_handler(
+ servicer.FetchTree,
+ request_deserializer=build_dot_buildgrid_dot_local__cas__pb2.FetchTreeRequest.FromString,
+ response_serializer=build_dot_buildgrid_dot_local__cas__pb2.FetchTreeResponse.SerializeToString,
+ ),
+ 'UploadTree': grpc.unary_unary_rpc_method_handler(
+ servicer.UploadTree,
+ request_deserializer=build_dot_buildgrid_dot_local__cas__pb2.UploadTreeRequest.FromString,
+ response_serializer=build_dot_buildgrid_dot_local__cas__pb2.UploadTreeResponse.SerializeToString,
+ ),
+ 'StageTree': grpc.stream_stream_rpc_method_handler(
+ servicer.StageTree,
+ request_deserializer=build_dot_buildgrid_dot_local__cas__pb2.StageTreeRequest.FromString,
+ response_serializer=build_dot_buildgrid_dot_local__cas__pb2.StageTreeResponse.SerializeToString,
+ ),
+ 'CaptureTree': grpc.unary_unary_rpc_method_handler(
+ servicer.CaptureTree,
+ request_deserializer=build_dot_buildgrid_dot_local__cas__pb2.CaptureTreeRequest.FromString,
+ response_serializer=build_dot_buildgrid_dot_local__cas__pb2.CaptureTreeResponse.SerializeToString,
+ ),
+ 'CaptureFiles': grpc.unary_unary_rpc_method_handler(
+ servicer.CaptureFiles,
+ request_deserializer=build_dot_buildgrid_dot_local__cas__pb2.CaptureFilesRequest.FromString,
+ response_serializer=build_dot_buildgrid_dot_local__cas__pb2.CaptureFilesResponse.SerializeToString,
+ ),
+ 'GetInstanceNameForRemote': grpc.unary_unary_rpc_method_handler(
+ servicer.GetInstanceNameForRemote,
+ request_deserializer=build_dot_buildgrid_dot_local__cas__pb2.GetInstanceNameForRemoteRequest.FromString,
+ response_serializer=build_dot_buildgrid_dot_local__cas__pb2.GetInstanceNameForRemoteResponse.SerializeToString,
+ ),
+ }
+ generic_handler = grpc.method_handlers_generic_handler(
+ 'build.buildgrid.LocalContentAddressableStorage', rpc_method_handlers)
+ server.add_generic_rpc_handlers((generic_handler,))
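
The servicer docstrings above describe the LocalCAS protocol implemented by buildbox-casd. For orientation only, a client would open a gRPC channel to the casd endpoint and call the generated stub; the sketch below assumes a hypothetical unix socket path and placeholder digests and is not part of this merge request:

# Sketch only: driving the LocalCAS service through the generated stub.
# The socket path and digest values are hypothetical placeholders.
import grpc

from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildstream._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc

channel = grpc.insecure_channel('unix:///run/example/casd.sock')  # hypothetical socket
stub = local_cas_pb2_grpc.LocalContentAddressableStorageStub(channel)

# Unary call: ask casd to fetch blobs that are missing from the local cache.
digest = remote_execution_pb2.Digest(hash='0' * 64, size_bytes=0)  # placeholder digest
request = local_cas_pb2.FetchMissingBlobsRequest(blob_digests=[digest])
response = stub.FetchMissingBlobs(request)
for blob_response in response.responses:
    print(blob_response.digest.hash, blob_response.status.code)

# Streaming call: per the StageTree docstring, the first request starts the
# staging and a follow-up request with all fields unset asks for cleanup.
def stage_tree_requests(root_digest):
    yield local_cas_pb2.StageTreeRequest(root_digest=root_digest)
    # A real client would use the staged directory here before cleaning up.
    yield local_cas_pb2.StageTreeRequest()

for reply in stub.StageTree(stage_tree_requests(digest)):
    print(reply)
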
diff --git a/src/buildstream/_scheduler/jobs/__init__.py b/src/buildstream/_scheduler/jobs/__init__.py
index 3e213171a..9f081c8a0 100644
--- a/src/buildstream/_scheduler/jobs/__init__.py
+++ b/src/buildstream/_scheduler/jobs/__init__.py
@@ -18,6 +18,4 @@
# Tristan Maat <tristan.maat@codethink.co.uk>
from .elementjob import ElementJob
-from .cachesizejob import CacheSizeJob
-from .cleanupjob import CleanupJob
from .job import JobStatus
diff --git a/src/buildstream/_scheduler/jobs/cachesizejob.py b/src/buildstream/_scheduler/jobs/cachesizejob.py
deleted file mode 100644
index 581101c07..000000000
--- a/src/buildstream/_scheduler/jobs/cachesizejob.py
+++ /dev/null
@@ -1,48 +0,0 @@
-# Copyright (C) 2018 Codethink Limited
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-#
-# Author:
-# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
-#
-from .job import Job, JobStatus, ChildJob
-
-
-class CacheSizeJob(Job):
- def __init__(self, *args, complete_cb, **kwargs):
- super().__init__(*args, **kwargs)
- self.set_name(self.action_name)
- self._complete_cb = complete_cb
-
- context = self._scheduler.context
- self._casquota = context.get_casquota()
-
- def parent_complete(self, status, result):
- if status is JobStatus.OK:
- self._casquota.set_cache_size(result)
-
- if self._complete_cb:
- self._complete_cb(status, result)
-
- def create_child_job(self, *args, **kwargs):
- return ChildCacheSizeJob(*args, casquota=self._scheduler.context._casquota, **kwargs)
-
-
-class ChildCacheSizeJob(ChildJob):
- def __init__(self, *args, casquota, **kwargs):
- super().__init__(*args, **kwargs)
- self._casquota = casquota
-
- def child_process(self):
- return self._casquota.compute_cache_size()
diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py
deleted file mode 100644
index 3e9a8ff47..000000000
--- a/src/buildstream/_scheduler/jobs/cleanupjob.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# Copyright (C) 2018 Codethink Limited
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-#
-# Author:
-# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
-#
-from .job import Job, JobStatus, ChildJob
-
-
-class CleanupJob(Job):
- def __init__(self, *args, complete_cb, **kwargs):
- super().__init__(*args, **kwargs)
- self.set_name(self.action_name)
- self._complete_cb = complete_cb
-
- context = self._scheduler.context
- self._casquota = context.get_casquota()
-
- def handle_message(self, message):
- # Update the cache size in the main process as we go,
- # this provides better feedback in the UI.
- self._casquota.set_cache_size(message, write_to_disk=False)
-
- def parent_complete(self, status, result):
- if status is JobStatus.OK:
- self._casquota.set_cache_size(result, write_to_disk=False)
-
- if self._complete_cb:
- self._complete_cb(status, result)
-
- def create_child_job(self, *args, **kwargs):
- return ChildCleanupJob(*args, casquota=self._scheduler.context.get_casquota(), **kwargs)
-
-
-class ChildCleanupJob(ChildJob):
- def __init__(self, *args, casquota, **kwargs):
- super().__init__(*args, **kwargs)
- self._casquota = casquota
-
- def child_process(self):
- def progress():
- self.send_message(self._casquota.get_cache_size())
- return self._casquota.clean(progress)
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py
index 1be3f7cd0..dc33e6510 100644
--- a/src/buildstream/_scheduler/queues/buildqueue.py
+++ b/src/buildstream/_scheduler/queues/buildqueue.py
@@ -21,7 +21,6 @@
from datetime import timedelta
from . import Queue, QueueStatus
-from ..jobs import JobStatus
from ..resources import ResourceType
from ..._message import MessageType
@@ -73,39 +72,11 @@ class BuildQueue(Queue):
return QueueStatus.READY
- def _check_cache_size(self, job, element, artifact_size):
-
- # After completing a build job, add the artifact size
- # as returned from Element._assemble() to the estimated
- # artifact cache size
- #
- context = self._scheduler.context
- artifacts = context.artifactcache
-
- artifacts.add_artifact_size(artifact_size)
-
- # If the estimated size outgrows the quota, ask the scheduler
- # to queue a job to actually check the real cache size.
- #
- if artifacts.full():
- self._scheduler.check_cache_size()
-
def done(self, job, element, result, status):
# Inform element in main process that assembly is done
element._assemble_done()
- # This has to be done after _assemble_done, such that the
- # element may register its cache key as required
- #
- # FIXME: Element._assemble() does not report both the failure state and the
- # size of the newly cached failed artifact, so we can only adjust the
- # artifact cache size for a successful build even though we know a
- # failed build also grows the artifact cache size.
- #
- if status is JobStatus.OK:
- self._check_cache_size(job, element, result)
-
def register_pending_element(self, element):
# Set a "buildable" callback for an element not yet ready
# to be processed in the build queue.
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index 2c46cd2fd..7f4125099 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -52,12 +52,6 @@ class PullQueue(Queue):
element._pull_done()
- # Build jobs will check the "approximate" size first. Since we
- # do not get an artifact size from pull jobs, we have to
- # actually check the cache size.
- if status is JobStatus.OK:
- self._scheduler.check_cache_size()
-
def register_pending_element(self, element):
# Set a "can_query_cache"_callback for an element which is not
# immediately ready to query the artifact cache so that it
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 17878c4fd..bd76e00b1 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -27,8 +27,8 @@ import datetime
from contextlib import contextmanager
# Local imports
-from .resources import Resources, ResourceType
-from .jobs import JobStatus, CacheSizeJob, CleanupJob
+from .resources import Resources
+from .jobs import JobStatus
from .._profile import Topics, PROFILER
@@ -39,12 +39,6 @@ class SchedStatus():
TERMINATED = 1
-# Some action names for the internal jobs we launch
-#
-_ACTION_NAME_CLEANUP = 'clean'
-_ACTION_NAME_CACHE_SIZE = 'size'
-
-
# Scheduler()
#
 # The scheduler operates on a list of queues, each of which is meant to accomplish
@@ -98,12 +92,6 @@ class Scheduler():
self._queue_jobs = True # Whether we should continue to queue jobs
self._state = state
- # State of cache management related jobs
- self._cache_size_scheduled = False # Whether we have a cache size job scheduled
- self._cache_size_running = None # A running CacheSizeJob, or None
- self._cleanup_scheduled = False # Whether we have a cleanup job scheduled
- self._cleanup_running = None # A running CleanupJob, or None
-
# Callbacks to report back to the Scheduler owner
self._interrupt_callback = interrupt_callback
self._ticker_callback = ticker_callback
@@ -127,6 +115,8 @@ class Scheduler():
#
def run(self, queues):
+ assert self.context.fork_allowed
+
# Hold on to the queues to process
self.queues = queues
@@ -142,9 +132,6 @@ class Scheduler():
# Handle unix signals while running
self._connect_signals()
- # Check if we need to start with some cache maintenance
- self._check_cache_management()
-
# Start the profiler
with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
# Run the queues
@@ -271,51 +258,10 @@ class Scheduler():
# Now check for more jobs
self._sched()
- # check_cache_size():
- #
- # Queues a cache size calculation job, after the cache
- # size is calculated, a cleanup job will be run automatically
- # if needed.
- #
- def check_cache_size(self):
-
- # Here we assume we are called in response to a job
- # completion callback, or before entering the scheduler.
- #
- # As such there is no need to call `_sched()` from here,
- # and we prefer to run it once at the last moment.
- #
- self._cache_size_scheduled = True
-
#######################################################
# Local Private Methods #
#######################################################
- # _check_cache_management()
- #
- # Run an initial check if we need to lock the cache
- # resource and check the size and possibly launch
- # a cleanup.
- #
- # Sessions which do not add to the cache are not affected.
- #
- def _check_cache_management(self):
-
- # Only trigger the check for a scheduler run which has
- # queues which require the CACHE resource.
- if not any(q for q in self.queues
- if ResourceType.CACHE in q.resources):
- return
-
- # If the estimated size outgrows the quota, queue a job to
- # actually check the real cache size initially, this one
- # should have exclusive access to the cache to ensure nothing
- # starts while we are checking the cache.
- #
- artifacts = self.context.artifactcache
- if artifacts.full():
- self._sched_cache_size_job(exclusive=True)
-
# _start_job()
#
     # Spawns a job
@@ -328,106 +274,6 @@ class Scheduler():
self._state.add_task(job.action_name, job.name, self.elapsed_time())
job.start()
- # Callback for the cache size job
- def _cache_size_job_complete(self, status, cache_size):
-
- # Deallocate cache size job resources
- self._cache_size_running = None
- self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
-
- # Unregister the exclusive interest if there was any
- self.resources.unregister_exclusive_interest(
- [ResourceType.CACHE], 'cache-size'
- )
-
- # Schedule a cleanup job if we've hit the threshold
- if status is not JobStatus.OK:
- return
-
- context = self.context
- artifacts = context.artifactcache
-
- if artifacts.full():
- self._cleanup_scheduled = True
-
- # Callback for the cleanup job
- def _cleanup_job_complete(self, status, cache_size):
-
- # Deallocate cleanup job resources
- self._cleanup_running = None
- self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
-
- # Unregister the exclusive interest when we're done with it
- if not self._cleanup_scheduled:
- self.resources.unregister_exclusive_interest(
- [ResourceType.CACHE], 'cache-cleanup'
- )
-
- # _sched_cleanup_job()
- #
- # Runs a cleanup job if one is scheduled to run now and
- # sufficient recources are available.
- #
- def _sched_cleanup_job(self):
-
- if self._cleanup_scheduled and self._cleanup_running is None:
-
- # Ensure we have an exclusive interest in the resources
- self.resources.register_exclusive_interest(
- [ResourceType.CACHE], 'cache-cleanup'
- )
-
- if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
- [ResourceType.CACHE]):
-
- # Update state and launch
- self._cleanup_scheduled = False
- self._cleanup_running = \
- CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
- complete_cb=self._cleanup_job_complete)
- self._start_job(self._cleanup_running)
-
- # _sched_cache_size_job()
- #
- # Runs a cache size job if one is scheduled to run now and
- # sufficient recources are available.
- #
- # Args:
- # exclusive (bool): Run a cache size job immediately and
- # hold the ResourceType.CACHE resource
- # exclusively (used at startup).
- #
- def _sched_cache_size_job(self, *, exclusive=False):
-
- # The exclusive argument is not intended (or safe) for arbitrary use.
- if exclusive:
- assert not self._cache_size_scheduled
- assert not self._cache_size_running
- assert not self._active_jobs
- self._cache_size_scheduled = True
-
- if self._cache_size_scheduled and not self._cache_size_running:
-
- # Handle the exclusive launch
- exclusive_resources = set()
- if exclusive:
- exclusive_resources.add(ResourceType.CACHE)
- self.resources.register_exclusive_interest(
- exclusive_resources, 'cache-size'
- )
-
- # Reserve the resources (with the possible exclusive cache resource)
- if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
- exclusive_resources):
-
- # Update state and launch
- self._cache_size_scheduled = False
- self._cache_size_running = \
- CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
- 'cache_size/cache_size',
- complete_cb=self._cache_size_job_complete)
- self._start_job(self._cache_size_running)
-
# _sched_queue_jobs()
#
# Ask the queues what jobs they want to schedule and schedule
@@ -490,12 +336,6 @@ class Scheduler():
if not self.terminated:
#
- # Try the cache management jobs
- #
- self._sched_cleanup_job()
- self._sched_cache_size_job()
-
- #
# Run as many jobs as the queues can handle for the
# available resources
#
diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py
index b39874b20..64498ba32 100644
--- a/src/buildstream/_sourcecache.py
+++ b/src/buildstream/_sourcecache.py
@@ -94,52 +94,9 @@ class SourceCache(BaseCache):
def __init__(self, context):
super().__init__(context)
- self._required_sources = set()
self.sourcerefdir = os.path.join(context.cachedir, 'source_protos')
os.makedirs(self.sourcerefdir, exist_ok=True)
- self.casquota.add_remove_callbacks(self.unrequired_sources, self._remove_source)
- self.casquota.add_list_refs_callback(self.list_sources)
-
- self.cas.add_reachable_directories_callback(self._reachable_directories)
-
- # mark_required_sources()
- #
- # Mark sources that are required by the current run.
- #
- # Sources that are in this list will not be removed during the current
- # pipeline.
- #
- # Args:
- # sources (iterable): An iterable over sources that are required
- #
- def mark_required_sources(self, sources):
- sources = list(sources) # in case it's a generator
-
- self._required_sources.update(sources)
-
- # update mtimes just in case
- for source in sources:
- ref = source._get_source_name()
- try:
- self._update_mtime(ref)
- except SourceCacheError:
- pass
-
- # unrequired_sources()
- #
- # Yields the refs of all sources not required by the current build plan
- #
- # Returns:
- # iter (str): iterable over unrequired source keys
- #
- def unrequired_sources(self):
- required_source_names = set(map(
- lambda x: x._get_source_name(), self._required_sources))
- for (mtime, source) in self._list_refs_mtimes(self.sourcerefdir):
- if source not in required_source_names:
- yield (mtime, source)
-
# list_sources()
#
# Get list of all sources in the `sources_protos/` folder
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index a7705332b..167127cf2 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -19,8 +19,6 @@
# Jürg Billeter <juerg.billeter@codethink.co.uk>
# Tristan Maat <tristan.maat@codethink.co.uk>
-import itertools
-import functools
import os
import sys
import stat
@@ -225,6 +223,8 @@ class Stream():
else:
buildtree = True
+ self._context.disable_fork()
+
return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command,
usebuildtree=buildtree)
@@ -557,6 +557,8 @@ class Stream():
self._enqueue_plan(uncached_elts)
self._run()
+ self._context.disable_fork()
+
# Stage deps into a temporary sandbox first
if isinstance(target, ArtifactElement):
try:
@@ -669,9 +671,8 @@ class Stream():
#
# Args:
# targets (str): Targets to remove
- # no_prune (bool): Whether to prune the unreachable refs, default False
#
- def artifact_delete(self, targets, no_prune):
+ def artifact_delete(self, targets):
# Return list of Element and/or ArtifactElement objects
target_objects = self.load_selection(targets, selection=PipelineSelection.NONE, load_refs=True)
@@ -686,7 +687,7 @@ class Stream():
ref_removed = False
for ref in remove_refs:
try:
- self._artifacts.remove(ref, defer_prune=True)
+ self._artifacts.remove(ref)
except ArtifactError as e:
self._message(MessageType.WARN, str(e))
continue
@@ -694,11 +695,6 @@ class Stream():
self._message(MessageType.INFO, "Removed: {}".format(ref))
ref_removed = True
- # Prune the artifact cache
- if ref_removed and not no_prune:
- with self._context.messenger.timed_activity("Pruning artifact cache"):
- self._artifacts.prune()
-
if not ref_removed:
self._message(MessageType.INFO, "No artifacts were removed")
@@ -1248,20 +1244,6 @@ class Stream():
selected,
except_elements)
- # Set the "required" artifacts that should not be removed
- # while this pipeline is active
- #
- # It must include all the artifacts which are required by the
- # final product. Note that this is a superset of the build plan.
- #
- # use partial as we send this to both Artifact and Source caches
- required_elements = functools.partial(self._pipeline.dependencies, elements, Scope.ALL)
- self._artifacts.mark_required_elements(required_elements())
-
- self._sourcecache.mark_required_sources(
- itertools.chain.from_iterable(
- [element.sources() for element in required_elements()]))
-
if selection == PipelineSelection.PLAN and dynamic_plan:
# We use a dynamic build plan, only request artifacts of top-level targets,
# others are requested dynamically as needed.
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index ca573bee1..bd5ca14e7 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1566,21 +1566,6 @@ class Element(Plugin):
workspace.clear_running_files()
self._get_context().get_workspaces().save_config()
- # This element will have already been marked as
- # required, but we bump the atime again, in case
- # we did not know the cache key until now.
- #
- # FIXME: This is not exactly correct, we should be
- # doing this at the time which we have discovered
- # a new cache key, this just happens to be the
- # last place where that can happen.
- #
- # Ultimately, we should be refactoring
- # Element._update_state() such that we know
- # when a cache key is actually discovered.
- #
- self.__artifacts.mark_required_elements([self])
-
# _assemble():
#
# Internal method for running the entire build phase.
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index affe597dd..4308d662b 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -281,7 +281,7 @@ class SandboxRemote(Sandbox):
context = self._get_context()
cascache = context.get_cascache()
artifactcache = context.artifactcache
- casremote = CASRemote(self.storage_remote_spec)
+ casremote = CASRemote(self.storage_remote_spec, cascache)
# Now do a pull to ensure we have the full directory structure.
dir_digest = cascache.pull_tree(casremote, tree_digest)
@@ -300,7 +300,7 @@ class SandboxRemote(Sandbox):
project = self._get_project()
cascache = context.get_cascache()
artifactcache = context.artifactcache
- casremote = CASRemote(self.storage_remote_spec)
+ casremote = CASRemote(self.storage_remote_spec, cascache)
# Fetch the file blobs if needed
if self._output_files_required or artifactcache.has_push_remotes():
@@ -368,7 +368,7 @@ class SandboxRemote(Sandbox):
action_result = self._check_action_cache(action_digest)
if not action_result:
- casremote = CASRemote(self.storage_remote_spec)
+ casremote = CASRemote(self.storage_remote_spec, cascache)
try:
casremote.init()
except grpc.RpcError as e:
diff --git a/src/buildstream/testing/runcli.py b/src/buildstream/testing/runcli.py
index 95bf83eff..6c3ab3496 100644
--- a/src/buildstream/testing/runcli.py
+++ b/src/buildstream/testing/runcli.py
@@ -746,7 +746,7 @@ class TestArtifact():
def _extract_subdirectory(self, tmpdir, digest):
with tempfile.TemporaryDirectory() as extractdir:
try:
- cas = CASCache(str(tmpdir))
+ cas = CASCache(str(tmpdir), casd=False)
cas.checkout(extractdir, digest)
yield extractdir
except FileNotFoundError:
diff --git a/tests/artifactcache/artifactservice.py b/tests/artifactcache/artifactservice.py
index ac9514793..3d229e24f 100644
--- a/tests/artifactcache/artifactservice.py
+++ b/tests/artifactcache/artifactservice.py
@@ -86,19 +86,9 @@ def _artifact_request(url, queue):
@pytest.mark.parametrize("files", ["present", "absent", "invalid"])
def test_update_artifact(tmpdir, files):
sharedir = os.path.join(str(tmpdir), "share")
- with create_artifact_share(sharedir) as share:
- # put files object
- if files == "present":
- directory = re_pb2.Directory()
- digest = share.cas.add_object(buffer=directory.SerializeToString())
- elif files == "invalid":
- digest = share.cas.add_object(buffer="abcdefghijklmnop".encode("utf-8"))
- elif files == "absent":
- digest = utils._message_digest("abcdefghijklmnop".encode("utf-8"))
-
- url = urlparse(share.repo)
+ with create_artifact_share(sharedir, casd=True) as share:
queue = Queue()
- process = Process(target=_queue_wrapper, args=(_get_artifact, queue, url, files, digest))
+ process = Process(target=_queue_wrapper, args=(_update_artifact, queue, share, files))
try:
with _signals.blocked([signal.SIGINT], ignore=False):
@@ -112,7 +102,18 @@ def test_update_artifact(tmpdir, files):
assert not error
-def _get_artifact(url, files, digest, queue):
+def _update_artifact(share, files, *, queue):
+ # put files object
+ if files == "present":
+ directory = re_pb2.Directory()
+ digest = share.cas.add_object(buffer=directory.SerializeToString())
+ elif files == "invalid":
+ digest = share.cas.add_object(buffer="abcdefghijklmnop".encode("utf-8"))
+ elif files == "absent":
+ digest = utils._message_digest("abcdefghijklmnop".encode("utf-8"))
+
+ url = urlparse(share.repo)
+
channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
artifact_stub = ArtifactServiceStub(channel)
diff --git a/tests/artifactcache/cache_size.py b/tests/artifactcache/cache_size.py
deleted file mode 100644
index fb34b5fad..000000000
--- a/tests/artifactcache/cache_size.py
+++ /dev/null
@@ -1,90 +0,0 @@
-# Pylint doesn't play well with fixtures and dependency injection from pytest
-# pylint: disable=redefined-outer-name
-
-import os
-from unittest import mock
-
-from buildstream import _yaml
-from buildstream._cas.cascache import CACHE_SIZE_FILE
-from buildstream._exceptions import ErrorDomain
-from buildstream.testing import cli # pylint: disable=unused-import
-
-from tests.testutils import create_element_size
-
-# XXX: Currently lacking:
-# * A way to check whether it's faster to read cache size on
-# successive invocations.
-# * A way to check whether the cache size file has been read.
-
-
-def create_project(project_dir):
- project_file = os.path.join(project_dir, "project.conf")
- project_conf = {
- "name": "test"
- }
- _yaml.roundtrip_dump(project_conf, project_file)
- element_name = "test.bst"
- create_element_size(element_name, project_dir, ".", [], 1024)
-
-
-def test_cache_size_roundtrip(cli, tmpdir):
- # Builds (to put files in the cache), then invokes buildstream again
- # to check nothing breaks
-
- # Create project
- project_dir = str(tmpdir)
- create_project(project_dir)
-
- # Build, to populate the cache
- res = cli.run(project=project_dir, args=["build", "test.bst"])
- res.assert_success()
-
- # Show, to check that nothing breaks while reading cache size
- res = cli.run(project=project_dir, args=["show", "test.bst"])
- res.assert_success()
-
-
-def test_cache_size_write(cli, tmpdir):
- # Builds (to put files in the cache), then checks a number is
- # written to the cache size file.
-
- project_dir = str(tmpdir)
- create_project(project_dir)
-
- # Artifact cache must be in a known place
- casdir = os.path.join(project_dir, "cas")
- cli.configure({"cachedir": project_dir})
-
- # Build, to populate the cache
- res = cli.run(project=project_dir, args=["build", "test.bst"])
- res.assert_success()
-
- # Inspect the artifact cache
- sizefile = os.path.join(casdir, CACHE_SIZE_FILE)
- assert os.path.isfile(sizefile)
-
-
-def test_quota_over_1024T(cli, tmpdir):
- KiB = 1024
- MiB = (KiB * 1024)
- GiB = (MiB * 1024)
- TiB = (GiB * 1024)
-
- cli.configure({
- 'cache': {
- 'quota': 2048 * TiB
- }
- })
- project = tmpdir.join("main")
- os.makedirs(str(project))
- _yaml.roundtrip_dump({'name': 'main'}, str(project.join("project.conf")))
-
- volume_space_patch = mock.patch(
- "buildstream._cas.CASQuota._get_cache_volume_size",
- autospec=True,
- return_value=(1025 * TiB, 1025 * TiB)
- )
-
- with volume_space_patch:
- result = cli.run(project, args=["build", "file.bst"])
- result.assert_main_error(ErrorDomain.CAS, 'insufficient-storage-for-quota')
diff --git a/tests/artifactcache/expiry.py b/tests/artifactcache/expiry.py
index b163903cb..e0bbb4007 100644
--- a/tests/artifactcache/expiry.py
+++ b/tests/artifactcache/expiry.py
@@ -21,7 +21,6 @@
# pylint: disable=redefined-outer-name
import os
-import re
from unittest import mock
import pytest
@@ -213,6 +212,7 @@ def test_never_delete_required(cli, datafiles):
'quota': 10000000
},
'scheduler': {
+ 'fetchers': 1,
'builders': 1
}
})
@@ -223,30 +223,19 @@ def test_never_delete_required(cli, datafiles):
create_element_size('dep3.bst', project, element_path, ['dep2.bst'], 8000000)
create_element_size('target.bst', project, element_path, ['dep3.bst'], 8000000)
+ # Build dep1.bst, which should fit into the cache.
+ res = cli.run(project=project, args=['build', 'dep1.bst'])
+ res.assert_success()
+
# We try to build this pipeline, but it's too big for the
# cache. Since all elements are required, the build should fail.
res = cli.run(project=project, args=['build', 'target.bst'])
res.assert_main_error(ErrorDomain.STREAM, None)
res.assert_task_error(ErrorDomain.CAS, 'cache-too-full')
- # Only the first artifact fits in the cache, but we expect
- # that the first *two* artifacts will be cached.
- #
- # This is because after caching the first artifact we must
- # proceed to build the next artifact, and we cannot really
- # know how large an artifact will be until we try to cache it.
- #
- # In this case, we deem it more acceptable to not delete an
- # artifact which caused the cache to outgrow the quota.
- #
- # Note that this test only works because we have forced
- # the configuration to build one element at a time, in real
- # life there may potentially be N-builders cached artifacts
- # which exceed the quota
- #
states = cli.get_element_states(project, ['target.bst'])
assert states['dep1.bst'] == 'cached'
- assert states['dep2.bst'] == 'cached'
+ assert states['dep2.bst'] != 'cached'
assert states['dep3.bst'] != 'cached'
assert states['target.bst'] != 'cached'
@@ -265,6 +254,7 @@ def test_never_delete_required_track(cli, datafiles):
'quota': 10000000
},
'scheduler': {
+ 'fetchers': 1,
'builders': 1
}
})
@@ -348,6 +338,7 @@ def test_never_delete_required_track(cli, datafiles):
("70%", 'warning', 'Your system does not have enough available')
])
@pytest.mark.datafiles(DATA_DIR)
+@pytest.mark.xfail()
def test_invalid_cache_quota(cli, datafiles, quota, err_domain, err_reason):
project = str(datafiles)
os.makedirs(os.path.join(project, 'elements'))
@@ -438,18 +429,6 @@ def test_cleanup_first(cli, datafiles):
res = cli.run(project=project, args=['build', 'target2.bst'])
res.assert_success()
- # Find all of the activity (like push, pull, src-pull) lines
- results = re.findall(r'\[.*\]\[.*\]\[\s*(\S+):.*\]\s*START\s*.*\.log', res.stderr)
-
- # Don't bother checking the order of 'src-pull', it is allowed to start
- # before or after the initial cache size job, runs in parallel, and does
- # not require ResourceType.CACHE.
- results.remove('fetch')
- print(results)
-
- # Assert the expected sequence of events
- assert results == ['size', 'clean', 'build']
-
# Check that the correct element remains in the cache
states = cli.get_element_states(project, ['target.bst', 'target2.bst'])
assert states['target.bst'] != 'cached'
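
For reference, the expiry tests above now pin both fetchers and builders to one alongside the quota, so artifacts enter the cache one at a time and the cached/uncached assertions stay deterministic. A minimal sketch of the resulting configuration, with values taken from the hunks above:

    # Values from test_never_delete_required: a 10 MB quota with a single
    # fetcher and a single builder, so the cache grows one artifact at a time.
    cli.configure({
        'cache': {
            'quota': 10000000,
        },
        'scheduler': {
            'fetchers': 1,
            'builders': 1,
        },
    })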
diff --git a/tests/frontend/push.py b/tests/frontend/push.py
index e92646154..9c3947c2a 100644
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -292,19 +292,17 @@ def test_artifact_expires(cli, datafiles, tmpdir):
element_path = 'elements'
# Create an artifact share (remote artifact cache) in the tmpdir/artifactshare
- # Mock a file system with 12 MB free disk space
+ # Set a 22 MB quota
with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
- min_head_size=int(2e9),
- max_head_size=int(2e9),
- total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
+ quota=int(22e6)) as share:
# Configure bst to push to the cache
cli.configure({
'artifacts': {'url': share.repo, 'push': True},
})
- # Create and build an element of 5 MB
- create_element_size('element1.bst', project, element_path, [], int(5e6))
+ # Create and build an element of 15 MB
+ create_element_size('element1.bst', project, element_path, [], int(15e6))
result = cli.run(project=project, args=['build', 'element1.bst'])
result.assert_success()
@@ -349,7 +347,7 @@ def test_artifact_too_large(cli, datafiles, tmpdir):
# Create an artifact share (remote cache) in tmpdir/artifactshare
# Mock a file system with 5 MB total space
with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
- total_space=int(5e6) + int(2e9)) as share:
+ quota=int(5e6)) as share:
# Configure bst to push to the remote cache
cli.configure({
@@ -383,23 +381,21 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
element_path = 'elements'
# Create an artifact share (remote cache) in tmpdir/artifactshare
- # Mock a file system with 12 MB free disk space
+ # Set a 22 MB quota
with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
- min_head_size=int(2e9),
- max_head_size=int(2e9),
- total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
+ quota=int(22e6)) as share:
# Configure bst to push to the cache
cli.configure({
'artifacts': {'url': share.repo, 'push': True},
})
- # Create and build 2 elements, each of 5 MB.
+ # Create and build 2 elements, one 5 MB and one 15 MB.
create_element_size('element1.bst', project, element_path, [], int(5e6))
result = cli.run(project=project, args=['build', 'element1.bst'])
result.assert_success()
- create_element_size('element2.bst', project, element_path, [], int(5e6))
+ create_element_size('element2.bst', project, element_path, [], int(15e6))
result = cli.run(project=project, args=['build', 'element2.bst'])
result.assert_success()
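
The artifact share used by these tests now enforces an explicit byte quota instead of mocking statvfs free-space figures. A minimal usage sketch of the updated fixture, assuming the helpers already imported by this test module (create_artifact_share, create_element_size):

    import os

    def push_with_quota_example(cli, tmpdir, project, element_path):
        # A 22 MB quota on the remote: a single 15 MB artifact fits, and the
        # test above then relies on a later push expiring the older artifact.
        with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
                                   quota=int(22e6)) as share:
            cli.configure({'artifacts': {'url': share.repo, 'push': True}})
            create_element_size('element1.bst', project, element_path, [], int(15e6))
            result = cli.run(project=project, args=['build', 'element1.bst'])
            result.assert_success()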
diff --git a/tests/integration/cachedfail.py b/tests/integration/cachedfail.py
index e3b5b2796..f8dd52aa6 100644
--- a/tests/integration/cachedfail.py
+++ b/tests/integration/cachedfail.py
@@ -20,7 +20,7 @@
import os
import pytest
-from buildstream import _yaml
+from buildstream import utils, _yaml
from buildstream._exceptions import ErrorDomain
from buildstream.testing import cli_integration as cli # pylint: disable=unused-import
from buildstream.testing._utils.site import HAVE_SANDBOX
@@ -185,7 +185,12 @@ def test_push_cached_fail(cli, tmpdir, datafiles, on_error):
@pytest.mark.skipif(HAVE_SANDBOX != 'bwrap', reason='Only available with bubblewrap on Linux')
@pytest.mark.datafiles(DATA_DIR)
-def test_host_tools_errors_are_not_cached(cli, datafiles):
+def test_host_tools_errors_are_not_cached(cli, datafiles, tmp_path):
+ # Create symlink to buildbox-casd to work with custom PATH
+ buildbox_casd = tmp_path.joinpath('bin/buildbox-casd')
+ buildbox_casd.parent.mkdir()
+ os.symlink(utils.get_host_tool('buildbox-casd'), str(buildbox_casd))
+
project = str(datafiles)
element_path = os.path.join(project, 'elements', 'element.bst')
@@ -207,7 +212,11 @@ def test_host_tools_errors_are_not_cached(cli, datafiles):
_yaml.roundtrip_dump(element, element_path)
# Build without access to host tools, this will fail
- result1 = cli.run(project=project, args=['build', 'element.bst'], env={'PATH': '', 'BST_FORCE_SANDBOX': None})
+ result1 = cli.run(
+ project=project,
+ args=['build', 'element.bst'],
+ env={'PATH': str(tmp_path.joinpath('bin')),
+ 'BST_FORCE_SANDBOX': None})
result1.assert_task_error(ErrorDomain.SANDBOX, 'unavailable-local-sandbox')
assert cli.get_element_state(project, 'element.bst') == 'buildable'
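
A note on the PATH handling above: previously PATH could simply be emptied, but buildbox-casd must now remain reachable for the CAS cache to start, so the test symlinks it into a private bin directory and puts only that directory on PATH. The same pattern could be factored into a small helper; a sketch only, where the helper name casd_only_path is hypothetical:

    import os

    from buildstream import utils

    def casd_only_path(tmp_path):
        # Expose only buildbox-casd on PATH: host-tool lookups for the
        # sandbox still fail, but the CAS daemon can be launched.
        bin_dir = tmp_path.joinpath('bin')
        bin_dir.mkdir(parents=True, exist_ok=True)
        os.symlink(utils.get_host_tool('buildbox-casd'),
                   str(bin_dir.joinpath('buildbox-casd')))
        return str(bin_dir)

Passing env={'PATH': casd_only_path(tmp_path), 'BST_FORCE_SANDBOX': None} to cli.run() then matches the calls above and in the sandbox tests further down.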
diff --git a/tests/internals/storage.py b/tests/internals/storage.py
index 66423afd1..385162c13 100644
--- a/tests/internals/storage.py
+++ b/tests/internals/storage.py
@@ -1,7 +1,11 @@
+from contextlib import contextmanager
+import multiprocessing
import os
+import signal
import pytest
+from buildstream import utils, _signals
from buildstream._cas import CASCache
from buildstream.storage._casbaseddirectory import CasBasedDirectory
from buildstream.storage._filebaseddirectory import FileBasedDirectory
@@ -12,44 +16,93 @@ DATA_DIR = os.path.join(
)
+# Since parent processes wait for queue events, we need
+# to put something on it if the called process raises an
+# exception.
+def _queue_wrapper(target, queue, *args):
+ try:
+ target(*args)
+ except Exception as e:
+ queue.put(str(e))
+ raise
+
+ queue.put(None)
+
+
+@contextmanager
def setup_backend(backend_class, tmpdir):
if backend_class == FileBasedDirectory:
- return backend_class(os.path.join(tmpdir, "vdir"))
+ yield backend_class(os.path.join(tmpdir, "vdir"))
else:
cas_cache = CASCache(tmpdir)
- return backend_class(cas_cache)
+ try:
+ yield backend_class(cas_cache)
+ finally:
+ cas_cache.release_resources()
-@pytest.mark.parametrize("backend", [
- FileBasedDirectory, CasBasedDirectory])
-@pytest.mark.datafiles(DATA_DIR)
-def test_import(tmpdir, datafiles, backend):
+def _test_import_subprocess(tmpdir, datafiles, backend):
original = os.path.join(str(datafiles), "original")
- c = setup_backend(backend, str(tmpdir))
-
- c.import_files(original)
+ with setup_backend(backend, str(tmpdir)) as c:
+ c.import_files(original)
- assert "bin/bash" in c.list_relative_paths()
- assert "bin/hello" in c.list_relative_paths()
+ assert "bin/bash" in c.list_relative_paths()
+ assert "bin/hello" in c.list_relative_paths()
@pytest.mark.parametrize("backend", [
FileBasedDirectory, CasBasedDirectory])
@pytest.mark.datafiles(DATA_DIR)
-def test_modified_file_list(tmpdir, datafiles, backend):
+def test_import(tmpdir, datafiles, backend):
+ queue = multiprocessing.Queue()
+ process = multiprocessing.Process(target=_queue_wrapper,
+ args=(_test_import_subprocess, queue,
+ tmpdir, datafiles, backend))
+ try:
+ with _signals.blocked([signal.SIGINT], ignore=False):
+ process.start()
+ error = queue.get()
+ process.join()
+ except KeyboardInterrupt:
+ utils._kill_process_tree(process.pid)
+ raise
+
+ assert not error
+
+
+def _test_modified_file_list_subprocess(tmpdir, datafiles, backend):
original = os.path.join(str(datafiles), "original")
overlay = os.path.join(str(datafiles), "overlay")
- c = setup_backend(backend, str(tmpdir))
+ with setup_backend(backend, str(tmpdir)) as c:
+ c.import_files(original)
- c.import_files(original)
+ c.mark_unmodified()
- c.mark_unmodified()
+ c.import_files(overlay)
- c.import_files(overlay)
+ print("List of all paths in imported results: {}".format(c.list_relative_paths()))
+ assert "bin/bash" in c.list_relative_paths()
+ assert "bin/bash" in c.list_modified_paths()
+ assert "bin/hello" not in c.list_modified_paths()
- print("List of all paths in imported results: {}".format(c.list_relative_paths()))
- assert "bin/bash" in c.list_relative_paths()
- assert "bin/bash" in c.list_modified_paths()
- assert "bin/hello" not in c.list_modified_paths()
+
+@pytest.mark.parametrize("backend", [
+ FileBasedDirectory, CasBasedDirectory])
+@pytest.mark.datafiles(DATA_DIR)
+def test_modified_file_list(tmpdir, datafiles, backend):
+ queue = multiprocessing.Queue()
+ process = multiprocessing.Process(target=_queue_wrapper,
+ args=(_test_modified_file_list_subprocess, queue,
+ tmpdir, datafiles, backend))
+ try:
+ with _signals.blocked([signal.SIGINT], ignore=False):
+ process.start()
+ error = queue.get()
+ process.join()
+ except KeyboardInterrupt:
+ utils._kill_process_tree(process.pid)
+ raise
+
+ assert not error
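
The _queue_wrapper and Process/Queue boilerplate above is duplicated verbatim in tests/internals/storage_vdir_import.py below; it could live in a shared helper instead. A sketch under that assumption (the shared-module location is hypothetical, not part of this change):

    import multiprocessing
    import signal

    from buildstream import utils, _signals

    def _queue_wrapper(target, queue, *args):
        # The parent blocks on queue.get(), so always put something on the
        # queue, even when the target raises.
        try:
            target(*args)
        except Exception as e:  # pylint: disable=broad-except
            queue.put(str(e))
            raise

        queue.put(None)

    def run_in_subprocess(target, *args):
        # Run `target(*args)` in a child process and surface any failure
        # as an assertion in the parent.
        queue = multiprocessing.Queue()
        process = multiprocessing.Process(target=_queue_wrapper,
                                          args=(target, queue, *args))
        try:
            with _signals.blocked([signal.SIGINT], ignore=False):
                process.start()
            error = queue.get()
            process.join()
        except KeyboardInterrupt:
            utils._kill_process_tree(process.pid)
            raise

        assert error is None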
diff --git a/tests/internals/storage_vdir_import.py b/tests/internals/storage_vdir_import.py
index 7c6cbe4fb..e0165fc13 100644
--- a/tests/internals/storage_vdir_import.py
+++ b/tests/internals/storage_vdir_import.py
@@ -14,11 +14,14 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
from hashlib import sha256
+import multiprocessing
import os
import random
+import signal
import pytest
+from buildstream import utils, _signals
from buildstream.storage._casbaseddirectory import CasBasedDirectory
from buildstream.storage._filebaseddirectory import FileBasedDirectory
from buildstream._cas import CASCache
@@ -55,6 +58,34 @@ RANDOM_SEED = 69105
NUM_RANDOM_TESTS = 10
+# Since parent processes wait for queue events, we need
+# to put something on it if the called process raises an
+# exception.
+def _queue_wrapper(target, queue, *args):
+ try:
+ target(*args)
+ except Exception as e:
+ queue.put(str(e))
+ raise
+
+ queue.put(None)
+
+
+def _run_test_in_subprocess(func, *args):
+ queue = multiprocessing.Queue()
+ process = multiprocessing.Process(target=_queue_wrapper, args=(func, queue, *args))
+ try:
+ with _signals.blocked([signal.SIGINT], ignore=False):
+ process.start()
+ error = queue.get()
+ process.join()
+ except KeyboardInterrupt:
+ utils._kill_process_tree(process.pid)
+ raise
+
+ assert not error
+
+
def generate_import_roots(rootno, directory):
rootname = "root{}".format(rootno)
rootdir = os.path.join(directory, "content", rootname)
@@ -182,56 +213,64 @@ def directory_not_empty(path):
return os.listdir(path)
-def _import_test(tmpdir, original, overlay, generator_function, verify_contents=False):
+def _import_test_subprocess(tmpdir, original, overlay, generator_function, verify_contents=False):
cas_cache = CASCache(tmpdir)
- # Create some fake content
- generator_function(original, tmpdir)
- if original != overlay:
- generator_function(overlay, tmpdir)
-
- d = create_new_casdir(original, cas_cache, tmpdir)
-
- duplicate_cas = create_new_casdir(original, cas_cache, tmpdir)
-
- assert duplicate_cas._get_digest().hash == d._get_digest().hash
-
- d2 = create_new_casdir(overlay, cas_cache, tmpdir)
- d.import_files(d2)
- export_dir = os.path.join(tmpdir, "output-{}-{}".format(original, overlay))
- roundtrip_dir = os.path.join(tmpdir, "roundtrip-{}-{}".format(original, overlay))
- d2.export_files(roundtrip_dir)
- d.export_files(export_dir)
-
- if verify_contents:
- for item in root_filesets[overlay - 1]:
- (path, typename, content) = item
- realpath = resolve_symlinks(path, export_dir)
- if typename == 'F':
- if os.path.isdir(realpath) and directory_not_empty(realpath):
- # The file should not have overwritten the directory in this case.
- pass
- else:
- assert os.path.isfile(realpath), "{} did not exist in the combined virtual directory".format(path)
- assert file_contents_are(realpath, content)
- elif typename == 'S':
- if os.path.isdir(realpath) and directory_not_empty(realpath):
- # The symlink should not have overwritten the directory in this case.
- pass
- else:
- assert os.path.islink(realpath)
- assert os.readlink(realpath) == content
- elif typename == 'D':
- # We can't do any more tests than this because it
- # depends on things present in the original. Blank
- # directories here will be ignored and the original
- # left in place.
- assert os.path.lexists(realpath)
-
- # Now do the same thing with filebaseddirectories and check the contents match
-
- duplicate_cas.import_files(roundtrip_dir)
-
- assert duplicate_cas._get_digest().hash == d._get_digest().hash
+ try:
+ # Create some fake content
+ generator_function(original, tmpdir)
+ if original != overlay:
+ generator_function(overlay, tmpdir)
+
+ d = create_new_casdir(original, cas_cache, tmpdir)
+
+ duplicate_cas = create_new_casdir(original, cas_cache, tmpdir)
+
+ assert duplicate_cas._get_digest().hash == d._get_digest().hash
+
+ d2 = create_new_casdir(overlay, cas_cache, tmpdir)
+ d.import_files(d2)
+ export_dir = os.path.join(tmpdir, "output-{}-{}".format(original, overlay))
+ roundtrip_dir = os.path.join(tmpdir, "roundtrip-{}-{}".format(original, overlay))
+ d2.export_files(roundtrip_dir)
+ d.export_files(export_dir)
+
+ if verify_contents:
+ for item in root_filesets[overlay - 1]:
+ (path, typename, content) = item
+ realpath = resolve_symlinks(path, export_dir)
+ if typename == 'F':
+ if os.path.isdir(realpath) and directory_not_empty(realpath):
+ # The file should not have overwritten the directory in this case.
+ pass
+ else:
+ assert os.path.isfile(realpath), \
+ "{} did not exist in the combined virtual directory".format(path)
+ assert file_contents_are(realpath, content)
+ elif typename == 'S':
+ if os.path.isdir(realpath) and directory_not_empty(realpath):
+ # The symlink should not have overwritten the directory in this case.
+ pass
+ else:
+ assert os.path.islink(realpath)
+ assert os.readlink(realpath) == content
+ elif typename == 'D':
+ # We can't do any more tests than this because it
+ # depends on things present in the original. Blank
+ # directories here will be ignored and the original
+ # left in place.
+ assert os.path.lexists(realpath)
+
+ # Now do the same thing with filebaseddirectories and check the contents match
+
+ duplicate_cas.import_files(roundtrip_dir)
+
+ assert duplicate_cas._get_digest().hash == d._get_digest().hash
+ finally:
+ cas_cache.release_resources()
+
+
+def _import_test(tmpdir, original, overlay, generator_function, verify_contents=False):
+ _run_test_in_subprocess(_import_test_subprocess, tmpdir, original, overlay, generator_function, verify_contents)
# It's possible to parameterize on both original and overlay values,
@@ -249,18 +288,25 @@ def test_random_cas_import(tmpdir, original):
_import_test(str(tmpdir), original, overlay, generate_random_root, verify_contents=False)
-def _listing_test(tmpdir, root, generator_function):
+def _listing_test_subprocess(tmpdir, root, generator_function):
cas_cache = CASCache(tmpdir)
- # Create some fake content
- generator_function(root, tmpdir)
+ try:
+ # Create some fake content
+ generator_function(root, tmpdir)
+
+ d = create_new_filedir(root, tmpdir)
+ filelist = list(d.list_relative_paths())
- d = create_new_filedir(root, tmpdir)
- filelist = list(d.list_relative_paths())
+ d2 = create_new_casdir(root, cas_cache, tmpdir)
+ filelist2 = list(d2.list_relative_paths())
- d2 = create_new_casdir(root, cas_cache, tmpdir)
- filelist2 = list(d2.list_relative_paths())
+ assert filelist == filelist2
+ finally:
+ cas_cache.release_resources()
- assert filelist == filelist2
+
+def _listing_test(tmpdir, root, generator_function):
+ _run_test_in_subprocess(_listing_test_subprocess, tmpdir, root, generator_function)
@pytest.mark.parametrize("root", range(1, 11))
@@ -274,120 +320,155 @@ def test_fixed_directory_listing(tmpdir, root):
 # Check that the vdir supports descending and is readable
-def test_descend(tmpdir):
+def _test_descend_subprocess(tmpdir):
cas_dir = os.path.join(str(tmpdir), 'cas')
cas_cache = CASCache(cas_dir)
- d = CasBasedDirectory(cas_cache)
+ try:
+ d = CasBasedDirectory(cas_cache)
+
+ Content_to_check = 'You got me'
+ test_dir = os.path.join(str(tmpdir), 'importfrom')
+ filesys_discription = [
+ ('a', 'D', ''),
+ ('a/l', 'D', ''),
+ ('a/l/g', 'F', Content_to_check)
+ ]
+ generate_import_root(test_dir, filesys_discription)
+
+ d.import_files(test_dir)
+ digest = d.descend('a', 'l').index['g'].get_digest()
- Content_to_check = 'You got me'
- test_dir = os.path.join(str(tmpdir), 'importfrom')
- filesys_discription = [
- ('a', 'D', ''),
- ('a/l', 'D', ''),
- ('a/l/g', 'F', Content_to_check)
- ]
- generate_import_root(test_dir, filesys_discription)
+ assert Content_to_check == open(cas_cache.objpath(digest)).read()
+ finally:
+ cas_cache.release_resources()
- d.import_files(test_dir)
- digest = d.descend('a', 'l').index['g'].get_digest()
- assert Content_to_check == open(cas_cache.objpath(digest)).read()
+def test_descend(tmpdir):
+ _run_test_in_subprocess(_test_descend_subprocess, tmpdir)
 # Check symlink logic for edge cases
 # Make sure the correct errors are raised when trying
 # to descend into files or links to files
-def test_bad_symlinks(tmpdir):
+def _test_bad_symlinks_subprocess(tmpdir):
cas_dir = os.path.join(str(tmpdir), 'cas')
cas_cache = CASCache(cas_dir)
- d = CasBasedDirectory(cas_cache)
-
- test_dir = os.path.join(str(tmpdir), 'importfrom')
- filesys_discription = [
- ('a', 'D', ''),
- ('a/l', 'S', '../target'),
- ('target', 'F', 'You got me')
- ]
- generate_import_root(test_dir, filesys_discription)
- d.import_files(test_dir)
- exp_reason = "not-a-directory"
-
- with pytest.raises(VirtualDirectoryError) as error:
- d.descend('a', 'l', follow_symlinks=True)
- assert error.reason == exp_reason
+ try:
+ d = CasBasedDirectory(cas_cache)
+
+ test_dir = os.path.join(str(tmpdir), 'importfrom')
+ filesys_discription = [
+ ('a', 'D', ''),
+ ('a/l', 'S', '../target'),
+ ('target', 'F', 'You got me')
+ ]
+ generate_import_root(test_dir, filesys_discription)
+ d.import_files(test_dir)
+ exp_reason = "not-a-directory"
+
+ with pytest.raises(VirtualDirectoryError) as error:
+ d.descend('a', 'l', follow_symlinks=True)
+ assert error.reason == exp_reason
+
+ with pytest.raises(VirtualDirectoryError) as error:
+ d.descend('a', 'l')
+ assert error.reason == exp_reason
+
+ with pytest.raises(VirtualDirectoryError) as error:
+ d.descend('a', 'f')
+ assert error.reason == exp_reason
+ finally:
+ cas_cache.release_resources()
- with pytest.raises(VirtualDirectoryError) as error:
- d.descend('a', 'l')
- assert error.reason == exp_reason
- with pytest.raises(VirtualDirectoryError) as error:
- d.descend('a', 'f')
- assert error.reason == exp_reason
+def test_bad_symlinks(tmpdir):
+ _run_test_in_subprocess(_test_bad_symlinks_subprocess, tmpdir)
 # Check symlink logic for edge cases
 # Check descending across a relative link
-def test_relitive_symlink(tmpdir):
+def _test_relative_symlink_subprocess(tmpdir):
cas_dir = os.path.join(str(tmpdir), 'cas')
cas_cache = CASCache(cas_dir)
- d = CasBasedDirectory(cas_cache)
+ try:
+ d = CasBasedDirectory(cas_cache)
+
+ Content_to_check = 'You got me'
+ test_dir = os.path.join(str(tmpdir), 'importfrom')
+ filesys_discription = [
+ ('a', 'D', ''),
+ ('a/l', 'S', '../target'),
+ ('target', 'D', ''),
+ ('target/file', 'F', Content_to_check)
+ ]
+ generate_import_root(test_dir, filesys_discription)
+ d.import_files(test_dir)
+
+ digest = d.descend('a', 'l', follow_symlinks=True).index['file'].get_digest()
+ assert Content_to_check == open(cas_cache.objpath(digest)).read()
+ finally:
+ cas_cache.release_resources()
- Content_to_check = 'You got me'
- test_dir = os.path.join(str(tmpdir), 'importfrom')
- filesys_discription = [
- ('a', 'D', ''),
- ('a/l', 'S', '../target'),
- ('target', 'D', ''),
- ('target/file', 'F', Content_to_check)
- ]
- generate_import_root(test_dir, filesys_discription)
- d.import_files(test_dir)
- digest = d.descend('a', 'l', follow_symlinks=True).index['file'].get_digest()
- assert Content_to_check == open(cas_cache.objpath(digest)).read()
+def test_relative_symlink(tmpdir):
+ _run_test_in_subprocess(_test_relative_symlink_subprocess, tmpdir)
 # Check symlink logic for edge cases
 # Check descending across an absolute link
-def test_abs_symlink(tmpdir):
+def _test_abs_symlink_subprocess(tmpdir):
cas_dir = os.path.join(str(tmpdir), 'cas')
cas_cache = CASCache(cas_dir)
- d = CasBasedDirectory(cas_cache)
+ try:
+ d = CasBasedDirectory(cas_cache)
+
+ Content_to_check = 'two step file'
+ test_dir = os.path.join(str(tmpdir), 'importfrom')
+ filesys_discription = [
+ ('a', 'D', ''),
+ ('a/l', 'S', '/target'),
+ ('target', 'D', ''),
+ ('target/file', 'F', Content_to_check)
+ ]
+ generate_import_root(test_dir, filesys_discription)
+ d.import_files(test_dir)
- Content_to_check = 'two step file'
- test_dir = os.path.join(str(tmpdir), 'importfrom')
- filesys_discription = [
- ('a', 'D', ''),
- ('a/l', 'S', '/target'),
- ('target', 'D', ''),
- ('target/file', 'F', Content_to_check)
- ]
- generate_import_root(test_dir, filesys_discription)
- d.import_files(test_dir)
+ digest = d.descend('a', 'l', follow_symlinks=True).index['file'].get_digest()
- digest = d.descend('a', 'l', follow_symlinks=True).index['file'].get_digest()
+ assert Content_to_check == open(cas_cache.objpath(digest)).read()
+ finally:
+ cas_cache.release_resources()
- assert Content_to_check == open(cas_cache.objpath(digest)).read()
+
+def test_abs_symlink(tmpdir):
+ _run_test_in_subprocess(_test_abs_symlink_subprocess, tmpdir)
 # Check symlink logic for edge cases
 # Check that a symlink cannot escape the root
-def test_bad_sym_escape(tmpdir):
+def _test_bad_sym_escape_subprocess(tmpdir):
cas_dir = os.path.join(str(tmpdir), 'cas')
cas_cache = CASCache(cas_dir)
- d = CasBasedDirectory(cas_cache)
+ try:
+ d = CasBasedDirectory(cas_cache)
+
+ test_dir = os.path.join(str(tmpdir), 'importfrom')
+ filesys_discription = [
+ ('jail', 'D', ''),
+ ('jail/a', 'D', ''),
+ ('jail/a/l', 'S', '../../target'),
+ ('target', 'D', ''),
+ ('target/file', 'F', 'two step file')
+ ]
+ generate_import_root(test_dir, filesys_discription)
+ d.import_files(os.path.join(test_dir, 'jail'))
+
+ with pytest.raises(VirtualDirectoryError) as error:
+ d.descend('a', 'l', follow_symlinks=True)
+ assert error.reason == "directory-not-found"
+ finally:
+ cas_cache.release_resources()
- test_dir = os.path.join(str(tmpdir), 'importfrom')
- filesys_discription = [
- ('jail', 'D', ''),
- ('jail/a', 'D', ''),
- ('jail/a/l', 'S', '../../target'),
- ('target', 'D', ''),
- ('target/file', 'F', 'two step file')
- ]
- generate_import_root(test_dir, filesys_discription)
- d.import_files(os.path.join(test_dir, 'jail'))
-
- with pytest.raises(VirtualDirectoryError) as error:
- d.descend('a', 'l', follow_symlinks=True)
- assert error.reason == "directory-not-found"
+
+def test_bad_sym_escape(tmpdir):
+ _run_test_in_subprocess(_test_bad_sym_escape_subprocess, tmpdir)
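
Each of the subprocess helpers above repeats the same try/finally around cas_cache.release_resources(), which these tests now need so that the resources held by the cache are released. A context manager would factor that out; a sketch only, not part of this change:

    from contextlib import contextmanager

    from buildstream._cas import CASCache

    @contextmanager
    def cas_cache(directory):
        # Guarantee that resources held by the cache are released even if
        # the test body raises.
        cache = CASCache(directory)
        try:
            yield cache
        finally:
            cache.release_resources()

Used as `with cas_cache(os.path.join(str(tmpdir), 'cas')) as cache:` it would replace the repeated setup in the helpers above, mirroring setup_backend() in tests/internals/storage.py.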
diff --git a/tests/sandboxes/missing_dependencies.py b/tests/sandboxes/missing_dependencies.py
index 33a169ca2..975c8eb00 100644
--- a/tests/sandboxes/missing_dependencies.py
+++ b/tests/sandboxes/missing_dependencies.py
@@ -5,7 +5,7 @@ import os
import pytest
-from buildstream import _yaml
+from buildstream import utils, _yaml
from buildstream._exceptions import ErrorDomain
from buildstream.testing._utils.site import IS_LINUX
from buildstream.testing import cli # pylint: disable=unused-import
@@ -20,7 +20,12 @@ DATA_DIR = os.path.join(
@pytest.mark.skipif(not IS_LINUX, reason='Only available on Linux')
@pytest.mark.datafiles(DATA_DIR)
-def test_missing_brwap_has_nice_error_message(cli, datafiles):
+def test_missing_brwap_has_nice_error_message(cli, datafiles, tmp_path):
+ # Create symlink to buildbox-casd to work with custom PATH
+ buildbox_casd = tmp_path.joinpath('bin/buildbox-casd')
+ buildbox_casd.parent.mkdir()
+ os.symlink(utils.get_host_tool('buildbox-casd'), str(buildbox_casd))
+
project = str(datafiles)
element_path = os.path.join(project, 'elements', 'element.bst')
@@ -45,8 +50,8 @@ def test_missing_brwap_has_nice_error_message(cli, datafiles):
result = cli.run(
project=project,
args=['build', 'element.bst'],
- env={'PATH': '', 'BST_FORCE_SANDBOX': None}
- )
+ env={'PATH': str(tmp_path.joinpath('bin')),
+ 'BST_FORCE_SANDBOX': None})
result.assert_task_error(ErrorDomain.SANDBOX, 'unavailable-local-sandbox')
assert "not found" in result.stderr
@@ -64,6 +69,10 @@ def test_old_brwap_has_nice_error_message(cli, datafiles, tmp_path):
bwrap.chmod(0o755)
+ # Create symlink to buildbox-casd to work with custom PATH
+ buildbox_casd = tmp_path.joinpath('bin/buildbox-casd')
+ os.symlink(utils.get_host_tool('buildbox-casd'), str(buildbox_casd))
+
project = str(datafiles)
element_path = os.path.join(project, 'elements', 'element3.bst')
diff --git a/tests/sandboxes/selection.py b/tests/sandboxes/selection.py
index eff247a95..b4bbb1b00 100644
--- a/tests/sandboxes/selection.py
+++ b/tests/sandboxes/selection.py
@@ -19,7 +19,7 @@
import os
import pytest
-from buildstream import _yaml
+from buildstream import utils, _yaml
from buildstream._exceptions import ErrorDomain
from buildstream.testing import cli # pylint: disable=unused-import
@@ -64,7 +64,12 @@ def test_force_sandbox(cli, datafiles):
@pytest.mark.datafiles(DATA_DIR)
-def test_dummy_sandbox_fallback(cli, datafiles):
+def test_dummy_sandbox_fallback(cli, datafiles, tmp_path):
+ # Create symlink to buildbox-casd to work with custom PATH
+ buildbox_casd = tmp_path.joinpath('bin/buildbox-casd')
+ buildbox_casd.parent.mkdir()
+ os.symlink(utils.get_host_tool('buildbox-casd'), str(buildbox_casd))
+
project = str(datafiles)
element_path = os.path.join(project, 'elements', 'element.bst')
@@ -86,7 +91,11 @@ def test_dummy_sandbox_fallback(cli, datafiles):
_yaml.roundtrip_dump(element, element_path)
# Build without access to host tools, this will fail
- result = cli.run(project=project, args=['build', 'element.bst'], env={'PATH': '', 'BST_FORCE_SANDBOX': None})
+ result = cli.run(
+ project=project,
+ args=['build', 'element.bst'],
+ env={'PATH': str(tmp_path.joinpath('bin')),
+ 'BST_FORCE_SANDBOX': None})
 # But if we don't specify a sandbox then we fall back to the dummy sandbox;
 # we still fail early, but only once we know we need a fancy sandbox and the
 # dummy is not enough, therefore the element gets fetched and so is buildable
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index a5522c8eb..7d5faeb66 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -20,16 +20,12 @@ from buildstream._protos.buildstream.v2 import artifact_pb2
#
# Args:
# directory (str): The base temp directory for the test
-# total_space (int): Mock total disk space on artifact server
-# free_space (int): Mock free disk space on artifact server
+# quota (int): Maximum amount of disk space to use
+# casd (bool): Allow write access via casd
#
class ArtifactShare():
- def __init__(self, directory, *,
- total_space=None,
- free_space=None,
- min_head_size=int(2e9),
- max_head_size=int(10e9)):
+ def __init__(self, directory, *, quota=None, casd=False):
# The working directory for the artifact share (in case it
# needs to do something outside of its backend's storage folder).
@@ -46,13 +42,9 @@ class ArtifactShare():
self.artifactdir = os.path.join(self.repodir, 'artifacts', 'refs')
os.makedirs(self.artifactdir)
- self.cas = CASCache(self.repodir)
+ self.cas = CASCache(self.repodir, casd=casd)
- self.total_space = total_space
- self.free_space = free_space
-
- self.max_head_size = max_head_size
- self.min_head_size = min_head_size
+ self.quota = quota
q = Queue()
@@ -78,30 +70,23 @@ class ArtifactShare():
pytest_cov.embed.cleanup_on_sigterm()
try:
- # Optionally mock statvfs
- if self.total_space:
- if self.free_space is None:
- self.free_space = self.total_space
- os.statvfs = self._mock_statvfs
+ with create_server(self.repodir,
+ quota=self.quota,
+ enable_push=True) as server:
+ port = server.add_insecure_port('localhost:0')
- server = create_server(self.repodir,
- max_head_size=self.max_head_size,
- min_head_size=self.min_head_size,
- enable_push=True)
- port = server.add_insecure_port('localhost:0')
+ server.start()
- server.start()
+ # Send port to parent
+ q.put(port)
- # Send port to parent
- q.put(port)
+ # Sleep until termination by signal
+ signal.pause()
except Exception:
q.put(None)
raise
- # Sleep until termination by signal
- signal.pause()
-
# has_object():
#
# Checks whether the object is present in the share
@@ -176,18 +161,9 @@ class ArtifactShare():
self.process.terminate()
self.process.join()
- shutil.rmtree(self.directory)
+ self.cas.release_resources()
- def _mock_statvfs(self, _path):
- repo_size = 0
- for root, _, files in os.walk(self.repodir):
- for filename in files:
- repo_size += os.path.getsize(os.path.join(root, filename))
-
- return statvfs_result(f_blocks=self.total_space,
- f_bfree=self.free_space - repo_size,
- f_bavail=self.free_space - repo_size,
- f_bsize=1)
+ shutil.rmtree(self.directory)
# create_artifact_share()
@@ -195,11 +171,8 @@ class ArtifactShare():
# Create an ArtifactShare for use in a test case
#
@contextmanager
-def create_artifact_share(directory, *, total_space=None, free_space=None,
- min_head_size=int(2e9),
- max_head_size=int(10e9)):
- share = ArtifactShare(directory, total_space=total_space, free_space=free_space,
- min_head_size=min_head_size, max_head_size=max_head_size)
+def create_artifact_share(directory, *, quota=None, casd=False):
+ share = ArtifactShare(directory, quota=quota, casd=casd)
try:
yield share
finally: