summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Schubert <bschubert15@bloomberg.net>2020-12-05 15:35:22 +0000
committerBenjamin Schubert <bschubert15@bloomberg.net>2020-12-05 15:35:22 +0000
commit79d1eca85a59a2e6ecda4eac536f70d7178d8736 (patch)
tree78d234560fd9822bcbcd4a8c62096046f40a04b2
parent57beae424a5ce86cfe9d27d410d191abc3f69308 (diff)
downloadbuildstream-79d1eca85a59a2e6ecda4eac536f70d7178d8736.tar.gz
job.py: Stop sending the result from a job through the pipe
This is not needed now that jobs run in the smae process, we can just return the value from the method.
-rw-r--r--src/buildstream/_scheduler/jobs/job.py38
1 files changed, 8 insertions, 30 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index b2c65b6ba..3fde4148d 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -73,7 +73,6 @@ class _Envelope:
class _MessageType(FastEnum):
LOG_MESSAGE = 1
ERROR = 2
- RESULT = 3
# Job()
@@ -177,8 +176,8 @@ class Job:
loop = asyncio.get_event_loop()
async def execute():
- result = await loop.run_in_executor(None, self._child.child_action, pipe_w)
- await self._parent_child_completed(result)
+ ret_code, self._result = await loop.run_in_executor(None, self._child.child_action, pipe_w)
+ await self._parent_child_completed(ret_code)
self._task = loop.create_task(execute())
@@ -389,9 +388,6 @@ class Job:
# reported from a child task in the main process, this global state
# is currently managed in _exceptions.py
set_last_task_error(envelope.message["domain"], envelope.message["reason"])
- elif envelope.message_type is _MessageType.RESULT:
- assert self._result is None
- self._result = envelope.message
else:
assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message)
@@ -548,7 +544,7 @@ class ChildJob:
with self._terminate_lock:
self._thread_id = threading.current_thread().ident
if self._should_terminate:
- return _ReturnCode.TERMINATED
+ return _ReturnCode.TERMINATED, None
try:
# Try the task action
@@ -558,7 +554,7 @@ class ChildJob:
self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
# Alert parent of skip by return code
- return _ReturnCode.SKIPPED
+ return _ReturnCode.SKIPPED, None
except BstError as e:
elapsed = datetime.datetime.now() - timeinfo.start_time
retry_flag = e.temporary
@@ -585,7 +581,7 @@ class ChildJob:
# Set return code based on whether or not the error was temporary.
#
- return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL
+ return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL, None
except Exception: # pylint: disable=broad-except
# If an unhandled (not normalized to BstError) occurs, that's a bug,
@@ -597,24 +593,22 @@ class ChildJob:
self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename)
# Unhandled exceptions should permenantly fail
- return _ReturnCode.PERM_FAIL
+ return _ReturnCode.PERM_FAIL, None
else:
# No exception occurred in the action
- self._child_send_result(result)
-
elapsed = datetime.datetime.now() - timeinfo.start_time
self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename)
# Shutdown needs to stay outside of the above context manager,
# make sure we dont try to handle SIGTERM while the process
# is already busy in sys.exit()
- return _ReturnCode.OK
+ return _ReturnCode.OK, result
finally:
self._thread_id = None
except TerminateException:
self._thread_id = None
- return _ReturnCode.TERMINATED
+ return _ReturnCode.TERMINATED, None
finally:
self._pipe_w.close()
@@ -671,22 +665,6 @@ class ChildJob:
self._send_message(_MessageType.ERROR, {"domain": domain, "reason": reason})
- # _child_send_result()
- #
- # Sends the serialized result to the main process through the message pipe
- #
- # Args:
- # result (any): None, or a simple object (must be pickle-able, i.e.
- # strings, lists, dicts, numbers, but not Element
- # instances).
- #
- # Note: If None is passed here, nothing needs to be sent, the
- # result member in the parent process will simply remain None.
- #
- def _child_send_result(self, result):
- if result is not None:
- self._send_message(_MessageType.RESULT, result)
-
# _child_message_handler()
#
# A Context delegate for handling messages, this replaces the