1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from zuul import model
from zuul.lib.logutil import get_annotated_logger
from zuul.zk import LockException
class Nodepool(object):
log = logging.getLogger('zuul.nodepool')
def __init__(self, scheduler):
self.requests = {}
self.sched = scheduler
def emitStats(self, request):
# Implements the following :
# counter zuul.nodepool.requests.<state>.total
# counter zuul.nodepool.requests.<state>.label.<label>
# counter zuul.nodepool.requests.<state>.size.<size>
# timer zuul.nodepool.requests.(fulfilled|failed)
# timer zuul.nodepool.requests.(fulfilled|failed).<label>
# timer zuul.nodepool.requests.(fulfilled|failed).<size>
# gauge zuul.nodepool.current_requests
if not self.sched.statsd:
return
statsd = self.sched.statsd
pipe = statsd.pipeline()
state = request.state
dt = None
if request.canceled:
state = 'canceled'
elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
dt = int((request.state_time - request.requested_time) * 1000)
key = 'zuul.nodepool.requests.%s' % state
pipe.incr(key + ".total")
if dt:
pipe.timing(key, dt)
for node in request.nodeset.getNodes():
pipe.incr(key + '.label.%s' % node.label)
if dt:
pipe.timing(key + '.label.%s' % node.label, dt)
pipe.incr(key + '.size.%s' % len(request.nodeset.nodes))
if dt:
pipe.timing(key + '.size.%s' % len(request.nodeset.nodes), dt)
pipe.gauge('zuul.nodepool.current_requests', len(self.requests))
pipe.send()
def requestNodes(self, build_set, job, relative_priority, event=None):
log = get_annotated_logger(self.log, event)
# Create a copy of the nodeset to represent the actual nodes
# returned by nodepool.
nodeset = job.nodeset.copy()
req = model.NodeRequest(self.sched.hostname, build_set, job,
nodeset, relative_priority, event=event)
self.requests[req.uid] = req
if nodeset.nodes:
self.sched.zk.submitNodeRequest(req, self._updateNodeRequest)
# Logged after submission so that we have the request id
log.info("Submitted node request %s", req)
self.emitStats(req)
else:
log.info("Fulfilling empty node request %s", req)
req.state = model.STATE_FULFILLED
self.sched.onNodesProvisioned(req)
del self.requests[req.uid]
return req
def cancelRequest(self, request):
log = get_annotated_logger(self.log, request.event_id)
log.info("Canceling node request %s", request)
if request.uid in self.requests:
request.canceled = True
try:
self.sched.zk.deleteNodeRequest(request)
except Exception:
log.exception("Error deleting node request:")
def reviseRequest(self, request, relative_priority=None):
'''Attempt to update the node request, if it is not currently being
processed.
:param: NodeRequest request: The request to update.
:param relative_priority int: If supplied, the new relative
priority to set on the request.
'''
log = get_annotated_logger(self.log, request.event_id)
if relative_priority is None:
return
try:
self.sched.zk.lockNodeRequest(request, blocking=False)
except LockException:
# It may be locked by nodepool, which is fine.
log.debug("Unable to revise locked node request %s", request)
return False
try:
old_priority = request.relative_priority
request.relative_priority = relative_priority
self.sched.zk.storeNodeRequest(request)
log.debug("Revised relative priority of "
"node request %s from %s to %s",
request, old_priority, relative_priority)
except Exception:
log.exception("Unable to update node request %s", request)
finally:
try:
self.sched.zk.unlockNodeRequest(request)
except Exception:
log.exception("Unable to unlock node request %s", request)
def holdNodeSet(self, nodeset, autohold_key):
'''
Perform a hold on the given set of nodes.
:param NodeSet nodeset: The object containing the set of nodes to hold.
:param set autohold_key: A set with the tenant/project/job names
associated with the given NodeSet.
'''
self.log.info("Holding nodeset %s" % (nodeset,))
(hold_iterations,
reason,
node_hold_expiration) = self.sched.autohold_requests[autohold_key]
nodes = nodeset.getNodes()
for node in nodes:
if node.lock is None:
raise Exception("Node %s is not locked" % (node,))
node.state = model.STATE_HOLD
node.hold_job = " ".join(autohold_key)
node.comment = reason
if node_hold_expiration:
node.hold_expiration = node_hold_expiration
self.sched.zk.storeNode(node)
# We remove the autohold when the number of nodes in hold
# is equal to or greater than (run iteration count can be
# altered) the number of nodes used in a single job run
# times the number of run iterations requested.
nodes_in_hold = self.sched.zk.heldNodeCount(autohold_key)
if nodes_in_hold >= len(nodes) * hold_iterations:
self.log.debug("Removing autohold for %s", autohold_key)
del self.sched.autohold_requests[autohold_key]
def useNodeSet(self, nodeset):
self.log.info("Setting nodeset %s in use" % (nodeset,))
for node in nodeset.getNodes():
if node.lock is None:
raise Exception("Node %s is not locked" % (node,))
node.state = model.STATE_IN_USE
self.sched.zk.storeNode(node)
def returnNodeSet(self, nodeset, build=None):
self.log.info("Returning nodeset %s" % (nodeset,))
if (build and build.start_time and build.end_time and
build.build_set and build.build_set.item and
build.build_set.item.change and
build.build_set.item.change.project):
duration = build.end_time - build.start_time
project = build.build_set.item.change.project
self.log.info("Nodeset %s with %s nodes was in use "
"for %s seconds for build %s for project %s",
nodeset, len(nodeset.nodes), duration, build,
project)
for node in nodeset.getNodes():
if node.lock is None:
self.log.error("Node %s is not locked" % (node,))
else:
try:
if node.state == model.STATE_IN_USE:
node.state = model.STATE_USED
self.sched.zk.storeNode(node)
except Exception:
self.log.exception("Exception storing node %s "
"while unlocking:" % (node,))
self._unlockNodes(nodeset.getNodes())
def unlockNodeSet(self, nodeset):
self._unlockNodes(nodeset.getNodes())
def _unlockNodes(self, nodes):
for node in nodes:
try:
self.sched.zk.unlockNode(node)
except Exception:
self.log.exception("Error unlocking node:")
def lockNodeSet(self, nodeset, request_id):
self._lockNodes(nodeset.getNodes(), request_id)
def _lockNodes(self, nodes, request_id):
# Try to lock all of the supplied nodes. If any lock fails,
# try to unlock any which have already been locked before
# re-raising the error.
locked_nodes = []
try:
for node in nodes:
if node.allocated_to != request_id:
raise Exception("Node %s allocated to %s, not %s" %
(node.id, node.allocated_to, request_id))
self.log.debug("Locking node %s" % (node,))
self.sched.zk.lockNode(node, timeout=30)
locked_nodes.append(node)
except Exception:
self.log.exception("Error locking nodes:")
self._unlockNodes(locked_nodes)
raise
def _updateNodeRequest(self, request, deleted):
log = get_annotated_logger(self.log, request.event_id)
# Return False to indicate that we should stop watching the
# node.
log.debug("Updating node request %s", request)
if request.uid not in self.requests:
log.debug("Request %s is unknown", request.uid)
return False
if request.canceled:
del self.requests[request.uid]
self.emitStats(request)
return False
# TODOv3(jeblair): handle allocation failure
if deleted:
log.debug("Resubmitting lost node request %s", request)
request.id = None
self.sched.zk.submitNodeRequest(request, self._updateNodeRequest)
elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
log.info("Node request %s %s", request, request.state)
# Give our results to the scheduler.
self.sched.onNodesProvisioned(request)
del self.requests[request.uid]
self.emitStats(request)
# Stop watching this request node.
return False
return True
def acceptNodes(self, request, request_id):
log = get_annotated_logger(self.log, request.event_id)
# Called by the scheduler when it wants to accept and lock
# nodes for (potential) use. Return False if there is a
# problem with the request (canceled or retrying), True if it
# is ready to be acted upon (success or failure).
log.info("Accepting node request %s", request)
if request_id != request.id:
log.info("Skipping node accept for %s (resubmitted as %s)",
request_id, request.id)
return False
if request.canceled:
log.info("Ignoring canceled node request %s", request)
# The request was already deleted when it was canceled
return False
# If we didn't request nodes and the request is fulfilled then just
# return. We don't have to do anything in this case. Further don't even
# ask ZK for the request as empty requests are not put into ZK.
if not request.nodeset.nodes and request.fulfilled:
return True
# Make sure the request still exists. It's possible it could have
# disappeared if we lost the ZK session between when the fulfillment
# response was added to our queue, and when we actually get around to
# processing it. Nodepool will automatically reallocate the assigned
# nodes in that situation.
try:
if not self.sched.zk.nodeRequestExists(request):
log.info("Request %s no longer exists, resubmitting",
request.id)
request.id = None
request.state = model.STATE_REQUESTED
self.requests[request.uid] = request
self.sched.zk.submitNodeRequest(
request, self._updateNodeRequest)
return False
except Exception:
# If we cannot retrieve the node request from ZK we probably lost
# the connection and thus the ZK session. Resubmitting the node
# request probably doesn't make sense at this point in time as it
# is likely to directly fail again. So just log the problem
# with zookeeper and fail here.
log.exception("Error getting node request %s:", request_id)
request.failed = True
return True
locked = False
if request.fulfilled:
# If the request suceeded, try to lock the nodes.
try:
self.lockNodeSet(request.nodeset, request.id)
locked = True
except Exception:
log.exception("Error locking nodes:")
request.failed = True
# Regardless of whether locking (or even the request)
# succeeded, delete the request.
log.debug("Deleting node request %s", request)
try:
self.sched.zk.deleteNodeRequest(request)
except Exception:
log.exception("Error deleting node request:")
request.failed = True
# If deleting the request failed, and we did lock the
# nodes, unlock the nodes since we're not going to use
# them.
if locked:
self.unlockNodeSet(request.nodeset)
return True
|