diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2019-06-07 15:05:50 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-06-07 15:05:50 +0000 |
commit | 16338464b15f8836cd6e84b9e7bc11047494cc66 (patch) | |
tree | b11b6021fd5dca76b525918c7cf8c1c1d3f8da51 | |
parent | 640f0ca5a17a144448a48de2a80e1f7a655eb9b2 (diff) | |
parent | 2908ca8f2b417f7b0824a934770619d0f8cf8909 (diff) | |
download | buildstream-16338464b15f8836cd6e84b9e7bc11047494cc66.tar.gz |
Merge branch 'jennis/push_based_pipeline' into 'master'
Push based pipeline
Closes #1002 and #943
See merge request BuildStream/buildstream!1344
-rw-r--r-- | src/buildstream/_pipeline.py | 5 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/buildqueue.py | 12 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/fetchqueue.py | 12 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/pullqueue.py | 13 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 108 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 1 | ||||
-rw-r--r-- | src/buildstream/element.py | 119 |
7 files changed, 202 insertions, 68 deletions
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py index c176b82f6..d44813348 100644 --- a/src/buildstream/_pipeline.py +++ b/src/buildstream/_pipeline.py @@ -513,4 +513,9 @@ class _Planner(): self.plan_element(root, 0) depth_sorted = sorted(self.depth_map.items(), key=itemgetter(1), reverse=True) + + # Set the depth of each element + for index, item in enumerate(depth_sorted): + item[0]._set_depth(index) + return [item[0] for item in depth_sorted if plan_cached or not item[0]._cached_success()] diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py index dc82f54ec..b280661cc 100644 --- a/src/buildstream/_scheduler/queues/buildqueue.py +++ b/src/buildstream/_scheduler/queues/buildqueue.py @@ -70,16 +70,11 @@ class BuildQueue(Queue): return element._assemble() def status(self, element): - if not element._is_required(): - # Artifact is not currently required but it may be requested later. - # Keep it in the queue. - return QueueStatus.WAIT - if element._cached_success(): return QueueStatus.SKIP if not element._buildable(): - return QueueStatus.WAIT + return QueueStatus.PENDING return QueueStatus.READY @@ -115,3 +110,8 @@ class BuildQueue(Queue): # if status is JobStatus.OK: self._check_cache_size(job, element, result) + + def register_pending_element(self, element): + # Set a "buildable" callback for an element not yet ready + # to be processed in the build queue. + element._set_buildable_callback(self._enqueue_element) diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py index 90db77f42..bbb3b3d78 100644 --- a/src/buildstream/_scheduler/queues/fetchqueue.py +++ b/src/buildstream/_scheduler/queues/fetchqueue.py @@ -45,15 +45,10 @@ class FetchQueue(Queue): element._fetch(fetch_original=self._fetch_original) def status(self, element): - if not element._is_required(): - # Artifact is not currently required but it may be requested later. - # Keep it in the queue. - return QueueStatus.WAIT - # Optionally skip elements that are already in the artifact cache if self._skip_cached: if not element._can_query_cache(): - return QueueStatus.WAIT + return QueueStatus.PENDING if element._cached(): return QueueStatus.SKIP @@ -78,3 +73,8 @@ class FetchQueue(Queue): assert element._get_consistency() == Consistency.CACHED else: assert element._source_cached() + + def register_pending_element(self, element): + # Set a "can_query_cache" callback for an element not yet ready + # to be processed in the fetch queue. + element._set_can_query_cache_callback(self._enqueue_element) diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py index 374181cda..245293342 100644 --- a/src/buildstream/_scheduler/queues/pullqueue.py +++ b/src/buildstream/_scheduler/queues/pullqueue.py @@ -39,13 +39,8 @@ class PullQueue(Queue): raise SkipJob(self.action_name) def status(self, element): - if not element._is_required(): - # Artifact is not currently required but it may be requested later. - # Keep it in the queue. - return QueueStatus.WAIT - if not element._can_query_cache(): - return QueueStatus.WAIT + return QueueStatus.PENDING if element._pull_pending(): return QueueStatus.READY @@ -64,3 +59,9 @@ class PullQueue(Queue): # actually check the cache size. if status is JobStatus.OK: self._scheduler.check_cache_size() + + def register_pending_element(self, element): + # Set a "can_query_cache"_callback for an element which is not + # immediately ready to query the artifact cache so that it + # may be pulled. + element._set_can_query_cache_callback(self._enqueue_element) diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 7740896b5..9a07f633c 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -22,6 +22,7 @@ import os from collections import deque from enum import Enum +import heapq import traceback # Local imports @@ -29,7 +30,7 @@ from ..jobs import ElementJob, JobStatus from ..resources import ResourceType # BuildStream toplevel imports -from ..._exceptions import BstError, set_last_task_error +from ..._exceptions import BstError, ImplError, set_last_task_error from ..._message import Message, MessageType @@ -37,8 +38,8 @@ from ..._message import Message, MessageType # # class QueueStatus(Enum): - # The element is waiting for dependencies. - WAIT = 1 + # The element is not yet ready to be processed in the queue. + PENDING = 1 # The element can skip this queue. SKIP = 2 @@ -73,10 +74,12 @@ class Queue(): # self._scheduler = scheduler self._resources = scheduler.resources # Shared resource pool - self._wait_queue = deque() # Ready / Waiting elements + self._ready_queue = [] # Ready elements self._done_queue = deque() # Processed / Skipped elements self._max_retries = 0 + self._required_element_check = False # Whether we should check that elements are required before enqueuing + # Assert the subclass has setup class data assert self.action_name is not None assert self.complete_name is not None @@ -105,7 +108,9 @@ class Queue(): # status() # - # Abstract method for reporting the status of an element. + # Abstract method for reporting the immediate status of an element. The status + # determines whether an element can/cannot be pushed into the queue, or even + # skip the queue entirely, when called. # # Args: # element (Element): An element to process @@ -130,6 +135,23 @@ class Queue(): pass ##################################################### + # Virtual Methods for Queue implementations # + ##################################################### + + # register_pending_element() + # + # Virtual method for registering a queue specific callback + # to an Element which is not immediately ready to advance + # to the next queue + # + # Args: + # element (Element): The element waiting to be pushed into the queue + # + def register_pending_element(self, element): + raise ImplError("Queue type: {} does not implement register_pending_element()" + .format(self.action_name)) + + ##################################################### # Scheduler / Pipeline facing APIs # ##################################################### @@ -144,18 +166,12 @@ class Queue(): if not elts: return - # Place skipped elements on the done queue right away. - # - # The remaining ready and waiting elements must remain in the - # same queue, and ready status must be determined at the moment - # which the scheduler is asking for the next job. - # - skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP] - wait = [elt for elt in elts if elt not in skip] - - self.skipped_elements.extend(skip) # Public record of skipped elements - self._done_queue.extend(skip) # Elements to be processed - self._wait_queue.extend(wait) # Elements eligible to be dequeued + # Obtain immediate element status + for elt in elts: + if self._required_element_check and not elt._is_required(): + elt._set_required_callback(self._enqueue_element) + else: + self._enqueue_element(elt) # dequeue() # @@ -181,35 +197,22 @@ class Queue(): # harvest_jobs() # - # Process elements in the queue, moving elements which were enqueued - # into the dequeue pool, and creating as many jobs for which resources + # Spawn as many jobs from the ready queue for which resources # can be reserved. # # Returns: # ([Job]): A list of jobs which can be run now # def harvest_jobs(self): - unready = [] ready = [] - - while self._wait_queue: - if not self._resources.reserve(self.resources, peek=True): + while self._ready_queue: + # Now reserve them + reserved = self._resources.reserve(self.resources) + if not reserved: break - element = self._wait_queue.popleft() - status = self.status(element) - - if status == QueueStatus.WAIT: - unready.append(element) - elif status == QueueStatus.SKIP: - self._done_queue.append(element) - self.skipped_elements.append(element) - else: - reserved = self._resources.reserve(self.resources) - assert reserved - ready.append(element) - - self._wait_queue.extendleft(unready) + _, element = heapq.heappop(self._ready_queue) + ready.append(element) return [ ElementJob(self._scheduler, self.action_name, @@ -221,6 +224,13 @@ class Queue(): for element in ready ] + # set_required_element_check() + # + # This ensures that, for the first non-track queue, we must check + # whether elements are required before enqueuing them + def set_required_element_check(self): + self._required_element_check = True + ##################################################### # Private Methods # ##################################################### @@ -326,3 +336,27 @@ class Queue(): logfile = "{key}-{action}".format(key=key, action=action) return os.path.join(project.name, element.normal_name, logfile) + + # _enqueue_element() + # + # Enqueue an Element upon a callback to a specific queue + # Here we check whether an element is either immediately ready to be processed + # in the current queue or whether it can skip the queue. Element's which are + # not yet ready to be processed or cannot skip will have the appropriate + # callback registered + # + # Args: + # element (Element): The Element to enqueue + # + def _enqueue_element(self, element): + status = self.status(element) + if status == QueueStatus.SKIP: + # Place skipped elements into the done queue immediately + self.skipped_elements.append(element) # Public record of skipped elements + self._done_queue.append(element) # Elements to proceed to the next queue + elif status == QueueStatus.READY: + # Push elements which are ready to be processed immediately into the queue + heapq.heappush(self._ready_queue, (element._depth, element)) + else: + # Register a queue specific callback for pending elements + self.register_pending_element(element) diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 2343c553c..537671679 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1180,6 +1180,7 @@ class Stream(): if not (track or self._first_non_track_queue): self._first_non_track_queue = queue + self._first_non_track_queue.set_required_element_check() # _enqueue_plan() # diff --git a/src/buildstream/element.py b/src/buildstream/element.py index 909a0e851..84c8f20ff 100644 --- a/src/buildstream/element.py +++ b/src/buildstream/element.py @@ -207,7 +207,8 @@ class Element(Plugin): self.__runtime_dependencies = [] # Direct runtime dependency Elements self.__build_dependencies = [] # Direct build dependency Elements self.__reverse_dependencies = set() # Direct reverse dependency Elements - self.__ready_for_runtime = False # Wether the element has all its dependencies ready and has a cache key + self.__ready_for_runtime = False # Whether the element has all dependencies ready and has a cache key + self.__ready_for_runtime_and_cached = False # Whether the element has all deps ready for runtime and cached self.__sources = [] # List of Sources self.__weak_cache_key = None # Our cached weak cache key self.__strict_cache_key = None # Our cached cache key for strict builds @@ -238,6 +239,13 @@ class Element(Plugin): self.__batch_prepare_assemble_flags = 0 # Sandbox flags for batching across prepare()/assemble() self.__batch_prepare_assemble_collect = None # Collect dir for batching across prepare()/assemble() + # Callbacks + self.__required_callback = None # Callback to Queues + self.__can_query_cache_callback = None # Callback to PullQueue/FetchQueue + self.__buildable_callback = None # Callback to BuildQueue + + self._depth = None # Depth of Element in its current dependency graph + # Ensure we have loaded this class's defaults self.__init_defaults(project, plugin_conf, meta.kind, meta.is_junction) @@ -1120,19 +1128,10 @@ class Element(Plugin): not self._source_cached(): return False - for dependency in self.dependencies(Scope.BUILD): - # In non-strict mode an element's strong cache key may not be available yet - # even though an artifact is available in the local cache. This can happen - # if the pull job is still pending as the remote cache may have an artifact - # that matches the strict cache key, which is preferred over a locally - # cached artifact with a weak cache key match. - if not dependency._cached_success() or not dependency._get_cache_key(strength=_KeyStrength.STRONG): - return False - if not self.__assemble_scheduled: return False - return True + return all(dep.__ready_for_runtime_and_cached for dep in self.dependencies(Scope.BUILD, recurse=False)) # _get_cache_key(): # @@ -1190,9 +1189,14 @@ class Element(Plugin): if self._get_workspace() and self.__assemble_scheduled: # If we have an active workspace and are going to build, then - # discard current cache key values as their correct values can only - # be calculated once the build is complete + # discard current cache key values and invoke the buildable callback. + # The correct keys can only be calculated once the build is complete self.__reset_cache_data() + + if self.__buildable_callback is not None and self._buildable(): + self.__buildable_callback(self) + self.__buildable_callback = None + return self.__update_cache_keys() @@ -1210,6 +1214,13 @@ class Element(Plugin): not self._cached_success() and not self._pull_pending()): self._schedule_assemble() + + # If a build has been scheduled, we know that the element + # is not cached and can allow cache query even if the strict cache + # key is not available yet. + if self.__can_query_cache_callback is not None: + self.__can_query_cache_callback(self) + return if not context.get_strict(): @@ -1219,6 +1230,17 @@ class Element(Plugin): self.__ready_for_runtime = all( dep.__ready_for_runtime for dep in self.__runtime_dependencies) + if self.__ready_for_runtime: + # ready_for_runtime_and_cached is stronger than ready_for_runtime, so don't + # check the former if the latter is False + if not self.__ready_for_runtime_and_cached and self._cached_success(): + self.__ready_for_runtime_and_cached = all( + dep.__ready_for_runtime_and_cached for dep in self.__runtime_dependencies) + + if self.__buildable_callback is not None and self._buildable(): + self.__buildable_callback(self) + self.__buildable_callback = None + # _get_display_key(): # # Returns cache keys for display purposes @@ -1501,6 +1523,11 @@ class Element(Plugin): self._update_state() + # Callback to the Queue + if self.__required_callback is not None: + self.__required_callback(self) + self.__required_callback = None + # _is_required(): # # Returns whether this element has been marked as required. @@ -2235,6 +2262,66 @@ class Element(Plugin): else: return True + # _set_required_callback() + # + # + # Notify the pull/fetch/build queue that the element is potentially + # ready to be processed. + # + # _Set the _required_callback - the _required_callback is invoked when an + # element is marked as required. This informs us that the element needs to + # either be pulled or fetched + built. + # + # Args: + # callback (callable) - The callback function + # + def _set_required_callback(self, callback): + self.__required_callback = callback + + # _set_can_query_cache_callback() + # + # Notify the pull/fetch queue that the element is potentially + # ready to be processed. + # + # Set the _can_query_cache_callback - the _can_query_cache_callback is + # invoked when an element becomes able to query the cache. That is, + # the (non-workspaced) element's strict cache key has been calculated. + # However, if the element is workspaced, we also invoke this callback + # once its build has been scheduled - this ensures that the workspaced + # element does not get blocked in the pull queue. + # + # Args: + # callback (callable) - The callback function + # + def _set_can_query_cache_callback(self, callback): + self.__can_query_cache_callback = callback + + # _set_buildable_callback() + # + # Notifiy the build queue that the element is potentially ready + # to be processed + # + # Set the _buildable_callback - the _buildable_callback is invoked when + # an element is marked as "buildable". That is, its sources are consistent, + # its been scheduled to be built and all of its build dependencies have + # had their cache key's calculated and are cached. + # + # Args: + # callback (callable) - The callback function + # + def _set_buildable_callback(self, callback): + self.__buildable_callback = callback + + # _set_depth() + # + # Set the depth of the Element. + # + # The depth represents the position of the Element within the current + # session's dependency graph. A depth of zero represents the bottommost element. + # + def _set_depth(self, depth): + self._depth = depth + ############################################################# # Private Local Methods # ############################################################# @@ -2873,10 +2960,12 @@ class Element(Plugin): element = queue.pop() old_ready_for_runtime = element.__ready_for_runtime + old_ready_for_runtime_and_cached = element.__ready_for_runtime_and_cached old_strict_cache_key = element.__strict_cache_key element._update_state() if element.__ready_for_runtime != old_ready_for_runtime or \ + element.__ready_for_runtime_and_cached != old_ready_for_runtime_and_cached or \ element.__strict_cache_key != old_strict_cache_key: for rdep in element.__reverse_dependencies: queue.push(rdep._unique_id, rdep) @@ -2944,6 +3033,10 @@ class Element(Plugin): ] self.__strict_cache_key = self._calculate_cache_key(dependencies) + if self.__strict_cache_key is not None and self.__can_query_cache_callback is not None: + self.__can_query_cache_callback(self) + self.__can_query_cache_callback = None + # __update_artifact_state() # # Updates the data involved in knowing about the artifact corresponding |