summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2022-09-28 08:11:35 +0000
committerGerrit Code Review <review@openstack.org>2022-09-28 08:11:35 +0000
commitc53e3ca5b634e4c93a5caca689475e91d22601c0 (patch)
treef38586a4841e1d9a73c6120e2a4fcf05fd43114e
parent013b9fbd5324459cd48e225cdd918846f91043f8 (diff)
parentf1e3d67608e6b1f8850e6b0e8e5f28cfd3c73409 (diff)
downloadzuul-c53e3ca5b634e4c93a5caca689475e91d22601c0.tar.gz
Merge "Trace merge requests and merger operations"
-rw-r--r--tests/unit/test_tracing.py42
-rw-r--r--zuul/merger/client.py39
-rw-r--r--zuul/merger/server.py12
-rw-r--r--zuul/model.py19
-rw-r--r--zuul/scheduler.py18
5 files changed, 104 insertions, 26 deletions
diff --git a/tests/unit/test_tracing.py b/tests/unit/test_tracing.py
index d26a9510d..00ef85e36 100644
--- a/tests/unit/test_tracing.py
+++ b/tests/unit/test_tracing.py
@@ -31,6 +31,16 @@ class TestTracing(ZuulTestCase):
config_file = 'zuul-tracing.conf'
tenant_config_file = "config/single-tenant/main.yaml"
+ def _waitForSpans(self, *span_names, timeout=60,):
+ for _ in iterate_timeout(timeout, "requests to arrive"):
+ test_requests = [
+ r for r in self.otlp.requests
+ if r.resource_spans[0].scope_spans[0].spans[0].name
+ in span_names
+ ]
+ if len(test_requests) == len(span_names):
+ return test_requests
+
def test_tracing_api(self):
tracer = trace.get_tracer("zuul")
@@ -68,10 +78,10 @@ class TestTracing(ZuulTestCase):
tracing.endSavedSpan(span_info, end_time=time.time(),
attributes={'endattr': 'baz'})
- for _ in iterate_timeout(60, "request to arrive"):
- if len(self.otlp.requests) == 4:
- break
- req1 = self.otlp.requests[0]
+ test_requests = self._waitForSpans(
+ "parent-trace", "child1-trace", "child2-trace", "child3-trace")
+
+ req1 = test_requests[0]
self.log.debug("Received:\n%s", req1)
attrs = attributes_to_dict(req1.resource_spans[0].resource.attributes)
self.assertEqual({"service.name": "zuultest"}, attrs)
@@ -80,7 +90,7 @@ class TestTracing(ZuulTestCase):
span1 = req1.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("child1-trace", span1.name)
- req2 = self.otlp.requests[1]
+ req2 = test_requests[1]
self.log.debug("Received:\n%s", req2)
span2 = req2.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("child2-trace", span2.name)
@@ -88,12 +98,12 @@ class TestTracing(ZuulTestCase):
attrs = attributes_to_dict(span2.links[0].attributes)
self.assertEqual({"relationship": "prev"}, attrs)
- req3 = self.otlp.requests[2]
+ req3 = test_requests[2]
self.log.debug("Received:\n%s", req3)
span3 = req3.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("child3-trace", span3.name)
- req4 = self.otlp.requests[3]
+ req4 = test_requests[3]
self.log.debug("Received:\n%s", req4)
span4 = req4.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("parent-trace", span4.name)
@@ -138,10 +148,10 @@ class TestTracing(ZuulTestCase):
# End our root span manually.
span.end(end_time=time.time())
- for _ in iterate_timeout(60, "request to arrive"):
- if len(self.otlp.requests) == 3:
- break
- req1 = self.otlp.requests[0]
+ test_requests = self._waitForSpans(
+ "child1-trace", "child2-trace", "child3-trace")
+
+ req1 = test_requests[0]
self.log.debug("Received:\n%s", req1)
attrs = attributes_to_dict(req1.resource_spans[0].resource.attributes)
self.assertEqual({"service.name": "zuultest"}, attrs)
@@ -150,7 +160,7 @@ class TestTracing(ZuulTestCase):
span1 = req1.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("child1-trace", span1.name)
- req2 = self.otlp.requests[1]
+ req2 = test_requests[1]
self.log.debug("Received:\n%s", req2)
span2 = req2.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("child2-trace", span2.name)
@@ -158,7 +168,7 @@ class TestTracing(ZuulTestCase):
attrs = attributes_to_dict(span2.links[0].attributes)
self.assertEqual({"relationship": "prev"}, attrs)
- req3 = self.otlp.requests[2]
+ req3 = test_requests[2]
self.log.debug("Received:\n%s", req3)
span3 = req3.resource_spans[0].scope_spans[0].spans[0]
self.assertEqual("child3-trace", span3.name)
@@ -181,6 +191,8 @@ class TestTracing(ZuulTestCase):
self.log.debug("Received:\n%s", buildset)
item = self.getSpan('QueueItem')
self.log.debug("Received:\n%s", item)
+ merge_job = self.getSpan('Merge')
+ self.log.debug("Received:\n%s", merge_job)
build = self.getSpan('Build')
self.log.debug("Received:\n%s", build)
job = self.getSpan('JobExecution')
@@ -192,6 +204,10 @@ class TestTracing(ZuulTestCase):
item.start_time_unix_nano)
self.assertTrue(buildset.end_time_unix_nano <=
item.end_time_unix_nano)
+ self.assertTrue(merge_job.start_time_unix_nano >=
+ buildset.start_time_unix_nano)
+ self.assertTrue(merge_job.end_time_unix_nano <=
+ buildset.end_time_unix_nano)
item_attrs = attributes_to_dict(item.attributes)
self.assertTrue(item_attrs['ref_number'] == "1")
self.assertTrue(item_attrs['ref_patchset'] == "1")
diff --git a/zuul/merger/client.py b/zuul/merger/client.py
index 29fa39aaf..3f0f2478d 100644
--- a/zuul/merger/client.py
+++ b/zuul/merger/client.py
@@ -17,6 +17,7 @@ from uuid import uuid4
from zuul.lib.config import get_default
from zuul.lib.logutil import get_annotated_logger
+import zuul.lib.tracing as tracing
from zuul.model import (
FilesChangesCompletedEvent,
MergeCompletedEvent,
@@ -27,13 +28,23 @@ from zuul.model import (
from zuul.zk.event_queues import PipelineResultEventQueue
from zuul.zk.merger import MergerApi
from zuul.zk.exceptions import JobRequestNotFound
+
from kazoo.exceptions import BadVersionError, NoNodeError
+from opentelemetry import trace
+
+_JOB_TYPE_TO_SPAN_NAME = {
+ MergeRequest.MERGE: "Merge",
+ MergeRequest.CAT: "Cat",
+ MergeRequest.REF_STATE: "RefState",
+ MergeRequest.FILES_CHANGES: "FilesChanges",
+}
class MergeClient(object):
log = logging.getLogger("zuul.MergeClient")
_merger_api_class = MergerApi
+ tracer = trace.get_tracer("zuul")
def __init__(self, config, sched):
self.config = config
@@ -63,26 +74,33 @@ class MergeClient(object):
build_set_uuid = None
tenant_name = None
pipeline_name = None
+ parent_span = None
if build_set is not None:
build_set_uuid = build_set.uuid
tenant_name = build_set.item.pipeline.tenant.name
pipeline_name = build_set.item.pipeline.name
+ parent_span = tracing.restoreSpan(build_set.span_info)
+
+ with trace.use_span(parent_span):
+ job_span = self.tracer.start_span(_JOB_TYPE_TO_SPAN_NAME[job_type])
uuid = str(uuid4().hex)
log = get_annotated_logger(self.log, event)
log.debug("Submitting job %s with data %s", uuid, data)
- request = MergeRequest(
- uuid=uuid,
- job_type=job_type,
- build_set_uuid=build_set_uuid,
- tenant_name=tenant_name,
- pipeline_name=pipeline_name,
- event_id=event.zuul_event_id if event else None,
- precedence=precedence
- )
+ with trace.use_span(job_span):
+ request = MergeRequest(
+ uuid=uuid,
+ job_type=job_type,
+ build_set_uuid=build_set_uuid,
+ tenant_name=tenant_name,
+ pipeline_name=pipeline_name,
+ event_id=event.zuul_event_id if event else None,
+ precedence=precedence,
+ span_info=tracing.getSpanInfo(job_span),
+ )
return self.merger_api.submit(request, data,
needs_result=needs_result)
@@ -159,9 +177,11 @@ class MergeClient(object):
"via result event for %s", merge_request)
if merge_request.job_type == MergeRequest.FILES_CHANGES:
event = FilesChangesCompletedEvent(
+ merge_request.uuid,
merge_request.build_set_uuid,
files=None,
elapsed_time=None,
+ span_info=merge_request.span_info,
)
else:
event = MergeCompletedEvent(
@@ -175,6 +195,7 @@ class MergeClient(object):
item_in_branches=None,
errors=None,
elapsed_time=None,
+ span_info=merge_request.span_info,
)
try:
self.result_events[merge_request.tenant_name][
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index fe5b938a1..fec3df5e0 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -28,6 +28,7 @@ from kazoo.exceptions import NoNodeError
from zuul.lib import commandsocket
from zuul.lib.config import get_default
from zuul.lib.logutil import get_annotated_logger
+from zuul.lib import tracing
from zuul.merger import merger
from zuul.merger.merger import nullcontext
from zuul.model import (
@@ -94,6 +95,7 @@ class BaseMergeServer(metaclass=ABCMeta):
self.config = config
+ self.tracing = tracing.Tracing(self.config)
self.zk_client = ZooKeeperClient.fromConfig(self.config)
self.zk_client.connect()
@@ -180,6 +182,7 @@ class BaseMergeServer(metaclass=ABCMeta):
self._merger_running = False
self.merger_loop_wake_event.set()
self.zk_client.disconnect()
+ self.tracing.stop()
def join(self):
self.merger_loop_wake_event.set()
@@ -204,7 +207,11 @@ class BaseMergeServer(metaclass=ABCMeta):
for merge_request in self.merger_api.next():
if not self._merger_running:
break
- self._runMergeJob(merge_request)
+
+ with tracing.startSpanInContext(
+ merge_request.span_context, "MergerJob",
+ attributes={"merger": self.hostname}):
+ self._runMergeJob(merge_request)
except Exception:
self.log.exception("Error in merge thread:")
time.sleep(5)
@@ -411,9 +418,11 @@ class BaseMergeServer(metaclass=ABCMeta):
)
if merge_request.job_type == MergeRequest.FILES_CHANGES:
event = FilesChangesCompletedEvent(
+ merge_request.uuid,
merge_request.build_set_uuid,
files,
elapsed_time,
+ merge_request.span_info,
)
else:
event = MergeCompletedEvent(
@@ -427,6 +436,7 @@ class BaseMergeServer(metaclass=ABCMeta):
item_in_branches,
errors,
elapsed_time,
+ merge_request.span_info,
)
def put_complete_event(log, merge_request, event):
diff --git a/zuul/model.py b/zuul/model.py
index c87037688..e50594a2d 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -3480,13 +3480,14 @@ class MergeRequest(JobRequest):
def __init__(self, uuid, job_type, build_set_uuid, tenant_name,
pipeline_name, event_id, precedence=None, state=None,
- result_path=None, span_context=None):
+ result_path=None, span_context=None, span_info=None):
super().__init__(uuid, precedence, state, result_path, span_context)
self.job_type = job_type
self.build_set_uuid = build_set_uuid
self.tenant_name = tenant_name
self.pipeline_name = pipeline_name
self.event_id = event_id
+ self.span_info = span_info
def toDict(self):
d = super().toDict()
@@ -3496,6 +3497,7 @@ class MergeRequest(JobRequest):
"tenant_name": self.tenant_name,
"pipeline_name": self.pipeline_name,
"event_id": self.event_id,
+ "span_info": self.span_info,
})
return d
@@ -3512,6 +3514,7 @@ class MergeRequest(JobRequest):
state=data["state"],
result_path=data["result_path"],
span_context=data.get("span_context"),
+ span_info=data.get("span_info"),
)
def __repr__(self):
@@ -6390,7 +6393,7 @@ class MergeCompletedEvent(ResultEvent):
def __init__(self, request_uuid, build_set_uuid, merged, updated,
commit, files, repo_state, item_in_branches,
- errors, elapsed_time):
+ errors, elapsed_time, span_info=None):
self.request_uuid = request_uuid
self.build_set_uuid = build_set_uuid
self.merged = merged
@@ -6401,6 +6404,7 @@ class MergeCompletedEvent(ResultEvent):
self.item_in_branches = item_in_branches or []
self.errors = errors or []
self.elapsed_time = elapsed_time
+ self.span_info = span_info
def __repr__(self):
return ('<MergeCompletedEvent job: %s buildset: %s merged: %s '
@@ -6421,6 +6425,7 @@ class MergeCompletedEvent(ResultEvent):
"item_in_branches": list(self.item_in_branches),
"errors": list(self.errors),
"elapsed_time": self.elapsed_time,
+ "span_info": self.span_info,
}
@classmethod
@@ -6436,6 +6441,7 @@ class MergeCompletedEvent(ResultEvent):
list(data.get("item_in_branches", [])),
list(data.get("errors", [])),
data.get("elapsed_time"),
+ data.get("span_info"),
)
@@ -6447,24 +6453,31 @@ class FilesChangesCompletedEvent(ResultEvent):
:arg float elapsed_time: Elapsed time of merge op in seconds.
"""
- def __init__(self, build_set_uuid, files, elapsed_time):
+ def __init__(self, request_uuid, build_set_uuid, files, elapsed_time,
+ span_info=None):
+ self.request_uuid = request_uuid
self.build_set_uuid = build_set_uuid
self.files = files or []
self.elapsed_time = elapsed_time
+ self.span_info = span_info
def toDict(self):
return {
+ "request_uuid": self.request_uuid,
"build_set_uuid": self.build_set_uuid,
"files": list(self.files),
"elapsed_time": self.elapsed_time,
+ "span_info": self.span_info,
}
@classmethod
def fromDict(cls, data):
return cls(
+ data.get("request_uuid"),
data.get("build_set_uuid"),
list(data.get("files", [])),
data.get("elapsed_time"),
+ data.get("span_info"),
)
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 79b73b2a0..9548cb6d2 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -2754,12 +2754,30 @@ class Scheduler(threading.Thread):
build_set = self._getBuildSetFromPipeline(event, pipeline)
if not build_set:
return
+
+ tracing.endSavedSpan(
+ event.span_info,
+ attributes={
+ "uuid": event.request_uuid,
+ "buildset_uuid": build_set.uuid,
+ "zuul_event_id": build_set.item.event.zuul_event_id,
+ }
+ )
pipeline.manager.onMergeCompleted(event, build_set)
def _doFilesChangesCompletedEvent(self, event, pipeline):
build_set = self._getBuildSetFromPipeline(event, pipeline)
if not build_set:
return
+
+ tracing.endSavedSpan(
+ event.span_info,
+ attributes={
+ "uuid": event.request_uuid,
+ "buildset_uuid": build_set.uuid,
+ "zuul_event_id": build_set.item.event.zuul_event_id,
+ }
+ )
pipeline.manager.onFilesChangesCompleted(event, build_set)
def _doNodesProvisionedEvent(self, event, pipeline):