summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Simon Westphahl <simon.westphahl@bmw.de> 2022-09-13 14:30:14 +0200
committer Simon Westphahl <simon.westphahl@bmw.de> 2022-09-19 11:25:49 +0200
commit f1e3d67608e6b1f8850e6b0e8e5f28cfd3c73409 (patch)
tree 00e6faa551fb84e56eb9301c19c6d2e306bd5dd1
parent 075bdd0178e6ab2b0358ebcc87efebabfe9a54f8 (diff)
download zuul-f1e3d67608e6b1f8850e6b0e8e5f28cfd3c73409.tar.gz
Trace merge requests and merger operations
The span info for the different merger operations is stored on the request and will be returned to the scheduler via the result event. This also adds the request UUID to the "refstat" job so that we can attach that as a span attribute.

Change-Id: Ib6ac7b5e7032d168f53fe32e28358bd0b87df435
-rw-r--r-- tests/unit/test_tracing.py 42
-rw-r--r-- zuul/merger/client.py 39
-rw-r--r-- zuul/merger/server.py 12
-rw-r--r-- zuul/model.py 19
-rw-r--r-- zuul/scheduler.py 18
5 files changed, 104 insertions, 26 deletions
diff --git a/tests/unit/test_tracing.py b/tests/unit/test_tracing.py
index d26a9510d..00ef85e36 100644
--- a/tests/unit/test_tracing.py
+++ b/tests/unit/test_tracing.py
@@ -31,6 +31,16 @@ class TestTracing(ZuulTestCase):
config_file = 'zuul-tracing.conf'
tenant_config_file = "config/single-tenant/main.yaml"
+ def _waitForSpans(self, *span_names, timeout=60,):
+ for _ in iterate_timeout(timeout, "requests to arrive"):
+ test_requests = [
+ r for r in self.otlp.requests
+ if r.resource_spans[0].scope_spans[0].spans[0].name
+ in span_names
+ ]
+ if len(test_requests) == len(span_names):
+ return test_requests
+
def test_tracing_api(self):
tracer = trace.get_tracer("zuul")
@@ -68,10 +78,10 @@ class TestTracing(ZuulTestCase):
tracing.endSavedSpan(span_info, end_time=time.time(),
attributes={'endattr': 'baz'})
- for _ in iterate_timeout(60, "request to arrive"):
- if len(self.otlp.requests) == 4:
- break
- req1 = self.otlp.requests[0]
+ test_requests = self._waitForSpans(
+ "parent-trace", "child1-trace", "child2-trace", "child3-trace")
+
+ req1 = test_requests[0]
self.log.debug("Received:\n%s", req1)
attrs = attributes_to_dict(req1.resource_spans[0].resource.attributes)
self.assertEqual({"service.name": "zuultest"}, attrs)
@@ -80,7 +90,7 @@ class TestTracing(ZuulTestCase):
span1 = req1.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("child1-trace", span1.name)
- req2 = self.otlp.requests[1]
+ req2 = test_requests[1]
self.log.debug("Received:\n%s", req2)
span2 = req2.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("child2-trace", span2.name)
@@ -88,12 +98,12 @@ class TestTracing(ZuulTestCase):
attrs = attributes_to_dict(span2.links[0].attributes)
self.assertEqual({"relationship": "prev"}, attrs)
- req3 = self.otlp.requests[2]
+ req3 = test_requests[2]
self.log.debug("Received:\n%s", req3)
span3 = req3.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("child3-trace", span3.name)
- req4 = self.otlp.requests[3]
+ req4 = test_requests[3]
self.log.debug("Received:\n%s", req4)
span4 = req4.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("parent-trace", span4.name)
@@ -138,10 +148,10 @@ class TestTracing(ZuulTestCase):
# End our root span manually.
span.end(end_time=time.time())
- for _ in iterate_timeout(60, "request to arrive"):
- if len(self.otlp.requests) == 3:
- break
- req1 = self.otlp.requests[0]
+ test_requests = self._waitForSpans(
+ "child1-trace", "child2-trace", "child3-trace")
+
+ req1 = test_requests[0]
self.log.debug("Received:\n%s", req1)
attrs = attributes_to_dict(req1.resource_spans[0].resource.attributes)
self.assertEqual({"service.name": "zuultest"}, attrs)
@@ -150,7 +160,7 @@ class TestTracing(ZuulTestCase):
span1 = req1.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("child1-trace", span1.name)
- req2 = self.otlp.requests[1]
+ req2 = test_requests[1]
self.log.debug("Received:\n%s", req2)
span2 = req2.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("child2-trace", span2.name)
@@ -158,7 +168,7 @@ class TestTracing(ZuulTestCase):
attrs = attributes_to_dict(span2.links[0].attributes)
self.assertEqual({"relationship": "prev"}, attrs)
- req3 = self.otlp.requests[2]
+ req3 = test_requests[2]
self.log.debug("Received:\n%s", req3)
span3 = req3.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("child3-trace", span3.name)
@@ -181,6 +191,8 @@ class TestTracing(ZuulTestCase):
self.log.debug("Received:\n%s", buildset)
item = self.getSpan('QueueItem')
self.log.debug("Received:\n%s", item)
+ merge_job = self.getSpan('Merge')
+ self.log.debug("Received:\n%s", merge_job)
build = self.getSpan('Build')
self.log.debug("Received:\n%s", build)
job = self.getSpan('JobExecution')
@@ -192,6 +204,10 @@ class TestTracing(ZuulTestCase):
item.start_time_unix_nano)
self.assertTrue(buildset.end_time_unix_nano <=
item.end_time_unix_nano)
+ self.assertTrue(merge_job.start_time_unix_nano >=
+ buildset.start_time_unix_nano)
+ self.assertTrue(merge_job.end_time_unix_nano <=
+ buildset.end_time_unix_nano)
item_attrs = attributes_to_dict(item.attributes)
self.assertTrue(item_attrs['ref_number'] == "1")
self.assertTrue(item_attrs['ref_patchset'] == "1")
diff --git a/zuul/merger/client.py b/zuul/merger/client.py
index 29fa39aaf..3f0f2478d 100644
--- a/zuul/merger/client.py
+++ b/zuul/merger/client.py
@@ -17,6 +17,7 @@ from uuid import uuid4
from zuul.lib.config import get_default
from zuul.lib.logutil import get_annotated_logger
+import zuul.lib.tracing as tracing
from zuul.model import (
FilesChangesCompletedEvent,
MergeCompletedEvent,
@@ -27,13 +28,23 @@ from zuul.model import (
from zuul.zk.event_queues import PipelineResultEventQueue
from zuul.zk.merger import MergerApi
from zuul.zk.exceptions import JobRequestNotFound
+
from kazoo.exceptions import BadVersionError, NoNodeError
+from opentelemetry import trace
+
+_JOB_TYPE_TO_SPAN_NAME = {
+ MergeRequest.MERGE: "Merge",
+ MergeRequest.CAT: "Cat",
+ MergeRequest.REF_STATE: "RefState",
+ MergeRequest.FILES_CHANGES: "FilesChanges",
+}
class MergeClient(object):
log = logging.getLogger("zuul.MergeClient")
_merger_api_class = MergerApi
+ tracer = trace.get_tracer("zuul")
def __init__(self, config, sched):
self.config = config
@@ -63,26 +74,33 @@ class MergeClient(object):
build_set_uuid = None
tenant_name = None
pipeline_name = None
+ parent_span = None
if build_set is not None:
build_set_uuid = build_set.uuid
tenant_name = build_set.item.pipeline.tenant.name
pipeline_name = build_set.item.pipeline.name
+ parent_span = tracing.restoreSpan(build_set.span_info)
+
+ with trace.use_span(parent_span):
+ job_span = self.tracer.start_span(_JOB_TYPE_TO_SPAN_NAME[job_type])
uuid = str(uuid4().hex)
log = get_annotated_logger(self.log, event)
log.debug("Submitting job %s with data %s", uuid, data)
- request = MergeRequest(
- uuid=uuid,
- job_type=job_type,
- build_set_uuid=build_set_uuid,
- tenant_name=tenant_name,
- pipeline_name=pipeline_name,
- event_id=event.zuul_event_id if event else None,
- precedence=precedence
- )
+ with trace.use_span(job_span):
+ request = MergeRequest(
+ uuid=uuid,
+ job_type=job_type,
+ build_set_uuid=build_set_uuid,
+ tenant_name=tenant_name,
+ pipeline_name=pipeline_name,
+ event_id=event.zuul_event_id if event else None,
+ precedence=precedence,
+ span_info=tracing.getSpanInfo(job_span),
+ )
return self.merger_api.submit(request, data,
needs_result=needs_result)
@@ -159,9 +177,11 @@ class MergeClient(object):
"via result event for %s", merge_request)
if merge_request.job_type == MergeRequest.FILES_CHANGES:
event = FilesChangesCompletedEvent(
+ merge_request.uuid,
merge_request.build_set_uuid,
files=None,
elapsed_time=None,
+ span_info=merge_request.span_info,
)
else:
event = MergeCompletedEvent(
@@ -175,6 +195,7 @@ class MergeClient(object):
item_in_branches=None,
errors=None,
elapsed_time=None,
+ span_info=merge_request.span_info,
)
try:
self.result_events[merge_request.tenant_name][
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index fe5b938a1..fec3df5e0 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -28,6 +28,7 @@ from kazoo.exceptions import NoNodeError
from zuul.lib import commandsocket
from zuul.lib.config import get_default
from zuul.lib.logutil import get_annotated_logger
+from zuul.lib import tracing
from zuul.merger import merger
from zuul.merger.merger import nullcontext
from zuul.model import (
@@ -94,6 +95,7 @@ class BaseMergeServer(metaclass=ABCMeta):
self.config = config
+ self.tracing = tracing.Tracing(self.config)
self.zk_client = ZooKeeperClient.fromConfig(self.config)
self.zk_client.connect()
@@ -180,6 +182,7 @@ class BaseMergeServer(metaclass=ABCMeta):
self._merger_running = False
self.merger_loop_wake_event.set()
self.zk_client.disconnect()
+ self.tracing.stop()
def join(self):
self.merger_loop_wake_event.set()
@@ -204,7 +207,11 @@ class BaseMergeServer(metaclass=ABCMeta):
for merge_request in self.merger_api.next():
if not self._merger_running:
break
- self._runMergeJob(merge_request)
+
+ with tracing.startSpanInContext(
+ merge_request.span_context, "MergerJob",
+ attributes={"merger": self.hostname}):
+ self._runMergeJob(merge_request)
except Exception:
self.log.exception("Error in merge thread:")
time.sleep(5)
@@ -411,9 +418,11 @@ class BaseMergeServer(metaclass=ABCMeta):
)
if merge_request.job_type == MergeRequest.FILES_CHANGES:
event = FilesChangesCompletedEvent(
+ merge_request.uuid,
merge_request.build_set_uuid,
files,
elapsed_time,
+ merge_request.span_info,
)
else:
event = MergeCompletedEvent(
@@ -427,6 +436,7 @@ class BaseMergeServer(metaclass=ABCMeta):
item_in_branches,
errors,
elapsed_time,
+ merge_request.span_info,
)
def put_complete_event(log, merge_request, event):
diff --git a/zuul/model.py b/zuul/model.py
index 66bd3ded4..5abb7fc5a 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -3473,13 +3473,14 @@ class MergeRequest(JobRequest):
def __init__(self, uuid, job_type, build_set_uuid, tenant_name,
pipeline_name, event_id, precedence=None, state=None,
- result_path=None, span_context=None):
+ result_path=None, span_context=None, span_info=None):
super().__init__(uuid, precedence, state, result_path, span_context)
self.job_type = job_type
self.build_set_uuid = build_set_uuid
self.tenant_name = tenant_name
self.pipeline_name = pipeline_name
self.event_id = event_id
+ self.span_info = span_info
def toDict(self):
d = super().toDict()
@@ -3489,6 +3490,7 @@ class MergeRequest(JobRequest):
"tenant_name": self.tenant_name,
"pipeline_name": self.pipeline_name,
"event_id": self.event_id,
+ "span_info": self.span_info,
})
return d
@@ -3505,6 +3507,7 @@ class MergeRequest(JobRequest):
state=data["state"],
result_path=data["result_path"],
span_context=data.get("span_context"),
+ span_info=data.get("span_info"),
)
def __repr__(self):
@@ -6369,7 +6372,7 @@ class MergeCompletedEvent(ResultEvent):
def __init__(self, request_uuid, build_set_uuid, merged, updated,
commit, files, repo_state, item_in_branches,
- errors, elapsed_time):
+ errors, elapsed_time, span_info=None):
self.request_uuid = request_uuid
self.build_set_uuid = build_set_uuid
self.merged = merged
@@ -6380,6 +6383,7 @@ class MergeCompletedEvent(ResultEvent):
self.item_in_branches = item_in_branches or []
self.errors = errors or []
self.elapsed_time = elapsed_time
+ self.span_info = span_info
def __repr__(self):
return ('<MergeCompletedEvent job: %s buildset: %s merged: %s '
@@ -6400,6 +6404,7 @@ class MergeCompletedEvent(ResultEvent):
"item_in_branches": list(self.item_in_branches),
"errors": list(self.errors),
"elapsed_time": self.elapsed_time,
+ "span_info": self.span_info,
}
@classmethod
@@ -6415,6 +6420,7 @@ class MergeCompletedEvent(ResultEvent):
list(data.get("item_in_branches", [])),
list(data.get("errors", [])),
data.get("elapsed_time"),
+ data.get("span_info"),
)
@@ -6426,24 +6432,31 @@ class FilesChangesCompletedEvent(ResultEvent):
:arg float elapsed_time: Elapsed time of merge op in seconds.
"""
- def __init__(self, build_set_uuid, files, elapsed_time):
+ def __init__(self, request_uuid, build_set_uuid, files, elapsed_time,
+ span_info=None):
+ self.request_uuid = request_uuid
self.build_set_uuid = build_set_uuid
self.files = files or []
self.elapsed_time = elapsed_time
+ self.span_info = span_info
def toDict(self):
return {
+ "request_uuid": self.request_uuid,
"build_set_uuid": self.build_set_uuid,
"files": list(self.files),
"elapsed_time": self.elapsed_time,
+ "span_info": self.span_info,
}
@classmethod
def fromDict(cls, data):
return cls(
+ data.get("request_uuid"),
data.get("build_set_uuid"),
list(data.get("files", [])),
data.get("elapsed_time"),
+ data.get("span_info"),
)
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 79b73b2a0..9548cb6d2 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -2754,12 +2754,30 @@ class Scheduler(threading.Thread):
build_set = self._getBuildSetFromPipeline(event, pipeline)
if not build_set:
return
+
+ tracing.endSavedSpan(
+ event.span_info,
+ attributes={
+ "uuid": event.request_uuid,
+ "buildset_uuid": build_set.uuid,
+ "zuul_event_id": build_set.item.event.zuul_event_id,
+ }
+ )
pipeline.manager.onMergeCompleted(event, build_set)
def _doFilesChangesCompletedEvent(self, event, pipeline):
build_set = self._getBuildSetFromPipeline(event, pipeline)
if not build_set:
return
+
+ tracing.endSavedSpan(
+ event.span_info,
+ attributes={
+ "uuid": event.request_uuid,
+ "buildset_uuid": build_set.uuid,
+ "zuul_event_id": build_set.item.event.zuul_event_id,
+ }
+ )
pipeline.manager.onFilesChangesCompleted(event, build_set)
def _doNodesProvisionedEvent(self, event, pipeline):