From 34395d069f1fe78c0d50af6255d2a62c68fbde95 Mon Sep 17 00:00:00 2001 From: Phil Dawson Date: Wed, 21 Nov 2018 09:10:05 +0000 Subject: WIP: use priority queue in schedular --- buildstream/_scheduler/jobs/cachesizejob.py | 3 +++ buildstream/_scheduler/jobs/cleanupjob.py | 3 +++ buildstream/_scheduler/jobs/elementjob.py | 10 ++++++++++ buildstream/_scheduler/jobs/job.py | 4 ++++ buildstream/_scheduler/scheduler.py | 5 +++-- setup.py | 1 + 6 files changed, 24 insertions(+), 2 deletions(-) diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py index d46fd4c16..17ccb0a36 100644 --- a/buildstream/_scheduler/jobs/cachesizejob.py +++ b/buildstream/_scheduler/jobs/cachesizejob.py @@ -39,3 +39,6 @@ class CacheSizeJob(Job): def child_process_data(self): return {} + + def key(self): + return (100, 'cache-size') diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py index 8bdbba0ed..3e860e28c 100644 --- a/buildstream/_scheduler/jobs/cleanupjob.py +++ b/buildstream/_scheduler/jobs/cleanupjob.py @@ -32,3 +32,6 @@ class CleanupJob(Job): def parent_complete(self, success, result): if success: self._artifacts.set_cache_size(result) + + def key(self): + return (0, 'cleanup') diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py index 8ce5c062f..1d6305cc5 100644 --- a/buildstream/_scheduler/jobs/elementjob.py +++ b/buildstream/_scheduler/jobs/elementjob.py @@ -22,6 +22,13 @@ from ..._message import Message, MessageType from .job import Job +_ACTIONS = { + "Build": 10, + "Fetch": 20, + "Pull": 30, + "Push": 40, + "Track": 50, +} # ElementJob() # @@ -113,3 +120,6 @@ class ElementJob(Job): data['workspace'] = workspace.to_dict() return data + + def key(self): + return (_ACTIONS.get(self.action_name, 100), self._element.name) diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index 60ae0d001..ff0ea8340 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -353,6 +353,10 @@ class Job(): def child_process_data(self): return {} + def key(self): + raise ImplError("Job '{kind}' does not implement key()" + .format(kind=type(self).__name__)) + ####################################################### # Local Private Methods # ####################################################### diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index b76c7308e..6b849f359 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -25,6 +25,7 @@ from itertools import chain import signal import datetime from contextlib import contextmanager +from sortedcontainers import SortedList # Local imports from .resources import Resources, ResourceType @@ -72,7 +73,7 @@ class Scheduler(): # Public members # self.active_jobs = [] # Jobs currently being run in the scheduler - self.waiting_jobs = [] # Jobs waiting for resources + self.waiting_jobs = SortedList([], key=lambda job: job.key()) # Jobs waiting for resources self.queues = None # Exposed for the frontend to print summaries self.context = context # The Context object shared with Queues self.terminated = False # Whether the scheduler was asked to terminate or has terminated @@ -222,7 +223,7 @@ class Scheduler(): # def schedule_jobs(self, jobs): for job in jobs: - self.waiting_jobs.append(job) + self.waiting_jobs.add(job) # job_completed(): # diff --git a/setup.py b/setup.py index 76610f0ef..95d920406 100755 --- a/setup.py +++ b/setup.py @@ -343,6 +343,7 @@ setup(name='BuildStream', 'jinja2 >= 2.10', 'protobuf >= 3.5', 'grpcio >= 1.10', + 'sortedcontainers >= 1.5.7', ], entry_points=bst_install_entry_points, tests_require=dev_requires, -- cgit v1.2.1