summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbst-marge-bot <marge-bot@buildstream.build>2019-06-07 15:05:50 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2019-06-07 15:05:50 +0000
commit16338464b15f8836cd6e84b9e7bc11047494cc66 (patch)
treeb11b6021fd5dca76b525918c7cf8c1c1d3f8da51
parent640f0ca5a17a144448a48de2a80e1f7a655eb9b2 (diff)
parent2908ca8f2b417f7b0824a934770619d0f8cf8909 (diff)
downloadbuildstream-16338464b15f8836cd6e84b9e7bc11047494cc66.tar.gz
Merge branch 'jennis/push_based_pipeline' into 'master'
Push based pipeline Closes #1002 and #943 See merge request BuildStream/buildstream!1344
-rw-r--r--src/buildstream/_pipeline.py5
-rw-r--r--src/buildstream/_scheduler/queues/buildqueue.py12
-rw-r--r--src/buildstream/_scheduler/queues/fetchqueue.py12
-rw-r--r--src/buildstream/_scheduler/queues/pullqueue.py13
-rw-r--r--src/buildstream/_scheduler/queues/queue.py108
-rw-r--r--src/buildstream/_stream.py1
-rw-r--r--src/buildstream/element.py119
7 files changed, 202 insertions, 68 deletions
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py
index c176b82f6..d44813348 100644
--- a/src/buildstream/_pipeline.py
+++ b/src/buildstream/_pipeline.py
@@ -513,4 +513,9 @@ class _Planner():
self.plan_element(root, 0)
depth_sorted = sorted(self.depth_map.items(), key=itemgetter(1), reverse=True)
+
+ # Set the depth of each element
+ for index, item in enumerate(depth_sorted):
+ item[0]._set_depth(index)
+
return [item[0] for item in depth_sorted if plan_cached or not item[0]._cached_success()]
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py
index dc82f54ec..b280661cc 100644
--- a/src/buildstream/_scheduler/queues/buildqueue.py
+++ b/src/buildstream/_scheduler/queues/buildqueue.py
@@ -70,16 +70,11 @@ class BuildQueue(Queue):
return element._assemble()
def status(self, element):
- if not element._is_required():
- # Artifact is not currently required but it may be requested later.
- # Keep it in the queue.
- return QueueStatus.WAIT
-
if element._cached_success():
return QueueStatus.SKIP
if not element._buildable():
- return QueueStatus.WAIT
+ return QueueStatus.PENDING
return QueueStatus.READY
@@ -115,3 +110,8 @@ class BuildQueue(Queue):
#
if status is JobStatus.OK:
self._check_cache_size(job, element, result)
+
+ def register_pending_element(self, element):
+ # Set a "buildable" callback for an element not yet ready
+ # to be processed in the build queue.
+ element._set_buildable_callback(self._enqueue_element)
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index 90db77f42..bbb3b3d78 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -45,15 +45,10 @@ class FetchQueue(Queue):
element._fetch(fetch_original=self._fetch_original)
def status(self, element):
- if not element._is_required():
- # Artifact is not currently required but it may be requested later.
- # Keep it in the queue.
- return QueueStatus.WAIT
-
# Optionally skip elements that are already in the artifact cache
if self._skip_cached:
if not element._can_query_cache():
- return QueueStatus.WAIT
+ return QueueStatus.PENDING
if element._cached():
return QueueStatus.SKIP
@@ -78,3 +73,8 @@ class FetchQueue(Queue):
assert element._get_consistency() == Consistency.CACHED
else:
assert element._source_cached()
+
+ def register_pending_element(self, element):
+ # Set a "can_query_cache" callback for an element not yet ready
+ # to be processed in the fetch queue.
+ element._set_can_query_cache_callback(self._enqueue_element)
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index 374181cda..245293342 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -39,13 +39,8 @@ class PullQueue(Queue):
raise SkipJob(self.action_name)
def status(self, element):
- if not element._is_required():
- # Artifact is not currently required but it may be requested later.
- # Keep it in the queue.
- return QueueStatus.WAIT
-
if not element._can_query_cache():
- return QueueStatus.WAIT
+ return QueueStatus.PENDING
if element._pull_pending():
return QueueStatus.READY
@@ -64,3 +59,9 @@ class PullQueue(Queue):
# actually check the cache size.
if status is JobStatus.OK:
self._scheduler.check_cache_size()
+
+ def register_pending_element(self, element):
+ # Set a "can_query_cache"_callback for an element which is not
+ # immediately ready to query the artifact cache so that it
+ # may be pulled.
+ element._set_can_query_cache_callback(self._enqueue_element)
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 7740896b5..9a07f633c 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -22,6 +22,7 @@
import os
from collections import deque
from enum import Enum
+import heapq
import traceback
# Local imports
@@ -29,7 +30,7 @@ from ..jobs import ElementJob, JobStatus
from ..resources import ResourceType
# BuildStream toplevel imports
-from ..._exceptions import BstError, set_last_task_error
+from ..._exceptions import BstError, ImplError, set_last_task_error
from ..._message import Message, MessageType
@@ -37,8 +38,8 @@ from ..._message import Message, MessageType
#
#
class QueueStatus(Enum):
- # The element is waiting for dependencies.
- WAIT = 1
+ # The element is not yet ready to be processed in the queue.
+ PENDING = 1
# The element can skip this queue.
SKIP = 2
@@ -73,10 +74,12 @@ class Queue():
#
self._scheduler = scheduler
self._resources = scheduler.resources # Shared resource pool
- self._wait_queue = deque() # Ready / Waiting elements
+ self._ready_queue = [] # Ready elements
self._done_queue = deque() # Processed / Skipped elements
self._max_retries = 0
+ self._required_element_check = False # Whether we should check that elements are required before enqueuing
+
# Assert the subclass has setup class data
assert self.action_name is not None
assert self.complete_name is not None
@@ -105,7 +108,9 @@ class Queue():
# status()
#
- # Abstract method for reporting the status of an element.
+ # Abstract method for reporting the immediate status of an element. The status
+ # determines whether an element can/cannot be pushed into the queue, or even
+ # skip the queue entirely, when called.
#
# Args:
# element (Element): An element to process
@@ -130,6 +135,23 @@ class Queue():
pass
#####################################################
+ # Virtual Methods for Queue implementations #
+ #####################################################
+
+ # register_pending_element()
+ #
+ # Virtual method for registering a queue specific callback
+ # to an Element which is not immediately ready to advance
+ # to the next queue
+ #
+ # Args:
+ # element (Element): The element waiting to be pushed into the queue
+ #
+ def register_pending_element(self, element):
+ raise ImplError("Queue type: {} does not implement register_pending_element()"
+ .format(self.action_name))
+
+ #####################################################
# Scheduler / Pipeline facing APIs #
#####################################################
@@ -144,18 +166,12 @@ class Queue():
if not elts:
return
- # Place skipped elements on the done queue right away.
- #
- # The remaining ready and waiting elements must remain in the
- # same queue, and ready status must be determined at the moment
- # which the scheduler is asking for the next job.
- #
- skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
- wait = [elt for elt in elts if elt not in skip]
-
- self.skipped_elements.extend(skip) # Public record of skipped elements
- self._done_queue.extend(skip) # Elements to be processed
- self._wait_queue.extend(wait) # Elements eligible to be dequeued
+ # Obtain immediate element status
+ for elt in elts:
+ if self._required_element_check and not elt._is_required():
+ elt._set_required_callback(self._enqueue_element)
+ else:
+ self._enqueue_element(elt)
# dequeue()
#
@@ -181,35 +197,22 @@ class Queue():
# harvest_jobs()
#
- # Process elements in the queue, moving elements which were enqueued
- # into the dequeue pool, and creating as many jobs for which resources
+ # Spawn as many jobs from the ready queue for which resources
# can be reserved.
#
# Returns:
# ([Job]): A list of jobs which can be run now
#
def harvest_jobs(self):
- unready = []
ready = []
-
- while self._wait_queue:
- if not self._resources.reserve(self.resources, peek=True):
+ while self._ready_queue:
+ # Now reserve them
+ reserved = self._resources.reserve(self.resources)
+ if not reserved:
break
- element = self._wait_queue.popleft()
- status = self.status(element)
-
- if status == QueueStatus.WAIT:
- unready.append(element)
- elif status == QueueStatus.SKIP:
- self._done_queue.append(element)
- self.skipped_elements.append(element)
- else:
- reserved = self._resources.reserve(self.resources)
- assert reserved
- ready.append(element)
-
- self._wait_queue.extendleft(unready)
+ _, element = heapq.heappop(self._ready_queue)
+ ready.append(element)
return [
ElementJob(self._scheduler, self.action_name,
@@ -221,6 +224,13 @@ class Queue():
for element in ready
]
+ # set_required_element_check()
+ #
+ # This ensures that, for the first non-track queue, we must check
+ # whether elements are required before enqueuing them
+ def set_required_element_check(self):
+ self._required_element_check = True
+
#####################################################
# Private Methods #
#####################################################
@@ -326,3 +336,27 @@ class Queue():
logfile = "{key}-{action}".format(key=key, action=action)
return os.path.join(project.name, element.normal_name, logfile)
+
+ # _enqueue_element()
+ #
+ # Enqueue an Element upon a callback to a specific queue
+ # Here we check whether an element is either immediately ready to be processed
+ # in the current queue or whether it can skip the queue. Element's which are
+ # not yet ready to be processed or cannot skip will have the appropriate
+ # callback registered
+ #
+ # Args:
+ # element (Element): The Element to enqueue
+ #
+ def _enqueue_element(self, element):
+ status = self.status(element)
+ if status == QueueStatus.SKIP:
+ # Place skipped elements into the done queue immediately
+ self.skipped_elements.append(element) # Public record of skipped elements
+ self._done_queue.append(element) # Elements to proceed to the next queue
+ elif status == QueueStatus.READY:
+ # Push elements which are ready to be processed immediately into the queue
+ heapq.heappush(self._ready_queue, (element._depth, element))
+ else:
+ # Register a queue specific callback for pending elements
+ self.register_pending_element(element)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 2343c553c..537671679 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1180,6 +1180,7 @@ class Stream():
if not (track or self._first_non_track_queue):
self._first_non_track_queue = queue
+ self._first_non_track_queue.set_required_element_check()
# _enqueue_plan()
#
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 909a0e851..84c8f20ff 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -207,7 +207,8 @@ class Element(Plugin):
self.__runtime_dependencies = [] # Direct runtime dependency Elements
self.__build_dependencies = [] # Direct build dependency Elements
self.__reverse_dependencies = set() # Direct reverse dependency Elements
- self.__ready_for_runtime = False # Wether the element has all its dependencies ready and has a cache key
+ self.__ready_for_runtime = False # Whether the element has all dependencies ready and has a cache key
+ self.__ready_for_runtime_and_cached = False # Whether the element has all deps ready for runtime and cached
self.__sources = [] # List of Sources
self.__weak_cache_key = None # Our cached weak cache key
self.__strict_cache_key = None # Our cached cache key for strict builds
@@ -238,6 +239,13 @@ class Element(Plugin):
self.__batch_prepare_assemble_flags = 0 # Sandbox flags for batching across prepare()/assemble()
self.__batch_prepare_assemble_collect = None # Collect dir for batching across prepare()/assemble()
+ # Callbacks
+ self.__required_callback = None # Callback to Queues
+ self.__can_query_cache_callback = None # Callback to PullQueue/FetchQueue
+ self.__buildable_callback = None # Callback to BuildQueue
+
+ self._depth = None # Depth of Element in its current dependency graph
+
# Ensure we have loaded this class's defaults
self.__init_defaults(project, plugin_conf, meta.kind, meta.is_junction)
@@ -1120,19 +1128,10 @@ class Element(Plugin):
not self._source_cached():
return False
- for dependency in self.dependencies(Scope.BUILD):
- # In non-strict mode an element's strong cache key may not be available yet
- # even though an artifact is available in the local cache. This can happen
- # if the pull job is still pending as the remote cache may have an artifact
- # that matches the strict cache key, which is preferred over a locally
- # cached artifact with a weak cache key match.
- if not dependency._cached_success() or not dependency._get_cache_key(strength=_KeyStrength.STRONG):
- return False
-
if not self.__assemble_scheduled:
return False
- return True
+ return all(dep.__ready_for_runtime_and_cached for dep in self.dependencies(Scope.BUILD, recurse=False))
# _get_cache_key():
#
@@ -1190,9 +1189,14 @@ class Element(Plugin):
if self._get_workspace() and self.__assemble_scheduled:
# If we have an active workspace and are going to build, then
- # discard current cache key values as their correct values can only
- # be calculated once the build is complete
+ # discard current cache key values and invoke the buildable callback.
+ # The correct keys can only be calculated once the build is complete
self.__reset_cache_data()
+
+ if self.__buildable_callback is not None and self._buildable():
+ self.__buildable_callback(self)
+ self.__buildable_callback = None
+
return
self.__update_cache_keys()
@@ -1210,6 +1214,13 @@ class Element(Plugin):
not self._cached_success() and
not self._pull_pending()):
self._schedule_assemble()
+
+ # If a build has been scheduled, we know that the element
+ # is not cached and can allow cache query even if the strict cache
+ # key is not available yet.
+ if self.__can_query_cache_callback is not None:
+ self.__can_query_cache_callback(self)
+
return
if not context.get_strict():
@@ -1219,6 +1230,17 @@ class Element(Plugin):
self.__ready_for_runtime = all(
dep.__ready_for_runtime for dep in self.__runtime_dependencies)
+ if self.__ready_for_runtime:
+ # ready_for_runtime_and_cached is stronger than ready_for_runtime, so don't
+ # check the former if the latter is False
+ if not self.__ready_for_runtime_and_cached and self._cached_success():
+ self.__ready_for_runtime_and_cached = all(
+ dep.__ready_for_runtime_and_cached for dep in self.__runtime_dependencies)
+
+ if self.__buildable_callback is not None and self._buildable():
+ self.__buildable_callback(self)
+ self.__buildable_callback = None
+
# _get_display_key():
#
# Returns cache keys for display purposes
@@ -1501,6 +1523,11 @@ class Element(Plugin):
self._update_state()
+ # Callback to the Queue
+ if self.__required_callback is not None:
+ self.__required_callback(self)
+ self.__required_callback = None
+
# _is_required():
#
# Returns whether this element has been marked as required.
@@ -2235,6 +2262,66 @@ class Element(Plugin):
else:
return True
+ # _set_required_callback()
+ #
+ #
+ # Notify the pull/fetch/build queue that the element is potentially
+ # ready to be processed.
+ #
+ # _Set the _required_callback - the _required_callback is invoked when an
+ # element is marked as required. This informs us that the element needs to
+ # either be pulled or fetched + built.
+ #
+ # Args:
+ # callback (callable) - The callback function
+ #
+ def _set_required_callback(self, callback):
+ self.__required_callback = callback
+
+ # _set_can_query_cache_callback()
+ #
+ # Notify the pull/fetch queue that the element is potentially
+ # ready to be processed.
+ #
+ # Set the _can_query_cache_callback - the _can_query_cache_callback is
+ # invoked when an element becomes able to query the cache. That is,
+ # the (non-workspaced) element's strict cache key has been calculated.
+ # However, if the element is workspaced, we also invoke this callback
+ # once its build has been scheduled - this ensures that the workspaced
+ # element does not get blocked in the pull queue.
+ #
+ # Args:
+ # callback (callable) - The callback function
+ #
+ def _set_can_query_cache_callback(self, callback):
+ self.__can_query_cache_callback = callback
+
+ # _set_buildable_callback()
+ #
+ # Notifiy the build queue that the element is potentially ready
+ # to be processed
+ #
+ # Set the _buildable_callback - the _buildable_callback is invoked when
+ # an element is marked as "buildable". That is, its sources are consistent,
+ # its been scheduled to be built and all of its build dependencies have
+ # had their cache key's calculated and are cached.
+ #
+ # Args:
+ # callback (callable) - The callback function
+ #
+ def _set_buildable_callback(self, callback):
+ self.__buildable_callback = callback
+
+ # _set_depth()
+ #
+ # Set the depth of the Element.
+ #
+ # The depth represents the position of the Element within the current
+ # session's dependency graph. A depth of zero represents the bottommost element.
+ #
+ def _set_depth(self, depth):
+ self._depth = depth
+
#############################################################
# Private Local Methods #
#############################################################
@@ -2873,10 +2960,12 @@ class Element(Plugin):
element = queue.pop()
old_ready_for_runtime = element.__ready_for_runtime
+ old_ready_for_runtime_and_cached = element.__ready_for_runtime_and_cached
old_strict_cache_key = element.__strict_cache_key
element._update_state()
if element.__ready_for_runtime != old_ready_for_runtime or \
+ element.__ready_for_runtime_and_cached != old_ready_for_runtime_and_cached or \
element.__strict_cache_key != old_strict_cache_key:
for rdep in element.__reverse_dependencies:
queue.push(rdep._unique_id, rdep)
@@ -2944,6 +3033,10 @@ class Element(Plugin):
]
self.__strict_cache_key = self._calculate_cache_key(dependencies)
+ if self.__strict_cache_key is not None and self.__can_query_cache_callback is not None:
+ self.__can_query_cache_callback(self)
+ self.__can_query_cache_callback = None
+
# __update_artifact_state()
#
# Updates the data involved in knowing about the artifact corresponding