diff options
Diffstat (limited to 'buildstream/_scheduler/queues/queue.py')
-rw-r--r-- | buildstream/_scheduler/queues/queue.py | 119 |
1 files changed, 36 insertions, 83 deletions
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index 1efcffc16..5ca80f823 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -31,7 +31,11 @@ from ..resources import ResourceType # BuildStream toplevel imports from ..._exceptions import BstError, set_last_task_error from ..._message import Message, MessageType +from ...types import _UniquePriorityQueue +import sys +def debug(*args): + print("➤➤➤", *args, file=sys.stderr) # Queue status for a given element # @@ -59,7 +63,7 @@ class Queue(): complete_name = None resources = [] # Resources this queues' jobs want - def __init__(self, scheduler): + def __init__(self, scheduler, next_queue=None): # # Public members @@ -73,10 +77,11 @@ class Queue(): # self._scheduler = scheduler self._resources = scheduler.resources # Shared resource pool - self._wait_queue = deque() # Ready / Waiting elements - self._done_queue = deque() # Processed / Skipped elements + self._queue = _UniquePriorityQueue() # Ready / Waiting elements self._max_retries = 0 + self._next_queue = next_queue + # Assert the subclass has setup class data assert self.action_name is not None assert self.complete_name is not None @@ -101,7 +106,7 @@ class Queue(): # # def process(self, element): - pass + raise NotImplementedError() # status() # @@ -114,7 +119,7 @@ class Queue(): # (QueueStatus): The element status # def status(self, element): - return QueueStatus.READY + raise NotImplementedError() # done() # @@ -140,86 +145,34 @@ class Queue(): # Args: # elts (list): A list of Elements # - def enqueue(self, elts): - if not elts: - return - - # Place skipped elements on the done queue right away. - # - # The remaining ready and waiting elements must remain in the - # same queue, and ready status must be determined at the moment - # which the scheduler is asking for the next job. - # - skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP] - wait = [elt for elt in elts if elt not in skip] - - self.skipped_elements.extend(skip) # Public record of skipped elements - self._done_queue.extend(skip) # Elements to be processed - self._wait_queue.extend(wait) # Elements eligible to be dequeued - - # dequeue() - # - # A generator which dequeues the elements which - # are ready to exit the queue. - # - # Yields: - # (Element): Elements being dequeued - # - def dequeue(self): - while self._done_queue: - yield self._done_queue.popleft() - - # dequeue_ready() - # - # Reports whether any elements can be promoted to other queues - # - # Returns: - # (bool): Whether there are elements ready - # - def dequeue_ready(self): - return any(self._done_queue) + def push(self, element): + debug("Adding element", element, "to", self.action_name) + self._queue.push(element._pipeline_id, element) - # harvest_jobs() - # - # Process elements in the queue, moving elements which were enqueued - # into the dequeue pool, and creating as many jobs for which resources - # can be reserved. - # - # Returns: - # ([Job]): A list of jobs which can be run now - # - def harvest_jobs(self): - unready = [] - ready = [] + def skip(self, element): + self.skipped_elements.append(element) + if self._next_queue: + self._next_queue.push(element) - while self._wait_queue: - if not self._resources.reserve(self.resources, peek=True): - break + def pop(self): + debug("Popping", self.action_name, [(str(x[1]), x[1]._unique_id) for x in self._queue._heap]) - element = self._wait_queue.popleft() - status = self.status(element) + if not self._resources.reserve(self.resources) or not self._queue: + raise IndexError() - if status == QueueStatus.WAIT: - unready.append(element) - elif status == QueueStatus.SKIP: - self._done_queue.append(element) - self.skipped_elements.append(element) - else: - reserved = self._resources.reserve(self.resources) - assert reserved - ready.append(element) - - self._wait_queue.extendleft(unready) - - return [ - ElementJob(self._scheduler, self.action_name, - self._element_log_path(element), - element=element, queue=self, - action_cb=self.process, - complete_cb=self._job_done, - max_retries=self._max_retries) - for element in ready - ] + else: + element = self._queue.pop() + + return ElementJob( + self._scheduler, + self.action_name, + self._element_log_path(element), + element=element, + queue=self, + action_cb=self.process, + complete_cb=self._job_done, + max_retries=self._max_retries, + ) ##################################################### # Private Methods # @@ -301,8 +254,8 @@ class Queue(): detail=traceback.format_exc()) self.failed_elements.append(element) else: - # All elements get placed on the done queue for later processing. - self._done_queue.append(element) + if self._next_queue: + self._next_queue.push(element) # These lists are for bookkeeping purposes for the UI and logging. if status == JobStatus.SKIPPED or job.get_terminated(): |