summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames E. Blair <jeblair@hp.com>2014-12-30 10:12:29 -0800
committerJames E. Blair <jeblair@hp.com>2015-02-04 10:07:43 -0800
commit5ee24256518bc79af102176448182f5d26967e0f (patch)
treefc51fb13d312e6bf4b61cd60fb6378b8253f3df6
parent063672f8d3cf22bb90cff5bb9144b7f5d0fc029f (diff)
downloadzuul-5ee24256518bc79af102176448182f5d26967e0f.tar.gz
Support cross-repo-dependencies in dependent pipelines
Parse commit messages for "Depends-On: <changeid>" and treat matching changes as changes that the given change depends on. This will treat any changes in any branch of any project as such. If the projects share a dependent change queue, the changes will be enqueued in order. If they do not share a change queue in a dependent pipeline, then the latter one will be unable to be enqueued until the change it depends on merges. If the dependencies result in a cycle, Zuul will log the error but otherwise the problematic changes will be ignored. Dependent changes in independent pipelines are not yet addressed. Change-Id: I90c173f86d11e6c44d1f408646589b7c75b1cd52
-rwxr-xr-xtests/base.py12
-rwxr-xr-xtests/test_scheduler.py124
-rw-r--r--zuul/scheduler.py98
-rw-r--r--zuul/trigger/gerrit.py57
4 files changed, 246 insertions, 45 deletions
diff --git a/tests/base.py b/tests/base.py
index b872a8581..773f92653 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -418,11 +418,15 @@ class FakeGerrit(object):
return {}
def simpleQuery(self, query):
- # This is currently only used to return all open changes for a
- # project
self.queries.append(query)
- l = [change.query() for change in self.changes.values()]
- l.append({"type": "stats", "rowCount": 1, "runTimeMilliseconds": 3})
+ if query.startswith('change:'):
+ # Query a specific changeid
+ changeid = query[len('change:'):]
+ l = [change.query() for change in self.changes.values()
+ if change.data['id'] == changeid]
+ else:
+ # Query all open changes
+ l = [change.query() for change in self.changes.values()]
return l
def startWatching(self, *args, **kw):
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 1a86c6aca..3837cfac5 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -2890,3 +2890,127 @@ For CI problems and help debugging, contact ci@example.org"""
self.getJobFromHistory('experimental-project-test').result,
'SUCCESS')
self.assertEqual(A.reported, 1)
+
+ def test_crd_gate(self):
+ "Test cross-repo dependencies"
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+
+ AM2 = self.fake_gerrit.addFakeChange('org/project1', 'master', 'AM2')
+ AM1 = self.fake_gerrit.addFakeChange('org/project1', 'master', 'AM1')
+ AM2.setMerged()
+ AM1.setMerged()
+
+ BM2 = self.fake_gerrit.addFakeChange('org/project2', 'master', 'BM2')
+ BM1 = self.fake_gerrit.addFakeChange('org/project2', 'master', 'BM1')
+ BM2.setMerged()
+ BM1.setMerged()
+
+ # A -> AM1 -> AM2
+ # B -> BM1 -> BM2
+ # A Depends-On: B
+ # M2 is here to make sure it is never queried. If it is, it
+ # means zuul is walking down the entire history of merged
+ # changes.
+
+ B.setDependsOn(BM1, 1)
+ BM1.setDependsOn(BM2, 1)
+
+ A.setDependsOn(AM1, 1)
+ AM1.setDependsOn(AM2, 1)
+
+ A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ A.subject, B.data['id'])
+
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+
+ source = self.sched.layout.pipelines['gate'].source
+ source.maintainCache([])
+
+ self.worker.hold_jobs_in_build = True
+ B.addApproval('APRV', 1)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.release('.*-merge')
+ self.waitUntilSettled()
+ self.worker.hold_jobs_in_build = False
+ self.worker.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(AM2.queried, 0)
+ self.assertEqual(BM2.queried, 0)
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(B.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2)
+ self.assertEqual(B.reported, 2)
+
+ self.assertEqual(self.history[-1].changes, '2,1 1,1')
+
+ def test_crd_unshared_gate(self):
+ "Test cross-repo dependencies in unshared gate queues"
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+
+ # A Depends-On: B
+ A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ A.subject, B.data['id'])
+
+ # A and B do not share a queue, make sure that A is unable to
+ # enqueue B (and therefore, A is unable to be enqueued).
+ B.addApproval('APRV', 1)
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(A.reported, 0)
+ self.assertEqual(B.reported, 0)
+ self.assertEqual(len(self.history), 0)
+
+ # Enqueue and merge B alone.
+ self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ self.assertEqual(B.data['status'], 'MERGED')
+ self.assertEqual(B.reported, 2)
+
+ # Now that B is merged, A should be able to be enqueued and
+ # merged.
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(A.reported, 2)
+
+ def test_crd_cycle(self):
+ "Test cross-repo dependency cycles"
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
+ A.addApproval('CRVW', 2)
+ B.addApproval('CRVW', 2)
+
+ # A -> B -> A (via commit-depends)
+
+ A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ A.subject, B.data['id'])
+ B.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ B.subject, A.data['id'])
+
+ self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
+ self.waitUntilSettled()
+
+ self.assertEqual(A.reported, 0)
+ self.assertEqual(B.reported, 0)
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 44fbb042f..3c09e0a92 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1064,10 +1064,12 @@ class BasePipelineManager(object):
def isChangeReadyToBeEnqueued(self, change):
return True
- def enqueueChangesAhead(self, change, quiet, ignore_requirements):
+ def enqueueChangesAhead(self, change, quiet, ignore_requirements,
+ change_queue):
return True
- def enqueueChangesBehind(self, change, quiet, ignore_requirements):
+ def enqueueChangesBehind(self, change, quiet, ignore_requirements,
+ change_queue):
return True
def checkForChangesNeededBy(self, change):
@@ -1126,7 +1128,7 @@ class BasePipelineManager(object):
return False
def addChange(self, change, quiet=False, enqueue_time=None,
- ignore_requirements=False):
+ ignore_requirements=False, change_queue=None):
self.log.debug("Considering adding change %s" % change)
if self.isChangeAlreadyInQueue(change):
self.log.debug("Change %s is already in queue, ignoring" % change)
@@ -1144,7 +1146,16 @@ class BasePipelineManager(object):
"requirement %s" % (change, f))
return False
- if not self.enqueueChangesAhead(change, quiet, ignore_requirements):
+ if not change_queue:
+ change_queue = self.getChangeQueue(change)
+ if not change_queue:
+ self.log.debug("Unable to find change queue for "
+ "change %s in project %s" %
+ (change, change.project))
+ return False
+
+ if not self.enqueueChangesAhead(change, quiet, ignore_requirements,
+ change_queue):
self.log.debug("Failed to enqueue changes ahead of %s" % change)
return False
@@ -1152,26 +1163,20 @@ class BasePipelineManager(object):
self.log.debug("Change %s is already in queue, ignoring" % change)
return True
- change_queue = self.pipeline.getQueue(change.project)
- if change_queue:
- self.log.debug("Adding change %s to queue %s" %
- (change, change_queue))
- if not quiet:
- if len(self.pipeline.start_actions) > 0:
- self.reportStart(change)
- item = change_queue.enqueueChange(change)
- if enqueue_time:
- item.enqueue_time = enqueue_time
- self.reportStats(item)
- self.enqueueChangesBehind(change, quiet, ignore_requirements)
- self.sched.triggers['zuul'].onChangeEnqueued(item.change,
- self.pipeline)
- return True
- else:
- self.log.debug("Unable to find change queue for "
- "change %s in project %s" %
- (change, change.project))
- return False
+ self.log.debug("Adding change %s to queue %s" %
+ (change, change_queue))
+ if not quiet:
+ if len(self.pipeline.start_actions) > 0:
+ self.reportStart(change)
+ item = change_queue.enqueueChange(change)
+ if enqueue_time:
+ item.enqueue_time = enqueue_time
+ self.reportStats(item)
+ self.enqueueChangesBehind(change, quiet, ignore_requirements,
+ change_queue)
+ self.sched.triggers['zuul'].onChangeEnqueued(item.change,
+ self.pipeline)
+ return True
def dequeueItem(self, item):
self.log.debug("Removing change %s from queue" % item.change)
@@ -1694,6 +1699,9 @@ class IndependentPipelineManager(BasePipelineManager):
self.pipeline.addQueue(change_queue)
+ def getChangeQueue(self, change):
+ return self.pipeline.getQueue(change.project)
+
class DependentPipelineManager(BasePipelineManager):
log = logging.getLogger("zuul.DependentPipelineManager")
@@ -1754,6 +1762,9 @@ class DependentPipelineManager(BasePipelineManager):
new_change_queues.append(a)
return new_change_queues
+ def getChangeQueue(self, change):
+ return self.pipeline.getQueue(change.project)
+
def isChangeReadyToBeEnqueued(self, change):
if not self.pipeline.source.canMerge(change,
self.getSubmitAllowNeeds()):
@@ -1761,34 +1772,46 @@ class DependentPipelineManager(BasePipelineManager):
return False
return True
- def enqueueChangesBehind(self, change, quiet, ignore_requirements):
+ def enqueueChangesBehind(self, change, quiet, ignore_requirements,
+ change_queue):
to_enqueue = []
self.log.debug("Checking for changes needing %s:" % change)
if not hasattr(change, 'needed_by_changes'):
self.log.debug(" Changeish does not support dependencies")
return
- for needs in change.needed_by_changes:
- if self.pipeline.source.canMerge(needs,
+ for other_change in change.needed_by_changes:
+ other_change_queue = self.getChangeQueue(other_change)
+ if other_change_queue != change_queue:
+ self.log.debug(" Change %s in project %s can not be enqueued "
+ "in the target queue %s" %
+ (other_change, other_change.project,
+ change_queue))
+ continue
+ if self.pipeline.source.canMerge(other_change,
self.getSubmitAllowNeeds()):
self.log.debug(" Change %s needs %s and is ready to merge" %
- (needs, change))
- to_enqueue.append(needs)
+ (other_change, change))
+ to_enqueue.append(other_change)
+
if not to_enqueue:
self.log.debug(" No changes need %s" % change)
for other_change in to_enqueue:
self.addChange(other_change, quiet=quiet,
- ignore_requirements=ignore_requirements)
+ ignore_requirements=ignore_requirements,
+ change_queue=change_queue)
- def enqueueChangesAhead(self, change, quiet, ignore_requirements):
+ def enqueueChangesAhead(self, change, quiet, ignore_requirements,
+ change_queue):
ret = self.checkForChangesNeededBy(change)
if ret in [True, False]:
return ret
- self.log.debug(" Change %s must be merged ahead of %s" %
+ self.log.debug(" Changes %s must be merged ahead of %s" %
(ret, change))
for needed_change in ret:
r = self.addChange(needed_change, quiet=quiet,
- ignore_requirements=ignore_requirements)
+ ignore_requirements=ignore_requirements,
+ change_queue=change_queue)
if not r:
return False
return True
@@ -1804,13 +1827,20 @@ class DependentPipelineManager(BasePipelineManager):
self.log.debug(" No changes needed")
return True
changes_needed = []
- # TODO (jeblair): this is only correct for a list of 1 element
+ change_queue = self.getChangeQueue(change)
for needed_change in change.needs_changes:
self.log.debug(" Change %s needs change %s:" % (
change, needed_change))
if needed_change.is_merged:
self.log.debug(" Needed change is merged")
continue
+ needed_change_queue = self.getChangeQueue(needed_change)
+ if needed_change_queue != change_queue:
+ self.log.debug(" Change %s in project %s does not share a "
+ "change queue with %s in project %s" %
+ (needed_change, needed_change.project,
+ change, change.project))
+ return False
if not needed_change.is_current_patchset:
self.log.debug(" Needed change is not the current patchset")
return False
diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py
index 0a931f0bf..c5fdf9af5 100644
--- a/zuul/trigger/gerrit.py
+++ b/zuul/trigger/gerrit.py
@@ -13,6 +13,7 @@
# under the License.
import logging
+import re
import threading
import time
import urllib2
@@ -93,7 +94,6 @@ class GerritEventConnector(threading.Thread):
refresh=True)
self.sched.addEvent(event)
- self.gerrit.eventDone()
def run(self):
while True:
@@ -103,6 +103,8 @@ class GerritEventConnector(threading.Thread):
self._handleEvent()
except:
self.log.exception("Exception moving Gerrit event:")
+ finally:
+ self.gerrit.eventDone()
class Gerrit(object):
@@ -111,6 +113,9 @@ class Gerrit(object):
replication_timeout = 300
replication_retry_interval = 5
+ depends_on_re = re.compile(r"^Depends-On: (I[0-9a-f]{40})\s*$",
+ re.MULTILINE | re.IGNORECASE)
+
def __init__(self, config, sched):
self._change_cache = {}
self.sched = sched
@@ -304,7 +309,7 @@ class Gerrit(object):
change = NullChange(project)
return change
- def _getChange(self, number, patchset, refresh=False):
+ def _getChange(self, number, patchset, refresh=False, history=None):
key = '%s,%s' % (number, patchset)
change = None
if key in self._change_cache:
@@ -318,7 +323,7 @@ class Gerrit(object):
key = '%s,%s' % (change.number, change.patchset)
self._change_cache[key] = change
try:
- self.updateChange(change)
+ self.updateChange(change, history)
except Exception:
del self._change_cache[key]
raise
@@ -342,7 +347,22 @@ class Gerrit(object):
(record.get('number'),))
return changes
- def updateChange(self, change):
+ def _getDependsOnFromCommit(self, message):
+ records = []
+ seen = set()
+ for match in self.depends_on_re.findall(message):
+ if match in seen:
+ self.log.debug("Ignoring duplicate Depends-On: %s" %
+ (match,))
+ continue
+ seen.add(match)
+ query = "change:%s" % (match,)
+ self.log.debug("Running query %s to find needed changes" %
+ (query,))
+ records.extend(self.gerrit.simpleQuery(query))
+ return records
+
+ def updateChange(self, change, history=None):
self.log.info("Updating information for %s,%s" %
(change.number, change.patchset))
data = self.gerrit.query(change.number)
@@ -382,12 +402,35 @@ class Gerrit(object):
# for dependencies.
return change
+ if history is None:
+ history = []
+ else:
+ history = history[:]
+ history.append(change.number)
+
change.needs_changes = []
if 'dependsOn' in data:
parts = data['dependsOn'][0]['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
- dep = self._getChange(dep_num, dep_ps)
- if not dep.is_merged:
+ if dep_num in history:
+ raise Exception("Dependency cycle detected: %s in %s" % (
+ dep_num, history))
+ self.log.debug("Getting git-dependent change %s,%s" %
+ (dep_num, dep_ps))
+ dep = self._getChange(dep_num, dep_ps, history=history)
+ if (not dep.is_merged) and dep not in change.needs_changes:
+ change.needs_changes.append(dep)
+
+ for record in self._getDependsOnFromCommit(data['commitMessage']):
+ dep_num = record['number']
+ dep_ps = record['currentPatchSet']['number']
+ if dep_num in history:
+ raise Exception("Dependency cycle detected: %s in %s" % (
+ dep_num, history))
+ self.log.debug("Getting commit-dependent change %s,%s" %
+ (dep_num, dep_ps))
+ dep = self._getChange(dep_num, dep_ps, history=history)
+ if (not dep.is_merged) and dep not in change.needs_changes:
change.needs_changes.append(dep)
change.needed_by_changes = []
@@ -396,7 +439,7 @@ class Gerrit(object):
parts = needed['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
dep = self._getChange(dep_num, dep_ps)
- if not dep.is_merged and dep.is_current_patchset:
+ if (not dep.is_merged) and dep.is_current_patchset:
change.needed_by_changes.append(dep)
return change