summaryrefslogtreecommitdiff
path: root/src/buildstream/_pipeline.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_pipeline.py')
-rw-r--r--src/buildstream/_pipeline.py466
1 file changed, 253 insertions, 213 deletions
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py
index d365fa85a..e1e6dcf39 100644
--- a/src/buildstream/_pipeline.py
+++ b/src/buildstream/_pipeline.py
@@ -1,5 +1,5 @@
#
-# Copyright (C) 2016-2018 Codethink Limited
+# Copyright (C) 2016-2020 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -20,234 +20,274 @@
# Tristan Maat <tristan.maat@codethink.co.uk>
import itertools
-from operator import itemgetter
-from collections import OrderedDict
+from collections import OrderedDict
+from operator import itemgetter
+from typing import List, Iterator
from pyroaring import BitMap # pylint: disable=no-name-in-module
-from ._exceptions import PipelineError
+from .element import Element
from .types import _PipelineSelection, _Scope
+from ._context import Context
+from ._exceptions import PipelineError
+
-# Pipeline()
+# dependencies()
+#
+# Generator function to iterate over the dependencies of multiple
+# targets in the specified scope, while guaranteeing that a given
+# element is never yielded more than once.
#
# Args:
-# project (Project): The Project object
-# context (Context): The Context object
-# artifacts (Context): The ArtifactCache object
+# targets: The target Elements to loop over
+# scope: An integer value from the _Scope enum, the scope to iterate over
+# recurse: Whether to recurse into dependencies
#
-class Pipeline:
- def __init__(self, context, project, artifacts):
- self._context = context # The Context
- self._project = project # The toplevel project
- self._artifacts = artifacts # The artifact cache
+# Yields:
+# Elements in the scope of the specified target elements
+#
+def dependencies(targets: List[Element], scope: int, *, recurse: bool = True) -> Iterator[Element]:
+ # Keep track of 'visited' in this scope, so that all targets
+ # share the same context.
+ visited = (BitMap(), BitMap())
- # dependencies()
- #
- # Generator function to iterate over elements and optionally
- # also iterate over sources.
- #
- # Args:
- # targets (list of Element): The target Elements to loop over
- # scope (_Scope): The scope to iterate over
- # recurse (bool): Whether to recurse into dependencies
- #
- def dependencies(self, targets, scope, *, recurse=True):
- # Keep track of 'visited' in this scope, so that all targets
- # share the same context.
- visited = (BitMap(), BitMap())
+ for target in targets:
+ for element in target._dependencies(scope, recurse=recurse, visited=visited):
+ yield element
- for target in targets:
- for element in target._dependencies(scope, recurse=recurse, visited=visited):
- yield element
- # plan()
- #
- # Generator function to iterate over only the elements
- # which are required to build the pipeline target, omitting
- # cached elements. The elements are yielded in a depth sorted
- # ordering for optimal build plans
- #
- # Args:
- # elements (list of Element): List of target elements to plan
- #
- # Returns:
- # (list of Element): A depth sorted list of the build plan
- #
- def plan(self, elements):
+# get_selection()
+#
+# Gets a full list of elements based on a toplevel
+# list of element targets
+#
+# Various commands define a --deps option to specify what elements to
+# use in the result, this function reports a list that is appropriate for
+# the selected option.
+#
+# Args:
+# context: The invocation context
+# targets: The target Elements
+# mode: A value from PipelineSelection enumeration
+# silent: Whether to silence messages
+#
+# Returns:
+# A list of Elements appropriate for the specified selection mode
+#
+def get_selection(context: Context, targets: List[Element], mode: str, *, silent: bool = True) -> List[Element]:
+ def redirect_and_log() -> List[Element]:
+ # Redirect and log if permitted
+ elements: List[Element] = []
+ for t in targets:
+ new_elm = t._get_source_element()
+ if new_elm != t and not silent:
+ context.messenger.info("Element '{}' redirected to '{}'".format(t.name, new_elm.name))
+ if new_elm not in elements:
+ elements.append(new_elm)
+ return elements
+
+ def plan() -> List[Element]:
# Keep locally cached elements in the plan if remote artifact cache is used
# to allow pulling artifact with strict cache key, if available.
- plan_cached = not self._context.get_strict() and self._artifacts.has_fetch_remotes()
+ plan_cached = not context.get_strict() and context.artifactcache.has_fetch_remotes()
+ return _Planner().plan(targets, plan_cached)
- return _Planner().plan(elements, plan_cached)
-
- # get_selection()
- #
- # Gets a full list of elements based on a toplevel
- # list of element targets
- #
- # Args:
- # targets (list of Element): The target Elements
- # mode (_PipelineSelection): The PipelineSelection mode
- #
- # Various commands define a --deps option to specify what elements to
- # use in the result, this function reports a list that is appropriate for
- # the selected option.
- #
- def get_selection(self, targets, mode, *, silent=True):
- def redirect_and_log():
- # Redirect and log if permitted
- elements = []
- for t in targets:
- new_elm = t._get_source_element()
- if new_elm != t and not silent:
- self._context.messenger.info("Element '{}' redirected to '{}'".format(t.name, new_elm.name))
- if new_elm not in elements:
- elements.append(new_elm)
- return elements
-
- # Work around python not having a switch statement; this is
- # much clearer than the if/elif/else block we used to have.
- #
- # Note that the lambda is necessary so that we don't evaluate
- # all possible values at run time; that would be slow.
- return {
- _PipelineSelection.NONE: lambda: targets,
- _PipelineSelection.REDIRECT: redirect_and_log,
- _PipelineSelection.PLAN: lambda: self.plan(targets),
- _PipelineSelection.ALL: lambda: list(self.dependencies(targets, _Scope.ALL)),
- _PipelineSelection.BUILD: lambda: list(self.dependencies(targets, _Scope.BUILD)),
- _PipelineSelection.RUN: lambda: list(self.dependencies(targets, _Scope.RUN)),
- }[mode]()
-
- # except_elements():
- #
- # Return what we are left with after the intersection between
- # excepted and target elements and their unique dependencies is
- # gone.
- #
- # Args:
- # targets (list of Element): List of toplevel targetted elements
- # elements (list of Element): The list to remove elements from
- # except_targets (list of Element): List of toplevel except targets
- #
- # Returns:
- # (list of Element): The elements list with the intersected
- # exceptions removed
- #
- def except_elements(self, targets, elements, except_targets):
- if not except_targets:
- return elements
-
- targeted = list(self.dependencies(targets, _Scope.ALL))
- visited = []
-
- def find_intersection(element):
- if element in visited:
- return
- visited.append(element)
-
- # Intersection elements are those that are also in
- # 'targeted', as long as we don't recurse into them.
- if element in targeted:
- yield element
- else:
- for dep in element._dependencies(_Scope.ALL, recurse=False):
- yield from find_intersection(dep)
-
- # Build a list of 'intersection' elements, i.e. the set of
- # elements that lie on the border closest to excepted elements
- # between excepted and target elements.
- intersection = list(itertools.chain.from_iterable(find_intersection(element) for element in except_targets))
-
- # Now use this set of elements to traverse the targeted
- # elements, except 'intersection' elements and their unique
- # dependencies.
- queue = []
- visited = []
-
- queue.extend(targets)
- while queue:
- element = queue.pop()
- if element in visited or element in intersection:
- continue
- visited.append(element)
-
- queue.extend(element._dependencies(_Scope.ALL, recurse=False))
-
- # That looks like a lot, but overall we only traverse (part
- # of) the graph twice. This could be reduced to once if we
- # kept track of parent elements, but is probably not
- # significant.
-
- # Ensure that we return elements in the same order they were
- # in before.
- return [element for element in elements if element in visited]
-
- # assert_consistent()
- #
- # Asserts that the given list of elements are in a consistent state, that
- # is to say that all sources are consistent and can at least be fetched.
+ # Work around python not having a switch statement; this is
+ # much clearer than the if/elif/else block we used to have.
#
- # Consequently it also means that cache keys can be resolved.
- #
- def assert_consistent(self, elements):
- inconsistent = []
- inconsistent_workspaced = []
- with self._context.messenger.timed_activity("Checking sources"):
- for element in elements:
- if not element._has_all_sources_resolved():
- if element._get_workspace():
- inconsistent_workspaced.append(element)
- else:
- inconsistent.append(element)
-
- if inconsistent:
- detail = "Exact versions are missing for the following elements:\n\n"
- for element in inconsistent:
- detail += " Element: {} is inconsistent\n".format(element._get_full_name())
- for source in element.sources():
- if not source.is_resolved():
- detail += " {} is missing ref\n".format(source)
- detail += "\n"
- detail += "Try tracking these elements first with `bst source track`\n"
-
- raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline")
-
- if inconsistent_workspaced:
- detail = "Some workspaces exist but are not closed\n" + "Try closing them with `bst workspace close`\n\n"
- for element in inconsistent_workspaced:
- detail += " " + element._get_full_name() + "\n"
- raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline-workspaced")
-
- # assert_sources_cached()
- #
- # Asserts that sources for the given list of elements are cached.
- #
- # Args:
- # elements (list): The list of elements
- #
- def assert_sources_cached(self, elements):
- uncached = []
- with self._context.messenger.timed_activity("Checking sources"):
- for element in elements:
- if element._fetch_needed():
- uncached.append(element)
-
- if uncached:
- detail = "Sources are not cached for the following elements:\n\n"
- for element in uncached:
- detail += " Following sources for element: {} are not cached:\n".format(element._get_full_name())
- for source in element.sources():
- if not source._is_cached():
- detail += " {}\n".format(source)
- detail += "\n"
- detail += (
- "Try fetching these elements first with `bst source fetch`,\n"
- + "or run this command with `--fetch` option\n"
- )
-
- raise PipelineError("Uncached sources", detail=detail, reason="uncached-sources")
+ # Note that the lambda is necessary so that we don't evaluate
+ # all possible values at run time; that would be slow.
+ return {
+ _PipelineSelection.NONE: lambda: targets,
+ _PipelineSelection.REDIRECT: redirect_and_log,
+ _PipelineSelection.PLAN: plan,
+ _PipelineSelection.ALL: lambda: list(dependencies(targets, _Scope.ALL)),
+ _PipelineSelection.BUILD: lambda: list(dependencies(targets, _Scope.BUILD)),
+ _PipelineSelection.RUN: lambda: list(dependencies(targets, _Scope.RUN)),
+ }[mode]()
+
+
+# except_elements():
+#
+# This function calculates the intersection of the `except_targets`
+# element dependencies and the `targets` dependencies, and removes
+# that intersection from the `elements` list, returning the result.
+#
+# Args:
+#   targets: List of toplevel targeted elements
+# elements: The list to remove elements from
+# except_targets: List of toplevel except targets
+#
+# Returns:
+# The elements list with the intersected exceptions removed
+#
+# Important notes on the behavior
+# ===============================
+#
+# * Except elements can be completely outside of the scope
+# of targets.
+#
+# * When the dependencies of except elements intersect with
+# dependencies of targets, those dependencies are removed
+# from the result.
+#
+# * If a target is found within the intersection of excepted
+#   elements, that target and its dependencies are considered
+# exempt from the exception intersection.
+#
+# Example:
+#
+# (t1) (e1)
+# / \ /
+# (o) (o) ( )
+# / \ / \
+# (o) (x) ( )
+# \ / \
+# (o) (x) ( )
+# \ /
+# (x)
+# / \
+# (x) (t2)
+# / \ / \
+# (x) (x) (o)
+# / \
+# (o) (o)
+#
+# Here we have a mockup graph with 2 target elements (t1) and (t2),
+# and one except element (e1) which lies outside of the graph.
+#
+# - ( ) elements are ignored, they were never in the element list
+# - (o) elements will be included in the result
+# - (x) elements are removed from the graph
+#
+# Note how (t2) reintroduces portions of the graph which were otherwise
+# tainted by being depended on indirectly by the (e1) except element.
+#
+def except_elements(targets: List[Element], elements: List[Element], except_targets: List[Element]) -> List[Element]:
+ if not except_targets:
+ return elements
+
+ targeted: List[Element] = list(dependencies(targets, _Scope.ALL))
+ visited: List[Element] = []
+
+ def find_intersection(element: Element) -> Iterator[Element]:
+ if element in visited:
+ return
+ visited.append(element)
+
+ # Intersection elements are those that are also in
+ # 'targeted', as long as we don't recurse into them.
+ if element in targeted:
+ yield element
+ else:
+ for dep in element._dependencies(_Scope.ALL, recurse=False):
+ yield from find_intersection(dep)
+
+ # Build a list of 'intersection' elements, i.e. the set of
+ # elements that lie on the border closest to excepted elements
+ # between excepted and target elements.
+ intersection = list(itertools.chain.from_iterable(find_intersection(element) for element in except_targets))
+
+ # Now use this set of elements to traverse the targeted
+ # elements, except 'intersection' elements and their unique
+ # dependencies.
+ queue = []
+ visited = []
+
+ queue.extend(targets)
+ while queue:
+ element = queue.pop()
+ if element in visited or element in intersection:
+ continue
+ visited.append(element)
+
+ queue.extend(element._dependencies(_Scope.ALL, recurse=False))
+
+ # That looks like a lot, but overall we only traverse (part
+ # of) the graph twice. This could be reduced to once if we
+ # kept track of parent elements, but is probably not
+ # significant.
+
+ # Ensure that we return elements in the same order they were
+ # in before.
+ return [element for element in elements if element in visited]
+
+
+# assert_consistent()
+#
+# Asserts that the given list of elements are in a consistent state, that
+# is to say that all sources are consistent and can at least be fetched.
+#
+# Consequently it also means that cache keys can be resolved.
+#
+# Args:
+# context: The invocation context
+# elements: The elements to assert consistency on
+#
+# Raises:
+# PipelineError: If the elements are inconsistent.
+#
+def assert_consistent(context: Context, elements: List[Element]) -> None:
+ inconsistent = []
+ inconsistent_workspaced = []
+ with context.messenger.timed_activity("Checking sources"):
+ for element in elements:
+ if not element._has_all_sources_resolved():
+ if element._get_workspace():
+ inconsistent_workspaced.append(element)
+ else:
+ inconsistent.append(element)
+
+ if inconsistent:
+ detail = "Exact versions are missing for the following elements:\n\n"
+ for element in inconsistent:
+ detail += " Element: {} is inconsistent\n".format(element._get_full_name())
+ for source in element.sources():
+ if not source.is_resolved():
+ detail += " {} is missing ref\n".format(source)
+ detail += "\n"
+ detail += "Try tracking these elements first with `bst source track`\n"
+ raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline")
+
+ if inconsistent_workspaced:
+ detail = "Some workspaces exist but are not closed\n" + "Try closing them with `bst workspace close`\n\n"
+ for element in inconsistent_workspaced:
+ detail += " " + element._get_full_name() + "\n"
+ raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline-workspaced")
+
+
+# assert_sources_cached()
+#
+# Asserts that sources for the given list of elements are cached.
+#
+# Args:
+# context: The invocation context
+# elements: The elements to assert cached source state for
+#
+# Raises:
+# PipelineError: If the elements have uncached sources
+#
+def assert_sources_cached(context: Context, elements: List[Element]):
+ uncached = []
+ with context.messenger.timed_activity("Checking sources"):
+ for element in elements:
+ if element._fetch_needed():
+ uncached.append(element)
+
+ if uncached:
+ detail = "Sources are not cached for the following elements:\n\n"
+ for element in uncached:
+ detail += " Following sources for element: {} are not cached:\n".format(element._get_full_name())
+ for source in element.sources():
+ if not source._is_cached():
+ detail += " {}\n".format(source)
+ detail += "\n"
+ detail += (
+ "Try fetching these elements first with `bst source fetch`,\n"
+ + "or run this command with `--fetch` option\n"
+ )
+ raise PipelineError("Uncached sources", detail=detail, reason="uncached-sources")
# _Planner()