summaryrefslogtreecommitdiff
path: root/zuul/zk/executor.py
blob: b8f828c551ef51b943f6a4ae90eae8f6287f9b0e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# Copyright 2021 BMW Group
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging

from kazoo.exceptions import NoNodeError

from zuul.lib.collections import DefaultKeyDict
from zuul.model import BuildRequest
from zuul.zk.job_request_queue import JobRequestQueue


class ExecutorQueue(JobRequestQueue):
    log = logging.getLogger("zuul.ExecutorQueue")
    request_class = BuildRequest

    def __init__(self, client, root,
                 initial_state_getter,
                 use_cache=True,
                 request_callback=None,
                 event_callback=None):
        self.log.debug("Creating executor queue at root %s", root)
        self._initial_state_getter = initial_state_getter
        super().__init__(
            client, root, use_cache, request_callback, event_callback)

    @property
    def initial_state(self):
        # This supports holding requests in tests
        return self._initial_state_getter()

    def lostRequests(self):
        # Get a list of requests which are running but not locked by
        # any client.
        yield from filter(
            lambda b: not self.isLocked(b),
            self.inState(self.request_class.RUNNING,
                         self.request_class.PAUSED),
        )


class ExecutorApi:
    log = logging.getLogger("zuul.ExecutorApi")

    def __init__(self, client, zone_filter=None, use_cache=True,
                 build_request_callback=None,
                 build_event_callback=None):
        self.client = client
        self.use_cache = use_cache
        self.request_callback = build_request_callback
        self.event_callback = build_event_callback
        self.zone_filter = zone_filter
        self._watched_zones = set()
        self.root = '/zuul/executor'
        self.unzoned_root = f"{self.root}/unzoned"
        self.zones_root = f"{self.root}/zones"

        self.zone_queues = DefaultKeyDict(
            lambda zone: ExecutorQueue(
                self.client,
                self._getZoneRoot(zone),
                self._getInitialState,
                self.use_cache,
                self.request_callback,
                self.event_callback))

        if zone_filter is None:
            self.registerAllZones()
        else:
            for zone in zone_filter:
                # For the side effect of creating a queue
                self.zone_queues[zone]

    def _getInitialState(self):
        return BuildRequest.REQUESTED

    def _getZoneRoot(self, zone):
        if zone is None:
            return self.unzoned_root
        else:
            return f"{self.zones_root}/{zone}"

    def registerAllZones(self):
        # Register a child watch that listens to new zones and automatically
        # registers to them.
        def watch_zones(children):
            for zone in children:
                # For the side effect of creating a queue
                self.zone_queues[zone]

        self.client.client.ChildrenWatch(self.zones_root, watch_zones)
        # For the side effect of creating a queue
        self.zone_queues[None]

    def _getAllZones(self):
        # Get a list of all zones without using the cache.
        try:
            # Get all available zones from ZooKeeper
            zones = self.client.client.get_children(self.zones_root)
            zones.append(None)
        except NoNodeError:
            zones = [None]
        return zones

    # Override JobRequestQueue methods to accomodate the zone dict.

    def inState(self, *states):
        requests = []
        for queue in self.zone_queues.values():
            requests.extend(queue.inState(*states))
        return sorted(requests)

    def next(self):
        for request in self.inState(BuildRequest.REQUESTED):
            for queue in self.zone_queues.values():
                request2 = queue._cached_requests.get(request.path)
                if (request2 and
                    request2.state == BuildRequest.REQUESTED):
                    yield request2
                    break

    def submit(self, request, params):
        return self.zone_queues[request.zone].submit(request, params)

    def getRequestUpdater(self, request):
        return self.zone_queues[request.zone].getRequestUpdater(request)

    def update(self, request):
        return self.zone_queues[request.zone].update(request)

    def reportResult(self, request, result):
        return self.zone_queues[request.zone].reportResult(request)

    def get(self, path):
        if path.startswith(self.zones_root):
            # Remove zone root so we end up with: <zone>/requests/<uuid>
            rel_path = path[len(f"{self.zones_root}/"):]
            zone = rel_path.split("/")[0]
        else:
            zone = None
        return self.zone_queues[zone].get(path)

    def getByUuid(self, uuid):
        """Find a build request by its UUID.

        This method will search for the UUID in all available zones.
        """
        for zone in self._getAllZones():
            request = self.zone_queues[zone].getByUuid(uuid)
            if request:
                # TODO (felix): Remove the zone return value after a
                # deprecation period. This is kept for backwards compatibility
                # until all executors store their zone information in the
                # worker_info dictionary on the BuildRequest.
                return request, zone
        return None, None

    def remove(self, request):
        return self.zone_queues[request.zone].remove(request)

    def requestResume(self, request):
        return self.zone_queues[request.zone].requestResume(request)

    def requestCancel(self, request):
        return self.zone_queues[request.zone].requestCancel(request)

    def fulfillResume(self, request):
        return self.zone_queues[request.zone].fulfillResume(request)

    def fulfillCancel(self, request):
        return self.zone_queues[request.zone].fulfillCancel(request)

    def lock(self, request, *args, **kw):
        return self.zone_queues[request.zone].lock(request, *args, **kw)

    def unlock(self, request):
        return self.zone_queues[request.zone].unlock(request)

    def isLocked(self, request):
        return self.zone_queues[request.zone].isLocked(request)

    def lostRequests(self):
        for queue in self.zone_queues.values():
            yield from queue.lostRequests()

    def cleanup(self, age=300):
        for queue in self.zone_queues.values():
            queue.cleanup(age)

    def clearParams(self, request):
        return self.zone_queues[request.zone].clearParams(request)

    def getParams(self, request):
        return self.zone_queues[request.zone].getParams(request)

    def _getAllRequestIds(self):
        ret = []
        for queue in self.zone_queues.values():
            ret.extend(queue._getAllRequestIds())
        return ret