diff options
Diffstat (limited to 'osprofiler/drivers')
-rw-r--r-- | osprofiler/drivers/__init__.py | 1 | ||||
-rw-r--r-- | osprofiler/drivers/otlp.py | 179 |
2 files changed, 180 insertions, 0 deletions
diff --git a/osprofiler/drivers/__init__.py b/osprofiler/drivers/__init__.py index 022b094..6e3b93d 100644 --- a/osprofiler/drivers/__init__.py +++ b/osprofiler/drivers/__init__.py @@ -1,6 +1,7 @@ from osprofiler.drivers import base # noqa from osprofiler.drivers import elasticsearch_driver # noqa from osprofiler.drivers import jaeger # noqa +from osprofiler.drivers import otlp # noqa from osprofiler.drivers import loginsight # noqa from osprofiler.drivers import messaging # noqa from osprofiler.drivers import mongodb # noqa diff --git a/osprofiler/drivers/otlp.py b/osprofiler/drivers/otlp.py new file mode 100644 index 0000000..c1d210b --- /dev/null +++ b/osprofiler/drivers/otlp.py @@ -0,0 +1,179 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +from urllib import parse as parser + +from oslo_config import cfg +from oslo_serialization import jsonutils + +from osprofiler import _utils as utils +from osprofiler.drivers import base +from osprofiler import exc + + +class OTLP(base.Driver): + def __init__(self, connection_str, project=None, service=None, host=None, + conf=cfg.CONF, **kwargs): + """OTLP driver using OTLP exporters.""" + + super(OTLP, self).__init__(connection_str, project=project, + service=service, host=host, + conf=conf, **kwargs) + try: + from opentelemetry import trace as trace_api + + from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter # noqa + from opentelemetry.sdk.resources import Resource + from opentelemetry.sdk.trace.export import BatchSpanProcessor + from opentelemetry.sdk.trace import TracerProvider + + self.trace_api = trace_api + except ImportError: + raise exc.CommandError( + "To use OSProfiler with OTLP exporters, " + "please install `opentelemetry-sdk` and " + "opentelemetry-exporter-otlp libraries. " + "To install with pip:\n `pip install opentelemetry-sdk " + "opentelemetry-exporter-otlp`.") + + service_name = self._get_service_name(conf, project, service) + resource = Resource(attributes={ + "service.name": service_name + }) + + parsed_url = parser.urlparse(connection_str) + # TODO("sahid"): We also want to handle https scheme? + parsed_url = parsed_url._replace(scheme="http") + + self.trace_api.set_tracer_provider( + TracerProvider(resource=resource)) + self.tracer = self.trace_api.get_tracer(__name__) + + exporter = OTLPSpanExporter("{}/v1/traces".format( + parsed_url.geturl())) + self.trace_api.get_tracer_provider().add_span_processor( + BatchSpanProcessor(exporter)) + + self.spans = collections.deque() + + def _get_service_name(self, conf, project, service): + prefix = conf.profiler_otlp.service_name_prefix + if prefix: + return "{}-{}-{}".format(prefix, project, service) + return "{}-{}".format(project, service) + + @classmethod + def get_name(cls): + return "otlp" + + def _kind(self, name): + if "wsgi" in name: + return self.trace_api.SpanKind.SERVER + elif ("db" in name or "http_client" in name or "api" in name): + return self.trace_api.SpanKind.CLIENT + return self.trace_api.SpanKind.INTERNAL + + def _name(self, payload): + info = payload["info"] + if info.get("request"): + return "{}_{}".format( + info["request"]["method"], info["request"]["path"]) + elif info.get("db"): + return "SQL_{}".format( + info["db"]["statement"].split(' ', 1)[0].upper()) + return payload["name"].rstrip("-start") + + def notify(self, payload): + if payload["name"].endswith("start"): + parent = self.trace_api.SpanContext( + trace_id=utils.uuid_to_int128(payload["base_id"]), + span_id=utils.shorten_id(payload["parent_id"]), + is_remote=False, + trace_flags=self.trace_api.TraceFlags( + self.trace_api.TraceFlags.SAMPLED)) + + ctx = self.trace_api.set_span_in_context( + self.trace_api.NonRecordingSpan(parent)) + + # OTLP Tracing span + span = self.tracer.start_span( + name=self._name(payload), + kind=self._kind(payload['name']), + attributes=self.create_span_tags(payload), + context=ctx) + + span._context = self.trace_api.SpanContext( + trace_id=span.context.trace_id, + span_id=utils.shorten_id(payload["trace_id"]), + is_remote=span.context.is_remote, + trace_flags=span.context.trace_flags, + trace_state=span.context.trace_state) + + self.spans.append(span) + else: + span = self.spans.pop() + + # Store result of db call and function call + for call in ("db", "function"): + if payload.get("info", {}).get(call): + span.set_attribute( + "result", payload["info"][call]["result"]) + # Span error tag and log + if payload["info"].get("etype"): + span.set_attribute("error", True) + span.add_event("log", { + "error.kind": payload["info"]["etype"], + "message": payload["info"]["message"]}) + span.end() + + def get_report(self, base_id): + return self._parse_results() + + def list_traces(self, fields=None): + return [] + + def list_error_traces(self): + return [] + + def create_span_tags(self, payload): + """Create tags an OpenTracing compatible span. + + :param info: Information from OSProfiler trace. + :returns tags: A dictionary contains standard tags + from OpenTracing sematic conventions, + and some other custom tags related to http, db calls. + """ + tags = {} + info = payload["info"] + + if info.get("db"): + # DB calls + tags["db.statement"] = info["db"]["statement"] + tags["db.params"] = jsonutils.dumps(info["db"]["params"]) + elif info.get("request"): + # WSGI call + tags["http.path"] = info["request"]["path"] + tags["http.query"] = info["request"]["query"] + tags["http.method"] = info["request"]["method"] + tags["http.scheme"] = info["request"]["scheme"] + elif info.get("function"): + # RPC, function calls + if "args" in info["function"]: + tags["args"] = info["function"]["args"] + if "kwargs" in info["function"]: + tags["kwargs"] = info["function"]["kwargs"] + tags["name"] = info["function"]["name"] + + return tags |