diff options
author | James E. Blair <jim@acmegating.com> | 2022-11-02 14:19:56 -0700 |
---|---|---|
committer | Simon Westphahl <simon.westphahl@bmw.de> | 2022-11-21 09:22:25 +0100 |
commit | 279d7fb5cd2463f4b3aa98bfde742ac42abe0d0b (patch) | |
tree | daad080b012f1aaf3f5de087b203e2012a3d71af | |
parent | c8aac6a118b84fd28ea454161d33f28184c4bd0b (diff) | |
download | zuul-279d7fb5cd2463f4b3aa98bfde742ac42abe0d0b.tar.gz |
Fix deduplication exceptions in pipeline processing
If a build is to be deduplicated and has not started yet and has
a pending node request, we store a dictionary describing the target
deduplicated build in the node_requests dictionary on the buildset.
There were a few places where we directly accessed that dictionary
and assumed the results would be the node request id. Notably, this
could cause an error in pipeline processing (as well as potentially
some other edge cases such as reconfiguring).
Most of the time we can just ignore deduplicated node requests since
the "real" buildset will take care of them. This change enriches
the API to help with that. In other places, we add a check for the
type.
To test this, we enable relative_priority in the config file which
is used in the deduplication tests, and we also add an assertion
which runs at the end of every test that ensures there were no
pipeline exceptions during the test (almost all the existing dedup
tests fail this assertion before this change).
Change-Id: Ia0c3f000426011b59542d8e56b43767fccc89a22
-rw-r--r-- | tests/base.py | 13 | ||||
-rw-r--r-- | tests/fixtures/zuul-gerrit-github.conf | 1 | ||||
-rw-r--r-- | zuul/executor/client.py | 5 | ||||
-rw-r--r-- | zuul/manager/__init__.py | 2 | ||||
-rw-r--r-- | zuul/model.py | 16 | ||||
-rw-r--r-- | zuul/scheduler.py | 21 |
6 files changed, 43 insertions, 15 deletions
diff --git a/tests/base.py b/tests/base.py index 239b243a5..3ab5329da 100644 --- a/tests/base.py +++ b/tests/base.py @@ -3610,10 +3610,11 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer): self.log.debug('No running builds to release') return - self.log.debug("Releasing build %s (%s)" % (regex, len(builds))) + self.log.debug("Releasing build %s %s (%s)" % ( + regex, change, len(builds))) for build in builds: - if (not regex or re.match(regex, build.name) and - not change or build.change == change): + if ((not regex or re.match(regex, build.name)) and + (not change or build.change == change)): self.log.debug("Releasing build %s" % (build.parameters['zuul']['build'])) build.release() @@ -5158,6 +5159,11 @@ class ZuulTestCase(BaseTestCase): self.assertIsNotNone(build.start_time) self.assertIsNotNone(build.end_time) + def assertNoPipelineExceptions(self): + for tenant in self.scheds.first.sched.abide.tenants.values(): + for pipeline in tenant.layout.pipelines.values(): + self.assertEqual(0, pipeline._exception_count) + def assertFinalState(self): self.log.debug("Assert final state") # Make sure no jobs are running @@ -5184,6 +5190,7 @@ class ZuulTestCase(BaseTestCase): for pipeline in tenant.layout.pipelines.values(): if isinstance(pipeline.manager, ipm): self.assertEqual(len(pipeline.queues), 0) + self.assertNoPipelineExceptions() def shutdown(self): self.log.debug("Shutting down after tests") diff --git a/tests/fixtures/zuul-gerrit-github.conf b/tests/fixtures/zuul-gerrit-github.conf index 0d03bacc2..8114159f7 100644 --- a/tests/fixtures/zuul-gerrit-github.conf +++ b/tests/fixtures/zuul-gerrit-github.conf @@ -5,6 +5,7 @@ server=127.0.0.1 [scheduler] tenant_config=main.yaml +relative_priority=true [merger] git_dir=/tmp/zuul-test/merger-git diff --git a/zuul/executor/client.py b/zuul/executor/client.py index 9aa38cbca..41e5e6916 100644 --- a/zuul/executor/client.py +++ b/zuul/executor/client.py @@ -118,6 +118,11 @@ class ExecutorClient(object): # 
Store the NodeRequest ID in the job arguments, so we can look it up # on the executor side to lock the nodes. req_id = build.build_set.getJobNodeRequestID(job.name) + if isinstance(req_id, dict): + # This should never happen + raise Exception( + "Attempt to start build with deduplicated node request ID " + f"{req_id}") if req_id: params["noderequest_id"] = req_id diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 538da2c10..e3c5da1ae 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -1660,7 +1660,7 @@ class PipelineManager(metaclass=ABCMeta): if (item.live and not dequeued and self.sched.globals.use_relative_priority): priority = item.getNodePriority() - for request_id in item.current_build_set.node_requests.values(): + for _, request_id in item.current_build_set.getNodeRequests(): node_request = self.sched.nodepool.zk_nodepool.getNodeRequest( request_id, cached=True) if not node_request: diff --git a/zuul/model.py b/zuul/model.py index c2da4f65c..5ba607028 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -473,6 +473,8 @@ class Pipeline(object): self.window_decrease_factor = None self.state = None self.change_list = None + # Only used by the unit tests for assertions + self._exception_count = 0 @property def queues(self): @@ -4433,8 +4435,18 @@ class BuildSet(zkobject.ZKObject): with self.activeContext(self.item.pipeline.manager.current_context): self.node_requests[job_name] = request_id - def getJobNodeRequestID(self, job_name): - return self.node_requests.get(job_name) + def getJobNodeRequestID(self, job_name, ignore_deduplicate=False): + r = self.node_requests.get(job_name) + if ignore_deduplicate and isinstance(r, dict): + return None + return r + + def getNodeRequests(self): + # This ignores deduplicated node requests + for job_name, request in self.node_requests.items(): + if isinstance(request, dict): + continue + yield job_name, request def removeJobNodeRequestID(self, job_name): if job_name in self.node_requests: 
diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 42f8afbb0..b18e9c395 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -688,8 +688,8 @@ class Scheduler(threading.Thread): # In case we're in the middle of a reconfig, # include the old queue items. for item in pipeline.getAllItems(include_old=True): - nrs = item.current_build_set.node_requests - for req_id in (nrs.values()): + nrs = item.current_build_set.getNodeRequests() + for _, req_id in nrs: outstanding_requests.add(req_id) leaked_requests = zk_requests - outstanding_requests for req_id in leaked_requests: @@ -1632,7 +1632,7 @@ class Scheduler(threading.Thread): item.removeBuild(build) builds_to_cancel.append(build) for request_job, request in \ - item.current_build_set.node_requests.items(): + item.current_build_set.getNodeRequests(): new_job = item.getJob(request_job) if not new_job: requests_to_cancel.append( @@ -1654,7 +1654,7 @@ class Scheduler(threading.Thread): for build in item.current_build_set.getBuilds(): builds_to_cancel.append(build) for request_job, request in \ - item.current_build_set.node_requests.items(): + item.current_build_set.getNodeRequests(): requests_to_cancel.append( ( item.current_build_set, @@ -1776,7 +1776,7 @@ class Scheduler(threading.Thread): for build in item.current_build_set.getBuilds(): builds_to_cancel.append(build) for request_job, request in \ - item.current_build_set.node_requests.items(): + item.current_build_set.getNodeRequests(): requests_to_cancel.append( ( item.current_build_set, @@ -2225,6 +2225,7 @@ class Scheduler(threading.Thread): pass except Exception: self.log.exception("Exception in pipeline processing:") + pipeline._exception_count += 1 pipeline.state.updateAttributes( ctx, state=pipeline.STATE_ERROR) # Continue processing other pipelines+tenants @@ -2820,7 +2821,8 @@ class Scheduler(threading.Thread): # In case the build didn't show up on any executor, the node # request does still exist, so we have to make sure it is # removed from ZK. 
- request_id = build.build_set.getJobNodeRequestID(build.job.name) + request_id = build.build_set.getJobNodeRequestID( + build.job.name, ignore_deduplicate=True) if request_id: self.nodepool.deleteNodeRequest( request_id, event_id=build.zuul_event_id) @@ -2921,9 +2923,10 @@ class Scheduler(threading.Thread): # Cancel node request if needed req_id = buildset.getJobNodeRequestID(job_name) if req_id: - req = self.nodepool.zk_nodepool.getNodeRequest(req_id) - if req: - self.nodepool.cancelRequest(req) + if not isinstance(req_id, dict): + req = self.nodepool.zk_nodepool.getNodeRequest(req_id) + if req: + self.nodepool.cancelRequest(req) buildset.removeJobNodeRequestID(job_name) # Cancel build if needed |