From 2390c81411aee2019cec891ea0be5247e779bc2e Mon Sep 17 00:00:00 2001 From: Tristan Van Berkom Date: Sun, 29 Apr 2018 15:20:09 +0900 Subject: _stream.py: New Stream object, main calling interface for BuildStream core This is the first part of the pipeline refactor, at this stage all calling interfaces remain the same, except that invocation of the scheduler has been moved from Pipline to Stream. --- buildstream/_frontend/app.py | 13 +++++++++---- buildstream/_frontend/cli.py | 16 ++++++++-------- buildstream/_frontend/status.py | 14 ++++++++------ buildstream/_frontend/widget.py | 8 ++++---- 4 files changed, 29 insertions(+), 22 deletions(-) (limited to 'buildstream/_frontend') diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py index 3bcb0d962..84c33a385 100644 --- a/buildstream/_frontend/app.py +++ b/buildstream/_frontend/app.py @@ -39,6 +39,7 @@ from .._context import Context from .._project import Project from .._exceptions import BstError, PipelineError, LoadError, LoadErrorReason, AppError from .._message import Message, MessageType, unconditional_messages +from .._stream import Stream from .._pipeline import Pipeline, PipelineSelection from .._scheduler import Scheduler from .._profile import Topics, profile_start, profile_end @@ -69,6 +70,7 @@ class App(): # Public members # self.context = None # The Context object + self.stream = None # The Stream object self.project = None # The toplevel Project object self.scheduler = None # The Scheduler self.pipeline = None # The Pipeline @@ -255,11 +257,12 @@ class App(): track_cross_junctions=False, track_selection=PipelineSelection.ALL, fetch_subprojects=False): - profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements)) # Start with the early stage init, this enables logging right away with self.partially_initialized(fetch_subprojects=fetch_subprojects): + profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements)) + # Mark the beginning of the session if session_name: self._message(MessageType.START, session_name) @@ -280,7 +283,7 @@ class App(): # Create our status printer, only available in interactive self._status = Status(self._content_profile, self._format_profile, self._success_profile, self._error_profile, - self.pipeline, self.scheduler, + self.stream, self.pipeline, self.scheduler, colors=self.colors) # Initialize pipeline @@ -293,6 +296,8 @@ class App(): except BstError as e: self._error_exit(e, "Error initializing pipeline") + self.stream = Stream(self.context, self.scheduler, self.pipeline) + # Pipeline is loaded, now we can tell the logger about it self.logger.size_request(self.pipeline) @@ -476,7 +481,7 @@ class App(): # If we're going to checkout, we need at least a fetch, # if we were asked to track first, we're going to fetch anyway. if not no_checkout or track_first: - self.pipeline.fetch(self.scheduler, [target]) + self.stream.fetch(self.scheduler, [target]) if not no_checkout and target._get_consistency() != Consistency.CACHED: raise PipelineError("Could not stage uncached source. " + @@ -751,7 +756,7 @@ class App(): # def _print_summary(self): click.echo("", err=True) - self.logger.print_summary(self.pipeline, self.scheduler, + self.logger.print_summary(self.stream, self.scheduler, self._main_options['log_file'], styling=self.colors) diff --git a/buildstream/_frontend/cli.py b/buildstream/_frontend/cli.py index 5f9e2751e..11a0ca2cc 100644 --- a/buildstream/_frontend/cli.py +++ b/buildstream/_frontend/cli.py @@ -243,7 +243,7 @@ def build(app, elements, all_, track_, track_save, track_all, track_except, trac use_configured_remote_caches=True, track_elements=track_, track_cross_junctions=track_cross_junctions, fetch_subprojects=True): - app.pipeline.build(app.scheduler, build_all=all_) + app.stream.build(app.scheduler, build_all=all_) ################################################################## @@ -287,7 +287,7 @@ def fetch(app, elements, deps, track_, except_, track_cross_junctions): track_cross_junctions=track_cross_junctions, fetch_subprojects=True): dependencies = app.pipeline.get_selection(deps) - app.pipeline.fetch(app.scheduler, dependencies) + app.stream.fetch(app.scheduler, dependencies) ################################################################## @@ -323,7 +323,7 @@ def track(app, elements, deps, except_, cross_junctions): track_cross_junctions=cross_junctions, track_selection=deps, fetch_subprojects=True): - app.pipeline.track(app.scheduler) + app.stream.track(app.scheduler) ################################################################## @@ -354,7 +354,7 @@ def pull(app, elements, deps, remote): with app.initialized(elements, session_name="Pull", use_configured_remote_caches=(remote is None), add_remote_cache=remote, fetch_subprojects=True): to_pull = app.pipeline.get_selection(deps) - app.pipeline.pull(app.scheduler, to_pull) + app.stream.pull(app.scheduler, to_pull) ################################################################## @@ -385,7 +385,7 @@ def push(app, elements, deps, remote): use_configured_remote_caches=(remote is None), add_remote_cache=remote, fetch_subprojects=True): to_push = app.pipeline.get_selection(deps) - app.pipeline.push(app.scheduler, to_push) + app.stream.push(app.scheduler, to_push) ################################################################## @@ -564,7 +564,7 @@ def checkout(app, element, directory, force, integrate, hardlinks): """Checkout a built artifact to the specified directory """ with app.initialized((element,)): - app.pipeline.checkout(directory, force, integrate, hardlinks) + app.stream.checkout(directory, force, integrate, hardlinks) ################################################################## @@ -592,8 +592,8 @@ def source_bundle(app, target, force, directory, """ with app.initialized((target,), rewritable=track_, track_elements=[target] if track_ else None): dependencies = app.pipeline.get_selection('all') - app.pipeline.source_bundle(app.scheduler, dependencies, force, track_, - compression, directory) + app.stream.source_bundle(app.scheduler, dependencies, force, track_, + compression, directory) ################################################################## diff --git a/buildstream/_frontend/status.py b/buildstream/_frontend/status.py index 9bb2b644f..4f3eed0f5 100644 --- a/buildstream/_frontend/status.py +++ b/buildstream/_frontend/status.py @@ -37,6 +37,7 @@ from .widget import TimeCode # format_profile (Profile): Formatting profile for formatting text # success_profile (Profile): Formatting profile for success text # error_profile (Profile): Formatting profile for error text +# stream (Stream): The Stream # pipeline (Pipeline): The Pipeline # scheduler (Scheduler): The Scheduler # colors (bool): Whether to print the ANSI color codes in the output @@ -45,13 +46,12 @@ class Status(): def __init__(self, content_profile, format_profile, success_profile, error_profile, - pipeline, scheduler, colors=False): + stream, pipeline, scheduler, colors=False): self._content_profile = content_profile self._format_profile = format_profile self._success_profile = success_profile self._error_profile = error_profile - self._pipeline = pipeline self._scheduler = scheduler self._jobs = [] self._last_lines = 0 # Number of status lines we last printed to console @@ -60,7 +60,7 @@ class Status(): self._colors = colors self._header = _StatusHeader(content_profile, format_profile, success_profile, error_profile, - pipeline, scheduler) + stream, pipeline, scheduler) self._term_width, _ = click.get_terminal_size() self._alloc_lines = 0 @@ -246,6 +246,7 @@ class Status(): # format_profile (Profile): Formatting profile for formatting text # success_profile (Profile): Formatting profile for success text # error_profile (Profile): Formatting profile for error text +# stream (Stream): The Stream # pipeline (Pipeline): The Pipeline # scheduler (Scheduler): The Scheduler # @@ -253,7 +254,7 @@ class _StatusHeader(): def __init__(self, content_profile, format_profile, success_profile, error_profile, - pipeline, scheduler): + stream, pipeline, scheduler): # # Public members @@ -267,6 +268,7 @@ class _StatusHeader(): self._format_profile = format_profile self._success_profile = success_profile self._error_profile = error_profile + self._stream = stream self._pipeline = pipeline self._scheduler = scheduler self._time_code = TimeCode(content_profile, format_profile) @@ -276,8 +278,8 @@ class _StatusHeader(): size = 0 text = '' - session = str(self._pipeline.session_elements) - total = str(self._pipeline.total_elements) + session = str(self._stream.session_elements) + total = str(self._stream.total_elements) # Format and calculate size for pipeline target and overall time code size += len(total) + len(session) + 4 # Size for (N/N) with a leading space diff --git a/buildstream/_frontend/widget.py b/buildstream/_frontend/widget.py index b5942b91e..a72293b04 100644 --- a/buildstream/_frontend/widget.py +++ b/buildstream/_frontend/widget.py @@ -530,12 +530,12 @@ class LogLine(Widget): # Print a summary of activities at the end of a session # # Args: - # pipeline (Pipeline): The Pipeline + # stream (Stream): The Stream # scheduler (Scheduler): The Scheduler # log_file (file): An optional file handle for additional logging # styling (bool): Whether to enable ansi escape codes in the output # - def print_summary(self, pipeline, scheduler, log_file, styling=False): + def print_summary(self, stream, scheduler, log_file, styling=False): # Early silent return if there are no queues, can happen # only in the case that the pipeline early returned due to @@ -563,8 +563,8 @@ class LogLine(Widget): text += self.content_profile.fmt("Pipeline Summary\n", bold=True) values = OrderedDict() - values['Total'] = self.content_profile.fmt(str(pipeline.total_elements)) - values['Session'] = self.content_profile.fmt(str(pipeline.session_elements)) + values['Total'] = self.content_profile.fmt(str(stream.total_elements)) + values['Session'] = self.content_profile.fmt(str(stream.session_elements)) processed_maxlen = 1 skipped_maxlen = 1 -- cgit v1.2.1