summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2022-07-29 17:37:57 +0000
committerGerrit Code Review <review@openstack.org>2022-07-29 17:37:57 +0000
commit3c95746185469364dcf8cbecf74c444295247e09 (patch)
tree2376ff1388f9b7334837f59a160ad0905de8bddd
parentb2b36d413e3413a76118b74b2cf003d7b7fe7bad (diff)
parenta61ec1bb5f92e4e3b84741df434e2de32b540fac (diff)
downloadzuul-3c95746185469364dcf8cbecf74c444295247e09.tar.gz
Merge "Clear pipeline change cache at start of refresh"
-rw-r--r--tests/unit/test_sos.py73
-rw-r--r--zuul/manager/__init__.py3
-rw-r--r--zuul/model.py5
3 files changed, 81 insertions, 0 deletions
diff --git a/tests/unit/test_sos.py b/tests/unit/test_sos.py
index 7d95f2548..f371a8064 100644
--- a/tests/unit/test_sos.py
+++ b/tests/unit/test_sos.py
@@ -55,6 +55,79 @@ class TestScaleOutScheduler(ZuulTestCase):
dict(name='project-test2', result='SUCCESS', changes='1,1'),
], ordered=False)
+ def test_pipeline_cache_clear(self):
+ # Test that the pipeline cache on a second scheduler isn't
+ # holding old change objects.
+
+ # Hold jobs in build
+ sched1 = self.scheds.first
+ self.executor_server.hold_jobs_in_build = True
+
+ # We need a pair of changes in order to populate the pipeline
+ # change cache (a single change doesn't activate the cache,
+ # it's for dependencies).
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('Code-Review', 2)
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ B.addApproval('Code-Review', 2)
+ B.addApproval('Approved', 1)
+ B.setDependsOn(A, 1)
+
+ # Fail a job
+ self.executor_server.failJob('project-test1', A)
+
+ # Enqueue into gate with scheduler 1
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.waitUntilSettled()
+
+ # Start scheduler 2
+ sched2 = self.createScheduler()
+ sched2.start()
+ self.assertEqual(len(self.scheds), 2)
+
+ # Pause scheduler 1
+ with sched1.sched.run_handler_lock:
+ # Release jobs
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ # Wait for scheduler 2 to dequeue
+ self.waitUntilSettled(matcher=[sched2])
+ # Unpause scheduler 1
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(B.data['status'], 'NEW')
+
+ # Clear zk change cache
+ self.fake_gerrit._change_cache.prune([], max_age=0)
+
+ # At this point, scheduler 1 should have a bogus change entry
+ # in the pipeline cache because scheduler 2 performed the
+ # dequeue so scheduler 1 never cleaned up its cache.
+
+ self.executor_server.fail_tests.clear()
+ self.executor_server.hold_jobs_in_build = True
+ # Pause scheduler 1
+ with sched1.sched.run_handler_lock:
+ # Enqueue into gate with scheduler 2
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.waitUntilSettled(matcher=[sched2])
+
+ # Pause scheduler 2
+ with sched2.sched.run_handler_lock:
+ # Make sure that scheduler 1 does some pipeline runs which
+ # reconstitute state from ZK. This gives it the
+ # opportunity to use old cache data if we don't clear it.
+
+ # Release job1
+ self.executor_server.release()
+ self.waitUntilSettled(matcher=[sched1])
+ # Release job2
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ # Wait for scheduler 1 to merge change
+ self.waitUntilSettled(matcher=[sched1])
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(B.data['status'], 'MERGED')
+
@simple_layout('layouts/multi-scheduler-status.yaml')
def test_multi_scheduler_status(self):
self.hold_merge_jobs_in_queue = True
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 417de9acd..7cb96e34b 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -235,6 +235,9 @@ class PipelineManager(metaclass=ABCMeta):
resolved_changes.append(change)
return resolved_changes
+ def clearCache(self):
+ self._change_cache.clear()
+
def _maintainCache(self):
active_layout_uuids = set()
referenced_change_keys = set()
diff --git a/zuul/model.py b/zuul/model.py
index f1edf5d93..14f10deb9 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -694,6 +694,11 @@ class PipelineState(zkobject.ZKObject):
return json.dumps(data, sort_keys=True).encode("utf8")
def deserialize(self, raw, context):
+ # We may have old change objects in the pipeline cache, so
+ # make sure they are the same objects we would get from the
+ # source change cache.
+ self.pipeline.manager.clearCache()
+
data = super().deserialize(raw, context)
existing_queues = {
q.getPath(): q for q in self.queues + self.old_queues