Diffstat (limited to 'buildstream/_scheduler')
-rw-r--r--  buildstream/_scheduler/jobs/job.py            | 30
-rw-r--r--  buildstream/_scheduler/queues/buildqueue.py   |  2
-rw-r--r--  buildstream/_scheduler/queues/fetchqueue.py   |  4
-rw-r--r--  buildstream/_scheduler/queues/pullqueue.py    |  8
-rw-r--r--  buildstream/_scheduler/queues/pushqueue.py    | 14
-rw-r--r--  buildstream/_scheduler/queues/queue.py        |  9
-rw-r--r--  buildstream/_scheduler/queues/trackqueue.py   | 11
7 files changed, 36 insertions(+), 42 deletions(-)
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index 165c7c83f..d77fa0c82 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -31,7 +31,7 @@ import multiprocessing
import psutil
# BuildStream toplevel imports
-from ..._exceptions import ImplError, BstError, set_last_task_error
+from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
from ... import _signals, utils
@@ -40,6 +40,7 @@ from ... import _signals, utils
RC_OK = 0
RC_FAIL = 1
RC_PERM_FAIL = 2
+RC_SKIPPED = 3
# Used to distinguish between status messages and return values
@@ -117,7 +118,7 @@ class Job():
self._max_retries = max_retries # Maximum number of automatic retries
self._result = None # Return value of child action in the parent
self._tries = 0 # Try count, for retryable jobs
-
+ self._skipped_flag = False # Indicate whether the job was skipped.
# If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
#
self._retry_flag = True
@@ -275,6 +276,14 @@ class Job():
def set_task_id(self, task_id):
self._task_id = task_id
+ # skipped
+ #
+ # Returns:
+ # bool: True if the job was skipped while processing.
+ @property
+ def skipped(self):
+ return self._skipped_flag
+
#######################################################
# Abstract Methods #
#######################################################
@@ -396,6 +405,13 @@ class Job():
try:
# Try the task action
result = self.child_process()
+ except SkipJob as e:
+ elapsed = datetime.datetime.now() - starttime
+ self.message(MessageType.SKIPPED, str(e),
+ elapsed=elapsed, logfile=filename)
+
+ # Alert parent of skip by return code
+ self._child_shutdown(RC_SKIPPED)
except BstError as e:
elapsed = datetime.datetime.now() - starttime
self._retry_flag = e.temporary
@@ -543,14 +559,18 @@ class Job():
# We don't want to retry if we got OK or a permanent fail.
# This is set in _child_action but must also be set for the parent.
#
- self._retry_flag = returncode not in (RC_OK, RC_PERM_FAIL)
+ self._retry_flag = returncode == RC_FAIL
+
+ # Set the flag to alert Queue that this job skipped.
+ self._skipped_flag = returncode == RC_SKIPPED
if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
self.spawn()
return
- self.parent_complete(returncode == RC_OK, self._result)
- self._scheduler.job_completed(self, returncode == RC_OK)
+ success = returncode in (RC_OK, RC_SKIPPED)
+ self.parent_complete(success, self._result)
+ self._scheduler.job_completed(self, success)
# _parent_process_envelope()
#
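Taken together, the job.py hunks define a small return-code protocol between the child and parent processes. The sketch below is a stand-alone illustration of that protocol, not BuildStream's actual Job/Scheduler API; the SkipJob class and the two helper functions are simplified stand-ins:

    # Return codes as introduced above; RC_SKIPPED is the new one.
    RC_OK = 0
    RC_FAIL = 1
    RC_PERM_FAIL = 2
    RC_SKIPPED = 3

    class SkipJob(Exception):
        """Stand-in for the SkipJob exception a queue's process() raises."""

    def run_child(action):
        # Child side: translate the outcome of the task into a return code.
        try:
            action()
            return RC_OK
        except SkipJob:
            return RC_SKIPPED
        except Exception as e:
            # Mirrors the BstError.temporary check: temporary errors are
            # retryable (RC_FAIL), everything else is a permanent failure.
            return RC_FAIL if getattr(e, "temporary", False) else RC_PERM_FAIL

    def handle_in_parent(returncode):
        # Parent side: only RC_FAIL is retried; a skipped job still counts
        # as a success, but the skipped flag is raised for the queue.
        retry = (returncode == RC_FAIL)
        skipped = (returncode == RC_SKIPPED)
        success = returncode in (RC_OK, RC_SKIPPED)
        return retry, skipped, success

    # A normal task completes; a task that raises SkipJob is still a
    # success from the scheduler's point of view, but flagged as skipped.
    assert handle_in_parent(run_child(lambda: None)) == (False, False, True)

    def skip():
        raise SkipJob("pull")

    assert handle_in_parent(run_child(skip)) == (False, True, True)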
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index d3d2fad3e..90e3ad792 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -77,5 +77,3 @@ class BuildQueue(Queue):
# This has to be done after _assemble_done, such that the
# element may register its cache key as required
self._check_cache_size(job, element, result)
-
- return True
diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index 265890b7a..114790c05 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -70,11 +70,9 @@ class FetchQueue(Queue):
def done(self, _, element, result, success):
if not success:
- return False
+ return
element._update_state()
# Successful fetch, we must be CACHED now
assert element._get_consistency() == Consistency.CACHED
-
- return True
diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
index e18967cf4..2842c5e21 100644
--- a/buildstream/_scheduler/queues/pullqueue.py
+++ b/buildstream/_scheduler/queues/pullqueue.py
@@ -21,6 +21,7 @@
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
+from ..._exceptions import SkipJob
# A queue which pulls element artifacts
@@ -33,7 +34,8 @@ class PullQueue(Queue):
def process(self, element):
# returns whether an artifact was downloaded or not
- return element._pull()
+ if not element._pull():
+ raise SkipJob(self.action_name)
def status(self, element):
# state of dependencies may have changed, recalculate element state
@@ -63,7 +65,3 @@ class PullQueue(Queue):
# do not get an artifact size from pull jobs, we have to
# actually check the cache size.
self._scheduler.check_cache_size()
-
- # Element._pull() returns True if it downloaded an artifact,
- # here we want to appear skipped if we did not download.
- return result
diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py
index 568e053d6..35532d23d 100644
--- a/buildstream/_scheduler/queues/pushqueue.py
+++ b/buildstream/_scheduler/queues/pushqueue.py
@@ -21,6 +21,7 @@
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
+from ..._exceptions import SkipJob
# A queue which pushes element artifacts
@@ -33,20 +34,11 @@ class PushQueue(Queue):
def process(self, element):
# returns whether an artifact was uploaded or not
- return element._push()
+ if not element._push():
+ raise SkipJob(self.action_name)
def status(self, element):
if element._skip_push():
return QueueStatus.SKIP
return QueueStatus.READY
-
- def done(self, _, element, result, success):
-
- if not success:
- return False
-
- # Element._push() returns True if it uploaded an artifact,
- # here we want to appear skipped if the remote already had
- # the artifact.
- return result
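The pull and push queues now share the same convention: process() raises SkipJob when nothing was actually transferred, instead of returning a boolean for done() to interpret. A rough, hedged sketch of that pattern follows; the SkipJob stand-in and the lambdas are illustrative, not the real BuildStream classes:

    class SkipJob(Exception):
        # Stand-in for buildstream._exceptions.SkipJob; the real exception
        # is constructed with the queue's action name, e.g. SkipJob("pull").
        pass

    def process_transfer(action_name, transfer):
        # Mirrors PullQueue.process() / PushQueue.process(): transfer()
        # reports True only if an artifact actually moved; anything else
        # is surfaced as a skip rather than a failure or a silent success.
        if not transfer():
            raise SkipJob(action_name)

    # Example: pushing to a remote that already has the artifact.
    try:
        process_transfer("push", lambda: False)
    except SkipJob as reason:
        print("job skipped:", reason)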
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 2f875881f..15467ca67 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -136,10 +136,6 @@ class Queue():
# success (bool): True if the process() implementation did not
# raise any exception
#
- # Returns:
- # (bool): True if the element should appear to be processsed,
- # Otherwise False will count the element as "skipped"
- #
def done(self, job, element, result, success):
pass
@@ -305,8 +301,7 @@ class Queue():
# and determine if it should be considered as processed
# or skipped.
try:
- processed = self.done(job, element, result, success)
-
+ self.done(job, element, result, success)
except BstError as e:
# Report error and mark as failed
@@ -335,7 +330,7 @@ class Queue():
#
if success:
self._done_queue.append(job)
- if processed:
+ if not job.skipped:
self.processed_elements.append(element)
else:
self.skipped_elements.append(element)
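With done() reduced to side effects, the queue classifies elements purely from the job's new skipped property. A simplified stand-in of that classification step (the stub class and the failure branch below are illustrative, not the real Queue/Job code):

    class StubJob:
        # Exposes only the skipped property added to Job in job.py above.
        def __init__(self, skipped=False):
            self.skipped = skipped

    def classify(job, element, success, processed, skipped, failed):
        # Mirrors the tail of Queue._job_done() after this change: the
        # job's own flag, not a return value from done(), decides the list.
        if not success:
            failed.append(element)
        elif job.skipped:
            skipped.append(element)
        else:
            processed.append(element)

    processed, skipped, failed = [], [], []
    classify(StubJob(skipped=True), "app.bst", True, processed, skipped, failed)
    assert skipped == ["app.bst"]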
diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py
index f443df3be..133655e14 100644
--- a/buildstream/_scheduler/queues/trackqueue.py
+++ b/buildstream/_scheduler/queues/trackqueue.py
@@ -51,18 +51,11 @@ class TrackQueue(Queue):
def done(self, _, element, result, success):
if not success:
- return False
-
- changed = False
+ return
# Set the new refs in the main process one by one as they complete
for unique_id, new_ref in result:
source = _plugin_lookup(unique_id)
- # We appear processed if at least one source has changed
- if source._save_ref(new_ref):
- changed = True
+ source._save_ref(new_ref)
element._tracking_done()
-
- # We'll appear as a skipped element if tracking resulted in no change
- return changed