summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames E. Blair <jim@acmegating.com>2023-02-01 16:13:32 -0800
committerJames E. Blair <jim@acmegating.com>2023-02-10 15:03:08 -0800
commit98dcd51d90972b0a2ba6c6993300158a6d5e7b2d (patch)
tree4f7e4713a04ea9d9af532a0d2ac187ac7d3b20ad
parentc3334743f6424eee40eab4632b6b5c7654cc17f9 (diff)
downloadzuul-98dcd51d90972b0a2ba6c6993300158a6d5e7b2d.tar.gz
Fix race condition in pipeline change list init
Simon Westphahl describes the race condition: > [The race condition] can occur after a reconfiguration while > some schedulers are updating their local layout and some > already start processing pipelines in the same tenant. > > In this case the pipeline manager's `_postConfig()` method that > calls `PipelineChangeList.create(...)` races with the pipeline > processor updating the change keys. > > This leads to two change lists being written as separate > shards, that can't be correctly loaded, as all shards combined > are expected to form a single JSON object. > > The sequence of events seems to be the following: > 1. S1: pipeline processor needs to update the change keys > 2. S1 the shard writer will delete the `change_list` key with the old > shards > 3. S2: configloader calls the `_postConfig()` method > 4. S2: `PipelineChangeList.create()` notices that the `change_list` node > doesn't exists in Zookeeper: > https://opendev.org/zuul/zuul/src/branch/master/zuul/model.py#L921 > 6. S2: the shard writer creates the first shard `0000000000` > 7. S1: the shard writer creates the second shared `0000000001` > > The race condition seems to be introduced with > Ib1e467b5adb907f93bab0de61da84d2efc22e2a7 That change updated the pipeline manager _postConfig method so that it no longer acquires the pipeline lock when initalizing the pipeline state and change lists. This greatly reduces potential delays during reconfiguration, but has, perhaps predictably, lead to the race condition above. In the commit message for that change, I noted that we might be able to avoid even more work if we accept some caveats related to error reporting. Avoiding that work mean avoiding performing any writes during _postConfig which addresses the root cause of the race condition (steps 3-6 above. Ed. note: there is no step 5). From the commit message: > We still need to attach new state and change list objects to > the pipeline in _postConfig (since our pipeline object is new). > We also should make sure that the objects exist in ZK before we > leave that method, so that if a new pipeline is created, other > schedulers will be able to load the (potentially still empty) > objects from ZK. As an alternative, we could avoid even this > work in _postConfig, but then we would have to handle missing > objects on refresh, and it would not be possible to tell if the > object was missing due to it being new or due to an error. To > avoid masking errors, we keep the current expectation that we > will create these objects in ZK on the initial reconfiguration. The current change does exactly that. We no longer perform any ZK write operations on the state and change list objects in _postConfig. Instead, inside of the refresh methods, we detect the cases where they should be newly created and do so at that time. This happens with the pipeline lock, so is safe against any simultaneous operation from other components. There will be "ERROR" level log messages indicating that reading the state from ZK has failed when these objects are first initialized. To indicate that this is probably okay, they will now be immediately followed by "WARNING" level messages explaining that. Strictly speaking, this particular race should only occur for the change list object, not the pipeline state, since the race condition above requires a sharded object and of the two, only the change list is sharded. However, to keep the lifecycle of these two objects matched (and to simplify _postConfig) the same treatment is applied to both. Note that change I7fa99cd83a857216321f8d946fd42abd9ec427a3 merged after Ib1e467b and changed the behavior slightly, introducing the old_state and old_list arguments. Curiously, the old_list argument is effectively unused, so it is removed entirely in this change. Old_state still has a purpose and is retained. Change-Id: I519348e7d5d74e675808e990920480fb6e1fb981
-rw-r--r--tests/unit/test_scheduler.py3
-rw-r--r--tests/unit/test_zk.py76
-rw-r--r--zuul/manager/__init__.py57
-rw-r--r--zuul/model.py129
-rw-r--r--zuul/scheduler.py14
-rw-r--r--zuul/zk/zkobject.py11
6 files changed, 241 insertions, 49 deletions
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 80eddfc4a..30db44b62 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -490,9 +490,10 @@ class TestScheduler(ZuulTestCase):
'zuul.tenant.tenant-one.pipeline.gate.write_objects',
'zuul.tenant.tenant-one.pipeline.gate.read_znodes',
'zuul.tenant.tenant-one.pipeline.gate.write_znodes',
- 'zuul.tenant.tenant-one.pipeline.gate.read_bytes',
'zuul.tenant.tenant-one.pipeline.gate.write_bytes',
]:
+ # 'zuul.tenant.tenant-one.pipeline.gate.read_bytes' is
+ # expected to be zero since it's initialized after reading
val = self.assertReportedStat(key, kind='g')
self.assertTrue(0.0 < float(val) < 60000.0)
diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py
index 7e3c19dfe..f5c1fd4c4 100644
--- a/tests/unit/test_zk.py
+++ b/tests/unit/test_zk.py
@@ -18,6 +18,7 @@ import json
import queue
import threading
import uuid
+from unittest import mock
import testtools
@@ -53,10 +54,12 @@ from tests.base import (
BaseTestCase, HoldableExecutorApi, HoldableMergerApi,
iterate_timeout
)
-from zuul.zk.zkobject import ShardedZKObject, ZKObject, ZKContext
+from zuul.zk.zkobject import (
+ ShardedZKObject, ZKObject, ZKContext
+)
from zuul.zk.locks import tenant_write_lock
-from kazoo.exceptions import ZookeeperError, OperationTimeoutError
+from kazoo.exceptions import ZookeeperError, OperationTimeoutError, NoNodeError
class ZooKeeperBaseTestCase(BaseTestCase):
@@ -2037,3 +2040,72 @@ class TestBlobStore(ZooKeeperBaseTestCase):
with testtools.ExpectedException(KeyError):
bs.get(path)
+
+
+class TestPipelineInit(ZooKeeperBaseTestCase):
+ # Test the initialize-on-refresh code paths of various pipeline objects
+
+ def test_pipeline_state_new_object(self):
+ # Test the initialize-on-refresh code path with no existing object
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.state = model.PipelineState.create(
+ pipeline, layout.uuid, pipeline.state)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ pipeline.state.refresh(context)
+ self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath()))
+
+ def test_pipeline_state_existing_object(self):
+ # Test the initialize-on-refresh code path with a pre-existing object
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.manager = mock.Mock()
+ pipeline.state = model.PipelineState.create(
+ pipeline, layout.uuid, pipeline.state)
+ pipeline.change_list = model.PipelineChangeList.create(
+ pipeline)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ # We refresh the change list here purely for the side effect
+ # of creating the pipeline state object with no data (the list
+ # is a subpath of the state object).
+ pipeline.change_list.refresh(context)
+ pipeline.state.refresh(context)
+ self.assertTrue(
+ self.zk_client.client.exists(pipeline.change_list.getPath()))
+ self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath()))
+
+ def test_pipeline_change_list_new_object(self):
+ # Test the initialize-on-refresh code path with no existing object
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.state = model.PipelineState.create(
+ pipeline, layout.uuid, pipeline.state)
+ pipeline.change_list = model.PipelineChangeList.create(
+ pipeline)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ pipeline.change_list.refresh(context)
+ self.assertTrue(
+ self.zk_client.client.exists(pipeline.change_list.getPath()))
+
+ def test_pipeline_change_list_new_object_without_lock(self):
+ # Test the initialize-on-refresh code path if we don't have
+ # the lock. This should fail.
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.state = model.PipelineState.create(
+ pipeline, layout.uuid, pipeline.state)
+ pipeline.change_list = model.PipelineChangeList.create(
+ pipeline)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ with testtools.ExpectedException(NoNodeError):
+ pipeline.change_list.refresh(context, allow_init=False)
+ self.assertIsNone(
+ self.zk_client.client.exists(pipeline.change_list.getPath()))
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 832be780a..e85d5124e 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -28,6 +28,8 @@ from zuul.model import (
)
from zuul.zk.change_cache import ChangeKey
from zuul.zk.components import COMPONENT_REGISTRY
+from zuul.zk.exceptions import LockException
+from zuul.zk.locks import pipeline_lock
from opentelemetry import trace
@@ -95,21 +97,46 @@ class PipelineManager(metaclass=ABCMeta):
def _postConfig(self):
layout = self.pipeline.tenant.layout
self.buildChangeQueues(layout)
- with self.sched.createZKContext(None, self.log) as ctx,\
- self.currentContext(ctx):
- # Make sure we have state and change list objects, and
- # ensure that they exist in ZK. We don't hold the
- # pipeline lock, but if they don't exist, that means they
- # are new, so no one else will either, so the write on
- # create is okay. If they do exist and we have an old
- # object, we'll just reuse it. If it does exist and we
- # don't have an old object, we'll get a new empty one.
- # Regardless, these will not automatically refresh now, so
- # they will be out of date until they are refreshed later.
- self.pipeline.state = PipelineState.create(
- self.pipeline, layout.uuid, self.pipeline.state)
- self.pipeline.change_list = PipelineChangeList.create(
- self.pipeline)
+ # Make sure we have state and change list objects. We
+ # don't actually ensure they exist in ZK here; these are
+ # just local objects until they are serialized the first
+ # time. Since we don't hold the pipeline lock, we can't
+ # reliably perform any read or write operations; we just
+ # need to ensure we have in-memory objects to work with
+ # and they will be initialized or loaded on the next
+ # refresh.
+
+ # These will be out of date until they are refreshed later.
+ self.pipeline.state = PipelineState.create(
+ self.pipeline, layout.uuid, self.pipeline.state)
+ self.pipeline.change_list = PipelineChangeList.create(
+ self.pipeline)
+
+ # Now, try to acquire a non-blocking pipeline lock and refresh
+ # them for the side effect of initializing them if necessary.
+ # In the case of a new pipeline, no one else should have a
+ # lock anyway, and this helps us avoid emitting a whole bunch
+ # of errors elsewhere on startup when these objects don't
+ # exist. If the pipeline already exists and we can't acquire
+ # the lock, that's fine, we're much less likely to encounter
+ # read errors elsewhere in that case anyway.
+ try:
+ with pipeline_lock(
+ self.sched.zk_client, self.pipeline.tenant.name,
+ self.pipeline.name, blocking=False) as lock,\
+ self.sched.createZKContext(lock, self.log) as ctx,\
+ self.currentContext(ctx):
+ if not self.pipeline.state.exists(ctx):
+ # We only do this if the pipeline doesn't exist in
+ # ZK because in that case, this process should be
+ # fast since it's empty. If it does exist,
+ # refreshing it may be slow and since other actors
+ # won't encounter errors due to its absence, we
+ # would rather defer the work to later.
+ self.pipeline.state.refresh(ctx)
+ self.pipeline.change_list.refresh(ctx)
+ except LockException:
+ pass
def buildChangeQueues(self, layout):
self.log.debug("Building relative_priority queues")
diff --git a/zuul/model.py b/zuul/model.py
index 8e0ae4eee..3b6a49710 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -620,6 +620,18 @@ class PipelineState(zkobject.ZKObject):
_read_only=False,
)
+ def _lateInitData(self):
+ # If we're initializing the object on our initial refresh,
+ # reset the data to this.
+ return dict(
+ state=Pipeline.STATE_NORMAL,
+ queues=[],
+ old_queues=[],
+ consecutive_failures=0,
+ disabled=False,
+ layout_uuid=self.pipeline.tenant.layout.uuid,
+ )
+
@classmethod
def fromZK(klass, context, path, pipeline, **kw):
obj = klass()
@@ -632,20 +644,22 @@ class PipelineState(zkobject.ZKObject):
@classmethod
def create(cls, pipeline, layout_uuid, old_state=None):
- # If the object does not exist in ZK, create it with the
- # default attributes and the supplied layout UUID. Otherwise,
- # return an initialized object (or the old object for reuse)
- # without loading any data so that data can be loaded on the
- # next refresh.
- ctx = pipeline.manager.current_context
+ # If we are resetting an existing pipeline, we will have an
+ # old_state, so just clean up the object references there and
+ # let the next refresh handle updating any data.
+ if old_state:
+ old_state._resetObjectRefs()
+ return old_state
+
+ # Otherwise, we are initializing a pipeline that we haven't
+ # seen before. It still might exist in ZK, but since we
+ # haven't seen it, we don't have any object references to
+ # clean up. We can just start with a clean object, set the
+ # pipeline reference, and let the next refresh deal with
+ # whether there might be any data in ZK.
state = cls()
state._set(pipeline=pipeline)
- if state.exists(ctx):
- if old_state:
- old_state._resetObjectRefs()
- return old_state
- return state
- return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid)
+ return state
def _resetObjectRefs(self):
# Update the pipeline references on the queue objects.
@@ -712,8 +726,34 @@ class PipelineState(zkobject.ZKObject):
# This is so that we can refresh the object in circumstances
# where we haven't verified that our local layout matches
# what's in ZK.
+
+ # Notably, this need not prevent us from performing the
+ # initialization below if necessary. The case of the object
+ # being brand new in ZK supercedes our worry that our old copy
+ # might be out of date since our old copy is, itself, brand
+ # new.
self._set(_read_only=read_only)
- return super().refresh(context)
+ try:
+ return super().refresh(context)
+ except NoNodeError:
+ # If the object doesn't exist we will receive a
+ # NoNodeError. This happens because the postConfig call
+ # creates this object without holding the pipeline lock,
+ # so it can't determine whether or not it exists in ZK.
+ # We do hold the pipeline lock here, so if we get this
+ # error, we know we're initializing the object, and we
+ # should write it to ZK.
+
+ # Note that typically this code is not used since
+ # currently other objects end up creating the pipeline
+ # path in ZK first. It is included in case that ever
+ # changes. Currently the empty byte-string code path in
+ # deserialize() is used instead.
+ context.log.warning("Initializing pipeline state for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ self._set(**self._lateInitData())
+ self.internalCreate(context)
def deserialize(self, raw, context):
# We may have old change objects in the pipeline cache, so
@@ -721,6 +761,20 @@ class PipelineState(zkobject.ZKObject):
# source change cache.
self.pipeline.manager.clearCache()
+ # If the object doesn't exist we will get back an empty byte
+ # string. This happens because the postConfig call creates
+ # this object without holding the pipeline lock, so it can't
+ # determine whether or not it exists in ZK. We do hold the
+ # pipeline lock here, so if we get the empty byte string, we
+ # know we're initializing the object. In that case, we should
+ # initialize the layout id to the current layout. Nothing
+ # else needs to be set.
+ if raw == b'':
+ context.log.warning("Initializing pipeline state for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ return self._lateInitData()
+
data = super().deserialize(raw, context)
if not self._read_only:
@@ -898,9 +952,31 @@ class PipelineChangeList(zkobject.ShardedZKObject):
_change_keys=[],
)
- def refresh(self, context):
- self._retry(context, super().refresh,
- context, max_tries=5)
+ def refresh(self, context, allow_init=True):
+ # Set allow_init to false to indicate that we don't hold the
+ # lock and we should not try to initialize the object in ZK if
+ # it does not exist.
+ try:
+ self._retry(context, super().refresh,
+ context, max_tries=5)
+ except NoNodeError:
+ # If the object doesn't exist we will receive a
+ # NoNodeError. This happens because the postConfig call
+ # creates this object without holding the pipeline lock,
+ # so it can't determine whether or not it exists in ZK.
+ # We do hold the pipeline lock here, so if we get this
+ # error, we know we're initializing the object, and
+ # we should write it to ZK.
+ if allow_init:
+ context.log.warning(
+ "Initializing pipeline change list for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ self.internalCreate(context)
+ else:
+ # If we're called from a context where we can't
+ # initialize the change list, re-raise the exception.
+ raise
def getPath(self):
return self.getChangeListPath(self.pipeline)
@@ -911,19 +987,14 @@ class PipelineChangeList(zkobject.ShardedZKObject):
return pipeline_path + '/change_list'
@classmethod
- def create(cls, pipeline, old_list=None):
- # If the object does not exist in ZK, create it with the
- # default attributes. Otherwise, return an initialized object
- # (or the old object for reuse) without loading any data so
- # that data can be loaded on the next refresh.
- ctx = pipeline.manager.current_context
+ def create(cls, pipeline):
+ # This object may or may not exist in ZK, but we using any of
+ # that data here. We can just start with a clean object, set
+ # the pipeline reference, and let the next refresh deal with
+ # whether there might be any data in ZK.
change_list = cls()
change_list._set(pipeline=pipeline)
- if change_list.exists(ctx):
- if old_list:
- return old_list
- return change_list
- return cls.new(ctx, pipeline=pipeline)
+ return change_list
def serialize(self, context):
data = {
@@ -931,8 +1002,8 @@ class PipelineChangeList(zkobject.ShardedZKObject):
}
return json.dumps(data, sort_keys=True).encode("utf8")
- def deserialize(self, data, context):
- data = super().deserialize(data, context)
+ def deserialize(self, raw, context):
+ data = super().deserialize(raw, context)
change_keys = []
# We must have a dictionary with a 'changes' key; otherwise we
# may be reading immediately after truncating. Allow the
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 7f61f3fe4..a546339c3 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -2354,7 +2354,9 @@ class Scheduler(threading.Thread):
for pipeline in tenant.layout.pipelines.values():
self.log.debug("Gather relevant cache items for: %s %s",
tenant.name, pipeline.name)
- pipeline.change_list.refresh(ctx)
+ # This will raise an exception and abort the process if
+ # unable to refresh the change list.
+ pipeline.change_list.refresh(ctx, allow_init=False)
change_keys = pipeline.change_list.getChangeKeys()
relevant_changes = pipeline.manager.resolveChangeKeys(
change_keys)
@@ -2383,8 +2385,16 @@ class Scheduler(threading.Thread):
# Update the pipeline changes
ctx = self.createZKContext(None, self.log)
for pipeline in tenant.layout.pipelines.values():
+ # This will raise an exception if it is unable to
+ # refresh the change list. We will proceed anyway
+ # and use our data from the last time we did
+ # refresh in order to avoid stalling trigger
+ # processing. In this case we may not forward
+ # some events which are related to changes in the
+ # pipeline but don't match the pipeline trigger
+ # criteria.
try:
- pipeline.change_list.refresh(ctx)
+ pipeline.change_list.refresh(ctx, allow_init=False)
except Exception:
self.log.exception(
"Unable to refresh pipeline change list for %s",
diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py
index b228ecaa4..87d76bca6 100644
--- a/zuul/zk/zkobject.py
+++ b/zuul/zk/zkobject.py
@@ -233,7 +233,18 @@ class ZKObject:
obj._load(context, path=path)
return obj
+ def internalCreate(self, context):
+ """Create the object in ZK from an existing ZKObject
+
+ This should only be used in special circumstances: when we
+ know it's safe to start using a ZKObject before it's actually
+ created in ZK. Normally use .new()
+ """
+ data = self._trySerialize(context)
+ self._save(context, data, create=True)
+
def refresh(self, context):
+
"""Update data from ZK"""
self._load(context)