summaryrefslogtreecommitdiff
path: root/buildstream/_stream.py
diff options
context:
space:
mode:
authorTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-05-03 15:14:38 +0900
committerTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-05-08 03:59:38 +0900
commit5bdc0a79d1fb67d2da552a902163dec450ff292c (patch)
treeee63ed73e90dbc9aba1d08971e54016bb7839708 /buildstream/_stream.py
parentb8e15706a51272e4f4e116d9e373fd2581102868 (diff)
downloadbuildstream-5bdc0a79d1fb67d2da552a902163dec450ff292c.tar.gz
_stream.py, _pipeline.py: Refactoring of the pipeline itself
Here the pipeline becomes essentially stateless, some dangling state remains to be factored out because of frontend accesses which will be changed in a later commit. Essentially, the Pipeline.load() method no longer has any knowledge of the specific purposes of the loaded targets, and now takes a list of target groups and returns a corresponding list of element groups. The Stream() business logic methods now use other pipeline helper methods to create and filter lists from the loaded target elements. The Stream() also finally absorbs the Scheduler frontend facing APIs. However Queues are still exposed on the Stream object for logging purposes and through callbacks such that the frontend can retry elements.
Diffstat (limited to 'buildstream/_stream.py')
-rw-r--r--buildstream/_stream.py688
1 files changed, 410 insertions, 278 deletions
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
index 62d9f9804..c8d0bb69c 100644
--- a/buildstream/_stream.py
+++ b/buildstream/_stream.py
@@ -17,19 +17,22 @@
#
# Authors:
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
+# Jürg Billeter <juerg.billeter@codethink.co.uk>
+# Tristan Maat <tristan.maat@codethink.co.uk>
+
import os
import stat
import shlex
import shutil
import tarfile
+from contextlib import contextmanager
from tempfile import TemporaryDirectory
-from ._exceptions import StreamError, ImplError, BstError
+from ._exceptions import StreamError, ImplError, BstError, set_last_task_error
from ._message import Message, MessageType
-from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
+from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
from ._pipeline import Pipeline, PipelineSelection
from ._platform import Platform
-from ._profile import Topics, profile_start, profile_end
from . import utils, _yaml, _site
from . import Scope, Consistency
@@ -41,25 +44,46 @@ from . import Scope, Consistency
# Args:
# context (Context): The Context object
# project (Project): The Project object
-# loaded_callback (callable): A callback to invoke when the pipeline is loaded
+# session_start (datetime): The time when the session started
+# session_start_callback (callable): A callback to invoke when the session starts
+# interrupt_callback (callable): A callback to invoke when we get interrupted
+# ticker_callback (callable): Invoked every second while running the scheduler
+# job_start_callback (callable): Called when a job starts
+# job_complete_callback (callable): Called when a job completes
#
class Stream():
- def __init__(self, context, project, loaded_callback):
- self.session_elements = 0 # Number of elements to process in this session
- self.total_elements = 0 # Number of total potential elements for this pipeline
-
- self._context = context
- self._project = project
- self._scheduler = None
- self._pipeline = None
+ def __init__(self, context, project, session_start, *,
+ session_start_callback=None,
+ interrupt_callback=None,
+ ticker_callback=None,
+ job_start_callback=None,
+ job_complete_callback=None):
- self._loaded_cb = loaded_callback
+ #
+ # Public members
+ #
+ self.targets = [] # Resolved target elements
+ self.session_elements = [] # List of elements being processed this session
+ self.total_elements = [] # Total list of elements based on targets
+ self.queues = [] # Queue objects
- # Load selected platform
+ #
+ # Private members
+ #
Platform.create_instance(context, project)
self._platform = Platform.get_platform()
self._artifacts = self._platform.artifactcache
+ self._context = context
+ self._project = project
+ self._pipeline = Pipeline(context, project, self._artifacts)
+ self._scheduler = Scheduler(context, session_start,
+ interrupt_callback=interrupt_callback,
+ ticker_callback=ticker_callback,
+ job_start_callback=job_start_callback,
+ job_complete_callback=job_complete_callback)
+ self._first_non_track_queue = None
+ self._session_start_callback = session_start_callback
# cleanup()
#
@@ -81,14 +105,18 @@ class Stream():
# except_targets (list of str): Specified targets to except from fetching
# downloadable (bool): Whether the downloadable state of elements should be resolved
#
+ # Returns:
+ # (list of Element): The selected elements
def load_selection(self, targets, *,
selection=PipelineSelection.NONE,
except_targets=(),
downloadable=False):
- self.init_pipeline(targets, except_=except_targets,
- use_configured_remote_caches=downloadable,
- fetch_subprojects=False)
- return self._pipeline.get_selection(selection)
+ elements, _ = self._load(targets, (),
+ selection=selection,
+ except_targets=except_targets,
+ use_artifact_config=downloadable,
+ fetch_subprojects=False)
+ return elements
# shell()
#
@@ -118,7 +146,7 @@ class Stream():
if directory is None:
missing_deps = [
dep._get_full_name()
- for dep in self._pipeline.dependencies(scope)
+ for dep in self._pipeline.dependencies([element], scope)
if not dep._cached()
]
if missing_deps:
@@ -145,66 +173,47 @@ class Stream():
track_cross_junctions=False,
build_all=False):
- rewritable = False
- if track_targets:
- rewritable = True
+ if build_all or track_targets:
+ selection = PipelineSelection.ALL
+ else:
+ selection = PipelineSelection.PLAN
- self.init_pipeline(targets,
- except_=track_except,
- rewritable=rewritable,
- use_configured_remote_caches=True,
- track_elements=track_targets,
- track_cross_junctions=track_cross_junctions,
- fetch_subprojects=True)
+ elements, track_elements = \
+ self._load(targets, track_targets,
+ selection=selection, track_selection=PipelineSelection.ALL,
+ track_except_targets=track_except,
+ track_cross_junctions=track_cross_junctions,
+ use_artifact_config=True,
+ fetch_subprojects=True)
- if build_all:
- plan = self._pipeline.dependencies(Scope.ALL)
- else:
- plan = self._pipeline._plan(except_=False)
-
- # We want to start the build queue with any elements that are
- # not being tracked first
- track_elements = set(self._pipeline._track_elements)
- plan = [e for e in plan if e not in track_elements]
-
- # Assert that we have a consistent pipeline now (elements in
- # track_plan will be made consistent)
- self._pipeline._assert_consistent(plan)
-
- fetch = FetchQueue(self._scheduler, skip_cached=True)
- build = BuildQueue(self._scheduler)
- track = None
- pull = None
- push = None
- queues = []
- if self._pipeline._track_elements:
- track = TrackQueue(self._scheduler)
- queues.append(track)
- if self._pipeline._artifacts.has_fetch_remotes():
- pull = PullQueue(self._scheduler)
- queues.append(pull)
- queues.append(fetch)
- queues.append(build)
- if self._pipeline._artifacts.has_push_remotes():
- push = PushQueue(self._scheduler)
- queues.append(push)
-
- # If we're going to track, tracking elements go into the first queue
- # which is the tracking queue, the rest of the plan goes into the next
- # queue (whatever that happens to be)
- if track:
- queues[0].enqueue(self._pipeline._track_elements)
- queues[1].enqueue(plan)
- else:
- queues[0].enqueue(plan)
+ # Remove the tracking elements from the main targets
+ elements = self._pipeline.subtract_elements(elements, track_elements)
- self.session_elements = len(self._pipeline._track_elements) + len(plan)
+ # Assert that the elements we're not going to track are consistent
+ self._pipeline.assert_consistent(elements)
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ # Now construct the queues
+ #
+ track_queue = None
+ if track_elements:
+ track_queue = TrackQueue(self._scheduler)
+ self._add_queue(track_queue, track=True)
+
+ if self._artifacts.has_fetch_remotes():
+ self._add_queue(PullQueue(self._scheduler))
+
+ self._add_queue(FetchQueue(self._scheduler, skip_cached=True))
+ self._add_queue(BuildQueue(self._scheduler))
+
+ if self._artifacts.has_push_remotes():
+ self._add_queue(PushQueue(self._scheduler))
+
+ # Enqueue elements
+ #
+ if track_elements:
+ self._enqueue_plan(track_elements, queue=track_queue)
+ self._enqueue_plan(elements)
+ self._run()
# fetch()
#
@@ -223,21 +232,25 @@ class Stream():
track_targets=False,
track_cross_junctions=False):
- rewritable = False
if track_targets:
- rewritable = True
-
- self.init_pipeline(targets,
- except_=except_targets,
- rewritable=rewritable,
- track_elements=targets if track_targets else None,
- track_cross_junctions=track_cross_junctions,
- fetch_subprojects=True)
+ track_targets = targets
+ track_selection = selection
+ track_except_targets = except_targets
+ else:
+ track_targets = ()
+ track_selection = PipelineSelection.NONE
+ track_except_targets = ()
- fetch_plan = self._pipeline.get_selection(selection)
+ elements, track_elements = \
+ self._load(targets, track_targets,
+ selection=selection, track_selection=track_selection,
+ except_targets=except_targets,
+ track_except_targets=track_except_targets,
+ track_cross_junctions=track_cross_junctions,
+ fetch_subprojects=True)
- # Delegated to a shared method for now
- self._do_fetch(fetch_plan)
+ # Delegated to a shared fetch method
+ self._fetch(elements, track_elements=track_elements)
# track()
#
@@ -255,26 +268,20 @@ class Stream():
def track(self, targets, *,
selection=PipelineSelection.NONE,
except_targets=None,
- track_targets=False,
cross_junctions=False):
- self.init_pipeline(targets,
- except_=except_targets,
- rewritable=True,
- track_elements=targets,
- track_cross_junctions=cross_junctions,
- track_selection=selection,
- fetch_subprojects=True)
+ _, elements = \
+ self._load(targets, targets,
+ selection=selection, track_selection=selection,
+ except_targets=except_targets,
+ track_except_targets=except_targets,
+ track_cross_junctions=cross_junctions,
+ fetch_subprojects=True)
- track = TrackQueue(self._scheduler)
- track.enqueue(self._pipeline._track_elements)
- self.session_elements = len(self._pipeline._track_elements)
-
- _, status = self._scheduler.run([track])
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ track_queue = TrackQueue(self._scheduler)
+ self._add_queue(track_queue, track=True)
+ self._enqueue_plan(elements, queue=track_queue)
+ self._run()
# pull()
#
@@ -292,33 +299,23 @@ class Stream():
selection=PipelineSelection.NONE,
remote=None):
- use_configured_remote_caches = True
- if remote is not None:
- use_configured_remote_caches = False
+ use_config = True
+ if remote:
+ use_config = False
- self.init_pipeline(targets,
- use_configured_remote_caches=use_configured_remote_caches,
- add_remote_cache=remote,
- fetch_subprojects=True)
+ elements, _ = self._load(targets, (),
+ selection=selection,
+ use_artifact_config=use_config,
+ artifact_remote_url=remote,
+ fetch_subprojects=True)
- elements = self._pipeline.get_selection(selection)
-
- if not self._pipeline._artifacts.has_fetch_remotes():
+ if not self._artifacts.has_fetch_remotes():
raise StreamError("No artifact caches available for pulling artifacts")
- plan = elements
- self._pipeline._assert_consistent(plan)
- self._pipeline.session_elements = len(plan)
-
- pull = PullQueue(self._scheduler)
- pull.enqueue(plan)
- queues = [pull]
-
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ self._pipeline.assert_consistent(elements)
+ self._add_queue(PullQueue(self._scheduler))
+ self._enqueue_plan(elements)
+ self._run()
# push()
#
@@ -336,33 +333,23 @@ class Stream():
selection=PipelineSelection.NONE,
remote=None):
- use_configured_remote_caches = True
- if remote is not None:
- use_configured_remote_caches = False
-
- self.init_pipeline(targets,
- use_configured_remote_caches=use_configured_remote_caches,
- add_remote_cache=remote,
- fetch_subprojects=True)
+ use_config = True
+ if remote:
+ use_config = False
- elements = self._pipeline.get_selection(selection)
+ elements, _ = self._load(targets, (),
+ selection=selection,
+ use_artifact_config=use_config,
+ artifact_remote_url=remote,
+ fetch_subprojects=True)
- if not self._pipeline._artifacts.has_push_remotes():
+ if not self._artifacts.has_push_remotes():
raise StreamError("No artifact caches available for pushing artifacts")
- plan = elements
- self._pipeline._assert_consistent(plan)
- self._pipeline.session_elements = len(plan)
-
- push = PushQueue(self._scheduler)
- push.enqueue(plan)
- queues = [push]
-
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ self._pipeline.assert_consistent(elements)
+ self._add_queue(PushQueue(self._scheduler))
+ self._enqueue_plan(elements)
+ self._run()
# checkout()
#
@@ -382,10 +369,9 @@ class Stream():
integrate=True,
hardlinks=False):
- self.init_pipeline((target,), fetch_subprojects=True)
-
# We only have one target in a checkout command
- target = self._pipeline.targets[0]
+ elements, _ = self._load((target,), (), fetch_subprojects=True)
+ target = elements[0]
try:
os.makedirs(directory, exist_ok=True)
@@ -433,13 +419,13 @@ class Stream():
track_first,
force):
- self.init_pipeline((target,),
- track_elements=[target] if track_first else None,
- track_selection=PipelineSelection.NONE,
- rewritable=track_first,
- fetch_subprojects=False)
+ if track_first:
+ track_targets = (target,)
+ else:
+ track_targets = ()
- target = self._pipeline.targets[0]
+ elements, track_elements = self._load((target,), track_targets)
+ target = elements[0]
workdir = os.path.abspath(directory)
if not list(target.sources()):
@@ -459,11 +445,11 @@ class Stream():
# If we're going to checkout, we need at least a fetch,
# if we were asked to track first, we're going to fetch anyway.
#
- # For now, tracking is handled by _do_fetch() automatically
- # by virtue of our screwed up pipeline initialization stuff.
- #
if not no_checkout or track_first:
- self._do_fetch([target])
+ track_elements = []
+ if track_first:
+ track_elements = elements
+ self._fetch(elements, track_elements=track_elements)
if not no_checkout and target._get_consistency() != Consistency.CACHED:
raise StreamError("Could not stage uncached source. " +
@@ -522,34 +508,35 @@ class Stream():
#
def workspace_reset(self, targets, *, track_first):
- self.init_pipeline(targets,
- track_elements=targets if track_first else None,
- track_selection=PipelineSelection.NONE,
- rewritable=track_first,
- fetch_subprojects=False)
+ if track_first:
+ track_targets = targets
+ else:
+ track_targets = ()
+
+ elements, track_elements = self._load(targets, track_targets)
# Do the tracking first
if track_first:
- self._do_fetch(self._pipeline.targets)
+ self._fetch(elements, track_elements=track_elements)
- for target in self._pipeline.targets:
- workspace = self._project.workspaces.get_workspace(target.name)
+ for element in elements:
+ workspace = self._project.workspaces.get_workspace(element.name)
- with target.timed_activity("Removing workspace directory {}"
- .format(workspace.path)):
+ with element.timed_activity("Removing workspace directory {}"
+ .format(workspace.path)):
try:
shutil.rmtree(workspace.path)
except OSError as e:
raise StreamError("Could not remove '{}': {}"
.format(workspace.path, e)) from e
- self._project.workspaces.delete_workspace(target.name)
- self._project.workspaces.create_workspace(target.name, workspace.path)
+ self._project.workspaces.delete_workspace(element.name)
+ self._project.workspaces.create_workspace(element.name, workspace.path)
- with target.timed_activity("Staging sources to {}".format(workspace.path)):
- target._open_workspace()
+ with element.timed_activity("Staging sources to {}".format(workspace.path)):
+ element._open_workspace()
- self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(target.name, workspace.path))
+ self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(element.name, workspace.path))
self._project.workspaces.save_config()
@@ -609,15 +596,20 @@ class Stream():
force=False,
compression="gz"):
- self.init_pipeline((target,),
- track_elements=[target] if track_first else None,
- track_selection=PipelineSelection.NONE,
- rewritable=track_first,
- fetch_subprojects=True)
+ if track_first:
+ track_targets = (target,)
+ else:
+ track_targets = ()
+
+ elements, track_elements = self._load((target,), track_targets,
+ selection=PipelineSelection.ALL,
+ track_selection=PipelineSelection.ALL,
+ fetch_subprojects=True)
# source-bundle only supports one target
- target = self._pipeline.targets[0]
- dependencies = self._pipeline.get_selection(PipelineSelection.ALL)
+ target = self.targets[0]
+
+ self._message(MessageType.INFO, "Bundling sources for target {}".format(target.name))
# Find the correct filename for the compression algorithm
tar_location = os.path.join(directory, target.normal_name + ".tar")
@@ -635,14 +627,15 @@ class Stream():
raise StreamError("Cannot write to {0}: {1}"
.format(tar_location, e)) from e
- plan = list(dependencies)
- self._do_fetch(plan)
+ # Fetch and possibly track first
+ #
+ self._fetch(elements, track_elements=track_elements)
# We don't use the scheduler for this as it is almost entirely IO
# bound.
# Create a temporary directory to build the source tree in
- builddir = target._get_context().builddir
+ builddir = self._context.builddir
prefix = "{}-".format(target.normal_name)
with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir:
@@ -655,18 +648,162 @@ class Stream():
# Any elements that don't implement _write_script
# should not be included in the later stages.
- plan = [element for element in plan
- if self._write_element_script(source_directory, element)]
+ elements = [
+ element for element in elements
+ if self._write_element_script(source_directory, element)
+ ]
- self._write_element_sources(tempdir, plan)
- self._write_build_script(tempdir, plan)
+ self._write_element_sources(tempdir, elements)
+ self._write_build_script(tempdir, elements)
self._collect_sources(tempdir, tar_location,
target.normal_name, compression)
#############################################################
- # Private Methods #
+ # Scheduler API forwarding #
+ #############################################################
+
+ # running
+ #
+ # Whether the scheduler is running
+ #
+ @property
+ def running(self):
+ return self._scheduler.loop is not None
+
+ # suspended
+ #
+ # Whether the scheduler is currently suspended
+ #
+ @property
+ def suspended(self):
+ return self._scheduler.suspended
+
+ # terminated
+ #
+ # Whether the scheduler is currently terminated
+ #
+ @property
+ def terminated(self):
+ return self._scheduler.terminated
+
+ # elapsed_time
+ #
+ # Elapsed time since the session start
+ #
+ @property
+ def elapsed_time(self):
+ return self._scheduler.elapsed_time()
+
+ # terminate()
+ #
+ # Terminate jobs
+ #
+ def terminate(self):
+ self._scheduler.terminate_jobs()
+
+ # quit()
+ #
+ # Quit the session, this will continue with any ongoing
+ # jobs, use Stream.terminate() instead for cancellation
+ # of ongoing jobs
+ #
+ def quit(self):
+ self._scheduler.stop_queueing()
+
+ # suspend()
+ #
+ # Context manager to suspend ongoing jobs
+ #
+ @contextmanager
+ def suspend(self):
+ with self._scheduler.jobs_suspended():
+ yield
+
+ #############################################################
+ # Private Methods #
#############################################################
+ # _load()
+ #
+ # A convenience method for loading element lists
+ #
+ # Args:
+ # targets (list of str): Main targets to load
+ # track_targets (list of str): Tracking targets
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # track_selection (PipelineSelection): The selection mode for the specified tracking targets
+ # except_targets (list of str): Specified targets to except from fetching
+ # track_except_targets (list of str): Specified targets to except from fetching
+ # track_cross_junctions (bool): Whether tracking should cross junction boundaries
+ # use_artifact_config (bool): Whether to initialize artifacts with the config
+ # artifact_remote_url (bool): A remote url for initializing the artifacts
+ # fetch_subprojects (bool): Whether to fetch subprojects while loading
+ #
+ # Returns:
+ # (list of Element): The primary element selection
+ # (list of Element): The tracking element selection
+ #
+ def _load(self, targets, track_targets, *,
+ selection=PipelineSelection.NONE,
+ track_selection=PipelineSelection.NONE,
+ except_targets=(),
+ track_except_targets=(),
+ track_cross_junctions=False,
+ use_artifact_config=False,
+ artifact_remote_url=None,
+ fetch_subprojects=False):
+
+ # Load rewritable if we have any tracking selection to make
+ rewritable = False
+ if track_targets:
+ rewritable = True
+
+ # Load all targets
+ elements, except_elements, track_elements, track_except_elements = \
+ self._pipeline.load([targets, except_targets, track_targets, track_except_targets],
+ rewritable=rewritable,
+ fetch_subprojects=fetch_subprojects)
+
+ # Hold on to the targets
+ self.targets = elements
+
+ # Here we should raise an error if the track_elements targets
+ # are not dependencies of the primary targets, this is not
+ # supported.
+ #
+ # This can happen with `bst build --track`
+ #
+ if not self._pipeline.targets_include(elements, track_elements):
+ raise StreamError("Specified tracking targets that are not "
+ "within the scope of primary targets")
+
+ # First take care of marking tracking elements, this must be
+ # done before resolving element states.
+ #
+ assert track_selection != PipelineSelection.PLAN
+ track_selected = self._pipeline.get_selection(track_elements, track_selection)
+ track_selected = self._pipeline.except_elements(track_elements,
+ track_selected,
+ track_except_elements)
+ track_selected = self._pipeline.track_cross_junction_filter(track_selected,
+ track_cross_junctions)
+
+ for element in track_selected:
+ element._schedule_tracking()
+
+ # Connect to remote caches, this needs to be done before resolving element state
+ self._artifacts.setup_remotes(use_config=use_artifact_config, remote_url=artifact_remote_url)
+
+ # Now move on to loading primary selection.
+ #
+ self._pipeline.resolve_elements(elements)
+ selected = self._pipeline.get_selection(elements, selection)
+ selected = self._pipeline.except_elements(elements,
+ selected,
+ except_elements)
+
+ return selected, track_selected
+
# _message()
#
# Local message propagator
@@ -676,47 +813,103 @@ class Stream():
self._context.message(
Message(None, message_type, message, **args))
- # _do_fetch()
+ # _add_queue()
+ #
+ # Adds a queue to the stream
+ #
+ # Args:
+ # queue (Queue): Queue to add to the pipeline
+ # track (bool): Whether this is the tracking queue
+ #
+ def _add_queue(self, queue, *, track=False):
+ self.queues.append(queue)
+
+ if not (track or self._first_non_track_queue):
+ self._first_non_track_queue = queue
+
+ # _enqueue_plan()
+ #
+ # Enqueues planned elements to the specified queue.
+ #
+ # Args:
+ # plan (list of Element): The list of elements to be enqueued
+ # queue (Queue): The target queue, defaults to the first non-track queue
+ #
+ def _enqueue_plan(self, plan, *, queue=None):
+ queue = queue or self._first_non_track_queue
+
+ queue.enqueue(plan)
+ self.session_elements += plan
+
+ # _run()
+ #
+ # Common function for running the scheduler
+ #
+ def _run(self):
+
+ # Inform the frontend of the full list of elements
+ # and the list of elements which will be processed in this run
+ #
+ self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
+
+ if self._session_start_callback is not None:
+ self._session_start_callback()
+
+ _, status = self._scheduler.run(self.queues)
+
+ # Force update element states after a run, such that the summary
+ # is more coherent
+ try:
+ for element in self.total_elements:
+ element._update_state()
+ except BstError as e:
+ self._message(MessageType.ERROR, "Error resolving final state", detail=str(e))
+ set_last_task_error(e.domain, e.reason)
+ except Exception as e: # pylint: disable=broad-except
+ self._message(MessageType.BUG, "Unhandled exception while resolving final state", detail=str(e))
+
+ if status == SchedStatus.ERROR:
+ raise StreamError()
+ elif status == SchedStatus.TERMINATED:
+ raise StreamError(terminated=True)
+
+ # _fetch()
#
# Performs the fetch job, the body of this function is here because
# it is shared between a few internals.
#
# Args:
# elements (list of Element): Elements to fetch
+ # track_elements (list of Element): Elements to track
#
- def _do_fetch(self, elements):
+ def _fetch(self, elements, *, track_elements=None):
- fetch_plan = elements
+ if track_elements is None:
+ track_elements = []
# Subtract the track elements from the fetch elements, they will be added separately
- if self._pipeline._track_elements:
- track_elements = set(self._pipeline._track_elements)
- fetch_plan = [e for e in fetch_plan if e not in track_elements]
+ fetch_plan = self._pipeline.subtract_elements(elements, track_elements)
# Assert consistency for the fetch elements
- self._pipeline._assert_consistent(fetch_plan)
+ self._pipeline.assert_consistent(fetch_plan)
# Filter out elements with cached sources, only from the fetch plan
# let the track plan resolve new refs.
cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
- fetch_plan = [elt for elt in fetch_plan if elt not in cached]
-
- self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan)
+ fetch_plan = self._pipeline.subtract_elements(fetch_plan, cached)
- fetch = FetchQueue(self._scheduler)
- fetch.enqueue(fetch_plan)
- if self._pipeline._track_elements:
- track = TrackQueue(self._scheduler)
- track.enqueue(self._pipeline._track_elements)
- queues = [track, fetch]
- else:
- queues = [fetch]
+ # Construct queues, enqueue and run
+ #
+ track_queue = None
+ if track_elements:
+ track_queue = TrackQueue(self._scheduler)
+ self._add_queue(track_queue, track=True)
+ self._add_queue(FetchQueue(self._scheduler))
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ if track_elements:
+ self._enqueue_plan(track_elements, queue=track_queue)
+ self._enqueue_plan(fetch_plan)
+ self._run()
# Helper function for checkout()
#
@@ -772,7 +965,7 @@ class Stream():
# Collect the sources in the given sandbox into a tarfile
def _collect_sources(self, directory, tar_name, element_name, compression):
- with self._pipeline.targets[0].timed_activity("Creating tarball {}".format(tar_name)):
+ with self._context.timed_activity("Creating tarball {}".format(tar_name)):
if compression == "none":
permissions = "w:"
else:
@@ -780,64 +973,3 @@ class Stream():
with tarfile.open(tar_name, permissions) as tar:
tar.add(directory, arcname=element_name)
-
- #############################################################
- # TEMPORARY CRAP #
- #############################################################
-
- # init_pipeline()
- #
- # Initialize the pipeline for a given activity
- #
- # Args:
- # elements (list of elements): The elements to load recursively
- # except_ (list of elements): The elements to except
- # rewritable (bool): Whether we should load the YAML files for roundtripping
- # use_configured_remote_caches (bool): Whether we should contact remotes
- # add_remote_cache (str): The URL for an explicitly mentioned remote cache
- # track_elements (list of elements): Elements which are to be tracked
- # track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries
- # track_selection (PipelineSelection): The selection algorithm for track elements
- # fetch_subprojects (bool): Whether to fetch subprojects while loading
- #
- # Note that the except_ argument may have a subtly different meaning depending
- # on the activity performed on the Pipeline. In normal circumstances the except_
- # argument excludes elements from the `elements` list. In a build session, the
- # except_ elements are excluded from the tracking plan.
- #
- def init_pipeline(self, elements, *,
- except_=tuple(),
- rewritable=False,
- use_configured_remote_caches=False,
- add_remote_cache=None,
- track_elements=None,
- track_cross_junctions=False,
- track_selection=PipelineSelection.ALL,
- fetch_subprojects=True):
-
- profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
-
- self._pipeline = Pipeline(self._context, self._project, self._artifacts,
- elements, except_,
- rewritable=rewritable,
- fetch_subprojects=fetch_subprojects)
-
- # After loading the projects, but before resolving cache keys,
- # we need to initialize remote artifact caches where relevant
- #
- self._artifacts.setup_remotes(use_config=use_configured_remote_caches,
- remote_url=add_remote_cache)
-
- # Now complete the initialization
- #
- self._pipeline.initialize(track_elements=track_elements,
- track_cross_junctions=track_cross_junctions,
- track_selection=track_selection)
-
- profile_end(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
-
- # Get the total
- self.total_elements = len(list(self._pipeline.dependencies(Scope.ALL)))
-
- if self._loaded_cb is not None:
- self._loaded_cb(self._pipeline)