diff options
author | Zuul <zuul@review.opendev.org> | 2020-05-07 16:22:58 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2020-05-07 16:22:58 +0000 |
commit | 5996661ce0fb1238de33a2223a6eb916d516af8c (patch) | |
tree | cd0ac84a4397b529f01430117561cdd93bc2922d | |
parent | 0e85054aac82e97581476a450d07a00660b5f577 (diff) | |
parent | 22640baef4d33c80135aace1d11a223278b02e57 (diff) | |
download | zuul-5996661ce0fb1238de33a2223a6eb916d516af8c.tar.gz |
Merge "Add serial pipeline manager"
-rw-r--r-- | doc/source/examples/pipelines/gerrit-reference-pipelines.yaml | 18 | ||||
-rw-r--r-- | doc/source/reference/pipeline_def.rst | 40 | ||||
-rw-r--r-- | releasenotes/notes/serial-manager-8f2dcf924c72effe.yaml | 6 | ||||
-rw-r--r-- | tests/fixtures/layouts/serial.yaml | 48 | ||||
-rw-r--r-- | tests/unit/test_serial.py | 130 | ||||
-rw-r--r-- | zuul/configloader.py | 5 | ||||
-rw-r--r-- | zuul/manager/__init__.py | 3 | ||||
-rw-r--r-- | zuul/manager/dependent.py | 79 | ||||
-rw-r--r-- | zuul/manager/serial.py | 37 | ||||
-rw-r--r-- | zuul/manager/shared.py | 84 |
10 files changed, 377 insertions, 73 deletions
diff --git a/doc/source/examples/pipelines/gerrit-reference-pipelines.yaml b/doc/source/examples/pipelines/gerrit-reference-pipelines.yaml index a20647a03..4e0fccaf5 100644 --- a/doc/source/examples/pipelines/gerrit-reference-pipelines.yaml +++ b/doc/source/examples/pipelines/gerrit-reference-pipelines.yaml @@ -100,6 +100,24 @@ sqlreporter: - pipeline: + name: deploy + description: | + This pipeline runs jobs that operate after each change is merged + in order to deploy to production. + manager: serial + precedence: high + post-review: True + trigger: + gerrit: + - event: change-merged + success: + gerrit: {} + sqlreporter: + failure: + gerrit: {} + sqlreporter: + +- pipeline: name: release description: | When a commit is tagged as a release, this pipeline runs jobs diff --git a/doc/source/reference/pipeline_def.rst b/doc/source/reference/pipeline_def.rst index 8b73282e2..940758ba3 100644 --- a/doc/source/reference/pipeline_def.rst +++ b/doc/source/reference/pipeline_def.rst @@ -63,7 +63,18 @@ success, the pipeline reports back to Gerrit with ``Verified`` vote of .. attr:: manager :required: - There are three schemes for managing pipelines: + There are several schemes for managing pipelines. The following + table summarizes their features; each is described in detail + below. + + =========== ============ ===== ============= ========= + Manager Dependencies Merge Shared Queues Window + =========== ============ ===== ============= ========= + Independent No No No Unlimited + Dependent Yes Yes Yes Variable + Serial No No Yes 1 + Supercedent No No Project-ref 1 + =========== ============ ===== ============= ========= .. value:: independent @@ -107,6 +118,22 @@ success, the pipeline reports back to Gerrit with ``Verified`` vote of For more detail on the theory and operation of Zuul's dependent pipeline manager, see: :doc:`/discussion/gating`. + .. value:: serial + + This pipeline manager supports shared queues (like depedent + pipelines) but only one item in each shared queue is + processed at a time. + + This may be useful for post-merge pipelines which perform + partial production deployments (i.e., there are jobs with + file matchers which only deploy to affected parts of the + system). In such a case it is important for every change to + be processed, but they must still be processed one at a time + in order to ensure that the production system is not + inadvertently regressed. Support for shared queues ensures + that if multiple projects are involved deployment runs still + execute sequentially. + .. value:: supercedent This is like an independent pipeline, in that every item is @@ -124,11 +151,12 @@ success, the pipeline reports back to Gerrit with ``Verified`` vote of these cases, build resources can be conserved by avoiding building intermediate versions. - .. note:: Since this pipeline filters intermediate buildsets using - it in combination with file filters on jobs is dangerous. - In this case jobs of in between buildsets can be - unexpectedly skipped entirely. If file filters are needed - the independent pipeline manager should be used. + .. note:: Since this pipeline filters intermediate buildsets + using it in combination with file filters on jobs + is dangerous. In this case jobs of in between + buildsets can be unexpectedly skipped entirely. If + file filters are needed the ``independent`` or + ``serial`` pipeline managers should be used. .. attr:: post-review :default: false diff --git a/releasenotes/notes/serial-manager-8f2dcf924c72effe.yaml b/releasenotes/notes/serial-manager-8f2dcf924c72effe.yaml new file mode 100644 index 000000000..dd2a50ab2 --- /dev/null +++ b/releasenotes/notes/serial-manager-8f2dcf924c72effe.yaml @@ -0,0 +1,6 @@ +features: + - | + The :value:`pipeline.manager.serial` pipeline manager has been + added. It is designed to handle serialized deployment pipelines + where supercedent is unsuitable in the case that not all jobs run + on every merge. diff --git a/tests/fixtures/layouts/serial.yaml b/tests/fixtures/layouts/serial.yaml new file mode 100644 index 000000000..5a744ce82 --- /dev/null +++ b/tests/fixtures/layouts/serial.yaml @@ -0,0 +1,48 @@ +- pipeline: + name: deploy + manager: serial + trigger: + gerrit: + - event: change-merged + post-review: True + success: + gerrit: {} + failure: + gerrit: {} + +- job: + name: base + parent: null + nodeset: + nodes: + - label: ubuntu-xenial + name: controller + +- job: + name: job1 + run: playbooks/job1.yaml + +- job: + name: job2 + run: playbooks/job2.yaml + +- project: + name: org/project + deploy: + jobs: + - job1 + - job2 + +- project: + name: org/project1 + deploy: + queue: shared + jobs: + - job1 + +- project: + name: org/project2 + deploy: + queue: shared + jobs: + - job1 diff --git a/tests/unit/test_serial.py b/tests/unit/test_serial.py new file mode 100644 index 000000000..458d860db --- /dev/null +++ b/tests/unit/test_serial.py @@ -0,0 +1,130 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from tests.base import ( + ZuulTestCase, + simple_layout, +) + + +class TestSerial(ZuulTestCase): + tenant_config_file = 'config/single-tenant/main.yaml' + + @simple_layout('layouts/serial.yaml') + def test_deploy_window(self): + self.executor_server.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + A.setMerged() + self.fake_gerrit.addEvent(A.getChangeMergedEvent()) + self.waitUntilSettled() + # The gerrit upstream repo simulation isn't perfect -- when + # change A is merged above, the master ref is updated to point + # to that change, it doesn't actually "merge" it. The same is + # true for B, so if it didn't have A in its git history, then + # A would not appear in the jobs run for B. We simulate the + # correct situation by setting A as the git parent of B. + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B', + parent='refs/changes/1/1/1') + B.setMerged() + self.fake_gerrit.addEvent(B.getChangeMergedEvent()) + self.waitUntilSettled() + + self.assertEqual(len(self.builds), 2) + self.assertTrue(self.builds[0].hasChanges(A)) + self.assertTrue(self.builds[1].hasChanges(A)) + self.assertFalse(self.builds[0].hasChanges(B)) + self.assertFalse(self.builds[1].hasChanges(B)) + + self.executor_server.release() + self.waitUntilSettled() + + self.assertEqual(len(self.builds), 2) + self.assertTrue(self.builds[0].hasChanges(A)) + self.assertTrue(self.builds[1].hasChanges(A)) + self.assertTrue(self.builds[0].hasChanges(B)) + self.assertTrue(self.builds[1].hasChanges(B)) + + self.executor_server.release() + self.waitUntilSettled() + + self.assertEqual(A.reported, 1) + self.assertEqual(B.reported, 1) + self.assertHistory([ + dict(name='job1', result='SUCCESS', changes='1,1'), + dict(name='job2', result='SUCCESS', changes='1,1'), + dict(name='job1', result='SUCCESS', changes='2,1'), + dict(name='job2', result='SUCCESS', changes='2,1'), + ], ordered=False) + + @simple_layout('layouts/serial.yaml') + def test_deploy_shared(self): + # Same as test_deploy_window but with two separate projects + # sharing a queue. + self.executor_server.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') + A.setMerged() + self.fake_gerrit.addEvent(A.getChangeMergedEvent()) + self.waitUntilSettled() + B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B') + B.setMerged() + self.fake_gerrit.addEvent(B.getChangeMergedEvent()) + self.waitUntilSettled() + + self.assertEqual(len(self.builds), 1) + self.assertTrue(self.builds[0].hasChanges(A)) + + self.executor_server.release() + self.waitUntilSettled() + + self.assertEqual(len(self.builds), 1) + self.assertTrue(self.builds[0].hasChanges(B)) + + self.executor_server.release() + self.waitUntilSettled() + + self.assertEqual(A.reported, 1) + self.assertEqual(B.reported, 1) + self.assertHistory([ + dict(name='job1', result='SUCCESS', changes='1,1'), + dict(name='job1', result='SUCCESS', changes='2,1'), + ], ordered=False) + + @simple_layout('layouts/serial.yaml') + def test_deploy_unshared(self): + # Test two projects which don't share a queue. + self.executor_server.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + A.setMerged() + self.fake_gerrit.addEvent(A.getChangeMergedEvent()) + self.waitUntilSettled() + + B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B') + B.setMerged() + self.fake_gerrit.addEvent(B.getChangeMergedEvent()) + self.waitUntilSettled() + + self.assertEqual(len(self.builds), 3) + self.assertTrue(self.builds[0].hasChanges(A)) + self.assertTrue(self.builds[1].hasChanges(A)) + self.assertTrue(self.builds[2].hasChanges(B)) + self.assertFalse(self.builds[2].hasChanges(A)) + + self.executor_server.release() + self.waitUntilSettled() + + self.assertEqual(A.reported, 1) + self.assertEqual(B.reported, 1) + self.assertHistory([ + dict(name='job1', result='SUCCESS', changes='1,1'), + dict(name='job2', result='SUCCESS', changes='1,1'), + dict(name='job1', result='SUCCESS', changes='2,1'), + ], ordered=False) diff --git a/zuul/configloader.py b/zuul/configloader.py index 629de58d5..f985804c6 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -29,6 +29,7 @@ from zuul.lib import yamlutil as yaml import zuul.manager.dependent import zuul.manager.independent import zuul.manager.supercedent +import zuul.manager.serial from zuul.lib import encryption from zuul.lib.keystorage import KeyStorage from zuul.lib.logutil import get_annotated_logger @@ -1178,6 +1179,7 @@ class PipelineParser(object): def getSchema(self): manager = vs.Any('independent', 'dependent', + 'serial', 'supercedent') precedence = vs.Any('normal', 'low', 'high') @@ -1292,6 +1294,9 @@ class PipelineParser(object): elif manager_name == 'independent': manager = zuul.manager.independent.IndependentPipelineManager( self.pcontext.scheduler, pipeline) + elif manager_name == 'serial': + manager = zuul.manager.serial.SerialPipelineManager( + self.pcontext.scheduler, pipeline) elif manager_name == 'supercedent': manager = zuul.manager.supercedent.SupercedentPipelineManager( self.pcontext.scheduler, pipeline) diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index ac0c410aa..469d2c1b3 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -13,6 +13,7 @@ import logging import textwrap import urllib +from abc import ABCMeta from zuul import exceptions from zuul import model @@ -43,7 +44,7 @@ class StaticChangeQueueContextManager(object): pass -class PipelineManager(object): +class PipelineManager(metaclass=ABCMeta): """Abstract Base Class for enqueing and processing Changes in a Pipeline""" def __init__(self, sched, pipeline): diff --git a/zuul/manager/dependent.py b/zuul/manager/dependent.py index 4aa96c176..cf8c7f8ba 100644 --- a/zuul/manager/dependent.py +++ b/zuul/manager/dependent.py @@ -12,11 +12,10 @@ from zuul import model from zuul.lib.logutil import get_annotated_logger -from zuul.manager import PipelineManager, StaticChangeQueueContextManager -from zuul.manager import DynamicChangeQueueContextManager +from zuul.manager.shared import SharedQueuePipelineManager -class DependentPipelineManager(PipelineManager): +class DependentPipelineManager(SharedQueuePipelineManager): """PipelineManager for handling interrelated Changes. The DependentPipelineManager puts Changes that share a Pipeline @@ -29,69 +28,17 @@ class DependentPipelineManager(PipelineManager): def __init__(self, *args, **kwargs): super(DependentPipelineManager, self).__init__(*args, **kwargs) - def buildChangeQueues(self, layout): - self.log.debug("Building shared change queues") - change_queues = {} - tenant = self.pipeline.tenant - layout_project_configs = layout.project_configs - - for project_name, project_configs in layout_project_configs.items(): - (trusted, project) = tenant.getProject(project_name) - queue_name = None - project_in_pipeline = False - for project_config in layout.getAllProjectConfigs(project_name): - project_pipeline_config = project_config.pipelines.get( - self.pipeline.name) - if project_pipeline_config is None: - continue - project_in_pipeline = True - queue_name = project_pipeline_config.queue_name - if queue_name: - break - if not project_in_pipeline: - continue - if queue_name and queue_name in change_queues: - change_queue = change_queues[queue_name] - else: - p = self.pipeline - change_queue = model.ChangeQueue( - p, - window=p.window, - window_floor=p.window_floor, - window_increase_type=p.window_increase_type, - window_increase_factor=p.window_increase_factor, - window_decrease_type=p.window_decrease_type, - window_decrease_factor=p.window_decrease_factor, - name=queue_name) - if queue_name: - # If this is a named queue, keep track of it in - # case it is referenced again. Otherwise, it will - # have a name automatically generated from its - # constituent projects. - change_queues[queue_name] = change_queue - self.pipeline.addQueue(change_queue) - self.log.debug("Created queue: %s" % change_queue) - change_queue.addProject(project) - self.log.debug("Added project %s to queue: %s" % - (project, change_queue)) - - def getChangeQueue(self, change, event, existing=None): - log = get_annotated_logger(self.log, event) - - # Ignore the existing queue, since we can always get the correct queue - # from the pipeline. This avoids enqueuing changes in a wrong queue - # e.g. during re-configuration. - queue = self.pipeline.getQueue(change.project) - if queue: - return StaticChangeQueueContextManager(queue) - else: - # There is no existing queue for this change. Create a - # dynamic one for this one change's use - change_queue = model.ChangeQueue(self.pipeline, dynamic=True) - change_queue.addProject(change.project) - self.pipeline.addQueue(change_queue) - log.debug("Dynamically created queue %s", change_queue) - return DynamicChangeQueueContextManager(change_queue) + def constructChangeQueue(self, queue_name): + p = self.pipeline + return model.ChangeQueue( + p, + window=p.window, + window_floor=p.window_floor, + window_increase_type=p.window_increase_type, + window_increase_factor=p.window_increase_factor, + window_decrease_type=p.window_decrease_type, + window_decrease_factor=p.window_decrease_factor, + name=queue_name) def getNodePriority(self, item): with self.getChangeQueue(item.change, item.event) as change_queue: diff --git a/zuul/manager/serial.py b/zuul/manager/serial.py new file mode 100644 index 000000000..caaa147c6 --- /dev/null +++ b/zuul/manager/serial.py @@ -0,0 +1,37 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from zuul import model +from zuul.manager.shared import SharedQueuePipelineManager + + +class SerialPipelineManager(SharedQueuePipelineManager): + """PipelineManager with shared queues and a window of 1""" + + changes_merge = False + + def constructChangeQueue(self, queue_name): + return model.ChangeQueue( + self.pipeline, + window=1, + window_floor=1, + window_increase_type='none', + window_decrease_type='none', + name=queue_name) + + def dequeueItem(self, item): + super(SerialPipelineManager, self).dequeueItem(item) + # If this was a dynamic queue from a speculative change, + # remove the queue (if empty) + if item.queue.dynamic: + if not item.queue.queue: + self.pipeline.removeQueue(item.queue) diff --git a/zuul/manager/shared.py b/zuul/manager/shared.py new file mode 100644 index 000000000..4b286f4f1 --- /dev/null +++ b/zuul/manager/shared.py @@ -0,0 +1,84 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from abc import ABCMeta + +from zuul import model +from zuul.lib.logutil import get_annotated_logger +from zuul.manager import PipelineManager, StaticChangeQueueContextManager +from zuul.manager import DynamicChangeQueueContextManager + + +class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta): + """Intermediate class that adds the shared-queue behavior. + + This is not a full pipeline manager; it just adds the shared-queue + behavior to the base class and is used by the dependent and serial + managers. + """ + + changes_merge = False + + def buildChangeQueues(self, layout): + self.log.debug("Building shared change queues") + change_queues = {} + tenant = self.pipeline.tenant + layout_project_configs = layout.project_configs + + for project_name, project_configs in layout_project_configs.items(): + (trusted, project) = tenant.getProject(project_name) + queue_name = None + project_in_pipeline = False + for project_config in layout.getAllProjectConfigs(project_name): + project_pipeline_config = project_config.pipelines.get( + self.pipeline.name) + if project_pipeline_config is None: + continue + project_in_pipeline = True + queue_name = project_pipeline_config.queue_name + if queue_name: + break + if not project_in_pipeline: + continue + if queue_name and queue_name in change_queues: + change_queue = change_queues[queue_name] + else: + change_queue = self.constructChangeQueue(queue_name) + if queue_name: + # If this is a named queue, keep track of it in + # case it is referenced again. Otherwise, it will + # have a name automatically generated from its + # constituent projects. + change_queues[queue_name] = change_queue + self.pipeline.addQueue(change_queue) + self.log.debug("Created queue: %s" % change_queue) + change_queue.addProject(project) + self.log.debug("Added project %s to queue: %s" % + (project, change_queue)) + + def getChangeQueue(self, change, event, existing=None): + log = get_annotated_logger(self.log, event) + + # Ignore the existing queue, since we can always get the correct queue + # from the pipeline. This avoids enqueuing changes in a wrong queue + # e.g. during re-configuration. + queue = self.pipeline.getQueue(change.project) + if queue: + return StaticChangeQueueContextManager(queue) + else: + # There is no existing queue for this change. Create a + # dynamic one for this one change's use + change_queue = model.ChangeQueue(self.pipeline, dynamic=True) + change_queue.addProject(change.project) + self.pipeline.addQueue(change_queue) + log.debug("Dynamically created queue %s", change_queue) + return DynamicChangeQueueContextManager(change_queue) |