diff options
Diffstat (limited to 'zuul/manager/__init__.py')
-rw-r--r-- | zuul/manager/__init__.py | 69 |
1 files changed, 48 insertions, 21 deletions
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 60eb479e0..36361df11 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -28,6 +28,8 @@ from zuul.model import ( ) from zuul.zk.change_cache import ChangeKey from zuul.zk.components import COMPONENT_REGISTRY +from zuul.zk.exceptions import LockException +from zuul.zk.locks import pipeline_lock from opentelemetry import trace @@ -95,21 +97,46 @@ class PipelineManager(metaclass=ABCMeta): def _postConfig(self): layout = self.pipeline.tenant.layout self.buildChangeQueues(layout) - with self.sched.createZKContext(None, self.log) as ctx,\ - self.currentContext(ctx): - # Make sure we have state and change list objects, and - # ensure that they exist in ZK. We don't hold the - # pipeline lock, but if they don't exist, that means they - # are new, so no one else will either, so the write on - # create is okay. If they do exist and we have an old - # object, we'll just reuse it. If it does exist and we - # don't have an old object, we'll get a new empty one. - # Regardless, these will not automatically refresh now, so - # they will be out of date until they are refreshed later. - self.pipeline.state = PipelineState.create( - self.pipeline, layout.uuid, self.pipeline.state) - self.pipeline.change_list = PipelineChangeList.create( - self.pipeline) + # Make sure we have state and change list objects. We + # don't actually ensure they exist in ZK here; these are + # just local objects until they are serialized the first + # time. Since we don't hold the pipeline lock, we can't + # reliably perform any read or write operations; we just + # need to ensure we have in-memory objects to work with + # and they will be initialized or loaded on the next + # refresh. + + # These will be out of date until they are refreshed later. + self.pipeline.state = PipelineState.create( + self.pipeline, self.pipeline.state) + self.pipeline.change_list = PipelineChangeList.create( + self.pipeline) + + # Now, try to acquire a non-blocking pipeline lock and refresh + # them for the side effect of initializing them if necessary. + # In the case of a new pipeline, no one else should have a + # lock anyway, and this helps us avoid emitting a whole bunch + # of errors elsewhere on startup when these objects don't + # exist. If the pipeline already exists and we can't acquire + # the lock, that's fine, we're much less likely to encounter + # read errors elsewhere in that case anyway. + try: + with pipeline_lock( + self.sched.zk_client, self.pipeline.tenant.name, + self.pipeline.name, blocking=False) as lock,\ + self.sched.createZKContext(lock, self.log) as ctx,\ + self.currentContext(ctx): + if not self.pipeline.state.exists(ctx): + # We only do this if the pipeline doesn't exist in + # ZK because in that case, this process should be + # fast since it's empty. If it does exist, + # refreshing it may be slow and since other actors + # won't encounter errors due to its absence, we + # would rather defer the work to later. + self.pipeline.state.refresh(ctx) + self.pipeline.change_list.refresh(ctx) + except LockException: + pass def buildChangeQueues(self, layout): self.log.debug("Building relative_priority queues") @@ -276,19 +303,19 @@ class PipelineManager(metaclass=ABCMeta): if not isinstance(change, model.Change): return - change_in_pipeline = False + to_refresh = set() for item in self.pipeline.getAllItems(): if not isinstance(item.change, model.Change): continue + if item.change.equals(change): + to_refresh.add(item.change) for dep_change_ref in item.change.commit_needs_changes: - if item.change.equals(change): - change_in_pipeline = True dep_change_key = ChangeKey.fromReference(dep_change_ref) if dep_change_key.isSameChange(change.cache_stat.key): - self.updateCommitDependencies(item.change, None, event) + to_refresh.add(item.change) - if change_in_pipeline: - self.updateCommitDependencies(change, None, event) + for existing_change in to_refresh: + self.updateCommitDependencies(existing_change, None, event) def reportEnqueue(self, item): if not self.pipeline.state.disabled: |