diff options
Diffstat (limited to 'src/buildstream/_scheduler')
-rw-r--r-- | src/buildstream/_scheduler/queues/buildqueue.py | 12 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/fetchqueue.py | 12 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/pullqueue.py | 13 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 107 |
4 files changed, 89 insertions, 55 deletions
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py index dc82f54ec..b280661cc 100644 --- a/src/buildstream/_scheduler/queues/buildqueue.py +++ b/src/buildstream/_scheduler/queues/buildqueue.py @@ -70,16 +70,11 @@ class BuildQueue(Queue): return element._assemble() def status(self, element): - if not element._is_required(): - # Artifact is not currently required but it may be requested later. - # Keep it in the queue. - return QueueStatus.WAIT - if element._cached_success(): return QueueStatus.SKIP if not element._buildable(): - return QueueStatus.WAIT + return QueueStatus.PENDING return QueueStatus.READY @@ -115,3 +110,8 @@ class BuildQueue(Queue): # if status is JobStatus.OK: self._check_cache_size(job, element, result) + + def register_pending_element(self, element): + # Set a "buildable" callback for an element not yet ready + # to be processed in the build queue. + element._set_buildable_callback(self._enqueue_element) diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py index 90db77f42..bbb3b3d78 100644 --- a/src/buildstream/_scheduler/queues/fetchqueue.py +++ b/src/buildstream/_scheduler/queues/fetchqueue.py @@ -45,15 +45,10 @@ class FetchQueue(Queue): element._fetch(fetch_original=self._fetch_original) def status(self, element): - if not element._is_required(): - # Artifact is not currently required but it may be requested later. - # Keep it in the queue. - return QueueStatus.WAIT - # Optionally skip elements that are already in the artifact cache if self._skip_cached: if not element._can_query_cache(): - return QueueStatus.WAIT + return QueueStatus.PENDING if element._cached(): return QueueStatus.SKIP @@ -78,3 +73,8 @@ class FetchQueue(Queue): assert element._get_consistency() == Consistency.CACHED else: assert element._source_cached() + + def register_pending_element(self, element): + # Set a "can_query_cache" callback for an element not yet ready + # to be processed in the fetch queue. + element._set_can_query_cache_callback(self._enqueue_element) diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py index 374181cda..245293342 100644 --- a/src/buildstream/_scheduler/queues/pullqueue.py +++ b/src/buildstream/_scheduler/queues/pullqueue.py @@ -39,13 +39,8 @@ class PullQueue(Queue): raise SkipJob(self.action_name) def status(self, element): - if not element._is_required(): - # Artifact is not currently required but it may be requested later. - # Keep it in the queue. - return QueueStatus.WAIT - if not element._can_query_cache(): - return QueueStatus.WAIT + return QueueStatus.PENDING if element._pull_pending(): return QueueStatus.READY @@ -64,3 +59,9 @@ class PullQueue(Queue): # actually check the cache size. if status is JobStatus.OK: self._scheduler.check_cache_size() + + def register_pending_element(self, element): + # Set a "can_query_cache"_callback for an element which is not + # immediately ready to query the artifact cache so that it + # may be pulled. + element._set_can_query_cache_callback(self._enqueue_element) diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 7740896b5..951b28c2a 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -29,7 +29,7 @@ from ..jobs import ElementJob, JobStatus from ..resources import ResourceType # BuildStream toplevel imports -from ..._exceptions import BstError, set_last_task_error +from ..._exceptions import BstError, ImplError, set_last_task_error from ..._message import Message, MessageType @@ -37,8 +37,8 @@ from ..._message import Message, MessageType # # class QueueStatus(Enum): - # The element is waiting for dependencies. - WAIT = 1 + # The element is not yet ready to be processed in the queue. + PENDING = 1 # The element can skip this queue. SKIP = 2 @@ -73,10 +73,12 @@ class Queue(): # self._scheduler = scheduler self._resources = scheduler.resources # Shared resource pool - self._wait_queue = deque() # Ready / Waiting elements + self._ready_queue = deque() # Ready elements self._done_queue = deque() # Processed / Skipped elements self._max_retries = 0 + self._required_element_check = False # Whether we should check that elements are required before enqueuing + # Assert the subclass has setup class data assert self.action_name is not None assert self.complete_name is not None @@ -105,7 +107,9 @@ class Queue(): # status() # - # Abstract method for reporting the status of an element. + # Abstract method for reporting the immediate status of an element. The status + # determines whether an element can/cannot be pushed into the queue, or even + # skip the queue entirely, when called. # # Args: # element (Element): An element to process @@ -130,6 +134,23 @@ class Queue(): pass ##################################################### + # Virtual Methods for Queue implementations # + ##################################################### + + # register_pending_element() + # + # Virtual method for registering a queue specific callback + # to an Element which is not immediately ready to advance + # to the next queue + # + # Args: + # element (Element): The element waiting to be pushed into the queue + # + def register_pending_element(self, element): + raise ImplError("Queue type: {} does not implement register_pending_element()" + .format(self.action_name)) + + ##################################################### # Scheduler / Pipeline facing APIs # ##################################################### @@ -144,18 +165,12 @@ class Queue(): if not elts: return - # Place skipped elements on the done queue right away. - # - # The remaining ready and waiting elements must remain in the - # same queue, and ready status must be determined at the moment - # which the scheduler is asking for the next job. - # - skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP] - wait = [elt for elt in elts if elt not in skip] - - self.skipped_elements.extend(skip) # Public record of skipped elements - self._done_queue.extend(skip) # Elements to be processed - self._wait_queue.extend(wait) # Elements eligible to be dequeued + # Obtain immediate element status + for elt in elts: + if self._required_element_check and not elt._is_required(): + elt._set_required_callback(self._enqueue_element) + else: + self._enqueue_element(elt) # dequeue() # @@ -181,35 +196,22 @@ class Queue(): # harvest_jobs() # - # Process elements in the queue, moving elements which were enqueued - # into the dequeue pool, and creating as many jobs for which resources + # Spawn as many jobs from the ready queue for which resources # can be reserved. # # Returns: # ([Job]): A list of jobs which can be run now # def harvest_jobs(self): - unready = [] ready = [] - - while self._wait_queue: - if not self._resources.reserve(self.resources, peek=True): + while self._ready_queue: + # Now reserve them + reserved = self._resources.reserve(self.resources) + if not reserved: break - element = self._wait_queue.popleft() - status = self.status(element) - - if status == QueueStatus.WAIT: - unready.append(element) - elif status == QueueStatus.SKIP: - self._done_queue.append(element) - self.skipped_elements.append(element) - else: - reserved = self._resources.reserve(self.resources) - assert reserved - ready.append(element) - - self._wait_queue.extendleft(unready) + element = self._ready_queue.popleft() + ready.append(element) return [ ElementJob(self._scheduler, self.action_name, @@ -221,6 +223,13 @@ class Queue(): for element in ready ] + # set_required_element_check() + # + # This ensures that, for the first non-track queue, we must check + # whether elements are required before enqueuing them + def set_required_element_check(self): + self._required_element_check = True + ##################################################### # Private Methods # ##################################################### @@ -326,3 +335,27 @@ class Queue(): logfile = "{key}-{action}".format(key=key, action=action) return os.path.join(project.name, element.normal_name, logfile) + + # _enqueue_element() + # + # Enqueue an Element upon a callback to a specific queue + # Here we check whether an element is either immediately ready to be processed + # in the current queue or whether it can skip the queue. Element's which are + # not yet ready to be processed or cannot skip will have the appropriate + # callback registered + # + # Args: + # element (Element): The Element to enqueue + # + def _enqueue_element(self, element): + status = self.status(element) + if status == QueueStatus.SKIP: + # Place skipped elements into the done queue immediately + self.skipped_elements.append(element) # Public record of skipped elements + self._done_queue.append(element) # Elements to proceed to the next queue + elif status == QueueStatus.READY: + # Push elements which are ready to be processed immediately into the queue + self._ready_queue.append(element) + else: + # Register a queue specific callback for pending elements + self.register_pending_element(element) |