summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Zuul <zuul@review.opendev.org> 2022-09-23 06:46:25 +0000
committer Gerrit Code Review <review@openstack.org> 2022-09-23 06:46:25 +0000
commit 2bc750ac70f6fc1389d7eb3315660cf9a7bedeca (patch)
tree f84de9246fe353f78e74baf6bbdfce86ed8c1d44
parent 6fa84faf3fb595dd8d80a6583891dc4f2e8043d2 (diff)
parent 8c2433a2c427367b4e26ec5fd4a0cd0f67399383 (diff)
download zuul-2bc750ac70f6fc1389d7eb3315660cf9a7bedeca.tar.gz
Merge "Tracing: implement span save/restore"
-rw-r--r-- tests/otlp_fixture.py 5
-rw-r--r-- tests/unit/test_tracing.py 176
-rw-r--r-- zuul/lib/tracing.py 203
-rw-r--r-- zuul/manager/__init__.py 25
-rw-r--r-- zuul/model.py 17
5 files changed, 401 insertions, 25 deletions
diff --git a/tests/otlp_fixture.py b/tests/otlp_fixture.py
index cd2329483..633296fac 100644
--- a/tests/otlp_fixture.py
+++ b/tests/otlp_fixture.py
@@ -11,11 +11,11 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-
from concurrent import futures
import fixtures
import grpc
+from opentelemetry import trace
from opentelemetry.proto.collector.trace.v1.trace_service_pb2_grpc import (
TraceServiceServicer,
add_TraceServiceServicer_to_server
@@ -45,6 +45,9 @@ class OTLPFixture(fixtures.Fixture):
self.server = grpc.server(self.executor)
add_TraceServiceServicer_to_server(TraceServer(self), self.server)
self.port = self.server.add_insecure_port('[::]:0')
+ # Reset global tracer provider
+ trace._TRACER_PROVIDER_SET_ONCE = trace.Once()
+ trace._TRACER_PROVIDER = None
def _setUp(self):
self.server.start()
diff --git a/tests/unit/test_tracing.py b/tests/unit/test_tracing.py
index ed64c8a7c..3c452dd53 100644
--- a/tests/unit/test_tracing.py
+++ b/tests/unit/test_tracing.py
@@ -12,8 +12,13 @@
# License for the specific language governing permissions and limitations
# under the License.
+import time
+
from tests.base import iterate_timeout, ZuulTestCase
+import zuul.lib.tracing as tracing
+from opentelemetry import trace
+
def attributes_to_dict(attrlist):
ret = {}
@@ -26,16 +31,169 @@ class TestTracing(ZuulTestCase):
config_file = 'zuul-tracing.conf'
tenant_config_file = "config/single-tenant/main.yaml"
- def test_tracing(self):
- self.scheds.first.sched.tracing.test()
+ def test_tracing_api(self):
+ tracer = trace.get_tracer("zuul")
+
+ # We have a lot of timestamps stored as floats, so make sure
+ # our root span is a ZuulSpan that can handle that input.
+ span_info = tracing.startSavedSpan('parent-trace',
+ start_time=time.time(),
+ attributes={'startattr': 'bar'},
+ include_attributes=True)
+
+ # Simulate a reconstructed root span
+ span = tracing.restoreSpan(span_info)
+
+ # Within the root span, use the more typical OpenTelemetry
+ # context manager api.
+ with trace.use_span(span):
+ with tracer.start_span('child1-trace') as child1_span:
+ link = trace.Link(child1_span.context,
+ attributes={'relationship': 'prev'})
+
+ # Make sure that we can manually start and stop a child span,
+ # and that it is a ZuulSpan as well.
+ with trace.use_span(span):
+ child = tracer.start_span('child2-trace', start_time=time.time(),
+ links=[link])
+ child.end(end_time=time.time())
+
+ # Make sure that we can start a child span from a span
+ # context and not a full span:
+ span_context = tracing.getSpanContext(span)
+ with tracing.startSpanInContext(span_context, 'child3-trace') as child:
+ child.end(end_time=time.time())
+
+ # End our root span manually.
+ tracing.endSavedSpan(span_info, end_time=time.time(),
+ attributes={'endattr': 'baz'})
+
+ for _ in iterate_timeout(60, "request to arrive"):
+ if len(self.otlp.requests) == 4:
+ break
+ req1 = self.otlp.requests[0]
+ self.log.debug("Received:\n%s", req1)
+ attrs = attributes_to_dict(req1.resource_spans[0].resource.attributes)
+ self.assertEqual({"service.name": "zuultest"}, attrs)
+ self.assertEqual("zuul",
+ req1.resource_spans[0].scope_spans[0].scope.name)
+ span1 = req1.resource_spans[0].scope_spans[0].spans[0]
+ self.assertEqual("child1-trace", span1.name)
+
+ req2 = self.otlp.requests[1]
+ self.log.debug("Received:\n%s", req2)
+ span2 = req2.resource_spans[0].scope_spans[0].spans[0]
+ self.assertEqual("child2-trace", span2.name)
+ self.assertEqual(span2.links[0].span_id, span1.span_id)
+ attrs = attributes_to_dict(span2.links[0].attributes)
+ self.assertEqual({"relationship": "prev"}, attrs)
+
+ req3 = self.otlp.requests[2]
+ self.log.debug("Received:\n%s", req3)
+ span3 = req3.resource_spans[0].scope_spans[0].spans[0]
+ self.assertEqual("child3-trace", span3.name)
+
+ req4 = self.otlp.requests[3]
+ self.log.debug("Received:\n%s", req4)
+ span4 = req4.resource_spans[0].scope_spans[0].spans[0]
+ self.assertEqual("parent-trace", span4.name)
+ attrs = attributes_to_dict(span4.attributes)
+ self.assertEqual({"startattr": "bar",
+ "endattr": "baz"}, attrs)
+
+ self.assertEqual(span1.trace_id, span4.trace_id)
+ self.assertEqual(span2.trace_id, span4.trace_id)
+ self.assertEqual(span3.trace_id, span4.trace_id)
+
+ def test_tracing_api_null(self):
+ tracer = trace.get_tracer("zuul")
+
+ # Test that restoring spans and span contexts works with
+ # null values.
+
+ span_info = None
+ # Simulate a reconstructed root span from a null value
+ span = tracing.restoreSpan(span_info)
+
+ # Within the root span, use the more typical OpenTelemetry
+ # context manager api.
+ with trace.use_span(span):
+ with tracer.start_span('child1-trace') as child1_span:
+ link = trace.Link(child1_span.context,
+ attributes={'relationship': 'prev'})
+
+ # Make sure that we can manually start and stop a child span,
+ # and that it is a ZuulSpan as well.
+ with trace.use_span(span):
+ child = tracer.start_span('child2-trace', start_time=time.time(),
+ links=[link])
+ child.end(end_time=time.time())
+
+ # Make sure that we can start a child span from a null span
+ # context:
+ span_context = None
+ with tracing.startSpanInContext(span_context, 'child3-trace') as child:
+ child.end(end_time=time.time())
+
+ # End our root span manually.
+ span.end(end_time=time.time())
+
for _ in iterate_timeout(60, "request to arrive"):
- if self.otlp.requests:
+ if len(self.otlp.requests) == 3:
break
- req = self.otlp.requests[0]
- self.log.debug("Received:\n%s", req)
- attrs = attributes_to_dict(req.resource_spans[0].resource.attributes)
+ req1 = self.otlp.requests[0]
+ self.log.debug("Received:\n%s", req1)
+ attrs = attributes_to_dict(req1.resource_spans[0].resource.attributes)
self.assertEqual({"service.name": "zuultest"}, attrs)
self.assertEqual("zuul",
- req.resource_spans[0].scope_spans[0].scope.name)
- span = req.resource_spans[0].scope_spans[0].spans[0]
- self.assertEqual("test-trace", span.name)
+ req1.resource_spans[0].scope_spans[0].scope.name)
+ span1 = req1.resource_spans[0].scope_spans[0].spans[0]
+ self.assertEqual("child1-trace", span1.name)
+
+ req2 = self.otlp.requests[1]
+ self.log.debug("Received:\n%s", req2)
+ span2 = req2.resource_spans[0].scope_spans[0].spans[0]
+ self.assertEqual("child2-trace", span2.name)
+ self.assertEqual(span2.links[0].span_id, span1.span_id)
+ attrs = attributes_to_dict(span2.links[0].attributes)
+ self.assertEqual({"relationship": "prev"}, attrs)
+
+ req3 = self.otlp.requests[2]
+ self.log.debug("Received:\n%s", req3)
+ span3 = req3.resource_spans[0].scope_spans[0].spans[0]
+ self.assertEqual("child3-trace", span3.name)
+
+ self.assertNotEqual(span1.trace_id, span2.trace_id)
+ self.assertNotEqual(span2.trace_id, span3.trace_id)
+ self.assertNotEqual(span1.trace_id, span3.trace_id)
+
+ def test_tracing(self):
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ A.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.waitUntilSettled()
+
+ for _ in iterate_timeout(60, "request to arrive"):
+ if len(self.otlp.requests) >= 2:
+ break
+
+ buildset = self.getSpan('BuildSet')
+ self.log.debug("Received:\n%s", buildset)
+ item = self.getSpan('QueueItem')
+ self.log.debug("Received:\n%s", item)
+ self.assertEqual(item.trace_id, buildset.trace_id)
+ self.assertNotEqual(item.span_id, buildset.span_id)
+ self.assertTrue(buildset.start_time_unix_nano >=
+ item.start_time_unix_nano)
+ self.assertTrue(buildset.end_time_unix_nano <=
+ item.end_time_unix_nano)
+ item_attrs = attributes_to_dict(item.attributes)
+ self.assertTrue(item_attrs['ref_number'] == "1")
+ self.assertTrue(item_attrs['ref_patchset'] == "1")
+ self.assertTrue('zuul_event_id' in item_attrs)
+
+ def getSpan(self, name):
+ for req in self.otlp.requests:
+ span = req.resource_spans[0].scope_spans[0].spans[0]
+ if span.name == name:
+ return span
diff --git a/zuul/lib/tracing.py b/zuul/lib/tracing.py
index 2eb4d8903..42b2681f3 100644
--- a/zuul/lib/tracing.py
+++ b/zuul/lib/tracing.py
@@ -18,12 +18,203 @@ from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import \
from opentelemetry.exporter.otlp.proto.http.trace_exporter import \
OTLPSpanExporter as HTTPExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
-from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace import TracerProvider, Span
from opentelemetry.sdk.trace.export import BatchSpanProcessor
+from opentelemetry import trace
+from opentelemetry.sdk import trace as trace_sdk
from zuul.lib.config import get_default, any_to_bool
+class ZuulSpan(Span):
+ """An implementation of Span which accepts floating point
+ times and converts them to the expected nanoseconds."""
+
+ def start(self, start_time=None, parent_context=None):
+ if isinstance(start_time, float):
+ start_time = int(start_time * (10**9))
+ return super().start(start_time, parent_context)
+
+ def end(self, end_time=None):
+ if isinstance(end_time, float):
+ end_time = int(end_time * (10**9))
+ return super().end(end_time)
+
+
+# Patch the OpenTelemetry SDK Span class to return a ZuulSpan so that
+# we can supply floating point timestamps.
+trace_sdk._Span = ZuulSpan
+
+
+def _formatContext(context):
+ return {
+ 'trace_id': context.trace_id,
+ 'span_id': context.span_id,
+ }
+
+
+def _formatAttributes(attrs):
+ if attrs is None:
+ return None
+ return attrs.copy()
+
+
+def getSpanInfo(span, include_attributes=False):
+ """Return a dict for use in serializing a Span."""
+ links = [{'context': _formatContext(l.context),
+ 'attributes': _formatAttributes(l.attributes)}
+ for l in span.links]
+ attrs = _formatAttributes(span.attributes)
+ context = span.get_span_context()
+ ret = {
+ 'name': span.name,
+ 'trace_id': context.trace_id,
+ 'span_id': context.span_id,
+ 'trace_flags': context.trace_flags,
+ 'start_time': span.start_time,
+ }
+ if links:
+ ret['links'] = links
+ if attrs:
+ if not include_attributes:
+ # Avoid setting attributes when we start saved spans
+ # because we have to store them in ZooKeeper and we should
+ # minimize what we store there (especially since it is
+ # usually duplicative). If you really need to set
+ # attributes at the start of a span (because the info is
+ # not available later), set include_attributes to True.
+ # Otherwise, we raise an error here to remind ourselves to
+ # avoid that programming pattern.
+ raise RuntimeError("Attributes were set on a saved span; "
+ "either set them when ending the span, "
+ "or set include_attributes=True")
+ ret['attributes'] = attrs
+ return ret
+
+
+def restoreSpan(span_info, is_remote=True):
+ """Restore a Span from the serialized dict provided by getSpanInfo
+
+ Return trace.INVALID_SPAN if span_info is None or lacks a required key.
+ """
+ tracer = trace.get_tracer("zuul")
+ if span_info is None:
+ return trace.INVALID_SPAN
+ required_keys = {'name', 'trace_id', 'span_id', 'trace_flags'}
+ if not required_keys <= set(span_info.keys()):
+ return trace.INVALID_SPAN
+ span_context = trace.SpanContext(
+ span_info['trace_id'],
+ span_info['span_id'],
+ is_remote=is_remote,
+ trace_flags=trace.TraceFlags(span_info['trace_flags']),
+ )
+ links = []
+ for link_info in span_info.get('links', []):
+ link_context = trace.SpanContext(
+ link_info['context']['trace_id'],
+ link_info['context']['span_id'])
+ link = trace.Link(link_context, link_info['attributes'])
+ links.append(link)
+ attributes = span_info.get('attributes', {})
+
+ span = ZuulSpan(
+ name=span_info['name'],
+ context=span_context,
+ parent=None,
+ sampler=tracer.sampler,
+ resource=tracer.resource,
+ attributes=attributes,
+ span_processor=tracer.span_processor,
+ kind=trace.SpanKind.INTERNAL,
+ links=links,
+ instrumentation_info=tracer.instrumentation_info,
+ record_exception=False,
+ set_status_on_exception=True,
+ limits=tracer._span_limits,
+ instrumentation_scope=tracer._instrumentation_scope,
+ )
+ span.start(start_time=span_info['start_time'])
+
+ return span
+
+
+def startSavedSpan(*args, **kw):
+ """Start a span and serialize it
+
+ This is a convenience method which starts a span (either root
+ or child) and immediately serializes it.
+
+ Most spans in Zuul should use this method.
+ """
+ tracer = trace.get_tracer("zuul")
+ include_attributes = kw.pop('include_attributes', False)
+ span = tracer.start_span(*args, **kw)
+ return getSpanInfo(span, include_attributes)
+
+
+def endSavedSpan(span_info, end_time=None, attributes=None):
+ """End a saved span.
+
+ This is a convenience method to restore a saved span and
+ immediately end it.
+
+ Most spans in Zuul should use this method.
+ """
+ span = restoreSpan(span_info, is_remote=False)
+ if span:
+ if attributes:
+ span.set_attributes(attributes)
+ span.end(end_time=end_time)
+
+
+def getSpanContext(span):
+ """Return a dict for use in serializing a Span Context.
+
+ The span context information used here is a lightweight
+ encoding of the span information so that remote child spans
+ can be started without access to a fully restored parent.
+ This is equivalent to (but not in the same format as) the
+ OpenTelemetry trace context propagator.
+ """
+ context = span.get_span_context()
+ return {
+ 'trace_id': context.trace_id,
+ 'span_id': context.span_id,
+ 'trace_flags': context.trace_flags,
+ }
+
+
+def restoreSpanContext(span_context):
+ """Return a span with remote context information from getSpanContext.
+
+ This returns a non-recording span to use as a parent span. It
+ avoids the necessity of fully restoring the parent span.
+ """
+ if span_context:
+ span_context = trace.SpanContext(
+ trace_id=span_context['trace_id'],
+ span_id=span_context['span_id'],
+ is_remote=True,
+ trace_flags=trace.TraceFlags(span_context['trace_flags'])
+ )
+ else:
+ span_context = trace.INVALID_SPAN_CONTEXT
+ parent = trace.NonRecordingSpan(span_context)
+ return parent
+
+
+def startSpanInContext(span_context, *args, **kw):
+ """Start a span in a saved context.
+
+ This restores a span from a saved context and starts a new child span.
+ """
+ tracer = trace.get_tracer("zuul")
+ parent = restoreSpanContext(span_context)
+ with trace.use_span(parent):
+ return tracer.start_span(*args, **kw)
+
+
class Tracing:
PROTOCOL_GRPC = 'grpc'
PROTOCOL_HTTP_PROTOBUF = 'http/protobuf'
@@ -33,10 +224,10 @@ class Tracing:
service_name = get_default(config, "tracing", "service_name", "zuul")
resource = Resource(attributes={SERVICE_NAME: service_name})
provider = TracerProvider(resource=resource)
+ trace.set_tracer_provider(provider)
enabled = get_default(config, "tracing", "enabled")
if not any_to_bool(enabled):
self.processor = None
- self.tracer = provider.get_tracer("zuul")
return
protocol = get_default(config, "tracing", "protocol",
@@ -93,16 +284,8 @@ class Tracing:
raise Exception(f"Unknown tracing protocol {protocol}")
self.processor = self.processor_class(exporter)
provider.add_span_processor(self.processor)
- self.tracer = provider.get_tracer("zuul")
def stop(self):
if not self.processor:
return
self.processor.shutdown()
-
- def test(self):
- # TODO: remove once we have actual traces
- if not self.tracer:
- return
- with self.tracer.start_as_current_span('test-trace'):
- pass
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 365435f3d..cd9b2381d 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -22,6 +22,7 @@ from zuul import model
from zuul.lib.dependson import find_dependency_headers
from zuul.lib.logutil import get_annotated_logger
from zuul.lib.tarjan import strongly_connected_components
+import zuul.lib.tracing as tracing
from zuul.model import (
Change, DequeueEvent, PipelineState, PipelineChangeList, QueueItem,
PipelinePostConfigEvent,
@@ -30,6 +31,8 @@ from zuul.zk.change_cache import ChangeKey
from zuul.zk.components import COMPONENT_REGISTRY
from zuul.zk.locks import pipeline_lock
+from opentelemetry import trace
+
class DynamicChangeQueueContextManager(object):
def __init__(self, change_queue):
@@ -579,7 +582,13 @@ class PipelineManager(metaclass=ABCMeta):
log.info("Adding change %s to queue %s in %s" %
(change, change_queue, self.pipeline))
- item = change_queue.enqueueChange(change, event)
+ if enqueue_time is None:
+ enqueue_time = time.time()
+ span_info = tracing.startSavedSpan(
+ 'QueueItem', start_time=enqueue_time)
+ item = change_queue.enqueueChange(change, event,
+ span_info=span_info,
+ enqueue_time=enqueue_time)
self.updateBundle(item, change_queue, cycle)
with item.activeContext(self.current_context):
@@ -739,6 +748,15 @@ class PipelineManager(metaclass=ABCMeta):
self.reportDequeue(item)
item.queue.dequeueItem(item)
+ span_attrs = {
+ 'zuul_event_id': item.event.zuul_event_id,
+ }
+ for k, v in item.change.getSafeAttributes().toDict().items():
+ span_attrs['ref_' + k] = v
+ tracing.endSavedSpan(item.current_build_set.span_info)
+ tracing.endSavedSpan(item.span_info,
+ attributes=span_attrs)
+
def removeItem(self, item):
log = get_annotated_logger(self.log, item.event)
# Remove an item from the queue, probably because it has been
@@ -964,6 +982,7 @@ class PipelineManager(metaclass=ABCMeta):
self.reportNormalBuildsetEnd(
item.current_build_set, 'dequeue', final=False,
result='DEQUEUED')
+ tracing.endSavedSpan(item.current_build_set.span_info)
item.resetAllBuilds()
for item_behind in item.items_behind:
@@ -1337,7 +1356,9 @@ class PipelineManager(metaclass=ABCMeta):
# isn't already set.
tpc = tenant.project_configs.get(item.change.project.canonical_name)
if not build_set.ref:
- build_set.setConfiguration(self.current_context)
+ with trace.use_span(tracing.restoreSpan(item.span_info)):
+ span_info = tracing.startSavedSpan('BuildSet')
+ build_set.setConfiguration(self.current_context, span_info)
# Next, if a change ahead has a broken config, then so does
# this one. Record that and don't do anything else.
diff --git a/zuul/model.py b/zuul/model.py
index 5aaa22a5f..254556fda 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -329,6 +329,9 @@ class Attributes(object):
def __init__(self, **kw):
setattr(self, '__dict__', kw)
+ def toDict(self):
+ return self.__dict__
+
class Freezable(object):
"""A mix-in class so that an object can be made immutable"""
@@ -1094,13 +1097,16 @@ class ChangeQueue(zkobject.ZKObject):
def matches(self, project_cname, branch):
return (project_cname, branch) in self.project_branches
- def enqueueChange(self, change, event):
+ def enqueueChange(self, change, event, span_info=None, enqueue_time=None):
+ if enqueue_time is None:
+ enqueue_time = time.time()
item = QueueItem.new(self.zk_context,
queue=self,
pipeline=self.pipeline,
change=change,
event=event,
- enqueue_time=time.time())
+ span_info=span_info,
+ enqueue_time=enqueue_time)
self.enqueueItem(item)
return item
@@ -3882,6 +3888,7 @@ class BuildSet(zkobject.ZKObject):
tries={},
files_state=self.NEW,
repo_state_state=self.NEW,
+ span_info=None,
configured=False,
configured_time=None, # When setConfigured was called
start_time=None, # When the buildset reported start
@@ -3997,6 +4004,7 @@ class BuildSet(zkobject.ZKObject):
"fail_fast": self.fail_fast,
"job_graph": (self.job_graph.toDict()
if self.job_graph else None),
+ "span_info": self.span_info,
"configured_time": self.configured_time,
"start_time": self.start_time,
"repo_state_request_time": self.repo_state_request_time,
@@ -4147,7 +4155,7 @@ class BuildSet(zkobject.ZKObject):
len(self.builds),
self.getStateName(self.merge_state))
- def setConfiguration(self, context):
+ def setConfiguration(self, context, span_info):
with self.activeContext(context):
# The change isn't enqueued until after it's created
# so we don't know what the other changes ahead will be
@@ -4167,6 +4175,7 @@ class BuildSet(zkobject.ZKObject):
self.merger_items = [i.makeMergerItem() for i in items]
self.configured = True
self.configured_time = time.time()
+ self.span_info = span_info
def _toChangeDict(self, item):
# Inject bundle_id to dict if available, this can be used to decide
@@ -4336,6 +4345,7 @@ class QueueItem(zkobject.ZKObject):
current_build_set=None,
item_ahead=None,
items_behind=[],
+ span_info=None,
enqueue_time=None,
report_time=None,
dequeue_time=None,
@@ -4405,6 +4415,7 @@ class QueueItem(zkobject.ZKObject):
self.current_build_set.getPath()),
"item_ahead": self.item_ahead and self.item_ahead.getPath(),
"items_behind": [i.getPath() for i in self.items_behind],
+ "span_info": self.span_info,
"enqueue_time": self.enqueue_time,
"report_time": self.report_time,
"dequeue_time": self.dequeue_time,