diff options
author | Qinusty <jrsmith9822@gmail.com> | 2018-09-19 10:59:48 +0000 |
---|---|---|
committer | Qinusty <jrsmith9822@gmail.com> | 2018-09-19 10:59:48 +0000 |
commit | dddd6025705d4553f857695bb2b5e6bde6943556 (patch) | |
tree | 61013f4d95a36ac8b1c9603e539808a1c740adb7 | |
parent | 72b5902157316e173de2eec5b3a2772283eec3c7 (diff) | |
parent | 213f77946c661d685ec6e74d5d243fbbb4549b79 (diff) | |
download | buildstream-dddd6025705d4553f857695bb2b5e6bde6943556.tar.gz |
Merge branch 'Qinusty/skipped-rework' into 'master'
Add SkipJob for indicating a skipped activity
See merge request BuildStream/buildstream!765
-rw-r--r-- | buildstream/_artifactcache/cascache.py | 24 | ||||
-rw-r--r-- | buildstream/_exceptions.py | 9 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/job.py | 30 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/buildqueue.py | 3 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/fetchqueue.py | 4 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/pullqueue.py | 8 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/pushqueue.py | 14 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/queue.py | 9 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/trackqueue.py | 11 | ||||
-rw-r--r-- | buildstream/element.py | 17 | ||||
-rw-r--r-- | tests/frontend/pull.py | 3 | ||||
-rw-r--r-- | tests/frontend/push.py | 23 | ||||
-rw-r--r-- | tests/testutils/runcli.py | 2 |
13 files changed, 88 insertions, 69 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index 9cf83a222..840e190f1 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -228,8 +228,8 @@ class CASCache(ArtifactCache): for remote in self._remotes[project]: try: remote.init() - - element.info("Pulling {} <- {}".format(element._get_brief_display_key(), remote.spec.url)) + display_key = element._get_brief_display_key() + element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url)) request = buildstream_pb2.GetReferenceRequest() request.key = ref @@ -243,6 +243,7 @@ class CASCache(ArtifactCache): self.set_ref(ref, tree) + element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url)) # no need to pull from additional remotes return True @@ -251,11 +252,8 @@ class CASCache(ArtifactCache): raise ArtifactError("Failed to pull artifact {}: {}".format( element._get_brief_display_key(), e)) from e else: - self.context.message(Message( - None, - MessageType.SKIPPED, - "Remote ({}) does not have {} cached".format( - remote.spec.url, element._get_brief_display_key()) + element.info("Remote ({}) does not have {} cached".format( + remote.spec.url, element._get_brief_display_key() )) return False @@ -336,17 +334,15 @@ class CASCache(ArtifactCache): for remote in push_remotes: remote.init() - - element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url)) + display_key = element._get_brief_display_key() + element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url)) if self._push_refs_to_remote(refs, remote): + element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url)) pushed = True else: - self.context.message(Message( - None, - MessageType.SKIPPED, - "Remote ({}) already has {} cached".format( - remote.spec.url, element._get_brief_display_key()) + element.info("Remote ({}) already has {} cached".format( + remote.spec.url, element._get_brief_display_key() )) return pushed diff --git a/buildstream/_exceptions.py b/buildstream/_exceptions.py index 6fe4f4847..19606776e 100644 --- a/buildstream/_exceptions.py +++ b/buildstream/_exceptions.py @@ -312,3 +312,12 @@ class StreamError(BstError): class AppError(BstError): def __init__(self, message, detail=None, reason=None): super().__init__(message, detail=detail, domain=ErrorDomain.APP, reason=reason) + + +# SkipJob +# +# Raised from a child process within a job when the job should be +# considered skipped by the parent process. +# +class SkipJob(Exception): + pass diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index c55219b58..1c6b4a582 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -31,7 +31,7 @@ import multiprocessing import psutil # BuildStream toplevel imports -from ..._exceptions import ImplError, BstError, set_last_task_error +from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob from ..._message import Message, MessageType, unconditional_messages from ... import _signals, utils @@ -40,6 +40,7 @@ from ... import _signals, utils RC_OK = 0 RC_FAIL = 1 RC_PERM_FAIL = 2 +RC_SKIPPED = 3 # Used to distinguish between status messages and return values @@ -117,7 +118,7 @@ class Job(): self._max_retries = max_retries # Maximum number of automatic retries self._result = None # Return value of child action in the parent self._tries = 0 # Try count, for retryable jobs - + self._skipped_flag = False # Indicate whether the job was skipped. # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries. # self._retry_flag = True @@ -277,6 +278,14 @@ class Job(): def set_task_id(self, task_id): self._task_id = task_id + # skipped + # + # Returns: + # bool: True if the job was skipped while processing. + @property + def skipped(self): + return self._skipped_flag + ####################################################### # Abstract Methods # ####################################################### @@ -398,6 +407,13 @@ class Job(): try: # Try the task action result = self.child_process() + except SkipJob as e: + elapsed = datetime.datetime.now() - starttime + self.message(MessageType.SKIPPED, str(e), + elapsed=elapsed, logfile=filename) + + # Alert parent of skip by return code + self._child_shutdown(RC_SKIPPED) except BstError as e: elapsed = datetime.datetime.now() - starttime self._retry_flag = e.temporary @@ -545,14 +561,18 @@ class Job(): # We don't want to retry if we got OK or a permanent fail. # This is set in _child_action but must also be set for the parent. # - self._retry_flag = returncode not in (RC_OK, RC_PERM_FAIL) + self._retry_flag = returncode == RC_FAIL + + # Set the flag to alert Queue that this job skipped. + self._skipped_flag = returncode == RC_SKIPPED if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated: self.spawn() return - self.parent_complete(returncode == RC_OK, self._result) - self._scheduler.job_completed(self, returncode == RC_OK) + success = returncode in (RC_OK, RC_SKIPPED) + self.parent_complete(success, self._result) + self._scheduler.job_completed(self, success) # Force the deletion of the queue and process objects to try and clean up FDs self._queue = self._process = None diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py index 0c74b3698..39ed83a32 100644 --- a/buildstream/_scheduler/queues/buildqueue.py +++ b/buildstream/_scheduler/queues/buildqueue.py @@ -47,6 +47,7 @@ class BuildQueue(Queue): to_queue.append(element) continue + # XXX: Fix this, See https://mail.gnome.org/archives/buildstream-list/2018-September/msg00029.html # Bypass queue processing entirely the first time it's tried. self._tried.add(element) _, description, detail = element._get_build_result() @@ -113,5 +114,3 @@ class BuildQueue(Queue): # This has to be done after _assemble_done, such that the # element may register its cache key as required self._check_cache_size(job, element, result) - - return True diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py index bd90a13b6..446dbbd3b 100644 --- a/buildstream/_scheduler/queues/fetchqueue.py +++ b/buildstream/_scheduler/queues/fetchqueue.py @@ -72,11 +72,9 @@ class FetchQueue(Queue): def done(self, _, element, result, success): if not success: - return False + return element._update_state() # Successful fetch, we must be CACHED now assert element._get_consistency() == Consistency.CACHED - - return True diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py index e18967cf4..2842c5e21 100644 --- a/buildstream/_scheduler/queues/pullqueue.py +++ b/buildstream/_scheduler/queues/pullqueue.py @@ -21,6 +21,7 @@ # Local imports from . import Queue, QueueStatus from ..resources import ResourceType +from ..._exceptions import SkipJob # A queue which pulls element artifacts @@ -33,7 +34,8 @@ class PullQueue(Queue): def process(self, element): # returns whether an artifact was downloaded or not - return element._pull() + if not element._pull(): + raise SkipJob(self.action_name) def status(self, element): # state of dependencies may have changed, recalculate element state @@ -63,7 +65,3 @@ class PullQueue(Queue): # do not get an artifact size from pull jobs, we have to # actually check the cache size. self._scheduler.check_cache_size() - - # Element._pull() returns True if it downloaded an artifact, - # here we want to appear skipped if we did not download. - return result diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py index 568e053d6..35532d23d 100644 --- a/buildstream/_scheduler/queues/pushqueue.py +++ b/buildstream/_scheduler/queues/pushqueue.py @@ -21,6 +21,7 @@ # Local imports from . import Queue, QueueStatus from ..resources import ResourceType +from ..._exceptions import SkipJob # A queue which pushes element artifacts @@ -33,20 +34,11 @@ class PushQueue(Queue): def process(self, element): # returns whether an artifact was uploaded or not - return element._push() + if not element._push(): + raise SkipJob(self.action_name) def status(self, element): if element._skip_push(): return QueueStatus.SKIP return QueueStatus.READY - - def done(self, _, element, result, success): - - if not success: - return False - - # Element._push() returns True if it uploaded an artifact, - # here we want to appear skipped if the remote already had - # the artifact. - return result diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index 472e033da..f058663a1 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -136,10 +136,6 @@ class Queue(): # success (bool): True if the process() implementation did not # raise any exception # - # Returns: - # (bool): True if the element should appear to be processsed, - # Otherwise False will count the element as "skipped" - # def done(self, job, element, result, success): pass @@ -306,8 +302,7 @@ class Queue(): # and determine if it should be considered as processed # or skipped. try: - processed = self.done(job, element, result, success) - + self.done(job, element, result, success) except BstError as e: # Report error and mark as failed @@ -337,7 +332,7 @@ class Queue(): self._done_queue.append(job) if success: - if processed: + if not job.skipped: self.processed_elements.append(element) else: self.skipped_elements.append(element) diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py index f443df3be..133655e14 100644 --- a/buildstream/_scheduler/queues/trackqueue.py +++ b/buildstream/_scheduler/queues/trackqueue.py @@ -51,18 +51,11 @@ class TrackQueue(Queue): def done(self, _, element, result, success): if not success: - return False - - changed = False + return # Set the new refs in the main process one by one as they complete for unique_id, new_ref in result: source = _plugin_lookup(unique_id) - # We appear processed if at least one source has changed - if source._save_ref(new_ref): - changed = True + source._save_ref(new_ref) element._tracking_done() - - # We'll appear as a skipped element if tracking resulted in no change - return changed diff --git a/buildstream/element.py b/buildstream/element.py index 13d76dbad..6bc400bb9 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -1760,8 +1760,6 @@ class Element(Plugin): return False # Notify successfull download - display_key = self._get_brief_display_key() - self.info("Downloaded artifact {}".format(display_key)) return True # _skip_push(): @@ -1800,16 +1798,13 @@ class Element(Plugin): self.warn("Not pushing tainted artifact.") return False - display_key = self._get_brief_display_key() - with self.timed_activity("Pushing artifact {}".format(display_key)): - # Push all keys used for local commit - pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit()) - if not pushed: - return False + # Push all keys used for local commit + pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit()) + if not pushed: + return False - # Notify successful upload - self.info("Pushed artifact {}".format(display_key)) - return True + # Notify successful upload + return True # _shell(): # diff --git a/tests/frontend/pull.py b/tests/frontend/pull.py index ed9a9643e..c883e2030 100644 --- a/tests/frontend/pull.py +++ b/tests/frontend/pull.py @@ -356,4 +356,5 @@ def test_pull_missing_notifies_user(caplog, cli, tmpdir, datafiles): assert not result.get_pulled_elements(), \ "No elements should have been pulled since the cache was empty" - assert "SKIPPED Remote ({}) does not have".format(share.repo) in result.stderr + assert "INFO Remote ({}) does not have".format(share.repo) in result.stderr + assert "SKIPPED Pull" in result.stderr diff --git a/tests/frontend/push.py b/tests/frontend/push.py index f351e33be..f2d6814d6 100644 --- a/tests/frontend/push.py +++ b/tests/frontend/push.py @@ -386,3 +386,26 @@ def test_push_cross_junction(cli, tmpdir, datafiles): cache_key = cli.get_element_key(project, 'junction.bst:import-etc.bst') assert share.has_artifact('subtest', 'import-etc.bst', cache_key) + + +@pytest.mark.datafiles(DATA_DIR) +def test_push_already_cached(caplog, cli, tmpdir, datafiles): + project = os.path.join(datafiles.dirname, datafiles.basename) + caplog.set_level(1) + + with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share: + + cli.configure({ + 'artifacts': {'url': share.repo, 'push': True} + }) + result = cli.run(project=project, args=['build', 'target.bst']) + + result.assert_success() + assert "SKIPPED Push" not in result.stderr + + result = cli.run(project=project, args=['push', 'target.bst']) + + result.assert_success() + assert not result.get_pushed_elements(), "No elements should have been pushed since the cache was populated" + assert "INFO Remote ({}) already has ".format(share.repo) in result.stderr + assert "SKIPPED Push" in result.stderr diff --git a/tests/testutils/runcli.py b/tests/testutils/runcli.py index 8cd5bcb75..3535e94ea 100644 --- a/tests/testutils/runcli.py +++ b/tests/testutils/runcli.py @@ -178,7 +178,7 @@ class Result(): return list(pushed) def get_pulled_elements(self): - pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Downloaded artifact', self.stderr) + pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Pulled artifact', self.stderr) if pulled is None: return [] |