diff options
author | Josh Smith <qinusty@gmail.com> | 2018-09-07 15:36:43 +0100 |
---|---|---|
committer | Josh Smith <qinusty@gmail.com> | 2018-09-12 14:56:56 +0100 |
commit | 851aac8dc7ac4901d049d13d993c83f2e1c09f14 (patch) | |
tree | 71efe5ac6c4cd9d7ef31b29a36abd5472bcd2cad | |
parent | b3ffcdc8fcc8d150cf6d75acbc660379b3fc0fcb (diff) | |
download | buildstream-851aac8dc7ac4901d049d13d993c83f2e1c09f14.tar.gz |
Rework Skipped usage
The SKIPPED message type is now used to indicate the end of a task which
was successful without having to perform the given task.
This overhauls the use of `Queue.done()` and therefore queues do not
need to provide a processed/skipped return value from `done()`. Instead
this is replaced with the action of raising a `SkipJob` exception from
within `Queue.process()`.
-rw-r--r-- | buildstream/_artifactcache/cascache.py | 4 | ||||
-rw-r--r-- | buildstream/_exceptions.py | 14 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/job.py | 31 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/buildqueue.py | 9 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/fetchqueue.py | 13 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/pullqueue.py | 13 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/pushqueue.py | 18 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/queue.py | 35 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/trackqueue.py | 22 | ||||
-rw-r--r-- | tests/frontend/pull.py | 3 |
10 files changed, 84 insertions, 78 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index a93ec01ea..60495a867 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -253,7 +253,7 @@ class CASCache(ArtifactCache): else: self.context.message(Message( None, - MessageType.SKIPPED, + MessageType.INFO, "Remote ({}) does not have {} cached".format( remote.spec.url, element._get_brief_display_key()) )) @@ -344,7 +344,7 @@ class CASCache(ArtifactCache): else: self.context.message(Message( None, - MessageType.SKIPPED, + MessageType.INFO, "Remote ({}) already has {} cached".format( remote.spec.url, element._get_brief_display_key()) )) diff --git a/buildstream/_exceptions.py b/buildstream/_exceptions.py index 6fe4f4847..eb6cf4a39 100644 --- a/buildstream/_exceptions.py +++ b/buildstream/_exceptions.py @@ -312,3 +312,17 @@ class StreamError(BstError): class AppError(BstError): def __init__(self, message, detail=None, reason=None): super().__init__(message, detail=detail, domain=ErrorDomain.APP, reason=reason) + + +# SkipJob +# +# Raised from a child process within a job when the job should be +# considered skipped by the parent process. +# +class SkipJob(Exception): + def __init__(self, *, detail=""): + super().__init__() + self._detail = detail + + def __str__(self): + return self._detail diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index c55219b58..caf8fa926 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -31,7 +31,7 @@ import multiprocessing import psutil # BuildStream toplevel imports -from ..._exceptions import ImplError, BstError, set_last_task_error +from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob from ..._message import Message, MessageType, unconditional_messages from ... import _signals, utils @@ -40,6 +40,7 @@ from ... import _signals, utils RC_OK = 0 RC_FAIL = 1 RC_PERM_FAIL = 2 +RC_SKIPPED = 3 # Used to distinguish between status messages and return values @@ -117,7 +118,7 @@ class Job(): self._max_retries = max_retries # Maximum number of automatic retries self._result = None # Return value of child action in the parent self._tries = 0 # Try count, for retryable jobs - + self._skipped_flag = False # Indicate whether the job was skipped. # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries. # self._retry_flag = True @@ -277,6 +278,14 @@ class Job(): def set_task_id(self, task_id): self._task_id = task_id + # skipped + # + # Returns: + # bool: True if the job was skipped while processing. + @property + def skipped(self): + return self._skipped_flag + ####################################################### # Abstract Methods # ####################################################### @@ -398,6 +407,13 @@ class Job(): try: # Try the task action result = self.child_process() + except SkipJob as e: + elapsed = datetime.datetime.now() - starttime + self.message(MessageType.SKIPPED, str(e), + elapsed=elapsed, logfile=filename) + + # Alert parent of skip by return code + self._child_shutdown(RC_SKIPPED) except BstError as e: elapsed = datetime.datetime.now() - starttime self._retry_flag = e.temporary @@ -444,6 +460,7 @@ class Job(): self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename) + # XXX Verify below. # Shutdown needs to stay outside of the above context manager, # make sure we dont try to handle SIGTERM while the process # is already busy in sys.exit() @@ -545,14 +562,18 @@ class Job(): # We don't want to retry if we got OK or a permanent fail. # This is set in _child_action but must also be set for the parent. # - self._retry_flag = returncode not in (RC_OK, RC_PERM_FAIL) + self._retry_flag = (returncode == RC_FAIL) + + # Set the flag to alert Queue that this job skipped. + self._skipped_flag = (returncode == RC_SKIPPED) if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated: self.spawn() return - self.parent_complete(returncode == RC_OK, self._result) - self._scheduler.job_completed(self, returncode == RC_OK) + success = (returncode in (RC_OK, RC_SKIPPED)) + self.parent_complete(success, self._result) + self._scheduler.job_completed(self, success) # Force the deletion of the queue and process objects to try and clean up FDs self._queue = self._process = None diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py index 6e7ce04aa..dc11b58d7 100644 --- a/buildstream/_scheduler/queues/buildqueue.py +++ b/buildstream/_scheduler/queues/buildqueue.py @@ -55,12 +55,7 @@ class BuildQueue(Queue): detail=detail, action_name=self.action_name, elapsed=timedelta(seconds=0), logfile=logfile) - job = ElementJob(self._scheduler, self.action_name, - logfile, element=element, queue=self, - resources=self.resources, - action_cb=self.process, - complete_cb=self._job_done, - max_retries=self._max_retries) + job = self._create_job(element) self._done_queue.append(job) self.failed_elements.append(element) self._scheduler._job_complete_callback(job, False) @@ -113,5 +108,3 @@ class BuildQueue(Queue): # This has to be done after _assemble_done, such that the # element may register its cache key as required self._check_cache_size(job, element, result) - - return True diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py index bd90a13b6..df3aca965 100644 --- a/buildstream/_scheduler/queues/fetchqueue.py +++ b/buildstream/_scheduler/queues/fetchqueue.py @@ -70,13 +70,8 @@ class FetchQueue(Queue): return QueueStatus.READY def done(self, _, element, result, success): + if success: + element._update_state() - if not success: - return False - - element._update_state() - - # Successful fetch, we must be CACHED now - assert element._get_consistency() == Consistency.CACHED - - return True + # Successful fetch, we must be CACHED now + assert element._get_consistency() == Consistency.CACHED diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py index e18967cf4..411f07c49 100644 --- a/buildstream/_scheduler/queues/pullqueue.py +++ b/buildstream/_scheduler/queues/pullqueue.py @@ -21,6 +21,7 @@ # Local imports from . import Queue, QueueStatus from ..resources import ResourceType +from ..._exceptions import SkipJob # A queue which pulls element artifacts @@ -33,7 +34,12 @@ class PullQueue(Queue): def process(self, element): # returns whether an artifact was downloaded or not - return element._pull() + pulled = element._pull() + + if not pulled: + raise SkipJob(detail=self.action_name) + + return pulled def status(self, element): # state of dependencies may have changed, recalculate element state @@ -53,7 +59,6 @@ class PullQueue(Queue): return QueueStatus.SKIP def done(self, _, element, result, success): - if not success: return False @@ -63,7 +68,3 @@ class PullQueue(Queue): # do not get an artifact size from pull jobs, we have to # actually check the cache size. self._scheduler.check_cache_size() - - # Element._pull() returns True if it downloaded an artifact, - # here we want to appear skipped if we did not download. - return result diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py index 568e053d6..26ca4c973 100644 --- a/buildstream/_scheduler/queues/pushqueue.py +++ b/buildstream/_scheduler/queues/pushqueue.py @@ -21,6 +21,7 @@ # Local imports from . import Queue, QueueStatus from ..resources import ResourceType +from ..._exceptions import SkipJob # A queue which pushes element artifacts @@ -33,20 +34,15 @@ class PushQueue(Queue): def process(self, element): # returns whether an artifact was uploaded or not - return element._push() + pushed = element._push() + + if not pushed: + raise SkipJob(detail=self.action_name) + + return pushed def status(self, element): if element._skip_push(): return QueueStatus.SKIP return QueueStatus.READY - - def done(self, _, element, result, success): - - if not success: - return False - - # Element._push() returns True if it uploaded an artifact, - # here we want to appear skipped if the remote already had - # the artifact. - return result diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index 472e033da..730aeb0b6 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -136,10 +136,6 @@ class Queue(): # success (bool): True if the process() implementation did not # raise any exception # - # Returns: - # (bool): True if the element should appear to be processsed, - # Otherwise False will count the element as "skipped" - # def done(self, job, element, result, success): pass @@ -158,20 +154,8 @@ class Queue(): if not elts: return - # Note: The internal lists work with jobs. This is not - # reflected in any external methods (except - # pop/peek_ready_jobs). - def create_job(element): - logfile = self._element_log_path(element) - return ElementJob(self._scheduler, self.action_name, - logfile, element=element, queue=self, - resources=self.resources, - action_cb=self.process, - complete_cb=self._job_done, - max_retries=self._max_retries) - # Place skipped elements directly on the done queue - jobs = [create_job(elt) for elt in elts] + jobs = [self._create_job(elt) for elt in elts] skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP] wait = [job for job in jobs if job not in skip] @@ -306,8 +290,7 @@ class Queue(): # and determine if it should be considered as processed # or skipped. try: - processed = self.done(job, element, result, success) - + self.done(job, element, result, success) except BstError as e: # Report error and mark as failed @@ -337,7 +320,7 @@ class Queue(): self._done_queue.append(job) if success: - if processed: + if not job.skipped: self.processed_elements.append(element) else: self.skipped_elements.append(element) @@ -358,3 +341,15 @@ class Queue(): logfile = "{key}-{action}".format(key=key, action=action) return os.path.join(project.name, element.normal_name, logfile) + + # Note: The internal lists work with jobs. This is not + # reflected in any external methods (except + # pop/peek_ready_jobs). + def _create_job(self, element): + logfile = self._element_log_path(element) + return ElementJob(self._scheduler, self.action_name, + logfile, element=element, queue=self, + resources=self.resources, + action_cb=self.process, + complete_cb=self._job_done, + max_retries=self._max_retries) diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py index f443df3be..ae79e1411 100644 --- a/buildstream/_scheduler/queues/trackqueue.py +++ b/buildstream/_scheduler/queues/trackqueue.py @@ -49,20 +49,10 @@ class TrackQueue(Queue): return QueueStatus.READY def done(self, _, element, result, success): + if success: + # Set the new refs in the main process one by one as they complete + for unique_id, new_ref in result: + source = _plugin_lookup(unique_id) + source._save_ref(new_ref) - if not success: - return False - - changed = False - - # Set the new refs in the main process one by one as they complete - for unique_id, new_ref in result: - source = _plugin_lookup(unique_id) - # We appear processed if at least one source has changed - if source._save_ref(new_ref): - changed = True - - element._tracking_done() - - # We'll appear as a skipped element if tracking resulted in no change - return changed + element._tracking_done() diff --git a/tests/frontend/pull.py b/tests/frontend/pull.py index ed9a9643e..c883e2030 100644 --- a/tests/frontend/pull.py +++ b/tests/frontend/pull.py @@ -356,4 +356,5 @@ def test_pull_missing_notifies_user(caplog, cli, tmpdir, datafiles): assert not result.get_pulled_elements(), \ "No elements should have been pulled since the cache was empty" - assert "SKIPPED Remote ({}) does not have".format(share.repo) in result.stderr + assert "INFO Remote ({}) does not have".format(share.repo) in result.stderr + assert "SKIPPED Pull" in result.stderr |