diff options
author | Jenkins <jenkins@review.openstack.org> | 2015-02-05 19:15:41 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2015-02-05 19:15:41 +0000 |
commit | c8fd826c5276082c6a566c8cd9d2c20d51b000b1 (patch) | |
tree | 9cfef42726cfff9f3f7cba454db9046e42a2aac5 | |
parent | 2f74bd78951cd7a7827aa82ee48cd576387dd1e3 (diff) | |
parent | bfb8e049487d76acbf685762d6a4725f49670e5a (diff) | |
download | zuul-c8fd826c5276082c6a566c8cd9d2c20d51b000b1.tar.gz |
Merge "Support cross-repo-dependencies in independent pipelines"
-rwxr-xr-x | tests/test_scheduler.py | 44 | ||||
-rw-r--r-- | tests/test_zuultrigger.py | 2 | ||||
-rw-r--r-- | zuul/model.py | 34 | ||||
-rw-r--r-- | zuul/scheduler.py | 105 |
4 files changed, 152 insertions, 33 deletions
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 3837cfac5..059f155f4 100755 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -1921,7 +1921,7 @@ class TestScheduler(ZuulTestCase): status_jobs = set() for p in data['pipelines']: for q in p['change_queues']: - if q['dependent']: + if p['name'] in ['gate', 'conflict']: self.assertEqual(q['window'], 20) else: self.assertEqual(q['window'], 0) @@ -3014,3 +3014,45 @@ For CI problems and help debugging, contact ci@example.org""" self.assertEqual(B.reported, 0) self.assertEqual(A.data['status'], 'NEW') self.assertEqual(B.data['status'], 'NEW') + + def test_crd_check(self): + "Test cross-repo dependencies in independent pipelines" + + self.gearman_server.hold_jobs_in_queue = True + A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B') + + # A Depends-On: B + A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % ( + A.subject, B.data['id']) + + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + queue = self.gearman_server.getQueue() + ref = self.getParameter(queue[-1], 'ZUUL_REF') + self.gearman_server.hold_jobs_in_queue = False + self.gearman_server.release() + self.waitUntilSettled() + + path = os.path.join(self.git_root, "org/project1") + repo = git.Repo(path) + repo_messages = [c.message.strip() for c in repo.iter_commits(ref)] + repo_messages.reverse() + correct_messages = ['initial commit', 'A-1'] + self.assertEqual(repo_messages, correct_messages) + + path = os.path.join(self.git_root, "org/project2") + repo = git.Repo(path) + repo_messages = [c.message.strip() for c in repo.iter_commits(ref)] + repo_messages.reverse() + correct_messages = ['initial commit', 'B-1'] + self.assertEqual(repo_messages, correct_messages) + + self.assertEqual(A.data['status'], 'NEW') + self.assertEqual(B.data['status'], 'NEW') + self.assertEqual(A.reported, 1) + self.assertEqual(B.reported, 0) + + self.assertEqual(self.history[0].changes, '2,1 1,1') + self.assertEqual(len(self.sched.layout.pipelines['check'].queues), 0) diff --git a/tests/test_zuultrigger.py b/tests/test_zuultrigger.py index 3f339beff..a26fa8605 100644 --- a/tests/test_zuultrigger.py +++ b/tests/test_zuultrigger.py @@ -65,7 +65,7 @@ class TestZuulTrigger(ZuulTestCase): for job in self.history: if job.changes == '1,1': self.assertEqual(job.name, 'project-gate') - elif job.changes == '2,1': + elif job.changes == '1,1 2,1': self.assertEqual(job.name, 'project-check') elif job.changes == '1,1 3,1': self.assertEqual(job.name, 'project-gate') diff --git a/zuul/model.py b/zuul/model.py index 3bba28467..2a69b7902 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -111,6 +111,9 @@ class Pipeline(object): return queue return None + def removeQueue(self, queue): + self.queues.remove(queue) + def getJobTree(self, project): tree = self.job_trees.get(project) return tree @@ -148,6 +151,8 @@ class Pipeline(object): return torun def findJobsToRun(self, item): + if not item.live: + return [] tree = self.getJobTree(item.change.project) if not tree: return [] @@ -193,6 +198,8 @@ class Pipeline(object): return False def isHoldingFollowingChanges(self, item): + if not item.live: + return False for job in self.getJobs(item.change): if not job.hold_following_changes: continue @@ -256,7 +263,6 @@ class Pipeline(object): j_queues.append(j_queue) j_queue['heads'] = [] j_queue['window'] = queue.window - j_queue['dependent'] = queue.dependent j_changes = [] for e in queue.queue: @@ -303,8 +309,8 @@ class ChangeQueue(object): different projects; this is one of them. For instance, there may a queue shared by interrelated projects foo and bar, and a second queue for independent project baz. Pipelines have one or more - PipelineQueues.""" - def __init__(self, pipeline, dependent=True, window=0, window_floor=1, + ChangeQueues.""" + def __init__(self, pipeline, window=0, window_floor=1, window_increase_type='linear', window_increase_factor=1, window_decrease_type='exponential', window_decrease_factor=2): self.pipeline = pipeline @@ -314,7 +320,6 @@ class ChangeQueue(object): self.projects = [] self._jobs = set() self.queue = [] - self.dependent = dependent self.window = window self.window_floor = window_floor self.window_increase_type = window_increase_type @@ -348,14 +353,15 @@ class ChangeQueue(object): self.name = self.assigned_name or self.generated_name def enqueueChange(self, change): - item = QueueItem(self.pipeline, change) + item = QueueItem(self, change) self.enqueueItem(item) item.enqueue_time = time.time() return item def enqueueItem(self, item): item.pipeline = self.pipeline - if self.dependent and self.queue: + item.queue = self + if self.queue: item.item_ahead = self.queue[-1] item.item_ahead.items_behind.append(item) self.queue.append(item) @@ -374,8 +380,6 @@ class ChangeQueue(object): item.dequeue_time = time.time() def moveItem(self, item, item_ahead): - if not self.dependent: - return False if item.item_ahead == item_ahead: return False # Remove from current location @@ -399,20 +403,20 @@ class ChangeQueue(object): # TODO merge semantics def isActionable(self, item): - if self.dependent and self.window: + if self.window: return item in self.queue[:self.window] else: return True def increaseWindowSize(self): - if self.dependent: + if self.window: if self.window_increase_type == 'linear': self.window += self.window_increase_factor elif self.window_increase_type == 'exponential': self.window *= self.window_increase_factor def decreaseWindowSize(self): - if self.dependent: + if self.window: if self.window_decrease_type == 'linear': self.window = max( self.window_floor, @@ -650,8 +654,9 @@ class BuildSet(object): class QueueItem(object): """A changish inside of a Pipeline queue""" - def __init__(self, pipeline, change): - self.pipeline = pipeline + def __init__(self, queue, change): + self.pipeline = queue.pipeline + self.queue = queue self.change = change # a changeish self.build_sets = [] self.dequeued_needing_change = False @@ -662,7 +667,8 @@ class QueueItem(object): self.enqueue_time = None self.dequeue_time = None self.reported = False - self.active = False + self.active = False # Whether an item is within an active window + self.live = True # Whether an item is intended to be processed at all def __repr__(self): if self.pipeline: diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 3c09e0a92..ffb8e3f83 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -638,11 +638,15 @@ class Scheduler(threading.Thread): self.log.debug("Re-enqueueing changes for pipeline %s" % name) items_to_remove = [] builds_to_remove = [] + last_head = None for shared_queue in old_pipeline.queues: for item in shared_queue.queue: + if not item.item_ahead: + last_head = item item.item_ahead = None item.items_behind = [] item.pipeline = None + item.queue = None project = layout.projects.get(item.change.project.name) if not project: self.log.warning("Unable to find project for " @@ -658,7 +662,8 @@ class Scheduler(threading.Thread): build.job = job else: builds_to_remove.append(build) - if not new_pipeline.manager.reEnqueueItem(item): + if not new_pipeline.manager.reEnqueueItem(item, + last_head): items_to_remove.append(item) for item in items_to_remove: for build in item.current_build_set.getBuilds(): @@ -1114,8 +1119,11 @@ class BasePipelineManager(object): self.log.debug("Change %s abandoned, removing." % change) self.removeChange(change) - def reEnqueueItem(self, item): - change_queue = self.pipeline.getQueue(item.change.project) + def reEnqueueItem(self, item, last_head): + if last_head.queue: + change_queue = last_head.queue + else: + change_queue = self.getChangeQueue(item.change) if change_queue: self.log.debug("Re-enqueing change %s in queue %s" % (item.change, change_queue)) @@ -1128,7 +1136,8 @@ class BasePipelineManager(object): return False def addChange(self, change, quiet=False, enqueue_time=None, - ignore_requirements=False, change_queue=None): + ignore_requirements=False, live=True, + change_queue=None): self.log.debug("Considering adding change %s" % change) if self.isChangeAlreadyInQueue(change): self.log.debug("Change %s is already in queue, ignoring" % change) @@ -1171,6 +1180,7 @@ class BasePipelineManager(object): item = change_queue.enqueueChange(change) if enqueue_time: item.enqueue_time = enqueue_time + item.live = live self.reportStats(item) self.enqueueChangesBehind(change, quiet, ignore_requirements, change_queue) @@ -1180,8 +1190,7 @@ class BasePipelineManager(object): def dequeueItem(self, item): self.log.debug("Removing change %s from queue" % item.change) - change_queue = self.pipeline.getQueue(item.change.project) - change_queue.dequeueItem(item) + item.queue.dequeueItem(item) def removeChange(self, change): # Remove a change from the queue, probably because it has been @@ -1292,7 +1301,9 @@ class BasePipelineManager(object): def _processOneItem(self, item, nnfi, ready_ahead): changed = False item_ahead = item.item_ahead - change_queue = self.pipeline.getQueue(item.change.project) + if item_ahead and (not item_ahead.live): + item_ahead = None + change_queue = item.queue failing_reasons = [] # Reasons this item is failing if self.checkForChangesNeededBy(item.change) is not True: @@ -1316,8 +1327,7 @@ class BasePipelineManager(object): self.cancelJobs(item, prime=False) else: item_ahead_merged = False - if ((item_ahead and item_ahead.change.is_merged) or - not change_queue.dependent): + if (item_ahead and item_ahead.change.is_merged): item_ahead_merged = True if (item_ahead != nnfi and not item_ahead_merged): # Our current base is different than what we expected, @@ -1340,6 +1350,10 @@ class BasePipelineManager(object): changed = True if self.pipeline.didAnyJobFail(item): failing_reasons.append("at least one job failed") + if (not item.live) and (not item.items_behind): + failing_reasons.append("is a non-live item with no items behind") + self.dequeueItem(item) + changed = True if (not item_ahead) and self.pipeline.areAllJobsComplete(item): try: self.reportItem(item) @@ -1352,7 +1366,7 @@ class BasePipelineManager(object): self.cancelJobs(item_behind) self.dequeueItem(item) changed = True - elif not failing_reasons: + elif not failing_reasons and item.live: nnfi = item item.current_build_set.failing_reasons = failing_reasons if failing_reasons: @@ -1436,7 +1450,7 @@ class BasePipelineManager(object): item.change.branch) self.log.info("Reported change %s status: all-succeeded: %s, " "merged: %s" % (item.change, succeeded, merged)) - change_queue = self.pipeline.getQueue(item.change.project) + change_queue = item.queue if not (succeeded and merged): self.log.debug("Reported change %s failed tests or failed " "to merge" % (item.change)) @@ -1693,14 +1707,71 @@ class IndependentPipelineManager(BasePipelineManager): def _postConfig(self, layout): super(IndependentPipelineManager, self)._postConfig(layout) - change_queue = ChangeQueue(self.pipeline, dependent=False) - for project in self.pipeline.getProjects(): - change_queue.addProject(project) - + def getChangeQueue(self, change): + # creates a new change queue for every change + if change.project not in self.pipeline.getProjects(): + return None + change_queue = ChangeQueue(self.pipeline) + change_queue.addProject(change.project) self.pipeline.addQueue(change_queue) + return change_queue - def getChangeQueue(self, change): - return self.pipeline.getQueue(change.project) + def enqueueChangesAhead(self, change, quiet, ignore_requirements, + change_queue): + ret = self.checkForChangesNeededBy(change) + if ret in [True, False]: + return ret + self.log.debug(" Changes %s must be merged ahead of %s" % + (ret, change)) + for needed_change in ret: + # This differs from the dependent pipeline by enqueuing + # changes ahead as "not live", that is, not intended to + # have jobs run. Also, pipeline requirements are always + # ignored (which is safe because the changes are not + # live). + r = self.addChange(needed_change, quiet=True, + ignore_requirements=True, + live=False, change_queue=change_queue) + if not r: + return False + return True + + def checkForChangesNeededBy(self, change): + self.log.debug("Checking for changes needed by %s:" % change) + # Return true if okay to proceed enqueing this change, + # false if the change should not be enqueued. + if not hasattr(change, 'needs_changes'): + self.log.debug(" Changeish does not support dependencies") + return True + if not change.needs_changes: + self.log.debug(" No changes needed") + return True + changes_needed = [] + for needed_change in change.needs_changes: + self.log.debug(" Change %s needs change %s:" % ( + change, needed_change)) + if needed_change.is_merged: + self.log.debug(" Needed change is merged") + continue + if self.isChangeAlreadyInQueue(needed_change): + self.log.debug(" Needed change is already ahead in the queue") + continue + self.log.debug(" Change %s is needed" % needed_change) + if needed_change not in changes_needed: + changes_needed.append(needed_change) + continue + # This differs from the dependent pipeline check in not + # verifying that the dependent change is mergable. + if changes_needed: + return changes_needed + return True + + def dequeueItem(self, item): + super(IndependentPipelineManager, self).dequeueItem(item) + # An independent pipeline manager dynamically removes empty + # queues + if not item.queue.queue: + self.pipeline.removeQueue(item.queue) class DependentPipelineManager(BasePipelineManager): |