summaryrefslogtreecommitdiff
path: root/src/buildstream/_stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_stream.py')
-rw-r--r--src/buildstream/_stream.py77
1 files changed, 56 insertions, 21 deletions
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 74f7755e0..f0f61383e 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -74,6 +74,7 @@ class Stream():
self.queues = [] # Queue objects
self.len_session_elements = None
self.len_total_elements = None
+ self.loop = None
#
# Private members
@@ -141,26 +142,37 @@ class Stream():
self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args,
kwargs=kwargs, name=process_name)
+
self._subprocess.start()
+ # We can now launch another async
+ self.loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(self.loop)
+ self._start_listening()
+ #raise ValueError("started listening")
+ self.loop.run_forever()
+
+ # Run forever needs to be forcefully stopped, else we never exit the statement
+
+ #raise ValueError("run_forever")
# TODO connect signal handlers with asyncio
- while self._subprocess.exitcode is None:
+ #while self._subprocess.exitcode is None:
# check every given time interval on subprocess state
- self._subprocess.join(0.01)
- # if no exit code, go back to checking the message queue
- self._loop()
-
+ #self._subprocess.join(0.01)
+ # Scheduler has stopped running, so safe to still have async here
+ self._stop_listening()
+ #print("closing the loop")
+ #raise ValueError("closing loop")
+ #self.loop.stop()
+ self.loop.close()
+ self.loop = None
# Set main process back
utils._reset_main_pid()
# Ensure no more notifcations to process
- try:
- while True:
- notification = self._notify_front_queue.get_nowait()
- self._notification_handler(notification)
- except queue.Empty:
- pass
-
+ while not self._notify_front_queue.empty():
+ notification = self._notify_front_queue.get_nowait()
+ self._notification_handler(notification)
# cleanup()
#
@@ -1456,6 +1468,9 @@ class Stream():
status = self._scheduler.run(self.queues)
+ # Scheduler has finished running, send frontend notification
+ self._notify_front(Notification(NotificationType.FINISH))
+
if status == SchedStatus.ERROR:
raise StreamError()
if status == SchedStatus.TERMINATED:
@@ -1774,12 +1789,18 @@ class Stream():
elif notification.notification_type == NotificationType.TASK_ERROR:
set_last_task_error(*notification.task_error)
elif notification.notification_type == NotificationType.EXCEPTION:
+ # If we're looping, stop
+ if self.loop:
+ self.loop.stop()
# Regenerate the exception here, so we don't have to pickle it
raise SubprocessException(**notification.exception)
elif notification.notification_type == NotificationType.START:
self._session_start_callback()
elif notification.notification_type == NotificationType.ELEMENT_TOTALS:
self.len_session_elements, self.len_total_elements = notification.element_totals
+ elif notification.notification_type == NotificationType.FINISH:
+ if self.loop:
+ self.loop.stop()
else:
raise StreamError("Unrecognised notification type received")
@@ -1797,16 +1818,30 @@ class Stream():
# The code to be run by the Stream's event loop while delegating
# work to a subprocess with the @subprocessed decorator
- def _loop(self):
- assert self._notify_front_queue
+ #def _loop(self):
+ #assert self._notify_front_queue
# Check for and process new messages
- while True:
- try:
- notification = self._notify_front_queue.get_nowait()
- self._notification_handler(notification)
- except queue.Empty:
- notification = None
- break
+ #while True:
+ #try:
+ #notification = self._notify_front_queue.get_nowait()
+ #self._notification_handler(notification)
+ #except queue.Empty:
+ #notification = None
+ #break
+
+ def _loop(self):
+ while not self._notify_front_queue.empty():
+ notification = self._notify_front_queue.get_nowait()
+ self._notification_handler(notification)
+
+ def _start_listening(self):
+ if self._notify_front_queue:
+ self.loop.add_reader(self._notify_front_queue._reader.fileno(), self._loop)
+
+ def _stop_listening(self):
+ if self._notify_front_queue:
+ self.loop.remove_reader(self._notify_front_queue._reader.fileno())
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing