summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbst-marge-bot <marge-bot@buildstream.build>2020-12-07 08:04:36 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2020-12-07 08:04:36 +0000
commit5f68adb3f80b1d84edf900fde8cf40c84c297f91 (patch)
treef047c9f5d81c78534832f479eb51f7191f48cd55
parent458c05751f089673dc1cae8b8edfabfa32cf21b7 (diff)
parent3f07e6056076736ea63f5ed5b96f240978691da1 (diff)
downloadbuildstream-5f68adb3f80b1d84edf900fde8cf40c84c297f91.tar.gz
Merge branch 'bschubert/optimize-job' into 'master'
First part of the parent-child job separation cleanup. See merge request BuildStream/buildstream!2111
-rw-r--r--src/buildstream/_scheduler/jobs/elementjob.py9
-rw-r--r--src/buildstream/_scheduler/jobs/job.py138
-rw-r--r--src/buildstream/_scheduler/queues/queue.py15
3 files changed, 20 insertions, 142 deletions
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
index 683129506..c72be4052 100644
--- a/src/buildstream/_scheduler/jobs/elementjob.py
+++ b/src/buildstream/_scheduler/jobs/elementjob.py
@@ -91,12 +91,3 @@ class ChildElementJob(ChildJob):
# Run the action
return self._action_cb(self._element)
-
- def child_process_data(self):
- data = {}
-
- workspace = self._element._get_workspace()
- if workspace is not None:
- data["workspace"] = workspace.to_dict()
-
- return data
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 2e8f5ca1a..aa71b6e18 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -63,20 +63,6 @@ class JobStatus(FastEnum):
SKIPPED = 3
-# Used to distinguish between status messages and return values
-class _Envelope:
- def __init__(self, message_type, message):
- self.message_type = message_type
- self.message = message
-
-
-class _MessageType(FastEnum):
- LOG_MESSAGE = 1
- ERROR = 2
- RESULT = 3
- CHILD_DATA = 4
-
-
# Job()
#
# The Job object represents a task that will run in parallel to the main
@@ -121,7 +107,6 @@ class Job:
self.id = "{}-{}".format(action_name, next(Job._id_generator))
self.name = None # The name of the job, set by the job's subclass
self.action_name = action_name # The action name for the Queue
- self.child_data = None # Data to be sent to the main process
#
# Private members
@@ -179,8 +164,8 @@ class Job:
loop = asyncio.get_event_loop()
async def execute():
- result = await loop.run_in_executor(None, self._child.child_action, pipe_w)
- await self._parent_child_completed(result)
+ ret_code, self._result = await loop.run_in_executor(None, self._child.child_action, pipe_w)
+ await self._parent_child_completed(ret_code)
self._task = loop.create_task(execute())
@@ -367,39 +352,6 @@ class Job:
self._pipe_r.close()
self._pipe_r = self._task = None
- # _parent_process_envelope()
- #
- # Processes a message Envelope deserialized form the message pipe.
- #
- # this will have the side effect of assigning some local state
- # on the Job in the parent process for later inspection when the
- # child process completes.
- #
- # Args:
- # envelope (Envelope): The message envelope
- #
- def _parent_process_envelope(self, envelope):
- if not self._listening:
- return
-
- if envelope.message_type is _MessageType.LOG_MESSAGE:
- # Propagate received messages from children
- # back through the context.
- self._messenger.message(envelope.message)
- elif envelope.message_type is _MessageType.ERROR:
- # For regression tests only, save the last error domain / reason
- # reported from a child task in the main process, this global state
- # is currently managed in _exceptions.py
- set_last_task_error(envelope.message["domain"], envelope.message["reason"])
- elif envelope.message_type is _MessageType.RESULT:
- assert self._result is None
- self._result = envelope.message
- elif envelope.message_type is _MessageType.CHILD_DATA:
- # If we retry a job, we assign a new value to this
- self.child_data = envelope.message
- else:
- assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message)
-
# _parent_process_pipe()
#
# Reads back message envelopes from the message pipe
@@ -408,11 +360,12 @@ class Job:
def _parent_process_pipe(self):
while self._pipe_r.poll():
try:
- envelope = self._pipe_r.recv()
+ message = self._pipe_r.recv()
except EOFError:
self._parent_stop_listening()
break
- self._parent_process_envelope(envelope)
+
+ self._messenger.message(message)
# _parent_recv()
#
@@ -527,20 +480,6 @@ class ChildJob:
def child_process(self):
raise ImplError("ChildJob '{kind}' does not implement child_process()".format(kind=type(self).__name__))
- # child_process_data()
- #
- # Abstract method to retrieve additional data that should be
- # returned to the parent process. Note that the job result is
- # retrieved independently.
- #
- # Values can later be retrieved in Job.child_data.
- #
- # Returns:
- # (dict) A dict containing values to be reported to the main process
- #
- def child_process_data(self):
- return {}
-
# child_action()
#
# Perform the action in the child process, this calls the action_cb.
@@ -567,7 +506,7 @@ class ChildJob:
with self._terminate_lock:
self._thread_id = threading.current_thread().ident
if self._should_terminate:
- return _ReturnCode.TERMINATED
+ return _ReturnCode.TERMINATED, None
try:
# Try the task action
@@ -577,7 +516,7 @@ class ChildJob:
self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
# Alert parent of skip by return code
- return _ReturnCode.SKIPPED
+ return _ReturnCode.SKIPPED, None
except BstError as e:
elapsed = datetime.datetime.now() - timeinfo.start_time
retry_flag = e.temporary
@@ -599,14 +538,12 @@ class ChildJob:
sandbox=e.sandbox,
)
- self._send_message(_MessageType.CHILD_DATA, self.child_process_data())
-
# Report the exception to the parent (for internal testing purposes)
- self._child_send_error(e)
+ set_last_task_error(e.domain, e.reason)
# Set return code based on whether or not the error was temporary.
#
- return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL
+ return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL, None
except Exception: # pylint: disable=broad-except
# If an unhandled (not normalized to BstError) occurs, that's a bug,
@@ -618,25 +555,22 @@ class ChildJob:
self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename)
# Unhandled exceptions should permanently fail
- return _ReturnCode.PERM_FAIL
+ return _ReturnCode.PERM_FAIL, None
else:
# No exception occurred in the action
- self._send_message(_MessageType.CHILD_DATA, self.child_process_data())
- self._child_send_result(result)
-
elapsed = datetime.datetime.now() - timeinfo.start_time
self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename)
# Shutdown needs to stay outside of the above context manager,
# make sure we dont try to handle SIGTERM while the process
# is already busy in sys.exit()
- return _ReturnCode.OK
+ return _ReturnCode.OK, result
finally:
self._thread_id = None
except TerminateException:
self._thread_id = None
- return _ReturnCode.TERMINATED
+ return _ReturnCode.TERMINATED, None
finally:
self._pipe_w.close()
@@ -663,52 +597,6 @@ class ChildJob:
# Local Private Methods #
#######################################################
- # _send_message()
- #
- # Send data in a message to the parent Job, running in the main process.
- #
- # Args:
- # message_type (str): The type of message to send.
- # message_data (any): A simple object (must be pickle-able, i.e.
- # strings, lists, dicts, numbers, but not Element
- # instances). This is sent to the parent Job.
- #
- def _send_message(self, message_type, message_data):
- self._pipe_w.send(_Envelope(message_type, message_data))
-
- # _child_send_error()
- #
- # Sends an error to the main process through the message pipe
- #
- # Args:
- # e (Exception): The error to send
- #
- def _child_send_error(self, e):
- domain = None
- reason = None
-
- if isinstance(e, BstError):
- domain = e.domain
- reason = e.reason
-
- self._send_message(_MessageType.ERROR, {"domain": domain, "reason": reason})
-
- # _child_send_result()
- #
- # Sends the serialized result to the main process through the message pipe
- #
- # Args:
- # result (any): None, or a simple object (must be pickle-able, i.e.
- # strings, lists, dicts, numbers, but not Element
- # instances).
- #
- # Note: If None is passed here, nothing needs to be sent, the
- # result member in the parent process will simply remain None.
- #
- def _child_send_result(self, result):
- if result is not None:
- self._send_message(_MessageType.RESULT, result)
-
# _child_message_handler()
#
# A Context delegate for handling messages, this replaces the
@@ -736,4 +624,4 @@ class ChildJob:
if message.message_type == MessageType.LOG:
return
- self._send_message(_MessageType.LOG_MESSAGE, message)
+ self._pipe_w.send(message)
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index e05d95188..38595729c 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -269,19 +269,18 @@ class Queue:
#
# Args:
# element (Element): The element which completed
- # job (Job): The job which completed
#
- def _update_workspaces(self, element, job):
- workspace_dict = None
- if job.child_data:
- workspace_dict = job.child_data.get("workspace", None)
+ def _update_workspaces(self, element):
+ # FIXME: Does this really need to be done for every job or only some?
+ # If some, we should only run it for those.
+ workspace = element._get_workspace()
# Handle any workspace modifications now
#
- if workspace_dict:
+ if workspace:
context = element._get_context()
workspaces = context.get_workspaces()
- if workspaces.update_workspace(element._get_full_name(), workspace_dict):
+ if workspaces.update_workspace(element._get_full_name(), workspace.to_dict()):
try:
workspaces.save_config()
except BstError as e:
@@ -311,7 +310,7 @@ class Queue:
# Update values that need to be synchronized in the main task
# before calling any queue implementation
- self._update_workspaces(element, job)
+ self._update_workspaces(element)
# Give the result of the job to the Queue implementor,
# and determine if it should be considered as processed