1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from zuul import model
from zuul.lib.logutil import get_annotated_logger
from zuul.manager import PipelineManager, DynamicChangeQueueContextManager
class SupercedentPipelineManager(PipelineManager):
"""PipelineManager with one queue per project and a window of 1"""
changes_merge = False
type = 'supercedent'
def getChangeQueue(self, change, event, existing=None):
log = get_annotated_logger(self.log, event)
# creates a new change queue for every project-ref
# combination.
if existing:
return DynamicChangeQueueContextManager(existing)
# Don't use Pipeline.getQueue to find an existing queue
# because we're matching project and (branch or ref).
for queue in self.pipeline.queues:
if (queue.queue[-1].change.project == change.project and
((hasattr(change, 'branch') and
hasattr(queue.queue[-1].change, 'branch') and
queue.queue[-1].change.branch == change.branch) or
queue.queue[-1].change.ref == change.ref)):
log.debug("Found existing queue %s", queue)
return DynamicChangeQueueContextManager(queue)
change_queue = model.ChangeQueue.new(
self.pipeline.manager.current_context,
pipeline=self.pipeline,
window=1,
window_floor=1,
window_increase_type='none',
window_decrease_type='none',
dynamic=True)
change_queue.addProject(change.project, None)
self.pipeline.addQueue(change_queue)
log.debug("Dynamically created queue %s", change_queue)
return DynamicChangeQueueContextManager(change_queue)
def _pruneQueues(self):
# Leave the first item in the queue, as it's running, and the
# last item, as it's the most recent, but remove any items in
# between. This is what causes the last item to "supercede"
# any previously enqueued items (which we know aren't running
# jobs because the window size is 1).
for queue in self.pipeline.queues[:]:
remove = queue.queue[1:-1]
for item in remove:
self.log.debug("Item %s is superceded by %s, removing" %
(item, queue.queue[-1]))
self.removeItem(item)
def addChange(self, *args, **kw):
ret = super(SupercedentPipelineManager, self).addChange(
*args, **kw)
if ret:
self._pruneQueues()
return ret
def dequeueItem(self, item):
super(SupercedentPipelineManager, self).dequeueItem(item)
# A supercedent pipeline manager dynamically removes empty
# queues
if not item.queue.queue:
self.pipeline.removeQueue(item.queue)
|