summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames E. Blair <jeblair@hp.com>2015-02-07 11:41:19 -0800
committerJames E. Blair <jeblair@hp.com>2015-02-09 08:31:38 -0800
commitdbfe1cd08f9b5b270350fd44149559fe0e2becbd (patch)
tree4f7951e2756fc11e68557e0c835fa9ccfacf5582
parent107c385ec28f0c7eb003d8e125ee69c256d59376 (diff)
downloadzuul-dbfe1cd08f9b5b270350fd44149559fe0e2becbd.tar.gz
Fix independent pipeline CRD
If two git-dependent changes in the same project are added to an independent pipeline, the first should be enqueued as a single live item in its own change queue, and the second should be enqueued as a live item behind a non-live copy of the first with those two in their own change queue. However, the current behavior is that they are enqueued as two live items each in their own change queue. Once the first is removed, the queue processor would determine that a dependency of the second change had failed to merge, and therefore the second failed tests (as if it were in a dependent pipeline). This is because the queue processor looked for the status of dependent changes throughout the pipeline, so any failed dependent change would cause it to fail a change. The queue processor now only looks ahead in a given item's change queue (rather than the entire pipeline) to determine if a needed change is missing. Change-Id: Ieaa0cdbff59e6b77a11c82876f4fd5cb01fe950b
-rwxr-xr-xtests/test_scheduler.py73
-rw-r--r--zuul/scheduler.py43
2 files changed, 101 insertions, 15 deletions
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 990136914..fb9073475 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -3137,6 +3137,79 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual(self.history[0].changes, '2,1 1,1')
self.assertEqual(len(self.sched.layout.pipelines['check'].queues), 0)
+ def test_crd_check_git_depends(self):
+ "Test single-repo dependencies in independent pipelines"
+ self.gearman_server.hold_jobs_in_queue = True
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
+
+ # Add two git-dependent changes and make sure they both report
+ # success.
+ B.setDependsOn(A, 1)
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.gearman_server.hold_jobs_in_queue = False
+ self.gearman_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(A.reported, 1)
+ self.assertEqual(B.reported, 1)
+
+ self.assertEqual(self.history[0].changes, '1,1')
+ self.assertEqual(self.history[-1].changes, '1,1 2,1')
+ self.assertEqual(len(self.sched.layout.pipelines['check'].queues), 0)
+
+ self.assertIn('Build succeeded', A.messages[0])
+ self.assertIn('Build succeeded', B.messages[0])
+
+ def test_crd_check_duplicate(self):
+ "Test duplicate check in independent pipelines"
+ self.gearman_server.hold_jobs_in_queue = True
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
+ check_pipeline = self.sched.layout.pipelines['check']
+
+ # Add two git-dependent changes...
+ B.setDependsOn(A, 1)
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertEqual(len(check_pipeline.getAllItems()), 2)
+
+ # ...make sure the live one is not duplicated...
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertEqual(len(check_pipeline.getAllItems()), 2)
+
+ # ...but the non-live one is able to be.
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertEqual(len(check_pipeline.getAllItems()), 3)
+
+ self.gearman_server.hold_jobs_in_queue = False
+ self.gearman_server.release('.*-merge')
+ self.waitUntilSettled()
+ self.gearman_server.release('.*-merge')
+ self.waitUntilSettled()
+ self.gearman_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+ self.assertEqual(A.reported, 1)
+ self.assertEqual(B.reported, 1)
+
+ self.assertEqual(self.history[0].changes, '1,1 2,1')
+ self.assertEqual(self.history[1].changes, '1,1')
+ self.assertEqual(len(self.sched.layout.pipelines['check'].queues), 0)
+
+ self.assertIn('Build succeeded', A.messages[0])
+ self.assertIn('Build succeeded', B.messages[0])
+
def test_crd_check_reconfiguration(self):
"Test cross-repo dependencies re-enqueued in independent pipelines"
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 49e6698ca..0bc171ee4 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2014 Hewlett-Packard Development Company, L.P.
+# Copyright 2012-2015 Hewlett-Packard Development Company, L.P.
# Copyright 2013 OpenStack Foundation
# Copyright 2013 Antoine "hashar" Musso
# Copyright 2013 Wikimedia Foundation Inc.
@@ -1029,9 +1029,17 @@ class BasePipelineManager(object):
return True
return False
- def isChangeAlreadyInQueue(self, change):
- for c in self.pipeline.getChangesInQueue():
- if change.equals(c):
+ def isChangeAlreadyInPipeline(self, change):
+ # Checks live items in the pipeline
+ for item in self.pipeline.getAllItems():
+ if item.live and change.equals(item.change):
+ return True
+ return False
+
+ def isChangeAlreadyInQueue(self, change, change_queue):
+ # Checks any item in the specified change queue
+ for item in change_queue.queue:
+ if change.equals(item.change):
return True
return False
@@ -1077,7 +1085,7 @@ class BasePipelineManager(object):
change_queue):
return True
- def checkForChangesNeededBy(self, change):
+ def checkForChangesNeededBy(self, change, change_queue):
return True
def getFailingDependentItems(self, item):
@@ -1139,8 +1147,13 @@ class BasePipelineManager(object):
ignore_requirements=False, live=True,
change_queue=None):
self.log.debug("Considering adding change %s" % change)
- if self.isChangeAlreadyInQueue(change):
- self.log.debug("Change %s is already in queue, ignoring" % change)
+
+ # If we are adding a live change, check if it's a live item
+ # anywhere in the pipeline. Otherwise, we will perform the
+ # duplicate check below on the specific change_queue.
+ if live and self.isChangeAlreadyInPipeline(change):
+ self.log.debug("Change %s is already in pipeline, "
+ "ignoring" % change)
return True
if not self.isChangeReadyToBeEnqueued(change):
@@ -1168,7 +1181,7 @@ class BasePipelineManager(object):
self.log.debug("Failed to enqueue changes ahead of %s" % change)
return False
- if self.isChangeAlreadyInQueue(change):
+ if self.isChangeAlreadyInQueue(change, change_queue):
self.log.debug("Change %s is already in queue, ignoring" % change)
return True
@@ -1306,7 +1319,7 @@ class BasePipelineManager(object):
change_queue = item.queue
failing_reasons = [] # Reasons this item is failing
- if self.checkForChangesNeededBy(item.change) is not True:
+ if self.checkForChangesNeededBy(item.change, change_queue) is not True:
# It's not okay to enqueue this change, we should remove it.
self.log.info("Dequeuing change %s because "
"it can no longer merge" % item.change)
@@ -1719,7 +1732,7 @@ class IndependentPipelineManager(BasePipelineManager):
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
change_queue):
- ret = self.checkForChangesNeededBy(change)
+ ret = self.checkForChangesNeededBy(change, change_queue)
if ret in [True, False]:
return ret
self.log.debug(" Changes %s must be merged ahead of %s" %
@@ -1737,7 +1750,7 @@ class IndependentPipelineManager(BasePipelineManager):
return False
return True
- def checkForChangesNeededBy(self, change):
+ def checkForChangesNeededBy(self, change, change_queue):
self.log.debug("Checking for changes needed by %s:" % change)
# Return true if okay to proceed enqueing this change,
# false if the change should not be enqueued.
@@ -1754,7 +1767,7 @@ class IndependentPipelineManager(BasePipelineManager):
if needed_change.is_merged:
self.log.debug(" Needed change is merged")
continue
- if self.isChangeAlreadyInQueue(needed_change):
+ if self.isChangeAlreadyInQueue(needed_change, change_queue):
self.log.debug(" Needed change is already ahead in the queue")
continue
self.log.debug(" Change %s is needed" % needed_change)
@@ -1875,7 +1888,7 @@ class DependentPipelineManager(BasePipelineManager):
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
change_queue):
- ret = self.checkForChangesNeededBy(change)
+ ret = self.checkForChangesNeededBy(change, change_queue)
if ret in [True, False]:
return ret
self.log.debug(" Changes %s must be merged ahead of %s" %
@@ -1888,7 +1901,7 @@ class DependentPipelineManager(BasePipelineManager):
return False
return True
- def checkForChangesNeededBy(self, change):
+ def checkForChangesNeededBy(self, change, change_queue):
self.log.debug("Checking for changes needed by %s:" % change)
# Return true if okay to proceed enqueing this change,
# false if the change should not be enqueued.
@@ -1916,7 +1929,7 @@ class DependentPipelineManager(BasePipelineManager):
if not needed_change.is_current_patchset:
self.log.debug(" Needed change is not the current patchset")
return False
- if self.isChangeAlreadyInQueue(needed_change):
+ if self.isChangeAlreadyInQueue(needed_change, change_queue):
self.log.debug(" Needed change is already ahead in the queue")
continue
if self.pipeline.source.canMerge(needed_change,