diff options
Diffstat (limited to 'src/buildstream/_pipeline.py')
-rw-r--r-- | src/buildstream/_pipeline.py | 466 |
1 files changed, 253 insertions, 213 deletions
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py index d365fa85a..e1e6dcf39 100644 --- a/src/buildstream/_pipeline.py +++ b/src/buildstream/_pipeline.py @@ -1,5 +1,5 @@ # -# Copyright (C) 2016-2018 Codethink Limited +# Copyright (C) 2016-2020 Codethink Limited # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -20,234 +20,274 @@ # Tristan Maat <tristan.maat@codethink.co.uk> import itertools -from operator import itemgetter -from collections import OrderedDict +from collections import OrderedDict +from operator import itemgetter +from typing import List, Iterator from pyroaring import BitMap # pylint: disable=no-name-in-module -from ._exceptions import PipelineError +from .element import Element from .types import _PipelineSelection, _Scope +from ._context import Context +from ._exceptions import PipelineError + -# Pipeline() +# dependencies() +# +# Generator function to iterate over the dependencies of multiple +# targets in the specified scope, while guaranteeing that a given +# element is never yielded more than once. # # Args: -# project (Project): The Project object -# context (Context): The Context object -# artifacts (Context): The ArtifactCache object +# targets: The target Elements to loop over +# scope: An integer value from the _Scope enum, the scope to iterate over +# recurse: Whether to recurse into dependencies # -class Pipeline: - def __init__(self, context, project, artifacts): - self._context = context # The Context - self._project = project # The toplevel project - self._artifacts = artifacts # The artifact cache +# Yields: +# Elements in the scope of the specified target elements +# +def dependencies(targets: List[Element], scope: int, *, recurse: bool = True) -> Iterator[Element]: + # Keep track of 'visited' in this scope, so that all targets + # share the same context. 
+ visited = (BitMap(), BitMap()) - # dependencies() - # - # Generator function to iterate over elements and optionally - # also iterate over sources. - # - # Args: - # targets (list of Element): The target Elements to loop over - # scope (_Scope): The scope to iterate over - # recurse (bool): Whether to recurse into dependencies - # - def dependencies(self, targets, scope, *, recurse=True): - # Keep track of 'visited' in this scope, so that all targets - # share the same context. - visited = (BitMap(), BitMap()) + for target in targets: + for element in target._dependencies(scope, recurse=recurse, visited=visited): + yield element - for target in targets: - for element in target._dependencies(scope, recurse=recurse, visited=visited): - yield element - # plan() - # - # Generator function to iterate over only the elements - # which are required to build the pipeline target, omitting - # cached elements. The elements are yielded in a depth sorted - # ordering for optimal build plans - # - # Args: - # elements (list of Element): List of target elements to plan - # - # Returns: - # (list of Element): A depth sorted list of the build plan - # - def plan(self, elements): +# get_selection() +# +# Gets a full list of elements based on a toplevel +# list of element targets +# +# Various commands define a --deps option to specify what elements to +# use in the result, this function reports a list that is appropriate for +# the selected option. 
+# +# Args: +# context: The invocation context +# targets: The target Elements +# mode: A value from PipelineSelection enumeration +# silent: Whether to silence messages +# +# Returns: +# A list of Elements appropriate for the specified selection mode +# +def get_selection(context: Context, targets: List[Element], mode: str, *, silent: bool = True) -> List[Element]: + def redirect_and_log() -> List[Element]: + # Redirect and log if permitted + elements: List[Element] = [] + for t in targets: + new_elm = t._get_source_element() + if new_elm != t and not silent: + context.messenger.info("Element '{}' redirected to '{}'".format(t.name, new_elm.name)) + if new_elm not in elements: + elements.append(new_elm) + return elements + + def plan() -> List[Element]: # Keep locally cached elements in the plan if remote artifact cache is used # to allow pulling artifact with strict cache key, if available. - plan_cached = not self._context.get_strict() and self._artifacts.has_fetch_remotes() + plan_cached = not context.get_strict() and context.artifactcache.has_fetch_remotes() + return _Planner().plan(targets, plan_cached) - return _Planner().plan(elements, plan_cached) - - # get_selection() - # - # Gets a full list of elements based on a toplevel - # list of element targets - # - # Args: - # targets (list of Element): The target Elements - # mode (_PipelineSelection): The PipelineSelection mode - # - # Various commands define a --deps option to specify what elements to - # use in the result, this function reports a list that is appropriate for - # the selected option. 
- # - def get_selection(self, targets, mode, *, silent=True): - def redirect_and_log(): - # Redirect and log if permitted - elements = [] - for t in targets: - new_elm = t._get_source_element() - if new_elm != t and not silent: - self._context.messenger.info("Element '{}' redirected to '{}'".format(t.name, new_elm.name)) - if new_elm not in elements: - elements.append(new_elm) - return elements - - # Work around python not having a switch statement; this is - # much clearer than the if/elif/else block we used to have. - # - # Note that the lambda is necessary so that we don't evaluate - # all possible values at run time; that would be slow. - return { - _PipelineSelection.NONE: lambda: targets, - _PipelineSelection.REDIRECT: redirect_and_log, - _PipelineSelection.PLAN: lambda: self.plan(targets), - _PipelineSelection.ALL: lambda: list(self.dependencies(targets, _Scope.ALL)), - _PipelineSelection.BUILD: lambda: list(self.dependencies(targets, _Scope.BUILD)), - _PipelineSelection.RUN: lambda: list(self.dependencies(targets, _Scope.RUN)), - }[mode]() - - # except_elements(): - # - # Return what we are left with after the intersection between - # excepted and target elements and their unique dependencies is - # gone. - # - # Args: - # targets (list of Element): List of toplevel targetted elements - # elements (list of Element): The list to remove elements from - # except_targets (list of Element): List of toplevel except targets - # - # Returns: - # (list of Element): The elements list with the intersected - # exceptions removed - # - def except_elements(self, targets, elements, except_targets): - if not except_targets: - return elements - - targeted = list(self.dependencies(targets, _Scope.ALL)) - visited = [] - - def find_intersection(element): - if element in visited: - return - visited.append(element) - - # Intersection elements are those that are also in - # 'targeted', as long as we don't recurse into them. 
- if element in targeted: - yield element - else: - for dep in element._dependencies(_Scope.ALL, recurse=False): - yield from find_intersection(dep) - - # Build a list of 'intersection' elements, i.e. the set of - # elements that lie on the border closest to excepted elements - # between excepted and target elements. - intersection = list(itertools.chain.from_iterable(find_intersection(element) for element in except_targets)) - - # Now use this set of elements to traverse the targeted - # elements, except 'intersection' elements and their unique - # dependencies. - queue = [] - visited = [] - - queue.extend(targets) - while queue: - element = queue.pop() - if element in visited or element in intersection: - continue - visited.append(element) - - queue.extend(element._dependencies(_Scope.ALL, recurse=False)) - - # That looks like a lot, but overall we only traverse (part - # of) the graph twice. This could be reduced to once if we - # kept track of parent elements, but is probably not - # significant. - - # Ensure that we return elements in the same order they were - # in before. - return [element for element in elements if element in visited] - - # assert_consistent() - # - # Asserts that the given list of elements are in a consistent state, that - # is to say that all sources are consistent and can at least be fetched. + # Work around python not having a switch statement; this is + # much clearer than the if/elif/else block we used to have. # - # Consequently it also means that cache keys can be resolved. 
- # - def assert_consistent(self, elements): - inconsistent = [] - inconsistent_workspaced = [] - with self._context.messenger.timed_activity("Checking sources"): - for element in elements: - if not element._has_all_sources_resolved(): - if element._get_workspace(): - inconsistent_workspaced.append(element) - else: - inconsistent.append(element) - - if inconsistent: - detail = "Exact versions are missing for the following elements:\n\n" - for element in inconsistent: - detail += " Element: {} is inconsistent\n".format(element._get_full_name()) - for source in element.sources(): - if not source.is_resolved(): - detail += " {} is missing ref\n".format(source) - detail += "\n" - detail += "Try tracking these elements first with `bst source track`\n" - - raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline") - - if inconsistent_workspaced: - detail = "Some workspaces exist but are not closed\n" + "Try closing them with `bst workspace close`\n\n" - for element in inconsistent_workspaced: - detail += " " + element._get_full_name() + "\n" - raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline-workspaced") - - # assert_sources_cached() - # - # Asserts that sources for the given list of elements are cached. 
- # - # Args: - # elements (list): The list of elements - # - def assert_sources_cached(self, elements): - uncached = [] - with self._context.messenger.timed_activity("Checking sources"): - for element in elements: - if element._fetch_needed(): - uncached.append(element) - - if uncached: - detail = "Sources are not cached for the following elements:\n\n" - for element in uncached: - detail += " Following sources for element: {} are not cached:\n".format(element._get_full_name()) - for source in element.sources(): - if not source._is_cached(): - detail += " {}\n".format(source) - detail += "\n" - detail += ( - "Try fetching these elements first with `bst source fetch`,\n" - + "or run this command with `--fetch` option\n" - ) - - raise PipelineError("Uncached sources", detail=detail, reason="uncached-sources") + # Note that the lambda is necessary so that we don't evaluate + # all possible values at run time; that would be slow. + return { + _PipelineSelection.NONE: lambda: targets, + _PipelineSelection.REDIRECT: redirect_and_log, + _PipelineSelection.PLAN: plan, + _PipelineSelection.ALL: lambda: list(dependencies(targets, _Scope.ALL)), + _PipelineSelection.BUILD: lambda: list(dependencies(targets, _Scope.BUILD)), + _PipelineSelection.RUN: lambda: list(dependencies(targets, _Scope.RUN)), + }[mode]() + + +# except_elements(): +# +# This function calculates the intersection of the `except_targets` +# element dependencies and the `targets` dependencies, and removes +# that intersection from the `elements` list, returning the result. +# +# Args: +# targets: List of toplevel targeted elements +# elements: The list to remove elements from +# except_targets: List of toplevel except targets +# +# Returns: +# The elements list with the intersected exceptions removed +# +# Important notes on the behavior +# =============================== +# +# * Except elements can be completely outside of the scope +# of targets. 
+# +# * When the dependencies of except elements intersect with +# dependencies of targets, those dependencies are removed +# from the result. +# +# * If a target is found within the intersection of excepted +# elements, that target and its dependencies are considered +# exempt from the exception intersection. +# +# Example: +# +# (t1) (e1) +# / \ / +# (o) (o) ( ) +# / \ / \ +# (o) (x) ( ) +# \ / \ +# (o) (x) ( ) +# \ / +# (x) +# / \ +# (x) (t2) +# / \ / \ +# (x) (x) (o) +# / \ +# (o) (o) +# +# Here we have a mockup graph with 2 target elements (t1) and (t2), +# and one except element (e1) which lies outside of the graph. +# +# - ( ) elements are ignored, they were never in the element list +# - (o) elements will be included in the result +# - (x) elements are removed from the graph +# +# Note how (t2) reintroduces portions of the graph which were otherwise +# tainted by being depended on indirectly by the (e1) except element. +# +def except_elements(targets: List[Element], elements: List[Element], except_targets: List[Element]) -> List[Element]: + if not except_targets: + return elements + + targeted: List[Element] = list(dependencies(targets, _Scope.ALL)) + visited: List[Element] = [] + + def find_intersection(element: Element) -> Iterator[Element]: + if element in visited: + return + visited.append(element) + + # Intersection elements are those that are also in + # 'targeted', as long as we don't recurse into them. 
+ intersection = list(itertools.chain.from_iterable(find_intersection(element) for element in except_targets)) + + # Now use this set of elements to traverse the targeted + # elements, except 'intersection' elements and their unique + # dependencies. + queue = [] + visited = [] + + queue.extend(targets) + while queue: + element = queue.pop() + if element in visited or element in intersection: + continue + visited.append(element) + + queue.extend(element._dependencies(_Scope.ALL, recurse=False)) + + # That looks like a lot, but overall we only traverse (part + # of) the graph twice. This could be reduced to once if we + # kept track of parent elements, but is probably not + # significant. + + # Ensure that we return elements in the same order they were + # in before. + return [element for element in elements if element in visited] + + +# assert_consistent() +# +# Asserts that the given list of elements are in a consistent state, that +# is to say that all sources are consistent and can at least be fetched. +# +# Consequently it also means that cache keys can be resolved. +# +# Args: +# context: The invocation context +# elements: The elements to assert consistency on +# +# Raises: +# PipelineError: If the elements are inconsistent. 
+# +def assert_consistent(context: Context, elements: List[Element]) -> None: + inconsistent = [] + inconsistent_workspaced = [] + with context.messenger.timed_activity("Checking sources"): + for element in elements: + if not element._has_all_sources_resolved(): + if element._get_workspace(): + inconsistent_workspaced.append(element) + else: + inconsistent.append(element) + + if inconsistent: + detail = "Exact versions are missing for the following elements:\n\n" + for element in inconsistent: + detail += " Element: {} is inconsistent\n".format(element._get_full_name()) + for source in element.sources(): + if not source.is_resolved(): + detail += " {} is missing ref\n".format(source) + detail += "\n" + detail += "Try tracking these elements first with `bst source track`\n" + raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline") + + if inconsistent_workspaced: + detail = "Some workspaces exist but are not closed\n" + "Try closing them with `bst workspace close`\n\n" + for element in inconsistent_workspaced: + detail += " " + element._get_full_name() + "\n" + raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline-workspaced") + + +# assert_sources_cached() +# +# Asserts that sources for the given list of elements are cached. 
+# +# Args: +# context: The invocation context +# elements: The elements to assert cached source state for +# +# Raises: +# PipelineError: If the elements have uncached sources +# +def assert_sources_cached(context: Context, elements: List[Element]): + uncached = [] + with context.messenger.timed_activity("Checking sources"): + for element in elements: + if element._fetch_needed(): + uncached.append(element) + + if uncached: + detail = "Sources are not cached for the following elements:\n\n" + for element in uncached: + detail += " Following sources for element: {} are not cached:\n".format(element._get_full_name()) + for source in element.sources(): + if not source._is_cached(): + detail += " {}\n".format(source) + detail += "\n" + detail += ( + "Try fetching these elements first with `bst source fetch`,\n" + + "or run this command with `--fetch` option\n" + ) + raise PipelineError("Uncached sources", detail=detail, reason="uncached-sources") # _Planner() |