summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-02-05 19:15:41 +0000
committerGerrit Code Review <review@openstack.org>2015-02-05 19:15:41 +0000
commitc8fd826c5276082c6a566c8cd9d2c20d51b000b1 (patch)
tree9cfef42726cfff9f3f7cba454db9046e42a2aac5
parent2f74bd78951cd7a7827aa82ee48cd576387dd1e3 (diff)
parentbfb8e049487d76acbf685762d6a4725f49670e5a (diff)
downloadzuul-c8fd826c5276082c6a566c8cd9d2c20d51b000b1.tar.gz
Merge "Support cross-repo-dependencies in independent pipelines"
-rwxr-xr-xtests/test_scheduler.py44
-rw-r--r--tests/test_zuultrigger.py2
-rw-r--r--zuul/model.py34
-rw-r--r--zuul/scheduler.py105
4 files changed, 152 insertions, 33 deletions
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 3837cfac5..059f155f4 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -1921,7 +1921,7 @@ class TestScheduler(ZuulTestCase):
status_jobs = set()
for p in data['pipelines']:
for q in p['change_queues']:
- if q['dependent']:
+ if p['name'] in ['gate', 'conflict']:
self.assertEqual(q['window'], 20)
else:
self.assertEqual(q['window'], 0)
@@ -3014,3 +3014,45 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual(B.reported, 0)
self.assertEqual(A.data['status'], 'NEW')
self.assertEqual(B.data['status'], 'NEW')
+
+ def test_crd_check(self):
+ "Test cross-repo dependencies in independent pipelines"
+
+ self.gearman_server.hold_jobs_in_queue = True
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
+
+ # A Depends-On: B
+ A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ A.subject, B.data['id'])
+
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ queue = self.gearman_server.getQueue()
+ ref = self.getParameter(queue[-1], 'ZUUL_REF')
+ self.gearman_server.hold_jobs_in_queue = False
+ self.gearman_server.release()
+ self.waitUntilSettled()
+
+ path = os.path.join(self.git_root, "org/project1")
+ repo = git.Repo(path)
+ repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
+ repo_messages.reverse()
+ correct_messages = ['initial commit', 'A-1']
+ self.assertEqual(repo_messages, correct_messages)
+
+ path = os.path.join(self.git_root, "org/project2")
+ repo = git.Repo(path)
+ repo_messages = [c.message.strip() for c in repo.iter_commits(ref)]
+ repo_messages.reverse()
+ correct_messages = ['initial commit', 'B-1']
+ self.assertEqual(repo_messages, correct_messages)
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(A.reported, 1)
+ self.assertEqual(B.reported, 0)
+
+ self.assertEqual(self.history[0].changes, '2,1 1,1')
+ self.assertEqual(len(self.sched.layout.pipelines['check'].queues), 0)
diff --git a/tests/test_zuultrigger.py b/tests/test_zuultrigger.py
index 3f339beff..a26fa8605 100644
--- a/tests/test_zuultrigger.py
+++ b/tests/test_zuultrigger.py
@@ -65,7 +65,7 @@ class TestZuulTrigger(ZuulTestCase):
for job in self.history:
if job.changes == '1,1':
self.assertEqual(job.name, 'project-gate')
- elif job.changes == '2,1':
+ elif job.changes == '1,1 2,1':
self.assertEqual(job.name, 'project-check')
elif job.changes == '1,1 3,1':
self.assertEqual(job.name, 'project-gate')
diff --git a/zuul/model.py b/zuul/model.py
index 3bba28467..2a69b7902 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -111,6 +111,9 @@ class Pipeline(object):
return queue
return None
+ def removeQueue(self, queue):
+ self.queues.remove(queue)
+
def getJobTree(self, project):
tree = self.job_trees.get(project)
return tree
@@ -148,6 +151,8 @@ class Pipeline(object):
return torun
def findJobsToRun(self, item):
+ if not item.live:
+ return []
tree = self.getJobTree(item.change.project)
if not tree:
return []
@@ -193,6 +198,8 @@ class Pipeline(object):
return False
def isHoldingFollowingChanges(self, item):
+ if not item.live:
+ return False
for job in self.getJobs(item.change):
if not job.hold_following_changes:
continue
@@ -256,7 +263,6 @@ class Pipeline(object):
j_queues.append(j_queue)
j_queue['heads'] = []
j_queue['window'] = queue.window
- j_queue['dependent'] = queue.dependent
j_changes = []
for e in queue.queue:
@@ -303,8 +309,8 @@ class ChangeQueue(object):
different projects; this is one of them. For instance, there may
a queue shared by interrelated projects foo and bar, and a second
queue for independent project baz. Pipelines have one or more
- PipelineQueues."""
- def __init__(self, pipeline, dependent=True, window=0, window_floor=1,
+ ChangeQueues."""
+ def __init__(self, pipeline, window=0, window_floor=1,
window_increase_type='linear', window_increase_factor=1,
window_decrease_type='exponential', window_decrease_factor=2):
self.pipeline = pipeline
@@ -314,7 +320,6 @@ class ChangeQueue(object):
self.projects = []
self._jobs = set()
self.queue = []
- self.dependent = dependent
self.window = window
self.window_floor = window_floor
self.window_increase_type = window_increase_type
@@ -348,14 +353,15 @@ class ChangeQueue(object):
self.name = self.assigned_name or self.generated_name
def enqueueChange(self, change):
- item = QueueItem(self.pipeline, change)
+ item = QueueItem(self, change)
self.enqueueItem(item)
item.enqueue_time = time.time()
return item
def enqueueItem(self, item):
item.pipeline = self.pipeline
- if self.dependent and self.queue:
+ item.queue = self
+ if self.queue:
item.item_ahead = self.queue[-1]
item.item_ahead.items_behind.append(item)
self.queue.append(item)
@@ -374,8 +380,6 @@ class ChangeQueue(object):
item.dequeue_time = time.time()
def moveItem(self, item, item_ahead):
- if not self.dependent:
- return False
if item.item_ahead == item_ahead:
return False
# Remove from current location
@@ -399,20 +403,20 @@ class ChangeQueue(object):
# TODO merge semantics
def isActionable(self, item):
- if self.dependent and self.window:
+ if self.window:
return item in self.queue[:self.window]
else:
return True
def increaseWindowSize(self):
- if self.dependent:
+ if self.window:
if self.window_increase_type == 'linear':
self.window += self.window_increase_factor
elif self.window_increase_type == 'exponential':
self.window *= self.window_increase_factor
def decreaseWindowSize(self):
- if self.dependent:
+ if self.window:
if self.window_decrease_type == 'linear':
self.window = max(
self.window_floor,
@@ -650,8 +654,9 @@ class BuildSet(object):
class QueueItem(object):
"""A changish inside of a Pipeline queue"""
- def __init__(self, pipeline, change):
- self.pipeline = pipeline
+ def __init__(self, queue, change):
+ self.pipeline = queue.pipeline
+ self.queue = queue
self.change = change # a changeish
self.build_sets = []
self.dequeued_needing_change = False
@@ -662,7 +667,8 @@ class QueueItem(object):
self.enqueue_time = None
self.dequeue_time = None
self.reported = False
- self.active = False
+ self.active = False # Whether an item is within an active window
+ self.live = True # Whether an item is intended to be processed at all
def __repr__(self):
if self.pipeline:
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 3c09e0a92..ffb8e3f83 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -638,11 +638,15 @@ class Scheduler(threading.Thread):
self.log.debug("Re-enqueueing changes for pipeline %s" % name)
items_to_remove = []
builds_to_remove = []
+ last_head = None
for shared_queue in old_pipeline.queues:
for item in shared_queue.queue:
+ if not item.item_ahead:
+ last_head = item
item.item_ahead = None
item.items_behind = []
item.pipeline = None
+ item.queue = None
project = layout.projects.get(item.change.project.name)
if not project:
self.log.warning("Unable to find project for "
@@ -658,7 +662,8 @@ class Scheduler(threading.Thread):
build.job = job
else:
builds_to_remove.append(build)
- if not new_pipeline.manager.reEnqueueItem(item):
+ if not new_pipeline.manager.reEnqueueItem(item,
+ last_head):
items_to_remove.append(item)
for item in items_to_remove:
for build in item.current_build_set.getBuilds():
@@ -1114,8 +1119,11 @@ class BasePipelineManager(object):
self.log.debug("Change %s abandoned, removing." % change)
self.removeChange(change)
- def reEnqueueItem(self, item):
- change_queue = self.pipeline.getQueue(item.change.project)
+ def reEnqueueItem(self, item, last_head):
+ if last_head.queue:
+ change_queue = last_head.queue
+ else:
+ change_queue = self.getChangeQueue(item.change)
if change_queue:
self.log.debug("Re-enqueing change %s in queue %s" %
(item.change, change_queue))
@@ -1128,7 +1136,8 @@ class BasePipelineManager(object):
return False
def addChange(self, change, quiet=False, enqueue_time=None,
- ignore_requirements=False, change_queue=None):
+ ignore_requirements=False, live=True,
+ change_queue=None):
self.log.debug("Considering adding change %s" % change)
if self.isChangeAlreadyInQueue(change):
self.log.debug("Change %s is already in queue, ignoring" % change)
@@ -1171,6 +1180,7 @@ class BasePipelineManager(object):
item = change_queue.enqueueChange(change)
if enqueue_time:
item.enqueue_time = enqueue_time
+ item.live = live
self.reportStats(item)
self.enqueueChangesBehind(change, quiet, ignore_requirements,
change_queue)
@@ -1180,8 +1190,7 @@ class BasePipelineManager(object):
def dequeueItem(self, item):
self.log.debug("Removing change %s from queue" % item.change)
- change_queue = self.pipeline.getQueue(item.change.project)
- change_queue.dequeueItem(item)
+ item.queue.dequeueItem(item)
def removeChange(self, change):
# Remove a change from the queue, probably because it has been
@@ -1292,7 +1301,9 @@ class BasePipelineManager(object):
def _processOneItem(self, item, nnfi, ready_ahead):
changed = False
item_ahead = item.item_ahead
- change_queue = self.pipeline.getQueue(item.change.project)
+ if item_ahead and (not item_ahead.live):
+ item_ahead = None
+ change_queue = item.queue
failing_reasons = [] # Reasons this item is failing
if self.checkForChangesNeededBy(item.change) is not True:
@@ -1316,8 +1327,7 @@ class BasePipelineManager(object):
self.cancelJobs(item, prime=False)
else:
item_ahead_merged = False
- if ((item_ahead and item_ahead.change.is_merged) or
- not change_queue.dependent):
+ if (item_ahead and item_ahead.change.is_merged):
item_ahead_merged = True
if (item_ahead != nnfi and not item_ahead_merged):
# Our current base is different than what we expected,
@@ -1340,6 +1350,10 @@ class BasePipelineManager(object):
changed = True
if self.pipeline.didAnyJobFail(item):
failing_reasons.append("at least one job failed")
+ if (not item.live) and (not item.items_behind):
+ failing_reasons.append("is a non-live item with no items behind")
+ self.dequeueItem(item)
+ changed = True
if (not item_ahead) and self.pipeline.areAllJobsComplete(item):
try:
self.reportItem(item)
@@ -1352,7 +1366,7 @@ class BasePipelineManager(object):
self.cancelJobs(item_behind)
self.dequeueItem(item)
changed = True
- elif not failing_reasons:
+ elif not failing_reasons and item.live:
nnfi = item
item.current_build_set.failing_reasons = failing_reasons
if failing_reasons:
@@ -1436,7 +1450,7 @@ class BasePipelineManager(object):
item.change.branch)
self.log.info("Reported change %s status: all-succeeded: %s, "
"merged: %s" % (item.change, succeeded, merged))
- change_queue = self.pipeline.getQueue(item.change.project)
+ change_queue = item.queue
if not (succeeded and merged):
self.log.debug("Reported change %s failed tests or failed "
"to merge" % (item.change))
@@ -1693,14 +1707,71 @@ class IndependentPipelineManager(BasePipelineManager):
def _postConfig(self, layout):
super(IndependentPipelineManager, self)._postConfig(layout)
- change_queue = ChangeQueue(self.pipeline, dependent=False)
- for project in self.pipeline.getProjects():
- change_queue.addProject(project)
-
+ def getChangeQueue(self, change):
+ # creates a new change queue for every change
+ if change.project not in self.pipeline.getProjects():
+ return None
+ change_queue = ChangeQueue(self.pipeline)
+ change_queue.addProject(change.project)
self.pipeline.addQueue(change_queue)
+ return change_queue
- def getChangeQueue(self, change):
- return self.pipeline.getQueue(change.project)
+ def enqueueChangesAhead(self, change, quiet, ignore_requirements,
+ change_queue):
+ ret = self.checkForChangesNeededBy(change)
+ if ret in [True, False]:
+ return ret
+ self.log.debug(" Changes %s must be merged ahead of %s" %
+ (ret, change))
+ for needed_change in ret:
+ # This differs from the dependent pipeline by enqueuing
+ # changes ahead as "not live", that is, not intended to
+ # have jobs run. Also, pipeline requirements are always
+ # ignored (which is safe because the changes are not
+ # live).
+ r = self.addChange(needed_change, quiet=True,
+ ignore_requirements=True,
+ live=False, change_queue=change_queue)
+ if not r:
+ return False
+ return True
+
+ def checkForChangesNeededBy(self, change):
+ self.log.debug("Checking for changes needed by %s:" % change)
+ # Return true if okay to proceed enqueing this change,
+ # false if the change should not be enqueued.
+ if not hasattr(change, 'needs_changes'):
+ self.log.debug(" Changeish does not support dependencies")
+ return True
+ if not change.needs_changes:
+ self.log.debug(" No changes needed")
+ return True
+ changes_needed = []
+ for needed_change in change.needs_changes:
+ self.log.debug(" Change %s needs change %s:" % (
+ change, needed_change))
+ if needed_change.is_merged:
+ self.log.debug(" Needed change is merged")
+ continue
+ if self.isChangeAlreadyInQueue(needed_change):
+ self.log.debug(" Needed change is already ahead in the queue")
+ continue
+ self.log.debug(" Change %s is needed" % needed_change)
+ if needed_change not in changes_needed:
+ changes_needed.append(needed_change)
+ continue
+ # This differs from the dependent pipeline check in not
+ # verifying that the dependent change is mergable.
+ if changes_needed:
+ return changes_needed
+ return True
+
+ def dequeueItem(self, item):
+ super(IndependentPipelineManager, self).dequeueItem(item)
+ # An independent pipeline manager dynamically removes empty
+ # queues
+ if not item.queue.queue:
+ self.pipeline.removeQueue(item.queue)
class DependentPipelineManager(BasePipelineManager):