summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/buildstream/_scheduler/queues/artifactpushqueue.py11
-rw-r--r--src/buildstream/_scheduler/queues/buildqueue.py10
-rw-r--r--src/buildstream/_scheduler/queues/fetchqueue.py15
-rw-r--r--src/buildstream/_scheduler/queues/pullqueue.py11
-rw-r--r--src/buildstream/_scheduler/queues/queue.py25
-rw-r--r--src/buildstream/_scheduler/queues/sourcepushqueue.py11
-rw-r--r--src/buildstream/_scheduler/queues/trackqueue.py8
7 files changed, 62 insertions, 29 deletions
diff --git a/src/buildstream/_scheduler/queues/artifactpushqueue.py b/src/buildstream/_scheduler/queues/artifactpushqueue.py
index b861d4fc7..5b240e932 100644
--- a/src/buildstream/_scheduler/queues/artifactpushqueue.py
+++ b/src/buildstream/_scheduler/queues/artifactpushqueue.py
@@ -32,13 +32,16 @@ class ArtifactPushQueue(Queue):
complete_name = "Pushed"
resources = [ResourceType.UPLOAD]
- def process(self, element):
- # returns whether an artifact was uploaded or not
- if not element._push():
- raise SkipJob(self.action_name)
+ def get_process_func(self):
+ return ArtifactPushQueue._raise_skip_if_not_pushed
def status(self, element):
if element._skip_push():
return QueueStatus.SKIP
return QueueStatus.READY
+
+ @staticmethod
+ def _raise_skip_if_not_pushed(element):
+ if not element._push():
+ raise SkipJob(ArtifactPushQueue.action_name)
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py
index b280661cc..1dd45607b 100644
--- a/src/buildstream/_scheduler/queues/buildqueue.py
+++ b/src/buildstream/_scheduler/queues/buildqueue.py
@@ -57,7 +57,7 @@ class BuildQueue(Queue):
logfile=logfile)
job = ElementJob(self._scheduler, self.action_name,
logfile, element=element, queue=self,
- action_cb=self.process,
+ action_cb=self.get_process_func(),
complete_cb=self._job_done,
max_retries=self._max_retries)
self._done_queue.append(element)
@@ -66,8 +66,8 @@ class BuildQueue(Queue):
return super().enqueue(to_queue)
- def process(self, element):
- return element._assemble()
+ def get_process_func(self):
+ return BuildQueue._assemble_element
def status(self, element):
if element._cached_success():
@@ -115,3 +115,7 @@ class BuildQueue(Queue):
# Set a "buildable" callback for an element not yet ready
# to be processed in the build queue.
element._set_buildable_callback(self._enqueue_element)
+
+ @staticmethod
+ def _assemble_element(element):
+ return element._assemble()
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index bbb3b3d78..3d0f80342 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -41,8 +41,11 @@ class FetchQueue(Queue):
self._skip_cached = skip_cached
self._fetch_original = fetch_original
- def process(self, element):
- element._fetch(fetch_original=self._fetch_original)
+ def get_process_func(self):
+ if self._fetch_original:
+ return FetchQueue._fetch_original
+ else:
+ return FetchQueue._fetch_no_original
def status(self, element):
# Optionally skip elements that are already in the artifact cache
@@ -78,3 +81,11 @@ class FetchQueue(Queue):
# Set a "can_query_cache" callback for an element not yet ready
# to be processed in the fetch queue.
element._set_can_query_cache_callback(self._enqueue_element)
+
+ @staticmethod
+ def _fetch_no_original(element):
+ element._fetch(fetch_original=False)
+
+ @staticmethod
+ def _fetch_original(element):
+ element._fetch(fetch_original=True)
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index 245293342..dfb00aa21 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -33,10 +33,8 @@ class PullQueue(Queue):
complete_name = "Pulled"
resources = [ResourceType.DOWNLOAD, ResourceType.CACHE]
- def process(self, element):
- # returns whether an artifact was downloaded or not
- if not element._pull():
- raise SkipJob(self.action_name)
+ def get_process_func(self):
+ return PullQueue._raise_skip_if_not_pulled
def status(self, element):
if not element._can_query_cache():
@@ -65,3 +63,8 @@ class PullQueue(Queue):
# immediately ready to query the artifact cache so that it
# may be pulled.
element._set_can_query_cache_callback(self._enqueue_element)
+
+ @staticmethod
+ def _raise_skip_if_not_pulled(element):
+ if not element._pull():
+ raise SkipJob(PullQueue.action_name)
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 7c577e7bd..f2cefd5d2 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -91,20 +91,25 @@ class Queue():
# Abstract Methods for Queue implementations #
#####################################################
- # process()
+ # get_process_func()
#
- # Abstract method for processing an element
+ # Abstract method, returns a callable for processing an element.
#
- # Args:
- # element (Element): An element to process
+ # The callable should fit the signature `process(element: Element) -> any`.
#
- # Returns:
- # (any): An optional something to be returned
- # for every element successfully processed
+ # Note that the callable may be executed in a child process, so the return
+ # value should be a simple object (must be pickle-able, i.e. strings,
+ # lists, dicts, numbers, but not Element instances). This is sent to back
+ # to the main process.
#
+ # This method is the only way for a queue to affect elements, and so is
+ # not optional to implement.
#
- def process(self, element):
- pass
+ # Returns:
+ # (Callable[[Element], Any]): The callable for processing elements.
+ #
+ def get_process_func(self):
+ raise NotImplementedError()
# status()
#
@@ -218,7 +223,7 @@ class Queue():
ElementJob(self._scheduler, self.action_name,
self._element_log_path(element),
element=element, queue=self,
- action_cb=self.process,
+ action_cb=self.get_process_func(),
complete_cb=self._job_done,
max_retries=self._max_retries)
for element in ready
diff --git a/src/buildstream/_scheduler/queues/sourcepushqueue.py b/src/buildstream/_scheduler/queues/sourcepushqueue.py
index c38460e6a..f0926654c 100644
--- a/src/buildstream/_scheduler/queues/sourcepushqueue.py
+++ b/src/buildstream/_scheduler/queues/sourcepushqueue.py
@@ -30,13 +30,16 @@ class SourcePushQueue(Queue):
complete_name = "Sources pushed"
resources = [ResourceType.UPLOAD]
- def process(self, element):
- # Returns whether a source was pushed or not
- if not element._source_push():
- raise SkipJob(self.action_name)
+ def get_process_func(self):
+ return SourcePushQueue._raise_skip_if_not_pushed
def status(self, element):
if element._skip_source_push():
return QueueStatus.SKIP
return QueueStatus.READY
+
+ @staticmethod
+ def _raise_skip_if_not_pushed(element):
+ if not element._source_push():
+ raise SkipJob(SourcePushQueue.action_name)
diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py
index 194bb7e1d..6bdf838f9 100644
--- a/src/buildstream/_scheduler/queues/trackqueue.py
+++ b/src/buildstream/_scheduler/queues/trackqueue.py
@@ -35,8 +35,8 @@ class TrackQueue(Queue):
complete_name = "Tracked"
resources = [ResourceType.DOWNLOAD]
- def process(self, element):
- return element._track()
+ def get_process_func(self):
+ return TrackQueue._track_element
def status(self, element):
# We can skip elements entirely if they have no sources.
@@ -60,3 +60,7 @@ class TrackQueue(Queue):
source._set_ref(new_ref, save=True)
element._tracking_done()
+
+ @staticmethod
+ def _track_element(element):
+ return element._track()