From 98dcd51d90972b0a2ba6c6993300158a6d5e7b2d Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Wed, 1 Feb 2023 16:13:32 -0800 Subject: Fix race condition in pipeline change list init Simon Westphahl describes the race condition: > [The race condition] can occur after a reconfiguration while > some schedulers are updating their local layout and some > already start processing pipelines in the same tenant. > > In this case the pipeline manager's `_postConfig()` method that > calls `PipelineChangeList.create(...)` races with the pipeline > processor updating the change keys. > > This leads to two change lists being written as separate > shards, that can't be correctly loaded, as all shards combined > are expected to form a single JSON object. > > The sequence of events seems to be the following: > 1. S1: pipeline processor needs to update the change keys > 2. S1 the shard writer will delete the `change_list` key with the old > shards > 3. S2: configloader calls the `_postConfig()` method > 4. S2: `PipelineChangeList.create()` notices that the `change_list` node > doesn't exists in Zookeeper: > https://opendev.org/zuul/zuul/src/branch/master/zuul/model.py#L921 > 6. S2: the shard writer creates the first shard `0000000000` > 7. S1: the shard writer creates the second shared `0000000001` > > The race condition seems to be introduced with > Ib1e467b5adb907f93bab0de61da84d2efc22e2a7 That change updated the pipeline manager _postConfig method so that it no longer acquires the pipeline lock when initalizing the pipeline state and change lists. This greatly reduces potential delays during reconfiguration, but has, perhaps predictably, lead to the race condition above. In the commit message for that change, I noted that we might be able to avoid even more work if we accept some caveats related to error reporting. Avoiding that work mean avoiding performing any writes during _postConfig which addresses the root cause of the race condition (steps 3-6 above. Ed. note: there is no step 5). From the commit message: > We still need to attach new state and change list objects to > the pipeline in _postConfig (since our pipeline object is new). > We also should make sure that the objects exist in ZK before we > leave that method, so that if a new pipeline is created, other > schedulers will be able to load the (potentially still empty) > objects from ZK. As an alternative, we could avoid even this > work in _postConfig, but then we would have to handle missing > objects on refresh, and it would not be possible to tell if the > object was missing due to it being new or due to an error. To > avoid masking errors, we keep the current expectation that we > will create these objects in ZK on the initial reconfiguration. The current change does exactly that. We no longer perform any ZK write operations on the state and change list objects in _postConfig. Instead, inside of the refresh methods, we detect the cases where they should be newly created and do so at that time. This happens with the pipeline lock, so is safe against any simultaneous operation from other components. There will be "ERROR" level log messages indicating that reading the state from ZK has failed when these objects are first initialized. To indicate that this is probably okay, they will now be immediately followed by "WARNING" level messages explaining that. Strictly speaking, this particular race should only occur for the change list object, not the pipeline state, since the race condition above requires a sharded object and of the two, only the change list is sharded. However, to keep the lifecycle of these two objects matched (and to simplify _postConfig) the same treatment is applied to both. Note that change I7fa99cd83a857216321f8d946fd42abd9ec427a3 merged after Ib1e467b and changed the behavior slightly, introducing the old_state and old_list arguments. Curiously, the old_list argument is effectively unused, so it is removed entirely in this change. Old_state still has a purpose and is retained. Change-Id: I519348e7d5d74e675808e990920480fb6e1fb981 --- tests/unit/test_scheduler.py | 3 +- tests/unit/test_zk.py | 76 ++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 76 insertions(+), 3 deletions(-) (limited to 'tests') diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 80eddfc4a..30db44b62 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -490,9 +490,10 @@ class TestScheduler(ZuulTestCase): 'zuul.tenant.tenant-one.pipeline.gate.write_objects', 'zuul.tenant.tenant-one.pipeline.gate.read_znodes', 'zuul.tenant.tenant-one.pipeline.gate.write_znodes', - 'zuul.tenant.tenant-one.pipeline.gate.read_bytes', 'zuul.tenant.tenant-one.pipeline.gate.write_bytes', ]: + # 'zuul.tenant.tenant-one.pipeline.gate.read_bytes' is + # expected to be zero since it's initialized after reading val = self.assertReportedStat(key, kind='g') self.assertTrue(0.0 < float(val) < 60000.0) diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py index 7e3c19dfe..f5c1fd4c4 100644 --- a/tests/unit/test_zk.py +++ b/tests/unit/test_zk.py @@ -18,6 +18,7 @@ import json import queue import threading import uuid +from unittest import mock import testtools @@ -53,10 +54,12 @@ from tests.base import ( BaseTestCase, HoldableExecutorApi, HoldableMergerApi, iterate_timeout ) -from zuul.zk.zkobject import ShardedZKObject, ZKObject, ZKContext +from zuul.zk.zkobject import ( + ShardedZKObject, ZKObject, ZKContext +) from zuul.zk.locks import tenant_write_lock -from kazoo.exceptions import ZookeeperError, OperationTimeoutError +from kazoo.exceptions import ZookeeperError, OperationTimeoutError, NoNodeError class ZooKeeperBaseTestCase(BaseTestCase): @@ -2037,3 +2040,72 @@ class TestBlobStore(ZooKeeperBaseTestCase): with testtools.ExpectedException(KeyError): bs.get(path) + + +class TestPipelineInit(ZooKeeperBaseTestCase): + # Test the initialize-on-refresh code paths of various pipeline objects + + def test_pipeline_state_new_object(self): + # Test the initialize-on-refresh code path with no existing object + tenant = model.Tenant('tenant') + pipeline = model.Pipeline('gate', tenant) + layout = model.Layout(tenant) + tenant.layout = layout + pipeline.state = model.PipelineState.create( + pipeline, layout.uuid, pipeline.state) + context = ZKContext(self.zk_client, None, None, self.log) + pipeline.state.refresh(context) + self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath())) + + def test_pipeline_state_existing_object(self): + # Test the initialize-on-refresh code path with a pre-existing object + tenant = model.Tenant('tenant') + pipeline = model.Pipeline('gate', tenant) + layout = model.Layout(tenant) + tenant.layout = layout + pipeline.manager = mock.Mock() + pipeline.state = model.PipelineState.create( + pipeline, layout.uuid, pipeline.state) + pipeline.change_list = model.PipelineChangeList.create( + pipeline) + context = ZKContext(self.zk_client, None, None, self.log) + # We refresh the change list here purely for the side effect + # of creating the pipeline state object with no data (the list + # is a subpath of the state object). + pipeline.change_list.refresh(context) + pipeline.state.refresh(context) + self.assertTrue( + self.zk_client.client.exists(pipeline.change_list.getPath())) + self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath())) + + def test_pipeline_change_list_new_object(self): + # Test the initialize-on-refresh code path with no existing object + tenant = model.Tenant('tenant') + pipeline = model.Pipeline('gate', tenant) + layout = model.Layout(tenant) + tenant.layout = layout + pipeline.state = model.PipelineState.create( + pipeline, layout.uuid, pipeline.state) + pipeline.change_list = model.PipelineChangeList.create( + pipeline) + context = ZKContext(self.zk_client, None, None, self.log) + pipeline.change_list.refresh(context) + self.assertTrue( + self.zk_client.client.exists(pipeline.change_list.getPath())) + + def test_pipeline_change_list_new_object_without_lock(self): + # Test the initialize-on-refresh code path if we don't have + # the lock. This should fail. + tenant = model.Tenant('tenant') + pipeline = model.Pipeline('gate', tenant) + layout = model.Layout(tenant) + tenant.layout = layout + pipeline.state = model.PipelineState.create( + pipeline, layout.uuid, pipeline.state) + pipeline.change_list = model.PipelineChangeList.create( + pipeline) + context = ZKContext(self.zk_client, None, None, self.log) + with testtools.ExpectedException(NoNodeError): + pipeline.change_list.refresh(context, allow_init=False) + self.assertIsNone( + self.zk_client.client.exists(pipeline.change_list.getPath())) -- cgit v1.2.1