diff options
author | Zuul <zuul@review.opendev.org> | 2023-02-03 13:56:41 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2023-02-03 13:56:41 +0000 |
commit | 1ae2d929a369f700a23799f695831686b8b8e150 (patch) | |
tree | 1bf048f2be62fdfe6b57f52f30c61b740419bbd6 | |
parent | 263e5801758cd260a9d84a018412d0df44749b12 (diff) | |
parent | 9048706d93aa60ac414a7f5a1f94eda21a670cf9 (diff) | |
download | zuul-1ae2d929a369f700a23799f695831686b8b8e150.tar.gz |
Merge "Cleanup deleted pipelines and event queues"
-rw-r--r-- | tests/base.py | 1 | ||||
-rw-r--r-- | tests/unit/test_sos.py | 73 | ||||
-rw-r--r-- | zuul/scheduler.py | 116 |
3 files changed, 154 insertions, 36 deletions
diff --git a/tests/base.py b/tests/base.py index cdcf63390..8ac32f2d4 100644 --- a/tests/base.py +++ b/tests/base.py @@ -5622,6 +5622,7 @@ class ZuulTestCase(BaseTestCase): time.sleep(0.1) def refreshPipelines(self, sched): + ctx = None for tenant in sched.abide.tenants.values(): with tenant_read_lock(self.zk_client, tenant.name): for pipeline in tenant.layout.pipelines.values(): diff --git a/tests/unit/test_sos.py b/tests/unit/test_sos.py index 37a47c6ae..4f2110f3e 100644 --- a/tests/unit/test_sos.py +++ b/tests/unit/test_sos.py @@ -244,6 +244,79 @@ class TestScaleOutScheduler(ZuulTestCase): self.assertTrue(all(l == new.uuid for l in layout_uuids)) self.waitUntilSettled() + def test_live_reconfiguration_del_pipeline(self): + # Test pipeline deletion while changes are enqueued + + # Create a second scheduler instance + app = self.createScheduler() + app.start() + self.assertEqual(len(self.scheds), 2) + + for _ in iterate_timeout(10, "Wait until priming is complete"): + old = self.scheds.first.sched.tenant_layout_state.get("tenant-one") + if old is not None: + break + + for _ in iterate_timeout( + 10, "Wait for all schedulers to have the same layout state"): + layout_states = [a.sched.local_layout_state.get("tenant-one") + for a in self.scheds.instances] + if all(l == old for l in layout_states): + break + + pipeline_zk_path = app.sched.abide.tenants[ + "tenant-one"].layout.pipelines["check"].state.getPath() + + self.executor_server.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + + # Let the first scheduler enqueue the change into the pipeline that + # will be removed later on. + with app.sched.run_handler_lock: + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled(matcher=[self.scheds.first]) + + # Process item only on second scheduler so the first scheduler has + # an outdated pipeline state. 
+ with self.scheds.first.sched.run_handler_lock: + self.executor_server.release('.*-merge') + self.waitUntilSettled(matcher=[app]) + self.assertEqual(len(self.builds), 2) + + self.commitConfigUpdate( + 'common-config', + 'layouts/live-reconfiguration-del-pipeline.yaml') + # Trigger a reconfiguration on the first scheduler with the outdated + # pipeline state of the pipeline that will be removed. + self.scheds.execute(lambda a: a.sched.reconfigure(a.config), + matcher=[self.scheds.first]) + + new = self.scheds.first.sched.tenant_layout_state.get("tenant-one") + for _ in iterate_timeout( + 10, "Wait for all schedulers to have the same layout state"): + layout_states = [a.sched.local_layout_state.get("tenant-one") + for a in self.scheds.instances] + if all(l == new for l in layout_states): + break + + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + + self.assertEqual(A.data['status'], 'NEW') + self.assertEqual(A.reported, 0) + + self.assertHistory([ + dict(name='project-merge', result='SUCCESS', changes='1,1'), + dict(name='project-test1', result='ABORTED', changes='1,1'), + dict(name='project-test2', result='ABORTED', changes='1,1'), + ], ordered=False) + + tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') + self.assertEqual(len(tenant.layout.pipelines), 0) + stat = self.zk_client.client.exists(pipeline_zk_path) + self.assertIsNone(stat) + def test_change_cache(self): # Test re-using a change from the change cache. 
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 0463409cb..7f61f3fe4 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -1758,7 +1758,12 @@ class Scheduler(threading.Thread): new_pipeline = tenant.layout.pipelines.get(name) if not new_pipeline: with old_pipeline.manager.currentContext(context): - self._reconfigureDeletePipeline(old_pipeline) + try: + self._reconfigureDeletePipeline(old_pipeline) + except Exception: + self.log.exception( + "Failed to cleanup deleted pipeline %s:", + old_pipeline) self.management_events[tenant.name].initialize() self.trigger_events[tenant.name].initialize() @@ -1829,7 +1834,11 @@ class Scheduler(threading.Thread): (tenant,)) for pipeline in tenant.layout.pipelines.values(): with pipeline.manager.currentContext(context): - self._reconfigureDeletePipeline(pipeline) + try: + self._reconfigureDeletePipeline(pipeline) + except Exception: + self.log.exception( + "Failed to cleanup deleted pipeline %s:", pipeline) # Delete the tenant root path for this tenant in ZooKeeper to remove # all tenant specific event queues @@ -1845,45 +1854,80 @@ class Scheduler(threading.Thread): def _reconfigureDeletePipeline(self, pipeline): self.log.info("Removing pipeline %s during reconfiguration" % (pipeline,)) - for shared_queue in pipeline.queues: - builds_to_cancel = [] - requests_to_cancel = [] - for item in shared_queue.queue: - with item.activeContext(pipeline.manager.current_context): - item.item_ahead = None - item.items_behind = [] - self.log.info( - "Removing item %s during reconfiguration" % (item,)) - for build in item.current_build_set.getBuilds(): - builds_to_cancel.append(build) - for request_job, request in \ - item.current_build_set.getNodeRequests(): - requests_to_cancel.append( - ( - item.current_build_set, - request, - item.getJob(request_job), - ) + + ctx = pipeline.manager.current_context + pipeline.state.refresh(ctx) + + builds_to_cancel = [] + 
requests_to_cancel = [] + for item in pipeline.getAllItems(): + with item.activeContext(pipeline.manager.current_context): + item.item_ahead = None + item.items_behind = [] + self.log.info( + "Removing item %s during reconfiguration" % (item,)) + for build in item.current_build_set.getBuilds(): + builds_to_cancel.append(build) + for request_job, request in \ + item.current_build_set.getNodeRequests(): + requests_to_cancel.append( + ( + item.current_build_set, + request, + item.getJob(request_job), ) - try: - self.sql.reportBuildsetEnd( - item.current_build_set, 'dequeue', - final=False, result='DEQUEUED') - except Exception: - self.log.exception( - "Error reporting buildset completion to DB:") + ) + try: + self.sql.reportBuildsetEnd( + item.current_build_set, 'dequeue', + final=False, result='DEQUEUED') + except Exception: + self.log.exception( + "Error reporting buildset completion to DB:") - for build in builds_to_cancel: - self.log.info( - "Canceling build %s during reconfiguration" % (build,)) + for build in builds_to_cancel: + self.log.info( + "Canceling build %s during reconfiguration", build) + try: self.cancelJob(build.build_set, build.job, build=build, force=True) - for build_set, request, request_job in requests_to_cancel: - self.log.info( - "Canceling node request %s during reconfiguration", - request) + except Exception: + self.log.exception( + "Error canceling build %s during reconfiguration", build) + for build_set, request, request_job in requests_to_cancel: + self.log.info( + "Canceling node request %s during reconfiguration", request) + try: self.cancelJob(build_set, request_job, force=True) - shared_queue.delete(pipeline.manager.current_context) + except Exception: + self.log.exception( + "Error canceling node request %s during reconfiguration", + request) + + # Delete the pipeline event root path in ZooKeeper to remove + # all pipeline specific event queues. 
+ try: + self.zk_client.client.delete( + PIPELINE_NAME_ROOT.format( + tenant=pipeline.tenant.name, + pipeline=pipeline.name), + recursive=True) + except Exception: + # In case a pipeline event has been submitted during + # reconfiguration this cleanup will fail. + self.log.exception( + "Error removing event queues for deleted pipeline %s in " + "tenant %s", pipeline.name, pipeline.tenant.name) + + # Delete the pipeline root path in ZooKeeper to remove all pipeline + # state. + try: + self.zk_client.client.delete(pipeline.state.getPath(), + recursive=True) + except Exception: + self.log.exception( + "Error removing state for deleted pipeline %s in tenant %s", + pipeline.name, pipeline.tenant.name) def _doPromoteEvent(self, event): tenant = self.abide.tenants.get(event.tenant_name) |