diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2019-09-10 10:29:34 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-09-10 10:29:34 +0000 |
commit | a200338e3b1947c0d8e7a89b1977f753988dc76e (patch) | |
tree | 1d349eab591af8bce5fb6de940e965d7d55fc8a0 | |
parent | 5c6a5cc98157efd4e8f5c2d113b6d1a3be38010c (diff) | |
parent | 1fc5515d02d83a75e8b9939f89dead2b531bb11c (diff) | |
download | buildstream-a200338e3b1947c0d8e7a89b1977f753988dc76e.tar.gz |
Merge branch 'juerg/casd' into 'master'
casremote.py: Limit request size for batch download and upload
Closes #1129
See merge request BuildStream/buildstream!1591
-rw-r--r-- | src/buildstream/_cas/cascache.py | 93 | ||||
-rw-r--r-- | src/buildstream/_cas/casremote.py | 75 | ||||
-rw-r--r-- | tests/frontend/large_directory.py | 86 | ||||
-rw-r--r-- | tests/frontend/project/elements/import-large-directory.bst | 4 |
4 files changed, 138 insertions, 120 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index fa6705b70..2d93f0527 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -34,12 +34,11 @@ import grpc from .._protos.google.rpc import code_pb2 from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc -from .._protos.buildstream.v2 import buildstream_pb2 from .. import utils from .._exceptions import CASCacheError -from .casremote import BlobNotFound, _CASBatchRead, _CASBatchUpdate +from .casremote import _CASBatchRead, _CASBatchUpdate _BUFFER_SIZE = 65536 @@ -305,46 +304,6 @@ class CASCache(): return modified, removed, added - # pull(): - # - # Pull a ref from a remote repository. - # - # Args: - # ref (str): The ref to pull - # remote (CASRemote): The remote repository to pull from - # - # Returns: - # (bool): True if pull was successful, False if ref was not available - # - def pull(self, ref, remote): - try: - remote.init() - - request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name) - request.key = ref - response = remote.ref_storage.GetReference(request) - - tree = response.digest - - # Fetch Directory objects - self._fetch_directory(remote, tree) - - # Fetch files, excluded_subdirs determined in pullqueue - required_blobs = self.required_blobs_for_directory(tree) - missing_blobs = self.local_missing_blobs(required_blobs) - if missing_blobs: - self.fetch_blobs(remote, missing_blobs) - - self.set_ref(ref, tree) - - return True - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - raise CASCacheError("Failed to pull ref {}: {}".format(ref, e)) from e - return False - except BlobNotFound: - return False - # pull_tree(): # # Pull a single Tree rather than a ref. @@ -368,56 +327,6 @@ class CASCache(): return None - # push(): - # - # Push committed refs to remote repository. - # - # Args: - # refs (list): The refs to push - # remote (CASRemote): The remote to push to - # - # Returns: - # (bool): True if any remote was updated, False if no pushes were required - # - # Raises: - # (CASCacheError): if there was an error - # - def push(self, refs, remote): - skipped_remote = True - try: - for ref in refs: - tree = self.resolve_ref(ref) - - # Check whether ref is already on the server in which case - # there is no need to push the ref - try: - request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name) - request.key = ref - response = remote.ref_storage.GetReference(request) - - if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes: - # ref is already on the server with the same tree - continue - - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - # Intentionally re-raise RpcError for outer except block. - raise - - self._send_directory(remote, tree) - - request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name) - request.keys.append(ref) - request.digest.CopyFrom(tree) - remote.ref_storage.UpdateReference(request) - - skipped_remote = False - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: - raise CASCacheError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e - - return not skipped_remote - # objpath(): # # Return the path of an object based on its digest. diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py index 1efed22e6..43e215c63 100644 --- a/src/buildstream/_cas/casremote.py +++ b/src/buildstream/_cas/casremote.py @@ -12,6 +12,11 @@ from .._exceptions import CASRemoteError # Limit payload to 1 MiB to leave sufficient headroom for metadata. _MAX_PAYLOAD_BYTES = 1024 * 1024 +# How many digests to put in a single gRPC message. +# A 256-bit hash requires 64 bytes of space (hexadecimal encoding). +# 80 bytes provide sufficient space for hash, size, and protobuf overhead. +_MAX_DIGESTS = _MAX_PAYLOAD_BYTES / 80 + class BlobNotFound(CASRemoteError): @@ -157,13 +162,18 @@ class CASRemote(BaseRemote): class _CASBatchRead(): def __init__(self, remote): self._remote = remote - self._request = local_cas_pb2.FetchMissingBlobsRequest() - self._request.instance_name = remote.local_cas_instance_name + self._requests = [] + self._request = None self._sent = False def add(self, digest): assert not self._sent + if not self._request or len(self._request.blob_digests) >= _MAX_DIGESTS: + self._request = local_cas_pb2.FetchMissingBlobsRequest() + self._request.instance_name = self._remote.local_cas_instance_name + self._requests.append(self._request) + request_digest = self._request.blob_digests.add() request_digest.CopyFrom(digest) @@ -171,26 +181,28 @@ class _CASBatchRead(): assert not self._sent self._sent = True - if not self._request.blob_digests: + if not self._requests: return local_cas = self._remote.cascache._get_local_cas() - batch_response = local_cas.FetchMissingBlobs(self._request) - for response in batch_response.responses: - if response.status.code == code_pb2.NOT_FOUND: - if missing_blobs is None: - raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format( - response.digest.hash, response.status.code)) + for request in self._requests: + batch_response = local_cas.FetchMissingBlobs(request) - missing_blobs.append(response.digest) + for response in batch_response.responses: + if response.status.code == code_pb2.NOT_FOUND: + if missing_blobs is None: + raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format( + response.digest.hash, response.status.code)) - if response.status.code != code_pb2.OK: - raise CASRemoteError("Failed to download blob {}: {}".format( - response.digest.hash, response.status.code)) - if response.digest.size_bytes != len(response.data): - raise CASRemoteError("Failed to download blob {}: expected {} bytes, received {} bytes".format( - response.digest.hash, response.digest.size_bytes, len(response.data))) + missing_blobs.append(response.digest) + + if response.status.code != code_pb2.OK: + raise CASRemoteError("Failed to download blob {}: {}".format( + response.digest.hash, response.status.code)) + if response.digest.size_bytes != len(response.data): + raise CASRemoteError("Failed to download blob {}: expected {} bytes, received {} bytes".format( + response.digest.hash, response.digest.size_bytes, len(response.data))) # Represents a batch of blobs queued for upload. @@ -198,13 +210,18 @@ class _CASBatchRead(): class _CASBatchUpdate(): def __init__(self, remote): self._remote = remote - self._request = local_cas_pb2.UploadMissingBlobsRequest() - self._request.instance_name = remote.local_cas_instance_name + self._requests = [] + self._request = None self._sent = False def add(self, digest): assert not self._sent + if not self._request or len(self._request.blob_digests) >= _MAX_DIGESTS: + self._request = local_cas_pb2.UploadMissingBlobsRequest() + self._request.instance_name = self._remote.local_cas_instance_name + self._requests.append(self._request) + request_digest = self._request.blob_digests.add() request_digest.CopyFrom(digest) @@ -212,18 +229,20 @@ class _CASBatchUpdate(): assert not self._sent self._sent = True - if not self._request.blob_digests: + if not self._requests: return local_cas = self._remote.cascache._get_local_cas() - batch_response = local_cas.UploadMissingBlobs(self._request) - for response in batch_response.responses: - if response.status.code != code_pb2.OK: - if response.status.code == code_pb2.RESOURCE_EXHAUSTED: - reason = "cache-too-full" - else: - reason = None + for request in self._requests: + batch_response = local_cas.UploadMissingBlobs(request) + + for response in batch_response.responses: + if response.status.code != code_pb2.OK: + if response.status.code == code_pb2.RESOURCE_EXHAUSTED: + reason = "cache-too-full" + else: + reason = None - raise CASRemoteError("Failed to upload blob {}: {}".format( - response.digest.hash, response.status.code), reason=reason) + raise CASRemoteError("Failed to upload blob {}: {}".format( + response.digest.hash, response.status.code), reason=reason) diff --git a/tests/frontend/large_directory.py b/tests/frontend/large_directory.py new file mode 100644 index 000000000..921e2ddbe --- /dev/null +++ b/tests/frontend/large_directory.py @@ -0,0 +1,86 @@ +# +# Copyright (C) 2019 Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# + +# Pylint doesn't play well with fixtures and dependency injection from pytest +# pylint: disable=redefined-outer-name + +from contextlib import contextmanager +import os +import pytest + +import grpc + +from buildstream.testing import cli # pylint: disable=unused-import +from tests.testutils import create_artifact_share, assert_shared + + +# Project directory +DATA_DIR = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + "project", +) + + +@contextmanager +def limit_grpc_message_length(limit): + orig_insecure_channel = grpc.insecure_channel + + def new_insecure_channel(target): + return orig_insecure_channel(target, options=(('grpc.max_send_message_length', limit),)) + + grpc.insecure_channel = new_insecure_channel + try: + yield + finally: + grpc.insecure_channel = orig_insecure_channel + + +@pytest.mark.datafiles(DATA_DIR) +def test_large_directory(cli, tmpdir, datafiles): + project = str(datafiles) + + # Number of files chosen to ensure the complete list of digests exceeds + # our 1 MB gRPC message limit. I.e., test message splitting. + MAX_MESSAGE_LENGTH = 1024 * 1024 + NUM_FILES = MAX_MESSAGE_LENGTH // 64 + 1 + + large_directory_dir = os.path.join(project, 'files', 'large-directory') + os.mkdir(large_directory_dir) + for i in range(NUM_FILES): + with open(os.path.join(large_directory_dir, str(i)), 'w') as f: + # The files need to have different content as we want different digests. + f.write(str(i)) + + with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share: + # Configure bst to push to the artifact share + cli.configure({ + 'artifacts': [ + {'url': share.repo, 'push': True}, + ] + }) + + # Enforce 1 MB gRPC message limit + with limit_grpc_message_length(MAX_MESSAGE_LENGTH): + # Build and push + result = cli.run(project=project, args=['build', 'import-large-directory.bst']) + result.assert_success() + + # Assert that we are now cached locally + assert cli.get_element_state(project, 'import-large-directory.bst') == 'cached' + + # Assert that the push was successful + assert_shared(cli, share, project, 'import-large-directory.bst') diff --git a/tests/frontend/project/elements/import-large-directory.bst b/tests/frontend/project/elements/import-large-directory.bst new file mode 100644 index 000000000..a89a8c16d --- /dev/null +++ b/tests/frontend/project/elements/import-large-directory.bst @@ -0,0 +1,4 @@ +kind: import +sources: +- kind: local + path: files/large-directory |