summaryrefslogtreecommitdiff
path: root/zuul/model.py
diff options
context:
space:
mode:
authorJames E. Blair <jim@acmegating.com>2023-02-01 16:13:32 -0800
committerJames E. Blair <jim@acmegating.com>2023-02-10 15:03:08 -0800
commit98dcd51d90972b0a2ba6c6993300158a6d5e7b2d (patch)
tree4f7e4713a04ea9d9af532a0d2ac187ac7d3b20ad /zuul/model.py
parentc3334743f6424eee40eab4632b6b5c7654cc17f9 (diff)
downloadzuul-98dcd51d90972b0a2ba6c6993300158a6d5e7b2d.tar.gz
Fix race condition in pipeline change list init
Simon Westphahl describes the race condition: > [The race condition] can occur after a reconfiguration while > some schedulers are updating their local layout and some > already start processing pipelines in the same tenant. > > In this case the pipeline manager's `_postConfig()` method that > calls `PipelineChangeList.create(...)` races with the pipeline > processor updating the change keys. > > This leads to two change lists being written as separate > shards, that can't be correctly loaded, as all shards combined > are expected to form a single JSON object. > > The sequence of events seems to be the following: > 1. S1: pipeline processor needs to update the change keys > 2. S1 the shard writer will delete the `change_list` key with the old > shards > 3. S2: configloader calls the `_postConfig()` method > 4. S2: `PipelineChangeList.create()` notices that the `change_list` node > doesn't exists in Zookeeper: > https://opendev.org/zuul/zuul/src/branch/master/zuul/model.py#L921 > 6. S2: the shard writer creates the first shard `0000000000` > 7. S1: the shard writer creates the second shared `0000000001` > > The race condition seems to be introduced with > Ib1e467b5adb907f93bab0de61da84d2efc22e2a7 That change updated the pipeline manager _postConfig method so that it no longer acquires the pipeline lock when initalizing the pipeline state and change lists. This greatly reduces potential delays during reconfiguration, but has, perhaps predictably, lead to the race condition above. In the commit message for that change, I noted that we might be able to avoid even more work if we accept some caveats related to error reporting. Avoiding that work mean avoiding performing any writes during _postConfig which addresses the root cause of the race condition (steps 3-6 above. Ed. note: there is no step 5). From the commit message: > We still need to attach new state and change list objects to > the pipeline in _postConfig (since our pipeline object is new). > We also should make sure that the objects exist in ZK before we > leave that method, so that if a new pipeline is created, other > schedulers will be able to load the (potentially still empty) > objects from ZK. As an alternative, we could avoid even this > work in _postConfig, but then we would have to handle missing > objects on refresh, and it would not be possible to tell if the > object was missing due to it being new or due to an error. To > avoid masking errors, we keep the current expectation that we > will create these objects in ZK on the initial reconfiguration. The current change does exactly that. We no longer perform any ZK write operations on the state and change list objects in _postConfig. Instead, inside of the refresh methods, we detect the cases where they should be newly created and do so at that time. This happens with the pipeline lock, so is safe against any simultaneous operation from other components. There will be "ERROR" level log messages indicating that reading the state from ZK has failed when these objects are first initialized. To indicate that this is probably okay, they will now be immediately followed by "WARNING" level messages explaining that. Strictly speaking, this particular race should only occur for the change list object, not the pipeline state, since the race condition above requires a sharded object and of the two, only the change list is sharded. However, to keep the lifecycle of these two objects matched (and to simplify _postConfig) the same treatment is applied to both. Note that change I7fa99cd83a857216321f8d946fd42abd9ec427a3 merged after Ib1e467b and changed the behavior slightly, introducing the old_state and old_list arguments. Curiously, the old_list argument is effectively unused, so it is removed entirely in this change. Old_state still has a purpose and is retained. Change-Id: I519348e7d5d74e675808e990920480fb6e1fb981
Diffstat (limited to 'zuul/model.py')
-rw-r--r--zuul/model.py129
1 files changed, 100 insertions, 29 deletions
diff --git a/zuul/model.py b/zuul/model.py
index 8e0ae4eee..3b6a49710 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -620,6 +620,18 @@ class PipelineState(zkobject.ZKObject):
_read_only=False,
)
+ def _lateInitData(self):
+ # If we're initializing the object on our initial refresh,
+ # reset the data to this.
+ return dict(
+ state=Pipeline.STATE_NORMAL,
+ queues=[],
+ old_queues=[],
+ consecutive_failures=0,
+ disabled=False,
+ layout_uuid=self.pipeline.tenant.layout.uuid,
+ )
+
@classmethod
def fromZK(klass, context, path, pipeline, **kw):
obj = klass()
@@ -632,20 +644,22 @@ class PipelineState(zkobject.ZKObject):
@classmethod
def create(cls, pipeline, layout_uuid, old_state=None):
- # If the object does not exist in ZK, create it with the
- # default attributes and the supplied layout UUID. Otherwise,
- # return an initialized object (or the old object for reuse)
- # without loading any data so that data can be loaded on the
- # next refresh.
- ctx = pipeline.manager.current_context
+ # If we are resetting an existing pipeline, we will have an
+ # old_state, so just clean up the object references there and
+ # let the next refresh handle updating any data.
+ if old_state:
+ old_state._resetObjectRefs()
+ return old_state
+
+ # Otherwise, we are initializing a pipeline that we haven't
+ # seen before. It still might exist in ZK, but since we
+ # haven't seen it, we don't have any object references to
+ # clean up. We can just start with a clean object, set the
+ # pipeline reference, and let the next refresh deal with
+ # whether there might be any data in ZK.
state = cls()
state._set(pipeline=pipeline)
- if state.exists(ctx):
- if old_state:
- old_state._resetObjectRefs()
- return old_state
- return state
- return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid)
+ return state
def _resetObjectRefs(self):
# Update the pipeline references on the queue objects.
@@ -712,8 +726,34 @@ class PipelineState(zkobject.ZKObject):
# This is so that we can refresh the object in circumstances
# where we haven't verified that our local layout matches
# what's in ZK.
+
+ # Notably, this need not prevent us from performing the
+ # initialization below if necessary. The case of the object
+ # being brand new in ZK supercedes our worry that our old copy
+ # might be out of date since our old copy is, itself, brand
+ # new.
self._set(_read_only=read_only)
- return super().refresh(context)
+ try:
+ return super().refresh(context)
+ except NoNodeError:
+ # If the object doesn't exist we will receive a
+ # NoNodeError. This happens because the postConfig call
+ # creates this object without holding the pipeline lock,
+ # so it can't determine whether or not it exists in ZK.
+ # We do hold the pipeline lock here, so if we get this
+ # error, we know we're initializing the object, and we
+ # should write it to ZK.
+
+ # Note that typically this code is not used since
+ # currently other objects end up creating the pipeline
+ # path in ZK first. It is included in case that ever
+ # changes. Currently the empty byte-string code path in
+ # deserialize() is used instead.
+ context.log.warning("Initializing pipeline state for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ self._set(**self._lateInitData())
+ self.internalCreate(context)
def deserialize(self, raw, context):
# We may have old change objects in the pipeline cache, so
@@ -721,6 +761,20 @@ class PipelineState(zkobject.ZKObject):
# source change cache.
self.pipeline.manager.clearCache()
+ # If the object doesn't exist we will get back an empty byte
+ # string. This happens because the postConfig call creates
+ # this object without holding the pipeline lock, so it can't
+ # determine whether or not it exists in ZK. We do hold the
+ # pipeline lock here, so if we get the empty byte string, we
+ # know we're initializing the object. In that case, we should
+ # initialize the layout id to the current layout. Nothing
+ # else needs to be set.
+ if raw == b'':
+ context.log.warning("Initializing pipeline state for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ return self._lateInitData()
+
data = super().deserialize(raw, context)
if not self._read_only:
@@ -898,9 +952,31 @@ class PipelineChangeList(zkobject.ShardedZKObject):
_change_keys=[],
)
- def refresh(self, context):
- self._retry(context, super().refresh,
- context, max_tries=5)
+ def refresh(self, context, allow_init=True):
+ # Set allow_init to false to indicate that we don't hold the
+ # lock and we should not try to initialize the object in ZK if
+ # it does not exist.
+ try:
+ self._retry(context, super().refresh,
+ context, max_tries=5)
+ except NoNodeError:
+ # If the object doesn't exist we will receive a
+ # NoNodeError. This happens because the postConfig call
+ # creates this object without holding the pipeline lock,
+ # so it can't determine whether or not it exists in ZK.
+ # We do hold the pipeline lock here, so if we get this
+ # error, we know we're initializing the object, and
+ # we should write it to ZK.
+ if allow_init:
+ context.log.warning(
+ "Initializing pipeline change list for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ self.internalCreate(context)
+ else:
+ # If we're called from a context where we can't
+ # initialize the change list, re-raise the exception.
+ raise
def getPath(self):
return self.getChangeListPath(self.pipeline)
@@ -911,19 +987,14 @@ class PipelineChangeList(zkobject.ShardedZKObject):
return pipeline_path + '/change_list'
@classmethod
- def create(cls, pipeline, old_list=None):
- # If the object does not exist in ZK, create it with the
- # default attributes. Otherwise, return an initialized object
- # (or the old object for reuse) without loading any data so
- # that data can be loaded on the next refresh.
- ctx = pipeline.manager.current_context
+ def create(cls, pipeline):
+ # This object may or may not exist in ZK, but we using any of
+ # that data here. We can just start with a clean object, set
+ # the pipeline reference, and let the next refresh deal with
+ # whether there might be any data in ZK.
change_list = cls()
change_list._set(pipeline=pipeline)
- if change_list.exists(ctx):
- if old_list:
- return old_list
- return change_list
- return cls.new(ctx, pipeline=pipeline)
+ return change_list
def serialize(self, context):
data = {
@@ -931,8 +1002,8 @@ class PipelineChangeList(zkobject.ShardedZKObject):
}
return json.dumps(data, sort_keys=True).encode("utf8")
- def deserialize(self, data, context):
- data = super().deserialize(data, context)
+ def deserialize(self, raw, context):
+ data = super().deserialize(raw, context)
change_keys = []
# We must have a dictionary with a 'changes' key; otherwise we
# may be reading immediately after truncating. Allow the