summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Josh Smith <qinusty@gmail.com> 2018-09-07 15:36:43 +0100
committer: Josh Smith <qinusty@gmail.com> 2018-09-12 14:56:56 +0100
commit: 851aac8dc7ac4901d049d13d993c83f2e1c09f14 (patch)
tree: 71efe5ac6c4cd9d7ef31b29a36abd5472bcd2cad
parent: b3ffcdc8fcc8d150cf6d75acbc660379b3fc0fcb (diff)
download: buildstream-851aac8dc7ac4901d049d13d993c83f2e1c09f14.tar.gz
Rework Skipped usage
The SKIPPED message type is now used to indicate that a task ended successfully without actually having to perform its work. This overhauls the use of `Queue.done()`: queues no longer need to provide a processed/skipped return value from `done()`. Instead, a queue signals that a job was skipped by raising a `SkipJob` exception from within `Queue.process()`.
-rw-r--r--  buildstream/_artifactcache/cascache.py  4
-rw-r--r--  buildstream/_exceptions.py  14
-rw-r--r--  buildstream/_scheduler/jobs/job.py  31
-rw-r--r--  buildstream/_scheduler/queues/buildqueue.py  9
-rw-r--r--  buildstream/_scheduler/queues/fetchqueue.py  13
-rw-r--r--  buildstream/_scheduler/queues/pullqueue.py  13
-rw-r--r--  buildstream/_scheduler/queues/pushqueue.py  18
-rw-r--r--  buildstream/_scheduler/queues/queue.py  35
-rw-r--r--  buildstream/_scheduler/queues/trackqueue.py  22
-rw-r--r--  tests/frontend/pull.py  3
10 files changed, 84 insertions, 78 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index a93ec01ea..60495a867 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -253,7 +253,7 @@ class CASCache(ArtifactCache):
else:
self.context.message(Message(
None,
- MessageType.SKIPPED,
+ MessageType.INFO,
"Remote ({}) does not have {} cached".format(
remote.spec.url, element._get_brief_display_key())
))
@@ -344,7 +344,7 @@ class CASCache(ArtifactCache):
else:
self.context.message(Message(
None,
- MessageType.SKIPPED,
+ MessageType.INFO,
"Remote ({}) already has {} cached".format(
remote.spec.url, element._get_brief_display_key())
))
diff --git a/buildstream/_exceptions.py b/buildstream/_exceptions.py
index 6fe4f4847..eb6cf4a39 100644
--- a/buildstream/_exceptions.py
+++ b/buildstream/_exceptions.py
@@ -312,3 +312,17 @@ class StreamError(BstError):
class AppError(BstError):
def __init__(self, message, detail=None, reason=None):
super().__init__(message, detail=detail, domain=ErrorDomain.APP, reason=reason)
+
+
+# SkipJob
+#
+# Raised from a child process within a job when the job should be
+# considered skipped by the parent process.
+#
+class SkipJob(Exception):
+ def __init__(self, *, detail=""):
+ super().__init__()
+ self._detail = detail
+
+ def __str__(self):
+ return self._detail
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index c55219b58..caf8fa926 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -31,7 +31,7 @@ import multiprocessing
import psutil
# BuildStream toplevel imports
-from ..._exceptions import ImplError, BstError, set_last_task_error
+from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
from ... import _signals, utils
@@ -40,6 +40,7 @@ from ... import _signals, utils
RC_OK = 0
RC_FAIL = 1
RC_PERM_FAIL = 2
+RC_SKIPPED = 3
# Used to distinguish between status messages and return values
@@ -117,7 +118,7 @@ class Job():
self._max_retries = max_retries # Maximum number of automatic retries
self._result = None # Return value of child action in the parent
self._tries = 0 # Try count, for retryable jobs
-
+ self._skipped_flag = False # Indicate whether the job was skipped.
# If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
#
self._retry_flag = True
@@ -277,6 +278,14 @@ class Job():
def set_task_id(self, task_id):
self._task_id = task_id
+ # skipped
+ #
+ # Returns:
+ # bool: True if the job was skipped while processing.
+ @property
+ def skipped(self):
+ return self._skipped_flag
+
#######################################################
# Abstract Methods #
#######################################################
@@ -398,6 +407,13 @@ class Job():
try:
# Try the task action
result = self.child_process()
+ except SkipJob as e:
+ elapsed = datetime.datetime.now() - starttime
+ self.message(MessageType.SKIPPED, str(e),
+ elapsed=elapsed, logfile=filename)
+
+ # Alert parent of skip by return code
+ self._child_shutdown(RC_SKIPPED)
except BstError as e:
elapsed = datetime.datetime.now() - starttime
self._retry_flag = e.temporary
@@ -444,6 +460,7 @@ class Job():
self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed,
logfile=filename)
+ # XXX Verify below.
# Shutdown needs to stay outside of the above context manager,
# make sure we dont try to handle SIGTERM while the process
# is already busy in sys.exit()
@@ -545,14 +562,18 @@ class Job():
# We don't want to retry if we got OK or a permanent fail.
# This is set in _child_action but must also be set for the parent.
#
- self._retry_flag = returncode not in (RC_OK, RC_PERM_FAIL)
+ self._retry_flag = (returncode == RC_FAIL)
+
+ # Set the flag to alert Queue that this job skipped.
+ self._skipped_flag = (returncode == RC_SKIPPED)
if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
self.spawn()
return
- self.parent_complete(returncode == RC_OK, self._result)
- self._scheduler.job_completed(self, returncode == RC_OK)
+ success = (returncode in (RC_OK, RC_SKIPPED))
+ self.parent_complete(success, self._result)
+ self._scheduler.job_completed(self, success)
# Force the deletion of the queue and process objects to try and clean up FDs
self._queue = self._process = None
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 6e7ce04aa..dc11b58d7 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -55,12 +55,7 @@ class BuildQueue(Queue):
detail=detail, action_name=self.action_name,
elapsed=timedelta(seconds=0),
logfile=logfile)
- job = ElementJob(self._scheduler, self.action_name,
- logfile, element=element, queue=self,
- resources=self.resources,
- action_cb=self.process,
- complete_cb=self._job_done,
- max_retries=self._max_retries)
+ job = self._create_job(element)
self._done_queue.append(job)
self.failed_elements.append(element)
self._scheduler._job_complete_callback(job, False)
@@ -113,5 +108,3 @@ class BuildQueue(Queue):
# This has to be done after _assemble_done, such that the
# element may register its cache key as required
self._check_cache_size(job, element, result)
-
- return True
diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index bd90a13b6..df3aca965 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -70,13 +70,8 @@ class FetchQueue(Queue):
return QueueStatus.READY
def done(self, _, element, result, success):
+ if success:
+ element._update_state()
- if not success:
- return False
-
- element._update_state()
-
- # Successful fetch, we must be CACHED now
- assert element._get_consistency() == Consistency.CACHED
-
- return True
+ # Successful fetch, we must be CACHED now
+ assert element._get_consistency() == Consistency.CACHED
diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
index e18967cf4..411f07c49 100644
--- a/buildstream/_scheduler/queues/pullqueue.py
+++ b/buildstream/_scheduler/queues/pullqueue.py
@@ -21,6 +21,7 @@
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
+from ..._exceptions import SkipJob
# A queue which pulls element artifacts
@@ -33,7 +34,12 @@ class PullQueue(Queue):
def process(self, element):
# returns whether an artifact was downloaded or not
- return element._pull()
+ pulled = element._pull()
+
+ if not pulled:
+ raise SkipJob(detail=self.action_name)
+
+ return pulled
def status(self, element):
# state of dependencies may have changed, recalculate element state
@@ -53,7 +59,6 @@ class PullQueue(Queue):
return QueueStatus.SKIP
def done(self, _, element, result, success):
-
if not success:
return False
@@ -63,7 +68,3 @@ class PullQueue(Queue):
# do not get an artifact size from pull jobs, we have to
# actually check the cache size.
self._scheduler.check_cache_size()
-
- # Element._pull() returns True if it downloaded an artifact,
- # here we want to appear skipped if we did not download.
- return result
diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py
index 568e053d6..26ca4c973 100644
--- a/buildstream/_scheduler/queues/pushqueue.py
+++ b/buildstream/_scheduler/queues/pushqueue.py
@@ -21,6 +21,7 @@
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
+from ..._exceptions import SkipJob
# A queue which pushes element artifacts
@@ -33,20 +34,15 @@ class PushQueue(Queue):
def process(self, element):
# returns whether an artifact was uploaded or not
- return element._push()
+ pushed = element._push()
+
+ if not pushed:
+ raise SkipJob(detail=self.action_name)
+
+ return pushed
def status(self, element):
if element._skip_push():
return QueueStatus.SKIP
return QueueStatus.READY
-
- def done(self, _, element, result, success):
-
- if not success:
- return False
-
- # Element._push() returns True if it uploaded an artifact,
- # here we want to appear skipped if the remote already had
- # the artifact.
- return result
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 472e033da..730aeb0b6 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -136,10 +136,6 @@ class Queue():
# success (bool): True if the process() implementation did not
# raise any exception
#
- # Returns:
- # (bool): True if the element should appear to be processsed,
- # Otherwise False will count the element as "skipped"
- #
def done(self, job, element, result, success):
pass
@@ -158,20 +154,8 @@ class Queue():
if not elts:
return
- # Note: The internal lists work with jobs. This is not
- # reflected in any external methods (except
- # pop/peek_ready_jobs).
- def create_job(element):
- logfile = self._element_log_path(element)
- return ElementJob(self._scheduler, self.action_name,
- logfile, element=element, queue=self,
- resources=self.resources,
- action_cb=self.process,
- complete_cb=self._job_done,
- max_retries=self._max_retries)
-
# Place skipped elements directly on the done queue
- jobs = [create_job(elt) for elt in elts]
+ jobs = [self._create_job(elt) for elt in elts]
skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP]
wait = [job for job in jobs if job not in skip]
@@ -306,8 +290,7 @@ class Queue():
# and determine if it should be considered as processed
# or skipped.
try:
- processed = self.done(job, element, result, success)
-
+ self.done(job, element, result, success)
except BstError as e:
# Report error and mark as failed
@@ -337,7 +320,7 @@ class Queue():
self._done_queue.append(job)
if success:
- if processed:
+ if not job.skipped:
self.processed_elements.append(element)
else:
self.skipped_elements.append(element)
@@ -358,3 +341,15 @@ class Queue():
logfile = "{key}-{action}".format(key=key, action=action)
return os.path.join(project.name, element.normal_name, logfile)
+
+ # Note: The internal lists work with jobs. This is not
+ # reflected in any external methods (except
+ # pop/peek_ready_jobs).
+ def _create_job(self, element):
+ logfile = self._element_log_path(element)
+ return ElementJob(self._scheduler, self.action_name,
+ logfile, element=element, queue=self,
+ resources=self.resources,
+ action_cb=self.process,
+ complete_cb=self._job_done,
+ max_retries=self._max_retries)
diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py
index f443df3be..ae79e1411 100644
--- a/buildstream/_scheduler/queues/trackqueue.py
+++ b/buildstream/_scheduler/queues/trackqueue.py
@@ -49,20 +49,10 @@ class TrackQueue(Queue):
return QueueStatus.READY
def done(self, _, element, result, success):
+ if success:
+ # Set the new refs in the main process one by one as they complete
+ for unique_id, new_ref in result:
+ source = _plugin_lookup(unique_id)
+ source._save_ref(new_ref)
- if not success:
- return False
-
- changed = False
-
- # Set the new refs in the main process one by one as they complete
- for unique_id, new_ref in result:
- source = _plugin_lookup(unique_id)
- # We appear processed if at least one source has changed
- if source._save_ref(new_ref):
- changed = True
-
- element._tracking_done()
-
- # We'll appear as a skipped element if tracking resulted in no change
- return changed
+ element._tracking_done()
diff --git a/tests/frontend/pull.py b/tests/frontend/pull.py
index ed9a9643e..c883e2030 100644
--- a/tests/frontend/pull.py
+++ b/tests/frontend/pull.py
@@ -356,4 +356,5 @@ def test_pull_missing_notifies_user(caplog, cli, tmpdir, datafiles):
assert not result.get_pulled_elements(), \
"No elements should have been pulled since the cache was empty"
- assert "SKIPPED Remote ({}) does not have".format(share.repo) in result.stderr
+ assert "INFO Remote ({}) does not have".format(share.repo) in result.stderr
+ assert "SKIPPED Pull" in result.stderr