summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2017-10-30 20:04:23 +0900
committerTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2017-10-30 20:04:23 +0900
commit051dd9ccbdd9dccc7bd4d20ab88ff96f0891e9fd (patch)
tree2ea0ba48427e00d964dbce8407e3b9338801fcd1
parente472f5abd7f9aeff994a14e581c7a89199bd255f (diff)
downloadbuildstream-051dd9ccbdd9dccc7bd4d20ab88ff96f0891e9fd.tar.gz
_pipeline.py: The toplevel message() function no longer takes an element.
Instead, use None as the unique ID. This will help with messaging when we allow invoking multiple targets and the element would have been ambiguous - this also consequently fixes issue #137. The reason for the hangs with #137 is because: o When you --except a base element, reverse dependencies cannot calculate a cache key o When you track, cache keys are intentionally reset at startup time, because we know they are going to change o At the end of tracking, we make one attempt to print the toplevel cache key This operation is insanely expensive, because we never cache a cache key because it logically cannot ever be resolved in this situation. This fix is basically a workaround to the above.
-rw-r--r--buildstream/_pipeline.py67
1 files changed, 32 insertions, 35 deletions
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index eecb88f6a..dd368d8fd 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -181,7 +181,7 @@ class Pipeline():
remote_ticker(self.artifacts.artifact_pull)
self.artifacts.fetch_remote_refs()
except _ArtifactError:
- self.message(self.target, MessageType.WARN, "Failed to fetch remote refs")
+ self.message(MessageType.WARN, "Failed to fetch remote refs")
self.artifacts.set_offline()
for element in self.dependencies(Scope.ALL):
@@ -227,7 +227,7 @@ class Pipeline():
"Try tracking these elements first with `bst track`\n\n"
for element in inconsistent:
detail += " " + element.name + "\n"
- self.message(self.target, MessageType.ERROR, "Inconsistent pipeline", detail=detail)
+ self.message(MessageType.ERROR, "Inconsistent pipeline", detail=detail)
raise PipelineError()
# Generator function to iterate over only the elements
@@ -241,13 +241,10 @@ class Pipeline():
# Local message propagator
#
- def message(self, plugin, message_type, message, **kwargs):
+ def message(self, message_type, message, **kwargs):
args = dict(kwargs)
self.context._message(
- Message(plugin._get_unique_id(),
- message_type,
- message,
- **args))
+ Message(None, message_type, message, **args))
# Internal: Instantiates plugin-provided Element and Source instances
# from MetaElement and MetaSource objects
@@ -292,14 +289,14 @@ class Pipeline():
def can_push_remote_artifact_cache(self):
if self.artifacts.can_push():
starttime = datetime.datetime.now()
- self.message(self.target, MessageType.START, "Checking connectivity to remote artifact cache")
+ self.message(MessageType.START, "Checking connectivity to remote artifact cache")
try:
self.artifacts.preflight()
except _ArtifactError as e:
- self.message(self.target, MessageType.WARN, str(e),
+ self.message(MessageType.WARN, str(e),
elapsed=datetime.datetime.now() - starttime)
return False
- self.message(self.target, MessageType.SUCCESS, "Connectivity OK",
+ self.message(MessageType.SUCCESS, "Connectivity OK",
elapsed=datetime.datetime.now() - starttime)
return True
else:
@@ -329,20 +326,20 @@ class Pipeline():
track.enqueue(dependencies)
self.session_elements = len(dependencies)
- self.message(self.target, MessageType.START, "Starting track")
+ self.message(MessageType.START, "Starting track")
elapsed, status = scheduler.run([track])
changed = len(track.processed_elements)
if status == SchedStatus.ERROR:
- self.message(self.target, MessageType.FAIL, "Track failed", elapsed=elapsed)
+ self.message(MessageType.FAIL, "Track failed", elapsed=elapsed)
raise PipelineError()
elif status == SchedStatus.TERMINATED:
- self.message(self.target, MessageType.WARN,
+ self.message(MessageType.WARN,
"Terminated after updating {} source references".format(changed),
elapsed=elapsed)
raise PipelineError()
else:
- self.message(self.target, MessageType.SUCCESS,
+ self.message(MessageType.SUCCESS,
"Updated {} source references".format(changed),
elapsed=elapsed)
@@ -381,20 +378,20 @@ class Pipeline():
fetch.enqueue(plan)
queues = [fetch]
- self.message(self.target, MessageType.START, "Fetching {} elements".format(len(plan)))
+ self.message(MessageType.START, "Fetching {} elements".format(len(plan)))
elapsed, status = scheduler.run(queues)
fetched = len(fetch.processed_elements)
if status == SchedStatus.ERROR:
- self.message(self.target, MessageType.FAIL, "Fetch failed", elapsed=elapsed)
+ self.message(MessageType.FAIL, "Fetch failed", elapsed=elapsed)
raise PipelineError()
elif status == SchedStatus.TERMINATED:
- self.message(self.target, MessageType.WARN,
+ self.message(MessageType.WARN,
"Terminated after fetching {} elements".format(fetched),
elapsed=elapsed)
raise PipelineError()
else:
- self.message(self.target, MessageType.SUCCESS,
+ self.message(MessageType.SUCCESS,
"Fetched {} elements".format(fetched),
elapsed=elapsed)
@@ -410,7 +407,7 @@ class Pipeline():
#
def build(self, scheduler, build_all, track_first):
if len(self.unused_workspaces) > 0:
- self.message(self.target, MessageType.WARN, "Unused workspaces",
+ self.message(MessageType.WARN, "Unused workspaces",
detail="\n".join([el + "-" + str(src) for el, src, _
in self.unused_workspaces]))
@@ -445,18 +442,18 @@ class Pipeline():
self.session_elements = len(plan)
- self.message(self.target, MessageType.START, "Starting build")
+ self.message(MessageType.START, "Starting build")
elapsed, status = scheduler.run(queues)
built = len(build.processed_elements)
if status == SchedStatus.ERROR:
- self.message(self.target, MessageType.FAIL, "Build failed", elapsed=elapsed)
+ self.message(MessageType.FAIL, "Build failed", elapsed=elapsed)
raise PipelineError()
elif status == SchedStatus.TERMINATED:
- self.message(self.target, MessageType.WARN, "Terminated", elapsed=elapsed)
+ self.message(MessageType.WARN, "Terminated", elapsed=elapsed)
raise PipelineError()
else:
- self.message(self.target, MessageType.SUCCESS, "Build Complete", elapsed=elapsed)
+ self.message(MessageType.SUCCESS, "Build Complete", elapsed=elapsed)
# checkout()
#
@@ -484,7 +481,7 @@ class Pipeline():
# commands for cross-build artifacts.
can_integrate = (self.context.host_arch == self.context.target_arch)
if not can_integrate:
- self.message(self.target, MessageType.WARN,
+ self.message(MessageType.WARN,
"Host-incompatible checkout -- no integration commands can be run")
# Stage deps into a temporary sandbox first
@@ -548,15 +545,15 @@ class Pipeline():
fetched = len(fetch.processed_elements)
if status == SchedStatus.ERROR:
- self.message(self.target, MessageType.FAIL, "Tracking failed", elapsed=elapsed)
+ self.message(MessageType.FAIL, "Tracking failed", elapsed=elapsed)
raise PipelineError()
elif status == SchedStatus.TERMINATED:
- self.message(self.target, MessageType.WARN,
+ self.message(MessageType.WARN,
"Terminated after fetching {} elements".format(fetched),
elapsed=elapsed)
raise PipelineError()
else:
- self.message(self.target, MessageType.SUCCESS,
+ self.message(MessageType.SUCCESS,
"Fetched {} elements".format(fetched), elapsed=elapsed)
if not no_checkout:
@@ -657,20 +654,20 @@ class Pipeline():
pull.enqueue(plan)
queues = [pull]
- self.message(self.target, MessageType.START, "Pulling {} artifacts".format(len(plan)))
+ self.message(MessageType.START, "Pulling {} artifacts".format(len(plan)))
elapsed, status = scheduler.run(queues)
pulled = len(pull.processed_elements)
if status == SchedStatus.ERROR:
- self.message(self.target, MessageType.FAIL, "Pull failed", elapsed=elapsed)
+ self.message(MessageType.FAIL, "Pull failed", elapsed=elapsed)
raise PipelineError()
elif status == SchedStatus.TERMINATED:
- self.message(self.target, MessageType.WARN,
+ self.message(MessageType.WARN,
"Terminated after pulling {} elements".format(pulled),
elapsed=elapsed)
raise PipelineError()
else:
- self.message(self.target, MessageType.SUCCESS,
+ self.message(MessageType.SUCCESS,
"Pulled {} complete".format(pulled),
elapsed=elapsed)
@@ -695,20 +692,20 @@ class Pipeline():
push.enqueue(plan)
queues = [push]
- self.message(self.target, MessageType.START, "Pushing {} artifacts".format(len(plan)))
+ self.message(MessageType.START, "Pushing {} artifacts".format(len(plan)))
elapsed, status = scheduler.run(queues)
pushed = len(push.processed_elements)
if status == SchedStatus.ERROR:
- self.message(self.target, MessageType.FAIL, "Push failed", elapsed=elapsed)
+ self.message(MessageType.FAIL, "Push failed", elapsed=elapsed)
raise PipelineError()
elif status == SchedStatus.TERMINATED:
- self.message(self.target, MessageType.WARN,
+ self.message(MessageType.WARN,
"Terminated after pushing {} elements".format(pushed),
elapsed=elapsed)
raise PipelineError()
else:
- self.message(self.target, MessageType.SUCCESS,
+ self.message(MessageType.SUCCESS,
"Pushed {} complete".format(pushed),
elapsed=elapsed)