diff options
author | Zuul <zuul@review.opendev.org> | 2022-09-16 16:52:44 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2022-09-16 16:52:44 +0000 |
commit | e612a442e37dc0fff00fe6bb2a219fa42c005c2a (patch) | |
tree | 110da0aeebfccd6fe9c3eb11fca0c7087fcb74f9 /zuul | |
parent | 9a83b7da5964643d6c38328f4d896b9b9b4de447 (diff) | |
parent | 1958bbad03d4c2eb59502df8682bdfcb248400c2 (diff) | |
download | zuul-e612a442e37dc0fff00fe6bb2a219fa42c005c2a.tar.gz |
Merge "Add nodeset alternatives"
Diffstat (limited to 'zuul')
-rw-r--r-- | zuul/configloader.py | 46 | ||||
-rw-r--r-- | zuul/manager/__init__.py | 72 | ||||
-rw-r--r-- | zuul/model.py | 132 | ||||
-rw-r--r-- | zuul/model_api.py | 2 |
4 files changed, 200 insertions, 52 deletions
diff --git a/zuul/configloader.py b/zuul/configloader.py index 0327f1796..037fc48aa 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -488,14 +488,26 @@ class NodeSetParser(object): vs.Required('nodes'): to_list(str), } - nodeset = {vs.Required('nodes'): to_list(node), - 'groups': to_list(group), - '_source_context': model.SourceContext, - '_start_mark': model.ZuulMark, - } + real_nodeset = {vs.Required('nodes'): to_list(node), + 'groups': to_list(group), + } + + alt_nodeset = {vs.Required('alternatives'): + [vs.Any(real_nodeset, str)]} + top_nodeset = {'_source_context': model.SourceContext, + '_start_mark': model.ZuulMark, + } if not anonymous: - nodeset[vs.Required('name')] = str + top_nodeset[vs.Required('name')] = str + + top_real_nodeset = real_nodeset.copy() + top_real_nodeset.update(top_nodeset) + top_alt_nodeset = alt_nodeset.copy() + top_alt_nodeset.update(top_nodeset) + + nodeset = vs.Any(top_real_nodeset, top_alt_nodeset) + return vs.Schema(nodeset) def fromYaml(self, conf, anonymous=False): @@ -503,6 +515,24 @@ class NodeSetParser(object): self.anon_schema(conf) else: self.schema(conf) + + if 'alternatives' in conf: + return self.loadAlternatives(conf) + else: + return self.loadNodeset(conf) + + def loadAlternatives(self, conf): + ns = model.NodeSet(conf.get('name')) + ns.source_context = conf.get('_source_context') + ns.start_mark = conf.get('_start_mark') + for alt in conf['alternatives']: + if isinstance(alt, str): + ns.addAlternative(alt) + else: + ns.addAlternative(self.loadNodeset(alt)) + return ns + + def loadNodeset(self, conf): ns = model.NodeSet(conf.get('name')) ns.source_context = conf.get('_source_context') ns.start_mark = conf.get('_start_mark') @@ -2388,6 +2418,10 @@ class TenantParser(object): # Now that all the jobs are loaded, verify references to other # config objects. + for nodeset in layout.nodesets.values(): + with reference_exceptions('nodeset', nodeset, + layout.loading_errors): + nodeset.validateReferences(layout) for jobs in layout.jobs.values(): for job in jobs: with reference_exceptions('job', job, layout.loading_errors): diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index dcaea5283..365435f3d 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -862,23 +862,28 @@ class PipelineManager(metaclass=ABCMeta): else: relative_priority = 0 for job in jobs: - provider = self._getPausedParentProvider(build_set, job) - priority = self._calculateNodeRequestPriority(build_set, job) - tenant_name = build_set.item.pipeline.tenant.name - pipeline_name = build_set.item.pipeline.name - req = self.sched.nodepool.requestNodes( - build_set.uuid, job, tenant_name, pipeline_name, provider, - priority, relative_priority, event=item.event) - log.debug("Adding node request %s for job %s to item %s", - req, job, item) - build_set.setJobNodeRequestID(job.name, req.id) - if req.fulfilled: - nodeset = self.sched.nodepool.getNodeSet(req, job.nodeset) - build_set.jobNodeRequestComplete(req.job_name, nodeset) - else: - job.setWaitingStatus(f'node request: {req.id}') + self._makeNodepoolRequest(log, build_set, job, relative_priority) return True + def _makeNodepoolRequest(self, log, build_set, job, relative_priority, + alternative=0): + provider = self._getPausedParentProvider(build_set, job) + priority = self._calculateNodeRequestPriority(build_set, job) + tenant_name = build_set.item.pipeline.tenant.name + pipeline_name = build_set.item.pipeline.name + item = build_set.item + req = self.sched.nodepool.requestNodes( + build_set.uuid, job, tenant_name, pipeline_name, provider, + priority, relative_priority, event=item.event) + log.debug("Adding node request %s for job %s to item %s", + req, job, item) + build_set.setJobNodeRequestID(job.name, req.id) + if req.fulfilled: + nodeset = self.sched.nodepool.getNodeSet(req, job.nodeset) + build_set.jobNodeRequestComplete(req.job_name, nodeset) + else: + job.setWaitingStatus(f'node request: {req.id}') + def _getPausedParent(self, build_set, job): job_graph = build_set.job_graph if job_graph: @@ -1891,17 +1896,46 @@ class PipelineManager(metaclass=ABCMeta): build_set.setExtraRepoState(event.repo_state) build_set.repo_state_state = build_set.COMPLETE + def _handleNodeRequestFallback(self, log, build_set, job, old_request): + if len(job.nodeset_alternatives) <= job.nodeset_index + 1: + # No alternatives to fall back upon + return False + + # Increment the nodeset index and remove the old request + with job.activeContext(self.current_context): + job.nodeset_index = job.nodeset_index + 1 + + log.info("Re-attempting node request for job " + f"{job.name} of item {build_set.item} " + f"with nodeset alternative {job.nodeset_index}") + + build_set.removeJobNodeRequestID(job.name) + + # Make a new request + if self.sched.globals.use_relative_priority: + relative_priority = build_set.item.getNodePriority() + else: + relative_priority = 0 + log = build_set.item.annotateLogger(self.log) + self._makeNodepoolRequest(log, build_set, job, relative_priority) + return True + def onNodesProvisioned(self, request, nodeset, build_set): - # TODOv3(jeblair): handle provisioning failure here log = get_annotated_logger(self.log, request.event_id) self.reportPipelineTiming('node_request_time', request.created_time) - if nodeset is not None: - build_set.jobNodeRequestComplete(request.job_name, nodeset) + job = build_set.item.getJob(request.job_name) + # First see if we need to retry the request if not request.fulfilled: log.info("Node request %s: failure for %s", request, request.job_name) - job = build_set.item.getJob(request.job_name) + if self._handleNodeRequestFallback(log, build_set, job, request): + return + # No more fallbacks -- tell the buildset the request is complete + if nodeset is not None: + build_set.jobNodeRequestComplete(request.job_name, nodeset) + # Put a fake build through the cycle to clean it up. + if not request.fulfilled: fakebuild = build_set.item.setNodeRequestFailure(job) try: self.sql.reportBuildEnd( diff --git a/zuul/model.py b/zuul/model.py index 1b0395aee..f66a5e875 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -1383,6 +1383,7 @@ class NodeSet(ConfigObject): self.name = name or '' self.nodes = OrderedDict() self.groups = OrderedDict() + self.alternatives = [] def __ne__(self, other): return not self.__eq__(other) @@ -1391,7 +1392,9 @@ class NodeSet(ConfigObject): if not isinstance(other, NodeSet): return False return (self.name == other.name and - self.nodes == other.nodes) + self.nodes == other.nodes and + self.groups == other.groups and + self.alternatives == other.alternatives) def toDict(self): d = {} @@ -1402,6 +1405,12 @@ class NodeSet(ConfigObject): d['groups'] = [] for group in self.groups.values(): d['groups'].append(group.toDict()) + d['alternatives'] = [] + for alt in self.alternatives: + if isinstance(alt, NodeSet): + d['alternatives'].append(alt.toDict()) + else: + d['alternatives'].append(alt) return d @classmethod @@ -1411,6 +1420,12 @@ class NodeSet(ConfigObject): nodeset.addNode(Node.fromDict(node)) for group in data["groups"]: nodeset.addGroup(Group.fromDict(group)) + for alt in data.get('alternatives', []): + if isinstance(alt, str): + if isinstance(alt, str): + nodeset.addAlternative(alt) + else: + nodeset.addAlternative(NodeSet.fromDict(alt)) return nodeset def copy(self): @@ -1419,6 +1434,11 @@ class NodeSet(ConfigObject): n.addNode(Node(node.name, node.label)) for name, group in self.groups.items(): n.addGroup(Group(group.name, group.nodes[:])) + for alt in self.alternatives: + if isinstance(alt, str): + n.addAlternative(alt) + else: + n.addAlternative(alt.copy()) return n def addNode(self, node): @@ -1438,6 +1458,36 @@ class NodeSet(ConfigObject): def getGroups(self): return list(self.groups.values()) + def addAlternative(self, alt): + self.alternatives.append(alt) + + def flattenAlternatives(self, layout): + alts = [] + history = [] + self._flattenAlternatives(layout, self, alts, history) + return alts + + def _flattenAlternatives(self, layout, nodeset, + alternatives, history): + if isinstance(nodeset, str): + # This references an existing named nodeset in the layout. + ns = layout.nodesets.get(nodeset) + if ns is None: + raise Exception(f'The nodeset "{nodeset}" was not found.') + else: + ns = nodeset + if ns in history: + raise Exception(f'Nodeset cycle detected on "{nodeset}"') + history.append(ns) + if ns.alternatives: + for alt in ns.alternatives: + self._flattenAlternatives(layout, alt, alternatives, history) + else: + alternatives.append(ns) + + def validateReferences(self, layout): + self.flattenAlternatives(layout) + def __repr__(self): if self.name: name = self.name + ' ' @@ -2038,7 +2088,8 @@ class FrozenJob(zkobject.ZKObject): 'dependencies', 'inheritance_path', 'name', - 'nodeset', + 'nodeset_alternatives', + 'nodeset_index', 'override_branch', 'override_checkout', 'post_timeout', @@ -2149,8 +2200,8 @@ class FrozenJob(zkobject.ZKObject): if not hasattr(self, k): continue v = getattr(self, k) - if k == 'nodeset': - v = v.toDict() + if k == 'nodeset_alternatives': + v = [alt.toDict() for alt in v] elif k == 'dependencies': # frozenset of JobDependency v = [dep.toDict() for dep in v] @@ -2173,6 +2224,9 @@ class FrozenJob(zkobject.ZKObject): v = {'storage': 'local', 'data': v} data[k] = v + if (COMPONENT_REGISTRY.model_api < 9): + data['nodeset'] = data['nodeset_alternatives'][0] + # Use json_dumps to strip any ZuulMark entries return json_dumps(data, sort_keys=True).encode("utf8") @@ -2183,13 +2237,18 @@ class FrozenJob(zkobject.ZKObject): if 'deduplicate' not in data: data['deduplicate'] = 'auto' - if hasattr(self, 'nodeset'): - nodeset = self.nodeset + # MODEL_API < 9 + if data.get('nodeset'): + data['nodeset_alternatives'] = [data['nodeset']] + data['nodeset_index'] = 0 + del data['nodeset'] + + if hasattr(self, 'nodeset_alternatives'): + alts = self.nodeset_alternatives else: - nodeset = data.get('nodeset') - if nodeset: - nodeset = NodeSet.fromDict(nodeset) - data['nodeset'] = nodeset + alts = data.get('nodeset_alternatives', []) + alts = [NodeSet.fromDict(alt) for alt in alts] + data['nodeset_alternatives'] = alts if hasattr(self, 'dependencies'): data['dependencies'] = self.dependencies @@ -2250,6 +2309,12 @@ class FrozenJob(zkobject.ZKObject): return val @property + def nodeset(self): + if self.nodeset_alternatives: + return self.nodeset_alternatives[self.nodeset_index] + return None + + @property def parent_data(self): return self._getJobData('_parent_data') @@ -2459,12 +2524,11 @@ class Job(ConfigObject): d['parent'] = self.parent else: d['parent'] = tenant.default_base_job - if isinstance(self.nodeset, str): - ns = tenant.layout.nodesets.get(self.nodeset) - else: - ns = self.nodeset - if ns: - d['nodeset'] = ns.toDict() + alts = self.flattenNodesetAlternatives(tenant.layout) + if len(alts) == 1 and len(alts[0]): + d['nodeset'] = alts[0].toDict() + elif len(alts) > 1: + d['nodeset_alternatives'] = [x.toDict() for x in alts] if self.ansible_version: d['ansible_version'] = self.ansible_version else: @@ -2629,6 +2693,17 @@ class Job(ConfigObject): secrets.append(secret_value) playbook['secrets'][secret_key] = len(secrets) - 1 + def flattenNodesetAlternatives(self, layout): + nodeset = self.nodeset + if isinstance(nodeset, str): + # This references an existing named nodeset in the layout. + ns = layout.nodesets.get(nodeset) + if ns is None: + raise Exception(f'The nodeset "{nodeset}" was not found.') + else: + ns = nodeset + return ns.flattenAlternatives(layout) + def freezeJob(self, context, tenant, layout, item, redact_secrets_and_keys): buildset = item.current_build_set @@ -2640,6 +2715,9 @@ class Job(ConfigObject): attributes.discard('secrets') attributes.discard('affected_projects') attributes.discard('config_hash') + # Nodeset alternatives are flattened at this point + attributes.discard('nodeset_alternatives') + attributes.discard('nodeset_index') secrets = [] for k in attributes: # If this is a config object, it's frozen, so it's @@ -2663,6 +2741,8 @@ class Job(ConfigObject): for pb in v: self._deduplicateSecrets(context, secrets, pb) kw[k] = v + kw['nodeset_alternatives'] = self.flattenNodesetAlternatives(layout) + kw['nodeset_index'] = 0 kw['secrets'] = secrets kw['affected_projects'] = self._getAffectedProjects(tenant) kw['config_hash'] = self.getConfigHash(tenant) @@ -2735,7 +2815,7 @@ class Job(ConfigObject): if self._get('cleanup_run') is not None: self.cleanup_run = self.freezePlaybooks(self.cleanup_run, layout) - def getNodeSet(self, layout): + def getNodeset(self, layout): if isinstance(self.nodeset, str): # This references an existing named nodeset in the layout. ns = layout.nodesets.get(self.nodeset) @@ -2752,14 +2832,14 @@ class Job(ConfigObject): if not self.isBase() and self.parent: layout.getJob(self.parent) - ns = self.getNodeSet(layout) - if layout.tenant.max_nodes_per_job != -1 and \ - len(ns) > layout.tenant.max_nodes_per_job: - raise Exception( - 'The job "{job}" exceeds tenant ' - 'max-nodes-per-job {maxnodes}.'.format( - job=self.name, - maxnodes=layout.tenant.max_nodes_per_job)) + for ns in self.flattenNodesetAlternatives(layout): + if layout.tenant.max_nodes_per_job != -1 and \ + len(ns) > layout.tenant.max_nodes_per_job: + raise Exception( + 'The job "{job}" exceeds tenant ' + 'max-nodes-per-job {maxnodes}.'.format( + job=self.name, + maxnodes=layout.tenant.max_nodes_per_job)) for dependency in self.dependencies: layout.getJob(dependency.name) @@ -2956,7 +3036,7 @@ class Job(ConfigObject): self.addRoles(other.roles) # Freeze the nodeset - self.nodeset = self.getNodeSet(layout) + self.nodeset = self.getNodeset(layout) # Pass secrets to parents secrets_for_parents = [s for s in other.secrets if s.pass_to_parent] diff --git a/zuul/model_api.py b/zuul/model_api.py index 0534ee9c4..27d4a2b07 100644 --- a/zuul/model_api.py +++ b/zuul/model_api.py @@ -14,4 +14,4 @@ # When making ZK schema changes, increment this and add a record to # docs/developer/model-changelog.rst -MODEL_API = 8 +MODEL_API = 9 |