diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2019-08-20 07:12:05 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-08-20 07:12:05 +0000 |
commit | 56ff33fbd7c5af1518f27a040da37520b1a3e247 (patch) | |
tree | 2a4e3468ab091770267970804e03f8518392b5b0 | |
parent | e92781bdb7e76b91195fef84039fe7ff51cd02bf (diff) | |
parent | 4e867691dbebf91ceb24e7dabea6cbc93399222d (diff) | |
download | buildstream-56ff33fbd7c5af1518f27a040da37520b1a3e247.tar.gz |
Merge branch 'juerg/casd' into 'master'
Use buildbox-casd for CAS access
See merge request BuildStream/buildstream!1499
36 files changed, 2426 insertions, 1845 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 2bdaab0fb..9b7db92cf 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,7 +1,7 @@ include: - template: Code-Quality.gitlab-ci.yml -image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-debian:9-master-46405991 +image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-debian:9-master-75925678 cache: key: "$CI_JOB_NAME-" @@ -54,24 +54,24 @@ variables: - .coverage-reports tests-debian-9: - image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-debian:9-master-46405991 + image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-debian:9-master-75925678 <<: *tests tests-fedora-29: - image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-47052095 + image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-75925678 <<: *tests tests-fedora-30: - image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:30-master-59168197 + image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:30-master-75925678 <<: *tests tests-ubuntu-18.04: - image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-ubuntu:18.04-master-46405991 + image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-ubuntu:18.04-master-75925678 <<: *tests tests-centos-7.6: <<: *tests - image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-centos:7.6.1810-master-46405991 + image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-centos:7.6.1810-master-75925678 overnight-fedora-30-aarch64: image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:aarch64-30-master-59168197 @@ -87,7 +87,7 @@ overnight-fedora-30-aarch64: tests-unix: # Use fedora here, to a) run a test on fedora and b) ensure that we # can get rid of ostree - this is not possible with debian-8 - image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-47052095 + image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-75925678 <<: *tests variables: BST_FORCE_SANDBOX: "chroot" @@ -104,7 +104,7 @@ tests-unix: - ${TEST_COMMAND} tests-buildbox: - image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-47052095 + image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-75925678 <<: *tests variables: BST_FORCE_SANDBOX: "buildbox" @@ -134,7 +134,7 @@ tests-buildbox: tests-fedora-missing-deps: # Ensure that tests behave nicely while missing bwrap and ostree - image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-47052095 + image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-75925678 <<: *tests script: @@ -153,7 +153,7 @@ tests-fedora-update-deps: # Check if the tests pass after updating requirements to their latest # allowed version. allow_failure: true - image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-47052095 + image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-75925678 <<: *tests script: @@ -167,7 +167,7 @@ tests-fedora-update-deps: tests-remote-execution: allow_failure: true - image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-47052095 + image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-75925678 <<: *tests before_script: - dnf install -y docker docker-compose @@ -190,7 +190,7 @@ tests-remote-execution: PYTEST_ARGS: "--color=yes --remote-execution" tests-spawn-multiprocessing-start-method: - image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-47052095 + image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-75925678 <<: *tests variables: BST_FORCE_START_METHOD: "spawn" @@ -221,6 +221,10 @@ tests-wsl-master: - df -h - PATH=/root/.local/bin:$PATH tox --version script: + # Install static buildbox-casd binary + - wget https://buildbox-casd-binaries.nyc3.cdn.digitaloceanspaces.com/buildbox-casd-x86_64-linux-20190813-20d41af4.tar.xz + - tar -C /root/.local/bin -xf buildbox-casd-x86_64-linux-20190813-20d41af4.tar.xz + - PATH=/root/.local/bin:$PATH ${TEST_COMMAND} only: - master @@ -237,6 +241,10 @@ tests-wsl-non-master: - df -h - PATH=/root/.local/bin:$PATH tox --version script: + # Install static buildbox-casd binary + - wget https://buildbox-casd-binaries.nyc3.cdn.digitaloceanspaces.com/buildbox-casd-x86_64-linux-20190813-20d41af4.tar.xz + - tar -C /root/.local/bin -xf buildbox-casd-x86_64-linux-20190813-20d41af4.tar.xz + - PATH=/root/.local/bin:$PATH ${TEST_COMMAND} when: manual except: @@ -259,7 +267,7 @@ docs: .overnight-tests: &overnight-tests-template stage: test - image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:30-master-59168197 + image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:30-master-75925678 variables: BST_EXT_URL: git+https://gitlab.com/BuildStream/bst-plugins-experimental.git BST_EXT_REF: 0.12.0-40-g7aa1423377629281decc455d1090964417c38f2e diff --git a/src/buildstream/_artifactcache.py b/src/buildstream/_artifactcache.py index d62e7f500..f92a7c84f 100644 --- a/src/buildstream/_artifactcache.py +++ b/src/buildstream/_artifactcache.py @@ -21,8 +21,7 @@ import os import grpc from ._basecache import BaseCache -from .types import _KeyStrength -from ._exceptions import ArtifactError, CASError, CASCacheError +from ._exceptions import ArtifactError, CASError, CASCacheError, CASRemoteError from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \ artifact_pb2, artifact_pb2_grpc @@ -96,107 +95,16 @@ class ArtifactCache(BaseCache): def __init__(self, context): super().__init__(context) - self._required_elements = set() # The elements required for this session - # create artifact directory self.artifactdir = context.artifactdir os.makedirs(self.artifactdir, exist_ok=True) - self.casquota.add_remove_callbacks(self.unrequired_artifacts, self.remove) - self.casquota.add_list_refs_callback(self.list_artifacts) - - self.cas.add_reachable_directories_callback(self._reachable_directories) - self.cas.add_reachable_digests_callback(self._reachable_digests) - - # mark_required_elements(): - # - # Mark elements whose artifacts are required for the current run. - # - # Artifacts whose elements are in this list will be locked by the artifact - # cache and not touched for the duration of the current pipeline. - # - # Args: - # elements (iterable): A set of elements to mark as required - # - def mark_required_elements(self, elements): - - # We risk calling this function with a generator, so we - # better consume it first. - # - elements = list(elements) - - # Mark the elements as required. We cannot know that we know the - # cache keys yet, so we only check that later when deleting. - # - self._required_elements.update(elements) - - # For the cache keys which were resolved so far, we bump - # the mtime of them. - # - # This is just in case we have concurrent instances of - # BuildStream running with the same artifact cache, it will - # reduce the likelyhood of one instance deleting artifacts - # which are required by the other. - for element in elements: - strong_key = element._get_cache_key(strength=_KeyStrength.STRONG) - weak_key = element._get_cache_key(strength=_KeyStrength.WEAK) - for key in (strong_key, weak_key): - if key: - ref = element.get_artifact_name(key) - - try: - self.update_mtime(ref) - except ArtifactError: - pass - def update_mtime(self, ref): try: os.utime(os.path.join(self.artifactdir, ref)) except FileNotFoundError as e: raise ArtifactError("Couldn't find artifact: {}".format(ref)) from e - # unrequired_artifacts() - # - # Returns iterator over artifacts that are not required in the build plan - # - # Returns: - # (iter): Iterator over tuples of (float, str) where float is the time - # and str is the artifact ref - # - def unrequired_artifacts(self): - required_artifacts = set(map(lambda x: x.get_artifact_name(), - self._required_elements)) - for (mtime, artifact) in self._list_refs_mtimes(self.artifactdir): - if artifact not in required_artifacts: - yield (mtime, artifact) - - def required_artifacts(self): - # Build a set of the cache keys which are required - # based on the required elements at cleanup time - # - # We lock both strong and weak keys - deleting one but not the - # other won't save space, but would be a user inconvenience. - for element in self._required_elements: - yield element._get_cache_key(strength=_KeyStrength.STRONG) - yield element._get_cache_key(strength=_KeyStrength.WEAK) - - def full(self): - return self.casquota.full() - - # add_artifact_size() - # - # Adds the reported size of a newly cached artifact to the - # overall estimated size. - # - # Args: - # artifact_size (int): The size to add. - # - def add_artifact_size(self, artifact_size): - cache_size = self.casquota.get_cache_size() - cache_size += artifact_size - - self.casquota.set_cache_size(cache_size) - # preflight(): # # Preflight check. @@ -241,25 +149,13 @@ class ArtifactCache(BaseCache): # Args: # ref (artifact_name): The name of the artifact to remove (as # generated by `Element.get_artifact_name`) - # defer_prune (bool): Optionally declare whether pruning should - # occur immediately after the ref is removed. - # - # Returns: - # (int): The amount of space recovered in the cache, in bytes # - def remove(self, ref, *, defer_prune=False): + def remove(self, ref): try: - return self.cas.remove(ref, basedir=self.artifactdir, defer_prune=defer_prune) + self.cas.remove(ref, basedir=self.artifactdir) except CASCacheError as e: raise ArtifactError("{}".format(e)) from e - # prune(): - # - # Prune the artifact cache of unreachable refs - # - def prune(self): - return self.cas.prune() - # diff(): # # Return a list of files that have been added or modified between @@ -491,42 +387,6 @@ class ArtifactCache(BaseCache): # Local Private Methods # ################################################ - # _reachable_directories() - # - # Returns: - # (iter): Iterator over directories digests available from artifacts. - # - def _reachable_directories(self): - for root, _, files in os.walk(self.artifactdir): - for artifact_file in files: - artifact = artifact_pb2.Artifact() - with open(os.path.join(root, artifact_file), 'r+b') as f: - artifact.ParseFromString(f.read()) - - if str(artifact.files): - yield artifact.files - - if str(artifact.buildtree): - yield artifact.buildtree - - # _reachable_digests() - # - # Returns: - # (iter): Iterator over single file digests in artifacts - # - def _reachable_digests(self): - for root, _, files in os.walk(self.artifactdir): - for artifact_file in files: - artifact = artifact_pb2.Artifact() - with open(os.path.join(root, artifact_file), 'r+b') as f: - artifact.ParseFromString(f.read()) - - if str(artifact.public_data): - yield artifact.public_data - - for log_file in artifact.logs: - yield log_file.digest - # _push_artifact() # # Pushes relevant directories and then artifact proto to remote. @@ -580,6 +440,10 @@ class ArtifactCache(BaseCache): self.cas.send_blobs(remote, digests) + except CASRemoteError as cas_error: + if cas_error.reason != "cache-too-full": + raise ArtifactError("Failed to push artifact blobs: {}".format(cas_error)) + return False except grpc.RpcError as e: if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: raise ArtifactError("Failed to push artifact blobs: {}".format(e.details())) diff --git a/src/buildstream/_basecache.py b/src/buildstream/_basecache.py index 739d14b6a..fc6087bf8 100644 --- a/src/buildstream/_basecache.py +++ b/src/buildstream/_basecache.py @@ -42,8 +42,6 @@ class BaseCache(): def __init__(self, context): self.context = context self.cas = context.get_cascache() - self.casquota = context.get_casquota() - self.casquota._calculate_cache_quota() self._remotes_setup = False # Check to prevent double-setup of remotes # Per-project list of _CASRemote instances. @@ -163,7 +161,7 @@ class BaseCache(): q = multiprocessing.Queue() for remote_spec in remote_specs: - error = self.remote_class.check_remote(remote_spec, q) + error = self.remote_class.check_remote(remote_spec, self.cas, q) if error and on_failure: on_failure(remote_spec.url, error) @@ -175,7 +173,7 @@ class BaseCache(): if remote_spec.push: self._has_push_remotes = True - remotes[remote_spec.url] = self.remote_class(remote_spec) + remotes[remote_spec.url] = self.remote_class(remote_spec, self.cas) for project in self.context.get_projects(): remote_specs = self.global_remote_specs diff --git a/src/buildstream/_cas/__init__.py b/src/buildstream/_cas/__init__.py index 46bd9567f..a88e41371 100644 --- a/src/buildstream/_cas/__init__.py +++ b/src/buildstream/_cas/__init__.py @@ -17,5 +17,5 @@ # Authors: # Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> -from .cascache import CASCache, CASQuota, CASCacheUsage +from .cascache import CASCache from .casremote import CASRemote, CASRemoteSpec diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index ff16480b1..e5fc530a7 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -17,22 +17,25 @@ # Authors: # Jürg Billeter <juerg.billeter@codethink.co.uk> -import hashlib import itertools import os import stat import errno -import uuid import contextlib +import shutil +import subprocess +import tempfile +import time import grpc -from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 +from .._protos.google.rpc import code_pb2 +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc +from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc from .._protos.buildstream.v2 import buildstream_pb2 from .. import utils -from .._exceptions import CASCacheError, LoadError, LoadErrorReason -from .._message import Message, MessageType +from .._exceptions import CASCacheError from .casremote import BlobNotFound, _CASBatchRead, _CASBatchUpdate @@ -42,54 +45,86 @@ _BUFFER_SIZE = 65536 CACHE_SIZE_FILE = "cache_size" -# CASCacheUsage -# -# A simple object to report the current CAS cache usage details. -# -# Note that this uses the user configured cache quota -# rather than the internal quota with protective headroom -# removed, to provide a more sensible value to display to -# the user. -# -# Args: -# cas (CASQuota): The CAS cache to get the status of -# -class CASCacheUsage(): - - def __init__(self, casquota): - self.quota_config = casquota._config_cache_quota # Configured quota - self.quota_size = casquota._cache_quota_original # Resolved cache quota in bytes - self.used_size = casquota.get_cache_size() # Size used by artifacts in bytes - self.used_percent = 0 # Percentage of the quota used - if self.quota_size is not None: - self.used_percent = int(self.used_size * 100 / self.quota_size) - - # Formattable into a human readable string - # - def __str__(self): - return "{} / {} ({}%)" \ - .format(utils._pretty_size(self.used_size, dec_places=1), - self.quota_config, - self.used_percent) - - # A CASCache manages a CAS repository as specified in the Remote Execution API. # # Args: # path (str): The root directory for the CAS repository +# casd (bool): True to spawn buildbox-casd (default) or False (testing only) # cache_quota (int): User configured cache quota +# protect_session_blobs (bool): Disable expiry for blobs used in the current session # class CASCache(): - def __init__(self, path): + def __init__(self, path, *, casd=True, cache_quota=None, protect_session_blobs=True): self.casdir = os.path.join(path, 'cas') self.tmpdir = os.path.join(path, 'tmp') os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True) os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True) os.makedirs(self.tmpdir, exist_ok=True) - self.__reachable_directory_callbacks = [] - self.__reachable_digest_callbacks = [] + if casd: + # Place socket in global/user temporary directory to avoid hitting + # the socket path length limit. + self._casd_socket_tempdir = tempfile.mkdtemp(prefix='buildstream') + self._casd_socket_path = os.path.join(self._casd_socket_tempdir, 'casd.sock') + + casd_args = [utils.get_host_tool('buildbox-casd')] + casd_args.append('--bind=unix:' + self._casd_socket_path) + + if cache_quota is not None: + casd_args.append('--quota-high={}'.format(int(cache_quota))) + casd_args.append('--quota-low={}'.format(int(cache_quota / 2))) + + if protect_session_blobs: + casd_args.append('--protect-session-blobs') + + casd_args.append(path) + self._casd_process = subprocess.Popen(casd_args, cwd=path) + self._casd_start_time = time.time() + else: + self._casd_process = None + + self._casd_channel = None + self._local_cas = None + self._fork_disabled = False + + def __getstate__(self): + state = self.__dict__.copy() + + # Popen objects are not pickle-able, however, child processes only + # need the information whether a casd subprocess was started or not. + assert '_casd_process' in state + state['_casd_process'] = bool(self._casd_process) + + return state + + def _get_local_cas(self): + assert self._casd_process, "CASCache was instantiated without buildbox-casd" + + if not self._local_cas: + # gRPC doesn't support fork without exec, which is used in the main process. + assert self._fork_disabled or not utils._is_main_process() + + self._casd_channel = grpc.insecure_channel('unix:' + self._casd_socket_path) + self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel) + + # Call GetCapabilities() to establish connection to casd + capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self._casd_channel) + while True: + try: + capabilities.GetCapabilities(remote_execution_pb2.GetCapabilitiesRequest()) + break + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.UNAVAILABLE: + # casd is not ready yet, try again after a 10ms delay, + # but don't wait for more than 15s + if time.time() < self._casd_start_time + 15: + time.sleep(1 / 100) + continue + + raise + + return self._local_cas # preflight(): # @@ -101,6 +136,39 @@ class CASCache(): if not (os.path.isdir(headdir) and os.path.isdir(objdir)): raise CASCacheError("CAS repository check failed for '{}'".format(self.casdir)) + # notify_fork_disabled(): + # + # Called by Context when fork() is disabled. This will enable communication + # with casd via gRPC in the main process. + # + def notify_fork_disabled(self): + self._fork_disabled = True + + # release_resources(): + # + # Release resources used by CASCache. + # + def release_resources(self, messenger=None): + if self._casd_process: + self._casd_process.terminate() + try: + # Don't print anything if buildbox-casd terminates quickly + self._casd_process.wait(timeout=0.5) + except subprocess.TimeoutExpired: + if messenger: + cm = messenger.timed_activity("Terminating buildbox-casd") + else: + cm = contextlib.suppress() + with cm: + try: + self._casd_process.wait(timeout=15) + except subprocess.TimeoutExpired: + self._casd_process.kill() + self._casd_process.wait(timeout=15) + self._casd_process = None + + shutil.rmtree(self._casd_socket_tempdir) + # contains(): # # Check whether the specified ref is already available in the local CAS cache. @@ -365,13 +433,14 @@ class CASCache(): # path (str): Path to file to add # buffer (bytes): Byte buffer to add # link_directly (bool): Whether file given by path can be linked + # instance_name (str): casd instance_name for remote CAS # # Returns: # (Digest): The digest of the added object # # Either `path` or `buffer` must be passed, but not both. # - def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False): + def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False, instance_name=None): # Exactly one of the two parameters has to be specified assert (path is None) != (buffer is None) @@ -381,42 +450,32 @@ class CASCache(): if digest is None: digest = remote_execution_pb2.Digest() - try: - h = hashlib.sha256() - # Always write out new file to avoid corruption if input file is modified - with contextlib.ExitStack() as stack: - if path is not None and link_directly: - tmp = stack.enter_context(open(path, 'rb')) - for chunk in iter(lambda: tmp.read(_BUFFER_SIZE), b""): - h.update(chunk) - else: - tmp = stack.enter_context(self._temporary_object()) + with contextlib.ExitStack() as stack: + if path is None: + tmp = stack.enter_context(self._temporary_object()) + tmp.write(buffer) + tmp.flush() + path = tmp.name - if path: - with open(path, 'rb') as f: - for chunk in iter(lambda: f.read(_BUFFER_SIZE), b""): - h.update(chunk) - tmp.write(chunk) - else: - h.update(buffer) - tmp.write(buffer) + request = local_cas_pb2.CaptureFilesRequest() + if instance_name: + request.instance_name = instance_name - tmp.flush() + request.path.append(path) - digest.hash = h.hexdigest() - digest.size_bytes = os.fstat(tmp.fileno()).st_size + local_cas = self._get_local_cas() - # Place file at final location - objpath = self.objpath(digest) - os.makedirs(os.path.dirname(objpath), exist_ok=True) - os.link(tmp.name, objpath) + response = local_cas.CaptureFiles(request) - except FileExistsError: - # We can ignore the failed link() if the object is already in the repo. - pass + if len(response.responses) != 1: + raise CASCacheError("Expected 1 response from CaptureFiles, got {}".format(len(response.responses))) - except OSError as e: - raise CASCacheError("Failed to hash object: {}".format(e)) from e + blob_response = response.responses[0] + if blob_response.status.code == code_pb2.RESOURCE_EXHAUSTED: + raise CASCacheError("Cache too full", reason="cache-too-full") + elif blob_response.status.code != code_pb2.OK: + raise CASCacheError("Failed to capture blob {}: {}".format(path, blob_response.status.code)) + digest.CopyFrom(blob_response.digest) return digest @@ -472,41 +531,6 @@ class CASCache(): except FileNotFoundError as e: raise CASCacheError("Attempt to access unavailable ref: {}".format(e)) from e - # list_objects(): - # - # List cached objects in Least Recently Modified (LRM) order. - # - # Returns: - # (list) - A list of objects and timestamps in LRM order - # - def list_objects(self): - objs = [] - mtimes = [] - - for root, _, files in os.walk(os.path.join(self.casdir, 'objects')): - for filename in files: - obj_path = os.path.join(root, filename) - try: - mtimes.append(os.path.getmtime(obj_path)) - except FileNotFoundError: - pass - else: - objs.append(obj_path) - - # NOTE: Sorted will sort from earliest to latest, thus the - # first element of this list will be the file modified earliest. - return sorted(zip(mtimes, objs)) - - def clean_up_refs_until(self, time): - ref_heads = os.path.join(self.casdir, 'refs', 'heads') - - for root, _, files in os.walk(ref_heads): - for filename in files: - ref_path = os.path.join(root, filename) - # Obtain the mtime (the time a file was last modified) - if os.path.getmtime(ref_path) < time: - os.unlink(ref_path) - # remove(): # # Removes the given symbolic ref from the repo. @@ -515,75 +539,14 @@ class CASCache(): # ref (str): A symbolic ref # basedir (str): Path of base directory the ref is in, defaults to # CAS refs heads - # defer_prune (bool): Whether to defer pruning to the caller. NOTE: - # The space won't be freed until you manually - # call prune. # - # Returns: - # (int|None) The amount of space pruned from the repository in - # Bytes, or None if defer_prune is True - # - def remove(self, ref, *, basedir=None, defer_prune=False): + def remove(self, ref, *, basedir=None): if basedir is None: basedir = os.path.join(self.casdir, 'refs', 'heads') # Remove cache ref self._remove_ref(ref, basedir) - if not defer_prune: - pruned = self.prune() - return pruned - - return None - - # adds callback of iterator over reachable directory digests - def add_reachable_directories_callback(self, callback): - self.__reachable_directory_callbacks.append(callback) - - # adds callbacks of iterator over reachable file digests - def add_reachable_digests_callback(self, callback): - self.__reachable_digest_callbacks.append(callback) - - # prune(): - # - # Prune unreachable objects from the repo. - # - def prune(self): - ref_heads = os.path.join(self.casdir, 'refs', 'heads') - - pruned = 0 - reachable = set() - - # Check which objects are reachable - for root, _, files in os.walk(ref_heads): - for filename in files: - ref_path = os.path.join(root, filename) - ref = os.path.relpath(ref_path, ref_heads) - - tree = self.resolve_ref(ref) - self._reachable_refs_dir(reachable, tree) - - # check callback directory digests that are reachable - for digest_callback in self.__reachable_directory_callbacks: - for digest in digest_callback(): - self._reachable_refs_dir(reachable, digest) - - # check callback file digests that are reachable - for digest_callback in self.__reachable_digest_callbacks: - for digest in digest_callback(): - reachable.add(digest.hash) - - # Prune unreachable objects - for root, _, files in os.walk(os.path.join(self.casdir, 'objects')): - for filename in files: - objhash = os.path.basename(root) + filename - if objhash not in reachable: - obj_path = os.path.join(root, filename) - pruned += os.stat(obj_path).st_size - os.unlink(obj_path) - - return pruned - def update_tree_mtime(self, tree): reachable = set() self._reachable_refs_dir(reachable, tree, update_mtime=True) @@ -860,26 +823,13 @@ class CASCache(): # already in local repository return objpath - with self._temporary_object() as f: - remote._fetch_blob(digest, f) - - added_digest = self.add_object(path=f.name, link_directly=True) - assert added_digest.hash == digest.hash + remote._fetch_blob(digest) return objpath - def _batch_download_complete(self, batch, *, missing_blobs=None): - for digest, data in batch.send(missing_blobs=missing_blobs): - with self._temporary_object() as f: - f.write(data) - f.flush() - - added_digest = self.add_object(path=f.name, link_directly=True) - assert added_digest.hash == digest.hash - # Helper function for _fetch_directory(). def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue): - self._batch_download_complete(batch) + batch.send() # All previously scheduled directories are now locally available, # move them to the processing queue. @@ -894,17 +844,8 @@ class CASCache(): if in_local_cache: # Skip download, already in local cache. pass - elif (digest.size_bytes >= remote.max_batch_total_size_bytes or - not remote.batch_read_supported): - # Too large for batch request, download in independent request. - self._ensure_blob(remote, digest) - in_local_cache = True else: - if not batch.add(digest): - # Not enough space left in batch request. - # Complete pending batch first. - batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue) - batch.add(digest) + batch.add(digest) if recursive: if in_local_cache: @@ -955,20 +896,18 @@ class CASCache(): self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue) def _fetch_tree(self, remote, digest): - # download but do not store the Tree object - with utils._tempnamedfile(dir=self.tmpdir) as out: - remote._fetch_blob(digest, out) + objpath = self._ensure_blob(remote, digest) - tree = remote_execution_pb2.Tree() + tree = remote_execution_pb2.Tree() - with open(out.name, 'rb') as f: - tree.ParseFromString(f.read()) + with open(objpath, 'rb') as f: + tree.ParseFromString(f.read()) - tree.children.extend([tree.root]) - for directory in tree.children: - dirbuffer = directory.SerializeToString() - dirdigest = self.add_object(buffer=dirbuffer) - assert dirdigest.size_bytes == len(dirbuffer) + tree.children.extend([tree.root]) + for directory in tree.children: + dirbuffer = directory.SerializeToString() + dirdigest = self.add_object(buffer=dirbuffer) + assert dirdigest.size_bytes == len(dirbuffer) return dirdigest @@ -990,27 +929,9 @@ class CASCache(): batch = _CASBatchRead(remote) for digest in digests: - if (digest.size_bytes >= remote.max_batch_total_size_bytes or - not remote.batch_read_supported): - # Too large for batch request, download in independent request. - try: - self._ensure_blob(remote, digest) - except grpc.RpcError as e: - if e.code() == grpc.StatusCode.NOT_FOUND: - missing_blobs.append(digest) - else: - raise CASCacheError("Failed to fetch blob: {}".format(e)) from e - else: - if not batch.add(digest): - # Not enough space left in batch request. - # Complete pending batch first. - self._batch_download_complete(batch, missing_blobs=missing_blobs) - - batch = _CASBatchRead(remote) - batch.add(digest) + batch.add(digest) - # Complete last pending batch - self._batch_download_complete(batch, missing_blobs=missing_blobs) + batch.send(missing_blobs=missing_blobs) return missing_blobs @@ -1022,390 +943,19 @@ class CASCache(): # remote (CASRemote): The remote repository to upload to # digests (list): The Digests of Blobs to upload # - def send_blobs(self, remote, digests, u_uid=uuid.uuid4()): + def send_blobs(self, remote, digests): batch = _CASBatchUpdate(remote) for digest in digests: - with open(self.objpath(digest), 'rb') as f: - assert os.fstat(f.fileno()).st_size == digest.size_bytes + batch.add(digest) - if (digest.size_bytes >= remote.max_batch_total_size_bytes or - not remote.batch_update_supported): - # Too large for batch request, upload in independent request. - remote._send_blob(digest, f, u_uid=u_uid) - else: - if not batch.add(digest, f): - # Not enough space left in batch request. - # Complete pending batch first. - batch.send() - batch = _CASBatchUpdate(remote) - batch.add(digest, f) - - # Send final batch batch.send() - def _send_directory(self, remote, digest, u_uid=uuid.uuid4()): + def _send_directory(self, remote, digest): missing_blobs = self.remote_missing_blobs_for_directory(remote, digest) # Upload any blobs missing on the server - self.send_blobs(remote, missing_blobs, u_uid) - - -class CASQuota: - def __init__(self, context): - self.context = context - self.cas = context.get_cascache() - self.casdir = self.cas.casdir - self._config_cache_quota = context.config_cache_quota - self._config_cache_quota_string = context.config_cache_quota_string - self._cache_size = None # The current cache size, sometimes it's an estimate - self._cache_quota = None # The cache quota - self._cache_quota_original = None # The cache quota as specified by the user, in bytes - self._cache_quota_headroom = None # The headroom in bytes before reaching the quota or full disk - self._cache_lower_threshold = None # The target cache size for a cleanup - - self._message = context.messenger.message - - self._remove_callbacks = [] # Callbacks to remove unrequired refs and their remove method - self._list_refs_callbacks = [] # Callbacks to all refs - - self._calculate_cache_quota() - - # compute_cache_size() - # - # Computes the real artifact cache size. - # - # Returns: - # (int): The size of the artifact cache. - # - def compute_cache_size(self): - self._cache_size = utils._get_dir_size(self.casdir) - return self._cache_size - - # get_cache_size() - # - # Fetches the cached size of the cache, this is sometimes - # an estimate and periodically adjusted to the real size - # when a cache size calculation job runs. - # - # When it is an estimate, the value is either correct, or - # it is greater than the actual cache size. - # - # Returns: - # (int) An approximation of the artifact cache size, in bytes. - # - def get_cache_size(self): - - # If we don't currently have an estimate, figure out the real cache size. - if self._cache_size is None: - stored_size = self._read_cache_size() - if stored_size is not None: - self._cache_size = stored_size - else: - self.compute_cache_size() - - return self._cache_size - - # set_cache_size() - # - # Forcefully set the overall cache size. - # - # This is used to update the size in the main process after - # having calculated in a cleanup or a cache size calculation job. - # - # Args: - # cache_size (int): The size to set. - # write_to_disk (bool): Whether to write the value to disk. - # - def set_cache_size(self, cache_size, *, write_to_disk=True): - - assert cache_size is not None - - self._cache_size = cache_size - if write_to_disk: - self._write_cache_size(self._cache_size) - - # full() - # - # Checks if the artifact cache is full, either - # because the user configured quota has been exceeded - # or because the underlying disk is almost full. - # - # Returns: - # (bool): True if the artifact cache is full - # - def full(self): - - if self.get_cache_size() > self._cache_quota: - return True - - _, volume_avail = self._get_cache_volume_size() - if volume_avail < self._cache_quota_headroom: - return True - - return False - - # add_remove_callbacks() - # - # This adds tuples of iterators over unrequired objects (currently - # artifacts and source refs), and a callback to remove them. - # - # Args: - # callback (iter(unrequired), remove): tuple of iterator and remove - # method associated. - # - def add_remove_callbacks(self, list_unrequired, remove_method): - self._remove_callbacks.append((list_unrequired, remove_method)) - - def add_list_refs_callback(self, list_callback): - self._list_refs_callbacks.append(list_callback) - - ################################################ - # Local Private Methods # - ################################################ - - # _read_cache_size() - # - # Reads and returns the size of the artifact cache that's stored in the - # cache's size file - # - # Returns: - # (int): The size of the artifact cache, as recorded in the file - # - def _read_cache_size(self): - size_file_path = os.path.join(self.casdir, CACHE_SIZE_FILE) - - if not os.path.exists(size_file_path): - return None - - with open(size_file_path, "r") as f: - size = f.read() - - try: - num_size = int(size) - except ValueError as e: - raise CASCacheError("Size '{}' parsed from '{}' was not an integer".format( - size, size_file_path)) from e - - return num_size - - # _write_cache_size() - # - # Writes the given size of the artifact to the cache's size file - # - # Args: - # size (int): The size of the artifact cache to record - # - def _write_cache_size(self, size): - assert isinstance(size, int) - size_file_path = os.path.join(self.casdir, CACHE_SIZE_FILE) - with utils.save_file_atomic(size_file_path, "w", tempdir=self.cas.tmpdir) as f: - f.write(str(size)) - - # _get_cache_volume_size() - # - # Get the available space and total space for the volume on - # which the artifact cache is located. - # - # Returns: - # (int): The total number of bytes on the volume - # (int): The number of available bytes on the volume - # - # NOTE: We use this stub to allow the test cases - # to override what an artifact cache thinks - # about it's disk size and available bytes. - # - def _get_cache_volume_size(self): - return utils._get_volume_size(self.casdir) - - # _calculate_cache_quota() - # - # Calculates and sets the cache quota and lower threshold based on the - # quota set in Context. - # It checks that the quota is both a valid expression, and that there is - # enough disk space to satisfy that quota - # - def _calculate_cache_quota(self): - # Headroom intended to give BuildStream a bit of leeway. - # This acts as the minimum size of cache_quota and also - # is taken from the user requested cache_quota. - # - if self.context.is_running_in_test_suite: - self._cache_quota_headroom = 0 - else: - self._cache_quota_headroom = 2e9 - - total_size, available_space = self._get_cache_volume_size() - cache_size = self.get_cache_size() - - # Ensure system has enough storage for the cache_quota - # - # If cache_quota is none, set it to the maximum it could possibly be. - # - # Also check that cache_quota is at least as large as our headroom. - # - cache_quota = self._config_cache_quota - if cache_quota is None: - # The user has set no limit, so we may take all the space. - cache_quota = min(cache_size + available_space, total_size) - if cache_quota < self._cache_quota_headroom: # Check minimum - raise LoadError("Invalid cache quota ({}): BuildStream requires a minimum cache quota of {}." - .format(utils._pretty_size(cache_quota), utils._pretty_size(self._cache_quota_headroom)), - LoadErrorReason.INVALID_DATA) - elif cache_quota > total_size: - # A quota greater than the total disk size is certianly an error - raise CASCacheError("Your system does not have enough available " + - "space to support the cache quota specified.", - detail=("You have specified a quota of {quota} total disk space.\n" + - "The filesystem containing {local_cache_path} only " + - "has {total_size} total disk space.") - .format( - quota=self._config_cache_quota, - local_cache_path=self.casdir, - total_size=utils._pretty_size(total_size)), - reason='insufficient-storage-for-quota') - - elif cache_quota > cache_size + available_space: - # The quota does not fit in the available space, this is a warning - if '%' in self._config_cache_quota_string: - available = (available_space / total_size) * 100 - available = '{}% of total disk space'.format(round(available, 1)) - else: - available = utils._pretty_size(available_space) - - self._message(Message( - MessageType.WARN, - "Your system does not have enough available " + - "space to support the cache quota specified.", - detail=("You have specified a quota of {quota} total disk space.\n" + - "The filesystem containing {local_cache_path} only " + - "has {available_size} available.") - .format(quota=self._config_cache_quota, - local_cache_path=self.casdir, - available_size=available))) - - # Place a slight headroom (2e9 (2GB) on the cache_quota) into - # cache_quota to try and avoid exceptions. - # - # Of course, we might still end up running out during a build - # if we end up writing more than 2G, but hey, this stuff is - # already really fuzzy. - # - self._cache_quota_original = cache_quota - self._cache_quota = cache_quota - self._cache_quota_headroom - self._cache_lower_threshold = self._cache_quota / 2 - - # clean(): - # - # Clean the artifact cache as much as possible. - # - # Args: - # progress (callable): A callback to call when a ref is removed - # - # Returns: - # (int): The size of the cache after having cleaned up - # - def clean(self, progress=None): - context = self.context - - # Some accumulative statistics - removed_ref_count = 0 - space_saved = 0 - - total_refs = 0 - for refs in self._list_refs_callbacks: - total_refs += len(list(refs())) - - # Start off with an announcement with as much info as possible - volume_size, volume_avail = self._get_cache_volume_size() - self._message(Message( - MessageType.STATUS, "Starting cache cleanup", - detail=("Elements required by the current build plan:\n" + "{}\n" + - "User specified quota: {} ({})\n" + - "Cache usage: {}\n" + - "Cache volume: {} total, {} available") - .format( - total_refs, - context.config_cache_quota, - utils._pretty_size(self._cache_quota, dec_places=2), - utils._pretty_size(self.get_cache_size(), dec_places=2), - utils._pretty_size(volume_size, dec_places=2), - utils._pretty_size(volume_avail, dec_places=2)))) - - # Do a real computation of the cache size once, just in case - self.compute_cache_size() - usage = CASCacheUsage(self) - self._message(Message(MessageType.STATUS, - "Cache usage recomputed: {}".format(usage))) - - # Collect digests and their remove method - all_unrequired_refs = [] - for (unrequired_refs, remove) in self._remove_callbacks: - for (mtime, ref) in unrequired_refs(): - all_unrequired_refs.append((mtime, ref, remove)) - - # Pair refs and their remove method sorted in time order - all_unrequired_refs = [(ref, remove) for (_, ref, remove) in sorted(all_unrequired_refs)] - - # Go through unrequired refs and remove them, oldest first - made_space = False - for (ref, remove) in all_unrequired_refs: - size = remove(ref) - removed_ref_count += 1 - space_saved += size - - self._message(Message( - MessageType.STATUS, - "Freed {: <7} {}".format( - utils._pretty_size(size, dec_places=2), - ref))) - - self.set_cache_size(self._cache_size - size) - - # User callback - # - # Currently this process is fairly slow, but we should - # think about throttling this progress() callback if this - # becomes too intense. - if progress: - progress() - - if self.get_cache_size() < self._cache_lower_threshold: - made_space = True - break - - if not made_space and self.full(): - # If too many artifacts are required, and we therefore - # can't remove them, we have to abort the build. - # - # FIXME: Asking the user what to do may be neater - # - default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'], - 'buildstream.conf') - detail = ("Aborted after removing {} refs and saving {} disk space.\n" - "The remaining {} in the cache is required by the {} references in your build plan\n\n" - "There is not enough space to complete the build.\n" - "Please increase the cache-quota in {} and/or make more disk space." - .format(removed_ref_count, - utils._pretty_size(space_saved, dec_places=2), - utils._pretty_size(self.get_cache_size(), dec_places=2), - total_refs, - (context.config_origin or default_conf))) - - raise CASCacheError("Cache too full. Aborting.", - detail=detail, - reason="cache-too-full") - - # Informational message about the side effects of the cleanup - self._message(Message( - MessageType.INFO, "Cleanup completed", - detail=("Removed {} refs and saving {} disk space.\n" + - "Cache usage is now: {}") - .format(removed_ref_count, - utils._pretty_size(space_saved, dec_places=2), - utils._pretty_size(self.get_cache_size(), dec_places=2)))) - - return self.get_cache_size() + self.send_blobs(remote, missing_blobs) def _grouper(iterable, n): diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py index 1c7e3152d..ab26d32c7 100644 --- a/src/buildstream/_cas/casremote.py +++ b/src/buildstream/_cas/casremote.py @@ -1,16 +1,14 @@ from collections import namedtuple -import io import os import multiprocessing import signal from urllib.parse import urlparse -import uuid import grpc from .._protos.google.rpc import code_pb2 -from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc +from .._protos.build.buildgrid import local_cas_pb2 from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc from .._exceptions import CASRemoteError, LoadError, LoadErrorReason @@ -77,24 +75,29 @@ class BlobNotFound(CASRemoteError): # Represents a single remote CAS cache. # class CASRemote(): - def __init__(self, spec): + def __init__(self, spec, cascache): self.spec = spec self._initialized = False + self.cascache = cascache self.channel = None self.instance_name = None - self.bytestream = None self.cas = None self.ref_storage = None self.batch_update_supported = None self.batch_read_supported = None self.capabilities = None self.max_batch_total_size_bytes = None + self.local_cas_instance_name = None def init(self): if not self._initialized: # gRPC doesn't support fork without exec, which is used in the main process. assert not utils._is_main_process() + server_cert_bytes = None + client_key_bytes = None + client_cert_bytes = None + url = urlparse(self.spec.url) if url.scheme == 'http': port = url.port or 80 @@ -105,20 +108,14 @@ class CASRemote(): if self.spec.server_cert: with open(self.spec.server_cert, 'rb') as f: server_cert_bytes = f.read() - else: - server_cert_bytes = None if self.spec.client_key: with open(self.spec.client_key, 'rb') as f: client_key_bytes = f.read() - else: - client_key_bytes = None if self.spec.client_cert: with open(self.spec.client_cert, 'rb') as f: client_cert_bytes = f.read() - else: - client_cert_bytes = None credentials = grpc.ssl_channel_credentials(root_certificates=server_cert_bytes, private_key=client_key_bytes, @@ -129,7 +126,6 @@ class CASRemote(): self.instance_name = self.spec.instance_name or None - self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel) self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel) self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel) self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel) @@ -173,6 +169,20 @@ class CASRemote(): e.code() != grpc.StatusCode.PERMISSION_DENIED): raise + local_cas = self.cascache._get_local_cas() + request = local_cas_pb2.GetInstanceNameForRemoteRequest() + request.url = self.spec.url + if self.spec.instance_name: + request.instance_name = self.spec.instance_name + if server_cert_bytes: + request.server_cert = server_cert_bytes + if client_key_bytes: + request.client_key = client_key_bytes + if client_cert_bytes: + request.client_cert = client_cert_bytes + response = local_cas.GetInstanceNameForRemote(request) + self.local_cas_instance_name = response.instance_name + self._initialized = True # check_remote @@ -182,11 +192,11 @@ class CASRemote(): # in the main BuildStream process # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details @classmethod - def check_remote(cls, remote_spec, q): + def check_remote(cls, remote_spec, cascache, q): def __check_remote(): try: - remote = cls(remote_spec) + remote = cls(remote_spec, cascache) remote.init() request = buildstream_pb2.StatusRequest() @@ -235,66 +245,29 @@ class CASRemote(): def push_message(self, message): message_buffer = message.SerializeToString() - message_digest = utils._message_digest(message_buffer) self.init() - with io.BytesIO(message_buffer) as b: - self._send_blob(message_digest, b) - - return message_digest + return self.cascache.add_object(buffer=message_buffer, instance_name=self.local_cas_instance_name) ################################################ # Local Private Methods # ################################################ - def _fetch_blob(self, digest, stream): - if self.instance_name: - resource_name = '/'.join([self.instance_name, 'blobs', - digest.hash, str(digest.size_bytes)]) - else: - resource_name = '/'.join(['blobs', - digest.hash, str(digest.size_bytes)]) - - request = bytestream_pb2.ReadRequest() - request.resource_name = resource_name - request.read_offset = 0 - for response in self.bytestream.Read(request): - stream.write(response.data) - stream.flush() - - assert digest.size_bytes == os.fstat(stream.fileno()).st_size - - def _send_blob(self, digest, stream, u_uid=uuid.uuid4()): - if self.instance_name: - resource_name = '/'.join([self.instance_name, 'uploads', str(u_uid), 'blobs', - digest.hash, str(digest.size_bytes)]) - else: - resource_name = '/'.join(['uploads', str(u_uid), 'blobs', - digest.hash, str(digest.size_bytes)]) - - def request_stream(resname, instream): - offset = 0 - finished = False - remaining = digest.size_bytes - while not finished: - chunk_size = min(remaining, _MAX_PAYLOAD_BYTES) - remaining -= chunk_size - - request = bytestream_pb2.WriteRequest() - request.write_offset = offset - # max. _MAX_PAYLOAD_BYTES chunks - request.data = instream.read(chunk_size) - request.resource_name = resname - request.finish_write = remaining <= 0 - - yield request - - offset += chunk_size - finished = request.finish_write - - response = self.bytestream.Write(request_stream(resource_name, stream)) - - assert response.committed_size == digest.size_bytes + def _fetch_blob(self, digest): + local_cas = self.cascache._get_local_cas() + request = local_cas_pb2.FetchMissingBlobsRequest() + request.instance_name = self.local_cas_instance_name + request_digest = request.blob_digests.add() + request_digest.CopyFrom(digest) + response = local_cas.FetchMissingBlobs(request) + for blob_response in response.responses: + if blob_response.status.code == code_pb2.NOT_FOUND: + raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format( + blob_response.digest.hash, blob_response.status.code)) + + if blob_response.status.code != code_pb2.OK: + raise CASRemoteError("Failed to download blob {}: {}".format( + blob_response.digest.hash, blob_response.status.code)) # Represents a batch of blobs queued for fetching. @@ -302,35 +275,25 @@ class CASRemote(): class _CASBatchRead(): def __init__(self, remote): self._remote = remote - self._max_total_size_bytes = remote.max_batch_total_size_bytes - self._request = remote_execution_pb2.BatchReadBlobsRequest() - if remote.instance_name: - self._request.instance_name = remote.instance_name - self._size = 0 + self._request = local_cas_pb2.FetchMissingBlobsRequest() + self._request.instance_name = remote.local_cas_instance_name self._sent = False def add(self, digest): assert not self._sent - new_batch_size = self._size + digest.size_bytes - if new_batch_size > self._max_total_size_bytes: - # Not enough space left in current batch - return False - - request_digest = self._request.digests.add() - request_digest.hash = digest.hash - request_digest.size_bytes = digest.size_bytes - self._size = new_batch_size - return True + request_digest = self._request.blob_digests.add() + request_digest.CopyFrom(digest) def send(self, *, missing_blobs=None): assert not self._sent self._sent = True - if not self._request.digests: + if not self._request.blob_digests: return - batch_response = self._remote.cas.BatchReadBlobs(self._request) + local_cas = self._remote.cascache._get_local_cas() + batch_response = local_cas.FetchMissingBlobs(self._request) for response in batch_response.responses: if response.status.code == code_pb2.NOT_FOUND: @@ -347,46 +310,38 @@ class _CASBatchRead(): raise CASRemoteError("Failed to download blob {}: expected {} bytes, received {} bytes".format( response.digest.hash, response.digest.size_bytes, len(response.data))) - yield (response.digest, response.data) - # Represents a batch of blobs queued for upload. # class _CASBatchUpdate(): def __init__(self, remote): self._remote = remote - self._max_total_size_bytes = remote.max_batch_total_size_bytes - self._request = remote_execution_pb2.BatchUpdateBlobsRequest() - if remote.instance_name: - self._request.instance_name = remote.instance_name - self._size = 0 + self._request = local_cas_pb2.UploadMissingBlobsRequest() + self._request.instance_name = remote.local_cas_instance_name self._sent = False - def add(self, digest, stream): + def add(self, digest): assert not self._sent - new_batch_size = self._size + digest.size_bytes - if new_batch_size > self._max_total_size_bytes: - # Not enough space left in current batch - return False - - blob_request = self._request.requests.add() - blob_request.digest.hash = digest.hash - blob_request.digest.size_bytes = digest.size_bytes - blob_request.data = stream.read(digest.size_bytes) - self._size = new_batch_size - return True + request_digest = self._request.blob_digests.add() + request_digest.CopyFrom(digest) def send(self): assert not self._sent self._sent = True - if not self._request.requests: + if not self._request.blob_digests: return - batch_response = self._remote.cas.BatchUpdateBlobs(self._request) + local_cas = self._remote.cascache._get_local_cas() + batch_response = local_cas.UploadMissingBlobs(self._request) for response in batch_response.responses: if response.status.code != code_pb2.OK: + if response.status.code == code_pb2.RESOURCE_EXHAUSTED: + reason = "cache-too-full" + else: + reason = None + raise CASRemoteError("Failed to upload blob {}: {}".format( - response.digest.hash, response.status.code)) + response.digest.hash, response.status.code), reason=reason) diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py index 9606c263b..5a4c2b7ac 100644 --- a/src/buildstream/_cas/casserver.py +++ b/src/buildstream/_cas/casserver.py @@ -18,14 +18,13 @@ # Jürg Billeter <juerg.billeter@codethink.co.uk> from concurrent import futures -import logging +from contextlib import contextmanager import os import signal import sys import tempfile import uuid import errno -import threading import grpc from google.protobuf.message import DecodeError @@ -38,7 +37,7 @@ from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \ artifact_pb2, artifact_pb2_grpc, source_pb2, source_pb2_grpc from .. import utils -from .._exceptions import CASError +from .._exceptions import CASError, CASCacheError from .cascache import CASCache @@ -48,11 +47,6 @@ from .cascache import CASCache _MAX_PAYLOAD_BYTES = 1024 * 1024 -# Trying to push an artifact that is too large -class ArtifactTooLargeException(Exception): - pass - - # create_server(): # # Create gRPC CAS artifact server as specified in the Remote Execution API. @@ -61,47 +55,49 @@ class ArtifactTooLargeException(Exception): # repo (str): Path to CAS repository # enable_push (bool): Whether to allow blob uploads and artifact updates # -def create_server(repo, *, enable_push, - max_head_size=int(10e9), - min_head_size=int(2e9)): - cas = CASCache(os.path.abspath(repo)) - artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs') - sourcedir = os.path.join(os.path.abspath(repo), 'source_protos') +@contextmanager +def create_server(repo, *, enable_push, quota): + cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False) + + try: + artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs') + sourcedir = os.path.join(os.path.abspath(repo), 'source_protos') - # Use max_workers default from Python 3.5+ - max_workers = (os.cpu_count() or 1) * 5 - server = grpc.server(futures.ThreadPoolExecutor(max_workers)) + # Use max_workers default from Python 3.5+ + max_workers = (os.cpu_count() or 1) * 5 + server = grpc.server(futures.ThreadPoolExecutor(max_workers)) - cache_cleaner = _CacheCleaner(cas, max_head_size, min_head_size) + bytestream_pb2_grpc.add_ByteStreamServicer_to_server( + _ByteStreamServicer(cas, enable_push=enable_push), server) - bytestream_pb2_grpc.add_ByteStreamServicer_to_server( - _ByteStreamServicer(cas, cache_cleaner, enable_push=enable_push), server) + remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server( + _ContentAddressableStorageServicer(cas, enable_push=enable_push), server) - remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server( - _ContentAddressableStorageServicer(cas, cache_cleaner, enable_push=enable_push), server) + remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server( + _CapabilitiesServicer(), server) - remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server( - _CapabilitiesServicer(), server) + buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server( + _ReferenceStorageServicer(cas, enable_push=enable_push), server) - buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server( - _ReferenceStorageServicer(cas, enable_push=enable_push), server) + artifact_pb2_grpc.add_ArtifactServiceServicer_to_server( + _ArtifactServicer(cas, artifactdir), server) - artifact_pb2_grpc.add_ArtifactServiceServicer_to_server( - _ArtifactServicer(cas, artifactdir), server) + source_pb2_grpc.add_SourceServiceServicer_to_server( + _SourceServicer(sourcedir), server) - source_pb2_grpc.add_SourceServiceServicer_to_server( - _SourceServicer(sourcedir), server) + # Create up reference storage and artifact capabilities + artifact_capabilities = buildstream_pb2.ArtifactCapabilities( + allow_updates=enable_push) + source_capabilities = buildstream_pb2.SourceCapabilities( + allow_updates=enable_push) + buildstream_pb2_grpc.add_CapabilitiesServicer_to_server( + _BuildStreamCapabilitiesServicer(artifact_capabilities, source_capabilities), + server) - # Create up reference storage and artifact capabilities - artifact_capabilities = buildstream_pb2.ArtifactCapabilities( - allow_updates=enable_push) - source_capabilities = buildstream_pb2.SourceCapabilities( - allow_updates=enable_push) - buildstream_pb2_grpc.add_CapabilitiesServicer_to_server( - _BuildStreamCapabilitiesServicer(artifact_capabilities, source_capabilities), - server) + yield server - return server + finally: + cas.release_resources() @click.command(short_help="CAS Artifact Server") @@ -111,65 +107,65 @@ def create_server(repo, *, enable_push, @click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)") @click.option('--enable-push', default=False, is_flag=True, help="Allow clients to upload blobs and update artifact cache") -@click.option('--head-room-min', type=click.INT, - help="Disk head room minimum in bytes", - default=2e9) -@click.option('--head-room-max', type=click.INT, - help="Disk head room maximum in bytes", +@click.option('--quota', type=click.INT, + help="Maximum disk usage in bytes", default=10e9) @click.argument('repo') def server_main(repo, port, server_key, server_cert, client_certs, enable_push, - head_room_min, head_room_max): - server = create_server(repo, - max_head_size=head_room_max, - min_head_size=head_room_min, - enable_push=enable_push) - - use_tls = bool(server_key) - - if bool(server_cert) != use_tls: - click.echo("ERROR: --server-key and --server-cert are both required for TLS", err=True) - sys.exit(-1) - - if client_certs and not use_tls: - click.echo("ERROR: --client-certs can only be used with --server-key", err=True) - sys.exit(-1) - - if use_tls: - # Read public/private key pair - with open(server_key, 'rb') as f: - server_key_bytes = f.read() - with open(server_cert, 'rb') as f: - server_cert_bytes = f.read() - - if client_certs: - with open(client_certs, 'rb') as f: - client_certs_bytes = f.read() - else: - client_certs_bytes = None + quota): + # Handle SIGTERM by calling sys.exit(0), which will raise a SystemExit exception, + # properly executing cleanup code in `finally` clauses and context managers. + # This is required to terminate buildbox-casd on SIGTERM. + signal.signal(signal.SIGTERM, lambda signalnum, frame: sys.exit(0)) + + with create_server(repo, + quota=quota, + enable_push=enable_push) as server: + + use_tls = bool(server_key) + + if bool(server_cert) != use_tls: + click.echo("ERROR: --server-key and --server-cert are both required for TLS", err=True) + sys.exit(-1) + + if client_certs and not use_tls: + click.echo("ERROR: --client-certs can only be used with --server-key", err=True) + sys.exit(-1) + + if use_tls: + # Read public/private key pair + with open(server_key, 'rb') as f: + server_key_bytes = f.read() + with open(server_cert, 'rb') as f: + server_cert_bytes = f.read() + + if client_certs: + with open(client_certs, 'rb') as f: + client_certs_bytes = f.read() + else: + client_certs_bytes = None - credentials = grpc.ssl_server_credentials([(server_key_bytes, server_cert_bytes)], - root_certificates=client_certs_bytes, - require_client_auth=bool(client_certs)) - server.add_secure_port('[::]:{}'.format(port), credentials) - else: - server.add_insecure_port('[::]:{}'.format(port)) + credentials = grpc.ssl_server_credentials([(server_key_bytes, server_cert_bytes)], + root_certificates=client_certs_bytes, + require_client_auth=bool(client_certs)) + server.add_secure_port('[::]:{}'.format(port), credentials) + else: + server.add_insecure_port('[::]:{}'.format(port)) - # Run artifact server - server.start() - try: - while True: - signal.pause() - except KeyboardInterrupt: - server.stop(0) + # Run artifact server + server.start() + try: + while True: + signal.pause() + finally: + server.stop(0) class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): - def __init__(self, cas, cache_cleaner, *, enable_push): + def __init__(self, cas, *, enable_push): super().__init__() self.cas = cas self.enable_push = enable_push - self.cache_cleaner = cache_cleaner def Read(self, request, context): resource_name = request.resource_name @@ -188,6 +184,8 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): context.set_code(grpc.StatusCode.NOT_FOUND) return + os.utime(f.fileno()) + if request.read_offset > 0: f.seek(request.read_offset) @@ -230,12 +228,6 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): while True: if client_digest.size_bytes == 0: break - try: - self.cache_cleaner.clean_up(client_digest.size_bytes) - except ArtifactTooLargeException as e: - context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED) - context.set_details(str(e)) - return response try: os.posix_fallocate(out.fileno(), 0, client_digest.size_bytes) @@ -262,10 +254,20 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): context.set_code(grpc.StatusCode.FAILED_PRECONDITION) return response out.flush() - digest = self.cas.add_object(path=out.name, link_directly=True) + + try: + digest = self.cas.add_object(path=out.name, link_directly=True) + except CASCacheError as e: + if e.reason == "cache-too-full": + context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED) + else: + context.set_code(grpc.StatusCode.INTERNAL) + return response + if digest.hash != client_digest.hash: context.set_code(grpc.StatusCode.FAILED_PRECONDITION) return response + finished = True assert finished @@ -275,11 +277,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer): - def __init__(self, cas, cache_cleaner, *, enable_push): + def __init__(self, cas, *, enable_push): super().__init__() self.cas = cas self.enable_push = enable_push - self.cache_cleaner = cache_cleaner def FindMissingBlobs(self, request, context): response = remote_execution_pb2.FindMissingBlobsResponse() @@ -311,11 +312,14 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres blob_response.digest.hash = digest.hash blob_response.digest.size_bytes = digest.size_bytes try: - with open(self.cas.objpath(digest), 'rb') as f: + objpath = self.cas.objpath(digest) + with open(objpath, 'rb') as f: if os.fstat(f.fileno()).st_size != digest.size_bytes: blob_response.status.code = code_pb2.NOT_FOUND continue + os.utime(f.fileno()) + blob_response.data = f.read(digest.size_bytes) except FileNotFoundError: blob_response.status.code = code_pb2.NOT_FOUND @@ -347,18 +351,21 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres blob_response.status.code = code_pb2.FAILED_PRECONDITION continue - try: - self.cache_cleaner.clean_up(digest.size_bytes) + with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out: + out.write(blob_request.data) + out.flush() - with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out: - out.write(blob_request.data) - out.flush() + try: server_digest = self.cas.add_object(path=out.name) - if server_digest.hash != digest.hash: - blob_response.status.code = code_pb2.FAILED_PRECONDITION + except CASCacheError as e: + if e.reason == "cache-too-full": + blob_response.status.code = code_pb2.RESOURCE_EXHAUSTED + else: + blob_response.status.code = code_pb2.INTERNAL + continue - except ArtifactTooLargeException: - blob_response.status.code = code_pb2.RESOURCE_EXHAUSTED + if server_digest.hash != digest.hash: + blob_response.status.code = code_pb2.FAILED_PRECONDITION return response @@ -394,7 +401,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): try: self.cas.update_tree_mtime(tree) except FileNotFoundError: - self.cas.remove(request.key, defer_prune=True) + self.cas.remove(request.key) context.set_code(grpc.StatusCode.NOT_FOUND) return response @@ -602,81 +609,3 @@ def _digest_from_upload_resource_name(resource_name): return digest except ValueError: return None - - -class _CacheCleaner: - - __cleanup_cache_lock = threading.Lock() - - def __init__(self, cas, max_head_size, min_head_size=int(2e9)): - self.__cas = cas - self.__max_head_size = max_head_size - self.__min_head_size = min_head_size - - def __has_space(self, object_size): - stats = os.statvfs(self.__cas.casdir) - free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size - total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size - - if object_size > total_disk_space: - raise ArtifactTooLargeException("Artifact of size: {} is too large for " - "the filesystem which mounts the remote " - "cache".format(object_size)) - - return object_size <= free_disk_space - - # _clean_up_cache() - # - # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there - # is enough space for the incoming artifact - # - # Args: - # object_size: The size of the object being received in bytes - # - # Returns: - # int: The total bytes removed on the filesystem - # - def clean_up(self, object_size): - if self.__has_space(object_size): - return 0 - - with _CacheCleaner.__cleanup_cache_lock: - if self.__has_space(object_size): - # Another thread has done the cleanup for us - return 0 - - stats = os.statvfs(self.__cas.casdir) - target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size - - # obtain a list of LRP artifacts - LRP_objects = self.__cas.list_objects() - - removed_size = 0 # in bytes - last_mtime = 0 - - while object_size - removed_size > target_disk_space: - try: - last_mtime, to_remove = LRP_objects.pop(0) # The first element in the list is the LRP artifact - except IndexError: - # This exception is caught if there are no more artifacts in the list - # LRP_artifacts. This means the the artifact is too large for the filesystem - # so we abort the process - raise ArtifactTooLargeException("Artifact of size {} is too large for " - "the filesystem which mounts the remote " - "cache".format(object_size)) - - try: - size = os.stat(to_remove).st_size - os.unlink(to_remove) - removed_size += size - except FileNotFoundError: - pass - - self.__cas.clean_up_refs_until(last_mtime) - - if removed_size > 0: - logging.info("Successfully removed %d bytes from the cache", removed_size) - else: - logging.info("No artifacts were removed from the cache.") - - return removed_size diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py index c1a3b0619..c6cde4003 100644 --- a/src/buildstream/_context.py +++ b/src/buildstream/_context.py @@ -28,7 +28,7 @@ from ._profile import Topics, PROFILER from ._platform import Platform from ._artifactcache import ArtifactCache from ._sourcecache import SourceCache -from ._cas import CASCache, CASQuota, CASCacheUsage +from ._cas import CASCache from .types import _CacheBuildTrees, _SchedulerErrorAction from ._workspaces import Workspaces, WorkspaceProjectCache from .node import Node @@ -150,6 +150,8 @@ class Context(): # Whether file contents are required for all artifacts in the local cache self.require_artifact_files = True + self.fork_allowed = True + # Whether elements must be rebuilt when their dependencies have changed self._strict_build_plan = None @@ -167,7 +169,6 @@ class Context(): self._workspaces = None self._workspace_project_cache = WorkspaceProjectCache() self._cascache = None - self._casquota = None # __enter__() # @@ -181,7 +182,8 @@ class Context(): # Called when exiting the with-statement context. # def __exit__(self, exc_type, exc_value, traceback): - return None + if self._cascache: + self._cascache.release_resources(self.messenger) # load() # @@ -358,16 +360,6 @@ class Context(): return self._artifactcache - # get_cache_usage() - # - # Fetches the current usage of the artifact cache - # - # Returns: - # (CASCacheUsage): The current status - # - def get_cache_usage(self): - return CASCacheUsage(self.get_casquota()) - @property def sourcecache(self): if not self._sourcecache: @@ -495,10 +487,15 @@ class Context(): def get_cascache(self): if self._cascache is None: - self._cascache = CASCache(self.cachedir) + self._cascache = CASCache(self.cachedir, cache_quota=self.config_cache_quota) return self._cascache - def get_casquota(self): - if self._casquota is None: - self._casquota = CASQuota(self) - return self._casquota + # disable_fork(): + # + # This will prevent the scheduler from running but will allow communication + # with casd in the main process. + # + def disable_fork(self): + self.fork_allowed = False + cascache = self.get_cascache() + cascache.notify_fork_disabled() diff --git a/src/buildstream/_frontend/cli.py b/src/buildstream/_frontend/cli.py index a77bd80e8..ecfea05bb 100644 --- a/src/buildstream/_frontend/cli.py +++ b/src/buildstream/_frontend/cli.py @@ -1242,14 +1242,12 @@ def artifact_list_contents(app, artifacts): # Artifact Delete Command # ################################################################### @artifact.command(name='delete', short_help="Remove artifacts from the local cache") -@click.option('--no-prune', 'no_prune', default=False, is_flag=True, - help="Do not prune the local cache of unreachable refs") @click.argument('artifacts', type=click.Path(), nargs=-1) @click.pass_obj -def artifact_delete(app, artifacts, no_prune): +def artifact_delete(app, artifacts): """Remove artifacts from the local cache""" with app.initialized(): - app.stream.artifact_delete(artifacts, no_prune) + app.stream.artifact_delete(artifacts) ################################################################## diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py index 38e388818..d0070cef0 100644 --- a/src/buildstream/_frontend/status.py +++ b/src/buildstream/_frontend/status.py @@ -350,7 +350,7 @@ class _StatusHeader(): # # Public members # - self.lines = 3 + self.lines = 2 # # Private members @@ -413,31 +413,7 @@ class _StatusHeader(): line2 = self._centered(text, size, line_length, ' ') - # - # Line 3: Cache usage percentage report - # - # ~~~~~~ cache: 69% ~~~~~~ - # - usage = self._context.get_cache_usage() - usage_percent = '{}%'.format(usage.used_percent) - - size = 21 - size += len(usage_percent) - if usage.used_percent >= 95: - formatted_usage_percent = self._error_profile.fmt(usage_percent) - elif usage.used_percent >= 80: - formatted_usage_percent = self._content_profile.fmt(usage_percent) - else: - formatted_usage_percent = self._success_profile.fmt(usage_percent) - - text = self._format_profile.fmt("~~~~~~ ") + \ - self._content_profile.fmt('cache') + \ - self._format_profile.fmt(': ') + \ - formatted_usage_percent + \ - self._format_profile.fmt(' ~~~~~~') - line3 = self._centered(text, size, line_length, ' ') - - return line1 + '\n' + line2 + '\n' + line3 + return line1 + '\n' + line2 ################################################### # Private Methods # diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py index 20f5d1767..955680f9b 100644 --- a/src/buildstream/_frontend/widget.py +++ b/src/buildstream/_frontend/widget.py @@ -475,7 +475,6 @@ class LogLine(Widget): values["Session Start"] = starttime.strftime('%A, %d-%m-%Y at %H:%M:%S') values["Project"] = "{} ({})".format(project.name, project.directory) values["Targets"] = ", ".join([t.name for t in stream.targets]) - values["Cache Usage"] = str(context.get_cache_usage()) text += self._format_values(values) # User configurations diff --git a/src/buildstream/_protos/build/buildgrid/__init__.py b/src/buildstream/_protos/build/buildgrid/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/buildstream/_protos/build/buildgrid/__init__.py diff --git a/src/buildstream/_protos/build/buildgrid/local_cas.proto b/src/buildstream/_protos/build/buildgrid/local_cas.proto new file mode 100644 index 000000000..671caef0e --- /dev/null +++ b/src/buildstream/_protos/build/buildgrid/local_cas.proto @@ -0,0 +1,373 @@ +// Copyright (C) 2018-2019 Bloomberg LP +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package build.buildgrid; + +import "build/bazel/remote/execution/v2/remote_execution.proto"; +import "google/api/annotations.proto"; +import "google/rpc/status.proto"; + +service LocalContentAddressableStorage { + // Fetch blobs from a remote CAS to the local cache. + // + // This request is equivalent to ByteStream `Read` or `BatchReadBlobs` + // requests, storing the downloaded blobs in the local cache. + // + // Requested blobs that failed to be downloaded will be listed in the + // response. + // + // Errors: + // * `INVALID_ARGUMENT`: The client attempted to download more than the + // server supported limit. + // + // Individual requests may return the following error, additionally: + // * `NOT_FOUND`: The requested blob is not present in the remote CAS. + rpc FetchMissingBlobs(FetchMissingBlobsRequest) returns (FetchMissingBlobsResponse) {} + + // Upload blobs from the local cache to a remote CAS. + // + // This request is equivalent to `FindMissingBlobs` followed by + // ByteStream `Write` or `BatchUpdateBlobs` requests. + // + // Blobs that failed to be uploaded will be listed in the response. + // + // Errors: + // * `INVALID_ARGUMENT`: The client attempted to upload more than the + // server supported limit. + // + // Individual requests may return the following error, additionally: + // * `NOT_FOUND`: The requested blob is not present in the local cache. + // * `RESOURCE_EXHAUSTED`: There is insufficient disk quota to store the blob. + rpc UploadMissingBlobs(UploadMissingBlobsRequest) returns (UploadMissingBlobsResponse) {} + + // Fetch the entire directory tree rooted at a node from a remote CAS to the + // local cache. + // + // This request is equivalent to `GetTree`, storing the `Directory` objects + // in the local cache. Optionally, this will also fetch all blobs referenced + // by the `Directory` objects, equivalent to `FetchMissingBlobs`. + // + // If part of the tree is missing from the CAS, the server will return the + // portion present and omit the rest. + // + // * `NOT_FOUND`: The requested tree root is not present in the CAS. + rpc FetchTree(FetchTreeRequest) returns (FetchTreeResponse) {} + + // Upload the entire directory tree from the local cache to a remote CAS. + // + // This request is equivalent to `UploadMissingBlobs` for all blobs + // referenced by the specified tree (recursively). + // + // Errors: + // * `NOT_FOUND`: The requested tree root is not present in the local cache. + // * `RESOURCE_EXHAUSTED`: There is insufficient disk quota to store the tree. + rpc UploadTree(UploadTreeRequest) returns (UploadTreeResponse) {} + + // Stage a directory tree in the local filesystem. + // + // This makes the specified directory tree temporarily available for local + // filesystem access. It is implementation-defined whether this uses a + // userspace filesystem such as FUSE, hardlinking or a full copy. + // + // Missing blobs are fetched, if a CAS remote is configured. + // + // The staging starts when the server receives the initial request and + // it is ready to be used on the initial (non-error) response from the + // server. + // + // The server will clean up the staged directory when it either + // receives an additional request (with all fields unset) or when the + // stream is closed. The server will send an additional response after + // cleanup is complete. + rpc StageTree(stream StageTreeRequest) returns (stream StageTreeResponse) {} + + // Capture a directory tree from the local filesystem. + // + // This imports the specified path from the local filesystem into CAS. + // + // If a CAS remote is configured, the blobs are uploaded. + // The `bypass_local_cache` parameter is a hint to indicate whether the blobs + // shall be uploaded without first storing them in the local cache. + rpc CaptureTree(CaptureTreeRequest) returns (CaptureTreeResponse) {} + + // Capture files from the local filesystem. + // + // This imports the specified paths from the local filesystem into CAS. + // + // If a CAS remote is configured, the blobs are uploaded. + // The `bypass_local_cache` parameter is a hint to indicate whether the blobs + // shall be uploaded without first storing them in the local cache. + rpc CaptureFiles(CaptureFilesRequest) returns (CaptureFilesResponse) {} + + // Configure remote CAS endpoint. + // + // This returns a string that can be used as instance_name to access the + // specified endpoint in further requests. + rpc GetInstanceNameForRemote(GetInstanceNameForRemoteRequest) returns (GetInstanceNameForRemoteResponse) {} +} + +// A request message for +// [LocalContentAddressableStorage.FetchMissingBlobs][build.buildgrid.v2.LocalContentAddressableStorage.FetchMissingBlobs]. +message FetchMissingBlobsRequest { + // The instance of the execution system to operate against. A server may + // support multiple instances of the execution system (with their own workers, + // storage, caches, etc.). The server MAY require use of this field to select + // between them in an implementation-defined fashion, otherwise it can be + // omitted. + string instance_name = 1; + + // A list of the blobs to fetch. + repeated build.bazel.remote.execution.v2.Digest blob_digests = 2; +} + +// A response message for +// [LocalContentAddressableStorage.FetchMissingBlobs][build.buildgrid.v2.LocalContentAddressableStorage.FetchMissingBlobs]. +message FetchMissingBlobsResponse { + // A response corresponding to a single blob that the client tried to upload. + message Response { + // The digest to which this response corresponds. + build.bazel.remote.execution.v2.Digest digest = 1; + + // The result of attempting to download that blob. + google.rpc.Status status = 2; + } + + // The responses to the failed requests. + repeated Response responses = 1; +} + +// A request message for +// [LocalContentAddressableStorage.UploadMissingBlobs][build.buildgrid.v2.LocalContentAddressableStorage.UploadMissingBlobs]. +message UploadMissingBlobsRequest { + // The instance of the execution system to operate against. A server may + // support multiple instances of the execution system (with their own workers, + // storage, caches, etc.). The server MAY require use of this field to select + // between them in an implementation-defined fashion, otherwise it can be + // omitted. + string instance_name = 1; + + // A list of the blobs to fetch. + repeated build.bazel.remote.execution.v2.Digest blob_digests = 2; +} + +// A response message for +// [LocalContentAddressableStorage.UploadMissingBlobs][build.buildgrid.v2.LocalContentAddressableStorage.UploadMissingBlobs]. +message UploadMissingBlobsResponse { + // A response corresponding to a single blob that the client tried to upload. + message Response { + // The digest to which this response corresponds. + build.bazel.remote.execution.v2.Digest digest = 1; + + // The result of attempting to upload that blob. + google.rpc.Status status = 2; + } + + // The responses to the failed requests. + repeated Response responses = 1; +} + +// A request message for +// [LocalContentAddressableStorage.FetchTree][build.buildgrid.v2.LocalContentAddressableStorage.FetchTree]. +message FetchTreeRequest { + // The instance of the execution system to operate against. A server may + // support multiple instances of the execution system (with their own workers, + // storage, caches, etc.). The server MAY require use of this field to select + // between them in an implementation-defined fashion, otherwise it can be + // omitted. + string instance_name = 1; + + // The digest of the root, which must be an encoded + // [Directory][build.bazel.remote.execution.v2.Directory] message + // stored in the + // [ContentAddressableStorage][build.bazel.remote.execution.v2.ContentAddressableStorage]. + build.bazel.remote.execution.v2.Digest root_digest = 2; + + // Whether to fetch blobs of files in the directory tree. + bool fetch_file_blobs = 3; +} + +// A response message for +// [LocalContentAddressableStorage.FetchTree][build.buildgrid.v2.LocalContentAddressableStorage.FetchTree]. +message FetchTreeResponse { +} + +// A request message for +// [LocalContentAddressableStorage.UploadTree][build.buildgrid.v2.LocalContentAddressableStorage.UploadTree]. +message UploadTreeRequest { + // The instance of the execution system to operate against. A server may + // support multiple instances of the execution system (with their own workers, + // storage, caches, etc.). The server MAY require use of this field to select + // between them in an implementation-defined fashion, otherwise it can be + // omitted. + string instance_name = 1; + + // The digest of the root, which must be an encoded + // [Directory][build.bazel.remote.execution.v2.Directory] message + // stored in the + // [ContentAddressableStorage][build.bazel.remote.execution.v2.ContentAddressableStorage]. + build.bazel.remote.execution.v2.Digest root_digest = 2; +} + +// A response message for +// [LocalContentAddressableStorage.UploadTree][build.buildgrid.v2.LocalContentAddressableStorage.UploadTree]. +message UploadTreeResponse { +} + +// A request message for +// [LocalContentAddressableStorage.StageTree][build.buildgrid.v2.LocalContentAddressableStorage.StageTree]. +message StageTreeRequest { + // The instance of the execution system to operate against. A server may + // support multiple instances of the execution system (with their own workers, + // storage, caches, etc.). The server MAY require use of this field to select + // between them in an implementation-defined fashion, otherwise it can be + // omitted. + string instance_name = 1; + + // The digest of the root, which must be an encoded + // [Directory][build.bazel.remote.execution.v2.Directory] message + // stored in the + // [ContentAddressableStorage][build.bazel.remote.execution.v2.ContentAddressableStorage]. + build.bazel.remote.execution.v2.Digest root_digest = 2; + + // The path in the local filesystem where the specified tree should be + // staged. It must either point to an empty directory or not exist yet. + // + // This is optional. If not specified, the server will choose a location. + // The server may require the path to be on the same filesystem as the local + // cache to support hardlinking. + // + // The path may be a subdirectory of another staged tree. The lifetime of + // this staged tree will in that case be limited to the lifetime of the + // parent. + string path = 3; +} + +// A response message for +// [LocalContentAddressableStorage.StageTree][build.buildgrid.v2.LocalContentAddressableStorage.StageTree]. +message StageTreeResponse { + // The path in the local filesystem where the specified tree has been staged. + string path = 1; +} + +// A request message for +// [LocalContentAddressableStorage.CaptureTree][build.buildgrid.v2.LocalContentAddressableStorage.CaptureTree]. +message CaptureTreeRequest { + // The instance of the execution system to operate against. A server may + // support multiple instances of the execution system (with their own workers, + // storage, caches, etc.). The server MAY require use of this field to select + // between them in an implementation-defined fashion, otherwise it can be + // omitted. + string instance_name = 1; + + // The path(s) in the local filesystem to capture. + repeated string path = 2; + + // This is a hint whether the blobs shall be uploaded to the remote CAS + // without first storing them in the local cache. + bool bypass_local_cache = 3; +} + +// A response message for +// [LocalContentAddressableStorage.CaptureTree][build.buildgrid.v2.LocalContentAddressableStorage.CaptureTree]. +message CaptureTreeResponse { + // A response corresponding to a single blob that the client tried to upload. + message Response { + // The path to which this response corresponds. + string path = 1; + + // The digest of the captured tree as an encoded + // [Tree][build.bazel.remote.execution.v2.Tree] proto containing the + // directory's contents, if successful. + build.bazel.remote.execution.v2.Digest tree_digest = 2; + + // The result of attempting to capture and upload the tree. + google.rpc.Status status = 3; + } + + // The responses to the requests. + repeated Response responses = 1; +} + +// A request message for +// [LocalContentAddressableStorage.CaptureFiles][build.buildgrid.v2.LocalContentAddressableStorage.CaptureFiles]. +message CaptureFilesRequest { + // The instance of the execution system to operate against. A server may + // support multiple instances of the execution system (with their own workers, + // storage, caches, etc.). The server MAY require use of this field to select + // between them in an implementation-defined fashion, otherwise it can be + // omitted. + string instance_name = 1; + + // The path(s) in the local filesystem to capture. + repeated string path = 2; + + // This is a hint whether the blobs shall be uploaded to the remote CAS + // without first storing them in the local cache. + bool bypass_local_cache = 3; +} + +// A response message for +// [LocalContentAddressableStorage.CaptureFiles][build.buildgrid.v2.LocalContentAddressableStorage.CaptureFiles]. +message CaptureFilesResponse { + // A response corresponding to a single blob that the client tried to upload. + message Response { + // The path to which this response corresponds. + string path = 1; + + // The digest of the captured file's content, if successful. + build.bazel.remote.execution.v2.Digest digest = 2; + + // The result of attempting to capture and upload the file. + google.rpc.Status status = 3; + } + + // The responses to the requests. + repeated Response responses = 1; +} + +// A request message for +// [LocalContentAddressableStorage.GetInstanceNameForRemote][build.buildgrid.v2.LocalContentAddressableStorage.GetInstanceNameForRemote]. +message GetInstanceNameForRemoteRequest { + // The URL for the remote CAS server. + string url = 1; + + // The instance of the execution system to operate against. A server may + // support multiple instances of the execution system (with their own workers, + // storage, caches, etc.). The server MAY require use of this field to select + // between them in an implementation-defined fashion, otherwise it can be + // omitted. + string instance_name = 2; + + // PEM-encoded public server certificate for https connections to the remote + // CAS server. + bytes server_cert = 3; + + // PEM-encoded private client key for https with certificate-based client + // authentication. If this is specified, `client_cert` must be specified + // as well. + bytes client_key = 4; + + // PEM-encoded public client certificate for https with certificate-based + // client authentication. If this is specified, `client_key` must be + // specified as well. + bytes client_cert = 5; +} + +// A response message for +// [LocalContentAddressableStorage.GetInstanceNameForRemote][build.buildgrid.v2.LocalContentAddressableStorage.GetInstanceNameForRemote]. +message GetInstanceNameForRemoteResponse { + string instance_name = 1; +} diff --git a/src/buildstream/_protos/build/buildgrid/local_cas_pb2.py b/src/buildstream/_protos/build/buildgrid/local_cas_pb2.py new file mode 100644 index 000000000..d1ef586f8 --- /dev/null +++ b/src/buildstream/_protos/build/buildgrid/local_cas_pb2.py @@ -0,0 +1,1052 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: build/buildgrid/local_cas.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2 +from buildstream._protos.google.api import annotations_pb2 as google_dot_api_dot_annotations__pb2 +from buildstream._protos.google.rpc import status_pb2 as google_dot_rpc_dot_status__pb2 + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='build/buildgrid/local_cas.proto', + package='build.buildgrid', + syntax='proto3', + serialized_options=None, + serialized_pb=_b('\n\x1f\x62uild/buildgrid/local_cas.proto\x12\x0f\x62uild.buildgrid\x1a\x36\x62uild/bazel/remote/execution/v2/remote_execution.proto\x1a\x1cgoogle/api/annotations.proto\x1a\x17google/rpc/status.proto\"p\n\x18\x46\x65tchMissingBlobsRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12=\n\x0c\x62lob_digests\x18\x02 \x03(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\"\xcc\x01\n\x19\x46\x65tchMissingBlobsResponse\x12\x46\n\tresponses\x18\x01 \x03(\x0b\x32\x33.build.buildgrid.FetchMissingBlobsResponse.Response\x1ag\n\x08Response\x12\x37\n\x06\x64igest\x18\x01 \x01(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\x12\"\n\x06status\x18\x02 \x01(\x0b\x32\x12.google.rpc.Status\"q\n\x19UploadMissingBlobsRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12=\n\x0c\x62lob_digests\x18\x02 \x03(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\"\xce\x01\n\x1aUploadMissingBlobsResponse\x12G\n\tresponses\x18\x01 \x03(\x0b\x32\x34.build.buildgrid.UploadMissingBlobsResponse.Response\x1ag\n\x08Response\x12\x37\n\x06\x64igest\x18\x01 \x01(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\x12\"\n\x06status\x18\x02 \x01(\x0b\x32\x12.google.rpc.Status\"\x81\x01\n\x10\x46\x65tchTreeRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12<\n\x0broot_digest\x18\x02 \x01(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\x12\x18\n\x10\x66\x65tch_file_blobs\x18\x03 \x01(\x08\"\x13\n\x11\x46\x65tchTreeResponse\"h\n\x11UploadTreeRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12<\n\x0broot_digest\x18\x02 \x01(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\"\x14\n\x12UploadTreeResponse\"u\n\x10StageTreeRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12<\n\x0broot_digest\x18\x02 \x01(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\x12\x0c\n\x04path\x18\x03 \x01(\t\"!\n\x11StageTreeResponse\x12\x0c\n\x04path\x18\x01 \x01(\t\"U\n\x12\x43\x61ptureTreeRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12\x0c\n\x04path\x18\x02 \x03(\t\x12\x1a\n\x12\x62ypass_local_cache\x18\x03 \x01(\x08\"\xd3\x01\n\x13\x43\x61ptureTreeResponse\x12@\n\tresponses\x18\x01 \x03(\x0b\x32-.build.buildgrid.CaptureTreeResponse.Response\x1az\n\x08Response\x12\x0c\n\x04path\x18\x01 \x01(\t\x12<\n\x0btree_digest\x18\x02 \x01(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\x12\"\n\x06status\x18\x03 \x01(\x0b\x32\x12.google.rpc.Status\"V\n\x13\x43\x61ptureFilesRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12\x0c\n\x04path\x18\x02 \x03(\t\x12\x1a\n\x12\x62ypass_local_cache\x18\x03 \x01(\x08\"\xd0\x01\n\x14\x43\x61ptureFilesResponse\x12\x41\n\tresponses\x18\x01 \x03(\x0b\x32..build.buildgrid.CaptureFilesResponse.Response\x1au\n\x08Response\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x37\n\x06\x64igest\x18\x02 \x01(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\x12\"\n\x06status\x18\x03 \x01(\x0b\x32\x12.google.rpc.Status\"\x83\x01\n\x1fGetInstanceNameForRemoteRequest\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\x15\n\rinstance_name\x18\x02 \x01(\t\x12\x13\n\x0bserver_cert\x18\x03 \x01(\x0c\x12\x12\n\nclient_key\x18\x04 \x01(\x0c\x12\x13\n\x0b\x63lient_cert\x18\x05 \x01(\x0c\"9\n GetInstanceNameForRemoteResponse\x12\x15\n\rinstance_name\x18\x01 \x01(\t2\xc7\x06\n\x1eLocalContentAddressableStorage\x12l\n\x11\x46\x65tchMissingBlobs\x12).build.buildgrid.FetchMissingBlobsRequest\x1a*.build.buildgrid.FetchMissingBlobsResponse\"\x00\x12o\n\x12UploadMissingBlobs\x12*.build.buildgrid.UploadMissingBlobsRequest\x1a+.build.buildgrid.UploadMissingBlobsResponse\"\x00\x12T\n\tFetchTree\x12!.build.buildgrid.FetchTreeRequest\x1a\".build.buildgrid.FetchTreeResponse\"\x00\x12W\n\nUploadTree\x12\".build.buildgrid.UploadTreeRequest\x1a#.build.buildgrid.UploadTreeResponse\"\x00\x12X\n\tStageTree\x12!.build.buildgrid.StageTreeRequest\x1a\".build.buildgrid.StageTreeResponse\"\x00(\x01\x30\x01\x12Z\n\x0b\x43\x61ptureTree\x12#.build.buildgrid.CaptureTreeRequest\x1a$.build.buildgrid.CaptureTreeResponse\"\x00\x12]\n\x0c\x43\x61ptureFiles\x12$.build.buildgrid.CaptureFilesRequest\x1a%.build.buildgrid.CaptureFilesResponse\"\x00\x12\x81\x01\n\x18GetInstanceNameForRemote\x12\x30.build.buildgrid.GetInstanceNameForRemoteRequest\x1a\x31.build.buildgrid.GetInstanceNameForRemoteResponse\"\x00\x62\x06proto3') + , + dependencies=[build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2.DESCRIPTOR,google_dot_api_dot_annotations__pb2.DESCRIPTOR,google_dot_rpc_dot_status__pb2.DESCRIPTOR,]) + + + + +_FETCHMISSINGBLOBSREQUEST = _descriptor.Descriptor( + name='FetchMissingBlobsRequest', + full_name='build.buildgrid.FetchMissingBlobsRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='instance_name', full_name='build.buildgrid.FetchMissingBlobsRequest.instance_name', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='blob_digests', full_name='build.buildgrid.FetchMissingBlobsRequest.blob_digests', index=1, + number=2, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=163, + serialized_end=275, +) + + +_FETCHMISSINGBLOBSRESPONSE_RESPONSE = _descriptor.Descriptor( + name='Response', + full_name='build.buildgrid.FetchMissingBlobsResponse.Response', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='digest', full_name='build.buildgrid.FetchMissingBlobsResponse.Response.digest', index=0, + number=1, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='status', full_name='build.buildgrid.FetchMissingBlobsResponse.Response.status', index=1, + number=2, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=379, + serialized_end=482, +) + +_FETCHMISSINGBLOBSRESPONSE = _descriptor.Descriptor( + name='FetchMissingBlobsResponse', + full_name='build.buildgrid.FetchMissingBlobsResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='responses', full_name='build.buildgrid.FetchMissingBlobsResponse.responses', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[_FETCHMISSINGBLOBSRESPONSE_RESPONSE, ], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=278, + serialized_end=482, +) + + +_UPLOADMISSINGBLOBSREQUEST = _descriptor.Descriptor( + name='UploadMissingBlobsRequest', + full_name='build.buildgrid.UploadMissingBlobsRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='instance_name', full_name='build.buildgrid.UploadMissingBlobsRequest.instance_name', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='blob_digests', full_name='build.buildgrid.UploadMissingBlobsRequest.blob_digests', index=1, + number=2, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=484, + serialized_end=597, +) + + +_UPLOADMISSINGBLOBSRESPONSE_RESPONSE = _descriptor.Descriptor( + name='Response', + full_name='build.buildgrid.UploadMissingBlobsResponse.Response', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='digest', full_name='build.buildgrid.UploadMissingBlobsResponse.Response.digest', index=0, + number=1, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='status', full_name='build.buildgrid.UploadMissingBlobsResponse.Response.status', index=1, + number=2, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=379, + serialized_end=482, +) + +_UPLOADMISSINGBLOBSRESPONSE = _descriptor.Descriptor( + name='UploadMissingBlobsResponse', + full_name='build.buildgrid.UploadMissingBlobsResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='responses', full_name='build.buildgrid.UploadMissingBlobsResponse.responses', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[_UPLOADMISSINGBLOBSRESPONSE_RESPONSE, ], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=600, + serialized_end=806, +) + + +_FETCHTREEREQUEST = _descriptor.Descriptor( + name='FetchTreeRequest', + full_name='build.buildgrid.FetchTreeRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='instance_name', full_name='build.buildgrid.FetchTreeRequest.instance_name', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='root_digest', full_name='build.buildgrid.FetchTreeRequest.root_digest', index=1, + number=2, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='fetch_file_blobs', full_name='build.buildgrid.FetchTreeRequest.fetch_file_blobs', index=2, + number=3, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=809, + serialized_end=938, +) + + +_FETCHTREERESPONSE = _descriptor.Descriptor( + name='FetchTreeResponse', + full_name='build.buildgrid.FetchTreeResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=940, + serialized_end=959, +) + + +_UPLOADTREEREQUEST = _descriptor.Descriptor( + name='UploadTreeRequest', + full_name='build.buildgrid.UploadTreeRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='instance_name', full_name='build.buildgrid.UploadTreeRequest.instance_name', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='root_digest', full_name='build.buildgrid.UploadTreeRequest.root_digest', index=1, + number=2, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=961, + serialized_end=1065, +) + + +_UPLOADTREERESPONSE = _descriptor.Descriptor( + name='UploadTreeResponse', + full_name='build.buildgrid.UploadTreeResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1067, + serialized_end=1087, +) + + +_STAGETREEREQUEST = _descriptor.Descriptor( + name='StageTreeRequest', + full_name='build.buildgrid.StageTreeRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='instance_name', full_name='build.buildgrid.StageTreeRequest.instance_name', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='root_digest', full_name='build.buildgrid.StageTreeRequest.root_digest', index=1, + number=2, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='path', full_name='build.buildgrid.StageTreeRequest.path', index=2, + number=3, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1089, + serialized_end=1206, +) + + +_STAGETREERESPONSE = _descriptor.Descriptor( + name='StageTreeResponse', + full_name='build.buildgrid.StageTreeResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='path', full_name='build.buildgrid.StageTreeResponse.path', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1208, + serialized_end=1241, +) + + +_CAPTURETREEREQUEST = _descriptor.Descriptor( + name='CaptureTreeRequest', + full_name='build.buildgrid.CaptureTreeRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='instance_name', full_name='build.buildgrid.CaptureTreeRequest.instance_name', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='path', full_name='build.buildgrid.CaptureTreeRequest.path', index=1, + number=2, type=9, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='bypass_local_cache', full_name='build.buildgrid.CaptureTreeRequest.bypass_local_cache', index=2, + number=3, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1243, + serialized_end=1328, +) + + +_CAPTURETREERESPONSE_RESPONSE = _descriptor.Descriptor( + name='Response', + full_name='build.buildgrid.CaptureTreeResponse.Response', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='path', full_name='build.buildgrid.CaptureTreeResponse.Response.path', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='tree_digest', full_name='build.buildgrid.CaptureTreeResponse.Response.tree_digest', index=1, + number=2, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='status', full_name='build.buildgrid.CaptureTreeResponse.Response.status', index=2, + number=3, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1420, + serialized_end=1542, +) + +_CAPTURETREERESPONSE = _descriptor.Descriptor( + name='CaptureTreeResponse', + full_name='build.buildgrid.CaptureTreeResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='responses', full_name='build.buildgrid.CaptureTreeResponse.responses', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[_CAPTURETREERESPONSE_RESPONSE, ], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1331, + serialized_end=1542, +) + + +_CAPTUREFILESREQUEST = _descriptor.Descriptor( + name='CaptureFilesRequest', + full_name='build.buildgrid.CaptureFilesRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='instance_name', full_name='build.buildgrid.CaptureFilesRequest.instance_name', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='path', full_name='build.buildgrid.CaptureFilesRequest.path', index=1, + number=2, type=9, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='bypass_local_cache', full_name='build.buildgrid.CaptureFilesRequest.bypass_local_cache', index=2, + number=3, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1544, + serialized_end=1630, +) + + +_CAPTUREFILESRESPONSE_RESPONSE = _descriptor.Descriptor( + name='Response', + full_name='build.buildgrid.CaptureFilesResponse.Response', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='path', full_name='build.buildgrid.CaptureFilesResponse.Response.path', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='digest', full_name='build.buildgrid.CaptureFilesResponse.Response.digest', index=1, + number=2, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='status', full_name='build.buildgrid.CaptureFilesResponse.Response.status', index=2, + number=3, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1724, + serialized_end=1841, +) + +_CAPTUREFILESRESPONSE = _descriptor.Descriptor( + name='CaptureFilesResponse', + full_name='build.buildgrid.CaptureFilesResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='responses', full_name='build.buildgrid.CaptureFilesResponse.responses', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[_CAPTUREFILESRESPONSE_RESPONSE, ], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1633, + serialized_end=1841, +) + + +_GETINSTANCENAMEFORREMOTEREQUEST = _descriptor.Descriptor( + name='GetInstanceNameForRemoteRequest', + full_name='build.buildgrid.GetInstanceNameForRemoteRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='url', full_name='build.buildgrid.GetInstanceNameForRemoteRequest.url', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='instance_name', full_name='build.buildgrid.GetInstanceNameForRemoteRequest.instance_name', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='server_cert', full_name='build.buildgrid.GetInstanceNameForRemoteRequest.server_cert', index=2, + number=3, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='client_key', full_name='build.buildgrid.GetInstanceNameForRemoteRequest.client_key', index=3, + number=4, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='client_cert', full_name='build.buildgrid.GetInstanceNameForRemoteRequest.client_cert', index=4, + number=5, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1844, + serialized_end=1975, +) + + +_GETINSTANCENAMEFORREMOTERESPONSE = _descriptor.Descriptor( + name='GetInstanceNameForRemoteResponse', + full_name='build.buildgrid.GetInstanceNameForRemoteResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='instance_name', full_name='build.buildgrid.GetInstanceNameForRemoteResponse.instance_name', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1977, + serialized_end=2034, +) + +_FETCHMISSINGBLOBSREQUEST.fields_by_name['blob_digests'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST +_FETCHMISSINGBLOBSRESPONSE_RESPONSE.fields_by_name['digest'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST +_FETCHMISSINGBLOBSRESPONSE_RESPONSE.fields_by_name['status'].message_type = google_dot_rpc_dot_status__pb2._STATUS +_FETCHMISSINGBLOBSRESPONSE_RESPONSE.containing_type = _FETCHMISSINGBLOBSRESPONSE +_FETCHMISSINGBLOBSRESPONSE.fields_by_name['responses'].message_type = _FETCHMISSINGBLOBSRESPONSE_RESPONSE +_UPLOADMISSINGBLOBSREQUEST.fields_by_name['blob_digests'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST +_UPLOADMISSINGBLOBSRESPONSE_RESPONSE.fields_by_name['digest'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST +_UPLOADMISSINGBLOBSRESPONSE_RESPONSE.fields_by_name['status'].message_type = google_dot_rpc_dot_status__pb2._STATUS +_UPLOADMISSINGBLOBSRESPONSE_RESPONSE.containing_type = _UPLOADMISSINGBLOBSRESPONSE +_UPLOADMISSINGBLOBSRESPONSE.fields_by_name['responses'].message_type = _UPLOADMISSINGBLOBSRESPONSE_RESPONSE +_FETCHTREEREQUEST.fields_by_name['root_digest'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST +_UPLOADTREEREQUEST.fields_by_name['root_digest'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST +_STAGETREEREQUEST.fields_by_name['root_digest'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST +_CAPTURETREERESPONSE_RESPONSE.fields_by_name['tree_digest'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST +_CAPTURETREERESPONSE_RESPONSE.fields_by_name['status'].message_type = google_dot_rpc_dot_status__pb2._STATUS +_CAPTURETREERESPONSE_RESPONSE.containing_type = _CAPTURETREERESPONSE +_CAPTURETREERESPONSE.fields_by_name['responses'].message_type = _CAPTURETREERESPONSE_RESPONSE +_CAPTUREFILESRESPONSE_RESPONSE.fields_by_name['digest'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST +_CAPTUREFILESRESPONSE_RESPONSE.fields_by_name['status'].message_type = google_dot_rpc_dot_status__pb2._STATUS +_CAPTUREFILESRESPONSE_RESPONSE.containing_type = _CAPTUREFILESRESPONSE +_CAPTUREFILESRESPONSE.fields_by_name['responses'].message_type = _CAPTUREFILESRESPONSE_RESPONSE +DESCRIPTOR.message_types_by_name['FetchMissingBlobsRequest'] = _FETCHMISSINGBLOBSREQUEST +DESCRIPTOR.message_types_by_name['FetchMissingBlobsResponse'] = _FETCHMISSINGBLOBSRESPONSE +DESCRIPTOR.message_types_by_name['UploadMissingBlobsRequest'] = _UPLOADMISSINGBLOBSREQUEST +DESCRIPTOR.message_types_by_name['UploadMissingBlobsResponse'] = _UPLOADMISSINGBLOBSRESPONSE +DESCRIPTOR.message_types_by_name['FetchTreeRequest'] = _FETCHTREEREQUEST +DESCRIPTOR.message_types_by_name['FetchTreeResponse'] = _FETCHTREERESPONSE +DESCRIPTOR.message_types_by_name['UploadTreeRequest'] = _UPLOADTREEREQUEST +DESCRIPTOR.message_types_by_name['UploadTreeResponse'] = _UPLOADTREERESPONSE +DESCRIPTOR.message_types_by_name['StageTreeRequest'] = _STAGETREEREQUEST +DESCRIPTOR.message_types_by_name['StageTreeResponse'] = _STAGETREERESPONSE +DESCRIPTOR.message_types_by_name['CaptureTreeRequest'] = _CAPTURETREEREQUEST +DESCRIPTOR.message_types_by_name['CaptureTreeResponse'] = _CAPTURETREERESPONSE +DESCRIPTOR.message_types_by_name['CaptureFilesRequest'] = _CAPTUREFILESREQUEST +DESCRIPTOR.message_types_by_name['CaptureFilesResponse'] = _CAPTUREFILESRESPONSE +DESCRIPTOR.message_types_by_name['GetInstanceNameForRemoteRequest'] = _GETINSTANCENAMEFORREMOTEREQUEST +DESCRIPTOR.message_types_by_name['GetInstanceNameForRemoteResponse'] = _GETINSTANCENAMEFORREMOTERESPONSE +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +FetchMissingBlobsRequest = _reflection.GeneratedProtocolMessageType('FetchMissingBlobsRequest', (_message.Message,), dict( + DESCRIPTOR = _FETCHMISSINGBLOBSREQUEST, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.FetchMissingBlobsRequest) + )) +_sym_db.RegisterMessage(FetchMissingBlobsRequest) + +FetchMissingBlobsResponse = _reflection.GeneratedProtocolMessageType('FetchMissingBlobsResponse', (_message.Message,), dict( + + Response = _reflection.GeneratedProtocolMessageType('Response', (_message.Message,), dict( + DESCRIPTOR = _FETCHMISSINGBLOBSRESPONSE_RESPONSE, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.FetchMissingBlobsResponse.Response) + )) + , + DESCRIPTOR = _FETCHMISSINGBLOBSRESPONSE, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.FetchMissingBlobsResponse) + )) +_sym_db.RegisterMessage(FetchMissingBlobsResponse) +_sym_db.RegisterMessage(FetchMissingBlobsResponse.Response) + +UploadMissingBlobsRequest = _reflection.GeneratedProtocolMessageType('UploadMissingBlobsRequest', (_message.Message,), dict( + DESCRIPTOR = _UPLOADMISSINGBLOBSREQUEST, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.UploadMissingBlobsRequest) + )) +_sym_db.RegisterMessage(UploadMissingBlobsRequest) + +UploadMissingBlobsResponse = _reflection.GeneratedProtocolMessageType('UploadMissingBlobsResponse', (_message.Message,), dict( + + Response = _reflection.GeneratedProtocolMessageType('Response', (_message.Message,), dict( + DESCRIPTOR = _UPLOADMISSINGBLOBSRESPONSE_RESPONSE, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.UploadMissingBlobsResponse.Response) + )) + , + DESCRIPTOR = _UPLOADMISSINGBLOBSRESPONSE, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.UploadMissingBlobsResponse) + )) +_sym_db.RegisterMessage(UploadMissingBlobsResponse) +_sym_db.RegisterMessage(UploadMissingBlobsResponse.Response) + +FetchTreeRequest = _reflection.GeneratedProtocolMessageType('FetchTreeRequest', (_message.Message,), dict( + DESCRIPTOR = _FETCHTREEREQUEST, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.FetchTreeRequest) + )) +_sym_db.RegisterMessage(FetchTreeRequest) + +FetchTreeResponse = _reflection.GeneratedProtocolMessageType('FetchTreeResponse', (_message.Message,), dict( + DESCRIPTOR = _FETCHTREERESPONSE, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.FetchTreeResponse) + )) +_sym_db.RegisterMessage(FetchTreeResponse) + +UploadTreeRequest = _reflection.GeneratedProtocolMessageType('UploadTreeRequest', (_message.Message,), dict( + DESCRIPTOR = _UPLOADTREEREQUEST, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.UploadTreeRequest) + )) +_sym_db.RegisterMessage(UploadTreeRequest) + +UploadTreeResponse = _reflection.GeneratedProtocolMessageType('UploadTreeResponse', (_message.Message,), dict( + DESCRIPTOR = _UPLOADTREERESPONSE, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.UploadTreeResponse) + )) +_sym_db.RegisterMessage(UploadTreeResponse) + +StageTreeRequest = _reflection.GeneratedProtocolMessageType('StageTreeRequest', (_message.Message,), dict( + DESCRIPTOR = _STAGETREEREQUEST, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.StageTreeRequest) + )) +_sym_db.RegisterMessage(StageTreeRequest) + +StageTreeResponse = _reflection.GeneratedProtocolMessageType('StageTreeResponse', (_message.Message,), dict( + DESCRIPTOR = _STAGETREERESPONSE, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.StageTreeResponse) + )) +_sym_db.RegisterMessage(StageTreeResponse) + +CaptureTreeRequest = _reflection.GeneratedProtocolMessageType('CaptureTreeRequest', (_message.Message,), dict( + DESCRIPTOR = _CAPTURETREEREQUEST, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.CaptureTreeRequest) + )) +_sym_db.RegisterMessage(CaptureTreeRequest) + +CaptureTreeResponse = _reflection.GeneratedProtocolMessageType('CaptureTreeResponse', (_message.Message,), dict( + + Response = _reflection.GeneratedProtocolMessageType('Response', (_message.Message,), dict( + DESCRIPTOR = _CAPTURETREERESPONSE_RESPONSE, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.CaptureTreeResponse.Response) + )) + , + DESCRIPTOR = _CAPTURETREERESPONSE, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.CaptureTreeResponse) + )) +_sym_db.RegisterMessage(CaptureTreeResponse) +_sym_db.RegisterMessage(CaptureTreeResponse.Response) + +CaptureFilesRequest = _reflection.GeneratedProtocolMessageType('CaptureFilesRequest', (_message.Message,), dict( + DESCRIPTOR = _CAPTUREFILESREQUEST, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.CaptureFilesRequest) + )) +_sym_db.RegisterMessage(CaptureFilesRequest) + +CaptureFilesResponse = _reflection.GeneratedProtocolMessageType('CaptureFilesResponse', (_message.Message,), dict( + + Response = _reflection.GeneratedProtocolMessageType('Response', (_message.Message,), dict( + DESCRIPTOR = _CAPTUREFILESRESPONSE_RESPONSE, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.CaptureFilesResponse.Response) + )) + , + DESCRIPTOR = _CAPTUREFILESRESPONSE, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.CaptureFilesResponse) + )) +_sym_db.RegisterMessage(CaptureFilesResponse) +_sym_db.RegisterMessage(CaptureFilesResponse.Response) + +GetInstanceNameForRemoteRequest = _reflection.GeneratedProtocolMessageType('GetInstanceNameForRemoteRequest', (_message.Message,), dict( + DESCRIPTOR = _GETINSTANCENAMEFORREMOTEREQUEST, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.GetInstanceNameForRemoteRequest) + )) +_sym_db.RegisterMessage(GetInstanceNameForRemoteRequest) + +GetInstanceNameForRemoteResponse = _reflection.GeneratedProtocolMessageType('GetInstanceNameForRemoteResponse', (_message.Message,), dict( + DESCRIPTOR = _GETINSTANCENAMEFORREMOTERESPONSE, + __module__ = 'build.buildgrid.local_cas_pb2' + # @@protoc_insertion_point(class_scope:build.buildgrid.GetInstanceNameForRemoteResponse) + )) +_sym_db.RegisterMessage(GetInstanceNameForRemoteResponse) + + + +_LOCALCONTENTADDRESSABLESTORAGE = _descriptor.ServiceDescriptor( + name='LocalContentAddressableStorage', + full_name='build.buildgrid.LocalContentAddressableStorage', + file=DESCRIPTOR, + index=0, + serialized_options=None, + serialized_start=2037, + serialized_end=2876, + methods=[ + _descriptor.MethodDescriptor( + name='FetchMissingBlobs', + full_name='build.buildgrid.LocalContentAddressableStorage.FetchMissingBlobs', + index=0, + containing_service=None, + input_type=_FETCHMISSINGBLOBSREQUEST, + output_type=_FETCHMISSINGBLOBSRESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='UploadMissingBlobs', + full_name='build.buildgrid.LocalContentAddressableStorage.UploadMissingBlobs', + index=1, + containing_service=None, + input_type=_UPLOADMISSINGBLOBSREQUEST, + output_type=_UPLOADMISSINGBLOBSRESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='FetchTree', + full_name='build.buildgrid.LocalContentAddressableStorage.FetchTree', + index=2, + containing_service=None, + input_type=_FETCHTREEREQUEST, + output_type=_FETCHTREERESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='UploadTree', + full_name='build.buildgrid.LocalContentAddressableStorage.UploadTree', + index=3, + containing_service=None, + input_type=_UPLOADTREEREQUEST, + output_type=_UPLOADTREERESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='StageTree', + full_name='build.buildgrid.LocalContentAddressableStorage.StageTree', + index=4, + containing_service=None, + input_type=_STAGETREEREQUEST, + output_type=_STAGETREERESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='CaptureTree', + full_name='build.buildgrid.LocalContentAddressableStorage.CaptureTree', + index=5, + containing_service=None, + input_type=_CAPTURETREEREQUEST, + output_type=_CAPTURETREERESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='CaptureFiles', + full_name='build.buildgrid.LocalContentAddressableStorage.CaptureFiles', + index=6, + containing_service=None, + input_type=_CAPTUREFILESREQUEST, + output_type=_CAPTUREFILESRESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='GetInstanceNameForRemote', + full_name='build.buildgrid.LocalContentAddressableStorage.GetInstanceNameForRemote', + index=7, + containing_service=None, + input_type=_GETINSTANCENAMEFORREMOTEREQUEST, + output_type=_GETINSTANCENAMEFORREMOTERESPONSE, + serialized_options=None, + ), +]) +_sym_db.RegisterServiceDescriptor(_LOCALCONTENTADDRESSABLESTORAGE) + +DESCRIPTOR.services_by_name['LocalContentAddressableStorage'] = _LOCALCONTENTADDRESSABLESTORAGE + +# @@protoc_insertion_point(module_scope) diff --git a/src/buildstream/_protos/build/buildgrid/local_cas_pb2_grpc.py b/src/buildstream/_protos/build/buildgrid/local_cas_pb2_grpc.py new file mode 100644 index 000000000..7ea06a3a7 --- /dev/null +++ b/src/buildstream/_protos/build/buildgrid/local_cas_pb2_grpc.py @@ -0,0 +1,238 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + +from buildstream._protos.build.buildgrid import local_cas_pb2 as build_dot_buildgrid_dot_local__cas__pb2 + + +class LocalContentAddressableStorageStub(object): + # missing associated documentation comment in .proto file + pass + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.FetchMissingBlobs = channel.unary_unary( + '/build.buildgrid.LocalContentAddressableStorage/FetchMissingBlobs', + request_serializer=build_dot_buildgrid_dot_local__cas__pb2.FetchMissingBlobsRequest.SerializeToString, + response_deserializer=build_dot_buildgrid_dot_local__cas__pb2.FetchMissingBlobsResponse.FromString, + ) + self.UploadMissingBlobs = channel.unary_unary( + '/build.buildgrid.LocalContentAddressableStorage/UploadMissingBlobs', + request_serializer=build_dot_buildgrid_dot_local__cas__pb2.UploadMissingBlobsRequest.SerializeToString, + response_deserializer=build_dot_buildgrid_dot_local__cas__pb2.UploadMissingBlobsResponse.FromString, + ) + self.FetchTree = channel.unary_unary( + '/build.buildgrid.LocalContentAddressableStorage/FetchTree', + request_serializer=build_dot_buildgrid_dot_local__cas__pb2.FetchTreeRequest.SerializeToString, + response_deserializer=build_dot_buildgrid_dot_local__cas__pb2.FetchTreeResponse.FromString, + ) + self.UploadTree = channel.unary_unary( + '/build.buildgrid.LocalContentAddressableStorage/UploadTree', + request_serializer=build_dot_buildgrid_dot_local__cas__pb2.UploadTreeRequest.SerializeToString, + response_deserializer=build_dot_buildgrid_dot_local__cas__pb2.UploadTreeResponse.FromString, + ) + self.StageTree = channel.stream_stream( + '/build.buildgrid.LocalContentAddressableStorage/StageTree', + request_serializer=build_dot_buildgrid_dot_local__cas__pb2.StageTreeRequest.SerializeToString, + response_deserializer=build_dot_buildgrid_dot_local__cas__pb2.StageTreeResponse.FromString, + ) + self.CaptureTree = channel.unary_unary( + '/build.buildgrid.LocalContentAddressableStorage/CaptureTree', + request_serializer=build_dot_buildgrid_dot_local__cas__pb2.CaptureTreeRequest.SerializeToString, + response_deserializer=build_dot_buildgrid_dot_local__cas__pb2.CaptureTreeResponse.FromString, + ) + self.CaptureFiles = channel.unary_unary( + '/build.buildgrid.LocalContentAddressableStorage/CaptureFiles', + request_serializer=build_dot_buildgrid_dot_local__cas__pb2.CaptureFilesRequest.SerializeToString, + response_deserializer=build_dot_buildgrid_dot_local__cas__pb2.CaptureFilesResponse.FromString, + ) + self.GetInstanceNameForRemote = channel.unary_unary( + '/build.buildgrid.LocalContentAddressableStorage/GetInstanceNameForRemote', + request_serializer=build_dot_buildgrid_dot_local__cas__pb2.GetInstanceNameForRemoteRequest.SerializeToString, + response_deserializer=build_dot_buildgrid_dot_local__cas__pb2.GetInstanceNameForRemoteResponse.FromString, + ) + + +class LocalContentAddressableStorageServicer(object): + # missing associated documentation comment in .proto file + pass + + def FetchMissingBlobs(self, request, context): + """Fetch blobs from a remote CAS to the local cache. + + This request is equivalent to ByteStream `Read` or `BatchReadBlobs` + requests, storing the downloaded blobs in the local cache. + + Requested blobs that failed to be downloaded will be listed in the + response. + + Errors: + * `INVALID_ARGUMENT`: The client attempted to download more than the + server supported limit. + + Individual requests may return the following error, additionally: + * `NOT_FOUND`: The requested blob is not present in the remote CAS. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def UploadMissingBlobs(self, request, context): + """Upload blobs from the local cache to a remote CAS. + + This request is equivalent to `FindMissingBlobs` followed by + ByteStream `Write` or `BatchUpdateBlobs` requests. + + Blobs that failed to be uploaded will be listed in the response. + + Errors: + * `INVALID_ARGUMENT`: The client attempted to upload more than the + server supported limit. + + Individual requests may return the following error, additionally: + * `NOT_FOUND`: The requested blob is not present in the local cache. + * `RESOURCE_EXHAUSTED`: There is insufficient disk quota to store the blob. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def FetchTree(self, request, context): + """Fetch the entire directory tree rooted at a node from a remote CAS to the + local cache. + + This request is equivalent to `GetTree`, storing the `Directory` objects + in the local cache. Optionally, this will also fetch all blobs referenced + by the `Directory` objects, equivalent to `FetchMissingBlobs`. + + If part of the tree is missing from the CAS, the server will return the + portion present and omit the rest. + + * `NOT_FOUND`: The requested tree root is not present in the CAS. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def UploadTree(self, request, context): + """Upload the entire directory tree from the local cache to a remote CAS. + + This request is equivalent to `UploadMissingBlobs` for all blobs + referenced by the specified tree (recursively). + + Errors: + * `NOT_FOUND`: The requested tree root is not present in the local cache. + * `RESOURCE_EXHAUSTED`: There is insufficient disk quota to store the tree. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def StageTree(self, request_iterator, context): + """Stage a directory tree in the local filesystem. + + This makes the specified directory tree temporarily available for local + filesystem access. It is implementation-defined whether this uses a + userspace filesystem such as FUSE, hardlinking or a full copy. + + Missing blobs are fetched, if a CAS remote is configured. + + The staging starts when the server receives the initial request and + it is ready to be used on the initial (non-error) response from the + server. + + The server will clean up the staged directory when it either + receives an additional request (with all fields unset) or when the + stream is closed. The server will send an additional response after + cleanup is complete. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def CaptureTree(self, request, context): + """Capture a directory tree from the local filesystem. + + This imports the specified path from the local filesystem into CAS. + + If a CAS remote is configured, the blobs are uploaded. + The `bypass_local_cache` parameter is a hint to indicate whether the blobs + shall be uploaded without first storing them in the local cache. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def CaptureFiles(self, request, context): + """Capture files from the local filesystem. + + This imports the specified paths from the local filesystem into CAS. + + If a CAS remote is configured, the blobs are uploaded. + The `bypass_local_cache` parameter is a hint to indicate whether the blobs + shall be uploaded without first storing them in the local cache. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def GetInstanceNameForRemote(self, request, context): + """Configure remote CAS endpoint. + + This returns a string that can be used as instance_name to access the + specified endpoint in further requests. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_LocalContentAddressableStorageServicer_to_server(servicer, server): + rpc_method_handlers = { + 'FetchMissingBlobs': grpc.unary_unary_rpc_method_handler( + servicer.FetchMissingBlobs, + request_deserializer=build_dot_buildgrid_dot_local__cas__pb2.FetchMissingBlobsRequest.FromString, + response_serializer=build_dot_buildgrid_dot_local__cas__pb2.FetchMissingBlobsResponse.SerializeToString, + ), + 'UploadMissingBlobs': grpc.unary_unary_rpc_method_handler( + servicer.UploadMissingBlobs, + request_deserializer=build_dot_buildgrid_dot_local__cas__pb2.UploadMissingBlobsRequest.FromString, + response_serializer=build_dot_buildgrid_dot_local__cas__pb2.UploadMissingBlobsResponse.SerializeToString, + ), + 'FetchTree': grpc.unary_unary_rpc_method_handler( + servicer.FetchTree, + request_deserializer=build_dot_buildgrid_dot_local__cas__pb2.FetchTreeRequest.FromString, + response_serializer=build_dot_buildgrid_dot_local__cas__pb2.FetchTreeResponse.SerializeToString, + ), + 'UploadTree': grpc.unary_unary_rpc_method_handler( + servicer.UploadTree, + request_deserializer=build_dot_buildgrid_dot_local__cas__pb2.UploadTreeRequest.FromString, + response_serializer=build_dot_buildgrid_dot_local__cas__pb2.UploadTreeResponse.SerializeToString, + ), + 'StageTree': grpc.stream_stream_rpc_method_handler( + servicer.StageTree, + request_deserializer=build_dot_buildgrid_dot_local__cas__pb2.StageTreeRequest.FromString, + response_serializer=build_dot_buildgrid_dot_local__cas__pb2.StageTreeResponse.SerializeToString, + ), + 'CaptureTree': grpc.unary_unary_rpc_method_handler( + servicer.CaptureTree, + request_deserializer=build_dot_buildgrid_dot_local__cas__pb2.CaptureTreeRequest.FromString, + response_serializer=build_dot_buildgrid_dot_local__cas__pb2.CaptureTreeResponse.SerializeToString, + ), + 'CaptureFiles': grpc.unary_unary_rpc_method_handler( + servicer.CaptureFiles, + request_deserializer=build_dot_buildgrid_dot_local__cas__pb2.CaptureFilesRequest.FromString, + response_serializer=build_dot_buildgrid_dot_local__cas__pb2.CaptureFilesResponse.SerializeToString, + ), + 'GetInstanceNameForRemote': grpc.unary_unary_rpc_method_handler( + servicer.GetInstanceNameForRemote, + request_deserializer=build_dot_buildgrid_dot_local__cas__pb2.GetInstanceNameForRemoteRequest.FromString, + response_serializer=build_dot_buildgrid_dot_local__cas__pb2.GetInstanceNameForRemoteResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'build.buildgrid.LocalContentAddressableStorage', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) diff --git a/src/buildstream/_scheduler/jobs/__init__.py b/src/buildstream/_scheduler/jobs/__init__.py index 3e213171a..9f081c8a0 100644 --- a/src/buildstream/_scheduler/jobs/__init__.py +++ b/src/buildstream/_scheduler/jobs/__init__.py @@ -18,6 +18,4 @@ # Tristan Maat <tristan.maat@codethink.co.uk> from .elementjob import ElementJob -from .cachesizejob import CacheSizeJob -from .cleanupjob import CleanupJob from .job import JobStatus diff --git a/src/buildstream/_scheduler/jobs/cachesizejob.py b/src/buildstream/_scheduler/jobs/cachesizejob.py deleted file mode 100644 index 581101c07..000000000 --- a/src/buildstream/_scheduler/jobs/cachesizejob.py +++ /dev/null @@ -1,48 +0,0 @@ -# Copyright (C) 2018 Codethink Limited -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library. If not, see <http://www.gnu.org/licenses/>. -# -# Author: -# Tristan Daniël Maat <tristan.maat@codethink.co.uk> -# -from .job import Job, JobStatus, ChildJob - - -class CacheSizeJob(Job): - def __init__(self, *args, complete_cb, **kwargs): - super().__init__(*args, **kwargs) - self.set_name(self.action_name) - self._complete_cb = complete_cb - - context = self._scheduler.context - self._casquota = context.get_casquota() - - def parent_complete(self, status, result): - if status is JobStatus.OK: - self._casquota.set_cache_size(result) - - if self._complete_cb: - self._complete_cb(status, result) - - def create_child_job(self, *args, **kwargs): - return ChildCacheSizeJob(*args, casquota=self._scheduler.context._casquota, **kwargs) - - -class ChildCacheSizeJob(ChildJob): - def __init__(self, *args, casquota, **kwargs): - super().__init__(*args, **kwargs) - self._casquota = casquota - - def child_process(self): - return self._casquota.compute_cache_size() diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py deleted file mode 100644 index 3e9a8ff47..000000000 --- a/src/buildstream/_scheduler/jobs/cleanupjob.py +++ /dev/null @@ -1,55 +0,0 @@ -# Copyright (C) 2018 Codethink Limited -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library. If not, see <http://www.gnu.org/licenses/>. -# -# Author: -# Tristan Daniël Maat <tristan.maat@codethink.co.uk> -# -from .job import Job, JobStatus, ChildJob - - -class CleanupJob(Job): - def __init__(self, *args, complete_cb, **kwargs): - super().__init__(*args, **kwargs) - self.set_name(self.action_name) - self._complete_cb = complete_cb - - context = self._scheduler.context - self._casquota = context.get_casquota() - - def handle_message(self, message): - # Update the cache size in the main process as we go, - # this provides better feedback in the UI. - self._casquota.set_cache_size(message, write_to_disk=False) - - def parent_complete(self, status, result): - if status is JobStatus.OK: - self._casquota.set_cache_size(result, write_to_disk=False) - - if self._complete_cb: - self._complete_cb(status, result) - - def create_child_job(self, *args, **kwargs): - return ChildCleanupJob(*args, casquota=self._scheduler.context.get_casquota(), **kwargs) - - -class ChildCleanupJob(ChildJob): - def __init__(self, *args, casquota, **kwargs): - super().__init__(*args, **kwargs) - self._casquota = casquota - - def child_process(self): - def progress(): - self.send_message(self._casquota.get_cache_size()) - return self._casquota.clean(progress) diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py index 1be3f7cd0..dc33e6510 100644 --- a/src/buildstream/_scheduler/queues/buildqueue.py +++ b/src/buildstream/_scheduler/queues/buildqueue.py @@ -21,7 +21,6 @@ from datetime import timedelta from . import Queue, QueueStatus -from ..jobs import JobStatus from ..resources import ResourceType from ..._message import MessageType @@ -73,39 +72,11 @@ class BuildQueue(Queue): return QueueStatus.READY - def _check_cache_size(self, job, element, artifact_size): - - # After completing a build job, add the artifact size - # as returned from Element._assemble() to the estimated - # artifact cache size - # - context = self._scheduler.context - artifacts = context.artifactcache - - artifacts.add_artifact_size(artifact_size) - - # If the estimated size outgrows the quota, ask the scheduler - # to queue a job to actually check the real cache size. - # - if artifacts.full(): - self._scheduler.check_cache_size() - def done(self, job, element, result, status): # Inform element in main process that assembly is done element._assemble_done() - # This has to be done after _assemble_done, such that the - # element may register its cache key as required - # - # FIXME: Element._assemble() does not report both the failure state and the - # size of the newly cached failed artifact, so we can only adjust the - # artifact cache size for a successful build even though we know a - # failed build also grows the artifact cache size. - # - if status is JobStatus.OK: - self._check_cache_size(job, element, result) - def register_pending_element(self, element): # Set a "buildable" callback for an element not yet ready # to be processed in the build queue. diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py index 2c46cd2fd..7f4125099 100644 --- a/src/buildstream/_scheduler/queues/pullqueue.py +++ b/src/buildstream/_scheduler/queues/pullqueue.py @@ -52,12 +52,6 @@ class PullQueue(Queue): element._pull_done() - # Build jobs will check the "approximate" size first. Since we - # do not get an artifact size from pull jobs, we have to - # actually check the cache size. - if status is JobStatus.OK: - self._scheduler.check_cache_size() - def register_pending_element(self, element): # Set a "can_query_cache"_callback for an element which is not # immediately ready to query the artifact cache so that it diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 17878c4fd..bd76e00b1 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -27,8 +27,8 @@ import datetime from contextlib import contextmanager # Local imports -from .resources import Resources, ResourceType -from .jobs import JobStatus, CacheSizeJob, CleanupJob +from .resources import Resources +from .jobs import JobStatus from .._profile import Topics, PROFILER @@ -39,12 +39,6 @@ class SchedStatus(): TERMINATED = 1 -# Some action names for the internal jobs we launch -# -_ACTION_NAME_CLEANUP = 'clean' -_ACTION_NAME_CACHE_SIZE = 'size' - - # Scheduler() # # The scheduler operates on a list queues, each of which is meant to accomplish @@ -98,12 +92,6 @@ class Scheduler(): self._queue_jobs = True # Whether we should continue to queue jobs self._state = state - # State of cache management related jobs - self._cache_size_scheduled = False # Whether we have a cache size job scheduled - self._cache_size_running = None # A running CacheSizeJob, or None - self._cleanup_scheduled = False # Whether we have a cleanup job scheduled - self._cleanup_running = None # A running CleanupJob, or None - # Callbacks to report back to the Scheduler owner self._interrupt_callback = interrupt_callback self._ticker_callback = ticker_callback @@ -127,6 +115,8 @@ class Scheduler(): # def run(self, queues): + assert self.context.fork_allowed + # Hold on to the queues to process self.queues = queues @@ -142,9 +132,6 @@ class Scheduler(): # Handle unix signals while running self._connect_signals() - # Check if we need to start with some cache maintenance - self._check_cache_management() - # Start the profiler with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)): # Run the queues @@ -271,51 +258,10 @@ class Scheduler(): # Now check for more jobs self._sched() - # check_cache_size(): - # - # Queues a cache size calculation job, after the cache - # size is calculated, a cleanup job will be run automatically - # if needed. - # - def check_cache_size(self): - - # Here we assume we are called in response to a job - # completion callback, or before entering the scheduler. - # - # As such there is no need to call `_sched()` from here, - # and we prefer to run it once at the last moment. - # - self._cache_size_scheduled = True - ####################################################### # Local Private Methods # ####################################################### - # _check_cache_management() - # - # Run an initial check if we need to lock the cache - # resource and check the size and possibly launch - # a cleanup. - # - # Sessions which do not add to the cache are not affected. - # - def _check_cache_management(self): - - # Only trigger the check for a scheduler run which has - # queues which require the CACHE resource. - if not any(q for q in self.queues - if ResourceType.CACHE in q.resources): - return - - # If the estimated size outgrows the quota, queue a job to - # actually check the real cache size initially, this one - # should have exclusive access to the cache to ensure nothing - # starts while we are checking the cache. - # - artifacts = self.context.artifactcache - if artifacts.full(): - self._sched_cache_size_job(exclusive=True) - # _start_job() # # Spanws a job @@ -328,106 +274,6 @@ class Scheduler(): self._state.add_task(job.action_name, job.name, self.elapsed_time()) job.start() - # Callback for the cache size job - def _cache_size_job_complete(self, status, cache_size): - - # Deallocate cache size job resources - self._cache_size_running = None - self.resources.release([ResourceType.CACHE, ResourceType.PROCESS]) - - # Unregister the exclusive interest if there was any - self.resources.unregister_exclusive_interest( - [ResourceType.CACHE], 'cache-size' - ) - - # Schedule a cleanup job if we've hit the threshold - if status is not JobStatus.OK: - return - - context = self.context - artifacts = context.artifactcache - - if artifacts.full(): - self._cleanup_scheduled = True - - # Callback for the cleanup job - def _cleanup_job_complete(self, status, cache_size): - - # Deallocate cleanup job resources - self._cleanup_running = None - self.resources.release([ResourceType.CACHE, ResourceType.PROCESS]) - - # Unregister the exclusive interest when we're done with it - if not self._cleanup_scheduled: - self.resources.unregister_exclusive_interest( - [ResourceType.CACHE], 'cache-cleanup' - ) - - # _sched_cleanup_job() - # - # Runs a cleanup job if one is scheduled to run now and - # sufficient recources are available. - # - def _sched_cleanup_job(self): - - if self._cleanup_scheduled and self._cleanup_running is None: - - # Ensure we have an exclusive interest in the resources - self.resources.register_exclusive_interest( - [ResourceType.CACHE], 'cache-cleanup' - ) - - if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS], - [ResourceType.CACHE]): - - # Update state and launch - self._cleanup_scheduled = False - self._cleanup_running = \ - CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup', - complete_cb=self._cleanup_job_complete) - self._start_job(self._cleanup_running) - - # _sched_cache_size_job() - # - # Runs a cache size job if one is scheduled to run now and - # sufficient recources are available. - # - # Args: - # exclusive (bool): Run a cache size job immediately and - # hold the ResourceType.CACHE resource - # exclusively (used at startup). - # - def _sched_cache_size_job(self, *, exclusive=False): - - # The exclusive argument is not intended (or safe) for arbitrary use. - if exclusive: - assert not self._cache_size_scheduled - assert not self._cache_size_running - assert not self._active_jobs - self._cache_size_scheduled = True - - if self._cache_size_scheduled and not self._cache_size_running: - - # Handle the exclusive launch - exclusive_resources = set() - if exclusive: - exclusive_resources.add(ResourceType.CACHE) - self.resources.register_exclusive_interest( - exclusive_resources, 'cache-size' - ) - - # Reserve the resources (with the possible exclusive cache resource) - if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS], - exclusive_resources): - - # Update state and launch - self._cache_size_scheduled = False - self._cache_size_running = \ - CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE, - 'cache_size/cache_size', - complete_cb=self._cache_size_job_complete) - self._start_job(self._cache_size_running) - # _sched_queue_jobs() # # Ask the queues what jobs they want to schedule and schedule @@ -490,12 +336,6 @@ class Scheduler(): if not self.terminated: # - # Try the cache management jobs - # - self._sched_cleanup_job() - self._sched_cache_size_job() - - # # Run as many jobs as the queues can handle for the # available resources # diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py index b39874b20..64498ba32 100644 --- a/src/buildstream/_sourcecache.py +++ b/src/buildstream/_sourcecache.py @@ -94,52 +94,9 @@ class SourceCache(BaseCache): def __init__(self, context): super().__init__(context) - self._required_sources = set() self.sourcerefdir = os.path.join(context.cachedir, 'source_protos') os.makedirs(self.sourcerefdir, exist_ok=True) - self.casquota.add_remove_callbacks(self.unrequired_sources, self._remove_source) - self.casquota.add_list_refs_callback(self.list_sources) - - self.cas.add_reachable_directories_callback(self._reachable_directories) - - # mark_required_sources() - # - # Mark sources that are required by the current run. - # - # Sources that are in this list will not be removed during the current - # pipeline. - # - # Args: - # sources (iterable): An iterable over sources that are required - # - def mark_required_sources(self, sources): - sources = list(sources) # in case it's a generator - - self._required_sources.update(sources) - - # update mtimes just in case - for source in sources: - ref = source._get_source_name() - try: - self._update_mtime(ref) - except SourceCacheError: - pass - - # unrequired_sources() - # - # Yields the refs of all sources not required by the current build plan - # - # Returns: - # iter (str): iterable over unrequired source keys - # - def unrequired_sources(self): - required_source_names = set(map( - lambda x: x._get_source_name(), self._required_sources)) - for (mtime, source) in self._list_refs_mtimes(self.sourcerefdir): - if source not in required_source_names: - yield (mtime, source) - # list_sources() # # Get list of all sources in the `sources_protos/` folder diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index a7705332b..167127cf2 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -19,8 +19,6 @@ # Jürg Billeter <juerg.billeter@codethink.co.uk> # Tristan Maat <tristan.maat@codethink.co.uk> -import itertools -import functools import os import sys import stat @@ -225,6 +223,8 @@ class Stream(): else: buildtree = True + self._context.disable_fork() + return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command, usebuildtree=buildtree) @@ -557,6 +557,8 @@ class Stream(): self._enqueue_plan(uncached_elts) self._run() + self._context.disable_fork() + # Stage deps into a temporary sandbox first if isinstance(target, ArtifactElement): try: @@ -669,9 +671,8 @@ class Stream(): # # Args: # targets (str): Targets to remove - # no_prune (bool): Whether to prune the unreachable refs, default False # - def artifact_delete(self, targets, no_prune): + def artifact_delete(self, targets): # Return list of Element and/or ArtifactElement objects target_objects = self.load_selection(targets, selection=PipelineSelection.NONE, load_refs=True) @@ -686,7 +687,7 @@ class Stream(): ref_removed = False for ref in remove_refs: try: - self._artifacts.remove(ref, defer_prune=True) + self._artifacts.remove(ref) except ArtifactError as e: self._message(MessageType.WARN, str(e)) continue @@ -694,11 +695,6 @@ class Stream(): self._message(MessageType.INFO, "Removed: {}".format(ref)) ref_removed = True - # Prune the artifact cache - if ref_removed and not no_prune: - with self._context.messenger.timed_activity("Pruning artifact cache"): - self._artifacts.prune() - if not ref_removed: self._message(MessageType.INFO, "No artifacts were removed") @@ -1248,20 +1244,6 @@ class Stream(): selected, except_elements) - # Set the "required" artifacts that should not be removed - # while this pipeline is active - # - # It must include all the artifacts which are required by the - # final product. Note that this is a superset of the build plan. - # - # use partial as we send this to both Artifact and Source caches - required_elements = functools.partial(self._pipeline.dependencies, elements, Scope.ALL) - self._artifacts.mark_required_elements(required_elements()) - - self._sourcecache.mark_required_sources( - itertools.chain.from_iterable( - [element.sources() for element in required_elements()])) - if selection == PipelineSelection.PLAN and dynamic_plan: # We use a dynamic build plan, only request artifacts of top-level targets, # others are requested dynamically as needed. diff --git a/src/buildstream/element.py b/src/buildstream/element.py index ca573bee1..bd5ca14e7 100644 --- a/src/buildstream/element.py +++ b/src/buildstream/element.py @@ -1566,21 +1566,6 @@ class Element(Plugin): workspace.clear_running_files() self._get_context().get_workspaces().save_config() - # This element will have already been marked as - # required, but we bump the atime again, in case - # we did not know the cache key until now. - # - # FIXME: This is not exactly correct, we should be - # doing this at the time which we have discovered - # a new cache key, this just happens to be the - # last place where that can happen. - # - # Ultimately, we should be refactoring - # Element._update_state() such that we know - # when a cache key is actually discovered. - # - self.__artifacts.mark_required_elements([self]) - # _assemble(): # # Internal method for running the entire build phase. diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index affe597dd..4308d662b 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -281,7 +281,7 @@ class SandboxRemote(Sandbox): context = self._get_context() cascache = context.get_cascache() artifactcache = context.artifactcache - casremote = CASRemote(self.storage_remote_spec) + casremote = CASRemote(self.storage_remote_spec, cascache) # Now do a pull to ensure we have the full directory structure. dir_digest = cascache.pull_tree(casremote, tree_digest) @@ -300,7 +300,7 @@ class SandboxRemote(Sandbox): project = self._get_project() cascache = context.get_cascache() artifactcache = context.artifactcache - casremote = CASRemote(self.storage_remote_spec) + casremote = CASRemote(self.storage_remote_spec, cascache) # Fetch the file blobs if needed if self._output_files_required or artifactcache.has_push_remotes(): @@ -368,7 +368,7 @@ class SandboxRemote(Sandbox): action_result = self._check_action_cache(action_digest) if not action_result: - casremote = CASRemote(self.storage_remote_spec) + casremote = CASRemote(self.storage_remote_spec, cascache) try: casremote.init() except grpc.RpcError as e: diff --git a/src/buildstream/testing/runcli.py b/src/buildstream/testing/runcli.py index 95bf83eff..6c3ab3496 100644 --- a/src/buildstream/testing/runcli.py +++ b/src/buildstream/testing/runcli.py @@ -746,7 +746,7 @@ class TestArtifact(): def _extract_subdirectory(self, tmpdir, digest): with tempfile.TemporaryDirectory() as extractdir: try: - cas = CASCache(str(tmpdir)) + cas = CASCache(str(tmpdir), casd=False) cas.checkout(extractdir, digest) yield extractdir except FileNotFoundError: diff --git a/tests/artifactcache/artifactservice.py b/tests/artifactcache/artifactservice.py index ac9514793..3d229e24f 100644 --- a/tests/artifactcache/artifactservice.py +++ b/tests/artifactcache/artifactservice.py @@ -86,19 +86,9 @@ def _artifact_request(url, queue): @pytest.mark.parametrize("files", ["present", "absent", "invalid"]) def test_update_artifact(tmpdir, files): sharedir = os.path.join(str(tmpdir), "share") - with create_artifact_share(sharedir) as share: - # put files object - if files == "present": - directory = re_pb2.Directory() - digest = share.cas.add_object(buffer=directory.SerializeToString()) - elif files == "invalid": - digest = share.cas.add_object(buffer="abcdefghijklmnop".encode("utf-8")) - elif files == "absent": - digest = utils._message_digest("abcdefghijklmnop".encode("utf-8")) - - url = urlparse(share.repo) + with create_artifact_share(sharedir, casd=True) as share: queue = Queue() - process = Process(target=_queue_wrapper, args=(_get_artifact, queue, url, files, digest)) + process = Process(target=_queue_wrapper, args=(_update_artifact, queue, share, files)) try: with _signals.blocked([signal.SIGINT], ignore=False): @@ -112,7 +102,18 @@ def test_update_artifact(tmpdir, files): assert not error -def _get_artifact(url, files, digest, queue): +def _update_artifact(share, files, *, queue): + # put files object + if files == "present": + directory = re_pb2.Directory() + digest = share.cas.add_object(buffer=directory.SerializeToString()) + elif files == "invalid": + digest = share.cas.add_object(buffer="abcdefghijklmnop".encode("utf-8")) + elif files == "absent": + digest = utils._message_digest("abcdefghijklmnop".encode("utf-8")) + + url = urlparse(share.repo) + channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port)) artifact_stub = ArtifactServiceStub(channel) diff --git a/tests/artifactcache/cache_size.py b/tests/artifactcache/cache_size.py deleted file mode 100644 index fb34b5fad..000000000 --- a/tests/artifactcache/cache_size.py +++ /dev/null @@ -1,90 +0,0 @@ -# Pylint doesn't play well with fixtures and dependency injection from pytest -# pylint: disable=redefined-outer-name - -import os -from unittest import mock - -from buildstream import _yaml -from buildstream._cas.cascache import CACHE_SIZE_FILE -from buildstream._exceptions import ErrorDomain -from buildstream.testing import cli # pylint: disable=unused-import - -from tests.testutils import create_element_size - -# XXX: Currently lacking: -# * A way to check whether it's faster to read cache size on -# successive invocations. -# * A way to check whether the cache size file has been read. - - -def create_project(project_dir): - project_file = os.path.join(project_dir, "project.conf") - project_conf = { - "name": "test" - } - _yaml.roundtrip_dump(project_conf, project_file) - element_name = "test.bst" - create_element_size(element_name, project_dir, ".", [], 1024) - - -def test_cache_size_roundtrip(cli, tmpdir): - # Builds (to put files in the cache), then invokes buildstream again - # to check nothing breaks - - # Create project - project_dir = str(tmpdir) - create_project(project_dir) - - # Build, to populate the cache - res = cli.run(project=project_dir, args=["build", "test.bst"]) - res.assert_success() - - # Show, to check that nothing breaks while reading cache size - res = cli.run(project=project_dir, args=["show", "test.bst"]) - res.assert_success() - - -def test_cache_size_write(cli, tmpdir): - # Builds (to put files in the cache), then checks a number is - # written to the cache size file. - - project_dir = str(tmpdir) - create_project(project_dir) - - # Artifact cache must be in a known place - casdir = os.path.join(project_dir, "cas") - cli.configure({"cachedir": project_dir}) - - # Build, to populate the cache - res = cli.run(project=project_dir, args=["build", "test.bst"]) - res.assert_success() - - # Inspect the artifact cache - sizefile = os.path.join(casdir, CACHE_SIZE_FILE) - assert os.path.isfile(sizefile) - - -def test_quota_over_1024T(cli, tmpdir): - KiB = 1024 - MiB = (KiB * 1024) - GiB = (MiB * 1024) - TiB = (GiB * 1024) - - cli.configure({ - 'cache': { - 'quota': 2048 * TiB - } - }) - project = tmpdir.join("main") - os.makedirs(str(project)) - _yaml.roundtrip_dump({'name': 'main'}, str(project.join("project.conf"))) - - volume_space_patch = mock.patch( - "buildstream._cas.CASQuota._get_cache_volume_size", - autospec=True, - return_value=(1025 * TiB, 1025 * TiB) - ) - - with volume_space_patch: - result = cli.run(project, args=["build", "file.bst"]) - result.assert_main_error(ErrorDomain.CAS, 'insufficient-storage-for-quota') diff --git a/tests/artifactcache/expiry.py b/tests/artifactcache/expiry.py index b163903cb..e0bbb4007 100644 --- a/tests/artifactcache/expiry.py +++ b/tests/artifactcache/expiry.py @@ -21,7 +21,6 @@ # pylint: disable=redefined-outer-name import os -import re from unittest import mock import pytest @@ -213,6 +212,7 @@ def test_never_delete_required(cli, datafiles): 'quota': 10000000 }, 'scheduler': { + 'fetchers': 1, 'builders': 1 } }) @@ -223,30 +223,19 @@ def test_never_delete_required(cli, datafiles): create_element_size('dep3.bst', project, element_path, ['dep2.bst'], 8000000) create_element_size('target.bst', project, element_path, ['dep3.bst'], 8000000) + # Build dep1.bst, which should fit into the cache. + res = cli.run(project=project, args=['build', 'dep1.bst']) + res.assert_success() + # We try to build this pipeline, but it's too big for the # cache. Since all elements are required, the build should fail. res = cli.run(project=project, args=['build', 'target.bst']) res.assert_main_error(ErrorDomain.STREAM, None) res.assert_task_error(ErrorDomain.CAS, 'cache-too-full') - # Only the first artifact fits in the cache, but we expect - # that the first *two* artifacts will be cached. - # - # This is because after caching the first artifact we must - # proceed to build the next artifact, and we cannot really - # know how large an artifact will be until we try to cache it. - # - # In this case, we deem it more acceptable to not delete an - # artifact which caused the cache to outgrow the quota. - # - # Note that this test only works because we have forced - # the configuration to build one element at a time, in real - # life there may potentially be N-builders cached artifacts - # which exceed the quota - # states = cli.get_element_states(project, ['target.bst']) assert states['dep1.bst'] == 'cached' - assert states['dep2.bst'] == 'cached' + assert states['dep2.bst'] != 'cached' assert states['dep3.bst'] != 'cached' assert states['target.bst'] != 'cached' @@ -265,6 +254,7 @@ def test_never_delete_required_track(cli, datafiles): 'quota': 10000000 }, 'scheduler': { + 'fetchers': 1, 'builders': 1 } }) @@ -348,6 +338,7 @@ def test_never_delete_required_track(cli, datafiles): ("70%", 'warning', 'Your system does not have enough available') ]) @pytest.mark.datafiles(DATA_DIR) +@pytest.mark.xfail() def test_invalid_cache_quota(cli, datafiles, quota, err_domain, err_reason): project = str(datafiles) os.makedirs(os.path.join(project, 'elements')) @@ -438,18 +429,6 @@ def test_cleanup_first(cli, datafiles): res = cli.run(project=project, args=['build', 'target2.bst']) res.assert_success() - # Find all of the activity (like push, pull, src-pull) lines - results = re.findall(r'\[.*\]\[.*\]\[\s*(\S+):.*\]\s*START\s*.*\.log', res.stderr) - - # Don't bother checking the order of 'src-pull', it is allowed to start - # before or after the initial cache size job, runs in parallel, and does - # not require ResourceType.CACHE. - results.remove('fetch') - print(results) - - # Assert the expected sequence of events - assert results == ['size', 'clean', 'build'] - # Check that the correct element remains in the cache states = cli.get_element_states(project, ['target.bst', 'target2.bst']) assert states['target.bst'] != 'cached' diff --git a/tests/frontend/push.py b/tests/frontend/push.py index e92646154..9c3947c2a 100644 --- a/tests/frontend/push.py +++ b/tests/frontend/push.py @@ -292,19 +292,17 @@ def test_artifact_expires(cli, datafiles, tmpdir): element_path = 'elements' # Create an artifact share (remote artifact cache) in the tmpdir/artifactshare - # Mock a file system with 12 MB free disk space + # Set a 22 MB quota with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'), - min_head_size=int(2e9), - max_head_size=int(2e9), - total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share: + quota=int(22e6)) as share: # Configure bst to push to the cache cli.configure({ 'artifacts': {'url': share.repo, 'push': True}, }) - # Create and build an element of 5 MB - create_element_size('element1.bst', project, element_path, [], int(5e6)) + # Create and build an element of 15 MB + create_element_size('element1.bst', project, element_path, [], int(15e6)) result = cli.run(project=project, args=['build', 'element1.bst']) result.assert_success() @@ -349,7 +347,7 @@ def test_artifact_too_large(cli, datafiles, tmpdir): # Create an artifact share (remote cache) in tmpdir/artifactshare # Mock a file system with 5 MB total space with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'), - total_space=int(5e6) + int(2e9)) as share: + quota=int(5e6)) as share: # Configure bst to push to the remote cache cli.configure({ @@ -383,23 +381,21 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir): element_path = 'elements' # Create an artifact share (remote cache) in tmpdir/artifactshare - # Mock a file system with 12 MB free disk space + # Set a 22 MB quota with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'), - min_head_size=int(2e9), - max_head_size=int(2e9), - total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share: + quota=int(22e6)) as share: # Configure bst to push to the cache cli.configure({ 'artifacts': {'url': share.repo, 'push': True}, }) - # Create and build 2 elements, each of 5 MB. + # Create and build 2 elements, one 5 MB and one 15 MB. create_element_size('element1.bst', project, element_path, [], int(5e6)) result = cli.run(project=project, args=['build', 'element1.bst']) result.assert_success() - create_element_size('element2.bst', project, element_path, [], int(5e6)) + create_element_size('element2.bst', project, element_path, [], int(15e6)) result = cli.run(project=project, args=['build', 'element2.bst']) result.assert_success() diff --git a/tests/integration/cachedfail.py b/tests/integration/cachedfail.py index e3b5b2796..f8dd52aa6 100644 --- a/tests/integration/cachedfail.py +++ b/tests/integration/cachedfail.py @@ -20,7 +20,7 @@ import os import pytest -from buildstream import _yaml +from buildstream import utils, _yaml from buildstream._exceptions import ErrorDomain from buildstream.testing import cli_integration as cli # pylint: disable=unused-import from buildstream.testing._utils.site import HAVE_SANDBOX @@ -185,7 +185,12 @@ def test_push_cached_fail(cli, tmpdir, datafiles, on_error): @pytest.mark.skipif(HAVE_SANDBOX != 'bwrap', reason='Only available with bubblewrap on Linux') @pytest.mark.datafiles(DATA_DIR) -def test_host_tools_errors_are_not_cached(cli, datafiles): +def test_host_tools_errors_are_not_cached(cli, datafiles, tmp_path): + # Create symlink to buildbox-casd to work with custom PATH + buildbox_casd = tmp_path.joinpath('bin/buildbox-casd') + buildbox_casd.parent.mkdir() + os.symlink(utils.get_host_tool('buildbox-casd'), str(buildbox_casd)) + project = str(datafiles) element_path = os.path.join(project, 'elements', 'element.bst') @@ -207,7 +212,11 @@ def test_host_tools_errors_are_not_cached(cli, datafiles): _yaml.roundtrip_dump(element, element_path) # Build without access to host tools, this will fail - result1 = cli.run(project=project, args=['build', 'element.bst'], env={'PATH': '', 'BST_FORCE_SANDBOX': None}) + result1 = cli.run( + project=project, + args=['build', 'element.bst'], + env={'PATH': str(tmp_path.joinpath('bin')), + 'BST_FORCE_SANDBOX': None}) result1.assert_task_error(ErrorDomain.SANDBOX, 'unavailable-local-sandbox') assert cli.get_element_state(project, 'element.bst') == 'buildable' diff --git a/tests/internals/storage.py b/tests/internals/storage.py index 66423afd1..385162c13 100644 --- a/tests/internals/storage.py +++ b/tests/internals/storage.py @@ -1,7 +1,11 @@ +from contextlib import contextmanager +import multiprocessing import os +import signal import pytest +from buildstream import utils, _signals from buildstream._cas import CASCache from buildstream.storage._casbaseddirectory import CasBasedDirectory from buildstream.storage._filebaseddirectory import FileBasedDirectory @@ -12,44 +16,93 @@ DATA_DIR = os.path.join( ) +# Since parent processes wait for queue events, we need +# to put something on it if the called process raises an +# exception. +def _queue_wrapper(target, queue, *args): + try: + target(*args) + except Exception as e: + queue.put(str(e)) + raise + + queue.put(None) + + +@contextmanager def setup_backend(backend_class, tmpdir): if backend_class == FileBasedDirectory: - return backend_class(os.path.join(tmpdir, "vdir")) + yield backend_class(os.path.join(tmpdir, "vdir")) else: cas_cache = CASCache(tmpdir) - return backend_class(cas_cache) + try: + yield backend_class(cas_cache) + finally: + cas_cache.release_resources() -@pytest.mark.parametrize("backend", [ - FileBasedDirectory, CasBasedDirectory]) -@pytest.mark.datafiles(DATA_DIR) -def test_import(tmpdir, datafiles, backend): +def _test_import_subprocess(tmpdir, datafiles, backend): original = os.path.join(str(datafiles), "original") - c = setup_backend(backend, str(tmpdir)) - - c.import_files(original) + with setup_backend(backend, str(tmpdir)) as c: + c.import_files(original) - assert "bin/bash" in c.list_relative_paths() - assert "bin/hello" in c.list_relative_paths() + assert "bin/bash" in c.list_relative_paths() + assert "bin/hello" in c.list_relative_paths() @pytest.mark.parametrize("backend", [ FileBasedDirectory, CasBasedDirectory]) @pytest.mark.datafiles(DATA_DIR) -def test_modified_file_list(tmpdir, datafiles, backend): +def test_import(tmpdir, datafiles, backend): + queue = multiprocessing.Queue() + process = multiprocessing.Process(target=_queue_wrapper, + args=(_test_import_subprocess, queue, + tmpdir, datafiles, backend)) + try: + with _signals.blocked([signal.SIGINT], ignore=False): + process.start() + error = queue.get() + process.join() + except KeyboardInterrupt: + utils._kill_process_tree(process.pid) + raise + + assert not error + + +def _test_modified_file_list_subprocess(tmpdir, datafiles, backend): original = os.path.join(str(datafiles), "original") overlay = os.path.join(str(datafiles), "overlay") - c = setup_backend(backend, str(tmpdir)) + with setup_backend(backend, str(tmpdir)) as c: + c.import_files(original) - c.import_files(original) + c.mark_unmodified() - c.mark_unmodified() + c.import_files(overlay) - c.import_files(overlay) + print("List of all paths in imported results: {}".format(c.list_relative_paths())) + assert "bin/bash" in c.list_relative_paths() + assert "bin/bash" in c.list_modified_paths() + assert "bin/hello" not in c.list_modified_paths() - print("List of all paths in imported results: {}".format(c.list_relative_paths())) - assert "bin/bash" in c.list_relative_paths() - assert "bin/bash" in c.list_modified_paths() - assert "bin/hello" not in c.list_modified_paths() + +@pytest.mark.parametrize("backend", [ + FileBasedDirectory, CasBasedDirectory]) +@pytest.mark.datafiles(DATA_DIR) +def test_modified_file_list(tmpdir, datafiles, backend): + queue = multiprocessing.Queue() + process = multiprocessing.Process(target=_queue_wrapper, + args=(_test_modified_file_list_subprocess, queue, + tmpdir, datafiles, backend)) + try: + with _signals.blocked([signal.SIGINT], ignore=False): + process.start() + error = queue.get() + process.join() + except KeyboardInterrupt: + utils._kill_process_tree(process.pid) + raise + + assert not error diff --git a/tests/internals/storage_vdir_import.py b/tests/internals/storage_vdir_import.py index 7c6cbe4fb..e0165fc13 100644 --- a/tests/internals/storage_vdir_import.py +++ b/tests/internals/storage_vdir_import.py @@ -14,11 +14,14 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. from hashlib import sha256 +import multiprocessing import os import random +import signal import pytest +from buildstream import utils, _signals from buildstream.storage._casbaseddirectory import CasBasedDirectory from buildstream.storage._filebaseddirectory import FileBasedDirectory from buildstream._cas import CASCache @@ -55,6 +58,34 @@ RANDOM_SEED = 69105 NUM_RANDOM_TESTS = 10 +# Since parent processes wait for queue events, we need +# to put something on it if the called process raises an +# exception. +def _queue_wrapper(target, queue, *args): + try: + target(*args) + except Exception as e: + queue.put(str(e)) + raise + + queue.put(None) + + +def _run_test_in_subprocess(func, *args): + queue = multiprocessing.Queue() + process = multiprocessing.Process(target=_queue_wrapper, args=(func, queue, *args)) + try: + with _signals.blocked([signal.SIGINT], ignore=False): + process.start() + error = queue.get() + process.join() + except KeyboardInterrupt: + utils._kill_process_tree(process.pid) + raise + + assert not error + + def generate_import_roots(rootno, directory): rootname = "root{}".format(rootno) rootdir = os.path.join(directory, "content", rootname) @@ -182,56 +213,64 @@ def directory_not_empty(path): return os.listdir(path) -def _import_test(tmpdir, original, overlay, generator_function, verify_contents=False): +def _import_test_subprocess(tmpdir, original, overlay, generator_function, verify_contents=False): cas_cache = CASCache(tmpdir) - # Create some fake content - generator_function(original, tmpdir) - if original != overlay: - generator_function(overlay, tmpdir) - - d = create_new_casdir(original, cas_cache, tmpdir) - - duplicate_cas = create_new_casdir(original, cas_cache, tmpdir) - - assert duplicate_cas._get_digest().hash == d._get_digest().hash - - d2 = create_new_casdir(overlay, cas_cache, tmpdir) - d.import_files(d2) - export_dir = os.path.join(tmpdir, "output-{}-{}".format(original, overlay)) - roundtrip_dir = os.path.join(tmpdir, "roundtrip-{}-{}".format(original, overlay)) - d2.export_files(roundtrip_dir) - d.export_files(export_dir) - - if verify_contents: - for item in root_filesets[overlay - 1]: - (path, typename, content) = item - realpath = resolve_symlinks(path, export_dir) - if typename == 'F': - if os.path.isdir(realpath) and directory_not_empty(realpath): - # The file should not have overwritten the directory in this case. - pass - else: - assert os.path.isfile(realpath), "{} did not exist in the combined virtual directory".format(path) - assert file_contents_are(realpath, content) - elif typename == 'S': - if os.path.isdir(realpath) and directory_not_empty(realpath): - # The symlink should not have overwritten the directory in this case. - pass - else: - assert os.path.islink(realpath) - assert os.readlink(realpath) == content - elif typename == 'D': - # We can't do any more tests than this because it - # depends on things present in the original. Blank - # directories here will be ignored and the original - # left in place. - assert os.path.lexists(realpath) - - # Now do the same thing with filebaseddirectories and check the contents match - - duplicate_cas.import_files(roundtrip_dir) - - assert duplicate_cas._get_digest().hash == d._get_digest().hash + try: + # Create some fake content + generator_function(original, tmpdir) + if original != overlay: + generator_function(overlay, tmpdir) + + d = create_new_casdir(original, cas_cache, tmpdir) + + duplicate_cas = create_new_casdir(original, cas_cache, tmpdir) + + assert duplicate_cas._get_digest().hash == d._get_digest().hash + + d2 = create_new_casdir(overlay, cas_cache, tmpdir) + d.import_files(d2) + export_dir = os.path.join(tmpdir, "output-{}-{}".format(original, overlay)) + roundtrip_dir = os.path.join(tmpdir, "roundtrip-{}-{}".format(original, overlay)) + d2.export_files(roundtrip_dir) + d.export_files(export_dir) + + if verify_contents: + for item in root_filesets[overlay - 1]: + (path, typename, content) = item + realpath = resolve_symlinks(path, export_dir) + if typename == 'F': + if os.path.isdir(realpath) and directory_not_empty(realpath): + # The file should not have overwritten the directory in this case. + pass + else: + assert os.path.isfile(realpath), \ + "{} did not exist in the combined virtual directory".format(path) + assert file_contents_are(realpath, content) + elif typename == 'S': + if os.path.isdir(realpath) and directory_not_empty(realpath): + # The symlink should not have overwritten the directory in this case. + pass + else: + assert os.path.islink(realpath) + assert os.readlink(realpath) == content + elif typename == 'D': + # We can't do any more tests than this because it + # depends on things present in the original. Blank + # directories here will be ignored and the original + # left in place. + assert os.path.lexists(realpath) + + # Now do the same thing with filebaseddirectories and check the contents match + + duplicate_cas.import_files(roundtrip_dir) + + assert duplicate_cas._get_digest().hash == d._get_digest().hash + finally: + cas_cache.release_resources() + + +def _import_test(tmpdir, original, overlay, generator_function, verify_contents=False): + _run_test_in_subprocess(_import_test_subprocess, tmpdir, original, overlay, generator_function, verify_contents) # It's possible to parameterize on both original and overlay values, @@ -249,18 +288,25 @@ def test_random_cas_import(tmpdir, original): _import_test(str(tmpdir), original, overlay, generate_random_root, verify_contents=False) -def _listing_test(tmpdir, root, generator_function): +def _listing_test_subprocess(tmpdir, root, generator_function): cas_cache = CASCache(tmpdir) - # Create some fake content - generator_function(root, tmpdir) + try: + # Create some fake content + generator_function(root, tmpdir) + + d = create_new_filedir(root, tmpdir) + filelist = list(d.list_relative_paths()) - d = create_new_filedir(root, tmpdir) - filelist = list(d.list_relative_paths()) + d2 = create_new_casdir(root, cas_cache, tmpdir) + filelist2 = list(d2.list_relative_paths()) - d2 = create_new_casdir(root, cas_cache, tmpdir) - filelist2 = list(d2.list_relative_paths()) + assert filelist == filelist2 + finally: + cas_cache.release_resources() - assert filelist == filelist2 + +def _listing_test(tmpdir, root, generator_function): + _run_test_in_subprocess(_listing_test_subprocess, tmpdir, root, generator_function) @pytest.mark.parametrize("root", range(1, 11)) @@ -274,120 +320,155 @@ def test_fixed_directory_listing(tmpdir, root): # Check that the vdir is decending and readable -def test_descend(tmpdir): +def _test_descend_subprocess(tmpdir): cas_dir = os.path.join(str(tmpdir), 'cas') cas_cache = CASCache(cas_dir) - d = CasBasedDirectory(cas_cache) + try: + d = CasBasedDirectory(cas_cache) + + Content_to_check = 'You got me' + test_dir = os.path.join(str(tmpdir), 'importfrom') + filesys_discription = [ + ('a', 'D', ''), + ('a/l', 'D', ''), + ('a/l/g', 'F', Content_to_check) + ] + generate_import_root(test_dir, filesys_discription) + + d.import_files(test_dir) + digest = d.descend('a', 'l').index['g'].get_digest() - Content_to_check = 'You got me' - test_dir = os.path.join(str(tmpdir), 'importfrom') - filesys_discription = [ - ('a', 'D', ''), - ('a/l', 'D', ''), - ('a/l/g', 'F', Content_to_check) - ] - generate_import_root(test_dir, filesys_discription) + assert Content_to_check == open(cas_cache.objpath(digest)).read() + finally: + cas_cache.release_resources() - d.import_files(test_dir) - digest = d.descend('a', 'l').index['g'].get_digest() - assert Content_to_check == open(cas_cache.objpath(digest)).read() +def test_descend(tmpdir): + _run_test_in_subprocess(_test_descend_subprocess, tmpdir) # Check symlink logic for edgecases # Make sure the correct erros are raised when trying # to decend in to files or links to files -def test_bad_symlinks(tmpdir): +def _test_bad_symlinks_subprocess(tmpdir): cas_dir = os.path.join(str(tmpdir), 'cas') cas_cache = CASCache(cas_dir) - d = CasBasedDirectory(cas_cache) - - test_dir = os.path.join(str(tmpdir), 'importfrom') - filesys_discription = [ - ('a', 'D', ''), - ('a/l', 'S', '../target'), - ('target', 'F', 'You got me') - ] - generate_import_root(test_dir, filesys_discription) - d.import_files(test_dir) - exp_reason = "not-a-directory" - - with pytest.raises(VirtualDirectoryError) as error: - d.descend('a', 'l', follow_symlinks=True) - assert error.reason == exp_reason + try: + d = CasBasedDirectory(cas_cache) + + test_dir = os.path.join(str(tmpdir), 'importfrom') + filesys_discription = [ + ('a', 'D', ''), + ('a/l', 'S', '../target'), + ('target', 'F', 'You got me') + ] + generate_import_root(test_dir, filesys_discription) + d.import_files(test_dir) + exp_reason = "not-a-directory" + + with pytest.raises(VirtualDirectoryError) as error: + d.descend('a', 'l', follow_symlinks=True) + assert error.reason == exp_reason + + with pytest.raises(VirtualDirectoryError) as error: + d.descend('a', 'l') + assert error.reason == exp_reason + + with pytest.raises(VirtualDirectoryError) as error: + d.descend('a', 'f') + assert error.reason == exp_reason + finally: + cas_cache.release_resources() - with pytest.raises(VirtualDirectoryError) as error: - d.descend('a', 'l') - assert error.reason == exp_reason - with pytest.raises(VirtualDirectoryError) as error: - d.descend('a', 'f') - assert error.reason == exp_reason +def test_bad_symlinks(tmpdir): + _run_test_in_subprocess(_test_bad_symlinks_subprocess, tmpdir) # Check symlink logic for edgecases # Check decend accross relitive link -def test_relitive_symlink(tmpdir): +def _test_relative_symlink_subprocess(tmpdir): cas_dir = os.path.join(str(tmpdir), 'cas') cas_cache = CASCache(cas_dir) - d = CasBasedDirectory(cas_cache) + try: + d = CasBasedDirectory(cas_cache) + + Content_to_check = 'You got me' + test_dir = os.path.join(str(tmpdir), 'importfrom') + filesys_discription = [ + ('a', 'D', ''), + ('a/l', 'S', '../target'), + ('target', 'D', ''), + ('target/file', 'F', Content_to_check) + ] + generate_import_root(test_dir, filesys_discription) + d.import_files(test_dir) + + digest = d.descend('a', 'l', follow_symlinks=True).index['file'].get_digest() + assert Content_to_check == open(cas_cache.objpath(digest)).read() + finally: + cas_cache.release_resources() - Content_to_check = 'You got me' - test_dir = os.path.join(str(tmpdir), 'importfrom') - filesys_discription = [ - ('a', 'D', ''), - ('a/l', 'S', '../target'), - ('target', 'D', ''), - ('target/file', 'F', Content_to_check) - ] - generate_import_root(test_dir, filesys_discription) - d.import_files(test_dir) - digest = d.descend('a', 'l', follow_symlinks=True).index['file'].get_digest() - assert Content_to_check == open(cas_cache.objpath(digest)).read() +def test_relative_symlink(tmpdir): + _run_test_in_subprocess(_test_relative_symlink_subprocess, tmpdir) # Check symlink logic for edgecases # Check deccend accross abs link -def test_abs_symlink(tmpdir): +def _test_abs_symlink_subprocess(tmpdir): cas_dir = os.path.join(str(tmpdir), 'cas') cas_cache = CASCache(cas_dir) - d = CasBasedDirectory(cas_cache) + try: + d = CasBasedDirectory(cas_cache) + + Content_to_check = 'two step file' + test_dir = os.path.join(str(tmpdir), 'importfrom') + filesys_discription = [ + ('a', 'D', ''), + ('a/l', 'S', '/target'), + ('target', 'D', ''), + ('target/file', 'F', Content_to_check) + ] + generate_import_root(test_dir, filesys_discription) + d.import_files(test_dir) - Content_to_check = 'two step file' - test_dir = os.path.join(str(tmpdir), 'importfrom') - filesys_discription = [ - ('a', 'D', ''), - ('a/l', 'S', '/target'), - ('target', 'D', ''), - ('target/file', 'F', Content_to_check) - ] - generate_import_root(test_dir, filesys_discription) - d.import_files(test_dir) + digest = d.descend('a', 'l', follow_symlinks=True).index['file'].get_digest() - digest = d.descend('a', 'l', follow_symlinks=True).index['file'].get_digest() + assert Content_to_check == open(cas_cache.objpath(digest)).read() + finally: + cas_cache.release_resources() - assert Content_to_check == open(cas_cache.objpath(digest)).read() + +def test_abs_symlink(tmpdir): + _run_test_in_subprocess(_test_abs_symlink_subprocess, tmpdir) # Check symlink logic for edgecases # Check symlink can not escape root -def test_bad_sym_escape(tmpdir): +def _test_bad_sym_escape_subprocess(tmpdir): cas_dir = os.path.join(str(tmpdir), 'cas') cas_cache = CASCache(cas_dir) - d = CasBasedDirectory(cas_cache) + try: + d = CasBasedDirectory(cas_cache) + + test_dir = os.path.join(str(tmpdir), 'importfrom') + filesys_discription = [ + ('jail', 'D', ''), + ('jail/a', 'D', ''), + ('jail/a/l', 'S', '../../target'), + ('target', 'D', ''), + ('target/file', 'F', 'two step file') + ] + generate_import_root(test_dir, filesys_discription) + d.import_files(os.path.join(test_dir, 'jail')) + + with pytest.raises(VirtualDirectoryError) as error: + d.descend('a', 'l', follow_symlinks=True) + assert error.reason == "directory-not-found" + finally: + cas_cache.release_resources() - test_dir = os.path.join(str(tmpdir), 'importfrom') - filesys_discription = [ - ('jail', 'D', ''), - ('jail/a', 'D', ''), - ('jail/a/l', 'S', '../../target'), - ('target', 'D', ''), - ('target/file', 'F', 'two step file') - ] - generate_import_root(test_dir, filesys_discription) - d.import_files(os.path.join(test_dir, 'jail')) - - with pytest.raises(VirtualDirectoryError) as error: - d.descend('a', 'l', follow_symlinks=True) - assert error.reason == "directory-not-found" + +def test_bad_sym_escape(tmpdir): + _run_test_in_subprocess(_test_bad_sym_escape_subprocess, tmpdir) diff --git a/tests/sandboxes/missing_dependencies.py b/tests/sandboxes/missing_dependencies.py index 33a169ca2..975c8eb00 100644 --- a/tests/sandboxes/missing_dependencies.py +++ b/tests/sandboxes/missing_dependencies.py @@ -5,7 +5,7 @@ import os import pytest -from buildstream import _yaml +from buildstream import utils, _yaml from buildstream._exceptions import ErrorDomain from buildstream.testing._utils.site import IS_LINUX from buildstream.testing import cli # pylint: disable=unused-import @@ -20,7 +20,12 @@ DATA_DIR = os.path.join( @pytest.mark.skipif(not IS_LINUX, reason='Only available on Linux') @pytest.mark.datafiles(DATA_DIR) -def test_missing_brwap_has_nice_error_message(cli, datafiles): +def test_missing_brwap_has_nice_error_message(cli, datafiles, tmp_path): + # Create symlink to buildbox-casd to work with custom PATH + buildbox_casd = tmp_path.joinpath('bin/buildbox-casd') + buildbox_casd.parent.mkdir() + os.symlink(utils.get_host_tool('buildbox-casd'), str(buildbox_casd)) + project = str(datafiles) element_path = os.path.join(project, 'elements', 'element.bst') @@ -45,8 +50,8 @@ def test_missing_brwap_has_nice_error_message(cli, datafiles): result = cli.run( project=project, args=['build', 'element.bst'], - env={'PATH': '', 'BST_FORCE_SANDBOX': None} - ) + env={'PATH': str(tmp_path.joinpath('bin')), + 'BST_FORCE_SANDBOX': None}) result.assert_task_error(ErrorDomain.SANDBOX, 'unavailable-local-sandbox') assert "not found" in result.stderr @@ -64,6 +69,10 @@ def test_old_brwap_has_nice_error_message(cli, datafiles, tmp_path): bwrap.chmod(0o755) + # Create symlink to buildbox-casd to work with custom PATH + buildbox_casd = tmp_path.joinpath('bin/buildbox-casd') + os.symlink(utils.get_host_tool('buildbox-casd'), str(buildbox_casd)) + project = str(datafiles) element_path = os.path.join(project, 'elements', 'element3.bst') diff --git a/tests/sandboxes/selection.py b/tests/sandboxes/selection.py index eff247a95..b4bbb1b00 100644 --- a/tests/sandboxes/selection.py +++ b/tests/sandboxes/selection.py @@ -19,7 +19,7 @@ import os import pytest -from buildstream import _yaml +from buildstream import utils, _yaml from buildstream._exceptions import ErrorDomain from buildstream.testing import cli # pylint: disable=unused-import @@ -64,7 +64,12 @@ def test_force_sandbox(cli, datafiles): @pytest.mark.datafiles(DATA_DIR) -def test_dummy_sandbox_fallback(cli, datafiles): +def test_dummy_sandbox_fallback(cli, datafiles, tmp_path): + # Create symlink to buildbox-casd to work with custom PATH + buildbox_casd = tmp_path.joinpath('bin/buildbox-casd') + buildbox_casd.parent.mkdir() + os.symlink(utils.get_host_tool('buildbox-casd'), str(buildbox_casd)) + project = str(datafiles) element_path = os.path.join(project, 'elements', 'element.bst') @@ -86,7 +91,11 @@ def test_dummy_sandbox_fallback(cli, datafiles): _yaml.roundtrip_dump(element, element_path) # Build without access to host tools, this will fail - result = cli.run(project=project, args=['build', 'element.bst'], env={'PATH': '', 'BST_FORCE_SANDBOX': None}) + result = cli.run( + project=project, + args=['build', 'element.bst'], + env={'PATH': str(tmp_path.joinpath('bin')), + 'BST_FORCE_SANDBOX': None}) # But if we dont spesify a sandbox then we fall back to dummy, we still # fail early but only once we know we need a facny sandbox and that # dumy is not enough, there for element gets fetched and so is buildable diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py index a5522c8eb..7d5faeb66 100644 --- a/tests/testutils/artifactshare.py +++ b/tests/testutils/artifactshare.py @@ -20,16 +20,12 @@ from buildstream._protos.buildstream.v2 import artifact_pb2 # # Args: # directory (str): The base temp directory for the test -# total_space (int): Mock total disk space on artifact server -# free_space (int): Mock free disk space on artifact server +# cache_quota (int): Maximum amount of disk space to use +# casd (bool): Allow write access via casd # class ArtifactShare(): - def __init__(self, directory, *, - total_space=None, - free_space=None, - min_head_size=int(2e9), - max_head_size=int(10e9)): + def __init__(self, directory, *, quota=None, casd=False): # The working directory for the artifact share (in case it # needs to do something outside of its backend's storage folder). @@ -46,13 +42,9 @@ class ArtifactShare(): self.artifactdir = os.path.join(self.repodir, 'artifacts', 'refs') os.makedirs(self.artifactdir) - self.cas = CASCache(self.repodir) + self.cas = CASCache(self.repodir, casd=casd) - self.total_space = total_space - self.free_space = free_space - - self.max_head_size = max_head_size - self.min_head_size = min_head_size + self.quota = quota q = Queue() @@ -78,30 +70,23 @@ class ArtifactShare(): pytest_cov.embed.cleanup_on_sigterm() try: - # Optionally mock statvfs - if self.total_space: - if self.free_space is None: - self.free_space = self.total_space - os.statvfs = self._mock_statvfs + with create_server(self.repodir, + quota=self.quota, + enable_push=True) as server: + port = server.add_insecure_port('localhost:0') - server = create_server(self.repodir, - max_head_size=self.max_head_size, - min_head_size=self.min_head_size, - enable_push=True) - port = server.add_insecure_port('localhost:0') + server.start() - server.start() + # Send port to parent + q.put(port) - # Send port to parent - q.put(port) + # Sleep until termination by signal + signal.pause() except Exception: q.put(None) raise - # Sleep until termination by signal - signal.pause() - # has_object(): # # Checks whether the object is present in the share @@ -176,18 +161,9 @@ class ArtifactShare(): self.process.terminate() self.process.join() - shutil.rmtree(self.directory) + self.cas.release_resources() - def _mock_statvfs(self, _path): - repo_size = 0 - for root, _, files in os.walk(self.repodir): - for filename in files: - repo_size += os.path.getsize(os.path.join(root, filename)) - - return statvfs_result(f_blocks=self.total_space, - f_bfree=self.free_space - repo_size, - f_bavail=self.free_space - repo_size, - f_bsize=1) + shutil.rmtree(self.directory) # create_artifact_share() @@ -195,11 +171,8 @@ class ArtifactShare(): # Create an ArtifactShare for use in a test case # @contextmanager -def create_artifact_share(directory, *, total_space=None, free_space=None, - min_head_size=int(2e9), - max_head_size=int(10e9)): - share = ArtifactShare(directory, total_space=total_space, free_space=free_space, - min_head_size=min_head_size, max_head_size=max_head_size) +def create_artifact_share(directory, *, quota=None, casd=False): + share = ArtifactShare(directory, quota=quota, casd=casd) try: yield share finally: |