summaryrefslogtreecommitdiff
path: root/src/buildstream/_scheduler/scheduler.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_scheduler/scheduler.py')
-rw-r--r--src/buildstream/_scheduler/scheduler.py29
1 files changed, 18 insertions, 11 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 664986534..44ebeef5f 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -26,6 +26,7 @@ from itertools import chain
import signal
import datetime
from contextlib import contextmanager
+import queue
# Local imports
from .resources import Resources, ResourceType
@@ -55,6 +56,7 @@ class NotificationType(enum.Enum):
EXCEPTION = "exception"
TASK_ERROR = "task_error"
SCHED_TERMINATE = "sched_terminate"
+ QUEUES = "queues"
class Notification:
@@ -70,7 +72,8 @@ class Notification:
element=None,
exception=None,
domain=None,
- reason=None):
+ reason=None,
+ queues=None):
self.notification_type = notification_type
self.full_name = full_name
@@ -82,6 +85,7 @@ class Notification:
self.exception = exception
self.domain = domain
self.reason = reason
+ self.queues = queues
# Scheduler()
@@ -175,6 +179,14 @@ class Scheduler():
# Hold on to the queues to process
self.queues = queues
+ # Report to the main process which queues are in session,
+ # for now a list of action_names as pickling queues is
+ # causing errors. Will need actual queue object or bidirectional
+ # notification queue for error handling later.
+ queue_list = [q.action_name for q in self.queues]
+ notifcation = Notification(NotificationType.QUEUES, queues=queue_list)
+ self._notify(notifcation)
+
# Ensure that we have a fresh new event loop, in case we want
# to run another test in this thread.
self.loop = asyncio.new_event_loop()
@@ -294,20 +306,17 @@ class Scheduler():
# queue (Queue): The Queue holding a complete job
# job (Job): The completed Job
# status (JobStatus): The status of the completed job
- # process_jobs (bool): If the scheduler should also process the
- # job, else just generate the notification
#
- def job_completed(self, job, status, process_jobs=True):
+ def job_completed(self, job, status):
- if process_jobs:
- # Remove from the active jobs list
- self._active_jobs.remove(job)
+ self._active_jobs.remove(job)
+ element = None
if status == JobStatus.FAIL:
# If it's an elementjob, we want to compare against the failure messages
# and send the Element() instance if interactive failure handling. Note
# this may change if the frontend is run in a separate process for pickling
- element = job._element if (job.element_job and self._interactive_failure) else None
+ element = job._element if (job.element_job and self._interactive_failure) else element
notification = Notification(NotificationType.JOB_COMPLETE,
full_name=job.name,
@@ -317,9 +326,7 @@ class Scheduler():
element=element)
self._notify(notification)
- if process_jobs:
- # Now check for more jobs
- self._sched()
+ self._sched()
# check_cache_size():
#