diff options
Diffstat (limited to 'zuul/model.py')
-rw-r--r-- | zuul/model.py | 353 |
1 files changed, 277 insertions, 76 deletions
diff --git a/zuul/model.py b/zuul/model.py index e339b1ffa..e526b749c 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -64,6 +64,7 @@ MERGER_MAP = { 'squash-merge': MERGER_SQUASH_MERGE, 'rebase': MERGER_REBASE, } +ALL_MERGE_MODES = list(MERGER_MAP.values()) PRECEDENCE_NORMAL = 0 PRECEDENCE_LOW = 1 @@ -437,6 +438,8 @@ class Pipeline(object): # reconfigured). A pipeline requires a tenant in order to # reach the currently active layout for that tenant. self.tenant = tenant + self.allow_other_connections = True + self.connections = [] self.source_context = None self.start_mark = None self.description = None @@ -472,6 +475,8 @@ class Pipeline(object): self.window_decrease_factor = None self.state = None self.change_list = None + # Only used by the unit tests for assertions + self._exception_count = 0 @property def queues(self): @@ -615,6 +620,18 @@ class PipelineState(zkobject.ZKObject): _read_only=False, ) + def _lateInitData(self): + # If we're initializing the object on our initial refresh, + # reset the data to this. + return dict( + state=Pipeline.STATE_NORMAL, + queues=[], + old_queues=[], + consecutive_failures=0, + disabled=False, + layout_uuid=self.pipeline.tenant.layout.uuid, + ) + @classmethod def fromZK(klass, context, path, pipeline, **kw): obj = klass() @@ -626,21 +643,23 @@ class PipelineState(zkobject.ZKObject): return obj @classmethod - def create(cls, pipeline, layout_uuid, old_state=None): - # If the object does not exist in ZK, create it with the - # default attributes and the supplied layout UUID. Otherwise, - # return an initialized object (or the old object for reuse) - # without loading any data so that data can be loaded on the - # next refresh. - ctx = pipeline.manager.current_context + def create(cls, pipeline, old_state=None): + # If we are resetting an existing pipeline, we will have an + # old_state, so just clean up the object references there and + # let the next refresh handle updating any data. + if old_state: + old_state._resetObjectRefs() + return old_state + + # Otherwise, we are initializing a pipeline that we haven't + # seen before. It still might exist in ZK, but since we + # haven't seen it, we don't have any object references to + # clean up. We can just start with a clean object, set the + # pipeline reference, and let the next refresh deal with + # whether there might be any data in ZK. state = cls() state._set(pipeline=pipeline) - if state.exists(ctx): - if old_state: - old_state._resetObjectRefs() - return old_state - return state - return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid) + return state def _resetObjectRefs(self): # Update the pipeline references on the queue objects. @@ -707,8 +726,34 @@ class PipelineState(zkobject.ZKObject): # This is so that we can refresh the object in circumstances # where we haven't verified that our local layout matches # what's in ZK. + + # Notably, this need not prevent us from performing the + # initialization below if necessary. The case of the object + # being brand new in ZK supercedes our worry that our old copy + # might be out of date since our old copy is, itself, brand + # new. self._set(_read_only=read_only) - return super().refresh(context) + try: + return super().refresh(context) + except NoNodeError: + # If the object doesn't exist we will receive a + # NoNodeError. This happens because the postConfig call + # creates this object without holding the pipeline lock, + # so it can't determine whether or not it exists in ZK. + # We do hold the pipeline lock here, so if we get this + # error, we know we're initializing the object, and we + # should write it to ZK. + + # Note that typically this code is not used since + # currently other objects end up creating the pipeline + # path in ZK first. It is included in case that ever + # changes. Currently the empty byte-string code path in + # deserialize() is used instead. + context.log.warning("Initializing pipeline state for %s; " + "this is expected only for new pipelines", + self.pipeline.name) + self._set(**self._lateInitData()) + self.internalCreate(context) def deserialize(self, raw, context): # We may have old change objects in the pipeline cache, so @@ -716,6 +761,20 @@ class PipelineState(zkobject.ZKObject): # source change cache. self.pipeline.manager.clearCache() + # If the object doesn't exist we will get back an empty byte + # string. This happens because the postConfig call creates + # this object without holding the pipeline lock, so it can't + # determine whether or not it exists in ZK. We do hold the + # pipeline lock here, so if we get the empty byte string, we + # know we're initializing the object. In that case, we should + # initialize the layout id to the current layout. Nothing + # else needs to be set. + if raw == b'': + context.log.warning("Initializing pipeline state for %s; " + "this is expected only for new pipelines", + self.pipeline.name) + return self._lateInitData() + data = super().deserialize(raw, context) if not self._read_only: @@ -890,11 +949,34 @@ class PipelineChangeList(zkobject.ShardedZKObject): super().__init__() self._set( changes=[], + _change_keys=[], ) - def refresh(self, context): - self._retry(context, super().refresh, - context, max_tries=5) + def refresh(self, context, allow_init=True): + # Set allow_init to false to indicate that we don't hold the + # lock and we should not try to initialize the object in ZK if + # it does not exist. + try: + self._retry(context, super().refresh, + context, max_tries=5) + except NoNodeError: + # If the object doesn't exist we will receive a + # NoNodeError. This happens because the postConfig call + # creates this object without holding the pipeline lock, + # so it can't determine whether or not it exists in ZK. + # We do hold the pipeline lock here, so if we get this + # error, we know we're initializing the object, and + # we should write it to ZK. + if allow_init: + context.log.warning( + "Initializing pipeline change list for %s; " + "this is expected only for new pipelines", + self.pipeline.name) + self.internalCreate(context) + else: + # If we're called from a context where we can't + # initialize the change list, re-raise the exception. + raise def getPath(self): return self.getChangeListPath(self.pipeline) @@ -905,19 +987,14 @@ class PipelineChangeList(zkobject.ShardedZKObject): return pipeline_path + '/change_list' @classmethod - def create(cls, pipeline, old_list=None): - # If the object does not exist in ZK, create it with the - # default attributes. Otherwise, return an initialized object - # (or the old object for reuse) without loading any data so - # that data can be loaded on the next refresh. - ctx = pipeline.manager.current_context + def create(cls, pipeline): + # This object may or may not exist in ZK, but we using any of + # that data here. We can just start with a clean object, set + # the pipeline reference, and let the next refresh deal with + # whether there might be any data in ZK. change_list = cls() change_list._set(pipeline=pipeline) - if change_list.exists(ctx): - if old_list: - return old_list - return change_list - return cls.new(ctx, pipeline=pipeline) + return change_list def serialize(self, context): data = { @@ -925,8 +1002,8 @@ class PipelineChangeList(zkobject.ShardedZKObject): } return json.dumps(data, sort_keys=True).encode("utf8") - def deserialize(self, data, context): - data = super().deserialize(data, context) + def deserialize(self, raw, context): + data = super().deserialize(raw, context) change_keys = [] # We must have a dictionary with a 'changes' key; otherwise we # may be reading immediately after truncating. Allow the @@ -1805,24 +1882,6 @@ class FrozenSecret(ConfigObject): ) -class ProjectContext(ConfigObject): - - def __init__(self, project_canonical_name, project_name): - super().__init__() - self.project_canonical_name = project_canonical_name - self.project_name = project_name - self.branch = None - self.path = None - - def __str__(self): - return self.project_name - - def toDict(self): - return dict( - project=self.project_name, - ) - - class SourceContext(ConfigObject): """A reference to the branch of a project in configuration. @@ -2391,6 +2450,12 @@ class FrozenJob(zkobject.ZKObject): data['_' + job_data_key] = None return data + def _save(self, context, *args, **kw): + # Before saving, update the buildset with the new job version + # so that future readers know to refresh it. + self.buildset.updateJobVersion(context, self) + return super()._save(context, *args, **kw) + def setWaitingStatus(self, status): if self.waiting_status == status: return @@ -3720,6 +3785,27 @@ class BuildReference: self._path = _path +class BuildEvent: + TYPE_PAUSED = "paused" + TYPE_RESUMED = "resumed" + + def __init__(self, event_time, event_type, description=None): + self.event_time = event_time + self.event_type = event_type + self.description = description + + def toDict(self): + return { + "event_time": self.event_time, + "event_type": self.event_type, + "description": self.description, + } + + @classmethod + def fromDict(cls, data): + return cls(data["event_time"], data["event_type"], data["description"]) + + class Build(zkobject.ZKObject): """A Build is an instance of a single execution of a Job. @@ -3760,6 +3846,8 @@ class Build(zkobject.ZKObject): zuul_event_id=None, build_request_ref=None, span_info=None, + # A list of build events like paused, resume, ... + events=[], ) def serialize(self, context): @@ -3779,6 +3867,7 @@ class Build(zkobject.ZKObject): "zuul_event_id": self.zuul_event_id, "build_request_ref": self.build_request_ref, "span_info": self.span_info, + "events": [e.toDict() for e in self.events], } if COMPONENT_REGISTRY.model_api < 5: data["_result_data"] = (self._result_data.getPath() @@ -3801,6 +3890,11 @@ class Build(zkobject.ZKObject): def deserialize(self, raw, context): data = super().deserialize(raw, context) + # Deserialize build events + data["events"] = [ + BuildEvent.fromDict(e) for e in data.get("events", []) + ] + # Result data can change (between a pause and build # completion). @@ -3844,6 +3938,12 @@ class Build(zkobject.ZKObject): def getPath(self): return f"{self.job.getPath()}/build/{self.uuid}" + def _save(self, context, *args, **kw): + # Before saving, update the buildset with the new job version + # so that future readers know to refresh it. + self.job.buildset.updateBuildVersion(context, self) + return super()._save(context, *args, **kw) + def __repr__(self): return ('<Build %s of %s voting:%s>' % (self.uuid, self.job.name, self.job.voting)) @@ -3875,6 +3975,12 @@ class Build(zkobject.ZKObject): data=secret_result_data, _path=self.getPath() + '/secret_result_data') + def addEvent(self, event): + if not self._active_context: + raise Exception( + "addEvent must be used with a context manager") + self.events.append(event) + @property def failed(self): if self.result and self.result not in ['SUCCESS', 'SKIPPED']: @@ -4048,6 +4154,8 @@ class BuildSet(zkobject.ZKObject): job_graph=None, jobs={}, deduplicated_jobs=[], + job_versions={}, + build_versions={}, # Cached job graph of previous layout; not serialized _old_job_graph=None, _old_jobs={}, @@ -4159,6 +4267,8 @@ class BuildSet(zkobject.ZKObject): "configured_time": self.configured_time, "start_time": self.start_time, "repo_state_request_time": self.repo_state_request_time, + "job_versions": self.job_versions, + "build_versions": self.build_versions, # jobs (serialize as separate objects) } return json.dumps(data, sort_keys=True).encode("utf8") @@ -4256,7 +4366,8 @@ class BuildSet(zkobject.ZKObject): if job_name in self.jobs: job = self.jobs[job_name] - if not old_build_exists: + if ((not old_build_exists) or + self.shouldRefreshJob(job)): tpe_jobs.append((None, job_name, tpe.submit(job.refresh, context))) else: @@ -4268,7 +4379,8 @@ class BuildSet(zkobject.ZKObject): build = self.builds.get(job_name) builds[job_name] = build if build and build.getPath() == build_path: - if not build.result: + if ((not build.result) or + self.shouldRefreshBuild(build)): tpe_jobs.append(( None, job_name, tpe.submit( build.refresh, context))) @@ -4323,6 +4435,48 @@ class BuildSet(zkobject.ZKObject): }) return data + def updateBuildVersion(self, context, build): + # It's tempting to update versions regardless of the model + # API, but if we start writing versions before all components + # are upgraded we could get out of sync. + if (COMPONENT_REGISTRY.model_api < 12): + return True + + # It is common for a lot of builds/jobs to be added at once, + # so to avoid writing this buildset object repeatedly during + # that time, we only update the version after the initial + # creation. + version = build.getZKVersion() + # If zstat is None, we created the object + if version is not None: + self.build_versions[build.uuid] = version + 1 + self.updateAttributes(context, build_versions=self.build_versions) + + def updateJobVersion(self, context, job): + if (COMPONENT_REGISTRY.model_api < 12): + return True + + version = job.getZKVersion() + if version is not None: + self.job_versions[job.name] = version + 1 + self.updateAttributes(context, job_versions=self.job_versions) + + def shouldRefreshBuild(self, build): + # Unless all schedulers are updating versions, we can't trust + # the data. + if (COMPONENT_REGISTRY.model_api < 12): + return True + current = build.getZKVersion() + expected = self.build_versions.get(build.uuid, 0) + return expected != current + + def shouldRefreshJob(self, job): + if (COMPONENT_REGISTRY.model_api < 12): + return True + current = job.getZKVersion() + expected = self.job_versions.get(job.name, 0) + return expected != current + @property def ref(self): # NOTE(jamielennox): The concept of buildset ref is to be removed and a @@ -4428,8 +4582,18 @@ class BuildSet(zkobject.ZKObject): with self.activeContext(self.item.pipeline.manager.current_context): self.node_requests[job_name] = request_id - def getJobNodeRequestID(self, job_name): - return self.node_requests.get(job_name) + def getJobNodeRequestID(self, job_name, ignore_deduplicate=False): + r = self.node_requests.get(job_name) + if ignore_deduplicate and isinstance(r, dict): + return None + return r + + def getNodeRequests(self): + # This ignores deduplicated node requests + for job_name, request in self.node_requests.items(): + if isinstance(request, dict): + continue + yield job_name, request def removeJobNodeRequestID(self, job_name): if job_name in self.node_requests: @@ -4502,6 +4666,37 @@ class BuildSet(zkobject.ZKObject): return Attributes(uuid=self.uuid) +class EventInfo: + + def __init__(self): + self.zuul_event_id = None + self.timestamp = time.time() + self.span_context = None + + @classmethod + def fromEvent(cls, event): + tinfo = cls() + tinfo.zuul_event_id = event.zuul_event_id + tinfo.timestamp = event.timestamp + tinfo.span_context = event.span_context + return tinfo + + @classmethod + def fromDict(cls, d): + tinfo = cls() + tinfo.zuul_event_id = d["zuul_event_id"] + tinfo.timestamp = d["timestamp"] + tinfo.span_context = d["span_context"] + return tinfo + + def toDict(self): + return { + "zuul_event_id": self.zuul_event_id, + "timestamp": self.timestamp, + "span_context": self.span_context, + } + + class QueueItem(zkobject.ZKObject): """Represents the position of a Change in a ChangeQueue. @@ -4536,7 +4731,7 @@ class QueueItem(zkobject.ZKObject): live=True, # Whether an item is intended to be processed at all layout_uuid=None, _cached_sql_results={}, - event=None, # The trigger event that lead to this queue item + event=None, # Info about the event that lead to this queue item # Additional container for connection specifig information to be # used by reporters throughout the lifecycle @@ -4558,6 +4753,9 @@ class QueueItem(zkobject.ZKObject): def new(klass, context, **kw): obj = klass() obj._set(**kw) + if COMPONENT_REGISTRY.model_api >= 13: + obj._set(event=obj.event and EventInfo.fromEvent(obj.event)) + data = obj._trySerialize(context) obj._save(context, data, create=True) files_state = (BuildSet.COMPLETE if obj.change.files is not None @@ -4586,10 +4784,18 @@ class QueueItem(zkobject.ZKObject): return (tenant, pipeline, uuid) def serialize(self, context): - if isinstance(self.event, TriggerEvent): - event_type = "TriggerEvent" + if COMPONENT_REGISTRY.model_api < 13: + if isinstance(self.event, TriggerEvent): + event_type = "TriggerEvent" + else: + event_type = self.event.__class__.__name__ else: - event_type = self.event.__class__.__name__ + event_type = "EventInfo" + if not isinstance(self.event, EventInfo): + # Convert our local trigger event to a trigger info + # object. This will only happen on the transition to + # model API version 13. + self._set(event=EventInfo.fromEvent(self.event)) data = { "uuid": self.uuid, @@ -4631,14 +4837,18 @@ class QueueItem(zkobject.ZKObject): # child objects. self._set(uuid=data["uuid"]) - event_type = data["event"]["type"] - if event_type == "TriggerEvent": - event_class = ( - self.pipeline.manager.sched.connections.getTriggerEventClass( - data["event"]["data"]["driver_name"]) - ) + if COMPONENT_REGISTRY.model_api < 13: + event_type = data["event"]["type"] + if event_type == "TriggerEvent": + event_class = ( + self.pipeline.manager.sched.connections + .getTriggerEventClass( + data["event"]["data"]["driver_name"]) + ) + else: + event_class = EventTypeIndex.event_type_mapping.get(event_type) else: - event_class = EventTypeIndex.event_type_mapping.get(event_type) + event_class = EventInfo if event_class is None: raise NotImplementedError( @@ -5860,8 +6070,7 @@ class Bundle: def deserialize(cls, context, queue, items_by_path, data): bundle = cls(data["uuid"]) bundle.items = [ - items_by_path.get(p) or QueueItem.fromZK( - context, p, pipeline=queue.pipeline, queue=queue) + items_by_path.get(p) or QueueItem.fromZK(context, p, queue=queue) for p in data["items"] ] bundle.started_reporting = data["started_reporting"] @@ -6968,6 +7177,7 @@ class TenantProjectConfig(object): self.extra_config_dirs = () # Load config from a different branch if this is a config project self.load_branch = None + self.merge_modes = None def isAlwaysDynamicBranch(self, branch): if self.always_dynamic_branches is None: @@ -6978,24 +7188,15 @@ class TenantProjectConfig(object): def includesBranch(self, branch): if self.include_branches is not None: - included = False for r in self.include_branches: if r.fullmatch(branch): - included = True - break - else: - included = True - if not included: + return True return False - excluded = False if self.exclude_branches is not None: for r in self.exclude_branches: if r.fullmatch(branch): - excluded = True - break - if excluded: - return False + return False return True |