summaryrefslogtreecommitdiff
path: root/zuul/manager/independent.py
blob: aeeff5adcc3671bd4f7381ad6f2f8082e688e54a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from zuul import model
from zuul.lib.logutil import get_annotated_logger
from zuul.manager import PipelineManager, DynamicChangeQueueContextManager


class IndependentPipelineManager(PipelineManager):
    """PipelineManager that puts every Change into its own ChangeQueue."""

    changes_merge = False
    type = 'independent'

    def getChangeQueue(self, change, event, existing=None):
        log = get_annotated_logger(self.log, event)

        # We ignore any shared change queues on the pipeline and
        # instead create a new change queue for every change.
        if existing:
            return DynamicChangeQueueContextManager(existing)
        change_queue = model.ChangeQueue.new(
            self.pipeline.manager.current_context,
            pipeline=self.pipeline,
            dynamic=True)
        change_queue.addProject(change.project, None)
        self.pipeline.addQueue(change_queue)
        log.debug("Dynamically created queue %s", change_queue)
        return DynamicChangeQueueContextManager(change_queue)

    def enqueueChangesAhead(self, change, event, quiet, ignore_requirements,
                            change_queue, history=None, dependency_graph=None,
                            warnings=None):
        log = get_annotated_logger(self.log, event)

        history = history if history is not None else []
        if hasattr(change, 'number'):
            history.append(change)
        else:
            # Don't enqueue dependencies ahead of a non-change ref.
            return True

        abort, needed_changes = self.getMissingNeededChanges(
            change, change_queue, event,
            dependency_graph=dependency_graph)
        if abort:
            return False
        if not needed_changes:
            return True
        log.debug("  Changes %s must be merged ahead of %s" % (
            needed_changes, change))
        for needed_change in needed_changes:
            # This differs from the dependent pipeline by enqueuing
            # changes ahead as "not live", that is, not intended to
            # have jobs run.  Also, pipeline requirements are always
            # ignored (which is safe because the changes are not
            # live).
            if needed_change not in history:
                r = self.addChange(needed_change, event, quiet=True,
                                   ignore_requirements=True, live=False,
                                   change_queue=change_queue, history=history,
                                   dependency_graph=dependency_graph)
                if not r:
                    return False
        return True

    def getMissingNeededChanges(self, change, change_queue, event,
                                dependency_graph=None):
        log = get_annotated_logger(self.log, event)

        if self.pipeline.ignore_dependencies:
            return False, []
        log.debug("Checking for changes needed by %s:" % change)
        # Return true if okay to proceed enqueing this change,
        # false if the change should not be enqueued.
        if not isinstance(change, model.Change):
            log.debug("  %s does not support dependencies" % type(change))
            return False, []
        if not change.getNeedsChanges(
                self.useDependenciesByTopic(change.project)):
            log.debug("  No changes needed")
            return False, []
        changes_needed = []
        abort = False
        for needed_change in self.resolveChangeReferences(
                change.getNeedsChanges(
                    self.useDependenciesByTopic(change.project))):
            log.debug("  Change %s needs change %s:" % (
                change, needed_change))
            if needed_change.is_merged:
                log.debug("  Needed change is merged")
                continue

            if dependency_graph is not None:
                log.debug("  Adding change %s to dependency graph for "
                          "change %s", needed_change, change)
                node = dependency_graph.setdefault(change, [])
                node.append(needed_change)

            if self.isChangeAlreadyInQueue(needed_change, change_queue):
                log.debug("  Needed change is already ahead in the queue")
                continue
            log.debug("  Change %s is needed" % needed_change)
            if needed_change not in changes_needed:
                changes_needed.append(needed_change)
                continue
            # This differs from the dependent pipeline check in not
            # verifying that the dependent change is mergable.
        return abort, changes_needed

    def dequeueItem(self, item):
        super(IndependentPipelineManager, self).dequeueItem(item)
        # An independent pipeline manager dynamically removes empty
        # queues
        if not item.queue.queue:
            self.pipeline.removeQueue(item.queue)