summaryrefslogtreecommitdiff
path: root/zuul/manager/supercedent.py
blob: 14832a24a04b51b06cff7a97a504b5ec6507fa6a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from zuul import model
from zuul.lib.logutil import get_annotated_logger
from zuul.manager import PipelineManager, DynamicChangeQueueContextManager


class SupercedentPipelineManager(PipelineManager):
    """PipelineManager with one queue per project and a window of 1"""

    changes_merge = False
    type = 'supercedent'

    def getChangeQueue(self, change, event, existing=None):
        log = get_annotated_logger(self.log, event)

        # creates a new change queue for every project-ref
        # combination.
        if existing:
            return DynamicChangeQueueContextManager(existing)

        # Don't use Pipeline.getQueue to find an existing queue
        # because we're matching project and (branch or ref).
        for queue in self.pipeline.queues:
            if (queue.queue[-1].change.project == change.project and
                ((hasattr(change, 'branch') and
                  hasattr(queue.queue[-1].change, 'branch') and
                  queue.queue[-1].change.branch == change.branch) or
                queue.queue[-1].change.ref == change.ref)):
                log.debug("Found existing queue %s", queue)
                return DynamicChangeQueueContextManager(queue)
        change_queue = model.ChangeQueue.new(
            self.pipeline.manager.current_context,
            pipeline=self.pipeline,
            window=1,
            window_floor=1,
            window_increase_type='none',
            window_decrease_type='none',
            dynamic=True)
        change_queue.addProject(change.project, None)
        self.pipeline.addQueue(change_queue)
        log.debug("Dynamically created queue %s", change_queue)
        return DynamicChangeQueueContextManager(change_queue)

    def _pruneQueues(self):
        # Leave the first item in the queue, as it's running, and the
        # last item, as it's the most recent, but remove any items in
        # between.  This is what causes the last item to "supercede"
        # any previously enqueued items (which we know aren't running
        # jobs because the window size is 1).
        for queue in self.pipeline.queues[:]:
            remove = queue.queue[1:-1]
            for item in remove:
                self.log.debug("Item %s is superceded by %s, removing" %
                               (item, queue.queue[-1]))
                self.removeItem(item)

    def addChange(self, *args, **kw):
        ret = super(SupercedentPipelineManager, self).addChange(
            *args, **kw)
        if ret:
            self._pruneQueues()
        return ret

    def dequeueItem(self, item):
        super(SupercedentPipelineManager, self).dequeueItem(item)
        # A supercedent pipeline manager dynamically removes empty
        # queues
        if not item.queue.queue:
            self.pipeline.removeQueue(item.queue)