summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQinusty <jrsmith9822@gmail.com>2018-09-19 10:59:48 +0000
committerQinusty <jrsmith9822@gmail.com>2018-09-19 10:59:48 +0000
commitdddd6025705d4553f857695bb2b5e6bde6943556 (patch)
tree61013f4d95a36ac8b1c9603e539808a1c740adb7
parent72b5902157316e173de2eec5b3a2772283eec3c7 (diff)
parent213f77946c661d685ec6e74d5d243fbbb4549b79 (diff)
downloadbuildstream-dddd6025705d4553f857695bb2b5e6bde6943556.tar.gz
Merge branch 'Qinusty/skipped-rework' into 'master'
Add SkipJob for indicating a skipped activity See merge request BuildStream/buildstream!765
-rw-r--r--buildstream/_artifactcache/cascache.py24
-rw-r--r--buildstream/_exceptions.py9
-rw-r--r--buildstream/_scheduler/jobs/job.py30
-rw-r--r--buildstream/_scheduler/queues/buildqueue.py3
-rw-r--r--buildstream/_scheduler/queues/fetchqueue.py4
-rw-r--r--buildstream/_scheduler/queues/pullqueue.py8
-rw-r--r--buildstream/_scheduler/queues/pushqueue.py14
-rw-r--r--buildstream/_scheduler/queues/queue.py9
-rw-r--r--buildstream/_scheduler/queues/trackqueue.py11
-rw-r--r--buildstream/element.py17
-rw-r--r--tests/frontend/pull.py3
-rw-r--r--tests/frontend/push.py23
-rw-r--r--tests/testutils/runcli.py2
13 files changed, 88 insertions, 69 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 9cf83a222..840e190f1 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -228,8 +228,8 @@ class CASCache(ArtifactCache):
for remote in self._remotes[project]:
try:
remote.init()
-
- element.info("Pulling {} <- {}".format(element._get_brief_display_key(), remote.spec.url))
+ display_key = element._get_brief_display_key()
+ element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
request = buildstream_pb2.GetReferenceRequest()
request.key = ref
@@ -243,6 +243,7 @@ class CASCache(ArtifactCache):
self.set_ref(ref, tree)
+ element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
# no need to pull from additional remotes
return True
@@ -251,11 +252,8 @@ class CASCache(ArtifactCache):
raise ArtifactError("Failed to pull artifact {}: {}".format(
element._get_brief_display_key(), e)) from e
else:
- self.context.message(Message(
- None,
- MessageType.SKIPPED,
- "Remote ({}) does not have {} cached".format(
- remote.spec.url, element._get_brief_display_key())
+ element.info("Remote ({}) does not have {} cached".format(
+ remote.spec.url, element._get_brief_display_key()
))
return False
@@ -336,17 +334,15 @@ class CASCache(ArtifactCache):
for remote in push_remotes:
remote.init()
-
- element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url))
+ display_key = element._get_brief_display_key()
+ element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
if self._push_refs_to_remote(refs, remote):
+ element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
pushed = True
else:
- self.context.message(Message(
- None,
- MessageType.SKIPPED,
- "Remote ({}) already has {} cached".format(
- remote.spec.url, element._get_brief_display_key())
+ element.info("Remote ({}) already has {} cached".format(
+ remote.spec.url, element._get_brief_display_key()
))
return pushed
diff --git a/buildstream/_exceptions.py b/buildstream/_exceptions.py
index 6fe4f4847..19606776e 100644
--- a/buildstream/_exceptions.py
+++ b/buildstream/_exceptions.py
@@ -312,3 +312,12 @@ class StreamError(BstError):
class AppError(BstError):
def __init__(self, message, detail=None, reason=None):
super().__init__(message, detail=detail, domain=ErrorDomain.APP, reason=reason)
+
+
+# SkipJob
+#
+# Raised from a child process within a job when the job should be
+# considered skipped by the parent process.
+#
+class SkipJob(Exception):
+ pass
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index c55219b58..1c6b4a582 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -31,7 +31,7 @@ import multiprocessing
import psutil
# BuildStream toplevel imports
-from ..._exceptions import ImplError, BstError, set_last_task_error
+from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
from ... import _signals, utils
@@ -40,6 +40,7 @@ from ... import _signals, utils
RC_OK = 0
RC_FAIL = 1
RC_PERM_FAIL = 2
+RC_SKIPPED = 3
# Used to distinguish between status messages and return values
@@ -117,7 +118,7 @@ class Job():
self._max_retries = max_retries # Maximum number of automatic retries
self._result = None # Return value of child action in the parent
self._tries = 0 # Try count, for retryable jobs
-
+ self._skipped_flag = False # Indicate whether the job was skipped.
# If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
#
self._retry_flag = True
@@ -277,6 +278,14 @@ class Job():
def set_task_id(self, task_id):
self._task_id = task_id
+ # skipped
+ #
+ # Returns:
+ # bool: True if the job was skipped while processing.
+ @property
+ def skipped(self):
+ return self._skipped_flag
+
#######################################################
# Abstract Methods #
#######################################################
@@ -398,6 +407,13 @@ class Job():
try:
# Try the task action
result = self.child_process()
+ except SkipJob as e:
+ elapsed = datetime.datetime.now() - starttime
+ self.message(MessageType.SKIPPED, str(e),
+ elapsed=elapsed, logfile=filename)
+
+ # Alert parent of skip by return code
+ self._child_shutdown(RC_SKIPPED)
except BstError as e:
elapsed = datetime.datetime.now() - starttime
self._retry_flag = e.temporary
@@ -545,14 +561,18 @@ class Job():
# We don't want to retry if we got OK or a permanent fail.
# This is set in _child_action but must also be set for the parent.
#
- self._retry_flag = returncode not in (RC_OK, RC_PERM_FAIL)
+ self._retry_flag = returncode == RC_FAIL
+
+ # Set the flag to alert Queue that this job skipped.
+ self._skipped_flag = returncode == RC_SKIPPED
if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
self.spawn()
return
- self.parent_complete(returncode == RC_OK, self._result)
- self._scheduler.job_completed(self, returncode == RC_OK)
+ success = returncode in (RC_OK, RC_SKIPPED)
+ self.parent_complete(success, self._result)
+ self._scheduler.job_completed(self, success)
# Force the deletion of the queue and process objects to try and clean up FDs
self._queue = self._process = None
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 0c74b3698..39ed83a32 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -47,6 +47,7 @@ class BuildQueue(Queue):
to_queue.append(element)
continue
+ # XXX: Fix this, See https://mail.gnome.org/archives/buildstream-list/2018-September/msg00029.html
# Bypass queue processing entirely the first time it's tried.
self._tried.add(element)
_, description, detail = element._get_build_result()
@@ -113,5 +114,3 @@ class BuildQueue(Queue):
# This has to be done after _assemble_done, such that the
# element may register its cache key as required
self._check_cache_size(job, element, result)
-
- return True
diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index bd90a13b6..446dbbd3b 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -72,11 +72,9 @@ class FetchQueue(Queue):
def done(self, _, element, result, success):
if not success:
- return False
+ return
element._update_state()
# Successful fetch, we must be CACHED now
assert element._get_consistency() == Consistency.CACHED
-
- return True
diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
index e18967cf4..2842c5e21 100644
--- a/buildstream/_scheduler/queues/pullqueue.py
+++ b/buildstream/_scheduler/queues/pullqueue.py
@@ -21,6 +21,7 @@
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
+from ..._exceptions import SkipJob
# A queue which pulls element artifacts
@@ -33,7 +34,8 @@ class PullQueue(Queue):
def process(self, element):
# returns whether an artifact was downloaded or not
- return element._pull()
+ if not element._pull():
+ raise SkipJob(self.action_name)
def status(self, element):
# state of dependencies may have changed, recalculate element state
@@ -63,7 +65,3 @@ class PullQueue(Queue):
# do not get an artifact size from pull jobs, we have to
# actually check the cache size.
self._scheduler.check_cache_size()
-
- # Element._pull() returns True if it downloaded an artifact,
- # here we want to appear skipped if we did not download.
- return result
diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py
index 568e053d6..35532d23d 100644
--- a/buildstream/_scheduler/queues/pushqueue.py
+++ b/buildstream/_scheduler/queues/pushqueue.py
@@ -21,6 +21,7 @@
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
+from ..._exceptions import SkipJob
# A queue which pushes element artifacts
@@ -33,20 +34,11 @@ class PushQueue(Queue):
def process(self, element):
# returns whether an artifact was uploaded or not
- return element._push()
+ if not element._push():
+ raise SkipJob(self.action_name)
def status(self, element):
if element._skip_push():
return QueueStatus.SKIP
return QueueStatus.READY
-
- def done(self, _, element, result, success):
-
- if not success:
- return False
-
- # Element._push() returns True if it uploaded an artifact,
- # here we want to appear skipped if the remote already had
- # the artifact.
- return result
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 472e033da..f058663a1 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -136,10 +136,6 @@ class Queue():
# success (bool): True if the process() implementation did not
# raise any exception
#
- # Returns:
- # (bool): True if the element should appear to be processsed,
- # Otherwise False will count the element as "skipped"
- #
def done(self, job, element, result, success):
pass
@@ -306,8 +302,7 @@ class Queue():
# and determine if it should be considered as processed
# or skipped.
try:
- processed = self.done(job, element, result, success)
-
+ self.done(job, element, result, success)
except BstError as e:
# Report error and mark as failed
@@ -337,7 +332,7 @@ class Queue():
self._done_queue.append(job)
if success:
- if processed:
+ if not job.skipped:
self.processed_elements.append(element)
else:
self.skipped_elements.append(element)
diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py
index f443df3be..133655e14 100644
--- a/buildstream/_scheduler/queues/trackqueue.py
+++ b/buildstream/_scheduler/queues/trackqueue.py
@@ -51,18 +51,11 @@ class TrackQueue(Queue):
def done(self, _, element, result, success):
if not success:
- return False
-
- changed = False
+ return
# Set the new refs in the main process one by one as they complete
for unique_id, new_ref in result:
source = _plugin_lookup(unique_id)
- # We appear processed if at least one source has changed
- if source._save_ref(new_ref):
- changed = True
+ source._save_ref(new_ref)
element._tracking_done()
-
- # We'll appear as a skipped element if tracking resulted in no change
- return changed
diff --git a/buildstream/element.py b/buildstream/element.py
index 13d76dbad..6bc400bb9 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -1760,8 +1760,6 @@ class Element(Plugin):
return False
# Notify successfull download
- display_key = self._get_brief_display_key()
- self.info("Downloaded artifact {}".format(display_key))
return True
# _skip_push():
@@ -1800,16 +1798,13 @@ class Element(Plugin):
self.warn("Not pushing tainted artifact.")
return False
- display_key = self._get_brief_display_key()
- with self.timed_activity("Pushing artifact {}".format(display_key)):
- # Push all keys used for local commit
- pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit())
- if not pushed:
- return False
+ # Push all keys used for local commit
+ pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit())
+ if not pushed:
+ return False
- # Notify successful upload
- self.info("Pushed artifact {}".format(display_key))
- return True
+ # Notify successful upload
+ return True
# _shell():
#
diff --git a/tests/frontend/pull.py b/tests/frontend/pull.py
index ed9a9643e..c883e2030 100644
--- a/tests/frontend/pull.py
+++ b/tests/frontend/pull.py
@@ -356,4 +356,5 @@ def test_pull_missing_notifies_user(caplog, cli, tmpdir, datafiles):
assert not result.get_pulled_elements(), \
"No elements should have been pulled since the cache was empty"
- assert "SKIPPED Remote ({}) does not have".format(share.repo) in result.stderr
+ assert "INFO Remote ({}) does not have".format(share.repo) in result.stderr
+ assert "SKIPPED Pull" in result.stderr
diff --git a/tests/frontend/push.py b/tests/frontend/push.py
index f351e33be..f2d6814d6 100644
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -386,3 +386,26 @@ def test_push_cross_junction(cli, tmpdir, datafiles):
cache_key = cli.get_element_key(project, 'junction.bst:import-etc.bst')
assert share.has_artifact('subtest', 'import-etc.bst', cache_key)
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_push_already_cached(caplog, cli, tmpdir, datafiles):
+ project = os.path.join(datafiles.dirname, datafiles.basename)
+ caplog.set_level(1)
+
+ with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
+
+ cli.configure({
+ 'artifacts': {'url': share.repo, 'push': True}
+ })
+ result = cli.run(project=project, args=['build', 'target.bst'])
+
+ result.assert_success()
+ assert "SKIPPED Push" not in result.stderr
+
+ result = cli.run(project=project, args=['push', 'target.bst'])
+
+ result.assert_success()
+ assert not result.get_pushed_elements(), "No elements should have been pushed since the cache was populated"
+ assert "INFO Remote ({}) already has ".format(share.repo) in result.stderr
+ assert "SKIPPED Push" in result.stderr
diff --git a/tests/testutils/runcli.py b/tests/testutils/runcli.py
index 8cd5bcb75..3535e94ea 100644
--- a/tests/testutils/runcli.py
+++ b/tests/testutils/runcli.py
@@ -178,7 +178,7 @@ class Result():
return list(pushed)
def get_pulled_elements(self):
- pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Downloaded artifact', self.stderr)
+ pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Pulled artifact', self.stderr)
if pulled is None:
return []