diff options
Diffstat (limited to 'src/buildstream/_scheduler/jobs/job.py')
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 136 |
1 files changed, 29 insertions, 107 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 3a5694a71..8baf8fe1b 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -146,13 +146,9 @@ class Job: self._terminated = False # Whether this job has been explicitly terminated self._logfile = logfile - self._message_element_name = ( - None # The plugin instance element name for messaging - ) + self._message_element_name = None # The plugin instance element name for messaging self._message_element_key = None # The element key for messaging - self._element = ( - None # The Element() passed to the Job() constructor, if applicable - ) + self._element = None # The Element() passed to the Job() constructor, if applicable # set_name() # @@ -182,15 +178,9 @@ class Job: self._message_element_key, ) - if ( - self._scheduler.context.platform.does_multiprocessing_start_require_pickling() - ): - pickled = pickle_child_job( - child_job, self._scheduler.context.get_projects(), - ) - self._process = Process( - target=do_pickled_child_job, args=[pickled, self._queue], - ) + if self._scheduler.context.platform.does_multiprocessing_start_require_pickling(): + pickled = pickle_child_job(child_job, self._scheduler.context.get_projects(),) + self._process = Process(target=do_pickled_child_job, args=[pickled, self._queue],) else: self._process = Process(target=child_job.child_action, args=[self._queue],) @@ -198,9 +188,7 @@ class Job: # the child process does not inherit the parent's state, but the main # process will be notified of any signal after we launch the child. # - with _signals.blocked( - [signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False - ): + with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False): self._process.start() # Wait for the child task to complete. @@ -282,8 +270,7 @@ class Job: def kill(self): # Force kill self.message( - MessageType.WARN, - "{} did not terminate gracefully, killing".format(self.action_name), + MessageType.WARN, "{} did not terminate gracefully, killing".format(self.action_name), ) utils._kill_process_tree(self._process.pid) @@ -358,22 +345,14 @@ class Job: # kwargs: Remaining Message() constructor arguments, note that you can # override 'element_name' and 'element_key' this way. # - def message( - self, message_type, message, element_name=None, element_key=None, **kwargs - ): + def message(self, message_type, message, element_name=None, element_key=None, **kwargs): kwargs["scheduler"] = True # If default name & key values not provided, set as given job attributes if element_name is None: element_name = self._message_element_name if element_key is None: element_key = self._message_element_key - message = Message( - message_type, - message, - element_name=element_name, - element_key=element_key, - **kwargs - ) + message = Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs) self._scheduler.notify_messenger(message) # get_element() @@ -405,11 +384,7 @@ class Job: # lists, dicts, numbers, but not Element instances). # def handle_message(self, message): - raise ImplError( - "Job '{kind}' does not implement handle_message()".format( - kind=type(self).__name__ - ) - ) + raise ImplError("Job '{kind}' does not implement handle_message()".format(kind=type(self).__name__)) # parent_complete() # @@ -421,11 +396,7 @@ class Job: # result (any): The result returned by child_process(). # def parent_complete(self, status, result): - raise ImplError( - "Job '{kind}' does not implement parent_complete()".format( - kind=type(self).__name__ - ) - ) + raise ImplError("Job '{kind}' does not implement parent_complete()".format(kind=type(self).__name__)) # create_child_job() # @@ -443,11 +414,7 @@ class Job: # (ChildJob): An instance of a subclass of ChildJob. # def create_child_job(self, *args, **kwargs): - raise ImplError( - "Job '{kind}' does not implement create_child_job()".format( - kind=type(self).__name__ - ) - ) + raise ImplError("Job '{kind}' does not implement create_child_job()".format(kind=type(self).__name__)) ####################################################### # Local Private Methods # @@ -480,9 +447,7 @@ class Job: # An unexpected return code was returned; fail permanently and report self.message( MessageType.ERROR, - "Internal job process unexpectedly died with exit code {}".format( - returncode - ), + "Internal job process unexpectedly died with exit code {}".format(returncode), logfile=self._logfile, ) returncode = _ReturnCode.PERM_FAIL @@ -490,11 +455,7 @@ class Job: # We don't want to retry if we got OK or a permanent fail. retry_flag = returncode == _ReturnCode.FAIL - if ( - retry_flag - and (self._tries <= self._max_retries) - and not self._scheduler.terminated - ): + if retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated: self.start() return @@ -548,9 +509,7 @@ class Job: elif envelope.message_type is _MessageType.SUBCLASS_CUSTOM_MESSAGE: self.handle_message(envelope.message) else: - assert False, "Unhandled message type '{}': {}".format( - envelope.message_type, envelope.message - ) + assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message) # _parent_process_queue() # @@ -587,9 +546,7 @@ class Job: # http://bugs.python.org/issue3831 # if not self._listening: - self._scheduler.loop.add_reader( - self._queue._reader.fileno(), self._parent_recv - ) + self._scheduler.loop.add_reader(self._queue._reader.fileno(), self._parent_recv) self._listening = True # _parent_stop_listening() @@ -627,15 +584,7 @@ class Job: # class ChildJob: def __init__( - self, - action_name, - messenger, - logdir, - logfile, - max_retries, - tries, - message_element_name, - message_element_key, + self, action_name, messenger, logdir, logfile, max_retries, tries, message_element_name, message_element_key, ): self.action_name = action_name @@ -666,9 +615,7 @@ class ChildJob: # for front end display if not already set or explicitly # overriden here. # - def message( - self, message_type, message, element_name=None, element_key=None, **kwargs - ): + def message(self, message_type, message, element_name=None, element_key=None, **kwargs): kwargs["scheduler"] = True # If default name & key values not provided, set as given job attributes if element_name is None: @@ -676,13 +623,7 @@ class ChildJob: if element_key is None: element_key = self._message_element_key self._messenger.message( - Message( - message_type, - message, - element_name=element_name, - element_key=element_key, - **kwargs - ) + Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs) ) # send_message() @@ -720,11 +661,7 @@ class ChildJob: # the result of the Job. # def child_process(self): - raise ImplError( - "ChildJob '{kind}' does not implement child_process()".format( - kind=type(self).__name__ - ) - ) + raise ImplError("ChildJob '{kind}' does not implement child_process()".format(kind=type(self).__name__)) # child_process_data() # @@ -782,22 +719,18 @@ class ChildJob: # Time, log and and run the action function # - with _signals.suspendable( - stop_time, resume_time - ), self._messenger.recorded_messages(self._logfile, self._logdir) as filename: + with _signals.suspendable(stop_time, resume_time), self._messenger.recorded_messages( + self._logfile, self._logdir + ) as filename: self.message(MessageType.START, self.action_name, logfile=filename) try: # Try the task action - result = ( - self.child_process() - ) # pylint: disable=assignment-from-no-return + result = self.child_process() # pylint: disable=assignment-from-no-return except SkipJob as e: elapsed = datetime.datetime.now() - starttime - self.message( - MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename - ) + self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename) # Alert parent of skip by return code self._child_shutdown(_ReturnCode.SKIPPED) @@ -829,9 +762,7 @@ class ChildJob: # Set return code based on whether or not the error was temporary. # - self._child_shutdown( - _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL - ) + self._child_shutdown(_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL) except Exception: # pylint: disable=broad-except @@ -840,16 +771,10 @@ class ChildJob: # and print it to the log file. # elapsed = datetime.datetime.now() - starttime - detail = "An unhandled exception occured:\n\n{}".format( - traceback.format_exc() - ) + detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc()) self.message( - MessageType.BUG, - self.action_name, - elapsed=elapsed, - detail=detail, - logfile=filename, + MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename, ) # Unhandled exceptions should permenantly fail self._child_shutdown(_ReturnCode.PERM_FAIL) @@ -861,10 +786,7 @@ class ChildJob: elapsed = datetime.datetime.now() - starttime self.message( - MessageType.SUCCESS, - self.action_name, - elapsed=elapsed, - logfile=filename, + MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename, ) # Shutdown needs to stay outside of the above context manager, |