summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon Westphahl <simon.westphahl@bmw.de>2021-10-20 14:29:27 +0200
committerSimon Westphahl <simon.westphahl@bmw.de>2021-10-29 12:04:44 +0200
commit4faf5498e41860ccdcfd6531bd0128934c661ad9 (patch)
treef24c21a2885e08139d756aa9c81e694dd2116611
parent4ebedf55e1ab6c48cdfcb795f82fc57a49b28d91 (diff)
downloadzuul-4faf5498e41860ccdcfd6531bd0128934c661ad9.tar.gz
Move re-enqueue to pipeline processing
Moving the re-enqueue step after a reconfiguration to the pipeline processing phase allows us to re-enqueue multiple pipelines in parallel in a multi-scheduler environment. In case of a failure the re-enqueue can also be retried the next time the pipeline is processed. Change-Id: Iad483c37610b51d94ead72573c3540b1dea9ab84
-rw-r--r--tests/unit/test_cross_crd.py2
-rw-r--r--tests/unit/test_gerrit_crd.py1
-rw-r--r--tests/unit/test_gerrit_legacy_crd.py1
-rw-r--r--tests/unit/test_scheduler.py3
-rw-r--r--zuul/model.py72
-rw-r--r--zuul/scheduler.py30
6 files changed, 91 insertions, 18 deletions
diff --git a/tests/unit/test_cross_crd.py b/tests/unit/test_cross_crd.py
index 9e04241f2..462190da1 100644
--- a/tests/unit/test_cross_crd.py
+++ b/tests/unit/test_cross_crd.py
@@ -347,6 +347,7 @@ class TestGerritToGithubCRD(ZuulTestCase):
self.waitUntilSettled()
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
+ self.waitUntilSettled()
# Make sure the items still share a change queue, and the
# first one is not live.
@@ -798,6 +799,7 @@ class TestGithubToGerritCRD(ZuulTestCase):
self.waitUntilSettled()
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
+ self.waitUntilSettled()
# Make sure the items still share a change queue, and the
# first one is not live.
diff --git a/tests/unit/test_gerrit_crd.py b/tests/unit/test_gerrit_crd.py
index ff2c7277c..e42bafef5 100644
--- a/tests/unit/test_gerrit_crd.py
+++ b/tests/unit/test_gerrit_crd.py
@@ -517,6 +517,7 @@ class TestGerritCRD(ZuulTestCase):
self.waitUntilSettled()
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
+ self.waitUntilSettled()
# Make sure the items still share a change queue, and the
# first one is not live.
diff --git a/tests/unit/test_gerrit_legacy_crd.py b/tests/unit/test_gerrit_legacy_crd.py
index f6337dd30..1e7488231 100644
--- a/tests/unit/test_gerrit_legacy_crd.py
+++ b/tests/unit/test_gerrit_legacy_crd.py
@@ -446,6 +446,7 @@ class TestGerritLegacyCRD(ZuulTestCase):
self.waitUntilSettled()
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
+ self.waitUntilSettled()
# Make sure the items still share a change queue, and the
# first one is not live.
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 2016fbb28..0bd925f35 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -5481,6 +5481,7 @@ class TestScheduler(ZuulTestCase):
self.commitConfigUpdate('org/common-config',
'layouts/reconfigure-window2.yaml')
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
+ self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
queue = tenant.layout.pipelines['gate'].queues[0]
# Even though we have configured a smaller window, the value
@@ -5489,6 +5490,7 @@ class TestScheduler(ZuulTestCase):
self.assertTrue(len(self.builds), 4)
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
+ self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
queue = tenant.layout.pipelines['gate'].queues[0]
self.assertEqual(queue.window, 20)
@@ -5539,6 +5541,7 @@ class TestScheduler(ZuulTestCase):
self.assertEqual(len(self.builds), 4)
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
+ self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
queue = tenant.layout.pipelines['gate'].queues[0]
self.assertEqual(queue.window, 1)
diff --git a/zuul/model.py b/zuul/model.py
index b3e530751..bdb1df17c 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -15,6 +15,7 @@
import abc
from collections import OrderedDict, defaultdict, namedtuple, UserDict
+import contextlib
import copy
import json
import hashlib
@@ -532,6 +533,7 @@ class PipelineState(zkobject.ZKObject):
return dict(
state=Pipeline.STATE_NORMAL,
queues=[],
+ old_queues=[],
relative_priority_queues={},
consecutive_failures=0,
disabled=False,
@@ -554,7 +556,11 @@ class PipelineState(zkobject.ZKObject):
try:
state = cls.fromZK(ctx, cls.pipelinePath(pipeline),
pipeline=pipeline)
- reset_state = {**cls.defaultState(), "pipeline": pipeline}
+ reset_state = {
+ **cls.defaultState(),
+ "pipeline": pipeline,
+ "old_queues": state.old_queues,
+ }
state.updateAttributes(ctx, **reset_state)
return state
except NoNodeError:
@@ -569,19 +575,49 @@ class PipelineState(zkobject.ZKObject):
safe_pipeline = urllib.parse.quote_plus(pipeline.name)
return f"/zuul/{safe_tenant}/pipeline/{safe_pipeline}"
+ def setOldQueues(self, context, queues):
+ old_queues = OrderedDict()
+ for queue in queues:
+ queue._set(pipeline=self.pipeline)
+ for item in queue.queue:
+ item._set(pipeline=self.pipeline)
+ old_queues[queue.uuid] = queue
+
+ self.updateAttributes(context, old_queues=list(old_queues.values()))
+
+ def removeOldQueue(self, context, queue):
+ with contextlib.suppress(ValueError):
+ self.old_queues.remove(queue)
+ self._save(context)
+
def serialize(self):
data = {
"state": self.state,
"consecutive_failures": self.consecutive_failures,
"disabled": self.disabled,
"queues": [q.getPath() for q in self.queues],
+ "old_queues": [q.getPath() for q in self.old_queues],
}
return json.dumps(data).encode("utf8")
def deserialize(self, raw, context):
data = super().deserialize(raw, context)
+ existing_queues = {
+ q.getPath(): q for q in self.queues + self.old_queues
+ }
- existing_queues = {q.getPath(): q for q in self.queues}
+ # Restore the old queues first, so that in case an item is
+ # already in one of the new queues the item(s) ahead/behind
+ # pointers are corrected when restoring the new queues.
+ old_queues_by_path = OrderedDict()
+ for queue_path in data["old_queues"]:
+ queue = existing_queues.get(queue_path)
+ if queue:
+ queue.refresh(context)
+ else:
+ queue = ChangeQueue.fromZK(context, queue_path,
+ pipeline=self.pipeline)
+ old_queues_by_path[queue_path] = queue
queues_by_path = OrderedDict()
for queue_path in data["queues"]:
@@ -595,18 +631,25 @@ class PipelineState(zkobject.ZKObject):
data.update({
"queues": list(queues_by_path.values()),
+ "old_queues": list(old_queues_by_path.values()),
})
return data
+ def _getKnownItems(self):
+ items = []
+ for queue in (*self.old_queues, *self.queues):
+ items.extend(queue.queue)
+ return items
+
def cleanup(self, context):
pipeline_path = self.getPath()
try:
all_items = set(context.client.get_children(
f"{pipeline_path}/item"))
except NoNodeError:
- return
+ all_items = set()
- known_items = {item.uuid for item in self.pipeline.getAllItems()}
+ known_items = {i.uuid for i in self._getKnownItems()}
stale_items = all_items - known_items
for item_uuid in stale_items:
self.pipeline.manager.log.debug("Cleaning up stale item %s",
@@ -614,6 +657,21 @@ class PipelineState(zkobject.ZKObject):
context.client.delete(QueueItem.itemPath(pipeline_path, item_uuid),
recursive=True)
+ try:
+ all_queues = set(context.client.get_children(
+ f"{pipeline_path}/queue"))
+ except NoNodeError:
+ all_queues = set()
+
+ known_queues = {q.uuid for q in (*self.old_queues, *self.queues)}
+ stale_queues = all_queues - known_queues
+ for queue_uuid in stale_queues:
+ self.pipeline.manager.log.debug("Cleaning up stale queue %s",
+ queue_uuid)
+ context.client.delete(
+ ChangeQueue.queuePath(pipeline_path, queue_uuid),
+ recursive=True)
+
class ChangeQueue(zkobject.ZKObject):
"""A ChangeQueue contains Changes to be processed for related projects.
@@ -713,7 +771,11 @@ class ChangeQueue(zkobject.ZKObject):
def getPath(self):
pipeline_path = self.pipeline.state.getPath()
- return f"{pipeline_path}/queue/{self.uuid}"
+ return self.queuePath(pipeline_path, self.uuid)
+
+ @classmethod
+ def queuePath(cls, pipeline_path, queue_uuid):
+ return f"{pipeline_path}/queue/{queue_uuid}"
@property
def zk_context(self):
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 4e53a704b..92c5504ee 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1180,8 +1180,9 @@ class Scheduler(threading.Thread):
continue
with new_pipeline.manager.currentContext(context):
- self._reenqueuePipeline(
- tenant, new_pipeline, old_pipeline, context)
+ new_pipeline.state.setOldQueues(
+ context, [*old_pipeline.state.old_queues,
+ *old_pipeline.state.queues])
for name, old_pipeline in old_tenant.layout.pipelines.items():
new_pipeline = tenant.layout.pipelines.get(name)
@@ -1189,7 +1190,7 @@ class Scheduler(threading.Thread):
with old_pipeline.manager.currentContext(context):
self._reconfigureDeletePipeline(old_pipeline)
- def _reenqueuePipeline(self, tenant, new_pipeline, old_pipeline, context):
+ def _reenqueuePipeline(self, tenant, new_pipeline, context):
self.log.debug("Re-enqueueing changes for pipeline %s",
new_pipeline.name)
# TODO(jeblair): This supports an undocument and
@@ -1206,20 +1207,12 @@ class Scheduler(threading.Thread):
static_window = True
else:
static_window = False
- if old_pipeline.window and (not static_window):
- new_pipeline.window = max(old_pipeline.window,
- new_pipeline.window_floor)
+
items_to_remove = []
builds_to_cancel = []
requests_to_cancel = []
- for shared_queue in old_pipeline.queues:
+ for shared_queue in list(new_pipeline.state.old_queues):
last_head = None
- # Attempt to keep window sizes from shrinking where possible
- project, branch = shared_queue.project_branches[0]
- new_queue = new_pipeline.getQueue(project, branch)
- if new_queue and shared_queue.window and (not static_window):
- new_queue.window = max(shared_queue.window,
- new_queue.window_floor)
for item in shared_queue.queue:
# If the old item ahead made it in, re-enqueue
# this one behind it.
@@ -1265,6 +1258,15 @@ class Scheduler(threading.Thread):
(item.current_build_set, request))
else:
items_to_remove.append(item)
+
+ # Attempt to keep window sizes from shrinking where possible
+ project, branch = shared_queue.project_branches[0]
+ new_queue = new_pipeline.getQueue(project, branch)
+ if new_queue and shared_queue.window and (not static_window):
+ new_queue.updateAttributes(
+ context, window=max(shared_queue.window,
+ new_queue.window_floor))
+ new_pipeline.state.removeOldQueue(context, shared_queue)
for item in items_to_remove:
self.log.info(
"Removing item %s during reconfiguration" % (item,))
@@ -1615,6 +1617,8 @@ class Scheduler(threading.Thread):
ctx = self.createZKContext(lock, self.log)
with pipeline.manager.currentContext(ctx):
pipeline.state.refresh(ctx)
+ if pipeline.state.old_queues:
+ self._reenqueuePipeline(tenant, pipeline, ctx)
pipeline.state.cleanup(ctx)
self._process_pipeline(tenant, pipeline)