summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames E. Blair <jim@acmegating.com>2022-11-02 14:19:56 -0700
committerSimon Westphahl <simon.westphahl@bmw.de>2022-11-21 09:22:25 +0100
commit279d7fb5cd2463f4b3aa98bfde742ac42abe0d0b (patch)
treedaad080b012f1aaf3f5de087b203e2012a3d71af
parentc8aac6a118b84fd28ea454161d33f28184c4bd0b (diff)
downloadzuul-279d7fb5cd2463f4b3aa98bfde742ac42abe0d0b.tar.gz
Fix deduplication exceptions in pipeline processing
If a build is to be deduplicated and has not started yet and has a pending node request, we store a dictionary describing the target deduplicated build in the node_requests dictionary on the buildset. There were a few places where we directly accessed that dictionary and assumed the results would be the node request id. Notably, this could cause an error in pipeline processing (as well as potentially some other edge cases such as reconfiguring). Most of the time we can just ignore deduplicated node requests since the "real" buildset will take care of them. This change enriches the API to help with that. In other places, we add a check for the type. To test this, we enable relative_priority in the config file which is used in the deduplication tests, and we also add an assertion which runs at the end of every test that ensures there were no pipeline exceptions during the test (almost all the existing dedup tests fail this assertion before this change). Change-Id: Ia0c3f000426011b59542d8e56b43767fccc89a22
-rw-r--r--tests/base.py13
-rw-r--r--tests/fixtures/zuul-gerrit-github.conf1
-rw-r--r--zuul/executor/client.py5
-rw-r--r--zuul/manager/__init__.py2
-rw-r--r--zuul/model.py16
-rw-r--r--zuul/scheduler.py21
6 files changed, 43 insertions, 15 deletions
diff --git a/tests/base.py b/tests/base.py
index 239b243a5..3ab5329da 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -3610,10 +3610,11 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
self.log.debug('No running builds to release')
return
- self.log.debug("Releasing build %s (%s)" % (regex, len(builds)))
+ self.log.debug("Releasing build %s %s (%s)" % (
+ regex, change, len(builds)))
for build in builds:
- if (not regex or re.match(regex, build.name) and
- not change or build.change == change):
+ if ((not regex or re.match(regex, build.name)) and
+ (not change or build.change == change)):
self.log.debug("Releasing build %s" %
(build.parameters['zuul']['build']))
build.release()
@@ -5158,6 +5159,11 @@ class ZuulTestCase(BaseTestCase):
self.assertIsNotNone(build.start_time)
self.assertIsNotNone(build.end_time)
+ def assertNoPipelineExceptions(self):
+ for tenant in self.scheds.first.sched.abide.tenants.values():
+ for pipeline in tenant.layout.pipelines.values():
+ self.assertEqual(0, pipeline._exception_count)
+
def assertFinalState(self):
self.log.debug("Assert final state")
# Make sure no jobs are running
@@ -5184,6 +5190,7 @@ class ZuulTestCase(BaseTestCase):
for pipeline in tenant.layout.pipelines.values():
if isinstance(pipeline.manager, ipm):
self.assertEqual(len(pipeline.queues), 0)
+ self.assertNoPipelineExceptions()
def shutdown(self):
self.log.debug("Shutting down after tests")
diff --git a/tests/fixtures/zuul-gerrit-github.conf b/tests/fixtures/zuul-gerrit-github.conf
index 0d03bacc2..8114159f7 100644
--- a/tests/fixtures/zuul-gerrit-github.conf
+++ b/tests/fixtures/zuul-gerrit-github.conf
@@ -5,6 +5,7 @@ server=127.0.0.1
[scheduler]
tenant_config=main.yaml
+relative_priority=true
[merger]
git_dir=/tmp/zuul-test/merger-git
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index 9aa38cbca..41e5e6916 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -118,6 +118,11 @@ class ExecutorClient(object):
# Store the NodeRequest ID in the job arguments, so we can look it up
# on the executor side to lock the nodes.
req_id = build.build_set.getJobNodeRequestID(job.name)
+ if isinstance(req_id, dict):
+ # This should never happen
+ raise Exception(
+ "Attempt to start build with deduplicated node request ID "
+ f"{req_id}")
if req_id:
params["noderequest_id"] = req_id
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 538da2c10..e3c5da1ae 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -1660,7 +1660,7 @@ class PipelineManager(metaclass=ABCMeta):
if (item.live and not dequeued
and self.sched.globals.use_relative_priority):
priority = item.getNodePriority()
- for request_id in item.current_build_set.node_requests.values():
+ for _, request_id in item.current_build_set.getNodeRequests():
node_request = self.sched.nodepool.zk_nodepool.getNodeRequest(
request_id, cached=True)
if not node_request:
diff --git a/zuul/model.py b/zuul/model.py
index c2da4f65c..5ba607028 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -473,6 +473,8 @@ class Pipeline(object):
self.window_decrease_factor = None
self.state = None
self.change_list = None
+ # Only used by the unit tests for assertions
+ self._exception_count = 0
@property
def queues(self):
@@ -4433,8 +4435,18 @@ class BuildSet(zkobject.ZKObject):
with self.activeContext(self.item.pipeline.manager.current_context):
self.node_requests[job_name] = request_id
- def getJobNodeRequestID(self, job_name):
- return self.node_requests.get(job_name)
+ def getJobNodeRequestID(self, job_name, ignore_deduplicate=False):
+ r = self.node_requests.get(job_name)
+ if ignore_deduplicate and isinstance(r, dict):
+ return None
+ return r
+
+ def getNodeRequests(self):
+ # This ignores deduplicated node requests
+ for job_name, request in self.node_requests.items():
+ if isinstance(request, dict):
+ continue
+ yield job_name, request
def removeJobNodeRequestID(self, job_name):
if job_name in self.node_requests:
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 42f8afbb0..b18e9c395 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -688,8 +688,8 @@ class Scheduler(threading.Thread):
# In case we're in the middle of a reconfig,
# include the old queue items.
for item in pipeline.getAllItems(include_old=True):
- nrs = item.current_build_set.node_requests
- for req_id in (nrs.values()):
+ nrs = item.current_build_set.getNodeRequests()
+ for _, req_id in nrs:
outstanding_requests.add(req_id)
leaked_requests = zk_requests - outstanding_requests
for req_id in leaked_requests:
@@ -1632,7 +1632,7 @@ class Scheduler(threading.Thread):
item.removeBuild(build)
builds_to_cancel.append(build)
for request_job, request in \
- item.current_build_set.node_requests.items():
+ item.current_build_set.getNodeRequests():
new_job = item.getJob(request_job)
if not new_job:
requests_to_cancel.append(
@@ -1654,7 +1654,7 @@ class Scheduler(threading.Thread):
for build in item.current_build_set.getBuilds():
builds_to_cancel.append(build)
for request_job, request in \
- item.current_build_set.node_requests.items():
+ item.current_build_set.getNodeRequests():
requests_to_cancel.append(
(
item.current_build_set,
@@ -1776,7 +1776,7 @@ class Scheduler(threading.Thread):
for build in item.current_build_set.getBuilds():
builds_to_cancel.append(build)
for request_job, request in \
- item.current_build_set.node_requests.items():
+ item.current_build_set.getNodeRequests():
requests_to_cancel.append(
(
item.current_build_set,
@@ -2225,6 +2225,7 @@ class Scheduler(threading.Thread):
pass
except Exception:
self.log.exception("Exception in pipeline processing:")
+ pipeline._exception_count += 1
pipeline.state.updateAttributes(
ctx, state=pipeline.STATE_ERROR)
# Continue processing other pipelines+tenants
@@ -2820,7 +2821,8 @@ class Scheduler(threading.Thread):
# In case the build didn't show up on any executor, the node
# request does still exist, so we have to make sure it is
# removed from ZK.
- request_id = build.build_set.getJobNodeRequestID(build.job.name)
+ request_id = build.build_set.getJobNodeRequestID(
+ build.job.name, ignore_deduplicate=True)
if request_id:
self.nodepool.deleteNodeRequest(
request_id, event_id=build.zuul_event_id)
@@ -2921,9 +2923,10 @@ class Scheduler(threading.Thread):
# Cancel node request if needed
req_id = buildset.getJobNodeRequestID(job_name)
if req_id:
- req = self.nodepool.zk_nodepool.getNodeRequest(req_id)
- if req:
- self.nodepool.cancelRequest(req)
+ if not isinstance(req_id, dict):
+ req = self.nodepool.zk_nodepool.getNodeRequest(req_id)
+ if req:
+ self.nodepool.cancelRequest(req)
buildset.removeJobNodeRequestID(job_name)
# Cancel build if needed