summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbst-marge-bot <marge-bot@buildstream.build>2019-09-10 10:29:34 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2019-09-10 10:29:34 +0000
commita200338e3b1947c0d8e7a89b1977f753988dc76e (patch)
tree1d349eab591af8bce5fb6de940e965d7d55fc8a0
parent5c6a5cc98157efd4e8f5c2d113b6d1a3be38010c (diff)
parent1fc5515d02d83a75e8b9939f89dead2b531bb11c (diff)
downloadbuildstream-a200338e3b1947c0d8e7a89b1977f753988dc76e.tar.gz
Merge branch 'juerg/casd' into 'master'
casremote.py: Limit request size for batch download and upload Closes #1129 See merge request BuildStream/buildstream!1591
-rw-r--r--src/buildstream/_cas/cascache.py93
-rw-r--r--src/buildstream/_cas/casremote.py75
-rw-r--r--tests/frontend/large_directory.py86
-rw-r--r--tests/frontend/project/elements/import-large-directory.bst4
4 files changed, 138 insertions, 120 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index fa6705b70..2d93f0527 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -34,12 +34,11 @@ import grpc
from .._protos.google.rpc import code_pb2
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc
-from .._protos.buildstream.v2 import buildstream_pb2
from .. import utils
from .._exceptions import CASCacheError
-from .casremote import BlobNotFound, _CASBatchRead, _CASBatchUpdate
+from .casremote import _CASBatchRead, _CASBatchUpdate
_BUFFER_SIZE = 65536
@@ -305,46 +304,6 @@ class CASCache():
return modified, removed, added
- # pull():
- #
- # Pull a ref from a remote repository.
- #
- # Args:
- # ref (str): The ref to pull
- # remote (CASRemote): The remote repository to pull from
- #
- # Returns:
- # (bool): True if pull was successful, False if ref was not available
- #
- def pull(self, ref, remote):
- try:
- remote.init()
-
- request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
- request.key = ref
- response = remote.ref_storage.GetReference(request)
-
- tree = response.digest
-
- # Fetch Directory objects
- self._fetch_directory(remote, tree)
-
- # Fetch files, excluded_subdirs determined in pullqueue
- required_blobs = self.required_blobs_for_directory(tree)
- missing_blobs = self.local_missing_blobs(required_blobs)
- if missing_blobs:
- self.fetch_blobs(remote, missing_blobs)
-
- self.set_ref(ref, tree)
-
- return True
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.NOT_FOUND:
- raise CASCacheError("Failed to pull ref {}: {}".format(ref, e)) from e
- return False
- except BlobNotFound:
- return False
-
# pull_tree():
#
# Pull a single Tree rather than a ref.
@@ -368,56 +327,6 @@ class CASCache():
return None
- # push():
- #
- # Push committed refs to remote repository.
- #
- # Args:
- # refs (list): The refs to push
- # remote (CASRemote): The remote to push to
- #
- # Returns:
- # (bool): True if any remote was updated, False if no pushes were required
- #
- # Raises:
- # (CASCacheError): if there was an error
- #
- def push(self, refs, remote):
- skipped_remote = True
- try:
- for ref in refs:
- tree = self.resolve_ref(ref)
-
- # Check whether ref is already on the server in which case
- # there is no need to push the ref
- try:
- request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
- request.key = ref
- response = remote.ref_storage.GetReference(request)
-
- if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
- # ref is already on the server with the same tree
- continue
-
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.NOT_FOUND:
- # Intentionally re-raise RpcError for outer except block.
- raise
-
- self._send_directory(remote, tree)
-
- request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name)
- request.keys.append(ref)
- request.digest.CopyFrom(tree)
- remote.ref_storage.UpdateReference(request)
-
- skipped_remote = False
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
- raise CASCacheError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e
-
- return not skipped_remote
-
# objpath():
#
# Return the path of an object based on its digest.
diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py
index 1efed22e6..43e215c63 100644
--- a/src/buildstream/_cas/casremote.py
+++ b/src/buildstream/_cas/casremote.py
@@ -12,6 +12,11 @@ from .._exceptions import CASRemoteError
# Limit payload to 1 MiB to leave sufficient headroom for metadata.
_MAX_PAYLOAD_BYTES = 1024 * 1024
+# How many digests to put in a single gRPC message.
+# A 256-bit hash requires 64 bytes of space (hexadecimal encoding).
+# 80 bytes provide sufficient space for hash, size, and protobuf overhead.
+_MAX_DIGESTS = _MAX_PAYLOAD_BYTES / 80
+
class BlobNotFound(CASRemoteError):
@@ -157,13 +162,18 @@ class CASRemote(BaseRemote):
class _CASBatchRead():
def __init__(self, remote):
self._remote = remote
- self._request = local_cas_pb2.FetchMissingBlobsRequest()
- self._request.instance_name = remote.local_cas_instance_name
+ self._requests = []
+ self._request = None
self._sent = False
def add(self, digest):
assert not self._sent
+ if not self._request or len(self._request.blob_digests) >= _MAX_DIGESTS:
+ self._request = local_cas_pb2.FetchMissingBlobsRequest()
+ self._request.instance_name = self._remote.local_cas_instance_name
+ self._requests.append(self._request)
+
request_digest = self._request.blob_digests.add()
request_digest.CopyFrom(digest)
@@ -171,26 +181,28 @@ class _CASBatchRead():
assert not self._sent
self._sent = True
- if not self._request.blob_digests:
+ if not self._requests:
return
local_cas = self._remote.cascache._get_local_cas()
- batch_response = local_cas.FetchMissingBlobs(self._request)
- for response in batch_response.responses:
- if response.status.code == code_pb2.NOT_FOUND:
- if missing_blobs is None:
- raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
- response.digest.hash, response.status.code))
+ for request in self._requests:
+ batch_response = local_cas.FetchMissingBlobs(request)
- missing_blobs.append(response.digest)
+ for response in batch_response.responses:
+ if response.status.code == code_pb2.NOT_FOUND:
+ if missing_blobs is None:
+ raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
+ response.digest.hash, response.status.code))
- if response.status.code != code_pb2.OK:
- raise CASRemoteError("Failed to download blob {}: {}".format(
- response.digest.hash, response.status.code))
- if response.digest.size_bytes != len(response.data):
- raise CASRemoteError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
- response.digest.hash, response.digest.size_bytes, len(response.data)))
+ missing_blobs.append(response.digest)
+
+ if response.status.code != code_pb2.OK:
+ raise CASRemoteError("Failed to download blob {}: {}".format(
+ response.digest.hash, response.status.code))
+ if response.digest.size_bytes != len(response.data):
+ raise CASRemoteError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
+ response.digest.hash, response.digest.size_bytes, len(response.data)))
# Represents a batch of blobs queued for upload.
@@ -198,13 +210,18 @@ class _CASBatchRead():
class _CASBatchUpdate():
def __init__(self, remote):
self._remote = remote
- self._request = local_cas_pb2.UploadMissingBlobsRequest()
- self._request.instance_name = remote.local_cas_instance_name
+ self._requests = []
+ self._request = None
self._sent = False
def add(self, digest):
assert not self._sent
+ if not self._request or len(self._request.blob_digests) >= _MAX_DIGESTS:
+ self._request = local_cas_pb2.UploadMissingBlobsRequest()
+ self._request.instance_name = self._remote.local_cas_instance_name
+ self._requests.append(self._request)
+
request_digest = self._request.blob_digests.add()
request_digest.CopyFrom(digest)
@@ -212,18 +229,20 @@ class _CASBatchUpdate():
assert not self._sent
self._sent = True
- if not self._request.blob_digests:
+ if not self._requests:
return
local_cas = self._remote.cascache._get_local_cas()
- batch_response = local_cas.UploadMissingBlobs(self._request)
- for response in batch_response.responses:
- if response.status.code != code_pb2.OK:
- if response.status.code == code_pb2.RESOURCE_EXHAUSTED:
- reason = "cache-too-full"
- else:
- reason = None
+ for request in self._requests:
+ batch_response = local_cas.UploadMissingBlobs(request)
+
+ for response in batch_response.responses:
+ if response.status.code != code_pb2.OK:
+ if response.status.code == code_pb2.RESOURCE_EXHAUSTED:
+ reason = "cache-too-full"
+ else:
+ reason = None
- raise CASRemoteError("Failed to upload blob {}: {}".format(
- response.digest.hash, response.status.code), reason=reason)
+ raise CASRemoteError("Failed to upload blob {}: {}".format(
+ response.digest.hash, response.status.code), reason=reason)
diff --git a/tests/frontend/large_directory.py b/tests/frontend/large_directory.py
new file mode 100644
index 000000000..921e2ddbe
--- /dev/null
+++ b/tests/frontend/large_directory.py
@@ -0,0 +1,86 @@
+#
+# Copyright (C) 2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+# Pylint doesn't play well with fixtures and dependency injection from pytest
+# pylint: disable=redefined-outer-name
+
+from contextlib import contextmanager
+import os
+import pytest
+
+import grpc
+
+from buildstream.testing import cli # pylint: disable=unused-import
+from tests.testutils import create_artifact_share, assert_shared
+
+
+# Project directory
+DATA_DIR = os.path.join(
+ os.path.dirname(os.path.realpath(__file__)),
+ "project",
+)
+
+
+@contextmanager
+def limit_grpc_message_length(limit):
+ orig_insecure_channel = grpc.insecure_channel
+
+ def new_insecure_channel(target):
+ return orig_insecure_channel(target, options=(('grpc.max_send_message_length', limit),))
+
+ grpc.insecure_channel = new_insecure_channel
+ try:
+ yield
+ finally:
+ grpc.insecure_channel = orig_insecure_channel
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_large_directory(cli, tmpdir, datafiles):
+ project = str(datafiles)
+
+ # Number of files chosen to ensure the complete list of digests exceeds
+ # our 1 MB gRPC message limit. I.e., test message splitting.
+ MAX_MESSAGE_LENGTH = 1024 * 1024
+ NUM_FILES = MAX_MESSAGE_LENGTH // 64 + 1
+
+ large_directory_dir = os.path.join(project, 'files', 'large-directory')
+ os.mkdir(large_directory_dir)
+ for i in range(NUM_FILES):
+ with open(os.path.join(large_directory_dir, str(i)), 'w') as f:
+ # The files need to have different content as we want different digests.
+ f.write(str(i))
+
+ with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
+ # Configure bst to push to the artifact share
+ cli.configure({
+ 'artifacts': [
+ {'url': share.repo, 'push': True},
+ ]
+ })
+
+ # Enforce 1 MB gRPC message limit
+ with limit_grpc_message_length(MAX_MESSAGE_LENGTH):
+ # Build and push
+ result = cli.run(project=project, args=['build', 'import-large-directory.bst'])
+ result.assert_success()
+
+ # Assert that we are now cached locally
+ assert cli.get_element_state(project, 'import-large-directory.bst') == 'cached'
+
+ # Assert that the push was successful
+ assert_shared(cli, share, project, 'import-large-directory.bst')
diff --git a/tests/frontend/project/elements/import-large-directory.bst b/tests/frontend/project/elements/import-large-directory.bst
new file mode 100644
index 000000000..a89a8c16d
--- /dev/null
+++ b/tests/frontend/project/elements/import-large-directory.bst
@@ -0,0 +1,4 @@
+kind: import
+sources:
+- kind: local
+ path: files/large-directory