diff options
Diffstat (limited to 'buildstream')
-rw-r--r-- | buildstream/_artifact.py | 2
-rw-r--r-- | buildstream/_artifactcache.py | 27
-rw-r--r-- | buildstream/_cas/cascache.py | 86
-rw-r--r-- | buildstream/_stream.py | 7
-rw-r--r-- | buildstream/element.py | 56
-rw-r--r-- | buildstream/sandbox/_sandboxremote.py | 48
-rw-r--r-- | buildstream/sandbox/sandbox.py | 16
7 files changed, 188 insertions(+), 54 deletions(-)
diff --git a/buildstream/_artifact.py b/buildstream/_artifact.py index 6d26eb392..ba9c626fc 100644 --- a/buildstream/_artifact.py +++ b/buildstream/_artifact.py @@ -469,7 +469,7 @@ class Artifact(): # Determine whether directories are required require_directories = context.require_artifact_directories # Determine whether file contents are required as well - require_files = context.require_artifact_files + require_files = context.require_artifact_files or self._element._artifact_files_required() filesdigest = vdir._get_child_digest('files') diff --git a/buildstream/_artifactcache.py b/buildstream/_artifactcache.py index fb0670e3e..7a6f2ea0c 100644 --- a/buildstream/_artifactcache.py +++ b/buildstream/_artifactcache.py @@ -436,3 +436,30 @@ class ArtifactCache(BaseCache): if missing_blobs: raise ArtifactError("Blobs not found on configured artifact servers") + + # find_missing_blobs(): + # + # Find missing blobs from configured push remote repositories. + # + # Args: + # project (Project): The current project + # missing_blobs (list): The Digests of the blobs to check + # + # Returns: + # (list): The Digests of the blobs missing on at least one push remote + # + def find_missing_blobs(self, project, missing_blobs): + if not missing_blobs: + return [] + + push_remotes = [r for r in self._remotes[project] if r.spec.push] + + remote_missing_blobs_set = set() + + for remote in push_remotes: + remote.init() + + remote_missing_blobs = self.cas.remote_missing_blobs(remote, missing_blobs) + remote_missing_blobs_set.update(remote_missing_blobs) + + return list(remote_missing_blobs_set) diff --git a/buildstream/_cas/cascache.py b/buildstream/_cas/cascache.py index eae3ef04d..5f67dc0c1 100644 --- a/buildstream/_cas/cascache.py +++ b/buildstream/_cas/cascache.py @@ -268,15 +268,13 @@ class CASCache(): request.key = ref response = remote.ref_storage.GetReference(request) - tree = remote_execution_pb2.Digest() - tree.hash = response.digest.hash - tree.size_bytes = 
response.digest.size_bytes + tree = response.digest # Fetch Directory objects self._fetch_directory(remote, tree) # Fetch files, excluded_subdirs determined in pullqueue - required_blobs = self._required_blobs(tree, excluded_subdirs=excluded_subdirs) + required_blobs = self.required_blobs_for_directory(tree, excluded_subdirs=excluded_subdirs) missing_blobs = self.local_missing_blobs(required_blobs) if missing_blobs: self.fetch_blobs(remote, missing_blobs) @@ -368,8 +366,7 @@ class CASCache(): request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name) request.keys.append(ref) - request.digest.hash = tree.hash - request.digest.size_bytes = tree.size_bytes + request.digest.CopyFrom(tree) remote.ref_storage.UpdateReference(request) skipped_remote = False @@ -647,23 +644,33 @@ class CASCache(): # Returns: List of missing Digest objects # def remote_missing_blobs_for_directory(self, remote, digest): - required_blobs = self._required_blobs(digest) + required_blobs = self.required_blobs_for_directory(digest) + return self.remote_missing_blobs(remote, required_blobs) + + # remote_missing_blobs(): + # + # Determine which blobs are missing on the remote. 
+ # + # Args: + # remote (CASRemote): The remote to check + # blobs ([Digest]): The Digests of the blobs to check + # + # Returns: List of missing Digest objects + # + def remote_missing_blobs(self, remote, blobs): missing_blobs = dict() # Limit size of FindMissingBlobs request - for required_blobs_group in _grouper(required_blobs, 512): + for required_blobs_group in _grouper(blobs, 512): request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name) for required_digest in required_blobs_group: d = request.blob_digests.add() - d.hash = required_digest.hash - d.size_bytes = required_digest.size_bytes + d.CopyFrom(required_digest) response = remote.cas.FindMissingBlobs(request) for missing_digest in response.missing_blob_digests: d = remote_execution_pb2.Digest() - d.hash = missing_digest.hash - d.size_bytes = missing_digest.size_bytes + d.CopyFrom(missing_digest) missing_blobs[d.hash] = d return missing_blobs.values() @@ -685,6 +692,31 @@ class CASCache(): missing_blobs.append(digest) return missing_blobs + # required_blobs_for_directory(): + # + # Generator that returns the Digests of all blobs in the tree specified by + # the Digest of the toplevel Directory object. 
+ # + def required_blobs_for_directory(self, directory_digest, *, excluded_subdirs=None): + if not excluded_subdirs: + excluded_subdirs = [] + + # parse directory, and recursively add blobs + + yield directory_digest + + directory = remote_execution_pb2.Directory() + + with open(self.objpath(directory_digest), 'rb') as f: + directory.ParseFromString(f.read()) + + for filenode in directory.files: + yield filenode.digest + + for dirnode in directory.directories: + if dirnode.name not in excluded_subdirs: + yield from self.required_blobs_for_directory(dirnode.digest) + ################################################ # Local Private Methods # ################################################ @@ -881,31 +913,6 @@ class CASCache(): for dirnode in directory.directories: self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime, check_exists=check_exists) - def _required_blobs(self, directory_digest, *, excluded_subdirs=None): - if not excluded_subdirs: - excluded_subdirs = [] - - # parse directory, and recursively add blobs - d = remote_execution_pb2.Digest() - d.hash = directory_digest.hash - d.size_bytes = directory_digest.size_bytes - yield d - - directory = remote_execution_pb2.Directory() - - with open(self.objpath(directory_digest), 'rb') as f: - directory.ParseFromString(f.read()) - - for filenode in directory.files: - d = remote_execution_pb2.Digest() - d.hash = filenode.digest.hash - d.size_bytes = filenode.digest.size_bytes - yield d - - for dirnode in directory.directories: - if dirnode.name not in excluded_subdirs: - yield from self._required_blobs(dirnode.digest) - # _temporary_object(): # # Returns: @@ -1042,11 +1049,6 @@ class CASCache(): tree.children.extend([tree.root]) for directory in tree.children: - for filenode in directory.files: - self._ensure_blob(remote, filenode.digest) - - # place directory blob only in final location when we've downloaded - # all referenced blobs to avoid dangling references in the repository dirbuffer = 
directory.SerializeToString() dirdigest = self.add_object(buffer=dirbuffer) assert dirdigest.size_bytes == len(dirbuffer) diff --git a/buildstream/_stream.py b/buildstream/_stream.py index 73bc4badd..64a578c92 100644 --- a/buildstream/_stream.py +++ b/buildstream/_stream.py @@ -247,6 +247,13 @@ class Stream(): # Assert that the elements we're not going to track are consistent self._pipeline.assert_consistent(elements) + if all(project.remote_execution_specs for project in self._context.get_projects()): + # Remote execution is configured for all projects. + # Require artifact files only for target elements and their runtime dependencies. + self._context.set_artifact_files_optional() + for element in self.targets: + element._set_artifact_files_required() + # Now construct the queues # track_queue = None diff --git a/buildstream/element.py b/buildstream/element.py index 5c28b4753..bc8f25cf8 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -227,6 +227,7 @@ class Element(Plugin): self.__staged_sources_directory = None # Location where Element.stage_sources() was called self.__tainted = None # Whether the artifact is tainted and should not be shared self.__required = False # Whether the artifact is required in the current session + self.__artifact_files_required = False # Whether artifact files are required in the local cache self.__build_result = None # The result of assembling this Element (success, description, detail) self._build_log_path = None # The path of the build log for this Element self.__artifact = Artifact(self, context) # Artifact class for direct artifact composite interaction @@ -1556,6 +1557,29 @@ class Element(Plugin): def _is_required(self): return self.__required + # _set_artifact_files_required(): + # + # Mark artifact files for this element and its runtime dependencies as + # required in the local cache. 
+ # + def _set_artifact_files_required(self): + if self.__artifact_files_required: + # Already done + return + + self.__artifact_files_required = True + + # Request artifact files of runtime dependencies + for dep in self.dependencies(Scope.RUN, recurse=False): + dep._set_artifact_files_required() + + # _artifact_files_required(): + # + # Returns whether artifact files for this element have been marked as required. + # + def _artifact_files_required(self): + return self.__artifact_files_required + # _schedule_assemble(): # # This is called in the main process before the element is assembled @@ -1661,6 +1685,15 @@ class Element(Plugin): with _signals.terminator(cleanup_rootdir), \ self.__sandbox(rootdir, output_file, output_file, self.__sandbox_config) as sandbox: # noqa + # Let the sandbox know whether the buildtree will be required. + # This allows the remote execution sandbox to skip buildtree + # download when it's not needed. + buildroot = self.get_variable('build-root') + cache_buildtrees = context.cache_buildtrees + if cache_buildtrees != 'never': + always_cache_buildtrees = cache_buildtrees == 'always' + sandbox._set_build_directory(buildroot, always=always_cache_buildtrees) + if not self.BST_RUN_COMMANDS: # Element doesn't need to run any commands in the sandbox. # @@ -2348,7 +2381,7 @@ class Element(Plugin): # supports it. # def __use_remote_execution(self): - return self.__remote_execution_specs and self.BST_VIRTUAL_DIRECTORY + return bool(self.__remote_execution_specs) # __sandbox(): # @@ -2376,8 +2409,15 @@ class Element(Plugin): if directory is not None and allow_remote and self.__use_remote_execution(): + if not self.BST_VIRTUAL_DIRECTORY: + raise ElementError("Element {} is configured to use remote execution but plugin does not support it." + .format(self.name), detail="Plugin '{kind}' does not support virtual directories." 
+ .format(kind=self.get_kind())) + self.info("Using a remote sandbox for artifact {} with directory '{}'".format(self.name, directory)) + output_files_required = context.require_artifact_files or self._artifact_files_required() + sandbox = SandboxRemote(context, project, directory, plugin=self, @@ -2386,16 +2426,11 @@ class Element(Plugin): config=config, specs=self.__remote_execution_specs, bare_directory=bare_directory, - allow_real_directory=False) + allow_real_directory=False, + output_files_required=output_files_required) yield sandbox elif directory is not None and os.path.exists(directory): - if allow_remote and self.__remote_execution_specs: - self.warn("Artifact {} is configured to use remote execution but element plugin does not support it." - .format(self.name), detail="Element plugin '{kind}' does not support virtual directories." - .format(kind=self.get_kind()), warning_token="remote-failure") - - self.info("Falling back to local sandbox for artifact {}".format(self.name)) sandbox = platform.create_sandbox(context, project, directory, @@ -2960,6 +2995,11 @@ class Element(Plugin): subdir = "buildtree" excluded_subdirs.remove(subdir) + # If file contents are not required for this element, don't pull them. + # The directories themselves will always be pulled. + if not context.require_artifact_files and not self._artifact_files_required(): + excluded_subdirs.append("files") + return (subdir, excluded_subdirs) # __cache_sources(): diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py index ffd53f0d0..2cb7e2538 100644 --- a/buildstream/sandbox/_sandboxremote.py +++ b/buildstream/sandbox/_sandboxremote.py @@ -29,6 +29,7 @@ import grpc from .. import utils from .._message import Message, MessageType from .sandbox import Sandbox, SandboxCommandError, _SandboxBatch +from ..storage.directory import VirtualDirectoryError from ..storage._casbaseddirectory import CasBasedDirectory from .. 
import _signals from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc @@ -53,6 +54,8 @@ class SandboxRemote(Sandbox): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + self._output_files_required = kwargs.get('output_files_required', True) + config = kwargs['specs'] # This should be a RemoteExecutionSpec if config is None: return @@ -251,7 +254,7 @@ class SandboxRemote(Sandbox): raise SandboxError("Failed trying to send CancelOperation request: " "{} ({})".format(e.details(), e.code().name)) - def process_job_output(self, output_directories, output_files): + def process_job_output(self, output_directories, output_files, *, failure): # Reads the remote execution server response to an execution request. # # output_directories is an array of OutputDirectory objects. @@ -274,10 +277,12 @@ class SandboxRemote(Sandbox): raise SandboxError("Output directory structure had no digest attached.") context = self._get_context() + project = self._get_project() cascache = context.get_cascache() + artifactcache = context.artifactcache casremote = CASRemote(self.storage_remote_spec) - # Now do a pull to ensure we have the necessary parts. + # Now do a pull to ensure we have the full directory structure. 
dir_digest = cascache.pull_tree(casremote, tree_digest) if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes: raise SandboxError("Output directory structure pulling from remote failed.") @@ -289,6 +294,42 @@ class SandboxRemote(Sandbox): new_dir = CasBasedDirectory(context.artifactcache.cas, digest=dir_digest) self._set_virtual_directory(new_dir) + # Fetch the file blobs if needed + if self._output_files_required or artifactcache.has_push_remotes(): + required_blobs = [] + directories = [] + + directories.append(self._output_directory) + if self._build_directory and (self._build_directory_always or failure): + directories.append(self._build_directory) + + for directory in directories: + try: + vdir = new_dir.descend(*directory.strip(os.sep).split(os.sep)) + dir_digest = vdir._get_digest() + required_blobs += cascache.required_blobs_for_directory(dir_digest) + except VirtualDirectoryError: + # If the directory does not exist, there is no need to + # download file blobs. + pass + + local_missing_blobs = cascache.local_missing_blobs(required_blobs) + if local_missing_blobs: + if self._output_files_required: + # Fetch all blobs from Remote Execution CAS server + blobs_to_fetch = local_missing_blobs + else: + # Output files are not required in the local cache, + # however, artifact push remotes will need them. + # Only fetch blobs that are missing on one or multiple + # artifact servers. 
+ blobs_to_fetch = artifactcache.find_missing_blobs(project, local_missing_blobs) + + remote_missing_blobs = cascache.fetch_blobs(casremote, blobs_to_fetch) + if remote_missing_blobs: + raise SandboxError("{} output files are missing on the CAS server" + .format(len(remote_missing_blobs))) + def _run(self, command, flags, *, cwd, env): stdout, stderr = self._get_output() @@ -377,7 +418,8 @@ class SandboxRemote(Sandbox): action_result = self._extract_action_result(operation) # Get output of build - self.process_job_output(action_result.output_directories, action_result.output_files) + self.process_job_output(action_result.output_directories, action_result.output_files, + failure=action_result.exit_code != 0) if stdout: if action_result.stdout_raw: diff --git a/buildstream/sandbox/sandbox.py b/buildstream/sandbox/sandbox.py index 93db2f8ca..c96ccb57b 100644 --- a/buildstream/sandbox/sandbox.py +++ b/buildstream/sandbox/sandbox.py @@ -147,6 +147,8 @@ class Sandbox(): os.makedirs(directory_, exist_ok=True) self._output_directory = None + self._build_directory = None + self._build_directory_always = None self._vdir = None self._usebuildtree = False @@ -592,6 +594,20 @@ class Sandbox(): def _disable_run(self): self.__allow_run = False + # _set_build_directory() + # + # Sets the build directory - the directory which may be preserved as + # buildtree in the artifact. + # + # Args: + # directory (str): An absolute path within the sandbox + # always (bool): True if the build directory should always be downloaded, + # False if it should be downloaded only on failure + # + def _set_build_directory(self, directory, *, always): + self._build_directory = directory + self._build_directory_always = always + # _SandboxBatch() # |