diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2020-12-07 08:04:36 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2020-12-07 08:04:36 +0000 |
commit | 5f68adb3f80b1d84edf900fde8cf40c84c297f91 (patch) | |
tree | f047c9f5d81c78534832f479eb51f7191f48cd55 | |
parent | 458c05751f089673dc1cae8b8edfabfa32cf21b7 (diff) | |
parent | 3f07e6056076736ea63f5ed5b96f240978691da1 (diff) | |
download | buildstream-5f68adb3f80b1d84edf900fde8cf40c84c297f91.tar.gz |
Merge branch 'bschubert/optimize-job' into 'master'
First part of the parent-child job separation cleanup
See merge request BuildStream/buildstream!2111
-rw-r--r-- | src/buildstream/_scheduler/jobs/elementjob.py | 9 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 138 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 15 |
3 files changed, 20 insertions, 142 deletions
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py index 683129506..c72be4052 100644 --- a/src/buildstream/_scheduler/jobs/elementjob.py +++ b/src/buildstream/_scheduler/jobs/elementjob.py @@ -91,12 +91,3 @@ class ChildElementJob(ChildJob): # Run the action return self._action_cb(self._element) - - def child_process_data(self): - data = {} - - workspace = self._element._get_workspace() - if workspace is not None: - data["workspace"] = workspace.to_dict() - - return data diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 2e8f5ca1a..aa71b6e18 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -63,20 +63,6 @@ class JobStatus(FastEnum): SKIPPED = 3 -# Used to distinguish between status messages and return values -class _Envelope: - def __init__(self, message_type, message): - self.message_type = message_type - self.message = message - - -class _MessageType(FastEnum): - LOG_MESSAGE = 1 - ERROR = 2 - RESULT = 3 - CHILD_DATA = 4 - - # Job() # # The Job object represents a task that will run in parallel to the main @@ -121,7 +107,6 @@ class Job: self.id = "{}-{}".format(action_name, next(Job._id_generator)) self.name = None # The name of the job, set by the job's subclass self.action_name = action_name # The action name for the Queue - self.child_data = None # Data to be sent to the main process # # Private members @@ -179,8 +164,8 @@ class Job: loop = asyncio.get_event_loop() async def execute(): - result = await loop.run_in_executor(None, self._child.child_action, pipe_w) - await self._parent_child_completed(result) + ret_code, self._result = await loop.run_in_executor(None, self._child.child_action, pipe_w) + await self._parent_child_completed(ret_code) self._task = loop.create_task(execute()) @@ -367,39 +352,6 @@ class Job: self._pipe_r.close() self._pipe_r = self._task = None - # _parent_process_envelope() - # - # Processes a message Envelope deserialized form the message pipe. - # - # this will have the side effect of assigning some local state - # on the Job in the parent process for later inspection when the - # child process completes. - # - # Args: - # envelope (Envelope): The message envelope - # - def _parent_process_envelope(self, envelope): - if not self._listening: - return - - if envelope.message_type is _MessageType.LOG_MESSAGE: - # Propagate received messages from children - # back through the context. - self._messenger.message(envelope.message) - elif envelope.message_type is _MessageType.ERROR: - # For regression tests only, save the last error domain / reason - # reported from a child task in the main process, this global state - # is currently managed in _exceptions.py - set_last_task_error(envelope.message["domain"], envelope.message["reason"]) - elif envelope.message_type is _MessageType.RESULT: - assert self._result is None - self._result = envelope.message - elif envelope.message_type is _MessageType.CHILD_DATA: - # If we retry a job, we assign a new value to this - self.child_data = envelope.message - else: - assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message) - # _parent_process_pipe() # # Reads back message envelopes from the message pipe @@ -408,11 +360,12 @@ class Job: def _parent_process_pipe(self): while self._pipe_r.poll(): try: - envelope = self._pipe_r.recv() + message = self._pipe_r.recv() except EOFError: self._parent_stop_listening() break - self._parent_process_envelope(envelope) + + self._messenger.message(message) # _parent_recv() # @@ -527,20 +480,6 @@ class ChildJob: def child_process(self): raise ImplError("ChildJob '{kind}' does not implement child_process()".format(kind=type(self).__name__)) - # child_process_data() - # - # Abstract method to retrieve additional data that should be - # returned to the parent process. Note that the job result is - # retrieved independently. - # - # Values can later be retrieved in Job.child_data. - # - # Returns: - # (dict) A dict containing values to be reported to the main process - # - def child_process_data(self): - return {} - # child_action() # # Perform the action in the child process, this calls the action_cb. @@ -567,7 +506,7 @@ class ChildJob: with self._terminate_lock: self._thread_id = threading.current_thread().ident if self._should_terminate: - return _ReturnCode.TERMINATED + return _ReturnCode.TERMINATED, None try: # Try the task action @@ -577,7 +516,7 @@ class ChildJob: self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename) # Alert parent of skip by return code - return _ReturnCode.SKIPPED + return _ReturnCode.SKIPPED, None except BstError as e: elapsed = datetime.datetime.now() - timeinfo.start_time retry_flag = e.temporary @@ -599,14 +538,12 @@ class ChildJob: sandbox=e.sandbox, ) - self._send_message(_MessageType.CHILD_DATA, self.child_process_data()) - # Report the exception to the parent (for internal testing purposes) - self._child_send_error(e) + set_last_task_error(e.domain, e.reason) # Set return code based on whether or not the error was temporary. # - return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL + return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL, None except Exception: # pylint: disable=broad-except # If an unhandled (not normalized to BstError) occurs, that's a bug, @@ -618,25 +555,22 @@ class ChildJob: self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename) # Unhandled exceptions should permenantly fail - return _ReturnCode.PERM_FAIL + return _ReturnCode.PERM_FAIL, None else: # No exception occurred in the action - self._send_message(_MessageType.CHILD_DATA, self.child_process_data()) - self._child_send_result(result) - elapsed = datetime.datetime.now() - timeinfo.start_time self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename) # Shutdown needs to stay outside of the above context manager, # make sure we dont try to handle SIGTERM while the process # is already busy in sys.exit() - return _ReturnCode.OK + return _ReturnCode.OK, result finally: self._thread_id = None except TerminateException: self._thread_id = None - return _ReturnCode.TERMINATED + return _ReturnCode.TERMINATED, None finally: self._pipe_w.close() @@ -663,52 +597,6 @@ class ChildJob: # Local Private Methods # ####################################################### - # _send_message() - # - # Send data in a message to the parent Job, running in the main process. - # - # Args: - # message_type (str): The type of message to send. - # message_data (any): A simple object (must be pickle-able, i.e. - # strings, lists, dicts, numbers, but not Element - # instances). This is sent to the parent Job. - # - def _send_message(self, message_type, message_data): - self._pipe_w.send(_Envelope(message_type, message_data)) - - # _child_send_error() - # - # Sends an error to the main process through the message pipe - # - # Args: - # e (Exception): The error to send - # - def _child_send_error(self, e): - domain = None - reason = None - - if isinstance(e, BstError): - domain = e.domain - reason = e.reason - - self._send_message(_MessageType.ERROR, {"domain": domain, "reason": reason}) - - # _child_send_result() - # - # Sends the serialized result to the main process through the message pipe - # - # Args: - # result (any): None, or a simple object (must be pickle-able, i.e. - # strings, lists, dicts, numbers, but not Element - # instances). - # - # Note: If None is passed here, nothing needs to be sent, the - # result member in the parent process will simply remain None. - # - def _child_send_result(self, result): - if result is not None: - self._send_message(_MessageType.RESULT, result) - # _child_message_handler() # # A Context delegate for handling messages, this replaces the @@ -736,4 +624,4 @@ class ChildJob: if message.message_type == MessageType.LOG: return - self._send_message(_MessageType.LOG_MESSAGE, message) + self._pipe_w.send(message) diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index e05d95188..38595729c 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -269,19 +269,18 @@ class Queue: # # Args: # element (Element): The element which completed - # job (Job): The job which completed # - def _update_workspaces(self, element, job): - workspace_dict = None - if job.child_data: - workspace_dict = job.child_data.get("workspace", None) + def _update_workspaces(self, element): + # FIXME: Does this really needs to be done for every job or only some? + # If some, we should only run it for those. + workspace = element._get_workspace() # Handle any workspace modifications now # - if workspace_dict: + if workspace: context = element._get_context() workspaces = context.get_workspaces() - if workspaces.update_workspace(element._get_full_name(), workspace_dict): + if workspaces.update_workspace(element._get_full_name(), workspace.to_dict()): try: workspaces.save_config() except BstError as e: @@ -311,7 +310,7 @@ class Queue: # Update values that need to be synchronized in the main task # before calling any queue implementation - self._update_workspaces(element, job) + self._update_workspaces(element) # Give the result of the job to the Queue implementor, # and determine if it should be considered as processed |