1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
import time
from collections import defaultdict
from zuul import model
from zuul.lib import tracing
from zuul.lib.logutil import get_annotated_logger
from zuul.zk.event_queues import (
PipelineResultEventQueue,
NodepoolEventElection
)
from zuul.zk.exceptions import LockException
from zuul.zk.nodepool import NodeRequestEvent, ZooKeeperNodepool
class Nodepool(object):
    """Zuul-side interface to Nodepool node requests and nodes in ZooKeeper.

    Submits, revises, cancels and deletes node requests; locks, uses,
    holds and returns nodes; and emits statsd metrics about them.  When
    constructed with ``scheduler=True`` it also runs a background
    election thread; while this process holds that election, completed
    node requests belonging to this Zuul system are forwarded to the
    pipeline result event queues.
    """

    log = logging.getLogger('zuul.nodepool')

    # The kind of resources we report stats on.  We need a complete
    # list in order to report 0 level gauges.
    resource_types = ('ram', 'cores', 'instances')

    def __init__(self, zk_client, system_id, statsd, scheduler=False):
        """
        :param zk_client: ZooKeeper client used for all nodepool data.
        :param str system_id: Identifier of this Zuul system; only node
            requests and nodes tagged with it are treated as ours.
        :param statsd: statsd client, or a falsy value to disable stats.
        :param bool scheduler: If true, enable the node request / node
            caches with an event callback and start the event election
            thread.
        """
        self._stopped = False
        self.system_id = system_id
        self.statsd = statsd

        # Must exist before the election thread starts: a won election
        # immediately forwards completed requests via this registry.
        self.pipeline_result_events = PipelineResultEventQueue.createRegistry(
            zk_client
        )

        self.election_won = False
        if scheduler:
            # Only enable the node request cache/callback for the scheduler.
            self.stop_watcher_event = threading.Event()
            self.zk_nodepool = ZooKeeperNodepool(
                zk_client,
                enable_node_request_cache=True,
                node_request_event_callback=self._handleNodeRequestEvent,
                enable_node_cache=True)
            self.election = NodepoolEventElection(zk_client)
            self.event_thread = threading.Thread(target=self.runEventElection)
            self.event_thread.daemon = True
            self.event_thread.start()
        else:
            self.stop_watcher_event = None
            self.zk_nodepool = ZooKeeperNodepool(zk_client)
            self.election = None
            self.event_thread = None

    def addResources(self, target, source):
        """Accumulate the tracked resource values of *source* into *target*.

        Only keys listed in ``resource_types`` are added; any other key
        in *source* is ignored.

        :param dict target: Mapping updated in place.  Each tracked key
            found in *source* must already exist in it (or *target* must
            be a ``defaultdict``).
        :param dict source: Resource mapping, e.g. a node's ``resources``.
        """
        for key, value in source.items():
            if key in self.resource_types:
                target[key] += value

    def runEventElection(self):
        """Repeatedly contend for the nodepool event election until stopped.

        This is the target of ``event_thread``.  Errors are logged and the
        election is simply re-entered on the next iteration.
        """
        while not self._stopped:
            try:
                self.log.debug("Running nodepool watcher election")
                self.election.run(self._electionWon)
            except Exception:
                self.log.exception("Error in nodepool watcher:")

    def stop(self):
        """Stop the election thread (if any) and wait for it to exit.

        Safe to call more than once.
        """
        self.log.debug("Stopping")
        self._stopped = True
        if self.election:
            self.election.cancel()
            self.stop_watcher_event.set()
            self.event_thread.join()
            # Drop the election to avoid a FD leak in tests.  Rebinding to
            # None (instead of ``del``) keeps a repeated stop() call from
            # raising AttributeError.
            self.election = None

    def _sendNodesProvisionedEvent(self, request):
        """End the request's tracing span and enqueue a NodesProvisionedEvent
        on the result queue of the request's tenant and pipeline."""
        tracing.endSavedSpan(request.span_info, attributes={
            "request_id": request.id,
            "state": request.state,
        })
        tenant_name = request.tenant_name
        pipeline_name = request.pipeline_name
        event = model.NodesProvisionedEvent(request.id, request.build_set_uuid)
        self.pipeline_result_events[tenant_name][pipeline_name].put(event)

    def _electionWon(self):
        """Election callback: forward completed requests until told to stop.

        First replays every already fulfilled/failed request of ours, then
        blocks until ``stop_watcher_event`` is set.  While
        ``election_won`` is true, ``_handleNodeRequestEvent`` forwards
        newly completed requests.
        """
        self.log.info("Watching nodepool requests")
        # Iterate over every completed request in case we are starting
        # up or missed something in the transition.
        self.election_won = True
        try:
            for rid in self.zk_nodepool.getNodeRequests():
                request = self.zk_nodepool.getNodeRequest(rid, cached=True)
                # NOTE(review): a request may presumably be gone (None)
                # between listing and fetching; skip it like foreign ones.
                if request is None or request.requestor != self.system_id:
                    continue
                if request.state in {model.STATE_FULFILLED,
                                     model.STATE_FAILED}:
                    self._sendNodesProvisionedEvent(request)
            # Now resume normal event processing.
            self.stop_watcher_event.wait()
        finally:
            self.stop_watcher_event.clear()
            self.election_won = False

    def _handleNodeRequestEvent(self, request, event):
        """Node request cache callback (only registered for schedulers).

        For a COMPLETED request owned by this system, emit stats and
        forward a NodesProvisionedEvent, but only while this process has
        won the election.  If the election is no longer valid, wake
        ``_electionWon`` so the election is re-run.
        """
        log = get_annotated_logger(self.log, event=request.event_id)

        if request.requestor != self.system_id:
            return

        log.debug("Node request %s %s", request, request.state)
        if event == NodeRequestEvent.COMPLETED:
            try:
                if self.election_won:
                    if self.election.is_still_valid():
                        self.emitStats(request)
                        self._sendNodesProvisionedEvent(request)
                    else:
                        self.stop_watcher_event.set()
            except Exception:
                # If there are any errors moving the event, re-run the
                # election.
                if self.stop_watcher_event is not None:
                    self.stop_watcher_event.set()
                raise

    def emitStats(self, request):
        """Emit statsd metrics for a node request.

        Implements the following:
          counter zuul.nodepool.requests.<state>.total
          counter zuul.nodepool.requests.<state>.label.<label>
          counter zuul.nodepool.requests.<state>.size.<size>
          timer   zuul.nodepool.requests.(fulfilled|failed)
          timer   zuul.nodepool.requests.(fulfilled|failed).label.<label>
          timer   zuul.nodepool.requests.(fulfilled|failed).size.<size>

        A canceled request is reported under the state ``canceled`` and
        gets no timers.
        """
        if not self.statsd:
            return

        pipe = self.statsd.pipeline()
        state = request.state
        dt = None

        if request.canceled:
            state = 'canceled'
        elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
            dt = (request.state_time - request.requested_time) * 1000

        key = 'zuul.nodepool.requests.%s' % state
        size_key = key + '.size.%s' % len(request.labels)

        pipe.incr(key + ".total")
        # Compare against None so a legitimate 0 ms duration is still sent.
        if dt is not None:
            pipe.timing(key, dt)
        for label in request.labels:
            label_key = key + '.label.%s' % label
            pipe.incr(label_key)
            if dt is not None:
                pipe.timing(label_key, dt)
        pipe.incr(size_key)
        if dt is not None:
            pipe.timing(size_key, dt)
        pipe.send()

    def emitStatsResourceCounters(self, tenant, project, resources, duration):
        """Add resource usage (value * duration) to per-tenant and
        per-project statsd counters.

        :param str tenant: Tenant name used in the counter key.
        :param str project: Project name used in the counter key.
        :param dict resources: Resource name -> amount.
        :param duration: Time the resources were in use; each counter is
            incremented by ``value * duration``.
        """
        if not self.statsd:
            return

        tenant_key = 'zuul.nodepool.resources.in_use.tenant.' \
            '{tenant}.{resource}'
        for resource, value in resources.items():
            self.statsd.incr(
                tenant_key, value * duration,
                tenant=tenant, resource=resource)

        project_key = 'zuul.nodepool.resources.in_use.project.' \
            '{project}.{resource}'
        for resource, value in resources.items():
            self.statsd.incr(
                project_key, value * duration,
                project=project, resource=resource)

    def requestNodes(self, build_set_uuid, job, tenant_name, pipeline_name,
                     provider, priority, relative_priority, event=None):
        """Create a node request for *job* and submit it to ZooKeeper.

        A job whose nodeset has no nodes gets a request that is marked
        fulfilled locally and is never submitted.

        :returns: The new NodeRequest.
        """
        log = get_annotated_logger(self.log, event)
        labels = [n.label for n in job.nodeset.getNodes()]
        event_id = event.zuul_event_id if event else None

        req = model.NodeRequest(self.system_id, build_set_uuid, tenant_name,
                                pipeline_name, job.name, labels, provider,
                                relative_priority, event_id)

        if job.nodeset.nodes:
            self.zk_nodepool.submitNodeRequest(req, priority)
            # Logged after submission so that we have the request id
            log.info("Submitted node request %s", req)
            self.emitStats(req)
        else:
            # Directly fulfill the node request without submitting it to ZK,
            # so nodepool doesn't have to deal with it.
            log.info("Fulfilling empty node request %s", req)
            req.state = model.STATE_FULFILLED
        return req

    def cancelRequest(self, request):
        """Mark *request* canceled and delete it from ZooKeeper.

        Stats are emitted only if the deletion reports success.  Errors
        are logged, not raised.
        """
        log = get_annotated_logger(self.log, request.event_id)
        log.info("Canceling node request %s", request)
        try:
            request.canceled = True
            if self.zk_nodepool.deleteNodeRequest(request.id):
                self.emitStats(request)
        except Exception:
            log.exception("Error deleting node request:")

    def reviseRequest(self, request, relative_priority=None):
        '''Attempt to update the node request, if it is not currently being
        processed.

        :param: NodeRequest request: The request to update.
        :param relative_priority int: If supplied, the new relative
            priority to set on the request.
        :returns: False if the request is locked (e.g. by nodepool) and
            was not revised; otherwise None.  Update and unlock errors
            are logged, not raised.
        '''
        log = get_annotated_logger(self.log, request.event_id)
        if relative_priority is None:
            return
        try:
            self.zk_nodepool.lockNodeRequest(request, blocking=False)
        except LockException:
            # It may be locked by nodepool, which is fine.
            log.debug("Unable to revise locked node request %s", request)
            return False
        try:
            old_priority = request.relative_priority
            request.relative_priority = relative_priority
            self.zk_nodepool.storeNodeRequest(request)
            log.debug("Revised relative priority of "
                      "node request %s from %s to %s",
                      request, old_priority, relative_priority)
        except Exception:
            log.exception("Unable to update node request %s", request)
        finally:
            try:
                self.zk_nodepool.unlockNodeRequest(request)
            except Exception:
                log.exception("Unable to unlock node request %s", request)

    def holdNodeSet(self, nodeset, request, build, duration,
                    zuul_event_id=None):
        '''
        Perform a hold on the given set of nodes.

        Every node is stored in the HOLD state, the hold request records
        the build and node IDs and its use count is incremented (it is
        marked expired once ``max_count`` is reached), and resource usage
        counters are emitted.

        :param NodeSet nodeset: The object containing the set of nodes to
            hold.
        :param HoldRequest request: Hold request associated with the NodeSet
        :param build: The build that used the nodes; its ``uuid`` is
            recorded on the hold request.
        :param duration: Seconds the nodes were in use (for the resource
            counters).
        :raises Exception: If any node is not locked; no node is modified
            in that case.
        '''
        log = get_annotated_logger(self.log, zuul_event_id)
        log.info("Holding nodeset %s", nodeset)
        resources = defaultdict(int)
        nodes = nodeset.getNodes()

        log.info(
            "Nodeset %s with %s nodes was in use for %s seconds for build %s "
            "for project %s",
            nodeset, len(nodeset.nodes), duration, build, request.project)

        # Verify every lock before changing anything so that a failure
        # does not leave the nodeset partially held.
        for node in nodes:
            if node.lock is None:
                raise Exception(f"Node {node} is not locked")

        hold_job = " ".join([request.tenant,
                             request.project,
                             request.job,
                             request.ref_filter])
        for node in nodes:
            if node.resources:
                self.addResources(resources, node.resources)
            node.state = model.STATE_HOLD
            node.hold_job = hold_job
            node.comment = request.reason
            if request.node_expiration:
                node.hold_expiration = request.node_expiration
            self.zk_nodepool.storeNode(node)

        request.nodes.append(dict(
            build=build.uuid,
            nodes=[node.id for node in nodes],
        ))
        request.current_count += 1

        # Request has been used at least the maximum number of times so set
        # the expiration time so that it can be auto-deleted.
        if request.current_count >= request.max_count and not request.expired:
            request.expired = time.time()

        # Give ourselves a few seconds to try to obtain the lock rather than
        # immediately give up.
        self.zk_nodepool.lockHoldRequest(request, timeout=5)

        try:
            self.zk_nodepool.storeHoldRequest(request)
        except Exception:
            # If we fail to update the request count, we won't consider it
            # a real autohold error by passing the exception up. It will
            # just get used more than the original count specified.
            # It's possible to leak some held nodes, though, which would
            # require manual node deletes.
            log.exception("Unable to update hold request %s:", request)
        finally:
            # Although any exceptions thrown here are handled higher up in
            # _doBuildCompletedEvent, we always want to try to unlock it.
            self.zk_nodepool.unlockHoldRequest(request)

        if resources and duration:
            self.emitStatsResourceCounters(
                request.tenant, request.project, resources, duration)

    def useNodeSet(self, nodeset, tenant_name, project_name,
                   zuul_event_id=None):
        """Mark every node of a locked nodeset as in use by this system.

        Each node is stored with state IN_USE and ``user_data`` recording
        this system's ID and *project_name*.

        :raises Exception: If any node is not locked; no node is modified
            in that case.
        """
        log = get_annotated_logger(self.log, zuul_event_id)
        log.info("Setting nodeset %s in use", nodeset)
        user_data = dict(
            zuul_system=self.system_id,
            project_name=project_name,
        )
        nodes = nodeset.getNodes()
        # Verify every lock before changing anything so that a failure
        # does not leave the nodeset partially marked in use.
        for node in nodes:
            if node.lock is None:
                raise Exception(f"Node {node} is not locked")
        for node in nodes:
            node.state = model.STATE_IN_USE
            node.user_data = user_data
            self.zk_nodepool.storeNode(node)

    def returnNodeSet(self, nodeset, build, tenant_name, project_name,
                      duration, zuul_event_id=None):
        """Return a nodeset after use: mark in-use nodes USED, unlock all
        nodes and emit resource usage counters.

        Unlocked nodes and storage errors are logged, not raised.
        """
        log = get_annotated_logger(self.log, zuul_event_id)
        log.info("Returning nodeset %s", nodeset)
        resources = defaultdict(int)

        for node in nodeset.getNodes():
            if node.lock is None:
                log.error("Node %s is not locked", node)
            else:
                try:
                    if node.state == model.STATE_IN_USE:
                        if node.resources:
                            self.addResources(resources, node.resources)
                        node.state = model.STATE_USED
                        self.zk_nodepool.storeNode(node)
                except Exception:
                    log.exception("Exception storing node %s "
                                  "while unlocking:", node)
        self.unlockNodeSet(nodeset)

        log.info("Nodeset %s with %s nodes was in use "
                 "for %s seconds for build %s for project %s",
                 nodeset, len(nodeset.nodes), duration, build, project_name)

        if resources and duration:
            self.emitStatsResourceCounters(
                tenant_name, project_name, resources, duration)

    def unlockNodeSet(self, nodeset):
        """Best-effort unlock of every node in *nodeset*."""
        self._unlockNodes(nodeset.getNodes())

    def _unlockNodes(self, nodes):
        """Unlock each node, logging (not raising) individual failures."""
        for node in nodes:
            try:
                self.zk_nodepool.unlockNode(node)
            except Exception:
                self.log.exception("Error unlocking node:")

    def lockNodes(self, request, nodeset):
        """Load and lock the nodes allocated to *request*.

        The nodes of *nodeset* are updated in place from ZooKeeper (paired
        positionally with ``request.nodes``) and locked.  If any node is
        not allocated to *request*, or locking fails, the nodes locked so
        far are unlocked and the exception is re-raised.

        :returns: The list of locked nodes.
        """
        log = get_annotated_logger(self.log, event=request.event_id)
        # Try to lock all of the supplied nodes.  If any lock fails,
        # try to unlock any which have already been locked before
        # re-raising the error.
        locked_nodes = []
        try:
            for node_id, node in zip(request.nodes, nodeset.getNodes()):
                self.zk_nodepool.updateNode(node, node_id)
                if node.allocated_to != request.id:
                    raise Exception("Node %s allocated to %s, not %s" %
                                    (node.id, node.allocated_to, request.id))
                log.debug("Locking node %s", node)
                self.zk_nodepool.lockNode(node, timeout=30)
                # Check the allocated_to again to ensure that nodepool didn't
                # re-allocate the nodes to a different node request while we
                # were locking them.
                if node.allocated_to != request.id:
                    raise Exception(
                        "Node %s was reallocated during locking %s, not %s" %
                        (node.id, node.allocated_to, request.id))
                locked_nodes.append(node)
        except Exception:
            log.exception("Error locking nodes:")
            self._unlockNodes(locked_nodes)
            raise
        return locked_nodes

    def getNodeSet(self, request, job_nodeset):
        """
        Get a NodeSet object with info about real nodes.

        :param NodeRequest request: The fulfilled NodeRequest
        :param NodeSet job_nodeset: The NodeSet object attached to the job

        :returns: A new NodeSet object which contains information from
            nodepool about the actual allocated nodes.
        """
        # A copy of the nodeset with information about the real nodes
        nodeset = job_nodeset.copy()

        # If we didn't request nodes and the request is fulfilled then just
        # return. We don't have to do anything in this case.
        if not request.labels and request.fulfilled:
            return nodeset

        # Load the node info from ZK.
        for node_id, node in zip(request.nodes, nodeset.getNodes()):
            self.zk_nodepool.updateNode(node, node_id)

        return nodeset

    def acceptNodes(self, request, nodeset):
        """Lock the nodes of a completed request, then delete the request.

        Accept the nodes supplied by request, mutate nodeset with the real
        node information.

        :param NodeRequest request: The completed node request.
        :param NodeSet nodeset: Updated in place with the real nodes.
        :returns: The locked nodes, or None if the request was not
            fulfilled (and not marked failed).
        :raises Exception: If the request failed, locking failed, or the
            request could not be deleted.
        """
        nodes = None
        locked = False
        if request.fulfilled:
            # If the request succeeded, try to lock the nodes.
            try:
                nodes = self.lockNodes(request, nodeset)
                locked = True
            except Exception:
                log = get_annotated_logger(self.log, request.event_id)
                log.exception("Error locking nodes:")
                request.failed = True

        # Regardless of whether locking (or even the request)
        # succeeded, delete the request.
        if not self.deleteNodeRequest(request.id, locked, request.event_id):
            request.failed = True
            # Release the locks taken above; they are held on the nodes of
            # the nodeset we were given.
            if locked:
                self.unlockNodeSet(nodeset)

        if request.failed:
            raise Exception("Accepting nodes failed")
        return nodes

    def deleteNodeRequest(self, request_id, locked=False, event_id=None):
        """Delete a node request from ZooKeeper.

        :param str request_id: ID of the request to delete.
        :param bool locked: Not used by this method; kept for callers.
        :param event_id: Zuul event ID used to annotate log messages.
        :returns: True on success, False if the deletion raised (the
            error is logged).
        """
        log = get_annotated_logger(self.log, event_id)
        log.debug("Deleting node request %s", request_id)
        try:
            self.zk_nodepool.deleteNodeRequest(request_id)
        except Exception:
            log.exception("Error deleting node request:")
            return False
        return True

    def getNodeRequests(self):
        """Get all node requests submitted by Zuul

        Note this relies entirely on the internal cache.

        :returns: An iterator of NodeRequest objects created by this Zuul
            system.
        """
        for req_id in self.zk_nodepool.getNodeRequests(cached=True):
            req = self.zk_nodepool.getNodeRequest(req_id, cached=True)
            # NOTE(review): skip entries that could not be loaded (None).
            if req is not None and req.requestor == self.system_id:
                yield req

    def getNodes(self):
        """Get all nodes in use or held by Zuul

        Note this relies entirely on the internal cache.

        :returns: An iterator of Node objects in use (or held) by this Zuul
            system.
        """
        for node_id in self.zk_nodepool.getNodes(cached=True):
            node = self.zk_nodepool.getNode(node_id)
            # NOTE(review): skip entries that could not be loaded (None).
            if node is None:
                continue
            if (node.user_data and
                    isinstance(node.user_data, dict) and
                    node.user_data.get('zuul_system') == self.system_id):
                yield node

    def emitStatsTotals(self, abide):
        """Emit gauges for outstanding node requests and resource usage.

        Using the local caches, reports the number of outstanding node
        requests (overall and per tenant), total resources per tenant and
        in-use resources per tenant and per project.  Every tenant and
        project in *abide* gets a gauge, even when its value is zero.

        :param abide: Object whose ``tenants`` mapping supplies the
            tenants (and their ``all_projects``) to report on.
        """
        if not self.statsd:
            return

        total_requests = 0
        tenant_requests = defaultdict(int)
        in_use_resources_by_project = {}
        in_use_resources_by_tenant = {}
        total_resources_by_tenant = {}
        empty_resource_dict = dict.fromkeys(self.resource_types, 0)

        # Initialize zero values for gauges
        for tenant in abide.tenants.values():
            tenant_requests[tenant.name] = 0
            in_use_resources_by_tenant[tenant.name] = \
                empty_resource_dict.copy()
            total_resources_by_tenant[tenant.name] = \
                empty_resource_dict.copy()
            for project in tenant.all_projects:
                in_use_resources_by_project[project.canonical_name] = \
                    empty_resource_dict.copy()

        # Count node requests
        for req in self.getNodeRequests():
            total_requests += 1
            if not req.tenant_name:
                continue
            tenant_requests[req.tenant_name] += 1

        self.statsd.gauge('zuul.nodepool.current_requests',
                          total_requests)
        for tenant, request_count in tenant_requests.items():
            self.statsd.gauge(
                "zuul.nodepool.tenant.{tenant}.current_requests",
                request_count,
                tenant=tenant)

        # Count nodes
        for node in self.zk_nodepool.nodeIterator(cached=True):
            if not node.resources:
                continue

            tenant_name = node.tenant_name
            if (tenant_name in total_resources_by_tenant and
                    node.requestor == self.system_id):
                self.addResources(
                    total_resources_by_tenant[tenant_name],
                    node.resources)

            # below here, we are only interested in nodes which are either
            # in-use, used, or currently held by this zuul system
            if node.state not in {model.STATE_IN_USE,
                                  model.STATE_USED,
                                  model.STATE_HOLD}:
                continue
            user_data = node.user_data
            # Same guard as getNodes(): user_data is not guaranteed to be
            # a dict, and calling .get() on anything else would abort the
            # whole stats run.
            if not user_data or not isinstance(user_data, dict):
                continue
            if user_data.get('zuul_system') != self.system_id:
                continue

            if tenant_name in in_use_resources_by_tenant:
                self.addResources(
                    in_use_resources_by_tenant[tenant_name],
                    node.resources)

            project_name = user_data.get('project_name')
            if project_name in in_use_resources_by_project:
                self.addResources(
                    in_use_resources_by_project[project_name],
                    node.resources)

        for tenant, resources in total_resources_by_tenant.items():
            for resource, value in resources.items():
                key = 'zuul.nodepool.resources.total.tenant.' \
                      '{tenant}.{resource}'
                self.statsd.gauge(key, value, tenant=tenant, resource=resource)
        for tenant, resources in in_use_resources_by_tenant.items():
            for resource, value in resources.items():
                key = 'zuul.nodepool.resources.in_use.tenant.' \
                      '{tenant}.{resource}'
                self.statsd.gauge(key, value, tenant=tenant, resource=resource)
        for project, resources in in_use_resources_by_project.items():
            for resource, value in resources.items():
                key = 'zuul.nodepool.resources.in_use.project.' \
                      '{project}.{resource}'
                self.statsd.gauge(
                    key, value, project=project, resource=resource)
|