From 79d1eca85a59a2e6ecda4eac536f70d7178d8736 Mon Sep 17 00:00:00 2001 From: Benjamin Schubert Date: Sat, 5 Dec 2020 15:35:22 +0000 Subject: job.py: Stop sending the result from a job through the pipe This is not needed now that jobs run in the smae process, we can just return the value from the method. --- src/buildstream/_scheduler/jobs/job.py | 38 +++++++--------------------------- 1 file changed, 8 insertions(+), 30 deletions(-) diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index b2c65b6ba..3fde4148d 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -73,7 +73,6 @@ class _Envelope: class _MessageType(FastEnum): LOG_MESSAGE = 1 ERROR = 2 - RESULT = 3 # Job() @@ -177,8 +176,8 @@ class Job: loop = asyncio.get_event_loop() async def execute(): - result = await loop.run_in_executor(None, self._child.child_action, pipe_w) - await self._parent_child_completed(result) + ret_code, self._result = await loop.run_in_executor(None, self._child.child_action, pipe_w) + await self._parent_child_completed(ret_code) self._task = loop.create_task(execute()) @@ -389,9 +388,6 @@ class Job: # reported from a child task in the main process, this global state # is currently managed in _exceptions.py set_last_task_error(envelope.message["domain"], envelope.message["reason"]) - elif envelope.message_type is _MessageType.RESULT: - assert self._result is None - self._result = envelope.message else: assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message) @@ -548,7 +544,7 @@ class ChildJob: with self._terminate_lock: self._thread_id = threading.current_thread().ident if self._should_terminate: - return _ReturnCode.TERMINATED + return _ReturnCode.TERMINATED, None try: # Try the task action @@ -558,7 +554,7 @@ class ChildJob: self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename) # Alert parent of skip by return code - return _ReturnCode.SKIPPED + return _ReturnCode.SKIPPED, None except BstError as e: elapsed = datetime.datetime.now() - timeinfo.start_time retry_flag = e.temporary @@ -585,7 +581,7 @@ class ChildJob: # Set return code based on whether or not the error was temporary. # - return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL + return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL, None except Exception: # pylint: disable=broad-except # If an unhandled (not normalized to BstError) occurs, that's a bug, @@ -597,24 +593,22 @@ class ChildJob: self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename) # Unhandled exceptions should permenantly fail - return _ReturnCode.PERM_FAIL + return _ReturnCode.PERM_FAIL, None else: # No exception occurred in the action - self._child_send_result(result) - elapsed = datetime.datetime.now() - timeinfo.start_time self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename) # Shutdown needs to stay outside of the above context manager, # make sure we dont try to handle SIGTERM while the process # is already busy in sys.exit() - return _ReturnCode.OK + return _ReturnCode.OK, result finally: self._thread_id = None except TerminateException: self._thread_id = None - return _ReturnCode.TERMINATED + return _ReturnCode.TERMINATED, None finally: self._pipe_w.close() @@ -671,22 +665,6 @@ class ChildJob: self._send_message(_MessageType.ERROR, {"domain": domain, "reason": reason}) - # _child_send_result() - # - # Sends the serialized result to the main process through the message pipe - # - # Args: - # result (any): None, or a simple object (must be pickle-able, i.e. - # strings, lists, dicts, numbers, but not Element - # instances). - # - # Note: If None is passed here, nothing needs to be sent, the - # result member in the parent process will simply remain None. - # - def _child_send_result(self, result): - if result is not None: - self._send_message(_MessageType.RESULT, result) - # _child_message_handler() # # A Context delegate for handling messages, this replaces the -- cgit v1.2.1