Diffstat (limited to 'src/buildstream/_stream.py')
-rw-r--r--  src/buildstream/_stream.py  321
1 file changed, 69 insertions, 252 deletions
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 402473e33..aa14f12c9 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -74,13 +74,7 @@ from . import Scope, Consistency
#
class Stream:
def __init__(
- self,
- context,
- session_start,
- *,
- session_start_callback=None,
- interrupt_callback=None,
- ticker_callback=None
+ self, context, session_start, *, session_start_callback=None, interrupt_callback=None, ticker_callback=None
):
#
@@ -101,26 +95,18 @@ class Stream:
self._pipeline = None
self._state = State(session_start) # Owned by Stream, used by Core to set state
self._notification_queue = deque()
- self._starttime = (
- session_start # Synchronised with Scheduler's relative start time
- )
+ self._starttime = session_start # Synchronised with Scheduler's relative start time
context.messenger.set_state(self._state)
self._scheduler = Scheduler(
- context,
- session_start,
- self._state,
- self._notification_queue,
- self._scheduler_notification_handler,
+ context, session_start, self._state, self._notification_queue, self._scheduler_notification_handler,
)
self._first_non_track_queue = None
self._session_start_callback = session_start_callback
self._ticker_callback = ticker_callback
self._interrupt_callback = interrupt_callback
- self._notifier = (
- self._scheduler._stream_notification_handler
- ) # Assign the scheduler's notification handler
+ self._notifier = self._scheduler._stream_notification_handler # Assign the scheduler's notification handler
self._scheduler_running = False
self._scheduler_terminated = False
self._scheduler_suspended = False
@@ -177,9 +163,7 @@ class Stream:
use_artifact_config=False,
load_refs=False
):
- with PROFILER.profile(
- Topics.LOAD_SELECTION, "_".join(t.replace(os.sep, "-") for t in targets)
- ):
+ with PROFILER.profile(Topics.LOAD_SELECTION, "_".join(t.replace(os.sep, "-") for t in targets)):
target_objects, _ = self._load(
targets,
(),
@@ -233,22 +217,15 @@ class Stream:
# in which case we just blindly trust the directory, using the element
# definitions to control the execution environment only.
if directory is None:
- missing_deps = [
- dep
- for dep in self._pipeline.dependencies([element], scope)
- if not dep._cached()
- ]
+ missing_deps = [dep for dep in self._pipeline.dependencies([element], scope) if not dep._cached()]
if missing_deps:
if not pull_dependencies:
raise StreamError(
"Elements need to be built or downloaded before staging a shell environment",
- detail="\n".join(
- list(map(lambda x: x._get_full_name(), missing_deps))
- ),
+ detail="\n".join(list(map(lambda x: x._get_full_name(), missing_deps))),
)
self._message(
- MessageType.INFO,
- "Attempting to fetch missing or incomplete artifacts",
+ MessageType.INFO, "Attempting to fetch missing or incomplete artifacts",
)
self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
@@ -264,8 +241,7 @@ class Stream:
# Attempt a pull queue for the given element if remote and context allow it
if require_buildtree:
self._message(
- MessageType.INFO,
- "Attempting to fetch missing artifact buildtree",
+ MessageType.INFO, "Attempting to fetch missing artifact buildtree",
)
self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
@@ -280,20 +256,12 @@ class Stream:
if usebuildtree == "always":
raise StreamError(message)
- self._message(
- MessageType.INFO, message + ", shell will be loaded without it"
- )
+ self._message(MessageType.INFO, message + ", shell will be loaded without it")
else:
buildtree = True
return element._shell(
- scope,
- directory,
- mounts=mounts,
- isolate=isolate,
- prompt=prompt,
- command=command,
- usebuildtree=buildtree,
+ scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command, usebuildtree=buildtree,
)
# build()
@@ -309,14 +277,7 @@ class Stream:
# If `remote` specified as None, then regular configuration will be used
# to determine where to push artifacts to.
#
- def build(
- self,
- targets,
- *,
- selection=PipelineSelection.PLAN,
- ignore_junction_targets=False,
- remote=None
- ):
+ def build(self, targets, *, selection=PipelineSelection.PLAN, ignore_junction_targets=False, remote=None):
use_config = True
if remote:
@@ -336,9 +297,7 @@ class Stream:
# Assert that the elements are consistent
self._pipeline.assert_consistent(elements)
- if all(
- project.remote_execution_specs for project in self._context.get_projects()
- ):
+ if all(project.remote_execution_specs for project in self._context.get_projects()):
# Remote execution is configured for all projects.
# Require artifact files only for target elements and their runtime dependencies.
self._context.set_artifact_files_optional()
@@ -434,14 +393,7 @@ class Stream:
# If no error is encountered while tracking, then the project files
# are rewritten inline.
#
- def track(
- self,
- targets,
- *,
- selection=PipelineSelection.REDIRECT,
- except_targets=None,
- cross_junctions=False
- ):
+ def track(self, targets, *, selection=PipelineSelection.REDIRECT, except_targets=None, cross_junctions=False):
# We pass no target to build. Only to track. Passing build targets
# would fully load project configuration which might not be
@@ -475,14 +427,7 @@ class Stream:
# If `remote` specified as None, then regular configuration will be used
# to determine where to pull artifacts from.
#
- def pull(
- self,
- targets,
- *,
- selection=PipelineSelection.NONE,
- ignore_junction_targets=False,
- remote=None
- ):
+ def pull(self, targets, *, selection=PipelineSelection.NONE, ignore_junction_targets=False, remote=None):
use_config = True
if remote:
@@ -524,14 +469,7 @@ class Stream:
# a pull queue will be created if user context and available remotes allow for
# attempting to fetch them.
#
- def push(
- self,
- targets,
- *,
- selection=PipelineSelection.NONE,
- ignore_junction_targets=False,
- remote=None
- ):
+ def push(self, targets, *, selection=PipelineSelection.NONE, ignore_junction_targets=False, remote=None):
use_config = True
if remote:
@@ -555,9 +493,7 @@ class Stream:
# Check if we require a pull queue, with given artifact state and context
require_buildtrees = self._buildtree_pull_required(elements)
if require_buildtrees:
- self._message(
- MessageType.INFO, "Attempting to fetch missing artifact buildtrees"
- )
+ self._message(MessageType.INFO, "Attempting to fetch missing artifact buildtrees")
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(require_buildtrees)
@@ -589,10 +525,7 @@ class Stream:
# NOTE: Usually we check the _SchedulerErrorAction when a *job* has failed.
# However, we cannot create a PushQueue job unless we intentionally
# ready an uncached element in the PushQueue.
- if (
- self._context.sched_error_action == _SchedulerErrorAction.CONTINUE
- and uncached_elements
- ):
+ if self._context.sched_error_action == _SchedulerErrorAction.CONTINUE and uncached_elements:
names = [element.name for element in uncached_elements]
fail_str = (
"Error while pushing. The following elements were not pushed as they are "
@@ -635,9 +568,7 @@ class Stream:
tar=False
):
- elements, _ = self._load(
- (target,), (), selection=selection, use_artifact_config=True, load_refs=True
- )
+ elements, _ = self._load((target,), (), selection=selection, use_artifact_config=True, load_refs=True)
# self.targets contains a list of the loaded target objects
# if we specify --deps build, Stream._load() will return a list
@@ -649,9 +580,7 @@ class Stream:
uncached_elts = [elt for elt in elements if not elt._cached()]
if uncached_elts and pull:
- self._message(
- MessageType.INFO, "Attempting to fetch missing or incomplete artifact"
- )
+ self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifact")
self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(uncached_elts)
@@ -664,19 +593,13 @@ class Stream:
"none": Scope.NONE,
"all": Scope.ALL,
}
- with target._prepare_sandbox(
- scope=scope[selection], directory=None, integrate=integrate
- ) as sandbox:
+ with target._prepare_sandbox(scope=scope[selection], directory=None, integrate=integrate) as sandbox:
# Copy or move the sandbox to the target directory
virdir = sandbox.get_virtual_directory()
- self._export_artifact(
- tar, location, compression, target, hardlinks, virdir
- )
+ self._export_artifact(tar, location, compression, target, hardlinks, virdir)
except BstError as e:
raise StreamError(
- "Error while staging dependencies into a sandbox" ": '{}'".format(e),
- detail=e.detail,
- reason=e.reason,
+ "Error while staging dependencies into a sandbox" ": '{}'".format(e), detail=e.detail, reason=e.reason,
) from e
# _export_artifact()
@@ -729,9 +652,7 @@ class Stream:
#
def artifact_show(self, targets, *, selection=PipelineSelection.NONE):
# Obtain list of Element and/or ArtifactElement objects
- target_objects = self.load_selection(
- targets, selection=selection, use_artifact_config=True, load_refs=True
- )
+ target_objects = self.load_selection(targets, selection=selection, use_artifact_config=True, load_refs=True)
if self._artifacts.has_fetch_remotes():
self._pipeline.check_remotes(target_objects)
@@ -756,9 +677,7 @@ class Stream:
#
def artifact_log(self, targets):
# Return list of Element and/or ArtifactElement objects
- target_objects = self.load_selection(
- targets, selection=PipelineSelection.NONE, load_refs=True
- )
+ target_objects = self.load_selection(targets, selection=PipelineSelection.NONE, load_refs=True)
artifact_logs = {}
for obj in target_objects:
@@ -767,9 +686,7 @@ class Stream:
self._message(MessageType.WARN, "{} is not cached".format(ref))
continue
elif not obj._cached_logs():
- self._message(
- MessageType.WARN, "{} is cached without log files".format(ref)
- )
+ self._message(MessageType.WARN, "{} is cached without log files".format(ref))
continue
artifact_logs[obj.name] = obj.get_logs()
@@ -788,9 +705,7 @@ class Stream:
#
def artifact_list_contents(self, targets):
# Return list of Element and/or ArtifactElement objects
- target_objects = self.load_selection(
- targets, selection=PipelineSelection.NONE, load_refs=True
- )
+ target_objects = self.load_selection(targets, selection=PipelineSelection.NONE, load_refs=True)
elements_to_files = {}
for obj in target_objects:
@@ -814,9 +729,7 @@ class Stream:
#
def artifact_delete(self, targets, *, selection=PipelineSelection.NONE):
# Return list of Element and/or ArtifactElement objects
- target_objects = self.load_selection(
- targets, selection=selection, load_refs=True
- )
+ target_objects = self.load_selection(targets, selection=selection, load_refs=True)
# Some of the targets may refer to the same key, so first obtain a
# set of the refs to be removed.
@@ -869,9 +782,7 @@ class Stream:
self._check_location_writable(location, force=force, tar=tar)
- elements, _ = self._load(
- (target,), (), selection=deps, except_targets=except_targets
- )
+ elements, _ = self._load((target,), (), selection=deps, except_targets=except_targets)
# Assert all sources are cached in the source dir
self._fetch(elements)
@@ -879,14 +790,10 @@ class Stream:
# Stage all sources determined by scope
try:
- self._source_checkout(
- elements, location, force, deps, tar, compression, include_build_scripts
- )
+ self._source_checkout(elements, location, force, deps, tar, compression, include_build_scripts)
except BstError as e:
raise StreamError(
- "Error while writing sources" ": '{}'".format(e),
- detail=e.detail,
- reason=e.reason,
+ "Error while writing sources" ": '{}'".format(e), detail=e.detail, reason=e.reason,
) from e
self._message(MessageType.INFO, "Checked out sources to '{}'".format(location))
@@ -934,18 +841,12 @@ class Stream:
for target in elements:
if not list(target.sources()):
- build_depends = [
- x.name for x in target.dependencies(Scope.BUILD, recurse=False)
- ]
+ build_depends = [x.name for x in target.dependencies(Scope.BUILD, recurse=False)]
if not build_depends:
- raise StreamError(
- "The element {} has no sources".format(target.name)
- )
+ raise StreamError("The element {} has no sources".format(target.name))
detail = "Try opening a workspace on one of its dependencies instead:\n"
detail += " \n".join(build_depends)
- raise StreamError(
- "The element {} has no sources".format(target.name), detail=detail
- )
+ raise StreamError("The element {} has no sources".format(target.name), detail=detail)
# Check for workspace config
workspace = workspaces.get_workspace(target._get_full_name())
@@ -962,16 +863,10 @@ class Stream:
target.name, workspace.get_absolute_path()
)
)
- self.workspace_close(
- target._get_full_name(), remove_dir=not no_checkout
- )
+ self.workspace_close(target._get_full_name(), remove_dir=not no_checkout)
target_consistency = target._get_consistency()
- if (
- not no_checkout
- and target_consistency < Consistency.CACHED
- and not target._source_cached()
- ):
+ if not no_checkout and target_consistency < Consistency.CACHED and not target._source_cached():
raise StreamError(
"Could not stage uncached source. For {} ".format(target.name)
+ "Use `--track` to track and "
@@ -980,9 +875,7 @@ class Stream:
)
if not custom_dir:
- directory = os.path.abspath(
- os.path.join(self._context.workspacedir, target.name)
- )
+ directory = os.path.abspath(os.path.join(self._context.workspacedir, target.name))
if directory[-4:] == ".bst":
directory = directory[:-4]
expanded_directories.append(directory)
@@ -1006,17 +899,13 @@ class Stream:
if os.path.exists(directory):
if not os.path.isdir(directory):
raise StreamError(
- "For element '{}', Directory path is not a directory: {}".format(
- target.name, directory
- ),
+ "For element '{}', Directory path is not a directory: {}".format(target.name, directory),
reason="bad-directory",
)
if not (no_checkout or force) and os.listdir(directory):
raise StreamError(
- "For element '{}', Directory path is not empty: {}".format(
- target.name, directory
- ),
+ "For element '{}', Directory path is not empty: {}".format(target.name, directory),
reason="bad-directory",
)
if os.listdir(directory):
@@ -1028,8 +917,7 @@ class Stream:
targetGenerator = zip(elements, expanded_directories)
for target, directory in targetGenerator:
self._message(
- MessageType.INFO,
- "Creating workspace for element {}".format(target.name),
+ MessageType.INFO, "Creating workspace for element {}".format(target.name),
)
workspace = workspaces.get_workspace(target._get_full_name())
@@ -1040,22 +928,15 @@ class Stream:
try:
os.makedirs(directory, exist_ok=True)
except OSError as e:
- todo_elements = " ".join(
- [str(target.name) for target, directory_dict in targetGenerator]
- )
+ todo_elements = " ".join([str(target.name) for target, directory_dict in targetGenerator])
if todo_elements:
# This output should make creating the remaining workspaces as easy as possible.
- todo_elements = (
- "\nDid not try to create workspaces for " + todo_elements
- )
- raise StreamError(
- "Failed to create workspace directory: {}".format(e) + todo_elements
- ) from e
+ todo_elements = "\nDid not try to create workspaces for " + todo_elements
+ raise StreamError("Failed to create workspace directory: {}".format(e) + todo_elements) from e
workspaces.create_workspace(target, directory, checkout=not no_checkout)
self._message(
- MessageType.INFO,
- "Created a workspace for element: {}".format(target._get_full_name()),
+ MessageType.INFO, "Created a workspace for element: {}".format(target._get_full_name()),
)
# workspace_close
@@ -1078,11 +959,7 @@ class Stream:
try:
shutil.rmtree(workspace.get_absolute_path())
except OSError as e:
- raise StreamError(
- "Could not remove '{}': {}".format(
- workspace.get_absolute_path(), e
- )
- ) from e
+ raise StreamError("Could not remove '{}': {}".format(workspace.get_absolute_path(), e)) from e
# Delete the workspace and save the configuration
workspaces.delete_workspace(element_name)
@@ -1102,10 +979,7 @@ class Stream:
def workspace_reset(self, targets, *, soft, track_first):
elements, _ = self._load(
- targets,
- [],
- selection=PipelineSelection.REDIRECT,
- track_selection=PipelineSelection.REDIRECT,
+ targets, [], selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT,
)
nonexisting = []
@@ -1123,10 +997,7 @@ class Stream:
if soft:
workspace.prepared = False
self._message(
- MessageType.INFO,
- "Reset workspace state for {} at: {}".format(
- element.name, workspace_path
- ),
+ MessageType.INFO, "Reset workspace state for {} at: {}".format(element.name, workspace_path),
)
continue
@@ -1218,10 +1089,7 @@ class Stream:
output_elements.add(e)
if load_elements:
loaded_elements, _ = self._load(
- load_elements,
- (),
- selection=PipelineSelection.REDIRECT,
- track_selection=PipelineSelection.REDIRECT,
+ load_elements, (), selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT,
)
for e in loaded_elements:
@@ -1379,15 +1247,9 @@ class Stream:
if target_artifacts:
if not load_refs:
detail = "\n".join(target_artifacts)
- raise ArtifactElementError(
- "Cannot perform this operation with artifact refs:", detail=detail
- )
+ raise ArtifactElementError("Cannot perform this operation with artifact refs:", detail=detail)
if selection in (PipelineSelection.ALL, PipelineSelection.RUN):
- raise StreamError(
- "Error: '--deps {}' is not supported for artifact refs".format(
- selection
- )
- )
+ raise StreamError("Error: '--deps {}' is not supported for artifact refs".format(selection))
# Load rewritable if we have any tracking selection to make
rewritable = False
@@ -1402,12 +1264,7 @@ class Stream:
track_except_targets,
]
if any(loadable):
- (
- elements,
- except_elements,
- track_elements,
- track_except_elements,
- ) = self._pipeline.load(
+ (elements, except_elements, track_elements, track_except_elements,) = self._pipeline.load(
loadable, rewritable=rewritable, ignore_workspaces=ignore_workspaces
)
else:
@@ -1419,9 +1276,7 @@ class Stream:
)
# Load all target artifacts
- artifacts = (
- self._pipeline.load_artifacts(target_artifacts) if target_artifacts else []
- )
+ artifacts = self._pipeline.load_artifacts(target_artifacts) if target_artifacts else []
# Optionally filter out junction elements
if ignore_junction_targets:
@@ -1437,10 +1292,7 @@ class Stream:
# This can happen with `bst build --track`
#
if targets and not self._pipeline.targets_include(elements, track_elements):
- raise StreamError(
- "Specified tracking targets that are not "
- "within the scope of primary targets"
- )
+ raise StreamError("Specified tracking targets that are not " "within the scope of primary targets")
# First take care of marking tracking elements, this must be
# done before resolving element states.
@@ -1462,14 +1314,10 @@ class Stream:
for project, project_elements in track_projects.items():
selected = self._pipeline.get_selection(project_elements, track_selection)
- selected = self._pipeline.track_cross_junction_filter(
- project, selected, track_cross_junctions
- )
+ selected = self._pipeline.track_cross_junction_filter(project, selected, track_cross_junctions)
track_selected.extend(selected)
- track_selected = self._pipeline.except_elements(
- track_elements, track_selected, track_except_elements
- )
+ track_selected = self._pipeline.except_elements(track_elements, track_selected, track_except_elements)
for element in track_selected:
element._schedule_tracking()
@@ -1483,20 +1331,14 @@ class Stream:
project.ensure_fully_loaded()
# Connect to remote caches, this needs to be done before resolving element state
- self._artifacts.setup_remotes(
- use_config=use_artifact_config, remote_url=artifact_remote_url
- )
- self._sourcecache.setup_remotes(
- use_config=use_source_config, remote_url=source_remote_url
- )
+ self._artifacts.setup_remotes(use_config=use_artifact_config, remote_url=artifact_remote_url)
+ self._sourcecache.setup_remotes(use_config=use_source_config, remote_url=source_remote_url)
# Now move on to loading primary selection.
#
self._pipeline.resolve_elements(self.targets)
selected = self._pipeline.get_selection(self.targets, selection, silent=False)
- selected = self._pipeline.except_elements(
- self.targets, selected, except_elements
- )
+ selected = self._pipeline.except_elements(self.targets, selected, except_elements)
if selection == PipelineSelection.PLAN and dynamic_plan:
# We use a dynamic build plan, only request artifacts of top-level targets,
@@ -1557,9 +1399,7 @@ class Stream:
# unique_id (str): A unique_id to load an Element instance
#
def _failure_retry(self, action_name, unique_id):
- notification = Notification(
- NotificationType.RETRY, job_action=action_name, element=unique_id
- )
+ notification = Notification(NotificationType.RETRY, job_action=action_name, element=unique_id)
self._notify(notification)
# _run()
@@ -1576,9 +1416,7 @@ class Stream:
if self._session_start_callback is not None:
self._session_start_callback()
- status = self._scheduler.run(
- self.queues, self._context.get_cascache().get_casd_process()
- )
+ status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
if status == SchedStatus.ERROR:
raise StreamError()
@@ -1643,17 +1481,11 @@ class Stream:
try:
os.makedirs(location, exist_ok=True)
except OSError as e:
- raise StreamError(
- "Failed to create destination directory: '{}'".format(e)
- ) from e
+ raise StreamError("Failed to create destination directory: '{}'".format(e)) from e
if not os.access(location, os.W_OK):
- raise StreamError(
- "Destination directory '{}' not writable".format(location)
- )
+ raise StreamError("Destination directory '{}' not writable".format(location))
if not force and os.listdir(location):
- raise StreamError(
- "Destination directory '{}' not empty".format(location)
- )
+ raise StreamError("Destination directory '{}' not empty".format(location))
elif os.path.exists(location) and location != "-":
if not os.access(location, os.W_OK):
raise StreamError("Output file '{}' not writable".format(location))
@@ -1666,9 +1498,7 @@ class Stream:
try:
utils.safe_remove(directory)
except OSError as e:
- raise StreamError(
- "Failed to remove checkout directory: {}".format(e)
- ) from e
+ raise StreamError("Failed to remove checkout directory: {}".format(e)) from e
sandbox_vroot.export_files(directory, can_link=True, can_destroy=True)
@@ -1698,9 +1528,7 @@ class Stream:
else:
self._move_directory(temp_source_dir.name, location, force)
except OSError as e:
- raise StreamError(
- "Failed to checkout sources to {}: {}".format(location, e)
- ) from e
+ raise StreamError("Failed to checkout sources to {}: {}".format(location, e)) from e
finally:
with suppress(FileNotFoundError):
temp_source_dir.cleanup()
@@ -1819,11 +1647,7 @@ class Stream:
for element in elements:
# Check if element is partially cached without its buildtree, as the element
# artifact may not be cached at all
- if (
- element._cached()
- and not element._cached_buildtree()
- and element._buildtree_exists()
- ):
+ if element._cached() and not element._cached_buildtree() and element._buildtree_exists():
required_list.append(element)
return required_list
@@ -1877,10 +1701,7 @@ class Stream:
artifact_refs.extend(self._artifacts.list_artifacts(glob=glob))
if not artifact_refs:
self._message(
- MessageType.WARN,
- "No artifacts found for globs: {}".format(
- ", ".join(artifact_globs)
- ),
+ MessageType.WARN, "No artifacts found for globs: {}".format(", ".join(artifact_globs)),
)
return element_targets, artifact_refs
@@ -1897,16 +1718,12 @@ class Stream:
elif notification.notification_type == NotificationType.TICK:
self._ticker_callback()
elif notification.notification_type == NotificationType.JOB_START:
- self._state.add_task(
- notification.job_action, notification.full_name, notification.time
- )
+ self._state.add_task(notification.job_action, notification.full_name, notification.time)
elif notification.notification_type == NotificationType.JOB_COMPLETE:
self._state.remove_task(notification.job_action, notification.full_name)
if notification.job_status == JobStatus.FAIL:
self._state.fail_task(
- notification.job_action,
- notification.full_name,
- notification.element,
+ notification.job_action, notification.full_name, notification.element,
)
elif notification.notification_type == NotificationType.SCHED_START_TIME:
self._starttime = notification.time