# summary refs log tree commit diff
# path: root/zuul/merger/client.py
# blob: 3f0f2478de8f41be6cae0889f0f27dd20faa5abc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging
from uuid import uuid4

from zuul.lib.config import get_default
from zuul.lib.logutil import get_annotated_logger
import zuul.lib.tracing as tracing
from zuul.model import (
    FilesChangesCompletedEvent,
    MergeCompletedEvent,
    MergeRequest,
    PRECEDENCE_HIGH,
    PRECEDENCE_NORMAL,
)
from zuul.zk.event_queues import PipelineResultEventQueue
from zuul.zk.merger import MergerApi
from zuul.zk.exceptions import JobRequestNotFound

from kazoo.exceptions import BadVersionError, NoNodeError
from opentelemetry import trace

# Maps each MergeRequest job type to the name given to the tracing span
# that MergeClient.submitJob() starts for a job of that type.
_JOB_TYPE_TO_SPAN_NAME = {
    MergeRequest.MERGE: "Merge",
    MergeRequest.CAT: "Cat",
    MergeRequest.REF_STATE: "RefState",
    MergeRequest.FILES_CHANGES: "FilesChanges",
}


class MergeClient(object):
    """Client used to submit merger jobs and to clean up lost requests.

    Every job is described by a ``MergeRequest`` and handed to the merger
    API instance stored in ``self.merger_api``.
    """
    log = logging.getLogger("zuul.MergeClient")

    # Class used to build ``self.merger_api`` in __init__(); kept as a
    # class attribute so it can be replaced without touching __init__().
    _merger_api_class = MergerApi
    tracer = trace.get_tracer("zuul")

    def __init__(self, config, sched):
        """Create the merger API and the pipeline result event registry.

        :param config: Configuration object; the ``git_timeout`` option of
            its ``merger`` section is read (default: 300).
        :param sched: Object providing a ``zk_client`` attribute
            (presumably the scheduler -- confirm against callers).
        """
        self.config = config
        self.sched = sched
        self.git_timeout = get_default(
            self.config, 'merger', 'git_timeout', 300)
        self.merger_api = self._merger_api_class(self.sched.zk_client)
        self.result_events = PipelineResultEventQueue.createRegistry(
            self.sched.zk_client
        )

    def submitJob(
        self,
        job_type,
        data,
        build_set,
        precedence=PRECEDENCE_NORMAL,
        needs_result=False,
        event=None,
    ):
        """Build a ``MergeRequest`` and submit it through the merger API.

        :param job_type: One of the ``MergeRequest`` job types; it must be
            a key of ``_JOB_TYPE_TO_SPAN_NAME`` (``KeyError`` otherwise).
        :param data: Job parameters passed along with the request.
        :param build_set: Buildset the job belongs to, or ``None``.
        :param precedence: Precedence stored on the request.
        :param needs_result: Forwarded to ``merger_api.submit()``.
        :param event: Optional event; used to annotate the logger and to
            set the request's ``event_id``.
        :returns: Whatever ``merger_api.submit()`` returns.
        """
        # We need the tenant, pipeline and queue names to put the merge result
        # in the correct queue. The only source for those values in this
        # context is the buildset. If no buildset is provided, we can't provide
        # a result event. In those cases a user of this function can fall back
        # to the return value which provides the result as a future stored in a
        # ZooKeeper path.
        build_set_uuid = None
        tenant_name = None
        pipeline_name = None
        parent_span = None

        if build_set is not None:
            build_set_uuid = build_set.uuid
            tenant_name = build_set.item.pipeline.tenant.name
            pipeline_name = build_set.item.pipeline.name
            parent_span = tracing.restoreSpan(build_set.span_info)

        # The job span is started with the buildset's span (if any) as the
        # current span. It is not ended here; only its span info is saved
        # on the request below.
        with trace.use_span(parent_span):
            job_span = self.tracer.start_span(_JOB_TYPE_TO_SPAN_NAME[job_type])

        # UUID.hex is already a str, no conversion needed.
        uuid = uuid4().hex

        log = get_annotated_logger(self.log, event)
        log.debug("Submitting job %s with data %s", uuid, data)

        with trace.use_span(job_span):
            request = MergeRequest(
                uuid=uuid,
                job_type=job_type,
                build_set_uuid=build_set_uuid,
                tenant_name=tenant_name,
                pipeline_name=pipeline_name,
                event_id=event.zuul_event_id if event else None,
                precedence=precedence,
                span_info=tracing.getSpanInfo(job_span),
            )
        return self.merger_api.submit(request, data,
                                      needs_result=needs_result)

    def mergeChanges(self, items, build_set, files=None, dirs=None,
                     repo_state=None, precedence=PRECEDENCE_NORMAL,
                     branches=None, event=None):
        """Submit a ``MergeRequest.MERGE`` job for the given items.

        The job is submitted with ``needs_result`` left at its default
        (``False``) and the return value of submitJob() is discarded, so
        this method returns ``None``.
        """
        data = dict(items=items,
                    files=files,
                    dirs=dirs,
                    repo_state=repo_state,
                    branches=branches)
        self.submitJob(
            MergeRequest.MERGE, data, build_set, precedence, event=event)

    def getRepoState(self, items, build_set, precedence=PRECEDENCE_NORMAL,
                     branches=None, event=None):
        """Submit a ``MergeRequest.REF_STATE`` job for the given items.

        Like mergeChanges(), the return value of submitJob() is discarded
        and this method returns ``None``.
        """
        data = dict(items=items, branches=branches)
        self.submitJob(
            MergeRequest.REF_STATE, data, build_set, precedence, event=event)

    def getFiles(self, connection_name, project_name, branch, files,
                 dirs=None, precedence=PRECEDENCE_HIGH, event=None):
        """Submit a ``MergeRequest.CAT`` job and return the submitted job.

        The job is submitted without a buildset and with
        ``needs_result=True``.

        :param dirs: Optional list of directories; ``None`` (the default)
            is submitted as a new empty list.
        :returns: The return value of submitJob().
        """
        # Use a fresh list per call instead of a shared mutable default, so
        # the submitted data never aliases a list shared between calls.
        if dirs is None:
            dirs = []
        data = dict(connection=connection_name,
                    project=project_name,
                    branch=branch,
                    files=files,
                    dirs=dirs)
        job = self.submitJob(
            MergeRequest.CAT,
            data,
            None,
            precedence,
            needs_result=True,
            event=event,
        )
        return job

    def getFilesChanges(self, connection_name, project_name, branch,
                        tosha=None, precedence=PRECEDENCE_HIGH,
                        build_set=None, needs_result=False, event=None):
        """Submit a ``MergeRequest.FILES_CHANGES`` job and return the job.

        :returns: The return value of submitJob().
        """
        data = dict(connection=connection_name,
                    project=project_name,
                    branch=branch,
                    tosha=tosha)
        job = self.submitJob(
            MergeRequest.FILES_CHANGES,
            data,
            build_set,
            precedence,
            needs_result=needs_result,
            event=event,
        )
        return job

    def cleanupLostMergeRequests(self):
        """Clean up every request reported by ``merger_api.lostRequests()``.

        A failure while cleaning up one request is logged and does not stop
        the remaining requests from being processed.
        """
        for merge_request in self.merger_api.lostRequests():
            try:
                self.cleanupLostMergeRequest(merge_request)
            except Exception:
                self.log.exception("Exception cleaning up lost merge request:")

    def cleanupLostMergeRequest(self, merge_request):
        """Report an empty/failed result for a lost request and remove it.

        If the request has a ``result_path`` an empty result is reported
        through the merger API; otherwise, if it has a ``build_set_uuid``,
        a completion event carrying no results is put into the result queue
        of the request's tenant and pipeline. The request is then marked
        ``COMPLETED``, updated and removed.

        :param merge_request: The lost ``MergeRequest`` to clean up.
        """
        # Provide a result either via a result future or a result event
        if merge_request.result_path:
            self.log.debug(
                "Merge request cleanup providing synchronous result "
                "via future for %s", merge_request)
            result = {}
            self.merger_api.reportResult(merge_request, result)

        elif merge_request.build_set_uuid:
            self.log.debug(
                "Merge request cleanup providing asynchronous result "
                "via result event for %s", merge_request)
            if merge_request.job_type == MergeRequest.FILES_CHANGES:
                event = FilesChangesCompletedEvent(
                    merge_request.uuid,
                    merge_request.build_set_uuid,
                    files=None,
                    elapsed_time=None,
                    span_info=merge_request.span_info,
                )
            else:
                event = MergeCompletedEvent(
                    merge_request.uuid,
                    merge_request.build_set_uuid,
                    merged=False,
                    updated=False,
                    commit=None,
                    files=None,
                    repo_state=None,
                    item_in_branches=None,
                    errors=None,
                    elapsed_time=None,
                    span_info=merge_request.span_info,
                )
            try:
                self.result_events[merge_request.tenant_name][
                    merge_request.pipeline_name].put(event)
            except NoNodeError:
                # The result queue no longer exists; nothing to notify.
                self.log.warning("Pipeline was removed: %s",
                                 merge_request.pipeline_name)

        merge_request.state = MergeRequest.COMPLETED
        try:
            self.merger_api.update(merge_request)
            # No need to unlock the build, as it is by definition unlocked.
            # TODO (felix): If we want to optimize ZK requests, we could only
            # call the remove() here.
            self.merger_api.remove(merge_request)
        except JobRequestNotFound as e:
            self.log.warning("Could not complete merge: %s", str(e))
            # In case we re-created the lock directory, still remove
            # the request for the side effect of removing the lock.
            self.merger_api.remove(merge_request)
            return
        except BadVersionError:
            # There could be a race condition:
            # The merge request is found by lost_merge_requests in
            # state RUNNING but gets completed/unlocked before the
            # is_locked() check. Since we use the znode version, the
            # update will fail in this case and we can simply ignore
            # the exception.
            return

    def cancel(self, job):
        """Cancel a submitted job.

        The request at ``job.request_path`` is removed if it still exists
        and its lock can be acquired without blocking. ``job.cancel()`` is
        always called afterwards, even if removing the request raised.

        :param job: Object providing ``request_path`` and ``cancel()``
            (presumably a value returned by submitJob() -- confirm).
        """
        try:
            # Try to remove the request first
            request = self.merger_api.get(job.request_path)
            if request:
                if self.merger_api.lock(request, blocking=False):
                    try:
                        self.merger_api.remove(request)
                    finally:
                        self.merger_api.unlock(request)
        finally:
            # Regardless of that, remove the waiter node
            job.cancel()