author     bst-marge-bot <marge-bot@buildstream.build>  2019-03-27 18:49:31 +0000
committer  bst-marge-bot <marge-bot@buildstream.build>  2019-03-27 18:49:31 +0000
commit     49c22f209b32247bc1335f8632518a971c056d9e
tree       fd3a75fede307b35ac69d1dbc9ee5cbe432a7b73
parent     bcf02294094d23ebacd97d149d7c5cab5605f8ea
parent     60290223f87f17c15a8bd3562a636fe8b7770cfa
Merge branch 'juerg/partial-cas-remote' into 'master'

Initial partial CAS support for remote execution

See merge request BuildStream/buildstream!1232
-rw-r--r--  buildstream/_artifactcache.py           45
-rw-r--r--  buildstream/_cas/cascache.py           177
-rw-r--r--  buildstream/_cas/casremote.py           32
-rw-r--r--  buildstream/sandbox/_sandboxremote.py   35
-rw-r--r--  tests/artifactcache/push.py            108
5 files changed, 177 insertions(+), 220 deletions(-)
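
At its core, this merge moves artifact transfer from whole directories to individual blobs: Directory objects are fetched first, the set of required file blobs is computed locally, and only blobs that are actually missing get downloaded or uploaded. A minimal, self-contained sketch of that idea (stand-in classes, not the real buildstream/_cas API):

class LocalStore:
    def __init__(self):
        self.blobs = {}          # digest -> data

    def contains(self, digest):
        return digest in self.blobs


class Remote:
    def __init__(self, blobs):
        self.blobs = blobs       # digest -> data available on this remote

    def fetch_blobs(self, local, digests):
        # Download what we can; report the digests this remote lacks
        missing = []
        for d in digests:
            if d in self.blobs:
                local.blobs[d] = self.blobs[d]
            else:
                missing.append(d)
        return missing


def pull(local, remote, required):
    # Only request blobs the local store does not already have
    wanted = [d for d in required if not local.contains(d)]
    return remote.fetch_blobs(local, wanted)


local = LocalStore()
local.blobs["blob-a"] = b"cached earlier"
print(pull(local, Remote({"blob-b": b"payload"}), ["blob-a", "blob-b"]))  # []
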
diff --git a/buildstream/_artifactcache.py b/buildstream/_artifactcache.py
index 3ca6c6e60..5fd646137 100644
--- a/buildstream/_artifactcache.py
+++ b/buildstream/_artifactcache.py
@@ -359,30 +359,6 @@ class ArtifactCache(BaseCache):
return None
- # push_directory():
- #
- # Push the given virtual directory to all remotes.
- #
- # Args:
- # project (Project): The current project
- # directory (Directory): A virtual directory object to push.
- #
- # Raises:
- # (ArtifactError): if there was an error
- #
- def push_directory(self, project, directory):
- if self._has_push_remotes:
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
- else:
- push_remotes = []
-
- if not push_remotes:
- raise ArtifactError("push_directory was called, but no remote artifact " +
- "servers are configured as push remotes.")
-
- for remote in push_remotes:
- self.cas.push_directory(remote, directory)
-
# push_message():
#
# Push the given protobuf message to all remotes.
@@ -439,3 +415,24 @@ class ArtifactCache(BaseCache):
cache_id = self.cas.resolve_ref(ref, update_mtime=True)
vdir = CasBasedDirectory(self.cas, digest=cache_id).descend('logs')
return vdir
+
+ # fetch_missing_blobs():
+ #
+ # Fetch missing blobs from configured remote repositories.
+ #
+ # Args:
+ # project (Project): The current project
+ # missing_blobs (list): The Digests of the blobs to fetch
+ #
+ def fetch_missing_blobs(self, project, missing_blobs):
+ for remote in self._remotes[project]:
+ if not missing_blobs:
+ break
+
+ remote.init()
+
+ # fetch_blobs() will return the blobs that are still missing
+ missing_blobs = self.cas.fetch_blobs(remote, missing_blobs)
+
+ if missing_blobs:
+ raise ArtifactError("Blobs not found on configured artifact servers")
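
The new ArtifactCache.fetch_missing_blobs() tries each configured remote in turn, shrinking the missing set as it goes, and only raises if blobs remain after the last remote. A runnable sketch of that fallback loop, with a hypothetical FakeRemote standing in for the real CASRemote/CASCache pair:

class FakeRemote:
    def __init__(self, available):
        self.available = set(available)

    def init(self):
        pass  # the real CASRemote opens its gRPC channel here

    def fetch_blobs(self, digests):
        # Return the digests this remote could not provide
        return [d for d in digests if d not in self.available]


def fetch_missing_blobs(remotes, missing_blobs):
    for remote in remotes:
        if not missing_blobs:
            break
        remote.init()
        # fetch_blobs() returns the blobs that are still missing
        missing_blobs = remote.fetch_blobs(missing_blobs)
    if missing_blobs:
        raise LookupError("Blobs not found on configured artifact servers")


# The second remote supplies what the first one lacked
fetch_missing_blobs([FakeRemote({"a"}), FakeRemote({"b"})], ["a", "b"])
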
diff --git a/buildstream/_cas/cascache.py b/buildstream/_cas/cascache.py
index 63871ebe4..19020e234 100644
--- a/buildstream/_cas/cascache.py
+++ b/buildstream/_cas/cascache.py
@@ -272,8 +272,14 @@ class CASCache():
tree.hash = response.digest.hash
tree.size_bytes = response.digest.size_bytes
- # Fetch artifact, excluded_subdirs determined in pullqueue
- self._fetch_directory(remote, tree, excluded_subdirs=excluded_subdirs)
+ # Fetch Directory objects
+ self._fetch_directory(remote, tree)
+
+ # Fetch files, excluded_subdirs determined in pullqueue
+ required_blobs = self._required_blobs(tree, excluded_subdirs=excluded_subdirs)
+ missing_blobs = self.local_missing_blobs(required_blobs)
+ if missing_blobs:
+ self.fetch_blobs(remote, missing_blobs)
self.set_ref(ref, tree)
@@ -373,23 +379,6 @@ class CASCache():
return not skipped_remote
- # push_directory():
- #
- # Push the given virtual directory to a remote.
- #
- # Args:
- # remote (CASRemote): The remote to push to
- # directory (Directory): A virtual directory object to push.
- #
- # Raises:
- # (CASCacheError): if there was an error
- #
- def push_directory(self, remote, directory):
- remote.init()
-
- digest = directory._get_digest()
- self._send_directory(remote, digest)
-
# objpath():
#
# Return the path of an object based on its digest.
@@ -648,6 +637,54 @@ class CASCache():
reachable = set()
self._reachable_refs_dir(reachable, tree, update_mtime=True)
+ # remote_missing_blobs_for_directory():
+ #
+ # Determine which blobs of a directory tree are missing on the remote.
+ #
+ # Args:
+ # remote (CASRemote): The remote to check
+ # digest (Digest): The directory digest
+ #
+ # Returns: List of missing Digest objects
+ #
+ def remote_missing_blobs_for_directory(self, remote, digest):
+ required_blobs = self._required_blobs(digest)
+
+ missing_blobs = dict()
+ # Limit size of FindMissingBlobs request
+ for required_blobs_group in _grouper(required_blobs, 512):
+ request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
+
+ for required_digest in required_blobs_group:
+ d = request.blob_digests.add()
+ d.hash = required_digest.hash
+ d.size_bytes = required_digest.size_bytes
+
+ response = remote.cas.FindMissingBlobs(request)
+ for missing_digest in response.missing_blob_digests:
+ d = remote_execution_pb2.Digest()
+ d.hash = missing_digest.hash
+ d.size_bytes = missing_digest.size_bytes
+ missing_blobs[d.hash] = d
+
+ return missing_blobs.values()
+
+ # local_missing_blobs():
+ #
+ # Check local cache for missing blobs.
+ #
+ # Args:
+ # digests (list): The Digests of blobs to check
+ #
+ # Returns: Missing Digest objects
+ #
+ def local_missing_blobs(self, digests):
+ missing_blobs = []
+ for digest in digests:
+ objpath = self.objpath(digest)
+ if not os.path.exists(objpath):
+ missing_blobs.append(digest)
+ return missing_blobs
+
################################################
# Local Private Methods #
################################################
@@ -841,7 +878,10 @@ class CASCache():
for dirnode in directory.directories:
self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
- def _required_blobs(self, directory_digest):
+ def _required_blobs(self, directory_digest, *, excluded_subdirs=None):
+ if not excluded_subdirs:
+ excluded_subdirs = []
+
# parse directory, and recursively add blobs
d = remote_execution_pb2.Digest()
d.hash = directory_digest.hash
@@ -860,7 +900,8 @@ class CASCache():
yield d
for dirnode in directory.directories:
- yield from self._required_blobs(dirnode.digest)
+ if dirnode.name not in excluded_subdirs:
+ yield from self._required_blobs(dirnode.digest)
# _temporary_object():
#
@@ -900,8 +941,8 @@ class CASCache():
return objpath
- def _batch_download_complete(self, batch):
- for digest, data in batch.send():
+ def _batch_download_complete(self, batch, *, missing_blobs=None):
+ for digest, data in batch.send(missing_blobs=missing_blobs):
with self._temporary_object() as f:
f.write(data)
f.flush()
@@ -953,21 +994,19 @@ class CASCache():
#
# Fetches remote directory and adds it to content addressable store.
#
- # Fetches files, symbolic links and recursively other directories in
- # the remote directory and adds them to the content addressable
- # store.
+ # This recursively fetches directory objects but doesn't fetch any
+ # files.
#
# Args:
# remote (Remote): The remote to use.
# dir_digest (Digest): Digest object for the directory to fetch.
- # excluded_subdirs (list): The optional list of subdirs to not fetch
#
- def _fetch_directory(self, remote, dir_digest, *, excluded_subdirs=None):
+ def _fetch_directory(self, remote, dir_digest):
+ # TODO Use GetTree() if the server supports it
+
fetch_queue = [dir_digest]
fetch_next_queue = []
batch = _CASBatchRead(remote)
- if not excluded_subdirs:
- excluded_subdirs = []
while len(fetch_queue) + len(fetch_next_queue) > 0:
if not fetch_queue:
@@ -982,13 +1021,8 @@ class CASCache():
directory.ParseFromString(f.read())
for dirnode in directory.directories:
- if dirnode.name not in excluded_subdirs:
- batch = self._fetch_directory_node(remote, dirnode.digest, batch,
- fetch_queue, fetch_next_queue, recursive=True)
-
- for filenode in directory.files:
- batch = self._fetch_directory_node(remote, filenode.digest, batch,
- fetch_queue, fetch_next_queue)
+ batch = self._fetch_directory_node(remote, dirnode.digest, batch,
+ fetch_queue, fetch_next_queue, recursive=True)
# Fetch final batch
self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
@@ -1016,30 +1050,55 @@ class CASCache():
return dirdigest
- def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
- required_blobs = self._required_blobs(digest)
+ # fetch_blobs():
+ #
+ # Fetch blobs from remote CAS. Returns missing blobs that could not be fetched.
+ #
+ # Args:
+ # remote (CASRemote): The remote repository to fetch from
+ # digests (list): The Digests of blobs to fetch
+ #
+ # Returns: The Digests of the blobs that were not available on the remote CAS
+ #
+ def fetch_blobs(self, remote, digests):
+ missing_blobs = []
- missing_blobs = dict()
- # Limit size of FindMissingBlobs request
- for required_blobs_group in _grouper(required_blobs, 512):
- request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
+ batch = _CASBatchRead(remote)
- for required_digest in required_blobs_group:
- d = request.blob_digests.add()
- d.hash = required_digest.hash
- d.size_bytes = required_digest.size_bytes
+ for digest in digests:
+ if (digest.size_bytes >= remote.max_batch_total_size_bytes or
+ not remote.batch_read_supported):
+ # Too large for batch request, download in independent request.
+ try:
+ self._ensure_blob(remote, digest)
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.NOT_FOUND:
+ missing_blobs.append(digest)
+ else:
+ raise CASCacheError("Failed to fetch blob: {}".format(e)) from e
+ else:
+ if not batch.add(digest):
+ # Not enough space left in batch request.
+ # Complete pending batch first.
+ self._batch_download_complete(batch, missing_blobs=missing_blobs)
- response = remote.cas.FindMissingBlobs(request)
- for missing_digest in response.missing_blob_digests:
- d = remote_execution_pb2.Digest()
- d.hash = missing_digest.hash
- d.size_bytes = missing_digest.size_bytes
- missing_blobs[d.hash] = d
+ batch = _CASBatchRead(remote)
+ batch.add(digest)
- # Upload any blobs missing on the server
- self._send_blobs(remote, missing_blobs.values(), u_uid)
+ # Complete last pending batch
+ self._batch_download_complete(batch, missing_blobs=missing_blobs)
+
+ return missing_blobs
- def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
+ # send_blobs():
+ #
+ # Upload blobs to remote CAS.
+ #
+ # Args:
+ # remote (CASRemote): The remote repository to upload to
+ # digests (list): The Digests of Blobs to upload
+ #
+ def send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
batch = _CASBatchUpdate(remote)
for digest in digests:
@@ -1061,6 +1120,12 @@ class CASCache():
# Send final batch
batch.send()
+ def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
+ missing_blobs = self.remote_missing_blobs_for_directory(remote, digest)
+
+ # Upload any blobs missing on the server
+ self.send_blobs(remote, missing_blobs, u_uid)
+
class CASQuota:
def __init__(self, context):
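
Two batching patterns recur in the hunks above: FindMissingBlobs requests are capped at 512 digests via _grouper(), and fetch_blobs() fills size-limited batch reads, falling back to independent requests for blobs at or above max_batch_total_size_bytes. A runnable stand-in illustration of both (plan_downloads is hypothetical; only the grouping and flushing logic mirrors the real code):

import itertools

def _grouper(iterable, n):
    # Yield successive groups of at most n items
    it = iter(iterable)
    while True:
        group = list(itertools.islice(it, n))
        if not group:
            return
        yield group

def plan_downloads(blob_sizes, max_batch_total):
    # blob_sizes: hypothetical (name, size) pairs standing in for Digests
    batches, singles, current, total = [], [], [], 0
    for name, size in blob_sizes:
        if size >= max_batch_total:
            singles.append(name)               # too large: independent request
            continue
        if total + size > max_batch_total:     # flush the full batch first
            batches.append(current)
            current, total = [], 0
        current.append(name)
        total += size
    if current:
        batches.append(current)                # final batch
    return batches, singles

print(list(_grouper(range(5), 2)))             # [[0, 1], [2, 3], [4]]
print(plan_downloads([("a", 4), ("b", 3), ("c", 10)], 6))
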
diff --git a/buildstream/_cas/casremote.py b/buildstream/_cas/casremote.py
index df1dd799c..aac0d2802 100644
--- a/buildstream/_cas/casremote.py
+++ b/buildstream/_cas/casremote.py
@@ -221,28 +221,6 @@ class CASRemote():
return error
- # verify_digest_on_remote():
- #
- # Check whether the object is already on the server in which case
- # there is no need to upload it.
- #
- # Args:
- # digest (Digest): The object digest.
- #
- def verify_digest_on_remote(self, digest):
- self.init()
-
- request = remote_execution_pb2.FindMissingBlobsRequest()
- if self.instance_name:
- request.instance_name = self.instance_name
- request.blob_digests.extend([digest])
-
- response = self.cas.FindMissingBlobs(request)
- if digest in response.missing_blob_digests:
- return False
-
- return True
-
# push_message():
#
# Push the given protobuf message to a remote.
@@ -344,7 +322,7 @@ class _CASBatchRead():
self._size = new_batch_size
return True
- def send(self):
+ def send(self, *, missing_blobs=None):
assert not self._sent
self._sent = True
@@ -355,8 +333,12 @@ class _CASBatchRead():
for response in batch_response.responses:
if response.status.code == code_pb2.NOT_FOUND:
- raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
- response.digest.hash, response.status.code))
+ if missing_blobs is None:
+ raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
+ response.digest.hash, response.status.code))
+ else:
+ missing_blobs.append(response.digest)
+
if response.status.code != code_pb2.OK:
raise CASRemoteError("Failed to download blob {}: {}".format(
response.digest.hash, response.status.code))
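
_CASBatchRead.send() now has two modes: without a missing_blobs list it raises BlobNotFound on the first NOT_FOUND response, preserving the old behaviour; with one, it records the digest so fetch_blobs() can report it and carries on. A stand-alone sketch of that pattern; note the continue after recording, which keeps the generic non-OK check from also firing for a merely-missing blob:

# Status codes mirror google.rpc.code_pb2; responses are stand-in tuples.
OK, NOT_FOUND = 0, 5

def handle_batch(responses, *, missing_blobs=None):
    for digest, status in responses:
        if status == NOT_FOUND:
            if missing_blobs is None:
                raise LookupError("Failed to download blob {}".format(digest))
            missing_blobs.append(digest)   # caller retries on another remote
            continue                       # a missing blob is not an error here
        if status != OK:
            raise RuntimeError("Failed to download blob {}: {}".format(digest, status))

missing = []
handle_batch([("a", OK), ("b", NOT_FOUND)], missing_blobs=missing)
print(missing)                             # ['b']
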
diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py
index be3234796..ada8268c0 100644
--- a/buildstream/sandbox/_sandboxremote.py
+++ b/buildstream/sandbox/_sandboxremote.py
@@ -34,7 +34,7 @@ from ..storage._casbaseddirectory import CasBasedDirectory
from .. import _signals
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.google.rpc import code_pb2
-from .._exceptions import SandboxError
+from .._exceptions import BstError, SandboxError
from .. import _yaml
from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
from .._cas import CASRemote, CASRemoteSpec
@@ -293,9 +293,13 @@ class SandboxRemote(Sandbox):
def _run(self, command, flags, *, cwd, env):
stdout, stderr = self._get_output()
+ context = self._get_context()
+ project = self._get_project()
+ cascache = context.get_cascache()
+ artifactcache = context.artifactcache
+
# set up virtual directory
upload_vdir = self.get_virtual_directory()
- cascache = self._get_context().get_cascache()
# Create directories for all marked directories. This emulates
# some of the behaviour of other sandboxes, which create these
@@ -331,15 +335,32 @@ class SandboxRemote(Sandbox):
if not action_result:
casremote = CASRemote(self.storage_remote_spec)
+ try:
+ casremote.init()
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to contact remote execution CAS endpoint at {}: {}"
+ .format(self.storage_url, e)) from e
- # Now, push that key (without necessarily needing a ref) to the remote.
+ # Determine blobs missing on remote
try:
- cascache.push_directory(casremote, upload_vdir)
+ missing_blobs = cascache.remote_missing_blobs_for_directory(casremote, input_root_digest)
except grpc.RpcError as e:
- raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
+ raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
- if not casremote.verify_digest_on_remote(upload_vdir._get_digest()):
- raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
+ # Check if any blobs are also missing locally (partial artifact)
+ # and pull them from the artifact cache.
+ try:
+ local_missing_blobs = cascache.local_missing_blobs(missing_blobs)
+ if local_missing_blobs:
+ artifactcache.fetch_missing_blobs(project, local_missing_blobs)
+ except (grpc.RpcError, BstError) as e:
+ raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e
+
+ # Now, push the missing blobs to the remote.
+ try:
+ cascache.send_blobs(casremote, missing_blobs)
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
# Push command and action
try:
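
Taken together, the sandbox now stages its input root in three steps: ask the remote-execution CAS which blobs it lacks, pull any of those that are also absent locally from the artifact cache (the partially cached artifact case), then upload exactly what the execution service asked for. A condensed sketch using the method names introduced in this merge (SandboxError wrapping and grpc error handling omitted):

def stage_input_root(cascache, artifactcache, casremote, project, input_root_digest):
    # 1. Ask the remote-execution CAS which blobs of the input tree it lacks
    missing_blobs = cascache.remote_missing_blobs_for_directory(casremote, input_root_digest)

    # 2. Blobs absent locally too (partially cached artifact) are pulled
    #    from the configured artifact servers first
    local_missing_blobs = cascache.local_missing_blobs(missing_blobs)
    if local_missing_blobs:
        artifactcache.fetch_missing_blobs(project, local_missing_blobs)

    # 3. Upload exactly the blobs the execution service reported missing
    cascache.send_blobs(casremote, missing_blobs)
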
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 69f3fbfbb..56af50a0d 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -136,114 +136,6 @@ def _test_push(user_config_file, project_dir, element_name, element_key, queue):
@pytest.mark.datafiles(DATA_DIR)
-def test_push_directory(cli, tmpdir, datafiles):
- project_dir = str(datafiles)
-
- # First build the project without the artifact cache configured
- result = cli.run(project=project_dir, args=['build', 'target.bst'])
- result.assert_success()
-
- # Assert that we are now cached locally
- assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
-
- # Set up an artifact cache.
- with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
- # Configure artifact share
- rootcache_dir = os.path.join(str(tmpdir), 'cache')
- user_config_file = str(tmpdir.join('buildstream.conf'))
- user_config = {
- 'scheduler': {
- 'pushers': 1
- },
- 'artifacts': {
- 'url': share.repo,
- 'push': True,
- },
- 'cachedir': rootcache_dir
- }
-
- # Write down the user configuration file
- _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
-
- # Fake minimal context
- context = Context()
- context.load(config=user_config_file)
- context.set_message_handler(message_handler)
-
- # Load the project and CAS cache
- project = Project(project_dir, context)
- project.ensure_fully_loaded()
- artifactcache = context.artifactcache
- cas = artifactcache.cas
-
- # Assert that the element's artifact is cached
- element = project.load_elements(['target.bst'])[0]
- element_key = cli.get_element_key(project_dir, 'target.bst')
- assert artifactcache.contains(element, element_key)
-
- # Manually setup the CAS remote
- artifactcache.setup_remotes(use_config=True)
- artifactcache.initialize_remotes()
- assert artifactcache.has_push_remotes(plugin=element)
-
- # Recreate the CasBasedDirectory object from the cached artifact
- artifact_ref = element.get_artifact_name(element_key)
- artifact_digest = cas.resolve_ref(artifact_ref)
-
- queue = multiprocessing.Queue()
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
- process = multiprocessing.Process(target=_queue_wrapper,
- args=(_test_push_directory, queue, user_config_file,
- project_dir, artifact_digest))
-
- try:
- # Keep SIGINT blocked in the child process
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
-
- directory_hash = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert directory_hash
- assert artifact_digest.hash == directory_hash
- assert share.has_object(artifact_digest)
-
-
-def _test_push_directory(user_config_file, project_dir, artifact_digest, queue):
- # Fake minimal context
- context = Context()
- context.load(config=user_config_file)
- context.set_message_handler(message_handler)
-
- # Load the project manually
- project = Project(project_dir, context)
- project.ensure_fully_loaded()
-
- # Create a local CAS cache handle
- cas = context.artifactcache
-
- # Manually setup the CAS remote
- cas.setup_remotes(use_config=True)
- cas.initialize_remotes()
-
- if cas.has_push_remotes():
- # Create a CasBasedDirectory from local CAS cache content
- directory = CasBasedDirectory(context.artifactcache.cas, digest=artifact_digest)
-
- # Push the CasBasedDirectory object
- cas.push_directory(project, directory)
-
- digest = directory._get_digest()
- queue.put(digest.hash)
- else:
- queue.put("No remote configured")
-
-
-@pytest.mark.datafiles(DATA_DIR)
def test_push_message(tmpdir, datafiles):
project_dir = str(datafiles)