diff options
Diffstat (limited to 'src/buildstream')
-rw-r--r-- | src/buildstream/_pipeline.py | 466 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 35 |
2 files changed, 269 insertions, 232 deletions
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py index d365fa85a..e1e6dcf39 100644 --- a/src/buildstream/_pipeline.py +++ b/src/buildstream/_pipeline.py @@ -1,5 +1,5 @@ # -# Copyright (C) 2016-2018 Codethink Limited +# Copyright (C) 2016-2020 Codethink Limited # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -20,234 +20,274 @@ # Tristan Maat <tristan.maat@codethink.co.uk> import itertools -from operator import itemgetter -from collections import OrderedDict +from collections import OrderedDict +from operator import itemgetter +from typing import List, Iterator from pyroaring import BitMap # pylint: disable=no-name-in-module -from ._exceptions import PipelineError +from .element import Element from .types import _PipelineSelection, _Scope +from ._context import Context +from ._exceptions import PipelineError + -# Pipeline() +# dependencies() +# +# Generator function to iterate over the dependencies of multiple +# targets in the specified scope, while guaranteeing that a given +# element is never yielded more than once. # # Args: -# project (Project): The Project object -# context (Context): The Context object -# artifacts (Context): The ArtifactCache object +# targets: The target Elements to loop over +# scope: An integer value from the _Scope enum, the scope to iterate over +# recurse: Whether to recurse into dependencies # -class Pipeline: - def __init__(self, context, project, artifacts): - self._context = context # The Context - self._project = project # The toplevel project - self._artifacts = artifacts # The artifact cache +# Yields: +# Elements in the scope of the specified target elements +# +def dependencies(targets: List[Element], scope: int, *, recurse: bool = True) -> Iterator[Element]: + # Keep track of 'visited' in this scope, so that all targets + # share the same context. + visited = (BitMap(), BitMap()) - # dependencies() - # - # Generator function to iterate over elements and optionally - # also iterate over sources. - # - # Args: - # targets (list of Element): The target Elements to loop over - # scope (_Scope): The scope to iterate over - # recurse (bool): Whether to recurse into dependencies - # - def dependencies(self, targets, scope, *, recurse=True): - # Keep track of 'visited' in this scope, so that all targets - # share the same context. - visited = (BitMap(), BitMap()) + for target in targets: + for element in target._dependencies(scope, recurse=recurse, visited=visited): + yield element - for target in targets: - for element in target._dependencies(scope, recurse=recurse, visited=visited): - yield element - # plan() - # - # Generator function to iterate over only the elements - # which are required to build the pipeline target, omitting - # cached elements. The elements are yielded in a depth sorted - # ordering for optimal build plans - # - # Args: - # elements (list of Element): List of target elements to plan - # - # Returns: - # (list of Element): A depth sorted list of the build plan - # - def plan(self, elements): +# get_selection() +# +# Gets a full list of elements based on a toplevel +# list of element targets +# +# Various commands define a --deps option to specify what elements to +# use in the result, this function reports a list that is appropriate for +# the selected option. +# +# Args: +# context: The invocation context +# targets: The target Elements +# mode: A value from PipelineSelection enumeration +# silent: Whether to silence messages +# +# Returns: +# A list of Elements appropriate for the specified selection mode +# +def get_selection(context: Context, targets: List[Element], mode: str, *, silent: bool = True) -> List[Element]: + def redirect_and_log() -> List[Element]: + # Redirect and log if permitted + elements: List[Element] = [] + for t in targets: + new_elm = t._get_source_element() + if new_elm != t and not silent: + context.messenger.info("Element '{}' redirected to '{}'".format(t.name, new_elm.name)) + if new_elm not in elements: + elements.append(new_elm) + return elements + + def plan() -> List[Element]: # Keep locally cached elements in the plan if remote artifact cache is used # to allow pulling artifact with strict cache key, if available. - plan_cached = not self._context.get_strict() and self._artifacts.has_fetch_remotes() + plan_cached = not context.get_strict() and context.artifactcache.has_fetch_remotes() + return _Planner().plan(targets, plan_cached) - return _Planner().plan(elements, plan_cached) - - # get_selection() - # - # Gets a full list of elements based on a toplevel - # list of element targets - # - # Args: - # targets (list of Element): The target Elements - # mode (_PipelineSelection): The PipelineSelection mode - # - # Various commands define a --deps option to specify what elements to - # use in the result, this function reports a list that is appropriate for - # the selected option. - # - def get_selection(self, targets, mode, *, silent=True): - def redirect_and_log(): - # Redirect and log if permitted - elements = [] - for t in targets: - new_elm = t._get_source_element() - if new_elm != t and not silent: - self._context.messenger.info("Element '{}' redirected to '{}'".format(t.name, new_elm.name)) - if new_elm not in elements: - elements.append(new_elm) - return elements - - # Work around python not having a switch statement; this is - # much clearer than the if/elif/else block we used to have. - # - # Note that the lambda is necessary so that we don't evaluate - # all possible values at run time; that would be slow. - return { - _PipelineSelection.NONE: lambda: targets, - _PipelineSelection.REDIRECT: redirect_and_log, - _PipelineSelection.PLAN: lambda: self.plan(targets), - _PipelineSelection.ALL: lambda: list(self.dependencies(targets, _Scope.ALL)), - _PipelineSelection.BUILD: lambda: list(self.dependencies(targets, _Scope.BUILD)), - _PipelineSelection.RUN: lambda: list(self.dependencies(targets, _Scope.RUN)), - }[mode]() - - # except_elements(): - # - # Return what we are left with after the intersection between - # excepted and target elements and their unique dependencies is - # gone. - # - # Args: - # targets (list of Element): List of toplevel targetted elements - # elements (list of Element): The list to remove elements from - # except_targets (list of Element): List of toplevel except targets - # - # Returns: - # (list of Element): The elements list with the intersected - # exceptions removed - # - def except_elements(self, targets, elements, except_targets): - if not except_targets: - return elements - - targeted = list(self.dependencies(targets, _Scope.ALL)) - visited = [] - - def find_intersection(element): - if element in visited: - return - visited.append(element) - - # Intersection elements are those that are also in - # 'targeted', as long as we don't recurse into them. - if element in targeted: - yield element - else: - for dep in element._dependencies(_Scope.ALL, recurse=False): - yield from find_intersection(dep) - - # Build a list of 'intersection' elements, i.e. the set of - # elements that lie on the border closest to excepted elements - # between excepted and target elements. - intersection = list(itertools.chain.from_iterable(find_intersection(element) for element in except_targets)) - - # Now use this set of elements to traverse the targeted - # elements, except 'intersection' elements and their unique - # dependencies. - queue = [] - visited = [] - - queue.extend(targets) - while queue: - element = queue.pop() - if element in visited or element in intersection: - continue - visited.append(element) - - queue.extend(element._dependencies(_Scope.ALL, recurse=False)) - - # That looks like a lot, but overall we only traverse (part - # of) the graph twice. This could be reduced to once if we - # kept track of parent elements, but is probably not - # significant. - - # Ensure that we return elements in the same order they were - # in before. - return [element for element in elements if element in visited] - - # assert_consistent() - # - # Asserts that the given list of elements are in a consistent state, that - # is to say that all sources are consistent and can at least be fetched. + # Work around python not having a switch statement; this is + # much clearer than the if/elif/else block we used to have. # - # Consequently it also means that cache keys can be resolved. - # - def assert_consistent(self, elements): - inconsistent = [] - inconsistent_workspaced = [] - with self._context.messenger.timed_activity("Checking sources"): - for element in elements: - if not element._has_all_sources_resolved(): - if element._get_workspace(): - inconsistent_workspaced.append(element) - else: - inconsistent.append(element) - - if inconsistent: - detail = "Exact versions are missing for the following elements:\n\n" - for element in inconsistent: - detail += " Element: {} is inconsistent\n".format(element._get_full_name()) - for source in element.sources(): - if not source.is_resolved(): - detail += " {} is missing ref\n".format(source) - detail += "\n" - detail += "Try tracking these elements first with `bst source track`\n" - - raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline") - - if inconsistent_workspaced: - detail = "Some workspaces exist but are not closed\n" + "Try closing them with `bst workspace close`\n\n" - for element in inconsistent_workspaced: - detail += " " + element._get_full_name() + "\n" - raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline-workspaced") - - # assert_sources_cached() - # - # Asserts that sources for the given list of elements are cached. - # - # Args: - # elements (list): The list of elements - # - def assert_sources_cached(self, elements): - uncached = [] - with self._context.messenger.timed_activity("Checking sources"): - for element in elements: - if element._fetch_needed(): - uncached.append(element) - - if uncached: - detail = "Sources are not cached for the following elements:\n\n" - for element in uncached: - detail += " Following sources for element: {} are not cached:\n".format(element._get_full_name()) - for source in element.sources(): - if not source._is_cached(): - detail += " {}\n".format(source) - detail += "\n" - detail += ( - "Try fetching these elements first with `bst source fetch`,\n" - + "or run this command with `--fetch` option\n" - ) - - raise PipelineError("Uncached sources", detail=detail, reason="uncached-sources") + # Note that the lambda is necessary so that we don't evaluate + # all possible values at run time; that would be slow. + return { + _PipelineSelection.NONE: lambda: targets, + _PipelineSelection.REDIRECT: redirect_and_log, + _PipelineSelection.PLAN: plan, + _PipelineSelection.ALL: lambda: list(dependencies(targets, _Scope.ALL)), + _PipelineSelection.BUILD: lambda: list(dependencies(targets, _Scope.BUILD)), + _PipelineSelection.RUN: lambda: list(dependencies(targets, _Scope.RUN)), + }[mode]() + + +# except_elements(): +# +# This function calculates the intersection of the `except_targets` +# element dependencies and the `targets` dependencies, and removes +# that intersection from the `elements` list, returning the result. +# +# Args: +# targets: List of toplevel targetted elements +# elements: The list to remove elements from +# except_targets: List of toplevel except targets +# +# Returns: +# The elements list with the intersected exceptions removed +# +# Important notes on the behavior +# =============================== +# +# * Except elements can be completely outside of the scope +# of targets. +# +# * When the dependencies of except elements intersect with +# dependencies of targets, those dependencies are removed +# from the result. +# +# * If a target is found within the intersection of excepted +# elements, that target and it's dependencies are considered +# exempt from the exception intersection. +# +# Example: +# +# (t1) (e1) +# / \ / +# (o) (o) ( ) +# / \ / \ +# (o) (x) ( ) +# \ / \ +# (o) (x) ( ) +# \ / +# (x) +# / \ +# (x) (t2) +# / \ / \ +# (x) (x) (o) +# / \ +# (o) (o) +# +# Here we have a mockup graph with 2 target elements (t1) and (t2), +# and one except element (e1) which lies outside of the graph. +# +# - ( ) elements are ignored, they were never in the element list +# - (o) elements will be included in the result +# - (x) elements are removed from the graph +# +# Note how (t2) reintroduces portions of the graph which were otherwise +# tainted by being depended on indirectly by the (e1) except element. +# +def except_elements(targets: List[Element], elements: List[Element], except_targets: List[Element]) -> List[Element]: + if not except_targets: + return elements + + targeted: List[Element] = list(dependencies(targets, _Scope.ALL)) + visited: List[Element] = [] + + def find_intersection(element: Element) -> Iterator[Element]: + if element in visited: + return + visited.append(element) + + # Intersection elements are those that are also in + # 'targeted', as long as we don't recurse into them. + if element in targeted: + yield element + else: + for dep in element._dependencies(_Scope.ALL, recurse=False): + yield from find_intersection(dep) + + # Build a list of 'intersection' elements, i.e. the set of + # elements that lie on the border closest to excepted elements + # between excepted and target elements. + intersection = list(itertools.chain.from_iterable(find_intersection(element) for element in except_targets)) + + # Now use this set of elements to traverse the targeted + # elements, except 'intersection' elements and their unique + # dependencies. + queue = [] + visited = [] + + queue.extend(targets) + while queue: + element = queue.pop() + if element in visited or element in intersection: + continue + visited.append(element) + + queue.extend(element._dependencies(_Scope.ALL, recurse=False)) + + # That looks like a lot, but overall we only traverse (part + # of) the graph twice. This could be reduced to once if we + # kept track of parent elements, but is probably not + # significant. + + # Ensure that we return elements in the same order they were + # in before. + return [element for element in elements if element in visited] + + +# assert_consistent() +# +# Asserts that the given list of elements are in a consistent state, that +# is to say that all sources are consistent and can at least be fetched. +# +# Consequently it also means that cache keys can be resolved. +# +# Args: +# context: The invocation context +# elements: The elements to assert consistency on +# +# Raises: +# PipelineError: If the elements are inconsistent. +# +def assert_consistent(context: Context, elements: List[Element]) -> None: + inconsistent = [] + inconsistent_workspaced = [] + with context.messenger.timed_activity("Checking sources"): + for element in elements: + if not element._has_all_sources_resolved(): + if element._get_workspace(): + inconsistent_workspaced.append(element) + else: + inconsistent.append(element) + + if inconsistent: + detail = "Exact versions are missing for the following elements:\n\n" + for element in inconsistent: + detail += " Element: {} is inconsistent\n".format(element._get_full_name()) + for source in element.sources(): + if not source.is_resolved(): + detail += " {} is missing ref\n".format(source) + detail += "\n" + detail += "Try tracking these elements first with `bst source track`\n" + raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline") + + if inconsistent_workspaced: + detail = "Some workspaces exist but are not closed\n" + "Try closing them with `bst workspace close`\n\n" + for element in inconsistent_workspaced: + detail += " " + element._get_full_name() + "\n" + raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline-workspaced") + + +# assert_sources_cached() +# +# Asserts that sources for the given list of elements are cached. +# +# Args: +# context: The invocation context +# elements: The elements to assert cached source state for +# +# Raises: +# PipelineError: If the elements have uncached sources +# +def assert_sources_cached(context: Context, elements: List[Element]): + uncached = [] + with context.messenger.timed_activity("Checking sources"): + for element in elements: + if element._fetch_needed(): + uncached.append(element) + + if uncached: + detail = "Sources are not cached for the following elements:\n\n" + for element in uncached: + detail += " Following sources for element: {} are not cached:\n".format(element._get_full_name()) + for source in element.sources(): + if not source._is_cached(): + detail += " {}\n".format(source) + detail += "\n" + detail += ( + "Try fetching these elements first with `bst source fetch`,\n" + + "or run this command with `--fetch` option\n" + ) + raise PipelineError("Uncached sources", detail=detail, reason="uncached-sources") # _Planner() diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 09735678b..35cb230ed 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1,5 +1,5 @@ # -# Copyright (C) 2018 Codethink Limited +# Copyright (C) 2020 Codethink Limited # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -45,13 +45,12 @@ from ._scheduler import ( ArtifactPushQueue, ) from .element import Element -from ._pipeline import Pipeline from ._profile import Topics, PROFILER from ._project import ProjectRefStorage from ._state import State from .types import _KeyStrength, _PipelineSelection, _Scope from .plugin import Plugin -from . import utils, _yaml, _site +from . import utils, _yaml, _site, _pipeline # Stream() @@ -86,7 +85,6 @@ class Stream: self._elementsourcescache = None self._sourcecache = None self._project = None - self._pipeline = None self._state = State(session_start) # Owned by Stream, used by Core to set state self._notification_queue = deque() @@ -127,7 +125,6 @@ class Stream: assert self._project is None self._project = project self._project.load_context.set_fetch_subprojects(self._fetch_subprojects) - self._pipeline = Pipeline(self._context, project, self._artifacts) # load_selection() # @@ -220,7 +217,7 @@ class Stream: self._enqueue_plan(plan) self._run() - missing_deps = [dep for dep in self._pipeline.dependencies([element], scope) if not dep._cached()] + missing_deps = [dep for dep in _pipeline.dependencies([element], scope) if not dep._cached()] if missing_deps: raise StreamError( "Elements need to be built or downloaded before staging a shell environment", @@ -250,7 +247,7 @@ class Stream: # Ensure we have our sources if we are launching a build shell if scope == _Scope.BUILD and not usebuildtree: self._fetch([element]) - self._pipeline.assert_sources_cached([element]) + _pipeline.assert_sources_cached(self._context, [element]) return element._shell( scope, mounts=mounts, isolate=isolate, prompt=prompt(element), command=command, usebuildtree=usebuildtree @@ -286,7 +283,7 @@ class Stream: ) # Assert that the elements are consistent - self._pipeline.assert_consistent(elements) + _pipeline.assert_consistent(self._context, elements) if all(project.remote_execution_specs for project in self._context.get_projects()): # Remote execution is configured for all projects. @@ -411,7 +408,7 @@ class Stream: if not self._sourcecache.has_push_remotes(): raise StreamError("No source caches available for pushing sources") - self._pipeline.assert_consistent(elements) + _pipeline.assert_consistent(self._context, elements) self._add_queue(FetchQueue(self._scheduler)) @@ -451,7 +448,7 @@ class Stream: if not self._artifacts.has_fetch_remotes(): raise StreamError("No artifact caches available for pulling artifacts") - self._pipeline.assert_consistent(elements) + _pipeline.assert_consistent(self._context, elements) self._scheduler.clear_queues() self._add_queue(PullQueue(self._scheduler)) self._enqueue_plan(elements) @@ -492,7 +489,7 @@ class Stream: if not self._artifacts.has_push_remotes(): raise StreamError("No artifact caches available for pushing artifacts") - self._pipeline.assert_consistent(elements) + _pipeline.assert_consistent(self._context, elements) self._scheduler.clear_queues() self._add_queue(PullQueue(self._scheduler)) @@ -748,7 +745,7 @@ class Stream: # Assert all sources are cached in the source dir self._fetch(elements) - self._pipeline.assert_sources_cached(elements) + _pipeline.assert_sources_cached(self._context, elements) # Stage all sources determined by scope try: @@ -1303,11 +1300,11 @@ class Stream: track_selected = [] for project, project_elements in track_projects.items(): - selected = self._pipeline.get_selection(project_elements, selection) + selected = _pipeline.get_selection(self._context, project_elements, selection) selected = self._track_cross_junction_filter(project, selected, cross_junctions) track_selected.extend(selected) - return self._pipeline.except_elements(elements, track_selected, except_elements) + return _pipeline.except_elements(elements, track_selected, except_elements) # _track_cross_junction_filter() # @@ -1413,8 +1410,8 @@ class Stream: # Now move on to loading primary selection. # self._resolve_elements(self.targets) - selected = self._pipeline.get_selection(self.targets, selection, silent=False) - selected = self._pipeline.except_elements(self.targets, selected, except_elements) + selected = _pipeline.get_selection(self._context, self.targets, selection, silent=False) + selected = _pipeline.except_elements(self.targets, selected, except_elements) if selection == _PipelineSelection.PLAN and dynamic_plan: # We use a dynamic build plan, only request artifacts of top-level targets, @@ -1446,7 +1443,7 @@ class Stream: # to happen, even for large projects (tested with the Debian stack). Although, # if it does become a problem we may have to set the recursion limit to a # greater value. - for element in self._pipeline.dependencies(targets, _Scope.ALL): + for element in _pipeline.dependencies(targets, _Scope.ALL): # Determine initial element state. element._initialize_state() @@ -1501,7 +1498,7 @@ class Stream: # Inform the frontend of the full list of elements # and the list of elements which will be processed in this run # - self.total_elements = list(self._pipeline.dependencies(self.targets, _Scope.ALL)) + self.total_elements = list(_pipeline.dependencies(self.targets, _Scope.ALL)) if announce_session and self._session_start_callback is not None: self._session_start_callback() @@ -1528,7 +1525,7 @@ class Stream: def _fetch(self, elements: List[Element], *, fetch_original: bool = False, announce_session: bool = False): # Assert consistency for the fetch elements - self._pipeline.assert_consistent(elements) + _pipeline.assert_consistent(self._context, elements) # Construct queues, enqueue and run # |