diff options
author | Simon Westphahl <simon.westphahl@bmw.de> | 2021-10-20 14:29:27 +0200 |
---|---|---|
committer | Simon Westphahl <simon.westphahl@bmw.de> | 2021-10-29 12:04:44 +0200 |
commit | 4faf5498e41860ccdcfd6531bd0128934c661ad9 (patch) | |
tree | f24c21a2885e08139d756aa9c81e694dd2116611 | |
parent | 4ebedf55e1ab6c48cdfcb795f82fc57a49b28d91 (diff) | |
download | zuul-4faf5498e41860ccdcfd6531bd0128934c661ad9.tar.gz |
Move re-enqueue to pipeline processing
Moving the re-enqueue step after a reconfiguration to the pipeline
processing phase allows us to re-enqueue multiple pipelines in
parallel in a multi-scheduler environment.
In case of a failure the re-enqueue can also be retried the next time
the pipeline is processed.
Change-Id: Iad483c37610b51d94ead72573c3540b1dea9ab84
-rw-r--r-- | tests/unit/test_cross_crd.py | 2 | ||||
-rw-r--r-- | tests/unit/test_gerrit_crd.py | 1 | ||||
-rw-r--r-- | tests/unit/test_gerrit_legacy_crd.py | 1 | ||||
-rw-r--r-- | tests/unit/test_scheduler.py | 3 | ||||
-rw-r--r-- | zuul/model.py | 72 | ||||
-rw-r--r-- | zuul/scheduler.py | 30 |
6 files changed, 91 insertions, 18 deletions
diff --git a/tests/unit/test_cross_crd.py b/tests/unit/test_cross_crd.py index 9e04241f2..462190da1 100644 --- a/tests/unit/test_cross_crd.py +++ b/tests/unit/test_cross_crd.py @@ -347,6 +347,7 @@ class TestGerritToGithubCRD(ZuulTestCase): self.waitUntilSettled() self.scheds.execute(lambda app: app.sched.reconfigure(app.config)) + self.waitUntilSettled() # Make sure the items still share a change queue, and the # first one is not live. @@ -798,6 +799,7 @@ class TestGithubToGerritCRD(ZuulTestCase): self.waitUntilSettled() self.scheds.execute(lambda app: app.sched.reconfigure(app.config)) + self.waitUntilSettled() # Make sure the items still share a change queue, and the # first one is not live. diff --git a/tests/unit/test_gerrit_crd.py b/tests/unit/test_gerrit_crd.py index ff2c7277c..e42bafef5 100644 --- a/tests/unit/test_gerrit_crd.py +++ b/tests/unit/test_gerrit_crd.py @@ -517,6 +517,7 @@ class TestGerritCRD(ZuulTestCase): self.waitUntilSettled() self.scheds.execute(lambda app: app.sched.reconfigure(app.config)) + self.waitUntilSettled() # Make sure the items still share a change queue, and the # first one is not live. diff --git a/tests/unit/test_gerrit_legacy_crd.py b/tests/unit/test_gerrit_legacy_crd.py index f6337dd30..1e7488231 100644 --- a/tests/unit/test_gerrit_legacy_crd.py +++ b/tests/unit/test_gerrit_legacy_crd.py @@ -446,6 +446,7 @@ class TestGerritLegacyCRD(ZuulTestCase): self.waitUntilSettled() self.scheds.execute(lambda app: app.sched.reconfigure(app.config)) + self.waitUntilSettled() # Make sure the items still share a change queue, and the # first one is not live. diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 2016fbb28..0bd925f35 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -5481,6 +5481,7 @@ class TestScheduler(ZuulTestCase): self.commitConfigUpdate('org/common-config', 'layouts/reconfigure-window2.yaml') self.scheds.execute(lambda app: app.sched.reconfigure(app.config)) + self.waitUntilSettled() tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') queue = tenant.layout.pipelines['gate'].queues[0] # Even though we have configured a smaller window, the value @@ -5489,6 +5490,7 @@ class TestScheduler(ZuulTestCase): self.assertTrue(len(self.builds), 4) self.scheds.execute(lambda app: app.sched.reconfigure(app.config)) + self.waitUntilSettled() tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') queue = tenant.layout.pipelines['gate'].queues[0] self.assertEqual(queue.window, 20) @@ -5539,6 +5541,7 @@ class TestScheduler(ZuulTestCase): self.assertEqual(len(self.builds), 4) self.scheds.execute(lambda app: app.sched.reconfigure(app.config)) + self.waitUntilSettled() tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') queue = tenant.layout.pipelines['gate'].queues[0] self.assertEqual(queue.window, 1) diff --git a/zuul/model.py b/zuul/model.py index b3e530751..bdb1df17c 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -15,6 +15,7 @@ import abc from collections import OrderedDict, defaultdict, namedtuple, UserDict +import contextlib import copy import json import hashlib @@ -532,6 +533,7 @@ class PipelineState(zkobject.ZKObject): return dict( state=Pipeline.STATE_NORMAL, queues=[], + old_queues=[], relative_priority_queues={}, consecutive_failures=0, disabled=False, @@ -554,7 +556,11 @@ class PipelineState(zkobject.ZKObject): try: state = cls.fromZK(ctx, cls.pipelinePath(pipeline), pipeline=pipeline) - reset_state = {**cls.defaultState(), "pipeline": pipeline} + reset_state = { + **cls.defaultState(), + "pipeline": pipeline, + "old_queues": state.old_queues, + } state.updateAttributes(ctx, **reset_state) return state except NoNodeError: @@ -569,19 +575,49 @@ class PipelineState(zkobject.ZKObject): safe_pipeline = urllib.parse.quote_plus(pipeline.name) return f"/zuul/{safe_tenant}/pipeline/{safe_pipeline}" + def setOldQueues(self, context, queues): + old_queues = OrderedDict() + for queue in queues: + queue._set(pipeline=self.pipeline) + for item in queue.queue: + item._set(pipeline=self.pipeline) + old_queues[queue.uuid] = queue + + self.updateAttributes(context, old_queues=list(old_queues.values())) + + def removeOldQueue(self, context, queue): + with contextlib.suppress(ValueError): + self.old_queues.remove(queue) + self._save(context) + def serialize(self): data = { "state": self.state, "consecutive_failures": self.consecutive_failures, "disabled": self.disabled, "queues": [q.getPath() for q in self.queues], + "old_queues": [q.getPath() for q in self.old_queues], } return json.dumps(data).encode("utf8") def deserialize(self, raw, context): data = super().deserialize(raw, context) + existing_queues = { + q.getPath(): q for q in self.queues + self.old_queues + } - existing_queues = {q.getPath(): q for q in self.queues} + # Restore the old queues first, so that in case an item is + # already in one of the new queues the item(s) ahead/behind + # pointers are corrected when restoring the new queues. + old_queues_by_path = OrderedDict() + for queue_path in data["old_queues"]: + queue = existing_queues.get(queue_path) + if queue: + queue.refresh(context) + else: + queue = ChangeQueue.fromZK(context, queue_path, + pipeline=self.pipeline) + old_queues_by_path[queue_path] = queue queues_by_path = OrderedDict() for queue_path in data["queues"]: @@ -595,18 +631,25 @@ class PipelineState(zkobject.ZKObject): data.update({ "queues": list(queues_by_path.values()), + "old_queues": list(old_queues_by_path.values()), }) return data + def _getKnownItems(self): + items = [] + for queue in (*self.old_queues, *self.queues): + items.extend(queue.queue) + return items + def cleanup(self, context): pipeline_path = self.getPath() try: all_items = set(context.client.get_children( f"{pipeline_path}/item")) except NoNodeError: - return + all_items = set() - known_items = {item.uuid for item in self.pipeline.getAllItems()} + known_items = {i.uuid for i in self._getKnownItems()} stale_items = all_items - known_items for item_uuid in stale_items: self.pipeline.manager.log.debug("Cleaning up stale item %s", @@ -614,6 +657,21 @@ class PipelineState(zkobject.ZKObject): context.client.delete(QueueItem.itemPath(pipeline_path, item_uuid), recursive=True) + try: + all_queues = set(context.client.get_children( + f"{pipeline_path}/queue")) + except NoNodeError: + all_queues = set() + + known_queues = {q.uuid for q in (*self.old_queues, *self.queues)} + stale_queues = all_queues - known_queues + for queue_uuid in stale_queues: + self.pipeline.manager.log.debug("Cleaning up stale queue %s", + queue_uuid) + context.client.delete( + ChangeQueue.queuePath(pipeline_path, queue_uuid), + recursive=True) + class ChangeQueue(zkobject.ZKObject): """A ChangeQueue contains Changes to be processed for related projects. @@ -713,7 +771,11 @@ class ChangeQueue(zkobject.ZKObject): def getPath(self): pipeline_path = self.pipeline.state.getPath() - return f"{pipeline_path}/queue/{self.uuid}" + return self.queuePath(pipeline_path, self.uuid) + + @classmethod + def queuePath(cls, pipeline_path, queue_uuid): + return f"{pipeline_path}/queue/{queue_uuid}" @property def zk_context(self): diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 4e53a704b..92c5504ee 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -1180,8 +1180,9 @@ class Scheduler(threading.Thread): continue with new_pipeline.manager.currentContext(context): - self._reenqueuePipeline( - tenant, new_pipeline, old_pipeline, context) + new_pipeline.state.setOldQueues( + context, [*old_pipeline.state.old_queues, + *old_pipeline.state.queues]) for name, old_pipeline in old_tenant.layout.pipelines.items(): new_pipeline = tenant.layout.pipelines.get(name) @@ -1189,7 +1190,7 @@ class Scheduler(threading.Thread): with old_pipeline.manager.currentContext(context): self._reconfigureDeletePipeline(old_pipeline) - def _reenqueuePipeline(self, tenant, new_pipeline, old_pipeline, context): + def _reenqueuePipeline(self, tenant, new_pipeline, context): self.log.debug("Re-enqueueing changes for pipeline %s", new_pipeline.name) # TODO(jeblair): This supports an undocument and @@ -1206,20 +1207,12 @@ class Scheduler(threading.Thread): static_window = True else: static_window = False - if old_pipeline.window and (not static_window): - new_pipeline.window = max(old_pipeline.window, - new_pipeline.window_floor) + items_to_remove = [] builds_to_cancel = [] requests_to_cancel = [] - for shared_queue in old_pipeline.queues: + for shared_queue in list(new_pipeline.state.old_queues): last_head = None - # Attempt to keep window sizes from shrinking where possible - project, branch = shared_queue.project_branches[0] - new_queue = new_pipeline.getQueue(project, branch) - if new_queue and shared_queue.window and (not static_window): - new_queue.window = max(shared_queue.window, - new_queue.window_floor) for item in shared_queue.queue: # If the old item ahead made it in, re-enqueue # this one behind it. @@ -1265,6 +1258,15 @@ class Scheduler(threading.Thread): (item.current_build_set, request)) else: items_to_remove.append(item) + + # Attempt to keep window sizes from shrinking where possible + project, branch = shared_queue.project_branches[0] + new_queue = new_pipeline.getQueue(project, branch) + if new_queue and shared_queue.window and (not static_window): + new_queue.updateAttributes( + context, window=max(shared_queue.window, + new_queue.window_floor)) + new_pipeline.state.removeOldQueue(context, shared_queue) for item in items_to_remove: self.log.info( "Removing item %s during reconfiguration" % (item,)) @@ -1615,6 +1617,8 @@ class Scheduler(threading.Thread): ctx = self.createZKContext(lock, self.log) with pipeline.manager.currentContext(ctx): pipeline.state.refresh(ctx) + if pipeline.state.old_queues: + self._reenqueuePipeline(tenant, pipeline, ctx) pipeline.state.cleanup(ctx) self._process_pipeline(tenant, pipeline) |