1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
|
# Copyright 2021 BMW Group
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from kazoo.exceptions import NoNodeError
from zuul.lib.collections import DefaultKeyDict
from zuul.model import BuildRequest
from zuul.zk.job_request_queue import JobRequestQueue
class ExecutorQueue(JobRequestQueue):
    """Job request queue whose requests are ``BuildRequest`` objects.

    The ``initial_state`` property is not a fixed value; it is obtained
    from a callable supplied at construction time each time it is read.
    """

    log = logging.getLogger("zuul.ExecutorQueue")
    request_class = BuildRequest

    def __init__(self, client, root,
                 initial_state_getter,
                 use_cache=True,
                 request_callback=None,
                 event_callback=None):
        """Create the queue rooted at ``root``.

        :param client: Forwarded unchanged to ``JobRequestQueue``.
        :param root: Root path of this queue; also written to the debug
            log.
        :param initial_state_getter: Zero-argument callable whose return
            value is exposed through :attr:`initial_state`.
        :param use_cache: Forwarded unchanged to ``JobRequestQueue``.
        :param request_callback: Forwarded unchanged to
            ``JobRequestQueue``.
        :param event_callback: Forwarded unchanged to
            ``JobRequestQueue``.
        """
        self.log.debug("Creating executor queue at root %s", root)
        # Stored before the base initializer runs so that the
        # ``initial_state`` property is usable from within it.
        self._initial_state_getter = initial_state_getter
        super().__init__(
            client,
            root,
            use_cache,
            request_callback,
            event_callback,
        )

    @property
    def initial_state(self):
        """Return whatever the configured getter currently reports."""
        # Evaluated on every access; this supports holding requests
        # in tests.
        return self._initial_state_getter()

    def lostRequests(self):
        """Yield requests that are running or paused but not locked.

        A request in the ``RUNNING`` or ``PAUSED`` state for which
        ``isLocked`` returns a false value is not held by any client.
        """
        active_states = (
            self.request_class.RUNNING,
            self.request_class.PAUSED,
        )
        for request in self.inState(*active_states):
            if not self.isLocked(request):
                yield request
class ExecutorApi:
    """Zone-aware front end to a set of :class:`ExecutorQueue` objects.

    One queue exists per zone (plus one for unzoned requests, keyed by
    ``None``).  Most methods simply look up the queue that matches
    ``request.zone`` and delegate to the method of the same name; the
    remaining ones aggregate over every known queue.
    """

    log = logging.getLogger("zuul.ExecutorApi")

    def __init__(self, client, zone_filter=None, use_cache=True,
                 build_request_callback=None,
                 build_event_callback=None):
        """Set up the per-zone queues.

        :param client: Client object handed to every ``ExecutorQueue``;
            its ``client`` attribute is used for ``ChildrenWatch`` and
            ``get_children`` calls.
        :param zone_filter: Iterable of zones to create queues for.  If
            ``None``, queues are created for the unzoned root and for
            every zone found under the zones root (see
            :meth:`registerAllZones`).
        :param use_cache: Passed through to every ``ExecutorQueue``.
        :param build_request_callback: Passed through to every
            ``ExecutorQueue`` as its request callback.
        :param build_event_callback: Passed through to every
            ``ExecutorQueue`` as its event callback.
        """
        self.client = client
        self.use_cache = use_cache
        self.request_callback = build_request_callback
        self.event_callback = build_event_callback
        self.zone_filter = zone_filter
        self._watched_zones = set()

        self.root = '/zuul/executor'
        self.unzoned_root = f"{self.root}/unzoned"
        self.zones_root = f"{self.root}/zones"

        # Accessing a zone key that is not present yet builds the queue
        # for that zone via this factory.
        self.zone_queues = DefaultKeyDict(
            lambda zone: ExecutorQueue(
                self.client,
                self._getZoneRoot(zone),
                self._getInitialState,
                self.use_cache,
                self.request_callback,
                self.event_callback))

        if zone_filter is None:
            self.registerAllZones()
        else:
            for zone in zone_filter:
                # For the side effect of creating a queue
                self.zone_queues[zone]

    def _getInitialState(self):
        """Return the state handed to the queues as their initial state."""
        return BuildRequest.REQUESTED

    def _getZoneRoot(self, zone):
        """Return the queue root path for ``zone`` (``None`` = unzoned)."""
        if zone is None:
            return self.unzoned_root
        else:
            return f"{self.zones_root}/{zone}"

    def registerAllZones(self):
        """Create queues for the unzoned root and all current/future zones."""
        # Register a child watch that listens to new zones and automatically
        # registers to them.
        def watch_zones(children):
            for zone in children:
                # For the side effect of creating a queue
                self.zone_queues[zone]

        self.client.client.ChildrenWatch(self.zones_root, watch_zones)
        # For the side effect of creating a queue
        self.zone_queues[None]

    def _getAllZones(self):
        """Return all zone names plus ``None`` for the unzoned queue.

        If the zones root does not exist, only ``[None]`` is returned.
        """
        # Get a list of all zones without using the cache.
        try:
            # Get all available zones from ZooKeeper
            zones = self.client.client.get_children(self.zones_root)
            zones.append(None)
        except NoNodeError:
            zones = [None]
        return zones

    # Override JobRequestQueue methods to accommodate the zone dict.

    def inState(self, *states):
        """Return a sorted list of requests in ``states`` from all queues."""
        requests = []
        for queue in self.zone_queues.values():
            requests.extend(queue.inState(*states))
        return sorted(requests)

    def next(self):
        """Yield build requests that are in the ``REQUESTED`` state.

        Candidates come from ``inState(BuildRequest.REQUESTED)``.  For
        each candidate the queues' ``_cached_requests`` mappings are
        consulted by path, and the first cached entry that is still
        ``REQUESTED`` is the object that gets yielded.
        """
        for request in self.inState(BuildRequest.REQUESTED):
            for queue in self.zone_queues.values():
                request2 = queue._cached_requests.get(request.path)
                if (request2 and
                        request2.state == BuildRequest.REQUESTED):
                    yield request2
                    break

    def submit(self, request, params):
        """Delegate ``submit`` to the queue of ``request.zone``."""
        return self.zone_queues[request.zone].submit(request, params)

    def getRequestUpdater(self, request):
        """Delegate ``getRequestUpdater`` to the queue of ``request.zone``."""
        return self.zone_queues[request.zone].getRequestUpdater(request)

    def update(self, request):
        """Delegate ``update`` to the queue of ``request.zone``."""
        return self.zone_queues[request.zone].update(request)

    def reportResult(self, request, result):
        """Delegate ``reportResult`` to the queue of ``request.zone``.

        Both ``request`` and ``result`` are forwarded, like every other
        delegating method on this class forwards its arguments.
        """
        # NOTE(review): previously ``result`` was silently dropped here;
        # this assumes the queue's reportResult takes (request, result)
        # -- confirm against JobRequestQueue.
        return self.zone_queues[request.zone].reportResult(request, result)

    def get(self, path):
        """Fetch a request by path from the queue the path belongs to.

        The zone is taken from the first path component after the zones
        root; paths outside the zones root use the unzoned queue.
        """
        if path.startswith(self.zones_root):
            # Remove zone root so we end up with: <zone>/requests/<uuid>
            rel_path = path[len(f"{self.zones_root}/"):]
            zone = rel_path.split("/")[0]
        else:
            zone = None
        return self.zone_queues[zone].get(path)

    def getByUuid(self, uuid):
        """Find a build request by its UUID.

        This method will search for the UUID in all available zones.

        :returns: A ``(request, zone)`` tuple, or ``(None, None)`` if no
            queue knows the UUID.
        """
        for zone in self._getAllZones():
            request = self.zone_queues[zone].getByUuid(uuid)
            if request:
                # TODO (felix): Remove the zone return value after a
                # deprecation period. This is kept for backwards compatibility
                # until all executors store their zone information in the
                # worker_info dictionary on the BuildRequest.
                return request, zone
        return None, None

    def remove(self, request):
        """Delegate ``remove`` to the queue of ``request.zone``."""
        return self.zone_queues[request.zone].remove(request)

    def requestResume(self, request):
        """Delegate ``requestResume`` to the queue of ``request.zone``."""
        return self.zone_queues[request.zone].requestResume(request)

    def requestCancel(self, request):
        """Delegate ``requestCancel`` to the queue of ``request.zone``."""
        return self.zone_queues[request.zone].requestCancel(request)

    def fulfillResume(self, request):
        """Delegate ``fulfillResume`` to the queue of ``request.zone``."""
        return self.zone_queues[request.zone].fulfillResume(request)

    def fulfillCancel(self, request):
        """Delegate ``fulfillCancel`` to the queue of ``request.zone``."""
        return self.zone_queues[request.zone].fulfillCancel(request)

    def lock(self, request, *args, **kw):
        """Delegate ``lock`` (with any extra arguments) to the zone queue."""
        return self.zone_queues[request.zone].lock(request, *args, **kw)

    def unlock(self, request):
        """Delegate ``unlock`` to the queue of ``request.zone``."""
        return self.zone_queues[request.zone].unlock(request)

    def isLocked(self, request):
        """Delegate ``isLocked`` to the queue of ``request.zone``."""
        return self.zone_queues[request.zone].isLocked(request)

    def lostRequests(self):
        """Yield the lost requests of every known queue, queue by queue."""
        for queue in self.zone_queues.values():
            yield from queue.lostRequests()

    def cleanup(self, age=300):
        """Run ``cleanup(age)`` on every known queue.

        :param age: Passed through unchanged; presumably a number of
            seconds -- verify against the queue implementation.
        """
        for queue in self.zone_queues.values():
            queue.cleanup(age)

    def clearParams(self, request):
        """Delegate ``clearParams`` to the queue of ``request.zone``."""
        return self.zone_queues[request.zone].clearParams(request)

    def getParams(self, request):
        """Delegate ``getParams`` to the queue of ``request.zone``."""
        return self.zone_queues[request.zone].getParams(request)

    def _getAllRequestIds(self):
        """Return the concatenated request IDs of every known queue."""
        ret = []
        for queue in self.zone_queues.values():
            ret.extend(queue._getAllRequestIds())
        return ret
|