diff options
author | Zuul <zuul@review.opendev.org> | 2023-02-27 21:49:42 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2023-02-27 21:49:42 +0000 |
commit | bb648c8b7d4a996a3f107effa693704dd1b34520 (patch) | |
tree | d65e4b9dc25e09fc44f8a44a3f46bc12f30137ff /zuul | |
parent | 7254a75cd8ec9a34d9eb10e51a4b93de2e4defd8 (diff) | |
parent | 9229e1a774c893d0b80981f22f0e7e5dfc319ef8 (diff) | |
download | zuul-bb648c8b7d4a996a3f107effa693704dd1b34520.tar.gz |
Merge "Match events to pipelines based on topic deps"
Diffstat (limited to 'zuul')
-rw-r--r-- | zuul/manager/__init__.py | 19 | ||||
-rw-r--r-- | zuul/scheduler.py | 19 |
2 files changed, 31 insertions, 7 deletions
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 22543b5cd..f6558d46d 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -243,7 +243,7 @@ class PipelineManager(metaclass=ABCMeta): and self.useDependenciesByTopic(change.project)) if (update_commit_dependencies or update_topic_dependencies): - self.updateCommitDependencies(change, None, event=None) + self.updateCommitDependencies(change, event=None) self._change_cache[change.cache_key] = change resolved_changes.append(change) return resolved_changes @@ -285,11 +285,18 @@ class PipelineManager(metaclass=ABCMeta): return True return False - def isAnyVersionOfChangeInPipeline(self, change): - # Checks any items in the pipeline + def isChangeRelevantToPipeline(self, change): + # Checks if any version of the change or its deps matches any + # item in the pipeline. for change_key in self.pipeline.change_list.getChangeKeys(): if change.cache_stat.key.isSameChange(change_key): return True + if isinstance(change, model.Change): + for dep_change_ref in change.getNeedsChanges( + self.useDependenciesByTopic(change.project)): + dep_change_key = ChangeKey.fromReference(dep_change_ref) + if change.cache_stat.key.isSameChange(dep_change_key): + return True return False def isChangeAlreadyInQueue(self, change, change_queue): @@ -315,7 +322,7 @@ class PipelineManager(metaclass=ABCMeta): to_refresh.add(item.change) for existing_change in to_refresh: - self.updateCommitDependencies(existing_change, None, event) + self.updateCommitDependencies(existing_change, event) def reportEnqueue(self, item): if not self.pipeline.state.disabled: @@ -564,7 +571,7 @@ class PipelineManager(metaclass=ABCMeta): # to date and this is a noop; otherwise, we need to refresh # them anyway. if isinstance(change, model.Change): - self.updateCommitDependencies(change, None, event) + self.updateCommitDependencies(change, event) with self.getChangeQueue(change, event, change_queue) as change_queue: if not change_queue: @@ -857,7 +864,7 @@ class PipelineManager(metaclass=ABCMeta): self.pipeline.tenant.name][other_pipeline.name].put( event, needs_result=False) - def updateCommitDependencies(self, change, change_queue, event): + def updateCommitDependencies(self, change, event): log = get_annotated_logger(self.log, event) must_update_commit_deps = ( diff --git a/zuul/scheduler.py b/zuul/scheduler.py index e646c09b8..cd15a878c 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -2519,9 +2519,26 @@ class Scheduler(threading.Thread): event.span_context = tracing.getSpanContext(span) for pipeline in tenant.layout.pipelines.values(): + # For most kinds of dependencies, it's sufficient to check + # if this change is already in the pipeline, because the + # only way to update a dependency cycle is to update one + # of the changes in it. However, dependencies-by-topic + # can have changes added to the cycle without updating any + # of the existing changes in the cycle. That means in + # order to detect whether a new change is added to an + # existing cycle in the pipeline, we need to know all of + # the dependencies of the new change, and check if *they* + # are in the pipeline. Therefore, go ahead and update our + # dependencies here so they are available for comparison + # against the pipeline contents. This front-loads some + # work that otherwise would happen in the pipeline + # manager, but the result of the work goes into the change + # cache, so it's not wasted; it's just less parallelized. + if isinstance(change, Change): + pipeline.manager.updateCommitDependencies(change, event) if ( pipeline.manager.eventMatches(event, change) - or pipeline.manager.isAnyVersionOfChangeInPipeline(change) + or pipeline.manager.isChangeRelevantToPipeline(change) ): self.pipeline_trigger_events[tenant.name][ pipeline.name |