summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames E. Blair <jim@acmegating.com>2022-06-25 15:17:18 -0700
committerJames E. Blair <jim@acmegating.com>2022-06-29 15:33:13 -0700
commit39aded45178520fba6190f81660025573951a6e4 (patch)
treee2ac835a281df884e7caa394105efc8b5c689b71
parentf2d4ff276b2e22be982cf261864dd938f707288a (diff)
downloadzuul-39aded45178520fba6190f81660025573951a6e4.tar.gz
Fix merging with submitWholeTopic
Previously support for Gerrit's submitWholeTopic feature was added so that when it is enabled, changes that are submitted together are treated as circular dependencies in Zuul. However, this feature did not work in a gating pipeline because when that setting is enabled, Gerrit requires all changes to be mergable at once so that it can attempt to atomically merge all of them. That means that every change would need a Verified+2 vote before any change can be submitted. Zuul leaves the vote and submits each change one at a time. (Note, this does not affect the emulated submitWholeTopic feature in Zuul, since in that case, Gerrit will merge each change alone.) To correct this, a two-phase option is added to reporters. In phase1, reporters will report all information about the change but not submit. In phase2, they will only submit. In the cases where we are about to submit a successful bundle, we enable the two-phase option and report the entire bundle without submitting first, then proceed to submit each change in the bundle in sequence as normal. In Gerrit, if submitWholeTopic is enabled, this will cause all changes to be submitted as soon as the first one is, but that's okay because we handle the case where we try to submit a change and it is already submitted. The fake Gerrit used in the Zuul unit tests is updated to match the real Gerrit in these cases. If submitWholeTopic is enabled, it will return a 409 to submit requests if the whole topic is not able to be submitted. One unit test of failed bundle reporting is adjusted since we will now report the buildset result to all changes before potentially reporting a second time if the bundle later fails to merge. While this does mean that some changes will have extra report messages, it actually makes the behavior consistent (before, some changes would have 2 reports and some would have only 1; now all changes will have 2 reports: the expected result and then a second report of the unexpected merge failure). All reporters are updated to handle the two-phase reporting. Since all reporters have different API methods to leave comments and merge changes, this won't actually cause any extra API calls even for reporters which don't need two-phase reporting. Non-merging reporters (MQTT, SMTP, etc) simply ignore phase2. Change-Id: Ibf377ab5b7141fe60ecfd5cbbb296bb4f9c24265
-rw-r--r--tests/base.py99
-rw-r--r--tests/unit/test_circular_dependencies.py2
-rw-r--r--tests/unit/test_gerrit.py81
-rw-r--r--zuul/driver/elasticsearch/reporter.py4
-rw-r--r--zuul/driver/gerrit/gerritconnection.py139
-rw-r--r--zuul/driver/gerrit/gerritreporter.py5
-rw-r--r--zuul/driver/github/githubreporter.py38
-rw-r--r--zuul/driver/gitlab/gitlabreporter.py17
-rw-r--r--zuul/driver/mqtt/mqttreporter.py4
-rw-r--r--zuul/driver/pagure/pagurereporter.py25
-rw-r--r--zuul/driver/smtp/smtpreporter.py4
-rw-r--r--zuul/manager/__init__.py49
-rw-r--r--zuul/reporter/__init__.py15
13 files changed, 320 insertions, 162 deletions
diff --git a/tests/base.py b/tests/base.py
index 504a9718d..e2845fe8f 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -1035,6 +1035,10 @@ class GerritWebServer(object):
self.send_response(404)
self.end_headers()
+ def _409(self):
+ self.send_response(409)
+ self.end_headers()
+
def _get_change(self, change_id):
change_id = urllib.parse.unquote(change_id)
project, branch, change = change_id.split('~')
@@ -1060,7 +1064,7 @@ class GerritWebServer(object):
tag = data.get('tag', None)
fake_gerrit._test_handle_review(
int(change.data['number']), message, False, labels,
- comments, tag=tag)
+ True, False, comments, tag=tag)
self.send_response(200)
self.end_headers()
@@ -1069,10 +1073,26 @@ class GerritWebServer(object):
if not change:
return self._404()
+ candidate = self._get_change(change_id)
+ sr = candidate.getSubmitRecords()
+ if sr[0]['status'] != 'OK':
+ # One of the changes in this topic isn't
+ # ready to merge
+ return self._409()
+ if fake_gerrit._fake_submit_whole_topic:
+ results = fake_gerrit._test_get_submitted_together(change)
+ for record in results:
+ candidate = self._get_change(record['id'])
+ sr = candidate.getSubmitRecords()
+ if sr[0]['status'] != 'OK':
+ # One of the changes in this topic isn't
+ # ready to merge
+ return self._409()
message = None
labels = {}
fake_gerrit._test_handle_review(
- int(change.data['number']), message, True, labels)
+ int(change.data['number']), message, True, labels,
+ False, True)
self.send_response(200)
self.end_headers()
@@ -1147,19 +1167,8 @@ class GerritWebServer(object):
change = fake_gerrit.changes.get(int(number))
if not change:
return self._404()
- topic = change.data.get('topic')
- if not fake_gerrit._fake_submit_whole_topic:
- topic = None
- if topic:
- results = fake_gerrit._simpleQuery(
- f'topic:{topic}', http=True)
- else:
- results = []
- for dep in change.data.get('dependsOn', []):
- dep_change = fake_gerrit.changes.get(int(dep['number']))
- r = dep_change.queryHTTP(internal=True)
- if r not in results:
- results.append(r)
+
+ results = fake_gerrit._test_get_submitted_together(change)
self.send_data(results)
self.end_headers()
@@ -1378,16 +1387,33 @@ class FakeGerritConnection(gerritconnection.GerritConnection):
return event
def review(self, item, message, submit, labels, checks_api, file_comments,
- zuul_event_id=None):
+ phase1, phase2, zuul_event_id=None):
if self.web_server:
return super(FakeGerritConnection, self).review(
item, message, submit, labels, checks_api, file_comments,
- zuul_event_id)
+ phase1, phase2, zuul_event_id)
self._test_handle_review(int(item.change.number), message, submit,
- labels)
+ labels, phase1, phase2)
+
+ def _test_get_submitted_together(self, change):
+ topic = change.data.get('topic')
+ if not self._fake_submit_whole_topic:
+ topic = None
+ if topic:
+ results = self._simpleQuery(f'topic:{topic}', http=True)
+ else:
+ results = [change.queryHTTP(internal=True)]
+ for dep in change.data.get('dependsOn', []):
+ dep_change = self.changes.get(int(dep['number']))
+ r = dep_change.queryHTTP(internal=True)
+ if r not in results:
+ results.append(r)
+ if len(results) == 1:
+ return []
+ return results
def _test_handle_review(self, change_number, message, submit, labels,
- file_comments=None, tag=None):
+ phase1, phase2, file_comments=None, tag=None):
# Handle a review action from a test
change = self.changes[change_number]
@@ -1401,24 +1427,25 @@ class FakeGerritConnection(gerritconnection.GerritConnection):
# happens they can add their own verified event into the queue.
# Nevertheless, we can update change with the new review in gerrit.
- for cat in labels:
- change.addApproval(cat, labels[cat], username=self.user,
- tag=tag)
-
- if message:
- change.messages.append(message)
-
- if file_comments:
- for filename, commentlist in file_comments.items():
- for comment in commentlist:
- change.addComment(filename, comment['line'],
- comment['message'], 'Zuul',
- 'zuul@example.com', self.user,
- comment.get('range'))
- if submit:
+ if phase1:
+ for cat in labels:
+ change.addApproval(cat, labels[cat], username=self.user,
+ tag=tag)
+
+ if message:
+ change.messages.append(message)
+
+ if file_comments:
+ for filename, commentlist in file_comments.items():
+ for comment in commentlist:
+ change.addComment(filename, comment['line'],
+ comment['message'], 'Zuul',
+ 'zuul@example.com', self.user,
+ comment.get('range'))
+ if message:
+ change.setReported()
+ if submit and phase2:
change.setMerged()
- if message:
- change.setReported()
def queryChangeSSH(self, number, event=None):
self.log.debug("Query change SSH: %s", number)
diff --git a/tests/unit/test_circular_dependencies.py b/tests/unit/test_circular_dependencies.py
index c0efb7156..315840dfd 100644
--- a/tests/unit/test_circular_dependencies.py
+++ b/tests/unit/test_circular_dependencies.py
@@ -766,7 +766,7 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.fake_gerrit.addEvent(A.addApproval("Approved", 1))
self.waitUntilSettled()
- self.assertEqual(A.reported, 2)
+ self.assertEqual(A.reported, 3)
self.assertEqual(B.reported, 3)
self.assertEqual(A.patchsets[-1]["approvals"][-1]["value"], "-2")
self.assertEqual(B.patchsets[-1]["approvals"][-1]["value"], "-2")
diff --git a/tests/unit/test_gerrit.py b/tests/unit/test_gerrit.py
index bcb8f7bc8..66f1211b6 100644
--- a/tests/unit/test_gerrit.py
+++ b/tests/unit/test_gerrit.py
@@ -767,3 +767,84 @@ class TestWrongConnection(ZuulTestCase):
dict(name='test-job', result='SUCCESS', changes='1,1'),
dict(name='test-job', result='SUCCESS', changes='2,1'),
], ordered=False)
+
+
+class TestGerritFake(ZuulTestCase):
+ config_file = "zuul-gerrit-github.conf"
+ tenant_config_file = "config/circular-dependencies/main.yaml"
+
+ def _get_tuple(self, change_number):
+ ret = []
+ data = self.fake_gerrit.get(
+ f'changes/{change_number}/submitted_together')
+ for c in data:
+ dep_change = c['_number']
+ dep_ps = c['revisions'][c['current_revision']]['_number']
+ ret.append((dep_change, dep_ps))
+ return sorted(ret)
+
+ def test_submitted_together_normal(self):
+ # Test that the fake submitted together endpoint returns
+ # expected data
+
+ # This test verifies behavior with submitWholeTopic=False
+
+ # A single change
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ data = self._get_tuple(1)
+ self.assertEqual(data, [])
+ ret = self.fake_gerrit._getSubmittedTogether(A, None)
+ self.assertEqual(ret, [])
+
+ # A dependent series (B->A)
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ B.setDependsOn(A, 1)
+ data = self._get_tuple(2)
+ self.assertEqual(data, [(1, 1), (2, 1)])
+ # The Gerrit connection method filters out the queried change
+ ret = self.fake_gerrit._getSubmittedTogether(B, None)
+ self.assertEqual(ret, [(1, 1)])
+
+ # A topic cycle
+ C1 = self.fake_gerrit.addFakeChange('org/project', 'master', 'C1',
+ topic='test-topic')
+ self.fake_gerrit.addFakeChange('org/project', 'master', 'C2',
+ topic='test-topic')
+ data = self._get_tuple(3)
+ self.assertEqual(data, [])
+ ret = self.fake_gerrit._getSubmittedTogether(C1, None)
+ self.assertEqual(ret, [])
+
+ def test_submitted_together_whole_topic(self):
+ # Test that the fake submitted together endpoint returns
+ # expected data
+
+ # This test verifies behavior with submitWholeTopic=True
+ self.fake_gerrit._fake_submit_whole_topic = True
+
+ # A single change
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ data = self._get_tuple(1)
+ self.assertEqual(data, [])
+ ret = self.fake_gerrit._getSubmittedTogether(A, None)
+ self.assertEqual(ret, [])
+
+ # A dependent series (B->A)
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ B.setDependsOn(A, 1)
+ data = self._get_tuple(2)
+ self.assertEqual(data, [(1, 1), (2, 1)])
+ # The Gerrit connection method filters out the queried change
+ ret = self.fake_gerrit._getSubmittedTogether(B, None)
+ self.assertEqual(ret, [(1, 1)])
+
+ # A topic cycle
+ C1 = self.fake_gerrit.addFakeChange('org/project', 'master', 'C1',
+ topic='test-topic')
+ self.fake_gerrit.addFakeChange('org/project', 'master', 'C2',
+ topic='test-topic')
+ data = self._get_tuple(3)
+ self.assertEqual(data, [(3, 1), (4, 1)])
+ # The Gerrit connection method filters out the queried change
+ ret = self.fake_gerrit._getSubmittedTogether(C1, None)
+ self.assertEqual(ret, [(4, 1)])
diff --git a/zuul/driver/elasticsearch/reporter.py b/zuul/driver/elasticsearch/reporter.py
index e35bbcd48..7802cb609 100644
--- a/zuul/driver/elasticsearch/reporter.py
+++ b/zuul/driver/elasticsearch/reporter.py
@@ -30,8 +30,10 @@ class ElasticsearchReporter(BaseReporter):
self.index_vars = self.config.get('index-vars')
self.index_returned_vars = self.config.get('index-returned-vars')
- def report(self, item):
+ def report(self, item, phase1=True, phase2=True):
"""Create an entry into a database."""
+ if not phase1:
+ return
docs = []
index = '%s.%s-%s' % (self.index, item.pipeline.tenant.name,
time.strftime("%Y.%m.%d"))
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 005f62b9a..6b12b4cd2 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -1203,16 +1203,17 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
self.event_queue.put(event)
def review(self, item, message, submit, labels, checks_api,
- file_comments, zuul_event_id=None):
+ file_comments, phase1, phase2, zuul_event_id=None):
if self.session:
meth = self.review_http
else:
meth = self.review_ssh
return meth(item, message, submit, labels, checks_api,
- file_comments, zuul_event_id=zuul_event_id)
+ file_comments, phase1, phase2,
+ zuul_event_id=zuul_event_id)
def review_ssh(self, item, message, submit, labels, checks_api,
- file_comments, zuul_event_id=None):
+ file_comments, phase1, phase2, zuul_event_id=None):
log = get_annotated_logger(self.log, zuul_event_id)
if checks_api:
log.error("Zuul is configured to report to the checks API, "
@@ -1221,23 +1222,24 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
change = item.change
project = change.project.name
cmd = 'gerrit review --project %s' % project
- if message:
- b_len = len(message.encode('utf-8'))
- if b_len >= GERRIT_HUMAN_MESSAGE_LIMIT:
- log.info("Message truncated %d > %d" %
- (b_len, GERRIT_HUMAN_MESSAGE_LIMIT))
- message = ("%s... (truncated)" %
- message[:GERRIT_HUMAN_MESSAGE_LIMIT - 20])
- cmd += ' --message %s' % shlex.quote(message)
- if submit:
+ if phase1:
+ if message:
+ b_len = len(message.encode('utf-8'))
+ if b_len >= GERRIT_HUMAN_MESSAGE_LIMIT:
+ log.info("Message truncated %d > %d" %
+ (b_len, GERRIT_HUMAN_MESSAGE_LIMIT))
+ message = ("%s... (truncated)" %
+ message[:GERRIT_HUMAN_MESSAGE_LIMIT - 20])
+ cmd += ' --message %s' % shlex.quote(message)
+ for key, val in labels.items():
+ if val is True:
+ cmd += ' --%s' % key
+ else:
+ cmd += ' --label %s=%s' % (key, val)
+ if self.version >= (2, 13, 0):
+ cmd += ' --tag autogenerated:zuul:%s' % (item.pipeline.name)
+ if phase2 and submit:
cmd += ' --submit'
- for key, val in labels.items():
- if val is True:
- cmd += ' --%s' % key
- else:
- cmd += ' --label %s=%s' % (key, val)
- if self.version >= (2, 13, 0):
- cmd += ' --tag autogenerated:zuul:%s' % (item.pipeline.name)
changeid = '%s,%s' % (change.number, change.patchset)
cmd += ' %s' % changeid
out, err = self._ssh(cmd, zuul_event_id=zuul_event_id)
@@ -1290,8 +1292,13 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
time.sleep(x * 10)
def review_http(self, item, message, submit, labels,
- checks_api, file_comments, zuul_event_id=None):
+ checks_api, file_comments, phase1, phase2,
+ zuul_event_id=None):
change = item.change
+ changeid = "%s~%s~%s" % (
+ urllib.parse.quote(str(change.project), safe=''),
+ urllib.parse.quote(str(change.branch), safe=''),
+ change.id)
log = get_annotated_logger(self.log, zuul_event_id)
b_len = len(message.encode('utf-8'))
if b_len >= GERRIT_HUMAN_MESSAGE_LIMIT:
@@ -1299,53 +1306,51 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
(b_len, GERRIT_HUMAN_MESSAGE_LIMIT))
message = ("%s... (truncated)" %
message[:GERRIT_HUMAN_MESSAGE_LIMIT - 20])
- data = dict(message=message,
- strict_labels=False)
- if change.is_current_patchset:
- if labels:
- data['labels'] = labels
- if file_comments:
- if self.version >= (2, 15, 0):
- file_comments = copy.deepcopy(file_comments)
- url = item.formatStatusUrl()
- for comments in itertools.chain(file_comments.values()):
- for comment in comments:
- comment['robot_id'] = 'zuul'
- comment['robot_run_id'] = \
- item.current_build_set.uuid
- if url:
- comment['url'] = url
- data['robot_comments'] = file_comments
- else:
- data['comments'] = file_comments
- if self.version >= (2, 13, 0):
- data['tag'] = 'autogenerated:zuul:%s' % (item.pipeline.name)
- changeid = "%s~%s~%s" % (
- urllib.parse.quote(str(change.project), safe=''),
- urllib.parse.quote(str(change.branch), safe=''),
- change.id)
- if checks_api:
- self.report_checks(log, item, changeid, checks_api)
- if (message or data.get('labels') or data.get('comments')
- or data.get('robot_comments')):
- for x in range(1, 4):
- try:
- self.post('changes/%s/revisions/%s/review' %
- (changeid, change.commit),
- data)
- break
- except HTTPConflictException:
- log.exception("Conflict submitting data to gerrit.")
- break
- except HTTPBadRequestException:
- log.exception(
- "Bad request submitting check data to gerrit.")
- break
- except Exception:
- log.exception(
- "Error submitting data to gerrit, attempt %s", x)
- time.sleep(x * 10)
- if change.is_current_patchset and submit:
+ data = dict(strict_labels=False)
+ if phase1:
+ data['message'] = message
+ if change.is_current_patchset:
+ if labels:
+ data['labels'] = labels
+ if file_comments:
+ if self.version >= (2, 15, 0):
+ file_comments = copy.deepcopy(file_comments)
+ url = item.formatStatusUrl()
+ for comments in itertools.chain(
+ file_comments.values()):
+ for comment in comments:
+ comment['robot_id'] = 'zuul'
+ comment['robot_run_id'] = \
+ item.current_build_set.uuid
+ if url:
+ comment['url'] = url
+ data['robot_comments'] = file_comments
+ else:
+ data['comments'] = file_comments
+ if self.version >= (2, 13, 0):
+ data['tag'] = 'autogenerated:zuul:%s' % (item.pipeline.name)
+ if checks_api:
+ self.report_checks(log, item, changeid, checks_api)
+ if (message or data.get('labels') or data.get('comments')
+ or data.get('robot_comments')):
+ for x in range(1, 4):
+ try:
+ self.post('changes/%s/revisions/%s/review' %
+ (changeid, change.commit),
+ data)
+ break
+ except HTTPConflictException:
+ log.exception("Conflict submitting data to gerrit.")
+ break
+ except HTTPBadRequestException:
+ log.exception(
+ "Bad request submitting check data to gerrit.")
+ break
+ except Exception:
+ log.exception(
+ "Error submitting data to gerrit, attempt %s", x)
+ time.sleep(x * 10)
+ if phase2 and change.is_current_patchset and submit:
for x in range(1, 4):
try:
self.post('changes/%s/submit' % (changeid,), {})
diff --git a/zuul/driver/gerrit/gerritreporter.py b/zuul/driver/gerrit/gerritreporter.py
index fbee0e0ec..984b8742a 100644
--- a/zuul/driver/gerrit/gerritreporter.py
+++ b/zuul/driver/gerrit/gerritreporter.py
@@ -35,7 +35,7 @@ class GerritReporter(BaseReporter):
self._checks_api = action.pop('checks-api', None)
self._labels = action
- def report(self, item):
+ def report(self, item, phase1=True, phase2=True):
"""Send a message to gerrit."""
log = get_annotated_logger(self.log, item.event)
@@ -70,7 +70,8 @@ class GerritReporter(BaseReporter):
return self.connection.review(item, message, self._submit,
self._labels, self._checks_api,
- comments, zuul_event_id=item.event)
+ comments, phase1, phase2,
+ zuul_event_id=item.event)
def getSubmitAllowNeeds(self):
"""Get a list of code review labels that are allowed to be
diff --git a/zuul/driver/github/githubreporter.py b/zuul/driver/github/githubreporter.py
index c5297a23a..de62f2565 100644
--- a/zuul/driver/github/githubreporter.py
+++ b/zuul/driver/github/githubreporter.py
@@ -55,7 +55,7 @@ class GithubReporter(BaseReporter):
self._unlabels = [self._unlabels]
self.context = "{}/{}".format(pipeline.tenant.name, pipeline.name)
- def report(self, item):
+ def report(self, item, phase1=True, phase2=True):
"""Report on an event."""
# If the source is not GithubSource we cannot report anything here.
if not isinstance(item.change.project.source, GithubSource):
@@ -69,7 +69,7 @@ class GithubReporter(BaseReporter):
# order is important for github branch protection.
# A status should be set before a merge attempt
- if self._commit_status is not None:
+ if phase1 and self._commit_status is not None:
if (hasattr(item.change, 'patchset') and
item.change.patchset is not None):
self.setCommitStatus(item)
@@ -80,22 +80,24 @@ class GithubReporter(BaseReporter):
# If the change is not a pull request (e.g. a push) skip them.
if hasattr(item.change, 'number'):
errors_received = False
- if self._labels or self._unlabels:
- self.setLabels(item)
- if self._review:
- self.addReview(item)
- if self._check:
- check_errors = self.updateCheck(item)
- # TODO (felix): We could use this mechanism to also report back
- # errors from label and review actions
- if check_errors:
- item.current_build_set.warning_messages.extend(
- check_errors
- )
- errors_received = True
- if self._create_comment or errors_received:
- self.addPullComment(item)
- if (self._merge):
+ if phase1:
+ if self._labels or self._unlabels:
+ self.setLabels(item)
+ if self._review:
+ self.addReview(item)
+ if self._check:
+ check_errors = self.updateCheck(item)
+ # TODO (felix): We could use this mechanism to
+ # also report back errors from label and review
+ # actions
+ if check_errors:
+ item.current_build_set.warning_messages.extend(
+ check_errors
+ )
+ errors_received = True
+ if self._create_comment or errors_received:
+ self.addPullComment(item)
+ if phase2 and self._merge:
try:
self.mergePull(item)
except Exception as e:
diff --git a/zuul/driver/gitlab/gitlabreporter.py b/zuul/driver/gitlab/gitlabreporter.py
index f364521f5..819c89c47 100644
--- a/zuul/driver/gitlab/gitlabreporter.py
+++ b/zuul/driver/gitlab/gitlabreporter.py
@@ -50,7 +50,7 @@ class GitlabReporter(BaseReporter):
if not isinstance(self._unlabels, list):
self._unlabels = [self._unlabels]
- def report(self, item):
+ def report(self, item, phase1=True, phase2=True):
"""Report on an event."""
if not isinstance(item.change.project.source, GitlabSource):
return
@@ -60,13 +60,14 @@ class GitlabReporter(BaseReporter):
return
if hasattr(item.change, 'number'):
- if self._create_comment:
- self.addMRComment(item)
- if self._approval is not None:
- self.setApproval(item)
- if self._labels or self._unlabels:
- self.setLabels(item)
- if self._merge:
+ if phase1:
+ if self._create_comment:
+ self.addMRComment(item)
+ if self._approval is not None:
+ self.setApproval(item)
+ if self._labels or self._unlabels:
+ self.setLabels(item)
+ if phase2 and self._merge:
self.mergeMR(item)
if not item.change.is_merged:
msg = self._formatItemReportMergeConflict(item)
diff --git a/zuul/driver/mqtt/mqttreporter.py b/zuul/driver/mqtt/mqttreporter.py
index a53389337..4090bb082 100644
--- a/zuul/driver/mqtt/mqttreporter.py
+++ b/zuul/driver/mqtt/mqttreporter.py
@@ -27,7 +27,9 @@ class MQTTReporter(BaseReporter):
name = 'mqtt'
log = logging.getLogger("zuul.MQTTReporter")
- def report(self, item):
+ def report(self, item, phase1=True, phase2=True):
+ if not phase1:
+ return
log = get_annotated_logger(self.log, item.event)
log.debug("Report change %s, params %s", item.change, self.config)
message = {
diff --git a/zuul/driver/pagure/pagurereporter.py b/zuul/driver/pagure/pagurereporter.py
index 918a31b6b..0bfdbc9b8 100644
--- a/zuul/driver/pagure/pagurereporter.py
+++ b/zuul/driver/pagure/pagurereporter.py
@@ -34,7 +34,7 @@ class PagureReporter(BaseReporter):
self._merge = self.config.get('merge', False)
self.context = "{}/{}".format(pipeline.tenant.name, pipeline.name)
- def report(self, item):
+ def report(self, item, phase1=True, phase2=True):
"""Report on an event."""
# If the source is not PagureSource we cannot report anything here.
@@ -47,17 +47,18 @@ class PagureReporter(BaseReporter):
self.connection.canonical_hostname:
return
- if self._commit_status is not None:
- if (hasattr(item.change, 'patchset') and
- item.change.patchset is not None):
- self.setCommitStatus(item)
- elif (hasattr(item.change, 'newrev') and
- item.change.newrev is not None):
- self.setCommitStatus(item)
- if hasattr(item.change, 'number'):
- if self._create_comment:
- self.addPullComment(item)
- if self._merge:
+ if phase1:
+ if self._commit_status is not None:
+ if (hasattr(item.change, 'patchset') and
+ item.change.patchset is not None):
+ self.setCommitStatus(item)
+ elif (hasattr(item.change, 'newrev') and
+ item.change.newrev is not None):
+ self.setCommitStatus(item)
+ if hasattr(item.change, 'number'):
+ if self._create_comment:
+ self.addPullComment(item)
+ if phase2 and self._merge:
self.mergePull(item)
if not item.change.is_merged:
msg = self._formatItemReportMergeConflict(item)
diff --git a/zuul/driver/smtp/smtpreporter.py b/zuul/driver/smtp/smtpreporter.py
index 58bd25cef..4815026ab 100644
--- a/zuul/driver/smtp/smtpreporter.py
+++ b/zuul/driver/smtp/smtpreporter.py
@@ -25,8 +25,10 @@ class SMTPReporter(BaseReporter):
name = 'smtp'
log = logging.getLogger("zuul.SMTPReporter")
- def report(self, item):
+ def report(self, item, phase1=True, phase2=True):
"""Send the compiled report message via smtp."""
+ if not phase1:
+ return
log = get_annotated_logger(self.log, item.event)
message = self._formatItemReport(item)
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 6fc4c86d8..1e9f57cd1 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -341,18 +341,19 @@ class PipelineManager(metaclass=ABCMeta):
self.reportNormalBuildsetEnd(item.current_build_set, 'dequeue',
final=False)
- def sendReport(self, action_reporters, item, message=None):
+ def sendReport(self, action_reporters, item, phase1=True, phase2=True):
"""Sends the built message off to configured reporters.
- Takes the action_reporters, item, message and extra options and
- sends them to the pluggable reporters.
+ Takes the action_reporters and item and sends them to the
+ pluggable reporters.
+
"""
log = get_annotated_logger(self.log, item.event)
report_errors = []
if len(action_reporters) > 0:
for reporter in action_reporters:
try:
- ret = reporter.report(item)
+ ret = reporter.report(item, phase1=phase1, phase2=phase2)
if ret:
report_errors.append(ret)
except Exception as e:
@@ -1567,8 +1568,23 @@ class PipelineManager(metaclass=ABCMeta):
item.bundle.started_reporting = can_report
if can_report:
+ # If we're starting to report a successful bundle, enable
+ # two-phase reporting. Report the first phase for every item
+ # in the bundle, then the second.
+ phase1 = True
+ phase2 = True
+ if (self.changes_merge
+ and item.bundle
+ and (not item.cannotMergeBundle())
+ and (not item.isBundleFailing())):
+ for i in item.bundle.items:
+ if not i.reported:
+ self.log.debug("Report phase1 for bundle item %s", i)
+ self.reportItem(i, phase1=True, phase2=False)
+ phase1 = False
+
try:
- self.reportItem(item)
+ self.reportItem(item, phase1=phase1, phase2=phase2)
except exceptions.MergeFailure:
failing_reasons.append("it did not merge")
for item_behind in item.items_behind:
@@ -1576,7 +1592,7 @@ class PipelineManager(metaclass=ABCMeta):
"item ahead, %s, failed to merge" %
(item_behind.change, item))
self.cancelJobs(item_behind)
- # Only re-reported items in the cycle when we encounter a merge
+ # Only re-report items in the cycle when we encounter a merge
# failure for a successful bundle.
if (item.bundle and not (
item.isBundleFailing() or item.cannotMergeBundle())):
@@ -1884,13 +1900,19 @@ class PipelineManager(metaclass=ABCMeta):
"with nodes %s",
request, request.job_name, build_set.item, request.nodes)
- def reportItem(self, item):
+ def reportItem(self, item, phase1=True, phase2=True):
log = get_annotated_logger(self.log, item.event)
action = None
- if not item.reported:
- action, reported = self._reportItem(item)
+
+ already_reported = item.reported
+ if phase2 and not phase1:
+ already_reported = False
+ if not already_reported:
+ action, reported = self._reportItem(item, phase1, phase2)
item.updateAttributes(self.current_context,
reported=reported)
+ if not phase2:
+ return
if self.changes_merge:
succeeded = item.didAllJobsSucceed() and not item.isBundleFailing()
merged = item.reported
@@ -1914,7 +1936,7 @@ class PipelineManager(metaclass=ABCMeta):
error_reason = "failed tests"
else:
error_reason = "failed to merge"
- log.info("Reported change %s did not merge because it %s,"
+ log.info("Reported change %s did not merge because it %s, "
"status: all-succeeded: %s, merged: %s",
item.change, error_reason, succeeded, merged)
if not succeeded:
@@ -1939,9 +1961,10 @@ class PipelineManager(metaclass=ABCMeta):
self.reportNormalBuildsetEnd(item.current_build_set,
action, final=True)
- def _reportItem(self, item):
+ def _reportItem(self, item, phase1, phase2):
log = get_annotated_logger(self.log, item.event)
- log.debug("Reporting change %s", item.change)
+ log.debug("Reporting phase1: %s phase2: %s change: %s",
+ phase1, phase2, item.change)
ret = True # Means error as returned by trigger.report
# In the case of failure, we may not have completed an initial
@@ -2037,7 +2060,7 @@ class PipelineManager(metaclass=ABCMeta):
self.current_context, disabled=True)
if actions:
log.info("Reporting item %s, actions: %s", item, actions)
- ret = self.sendReport(actions, item)
+ ret = self.sendReport(actions, item, phase1, phase2)
if ret:
log.error("Reporting item %s received: %s", item, ret)
return action, (not ret)
diff --git a/zuul/reporter/__init__.py b/zuul/reporter/__init__.py
index 93b218519..513220f4b 100644
--- a/zuul/reporter/__init__.py
+++ b/zuul/reporter/__init__.py
@@ -35,8 +35,19 @@ class BaseReporter(object, metaclass=abc.ABCMeta):
self._action = action
@abc.abstractmethod
- def report(self, item):
- """Send the compiled report message."""
+ def report(self, item, phase1=True, phase2=True):
+ """Send the compiled report message
+
+ Two-phase reporting may be enabled if one or the other of the
+ `phase1` or `phase2` arguments is False.
+
+ Phase1 should report everything except the actual merge action.
+ Phase2 should report only the merge action.
+
+ :arg phase1 bool: Whether to enable phase1 reporting
+ :arg phase2 bool: Whether to enable phase2 reporting
+
+ """
def getSubmitAllowNeeds(self):
"""Get a list of code review labels that are allowed to be