summaryrefslogtreecommitdiff
path: root/buildstream/_scheduler/queues/queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'buildstream/_scheduler/queues/queue.py')
-rw-r--r--buildstream/_scheduler/queues/queue.py119
1 files changed, 36 insertions, 83 deletions
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 1efcffc16..5ca80f823 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -31,7 +31,11 @@ from ..resources import ResourceType
# BuildStream toplevel imports
from ..._exceptions import BstError, set_last_task_error
from ..._message import Message, MessageType
+from ...types import _UniquePriorityQueue
+import sys
+def debug(*args):
+ print("➤➤➤", *args, file=sys.stderr)
# Queue status for a given element
#
@@ -59,7 +63,7 @@ class Queue():
complete_name = None
resources = [] # Resources this queues' jobs want
- def __init__(self, scheduler):
+ def __init__(self, scheduler, next_queue=None):
#
# Public members
@@ -73,10 +77,11 @@ class Queue():
#
self._scheduler = scheduler
self._resources = scheduler.resources # Shared resource pool
- self._wait_queue = deque() # Ready / Waiting elements
- self._done_queue = deque() # Processed / Skipped elements
+ self._queue = _UniquePriorityQueue() # Ready / Waiting elements
self._max_retries = 0
+ self._next_queue = next_queue
+
# Assert the subclass has setup class data
assert self.action_name is not None
assert self.complete_name is not None
@@ -101,7 +106,7 @@ class Queue():
#
#
def process(self, element):
- pass
+ raise NotImplementedError()
# status()
#
@@ -114,7 +119,7 @@ class Queue():
# (QueueStatus): The element status
#
def status(self, element):
- return QueueStatus.READY
+ raise NotImplementedError()
# done()
#
@@ -140,86 +145,34 @@ class Queue():
# Args:
# elts (list): A list of Elements
#
- def enqueue(self, elts):
- if not elts:
- return
-
- # Place skipped elements on the done queue right away.
- #
- # The remaining ready and waiting elements must remain in the
- # same queue, and ready status must be determined at the moment
- # which the scheduler is asking for the next job.
- #
- skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
- wait = [elt for elt in elts if elt not in skip]
-
- self.skipped_elements.extend(skip) # Public record of skipped elements
- self._done_queue.extend(skip) # Elements to be processed
- self._wait_queue.extend(wait) # Elements eligible to be dequeued
-
- # dequeue()
- #
- # A generator which dequeues the elements which
- # are ready to exit the queue.
- #
- # Yields:
- # (Element): Elements being dequeued
- #
- def dequeue(self):
- while self._done_queue:
- yield self._done_queue.popleft()
-
- # dequeue_ready()
- #
- # Reports whether any elements can be promoted to other queues
- #
- # Returns:
- # (bool): Whether there are elements ready
- #
- def dequeue_ready(self):
- return any(self._done_queue)
+ def push(self, element):
+ debug("Adding element", element, "to", self.action_name)
+ self._queue.push(element._pipeline_id, element)
- # harvest_jobs()
- #
- # Process elements in the queue, moving elements which were enqueued
- # into the dequeue pool, and creating as many jobs for which resources
- # can be reserved.
- #
- # Returns:
- # ([Job]): A list of jobs which can be run now
- #
- def harvest_jobs(self):
- unready = []
- ready = []
+ def skip(self, element):
+ self.skipped_elements.append(element)
+ if self._next_queue:
+ self._next_queue.push(element)
- while self._wait_queue:
- if not self._resources.reserve(self.resources, peek=True):
- break
+ def pop(self):
+ debug("Popping", self.action_name, [(str(x[1]), x[1]._unique_id) for x in self._queue._heap])
- element = self._wait_queue.popleft()
- status = self.status(element)
+ if not self._resources.reserve(self.resources) or not self._queue:
+ raise IndexError()
- if status == QueueStatus.WAIT:
- unready.append(element)
- elif status == QueueStatus.SKIP:
- self._done_queue.append(element)
- self.skipped_elements.append(element)
- else:
- reserved = self._resources.reserve(self.resources)
- assert reserved
- ready.append(element)
-
- self._wait_queue.extendleft(unready)
-
- return [
- ElementJob(self._scheduler, self.action_name,
- self._element_log_path(element),
- element=element, queue=self,
- action_cb=self.process,
- complete_cb=self._job_done,
- max_retries=self._max_retries)
- for element in ready
- ]
+ else:
+ element = self._queue.pop()
+
+ return ElementJob(
+ self._scheduler,
+ self.action_name,
+ self._element_log_path(element),
+ element=element,
+ queue=self,
+ action_cb=self.process,
+ complete_cb=self._job_done,
+ max_retries=self._max_retries,
+ )
#####################################################
# Private Methods #
@@ -301,8 +254,8 @@ class Queue():
detail=traceback.format_exc())
self.failed_elements.append(element)
else:
- # All elements get placed on the done queue for later processing.
- self._done_queue.append(element)
+ if self._next_queue:
+ self._next_queue.push(element)
# These lists are for bookkeeping purposes for the UI and logging.
if status == JobStatus.SKIPPED or job.get_terminated():