summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames E. Blair <jeblair@redhat.com>2020-04-24 11:03:48 -0700
committerJames E. Blair <jeblair@redhat.com>2020-04-29 14:59:15 -0700
commit22640baef4d33c80135aace1d11a223278b02e57 (patch)
treebd7f7ede79144e62a4c1db03bcdc9daf131b1a0a
parentccddc37fa04dc66ab29b9bc97cc63b0665e64430 (diff)
downloadzuul-22640baef4d33c80135aace1d11a223278b02e57.tar.gz
Add serial pipeline manager
This is designed to handle the case where we want: * The pipeline to be triggered by changes so results report back to the change. * Triggered on change merged. * Jobs with file matchers so that if a subsystem is changed, only the deployment job for that subsystem is run. * Every change is processed in strict sequence. This is designed to accommodate a deployment pipeline with the above constraints. The pipeline manager hierarchy is getting complicated; mark the base class as abstract, and also move the shared-queue methods into an intermediate abstract class. These are shared by both serial and dependent managers. Change-Id: I3c5f3b2f6298292c5c25665923e3a10b07be5419
-rw-r--r--doc/source/examples/pipelines/gerrit-reference-pipelines.yaml18
-rw-r--r--doc/source/reference/pipeline_def.rst40
-rw-r--r--releasenotes/notes/serial-manager-8f2dcf924c72effe.yaml6
-rw-r--r--tests/fixtures/layouts/serial.yaml48
-rw-r--r--tests/unit/test_serial.py130
-rw-r--r--zuul/configloader.py5
-rw-r--r--zuul/manager/__init__.py3
-rw-r--r--zuul/manager/dependent.py79
-rw-r--r--zuul/manager/serial.py37
-rw-r--r--zuul/manager/shared.py84
10 files changed, 377 insertions, 73 deletions
diff --git a/doc/source/examples/pipelines/gerrit-reference-pipelines.yaml b/doc/source/examples/pipelines/gerrit-reference-pipelines.yaml
index a20647a03..4e0fccaf5 100644
--- a/doc/source/examples/pipelines/gerrit-reference-pipelines.yaml
+++ b/doc/source/examples/pipelines/gerrit-reference-pipelines.yaml
@@ -100,6 +100,24 @@
sqlreporter:
- pipeline:
+ name: deploy
+ description: |
+ This pipeline runs jobs that operate after each change is merged
+ in order to deploy to production.
+ manager: serial
+ precedence: high
+ post-review: True
+ trigger:
+ gerrit:
+ - event: change-merged
+ success:
+ gerrit: {}
+ sqlreporter:
+ failure:
+ gerrit: {}
+ sqlreporter:
+
+- pipeline:
name: release
description: |
When a commit is tagged as a release, this pipeline runs jobs
diff --git a/doc/source/reference/pipeline_def.rst b/doc/source/reference/pipeline_def.rst
index 8b73282e2..940758ba3 100644
--- a/doc/source/reference/pipeline_def.rst
+++ b/doc/source/reference/pipeline_def.rst
@@ -63,7 +63,18 @@ success, the pipeline reports back to Gerrit with ``Verified`` vote of
.. attr:: manager
:required:
- There are three schemes for managing pipelines:
+ There are several schemes for managing pipelines. The following
+ table summarizes their features; each is described in detail
+ below.
+
+ =========== ============ ===== ============= =========
+ Manager Dependencies Merge Shared Queues Window
+ =========== ============ ===== ============= =========
+ Independent No No No Unlimited
+ Dependent Yes Yes Yes Variable
+ Serial No No Yes 1
+ Supercedent No No Project-ref 1
+ =========== ============ ===== ============= =========
.. value:: independent
@@ -107,6 +118,22 @@ success, the pipeline reports back to Gerrit with ``Verified`` vote of
For more detail on the theory and operation of Zuul's
dependent pipeline manager, see: :doc:`/discussion/gating`.
+ .. value:: serial
+
+      This pipeline manager supports shared queues (like dependent
+ pipelines) but only one item in each shared queue is
+ processed at a time.
+
+ This may be useful for post-merge pipelines which perform
+ partial production deployments (i.e., there are jobs with
+ file matchers which only deploy to affected parts of the
+ system). In such a case it is important for every change to
+ be processed, but they must still be processed one at a time
+ in order to ensure that the production system is not
+ inadvertently regressed. Support for shared queues ensures
+ that if multiple projects are involved deployment runs still
+ execute sequentially.
+
.. value:: supercedent
This is like an independent pipeline, in that every item is
@@ -124,11 +151,12 @@ success, the pipeline reports back to Gerrit with ``Verified`` vote of
these cases, build resources can be conserved by avoiding
building intermediate versions.
- .. note:: Since this pipeline filters intermediate buildsets using
- it in combination with file filters on jobs is dangerous.
- In this case jobs of in between buildsets can be
- unexpectedly skipped entirely. If file filters are needed
- the independent pipeline manager should be used.
+      .. note:: Since this pipeline filters intermediate buildsets,
+ using it in combination with file filters on jobs
+ is dangerous. In this case jobs of in between
+ buildsets can be unexpectedly skipped entirely. If
+ file filters are needed the ``independent`` or
+ ``serial`` pipeline managers should be used.
.. attr:: post-review
:default: false
diff --git a/releasenotes/notes/serial-manager-8f2dcf924c72effe.yaml b/releasenotes/notes/serial-manager-8f2dcf924c72effe.yaml
new file mode 100644
index 000000000..dd2a50ab2
--- /dev/null
+++ b/releasenotes/notes/serial-manager-8f2dcf924c72effe.yaml
@@ -0,0 +1,6 @@
+features:
+ - |
+ The :value:`pipeline.manager.serial` pipeline manager has been
+ added. It is designed to handle serialized deployment pipelines
+ where supercedent is unsuitable in the case that not all jobs run
+ on every merge.
diff --git a/tests/fixtures/layouts/serial.yaml b/tests/fixtures/layouts/serial.yaml
new file mode 100644
index 000000000..5a744ce82
--- /dev/null
+++ b/tests/fixtures/layouts/serial.yaml
@@ -0,0 +1,48 @@
+- pipeline:
+ name: deploy
+ manager: serial
+ trigger:
+ gerrit:
+ - event: change-merged
+ post-review: True
+ success:
+ gerrit: {}
+ failure:
+ gerrit: {}
+
+- job:
+ name: base
+ parent: null
+ nodeset:
+ nodes:
+ - label: ubuntu-xenial
+ name: controller
+
+- job:
+ name: job1
+ run: playbooks/job1.yaml
+
+- job:
+ name: job2
+ run: playbooks/job2.yaml
+
+- project:
+ name: org/project
+ deploy:
+ jobs:
+ - job1
+ - job2
+
+- project:
+ name: org/project1
+ deploy:
+ queue: shared
+ jobs:
+ - job1
+
+- project:
+ name: org/project2
+ deploy:
+ queue: shared
+ jobs:
+ - job1
diff --git a/tests/unit/test_serial.py b/tests/unit/test_serial.py
new file mode 100644
index 000000000..458d860db
--- /dev/null
+++ b/tests/unit/test_serial.py
@@ -0,0 +1,130 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tests.base import (
+ ZuulTestCase,
+ simple_layout,
+)
+
+
+class TestSerial(ZuulTestCase):
+ tenant_config_file = 'config/single-tenant/main.yaml'
+
+ @simple_layout('layouts/serial.yaml')
+ def test_deploy_window(self):
+ self.executor_server.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.setMerged()
+ self.fake_gerrit.addEvent(A.getChangeMergedEvent())
+ self.waitUntilSettled()
+ # The gerrit upstream repo simulation isn't perfect -- when
+ # change A is merged above, the master ref is updated to point
+ # to that change, it doesn't actually "merge" it. The same is
+ # true for B, so if it didn't have A in its git history, then
+ # A would not appear in the jobs run for B. We simulate the
+ # correct situation by setting A as the git parent of B.
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B',
+ parent='refs/changes/1/1/1')
+ B.setMerged()
+ self.fake_gerrit.addEvent(B.getChangeMergedEvent())
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 2)
+ self.assertTrue(self.builds[0].hasChanges(A))
+ self.assertTrue(self.builds[1].hasChanges(A))
+ self.assertFalse(self.builds[0].hasChanges(B))
+ self.assertFalse(self.builds[1].hasChanges(B))
+
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 2)
+ self.assertTrue(self.builds[0].hasChanges(A))
+ self.assertTrue(self.builds[1].hasChanges(A))
+ self.assertTrue(self.builds[0].hasChanges(B))
+ self.assertTrue(self.builds[1].hasChanges(B))
+
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.reported, 1)
+ self.assertEqual(B.reported, 1)
+ self.assertHistory([
+ dict(name='job1', result='SUCCESS', changes='1,1'),
+ dict(name='job2', result='SUCCESS', changes='1,1'),
+ dict(name='job1', result='SUCCESS', changes='2,1'),
+ dict(name='job2', result='SUCCESS', changes='2,1'),
+ ], ordered=False)
+
+ @simple_layout('layouts/serial.yaml')
+ def test_deploy_shared(self):
+ # Same as test_deploy_window but with two separate projects
+ # sharing a queue.
+ self.executor_server.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ A.setMerged()
+ self.fake_gerrit.addEvent(A.getChangeMergedEvent())
+ self.waitUntilSettled()
+ B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
+ B.setMerged()
+ self.fake_gerrit.addEvent(B.getChangeMergedEvent())
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 1)
+ self.assertTrue(self.builds[0].hasChanges(A))
+
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 1)
+ self.assertTrue(self.builds[0].hasChanges(B))
+
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.reported, 1)
+ self.assertEqual(B.reported, 1)
+ self.assertHistory([
+ dict(name='job1', result='SUCCESS', changes='1,1'),
+ dict(name='job1', result='SUCCESS', changes='2,1'),
+ ], ordered=False)
+
+ @simple_layout('layouts/serial.yaml')
+ def test_deploy_unshared(self):
+ # Test two projects which don't share a queue.
+ self.executor_server.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.setMerged()
+ self.fake_gerrit.addEvent(A.getChangeMergedEvent())
+ self.waitUntilSettled()
+
+ B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
+ B.setMerged()
+ self.fake_gerrit.addEvent(B.getChangeMergedEvent())
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 3)
+ self.assertTrue(self.builds[0].hasChanges(A))
+ self.assertTrue(self.builds[1].hasChanges(A))
+ self.assertTrue(self.builds[2].hasChanges(B))
+ self.assertFalse(self.builds[2].hasChanges(A))
+
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.reported, 1)
+ self.assertEqual(B.reported, 1)
+ self.assertHistory([
+ dict(name='job1', result='SUCCESS', changes='1,1'),
+ dict(name='job2', result='SUCCESS', changes='1,1'),
+ dict(name='job1', result='SUCCESS', changes='2,1'),
+ ], ordered=False)
diff --git a/zuul/configloader.py b/zuul/configloader.py
index 629de58d5..f985804c6 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -29,6 +29,7 @@ from zuul.lib import yamlutil as yaml
import zuul.manager.dependent
import zuul.manager.independent
import zuul.manager.supercedent
+import zuul.manager.serial
from zuul.lib import encryption
from zuul.lib.keystorage import KeyStorage
from zuul.lib.logutil import get_annotated_logger
@@ -1178,6 +1179,7 @@ class PipelineParser(object):
def getSchema(self):
manager = vs.Any('independent',
'dependent',
+ 'serial',
'supercedent')
precedence = vs.Any('normal', 'low', 'high')
@@ -1292,6 +1294,9 @@ class PipelineParser(object):
elif manager_name == 'independent':
manager = zuul.manager.independent.IndependentPipelineManager(
self.pcontext.scheduler, pipeline)
+ elif manager_name == 'serial':
+ manager = zuul.manager.serial.SerialPipelineManager(
+ self.pcontext.scheduler, pipeline)
elif manager_name == 'supercedent':
manager = zuul.manager.supercedent.SupercedentPipelineManager(
self.pcontext.scheduler, pipeline)
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index bbc439711..8a98330d7 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -13,6 +13,7 @@
import logging
import textwrap
import urllib
+from abc import ABCMeta
from zuul import exceptions
from zuul import model
@@ -43,7 +44,7 @@ class StaticChangeQueueContextManager(object):
pass
-class PipelineManager(object):
+class PipelineManager(metaclass=ABCMeta):
"""Abstract Base Class for enqueing and processing Changes in a Pipeline"""
def __init__(self, sched, pipeline):
diff --git a/zuul/manager/dependent.py b/zuul/manager/dependent.py
index 4aa96c176..cf8c7f8ba 100644
--- a/zuul/manager/dependent.py
+++ b/zuul/manager/dependent.py
@@ -12,11 +12,10 @@
from zuul import model
from zuul.lib.logutil import get_annotated_logger
-from zuul.manager import PipelineManager, StaticChangeQueueContextManager
-from zuul.manager import DynamicChangeQueueContextManager
+from zuul.manager.shared import SharedQueuePipelineManager
-class DependentPipelineManager(PipelineManager):
+class DependentPipelineManager(SharedQueuePipelineManager):
"""PipelineManager for handling interrelated Changes.
The DependentPipelineManager puts Changes that share a Pipeline
@@ -29,69 +28,17 @@ class DependentPipelineManager(PipelineManager):
def __init__(self, *args, **kwargs):
super(DependentPipelineManager, self).__init__(*args, **kwargs)
- def buildChangeQueues(self, layout):
- self.log.debug("Building shared change queues")
- change_queues = {}
- tenant = self.pipeline.tenant
- layout_project_configs = layout.project_configs
-
- for project_name, project_configs in layout_project_configs.items():
- (trusted, project) = tenant.getProject(project_name)
- queue_name = None
- project_in_pipeline = False
- for project_config in layout.getAllProjectConfigs(project_name):
- project_pipeline_config = project_config.pipelines.get(
- self.pipeline.name)
- if project_pipeline_config is None:
- continue
- project_in_pipeline = True
- queue_name = project_pipeline_config.queue_name
- if queue_name:
- break
- if not project_in_pipeline:
- continue
- if queue_name and queue_name in change_queues:
- change_queue = change_queues[queue_name]
- else:
- p = self.pipeline
- change_queue = model.ChangeQueue(
- p,
- window=p.window,
- window_floor=p.window_floor,
- window_increase_type=p.window_increase_type,
- window_increase_factor=p.window_increase_factor,
- window_decrease_type=p.window_decrease_type,
- window_decrease_factor=p.window_decrease_factor,
- name=queue_name)
- if queue_name:
- # If this is a named queue, keep track of it in
- # case it is referenced again. Otherwise, it will
- # have a name automatically generated from its
- # constituent projects.
- change_queues[queue_name] = change_queue
- self.pipeline.addQueue(change_queue)
- self.log.debug("Created queue: %s" % change_queue)
- change_queue.addProject(project)
- self.log.debug("Added project %s to queue: %s" %
- (project, change_queue))
-
- def getChangeQueue(self, change, event, existing=None):
- log = get_annotated_logger(self.log, event)
-
- # Ignore the existing queue, since we can always get the correct queue
- # from the pipeline. This avoids enqueuing changes in a wrong queue
- # e.g. during re-configuration.
- queue = self.pipeline.getQueue(change.project)
- if queue:
- return StaticChangeQueueContextManager(queue)
- else:
- # There is no existing queue for this change. Create a
- # dynamic one for this one change's use
- change_queue = model.ChangeQueue(self.pipeline, dynamic=True)
- change_queue.addProject(change.project)
- self.pipeline.addQueue(change_queue)
- log.debug("Dynamically created queue %s", change_queue)
- return DynamicChangeQueueContextManager(change_queue)
+ def constructChangeQueue(self, queue_name):
+ p = self.pipeline
+ return model.ChangeQueue(
+ p,
+ window=p.window,
+ window_floor=p.window_floor,
+ window_increase_type=p.window_increase_type,
+ window_increase_factor=p.window_increase_factor,
+ window_decrease_type=p.window_decrease_type,
+ window_decrease_factor=p.window_decrease_factor,
+ name=queue_name)
def getNodePriority(self, item):
with self.getChangeQueue(item.change, item.event) as change_queue:
diff --git a/zuul/manager/serial.py b/zuul/manager/serial.py
new file mode 100644
index 000000000..caaa147c6
--- /dev/null
+++ b/zuul/manager/serial.py
@@ -0,0 +1,37 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from zuul import model
+from zuul.manager.shared import SharedQueuePipelineManager
+
+
+class SerialPipelineManager(SharedQueuePipelineManager):
+ """PipelineManager with shared queues and a window of 1"""
+
+ changes_merge = False
+
+ def constructChangeQueue(self, queue_name):
+ return model.ChangeQueue(
+ self.pipeline,
+ window=1,
+ window_floor=1,
+ window_increase_type='none',
+ window_decrease_type='none',
+ name=queue_name)
+
+ def dequeueItem(self, item):
+ super(SerialPipelineManager, self).dequeueItem(item)
+ # If this was a dynamic queue from a speculative change,
+ # remove the queue (if empty)
+ if item.queue.dynamic:
+ if not item.queue.queue:
+ self.pipeline.removeQueue(item.queue)
diff --git a/zuul/manager/shared.py b/zuul/manager/shared.py
new file mode 100644
index 000000000..4b286f4f1
--- /dev/null
+++ b/zuul/manager/shared.py
@@ -0,0 +1,84 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from abc import ABCMeta
+
+from zuul import model
+from zuul.lib.logutil import get_annotated_logger
+from zuul.manager import PipelineManager, StaticChangeQueueContextManager
+from zuul.manager import DynamicChangeQueueContextManager
+
+
+class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta):
+ """Intermediate class that adds the shared-queue behavior.
+
+ This is not a full pipeline manager; it just adds the shared-queue
+ behavior to the base class and is used by the dependent and serial
+ managers.
+ """
+
+ changes_merge = False
+
+ def buildChangeQueues(self, layout):
+ self.log.debug("Building shared change queues")
+ change_queues = {}
+ tenant = self.pipeline.tenant
+ layout_project_configs = layout.project_configs
+
+ for project_name, project_configs in layout_project_configs.items():
+ (trusted, project) = tenant.getProject(project_name)
+ queue_name = None
+ project_in_pipeline = False
+ for project_config in layout.getAllProjectConfigs(project_name):
+ project_pipeline_config = project_config.pipelines.get(
+ self.pipeline.name)
+ if project_pipeline_config is None:
+ continue
+ project_in_pipeline = True
+ queue_name = project_pipeline_config.queue_name
+ if queue_name:
+ break
+ if not project_in_pipeline:
+ continue
+ if queue_name and queue_name in change_queues:
+ change_queue = change_queues[queue_name]
+ else:
+ change_queue = self.constructChangeQueue(queue_name)
+ if queue_name:
+ # If this is a named queue, keep track of it in
+ # case it is referenced again. Otherwise, it will
+ # have a name automatically generated from its
+ # constituent projects.
+ change_queues[queue_name] = change_queue
+ self.pipeline.addQueue(change_queue)
+ self.log.debug("Created queue: %s" % change_queue)
+ change_queue.addProject(project)
+ self.log.debug("Added project %s to queue: %s" %
+ (project, change_queue))
+
+ def getChangeQueue(self, change, event, existing=None):
+ log = get_annotated_logger(self.log, event)
+
+ # Ignore the existing queue, since we can always get the correct queue
+ # from the pipeline. This avoids enqueuing changes in a wrong queue
+ # e.g. during re-configuration.
+ queue = self.pipeline.getQueue(change.project)
+ if queue:
+ return StaticChangeQueueContextManager(queue)
+ else:
+ # There is no existing queue for this change. Create a
+ # dynamic one for this one change's use
+ change_queue = model.ChangeQueue(self.pipeline, dynamic=True)
+ change_queue.addProject(change.project)
+ self.pipeline.addQueue(change_queue)
+ log.debug("Dynamically created queue %s", change_queue)
+ return DynamicChangeQueueContextManager(change_queue)