diff options
Diffstat (limited to 'src/buildstream')
-rw-r--r-- | src/buildstream/_scheduler/queues/artifactpushqueue.py | 11 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/buildqueue.py | 10 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/fetchqueue.py | 15 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/pullqueue.py | 11 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 25 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/sourcepushqueue.py | 11 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/trackqueue.py | 8 |
7 files changed, 62 insertions, 29 deletions
diff --git a/src/buildstream/_scheduler/queues/artifactpushqueue.py b/src/buildstream/_scheduler/queues/artifactpushqueue.py index b861d4fc7..5b240e932 100644 --- a/src/buildstream/_scheduler/queues/artifactpushqueue.py +++ b/src/buildstream/_scheduler/queues/artifactpushqueue.py @@ -32,13 +32,16 @@ class ArtifactPushQueue(Queue): complete_name = "Pushed" resources = [ResourceType.UPLOAD] - def process(self, element): - # returns whether an artifact was uploaded or not - if not element._push(): - raise SkipJob(self.action_name) + def get_process_func(self): + return ArtifactPushQueue._raise_skip_if_not_pushed def status(self, element): if element._skip_push(): return QueueStatus.SKIP return QueueStatus.READY + + @staticmethod + def _raise_skip_if_not_pushed(element): + if not element._push(): + raise SkipJob(ArtifactPushQueue.action_name) diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py index b280661cc..1dd45607b 100644 --- a/src/buildstream/_scheduler/queues/buildqueue.py +++ b/src/buildstream/_scheduler/queues/buildqueue.py @@ -57,7 +57,7 @@ class BuildQueue(Queue): logfile=logfile) job = ElementJob(self._scheduler, self.action_name, logfile, element=element, queue=self, - action_cb=self.process, + action_cb=self.get_process_func(), complete_cb=self._job_done, max_retries=self._max_retries) self._done_queue.append(element) @@ -66,8 +66,8 @@ class BuildQueue(Queue): return super().enqueue(to_queue) - def process(self, element): - return element._assemble() + def get_process_func(self): + return BuildQueue._assemble_element def status(self, element): if element._cached_success(): @@ -115,3 +115,7 @@ class BuildQueue(Queue): # Set a "buildable" callback for an element not yet ready # to be processed in the build queue. element._set_buildable_callback(self._enqueue_element) + + @staticmethod + def _assemble_element(element): + return element._assemble() diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py index bbb3b3d78..3d0f80342 100644 --- a/src/buildstream/_scheduler/queues/fetchqueue.py +++ b/src/buildstream/_scheduler/queues/fetchqueue.py @@ -41,8 +41,11 @@ class FetchQueue(Queue): self._skip_cached = skip_cached self._fetch_original = fetch_original - def process(self, element): - element._fetch(fetch_original=self._fetch_original) + def get_process_func(self): + if self._fetch_original: + return FetchQueue._fetch_original + else: + return FetchQueue._fetch_no_original def status(self, element): # Optionally skip elements that are already in the artifact cache @@ -78,3 +81,11 @@ class FetchQueue(Queue): # Set a "can_query_cache" callback for an element not yet ready # to be processed in the fetch queue. element._set_can_query_cache_callback(self._enqueue_element) + + @staticmethod + def _fetch_no_original(element): + element._fetch(fetch_original=False) + + @staticmethod + def _fetch_original(element): + element._fetch(fetch_original=True) diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py index 245293342..dfb00aa21 100644 --- a/src/buildstream/_scheduler/queues/pullqueue.py +++ b/src/buildstream/_scheduler/queues/pullqueue.py @@ -33,10 +33,8 @@ class PullQueue(Queue): complete_name = "Pulled" resources = [ResourceType.DOWNLOAD, ResourceType.CACHE] - def process(self, element): - # returns whether an artifact was downloaded or not - if not element._pull(): - raise SkipJob(self.action_name) + def get_process_func(self): + return PullQueue._raise_skip_if_not_pulled def status(self, element): if not element._can_query_cache(): @@ -65,3 +63,8 @@ class PullQueue(Queue): # immediately ready to query the artifact cache so that it # may be pulled. element._set_can_query_cache_callback(self._enqueue_element) + + @staticmethod + def _raise_skip_if_not_pulled(element): + if not element._pull(): + raise SkipJob(PullQueue.action_name) diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 7c577e7bd..f2cefd5d2 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -91,20 +91,25 @@ class Queue(): # Abstract Methods for Queue implementations # ##################################################### - # process() + # get_process_func() # - # Abstract method for processing an element + # Abstract method, returns a callable for processing an element. # - # Args: - # element (Element): An element to process + # The callable should fit the signature `process(element: Element) -> any`. # - # Returns: - # (any): An optional something to be returned - # for every element successfully processed + # Note that the callable may be executed in a child process, so the return + # value should be a simple object (must be pickle-able, i.e. strings, + # lists, dicts, numbers, but not Element instances). This is sent to back + # to the main process. # + # This method is the only way for a queue to affect elements, and so is + # not optional to implement. # - def process(self, element): - pass + # Returns: + # (Callable[[Element], Any]): The callable for processing elements. + # + def get_process_func(self): + raise NotImplementedError() # status() # @@ -218,7 +223,7 @@ class Queue(): ElementJob(self._scheduler, self.action_name, self._element_log_path(element), element=element, queue=self, - action_cb=self.process, + action_cb=self.get_process_func(), complete_cb=self._job_done, max_retries=self._max_retries) for element in ready diff --git a/src/buildstream/_scheduler/queues/sourcepushqueue.py b/src/buildstream/_scheduler/queues/sourcepushqueue.py index c38460e6a..f0926654c 100644 --- a/src/buildstream/_scheduler/queues/sourcepushqueue.py +++ b/src/buildstream/_scheduler/queues/sourcepushqueue.py @@ -30,13 +30,16 @@ class SourcePushQueue(Queue): complete_name = "Sources pushed" resources = [ResourceType.UPLOAD] - def process(self, element): - # Returns whether a source was pushed or not - if not element._source_push(): - raise SkipJob(self.action_name) + def get_process_func(self): + return SourcePushQueue._raise_skip_if_not_pushed def status(self, element): if element._skip_source_push(): return QueueStatus.SKIP return QueueStatus.READY + + @staticmethod + def _raise_skip_if_not_pushed(element): + if not element._source_push(): + raise SkipJob(SourcePushQueue.action_name) diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py index 194bb7e1d..6bdf838f9 100644 --- a/src/buildstream/_scheduler/queues/trackqueue.py +++ b/src/buildstream/_scheduler/queues/trackqueue.py @@ -35,8 +35,8 @@ class TrackQueue(Queue): complete_name = "Tracked" resources = [ResourceType.DOWNLOAD] - def process(self, element): - return element._track() + def get_process_func(self): + return TrackQueue._track_element def status(self, element): # We can skip elements entirely if they have no sources. @@ -60,3 +60,7 @@ class TrackQueue(Queue): source._set_ref(new_ref, save=True) element._tracking_done() + + @staticmethod + def _track_element(element): + return element._track() |