summaryrefslogtreecommitdiff
path: root/src/buildstream/_scheduler/jobs/job.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_scheduler/jobs/job.py')
-rw-r--r--src/buildstream/_scheduler/jobs/job.py136
1 files changed, 29 insertions, 107 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 3a5694a71..8baf8fe1b 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -146,13 +146,9 @@ class Job:
self._terminated = False # Whether this job has been explicitly terminated
self._logfile = logfile
- self._message_element_name = (
- None # The plugin instance element name for messaging
- )
+ self._message_element_name = None # The plugin instance element name for messaging
self._message_element_key = None # The element key for messaging
- self._element = (
- None # The Element() passed to the Job() constructor, if applicable
- )
+ self._element = None # The Element() passed to the Job() constructor, if applicable
# set_name()
#
@@ -182,15 +178,9 @@ class Job:
self._message_element_key,
)
- if (
- self._scheduler.context.platform.does_multiprocessing_start_require_pickling()
- ):
- pickled = pickle_child_job(
- child_job, self._scheduler.context.get_projects(),
- )
- self._process = Process(
- target=do_pickled_child_job, args=[pickled, self._queue],
- )
+ if self._scheduler.context.platform.does_multiprocessing_start_require_pickling():
+ pickled = pickle_child_job(child_job, self._scheduler.context.get_projects(),)
+ self._process = Process(target=do_pickled_child_job, args=[pickled, self._queue],)
else:
self._process = Process(target=child_job.child_action, args=[self._queue],)
@@ -198,9 +188,7 @@ class Job:
# the child process does not inherit the parent's state, but the main
# process will be notified of any signal after we launch the child.
#
- with _signals.blocked(
- [signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False
- ):
+ with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
self._process.start()
# Wait for the child task to complete.
@@ -282,8 +270,7 @@ class Job:
def kill(self):
# Force kill
self.message(
- MessageType.WARN,
- "{} did not terminate gracefully, killing".format(self.action_name),
+ MessageType.WARN, "{} did not terminate gracefully, killing".format(self.action_name),
)
utils._kill_process_tree(self._process.pid)
@@ -358,22 +345,14 @@ class Job:
# kwargs: Remaining Message() constructor arguments, note that you can
# override 'element_name' and 'element_key' this way.
#
- def message(
- self, message_type, message, element_name=None, element_key=None, **kwargs
- ):
+ def message(self, message_type, message, element_name=None, element_key=None, **kwargs):
kwargs["scheduler"] = True
# If default name & key values not provided, set as given job attributes
if element_name is None:
element_name = self._message_element_name
if element_key is None:
element_key = self._message_element_key
- message = Message(
- message_type,
- message,
- element_name=element_name,
- element_key=element_key,
- **kwargs
- )
+ message = Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs)
self._scheduler.notify_messenger(message)
# get_element()
@@ -405,11 +384,7 @@ class Job:
# lists, dicts, numbers, but not Element instances).
#
def handle_message(self, message):
- raise ImplError(
- "Job '{kind}' does not implement handle_message()".format(
- kind=type(self).__name__
- )
- )
+ raise ImplError("Job '{kind}' does not implement handle_message()".format(kind=type(self).__name__))
# parent_complete()
#
@@ -421,11 +396,7 @@ class Job:
# result (any): The result returned by child_process().
#
def parent_complete(self, status, result):
- raise ImplError(
- "Job '{kind}' does not implement parent_complete()".format(
- kind=type(self).__name__
- )
- )
+ raise ImplError("Job '{kind}' does not implement parent_complete()".format(kind=type(self).__name__))
# create_child_job()
#
@@ -443,11 +414,7 @@ class Job:
# (ChildJob): An instance of a subclass of ChildJob.
#
def create_child_job(self, *args, **kwargs):
- raise ImplError(
- "Job '{kind}' does not implement create_child_job()".format(
- kind=type(self).__name__
- )
- )
+ raise ImplError("Job '{kind}' does not implement create_child_job()".format(kind=type(self).__name__))
#######################################################
# Local Private Methods #
@@ -480,9 +447,7 @@ class Job:
# An unexpected return code was returned; fail permanently and report
self.message(
MessageType.ERROR,
- "Internal job process unexpectedly died with exit code {}".format(
- returncode
- ),
+ "Internal job process unexpectedly died with exit code {}".format(returncode),
logfile=self._logfile,
)
returncode = _ReturnCode.PERM_FAIL
@@ -490,11 +455,7 @@ class Job:
# We don't want to retry if we got OK or a permanent fail.
retry_flag = returncode == _ReturnCode.FAIL
- if (
- retry_flag
- and (self._tries <= self._max_retries)
- and not self._scheduler.terminated
- ):
+ if retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
self.start()
return
@@ -548,9 +509,7 @@ class Job:
elif envelope.message_type is _MessageType.SUBCLASS_CUSTOM_MESSAGE:
self.handle_message(envelope.message)
else:
- assert False, "Unhandled message type '{}': {}".format(
- envelope.message_type, envelope.message
- )
+ assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message)
# _parent_process_queue()
#
@@ -587,9 +546,7 @@ class Job:
# http://bugs.python.org/issue3831
#
if not self._listening:
- self._scheduler.loop.add_reader(
- self._queue._reader.fileno(), self._parent_recv
- )
+ self._scheduler.loop.add_reader(self._queue._reader.fileno(), self._parent_recv)
self._listening = True
# _parent_stop_listening()
@@ -627,15 +584,7 @@ class Job:
#
class ChildJob:
def __init__(
- self,
- action_name,
- messenger,
- logdir,
- logfile,
- max_retries,
- tries,
- message_element_name,
- message_element_key,
+ self, action_name, messenger, logdir, logfile, max_retries, tries, message_element_name, message_element_key,
):
self.action_name = action_name
@@ -666,9 +615,7 @@ class ChildJob:
# for front end display if not already set or explicitly
# overriden here.
#
- def message(
- self, message_type, message, element_name=None, element_key=None, **kwargs
- ):
+ def message(self, message_type, message, element_name=None, element_key=None, **kwargs):
kwargs["scheduler"] = True
# If default name & key values not provided, set as given job attributes
if element_name is None:
@@ -676,13 +623,7 @@ class ChildJob:
if element_key is None:
element_key = self._message_element_key
self._messenger.message(
- Message(
- message_type,
- message,
- element_name=element_name,
- element_key=element_key,
- **kwargs
- )
+ Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs)
)
# send_message()
@@ -720,11 +661,7 @@ class ChildJob:
# the result of the Job.
#
def child_process(self):
- raise ImplError(
- "ChildJob '{kind}' does not implement child_process()".format(
- kind=type(self).__name__
- )
- )
+ raise ImplError("ChildJob '{kind}' does not implement child_process()".format(kind=type(self).__name__))
# child_process_data()
#
@@ -782,22 +719,18 @@ class ChildJob:
# Time, log and and run the action function
#
- with _signals.suspendable(
- stop_time, resume_time
- ), self._messenger.recorded_messages(self._logfile, self._logdir) as filename:
+ with _signals.suspendable(stop_time, resume_time), self._messenger.recorded_messages(
+ self._logfile, self._logdir
+ ) as filename:
self.message(MessageType.START, self.action_name, logfile=filename)
try:
# Try the task action
- result = (
- self.child_process()
- ) # pylint: disable=assignment-from-no-return
+ result = self.child_process() # pylint: disable=assignment-from-no-return
except SkipJob as e:
elapsed = datetime.datetime.now() - starttime
- self.message(
- MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename
- )
+ self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
# Alert parent of skip by return code
self._child_shutdown(_ReturnCode.SKIPPED)
@@ -829,9 +762,7 @@ class ChildJob:
# Set return code based on whether or not the error was temporary.
#
- self._child_shutdown(
- _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL
- )
+ self._child_shutdown(_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL)
except Exception: # pylint: disable=broad-except
@@ -840,16 +771,10 @@ class ChildJob:
# and print it to the log file.
#
elapsed = datetime.datetime.now() - starttime
- detail = "An unhandled exception occured:\n\n{}".format(
- traceback.format_exc()
- )
+ detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
self.message(
- MessageType.BUG,
- self.action_name,
- elapsed=elapsed,
- detail=detail,
- logfile=filename,
+ MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename,
)
# Unhandled exceptions should permenantly fail
self._child_shutdown(_ReturnCode.PERM_FAIL)
@@ -861,10 +786,7 @@ class ChildJob:
elapsed = datetime.datetime.now() - starttime
self.message(
- MessageType.SUCCESS,
- self.action_name,
- elapsed=elapsed,
- logfile=filename,
+ MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename,
)
# Shutdown needs to stay outside of the above context manager,