summaryrefslogtreecommitdiff
path: root/buildstream/_frontend
diff options
context:
space:
mode:
authorTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-04-29 15:20:09 +0900
committerTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-05-08 03:59:38 +0900
commit2390c81411aee2019cec891ea0be5247e779bc2e (patch)
treebb065bef7e66507e65f0b7564a18ddf1002f44b9 /buildstream/_frontend
parentbbb894bdd921f39e4440e74351b5f478f65555f3 (diff)
downloadbuildstream-2390c81411aee2019cec891ea0be5247e779bc2e.tar.gz
_stream.py: New Stream object, main calling interface for BuildStream core
This is the first part of the pipeline refactor, at this stage all calling interfaces remain the same, except that invocation of the scheduler has been moved from Pipeline to Stream.
Diffstat (limited to 'buildstream/_frontend')
-rw-r--r--buildstream/_frontend/app.py13
-rw-r--r--buildstream/_frontend/cli.py16
-rw-r--r--buildstream/_frontend/status.py14
-rw-r--r--buildstream/_frontend/widget.py8
4 files changed, 29 insertions, 22 deletions
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 3bcb0d962..84c33a385 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -39,6 +39,7 @@ from .._context import Context
from .._project import Project
from .._exceptions import BstError, PipelineError, LoadError, LoadErrorReason, AppError
from .._message import Message, MessageType, unconditional_messages
+from .._stream import Stream
from .._pipeline import Pipeline, PipelineSelection
from .._scheduler import Scheduler
from .._profile import Topics, profile_start, profile_end
@@ -69,6 +70,7 @@ class App():
# Public members
#
self.context = None # The Context object
+ self.stream = None # The Stream object
self.project = None # The toplevel Project object
self.scheduler = None # The Scheduler
self.pipeline = None # The Pipeline
@@ -255,11 +257,12 @@ class App():
track_cross_junctions=False,
track_selection=PipelineSelection.ALL,
fetch_subprojects=False):
- profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
# Start with the early stage init, this enables logging right away
with self.partially_initialized(fetch_subprojects=fetch_subprojects):
+ profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
+
# Mark the beginning of the session
if session_name:
self._message(MessageType.START, session_name)
@@ -280,7 +283,7 @@ class App():
# Create our status printer, only available in interactive
self._status = Status(self._content_profile, self._format_profile,
self._success_profile, self._error_profile,
- self.pipeline, self.scheduler,
+ self.stream, self.pipeline, self.scheduler,
colors=self.colors)
# Initialize pipeline
@@ -293,6 +296,8 @@ class App():
except BstError as e:
self._error_exit(e, "Error initializing pipeline")
+ self.stream = Stream(self.context, self.scheduler, self.pipeline)
+
# Pipeline is loaded, now we can tell the logger about it
self.logger.size_request(self.pipeline)
@@ -476,7 +481,7 @@ class App():
# If we're going to checkout, we need at least a fetch,
# if we were asked to track first, we're going to fetch anyway.
if not no_checkout or track_first:
- self.pipeline.fetch(self.scheduler, [target])
+ self.stream.fetch(self.scheduler, [target])
if not no_checkout and target._get_consistency() != Consistency.CACHED:
raise PipelineError("Could not stage uncached source. " +
@@ -751,7 +756,7 @@ class App():
#
def _print_summary(self):
click.echo("", err=True)
- self.logger.print_summary(self.pipeline, self.scheduler,
+ self.logger.print_summary(self.stream, self.scheduler,
self._main_options['log_file'],
styling=self.colors)
diff --git a/buildstream/_frontend/cli.py b/buildstream/_frontend/cli.py
index 5f9e2751e..11a0ca2cc 100644
--- a/buildstream/_frontend/cli.py
+++ b/buildstream/_frontend/cli.py
@@ -243,7 +243,7 @@ def build(app, elements, all_, track_, track_save, track_all, track_except, trac
use_configured_remote_caches=True, track_elements=track_,
track_cross_junctions=track_cross_junctions,
fetch_subprojects=True):
- app.pipeline.build(app.scheduler, build_all=all_)
+ app.stream.build(app.scheduler, build_all=all_)
##################################################################
@@ -287,7 +287,7 @@ def fetch(app, elements, deps, track_, except_, track_cross_junctions):
track_cross_junctions=track_cross_junctions,
fetch_subprojects=True):
dependencies = app.pipeline.get_selection(deps)
- app.pipeline.fetch(app.scheduler, dependencies)
+ app.stream.fetch(app.scheduler, dependencies)
##################################################################
@@ -323,7 +323,7 @@ def track(app, elements, deps, except_, cross_junctions):
track_cross_junctions=cross_junctions,
track_selection=deps,
fetch_subprojects=True):
- app.pipeline.track(app.scheduler)
+ app.stream.track(app.scheduler)
##################################################################
@@ -354,7 +354,7 @@ def pull(app, elements, deps, remote):
with app.initialized(elements, session_name="Pull", use_configured_remote_caches=(remote is None),
add_remote_cache=remote, fetch_subprojects=True):
to_pull = app.pipeline.get_selection(deps)
- app.pipeline.pull(app.scheduler, to_pull)
+ app.stream.pull(app.scheduler, to_pull)
##################################################################
@@ -385,7 +385,7 @@ def push(app, elements, deps, remote):
use_configured_remote_caches=(remote is None),
add_remote_cache=remote, fetch_subprojects=True):
to_push = app.pipeline.get_selection(deps)
- app.pipeline.push(app.scheduler, to_push)
+ app.stream.push(app.scheduler, to_push)
##################################################################
@@ -564,7 +564,7 @@ def checkout(app, element, directory, force, integrate, hardlinks):
"""Checkout a built artifact to the specified directory
"""
with app.initialized((element,)):
- app.pipeline.checkout(directory, force, integrate, hardlinks)
+ app.stream.checkout(directory, force, integrate, hardlinks)
##################################################################
@@ -592,8 +592,8 @@ def source_bundle(app, target, force, directory,
"""
with app.initialized((target,), rewritable=track_, track_elements=[target] if track_ else None):
dependencies = app.pipeline.get_selection('all')
- app.pipeline.source_bundle(app.scheduler, dependencies, force, track_,
- compression, directory)
+ app.stream.source_bundle(app.scheduler, dependencies, force, track_,
+ compression, directory)
##################################################################
diff --git a/buildstream/_frontend/status.py b/buildstream/_frontend/status.py
index 9bb2b644f..4f3eed0f5 100644
--- a/buildstream/_frontend/status.py
+++ b/buildstream/_frontend/status.py
@@ -37,6 +37,7 @@ from .widget import TimeCode
# format_profile (Profile): Formatting profile for formatting text
# success_profile (Profile): Formatting profile for success text
# error_profile (Profile): Formatting profile for error text
+# stream (Stream): The Stream
# pipeline (Pipeline): The Pipeline
# scheduler (Scheduler): The Scheduler
# colors (bool): Whether to print the ANSI color codes in the output
@@ -45,13 +46,12 @@ class Status():
def __init__(self, content_profile, format_profile,
success_profile, error_profile,
- pipeline, scheduler, colors=False):
+ stream, pipeline, scheduler, colors=False):
self._content_profile = content_profile
self._format_profile = format_profile
self._success_profile = success_profile
self._error_profile = error_profile
- self._pipeline = pipeline
self._scheduler = scheduler
self._jobs = []
self._last_lines = 0 # Number of status lines we last printed to console
@@ -60,7 +60,7 @@ class Status():
self._colors = colors
self._header = _StatusHeader(content_profile, format_profile,
success_profile, error_profile,
- pipeline, scheduler)
+ stream, pipeline, scheduler)
self._term_width, _ = click.get_terminal_size()
self._alloc_lines = 0
@@ -246,6 +246,7 @@ class Status():
# format_profile (Profile): Formatting profile for formatting text
# success_profile (Profile): Formatting profile for success text
# error_profile (Profile): Formatting profile for error text
+# stream (Stream): The Stream
# pipeline (Pipeline): The Pipeline
# scheduler (Scheduler): The Scheduler
#
@@ -253,7 +254,7 @@ class _StatusHeader():
def __init__(self, content_profile, format_profile,
success_profile, error_profile,
- pipeline, scheduler):
+ stream, pipeline, scheduler):
#
# Public members
@@ -267,6 +268,7 @@ class _StatusHeader():
self._format_profile = format_profile
self._success_profile = success_profile
self._error_profile = error_profile
+ self._stream = stream
self._pipeline = pipeline
self._scheduler = scheduler
self._time_code = TimeCode(content_profile, format_profile)
@@ -276,8 +278,8 @@ class _StatusHeader():
size = 0
text = ''
- session = str(self._pipeline.session_elements)
- total = str(self._pipeline.total_elements)
+ session = str(self._stream.session_elements)
+ total = str(self._stream.total_elements)
# Format and calculate size for pipeline target and overall time code
size += len(total) + len(session) + 4 # Size for (N/N) with a leading space
diff --git a/buildstream/_frontend/widget.py b/buildstream/_frontend/widget.py
index b5942b91e..a72293b04 100644
--- a/buildstream/_frontend/widget.py
+++ b/buildstream/_frontend/widget.py
@@ -530,12 +530,12 @@ class LogLine(Widget):
# Print a summary of activities at the end of a session
#
# Args:
- # pipeline (Pipeline): The Pipeline
+ # stream (Stream): The Stream
# scheduler (Scheduler): The Scheduler
# log_file (file): An optional file handle for additional logging
# styling (bool): Whether to enable ansi escape codes in the output
#
- def print_summary(self, pipeline, scheduler, log_file, styling=False):
+ def print_summary(self, stream, scheduler, log_file, styling=False):
# Early silent return if there are no queues, can happen
# only in the case that the pipeline early returned due to
@@ -563,8 +563,8 @@ class LogLine(Widget):
text += self.content_profile.fmt("Pipeline Summary\n", bold=True)
values = OrderedDict()
- values['Total'] = self.content_profile.fmt(str(pipeline.total_elements))
- values['Session'] = self.content_profile.fmt(str(pipeline.session_elements))
+ values['Total'] = self.content_profile.fmt(str(stream.total_elements))
+ values['Session'] = self.content_profile.fmt(str(stream.session_elements))
processed_maxlen = 1
skipped_maxlen = 1