summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2023-02-03 13:56:41 +0000
committerGerrit Code Review <review@openstack.org>2023-02-03 13:56:41 +0000
commit1ae2d929a369f700a23799f695831686b8b8e150 (patch)
tree1bf048f2be62fdfe6b57f52f30c61b740419bbd6
parent263e5801758cd260a9d84a018412d0df44749b12 (diff)
parent9048706d93aa60ac414a7f5a1f94eda21a670cf9 (diff)
downloadzuul-1ae2d929a369f700a23799f695831686b8b8e150.tar.gz
Merge "Cleanup deleted pipelines and event queues"
-rw-r--r--tests/base.py1
-rw-r--r--tests/unit/test_sos.py73
-rw-r--r--zuul/scheduler.py116
3 files changed, 154 insertions, 36 deletions
diff --git a/tests/base.py b/tests/base.py
index cdcf63390..8ac32f2d4 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -5622,6 +5622,7 @@ class ZuulTestCase(BaseTestCase):
time.sleep(0.1)
def refreshPipelines(self, sched):
+ ctx = None
for tenant in sched.abide.tenants.values():
with tenant_read_lock(self.zk_client, tenant.name):
for pipeline in tenant.layout.pipelines.values():
diff --git a/tests/unit/test_sos.py b/tests/unit/test_sos.py
index 37a47c6ae..4f2110f3e 100644
--- a/tests/unit/test_sos.py
+++ b/tests/unit/test_sos.py
@@ -244,6 +244,79 @@ class TestScaleOutScheduler(ZuulTestCase):
self.assertTrue(all(l == new.uuid for l in layout_uuids))
self.waitUntilSettled()
+ def test_live_reconfiguration_del_pipeline(self):
+ # Test pipeline deletion while changes are enqueued
+
+ # Create a second scheduler instance
+ app = self.createScheduler()
+ app.start()
+ self.assertEqual(len(self.scheds), 2)
+
+ for _ in iterate_timeout(10, "Wait until priming is complete"):
+ old = self.scheds.first.sched.tenant_layout_state.get("tenant-one")
+ if old is not None:
+ break
+
+ for _ in iterate_timeout(
+ 10, "Wait for all schedulers to have the same layout state"):
+ layout_states = [a.sched.local_layout_state.get("tenant-one")
+ for a in self.scheds.instances]
+ if all(l == old for l in layout_states):
+ break
+
+ pipeline_zk_path = app.sched.abide.tenants[
+ "tenant-one"].layout.pipelines["check"].state.getPath()
+
+ self.executor_server.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+
+ # Let the first scheduler enqueue the change into the pipeline that
+ # will be removed later on.
+ with app.sched.run_handler_lock:
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled(matcher=[self.scheds.first])
+
+ # Process item only on second scheduler so the first scheduler has
+ # an outdated pipeline state.
+ with self.scheds.first.sched.run_handler_lock:
+ self.executor_server.release('.*-merge')
+ self.waitUntilSettled(matcher=[app])
+ self.assertEqual(len(self.builds), 2)
+
+ self.commitConfigUpdate(
+ 'common-config',
+ 'layouts/live-reconfiguration-del-pipeline.yaml')
+ # Trigger a reconfiguration on the first scheduler with the outdated
+ # pipeline state of the pipeline that will be removed.
+ self.scheds.execute(lambda a: a.sched.reconfigure(a.config),
+ matcher=[self.scheds.first])
+
+ new = self.scheds.first.sched.tenant_layout_state.get("tenant-one")
+ for _ in iterate_timeout(
+ 10, "Wait for all schedulers to have the same layout state"):
+ layout_states = [a.sched.local_layout_state.get("tenant-one")
+ for a in self.scheds.instances]
+ if all(l == new for l in layout_states):
+ break
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(A.reported, 0)
+
+ self.assertHistory([
+ dict(name='project-merge', result='SUCCESS', changes='1,1'),
+ dict(name='project-test1', result='ABORTED', changes='1,1'),
+ dict(name='project-test2', result='ABORTED', changes='1,1'),
+ ], ordered=False)
+
+ tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
+ self.assertEqual(len(tenant.layout.pipelines), 0)
+ stat = self.zk_client.client.exists(pipeline_zk_path)
+ self.assertIsNone(stat)
+
def test_change_cache(self):
# Test re-using a change from the change cache.
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 0463409cb..7f61f3fe4 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1758,7 +1758,12 @@ class Scheduler(threading.Thread):
new_pipeline = tenant.layout.pipelines.get(name)
if not new_pipeline:
with old_pipeline.manager.currentContext(context):
- self._reconfigureDeletePipeline(old_pipeline)
+ try:
+ self._reconfigureDeletePipeline(old_pipeline)
+ except Exception:
+ self.log.exception(
+ "Failed to cleanup deleted pipeline %s:",
+ old_pipeline)
self.management_events[tenant.name].initialize()
self.trigger_events[tenant.name].initialize()
@@ -1829,7 +1834,11 @@ class Scheduler(threading.Thread):
(tenant,))
for pipeline in tenant.layout.pipelines.values():
with pipeline.manager.currentContext(context):
- self._reconfigureDeletePipeline(pipeline)
+ try:
+ self._reconfigureDeletePipeline(pipeline)
+ except Exception:
+ self.log.exception(
+ "Failed to cleanup deleted pipeline %s:", pipeline)
# Delete the tenant root path for this tenant in ZooKeeper to remove
# all tenant specific event queues
@@ -1845,45 +1854,80 @@ class Scheduler(threading.Thread):
def _reconfigureDeletePipeline(self, pipeline):
self.log.info("Removing pipeline %s during reconfiguration" %
(pipeline,))
- for shared_queue in pipeline.queues:
- builds_to_cancel = []
- requests_to_cancel = []
- for item in shared_queue.queue:
- with item.activeContext(pipeline.manager.current_context):
- item.item_ahead = None
- item.items_behind = []
- self.log.info(
- "Removing item %s during reconfiguration" % (item,))
- for build in item.current_build_set.getBuilds():
- builds_to_cancel.append(build)
- for request_job, request in \
- item.current_build_set.getNodeRequests():
- requests_to_cancel.append(
- (
- item.current_build_set,
- request,
- item.getJob(request_job),
- )
+
+ ctx = pipeline.manager.current_context
+ pipeline.state.refresh(ctx)
+
+ builds_to_cancel = []
+ requests_to_cancel = []
+ for item in pipeline.getAllItems():
+ with item.activeContext(pipeline.manager.current_context):
+ item.item_ahead = None
+ item.items_behind = []
+ self.log.info(
+ "Removing item %s during reconfiguration" % (item,))
+ for build in item.current_build_set.getBuilds():
+ builds_to_cancel.append(build)
+ for request_job, request in \
+ item.current_build_set.getNodeRequests():
+ requests_to_cancel.append(
+ (
+ item.current_build_set,
+ request,
+ item.getJob(request_job),
)
- try:
- self.sql.reportBuildsetEnd(
- item.current_build_set, 'dequeue',
- final=False, result='DEQUEUED')
- except Exception:
- self.log.exception(
- "Error reporting buildset completion to DB:")
+ )
+ try:
+ self.sql.reportBuildsetEnd(
+ item.current_build_set, 'dequeue',
+ final=False, result='DEQUEUED')
+ except Exception:
+ self.log.exception(
+ "Error reporting buildset completion to DB:")
- for build in builds_to_cancel:
- self.log.info(
- "Canceling build %s during reconfiguration" % (build,))
+ for build in builds_to_cancel:
+ self.log.info(
+ "Canceling build %s during reconfiguration", build)
+ try:
self.cancelJob(build.build_set, build.job,
build=build, force=True)
- for build_set, request, request_job in requests_to_cancel:
- self.log.info(
- "Canceling node request %s during reconfiguration",
- request)
+ except Exception:
+ self.log.exception(
+ "Error canceling build %s during reconfiguration", build)
+ for build_set, request, request_job in requests_to_cancel:
+ self.log.info(
+ "Canceling node request %s during reconfiguration", request)
+ try:
self.cancelJob(build_set, request_job, force=True)
- shared_queue.delete(pipeline.manager.current_context)
+ except Exception:
+ self.log.exception(
+ "Error canceling node request %s during reconfiguration",
+ request)
+
+ # Delete the pipeline event root path in ZooKeeper to remove
+ # all pipeline specific event queues.
+ try:
+ self.zk_client.client.delete(
+ PIPELINE_NAME_ROOT.format(
+ tenant=pipeline.tenant.name,
+ pipeline=pipeline.name),
+ recursive=True)
+ except Exception:
+ # In case a pipeline event has been submitted during
+ # reconfiguration this cleanup will fail.
+ self.log.exception(
+ "Error removing event queues for deleted pipeline %s in "
+ "tenant %s", pipeline.name, pipeline.tenant.name)
+
+ # Delete the pipeline root path in ZooKeeper to remove all pipeline
+ # state.
+ try:
+ self.zk_client.client.delete(pipeline.state.getPath(),
+ recursive=True)
+ except Exception:
+ self.log.exception(
+ "Error removing state for deleted pipeline %s in tenant %s",
+ pipeline.name, pipeline.tenant.name)
def _doPromoteEvent(self, event):
tenant = self.abide.tenants.get(event.tenant_name)