From 2908ca8f2b417f7b0824a934770619d0f8cf8909 Mon Sep 17 00:00:00 2001 From: James Ennis Date: Tue, 28 May 2019 17:31:07 +0100 Subject: queue.py: Use heapq for the ready queue This patch includes setting a _depth to each element once the pipeline has been sorted. This is necessary as we need to store elements in the heapq sorted by their depth. --- src/buildstream/_pipeline.py | 5 +++++ src/buildstream/_scheduler/queues/queue.py | 7 ++++--- src/buildstream/element.py | 12 ++++++++++++ 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py index c176b82f6..d44813348 100644 --- a/src/buildstream/_pipeline.py +++ b/src/buildstream/_pipeline.py @@ -513,4 +513,9 @@ class _Planner(): self.plan_element(root, 0) depth_sorted = sorted(self.depth_map.items(), key=itemgetter(1), reverse=True) + + # Set the depth of each element + for index, item in enumerate(depth_sorted): + item[0]._set_depth(index) + return [item[0] for item in depth_sorted if plan_cached or not item[0]._cached_success()] diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 951b28c2a..9a07f633c 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -22,6 +22,7 @@ import os from collections import deque from enum import Enum +import heapq import traceback # Local imports @@ -73,7 +74,7 @@ class Queue(): # self._scheduler = scheduler self._resources = scheduler.resources # Shared resource pool - self._ready_queue = deque() # Ready elements + self._ready_queue = [] # Ready elements self._done_queue = deque() # Processed / Skipped elements self._max_retries = 0 @@ -210,7 +211,7 @@ class Queue(): if not reserved: break - element = self._ready_queue.popleft() + _, element = heapq.heappop(self._ready_queue) ready.append(element) return [ @@ -355,7 +356,7 @@ class Queue(): self._done_queue.append(element) # Elements to proceed to the next queue elif status == QueueStatus.READY: # Push elements which are ready to be processed immediately into the queue - self._ready_queue.append(element) + heapq.heappush(self._ready_queue, (element._depth, element)) else: # Register a queue specific callback for pending elements self.register_pending_element(element) diff --git a/src/buildstream/element.py b/src/buildstream/element.py index 355773f76..84c8f20ff 100644 --- a/src/buildstream/element.py +++ b/src/buildstream/element.py @@ -244,6 +244,8 @@ class Element(Plugin): self.__can_query_cache_callback = None # Callback to PullQueue/FetchQueue self.__buildable_callback = None # Callback to BuildQueue + self._depth = None # Depth of Element in its current dependency graph + # Ensure we have loaded this class's defaults self.__init_defaults(project, plugin_conf, meta.kind, meta.is_junction) @@ -2310,6 +2312,16 @@ class Element(Plugin): def _set_buildable_callback(self, callback): self.__buildable_callback = callback + # _set_depth() + # + # Set the depth of the Element. + # + # The depth represents the position of the Element within the current + # session's dependency graph. A depth of zero represents the bottommost element. + # + def _set_depth(self, depth): + self._depth = depth + ############################################################# # Private Local Methods # ############################################################# -- cgit v1.2.1