From 57236af65c306ebbbec6d01f99593319b8cdeabf Mon Sep 17 00:00:00 2001 From: Phil Dawson Date: Wed, 21 Nov 2018 09:10:05 +0000 Subject: WIP: scheduler.py: Add a second high priority queue Adds a queue which allows "high priority" jobs to bypass the main waiting_jobs queue. This is then used to ensure that fetch jobs are prioritised over pull jobs. --- buildstream/_scheduler/queues/fetchqueue.py | 1 + buildstream/_scheduler/queues/queue.py | 1 + buildstream/_scheduler/scheduler.py | 58 +++++++++++++++++------------ 3 files changed, 37 insertions(+), 23 deletions(-) diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py index 446dbbd3b..5c441ba64 100644 --- a/buildstream/_scheduler/queues/fetchqueue.py +++ b/buildstream/_scheduler/queues/fetchqueue.py @@ -33,6 +33,7 @@ class FetchQueue(Queue): action_name = "Fetch" complete_name = "Fetched" resources = [ResourceType.DOWNLOAD] + high_priority = True def __init__(self, scheduler, skip_cached=False): super().__init__(scheduler) diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index 909cebb44..3c0414036 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -58,6 +58,7 @@ class Queue(): action_name = None complete_name = None resources = [] # Resources this queues' jobs want + high_priority = False # If jobs from this queue should be prioritised by the scheduler def __init__(self, scheduler): diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index b76c7308e..1b05415fd 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -71,12 +71,13 @@ class Scheduler(): # # Public members # - self.active_jobs = [] # Jobs currently being run in the scheduler - self.waiting_jobs = [] # Jobs waiting for resources - self.queues = None # Exposed for the frontend to print summaries - self.context = context # The Context object shared with Queues - self.terminated = False # Whether the scheduler was asked to terminate or has terminated - self.suspended = False # Whether the scheduler is currently suspended + self.active_jobs = [] # Jobs currently being run in the scheduler + self.waiting_jobs = [] # Jobs waiting for resources + self.waiting_priority_jobs = [] # High priority jobs waiting for resources + self.queues = None # Exposed for the frontend to print summaries + self.context = context # The Context object shared with Queues + self.terminated = False # Whether the scheduler was asked to terminate or has terminated + self.suspended = False # Whether the scheduler is currently suspended # These are shared with the Job, but should probably be removed or made private in some way. self.loop = None # Shared for Job access to observe the message queue @@ -220,7 +221,9 @@ class Scheduler(): # run as soon any other queueing jobs finish, provided sufficient # resources are available for them to run # - def schedule_jobs(self, jobs): + def schedule_jobs(self, jobs, priority_jobs): + for job in priority_jobs: + self.waiting_priority_jobs.append(job) for job in jobs: self.waiting_jobs.append(job) @@ -257,7 +260,7 @@ class Scheduler(): resources=[ResourceType.CACHE, ResourceType.PROCESS], complete_cb=self._run_cleanup) - self.schedule_jobs([job]) + self.schedule_jobs([job], []) ####################################################### # Local Private Methods # @@ -269,22 +272,27 @@ class Scheduler(): # automatically when Scheduler.run() is called initially, # def _sched(self): - for job in self.waiting_jobs: - self._resources.reserve_exclusive_resources(job) + def allocate_resources_and_spawn_jobs(job_list): + for job in job_list: + self._resources.reserve_exclusive_resources(job) + + for job in job_list: + if not self._resources.reserve_job_resources(job): + continue - for job in self.waiting_jobs: - if not self._resources.reserve_job_resources(job): - continue + job.spawn() + job_list.remove(job) + self.active_jobs.append(job) - job.spawn() - self.waiting_jobs.remove(job) - self.active_jobs.append(job) + if self._job_start_callback: + self._job_start_callback(job) - if self._job_start_callback: - self._job_start_callback(job) + # Process jobs from the high priority list first + allocate_resources_and_spawn_jobs(self.waiting_priority_jobs) + allocate_resources_and_spawn_jobs(self.waiting_jobs) # If nothings ticking, time to bail out - if not self.active_jobs and not self.waiting_jobs: + if not self.active_jobs and not self.waiting_jobs and not self.waiting_priority_jobs: self.loop.stop() # _schedule_queue_jobs() @@ -298,6 +306,7 @@ class Scheduler(): # def _schedule_queue_jobs(self): ready = [] + ready_priority = [] process_queues = True while self._queue_jobs and process_queues: @@ -322,16 +331,19 @@ class Scheduler(): # to fetch tasks for elements which failed to pull, and # thus need all the pulls to complete before ever starting # a build - ready.extend(chain.from_iterable( - queue.pop_ready_jobs() for queue in reversed(self.queues) - )) + + for queue in reversed(self.queues): + if queue.high_priority: + ready_priority.extend(queue.pop_ready_jobs()) + else: + ready.extend(queue.pop_ready_jobs()) # pop_ready_jobs() may have skipped jobs, adding them to # the done_queue. Pull these skipped elements forward to # the next queue and process them. process_queues = any(q.dequeue_ready() for q in self.queues) - self.schedule_jobs(ready) + self.schedule_jobs(ready, ready_priority) self._sched() # _run_cleanup() -- cgit v1.2.1