summaryrefslogtreecommitdiff
path: root/src/buildstream/_stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_stream.py')
-rw-r--r--src/buildstream/_stream.py35
1 files changed, 16 insertions, 19 deletions
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 09735678b..35cb230ed 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1,5 +1,5 @@
#
-# Copyright (C) 2018 Codethink Limited
+# Copyright (C) 2020 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -45,13 +45,12 @@ from ._scheduler import (
ArtifactPushQueue,
)
from .element import Element
-from ._pipeline import Pipeline
from ._profile import Topics, PROFILER
from ._project import ProjectRefStorage
from ._state import State
from .types import _KeyStrength, _PipelineSelection, _Scope
from .plugin import Plugin
-from . import utils, _yaml, _site
+from . import utils, _yaml, _site, _pipeline
# Stream()
@@ -86,7 +85,6 @@ class Stream:
self._elementsourcescache = None
self._sourcecache = None
self._project = None
- self._pipeline = None
self._state = State(session_start) # Owned by Stream, used by Core to set state
self._notification_queue = deque()
@@ -127,7 +125,6 @@ class Stream:
assert self._project is None
self._project = project
self._project.load_context.set_fetch_subprojects(self._fetch_subprojects)
- self._pipeline = Pipeline(self._context, project, self._artifacts)
# load_selection()
#
@@ -220,7 +217,7 @@ class Stream:
self._enqueue_plan(plan)
self._run()
- missing_deps = [dep for dep in self._pipeline.dependencies([element], scope) if not dep._cached()]
+ missing_deps = [dep for dep in _pipeline.dependencies([element], scope) if not dep._cached()]
if missing_deps:
raise StreamError(
"Elements need to be built or downloaded before staging a shell environment",
@@ -250,7 +247,7 @@ class Stream:
# Ensure we have our sources if we are launching a build shell
if scope == _Scope.BUILD and not usebuildtree:
self._fetch([element])
- self._pipeline.assert_sources_cached([element])
+ _pipeline.assert_sources_cached(self._context, [element])
return element._shell(
scope, mounts=mounts, isolate=isolate, prompt=prompt(element), command=command, usebuildtree=usebuildtree
@@ -286,7 +283,7 @@ class Stream:
)
# Assert that the elements are consistent
- self._pipeline.assert_consistent(elements)
+ _pipeline.assert_consistent(self._context, elements)
if all(project.remote_execution_specs for project in self._context.get_projects()):
# Remote execution is configured for all projects.
@@ -411,7 +408,7 @@ class Stream:
if not self._sourcecache.has_push_remotes():
raise StreamError("No source caches available for pushing sources")
- self._pipeline.assert_consistent(elements)
+ _pipeline.assert_consistent(self._context, elements)
self._add_queue(FetchQueue(self._scheduler))
@@ -451,7 +448,7 @@ class Stream:
if not self._artifacts.has_fetch_remotes():
raise StreamError("No artifact caches available for pulling artifacts")
- self._pipeline.assert_consistent(elements)
+ _pipeline.assert_consistent(self._context, elements)
self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(elements)
@@ -492,7 +489,7 @@ class Stream:
if not self._artifacts.has_push_remotes():
raise StreamError("No artifact caches available for pushing artifacts")
- self._pipeline.assert_consistent(elements)
+ _pipeline.assert_consistent(self._context, elements)
self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
@@ -748,7 +745,7 @@ class Stream:
# Assert all sources are cached in the source dir
self._fetch(elements)
- self._pipeline.assert_sources_cached(elements)
+ _pipeline.assert_sources_cached(self._context, elements)
# Stage all sources determined by scope
try:
@@ -1303,11 +1300,11 @@ class Stream:
track_selected = []
for project, project_elements in track_projects.items():
- selected = self._pipeline.get_selection(project_elements, selection)
+ selected = _pipeline.get_selection(self._context, project_elements, selection)
selected = self._track_cross_junction_filter(project, selected, cross_junctions)
track_selected.extend(selected)
- return self._pipeline.except_elements(elements, track_selected, except_elements)
+ return _pipeline.except_elements(elements, track_selected, except_elements)
# _track_cross_junction_filter()
#
@@ -1413,8 +1410,8 @@ class Stream:
# Now move on to loading primary selection.
#
self._resolve_elements(self.targets)
- selected = self._pipeline.get_selection(self.targets, selection, silent=False)
- selected = self._pipeline.except_elements(self.targets, selected, except_elements)
+ selected = _pipeline.get_selection(self._context, self.targets, selection, silent=False)
+ selected = _pipeline.except_elements(self.targets, selected, except_elements)
if selection == _PipelineSelection.PLAN and dynamic_plan:
# We use a dynamic build plan, only request artifacts of top-level targets,
@@ -1446,7 +1443,7 @@ class Stream:
# to happen, even for large projects (tested with the Debian stack). Although,
# if it does become a problem we may have to set the recursion limit to a
# greater value.
- for element in self._pipeline.dependencies(targets, _Scope.ALL):
+ for element in _pipeline.dependencies(targets, _Scope.ALL):
# Determine initial element state.
element._initialize_state()
@@ -1501,7 +1498,7 @@ class Stream:
# Inform the frontend of the full list of elements
# and the list of elements which will be processed in this run
#
- self.total_elements = list(self._pipeline.dependencies(self.targets, _Scope.ALL))
+ self.total_elements = list(_pipeline.dependencies(self.targets, _Scope.ALL))
if announce_session and self._session_start_callback is not None:
self._session_start_callback()
@@ -1528,7 +1525,7 @@ class Stream:
def _fetch(self, elements: List[Element], *, fetch_original: bool = False, announce_session: bool = False):
# Assert consistency for the fetch elements
- self._pipeline.assert_consistent(elements)
+ _pipeline.assert_consistent(self._context, elements)
# Construct queues, enqueue and run
#