diff options
Diffstat (limited to 'zuul/model.py')
-rw-r--r-- | zuul/model.py | 253 |
1 files changed, 203 insertions, 50 deletions
diff --git a/zuul/model.py b/zuul/model.py index 0d889f557..32b1cfadf 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -33,6 +33,7 @@ import itertools from kazoo.exceptions import NodeExistsError, NoNodeError from cachetools.func import lru_cache +from opentelemetry import trace from zuul.lib import yamlutil as yaml from zuul.lib.varnames import check_varnames @@ -45,6 +46,7 @@ from zuul.lib.result_data import get_artifacts_from_result_data from zuul.lib.logutil import get_annotated_logger from zuul.lib.capabilities import capabilities_registry from zuul.lib.jsonutil import json_dumps +from zuul.lib import tracing from zuul.zk import zkobject from zuul.zk.blob_store import BlobStore from zuul.zk.change_cache import ChangeKey @@ -329,6 +331,9 @@ class Attributes(object): def __init__(self, **kw): setattr(self, '__dict__', kw) + def toDict(self): + return self.__dict__ + class Freezable(object): """A mix-in class so that an object can be made immutable""" @@ -659,6 +664,13 @@ class PipelineState(zkobject.ZKObject): safe_pipeline = urllib.parse.quote_plus(pipeline.name) return f"/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}" + @classmethod + def parsePath(self, path): + """Return path components for use by the REST API""" + root, safe_tenant, pipeline, safe_pipeline = path.rsplit('/', 3) + return (urllib.parse.unquote_plus(safe_tenant), + urllib.parse.unquote_plus(safe_pipeline)) + def _dirtyPath(self): return f'{self.getPath()}/dirty' @@ -1087,13 +1099,16 @@ class ChangeQueue(zkobject.ZKObject): def matches(self, project_cname, branch): return (project_cname, branch) in self.project_branches - def enqueueChange(self, change, event): + def enqueueChange(self, change, event, span_info=None, enqueue_time=None): + if enqueue_time is None: + enqueue_time = time.time() item = QueueItem.new(self.zk_context, queue=self, pipeline=self.pipeline, change=change, event=event, - enqueue_time=time.time()) + span_info=span_info, + enqueue_time=enqueue_time) self.enqueueItem(item) return item @@ -1383,6 +1398,7 @@ class NodeSet(ConfigObject): self.name = name or '' self.nodes = OrderedDict() self.groups = OrderedDict() + self.alternatives = [] def __ne__(self, other): return not self.__eq__(other) @@ -1391,7 +1407,9 @@ class NodeSet(ConfigObject): if not isinstance(other, NodeSet): return False return (self.name == other.name and - self.nodes == other.nodes) + self.nodes == other.nodes and + self.groups == other.groups and + self.alternatives == other.alternatives) def toDict(self): d = {} @@ -1402,6 +1420,12 @@ class NodeSet(ConfigObject): d['groups'] = [] for group in self.groups.values(): d['groups'].append(group.toDict()) + d['alternatives'] = [] + for alt in self.alternatives: + if isinstance(alt, NodeSet): + d['alternatives'].append(alt.toDict()) + else: + d['alternatives'].append(alt) return d @classmethod @@ -1411,6 +1435,12 @@ class NodeSet(ConfigObject): nodeset.addNode(Node.fromDict(node)) for group in data["groups"]: nodeset.addGroup(Group.fromDict(group)) + for alt in data.get('alternatives', []): + if isinstance(alt, str): + if isinstance(alt, str): + nodeset.addAlternative(alt) + else: + nodeset.addAlternative(NodeSet.fromDict(alt)) return nodeset def copy(self): @@ -1419,6 +1449,11 @@ class NodeSet(ConfigObject): n.addNode(Node(node.name, node.label)) for name, group in self.groups.items(): n.addGroup(Group(group.name, group.nodes[:])) + for alt in self.alternatives: + if isinstance(alt, str): + n.addAlternative(alt) + else: + n.addAlternative(alt.copy()) return n def addNode(self, node): @@ -1438,6 +1473,36 @@ class NodeSet(ConfigObject): def getGroups(self): return list(self.groups.values()) + def addAlternative(self, alt): + self.alternatives.append(alt) + + def flattenAlternatives(self, layout): + alts = [] + history = [] + self._flattenAlternatives(layout, self, alts, history) + return alts + + def _flattenAlternatives(self, layout, nodeset, + alternatives, history): + if isinstance(nodeset, str): + # This references an existing named nodeset in the layout. + ns = layout.nodesets.get(nodeset) + if ns is None: + raise Exception(f'The nodeset "{nodeset}" was not found.') + else: + ns = nodeset + if ns in history: + raise Exception(f'Nodeset cycle detected on "{nodeset}"') + history.append(ns) + if ns.alternatives: + for alt in ns.alternatives: + self._flattenAlternatives(layout, alt, alternatives, history) + else: + alternatives.append(ns) + + def validateReferences(self, layout): + self.flattenAlternatives(layout) + def __repr__(self): if self.name: name = self.name + ' ' @@ -1454,7 +1519,7 @@ class NodeRequest(object): def __init__(self, requestor, build_set_uuid, tenant_name, pipeline_name, job_name, labels, provider, relative_priority, - event_id=None): + event_id=None, span_info=None): self.requestor = requestor self.build_set_uuid = build_set_uuid self.tenant_name = tenant_name @@ -1472,6 +1537,7 @@ class NodeRequest(object): self.id = None self._zk_data = {} # Data that we read back from ZK self.event_id = event_id + self.span_info = span_info # Zuul internal flags (not stored in ZK so they are not # overwritten). self.failed = False @@ -1522,6 +1588,7 @@ class NodeRequest(object): "tenant_name": self.tenant_name, "pipeline_name": self.pipeline_name, "job_name": self.job_name, + "span_info": self.span_info, } d.setdefault('node_types', self.labels) d.setdefault('requestor', self.requestor) @@ -1570,6 +1637,7 @@ class NodeRequest(object): labels=data["node_types"], provider=data["provider"], relative_priority=data.get("relative_priority", 0), + span_info=requestor_data.get("span_info"), ) request.updateFromDict(data) @@ -2038,7 +2106,8 @@ class FrozenJob(zkobject.ZKObject): 'dependencies', 'inheritance_path', 'name', - 'nodeset', + 'nodeset_alternatives', + 'nodeset_index', 'override_branch', 'override_checkout', 'post_timeout', @@ -2149,8 +2218,8 @@ class FrozenJob(zkobject.ZKObject): if not hasattr(self, k): continue v = getattr(self, k) - if k == 'nodeset': - v = v.toDict() + if k == 'nodeset_alternatives': + v = [alt.toDict() for alt in v] elif k == 'dependencies': # frozenset of JobDependency v = [dep.toDict() for dep in v] @@ -2173,6 +2242,9 @@ class FrozenJob(zkobject.ZKObject): v = {'storage': 'local', 'data': v} data[k] = v + if (COMPONENT_REGISTRY.model_api < 9): + data['nodeset'] = data['nodeset_alternatives'][0] + # Use json_dumps to strip any ZuulMark entries return json_dumps(data, sort_keys=True).encode("utf8") @@ -2183,13 +2255,18 @@ class FrozenJob(zkobject.ZKObject): if 'deduplicate' not in data: data['deduplicate'] = 'auto' - if hasattr(self, 'nodeset'): - nodeset = self.nodeset + # MODEL_API < 9 + if data.get('nodeset'): + data['nodeset_alternatives'] = [data['nodeset']] + data['nodeset_index'] = 0 + del data['nodeset'] + + if hasattr(self, 'nodeset_alternatives'): + alts = self.nodeset_alternatives else: - nodeset = data.get('nodeset') - if nodeset: - nodeset = NodeSet.fromDict(nodeset) - data['nodeset'] = nodeset + alts = data.get('nodeset_alternatives', []) + alts = [NodeSet.fromDict(alt) for alt in alts] + data['nodeset_alternatives'] = alts if hasattr(self, 'dependencies'): data['dependencies'] = self.dependencies @@ -2250,6 +2327,12 @@ class FrozenJob(zkobject.ZKObject): return val @property + def nodeset(self): + if self.nodeset_alternatives: + return self.nodeset_alternatives[self.nodeset_index] + return None + + @property def parent_data(self): return self._getJobData('_parent_data') @@ -2459,12 +2542,11 @@ class Job(ConfigObject): d['parent'] = self.parent else: d['parent'] = tenant.default_base_job - if isinstance(self.nodeset, str): - ns = tenant.layout.nodesets.get(self.nodeset) - else: - ns = self.nodeset - if ns: - d['nodeset'] = ns.toDict() + alts = self.flattenNodesetAlternatives(tenant.layout) + if len(alts) == 1 and len(alts[0]): + d['nodeset'] = alts[0].toDict() + elif len(alts) > 1: + d['nodeset_alternatives'] = [x.toDict() for x in alts] if self.ansible_version: d['ansible_version'] = self.ansible_version else: @@ -2629,6 +2711,17 @@ class Job(ConfigObject): secrets.append(secret_value) playbook['secrets'][secret_key] = len(secrets) - 1 + def flattenNodesetAlternatives(self, layout): + nodeset = self.nodeset + if isinstance(nodeset, str): + # This references an existing named nodeset in the layout. + ns = layout.nodesets.get(nodeset) + if ns is None: + raise Exception(f'The nodeset "{nodeset}" was not found.') + else: + ns = nodeset + return ns.flattenAlternatives(layout) + def freezeJob(self, context, tenant, layout, item, redact_secrets_and_keys): buildset = item.current_build_set @@ -2640,6 +2733,9 @@ class Job(ConfigObject): attributes.discard('secrets') attributes.discard('affected_projects') attributes.discard('config_hash') + # Nodeset alternatives are flattened at this point + attributes.discard('nodeset_alternatives') + attributes.discard('nodeset_index') secrets = [] for k in attributes: # If this is a config object, it's frozen, so it's @@ -2663,6 +2759,8 @@ class Job(ConfigObject): for pb in v: self._deduplicateSecrets(context, secrets, pb) kw[k] = v + kw['nodeset_alternatives'] = self.flattenNodesetAlternatives(layout) + kw['nodeset_index'] = 0 kw['secrets'] = secrets kw['affected_projects'] = self._getAffectedProjects(tenant) kw['config_hash'] = self.getConfigHash(tenant) @@ -2735,7 +2833,7 @@ class Job(ConfigObject): if self._get('cleanup_run') is not None: self.cleanup_run = self.freezePlaybooks(self.cleanup_run, layout) - def getNodeSet(self, layout): + def getNodeset(self, layout): if isinstance(self.nodeset, str): # This references an existing named nodeset in the layout. ns = layout.nodesets.get(self.nodeset) @@ -2752,14 +2850,14 @@ class Job(ConfigObject): if not self.isBase() and self.parent: layout.getJob(self.parent) - ns = self.getNodeSet(layout) - if layout.tenant.max_nodes_per_job != -1 and \ - len(ns) > layout.tenant.max_nodes_per_job: - raise Exception( - 'The job "{job}" exceeds tenant ' - 'max-nodes-per-job {maxnodes}.'.format( - job=self.name, - maxnodes=layout.tenant.max_nodes_per_job)) + for ns in self.flattenNodesetAlternatives(layout): + if layout.tenant.max_nodes_per_job != -1 and \ + len(ns) > layout.tenant.max_nodes_per_job: + raise Exception( + 'The job "{job}" exceeds tenant ' + 'max-nodes-per-job {maxnodes}.'.format( + job=self.name, + maxnodes=layout.tenant.max_nodes_per_job)) for dependency in self.dependencies: layout.getJob(dependency.name) @@ -2956,7 +3054,7 @@ class Job(ConfigObject): self.addRoles(other.roles) # Freeze the nodeset - self.nodeset = self.getNodeSet(layout) + self.nodeset = self.getNodeset(layout) # Pass secrets to parents secrets_for_parents = [s for s in other.secrets if s.pass_to_parent] @@ -3297,7 +3395,8 @@ class JobRequest: # This object participates in transactions, and therefore must # remain small and unsharded. - def __init__(self, uuid, precedence=None, state=None, result_path=None): + def __init__(self, uuid, precedence=None, state=None, result_path=None, + span_context=None): self.uuid = uuid if precedence is None: self.precedence = 0 @@ -3310,6 +3409,12 @@ class JobRequest: self.state = state # Path to the future result if requested self.result_path = result_path + # Reference to the parent span + if span_context: + self.span_context = span_context + else: + span = trace.get_current_span() + self.span_context = tracing.getSpanContext(span) # ZK related data not serialized self.path = None @@ -3322,12 +3427,14 @@ class JobRequest: "state": self.state, "precedence": self.precedence, "result_path": self.result_path, + "span_context": self.span_context, } def updateFromDict(self, data): self.precedence = data["precedence"] self.state = data["state"] self.result_path = data["result_path"] + self.span_context = data.get("span_context") @classmethod def fromDict(cls, data): @@ -3335,7 +3442,8 @@ class JobRequest: data["uuid"], precedence=data["precedence"], state=data["state"], - result_path=data["result_path"] + result_path=data["result_path"], + span_context=data.get("span_context"), ) def __lt__(self, other): @@ -3375,13 +3483,14 @@ class MergeRequest(JobRequest): def __init__(self, uuid, job_type, build_set_uuid, tenant_name, pipeline_name, event_id, precedence=None, state=None, - result_path=None): - super().__init__(uuid, precedence, state, result_path) + result_path=None, span_context=None, span_info=None): + super().__init__(uuid, precedence, state, result_path, span_context) self.job_type = job_type self.build_set_uuid = build_set_uuid self.tenant_name = tenant_name self.pipeline_name = pipeline_name self.event_id = event_id + self.span_info = span_info def toDict(self): d = super().toDict() @@ -3391,6 +3500,7 @@ class MergeRequest(JobRequest): "tenant_name": self.tenant_name, "pipeline_name": self.pipeline_name, "event_id": self.event_id, + "span_info": self.span_info, }) return d @@ -3405,7 +3515,9 @@ class MergeRequest(JobRequest): data["event_id"], precedence=data["precedence"], state=data["state"], - result_path=data["result_path"] + result_path=data["result_path"], + span_context=data.get("span_context"), + span_info=data.get("span_info"), ) def __repr__(self): @@ -3425,8 +3537,8 @@ class BuildRequest(JobRequest): def __init__(self, uuid, zone, build_set_uuid, job_name, tenant_name, pipeline_name, event_id, precedence=None, state=None, - result_path=None): - super().__init__(uuid, precedence, state, result_path) + result_path=None, span_context=None): + super().__init__(uuid, precedence, state, result_path, span_context) self.zone = zone self.build_set_uuid = build_set_uuid self.job_name = job_name @@ -3463,7 +3575,8 @@ class BuildRequest(JobRequest): data["event_id"], precedence=data["precedence"], state=data["state"], - result_path=data["result_path"] + result_path=data["result_path"], + span_context=data.get("span_context"), ) request.worker_info = data["worker_info"] @@ -3521,6 +3634,7 @@ class Build(zkobject.ZKObject): held=False, zuul_event_id=None, build_request_ref=None, + span_info=None, ) def serialize(self, context): @@ -3539,6 +3653,7 @@ class Build(zkobject.ZKObject): "held": self.held, "zuul_event_id": self.zuul_event_id, "build_request_ref": self.build_request_ref, + "span_info": self.span_info, } if COMPONENT_REGISTRY.model_api < 5: data["_result_data"] = (self._result_data.getPath() @@ -3795,6 +3910,7 @@ class BuildSet(zkobject.ZKObject): tries={}, files_state=self.NEW, repo_state_state=self.NEW, + span_info=None, configured=False, configured_time=None, # When setConfigured was called start_time=None, # When the buildset reported start @@ -3871,6 +3987,13 @@ class BuildSet(zkobject.ZKObject): def getPath(self): return f"{self.item.getPath()}/buildset/{self.uuid}" + @classmethod + def parsePath(self, path): + """Return path components for use by the REST API""" + item_path, bs, uuid = path.rsplit('/', 2) + tenant, pipeline, item_uuid = QueueItem.parsePath(item_path) + return (tenant, pipeline, item_uuid, uuid) + def serialize(self, context): data = { # "item": self.item, @@ -3903,6 +4026,7 @@ class BuildSet(zkobject.ZKObject): "fail_fast": self.fail_fast, "job_graph": (self.job_graph.toDict() if self.job_graph else None), + "span_info": self.span_info, "configured_time": self.configured_time, "start_time": self.start_time, "repo_state_request_time": self.repo_state_request_time, @@ -4242,6 +4366,7 @@ class QueueItem(zkobject.ZKObject): current_build_set=None, item_ahead=None, items_behind=[], + span_info=None, enqueue_time=None, report_time=None, dequeue_time=None, @@ -4273,8 +4398,12 @@ class QueueItem(zkobject.ZKObject): obj._save(context, data, create=True) files_state = (BuildSet.COMPLETE if obj.change.files is not None else BuildSet.NEW) - obj.updateAttributes(context, current_build_set=BuildSet.new( - context, item=obj, files_state=files_state)) + + with trace.use_span(tracing.restoreSpan(obj.span_info)): + buildset_span_info = tracing.startSavedSpan("BuildSet") + obj.updateAttributes(context, current_build_set=BuildSet.new( + context, item=obj, files_state=files_state, + span_info=buildset_span_info)) return obj def getPath(self): @@ -4285,6 +4414,13 @@ class QueueItem(zkobject.ZKObject): def itemPath(cls, pipeline_path, item_uuid): return f"{pipeline_path}/item/{item_uuid}" + @classmethod + def parsePath(self, path): + """Return path components for use by the REST API""" + pipeline_path, item, uuid = path.rsplit('/', 2) + tenant, pipeline = PipelineState.parsePath(pipeline_path) + return (tenant, pipeline, uuid) + def serialize(self, context): if isinstance(self.event, TriggerEvent): event_type = "TriggerEvent" @@ -4304,6 +4440,7 @@ class QueueItem(zkobject.ZKObject): self.current_build_set.getPath()), "item_ahead": self.item_ahead and self.item_ahead.getPath(), "items_behind": [i.getPath() for i in self.items_behind], + "span_info": self.span_info, "enqueue_time": self.enqueue_time, "report_time": self.report_time, "dequeue_time": self.dequeue_time, @@ -4383,11 +4520,21 @@ class QueueItem(zkobject.ZKObject): old_build_set = self.current_build_set files_state = (BuildSet.COMPLETE if self.change.files is not None else BuildSet.NEW) - self.updateAttributes( - context, - current_build_set=BuildSet.new(context, item=self, - files_state=files_state), - layout_uuid=None) + + with trace.use_span(tracing.restoreSpan(self.span_info)): + old_buildset_span = tracing.restoreSpan(old_build_set.span_info) + link = trace.Link( + old_buildset_span.get_span_context(), + attributes={"previous_buildset": old_build_set.uuid}) + buildset_span_info = tracing.startSavedSpan( + "BuildSet", links=[link]) + + self.updateAttributes( + context, + current_build_set=BuildSet.new(context, item=self, + files_state=files_state, + span_info=buildset_span_info), + layout_uuid=None) old_build_set.delete(context) def addBuild(self, build): @@ -6262,7 +6409,7 @@ class MergeCompletedEvent(ResultEvent): def __init__(self, request_uuid, build_set_uuid, merged, updated, commit, files, repo_state, item_in_branches, - errors, elapsed_time): + errors, elapsed_time, span_info=None): self.request_uuid = request_uuid self.build_set_uuid = build_set_uuid self.merged = merged @@ -6273,6 +6420,7 @@ class MergeCompletedEvent(ResultEvent): self.item_in_branches = item_in_branches or [] self.errors = errors or [] self.elapsed_time = elapsed_time + self.span_info = span_info def __repr__(self): return ('<MergeCompletedEvent job: %s buildset: %s merged: %s ' @@ -6293,6 +6441,7 @@ class MergeCompletedEvent(ResultEvent): "item_in_branches": list(self.item_in_branches), "errors": list(self.errors), "elapsed_time": self.elapsed_time, + "span_info": self.span_info, } @classmethod @@ -6308,6 +6457,7 @@ class MergeCompletedEvent(ResultEvent): list(data.get("item_in_branches", [])), list(data.get("errors", [])), data.get("elapsed_time"), + data.get("span_info"), ) @@ -6319,24 +6469,31 @@ class FilesChangesCompletedEvent(ResultEvent): :arg float elapsed_time: Elapsed time of merge op in seconds. """ - def __init__(self, build_set_uuid, files, elapsed_time): + def __init__(self, request_uuid, build_set_uuid, files, elapsed_time, + span_info=None): + self.request_uuid = request_uuid self.build_set_uuid = build_set_uuid self.files = files or [] self.elapsed_time = elapsed_time + self.span_info = span_info def toDict(self): return { + "request_uuid": self.request_uuid, "build_set_uuid": self.build_set_uuid, "files": list(self.files), "elapsed_time": self.elapsed_time, + "span_info": self.span_info, } @classmethod def fromDict(cls, data): return cls( + data.get("request_uuid"), data.get("build_set_uuid"), list(data.get("files", [])), data.get("elapsed_time"), + data.get("span_info"), ) @@ -6619,7 +6776,6 @@ class ProjectPipelineConfig(ConfigObject): def __init__(self): super(ProjectPipelineConfig, self).__init__() self.job_list = JobList() - self.queue_name = None self.debug = False self.debug_messages = [] self.fail_fast = None @@ -6631,8 +6787,6 @@ class ProjectPipelineConfig(ConfigObject): def update(self, other): if not isinstance(other, ProjectPipelineConfig): raise Exception("Unable to update from %s" % (other,)) - if self.queue_name is None: - self.queue_name = other.queue_name if other.debug: self.debug = other.debug if self.fail_fast is None: @@ -6647,7 +6801,6 @@ class ProjectPipelineConfig(ConfigObject): def toDict(self): d = {} - d['queue_name'] = self.queue_name return d |