-rw-r--r--   tests/unit/test_scheduler.py     3
-rw-r--r--   tests/unit/test_zk.py           76
-rw-r--r--   zuul/manager/__init__.py        57
-rw-r--r--   zuul/model.py                  129
-rw-r--r--   zuul/scheduler.py               14
-rw-r--r--   zuul/zk/zkobject.py             11
6 files changed, 241 insertions, 49 deletions
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 80eddfc4a..30db44b62 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -490,9 +490,10 @@ class TestScheduler(ZuulTestCase):
'zuul.tenant.tenant-one.pipeline.gate.write_objects',
'zuul.tenant.tenant-one.pipeline.gate.read_znodes',
'zuul.tenant.tenant-one.pipeline.gate.write_znodes',
- 'zuul.tenant.tenant-one.pipeline.gate.read_bytes',
'zuul.tenant.tenant-one.pipeline.gate.write_bytes',
]:
+ # 'zuul.tenant.tenant-one.pipeline.gate.read_bytes' is
+ # expected to be zero since it's initialized after reading
val = self.assertReportedStat(key, kind='g')
self.assertTrue(0.0 < float(val) < 60000.0)
diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py
index 7e3c19dfe..f5c1fd4c4 100644
--- a/tests/unit/test_zk.py
+++ b/tests/unit/test_zk.py
@@ -18,6 +18,7 @@ import json
import queue
import threading
import uuid
+from unittest import mock
import testtools
@@ -53,10 +54,12 @@ from tests.base import (
BaseTestCase, HoldableExecutorApi, HoldableMergerApi,
iterate_timeout
)
-from zuul.zk.zkobject import ShardedZKObject, ZKObject, ZKContext
+from zuul.zk.zkobject import (
+ ShardedZKObject, ZKObject, ZKContext
+)
from zuul.zk.locks import tenant_write_lock
-from kazoo.exceptions import ZookeeperError, OperationTimeoutError
+from kazoo.exceptions import ZookeeperError, OperationTimeoutError, NoNodeError
class ZooKeeperBaseTestCase(BaseTestCase):
@@ -2037,3 +2040,72 @@ class TestBlobStore(ZooKeeperBaseTestCase):
with testtools.ExpectedException(KeyError):
bs.get(path)
+
+
+class TestPipelineInit(ZooKeeperBaseTestCase):
+ # Test the initialize-on-refresh code paths of various pipeline objects
+
+ def test_pipeline_state_new_object(self):
+ # Test the initialize-on-refresh code path with no existing object
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.state = model.PipelineState.create(
+ pipeline, layout.uuid, pipeline.state)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ pipeline.state.refresh(context)
+ self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath()))
+
+ def test_pipeline_state_existing_object(self):
+ # Test the initialize-on-refresh code path with a pre-existing object
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.manager = mock.Mock()
+ pipeline.state = model.PipelineState.create(
+ pipeline, layout.uuid, pipeline.state)
+ pipeline.change_list = model.PipelineChangeList.create(
+ pipeline)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ # We refresh the change list here purely for the side effect
+ # of creating the pipeline state object with no data (the list
+ # is a subpath of the state object).
+ pipeline.change_list.refresh(context)
+ pipeline.state.refresh(context)
+ self.assertTrue(
+ self.zk_client.client.exists(pipeline.change_list.getPath()))
+ self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath()))
+
+ def test_pipeline_change_list_new_object(self):
+ # Test the initialize-on-refresh code path with no existing object
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.state = model.PipelineState.create(
+ pipeline, layout.uuid, pipeline.state)
+ pipeline.change_list = model.PipelineChangeList.create(
+ pipeline)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ pipeline.change_list.refresh(context)
+ self.assertTrue(
+ self.zk_client.client.exists(pipeline.change_list.getPath()))
+
+ def test_pipeline_change_list_new_object_without_lock(self):
+ # Test the initialize-on-refresh code path if we don't have
+ # the lock. This should fail.
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.state = model.PipelineState.create(
+ pipeline, layout.uuid, pipeline.state)
+ pipeline.change_list = model.PipelineChangeList.create(
+ pipeline)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ with testtools.ExpectedException(NoNodeError):
+ pipeline.change_list.refresh(context, allow_init=False)
+ self.assertIsNone(
+ self.zk_client.client.exists(pipeline.change_list.getPath()))
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 832be780a..e85d5124e 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -28,6 +28,8 @@ from zuul.model import (
)
from zuul.zk.change_cache import ChangeKey
from zuul.zk.components import COMPONENT_REGISTRY
+from zuul.zk.exceptions import LockException
+from zuul.zk.locks import pipeline_lock
from opentelemetry import trace
@@ -95,21 +97,46 @@ class PipelineManager(metaclass=ABCMeta):
def _postConfig(self):
layout = self.pipeline.tenant.layout
self.buildChangeQueues(layout)
- with self.sched.createZKContext(None, self.log) as ctx,\
- self.currentContext(ctx):
- # Make sure we have state and change list objects, and
- # ensure that they exist in ZK. We don't hold the
- # pipeline lock, but if they don't exist, that means they
- # are new, so no one else will either, so the write on
- # create is okay. If they do exist and we have an old
- # object, we'll just reuse it. If it does exist and we
- # don't have an old object, we'll get a new empty one.
- # Regardless, these will not automatically refresh now, so
- # they will be out of date until they are refreshed later.
- self.pipeline.state = PipelineState.create(
- self.pipeline, layout.uuid, self.pipeline.state)
- self.pipeline.change_list = PipelineChangeList.create(
- self.pipeline)
+ # Make sure we have state and change list objects. We
+ # don't actually ensure they exist in ZK here; these are
+ # just local objects until they are serialized the first
+ # time. Since we don't hold the pipeline lock, we can't
+ # reliably perform any read or write operations; we just
+ # need to ensure we have in-memory objects to work with
+ # and they will be initialized or loaded on the next
+ # refresh.
+
+ # These will be out of date until they are refreshed later.
+ self.pipeline.state = PipelineState.create(
+ self.pipeline, layout.uuid, self.pipeline.state)
+ self.pipeline.change_list = PipelineChangeList.create(
+ self.pipeline)
+
+ # Now, try to acquire a non-blocking pipeline lock and refresh
+ # them for the side effect of initializing them if necessary.
+ # In the case of a new pipeline, no one else should have a
+ # lock anyway, and this helps us avoid emitting a whole bunch
+ # of errors elsewhere on startup when these objects don't
+ # exist. If the pipeline already exists and we can't acquire
+ # the lock, that's fine, we're much less likely to encounter
+ # read errors elsewhere in that case anyway.
+ try:
+ with pipeline_lock(
+ self.sched.zk_client, self.pipeline.tenant.name,
+ self.pipeline.name, blocking=False) as lock,\
+ self.sched.createZKContext(lock, self.log) as ctx,\
+ self.currentContext(ctx):
+ if not self.pipeline.state.exists(ctx):
+ # We only do this if the pipeline doesn't exist in
+ # ZK because in that case, this process should be
+ # fast since it's empty. If it does exist,
+ # refreshing it may be slow and since other actors
+ # won't encounter errors due to its absence, we
+ # would rather defer the work to later.
+ self.pipeline.state.refresh(ctx)
+ self.pipeline.change_list.refresh(ctx)
+ except LockException:
+ pass
def buildChangeQueues(self, layout):
self.log.debug("Building relative_priority queues")
diff --git a/zuul/model.py b/zuul/model.py
index 8e0ae4eee..3b6a49710 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -620,6 +620,18 @@ class PipelineState(zkobject.ZKObject):
_read_only=False,
)
+ def _lateInitData(self):
+ # If we're initializing the object on our initial refresh,
+ # reset the data to this.
+ return dict(
+ state=Pipeline.STATE_NORMAL,
+ queues=[],
+ old_queues=[],
+ consecutive_failures=0,
+ disabled=False,
+ layout_uuid=self.pipeline.tenant.layout.uuid,
+ )
+
@classmethod
def fromZK(klass, context, path, pipeline, **kw):
obj = klass()
@@ -632,20 +644,22 @@ class PipelineState(zkobject.ZKObject):
@classmethod
def create(cls, pipeline, layout_uuid, old_state=None):
- # If the object does not exist in ZK, create it with the
- # default attributes and the supplied layout UUID. Otherwise,
- # return an initialized object (or the old object for reuse)
- # without loading any data so that data can be loaded on the
- # next refresh.
- ctx = pipeline.manager.current_context
+ # If we are resetting an existing pipeline, we will have an
+ # old_state, so just clean up the object references there and
+ # let the next refresh handle updating any data.
+ if old_state:
+ old_state._resetObjectRefs()
+ return old_state
+
+ # Otherwise, we are initializing a pipeline that we haven't
+ # seen before. It still might exist in ZK, but since we
+ # haven't seen it, we don't have any object references to
+ # clean up. We can just start with a clean object, set the
+ # pipeline reference, and let the next refresh deal with
+ # whether there might be any data in ZK.
state = cls()
state._set(pipeline=pipeline)
- if state.exists(ctx):
- if old_state:
- old_state._resetObjectRefs()
- return old_state
- return state
- return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid)
+ return state
def _resetObjectRefs(self):
# Update the pipeline references on the queue objects.
@@ -712,8 +726,34 @@ class PipelineState(zkobject.ZKObject):
# This is so that we can refresh the object in circumstances
# where we haven't verified that our local layout matches
# what's in ZK.
+
+ # Notably, this need not prevent us from performing the
+ # initialization below if necessary. The case of the object
+ # being brand new in ZK supersedes our worry that our old copy
+ # might be out of date since our old copy is, itself, brand
+ # new.
self._set(_read_only=read_only)
- return super().refresh(context)
+ try:
+ return super().refresh(context)
+ except NoNodeError:
+ # If the object doesn't exist we will receive a
+ # NoNodeError. This happens because the postConfig call
+ # creates this object without holding the pipeline lock,
+ # so it can't determine whether or not it exists in ZK.
+ # We do hold the pipeline lock here, so if we get this
+ # error, we know we're initializing the object, and we
+ # should write it to ZK.
+
+ # Note that typically this code is not used since
+ # currently other objects end up creating the pipeline
+ # path in ZK first. It is included in case that ever
+ # changes. Currently the empty byte-string code path in
+ # deserialize() is used instead.
+ context.log.warning("Initializing pipeline state for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ self._set(**self._lateInitData())
+ self.internalCreate(context)
def deserialize(self, raw, context):
# We may have old change objects in the pipeline cache, so
@@ -721,6 +761,20 @@ class PipelineState(zkobject.ZKObject):
# source change cache.
self.pipeline.manager.clearCache()
+ # If the object doesn't exist we will get back an empty byte
+ # string. This happens because the postConfig call creates
+ # this object without holding the pipeline lock, so it can't
+ # determine whether or not it exists in ZK. We do hold the
+ # pipeline lock here, so if we get the empty byte string, we
+ # know we're initializing the object. In that case, we should
+ # initialize the layout id to the current layout. Nothing
+ # else needs to be set.
+ if raw == b'':
+ context.log.warning("Initializing pipeline state for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ return self._lateInitData()
+
data = super().deserialize(raw, context)
if not self._read_only:
@@ -898,9 +952,31 @@ class PipelineChangeList(zkobject.ShardedZKObject):
_change_keys=[],
)
- def refresh(self, context):
- self._retry(context, super().refresh,
- context, max_tries=5)
+ def refresh(self, context, allow_init=True):
+ # Set allow_init to false to indicate that we don't hold the
+ # lock and we should not try to initialize the object in ZK if
+ # it does not exist.
+ try:
+ self._retry(context, super().refresh,
+ context, max_tries=5)
+ except NoNodeError:
+ # If the object doesn't exist we will receive a
+ # NoNodeError. This happens because the postConfig call
+ # creates this object without holding the pipeline lock,
+ # so it can't determine whether or not it exists in ZK.
+ # We do hold the pipeline lock here, so if we get this
+ # error, we know we're initializing the object, and
+ # we should write it to ZK.
+ if allow_init:
+ context.log.warning(
+ "Initializing pipeline change list for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ self.internalCreate(context)
+ else:
+ # If we're called from a context where we can't
+ # initialize the change list, re-raise the exception.
+ raise
def getPath(self):
return self.getChangeListPath(self.pipeline)
@@ -911,19 +987,14 @@ class PipelineChangeList(zkobject.ShardedZKObject):
return pipeline_path + '/change_list'
@classmethod
- def create(cls, pipeline, old_list=None):
- # If the object does not exist in ZK, create it with the
- # default attributes. Otherwise, return an initialized object
- # (or the old object for reuse) without loading any data so
- # that data can be loaded on the next refresh.
- ctx = pipeline.manager.current_context
+ def create(cls, pipeline):
+ # This object may or may not exist in ZK, but we aren't using any
+ # of that data here. We can just start with a clean object, set
+ # the pipeline reference, and let the next refresh deal with
+ # whether there might be any data in ZK.
change_list = cls()
change_list._set(pipeline=pipeline)
- if change_list.exists(ctx):
- if old_list:
- return old_list
- return change_list
- return cls.new(ctx, pipeline=pipeline)
+ return change_list
def serialize(self, context):
data = {
@@ -931,8 +1002,8 @@ class PipelineChangeList(zkobject.ShardedZKObject):
}
return json.dumps(data, sort_keys=True).encode("utf8")
- def deserialize(self, data, context):
- data = super().deserialize(data, context)
+ def deserialize(self, raw, context):
+ data = super().deserialize(raw, context)
change_keys = []
# We must have a dictionary with a 'changes' key; otherwise we
# may be reading immediately after truncating. Allow the
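The PipelineState and PipelineChangeList changes above share one pattern: the object is built locally without touching ZK, and the first refresh performed while holding the pipeline lock writes the defaults if nothing is stored yet. A generic sketch of that pattern follows; the class name LazyInitZKObject is hypothetical, while refresh, _lateInitData, _set, internalCreate and context.log mirror the calls used in the hunks above.

from kazoo.exceptions import NoNodeError
from zuul.zk.zkobject import ZKObject

class LazyInitZKObject(ZKObject):
    # Hypothetical base class illustrating the initialize-on-refresh
    # pattern used by PipelineState and PipelineChangeList above.

    def _lateInitData(self):
        # Defaults written on the first refresh; subclasses override.
        return {}

    def refresh(self, context, allow_init=True):
        try:
            return super().refresh(context)
        except NoNodeError:
            if not allow_init:
                # The caller does not hold the pipeline lock, so it
                # must not create the object; let it handle the miss.
                raise
            # We hold the lock, so writing the defaults is safe.
            context.log.warning("Initializing %s", self.getPath())
            self._set(**self._lateInitData())
            self.internalCreate(context)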
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 7f61f3fe4..a546339c3 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -2354,7 +2354,9 @@ class Scheduler(threading.Thread):
for pipeline in tenant.layout.pipelines.values():
self.log.debug("Gather relevant cache items for: %s %s",
tenant.name, pipeline.name)
- pipeline.change_list.refresh(ctx)
+ # This will raise an exception and abort the process if
+ # unable to refresh the change list.
+ pipeline.change_list.refresh(ctx, allow_init=False)
change_keys = pipeline.change_list.getChangeKeys()
relevant_changes = pipeline.manager.resolveChangeKeys(
change_keys)
@@ -2383,8 +2385,16 @@ class Scheduler(threading.Thread):
# Update the pipeline changes
ctx = self.createZKContext(None, self.log)
for pipeline in tenant.layout.pipelines.values():
+ # This will raise an exception if it is unable to
+ # refresh the change list. We will proceed anyway
+ # and use our data from the last time we did
+ # refresh in order to avoid stalling trigger
+ # processing. In this case we may not forward
+ # some events which are related to changes in the
+ # pipeline but don't match the pipeline trigger
+ # criteria.
try:
- pipeline.change_list.refresh(ctx)
+ pipeline.change_list.refresh(ctx, allow_init=False)
except Exception:
self.log.exception(
"Unable to refresh pipeline change list for %s",
diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py
index b228ecaa4..87d76bca6 100644
--- a/zuul/zk/zkobject.py
+++ b/zuul/zk/zkobject.py
@@ -233,7 +233,18 @@ class ZKObject:
obj._load(context, path=path)
return obj
+ def internalCreate(self, context):
+ """Create the object in ZK from an existing ZKObject
+
+ This should only be used in special circumstances: when we
+ know it's safe to start using a ZKObject before it's actually
+ created in ZK. Normally use .new()
+ """
+ data = self._trySerialize(context)
+ self._save(context, data, create=True)
+
def refresh(self, context):
"""Update data from ZK"""
self._load(context)
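internalCreate() complements ZKObject.new(): new() builds an object and writes it to ZK in one step, while internalCreate() performs the first write for an object that already exists in memory, which is exactly what the late-initialization paths above need. A minimal sketch follows; ExampleObject is a hypothetical subclass and ctx is assumed to be an existing ZKContext.

import json
from zuul.zk.zkobject import ZKObject

class ExampleObject(ZKObject):
    # Hypothetical subclass used only to contrast the two entry points.
    def getPath(self):
        return '/zuul/example'

    def serialize(self, context):
        return json.dumps({'value': self.value}).encode('utf8')

    def deserialize(self, raw, context):
        return json.loads(raw)

# Normal path: build the object and store it in ZK in a single call.
obj = ExampleObject.new(ctx, value=1)

# Late-initialization path (as an alternative): the object was built
# earlier without ZK access, and only the first refresh under the
# pipeline lock finds nothing in ZK and writes it out.
obj = ExampleObject()
obj._set(value=1)
obj.internalCreate(ctx)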