summaryrefslogtreecommitdiff
path: root/src/buildstream/_scheduler
diff options
context:
space:
mode:
authorBenjamin Schubert <bschubert15@bloomberg.net>2020-12-05 17:08:27 +0000
committerBenjamin Schubert <bschubert15@bloomberg.net>2020-12-05 17:08:27 +0000
commit3f07e6056076736ea63f5ed5b96f240978691da1 (patch)
treef047c9f5d81c78534832f479eb51f7191f48cd55 /src/buildstream/_scheduler
parentded144e97de4c6bb4c211204761b0104e437df29 (diff)
downloadbuildstream-3f07e6056076736ea63f5ed5b96f240978691da1.tar.gz
job.py: Simplify handling of messages through the parent-child pipebschubert/optimize-job
Now that the only type of message that goes through are messages for the messenger, we can remove the enveloppe and only ever handle messenger's messages
Diffstat (limited to 'src/buildstream/_scheduler')
-rw-r--r--src/buildstream/_scheduler/jobs/job.py53
1 files changed, 4 insertions, 49 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 322849a79..aa71b6e18 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -63,17 +63,6 @@ class JobStatus(FastEnum):
SKIPPED = 3
-# Used to distinguish between status messages and return values
-class _Envelope:
- def __init__(self, message_type, message):
- self.message_type = message_type
- self.message = message
-
-
-class _MessageType(FastEnum):
- LOG_MESSAGE = 1
-
-
# Job()
#
# The Job object represents a task that will run in parallel to the main
@@ -363,28 +352,6 @@ class Job:
self._pipe_r.close()
self._pipe_r = self._task = None
- # _parent_process_envelope()
- #
- # Processes a message Envelope deserialized form the message pipe.
- #
- # this will have the side effect of assigning some local state
- # on the Job in the parent process for later inspection when the
- # child process completes.
- #
- # Args:
- # envelope (Envelope): The message envelope
- #
- def _parent_process_envelope(self, envelope):
- if not self._listening:
- return
-
- if envelope.message_type is _MessageType.LOG_MESSAGE:
- # Propagate received messages from children
- # back through the context.
- self._messenger.message(envelope.message)
- else:
- assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message)
-
# _parent_process_pipe()
#
# Reads back message envelopes from the message pipe
@@ -393,11 +360,12 @@ class Job:
def _parent_process_pipe(self):
while self._pipe_r.poll():
try:
- envelope = self._pipe_r.recv()
+ message = self._pipe_r.recv()
except EOFError:
self._parent_stop_listening()
break
- self._parent_process_envelope(envelope)
+
+ self._messenger.message(message)
# _parent_recv()
#
@@ -629,19 +597,6 @@ class ChildJob:
# Local Private Methods #
#######################################################
- # _send_message()
- #
- # Send data in a message to the parent Job, running in the main process.
- #
- # Args:
- # message_type (str): The type of message to send.
- # message_data (any): A simple object (must be pickle-able, i.e.
- # strings, lists, dicts, numbers, but not Element
- # instances). This is sent to the parent Job.
- #
- def _send_message(self, message_type, message_data):
- self._pipe_w.send(_Envelope(message_type, message_data))
-
# _child_message_handler()
#
# A Context delegate for handling messages, this replaces the
@@ -669,4 +624,4 @@ class ChildJob:
if message.message_type == MessageType.LOG:
return
- self._send_message(_MessageType.LOG_MESSAGE, message)
+ self._pipe_w.send(message)