summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-04-12 23:25:51 +0900
committerTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-04-13 00:01:36 +0900
commit39c4cd17b02f627c2a6c9c3d53fafedac8091e45 (patch)
tree11c6c65af8503617542857478c705d0a44ada3d7
parent4aa482ae70a117a6e7bed06e7603e0451778a204 (diff)
downloadbuildstream-39c4cd17b02f627c2a6c9c3d53fafedac8091e45.tar.gz
Fix disaster while making cross junction tracking optional.
This disaster was introduced in fece8cc81e8d8412e32c6667682a33e7d2f9dafe When doing `bst build --track`, we were: o first scheduling every element to be tracked o later filtering out the cross junction elements o finally asserting consistency state, which would trigger an error because we previously scheduled for tracking. Fixed this by moving all code which resolves elements to track into Pipeline.initialize(), and removing special element list handling from the individual build/fetch/track commands.
-rw-r--r--buildstream/_frontend/app.py8
-rw-r--r--buildstream/_frontend/cli.py14
-rw-r--r--buildstream/_pipeline.py146
3 files changed, 83 insertions, 85 deletions
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index fa6a1b800..29cedeba8 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -225,6 +225,7 @@ class App():
# use_configured_remote_caches (bool): Whether we should contact remotes
# add_remote_cache (str): The URL for an explicitly mentioned remote cache
# track_elements (list of elements): Elements which are to be tracked
+ # track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries
# fetch_subprojects (bool): Whether we should fetch subprojects as a part of the
# loading process, if they are not yet locally cached
#
@@ -240,7 +241,7 @@ class App():
def initialized(self, elements, *, session_name=None,
except_=tuple(), rewritable=False,
use_configured_remote_caches=False, add_remote_cache=None,
- track_elements=None, fetch_subprojects=False):
+ track_elements=None, track_cross_junctions=False, fetch_subprojects=False):
profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
# Start with the early stage init, this enables logging right away
@@ -274,7 +275,8 @@ class App():
try:
self.pipeline.initialize(use_configured_remote_caches=use_configured_remote_caches,
add_remote_cache=add_remote_cache,
- track_elements=track_elements)
+ track_elements=track_elements,
+ track_cross_junctions=track_cross_junctions)
except BstError as e:
self.print_error(e, "Error initializing pipeline")
sys.exit(-1)
@@ -414,7 +416,7 @@ class App():
# If we're going to checkout, we need at least a fetch,
# if we were asked to track first, we're going to fetch anyway.
if not no_checkout or track_first:
- self.pipeline.fetch(self.scheduler, [target], track_first=track_first)
+ self.pipeline.fetch(self.scheduler, [target])
if not no_checkout and target._get_consistency() != Consistency.CACHED:
raise PipelineError("Could not stage uncached source. " +
diff --git a/buildstream/_frontend/cli.py b/buildstream/_frontend/cli.py
index 0234ebb4f..2490f59b7 100644
--- a/buildstream/_frontend/cli.py
+++ b/buildstream/_frontend/cli.py
@@ -232,9 +232,9 @@ def build(app, elements, all_, track_, track_save, track_all, track_except, trac
with app.initialized(elements, session_name="Build", except_=track_except, rewritable=rewritable,
use_configured_remote_caches=True, track_elements=track_,
+ track_cross_junctions=track_cross_junctions,
fetch_subprojects=True):
- app.pipeline.build(app.scheduler, build_all=all_, track_first=track_,
- track_cross_junctions=track_cross_junctions)
+ app.pipeline.build(app.scheduler, build_all=all_)
##################################################################
@@ -275,10 +275,10 @@ def fetch(app, elements, deps, track_, except_, track_cross_junctions):
with app.initialized(elements, session_name="Fetch", except_=except_, rewritable=track_,
track_elements=elements if track_ else None,
+ track_cross_junctions=track_cross_junctions,
fetch_subprojects=True):
dependencies = app.pipeline.deps_elements(deps)
- app.pipeline.fetch(app.scheduler, dependencies, track_first=track_,
- track_cross_junctions=track_cross_junctions)
+ app.pipeline.fetch(app.scheduler, dependencies)
##################################################################
@@ -310,9 +310,11 @@ def track(app, elements, deps, except_, cross_junctions):
all: All dependencies of all specified elements
"""
with app.initialized(elements, session_name="Track", except_=except_, rewritable=True,
- track_elements=elements, fetch_subprojects=True):
+ track_elements=elements,
+ track_cross_junctions=cross_junctions,
+ fetch_subprojects=True):
dependencies = app.pipeline.deps_elements(deps)
- app.pipeline.track(app.scheduler, dependencies, cross_junctions=cross_junctions)
+ app.pipeline.track(app.scheduler)
##################################################################
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 98d96328a..4b48deee9 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -126,9 +126,13 @@ class Pipeline():
# use_configured_remote_caches (bool): Whether to contact configured remote artifact caches
# add_remote_cache (str): The URL for an additional remote artifact cache
# track_element (list of Elements): List of elements specified by the frontend for tracking
+ # track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries
#
- def initialize(self, use_configured_remote_caches=False,
- add_remote_cache=None, track_elements=None):
+ def initialize(self,
+ use_configured_remote_caches=False,
+ add_remote_cache=None,
+ track_elements=None,
+ track_cross_junctions=False):
# Preflight directly, before ever interrogating caches or anything.
self._preflight()
@@ -150,7 +154,14 @@ class Pipeline():
if has_remote_caches:
self._initialize_remote_caches()
- self._resolve_cache_keys(track_elements)
+ # Work out what we're going track, if anything
+ self._track_cross_junctions = track_cross_junctions
+ self._track_elements = []
+ if track_elements:
+ self._track_elements = self._get_elements_to_track(track_elements)
+
+ # Now resolve the cache keys once tracking elements have been resolved
+ self._resolve_cache_keys()
# cleanup()
#
@@ -221,23 +232,14 @@ class Pipeline():
#
# Args:
# scheduler (Scheduler): The scheduler to run this pipeline on
- # dependencies (list): List of elements to track
- # cross_junctions (bool): Whether to allow cross junction tracking
#
# If no error is encountered while tracking, then the project files
# are rewritten inline.
#
- def track(self, scheduler, dependencies, *, cross_junctions=False):
-
- dependencies = list(dependencies)
- if cross_junctions:
- self._assert_junction_tracking(dependencies)
- else:
- dependencies = self._filter_cross_junctions(dependencies)
-
+ def track(self, scheduler):
track = TrackQueue()
- track.enqueue(dependencies)
- self.session_elements = len(dependencies)
+ track.enqueue(self._track_elements)
+ self.session_elements = len(self._track_elements)
_, status = scheduler.run([track])
if status == SchedStatus.ERROR:
@@ -252,43 +254,30 @@ class Pipeline():
# Args:
# scheduler (Scheduler): The scheduler to run this pipeline on
# dependencies (list): List of elements to fetch
- # track_first (bool): Track new source references before fetching
- # track_cross_junctions (bool): Whether to allow cross junction tracking
#
- def fetch(self, scheduler, dependencies, *, track_first=False, track_cross_junctions=False):
+ def fetch(self, scheduler, dependencies):
fetch_plan = dependencies
- track_plan = []
- # Assert that we have a consistent pipeline, or that
- # the track option will make it consistent
- if track_first:
- track_plan = fetch_plan
-
- if track_cross_junctions:
- self._assert_junction_tracking(track_plan)
- else:
- track_plan = self._filter_cross_junctions(track_plan)
-
- # Subtract the track elements from the fetch elements, they will be added separately
- track_elements = set(track_plan)
+ # Subtract the track elements from the fetch elements, they will be added separately
+ if self._track_elements:
+ track_elements = set(self._track_elements)
fetch_plan = [e for e in fetch_plan if e not in track_elements]
- else:
- # If we're not going to track first, we need to make sure
- # the elements are not in an inconsistent state
- self._assert_consistent(fetch_plan)
+
+ # Assert consistency for the fetch elements
+ self._assert_consistent(fetch_plan)
# Filter out elements with cached sources, only from the fetch plan
# let the track plan resolve new refs.
cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
fetch_plan = [elt for elt in fetch_plan if elt not in cached]
- self.session_elements = len(track_plan) + len(fetch_plan)
+ self.session_elements = len(self._track_elements) + len(fetch_plan)
fetch = FetchQueue()
fetch.enqueue(fetch_plan)
- if track_first:
+ if self._track_elements:
track = TrackQueue()
- track.enqueue(track_plan)
+ track.enqueue(self._track_elements)
queues = [track, fetch]
else:
queues = [fetch]
@@ -299,15 +288,6 @@ class Pipeline():
elif status == SchedStatus.TERMINATED:
raise PipelineError(terminated=True)
- def get_elements_to_track(self, track_targets):
- planner = _Planner()
-
- target_elements = [e for e in self.dependencies(Scope.ALL)
- if e.name in track_targets]
- track_elements = planner.plan(target_elements, ignore_cache=True)
-
- return self.remove_elements(track_elements)
-
# build()
#
# Builds (assembles) elements in the pipeline.
@@ -316,32 +296,13 @@ class Pipeline():
# scheduler (Scheduler): The scheduler to run this pipeline on
# build_all (bool): Whether to build all elements, or only those
# which are required to build the target.
- # track_first (list): Elements whose sources to track prior to
- # building
- # track_cross_junctions (bool): Whether to allow cross junction tracking
#
- def build(self, scheduler, *, build_all=False, track_first=False, track_cross_junctions=False):
+ def build(self, scheduler, *, build_all=False):
unused_workspaces = self._collect_unused_workspaces()
if unused_workspaces:
self._message(MessageType.WARN, "Unused workspaces",
detail="\n".join([el for el in unused_workspaces]))
- # We set up two plans; one to track elements, the other to
- # build them once tracking has finished. The first plan
- # contains elements from track_first, the second contains the
- # target elements.
- #
- # The reason we can't use one plan is that the tracking
- # elements may consist of entirely different elements.
- track_plan = []
- if track_first:
- track_plan = self.get_elements_to_track(track_first)
-
- if track_cross_junctions:
- self._assert_junction_tracking(track_plan)
- else:
- track_plan = self._filter_cross_junctions(track_plan)
-
if build_all:
plan = self.dependencies(Scope.ALL)
else:
@@ -349,7 +310,7 @@ class Pipeline():
# We want to start the build queue with any elements that are
# not being tracked first
- track_elements = set(track_plan)
+ track_elements = set(self._track_elements)
plan = [e for e in plan if e not in track_elements]
# Assert that we have a consistent pipeline now (elements in
@@ -362,7 +323,7 @@ class Pipeline():
pull = None
push = None
queues = []
- if track_plan:
+ if self._track_elements:
track = TrackQueue()
queues.append(track)
if self._artifacts.has_fetch_remotes():
@@ -374,13 +335,16 @@ class Pipeline():
push = PushQueue()
queues.append(push)
+ # If we're going to track, tracking elements go into the first queue
+ # which is the tracking queue, the rest of the plan goes into the next
+ # queue (whatever that happens to be)
if track:
- queues[0].enqueue(track_plan)
+ queues[0].enqueue(self._track_elements)
queues[1].enqueue(plan)
else:
queues[0].enqueue(plan)
- self.session_elements = len(track_plan) + len(plan)
+ self.session_elements = len(self._track_elements) + len(plan)
_, status = scheduler.run(queues)
if status == SchedStatus.ERROR:
@@ -595,7 +559,7 @@ class Pipeline():
.format(tar_location, e)) from e
plan = list(dependencies)
- self.fetch(scheduler, plan, track_first=track_first)
+ self.fetch(scheduler, plan)
# We don't use the scheduler for this as it is almost entirely IO
# bound.
@@ -626,6 +590,37 @@ class Pipeline():
# Private Methods #
#############################################################
+ # _get_elements_to_track():
+ #
+ # Work out which elements are going to be tracked
+ #
+ # Args:
+ # (list of str): List of target names
+ #
+ # Returns:
+ # (list): List of Element objects to track
+ #
+ def _get_elements_to_track(self, track_targets):
+ planner = _Planner()
+
+ # Convert target names to elements
+ target_elements = [e for e in self.dependencies(Scope.ALL)
+ if e.name in track_targets]
+
+ # Plan them out
+ track_elements = planner.plan(target_elements, ignore_cache=True)
+
+ # Filter out --except elements
+ track_elements = self.remove_elements(track_elements)
+
+ # Filter out cross junctioned elements
+ if self._track_cross_junctions:
+ self._assert_junction_tracking(track_elements)
+ else:
+ track_elements = self._filter_cross_junctions(track_elements)
+
+ return track_elements
+
# _resolve()
#
# Instantiates plugin-provided Element and Source instances
@@ -708,13 +703,12 @@ class Pipeline():
#
# Initially resolve the cache keys
#
- def _resolve_cache_keys(self, track_elements):
- if track_elements:
- track_elements = self.get_elements_to_track(track_elements)
+ def _resolve_cache_keys(self):
+ track_elements = set(self._track_elements)
with self.context.timed_activity("Resolving cached state", silent_nested=True):
for element in self.dependencies(Scope.ALL):
- if track_elements and element in track_elements:
+ if element in track_elements:
# Load the pipeline in an explicitly inconsistent state, use
# this for pipelines with tracking queues enabled.
element._schedule_tracking()