From 0becc13bdb8cf3e6a0bc271ce4fa118f52dba50e Mon Sep 17 00:00:00 2001 From: Tom Pollard Date: Thu, 5 Sep 2019 17:30:35 +0100 Subject: scheduler.py: Notification for Message() propagation Add a notification for MESSAGE. Instead of scheduler's Queues and Jobs directly calling the message handler that App has assigned to Context, the Message() is now sent over the notification handler where it is then given to Messenger's handler. --- src/buildstream/_scheduler/jobs/job.py | 6 +++--- src/buildstream/_scheduler/queues/queue.py | 3 +-- src/buildstream/_scheduler/scheduler.py | 16 +++++++++++++++- src/buildstream/_stream.py | 4 +++- 4 files changed, 22 insertions(+), 7 deletions(-) diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 9af08df92..913e27ea2 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -393,8 +393,8 @@ class Job(): element_name = self._message_element_name if element_key is None: element_key = self._message_element_key - self._scheduler.context.messenger.message( - Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs)) + message = Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs) + self._scheduler.notify_messenger(message) # get_element() # @@ -536,7 +536,7 @@ class Job(): if envelope.message_type is _MessageType.LOG_MESSAGE: # Propagate received messages from children # back through the context. - self._scheduler.context.messenger.message(envelope.message) + self._scheduler.notify_messenger(envelope.message) elif envelope.message_type is _MessageType.ERROR: # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 745b59417..6c6dfdc4f 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -345,9 +345,8 @@ class Queue(): # Convenience wrapper for Queue implementations to send # a message for the element they are processing def _message(self, element, message_type, brief, **kwargs): - context = element._get_context() message = Message(message_type, brief, element_name=element._get_full_name(), **kwargs) - context.messenger.message(message) + self._scheduler.notify_messenger(message) def _element_log_path(self, element): project = element._get_project() diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index b892296f5..d0a189545 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -61,6 +61,7 @@ class NotificationType(FastEnum): UNSUSPEND = "unsuspend" SUSPENDED = "suspended" RETRY = "retry" + MESSAGE = "message" # Notification() @@ -80,13 +81,15 @@ class Notification(): job_action=None, job_status=None, time=None, - element=None): + element=None, + message=None): self.notification_type = notification_type self.full_name = full_name self.job_action = job_action self.job_status = job_status self.time = time self.element = element + self.message = message # Scheduler() @@ -301,6 +304,17 @@ class Scheduler(): self._notify(notification) self._sched() + # notify_messenger() + # + # Send message over notification queue to Messenger callback + # + # Args: + # message (Message): A Message() to be sent to the frontend message + # handler, as assigned by context's messenger. + # + def notify_messenger(self, message): + self._notify(Notification(NotificationType.MESSAGE, message=message)) + ####################################################### # Local Private Methods # ####################################################### diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 6d8d918dd..293ba051d 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1653,7 +1653,9 @@ class Stream(): assert self._notification_queue notification = self._notification_queue.pop() - if notification.notification_type == NotificationType.INTERRUPT: + if notification.notification_type == NotificationType.MESSAGE: + self._context.messenger.message(notification.message) + elif notification.notification_type == NotificationType.INTERRUPT: self._interrupt_callback() elif notification.notification_type == NotificationType.TICK: self._ticker_callback() -- cgit v1.2.1