summaryrefslogtreecommitdiff
path: root/zuul/model.py
diff options
context:
space:
mode:
Diffstat (limited to 'zuul/model.py')
-rw-r--r--zuul/model.py353
1 files changed, 277 insertions, 76 deletions
diff --git a/zuul/model.py b/zuul/model.py
index e339b1ffa..e526b749c 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -64,6 +64,7 @@ MERGER_MAP = {
'squash-merge': MERGER_SQUASH_MERGE,
'rebase': MERGER_REBASE,
}
+ALL_MERGE_MODES = list(MERGER_MAP.values())
PRECEDENCE_NORMAL = 0
PRECEDENCE_LOW = 1
@@ -437,6 +438,8 @@ class Pipeline(object):
# reconfigured). A pipeline requires a tenant in order to
# reach the currently active layout for that tenant.
self.tenant = tenant
+ self.allow_other_connections = True
+ self.connections = []
self.source_context = None
self.start_mark = None
self.description = None
@@ -472,6 +475,8 @@ class Pipeline(object):
self.window_decrease_factor = None
self.state = None
self.change_list = None
+ # Only used by the unit tests for assertions
+ self._exception_count = 0
@property
def queues(self):
@@ -615,6 +620,18 @@ class PipelineState(zkobject.ZKObject):
_read_only=False,
)
+ def _lateInitData(self):
+ # If we're initializing the object on our initial refresh,
+ # reset the data to this.
+ return dict(
+ state=Pipeline.STATE_NORMAL,
+ queues=[],
+ old_queues=[],
+ consecutive_failures=0,
+ disabled=False,
+ layout_uuid=self.pipeline.tenant.layout.uuid,
+ )
+
@classmethod
def fromZK(klass, context, path, pipeline, **kw):
obj = klass()
@@ -626,21 +643,23 @@ class PipelineState(zkobject.ZKObject):
return obj
@classmethod
- def create(cls, pipeline, layout_uuid, old_state=None):
- # If the object does not exist in ZK, create it with the
- # default attributes and the supplied layout UUID. Otherwise,
- # return an initialized object (or the old object for reuse)
- # without loading any data so that data can be loaded on the
- # next refresh.
- ctx = pipeline.manager.current_context
+ def create(cls, pipeline, old_state=None):
+ # If we are resetting an existing pipeline, we will have an
+ # old_state, so just clean up the object references there and
+ # let the next refresh handle updating any data.
+ if old_state:
+ old_state._resetObjectRefs()
+ return old_state
+
+ # Otherwise, we are initializing a pipeline that we haven't
+ # seen before. It still might exist in ZK, but since we
+ # haven't seen it, we don't have any object references to
+ # clean up. We can just start with a clean object, set the
+ # pipeline reference, and let the next refresh deal with
+ # whether there might be any data in ZK.
state = cls()
state._set(pipeline=pipeline)
- if state.exists(ctx):
- if old_state:
- old_state._resetObjectRefs()
- return old_state
- return state
- return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid)
+ return state
def _resetObjectRefs(self):
# Update the pipeline references on the queue objects.
@@ -707,8 +726,34 @@ class PipelineState(zkobject.ZKObject):
# This is so that we can refresh the object in circumstances
# where we haven't verified that our local layout matches
# what's in ZK.
+
+ # Notably, this need not prevent us from performing the
+ # initialization below if necessary. The case of the object
+ # being brand new in ZK supercedes our worry that our old copy
+ # might be out of date since our old copy is, itself, brand
+ # new.
self._set(_read_only=read_only)
- return super().refresh(context)
+ try:
+ return super().refresh(context)
+ except NoNodeError:
+ # If the object doesn't exist we will receive a
+ # NoNodeError. This happens because the postConfig call
+ # creates this object without holding the pipeline lock,
+ # so it can't determine whether or not it exists in ZK.
+ # We do hold the pipeline lock here, so if we get this
+ # error, we know we're initializing the object, and we
+ # should write it to ZK.
+
+ # Note that typically this code is not used since
+ # currently other objects end up creating the pipeline
+ # path in ZK first. It is included in case that ever
+ # changes. Currently the empty byte-string code path in
+ # deserialize() is used instead.
+ context.log.warning("Initializing pipeline state for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ self._set(**self._lateInitData())
+ self.internalCreate(context)
def deserialize(self, raw, context):
# We may have old change objects in the pipeline cache, so
@@ -716,6 +761,20 @@ class PipelineState(zkobject.ZKObject):
# source change cache.
self.pipeline.manager.clearCache()
+ # If the object doesn't exist we will get back an empty byte
+ # string. This happens because the postConfig call creates
+ # this object without holding the pipeline lock, so it can't
+ # determine whether or not it exists in ZK. We do hold the
+ # pipeline lock here, so if we get the empty byte string, we
+ # know we're initializing the object. In that case, we should
+ # initialize the layout id to the current layout. Nothing
+ # else needs to be set.
+ if raw == b'':
+ context.log.warning("Initializing pipeline state for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ return self._lateInitData()
+
data = super().deserialize(raw, context)
if not self._read_only:
@@ -890,11 +949,34 @@ class PipelineChangeList(zkobject.ShardedZKObject):
super().__init__()
self._set(
changes=[],
+ _change_keys=[],
)
- def refresh(self, context):
- self._retry(context, super().refresh,
- context, max_tries=5)
+ def refresh(self, context, allow_init=True):
+ # Set allow_init to false to indicate that we don't hold the
+ # lock and we should not try to initialize the object in ZK if
+ # it does not exist.
+ try:
+ self._retry(context, super().refresh,
+ context, max_tries=5)
+ except NoNodeError:
+ # If the object doesn't exist we will receive a
+ # NoNodeError. This happens because the postConfig call
+ # creates this object without holding the pipeline lock,
+ # so it can't determine whether or not it exists in ZK.
+ # We do hold the pipeline lock here, so if we get this
+ # error, we know we're initializing the object, and
+ # we should write it to ZK.
+ if allow_init:
+ context.log.warning(
+ "Initializing pipeline change list for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ self.internalCreate(context)
+ else:
+ # If we're called from a context where we can't
+ # initialize the change list, re-raise the exception.
+ raise
def getPath(self):
return self.getChangeListPath(self.pipeline)
@@ -905,19 +987,14 @@ class PipelineChangeList(zkobject.ShardedZKObject):
return pipeline_path + '/change_list'
@classmethod
- def create(cls, pipeline, old_list=None):
- # If the object does not exist in ZK, create it with the
- # default attributes. Otherwise, return an initialized object
- # (or the old object for reuse) without loading any data so
- # that data can be loaded on the next refresh.
- ctx = pipeline.manager.current_context
+ def create(cls, pipeline):
+ # This object may or may not exist in ZK, but we using any of
+ # that data here. We can just start with a clean object, set
+ # the pipeline reference, and let the next refresh deal with
+ # whether there might be any data in ZK.
change_list = cls()
change_list._set(pipeline=pipeline)
- if change_list.exists(ctx):
- if old_list:
- return old_list
- return change_list
- return cls.new(ctx, pipeline=pipeline)
+ return change_list
def serialize(self, context):
data = {
@@ -925,8 +1002,8 @@ class PipelineChangeList(zkobject.ShardedZKObject):
}
return json.dumps(data, sort_keys=True).encode("utf8")
- def deserialize(self, data, context):
- data = super().deserialize(data, context)
+ def deserialize(self, raw, context):
+ data = super().deserialize(raw, context)
change_keys = []
# We must have a dictionary with a 'changes' key; otherwise we
# may be reading immediately after truncating. Allow the
@@ -1805,24 +1882,6 @@ class FrozenSecret(ConfigObject):
)
-class ProjectContext(ConfigObject):
-
- def __init__(self, project_canonical_name, project_name):
- super().__init__()
- self.project_canonical_name = project_canonical_name
- self.project_name = project_name
- self.branch = None
- self.path = None
-
- def __str__(self):
- return self.project_name
-
- def toDict(self):
- return dict(
- project=self.project_name,
- )
-
-
class SourceContext(ConfigObject):
"""A reference to the branch of a project in configuration.
@@ -2391,6 +2450,12 @@ class FrozenJob(zkobject.ZKObject):
data['_' + job_data_key] = None
return data
+ def _save(self, context, *args, **kw):
+ # Before saving, update the buildset with the new job version
+ # so that future readers know to refresh it.
+ self.buildset.updateJobVersion(context, self)
+ return super()._save(context, *args, **kw)
+
def setWaitingStatus(self, status):
if self.waiting_status == status:
return
@@ -3720,6 +3785,27 @@ class BuildReference:
self._path = _path
+class BuildEvent:
+ TYPE_PAUSED = "paused"
+ TYPE_RESUMED = "resumed"
+
+ def __init__(self, event_time, event_type, description=None):
+ self.event_time = event_time
+ self.event_type = event_type
+ self.description = description
+
+ def toDict(self):
+ return {
+ "event_time": self.event_time,
+ "event_type": self.event_type,
+ "description": self.description,
+ }
+
+ @classmethod
+ def fromDict(cls, data):
+ return cls(data["event_time"], data["event_type"], data["description"])
+
+
class Build(zkobject.ZKObject):
"""A Build is an instance of a single execution of a Job.
@@ -3760,6 +3846,8 @@ class Build(zkobject.ZKObject):
zuul_event_id=None,
build_request_ref=None,
span_info=None,
+ # A list of build events like paused, resume, ...
+ events=[],
)
def serialize(self, context):
@@ -3779,6 +3867,7 @@ class Build(zkobject.ZKObject):
"zuul_event_id": self.zuul_event_id,
"build_request_ref": self.build_request_ref,
"span_info": self.span_info,
+ "events": [e.toDict() for e in self.events],
}
if COMPONENT_REGISTRY.model_api < 5:
data["_result_data"] = (self._result_data.getPath()
@@ -3801,6 +3890,11 @@ class Build(zkobject.ZKObject):
def deserialize(self, raw, context):
data = super().deserialize(raw, context)
+ # Deserialize build events
+ data["events"] = [
+ BuildEvent.fromDict(e) for e in data.get("events", [])
+ ]
+
# Result data can change (between a pause and build
# completion).
@@ -3844,6 +3938,12 @@ class Build(zkobject.ZKObject):
def getPath(self):
return f"{self.job.getPath()}/build/{self.uuid}"
+ def _save(self, context, *args, **kw):
+ # Before saving, update the buildset with the new job version
+ # so that future readers know to refresh it.
+ self.job.buildset.updateBuildVersion(context, self)
+ return super()._save(context, *args, **kw)
+
def __repr__(self):
return ('<Build %s of %s voting:%s>' %
(self.uuid, self.job.name, self.job.voting))
@@ -3875,6 +3975,12 @@ class Build(zkobject.ZKObject):
data=secret_result_data,
_path=self.getPath() + '/secret_result_data')
+ def addEvent(self, event):
+ if not self._active_context:
+ raise Exception(
+ "addEvent must be used with a context manager")
+ self.events.append(event)
+
@property
def failed(self):
if self.result and self.result not in ['SUCCESS', 'SKIPPED']:
@@ -4048,6 +4154,8 @@ class BuildSet(zkobject.ZKObject):
job_graph=None,
jobs={},
deduplicated_jobs=[],
+ job_versions={},
+ build_versions={},
# Cached job graph of previous layout; not serialized
_old_job_graph=None,
_old_jobs={},
@@ -4159,6 +4267,8 @@ class BuildSet(zkobject.ZKObject):
"configured_time": self.configured_time,
"start_time": self.start_time,
"repo_state_request_time": self.repo_state_request_time,
+ "job_versions": self.job_versions,
+ "build_versions": self.build_versions,
# jobs (serialize as separate objects)
}
return json.dumps(data, sort_keys=True).encode("utf8")
@@ -4256,7 +4366,8 @@ class BuildSet(zkobject.ZKObject):
if job_name in self.jobs:
job = self.jobs[job_name]
- if not old_build_exists:
+ if ((not old_build_exists) or
+ self.shouldRefreshJob(job)):
tpe_jobs.append((None, job_name,
tpe.submit(job.refresh, context)))
else:
@@ -4268,7 +4379,8 @@ class BuildSet(zkobject.ZKObject):
build = self.builds.get(job_name)
builds[job_name] = build
if build and build.getPath() == build_path:
- if not build.result:
+ if ((not build.result) or
+ self.shouldRefreshBuild(build)):
tpe_jobs.append((
None, job_name, tpe.submit(
build.refresh, context)))
@@ -4323,6 +4435,48 @@ class BuildSet(zkobject.ZKObject):
})
return data
+ def updateBuildVersion(self, context, build):
+ # It's tempting to update versions regardless of the model
+ # API, but if we start writing versions before all components
+ # are upgraded we could get out of sync.
+ if (COMPONENT_REGISTRY.model_api < 12):
+ return True
+
+ # It is common for a lot of builds/jobs to be added at once,
+ # so to avoid writing this buildset object repeatedly during
+ # that time, we only update the version after the initial
+ # creation.
+ version = build.getZKVersion()
+ # If zstat is None, we created the object
+ if version is not None:
+ self.build_versions[build.uuid] = version + 1
+ self.updateAttributes(context, build_versions=self.build_versions)
+
+ def updateJobVersion(self, context, job):
+ if (COMPONENT_REGISTRY.model_api < 12):
+ return True
+
+ version = job.getZKVersion()
+ if version is not None:
+ self.job_versions[job.name] = version + 1
+ self.updateAttributes(context, job_versions=self.job_versions)
+
+ def shouldRefreshBuild(self, build):
+ # Unless all schedulers are updating versions, we can't trust
+ # the data.
+ if (COMPONENT_REGISTRY.model_api < 12):
+ return True
+ current = build.getZKVersion()
+ expected = self.build_versions.get(build.uuid, 0)
+ return expected != current
+
+ def shouldRefreshJob(self, job):
+ if (COMPONENT_REGISTRY.model_api < 12):
+ return True
+ current = job.getZKVersion()
+ expected = self.job_versions.get(job.name, 0)
+ return expected != current
+
@property
def ref(self):
# NOTE(jamielennox): The concept of buildset ref is to be removed and a
@@ -4428,8 +4582,18 @@ class BuildSet(zkobject.ZKObject):
with self.activeContext(self.item.pipeline.manager.current_context):
self.node_requests[job_name] = request_id
- def getJobNodeRequestID(self, job_name):
- return self.node_requests.get(job_name)
+ def getJobNodeRequestID(self, job_name, ignore_deduplicate=False):
+ r = self.node_requests.get(job_name)
+ if ignore_deduplicate and isinstance(r, dict):
+ return None
+ return r
+
+ def getNodeRequests(self):
+ # This ignores deduplicated node requests
+ for job_name, request in self.node_requests.items():
+ if isinstance(request, dict):
+ continue
+ yield job_name, request
def removeJobNodeRequestID(self, job_name):
if job_name in self.node_requests:
@@ -4502,6 +4666,37 @@ class BuildSet(zkobject.ZKObject):
return Attributes(uuid=self.uuid)
+class EventInfo:
+
+ def __init__(self):
+ self.zuul_event_id = None
+ self.timestamp = time.time()
+ self.span_context = None
+
+ @classmethod
+ def fromEvent(cls, event):
+ tinfo = cls()
+ tinfo.zuul_event_id = event.zuul_event_id
+ tinfo.timestamp = event.timestamp
+ tinfo.span_context = event.span_context
+ return tinfo
+
+ @classmethod
+ def fromDict(cls, d):
+ tinfo = cls()
+ tinfo.zuul_event_id = d["zuul_event_id"]
+ tinfo.timestamp = d["timestamp"]
+ tinfo.span_context = d["span_context"]
+ return tinfo
+
+ def toDict(self):
+ return {
+ "zuul_event_id": self.zuul_event_id,
+ "timestamp": self.timestamp,
+ "span_context": self.span_context,
+ }
+
+
class QueueItem(zkobject.ZKObject):
"""Represents the position of a Change in a ChangeQueue.
@@ -4536,7 +4731,7 @@ class QueueItem(zkobject.ZKObject):
live=True, # Whether an item is intended to be processed at all
layout_uuid=None,
_cached_sql_results={},
- event=None, # The trigger event that lead to this queue item
+ event=None, # Info about the event that lead to this queue item
# Additional container for connection specifig information to be
# used by reporters throughout the lifecycle
@@ -4558,6 +4753,9 @@ class QueueItem(zkobject.ZKObject):
def new(klass, context, **kw):
obj = klass()
obj._set(**kw)
+ if COMPONENT_REGISTRY.model_api >= 13:
+ obj._set(event=obj.event and EventInfo.fromEvent(obj.event))
+
data = obj._trySerialize(context)
obj._save(context, data, create=True)
files_state = (BuildSet.COMPLETE if obj.change.files is not None
@@ -4586,10 +4784,18 @@ class QueueItem(zkobject.ZKObject):
return (tenant, pipeline, uuid)
def serialize(self, context):
- if isinstance(self.event, TriggerEvent):
- event_type = "TriggerEvent"
+ if COMPONENT_REGISTRY.model_api < 13:
+ if isinstance(self.event, TriggerEvent):
+ event_type = "TriggerEvent"
+ else:
+ event_type = self.event.__class__.__name__
else:
- event_type = self.event.__class__.__name__
+ event_type = "EventInfo"
+ if not isinstance(self.event, EventInfo):
+ # Convert our local trigger event to a trigger info
+ # object. This will only happen on the transition to
+ # model API version 13.
+ self._set(event=EventInfo.fromEvent(self.event))
data = {
"uuid": self.uuid,
@@ -4631,14 +4837,18 @@ class QueueItem(zkobject.ZKObject):
# child objects.
self._set(uuid=data["uuid"])
- event_type = data["event"]["type"]
- if event_type == "TriggerEvent":
- event_class = (
- self.pipeline.manager.sched.connections.getTriggerEventClass(
- data["event"]["data"]["driver_name"])
- )
+ if COMPONENT_REGISTRY.model_api < 13:
+ event_type = data["event"]["type"]
+ if event_type == "TriggerEvent":
+ event_class = (
+ self.pipeline.manager.sched.connections
+ .getTriggerEventClass(
+ data["event"]["data"]["driver_name"])
+ )
+ else:
+ event_class = EventTypeIndex.event_type_mapping.get(event_type)
else:
- event_class = EventTypeIndex.event_type_mapping.get(event_type)
+ event_class = EventInfo
if event_class is None:
raise NotImplementedError(
@@ -5860,8 +6070,7 @@ class Bundle:
def deserialize(cls, context, queue, items_by_path, data):
bundle = cls(data["uuid"])
bundle.items = [
- items_by_path.get(p) or QueueItem.fromZK(
- context, p, pipeline=queue.pipeline, queue=queue)
+ items_by_path.get(p) or QueueItem.fromZK(context, p, queue=queue)
for p in data["items"]
]
bundle.started_reporting = data["started_reporting"]
@@ -6968,6 +7177,7 @@ class TenantProjectConfig(object):
self.extra_config_dirs = ()
# Load config from a different branch if this is a config project
self.load_branch = None
+ self.merge_modes = None
def isAlwaysDynamicBranch(self, branch):
if self.always_dynamic_branches is None:
@@ -6978,24 +7188,15 @@ class TenantProjectConfig(object):
def includesBranch(self, branch):
if self.include_branches is not None:
- included = False
for r in self.include_branches:
if r.fullmatch(branch):
- included = True
- break
- else:
- included = True
- if not included:
+ return True
return False
- excluded = False
if self.exclude_branches is not None:
for r in self.exclude_branches:
if r.fullmatch(branch):
- excluded = True
- break
- if excluded:
- return False
+ return False
return True