summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbst-marge-bot <marge-bot@buildstream.build>2019-08-19 15:57:22 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2019-08-19 15:57:22 +0000
commite92781bdb7e76b91195fef84039fe7ff51cd02bf (patch)
tree5ce1ab618acdc2cfb5d44b027a12780f7124ddfd
parent1abf4ab51c4227841d5d6805dfa71675604b95d0 (diff)
parent2beb48dbdc13133b977a6a0ea065f4ed4c0e88dc (diff)
downloadbuildstream-e92781bdb7e76b91195fef84039fe7ff51cd02bf.tar.gz
Merge branch 'tpollard/frontendelement' into 'master'
Don't directly handle Elements and Queues in App See merge request BuildStream/buildstream!1546
-rw-r--r--src/buildstream/_frontend/app.py40
-rw-r--r--src/buildstream/_frontend/cli.py5
-rw-r--r--src/buildstream/_scheduler/scheduler.py10
-rw-r--r--src/buildstream/_state.py3
-rw-r--r--src/buildstream/_stream.py34
5 files changed, 66 insertions, 26 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 87575b675..236e8f80b 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -408,14 +408,15 @@ class App():
# if they are available in the execution context.
#
# Args:
- # element (Element): The Element object to resolve a prompt for
+ # element_name (str): The element's full name
+ # element_key (tuple): The element's display key
#
# Returns:
# (str): The formatted prompt to display in the shell
#
- def shell_prompt(self, element):
- _, key, dim = element._get_display_key()
- element_name = element._get_full_name()
+ def shell_prompt(self, element_name, element_key):
+
+ _, key, dim = element_key
if self.colors:
prompt = self._format_profile.fmt('[') + \
@@ -559,19 +560,13 @@ class App():
# action_name (str): The name of the action being performed,
# same as the task group, if it exists
# full_name (str): The name of this specific task, e.g. the element full name
- # element (Element): If an element job failed the Element instance
+ # element (tuple): If an element job failed a tuple of Element instance unique_id & display key
#
def _job_failed(self, action_name, full_name, element=None):
# Dont attempt to handle a failure if the user has already opted to
# terminate
if not self.stream.terminated:
if element:
- # look-up queue
- for q in self.stream.queues:
- if q.action_name == action_name:
- queue = q
- assert queue, "Job action {} does not have a corresponding queue".format(action_name)
-
# Get the last failure message for additional context
failure = self._fail_messages.get(full_name)
@@ -583,14 +578,14 @@ class App():
"unable to retrieve failure message for element {}\n\n\n\n\n"
.format(full_name), err=True)
else:
- self._handle_failure(element, queue, failure)
+ self._handle_failure(element, action_name, failure, full_name)
else:
# Not an element_job, we don't handle the failure
click.echo("\nTerminating all jobs\n", err=True)
self.stream.terminate()
- def _handle_failure(self, element, queue, failure):
+ def _handle_failure(self, element, action_name, failure, full_name):
# Handle non interactive mode setting of what to do when a job fails.
if not self._interactive_failures:
@@ -606,7 +601,7 @@ class App():
# Interactive mode for element failures
with self._interrupted():
- summary = ("\n{} failure on element: {}\n".format(failure.action_name, element.name) +
+ summary = ("\n{} failure on element: {}\n".format(failure.action_name, full_name) +
"\n" +
"Choose one of the following options:\n" +
" (c)ontinue - Continue queueing jobs as much as possible\n" +
@@ -630,7 +625,7 @@ class App():
click.echo(summary, err=True)
self._notify("BuildStream failure", "{} on element {}"
- .format(failure.action_name, element.name))
+ .format(failure.action_name, full_name))
try:
choice = click.prompt("Choice:", default='continue', err=True,
@@ -645,8 +640,10 @@ class App():
if choice == 'shell':
click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True)
try:
- prompt = self.shell_prompt(element)
- self.stream.shell(element, Scope.BUILD, prompt, isolate=True, usebuildtree='always')
+ unique_id, element_key = element
+ prompt = self.shell_prompt(full_name, element_key)
+ self.stream.shell(None, Scope.BUILD, prompt, isolate=True,
+ usebuildtree='always', unique_id=unique_id)
except BstError as e:
click.echo("Error while attempting to create interactive shell: {}".format(e), err=True)
elif choice == 'log':
@@ -665,9 +662,12 @@ class App():
click.echo("\nContinuing with other non failing elements\n", err=True)
elif choice == 'retry':
click.echo("\nRetrying failed job\n", err=True)
- # FIXME: Outstandingly nasty modification of core state
- queue._task_group.failed_tasks.remove(element._get_full_name())
- queue.enqueue([element])
+ unique_id = element[0]
+ try:
+ self.stream._failure_retry(action_name, unique_id)
+ except StreamError:
+ click.echo("Job action {} does not have a corresponding queue".format(action_name), err=True)
+ self.stream.terminate()
#
# Print the session heading if we've loaded a pipeline and there
diff --git a/src/buildstream/_frontend/cli.py b/src/buildstream/_frontend/cli.py
index 1365cede4..a77bd80e8 100644
--- a/src/buildstream/_frontend/cli.py
+++ b/src/buildstream/_frontend/cli.py
@@ -597,7 +597,10 @@ def shell(app, element, sysroot, mount, isolate, build_, cli_buildtree, pull_, c
element = elements[-1]
pull_dependencies = elements[:-1] if pull_ else None
- prompt = app.shell_prompt(element)
+ element_name = element._get_full_name()
+ element_key = element._get_display_key()
+
+ prompt = app.shell_prompt(element_name, element_key)
mounts = [
HostMount(path, host_path)
for host_path, path in mount
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 398e52e74..17878c4fd 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -259,10 +259,14 @@ class Scheduler():
self._state.remove_task(job.action_name, job.name)
if status == JobStatus.FAIL:
# If it's an elementjob, we want to compare against the failure messages
- # and send the Element() instance. Note this will change if the frontend
- # is run in a separate process for pickling
+ # and send the unique_id and display key tuple of the Element. This can then
+ # be used to load the element instance relative to the process it is running in.
element = job.get_element()
- self._state.fail_task(job.action_name, job.name, element=element)
+ if element:
+ element_info = element._unique_id, element._get_display_key()
+ else:
+ element_info = None
+ self._state.fail_task(job.action_name, job.name, element=element_info)
# Now check for more jobs
self._sched()
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
index a4f767de8..df3bceff2 100644
--- a/src/buildstream/_state.py
+++ b/src/buildstream/_state.py
@@ -323,7 +323,8 @@ class State():
# full_name (str): The full name of the task, distinguishing
# it from other tasks with the same action name
# e.g. an element's name.
- # element (Element): (optionally) The element instance if an element job
+ # element (tuple): (optionally) The element unique_id and display keys if an
+ # element job
#
def fail_task(self, action_name, full_name, element=None):
for cb in self._task_failed_cbs:
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 453670ad1..a7705332b 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -41,6 +41,7 @@ from ._pipeline import Pipeline, PipelineSelection
from ._profile import Topics, PROFILER
from ._state import State
from .types import _KeyStrength, _SchedulerErrorAction
+from .plugin import Plugin
from . import utils, _yaml, _site
from . import Scope, Consistency
@@ -160,6 +161,7 @@ class Stream():
# command (list): An argv to launch in the sandbox, or None
# usebuildtree (str): Whether to use a buildtree as the source, given cli option
# pull_dependencies ([Element]|None): Elements to attempt to pull
+ # unique_id: (str): Whether to use a unique_id to load an Element instance
#
# Returns:
# (int): The exit code of the launched shell
@@ -170,7 +172,12 @@ class Stream():
isolate=False,
command=None,
usebuildtree=None,
- pull_dependencies=None):
+ pull_dependencies=None,
+ unique_id=None):
+
+ # Load the Element via the unique_id if given
+ if unique_id and element is None:
+ element = Plugin._lookup(unique_id)
# Assert we have everything we need built, unless the directory is specified
# in which case we just blindly trust the directory, using the element
@@ -1304,6 +1311,31 @@ class Stream():
queue.enqueue(plan)
self.session_elements += plan
+ # _failure_retry()
+ #
+ # Enqueues given element via unique_id to the specified queue
+ # matched against provided action_name & removes the related
+ # failed task from the tasks group.
+ #
+ # Args:
+ # action_name (str): The name of the action being performed
+ # unique_id (str): A unique_id to load an Element instance
+ #
+ # Raises:
+ # (StreamError): If the related queue cannot be found
+ #
+ def _failure_retry(self, action_name, unique_id):
+ queue = None
+ # Attempt to resolve the required queue
+ for queue in self.queues:
+ if queue.action_name == action_name:
+ queue = queue
+ if not queue:
+ raise StreamError()
+ element = Plugin._lookup(unique_id)
+ queue._task_group.failed_tasks.remove(element._get_full_name())
+ queue.enqueue([element])
+
# _run()
#
# Common function for running the scheduler