summaryrefslogtreecommitdiff
path: root/zuul/driver/elasticsearch/connection.py
blob: eda15089f3aa3a44cc120ebb7d51fc7fc6dc4dc3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# Copyright 2019 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import yaml
import logging

from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClient
from elasticsearch.helpers import bulk
from elasticsearch.helpers import BulkIndexError

from zuul.connection import BaseConnection


class ElasticsearchConnection(BaseConnection):
    driver_name = 'elasticsearch'
    log = logging.getLogger("zuul.ElasticSearchConnection")
    properties = {
        #  Common attribute
        "uuid": {"type": "keyword"},
        "build_type": {"type": "keyword"},
        "result": {"type": "keyword"},
        "duration": {"type": "integer"},
        "@timestamp": {"type": "date"},
        # BuildSet type specific attributes
        "zuul_ref": {"type": "keyword"},
        "pipeline": {"type": "keyword"},
        "project": {"type": "keyword"},
        "branch": {"type": "keyword"},
        "change": {"type": "integer"},
        "patchset": {"type": "keyword"},
        "ref": {"type": "keyword"},
        "oldrev": {"type": "keyword"},
        "newrev": {"type": "keyword"},
        "ref_url": {"type": "keyword"},
        "message": {"type": "text"},
        "tenant": {"type": "keyword"},
        # Build type specific attibutes
        "buildset_uuid": {"type": "keyword"},
        "job_name": {"type": "keyword"},
        "start_time": {"type": "date", "format": "epoch_second"},
        "end_time": {"type": "date", "format": "epoch_second"},
        "voting": {"type": "boolean"},
        "log_url": {"type": "keyword"},
        "nodeset": {"type": "keyword"}
    }

    def __init__(self, driver, connection_name, connection_config):
        super(ElasticsearchConnection, self).__init__(
            driver, connection_name, connection_config)
        self.uri = self.connection_config.get('uri').split(',')
        self.cnx_opts = {}
        use_ssl = self.connection_config.get('use_ssl', True)
        if isinstance(use_ssl, str):
            if use_ssl.lower() == 'false':
                use_ssl = False
            else:
                use_ssl = True
        self.cnx_opts['use_ssl'] = use_ssl
        if use_ssl:
            verify_certs = self.connection_config.get('verify_certs', True)
            if isinstance(verify_certs, str):
                if verify_certs.lower() == 'false':
                    verify_certs = False
                else:
                    verify_certs = True
            self.cnx_opts['verify_certs'] = verify_certs
            self.cnx_opts['ca_certs'] = self.connection_config.get(
                'ca_certs', None)
            self.cnx_opts['client_cert'] = self.connection_config.get(
                'client_cert', None)
            self.cnx_opts['client_key'] = self.connection_config.get(
                'client_key', None)
        self.es = Elasticsearch(
            self.uri, **self.cnx_opts)
        try:
            self.log.debug("Elasticsearch info: %s" % self.es.info())
        except Exception as e:
            self.log.warn("An error occured on estabilishing "
                          "connection to Elasticsearch: %s" % e)
        self.ic = IndicesClient(self.es)

    def setIndex(self, index):
        settings = {
            'mappings': {
                '_doc': {
                    "properties": self.properties
                }
            }
        }
        try:
            self.ic.create(index=index, ignore=400, body=settings)
        except Exception:
            self.log.exception(
                "Unable to create the index %s on connection %s" % (
                    index, self.connection_name))

    def gen(self, it, index):
        for source in it:
            d = {}
            source['@timestamp'] = datetime.utcfromtimestamp(
                int(source['start_time'])).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
            d['_index'] = index
            d['_op_type'] = 'index'
            d['_source'] = source
            yield d

    def add_docs(self, source_it, index):

        self.setIndex(index)

        try:
            bulk(self.es, self.gen(source_it, index))
            self.es.indices.refresh(index=index)
            self.log.debug('%s docs indexed to %s' % (
                len(source_it), self.connection_name))
        except BulkIndexError as exc:
            self.log.warn("Some docs failed to be indexed (%s)" % exc)
            # We give flexibility by allowing any type of job's vars and
            # zuul return data to be indexed with EL dynamic mapping enabled.
            # It may happen that a doc own a field with a value that does not
            # match the previous data type that EL has detected for that field.
            # In that case the whole doc is not indexed by EL.
            # Here we want to mitigate by indexing the errorneous docs in a
            # <index-name>.errorneous index by flattening the doc data as yaml.
            # This ensures the doc is indexed and can be tracked and eventually
            # be modified and re-indexed by an operator.
            errorneous_docs = []
            for d in exc.errors:
                if d['index']['error']['type'] == 'mapper_parsing_exception':
                    errorneous_doc = {
                        'uuid': d['index']['data']['uuid'],
                        'blob': yaml.dump(d['index']['data'])
                    }
                    errorneous_docs.append(errorneous_doc)
            try:
                mapping_errorneous_index = "%s.errorneous" % index
                bulk(
                    self.es,
                    self.gen(errorneous_docs, mapping_errorneous_index))
                self.es.indices.refresh(index=mapping_errorneous_index)
                self.log.info(
                    "%s errorneous docs indexed" % (len(errorneous_docs)))
            except BulkIndexError as exc:
                self.log.warn(
                    "Some errorneous docs failed to be indexed (%s)" % exc)