summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2023-02-27 21:49:42 +0000
committerGerrit Code Review <review@openstack.org>2023-02-27 21:49:42 +0000
commitbb648c8b7d4a996a3f107effa693704dd1b34520 (patch)
treed65e4b9dc25e09fc44f8a44a3f46bc12f30137ff
parent7254a75cd8ec9a34d9eb10e51a4b93de2e4defd8 (diff)
parent9229e1a774c893d0b80981f22f0e7e5dfc319ef8 (diff)
downloadzuul-bb648c8b7d4a996a3f107effa693704dd1b34520.tar.gz
Merge "Match events to pipelines based on topic deps"
-rw-r--r--tests/fixtures/layouts/deps-by-topic.yaml13
-rw-r--r--tests/unit/test_circular_dependencies.py85
-rw-r--r--zuul/manager/__init__.py19
-rw-r--r--zuul/scheduler.py19
4 files changed, 119 insertions, 17 deletions
diff --git a/tests/fixtures/layouts/deps-by-topic.yaml b/tests/fixtures/layouts/deps-by-topic.yaml
index 3824c5c2c..e7e8fc465 100644
--- a/tests/fixtures/layouts/deps-by-topic.yaml
+++ b/tests/fixtures/layouts/deps-by-topic.yaml
@@ -47,24 +47,27 @@
run: playbooks/run.yaml
- job:
- name: test-job
+ name: check-job
+
+- job:
+ name: gate-job
- project:
name: org/project1
queue: integrated
check:
jobs:
- - test-job
+ - check-job
gate:
jobs:
- - test-job
+ - gate-job
- project:
name: org/project2
queue: integrated
check:
jobs:
- - test-job
+ - check-job
gate:
jobs:
- - test-job
+ - gate-job
diff --git a/tests/unit/test_circular_dependencies.py b/tests/unit/test_circular_dependencies.py
index f534b2596..f38e55001 100644
--- a/tests/unit/test_circular_dependencies.py
+++ b/tests/unit/test_circular_dependencies.py
@@ -2267,8 +2267,8 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.assertEqual(B.patchsets[-1]["approvals"][0]["value"], "1")
self.assertHistory([
- dict(name="test-job", result="SUCCESS", changes="2,1 1,1"),
- dict(name="test-job", result="SUCCESS", changes="1,1 2,1"),
+ dict(name="check-job", result="SUCCESS", changes="2,1 1,1"),
+ dict(name="check-job", result="SUCCESS", changes="1,1 2,1"),
], ordered=False)
A.addPatchset()
@@ -2277,10 +2277,10 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.assertHistory([
# Original check run
- dict(name="test-job", result="SUCCESS", changes="2,1 1,1"),
- dict(name="test-job", result="SUCCESS", changes="1,1 2,1"),
+ dict(name="check-job", result="SUCCESS", changes="2,1 1,1"),
+ dict(name="check-job", result="SUCCESS", changes="1,1 2,1"),
# Second check run
- dict(name="test-job", result="SUCCESS", changes="2,1 1,2"),
+ dict(name="check-job", result="SUCCESS", changes="2,1 1,2"),
], ordered=False)
def test_deps_by_topic_multi_tenant(self):
@@ -2378,6 +2378,81 @@ class TestGerritCircularDependencies(ZuulTestCase):
dict(name="project-job", result="SUCCESS", changes="2,1 1,2"),
], ordered=False)
+ @simple_layout('layouts/deps-by-topic.yaml')
+ def test_dependency_refresh_by_topic_check(self):
+ # Test that when two changes are put into a cycle, the
+ # dependencies are refreshed and items already in pipelines
+ # are updated.
+ self.executor_server.hold_jobs_in_build = True
+
+ # This simulates the typical workflow where a developer
+ # uploads changes one at a time.
+ # The first change:
+ A = self.fake_gerrit.addFakeChange('org/project1', "master", "A",
+ topic='test-topic')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ # Now that it has been uploaded, upload the second change
+ # in the same topic.
+ B = self.fake_gerrit.addFakeChange('org/project2', "master", "B",
+ topic='test-topic')
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ # A quirk: at the end of this process, the second change in
+ # Gerrit has a complete run because only at that point is the
+ # topic complete; the first is aborted once the second is
+ # uploaded.
+ self.assertHistory([
+ dict(name="check-job", result="ABORTED", changes="1,1"),
+ dict(name="check-job", result="SUCCESS", changes="1,1 2,1"),
+ ], ordered=False)
+
+ @simple_layout('layouts/deps-by-topic.yaml')
+ def test_dependency_refresh_by_topic_gate(self):
+ # Test that when two changes are put into a cycle, the
+ # dependencies are refreshed and items already in pipelines
+ # are updated.
+ self.executor_server.hold_jobs_in_build = True
+
+ # This simulates a workflow where a developer adds a change to
+ # a cycle already in gate.
+ A = self.fake_gerrit.addFakeChange('org/project1', "master", "A",
+ topic='test-topic')
+ B = self.fake_gerrit.addFakeChange('org/project2', "master", "B",
+ topic='test-topic')
+ A.addApproval("Code-Review", 2)
+ B.addApproval("Code-Review", 2)
+ A.addApproval("Approved", 1)
+ self.fake_gerrit.addEvent(B.addApproval("Approved", 1))
+ self.waitUntilSettled()
+
+ # Add a new change to the cycle.
+ C = self.fake_gerrit.addFakeChange('org/project1', "master", "C",
+ topic='test-topic')
+ self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ # At the end of this process, the gate jobs should be aborted
+ # because the new dpendency showed up.
+ self.assertEqual(A.data["status"], "NEW")
+ self.assertEqual(B.data["status"], "NEW")
+ self.assertEqual(C.data["status"], "NEW")
+ self.assertHistory([
+ dict(name="gate-job", result="ABORTED", changes="1,1 2,1"),
+ dict(name="gate-job", result="ABORTED", changes="1,1 2,1"),
+ dict(name="check-job", result="SUCCESS", changes="2,1 1,1 3,1"),
+ ], ordered=False)
+
class TestGithubCircularDependencies(ZuulTestCase):
config_file = "zuul-gerrit-github.conf"
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 22543b5cd..f6558d46d 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -243,7 +243,7 @@ class PipelineManager(metaclass=ABCMeta):
and self.useDependenciesByTopic(change.project))
if (update_commit_dependencies
or update_topic_dependencies):
- self.updateCommitDependencies(change, None, event=None)
+ self.updateCommitDependencies(change, event=None)
self._change_cache[change.cache_key] = change
resolved_changes.append(change)
return resolved_changes
@@ -285,11 +285,18 @@ class PipelineManager(metaclass=ABCMeta):
return True
return False
- def isAnyVersionOfChangeInPipeline(self, change):
- # Checks any items in the pipeline
+ def isChangeRelevantToPipeline(self, change):
+ # Checks if any version of the change or its deps matches any
+ # item in the pipeline.
for change_key in self.pipeline.change_list.getChangeKeys():
if change.cache_stat.key.isSameChange(change_key):
return True
+ if isinstance(change, model.Change):
+ for dep_change_ref in change.getNeedsChanges(
+ self.useDependenciesByTopic(change.project)):
+ dep_change_key = ChangeKey.fromReference(dep_change_ref)
+ if change.cache_stat.key.isSameChange(dep_change_key):
+ return True
return False
def isChangeAlreadyInQueue(self, change, change_queue):
@@ -315,7 +322,7 @@ class PipelineManager(metaclass=ABCMeta):
to_refresh.add(item.change)
for existing_change in to_refresh:
- self.updateCommitDependencies(existing_change, None, event)
+ self.updateCommitDependencies(existing_change, event)
def reportEnqueue(self, item):
if not self.pipeline.state.disabled:
@@ -564,7 +571,7 @@ class PipelineManager(metaclass=ABCMeta):
# to date and this is a noop; otherwise, we need to refresh
# them anyway.
if isinstance(change, model.Change):
- self.updateCommitDependencies(change, None, event)
+ self.updateCommitDependencies(change, event)
with self.getChangeQueue(change, event, change_queue) as change_queue:
if not change_queue:
@@ -857,7 +864,7 @@ class PipelineManager(metaclass=ABCMeta):
self.pipeline.tenant.name][other_pipeline.name].put(
event, needs_result=False)
- def updateCommitDependencies(self, change, change_queue, event):
+ def updateCommitDependencies(self, change, event):
log = get_annotated_logger(self.log, event)
must_update_commit_deps = (
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index e646c09b8..cd15a878c 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -2519,9 +2519,26 @@ class Scheduler(threading.Thread):
event.span_context = tracing.getSpanContext(span)
for pipeline in tenant.layout.pipelines.values():
+ # For most kinds of dependencies, it's sufficient to check
+ # if this change is already in the pipeline, because the
+ # only way to update a dependency cycle is to update one
+ # of the changes in it. However, dependencies-by-topic
+ # can have changes added to the cycle without updating any
+ # of the existing changes in the cycle. That means in
+ # order to detect whether a new change is added to an
+ # existing cycle in the pipeline, we need to know all of
+ # the dependencies of the new change, and check if *they*
+ # are in the pipeline. Therefore, go ahead and update our
+ # dependencies here so they are available for comparison
+ # against the pipeline contents. This front-loads some
+ # work that otherwise would happen in the pipeline
+ # manager, but the result of the work goes into the change
+ # cache, so it's not wasted; it's just less parallelized.
+ if isinstance(change, Change):
+ pipeline.manager.updateCommitDependencies(change, event)
if (
pipeline.manager.eventMatches(event, change)
- or pipeline.manager.isAnyVersionOfChangeInPipeline(change)
+ or pipeline.manager.isChangeRelevantToPipeline(change)
):
self.pipeline_trigger_events[tenant.name][
pipeline.name