summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Smith <qinusty@gmail.com>2018-09-07 15:36:43 +0100
committerJosh Smith <qinusty@gmail.com>2018-09-19 10:25:21 +0100
commitca811a4d5905b61862c230e17925ab9152a78b16 (patch)
treea9a647cff50c300f1634848a6061f94916c1f3ad
parent72b5902157316e173de2eec5b3a2772283eec3c7 (diff)
downloadbuildstream-ca811a4d5905b61862c230e17925ab9152a78b16.tar.gz
Rework Skipped usage
The SKIPPED message type is now used to indicate the end of a task which was successful without having to perform the given task. This overhauls the use of `Queue.done()` and therefore queues do not need to provide a processed/skipped return value from `done()`. Instead this is replaced with the action of raising a `SkipJob` exception from within `Queue.process()`.
-rw-r--r--buildstream/_artifactcache/cascache.py4
-rw-r--r--buildstream/_exceptions.py9
-rw-r--r--buildstream/_scheduler/jobs/job.py30
-rw-r--r--buildstream/_scheduler/queues/buildqueue.py3
-rw-r--r--buildstream/_scheduler/queues/fetchqueue.py4
-rw-r--r--buildstream/_scheduler/queues/pullqueue.py8
-rw-r--r--buildstream/_scheduler/queues/pushqueue.py14
-rw-r--r--buildstream/_scheduler/queues/queue.py9
-rw-r--r--buildstream/_scheduler/queues/trackqueue.py11
-rw-r--r--tests/frontend/pull.py3
10 files changed, 50 insertions, 45 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 9cf83a222..1a48c4065 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -253,7 +253,7 @@ class CASCache(ArtifactCache):
else:
self.context.message(Message(
None,
- MessageType.SKIPPED,
+ MessageType.INFO,
"Remote ({}) does not have {} cached".format(
remote.spec.url, element._get_brief_display_key())
))
@@ -344,7 +344,7 @@ class CASCache(ArtifactCache):
else:
self.context.message(Message(
None,
- MessageType.SKIPPED,
+ MessageType.INFO,
"Remote ({}) already has {} cached".format(
remote.spec.url, element._get_brief_display_key())
))
diff --git a/buildstream/_exceptions.py b/buildstream/_exceptions.py
index 6fe4f4847..19606776e 100644
--- a/buildstream/_exceptions.py
+++ b/buildstream/_exceptions.py
@@ -312,3 +312,12 @@ class StreamError(BstError):
class AppError(BstError):
def __init__(self, message, detail=None, reason=None):
super().__init__(message, detail=detail, domain=ErrorDomain.APP, reason=reason)
+
+
+# SkipJob
+#
+# Raised from a child process within a job when the job should be
+# considered skipped by the parent process.
+#
+class SkipJob(Exception):
+ pass
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index c55219b58..1c6b4a582 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -31,7 +31,7 @@ import multiprocessing
import psutil
# BuildStream toplevel imports
-from ..._exceptions import ImplError, BstError, set_last_task_error
+from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
from ... import _signals, utils
@@ -40,6 +40,7 @@ from ... import _signals, utils
RC_OK = 0
RC_FAIL = 1
RC_PERM_FAIL = 2
+RC_SKIPPED = 3
# Used to distinguish between status messages and return values
@@ -117,7 +118,7 @@ class Job():
self._max_retries = max_retries # Maximum number of automatic retries
self._result = None # Return value of child action in the parent
self._tries = 0 # Try count, for retryable jobs
-
+ self._skipped_flag = False # Indicate whether the job was skipped.
# If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
#
self._retry_flag = True
@@ -277,6 +278,14 @@ class Job():
def set_task_id(self, task_id):
self._task_id = task_id
+ # skipped
+ #
+ # Returns:
+ # bool: True if the job was skipped while processing.
+ @property
+ def skipped(self):
+ return self._skipped_flag
+
#######################################################
# Abstract Methods #
#######################################################
@@ -398,6 +407,13 @@ class Job():
try:
# Try the task action
result = self.child_process()
+ except SkipJob as e:
+ elapsed = datetime.datetime.now() - starttime
+ self.message(MessageType.SKIPPED, str(e),
+ elapsed=elapsed, logfile=filename)
+
+ # Alert parent of skip by return code
+ self._child_shutdown(RC_SKIPPED)
except BstError as e:
elapsed = datetime.datetime.now() - starttime
self._retry_flag = e.temporary
@@ -545,14 +561,18 @@ class Job():
# We don't want to retry if we got OK or a permanent fail.
# This is set in _child_action but must also be set for the parent.
#
- self._retry_flag = returncode not in (RC_OK, RC_PERM_FAIL)
+ self._retry_flag = returncode == RC_FAIL
+
+ # Set the flag to alert Queue that this job skipped.
+ self._skipped_flag = returncode == RC_SKIPPED
if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
self.spawn()
return
- self.parent_complete(returncode == RC_OK, self._result)
- self._scheduler.job_completed(self, returncode == RC_OK)
+ success = returncode in (RC_OK, RC_SKIPPED)
+ self.parent_complete(success, self._result)
+ self._scheduler.job_completed(self, success)
# Force the deletion of the queue and process objects to try and clean up FDs
self._queue = self._process = None
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 0c74b3698..39ed83a32 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -47,6 +47,7 @@ class BuildQueue(Queue):
to_queue.append(element)
continue
+ # XXX: Fix this, See https://mail.gnome.org/archives/buildstream-list/2018-September/msg00029.html
# Bypass queue processing entirely the first time it's tried.
self._tried.add(element)
_, description, detail = element._get_build_result()
@@ -113,5 +114,3 @@ class BuildQueue(Queue):
# This has to be done after _assemble_done, such that the
# element may register its cache key as required
self._check_cache_size(job, element, result)
-
- return True
diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index bd90a13b6..446dbbd3b 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -72,11 +72,9 @@ class FetchQueue(Queue):
def done(self, _, element, result, success):
if not success:
- return False
+ return
element._update_state()
# Successful fetch, we must be CACHED now
assert element._get_consistency() == Consistency.CACHED
-
- return True
diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
index e18967cf4..2842c5e21 100644
--- a/buildstream/_scheduler/queues/pullqueue.py
+++ b/buildstream/_scheduler/queues/pullqueue.py
@@ -21,6 +21,7 @@
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
+from ..._exceptions import SkipJob
# A queue which pulls element artifacts
@@ -33,7 +34,8 @@ class PullQueue(Queue):
def process(self, element):
# returns whether an artifact was downloaded or not
- return element._pull()
+ if not element._pull():
+ raise SkipJob(self.action_name)
def status(self, element):
# state of dependencies may have changed, recalculate element state
@@ -63,7 +65,3 @@ class PullQueue(Queue):
# do not get an artifact size from pull jobs, we have to
# actually check the cache size.
self._scheduler.check_cache_size()
-
- # Element._pull() returns True if it downloaded an artifact,
- # here we want to appear skipped if we did not download.
- return result
diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py
index 568e053d6..35532d23d 100644
--- a/buildstream/_scheduler/queues/pushqueue.py
+++ b/buildstream/_scheduler/queues/pushqueue.py
@@ -21,6 +21,7 @@
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
+from ..._exceptions import SkipJob
# A queue which pushes element artifacts
@@ -33,20 +34,11 @@ class PushQueue(Queue):
def process(self, element):
# returns whether an artifact was uploaded or not
- return element._push()
+ if not element._push():
+ raise SkipJob(self.action_name)
def status(self, element):
if element._skip_push():
return QueueStatus.SKIP
return QueueStatus.READY
-
- def done(self, _, element, result, success):
-
- if not success:
- return False
-
- # Element._push() returns True if it uploaded an artifact,
- # here we want to appear skipped if the remote already had
- # the artifact.
- return result
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 472e033da..f058663a1 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -136,10 +136,6 @@ class Queue():
# success (bool): True if the process() implementation did not
# raise any exception
#
- # Returns:
- # (bool): True if the element should appear to be processsed,
- # Otherwise False will count the element as "skipped"
- #
def done(self, job, element, result, success):
pass
@@ -306,8 +302,7 @@ class Queue():
# and determine if it should be considered as processed
# or skipped.
try:
- processed = self.done(job, element, result, success)
-
+ self.done(job, element, result, success)
except BstError as e:
# Report error and mark as failed
@@ -337,7 +332,7 @@ class Queue():
self._done_queue.append(job)
if success:
- if processed:
+ if not job.skipped:
self.processed_elements.append(element)
else:
self.skipped_elements.append(element)
diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py
index f443df3be..133655e14 100644
--- a/buildstream/_scheduler/queues/trackqueue.py
+++ b/buildstream/_scheduler/queues/trackqueue.py
@@ -51,18 +51,11 @@ class TrackQueue(Queue):
def done(self, _, element, result, success):
if not success:
- return False
-
- changed = False
+ return
# Set the new refs in the main process one by one as they complete
for unique_id, new_ref in result:
source = _plugin_lookup(unique_id)
- # We appear processed if at least one source has changed
- if source._save_ref(new_ref):
- changed = True
+ source._save_ref(new_ref)
element._tracking_done()
-
- # We'll appear as a skipped element if tracking resulted in no change
- return changed
diff --git a/tests/frontend/pull.py b/tests/frontend/pull.py
index ed9a9643e..c883e2030 100644
--- a/tests/frontend/pull.py
+++ b/tests/frontend/pull.py
@@ -356,4 +356,5 @@ def test_pull_missing_notifies_user(caplog, cli, tmpdir, datafiles):
assert not result.get_pulled_elements(), \
"No elements should have been pulled since the cache was empty"
- assert "SKIPPED Remote ({}) does not have".format(share.repo) in result.stderr
+ assert "INFO Remote ({}) does not have".format(share.repo) in result.stderr
+ assert "SKIPPED Pull" in result.stderr