diff options
Diffstat (limited to 'src/buildstream/_scheduler/scheduler.py')
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 29 |
1 files changed, 18 insertions, 11 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 664986534..44ebeef5f 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -26,6 +26,7 @@ from itertools import chain import signal import datetime from contextlib import contextmanager +import queue # Local imports from .resources import Resources, ResourceType @@ -55,6 +56,7 @@ class NotificationType(enum.Enum): EXCEPTION = "exception" TASK_ERROR = "task_error" SCHED_TERMINATE = "sched_terminate" + QUEUES = "queues" class Notification: @@ -70,7 +72,8 @@ class Notification: element=None, exception=None, domain=None, - reason=None): + reason=None, + queues=None): self.notification_type = notification_type self.full_name = full_name @@ -82,6 +85,7 @@ class Notification: self.exception = exception self.domain = domain self.reason = reason + self.queues = queues # Scheduler() @@ -175,6 +179,14 @@ class Scheduler(): # Hold on to the queues to process self.queues = queues + # Report to the main process which queues are in session, + # for now a list of action_names as pickling queues is + # causing errors. Will need actual queue object or bidirectional + # notification queue for error handling later. + queue_list = [q.action_name for q in self.queues] + notifcation = Notification(NotificationType.QUEUES, queues=queue_list) + self._notify(notifcation) + # Ensure that we have a fresh new event loop, in case we want # to run another test in this thread. self.loop = asyncio.new_event_loop() @@ -294,20 +306,17 @@ class Scheduler(): # queue (Queue): The Queue holding a complete job # job (Job): The completed Job # status (JobStatus): The status of the completed job - # process_jobs (bool): If the scheduler should also process the - # job, else just generate the notification # - def job_completed(self, job, status, process_jobs=True): + def job_completed(self, job, status): - if process_jobs: - # Remove from the active jobs list - self._active_jobs.remove(job) + self._active_jobs.remove(job) + element = None if status == JobStatus.FAIL: # If it's an elementjob, we want to compare against the failure messages # and send the Element() instance if interactive failure handling. Note # this may change if the frontend is run in a separate process for pickling - element = job._element if (job.element_job and self._interactive_failure) else None + element = job._element if (job.element_job and self._interactive_failure) else element notification = Notification(NotificationType.JOB_COMPLETE, full_name=job.name, @@ -317,9 +326,7 @@ class Scheduler(): element=element) self._notify(notification) - if process_jobs: - # Now check for more jobs - self._sched() + self._sched() # check_cache_size(): # |