summaryrefslogtreecommitdiff
path: root/buildstream/_pipeline.py
diff options
context:
space:
mode:
Diffstat (limited to 'buildstream/_pipeline.py')
-rw-r--r--buildstream/_pipeline.py137
1 files changed, 92 insertions, 45 deletions
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index ec374df71..c1d3c1b56 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -51,7 +51,7 @@ class Planner():
# Here we want to traverse the same element more than once when
# it is reachable from multiple places, with the interest of finding
# the deepest occurance of every element
- def plan_element(self, element, depth):
+ def plan_element(self, element, depth, ignore_cache):
if element in self.visiting_elements:
# circular dependency, already being processed
return
@@ -63,22 +63,22 @@ class Planner():
self.visiting_elements.add(element)
for dep in element.dependencies(Scope.RUN, recurse=False):
- self.plan_element(dep, depth)
+ self.plan_element(dep, depth, ignore_cache)
# Dont try to plan builds of elements that are cached already
- if not element._cached() and not element._remotely_cached():
+ if ignore_cache or (not element._cached() and not element._remotely_cached()):
for dep in element.dependencies(Scope.BUILD, recurse=False):
- self.plan_element(dep, depth + 1)
+ self.plan_element(dep, depth + 1, ignore_cache)
self.depth_map[element] = depth
self.visiting_elements.remove(element)
- def plan(self, roots):
+ def plan(self, roots, ignore_cache=False):
for root in roots:
- self.plan_element(root, 0)
+ self.plan_element(root, 0, ignore_cache)
depth_sorted = sorted(self.depth_map.items(), key=itemgetter(1), reverse=True)
- return [item[0] for item in depth_sorted if not item[0]._cached()]
+ return [item[0] for item in depth_sorted if ignore_cache or not item[0]._cached()]
# Pipeline()
@@ -112,13 +112,11 @@ class Planner():
class Pipeline():
def __init__(self, context, project, targets, except_,
- inconsistent=False,
rewritable=False,
- use_remote_cache=False,
- load_ticker=None,
- resolve_ticker=None,
remote_ticker=None,
- cache_ticker=None):
+ cache_ticker=None,
+ load_ticker=None,
+ resolve_ticker=None):
self.context = context
self.project = project
self.session_elements = 0
@@ -130,6 +128,8 @@ class Pipeline():
Platform._create_instance(context, project)
self.platform = Platform.get_platform()
self.artifacts = self.platform.artifactcache
+ self.remote_ticker = remote_ticker
+ self.cache_ticker = cache_ticker
loader = Loader(self.project.element_path, targets + except_,
self.project._options)
@@ -152,14 +152,26 @@ class Pipeline():
if resolve_ticker:
resolve_ticker(None)
- # Preflight directly after resolving elements, before ever interrogating
- # caches or anything.
- for plugin in self.dependencies(Scope.ALL, include_sources=True):
- plugin.preflight()
+ def initialize(self, use_remote_cache=False, inconsistent=None):
+ # Preflight directly, before ever interrogating caches or
+ # anything.
+ self.preflight()
self.total_elements = len(list(self.dependencies(Scope.ALL)))
- for element_name, source, workspace in project._list_workspaces():
+ self.initialize_workspaces()
+
+ if use_remote_cache and self.artifacts.can_fetch():
+ self.fetch_remote_refs()
+
+ self.resolve_cache_keys(inconsistent)
+
+ def preflight(self):
+ for plugin in self.dependencies(Scope.ALL, include_sources=True):
+ plugin.preflight()
+
+ def initialize_workspaces(self):
+ for element_name, source, workspace in self.project._list_workspaces():
for target in self.targets:
element = target.search(Scope.ALL, element_name)
@@ -169,21 +181,25 @@ class Pipeline():
self.project._set_workspace(element, source, workspace)
- if use_remote_cache and self.artifacts.can_fetch():
- try:
- if remote_ticker:
- remote_ticker(self.artifacts.url)
- self.artifacts.initialize_remote()
- self.artifacts.fetch_remote_refs()
- except ArtifactError:
- self.message(MessageType.WARN, "Failed to fetch remote refs")
- self.artifacts.set_offline()
+ def fetch_remote_refs(self):
+ try:
+ if self.remote_ticker:
+ self.remote_ticker(self.artifacts.url)
+ self.artifacts.initialize_remote()
+ self.artifacts.fetch_remote_refs()
+ except ArtifactError:
+ self.message(MessageType.WARN, "Failed to fetch remote refs")
+ self.artifacts.set_offline()
+
+ def resolve_cache_keys(self, inconsistent):
+ if inconsistent:
+ inconsistent = self.get_elements_to_track(inconsistent)
for element in self.dependencies(Scope.ALL):
- if cache_ticker:
- cache_ticker(element.name)
+ if self.cache_ticker:
+ self.cache_ticker(element.name)
- if inconsistent:
+ if inconsistent and element in inconsistent:
# Load the pipeline in an explicitly inconsistent state, use
# this for pipelines with tracking queues enabled.
element._force_inconsistent()
@@ -192,8 +208,8 @@ class Pipeline():
# for the first time.
element._cached()
- if cache_ticker:
- cache_ticker(None)
+ if self.cache_ticker:
+ self.cache_ticker(None)
# Generator function to iterate over elements and optionally
# also iterate over sources.
@@ -234,9 +250,11 @@ class Pipeline():
# which are required to build the pipeline target, omitting
# cached elements. The elements are yielded in a depth sorted
# ordering for optimal build plans
- def plan(self):
+ def plan(self, except_=True):
build_plan = Planner().plan(self.targets)
- self.remove_elements(build_plan)
+
+ if except_:
+ build_plan = self.remove_elements(build_plan)
for element in build_plan:
yield element
@@ -302,7 +320,7 @@ class Pipeline():
def track(self, scheduler, dependencies):
dependencies = list(dependencies)
- track = TrackQueue()
+ track = TrackQueue(save=True)
track.enqueue(dependencies)
self.session_elements = len(dependencies)
@@ -374,6 +392,15 @@ class Pipeline():
"Fetched {} elements".format(fetched),
elapsed=elapsed)
+ def get_elements_to_track(self, track_targets):
+ planner = Planner()
+
+ target_elements = [e for e in self.dependencies(Scope.ALL)
+ if e.name in track_targets]
+ track_elements = planner.plan(target_elements, ignore_cache=True)
+
+ return self.remove_elements(track_elements)
+
# build()
#
# Builds (assembles) elements in the pipeline.
@@ -393,15 +420,30 @@ class Pipeline():
detail="\n".join([el + "-" + str(src) for el, src, _
in self.unused_workspaces]))
- if build_all or track_first:
- plan = list(self.dependencies(Scope.ALL))
+ # We set up two plans; one to track elements, the other to
+ # build them once tracking has finished. The first plan
+ # contains elements from track_first, the second contains the
+ # target elements.
+ #
+ # The reason we can't use one plan is that the tracking
+ # elements may consist of entirely different elements.
+ track_plan = []
+ if track_first:
+ track_plan = self.get_elements_to_track(track_first)
+
+ if build_all:
+ plan = self.dependencies(Scope.ALL)
else:
- plan = list(self.plan())
+ plan = self.plan()
- # Assert that we have a consistent pipeline, or that
- # the track option will make it consistent
- if not track_first:
- self.assert_consistent(plan)
+ # We want to start the build queue with any elements that are
+ # not being tracked first
+ track_elements = set(track_plan)
+ plan = [e for e in plan if e not in track_elements]
+
+ # Assert that we have a consistent pipeline now (elements in
+ # track_plan will be made consistent)
+ self.assert_consistent(plan)
fetch = FetchQueue(skip_cached=True)
build = BuildQueue()
@@ -409,7 +451,7 @@ class Pipeline():
pull = None
push = None
queues = []
- if track_first:
+ if track_plan:
track = TrackQueue(save=save)
queues.append(track)
if self.artifacts.can_fetch():
@@ -420,9 +462,14 @@ class Pipeline():
if self.artifacts.can_push():
push = PushQueue()
queues.append(push)
- queues[0].enqueue(plan)
- self.session_elements = len(plan)
+ if track:
+ queues[0].enqueue(track_plan)
+ queues[1].enqueue(plan)
+ else:
+ queues[0].enqueue(plan)
+
+ self.session_elements = len(track_plan) + len(plan)
self.message(MessageType.START, "Starting build")
elapsed, status = scheduler.run(queues)
@@ -792,7 +839,7 @@ class Pipeline():
# use in the result, this function reports a list that is appropriate for
# the selected option.
#
- def deps_elements(self, mode, except_=None):
+ def deps_elements(self, mode):
elements = None
if mode == 'none':