diff options
Diffstat (limited to 'src/buildstream/_stream.py')
-rw-r--r-- | src/buildstream/_stream.py | 321 |
1 files changed, 69 insertions, 252 deletions
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 402473e33..aa14f12c9 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -74,13 +74,7 @@ from . import Scope, Consistency # class Stream: def __init__( - self, - context, - session_start, - *, - session_start_callback=None, - interrupt_callback=None, - ticker_callback=None + self, context, session_start, *, session_start_callback=None, interrupt_callback=None, ticker_callback=None ): # @@ -101,26 +95,18 @@ class Stream: self._pipeline = None self._state = State(session_start) # Owned by Stream, used by Core to set state self._notification_queue = deque() - self._starttime = ( - session_start # Synchronised with Scheduler's relative start time - ) + self._starttime = session_start # Synchronised with Scheduler's relative start time context.messenger.set_state(self._state) self._scheduler = Scheduler( - context, - session_start, - self._state, - self._notification_queue, - self._scheduler_notification_handler, + context, session_start, self._state, self._notification_queue, self._scheduler_notification_handler, ) self._first_non_track_queue = None self._session_start_callback = session_start_callback self._ticker_callback = ticker_callback self._interrupt_callback = interrupt_callback - self._notifier = ( - self._scheduler._stream_notification_handler - ) # Assign the schedulers notification handler + self._notifier = self._scheduler._stream_notification_handler # Assign the schedulers notification handler self._scheduler_running = False self._scheduler_terminated = False self._scheduler_suspended = False @@ -177,9 +163,7 @@ class Stream: use_artifact_config=False, load_refs=False ): - with PROFILER.profile( - Topics.LOAD_SELECTION, "_".join(t.replace(os.sep, "-") for t in targets) - ): + with PROFILER.profile(Topics.LOAD_SELECTION, "_".join(t.replace(os.sep, "-") for t in targets)): target_objects, _ = self._load( targets, (), @@ -233,22 +217,15 @@ class Stream: # in which case we just blindly trust the directory, using the element # definitions to control the execution environment only. if directory is None: - missing_deps = [ - dep - for dep in self._pipeline.dependencies([element], scope) - if not dep._cached() - ] + missing_deps = [dep for dep in self._pipeline.dependencies([element], scope) if not dep._cached()] if missing_deps: if not pull_dependencies: raise StreamError( "Elements need to be built or downloaded before staging a shell environment", - detail="\n".join( - list(map(lambda x: x._get_full_name(), missing_deps)) - ), + detail="\n".join(list(map(lambda x: x._get_full_name(), missing_deps))), ) self._message( - MessageType.INFO, - "Attempting to fetch missing or incomplete artifacts", + MessageType.INFO, "Attempting to fetch missing or incomplete artifacts", ) self._scheduler.clear_queues() self._add_queue(PullQueue(self._scheduler)) @@ -264,8 +241,7 @@ class Stream: # Attempt a pull queue for the given element if remote and context allow it if require_buildtree: self._message( - MessageType.INFO, - "Attempting to fetch missing artifact buildtree", + MessageType.INFO, "Attempting to fetch missing artifact buildtree", ) self._scheduler.clear_queues() self._add_queue(PullQueue(self._scheduler)) @@ -280,20 +256,12 @@ class Stream: if usebuildtree == "always": raise StreamError(message) - self._message( - MessageType.INFO, message + ", shell will be loaded without it" - ) + self._message(MessageType.INFO, message + ", shell will be loaded without it") else: buildtree = True return element._shell( - scope, - directory, - mounts=mounts, - isolate=isolate, - prompt=prompt, - command=command, - usebuildtree=buildtree, + scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command, usebuildtree=buildtree, ) # build() @@ -309,14 +277,7 @@ class Stream: # If `remote` specified as None, then regular configuration will be used # to determine where to push artifacts to. # - def build( - self, - targets, - *, - selection=PipelineSelection.PLAN, - ignore_junction_targets=False, - remote=None - ): + def build(self, targets, *, selection=PipelineSelection.PLAN, ignore_junction_targets=False, remote=None): use_config = True if remote: @@ -336,9 +297,7 @@ class Stream: # Assert that the elements are consistent self._pipeline.assert_consistent(elements) - if all( - project.remote_execution_specs for project in self._context.get_projects() - ): + if all(project.remote_execution_specs for project in self._context.get_projects()): # Remote execution is configured for all projects. # Require artifact files only for target elements and their runtime dependencies. self._context.set_artifact_files_optional() @@ -434,14 +393,7 @@ class Stream: # If no error is encountered while tracking, then the project files # are rewritten inline. # - def track( - self, - targets, - *, - selection=PipelineSelection.REDIRECT, - except_targets=None, - cross_junctions=False - ): + def track(self, targets, *, selection=PipelineSelection.REDIRECT, except_targets=None, cross_junctions=False): # We pass no target to build. Only to track. Passing build targets # would fully load project configuration which might not be @@ -475,14 +427,7 @@ class Stream: # If `remote` specified as None, then regular configuration will be used # to determine where to pull artifacts from. # - def pull( - self, - targets, - *, - selection=PipelineSelection.NONE, - ignore_junction_targets=False, - remote=None - ): + def pull(self, targets, *, selection=PipelineSelection.NONE, ignore_junction_targets=False, remote=None): use_config = True if remote: @@ -524,14 +469,7 @@ class Stream: # a pull queue will be created if user context and available remotes allow for # attempting to fetch them. # - def push( - self, - targets, - *, - selection=PipelineSelection.NONE, - ignore_junction_targets=False, - remote=None - ): + def push(self, targets, *, selection=PipelineSelection.NONE, ignore_junction_targets=False, remote=None): use_config = True if remote: @@ -555,9 +493,7 @@ class Stream: # Check if we require a pull queue, with given artifact state and context require_buildtrees = self._buildtree_pull_required(elements) if require_buildtrees: - self._message( - MessageType.INFO, "Attempting to fetch missing artifact buildtrees" - ) + self._message(MessageType.INFO, "Attempting to fetch missing artifact buildtrees") self._add_queue(PullQueue(self._scheduler)) self._enqueue_plan(require_buildtrees) @@ -589,10 +525,7 @@ class Stream: # NOTE: Usually we check the _SchedulerErrorAction when a *job* has failed. # However, we cannot create a PushQueue job unless we intentionally # ready an uncached element in the PushQueue. - if ( - self._context.sched_error_action == _SchedulerErrorAction.CONTINUE - and uncached_elements - ): + if self._context.sched_error_action == _SchedulerErrorAction.CONTINUE and uncached_elements: names = [element.name for element in uncached_elements] fail_str = ( "Error while pushing. The following elements were not pushed as they are " @@ -635,9 +568,7 @@ class Stream: tar=False ): - elements, _ = self._load( - (target,), (), selection=selection, use_artifact_config=True, load_refs=True - ) + elements, _ = self._load((target,), (), selection=selection, use_artifact_config=True, load_refs=True) # self.targets contains a list of the loaded target objects # if we specify --deps build, Stream._load() will return a list @@ -649,9 +580,7 @@ class Stream: uncached_elts = [elt for elt in elements if not elt._cached()] if uncached_elts and pull: - self._message( - MessageType.INFO, "Attempting to fetch missing or incomplete artifact" - ) + self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifact") self._scheduler.clear_queues() self._add_queue(PullQueue(self._scheduler)) self._enqueue_plan(uncached_elts) @@ -664,19 +593,13 @@ class Stream: "none": Scope.NONE, "all": Scope.ALL, } - with target._prepare_sandbox( - scope=scope[selection], directory=None, integrate=integrate - ) as sandbox: + with target._prepare_sandbox(scope=scope[selection], directory=None, integrate=integrate) as sandbox: # Copy or move the sandbox to the target directory virdir = sandbox.get_virtual_directory() - self._export_artifact( - tar, location, compression, target, hardlinks, virdir - ) + self._export_artifact(tar, location, compression, target, hardlinks, virdir) except BstError as e: raise StreamError( - "Error while staging dependencies into a sandbox" ": '{}'".format(e), - detail=e.detail, - reason=e.reason, + "Error while staging dependencies into a sandbox" ": '{}'".format(e), detail=e.detail, reason=e.reason, ) from e # _export_artifact() @@ -729,9 +652,7 @@ class Stream: # def artifact_show(self, targets, *, selection=PipelineSelection.NONE): # Obtain list of Element and/or ArtifactElement objects - target_objects = self.load_selection( - targets, selection=selection, use_artifact_config=True, load_refs=True - ) + target_objects = self.load_selection(targets, selection=selection, use_artifact_config=True, load_refs=True) if self._artifacts.has_fetch_remotes(): self._pipeline.check_remotes(target_objects) @@ -756,9 +677,7 @@ class Stream: # def artifact_log(self, targets): # Return list of Element and/or ArtifactElement objects - target_objects = self.load_selection( - targets, selection=PipelineSelection.NONE, load_refs=True - ) + target_objects = self.load_selection(targets, selection=PipelineSelection.NONE, load_refs=True) artifact_logs = {} for obj in target_objects: @@ -767,9 +686,7 @@ class Stream: self._message(MessageType.WARN, "{} is not cached".format(ref)) continue elif not obj._cached_logs(): - self._message( - MessageType.WARN, "{} is cached without log files".format(ref) - ) + self._message(MessageType.WARN, "{} is cached without log files".format(ref)) continue artifact_logs[obj.name] = obj.get_logs() @@ -788,9 +705,7 @@ class Stream: # def artifact_list_contents(self, targets): # Return list of Element and/or ArtifactElement objects - target_objects = self.load_selection( - targets, selection=PipelineSelection.NONE, load_refs=True - ) + target_objects = self.load_selection(targets, selection=PipelineSelection.NONE, load_refs=True) elements_to_files = {} for obj in target_objects: @@ -814,9 +729,7 @@ class Stream: # def artifact_delete(self, targets, *, selection=PipelineSelection.NONE): # Return list of Element and/or ArtifactElement objects - target_objects = self.load_selection( - targets, selection=selection, load_refs=True - ) + target_objects = self.load_selection(targets, selection=selection, load_refs=True) # Some of the targets may refer to the same key, so first obtain a # set of the refs to be removed. @@ -869,9 +782,7 @@ class Stream: self._check_location_writable(location, force=force, tar=tar) - elements, _ = self._load( - (target,), (), selection=deps, except_targets=except_targets - ) + elements, _ = self._load((target,), (), selection=deps, except_targets=except_targets) # Assert all sources are cached in the source dir self._fetch(elements) @@ -879,14 +790,10 @@ class Stream: # Stage all sources determined by scope try: - self._source_checkout( - elements, location, force, deps, tar, compression, include_build_scripts - ) + self._source_checkout(elements, location, force, deps, tar, compression, include_build_scripts) except BstError as e: raise StreamError( - "Error while writing sources" ": '{}'".format(e), - detail=e.detail, - reason=e.reason, + "Error while writing sources" ": '{}'".format(e), detail=e.detail, reason=e.reason, ) from e self._message(MessageType.INFO, "Checked out sources to '{}'".format(location)) @@ -934,18 +841,12 @@ class Stream: for target in elements: if not list(target.sources()): - build_depends = [ - x.name for x in target.dependencies(Scope.BUILD, recurse=False) - ] + build_depends = [x.name for x in target.dependencies(Scope.BUILD, recurse=False)] if not build_depends: - raise StreamError( - "The element {} has no sources".format(target.name) - ) + raise StreamError("The element {} has no sources".format(target.name)) detail = "Try opening a workspace on one of its dependencies instead:\n" detail += " \n".join(build_depends) - raise StreamError( - "The element {} has no sources".format(target.name), detail=detail - ) + raise StreamError("The element {} has no sources".format(target.name), detail=detail) # Check for workspace config workspace = workspaces.get_workspace(target._get_full_name()) @@ -962,16 +863,10 @@ class Stream: target.name, workspace.get_absolute_path() ) ) - self.workspace_close( - target._get_full_name(), remove_dir=not no_checkout - ) + self.workspace_close(target._get_full_name(), remove_dir=not no_checkout) target_consistency = target._get_consistency() - if ( - not no_checkout - and target_consistency < Consistency.CACHED - and target_consistency._source_cached() - ): + if not no_checkout and target_consistency < Consistency.CACHED and target_consistency._source_cached(): raise StreamError( "Could not stage uncached source. For {} ".format(target.name) + "Use `--track` to track and " @@ -980,9 +875,7 @@ class Stream: ) if not custom_dir: - directory = os.path.abspath( - os.path.join(self._context.workspacedir, target.name) - ) + directory = os.path.abspath(os.path.join(self._context.workspacedir, target.name)) if directory[-4:] == ".bst": directory = directory[:-4] expanded_directories.append(directory) @@ -1006,17 +899,13 @@ class Stream: if os.path.exists(directory): if not os.path.isdir(directory): raise StreamError( - "For element '{}', Directory path is not a directory: {}".format( - target.name, directory - ), + "For element '{}', Directory path is not a directory: {}".format(target.name, directory), reason="bad-directory", ) if not (no_checkout or force) and os.listdir(directory): raise StreamError( - "For element '{}', Directory path is not empty: {}".format( - target.name, directory - ), + "For element '{}', Directory path is not empty: {}".format(target.name, directory), reason="bad-directory", ) if os.listdir(directory): @@ -1028,8 +917,7 @@ class Stream: targetGenerator = zip(elements, expanded_directories) for target, directory in targetGenerator: self._message( - MessageType.INFO, - "Creating workspace for element {}".format(target.name), + MessageType.INFO, "Creating workspace for element {}".format(target.name), ) workspace = workspaces.get_workspace(target._get_full_name()) @@ -1040,22 +928,15 @@ class Stream: try: os.makedirs(directory, exist_ok=True) except OSError as e: - todo_elements = " ".join( - [str(target.name) for target, directory_dict in targetGenerator] - ) + todo_elements = " ".join([str(target.name) for target, directory_dict in targetGenerator]) if todo_elements: # This output should make creating the remaining workspaces as easy as possible. - todo_elements = ( - "\nDid not try to create workspaces for " + todo_elements - ) - raise StreamError( - "Failed to create workspace directory: {}".format(e) + todo_elements - ) from e + todo_elements = "\nDid not try to create workspaces for " + todo_elements + raise StreamError("Failed to create workspace directory: {}".format(e) + todo_elements) from e workspaces.create_workspace(target, directory, checkout=not no_checkout) self._message( - MessageType.INFO, - "Created a workspace for element: {}".format(target._get_full_name()), + MessageType.INFO, "Created a workspace for element: {}".format(target._get_full_name()), ) # workspace_close @@ -1078,11 +959,7 @@ class Stream: try: shutil.rmtree(workspace.get_absolute_path()) except OSError as e: - raise StreamError( - "Could not remove '{}': {}".format( - workspace.get_absolute_path(), e - ) - ) from e + raise StreamError("Could not remove '{}': {}".format(workspace.get_absolute_path(), e)) from e # Delete the workspace and save the configuration workspaces.delete_workspace(element_name) @@ -1102,10 +979,7 @@ class Stream: def workspace_reset(self, targets, *, soft, track_first): elements, _ = self._load( - targets, - [], - selection=PipelineSelection.REDIRECT, - track_selection=PipelineSelection.REDIRECT, + targets, [], selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT, ) nonexisting = [] @@ -1123,10 +997,7 @@ class Stream: if soft: workspace.prepared = False self._message( - MessageType.INFO, - "Reset workspace state for {} at: {}".format( - element.name, workspace_path - ), + MessageType.INFO, "Reset workspace state for {} at: {}".format(element.name, workspace_path), ) continue @@ -1218,10 +1089,7 @@ class Stream: output_elements.add(e) if load_elements: loaded_elements, _ = self._load( - load_elements, - (), - selection=PipelineSelection.REDIRECT, - track_selection=PipelineSelection.REDIRECT, + load_elements, (), selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT, ) for e in loaded_elements: @@ -1379,15 +1247,9 @@ class Stream: if target_artifacts: if not load_refs: detail = "\n".join(target_artifacts) - raise ArtifactElementError( - "Cannot perform this operation with artifact refs:", detail=detail - ) + raise ArtifactElementError("Cannot perform this operation with artifact refs:", detail=detail) if selection in (PipelineSelection.ALL, PipelineSelection.RUN): - raise StreamError( - "Error: '--deps {}' is not supported for artifact refs".format( - selection - ) - ) + raise StreamError("Error: '--deps {}' is not supported for artifact refs".format(selection)) # Load rewritable if we have any tracking selection to make rewritable = False @@ -1402,12 +1264,7 @@ class Stream: track_except_targets, ] if any(loadable): - ( - elements, - except_elements, - track_elements, - track_except_elements, - ) = self._pipeline.load( + (elements, except_elements, track_elements, track_except_elements,) = self._pipeline.load( loadable, rewritable=rewritable, ignore_workspaces=ignore_workspaces ) else: @@ -1419,9 +1276,7 @@ class Stream: ) # Load all target artifacts - artifacts = ( - self._pipeline.load_artifacts(target_artifacts) if target_artifacts else [] - ) + artifacts = self._pipeline.load_artifacts(target_artifacts) if target_artifacts else [] # Optionally filter out junction elements if ignore_junction_targets: @@ -1437,10 +1292,7 @@ class Stream: # This can happen with `bst build --track` # if targets and not self._pipeline.targets_include(elements, track_elements): - raise StreamError( - "Specified tracking targets that are not " - "within the scope of primary targets" - ) + raise StreamError("Specified tracking targets that are not " "within the scope of primary targets") # First take care of marking tracking elements, this must be # done before resolving element states. @@ -1462,14 +1314,10 @@ class Stream: for project, project_elements in track_projects.items(): selected = self._pipeline.get_selection(project_elements, track_selection) - selected = self._pipeline.track_cross_junction_filter( - project, selected, track_cross_junctions - ) + selected = self._pipeline.track_cross_junction_filter(project, selected, track_cross_junctions) track_selected.extend(selected) - track_selected = self._pipeline.except_elements( - track_elements, track_selected, track_except_elements - ) + track_selected = self._pipeline.except_elements(track_elements, track_selected, track_except_elements) for element in track_selected: element._schedule_tracking() @@ -1483,20 +1331,14 @@ class Stream: project.ensure_fully_loaded() # Connect to remote caches, this needs to be done before resolving element state - self._artifacts.setup_remotes( - use_config=use_artifact_config, remote_url=artifact_remote_url - ) - self._sourcecache.setup_remotes( - use_config=use_source_config, remote_url=source_remote_url - ) + self._artifacts.setup_remotes(use_config=use_artifact_config, remote_url=artifact_remote_url) + self._sourcecache.setup_remotes(use_config=use_source_config, remote_url=source_remote_url) # Now move on to loading primary selection. # self._pipeline.resolve_elements(self.targets) selected = self._pipeline.get_selection(self.targets, selection, silent=False) - selected = self._pipeline.except_elements( - self.targets, selected, except_elements - ) + selected = self._pipeline.except_elements(self.targets, selected, except_elements) if selection == PipelineSelection.PLAN and dynamic_plan: # We use a dynamic build plan, only request artifacts of top-level targets, @@ -1557,9 +1399,7 @@ class Stream: # unique_id (str): A unique_id to load an Element instance # def _failure_retry(self, action_name, unique_id): - notification = Notification( - NotificationType.RETRY, job_action=action_name, element=unique_id - ) + notification = Notification(NotificationType.RETRY, job_action=action_name, element=unique_id) self._notify(notification) # _run() @@ -1576,9 +1416,7 @@ class Stream: if self._session_start_callback is not None: self._session_start_callback() - status = self._scheduler.run( - self.queues, self._context.get_cascache().get_casd_process() - ) + status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process()) if status == SchedStatus.ERROR: raise StreamError() @@ -1643,17 +1481,11 @@ class Stream: try: os.makedirs(location, exist_ok=True) except OSError as e: - raise StreamError( - "Failed to create destination directory: '{}'".format(e) - ) from e + raise StreamError("Failed to create destination directory: '{}'".format(e)) from e if not os.access(location, os.W_OK): - raise StreamError( - "Destination directory '{}' not writable".format(location) - ) + raise StreamError("Destination directory '{}' not writable".format(location)) if not force and os.listdir(location): - raise StreamError( - "Destination directory '{}' not empty".format(location) - ) + raise StreamError("Destination directory '{}' not empty".format(location)) elif os.path.exists(location) and location != "-": if not os.access(location, os.W_OK): raise StreamError("Output file '{}' not writable".format(location)) @@ -1666,9 +1498,7 @@ class Stream: try: utils.safe_remove(directory) except OSError as e: - raise StreamError( - "Failed to remove checkout directory: {}".format(e) - ) from e + raise StreamError("Failed to remove checkout directory: {}".format(e)) from e sandbox_vroot.export_files(directory, can_link=True, can_destroy=True) @@ -1698,9 +1528,7 @@ class Stream: else: self._move_directory(temp_source_dir.name, location, force) except OSError as e: - raise StreamError( - "Failed to checkout sources to {}: {}".format(location, e) - ) from e + raise StreamError("Failed to checkout sources to {}: {}".format(location, e)) from e finally: with suppress(FileNotFoundError): temp_source_dir.cleanup() @@ -1819,11 +1647,7 @@ class Stream: for element in elements: # Check if element is partially cached without its buildtree, as the element # artifact may not be cached at all - if ( - element._cached() - and not element._cached_buildtree() - and element._buildtree_exists() - ): + if element._cached() and not element._cached_buildtree() and element._buildtree_exists(): required_list.append(element) return required_list @@ -1877,10 +1701,7 @@ class Stream: artifact_refs.extend(self._artifacts.list_artifacts(glob=glob)) if not artifact_refs: self._message( - MessageType.WARN, - "No artifacts found for globs: {}".format( - ", ".join(artifact_globs) - ), + MessageType.WARN, "No artifacts found for globs: {}".format(", ".join(artifact_globs)), ) return element_targets, artifact_refs @@ -1897,16 +1718,12 @@ class Stream: elif notification.notification_type == NotificationType.TICK: self._ticker_callback() elif notification.notification_type == NotificationType.JOB_START: - self._state.add_task( - notification.job_action, notification.full_name, notification.time - ) + self._state.add_task(notification.job_action, notification.full_name, notification.time) elif notification.notification_type == NotificationType.JOB_COMPLETE: self._state.remove_task(notification.job_action, notification.full_name) if notification.job_status == JobStatus.FAIL: self._state.fail_task( - notification.job_action, - notification.full_name, - notification.element, + notification.job_action, notification.full_name, notification.element, ) elif notification.notification_type == NotificationType.SCHED_START_TIME: self._starttime = notification.time |