diff options
-rwxr-xr-x | tests/base.py | 12 | ||||
-rwxr-xr-x | tests/test_scheduler.py | 124 | ||||
-rw-r--r-- | zuul/scheduler.py | 98 | ||||
-rw-r--r-- | zuul/trigger/gerrit.py | 57 |
4 files changed, 246 insertions, 45 deletions
diff --git a/tests/base.py b/tests/base.py index b872a8581..773f92653 100755 --- a/tests/base.py +++ b/tests/base.py @@ -418,11 +418,15 @@ class FakeGerrit(object): return {} def simpleQuery(self, query): - # This is currently only used to return all open changes for a - # project self.queries.append(query) - l = [change.query() for change in self.changes.values()] - l.append({"type": "stats", "rowCount": 1, "runTimeMilliseconds": 3}) + if query.startswith('change:'): + # Query a specific changeid + changeid = query[len('change:'):] + l = [change.query() for change in self.changes.values() + if change.data['id'] == changeid] + else: + # Query all open changes + l = [change.query() for change in self.changes.values()] return l def startWatching(self, *args, **kw): diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 1a86c6aca..3837cfac5 100755 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -2890,3 +2890,127 @@ For CI problems and help debugging, contact ci@example.org""" self.getJobFromHistory('experimental-project-test').result, 'SUCCESS') self.assertEqual(A.reported, 1) + + def test_crd_gate(self): + "Test cross-repo dependencies" + A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B') + A.addApproval('CRVW', 2) + B.addApproval('CRVW', 2) + + AM2 = self.fake_gerrit.addFakeChange('org/project1', 'master', 'AM2') + AM1 = self.fake_gerrit.addFakeChange('org/project1', 'master', 'AM1') + AM2.setMerged() + AM1.setMerged() + + BM2 = self.fake_gerrit.addFakeChange('org/project2', 'master', 'BM2') + BM1 = self.fake_gerrit.addFakeChange('org/project2', 'master', 'BM1') + BM2.setMerged() + BM1.setMerged() + + # A -> AM1 -> AM2 + # B -> BM1 -> BM2 + # A Depends-On: B + # M2 is here to make sure it is never queried. If it is, it + # means zuul is walking down the entire history of merged + # changes. + + B.setDependsOn(BM1, 1) + BM1.setDependsOn(BM2, 1) + + A.setDependsOn(AM1, 1) + AM1.setDependsOn(AM2, 1) + + A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % ( + A.subject, B.data['id']) + + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.waitUntilSettled() + + self.assertEqual(A.data['status'], 'NEW') + self.assertEqual(B.data['status'], 'NEW') + + source = self.sched.layout.pipelines['gate'].source + source.maintainCache([]) + + self.worker.hold_jobs_in_build = True + B.addApproval('APRV', 1) + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.waitUntilSettled() + + self.worker.release('.*-merge') + self.waitUntilSettled() + self.worker.release('.*-merge') + self.waitUntilSettled() + self.worker.hold_jobs_in_build = False + self.worker.release() + self.waitUntilSettled() + + self.assertEqual(AM2.queried, 0) + self.assertEqual(BM2.queried, 0) + self.assertEqual(A.data['status'], 'MERGED') + self.assertEqual(B.data['status'], 'MERGED') + self.assertEqual(A.reported, 2) + self.assertEqual(B.reported, 2) + + self.assertEqual(self.history[-1].changes, '2,1 1,1') + + def test_crd_unshared_gate(self): + "Test cross-repo dependencies in unshared gate queues" + A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + A.addApproval('CRVW', 2) + B.addApproval('CRVW', 2) + + # A Depends-On: B + A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % ( + A.subject, B.data['id']) + + # A and B do not share a queue, make sure that A is unable to + # enqueue B (and therefore, A is unable to be enqueued). + B.addApproval('APRV', 1) + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.waitUntilSettled() + + self.assertEqual(A.data['status'], 'NEW') + self.assertEqual(B.data['status'], 'NEW') + self.assertEqual(A.reported, 0) + self.assertEqual(B.reported, 0) + self.assertEqual(len(self.history), 0) + + # Enqueue and merge B alone. + self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) + self.waitUntilSettled() + + self.assertEqual(B.data['status'], 'MERGED') + self.assertEqual(B.reported, 2) + + # Now that B is merged, A should be able to be enqueued and + # merged. + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.waitUntilSettled() + + self.assertEqual(A.data['status'], 'MERGED') + self.assertEqual(A.reported, 2) + + def test_crd_cycle(self): + "Test cross-repo dependency cycles" + A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B') + A.addApproval('CRVW', 2) + B.addApproval('CRVW', 2) + + # A -> B -> A (via commit-depends) + + A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % ( + A.subject, B.data['id']) + B.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % ( + B.subject, A.data['id']) + + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.waitUntilSettled() + + self.assertEqual(A.reported, 0) + self.assertEqual(B.reported, 0) + self.assertEqual(A.data['status'], 'NEW') + self.assertEqual(B.data['status'], 'NEW') diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 44fbb042f..3c09e0a92 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -1064,10 +1064,12 @@ class BasePipelineManager(object): def isChangeReadyToBeEnqueued(self, change): return True - def enqueueChangesAhead(self, change, quiet, ignore_requirements): + def enqueueChangesAhead(self, change, quiet, ignore_requirements, + change_queue): return True - def enqueueChangesBehind(self, change, quiet, ignore_requirements): + def enqueueChangesBehind(self, change, quiet, ignore_requirements, + change_queue): return True def checkForChangesNeededBy(self, change): @@ -1126,7 +1128,7 @@ class BasePipelineManager(object): return False def addChange(self, change, quiet=False, enqueue_time=None, - ignore_requirements=False): + ignore_requirements=False, change_queue=None): self.log.debug("Considering adding change %s" % change) if self.isChangeAlreadyInQueue(change): self.log.debug("Change %s is already in queue, ignoring" % change) @@ -1144,7 +1146,16 @@ class BasePipelineManager(object): "requirement %s" % (change, f)) return False - if not self.enqueueChangesAhead(change, quiet, ignore_requirements): + if not change_queue: + change_queue = self.getChangeQueue(change) + if not change_queue: + self.log.debug("Unable to find change queue for " + "change %s in project %s" % + (change, change.project)) + return False + + if not self.enqueueChangesAhead(change, quiet, ignore_requirements, + change_queue): self.log.debug("Failed to enqueue changes ahead of %s" % change) return False @@ -1152,26 +1163,20 @@ class BasePipelineManager(object): self.log.debug("Change %s is already in queue, ignoring" % change) return True - change_queue = self.pipeline.getQueue(change.project) - if change_queue: - self.log.debug("Adding change %s to queue %s" % - (change, change_queue)) - if not quiet: - if len(self.pipeline.start_actions) > 0: - self.reportStart(change) - item = change_queue.enqueueChange(change) - if enqueue_time: - item.enqueue_time = enqueue_time - self.reportStats(item) - self.enqueueChangesBehind(change, quiet, ignore_requirements) - self.sched.triggers['zuul'].onChangeEnqueued(item.change, - self.pipeline) - return True - else: - self.log.debug("Unable to find change queue for " - "change %s in project %s" % - (change, change.project)) - return False + self.log.debug("Adding change %s to queue %s" % + (change, change_queue)) + if not quiet: + if len(self.pipeline.start_actions) > 0: + self.reportStart(change) + item = change_queue.enqueueChange(change) + if enqueue_time: + item.enqueue_time = enqueue_time + self.reportStats(item) + self.enqueueChangesBehind(change, quiet, ignore_requirements, + change_queue) + self.sched.triggers['zuul'].onChangeEnqueued(item.change, + self.pipeline) + return True def dequeueItem(self, item): self.log.debug("Removing change %s from queue" % item.change) @@ -1694,6 +1699,9 @@ class IndependentPipelineManager(BasePipelineManager): self.pipeline.addQueue(change_queue) + def getChangeQueue(self, change): + return self.pipeline.getQueue(change.project) + class DependentPipelineManager(BasePipelineManager): log = logging.getLogger("zuul.DependentPipelineManager") @@ -1754,6 +1762,9 @@ class DependentPipelineManager(BasePipelineManager): new_change_queues.append(a) return new_change_queues + def getChangeQueue(self, change): + return self.pipeline.getQueue(change.project) + def isChangeReadyToBeEnqueued(self, change): if not self.pipeline.source.canMerge(change, self.getSubmitAllowNeeds()): @@ -1761,34 +1772,46 @@ class DependentPipelineManager(BasePipelineManager): return False return True - def enqueueChangesBehind(self, change, quiet, ignore_requirements): + def enqueueChangesBehind(self, change, quiet, ignore_requirements, + change_queue): to_enqueue = [] self.log.debug("Checking for changes needing %s:" % change) if not hasattr(change, 'needed_by_changes'): self.log.debug(" Changeish does not support dependencies") return - for needs in change.needed_by_changes: - if self.pipeline.source.canMerge(needs, + for other_change in change.needed_by_changes: + other_change_queue = self.getChangeQueue(other_change) + if other_change_queue != change_queue: + self.log.debug(" Change %s in project %s can not be enqueued " + "in the target queue %s" % + (other_change, other_change.project, + change_queue)) + continue + if self.pipeline.source.canMerge(other_change, self.getSubmitAllowNeeds()): self.log.debug(" Change %s needs %s and is ready to merge" % - (needs, change)) - to_enqueue.append(needs) + (other_change, change)) + to_enqueue.append(other_change) + if not to_enqueue: self.log.debug(" No changes need %s" % change) for other_change in to_enqueue: self.addChange(other_change, quiet=quiet, - ignore_requirements=ignore_requirements) + ignore_requirements=ignore_requirements, + change_queue=change_queue) - def enqueueChangesAhead(self, change, quiet, ignore_requirements): + def enqueueChangesAhead(self, change, quiet, ignore_requirements, + change_queue): ret = self.checkForChangesNeededBy(change) if ret in [True, False]: return ret - self.log.debug(" Change %s must be merged ahead of %s" % + self.log.debug(" Changes %s must be merged ahead of %s" % (ret, change)) for needed_change in ret: r = self.addChange(needed_change, quiet=quiet, - ignore_requirements=ignore_requirements) + ignore_requirements=ignore_requirements, + change_queue=change_queue) if not r: return False return True @@ -1804,13 +1827,20 @@ class DependentPipelineManager(BasePipelineManager): self.log.debug(" No changes needed") return True changes_needed = [] - # TODO (jeblair): this is only correct for a list of 1 element + change_queue = self.getChangeQueue(change) for needed_change in change.needs_changes: self.log.debug(" Change %s needs change %s:" % ( change, needed_change)) if needed_change.is_merged: self.log.debug(" Needed change is merged") continue + needed_change_queue = self.getChangeQueue(needed_change) + if needed_change_queue != change_queue: + self.log.debug(" Change %s in project %s does not share a " + "change queue with %s in project %s" % + (needed_change, needed_change.project, + change, change.project)) + return False if not needed_change.is_current_patchset: self.log.debug(" Needed change is not the current patchset") return False diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py index 0a931f0bf..c5fdf9af5 100644 --- a/zuul/trigger/gerrit.py +++ b/zuul/trigger/gerrit.py @@ -13,6 +13,7 @@ # under the License. import logging +import re import threading import time import urllib2 @@ -93,7 +94,6 @@ class GerritEventConnector(threading.Thread): refresh=True) self.sched.addEvent(event) - self.gerrit.eventDone() def run(self): while True: @@ -103,6 +103,8 @@ class GerritEventConnector(threading.Thread): self._handleEvent() except: self.log.exception("Exception moving Gerrit event:") + finally: + self.gerrit.eventDone() class Gerrit(object): @@ -111,6 +113,9 @@ class Gerrit(object): replication_timeout = 300 replication_retry_interval = 5 + depends_on_re = re.compile(r"^Depends-On: (I[0-9a-f]{40})\s*$", + re.MULTILINE | re.IGNORECASE) + def __init__(self, config, sched): self._change_cache = {} self.sched = sched @@ -304,7 +309,7 @@ class Gerrit(object): change = NullChange(project) return change - def _getChange(self, number, patchset, refresh=False): + def _getChange(self, number, patchset, refresh=False, history=None): key = '%s,%s' % (number, patchset) change = None if key in self._change_cache: @@ -318,7 +323,7 @@ class Gerrit(object): key = '%s,%s' % (change.number, change.patchset) self._change_cache[key] = change try: - self.updateChange(change) + self.updateChange(change, history) except Exception: del self._change_cache[key] raise @@ -342,7 +347,22 @@ class Gerrit(object): (record.get('number'),)) return changes - def updateChange(self, change): + def _getDependsOnFromCommit(self, message): + records = [] + seen = set() + for match in self.depends_on_re.findall(message): + if match in seen: + self.log.debug("Ignoring duplicate Depends-On: %s" % + (match,)) + continue + seen.add(match) + query = "change:%s" % (match,) + self.log.debug("Running query %s to find needed changes" % + (query,)) + records.extend(self.gerrit.simpleQuery(query)) + return records + + def updateChange(self, change, history=None): self.log.info("Updating information for %s,%s" % (change.number, change.patchset)) data = self.gerrit.query(change.number) @@ -382,12 +402,35 @@ class Gerrit(object): # for dependencies. return change + if history is None: + history = [] + else: + history = history[:] + history.append(change.number) + change.needs_changes = [] if 'dependsOn' in data: parts = data['dependsOn'][0]['ref'].split('/') dep_num, dep_ps = parts[3], parts[4] - dep = self._getChange(dep_num, dep_ps) - if not dep.is_merged: + if dep_num in history: + raise Exception("Dependency cycle detected: %s in %s" % ( + dep_num, history)) + self.log.debug("Getting git-dependent change %s,%s" % + (dep_num, dep_ps)) + dep = self._getChange(dep_num, dep_ps, history=history) + if (not dep.is_merged) and dep not in change.needs_changes: + change.needs_changes.append(dep) + + for record in self._getDependsOnFromCommit(data['commitMessage']): + dep_num = record['number'] + dep_ps = record['currentPatchSet']['number'] + if dep_num in history: + raise Exception("Dependency cycle detected: %s in %s" % ( + dep_num, history)) + self.log.debug("Getting commit-dependent change %s,%s" % + (dep_num, dep_ps)) + dep = self._getChange(dep_num, dep_ps, history=history) + if (not dep.is_merged) and dep not in change.needs_changes: change.needs_changes.append(dep) change.needed_by_changes = [] @@ -396,7 +439,7 @@ class Gerrit(object): parts = needed['ref'].split('/') dep_num, dep_ps = parts[3], parts[4] dep = self._getChange(dep_num, dep_ps) - if not dep.is_merged and dep.is_current_patchset: + if (not dep.is_merged) and dep.is_current_patchset: change.needed_by_changes.append(dep) return change |