summaryrefslogtreecommitdiff
path: root/zuul/driver/mqtt/mqttreporter.py
blob: 11448bf69331615fae60360752b8399d2a300425 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging
import time
import voluptuous as v

from zuul.lib.logutil import get_annotated_logger
from zuul.reporter import BaseReporter


class MQTTReporter(BaseReporter):
    """Publish messages to a topic via mqtt"""

    name = 'mqtt'
    log = logging.getLogger("zuul.MQTTReporter")

    def report(self, item):
        log = get_annotated_logger(self.log, item.event)
        log.debug("Report change %s, params %s", item.change, self.config)
        message = {
            'timestamp': time.time(),
            'action': self._action,
            'tenant': item.pipeline.tenant.name,
            'zuul_ref': item.current_build_set.ref,
            'pipeline': item.pipeline.name,
            'project': item.change.project.name,
            'branch': getattr(item.change, 'branch', ''),
            'change_url': item.change.url,
            'change': getattr(item.change, 'number', ''),
            'patchset': getattr(item.change, 'patchset', ''),
            'ref': getattr(item.change, 'ref', ''),
            'message': self._formatItemReport(
                item, with_jobs=False),
            'enqueue_time': item.enqueue_time,
            'buildset': {
                'uuid': item.current_build_set.uuid,
                'builds': []
            },
        }
        for job in item.getJobs():
            job_informations = {
                'job_name': job.name,
                'voting': job.voting,
            }
            build = item.current_build_set.getBuild(job.name)
            if build:
                # Report build data if available
                (result, url) = item.formatJobResult(job)
                job_informations.update({
                    'uuid': build.uuid,
                    'start_time': build.start_time,
                    'end_time': build.end_time,
                    'execute_time': build.execute_time,
                    'log_url': url,
                    'result': result,
                })
            message['buildset']['builds'].append(job_informations)
        topic = None
        try:
            topic = self.config['topic'].format(
                tenant=item.pipeline.tenant.name,
                pipeline=item.pipeline.name,
                project=item.change.project.name,
                branch=getattr(item.change, 'branch', None),
                change=getattr(item.change, 'number', None),
                patchset=getattr(item.change, 'patchset', None),
                ref=getattr(item.change, 'ref', None))
        except Exception:
            log.exception("Error while formatting MQTT topic %s:",
                          self.config['topic'])
        if topic is not None:
            self.connection.publish(
                topic, message, self.config.get('qos', 0), item.event)


def topicValue(value):
    if not isinstance(value, str):
        raise v.Invalid("topic is not a string")
    try:
        value.format(
            tenant='test',
            pipeline='test',
            project='test',
            branch='test',
            change='test',
            patchset='test',
            ref='test')
    except KeyError as e:
        raise v.Invalid("topic component %s is invalid" % str(e))
    return value


def qosValue(value):
    if not isinstance(value, int):
        raise v.Invalid("qos is not a integer")
    if value not in (0, 1, 2):
        raise v.Invalid("qos can only be 0, 1 or 2")
    return value


def getSchema():
    return v.Schema({v.Required('topic'): topicValue, 'qos': qosValue})