path: root/zuul/nodepool.py
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging
import threading
import time

from collections import defaultdict
from zuul import model
from zuul.lib import tracing
from zuul.lib.logutil import get_annotated_logger
from zuul.zk.event_queues import (
    PipelineResultEventQueue,
    NodepoolEventElection
)
from zuul.zk.exceptions import LockException
from zuul.zk.nodepool import NodeRequestEvent, ZooKeeperNodepool


class Nodepool(object):
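    """Zuul's interface to Nodepool.

    Requests, locks, holds, and returns nodes via shared ZooKeeper
    objects, and reports node and request statistics to statsd.
    """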
    log = logging.getLogger('zuul.nodepool')
    # The kinds of resources we report stats on.  We need a complete
    # list in order to report zero-level gauges.
    resource_types = ('ram', 'cores', 'instances')

    def __init__(self, zk_client, system_id, statsd, scheduler=False):
        self._stopped = False
        self.system_id = system_id
        self.statsd = statsd

        self.election_won = False
        if scheduler:
            # Only enable the node request cache/callback for the scheduler.
            self.stop_watcher_event = threading.Event()
            self.zk_nodepool = ZooKeeperNodepool(
                zk_client,
                enable_node_request_cache=True,
                node_request_event_callback=self._handleNodeRequestEvent,
                enable_node_cache=True)
            self.election = NodepoolEventElection(zk_client)
            self.event_thread = threading.Thread(target=self.runEventElection)
            self.event_thread.daemon = True
            self.event_thread.start()
        else:
            self.stop_watcher_event = None
            self.zk_nodepool = ZooKeeperNodepool(zk_client)
            self.election = None
            self.event_thread = None

        self.pipeline_result_events = PipelineResultEventQueue.createRegistry(
            zk_client
        )

    def addResources(self, target, source):
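        """Accumulate recognized resource counts from source into target.

        Keys not listed in resource_types are ignored.
        """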
        for key, value in source.items():
            if key in self.resource_types:
                target[key] += value

    def runEventElection(self):
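        # Only one scheduler at a time should turn completed node
        # requests into NodesProvisionedEvents; this election decides
        # which instance does so.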
        while not self._stopped:
            try:
                self.log.debug("Running nodepool watcher election")
                self.election.run(self._electionWon)
            except Exception:
                self.log.exception("Error in nodepool watcher:")

    def stop(self):
        self.log.debug("Stopping")
        self._stopped = True
        if self.election:
            self.election.cancel()
            self.stop_watcher_event.set()
            self.event_thread.join()
            # Delete the election to avoid a FD leak in tests.
            del self.election

    def _sendNodesProvisionedEvent(self, request):
        tracing.endSavedSpan(request.span_info, attributes={
            "request_id": request.id,
            "state": request.state,
        })
        tenant_name = request.tenant_name
        pipeline_name = request.pipeline_name
        event = model.NodesProvisionedEvent(request.id, request.build_set_uuid)
        self.pipeline_result_events[tenant_name][pipeline_name].put(event)

    def _electionWon(self):
        self.log.info("Watching nodepool requests")
        # Iterate over every completed request in case we are starting
        # up or missed something in the transition.
        self.election_won = True
        try:
            for rid in self.zk_nodepool.getNodeRequests():
                request = self.zk_nodepool.getNodeRequest(rid, cached=True)
                if request.requestor != self.system_id:
                    continue
                if (request.state in {model.STATE_FULFILLED,
                                      model.STATE_FAILED}):
                    self._sendNodesProvisionedEvent(request)
            # Now resume normal event processing.
            self.stop_watcher_event.wait()
        finally:
            self.stop_watcher_event.clear()
            self.election_won = False

    def _handleNodeRequestEvent(self, request, event):
        log = get_annotated_logger(self.log, event=request.event_id)

        if request.requestor != self.system_id:
            return

        log.debug("Node request %s %s", request, request.state)
        if event == NodeRequestEvent.COMPLETED:
            try:
                if self.election_won:
                    if self.election.is_still_valid():
                        self.emitStats(request)
                        self._sendNodesProvisionedEvent(request)
                    else:
                        self.stop_watcher_event.set()
            except Exception:
                # If there are any errors moving the event, re-run the
                # election.
                if self.stop_watcher_event is not None:
                    self.stop_watcher_event.set()
                raise

    def emitStats(self, request):
        # Implements the following:
        #  counter zuul.nodepool.requests.<state>.total
        #  counter zuul.nodepool.requests.<state>.label.<label>
        #  counter zuul.nodepool.requests.<state>.size.<size>
        #  timer   zuul.nodepool.requests.(fulfilled|failed)
        #  timer   zuul.nodepool.requests.(fulfilled|failed).label.<label>
        #  timer   zuul.nodepool.requests.(fulfilled|failed).size.<size>
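        # For example, a fulfilled single-node request for a hypothetical
        # label "ubuntu-jammy" increments:
        #   zuul.nodepool.requests.fulfilled.total
        #   zuul.nodepool.requests.fulfilled.label.ubuntu-jammy
        #   zuul.nodepool.requests.fulfilled.size.1
        # and records timers under the base, label and size keys.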
        if not self.statsd:
            return
        pipe = self.statsd.pipeline()
        state = request.state
        dt = None

        if request.canceled:
            state = 'canceled'
        elif request.state in (model.STATE_FULFILLED, model.STATE_FAILED):
            dt = (request.state_time - request.requested_time) * 1000

        key = 'zuul.nodepool.requests.%s' % state
        pipe.incr(key + ".total")

        if dt:
            pipe.timing(key, dt)
        for label in request.labels:
            pipe.incr(key + '.label.%s' % label)
            if dt:
                pipe.timing(key + '.label.%s' % label, dt)
        pipe.incr(key + '.size.%s' % len(request.labels))
        if dt:
            pipe.timing(key + '.size.%s' % len(request.labels), dt)
        pipe.send()

    def emitStatsResourceCounters(self, tenant, project, resources, duration):
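        # The {tenant}/{project}/{resource} placeholders in the keys below
        # are filled in from the keyword arguments; each counter
        # accumulates resource-seconds (value * duration).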
        if not self.statsd:
            return

        for resource, value in resources.items():
            key = 'zuul.nodepool.resources.in_use.tenant.{tenant}.{resource}'
            self.statsd.incr(
                key, value * duration, tenant=tenant, resource=resource)
        for resource, value in resources.items():
            key = 'zuul.nodepool.resources.in_use.project.' \
                  '{project}.{resource}'
            self.statsd.incr(
                key, value * duration, project=project, resource=resource)

    def requestNodes(self, build_set_uuid, job, tenant_name, pipeline_name,
                     provider, priority, relative_priority, event=None):
        log = get_annotated_logger(self.log, event)
        labels = [n.label for n in job.nodeset.getNodes()]
        if event:
            event_id = event.zuul_event_id
        else:
            event_id = None
        req = model.NodeRequest(self.system_id, build_set_uuid, tenant_name,
                                pipeline_name, job.name, labels, provider,
                                relative_priority, event_id)

        if job.nodeset.nodes:
            self.zk_nodepool.submitNodeRequest(req, priority)
            # Logged after submission so that we have the request id
            log.info("Submitted node request %s", req)
            self.emitStats(req)
        else:
            # Directly fulfill the node request without submitting it to ZK,
            # so nodepool doesn't have to deal with it.
            log.info("Fulfilling empty node request %s", req)
            req.state = model.STATE_FULFILLED
        return req
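
    # Rough usage sketch (hypothetical caller code; variable names are
    # illustrative, not part of this module):
    #
    #   req = nodepool.requestNodes(buildset_uuid, job, tenant, pipeline,
    #                               provider, priority, relative_priority,
    #                               event=event)
    #   # ... once the matching NodesProvisionedEvent arrives ...
    #   nodeset = nodepool.getNodeSet(req, job.nodeset)
    #   nodepool.acceptNodes(req, nodeset)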

    def cancelRequest(self, request):
        log = get_annotated_logger(self.log, request.event_id)
        log.info("Canceling node request %s", request)
        try:
            request.canceled = True
            if self.zk_nodepool.deleteNodeRequest(request.id):
                self.emitStats(request)
        except Exception:
            log.exception("Error deleting node request:")

    def reviseRequest(self, request, relative_priority=None):
        '''Attempt to update the node request, if it is not currently being
        processed.

        :param NodeRequest request: The request to update.
        :param int relative_priority: If supplied, the new relative
            priority to set on the request.

        '''
        log = get_annotated_logger(self.log, request.event_id)
        if relative_priority is None:
            return
        try:
            self.zk_nodepool.lockNodeRequest(request, blocking=False)
        except LockException:
            # It may be locked by nodepool, which is fine.
            log.debug("Unable to revise locked node request %s", request)
            return False
        try:
            old_priority = request.relative_priority
            request.relative_priority = relative_priority
            self.zk_nodepool.storeNodeRequest(request)
            log.debug("Revised relative priority of "
                      "node request %s from %s to %s",
                      request, old_priority, relative_priority)
        except Exception:
            log.exception("Unable to update node request %s", request)
        finally:
            try:
                self.zk_nodepool.unlockNodeRequest(request)
            except Exception:
                log.exception("Unable to unlock node request %s", request)

    def holdNodeSet(self, nodeset, request, build, duration,
                    zuul_event_id=None):
        '''
        Perform a hold on the given set of nodes.

        :param NodeSet nodeset: The object containing the set of nodes to hold.
        :param HoldRequest request: Hold request associated with the NodeSet
        :param Build build: The build whose nodes are being held.
        :param float duration: The number of seconds the nodes were in use.
        :param zuul_event_id: The Zuul event id for annotated logging.
        '''
        log = get_annotated_logger(self.log, zuul_event_id)
        log.info("Holding nodeset %s", nodeset)
        resources = defaultdict(int)
        nodes = nodeset.getNodes()

        log.info(
            "Nodeset %s with %s nodes was in use for %s seconds for build %s "
            "for project %s",
            nodeset, len(nodeset.nodes), duration, build, request.project)

        for node in nodes:
            if node.lock is None:
                raise Exception(f"Node {node} is not locked")
            if node.resources:
                self.addResources(resources, node.resources)
            node.state = model.STATE_HOLD
            node.hold_job = " ".join([request.tenant,
                                      request.project,
                                      request.job,
                                      request.ref_filter])
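            # e.g. "tenant-one org/project check refs/heads/.*" (hypothetical)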
            node.comment = request.reason
            if request.node_expiration:
                node.hold_expiration = request.node_expiration
            self.zk_nodepool.storeNode(node)

        request.nodes.append(dict(
            build=build.uuid,
            nodes=[node.id for node in nodes],
        ))
        request.current_count += 1

        # The request has now been used at least the maximum number of
        # times; set the expiration time so that it can be auto-deleted.
        if request.current_count >= request.max_count and not request.expired:
            request.expired = time.time()

        # Give ourselves a few seconds to try to obtain the lock rather than
        # immediately give up.
        self.zk_nodepool.lockHoldRequest(request, timeout=5)

        try:
            self.zk_nodepool.storeHoldRequest(request)
        except Exception:
            # If we fail to update the request count, don't treat it as a
            # real autohold error by re-raising the exception; the request
            # will just get used more times than originally specified.
            # It's possible to leak some held nodes, though, which would
            # require manual node deletes.
            log.exception("Unable to update hold request %s:", request)
        finally:
            # Although any exceptions thrown here are handled higher up in
            # _doBuildCompletedEvent, we always want to try to unlock it.
            self.zk_nodepool.unlockHoldRequest(request)

        if resources and duration:
            self.emitStatsResourceCounters(
                request.tenant, request.project, resources, duration)

    def useNodeSet(self, nodeset, tenant_name, project_name,
                   zuul_event_id=None):
        log = get_annotated_logger(self.log, zuul_event_id)
        log.info("Setting nodeset %s in use", nodeset)
        user_data = dict(
            zuul_system=self.system_id,
            project_name=project_name,
        )
        for node in nodeset.getNodes():
            if node.lock is None:
                raise Exception("Node %s is not locked", node)
            node.state = model.STATE_IN_USE
            node.user_data = user_data
            self.zk_nodepool.storeNode(node)

    def returnNodeSet(self, nodeset, build, tenant_name, project_name,
                      duration, zuul_event_id=None):
        log = get_annotated_logger(self.log, zuul_event_id)
        log.info("Returning nodeset %s", nodeset)
        resources = defaultdict(int)

        for node in nodeset.getNodes():
            if node.lock is None:
                log.error("Node %s is not locked", node)
            else:
                try:
                    if node.state == model.STATE_IN_USE:
                        if node.resources:
                            self.addResources(resources, node.resources)
                        node.state = model.STATE_USED
                        self.zk_nodepool.storeNode(node)
                except Exception:
                    log.exception("Exception storing node %s "
                                  "while unlocking:", node)
        self.unlockNodeSet(nodeset)

        log.info("Nodeset %s with %s nodes was in use "
                 "for %s seconds for build %s for project %s",
                 nodeset, len(nodeset.nodes), duration, build, project_name)

        if resources and duration:
            self.emitStatsResourceCounters(
                tenant_name, project_name, resources, duration)

    def unlockNodeSet(self, nodeset):
        self._unlockNodes(nodeset.getNodes())

    def _unlockNodes(self, nodes):
        for node in nodes:
            try:
                self.zk_nodepool.unlockNode(node)
            except Exception:
                self.log.exception("Error unlocking node:")

    def lockNodes(self, request, nodeset):
        log = get_annotated_logger(self.log, event=request.event_id)
        # Try to lock all of the supplied nodes.  If any lock fails,
        # try to unlock any which have already been locked before
        # re-raising the error.
        locked_nodes = []
        try:
            for node_id, node in zip(request.nodes, nodeset.getNodes()):
                self.zk_nodepool.updateNode(node, node_id)
                if node.allocated_to != request.id:
                    raise Exception("Node %s allocated to %s, not %s" %
                                    (node.id, node.allocated_to, request.id))
                log.debug("Locking node %s", node)
                self.zk_nodepool.lockNode(node, timeout=30)
                # Check the allocated_to again to ensure that nodepool didn't
                # re-allocate the nodes to a different node request while we
                # were locking them.
                if node.allocated_to != request.id:
                    raise Exception(
                        "Node %s was reallocated during locking %s, not %s" %
                        (node.id, node.allocated_to, request.id))
                locked_nodes.append(node)
            return locked_nodes
        except Exception:
            log.exception("Error locking nodes:")
            self._unlockNodes(locked_nodes)
            raise

    def getNodeSet(self, request, job_nodeset):
        """
        Get a NodeSet object with info about real nodes.

        :param NodeRequest request: The fulfilled NodeRequest
        :param NodeSet job_nodeset: The NodeSet object attached to the job

        :returns: A new NodeSet object which contains information from
            nodepool about the actual allocated nodes.
        """
        # A copy of the nodeset with information about the real nodes
        nodeset = job_nodeset.copy()

        # If we didn't request nodes and the request is fulfilled then just
        # return. We don't have to do anything in this case.
        if not request.labels and request.fulfilled:
            return nodeset

        # Load the node info from ZK.
        for node_id, node in zip(request.nodes, nodeset.getNodes()):
            self.zk_nodepool.updateNode(node, node_id)

        return nodeset

    def acceptNodes(self, request, nodeset):
        # Accept the nodes supplied by request, mutate nodeset with
        # the real node information.
        locked = False
        nodes = None
        if request.fulfilled:
            # If the request succeeded, try to lock the nodes.
            try:
                nodes = self.lockNodes(request, nodeset)
                locked = True
            except Exception:
                log = get_annotated_logger(self.log, request.event_id)
                log.exception("Error locking nodes:")
                request.failed = True

        # Regardless of whether locking (or even the request)
        # succeeded, delete the request.
        if not self.deleteNodeRequest(request.id, locked, request.event_id):
            request.failed = True
            self.unlockNodeSet(nodeset)

        if request.failed:
            raise Exception("Accepting nodes failed")
        return nodes

    def deleteNodeRequest(self, request_id, locked=False, event_id=None):
        log = get_annotated_logger(self.log, event_id)
        log.debug("Deleting node request %s", request_id)
        try:
            self.zk_nodepool.deleteNodeRequest(request_id)
        except Exception:
            log.exception("Error deleting node request:")
            return False
        return True

    def getNodeRequests(self):
        """Get all node requests submitted by Zuul

        Note this relies entirely on the internal cache.

        :returns: An iterator of NodeRequest objects created by this Zuul
            system.
        """
        for req_id in self.zk_nodepool.getNodeRequests(cached=True):
            req = self.zk_nodepool.getNodeRequest(req_id, cached=True)
            if req.requestor == self.system_id:
                yield req

    def getNodes(self):
        """Get all nodes in use or held by Zuul

        Note this relies entirely on the internal cache.

        :returns: An iterator of Node objects in use (or held) by this Zuul
            system.
        """
        for node_id in self.zk_nodepool.getNodes(cached=True):
            node = self.zk_nodepool.getNode(node_id)
            if (node.user_data and
                isinstance(node.user_data, dict) and
                node.user_data.get('zuul_system') == self.system_id):
                yield node

    def emitStatsTotals(self, abide):
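        # Gauges emitted below:
        #   zuul.nodepool.current_requests
        #   zuul.nodepool.tenant.<tenant>.current_requests
        #   zuul.nodepool.resources.total.tenant.<tenant>.<resource>
        #   zuul.nodepool.resources.in_use.tenant.<tenant>.<resource>
        #   zuul.nodepool.resources.in_use.project.<project>.<resource>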
        if not self.statsd:
            return

        total_requests = 0
        tenant_requests = defaultdict(int)
        in_use_resources_by_project = {}
        in_use_resources_by_tenant = {}
        total_resources_by_tenant = {}
        empty_resource_dict = {k: 0 for k in self.resource_types}

        # Initialize zero values for gauges
        for tenant in abide.tenants.values():
            tenant_requests[tenant.name] = 0
            in_use_resources_by_tenant[tenant.name] =\
                empty_resource_dict.copy()
            total_resources_by_tenant[tenant.name] = empty_resource_dict.copy()
            for project in tenant.all_projects:
                in_use_resources_by_project[project.canonical_name] =\
                    empty_resource_dict.copy()

        # Count node requests
        for req in self.getNodeRequests():
            total_requests += 1
            if not req.tenant_name:
                continue
            tenant_requests[req.tenant_name] += 1

        self.statsd.gauge('zuul.nodepool.current_requests',
                          total_requests)
        for tenant, request_count in tenant_requests.items():
            self.statsd.gauge(
                "zuul.nodepool.tenant.{tenant}.current_requests",
                request_count,
                tenant=tenant)

        # Count nodes
        for node in self.zk_nodepool.nodeIterator(cached=True):
            if not node.resources:
                continue

            tenant_name = node.tenant_name
            if tenant_name in total_resources_by_tenant and \
               node.requestor == self.system_id:
                self.addResources(
                    total_resources_by_tenant[tenant_name],
                    node.resources)

            # Below here, we are only interested in nodes which are either
            # in-use, used, or currently held by this Zuul system.
            if node.state not in {model.STATE_IN_USE,
                                  model.STATE_USED,
                                  model.STATE_HOLD}:
                continue
            if not node.user_data:
                continue
            if node.user_data.get('zuul_system') != self.system_id:
                continue

            if tenant_name in in_use_resources_by_tenant:
                self.addResources(
                    in_use_resources_by_tenant[tenant_name],
                    node.resources)

            project_name = node.user_data.get('project_name')
            if project_name in in_use_resources_by_project:
                self.addResources(
                    in_use_resources_by_project[project_name],
                    node.resources)

        for tenant, resources in total_resources_by_tenant.items():
            for resource, value in resources.items():
                key = 'zuul.nodepool.resources.total.tenant.' \
                      '{tenant}.{resource}'
                self.statsd.gauge(key, value, tenant=tenant, resource=resource)
        for tenant, resources in in_use_resources_by_tenant.items():
            for resource, value in resources.items():
                key = 'zuul.nodepool.resources.in_use.tenant.' \
                      '{tenant}.{resource}'
                self.statsd.gauge(key, value, tenant=tenant, resource=resource)
        for project, resources in in_use_resources_by_project.items():
            for resource, value in resources.items():
                key = 'zuul.nodepool.resources.in_use.project.' \
                      '{project}.{resource}'
                self.statsd.gauge(
                    key, value, project=project, resource=resource)