Diffstat (limited to 'buildstream/_pipeline.py')
-rw-r--r-- | buildstream/_pipeline.py | 137
1 file changed, 92 insertions(+), 45 deletions(-)
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index ec374df71..c1d3c1b56 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -51,7 +51,7 @@ class Planner():
     # Here we want to traverse the same element more than once when
     # it is reachable from multiple places, with the interest of finding
     # the deepest occurance of every element
-    def plan_element(self, element, depth):
+    def plan_element(self, element, depth, ignore_cache):
         if element in self.visiting_elements:
             # circular dependency, already being processed
             return
@@ -63,22 +63,22 @@ class Planner():
 
         self.visiting_elements.add(element)
         for dep in element.dependencies(Scope.RUN, recurse=False):
-            self.plan_element(dep, depth)
+            self.plan_element(dep, depth, ignore_cache)
 
         # Dont try to plan builds of elements that are cached already
-        if not element._cached() and not element._remotely_cached():
+        if ignore_cache or (not element._cached() and not element._remotely_cached()):
             for dep in element.dependencies(Scope.BUILD, recurse=False):
-                self.plan_element(dep, depth + 1)
+                self.plan_element(dep, depth + 1, ignore_cache)
 
         self.depth_map[element] = depth
         self.visiting_elements.remove(element)
 
-    def plan(self, roots):
+    def plan(self, roots, ignore_cache=False):
         for root in roots:
-            self.plan_element(root, 0)
+            self.plan_element(root, 0, ignore_cache)
 
         depth_sorted = sorted(self.depth_map.items(), key=itemgetter(1), reverse=True)
-        return [item[0] for item in depth_sorted if not item[0]._cached()]
+        return [item[0] for item in depth_sorted if ignore_cache or not item[0]._cached()]
 
 
 # Pipeline()
@@ -112,13 +112,11 @@ class Planner():
 class Pipeline():
 
     def __init__(self, context, project, targets, except_,
-                 inconsistent=False,
                  rewritable=False,
-                 use_remote_cache=False,
-                 load_ticker=None,
-                 resolve_ticker=None,
                  remote_ticker=None,
-                 cache_ticker=None):
+                 cache_ticker=None,
+                 load_ticker=None,
+                 resolve_ticker=None):
         self.context = context
         self.project = project
         self.session_elements = 0
@@ -130,6 +128,8 @@ class Pipeline():
         Platform._create_instance(context, project)
         self.platform = Platform.get_platform()
         self.artifacts = self.platform.artifactcache
+        self.remote_ticker = remote_ticker
+        self.cache_ticker = cache_ticker
 
         loader = Loader(self.project.element_path, targets + except_,
                         self.project._options)
@@ -152,14 +152,26 @@ class Pipeline():
         if resolve_ticker:
             resolve_ticker(None)
 
-        # Preflight directly after resolving elements, before ever interrogating
-        # caches or anything.
-        for plugin in self.dependencies(Scope.ALL, include_sources=True):
-            plugin.preflight()
+    def initialize(self, use_remote_cache=False, inconsistent=None):
+        # Preflight directly, before ever interrogating caches or
+        # anything.
+        self.preflight()
 
         self.total_elements = len(list(self.dependencies(Scope.ALL)))
 
-        for element_name, source, workspace in project._list_workspaces():
+        self.initialize_workspaces()
+
+        if use_remote_cache and self.artifacts.can_fetch():
+            self.fetch_remote_refs()
+
+        self.resolve_cache_keys(inconsistent)
+
+    def preflight(self):
+        for plugin in self.dependencies(Scope.ALL, include_sources=True):
+            plugin.preflight()
+
+    def initialize_workspaces(self):
+        for element_name, source, workspace in self.project._list_workspaces():
             for target in self.targets:
                 element = target.search(Scope.ALL, element_name)
@@ -169,21 +181,25 @@ class Pipeline():
 
                     self.project._set_workspace(element, source, workspace)
 
-        if use_remote_cache and self.artifacts.can_fetch():
-            try:
-                if remote_ticker:
-                    remote_ticker(self.artifacts.url)
-                self.artifacts.initialize_remote()
-                self.artifacts.fetch_remote_refs()
-            except ArtifactError:
-                self.message(MessageType.WARN, "Failed to fetch remote refs")
-                self.artifacts.set_offline()
+    def fetch_remote_refs(self):
+        try:
+            if self.remote_ticker:
+                self.remote_ticker(self.artifacts.url)
+            self.artifacts.initialize_remote()
+            self.artifacts.fetch_remote_refs()
+        except ArtifactError:
+            self.message(MessageType.WARN, "Failed to fetch remote refs")
+            self.artifacts.set_offline()
+
+    def resolve_cache_keys(self, inconsistent):
+        if inconsistent:
+            inconsistent = self.get_elements_to_track(inconsistent)
 
         for element in self.dependencies(Scope.ALL):
-            if cache_ticker:
-                cache_ticker(element.name)
+            if self.cache_ticker:
+                self.cache_ticker(element.name)
 
-            if inconsistent:
+            if inconsistent and element in inconsistent:
                 # Load the pipeline in an explicitly inconsistent state, use
                 # this for pipelines with tracking queues enabled.
                 element._force_inconsistent()
@@ -192,8 +208,8 @@ class Pipeline():
                 # for the first time.
                 element._cached()
 
-        if cache_ticker:
-            cache_ticker(None)
+        if self.cache_ticker:
+            self.cache_ticker(None)
 
     # Generator function to iterate over elements and optionally
     # also iterate over sources.
@@ -234,9 +250,11 @@ class Pipeline():
     # which are required to build the pipeline target, omitting
     # cached elements. The elements are yielded in a depth sorted
    # ordering for optimal build plans
-    def plan(self):
+    def plan(self, except_=True):
         build_plan = Planner().plan(self.targets)
-        self.remove_elements(build_plan)
+
+        if except_:
+            build_plan = self.remove_elements(build_plan)
 
         for element in build_plan:
             yield element
@@ -302,7 +320,7 @@ class Pipeline():
     def track(self, scheduler, dependencies):
 
         dependencies = list(dependencies)
-        track = TrackQueue()
+        track = TrackQueue(save=True)
         track.enqueue(dependencies)
         self.session_elements = len(dependencies)
 
@@ -374,6 +392,15 @@ class Pipeline():
                          "Fetched {} elements".format(fetched),
                          elapsed=elapsed)
 
+    def get_elements_to_track(self, track_targets):
+        planner = Planner()
+
+        target_elements = [e for e in self.dependencies(Scope.ALL)
+                           if e.name in track_targets]
+        track_elements = planner.plan(target_elements, ignore_cache=True)
+
+        return self.remove_elements(track_elements)
+
     # build()
     #
     # Builds (assembles) elements in the pipeline.
@@ -393,15 +420,30 @@ class Pipeline():
                          detail="\n".join([el + "-" + str(src)
                                            for el, src, _ in self.unused_workspaces]))
 
-        if build_all or track_first:
-            plan = list(self.dependencies(Scope.ALL))
+        # We set up two plans; one to track elements, the other to
+        # build them once tracking has finished. The first plan
+        # contains elements from track_first, the second contains the
+        # target elements.
+        #
+        # The reason we can't use one plan is that the tracking
+        # elements may consist of entirely different elements.
+        track_plan = []
+        if track_first:
+            track_plan = self.get_elements_to_track(track_first)
+
+        if build_all:
+            plan = self.dependencies(Scope.ALL)
         else:
-            plan = list(self.plan())
+            plan = self.plan()
 
-        # Assert that we have a consistent pipeline, or that
-        # the track option will make it consistent
-        if not track_first:
-            self.assert_consistent(plan)
+        # We want to start the build queue with any elements that are
+        # not being tracked first
+        track_elements = set(track_plan)
+        plan = [e for e in plan if e not in track_elements]
+
+        # Assert that we have a consistent pipeline now (elements in
+        # track_plan will be made consistent)
+        self.assert_consistent(plan)
 
         fetch = FetchQueue(skip_cached=True)
         build = BuildQueue()
@@ -409,7 +451,7 @@ class Pipeline():
         pull = None
         push = None
         queues = []
-        if track_first:
+        if track_plan:
             track = TrackQueue(save=save)
             queues.append(track)
         if self.artifacts.can_fetch():
@@ -420,9 +462,14 @@ class Pipeline():
         if self.artifacts.can_push():
             push = PushQueue()
             queues.append(push)
-        queues[0].enqueue(plan)
 
-        self.session_elements = len(plan)
+        if track:
+            queues[0].enqueue(track_plan)
+            queues[1].enqueue(plan)
+        else:
+            queues[0].enqueue(plan)
+
+        self.session_elements = len(track_plan) + len(plan)
 
         self.message(MessageType.START, "Starting build")
         elapsed, status = scheduler.run(queues)
@@ -792,7 +839,7 @@ class Pipeline():
     # use in the result, this function reports a list that is appropriate for
     # the selected option.
     #
-    def deps_elements(self, mode, except_=None):
+    def deps_elements(self, mode):
 
         elements = None
         if mode == 'none':
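The planner change is easier to follow outside the diff. Below is a minimal, self-contained sketch of the depth-sorted planning algorithm with the new ignore_cache flag; the Element class here and its cached flag are illustrative stand-ins, not BuildStream's real element API, and the deepest-occurrence bookkeeping is assumed from the surrounding code.

from operator import itemgetter


class Element:
    # Stand-in for a BuildStream element (illustrative only)
    def __init__(self, name, build_deps=(), run_deps=(), cached=False):
        self.name = name
        self.build_deps = list(build_deps)
        self.run_deps = list(run_deps)
        self.cached = cached


class Planner:
    def __init__(self):
        self.depth_map = {}
        self.visiting = set()

    # Revisit an element whenever it is reachable from a new place, keeping
    # the deepest depth seen; build deps go one level deeper, and with
    # ignore_cache they are planned even for cached elements.
    def plan_element(self, element, depth, ignore_cache):
        if element in self.visiting:
            return  # circular dependency, already being processed
        prior = self.depth_map.get(element)
        if prior is not None and prior >= depth:
            return  # already planned at least this deep

        self.visiting.add(element)
        for dep in element.run_deps:
            self.plan_element(dep, depth, ignore_cache)
        if ignore_cache or not element.cached:
            for dep in element.build_deps:
                self.plan_element(dep, depth + 1, ignore_cache)
        self.depth_map[element] = depth
        self.visiting.remove(element)

    # Deepest first, so dependencies are scheduled before their dependants
    def plan(self, roots, ignore_cache=False):
        for root in roots:
            self.plan_element(root, 0, ignore_cache)
        depth_sorted = sorted(self.depth_map.items(), key=itemgetter(1), reverse=True)
        return [e for e, _ in depth_sorted if ignore_cache or not e.cached]


base = Element("base.bst", cached=True)
lib = Element("lib.bst", build_deps=[base])
app = Element("app.bst", build_deps=[lib])
print([e.name for e in Planner().plan([app])])
# ['lib.bst', 'app.bst'] -- cached base.bst is skipped
print([e.name for e in Planner().plan([app], ignore_cache=True)])
# ['base.bst', 'lib.bst', 'app.bst'] -- the full closure

This is what the new get_elements_to_track() relies on: planning with ignore_cache=True yields the whole depth-sorted closure of the named targets, cached or not, which then becomes the tracking queue's input.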
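The two-plan logic in build() can likewise be sketched with plain lists standing in for elements and queues; everything below is illustrative, not the patch's API.

# Elements named for tracking, and the ordinary build plan
track_plan = ["base.bst"]
plan = ["base.bst", "lib.bst", "app.bst"]

# Tracked elements start in the track queue; the rest start in the first
# queue after it, since the track queue feeds its output downstream once
# tracking completes.
tracked = set(track_plan)
plan = [e for e in plan if e not in tracked]

queues = [[], []]  # stand-ins for TrackQueue and FetchQueue
if track_plan:
    queues[0].extend(track_plan)  # queues[0].enqueue(track_plan)
    queues[1].extend(plan)        # queues[1].enqueue(plan)
else:
    queues[0].extend(plan)

session_elements = len(track_plan) + len(plan)
print(queues, session_elements)
# [['base.bst'], ['lib.bst', 'app.bst']] 3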