Diffstat (limited to 'zuul/model.py')
-rw-r--r--  zuul/model.py  253
1 file changed, 203 insertions(+), 50 deletions(-)
diff --git a/zuul/model.py b/zuul/model.py
index 0d889f557..32b1cfadf 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -33,6 +33,7 @@ import itertools
from kazoo.exceptions import NodeExistsError, NoNodeError
from cachetools.func import lru_cache
+from opentelemetry import trace
from zuul.lib import yamlutil as yaml
from zuul.lib.varnames import check_varnames
@@ -45,6 +46,7 @@ from zuul.lib.result_data import get_artifacts_from_result_data
from zuul.lib.logutil import get_annotated_logger
from zuul.lib.capabilities import capabilities_registry
from zuul.lib.jsonutil import json_dumps
+from zuul.lib import tracing
from zuul.zk import zkobject
from zuul.zk.blob_store import BlobStore
from zuul.zk.change_cache import ChangeKey
@@ -329,6 +331,9 @@ class Attributes(object):
def __init__(self, **kw):
setattr(self, '__dict__', kw)
+ def toDict(self):
+ return self.__dict__
+
class Freezable(object):
"""A mix-in class so that an object can be made immutable"""
@@ -659,6 +664,13 @@ class PipelineState(zkobject.ZKObject):
safe_pipeline = urllib.parse.quote_plus(pipeline.name)
return f"/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}"
+ @classmethod
+ def parsePath(self, path):
+ """Return path components for use by the REST API"""
+ root, safe_tenant, pipeline, safe_pipeline = path.rsplit('/', 3)
+ return (urllib.parse.unquote_plus(safe_tenant),
+ urllib.parse.unquote_plus(safe_pipeline))
+
def _dirtyPath(self):
return f'{self.getPath()}/dirty'
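For illustration, the new parsePath() is the inverse of getPath(): it splits the ZooKeeper path from the right and unquotes the tenant and pipeline components. A standalone sketch of the round trip (the helper names below are for demonstration only):
import urllib.parse

def build_path(tenant, pipeline):
    safe_tenant = urllib.parse.quote_plus(tenant)
    safe_pipeline = urllib.parse.quote_plus(pipeline)
    return f"/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}"

def parse_path(path):
    # Same rsplit/unquote scheme as PipelineState.parsePath above.
    root, safe_tenant, pipeline, safe_pipeline = path.rsplit('/', 3)
    return (urllib.parse.unquote_plus(safe_tenant),
            urllib.parse.unquote_plus(safe_pipeline))

assert parse_path(build_path("my tenant", "check")) == ("my tenant", "check")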
@@ -1087,13 +1099,16 @@ class ChangeQueue(zkobject.ZKObject):
def matches(self, project_cname, branch):
return (project_cname, branch) in self.project_branches
- def enqueueChange(self, change, event):
+ def enqueueChange(self, change, event, span_info=None, enqueue_time=None):
+ if enqueue_time is None:
+ enqueue_time = time.time()
item = QueueItem.new(self.zk_context,
queue=self,
pipeline=self.pipeline,
change=change,
event=event,
- enqueue_time=time.time())
+ span_info=span_info,
+ enqueue_time=enqueue_time)
self.enqueueItem(item)
return item
@@ -1383,6 +1398,7 @@ class NodeSet(ConfigObject):
self.name = name or ''
self.nodes = OrderedDict()
self.groups = OrderedDict()
+ self.alternatives = []
def __ne__(self, other):
return not self.__eq__(other)
@@ -1391,7 +1407,9 @@ class NodeSet(ConfigObject):
if not isinstance(other, NodeSet):
return False
return (self.name == other.name and
- self.nodes == other.nodes)
+ self.nodes == other.nodes and
+ self.groups == other.groups and
+ self.alternatives == other.alternatives)
def toDict(self):
d = {}
@@ -1402,6 +1420,12 @@ class NodeSet(ConfigObject):
d['groups'] = []
for group in self.groups.values():
d['groups'].append(group.toDict())
+ d['alternatives'] = []
+ for alt in self.alternatives:
+ if isinstance(alt, NodeSet):
+ d['alternatives'].append(alt.toDict())
+ else:
+ d['alternatives'].append(alt)
return d
@classmethod
@@ -1411,6 +1435,12 @@ class NodeSet(ConfigObject):
nodeset.addNode(Node.fromDict(node))
for group in data["groups"]:
nodeset.addGroup(Group.fromDict(group))
+ for alt in data.get('alternatives', []):
+ if isinstance(alt, str):
+ nodeset.addAlternative(alt)
+ else:
+ nodeset.addAlternative(NodeSet.fromDict(alt))
return nodeset
def copy(self):
@@ -1419,6 +1449,11 @@ class NodeSet(ConfigObject):
n.addNode(Node(node.name, node.label))
for name, group in self.groups.items():
n.addGroup(Group(group.name, group.nodes[:]))
+ for alt in self.alternatives:
+ if isinstance(alt, str):
+ n.addAlternative(alt)
+ else:
+ n.addAlternative(alt.copy())
return n
def addNode(self, node):
@@ -1438,6 +1473,36 @@ class NodeSet(ConfigObject):
def getGroups(self):
return list(self.groups.values())
+ def addAlternative(self, alt):
+ self.alternatives.append(alt)
+
+ def flattenAlternatives(self, layout):
+ alts = []
+ history = []
+ self._flattenAlternatives(layout, self, alts, history)
+ return alts
+
+ def _flattenAlternatives(self, layout, nodeset,
+ alternatives, history):
+ if isinstance(nodeset, str):
+ # This references an existing named nodeset in the layout.
+ ns = layout.nodesets.get(nodeset)
+ if ns is None:
+ raise Exception(f'The nodeset "{nodeset}" was not found.')
+ else:
+ ns = nodeset
+ if ns in history:
+ raise Exception(f'Nodeset cycle detected on "{nodeset}"')
+ history.append(ns)
+ if ns.alternatives:
+ for alt in ns.alternatives:
+ self._flattenAlternatives(layout, alt, alternatives, history)
+ else:
+ alternatives.append(ns)
+
+ def validateReferences(self, layout):
+ self.flattenAlternatives(layout)
+
def __repr__(self):
if self.name:
name = self.name + ' '
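As a rough illustration of the flattening logic above, nested alternatives and string references to named nodesets are expanded depth-first until only concrete NodeSets remain; a cycle raises an exception. A sketch, assuming NodeSet objects can be built standalone and using a hypothetical layout stub that only provides the nodesets mapping:
nested = NodeSet('nested')
nested.addAlternative('small')          # reference to a named nodeset
nested.addAlternative('large')

class FakeLayout:                       # hypothetical stand-in for Layout
    nodesets = {'small': NodeSet('small'), 'large': NodeSet('large')}

top = NodeSet('top')
top.addAlternative(nested)
top.addAlternative(NodeSet('fallback'))

# Depth-first expansion resolves the string references via the layout.
flat = top.flattenAlternatives(FakeLayout())
print([ns.name for ns in flat])         # ['small', 'large', 'fallback']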
@@ -1454,7 +1519,7 @@ class NodeRequest(object):
def __init__(self, requestor, build_set_uuid, tenant_name, pipeline_name,
job_name, labels, provider, relative_priority,
- event_id=None):
+ event_id=None, span_info=None):
self.requestor = requestor
self.build_set_uuid = build_set_uuid
self.tenant_name = tenant_name
@@ -1472,6 +1537,7 @@ class NodeRequest(object):
self.id = None
self._zk_data = {} # Data that we read back from ZK
self.event_id = event_id
+ self.span_info = span_info
# Zuul internal flags (not stored in ZK so they are not
# overwritten).
self.failed = False
@@ -1522,6 +1588,7 @@ class NodeRequest(object):
"tenant_name": self.tenant_name,
"pipeline_name": self.pipeline_name,
"job_name": self.job_name,
+ "span_info": self.span_info,
}
d.setdefault('node_types', self.labels)
d.setdefault('requestor', self.requestor)
@@ -1570,6 +1637,7 @@ class NodeRequest(object):
labels=data["node_types"],
provider=data["provider"],
relative_priority=data.get("relative_priority", 0),
+ span_info=requestor_data.get("span_info"),
)
request.updateFromDict(data)
@@ -2038,7 +2106,8 @@ class FrozenJob(zkobject.ZKObject):
'dependencies',
'inheritance_path',
'name',
- 'nodeset',
+ 'nodeset_alternatives',
+ 'nodeset_index',
'override_branch',
'override_checkout',
'post_timeout',
@@ -2149,8 +2218,8 @@ class FrozenJob(zkobject.ZKObject):
if not hasattr(self, k):
continue
v = getattr(self, k)
- if k == 'nodeset':
- v = v.toDict()
+ if k == 'nodeset_alternatives':
+ v = [alt.toDict() for alt in v]
elif k == 'dependencies':
# frozenset of JobDependency
v = [dep.toDict() for dep in v]
@@ -2173,6 +2242,9 @@ class FrozenJob(zkobject.ZKObject):
v = {'storage': 'local', 'data': v}
data[k] = v
+ if (COMPONENT_REGISTRY.model_api < 9):
+ data['nodeset'] = data['nodeset_alternatives'][0]
+
# Use json_dumps to strip any ZuulMark entries
return json_dumps(data, sort_keys=True).encode("utf8")
@@ -2183,13 +2255,18 @@ class FrozenJob(zkobject.ZKObject):
if 'deduplicate' not in data:
data['deduplicate'] = 'auto'
- if hasattr(self, 'nodeset'):
- nodeset = self.nodeset
+ # MODEL_API < 9
+ if data.get('nodeset'):
+ data['nodeset_alternatives'] = [data['nodeset']]
+ data['nodeset_index'] = 0
+ del data['nodeset']
+
+ if hasattr(self, 'nodeset_alternatives'):
+ alts = self.nodeset_alternatives
else:
- nodeset = data.get('nodeset')
- if nodeset:
- nodeset = NodeSet.fromDict(nodeset)
- data['nodeset'] = nodeset
+ alts = data.get('nodeset_alternatives', [])
+ alts = [NodeSet.fromDict(alt) for alt in alts]
+ data['nodeset_alternatives'] = alts
if hasattr(self, 'dependencies'):
data['dependencies'] = self.dependencies
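The compatibility handling above can be pictured as a small transform on the serialized data: while MODEL_API is below 9 the first alternative is still written out under the legacy 'nodeset' key, and on the read side a legacy 'nodeset' entry is upgraded into a one-element 'nodeset_alternatives' list with index 0 selected. A minimal sketch of the upgrade step on a plain dict (values are illustrative):
def upgrade_job_data(data):
    # Mirrors the MODEL_API < 9 branch in deserialize() above.
    if data.get('nodeset'):
        data['nodeset_alternatives'] = [data['nodeset']]
        data['nodeset_index'] = 0
        del data['nodeset']
    return data

legacy = {'name': 'tox', 'nodeset': {'nodes': [], 'groups': []}}
upgraded = upgrade_job_data(dict(legacy))
assert upgraded['nodeset_alternatives'] == [legacy['nodeset']]
assert upgraded['nodeset_index'] == 0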
@@ -2250,6 +2327,12 @@ class FrozenJob(zkobject.ZKObject):
return val
@property
+ def nodeset(self):
+ if self.nodeset_alternatives:
+ return self.nodeset_alternatives[self.nodeset_index]
+ return None
+
+ @property
def parent_data(self):
return self._getJobData('_parent_data')
@@ -2459,12 +2542,11 @@ class Job(ConfigObject):
d['parent'] = self.parent
else:
d['parent'] = tenant.default_base_job
- if isinstance(self.nodeset, str):
- ns = tenant.layout.nodesets.get(self.nodeset)
- else:
- ns = self.nodeset
- if ns:
- d['nodeset'] = ns.toDict()
+ alts = self.flattenNodesetAlternatives(tenant.layout)
+ if len(alts) == 1 and len(alts[0]):
+ d['nodeset'] = alts[0].toDict()
+ elif len(alts) > 1:
+ d['nodeset_alternatives'] = [x.toDict() for x in alts]
if self.ansible_version:
d['ansible_version'] = self.ansible_version
else:
@@ -2629,6 +2711,17 @@ class Job(ConfigObject):
secrets.append(secret_value)
playbook['secrets'][secret_key] = len(secrets) - 1
+ def flattenNodesetAlternatives(self, layout):
+ nodeset = self.nodeset
+ if isinstance(nodeset, str):
+ # This references an existing named nodeset in the layout.
+ ns = layout.nodesets.get(nodeset)
+ if ns is None:
+ raise Exception(f'The nodeset "{nodeset}" was not found.')
+ else:
+ ns = nodeset
+ return ns.flattenAlternatives(layout)
+
def freezeJob(self, context, tenant, layout, item,
redact_secrets_and_keys):
buildset = item.current_build_set
@@ -2640,6 +2733,9 @@ class Job(ConfigObject):
attributes.discard('secrets')
attributes.discard('affected_projects')
attributes.discard('config_hash')
+ # Nodeset alternatives are flattened at this point
+ attributes.discard('nodeset_alternatives')
+ attributes.discard('nodeset_index')
secrets = []
for k in attributes:
# If this is a config object, it's frozen, so it's
@@ -2663,6 +2759,8 @@ class Job(ConfigObject):
for pb in v:
self._deduplicateSecrets(context, secrets, pb)
kw[k] = v
+ kw['nodeset_alternatives'] = self.flattenNodesetAlternatives(layout)
+ kw['nodeset_index'] = 0
kw['secrets'] = secrets
kw['affected_projects'] = self._getAffectedProjects(tenant)
kw['config_hash'] = self.getConfigHash(tenant)
@@ -2735,7 +2833,7 @@ class Job(ConfigObject):
if self._get('cleanup_run') is not None:
self.cleanup_run = self.freezePlaybooks(self.cleanup_run, layout)
- def getNodeSet(self, layout):
+ def getNodeset(self, layout):
if isinstance(self.nodeset, str):
# This references an existing named nodeset in the layout.
ns = layout.nodesets.get(self.nodeset)
@@ -2752,14 +2850,14 @@ class Job(ConfigObject):
if not self.isBase() and self.parent:
layout.getJob(self.parent)
- ns = self.getNodeSet(layout)
- if layout.tenant.max_nodes_per_job != -1 and \
- len(ns) > layout.tenant.max_nodes_per_job:
- raise Exception(
- 'The job "{job}" exceeds tenant '
- 'max-nodes-per-job {maxnodes}.'.format(
- job=self.name,
- maxnodes=layout.tenant.max_nodes_per_job))
+ for ns in self.flattenNodesetAlternatives(layout):
+ if layout.tenant.max_nodes_per_job != -1 and \
+ len(ns) > layout.tenant.max_nodes_per_job:
+ raise Exception(
+ 'The job "{job}" exceeds tenant '
+ 'max-nodes-per-job {maxnodes}.'.format(
+ job=self.name,
+ maxnodes=layout.tenant.max_nodes_per_job))
for dependency in self.dependencies:
layout.getJob(dependency.name)
@@ -2956,7 +3054,7 @@ class Job(ConfigObject):
self.addRoles(other.roles)
# Freeze the nodeset
- self.nodeset = self.getNodeSet(layout)
+ self.nodeset = self.getNodeset(layout)
# Pass secrets to parents
secrets_for_parents = [s for s in other.secrets if s.pass_to_parent]
@@ -3297,7 +3395,8 @@ class JobRequest:
# This object participates in transactions, and therefore must
# remain small and unsharded.
- def __init__(self, uuid, precedence=None, state=None, result_path=None):
+ def __init__(self, uuid, precedence=None, state=None, result_path=None,
+ span_context=None):
self.uuid = uuid
if precedence is None:
self.precedence = 0
@@ -3310,6 +3409,12 @@ class JobRequest:
self.state = state
# Path to the future result if requested
self.result_path = result_path
+ # Reference to the parent span
+ if span_context:
+ self.span_context = span_context
+ else:
+ span = trace.get_current_span()
+ self.span_context = tracing.getSpanContext(span)
# ZK related data not serialized
self.path = None
@@ -3322,12 +3427,14 @@ class JobRequest:
"state": self.state,
"precedence": self.precedence,
"result_path": self.result_path,
+ "span_context": self.span_context,
}
def updateFromDict(self, data):
self.precedence = data["precedence"]
self.state = data["state"]
self.result_path = data["result_path"]
+ self.span_context = data.get("span_context")
@classmethod
def fromDict(cls, data):
@@ -3335,7 +3442,8 @@ class JobRequest:
data["uuid"],
precedence=data["precedence"],
state=data["state"],
- result_path=data["result_path"]
+ result_path=data["result_path"],
+ span_context=data.get("span_context"),
)
def __lt__(self, other):
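The span context travels with the request through its dictionary form, so a component that picks the request up from ZooKeeper can parent its own spans to the submitter's trace. A minimal round-trip sketch, assuming a JobRequest can be constructed standalone (the uuid is illustrative):
request = JobRequest('0123456789abcdef', precedence=1)
data = request.toDict()
restored = JobRequest.fromDict(data)
# The restored request carries the same serialized span context.
assert restored.span_context == request.span_context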
@@ -3375,13 +3483,14 @@ class MergeRequest(JobRequest):
def __init__(self, uuid, job_type, build_set_uuid, tenant_name,
pipeline_name, event_id, precedence=None, state=None,
- result_path=None):
- super().__init__(uuid, precedence, state, result_path)
+ result_path=None, span_context=None, span_info=None):
+ super().__init__(uuid, precedence, state, result_path, span_context)
self.job_type = job_type
self.build_set_uuid = build_set_uuid
self.tenant_name = tenant_name
self.pipeline_name = pipeline_name
self.event_id = event_id
+ self.span_info = span_info
def toDict(self):
d = super().toDict()
@@ -3391,6 +3500,7 @@ class MergeRequest(JobRequest):
"tenant_name": self.tenant_name,
"pipeline_name": self.pipeline_name,
"event_id": self.event_id,
+ "span_info": self.span_info,
})
return d
@@ -3405,7 +3515,9 @@ class MergeRequest(JobRequest):
data["event_id"],
precedence=data["precedence"],
state=data["state"],
- result_path=data["result_path"]
+ result_path=data["result_path"],
+ span_context=data.get("span_context"),
+ span_info=data.get("span_info"),
)
def __repr__(self):
@@ -3425,8 +3537,8 @@ class BuildRequest(JobRequest):
def __init__(self, uuid, zone, build_set_uuid, job_name, tenant_name,
pipeline_name, event_id, precedence=None, state=None,
- result_path=None):
- super().__init__(uuid, precedence, state, result_path)
+ result_path=None, span_context=None):
+ super().__init__(uuid, precedence, state, result_path, span_context)
self.zone = zone
self.build_set_uuid = build_set_uuid
self.job_name = job_name
@@ -3463,7 +3575,8 @@ class BuildRequest(JobRequest):
data["event_id"],
precedence=data["precedence"],
state=data["state"],
- result_path=data["result_path"]
+ result_path=data["result_path"],
+ span_context=data.get("span_context"),
)
request.worker_info = data["worker_info"]
@@ -3521,6 +3634,7 @@ class Build(zkobject.ZKObject):
held=False,
zuul_event_id=None,
build_request_ref=None,
+ span_info=None,
)
def serialize(self, context):
@@ -3539,6 +3653,7 @@ class Build(zkobject.ZKObject):
"held": self.held,
"zuul_event_id": self.zuul_event_id,
"build_request_ref": self.build_request_ref,
+ "span_info": self.span_info,
}
if COMPONENT_REGISTRY.model_api < 5:
data["_result_data"] = (self._result_data.getPath()
@@ -3795,6 +3910,7 @@ class BuildSet(zkobject.ZKObject):
tries={},
files_state=self.NEW,
repo_state_state=self.NEW,
+ span_info=None,
configured=False,
configured_time=None, # When setConfigured was called
start_time=None, # When the buildset reported start
@@ -3871,6 +3987,13 @@ class BuildSet(zkobject.ZKObject):
def getPath(self):
return f"{self.item.getPath()}/buildset/{self.uuid}"
+ @classmethod
+ def parsePath(self, path):
+ """Return path components for use by the REST API"""
+ item_path, bs, uuid = path.rsplit('/', 2)
+ tenant, pipeline, item_uuid = QueueItem.parsePath(item_path)
+ return (tenant, pipeline, item_uuid, uuid)
+
def serialize(self, context):
data = {
# "item": self.item,
@@ -3903,6 +4026,7 @@ class BuildSet(zkobject.ZKObject):
"fail_fast": self.fail_fast,
"job_graph": (self.job_graph.toDict()
if self.job_graph else None),
+ "span_info": self.span_info,
"configured_time": self.configured_time,
"start_time": self.start_time,
"repo_state_request_time": self.repo_state_request_time,
@@ -4242,6 +4366,7 @@ class QueueItem(zkobject.ZKObject):
current_build_set=None,
item_ahead=None,
items_behind=[],
+ span_info=None,
enqueue_time=None,
report_time=None,
dequeue_time=None,
@@ -4273,8 +4398,12 @@ class QueueItem(zkobject.ZKObject):
obj._save(context, data, create=True)
files_state = (BuildSet.COMPLETE if obj.change.files is not None
else BuildSet.NEW)
- obj.updateAttributes(context, current_build_set=BuildSet.new(
- context, item=obj, files_state=files_state))
+
+ with trace.use_span(tracing.restoreSpan(obj.span_info)):
+ buildset_span_info = tracing.startSavedSpan("BuildSet")
+ obj.updateAttributes(context, current_build_set=BuildSet.new(
+ context, item=obj, files_state=files_state,
+ span_info=buildset_span_info))
return obj
def getPath(self):
@@ -4285,6 +4414,13 @@ class QueueItem(zkobject.ZKObject):
def itemPath(cls, pipeline_path, item_uuid):
return f"{pipeline_path}/item/{item_uuid}"
+ @classmethod
+ def parsePath(self, path):
+ """Return path components for use by the REST API"""
+ pipeline_path, item, uuid = path.rsplit('/', 2)
+ tenant, pipeline = PipelineState.parsePath(pipeline_path)
+ return (tenant, pipeline, uuid)
+
def serialize(self, context):
if isinstance(self.event, TriggerEvent):
event_type = "TriggerEvent"
@@ -4304,6 +4440,7 @@ class QueueItem(zkobject.ZKObject):
self.current_build_set.getPath()),
"item_ahead": self.item_ahead and self.item_ahead.getPath(),
"items_behind": [i.getPath() for i in self.items_behind],
+ "span_info": self.span_info,
"enqueue_time": self.enqueue_time,
"report_time": self.report_time,
"dequeue_time": self.dequeue_time,
@@ -4383,11 +4520,21 @@ class QueueItem(zkobject.ZKObject):
old_build_set = self.current_build_set
files_state = (BuildSet.COMPLETE if self.change.files is not None
else BuildSet.NEW)
- self.updateAttributes(
- context,
- current_build_set=BuildSet.new(context, item=self,
- files_state=files_state),
- layout_uuid=None)
+
+ with trace.use_span(tracing.restoreSpan(self.span_info)):
+ old_buildset_span = tracing.restoreSpan(old_build_set.span_info)
+ link = trace.Link(
+ old_buildset_span.get_span_context(),
+ attributes={"previous_buildset": old_build_set.uuid})
+ buildset_span_info = tracing.startSavedSpan(
+ "BuildSet", links=[link])
+
+ self.updateAttributes(
+ context,
+ current_build_set=BuildSet.new(context, item=self,
+ files_state=files_state,
+ span_info=buildset_span_info),
+ layout_uuid=None)
old_build_set.delete(context)
def addBuild(self, build):
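The span link created above ties the replacement BuildSet's span back to the span of the buildset it supersedes without re-parenting it. A generic OpenTelemetry sketch of the same pattern (names and attribute values here are purely illustrative):
from opentelemetry import trace

tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("BuildSet") as old_span:
    old_context = old_span.get_span_context()

# Record a pointer to the previous buildset's span on the new span.
link = trace.Link(old_context, attributes={"previous_buildset": "abc123"})
with tracer.start_as_current_span("BuildSet", links=[link]):
    pass  # work for the new buildset happens here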
@@ -6262,7 +6409,7 @@ class MergeCompletedEvent(ResultEvent):
def __init__(self, request_uuid, build_set_uuid, merged, updated,
commit, files, repo_state, item_in_branches,
- errors, elapsed_time):
+ errors, elapsed_time, span_info=None):
self.request_uuid = request_uuid
self.build_set_uuid = build_set_uuid
self.merged = merged
@@ -6273,6 +6420,7 @@ class MergeCompletedEvent(ResultEvent):
self.item_in_branches = item_in_branches or []
self.errors = errors or []
self.elapsed_time = elapsed_time
+ self.span_info = span_info
def __repr__(self):
return ('<MergeCompletedEvent job: %s buildset: %s merged: %s '
@@ -6293,6 +6441,7 @@ class MergeCompletedEvent(ResultEvent):
"item_in_branches": list(self.item_in_branches),
"errors": list(self.errors),
"elapsed_time": self.elapsed_time,
+ "span_info": self.span_info,
}
@classmethod
@@ -6308,6 +6457,7 @@ class MergeCompletedEvent(ResultEvent):
list(data.get("item_in_branches", [])),
list(data.get("errors", [])),
data.get("elapsed_time"),
+ data.get("span_info"),
)
@@ -6319,24 +6469,31 @@ class FilesChangesCompletedEvent(ResultEvent):
:arg float elapsed_time: Elapsed time of merge op in seconds.
"""
- def __init__(self, build_set_uuid, files, elapsed_time):
+ def __init__(self, request_uuid, build_set_uuid, files, elapsed_time,
+ span_info=None):
+ self.request_uuid = request_uuid
self.build_set_uuid = build_set_uuid
self.files = files or []
self.elapsed_time = elapsed_time
+ self.span_info = span_info
def toDict(self):
return {
+ "request_uuid": self.request_uuid,
"build_set_uuid": self.build_set_uuid,
"files": list(self.files),
"elapsed_time": self.elapsed_time,
+ "span_info": self.span_info,
}
@classmethod
def fromDict(cls, data):
return cls(
+ data.get("request_uuid"),
data.get("build_set_uuid"),
list(data.get("files", [])),
data.get("elapsed_time"),
+ data.get("span_info"),
)
@@ -6619,7 +6776,6 @@ class ProjectPipelineConfig(ConfigObject):
def __init__(self):
super(ProjectPipelineConfig, self).__init__()
self.job_list = JobList()
- self.queue_name = None
self.debug = False
self.debug_messages = []
self.fail_fast = None
@@ -6631,8 +6787,6 @@ class ProjectPipelineConfig(ConfigObject):
def update(self, other):
if not isinstance(other, ProjectPipelineConfig):
raise Exception("Unable to update from %s" % (other,))
- if self.queue_name is None:
- self.queue_name = other.queue_name
if other.debug:
self.debug = other.debug
if self.fail_fast is None:
@@ -6647,7 +6801,6 @@ class ProjectPipelineConfig(ConfigObject):
def toDict(self):
d = {}
- d['queue_name'] = self.queue_name
return d