From d7d056bff1e90dae359d8feb0f287ce0a5f098a8 Mon Sep 17 00:00:00 2001 From: Tom Pollard Date: Fri, 11 Oct 2019 10:45:58 +0100 Subject: basic async in stream --- src/buildstream/_scheduler/scheduler.py | 1 + src/buildstream/_stream.py | 77 ++++++++++++++++++++++++--------- 2 files changed, 57 insertions(+), 21 deletions(-) diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 122ba3716..0d06500c0 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -68,6 +68,7 @@ class NotificationType(FastEnum): START = "start" TASK_GROUPS = "task_groups" ELEMENT_TOTALS = "element_totals" + FINISH = "finish" # Notification() diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 74f7755e0..f0f61383e 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -74,6 +74,7 @@ class Stream(): self.queues = [] # Queue objects self.len_session_elements = None self.len_total_elements = None + self.loop = None # # Private members @@ -141,26 +142,37 @@ class Stream(): self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args, kwargs=kwargs, name=process_name) + self._subprocess.start() + # We can now launch another async + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + self._start_listening() + #raise ValueError("started listening") + self.loop.run_forever() + + # Run forever needs to be forcefully stopped, else we never exit the statement + + #raise ValueError("run_forever") # TODO connect signal handlers with asyncio - while self._subprocess.exitcode is None: + #while self._subprocess.exitcode is None: # check every given time interval on subprocess state - self._subprocess.join(0.01) - # if no exit code, go back to checking the message queue - self._loop() - + #self._subprocess.join(0.01) + # Scheduler has stopped running, so safe to still have async here + self._stop_listening() + #print("closing the loop") + #raise ValueError("closing loop") + #self.loop.stop() + self.loop.close() + self.loop = None # Set main process back utils._reset_main_pid() # Ensure no more notifcations to process - try: - while True: - notification = self._notify_front_queue.get_nowait() - self._notification_handler(notification) - except queue.Empty: - pass - + while not self._notify_front_queue.empty(): + notification = self._notify_front_queue.get_nowait() + self._notification_handler(notification) # cleanup() # @@ -1456,6 +1468,9 @@ class Stream(): status = self._scheduler.run(self.queues) + # Scheduler has finished running, send frontend notification + self._notify_front(Notification(NotificationType.FINISH)) + if status == SchedStatus.ERROR: raise StreamError() if status == SchedStatus.TERMINATED: @@ -1774,12 +1789,18 @@ class Stream(): elif notification.notification_type == NotificationType.TASK_ERROR: set_last_task_error(*notification.task_error) elif notification.notification_type == NotificationType.EXCEPTION: + # If we're looping, stop + if self.loop: + self.loop.stop() # Regenerate the exception here, so we don't have to pickle it raise SubprocessException(**notification.exception) elif notification.notification_type == NotificationType.START: self._session_start_callback() elif notification.notification_type == NotificationType.ELEMENT_TOTALS: self.len_session_elements, self.len_total_elements = notification.element_totals + elif notification.notification_type == NotificationType.FINISH: + if self.loop: + self.loop.stop() else: raise StreamError("Unrecognised notification type received") @@ -1797,16 +1818,30 @@ class Stream(): # The code to be run by the Stream's event loop while delegating # work to a subprocess with the @subprocessed decorator - def _loop(self): - assert self._notify_front_queue + #def _loop(self): + #assert self._notify_front_queue # Check for and process new messages - while True: - try: - notification = self._notify_front_queue.get_nowait() - self._notification_handler(notification) - except queue.Empty: - notification = None - break + #while True: + #try: + #notification = self._notify_front_queue.get_nowait() + #self._notification_handler(notification) + #except queue.Empty: + #notification = None + #break + + def _loop(self): + while not self._notify_front_queue.empty(): + notification = self._notify_front_queue.get_nowait() + self._notification_handler(notification) + + def _start_listening(self): + if self._notify_front_queue: + self.loop.add_reader(self._notify_front_queue._reader.fileno(), self._loop) + + def _stop_listening(self): + if self._notify_front_queue: + self.loop.remove_reader(self._notify_front_queue._reader.fileno()) + def __getstate__(self): # The only use-cases for pickling in BuildStream at the time of writing -- cgit v1.2.1