diff options
author | Tobias Henkel <tobias.henkel@bmw.de> | 2019-05-12 10:58:29 +0200 |
---|---|---|
committer | Tobias Henkel <tobias.henkel@bmw.de> | 2019-05-17 06:09:29 +0200 |
commit | 5bcee8309025f383fe89a00a9cf82e3e957ae49c (patch) | |
tree | 2cad82b618c1ad7977eed17cc180067dcfad463b /zuul | |
parent | 4e5352fef09b73e7cec394968d8116ed4bc0f328 (diff) | |
download | zuul-5bcee8309025f383fe89a00a9cf82e3e957ae49c.tar.gz |
Annotate pipeline processing with event id
The pipeline processing is very complex and it's helpful to be able to
track events through the pipeline processing.
Change-Id: I455cdcf34128a2c067c10425736bf1d195df6fa8
Diffstat (limited to 'zuul')
-rw-r--r-- | zuul/manager/__init__.py | 109 | ||||
-rw-r--r-- | zuul/manager/dependent.py | 88 | ||||
-rw-r--r-- | zuul/manager/independent.py | 36 | ||||
-rw-r--r-- | zuul/manager/supercedent.py | 9 |
4 files changed, 133 insertions, 109 deletions
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 1193e872e..08761da59 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -17,6 +17,7 @@ import urllib from zuul import exceptions from zuul import model from zuul.lib.dependson import find_dependency_headers +from zuul.lib.logutil import get_annotated_logger class DynamicChangeQueueContextManager(object): @@ -107,23 +108,24 @@ class PipelineManager(object): return allow_needs def eventMatches(self, event, change): + log = get_annotated_logger(self.log, event) if event.forced_pipeline: if event.forced_pipeline == self.pipeline.name: - self.log.debug("Event %s for change %s was directly assigned " - "to pipeline %s" % (event, change, self)) + log.debug("Event %s for change %s was directly assigned " + "to pipeline %s" % (event, change, self)) return True else: return False for ef in self.event_filters: match_result = ef.matches(event, change) if match_result: - self.log.debug("Event %s for change %s matched %s " - "in pipeline %s" % (event, change, ef, self)) + log.debug("Event %s for change %s matched %s " + "in pipeline %s" % (event, change, ef, self)) return True else: - self.log.debug("Event %s for change %s does not match %s " - "in pipeline %s because %s" % ( - event, change, ef, self, str(match_result))) + log.debug("Event %s for change %s does not match %s " + "in pipeline %s because %s" % ( + event, change, ef, self, str(match_result))) return False def getNodePriority(self, item): @@ -187,7 +189,7 @@ class PipelineManager(object): change_queue): return True - def checkForChangesNeededBy(self, change, change_queue): + def checkForChangesNeededBy(self, change, change_queue, event): return True def getFailingDependentItems(self, item): @@ -225,7 +227,8 @@ class PipelineManager(object): self.removeItem(item) def reEnqueueItem(self, item, last_head, old_item_ahead, item_ahead_valid): - with self.getChangeQueue(item.change, last_head.queue) as change_queue: + with self.getChangeQueue(item.change, item.event, + last_head.queue) as change_queue: if change_queue: self.log.debug("Re-enqueing change %s in queue %s" % (item.change, change_queue)) @@ -272,55 +275,53 @@ class PipelineManager(object): def addChange(self, change, event, quiet=False, enqueue_time=None, ignore_requirements=False, live=True, change_queue=None, history=None): - self.log.debug("Considering adding change %s" % change) + log = get_annotated_logger(self.log, event) + log.debug("Considering adding change %s" % change) # If we are adding a live change, check if it's a live item # anywhere in the pipeline. Otherwise, we will perform the # duplicate check below on the specific change_queue. if live and self.isChangeAlreadyInPipeline(change): - self.log.debug("Change %s is already in pipeline, " - "ignoring" % change) + log.debug("Change %s is already in pipeline, ignoring" % change) return True if not ignore_requirements: for f in self.ref_filters: if f.connection_name != change.project.connection_name: - self.log.debug("Filter %s skipped for change %s due " - "to mismatched connections" % (f, change)) + log.debug("Filter %s skipped for change %s due " + "to mismatched connections" % (f, change)) continue match_result = f.matches(change) if not match_result: - self.log.debug("Change %s does not match pipeline " - "requirement %s because %s" % ( - change, f, str(match_result))) + log.debug("Change %s does not match pipeline " + "requirement %s because %s" % ( + change, f, str(match_result))) return False if not self.isChangeReadyToBeEnqueued(change): - self.log.debug("Change %s is not ready to be enqueued, ignoring" % - change) + log.debug("Change %s is not ready to be enqueued, ignoring" % + change) return False - with self.getChangeQueue(change, change_queue) as change_queue: + with self.getChangeQueue(change, event, change_queue) as change_queue: if not change_queue: - self.log.debug("Unable to find change queue for " - "change %s in project %s" % - (change, change.project)) + log.debug("Unable to find change queue for " + "change %s in project %s" % + (change, change.project)) return False if not self.enqueueChangesAhead(change, event, quiet, ignore_requirements, change_queue, history=history): - self.log.debug("Failed to enqueue changes " - "ahead of %s" % change) + log.debug("Failed to enqueue changes ahead of %s" % change) return False if self.isChangeAlreadyInQueue(change, change_queue): - self.log.debug("Change %s is already in queue, " - "ignoring" % change) + log.debug("Change %s is already in queue, ignoring" % change) return True - self.log.info("Adding change %s to queue %s in %s" % - (change, change_queue, self.pipeline)) + log.info("Adding change %s to queue %s in %s" % + (change, change_queue, self.pipeline)) item = change_queue.enqueueChange(change, event) if enqueue_time: item.enqueue_time = enqueue_time @@ -348,14 +349,16 @@ class PipelineManager(object): self.dequeueItem(item) self.reportStats(item) - def updateCommitDependencies(self, change, change_queue): + def updateCommitDependencies(self, change, change_queue, event): + log = get_annotated_logger(self.log, event) + # Search for Depends-On headers and find appropriate changes - self.log.debug(" Updating commit dependencies for %s", change) + log.debug(" Updating commit dependencies for %s", change) change.refresh_deps = False dependencies = [] seen = set() for match in find_dependency_headers(change.message): - self.log.debug(" Found Depends-On header: %s", match) + log.debug(" Found Depends-On header: %s", match) if match in seen: continue seen.add(match) @@ -367,10 +370,10 @@ class PipelineManager(object): url.hostname) if not source: continue - self.log.debug(" Found source: %s", source) + log.debug(" Found source: %s", source) dep = source.getChangeByURL(match) if dep and (not dep.is_merged) and dep not in dependencies: - self.log.debug(" Adding dependency: %s", dep) + log.debug(" Adding dependency: %s", dep) dependencies.append(dep) change.commit_needs_changes = dependencies @@ -612,8 +615,9 @@ class PipelineManager(object): return self._loadDynamicLayout(item) def scheduleMerge(self, item, files=None, dirs=None): - self.log.debug("Scheduling merge for item %s (files: %s, dirs: %s)" % - (item, files, dirs)) + log = get_annotated_logger(self.log, item.event) + log.debug("Scheduling merge for item %s (files: %s, dirs: %s)" % + (item, files, dirs)) build_set = item.current_build_set build_set.merge_state = build_set.PENDING if isinstance(item.change, model.Change): @@ -627,7 +631,8 @@ class PipelineManager(object): return False def scheduleFilesChanges(self, item): - self.log.debug("Scheduling fileschanged for item %s", item) + log = get_annotated_logger(self.log, item.event) + log.debug("Scheduling fileschanged for item %s", item) build_set = item.current_build_set build_set.files_state = build_set.PENDING @@ -660,6 +665,7 @@ class PipelineManager(object): return ready def prepareJobs(self, item): + log = get_annotated_logger(self.log, item.event) # This only runs once the item is in the pipeline's action window # Returns True if the item is ready, false otherwise if not item.live: @@ -674,18 +680,18 @@ class PipelineManager(object): if not item.job_graph: try: - self.log.debug("Freezing job graph for %s" % (item,)) + log.debug("Freezing job graph for %s" % (item,)) item.freezeJobGraph() except Exception as e: # TODOv3(jeblair): nicify this exception as it will be reported - self.log.exception("Error freezing job graph for %s" % - (item,)) + log.exception("Error freezing job graph for %s" % (item,)) item.setConfigError("Unable to freeze job graph: %s" % (str(e))) return False return True def _processOneItem(self, item, nnfi): + log = get_annotated_logger(self.log, item.event) changed = False ready = False dequeued = False @@ -696,10 +702,11 @@ class PipelineManager(object): item_ahead = None change_queue = item.queue - if self.checkForChangesNeededBy(item.change, change_queue) is not True: + if self.checkForChangesNeededBy(item.change, change_queue, + item.event) is not True: # It's not okay to enqueue this change, we should remove it. - self.log.info("Dequeuing change %s because " - "it can no longer merge" % item.change) + log.info("Dequeuing change %s because " + "it can no longer merge" % item.change) self.cancelJobs(item) self.dequeueItem(item) item.setDequeuedNeedingChange() @@ -727,9 +734,9 @@ class PipelineManager(object): # Our current base is different than what we expected, # and it's not because our current base merged. Something # ahead must have failed. - self.log.info("Resetting builds for change %s because the " - "item ahead, %s, is not the nearest non-failing " - "item, %s" % (item.change, item_ahead, nnfi)) + log.info("Resetting builds for change %s because the " + "item ahead, %s, is not the nearest non-failing " + "item, %s" % (item.change, item_ahead, nnfi)) change_queue.moveItem(item, nnfi) changed = True self.cancelJobs(item) @@ -764,9 +771,9 @@ class PipelineManager(object): except exceptions.MergeFailure: failing_reasons.append("it did not merge") for item_behind in item.items_behind: - self.log.info("Resetting builds for change %s because the " - "item ahead, %s, failed to merge" % - (item_behind.change, item)) + log.info("Resetting builds for change %s because the " + "item ahead, %s, failed to merge" % + (item_behind.change, item)) self.cancelJobs(item_behind) self.dequeueItem(item) changed = dequeued = True @@ -774,8 +781,8 @@ class PipelineManager(object): nnfi = item item.current_build_set.failing_reasons = failing_reasons if failing_reasons: - self.log.debug("%s is a failing item because %s" % - (item, failing_reasons)) + log.debug("%s is a failing item because %s" % + (item, failing_reasons)) if item.live and not dequeued and self.sched.use_relative_priority: priority = item.getNodePriority() for node_request in item.current_build_set.node_requests.values(): diff --git a/zuul/manager/dependent.py b/zuul/manager/dependent.py index ffb5a9c68..ae65300da 100644 --- a/zuul/manager/dependent.py +++ b/zuul/manager/dependent.py @@ -11,6 +11,7 @@ # under the License. from zuul import model +from zuul.lib.logutil import get_annotated_logger from zuul.manager import PipelineManager, StaticChangeQueueContextManager from zuul.manager import DynamicChangeQueueContextManager @@ -74,7 +75,9 @@ class DependentPipelineManager(PipelineManager): self.log.debug("Added project %s to queue: %s" % (project, change_queue)) - def getChangeQueue(self, change, existing=None): + def getChangeQueue(self, change, event, existing=None): + log = get_annotated_logger(self.log, event) + if existing: return StaticChangeQueueContextManager(existing) queue = self.pipeline.getQueue(change.project) @@ -86,11 +89,11 @@ class DependentPipelineManager(PipelineManager): change_queue = model.ChangeQueue(self.pipeline, dynamic=True) change_queue.addProject(change.project) self.pipeline.addQueue(change_queue) - self.log.debug("Dynamically created queue %s", change_queue) + log.debug("Dynamically created queue %s", change_queue) return DynamicChangeQueueContextManager(change_queue) def getNodePriority(self, item): - with self.getChangeQueue(item.change) as change_queue: + with self.getChangeQueue(item.change, item.event) as change_queue: items = change_queue.queue return items.index(item) @@ -103,9 +106,11 @@ class DependentPipelineManager(PipelineManager): def enqueueChangesBehind(self, change, event, quiet, ignore_requirements, change_queue): - self.log.debug("Checking for changes needing %s:" % change) + log = get_annotated_logger(self.log, event) + + log.debug("Checking for changes needing %s:" % change) if not hasattr(change, 'needed_by_changes'): - self.log.debug(" %s does not support dependencies" % type(change)) + log.debug(" %s does not support dependencies" % type(change)) return # for project in change_queue, project.source get changes, then dedup. @@ -116,7 +121,7 @@ class DependentPipelineManager(PipelineManager): seen = set(change.needed_by_changes) needed_by_changes = change.needed_by_changes[:] for source in sources: - self.log.debug(" Checking source: %s", source) + log.debug(" Checking source: %s", source) for c in source.getChangesDependingOn(change, change_queue.projects, self.pipeline.tenant): @@ -124,25 +129,26 @@ class DependentPipelineManager(PipelineManager): seen.add(c) needed_by_changes.append(c) - self.log.debug(" Following changes: %s", needed_by_changes) + log.debug(" Following changes: %s", needed_by_changes) to_enqueue = [] for other_change in needed_by_changes: - with self.getChangeQueue(other_change) as other_change_queue: + with self.getChangeQueue(other_change, + event) as other_change_queue: if other_change_queue != change_queue: - self.log.debug(" Change %s in project %s can not be " - "enqueued in the target queue %s" % - (other_change, other_change.project, - change_queue)) + log.debug(" Change %s in project %s can not be " + "enqueued in the target queue %s" % + (other_change, other_change.project, + change_queue)) continue source = other_change.project.source if source.canMerge(other_change, self.getSubmitAllowNeeds()): - self.log.debug(" Change %s needs %s and is ready to merge" % - (other_change, change)) + log.debug(" Change %s needs %s and is ready to merge", + other_change, change) to_enqueue.append(other_change) if not to_enqueue: - self.log.debug(" No changes need %s" % change) + log.debug(" No changes need %s" % change) for other_change in to_enqueue: self.addChange(other_change, event, quiet=quiet, @@ -151,9 +157,11 @@ class DependentPipelineManager(PipelineManager): def enqueueChangesAhead(self, change, event, quiet, ignore_requirements, change_queue, history=None): + log = get_annotated_logger(self.log, event) + if history and change in history: # detected dependency cycle - self.log.warn("Dependency cycle detected") + log.warn("Dependency cycle detected") return False if hasattr(change, 'number'): history = history or [] @@ -162,11 +170,10 @@ class DependentPipelineManager(PipelineManager): # Don't enqueue dependencies ahead of a non-change ref. return True - ret = self.checkForChangesNeededBy(change, change_queue) + ret = self.checkForChangesNeededBy(change, change_queue, event) if ret in [True, False]: return ret - self.log.debug(" Changes %s must be merged ahead of %s" % - (ret, change)) + log.debug(" Changes %s must be merged ahead of %s", ret, change) for needed_change in ret: r = self.addChange(needed_change, event, quiet=quiet, ignore_requirements=ignore_requirements, @@ -175,53 +182,54 @@ class DependentPipelineManager(PipelineManager): return False return True - def checkForChangesNeededBy(self, change, change_queue): + def checkForChangesNeededBy(self, change, change_queue, event): + log = get_annotated_logger(self.log, event) + # Return true if okay to proceed enqueing this change, # false if the change should not be enqueued. - self.log.debug("Checking for changes needed by %s:" % change) + log.debug("Checking for changes needed by %s:" % change) if (hasattr(change, 'commit_needs_changes') and (change.refresh_deps or change.commit_needs_changes is None)): - self.updateCommitDependencies(change, change_queue) + self.updateCommitDependencies(change, change_queue, event) if not hasattr(change, 'needs_changes'): - self.log.debug(" %s does not support dependencies" % type(change)) + log.debug(" %s does not support dependencies", type(change)) return True if not change.needs_changes: - self.log.debug(" No changes needed") + log.debug(" No changes needed") return True changes_needed = [] # Ignore supplied change_queue - with self.getChangeQueue(change) as change_queue: + with self.getChangeQueue(change, event) as change_queue: for needed_change in change.needs_changes: - self.log.debug(" Change %s needs change %s:" % ( + log.debug(" Change %s needs change %s:" % ( change, needed_change)) if needed_change.is_merged: - self.log.debug(" Needed change is merged") + log.debug(" Needed change is merged") continue - with self.getChangeQueue(needed_change) as needed_change_queue: + with self.getChangeQueue(needed_change, + event) as needed_change_queue: if needed_change_queue != change_queue: - self.log.debug(" Change %s in project %s does not " - "share a change queue with %s " - "in project %s" % - (needed_change, needed_change.project, - change, change.project)) + log.debug(" Change %s in project %s does not " + "share a change queue with %s " + "in project %s", + needed_change, needed_change.project, + change, change.project) return False if not needed_change.is_current_patchset: - self.log.debug(" Needed change is not the " - "current patchset") + log.debug(" Needed change is not the current patchset") return False if self.isChangeAlreadyInQueue(needed_change, change_queue): - self.log.debug(" Needed change is already ahead " - "in the queue") + log.debug(" Needed change is already ahead in the queue") continue if needed_change.project.source.canMerge( needed_change, self.getSubmitAllowNeeds()): - self.log.debug(" Change %s is needed" % needed_change) + log.debug(" Change %s is needed", needed_change) if needed_change not in changes_needed: changes_needed.append(needed_change) continue # The needed change can't be merged. - self.log.debug(" Change %s is needed but can not be merged" % - needed_change) + log.debug(" Change %s is needed but can not be merged", + needed_change) return False if changes_needed: return changes_needed diff --git a/zuul/manager/independent.py b/zuul/manager/independent.py index dfcbd1393..1ff0f52ca 100644 --- a/zuul/manager/independent.py +++ b/zuul/manager/independent.py @@ -11,6 +11,7 @@ # under the License. from zuul import model +from zuul.lib.logutil import get_annotated_logger from zuul.manager import PipelineManager, DynamicChangeQueueContextManager @@ -22,7 +23,9 @@ class IndependentPipelineManager(PipelineManager): def _postConfig(self, layout): super(IndependentPipelineManager, self)._postConfig(layout) - def getChangeQueue(self, change, existing=None): + def getChangeQueue(self, change, event, existing=None): + log = get_annotated_logger(self.log, event) + # We ignore any shared change queues on the pipeline and # instead create a new change queue for every change. if existing: @@ -30,14 +33,16 @@ class IndependentPipelineManager(PipelineManager): change_queue = model.ChangeQueue(self.pipeline) change_queue.addProject(change.project) self.pipeline.addQueue(change_queue) - self.log.debug("Dynamically created queue %s", change_queue) + log.debug("Dynamically created queue %s", change_queue) return DynamicChangeQueueContextManager(change_queue) def enqueueChangesAhead(self, change, event, quiet, ignore_requirements, change_queue, history=None): + log = get_annotated_logger(self.log, event) + if history and change in history: # detected dependency cycle - self.log.warn("Dependency cycle detected") + log.warn("Dependency cycle detected") return False if hasattr(change, 'number'): history = history or [] @@ -46,11 +51,10 @@ class IndependentPipelineManager(PipelineManager): # Don't enqueue dependencies ahead of a non-change ref. return True - ret = self.checkForChangesNeededBy(change, change_queue) + ret = self.checkForChangesNeededBy(change, change_queue, event) if ret in [True, False]: return ret - self.log.debug(" Changes %s must be merged ahead of %s" % - (ret, change)) + log.debug(" Changes %s must be merged ahead of %s" % (ret, change)) for needed_change in ret: # This differs from the dependent pipeline by enqueuing # changes ahead as "not live", that is, not intended to @@ -65,32 +69,34 @@ class IndependentPipelineManager(PipelineManager): return False return True - def checkForChangesNeededBy(self, change, change_queue): + def checkForChangesNeededBy(self, change, change_queue, event): + log = get_annotated_logger(self.log, event) + if self.pipeline.ignore_dependencies: return True - self.log.debug("Checking for changes needed by %s:" % change) + log.debug("Checking for changes needed by %s:" % change) # Return true if okay to proceed enqueing this change, # false if the change should not be enqueued. if (hasattr(change, 'commit_needs_changes') and (change.refresh_deps or change.commit_needs_changes is None)): - self.updateCommitDependencies(change, None) + self.updateCommitDependencies(change, None, event) if not hasattr(change, 'needs_changes'): - self.log.debug(" %s does not support dependencies" % type(change)) + log.debug(" %s does not support dependencies" % type(change)) return True if not change.needs_changes: - self.log.debug(" No changes needed") + log.debug(" No changes needed") return True changes_needed = [] for needed_change in change.needs_changes: - self.log.debug(" Change %s needs change %s:" % ( + log.debug(" Change %s needs change %s:" % ( change, needed_change)) if needed_change.is_merged: - self.log.debug(" Needed change is merged") + log.debug(" Needed change is merged") continue if self.isChangeAlreadyInQueue(needed_change, change_queue): - self.log.debug(" Needed change is already ahead in the queue") + log.debug(" Needed change is already ahead in the queue") continue - self.log.debug(" Change %s is needed" % needed_change) + log.debug(" Change %s is needed" % needed_change) if needed_change not in changes_needed: changes_needed.append(needed_change) continue diff --git a/zuul/manager/supercedent.py b/zuul/manager/supercedent.py index 73932484b..ec0174e2f 100644 --- a/zuul/manager/supercedent.py +++ b/zuul/manager/supercedent.py @@ -11,6 +11,7 @@ # under the License. from zuul import model +from zuul.lib.logutil import get_annotated_logger from zuul.manager import PipelineManager, DynamicChangeQueueContextManager @@ -19,7 +20,9 @@ class SupercedentPipelineManager(PipelineManager): changes_merge = False - def getChangeQueue(self, change, existing=None): + def getChangeQueue(self, change, event, existing=None): + log = get_annotated_logger(self.log, event) + # creates a new change queue for every project-ref # combination. if existing: @@ -33,7 +36,7 @@ class SupercedentPipelineManager(PipelineManager): hasattr(queue.queue[-1].change, 'branch') and queue.queue[-1].change.branch == change.branch) or queue.queue[-1].change.ref == change.ref)): - self.log.debug("Found existing queue %s", queue) + log.debug("Found existing queue %s", queue) return DynamicChangeQueueContextManager(queue) change_queue = model.ChangeQueue( self.pipeline, @@ -43,7 +46,7 @@ class SupercedentPipelineManager(PipelineManager): window_decrease_type='none') change_queue.addProject(change.project) self.pipeline.addQueue(change_queue) - self.log.debug("Dynamically created queue %s", change_queue) + log.debug("Dynamically created queue %s", change_queue) return DynamicChangeQueueContextManager(change_queue) def _pruneQueues(self): |