author     Zuul <zuul@review.opendev.org>             2022-09-16 16:52:44 +0000
committer  Gerrit Code Review <review@openstack.org>  2022-09-16 16:52:44 +0000
commit     e612a442e37dc0fff00fe6bb2a219fa42c005c2a
tree       110da0aeebfccd6fe9c3eb11fca0c7087fcb74f9 /zuul
parent     9a83b7da5964643d6c38328f4d896b9b9b4de447
parent     1958bbad03d4c2eb59502df8682bdfcb248400c2
Merge "Add nodeset alternatives"
Diffstat (limited to 'zuul')
-rw-r--r--  zuul/configloader.py      |  46
-rw-r--r--  zuul/manager/__init__.py  |  72
-rw-r--r--  zuul/model.py             | 132
-rw-r--r--  zuul/model_api.py         |   2
4 files changed, 200 insertions, 52 deletions
diff --git a/zuul/configloader.py b/zuul/configloader.py
index 0327f1796..037fc48aa 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -488,14 +488,26 @@ class NodeSetParser(object):
vs.Required('nodes'): to_list(str),
}
- nodeset = {vs.Required('nodes'): to_list(node),
- 'groups': to_list(group),
- '_source_context': model.SourceContext,
- '_start_mark': model.ZuulMark,
- }
+ real_nodeset = {vs.Required('nodes'): to_list(node),
+ 'groups': to_list(group),
+ }
+
+ alt_nodeset = {vs.Required('alternatives'):
+ [vs.Any(real_nodeset, str)]}
+ top_nodeset = {'_source_context': model.SourceContext,
+ '_start_mark': model.ZuulMark,
+ }
if not anonymous:
- nodeset[vs.Required('name')] = str
+ top_nodeset[vs.Required('name')] = str
+
+ top_real_nodeset = real_nodeset.copy()
+ top_real_nodeset.update(top_nodeset)
+ top_alt_nodeset = alt_nodeset.copy()
+ top_alt_nodeset.update(top_nodeset)
+
+ nodeset = vs.Any(top_real_nodeset, top_alt_nodeset)
+
return vs.Schema(nodeset)
def fromYaml(self, conf, anonymous=False):
@@ -503,6 +515,24 @@ class NodeSetParser(object):
self.anon_schema(conf)
else:
self.schema(conf)
+
+ if 'alternatives' in conf:
+ return self.loadAlternatives(conf)
+ else:
+ return self.loadNodeset(conf)
+
+ def loadAlternatives(self, conf):
+ ns = model.NodeSet(conf.get('name'))
+ ns.source_context = conf.get('_source_context')
+ ns.start_mark = conf.get('_start_mark')
+ for alt in conf['alternatives']:
+ if isinstance(alt, str):
+ ns.addAlternative(alt)
+ else:
+ ns.addAlternative(self.loadNodeset(alt))
+ return ns
+
+ def loadNodeset(self, conf):
ns = model.NodeSet(conf.get('name'))
ns.source_context = conf.get('_source_context')
ns.start_mark = conf.get('_start_mark')
@@ -2388,6 +2418,10 @@ class TenantParser(object):
# Now that all the jobs are loaded, verify references to other
# config objects.
+ for nodeset in layout.nodesets.values():
+ with reference_exceptions('nodeset', nodeset,
+ layout.loading_errors):
+ nodeset.validateReferences(layout)
for jobs in layout.jobs.values():
for job in jobs:
with reference_exceptions('job', job, layout.loading_errors):
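Illustrative aside, not part of the change: the schema above accepts either a conventional nodeset (nodes/groups) or one whose only content is an alternatives list, where each entry is an inline nodeset or the name of an existing one; fromYaml then dispatches to loadAlternatives or loadNodeset accordingly. A minimal sketch of the parsed configuration this is meant to accept, with hypothetical node names and labels:

# Hypothetical parsed-YAML nodeset configurations; names and labels are
# illustrative only, not taken from the change.
conventional = {
    'name': 'single-node',
    'nodes': [{'name': 'controller', 'label': 'ubuntu-focal'}],
}

with_alternatives = {
    'name': 'fast-or-slow',
    'alternatives': [
        # Inline nodeset: tried first.
        {'nodes': [{'name': 'controller', 'label': 'fast-label'}]},
        # Name of an existing nodeset: used as a fallback.
        'single-node',
    ],
}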
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index dcaea5283..365435f3d 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -862,23 +862,28 @@ class PipelineManager(metaclass=ABCMeta):
else:
relative_priority = 0
for job in jobs:
- provider = self._getPausedParentProvider(build_set, job)
- priority = self._calculateNodeRequestPriority(build_set, job)
- tenant_name = build_set.item.pipeline.tenant.name
- pipeline_name = build_set.item.pipeline.name
- req = self.sched.nodepool.requestNodes(
- build_set.uuid, job, tenant_name, pipeline_name, provider,
- priority, relative_priority, event=item.event)
- log.debug("Adding node request %s for job %s to item %s",
- req, job, item)
- build_set.setJobNodeRequestID(job.name, req.id)
- if req.fulfilled:
- nodeset = self.sched.nodepool.getNodeSet(req, job.nodeset)
- build_set.jobNodeRequestComplete(req.job_name, nodeset)
- else:
- job.setWaitingStatus(f'node request: {req.id}')
+ self._makeNodepoolRequest(log, build_set, job, relative_priority)
return True
+ def _makeNodepoolRequest(self, log, build_set, job, relative_priority,
+ alternative=0):
+ provider = self._getPausedParentProvider(build_set, job)
+ priority = self._calculateNodeRequestPriority(build_set, job)
+ tenant_name = build_set.item.pipeline.tenant.name
+ pipeline_name = build_set.item.pipeline.name
+ item = build_set.item
+ req = self.sched.nodepool.requestNodes(
+ build_set.uuid, job, tenant_name, pipeline_name, provider,
+ priority, relative_priority, event=item.event)
+ log.debug("Adding node request %s for job %s to item %s",
+ req, job, item)
+ build_set.setJobNodeRequestID(job.name, req.id)
+ if req.fulfilled:
+ nodeset = self.sched.nodepool.getNodeSet(req, job.nodeset)
+ build_set.jobNodeRequestComplete(req.job_name, nodeset)
+ else:
+ job.setWaitingStatus(f'node request: {req.id}')
+
def _getPausedParent(self, build_set, job):
job_graph = build_set.job_graph
if job_graph:
@@ -1891,17 +1896,46 @@ class PipelineManager(metaclass=ABCMeta):
build_set.setExtraRepoState(event.repo_state)
build_set.repo_state_state = build_set.COMPLETE
+ def _handleNodeRequestFallback(self, log, build_set, job, old_request):
+ if len(job.nodeset_alternatives) <= job.nodeset_index + 1:
+ # No alternatives to fall back upon
+ return False
+
+ # Increment the nodeset index and remove the old request
+ with job.activeContext(self.current_context):
+ job.nodeset_index = job.nodeset_index + 1
+
+ log.info("Re-attempting node request for job "
+ f"{job.name} of item {build_set.item} "
+ f"with nodeset alternative {job.nodeset_index}")
+
+ build_set.removeJobNodeRequestID(job.name)
+
+ # Make a new request
+ if self.sched.globals.use_relative_priority:
+ relative_priority = build_set.item.getNodePriority()
+ else:
+ relative_priority = 0
+ log = build_set.item.annotateLogger(self.log)
+ self._makeNodepoolRequest(log, build_set, job, relative_priority)
+ return True
+
def onNodesProvisioned(self, request, nodeset, build_set):
- # TODOv3(jeblair): handle provisioning failure here
log = get_annotated_logger(self.log, request.event_id)
self.reportPipelineTiming('node_request_time', request.created_time)
- if nodeset is not None:
- build_set.jobNodeRequestComplete(request.job_name, nodeset)
+ job = build_set.item.getJob(request.job_name)
+ # First see if we need to retry the request
if not request.fulfilled:
log.info("Node request %s: failure for %s",
request, request.job_name)
- job = build_set.item.getJob(request.job_name)
+ if self._handleNodeRequestFallback(log, build_set, job, request):
+ return
+ # No more fallbacks -- tell the buildset the request is complete
+ if nodeset is not None:
+ build_set.jobNodeRequestComplete(request.job_name, nodeset)
+ # Put a fake build through the cycle to clean it up.
+ if not request.fulfilled:
fakebuild = build_set.item.setNodeRequestFailure(job)
try:
self.sql.reportBuildEnd(
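For orientation, not part of the diff: the manager changes above retry a failed node request for as long as the job has unused nodeset alternatives, bumping nodeset_index and filing a fresh request each time, and only report failure once the list is exhausted. A simplified, self-contained sketch of that control flow; FakeJob and request_nodes() are stand-ins, not Zuul's real scheduler objects:

from dataclasses import dataclass


@dataclass
class FakeJob:
    name: str
    nodeset_alternatives: list
    nodeset_index: int = 0

    @property
    def nodeset(self):
        # Mirrors FrozenJob.nodeset: the currently selected alternative.
        return self.nodeset_alternatives[self.nodeset_index]


def request_nodes(nodeset):
    # Stand-in: pretend only the last alternative can be fulfilled.
    return nodeset == 'slow-nodeset'


def run_with_fallback(job):
    while True:
        if request_nodes(job.nodeset):
            return True          # request fulfilled
        if len(job.nodeset_alternatives) <= job.nodeset_index + 1:
            return False         # no alternatives left: report the failure
        job.nodeset_index += 1   # fall back and file a new request


job = FakeJob('unit-tests', ['fast-nodeset', 'slow-nodeset'])
assert run_with_fallback(job) is True
assert job.nodeset_index == 1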
diff --git a/zuul/model.py b/zuul/model.py
index 1b0395aee..f66a5e875 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -1383,6 +1383,7 @@ class NodeSet(ConfigObject):
self.name = name or ''
self.nodes = OrderedDict()
self.groups = OrderedDict()
+ self.alternatives = []
def __ne__(self, other):
return not self.__eq__(other)
@@ -1391,7 +1392,9 @@ class NodeSet(ConfigObject):
if not isinstance(other, NodeSet):
return False
return (self.name == other.name and
- self.nodes == other.nodes)
+ self.nodes == other.nodes and
+ self.groups == other.groups and
+ self.alternatives == other.alternatives)
def toDict(self):
d = {}
@@ -1402,6 +1405,12 @@ class NodeSet(ConfigObject):
d['groups'] = []
for group in self.groups.values():
d['groups'].append(group.toDict())
+ d['alternatives'] = []
+ for alt in self.alternatives:
+ if isinstance(alt, NodeSet):
+ d['alternatives'].append(alt.toDict())
+ else:
+ d['alternatives'].append(alt)
return d
@classmethod
@@ -1411,6 +1420,12 @@ class NodeSet(ConfigObject):
nodeset.addNode(Node.fromDict(node))
for group in data["groups"]:
nodeset.addGroup(Group.fromDict(group))
+ for alt in data.get('alternatives', []):
+ if isinstance(alt, str):
+ nodeset.addAlternative(alt)
+ else:
+ nodeset.addAlternative(NodeSet.fromDict(alt))
return nodeset
def copy(self):
@@ -1419,6 +1434,11 @@ class NodeSet(ConfigObject):
n.addNode(Node(node.name, node.label))
for name, group in self.groups.items():
n.addGroup(Group(group.name, group.nodes[:]))
+ for alt in self.alternatives:
+ if isinstance(alt, str):
+ n.addAlternative(alt)
+ else:
+ n.addAlternative(alt.copy())
return n
def addNode(self, node):
@@ -1438,6 +1458,36 @@ class NodeSet(ConfigObject):
def getGroups(self):
return list(self.groups.values())
+ def addAlternative(self, alt):
+ self.alternatives.append(alt)
+
+ def flattenAlternatives(self, layout):
+ alts = []
+ history = []
+ self._flattenAlternatives(layout, self, alts, history)
+ return alts
+
+ def _flattenAlternatives(self, layout, nodeset,
+ alternatives, history):
+ if isinstance(nodeset, str):
+ # This references an existing named nodeset in the layout.
+ ns = layout.nodesets.get(nodeset)
+ if ns is None:
+ raise Exception(f'The nodeset "{nodeset}" was not found.')
+ else:
+ ns = nodeset
+ if ns in history:
+ raise Exception(f'Nodeset cycle detected on "{nodeset}"')
+ history.append(ns)
+ if ns.alternatives:
+ for alt in ns.alternatives:
+ self._flattenAlternatives(layout, alt, alternatives, history)
+ else:
+ alternatives.append(ns)
+
+ def validateReferences(self, layout):
+ self.flattenAlternatives(layout)
+
def __repr__(self):
if self.name:
name = self.name + ' '
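Illustrative aside, not part of the change: flattenAlternatives above resolves string entries through the layout, expands nested alternatives depth-first, and raises on cycles, so the result is an ordered list of concrete nodesets. A standalone sketch of those semantics, where a plain dict plays the role of the layout and FakeNodeSet stands in for NodeSet:

class FakeNodeSet:
    def __init__(self, name, alternatives=None):
        self.name = name
        self.alternatives = alternatives or []


def flatten(layout, nodeset, history=None):
    history = history if history is not None else []
    # String entries reference an existing named nodeset in the layout.
    ns = layout[nodeset] if isinstance(nodeset, str) else nodeset
    if ns in history:
        raise Exception(f'Nodeset cycle detected on "{nodeset}"')
    history.append(ns)
    if not ns.alternatives:
        return [ns]
    result = []
    for alt in ns.alternatives:
        result.extend(flatten(layout, alt, history))
    return result


fast = FakeNodeSet('fast')
slow = FakeNodeSet('slow')
layout = {'fast': fast, 'slow': slow,
          'tiered': FakeNodeSet('tiered', ['fast', 'slow'])}
assert [ns.name for ns in flatten(layout, layout['tiered'])] == ['fast', 'slow']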
@@ -2038,7 +2088,8 @@ class FrozenJob(zkobject.ZKObject):
'dependencies',
'inheritance_path',
'name',
- 'nodeset',
+ 'nodeset_alternatives',
+ 'nodeset_index',
'override_branch',
'override_checkout',
'post_timeout',
@@ -2149,8 +2200,8 @@ class FrozenJob(zkobject.ZKObject):
if not hasattr(self, k):
continue
v = getattr(self, k)
- if k == 'nodeset':
- v = v.toDict()
+ if k == 'nodeset_alternatives':
+ v = [alt.toDict() for alt in v]
elif k == 'dependencies':
# frozenset of JobDependency
v = [dep.toDict() for dep in v]
@@ -2173,6 +2224,9 @@ class FrozenJob(zkobject.ZKObject):
v = {'storage': 'local', 'data': v}
data[k] = v
+ if (COMPONENT_REGISTRY.model_api < 9):
+ data['nodeset'] = data['nodeset_alternatives'][0]
+
# Use json_dumps to strip any ZuulMark entries
return json_dumps(data, sort_keys=True).encode("utf8")
@@ -2183,13 +2237,18 @@ class FrozenJob(zkobject.ZKObject):
if 'deduplicate' not in data:
data['deduplicate'] = 'auto'
- if hasattr(self, 'nodeset'):
- nodeset = self.nodeset
+ # MODEL_API < 9
+ if data.get('nodeset'):
+ data['nodeset_alternatives'] = [data['nodeset']]
+ data['nodeset_index'] = 0
+ del data['nodeset']
+
+ if hasattr(self, 'nodeset_alternatives'):
+ alts = self.nodeset_alternatives
else:
- nodeset = data.get('nodeset')
- if nodeset:
- nodeset = NodeSet.fromDict(nodeset)
- data['nodeset'] = nodeset
+ alts = data.get('nodeset_alternatives', [])
+ alts = [NodeSet.fromDict(alt) for alt in alts]
+ data['nodeset_alternatives'] = alts
if hasattr(self, 'dependencies'):
data['dependencies'] = self.dependencies
@@ -2250,6 +2309,12 @@ class FrozenJob(zkobject.ZKObject):
return val
@property
+ def nodeset(self):
+ if self.nodeset_alternatives:
+ return self.nodeset_alternatives[self.nodeset_index]
+ return None
+
+ @property
def parent_data(self):
return self._getJobData('_parent_data')
@@ -2459,12 +2524,11 @@ class Job(ConfigObject):
d['parent'] = self.parent
else:
d['parent'] = tenant.default_base_job
- if isinstance(self.nodeset, str):
- ns = tenant.layout.nodesets.get(self.nodeset)
- else:
- ns = self.nodeset
- if ns:
- d['nodeset'] = ns.toDict()
+ alts = self.flattenNodesetAlternatives(tenant.layout)
+ if len(alts) == 1 and len(alts[0]):
+ d['nodeset'] = alts[0].toDict()
+ elif len(alts) > 1:
+ d['nodeset_alternatives'] = [x.toDict() for x in alts]
if self.ansible_version:
d['ansible_version'] = self.ansible_version
else:
@@ -2629,6 +2693,17 @@ class Job(ConfigObject):
secrets.append(secret_value)
playbook['secrets'][secret_key] = len(secrets) - 1
+ def flattenNodesetAlternatives(self, layout):
+ nodeset = self.nodeset
+ if isinstance(nodeset, str):
+ # This references an existing named nodeset in the layout.
+ ns = layout.nodesets.get(nodeset)
+ if ns is None:
+ raise Exception(f'The nodeset "{nodeset}" was not found.')
+ else:
+ ns = nodeset
+ return ns.flattenAlternatives(layout)
+
def freezeJob(self, context, tenant, layout, item,
redact_secrets_and_keys):
buildset = item.current_build_set
@@ -2640,6 +2715,9 @@ class Job(ConfigObject):
attributes.discard('secrets')
attributes.discard('affected_projects')
attributes.discard('config_hash')
+ # Nodeset alternatives are flattened at this point
+ attributes.discard('nodeset_alternatives')
+ attributes.discard('nodeset_index')
secrets = []
for k in attributes:
# If this is a config object, it's frozen, so it's
@@ -2663,6 +2741,8 @@ class Job(ConfigObject):
for pb in v:
self._deduplicateSecrets(context, secrets, pb)
kw[k] = v
+ kw['nodeset_alternatives'] = self.flattenNodesetAlternatives(layout)
+ kw['nodeset_index'] = 0
kw['secrets'] = secrets
kw['affected_projects'] = self._getAffectedProjects(tenant)
kw['config_hash'] = self.getConfigHash(tenant)
@@ -2735,7 +2815,7 @@ class Job(ConfigObject):
if self._get('cleanup_run') is not None:
self.cleanup_run = self.freezePlaybooks(self.cleanup_run, layout)
- def getNodeSet(self, layout):
+ def getNodeset(self, layout):
if isinstance(self.nodeset, str):
# This references an existing named nodeset in the layout.
ns = layout.nodesets.get(self.nodeset)
@@ -2752,14 +2832,14 @@ class Job(ConfigObject):
if not self.isBase() and self.parent:
layout.getJob(self.parent)
- ns = self.getNodeSet(layout)
- if layout.tenant.max_nodes_per_job != -1 and \
- len(ns) > layout.tenant.max_nodes_per_job:
- raise Exception(
- 'The job "{job}" exceeds tenant '
- 'max-nodes-per-job {maxnodes}.'.format(
- job=self.name,
- maxnodes=layout.tenant.max_nodes_per_job))
+ for ns in self.flattenNodesetAlternatives(layout):
+ if layout.tenant.max_nodes_per_job != -1 and \
+ len(ns) > layout.tenant.max_nodes_per_job:
+ raise Exception(
+ 'The job "{job}" exceeds tenant '
+ 'max-nodes-per-job {maxnodes}.'.format(
+ job=self.name,
+ maxnodes=layout.tenant.max_nodes_per_job))
for dependency in self.dependencies:
layout.getJob(dependency.name)
@@ -2956,7 +3036,7 @@ class Job(ConfigObject):
self.addRoles(other.roles)
# Freeze the nodeset
- self.nodeset = self.getNodeSet(layout)
+ self.nodeset = self.getNodeset(layout)
# Pass secrets to parents
secrets_for_parents = [s for s in other.secrets if s.pass_to_parent]
diff --git a/zuul/model_api.py b/zuul/model_api.py
index 0534ee9c4..27d4a2b07 100644
--- a/zuul/model_api.py
+++ b/zuul/model_api.py
@@ -14,4 +14,4 @@
# When making ZK schema changes, increment this and add a record to
# docs/developer/model-changelog.rst
-MODEL_API = 8
+MODEL_API = 9
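A closing aside, not part of the change: the MODEL_API bump to 9 pairs with the compatibility shims in FrozenJob above. While older components are present, serialization still emits a single 'nodeset' key (the first alternative), and deserialization upgrades a legacy 'nodeset' into a one-element 'nodeset_alternatives' list with index 0. A minimal sketch of that round-trip using plain dicts rather than Zuul's ZKObject machinery:

def serialize(data, model_api):
    # New-format data may also carry a legacy 'nodeset' key for old readers.
    out = dict(data)
    if model_api < 9:
        out['nodeset'] = out['nodeset_alternatives'][0]
    return out


def deserialize(data):
    # Legacy data is upgraded to the alternatives representation.
    data = dict(data)
    if data.get('nodeset'):
        data['nodeset_alternatives'] = [data['nodeset']]
        data['nodeset_index'] = 0
        del data['nodeset']
    return data


frozen = {'nodeset_alternatives': [{'name': 'fast'}], 'nodeset_index': 0}
legacy = {'nodeset': {'name': 'fast'}}
assert deserialize(legacy)['nodeset_alternatives'] == [{'name': 'fast'}]
assert serialize(frozen, model_api=8)['nodeset'] == {'name': 'fast'}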