summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Pollard <tom.pollard@codethink.co.uk>2019-09-05 17:30:35 +0100
committerbst-marge-bot <marge-bot@buildstream.build>2019-09-10 10:44:53 +0000
commita97af8edd7e1e9385c45fca2761f7327f9180059 (patch)
treeb6c38c31da12c85d486bf4333f9559ea86f85f7a
parentaaa104bc2e95e973adb6b61276e38ed573bd97d2 (diff)
downloadbuildstream-a97af8edd7e1e9385c45fca2761f7327f9180059.tar.gz
scheduler.py: Notification for Message() propagation
Add a notification for MESSAGE. Instead of scheduler's Queues and Jobs directly calling the message handler that App has assigned to Context, the Message() is now sent over the notification handler where it is then given to Messenger's handler.
-rw-r--r--src/buildstream/_scheduler/jobs/job.py6
-rw-r--r--src/buildstream/_scheduler/queues/queue.py3
-rw-r--r--src/buildstream/_scheduler/scheduler.py16
-rw-r--r--src/buildstream/_stream.py4
4 files changed, 22 insertions, 7 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 9af08df92..913e27ea2 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -393,8 +393,8 @@ class Job():
element_name = self._message_element_name
if element_key is None:
element_key = self._message_element_key
- self._scheduler.context.messenger.message(
- Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs))
+ message = Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs)
+ self._scheduler.notify_messenger(message)
# get_element()
#
@@ -536,7 +536,7 @@ class Job():
if envelope.message_type is _MessageType.LOG_MESSAGE:
# Propagate received messages from children
# back through the context.
- self._scheduler.context.messenger.message(envelope.message)
+ self._scheduler.notify_messenger(envelope.message)
elif envelope.message_type is _MessageType.ERROR:
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 745b59417..6c6dfdc4f 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -345,9 +345,8 @@ class Queue():
# Convenience wrapper for Queue implementations to send
# a message for the element they are processing
def _message(self, element, message_type, brief, **kwargs):
- context = element._get_context()
message = Message(message_type, brief, element_name=element._get_full_name(), **kwargs)
- context.messenger.message(message)
+ self._scheduler.notify_messenger(message)
def _element_log_path(self, element):
project = element._get_project()
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index b892296f5..d0a189545 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -61,6 +61,7 @@ class NotificationType(FastEnum):
UNSUSPEND = "unsuspend"
SUSPENDED = "suspended"
RETRY = "retry"
+ MESSAGE = "message"
# Notification()
@@ -80,13 +81,15 @@ class Notification():
job_action=None,
job_status=None,
time=None,
- element=None):
+ element=None,
+ message=None):
self.notification_type = notification_type
self.full_name = full_name
self.job_action = job_action
self.job_status = job_status
self.time = time
self.element = element
+ self.message = message
# Scheduler()
@@ -301,6 +304,17 @@ class Scheduler():
self._notify(notification)
self._sched()
+ # notify_messenger()
+ #
+ # Send message over notification queue to Messenger callback
+ #
+ # Args:
+ # message (Message): A Message() to be sent to the frontend message
+ # handler, as assigned by context's messenger.
+ #
+ def notify_messenger(self, message):
+ self._notify(Notification(NotificationType.MESSAGE, message=message))
+
#######################################################
# Local Private Methods #
#######################################################
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 6d8d918dd..293ba051d 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1653,7 +1653,9 @@ class Stream():
assert self._notification_queue
notification = self._notification_queue.pop()
- if notification.notification_type == NotificationType.INTERRUPT:
+ if notification.notification_type == NotificationType.MESSAGE:
+ self._context.messenger.message(notification.message)
+ elif notification.notification_type == NotificationType.INTERRUPT:
self._interrupt_callback()
elif notification.notification_type == NotificationType.TICK:
self._ticker_callback()