From 0b4ec4b4b6bdec4ca5051518c3af6a87a43f467d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Wed, 20 Nov 2019 08:17:02 +0100 Subject: WIP: job.py: Queue -> Pipe --- src/buildstream/_scheduler/jobs/job.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 8e909977f..ef5a895ab 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -155,7 +155,7 @@ class Job: # def start(self): - self._queue = multiprocessing.Queue() + self._queue, queue_writer = multiprocessing.Pipe(duplex=False) self._tries += 1 self._parent_start_listening() @@ -174,10 +174,10 @@ class Job: if self._scheduler.context.platform.does_multiprocessing_start_require_pickling(): pickled = pickle_child_job(child_job, self._scheduler.context.get_projects(),) self._process = _multiprocessing.AsyncioSafeProcess( - target=do_pickled_child_job, args=[pickled, self._queue], + target=do_pickled_child_job, args=[pickled, queue_writer], ) else: - self._process = _multiprocessing.AsyncioSafeProcess(target=child_job.child_action, args=[self._queue],) + self._process = _multiprocessing.AsyncioSafeProcess(target=child_job.child_action, args=[queue_writer],) # Block signals which are handled in the main process such that # the child process does not inherit the parent's state, but the main @@ -186,6 +186,9 @@ class Job: with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False): self._process.start() + queue_writer.close() + queue_writer = None + # Wait for the child task to complete. # # This is a tricky part of python which doesnt seem to @@ -508,8 +511,12 @@ class Job: # in the parent process. # def _parent_process_queue(self): - while not self._queue.empty(): - envelope = self._queue.get_nowait() + while self._queue.poll(): + try: + envelope = self._queue.recv() + except EOFError: + self._parent_stop_listening() + break self._parent_process_envelope(envelope) # _parent_recv() @@ -537,7 +544,7 @@ class Job: # http://bugs.python.org/issue3831 # if not self._listening: - self._scheduler.loop.add_reader(self._queue._reader.fileno(), self._parent_recv) + self._scheduler.loop.add_reader(self._queue.fileno(), self._parent_recv) self._listening = True # _parent_stop_listening() @@ -546,7 +553,7 @@ class Job: # def _parent_stop_listening(self): if self._listening: - self._scheduler.loop.remove_reader(self._queue._reader.fileno()) + self._scheduler.loop.remove_reader(self._queue.fileno()) self._listening = False @@ -797,7 +804,7 @@ class ChildJob: # instances). This is sent to the parent Job. # def _send_message(self, message_type, message_data): - self._queue.put(_Envelope(message_type, message_data)) + self._queue.send(_Envelope(message_type, message_data)) # _child_send_error() # -- cgit v1.2.1