author     James E. Blair <jim@acmegating.com>   2023-02-09 14:05:49 -0800
committer  James E. Blair <jim@acmegating.com>   2023-02-17 10:12:14 -0800
commit     9229e1a774c893d0b80981f22f0e7e5dfc319ef8 (patch)
tree       0208859212b35104195b38bfe4579c7cc2d35ea8
parent     58dbb7ae1a364ad18856a38b6f2787f13419caf1 (diff)
download   zuul-9229e1a774c893d0b80981f22f0e7e5dfc319ef8.tar.gz
Match events to pipelines based on topic deps
We distribute tenant events to pipelines based on whether the event matches the pipeline (e.g., patchset-created for a check pipeline) or whether the event is related to a change already in the pipeline. The latter condition means that pipelines notice quickly when dependencies are changed and can take appropriate action (such as ejecting changes which no longer have correct dependencies).

For git and commit dependencies, an update to a cycle to add a new change requires an update to at least one existing change (for example, adding a new change to a cycle usually requires at least two Depends-On footers: the new change, as well as one of the changes already in the cycle). This means that updates which add new changes to cycles quickly come to the attention of pipelines.

However, it is possible to add a new change to a topic dependency cycle without updating any existing changes. Merely uploading a new change in the same topic adds it to the cycle. Since that new change does not appear in any existing pipelines, pipelines won't notice the update until their next natural processing cycle, at which point they will refresh the dependencies of any changes they contain, notice the new dependency, and eject the cycle.

To align the behavior of topic dependencies with git and commit dependencies, this change causes the scheduler to refresh the dependencies of the change it is handling during tenant trigger event processing, so that it can then compare that change's dependencies to changes already in pipelines to determine whether the event is potentially relevant.

This moves some work from pipeline processing (which is highly parallel) to tenant processing (which is only somewhat parallel), and could therefore slow tenant event processing somewhat. However, the work is persisted in the change cache, so it will not need to be repeated during pipeline processing.

This is necessary because the tenant trigger event processor operates only on the pipeline change list data; it does not perform a full pipeline refresh, so it does not have access to the current queue items and their changes in order to compare the event change's topic with currently enqueued topics.

There are some alternative ways we could implement this if the additional cost is an issue:

1) At the beginning of tenant trigger event processing, using the change list, restore each of the queue's change items from the change cache and compare topics. For large queues, this could end up generating quite a bit of ZK traffic.

2) Add the change topic to the change reference data structure so that it is stored in the change list. This is an abuse of that structure, which otherwise exists only to store the minimum amount of information needed to uniquely identify a change.

3) Implement a PipelineTopicList similar to a PipelineChangeList for storing pipeline topics and accessing them without a full refresh.

Another alternative would be to accept the delayed event handling of topic dependencies and elect not to "fix" this behavior.

Change-Id: Ia9d691fa45d4a71a1bc78cc7a4bdec206cc025c8
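The relevance test described above can be illustrated with a short, self-contained sketch: an event's change is relevant to a pipeline if any version of the change itself, or of any of its dependencies, is already tracked by that pipeline's change list. The names below (SketchChangeKey, is_change_relevant_to_pipeline, and the example data) are simplified stand-ins for illustration, not Zuul's actual classes or API.

# Minimal sketch of the relevance test; simplified stand-ins, not Zuul code.
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchChangeKey:
    # Zuul's real ChangeKey carries more identifying fields; project and
    # change number are enough to show "same change, any patchset" matching.
    project: str
    number: int

    def is_same_change(self, other):
        return (self.project, self.number) == (other.project, other.number)


def is_change_relevant_to_pipeline(change_key, dep_keys, pipeline_keys):
    # Relevant if any version of the change itself is already in the
    # pipeline's change list...
    if any(change_key.is_same_change(k) for k in pipeline_keys):
        return True
    # ...or if any of the change's dependencies (including dependencies
    # derived from a shared topic) is already in the pipeline's change list.
    return any(dep.is_same_change(k) for dep in dep_keys for k in pipeline_keys)


# Example: a new change C shares a topic with A and B, which are already
# enqueued in gate.  C itself is not in the pipeline, but its topic
# dependencies are, so the event for C is relevant to the gate pipeline.
a = SketchChangeKey("org/project1", 1)
b = SketchChangeKey("org/project2", 2)
c = SketchChangeKey("org/project1", 3)
assert is_change_relevant_to_pipeline(c, dep_keys=[a, b], pipeline_keys=[a, b])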
-rw-r--r--  tests/fixtures/layouts/deps-by-topic.yaml   13
-rw-r--r--  tests/unit/test_circular_dependencies.py    85
-rw-r--r--  zuul/manager/__init__.py                    19
-rw-r--r--  zuul/scheduler.py                           19
4 files changed, 119 insertions, 17 deletions
diff --git a/tests/fixtures/layouts/deps-by-topic.yaml b/tests/fixtures/layouts/deps-by-topic.yaml
index 3824c5c2c..e7e8fc465 100644
--- a/tests/fixtures/layouts/deps-by-topic.yaml
+++ b/tests/fixtures/layouts/deps-by-topic.yaml
@@ -47,24 +47,27 @@
run: playbooks/run.yaml
- job:
- name: test-job
+ name: check-job
+
+- job:
+ name: gate-job
- project:
name: org/project1
queue: integrated
check:
jobs:
- - test-job
+ - check-job
gate:
jobs:
- - test-job
+ - gate-job
- project:
name: org/project2
queue: integrated
check:
jobs:
- - test-job
+ - check-job
gate:
jobs:
- - test-job
+ - gate-job
diff --git a/tests/unit/test_circular_dependencies.py b/tests/unit/test_circular_dependencies.py
index f534b2596..f38e55001 100644
--- a/tests/unit/test_circular_dependencies.py
+++ b/tests/unit/test_circular_dependencies.py
@@ -2267,8 +2267,8 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.assertEqual(B.patchsets[-1]["approvals"][0]["value"], "1")
self.assertHistory([
- dict(name="test-job", result="SUCCESS", changes="2,1 1,1"),
- dict(name="test-job", result="SUCCESS", changes="1,1 2,1"),
+ dict(name="check-job", result="SUCCESS", changes="2,1 1,1"),
+ dict(name="check-job", result="SUCCESS", changes="1,1 2,1"),
], ordered=False)
A.addPatchset()
@@ -2277,10 +2277,10 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.assertHistory([
# Original check run
- dict(name="test-job", result="SUCCESS", changes="2,1 1,1"),
- dict(name="test-job", result="SUCCESS", changes="1,1 2,1"),
+ dict(name="check-job", result="SUCCESS", changes="2,1 1,1"),
+ dict(name="check-job", result="SUCCESS", changes="1,1 2,1"),
# Second check run
- dict(name="test-job", result="SUCCESS", changes="2,1 1,2"),
+ dict(name="check-job", result="SUCCESS", changes="2,1 1,2"),
], ordered=False)
def test_deps_by_topic_multi_tenant(self):
@@ -2378,6 +2378,81 @@ class TestGerritCircularDependencies(ZuulTestCase):
dict(name="project-job", result="SUCCESS", changes="2,1 1,2"),
], ordered=False)
+ @simple_layout('layouts/deps-by-topic.yaml')
+ def test_dependency_refresh_by_topic_check(self):
+ # Test that when two changes are put into a cycle, the
+ # dependencies are refreshed and items already in pipelines
+ # are updated.
+ self.executor_server.hold_jobs_in_build = True
+
+ # This simulates the typical workflow where a developer
+ # uploads changes one at a time.
+ # The first change:
+ A = self.fake_gerrit.addFakeChange('org/project1', "master", "A",
+ topic='test-topic')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ # Now that it has been uploaded, upload the second change
+ # in the same topic.
+ B = self.fake_gerrit.addFakeChange('org/project2', "master", "B",
+ topic='test-topic')
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ # A quirk: at the end of this process, the second change in
+ # Gerrit has a complete run because only at that point is the
+ # topic complete; the first is aborted once the second is
+ # uploaded.
+ self.assertHistory([
+ dict(name="check-job", result="ABORTED", changes="1,1"),
+ dict(name="check-job", result="SUCCESS", changes="1,1 2,1"),
+ ], ordered=False)
+
+ @simple_layout('layouts/deps-by-topic.yaml')
+ def test_dependency_refresh_by_topic_gate(self):
+ # Test that when two changes are put into a cycle, the
+ # dependencies are refreshed and items already in pipelines
+ # are updated.
+ self.executor_server.hold_jobs_in_build = True
+
+ # This simulates a workflow where a developer adds a change to
+ # a cycle already in gate.
+ A = self.fake_gerrit.addFakeChange('org/project1', "master", "A",
+ topic='test-topic')
+ B = self.fake_gerrit.addFakeChange('org/project2', "master", "B",
+ topic='test-topic')
+ A.addApproval("Code-Review", 2)
+ B.addApproval("Code-Review", 2)
+ A.addApproval("Approved", 1)
+ self.fake_gerrit.addEvent(B.addApproval("Approved", 1))
+ self.waitUntilSettled()
+
+ # Add a new change to the cycle.
+ C = self.fake_gerrit.addFakeChange('org/project1', "master", "C",
+ topic='test-topic')
+ self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ # At the end of this process, the gate jobs should be aborted
+ # because the new dependency showed up.
+ self.assertEqual(A.data["status"], "NEW")
+ self.assertEqual(B.data["status"], "NEW")
+ self.assertEqual(C.data["status"], "NEW")
+ self.assertHistory([
+ dict(name="gate-job", result="ABORTED", changes="1,1 2,1"),
+ dict(name="gate-job", result="ABORTED", changes="1,1 2,1"),
+ dict(name="check-job", result="SUCCESS", changes="2,1 1,1 3,1"),
+ ], ordered=False)
+
class TestGithubCircularDependencies(ZuulTestCase):
config_file = "zuul-gerrit-github.conf"
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 832be780a..430fcc320 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -216,7 +216,7 @@ class PipelineManager(metaclass=ABCMeta):
and self.useDependenciesByTopic(change.project))
if (update_commit_dependencies
or update_topic_dependencies):
- self.updateCommitDependencies(change, None, event=None)
+ self.updateCommitDependencies(change, event=None)
self._change_cache[change.cache_key] = change
resolved_changes.append(change)
return resolved_changes
@@ -258,11 +258,18 @@ class PipelineManager(metaclass=ABCMeta):
return True
return False
- def isAnyVersionOfChangeInPipeline(self, change):
- # Checks any items in the pipeline
+ def isChangeRelevantToPipeline(self, change):
+ # Checks if any version of the change or its deps matches any
+ # item in the pipeline.
for change_key in self.pipeline.change_list.getChangeKeys():
if change.cache_stat.key.isSameChange(change_key):
return True
+ if isinstance(change, model.Change):
+ for dep_change_ref in change.getNeedsChanges(
+ self.useDependenciesByTopic(change.project)):
+ dep_change_key = ChangeKey.fromReference(dep_change_ref)
+ if any(dep_change_key.isSameChange(key) for key in self.pipeline.change_list.getChangeKeys()):
+ return True
return False
def isChangeAlreadyInQueue(self, change, change_queue):
@@ -288,7 +295,7 @@ class PipelineManager(metaclass=ABCMeta):
to_refresh.add(item.change)
for existing_change in to_refresh:
- self.updateCommitDependencies(existing_change, None, event)
+ self.updateCommitDependencies(existing_change, event)
def reportEnqueue(self, item):
if not self.pipeline.state.disabled:
@@ -537,7 +544,7 @@ class PipelineManager(metaclass=ABCMeta):
# to date and this is a noop; otherwise, we need to refresh
# them anyway.
if isinstance(change, model.Change):
- self.updateCommitDependencies(change, None, event)
+ self.updateCommitDependencies(change, event)
with self.getChangeQueue(change, event, change_queue) as change_queue:
if not change_queue:
@@ -830,7 +837,7 @@ class PipelineManager(metaclass=ABCMeta):
self.pipeline.tenant.name][other_pipeline.name].put(
event, needs_result=False)
- def updateCommitDependencies(self, change, change_queue, event):
+ def updateCommitDependencies(self, change, event):
log = get_annotated_logger(self.log, event)
must_update_commit_deps = (
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 7f61f3fe4..2b78b4c84 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -2502,9 +2502,26 @@ class Scheduler(threading.Thread):
event.span_context = tracing.getSpanContext(span)
for pipeline in tenant.layout.pipelines.values():
+ # For most kinds of dependencies, it's sufficient to check
+ # if this change is already in the pipeline, because the
+ # only way to update a dependency cycle is to update one
+ # of the changes in it. However, dependencies-by-topic
+ # can have changes added to the cycle without updating any
+ # of the existing changes in the cycle. That means in
+ # order to detect whether a new change is added to an
+ # existing cycle in the pipeline, we need to know all of
+ # the dependencies of the new change, and check if *they*
+ # are in the pipeline. Therefore, go ahead and update our
+ # dependencies here so they are available for comparison
+ # against the pipeline contents. This front-loads some
+ # work that otherwise would happen in the pipeline
+ # manager, but the result of the work goes into the change
+ # cache, so it's not wasted; it's just less parallelized.
+ if isinstance(change, Change):
+ pipeline.manager.updateCommitDependencies(change, event)
if (
pipeline.manager.eventMatches(event, change)
- or pipeline.manager.isAnyVersionOfChangeInPipeline(change)
+ or pipeline.manager.isChangeRelevantToPipeline(change)
):
self.pipeline_trigger_events[tenant.name][
pipeline.name