diff options
Diffstat (limited to 'src/buildstream/_stream.py')
-rw-r--r-- | src/buildstream/_stream.py | 35 |
1 files changed, 16 insertions, 19 deletions
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 09735678b..35cb230ed 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1,5 +1,5 @@ # -# Copyright (C) 2018 Codethink Limited +# Copyright (C) 2020 Codethink Limited # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -45,13 +45,12 @@ from ._scheduler import ( ArtifactPushQueue, ) from .element import Element -from ._pipeline import Pipeline from ._profile import Topics, PROFILER from ._project import ProjectRefStorage from ._state import State from .types import _KeyStrength, _PipelineSelection, _Scope from .plugin import Plugin -from . import utils, _yaml, _site +from . import utils, _yaml, _site, _pipeline # Stream() @@ -86,7 +85,6 @@ class Stream: self._elementsourcescache = None self._sourcecache = None self._project = None - self._pipeline = None self._state = State(session_start) # Owned by Stream, used by Core to set state self._notification_queue = deque() @@ -127,7 +125,6 @@ class Stream: assert self._project is None self._project = project self._project.load_context.set_fetch_subprojects(self._fetch_subprojects) - self._pipeline = Pipeline(self._context, project, self._artifacts) # load_selection() # @@ -220,7 +217,7 @@ class Stream: self._enqueue_plan(plan) self._run() - missing_deps = [dep for dep in self._pipeline.dependencies([element], scope) if not dep._cached()] + missing_deps = [dep for dep in _pipeline.dependencies([element], scope) if not dep._cached()] if missing_deps: raise StreamError( "Elements need to be built or downloaded before staging a shell environment", @@ -250,7 +247,7 @@ class Stream: # Ensure we have our sources if we are launching a build shell if scope == _Scope.BUILD and not usebuildtree: self._fetch([element]) - self._pipeline.assert_sources_cached([element]) + _pipeline.assert_sources_cached(self._context, [element]) return element._shell( scope, mounts=mounts, isolate=isolate, prompt=prompt(element), command=command, usebuildtree=usebuildtree @@ -286,7 +283,7 @@ class Stream: ) # Assert that the elements are consistent - self._pipeline.assert_consistent(elements) + _pipeline.assert_consistent(self._context, elements) if all(project.remote_execution_specs for project in self._context.get_projects()): # Remote execution is configured for all projects. @@ -411,7 +408,7 @@ class Stream: if not self._sourcecache.has_push_remotes(): raise StreamError("No source caches available for pushing sources") - self._pipeline.assert_consistent(elements) + _pipeline.assert_consistent(self._context, elements) self._add_queue(FetchQueue(self._scheduler)) @@ -451,7 +448,7 @@ class Stream: if not self._artifacts.has_fetch_remotes(): raise StreamError("No artifact caches available for pulling artifacts") - self._pipeline.assert_consistent(elements) + _pipeline.assert_consistent(self._context, elements) self._scheduler.clear_queues() self._add_queue(PullQueue(self._scheduler)) self._enqueue_plan(elements) @@ -492,7 +489,7 @@ class Stream: if not self._artifacts.has_push_remotes(): raise StreamError("No artifact caches available for pushing artifacts") - self._pipeline.assert_consistent(elements) + _pipeline.assert_consistent(self._context, elements) self._scheduler.clear_queues() self._add_queue(PullQueue(self._scheduler)) @@ -748,7 +745,7 @@ class Stream: # Assert all sources are cached in the source dir self._fetch(elements) - self._pipeline.assert_sources_cached(elements) + _pipeline.assert_sources_cached(self._context, elements) # Stage all sources determined by scope try: @@ -1303,11 +1300,11 @@ class Stream: track_selected = [] for project, project_elements in track_projects.items(): - selected = self._pipeline.get_selection(project_elements, selection) + selected = _pipeline.get_selection(self._context, project_elements, selection) selected = self._track_cross_junction_filter(project, selected, cross_junctions) track_selected.extend(selected) - return self._pipeline.except_elements(elements, track_selected, except_elements) + return _pipeline.except_elements(elements, track_selected, except_elements) # _track_cross_junction_filter() # @@ -1413,8 +1410,8 @@ class Stream: # Now move on to loading primary selection. # self._resolve_elements(self.targets) - selected = self._pipeline.get_selection(self.targets, selection, silent=False) - selected = self._pipeline.except_elements(self.targets, selected, except_elements) + selected = _pipeline.get_selection(self._context, self.targets, selection, silent=False) + selected = _pipeline.except_elements(self.targets, selected, except_elements) if selection == _PipelineSelection.PLAN and dynamic_plan: # We use a dynamic build plan, only request artifacts of top-level targets, @@ -1446,7 +1443,7 @@ class Stream: # to happen, even for large projects (tested with the Debian stack). Although, # if it does become a problem we may have to set the recursion limit to a # greater value. - for element in self._pipeline.dependencies(targets, _Scope.ALL): + for element in _pipeline.dependencies(targets, _Scope.ALL): # Determine initial element state. element._initialize_state() @@ -1501,7 +1498,7 @@ class Stream: # Inform the frontend of the full list of elements # and the list of elements which will be processed in this run # - self.total_elements = list(self._pipeline.dependencies(self.targets, _Scope.ALL)) + self.total_elements = list(_pipeline.dependencies(self.targets, _Scope.ALL)) if announce_session and self._session_start_callback is not None: self._session_start_callback() @@ -1528,7 +1525,7 @@ class Stream: def _fetch(self, elements: List[Element], *, fetch_original: bool = False, announce_session: bool = False): # Assert consistency for the fetch elements - self._pipeline.assert_consistent(elements) + _pipeline.assert_consistent(self._context, elements) # Construct queues, enqueue and run # |