summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan van Berkom <tristan@codethink.co.uk>2020-12-23 17:21:08 +0900
committerTristan van Berkom <tristan@codethink.co.uk>2020-12-23 17:37:53 +0900
commit5ccacfc8cea692dc0296c2f335c8129656a3d450 (patch)
tree904a140c21f073aca49acaccf6aa971efe5fdcb2
parent082a5ca10195a37548197183de5c5ce89f1fb73b (diff)
downloadbuildstream-5ccacfc8cea692dc0296c2f335c8129656a3d450.tar.gz
_pipeline.py/_stream.py: Remove Pipeline object
This removes the stateful Pipeline object and leaves behind only a toolbox of functions for constructing element lists, such as _pipeline.get_selection() and _pipeline.except_elements(), and some helpers for asserting element states on lists of elements. This makes it easier for Stream to manage it's own internal state, so that Stream can more easily decide to operate without hard requiring a Project instance be available. This also adds type annotations to the new version of _pipeline.py.
-rw-r--r--src/buildstream/_pipeline.py466
-rw-r--r--src/buildstream/_stream.py35
2 files changed, 269 insertions, 232 deletions
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py
index d365fa85a..e1e6dcf39 100644
--- a/src/buildstream/_pipeline.py
+++ b/src/buildstream/_pipeline.py
@@ -1,5 +1,5 @@
#
-# Copyright (C) 2016-2018 Codethink Limited
+# Copyright (C) 2016-2020 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -20,234 +20,274 @@
# Tristan Maat <tristan.maat@codethink.co.uk>
import itertools
-from operator import itemgetter
-from collections import OrderedDict
+from collections import OrderedDict
+from operator import itemgetter
+from typing import List, Iterator
from pyroaring import BitMap # pylint: disable=no-name-in-module
-from ._exceptions import PipelineError
+from .element import Element
from .types import _PipelineSelection, _Scope
+from ._context import Context
+from ._exceptions import PipelineError
+
-# Pipeline()
+# dependencies()
+#
+# Generator function to iterate over the dependencies of multiple
+# targets in the specified scope, while guaranteeing that a given
+# element is never yielded more than once.
#
# Args:
-# project (Project): The Project object
-# context (Context): The Context object
-# artifacts (Context): The ArtifactCache object
+# targets: The target Elements to loop over
+# scope: An integer value from the _Scope enum, the scope to iterate over
+# recurse: Whether to recurse into dependencies
#
-class Pipeline:
- def __init__(self, context, project, artifacts):
- self._context = context # The Context
- self._project = project # The toplevel project
- self._artifacts = artifacts # The artifact cache
+# Yields:
+# Elements in the scope of the specified target elements
+#
+def dependencies(targets: List[Element], scope: int, *, recurse: bool = True) -> Iterator[Element]:
+ # Keep track of 'visited' in this scope, so that all targets
+ # share the same context.
+ visited = (BitMap(), BitMap())
- # dependencies()
- #
- # Generator function to iterate over elements and optionally
- # also iterate over sources.
- #
- # Args:
- # targets (list of Element): The target Elements to loop over
- # scope (_Scope): The scope to iterate over
- # recurse (bool): Whether to recurse into dependencies
- #
- def dependencies(self, targets, scope, *, recurse=True):
- # Keep track of 'visited' in this scope, so that all targets
- # share the same context.
- visited = (BitMap(), BitMap())
+ for target in targets:
+ for element in target._dependencies(scope, recurse=recurse, visited=visited):
+ yield element
- for target in targets:
- for element in target._dependencies(scope, recurse=recurse, visited=visited):
- yield element
- # plan()
- #
- # Generator function to iterate over only the elements
- # which are required to build the pipeline target, omitting
- # cached elements. The elements are yielded in a depth sorted
- # ordering for optimal build plans
- #
- # Args:
- # elements (list of Element): List of target elements to plan
- #
- # Returns:
- # (list of Element): A depth sorted list of the build plan
- #
- def plan(self, elements):
+# get_selection()
+#
+# Gets a full list of elements based on a toplevel
+# list of element targets
+#
+# Various commands define a --deps option to specify what elements to
+# use in the result, this function reports a list that is appropriate for
+# the selected option.
+#
+# Args:
+# context: The invocation context
+# targets: The target Elements
+# mode: A value from PipelineSelection enumeration
+# silent: Whether to silence messages
+#
+# Returns:
+# A list of Elements appropriate for the specified selection mode
+#
+def get_selection(context: Context, targets: List[Element], mode: str, *, silent: bool = True) -> List[Element]:
+ def redirect_and_log() -> List[Element]:
+ # Redirect and log if permitted
+ elements: List[Element] = []
+ for t in targets:
+ new_elm = t._get_source_element()
+ if new_elm != t and not silent:
+ context.messenger.info("Element '{}' redirected to '{}'".format(t.name, new_elm.name))
+ if new_elm not in elements:
+ elements.append(new_elm)
+ return elements
+
+ def plan() -> List[Element]:
# Keep locally cached elements in the plan if remote artifact cache is used
# to allow pulling artifact with strict cache key, if available.
- plan_cached = not self._context.get_strict() and self._artifacts.has_fetch_remotes()
+ plan_cached = not context.get_strict() and context.artifactcache.has_fetch_remotes()
+ return _Planner().plan(targets, plan_cached)
- return _Planner().plan(elements, plan_cached)
-
- # get_selection()
- #
- # Gets a full list of elements based on a toplevel
- # list of element targets
- #
- # Args:
- # targets (list of Element): The target Elements
- # mode (_PipelineSelection): The PipelineSelection mode
- #
- # Various commands define a --deps option to specify what elements to
- # use in the result, this function reports a list that is appropriate for
- # the selected option.
- #
- def get_selection(self, targets, mode, *, silent=True):
- def redirect_and_log():
- # Redirect and log if permitted
- elements = []
- for t in targets:
- new_elm = t._get_source_element()
- if new_elm != t and not silent:
- self._context.messenger.info("Element '{}' redirected to '{}'".format(t.name, new_elm.name))
- if new_elm not in elements:
- elements.append(new_elm)
- return elements
-
- # Work around python not having a switch statement; this is
- # much clearer than the if/elif/else block we used to have.
- #
- # Note that the lambda is necessary so that we don't evaluate
- # all possible values at run time; that would be slow.
- return {
- _PipelineSelection.NONE: lambda: targets,
- _PipelineSelection.REDIRECT: redirect_and_log,
- _PipelineSelection.PLAN: lambda: self.plan(targets),
- _PipelineSelection.ALL: lambda: list(self.dependencies(targets, _Scope.ALL)),
- _PipelineSelection.BUILD: lambda: list(self.dependencies(targets, _Scope.BUILD)),
- _PipelineSelection.RUN: lambda: list(self.dependencies(targets, _Scope.RUN)),
- }[mode]()
-
- # except_elements():
- #
- # Return what we are left with after the intersection between
- # excepted and target elements and their unique dependencies is
- # gone.
- #
- # Args:
- # targets (list of Element): List of toplevel targetted elements
- # elements (list of Element): The list to remove elements from
- # except_targets (list of Element): List of toplevel except targets
- #
- # Returns:
- # (list of Element): The elements list with the intersected
- # exceptions removed
- #
- def except_elements(self, targets, elements, except_targets):
- if not except_targets:
- return elements
-
- targeted = list(self.dependencies(targets, _Scope.ALL))
- visited = []
-
- def find_intersection(element):
- if element in visited:
- return
- visited.append(element)
-
- # Intersection elements are those that are also in
- # 'targeted', as long as we don't recurse into them.
- if element in targeted:
- yield element
- else:
- for dep in element._dependencies(_Scope.ALL, recurse=False):
- yield from find_intersection(dep)
-
- # Build a list of 'intersection' elements, i.e. the set of
- # elements that lie on the border closest to excepted elements
- # between excepted and target elements.
- intersection = list(itertools.chain.from_iterable(find_intersection(element) for element in except_targets))
-
- # Now use this set of elements to traverse the targeted
- # elements, except 'intersection' elements and their unique
- # dependencies.
- queue = []
- visited = []
-
- queue.extend(targets)
- while queue:
- element = queue.pop()
- if element in visited or element in intersection:
- continue
- visited.append(element)
-
- queue.extend(element._dependencies(_Scope.ALL, recurse=False))
-
- # That looks like a lot, but overall we only traverse (part
- # of) the graph twice. This could be reduced to once if we
- # kept track of parent elements, but is probably not
- # significant.
-
- # Ensure that we return elements in the same order they were
- # in before.
- return [element for element in elements if element in visited]
-
- # assert_consistent()
- #
- # Asserts that the given list of elements are in a consistent state, that
- # is to say that all sources are consistent and can at least be fetched.
+ # Work around python not having a switch statement; this is
+ # much clearer than the if/elif/else block we used to have.
#
- # Consequently it also means that cache keys can be resolved.
- #
- def assert_consistent(self, elements):
- inconsistent = []
- inconsistent_workspaced = []
- with self._context.messenger.timed_activity("Checking sources"):
- for element in elements:
- if not element._has_all_sources_resolved():
- if element._get_workspace():
- inconsistent_workspaced.append(element)
- else:
- inconsistent.append(element)
-
- if inconsistent:
- detail = "Exact versions are missing for the following elements:\n\n"
- for element in inconsistent:
- detail += " Element: {} is inconsistent\n".format(element._get_full_name())
- for source in element.sources():
- if not source.is_resolved():
- detail += " {} is missing ref\n".format(source)
- detail += "\n"
- detail += "Try tracking these elements first with `bst source track`\n"
-
- raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline")
-
- if inconsistent_workspaced:
- detail = "Some workspaces exist but are not closed\n" + "Try closing them with `bst workspace close`\n\n"
- for element in inconsistent_workspaced:
- detail += " " + element._get_full_name() + "\n"
- raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline-workspaced")
-
- # assert_sources_cached()
- #
- # Asserts that sources for the given list of elements are cached.
- #
- # Args:
- # elements (list): The list of elements
- #
- def assert_sources_cached(self, elements):
- uncached = []
- with self._context.messenger.timed_activity("Checking sources"):
- for element in elements:
- if element._fetch_needed():
- uncached.append(element)
-
- if uncached:
- detail = "Sources are not cached for the following elements:\n\n"
- for element in uncached:
- detail += " Following sources for element: {} are not cached:\n".format(element._get_full_name())
- for source in element.sources():
- if not source._is_cached():
- detail += " {}\n".format(source)
- detail += "\n"
- detail += (
- "Try fetching these elements first with `bst source fetch`,\n"
- + "or run this command with `--fetch` option\n"
- )
-
- raise PipelineError("Uncached sources", detail=detail, reason="uncached-sources")
+ # Note that the lambda is necessary so that we don't evaluate
+ # all possible values at run time; that would be slow.
+ return {
+ _PipelineSelection.NONE: lambda: targets,
+ _PipelineSelection.REDIRECT: redirect_and_log,
+ _PipelineSelection.PLAN: plan,
+ _PipelineSelection.ALL: lambda: list(dependencies(targets, _Scope.ALL)),
+ _PipelineSelection.BUILD: lambda: list(dependencies(targets, _Scope.BUILD)),
+ _PipelineSelection.RUN: lambda: list(dependencies(targets, _Scope.RUN)),
+ }[mode]()
+
+
+# except_elements():
+#
+# This function calculates the intersection of the `except_targets`
+# element dependencies and the `targets` dependencies, and removes
+# that intersection from the `elements` list, returning the result.
+#
+# Args:
+# targets: List of toplevel targetted elements
+# elements: The list to remove elements from
+# except_targets: List of toplevel except targets
+#
+# Returns:
+# The elements list with the intersected exceptions removed
+#
+# Important notes on the behavior
+# ===============================
+#
+# * Except elements can be completely outside of the scope
+# of targets.
+#
+# * When the dependencies of except elements intersect with
+# dependencies of targets, those dependencies are removed
+# from the result.
+#
+# * If a target is found within the intersection of excepted
+# elements, that target and it's dependencies are considered
+# exempt from the exception intersection.
+#
+# Example:
+#
+# (t1) (e1)
+# / \ /
+# (o) (o) ( )
+# / \ / \
+# (o) (x) ( )
+# \ / \
+# (o) (x) ( )
+# \ /
+# (x)
+# / \
+# (x) (t2)
+# / \ / \
+# (x) (x) (o)
+# / \
+# (o) (o)
+#
+# Here we have a mockup graph with 2 target elements (t1) and (t2),
+# and one except element (e1) which lies outside of the graph.
+#
+# - ( ) elements are ignored, they were never in the element list
+# - (o) elements will be included in the result
+# - (x) elements are removed from the graph
+#
+# Note how (t2) reintroduces portions of the graph which were otherwise
+# tainted by being depended on indirectly by the (e1) except element.
+#
+def except_elements(targets: List[Element], elements: List[Element], except_targets: List[Element]) -> List[Element]:
+ if not except_targets:
+ return elements
+
+ targeted: List[Element] = list(dependencies(targets, _Scope.ALL))
+ visited: List[Element] = []
+
+ def find_intersection(element: Element) -> Iterator[Element]:
+ if element in visited:
+ return
+ visited.append(element)
+
+ # Intersection elements are those that are also in
+ # 'targeted', as long as we don't recurse into them.
+ if element in targeted:
+ yield element
+ else:
+ for dep in element._dependencies(_Scope.ALL, recurse=False):
+ yield from find_intersection(dep)
+
+ # Build a list of 'intersection' elements, i.e. the set of
+ # elements that lie on the border closest to excepted elements
+ # between excepted and target elements.
+ intersection = list(itertools.chain.from_iterable(find_intersection(element) for element in except_targets))
+
+ # Now use this set of elements to traverse the targeted
+ # elements, except 'intersection' elements and their unique
+ # dependencies.
+ queue = []
+ visited = []
+
+ queue.extend(targets)
+ while queue:
+ element = queue.pop()
+ if element in visited or element in intersection:
+ continue
+ visited.append(element)
+
+ queue.extend(element._dependencies(_Scope.ALL, recurse=False))
+
+ # That looks like a lot, but overall we only traverse (part
+ # of) the graph twice. This could be reduced to once if we
+ # kept track of parent elements, but is probably not
+ # significant.
+
+ # Ensure that we return elements in the same order they were
+ # in before.
+ return [element for element in elements if element in visited]
+
+
+# assert_consistent()
+#
+# Asserts that the given list of elements are in a consistent state, that
+# is to say that all sources are consistent and can at least be fetched.
+#
+# Consequently it also means that cache keys can be resolved.
+#
+# Args:
+# context: The invocation context
+# elements: The elements to assert consistency on
+#
+# Raises:
+# PipelineError: If the elements are inconsistent.
+#
+def assert_consistent(context: Context, elements: List[Element]) -> None:
+ inconsistent = []
+ inconsistent_workspaced = []
+ with context.messenger.timed_activity("Checking sources"):
+ for element in elements:
+ if not element._has_all_sources_resolved():
+ if element._get_workspace():
+ inconsistent_workspaced.append(element)
+ else:
+ inconsistent.append(element)
+
+ if inconsistent:
+ detail = "Exact versions are missing for the following elements:\n\n"
+ for element in inconsistent:
+ detail += " Element: {} is inconsistent\n".format(element._get_full_name())
+ for source in element.sources():
+ if not source.is_resolved():
+ detail += " {} is missing ref\n".format(source)
+ detail += "\n"
+ detail += "Try tracking these elements first with `bst source track`\n"
+ raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline")
+
+ if inconsistent_workspaced:
+ detail = "Some workspaces exist but are not closed\n" + "Try closing them with `bst workspace close`\n\n"
+ for element in inconsistent_workspaced:
+ detail += " " + element._get_full_name() + "\n"
+ raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline-workspaced")
+
+
+# assert_sources_cached()
+#
+# Asserts that sources for the given list of elements are cached.
+#
+# Args:
+# context: The invocation context
+# elements: The elements to assert cached source state for
+#
+# Raises:
+# PipelineError: If the elements have uncached sources
+#
+def assert_sources_cached(context: Context, elements: List[Element]):
+ uncached = []
+ with context.messenger.timed_activity("Checking sources"):
+ for element in elements:
+ if element._fetch_needed():
+ uncached.append(element)
+
+ if uncached:
+ detail = "Sources are not cached for the following elements:\n\n"
+ for element in uncached:
+ detail += " Following sources for element: {} are not cached:\n".format(element._get_full_name())
+ for source in element.sources():
+ if not source._is_cached():
+ detail += " {}\n".format(source)
+ detail += "\n"
+ detail += (
+ "Try fetching these elements first with `bst source fetch`,\n"
+ + "or run this command with `--fetch` option\n"
+ )
+ raise PipelineError("Uncached sources", detail=detail, reason="uncached-sources")
# _Planner()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 09735678b..35cb230ed 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1,5 +1,5 @@
#
-# Copyright (C) 2018 Codethink Limited
+# Copyright (C) 2020 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -45,13 +45,12 @@ from ._scheduler import (
ArtifactPushQueue,
)
from .element import Element
-from ._pipeline import Pipeline
from ._profile import Topics, PROFILER
from ._project import ProjectRefStorage
from ._state import State
from .types import _KeyStrength, _PipelineSelection, _Scope
from .plugin import Plugin
-from . import utils, _yaml, _site
+from . import utils, _yaml, _site, _pipeline
# Stream()
@@ -86,7 +85,6 @@ class Stream:
self._elementsourcescache = None
self._sourcecache = None
self._project = None
- self._pipeline = None
self._state = State(session_start) # Owned by Stream, used by Core to set state
self._notification_queue = deque()
@@ -127,7 +125,6 @@ class Stream:
assert self._project is None
self._project = project
self._project.load_context.set_fetch_subprojects(self._fetch_subprojects)
- self._pipeline = Pipeline(self._context, project, self._artifacts)
# load_selection()
#
@@ -220,7 +217,7 @@ class Stream:
self._enqueue_plan(plan)
self._run()
- missing_deps = [dep for dep in self._pipeline.dependencies([element], scope) if not dep._cached()]
+ missing_deps = [dep for dep in _pipeline.dependencies([element], scope) if not dep._cached()]
if missing_deps:
raise StreamError(
"Elements need to be built or downloaded before staging a shell environment",
@@ -250,7 +247,7 @@ class Stream:
# Ensure we have our sources if we are launching a build shell
if scope == _Scope.BUILD and not usebuildtree:
self._fetch([element])
- self._pipeline.assert_sources_cached([element])
+ _pipeline.assert_sources_cached(self._context, [element])
return element._shell(
scope, mounts=mounts, isolate=isolate, prompt=prompt(element), command=command, usebuildtree=usebuildtree
@@ -286,7 +283,7 @@ class Stream:
)
# Assert that the elements are consistent
- self._pipeline.assert_consistent(elements)
+ _pipeline.assert_consistent(self._context, elements)
if all(project.remote_execution_specs for project in self._context.get_projects()):
# Remote execution is configured for all projects.
@@ -411,7 +408,7 @@ class Stream:
if not self._sourcecache.has_push_remotes():
raise StreamError("No source caches available for pushing sources")
- self._pipeline.assert_consistent(elements)
+ _pipeline.assert_consistent(self._context, elements)
self._add_queue(FetchQueue(self._scheduler))
@@ -451,7 +448,7 @@ class Stream:
if not self._artifacts.has_fetch_remotes():
raise StreamError("No artifact caches available for pulling artifacts")
- self._pipeline.assert_consistent(elements)
+ _pipeline.assert_consistent(self._context, elements)
self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(elements)
@@ -492,7 +489,7 @@ class Stream:
if not self._artifacts.has_push_remotes():
raise StreamError("No artifact caches available for pushing artifacts")
- self._pipeline.assert_consistent(elements)
+ _pipeline.assert_consistent(self._context, elements)
self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
@@ -748,7 +745,7 @@ class Stream:
# Assert all sources are cached in the source dir
self._fetch(elements)
- self._pipeline.assert_sources_cached(elements)
+ _pipeline.assert_sources_cached(self._context, elements)
# Stage all sources determined by scope
try:
@@ -1303,11 +1300,11 @@ class Stream:
track_selected = []
for project, project_elements in track_projects.items():
- selected = self._pipeline.get_selection(project_elements, selection)
+ selected = _pipeline.get_selection(self._context, project_elements, selection)
selected = self._track_cross_junction_filter(project, selected, cross_junctions)
track_selected.extend(selected)
- return self._pipeline.except_elements(elements, track_selected, except_elements)
+ return _pipeline.except_elements(elements, track_selected, except_elements)
# _track_cross_junction_filter()
#
@@ -1413,8 +1410,8 @@ class Stream:
# Now move on to loading primary selection.
#
self._resolve_elements(self.targets)
- selected = self._pipeline.get_selection(self.targets, selection, silent=False)
- selected = self._pipeline.except_elements(self.targets, selected, except_elements)
+ selected = _pipeline.get_selection(self._context, self.targets, selection, silent=False)
+ selected = _pipeline.except_elements(self.targets, selected, except_elements)
if selection == _PipelineSelection.PLAN and dynamic_plan:
# We use a dynamic build plan, only request artifacts of top-level targets,
@@ -1446,7 +1443,7 @@ class Stream:
# to happen, even for large projects (tested with the Debian stack). Although,
# if it does become a problem we may have to set the recursion limit to a
# greater value.
- for element in self._pipeline.dependencies(targets, _Scope.ALL):
+ for element in _pipeline.dependencies(targets, _Scope.ALL):
# Determine initial element state.
element._initialize_state()
@@ -1501,7 +1498,7 @@ class Stream:
# Inform the frontend of the full list of elements
# and the list of elements which will be processed in this run
#
- self.total_elements = list(self._pipeline.dependencies(self.targets, _Scope.ALL))
+ self.total_elements = list(_pipeline.dependencies(self.targets, _Scope.ALL))
if announce_session and self._session_start_callback is not None:
self._session_start_callback()
@@ -1528,7 +1525,7 @@ class Stream:
def _fetch(self, elements: List[Element], *, fetch_original: bool = False, announce_session: bool = False):
# Assert consistency for the fetch elements
- self._pipeline.assert_consistent(elements)
+ _pipeline.assert_consistent(self._context, elements)
# Construct queues, enqueue and run
#