1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
# Copyright 2022 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import grpc
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import \
OTLPSpanExporter as GRPCExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import \
OTLPSpanExporter as HTTPExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from zuul.lib.config import get_default, any_to_bool
class Tracing:
PROTOCOL_GRPC = 'grpc'
PROTOCOL_HTTP_PROTOBUF = 'http/protobuf'
processor_class = BatchSpanProcessor
def __init__(self, config):
service_name = get_default(config, "tracing", "service_name", "zuul")
resource = Resource(attributes={SERVICE_NAME: service_name})
provider = TracerProvider(resource=resource)
enabled = get_default(config, "tracing", "enabled")
if not any_to_bool(enabled):
self.processor = None
self.tracer = provider.get_tracer("zuul")
return
protocol = get_default(config, "tracing", "protocol",
self.PROTOCOL_GRPC)
endpoint = get_default(config, "tracing", "endpoint")
tls_key = get_default(config, "tracing", "tls_key")
tls_cert = get_default(config, "tracing", "tls_cert")
tls_ca = get_default(config, "tracing", "tls_ca")
certificate_file = get_default(config, "tracing", "certificate_file")
insecure = get_default(config, "tracing", "insecure")
if insecure is not None:
insecure = any_to_bool(insecure)
timeout = get_default(config, "tracing", "timeout")
if timeout is not None:
timeout = int(timeout)
compression = get_default(config, "tracing", "compression")
if protocol == self.PROTOCOL_GRPC:
if certificate_file:
raise Exception("The certificate_file tracing option "
f"is not valid for {protocol} endpoints")
if any([tls_ca, tls_key, tls_cert]):
if tls_ca:
tls_ca = open(tls_ca, 'rb').read()
if tls_key:
tls_key = open(tls_key, 'rb').read()
if tls_cert:
tls_cert = open(tls_cert, 'rb').read()
creds = grpc.ssl_channel_credentials(
root_certificates=tls_ca,
private_key=tls_key,
certificate_chain=tls_cert)
else:
creds = None
exporter = GRPCExporter(
endpoint=endpoint,
insecure=insecure,
credentials=creds,
timeout=timeout,
compression=compression)
elif protocol == self.PROTOCOL_HTTP_PROTOBUF:
if insecure:
raise Exception("The insecure tracing option "
f"is not valid for {protocol} endpoints")
if any([tls_ca, tls_key, tls_cert]):
raise Exception("The tls_* tracing options "
f"are not valid for {protocol} endpoints")
exporter = HTTPExporter(
endpoint=endpoint,
certificate_file=certificate_file,
timeout=timeout,
compression=compression)
else:
raise Exception(f"Unknown tracing protocol {protocol}")
self.processor = self.processor_class(exporter)
provider.add_span_processor(self.processor)
self.tracer = provider.get_tracer("zuul")
def stop(self):
if not self.processor:
return
self.processor.shutdown()
def test(self):
# TODO: remove once we have actual traces
if not self.tracer:
return
with self.tracer.start_as_current_span('test-trace'):
pass
|