diff options
-rw-r--r-- | tests/unit/test_tracing.py | 42 | ||||
-rw-r--r-- | zuul/merger/client.py | 39 | ||||
-rw-r--r-- | zuul/merger/server.py | 12 | ||||
-rw-r--r-- | zuul/model.py | 19 | ||||
-rw-r--r-- | zuul/scheduler.py | 18 |
5 files changed, 104 insertions, 26 deletions
diff --git a/tests/unit/test_tracing.py b/tests/unit/test_tracing.py index d26a9510d..00ef85e36 100644 --- a/tests/unit/test_tracing.py +++ b/tests/unit/test_tracing.py @@ -31,6 +31,16 @@ class TestTracing(ZuulTestCase): config_file = 'zuul-tracing.conf' tenant_config_file = "config/single-tenant/main.yaml" + def _waitForSpans(self, *span_names, timeout=60,): + for _ in iterate_timeout(timeout, "requests to arrive"): + test_requests = [ + r for r in self.otlp.requests + if r.resource_spans[0].scope_spans[0].spans[0].name + in span_names + ] + if len(test_requests) == len(span_names): + return test_requests + def test_tracing_api(self): tracer = trace.get_tracer("zuul") @@ -68,10 +78,10 @@ class TestTracing(ZuulTestCase): tracing.endSavedSpan(span_info, end_time=time.time(), attributes={'endattr': 'baz'}) - for _ in iterate_timeout(60, "request to arrive"): - if len(self.otlp.requests) == 4: - break - req1 = self.otlp.requests[0] + test_requests = self._waitForSpans( + "parent-trace", "child1-trace", "child2-trace", "child3-trace") + + req1 = test_requests[0] self.log.debug("Received:\n%s", req1) attrs = attributes_to_dict(req1.resource_spans[0].resource.attributes) self.assertEqual({"service.name": "zuultest"}, attrs) @@ -80,7 +90,7 @@ class TestTracing(ZuulTestCase): span1 = req1.resource_spans[0].scope_spans[0].spans[0] self.assertEqual("child1-trace", span1.name) - req2 = self.otlp.requests[1] + req2 = test_requests[1] self.log.debug("Received:\n%s", req2) span2 = req2.resource_spans[0].scope_spans[0].spans[0] self.assertEqual("child2-trace", span2.name) @@ -88,12 +98,12 @@ class TestTracing(ZuulTestCase): attrs = attributes_to_dict(span2.links[0].attributes) self.assertEqual({"relationship": "prev"}, attrs) - req3 = self.otlp.requests[2] + req3 = test_requests[2] self.log.debug("Received:\n%s", req3) span3 = req3.resource_spans[0].scope_spans[0].spans[0] self.assertEqual("child3-trace", span3.name) - req4 = self.otlp.requests[3] + req4 = test_requests[3] self.log.debug("Received:\n%s", req4) span4 = req4.resource_spans[0].scope_spans[0].spans[0] self.assertEqual("parent-trace", span4.name) @@ -138,10 +148,10 @@ class TestTracing(ZuulTestCase): # End our root span manually. span.end(end_time=time.time()) - for _ in iterate_timeout(60, "request to arrive"): - if len(self.otlp.requests) == 3: - break - req1 = self.otlp.requests[0] + test_requests = self._waitForSpans( + "child1-trace", "child2-trace", "child3-trace") + + req1 = test_requests[0] self.log.debug("Received:\n%s", req1) attrs = attributes_to_dict(req1.resource_spans[0].resource.attributes) self.assertEqual({"service.name": "zuultest"}, attrs) @@ -150,7 +160,7 @@ class TestTracing(ZuulTestCase): span1 = req1.resource_spans[0].scope_spans[0].spans[0] self.assertEqual("child1-trace", span1.name) - req2 = self.otlp.requests[1] + req2 = test_requests[1] self.log.debug("Received:\n%s", req2) span2 = req2.resource_spans[0].scope_spans[0].spans[0] self.assertEqual("child2-trace", span2.name) @@ -158,7 +168,7 @@ class TestTracing(ZuulTestCase): attrs = attributes_to_dict(span2.links[0].attributes) self.assertEqual({"relationship": "prev"}, attrs) - req3 = self.otlp.requests[2] + req3 = test_requests[2] self.log.debug("Received:\n%s", req3) span3 = req3.resource_spans[0].scope_spans[0].spans[0] self.assertEqual("child3-trace", span3.name) @@ -181,6 +191,8 @@ class TestTracing(ZuulTestCase): self.log.debug("Received:\n%s", buildset) item = self.getSpan('QueueItem') self.log.debug("Received:\n%s", item) + merge_job = self.getSpan('Merge') + self.log.debug("Received:\n%s", merge_job) build = self.getSpan('Build') self.log.debug("Received:\n%s", build) job = self.getSpan('JobExecution') @@ -192,6 +204,10 @@ class TestTracing(ZuulTestCase): item.start_time_unix_nano) self.assertTrue(buildset.end_time_unix_nano <= item.end_time_unix_nano) + self.assertTrue(merge_job.start_time_unix_nano >= + buildset.start_time_unix_nano) + self.assertTrue(merge_job.end_time_unix_nano <= + buildset.end_time_unix_nano) item_attrs = attributes_to_dict(item.attributes) self.assertTrue(item_attrs['ref_number'] == "1") self.assertTrue(item_attrs['ref_patchset'] == "1") diff --git a/zuul/merger/client.py b/zuul/merger/client.py index 29fa39aaf..3f0f2478d 100644 --- a/zuul/merger/client.py +++ b/zuul/merger/client.py @@ -17,6 +17,7 @@ from uuid import uuid4 from zuul.lib.config import get_default from zuul.lib.logutil import get_annotated_logger +import zuul.lib.tracing as tracing from zuul.model import ( FilesChangesCompletedEvent, MergeCompletedEvent, @@ -27,13 +28,23 @@ from zuul.model import ( from zuul.zk.event_queues import PipelineResultEventQueue from zuul.zk.merger import MergerApi from zuul.zk.exceptions import JobRequestNotFound + from kazoo.exceptions import BadVersionError, NoNodeError +from opentelemetry import trace + +_JOB_TYPE_TO_SPAN_NAME = { + MergeRequest.MERGE: "Merge", + MergeRequest.CAT: "Cat", + MergeRequest.REF_STATE: "RefState", + MergeRequest.FILES_CHANGES: "FilesChanges", +} class MergeClient(object): log = logging.getLogger("zuul.MergeClient") _merger_api_class = MergerApi + tracer = trace.get_tracer("zuul") def __init__(self, config, sched): self.config = config @@ -63,26 +74,33 @@ class MergeClient(object): build_set_uuid = None tenant_name = None pipeline_name = None + parent_span = None if build_set is not None: build_set_uuid = build_set.uuid tenant_name = build_set.item.pipeline.tenant.name pipeline_name = build_set.item.pipeline.name + parent_span = tracing.restoreSpan(build_set.span_info) + + with trace.use_span(parent_span): + job_span = self.tracer.start_span(_JOB_TYPE_TO_SPAN_NAME[job_type]) uuid = str(uuid4().hex) log = get_annotated_logger(self.log, event) log.debug("Submitting job %s with data %s", uuid, data) - request = MergeRequest( - uuid=uuid, - job_type=job_type, - build_set_uuid=build_set_uuid, - tenant_name=tenant_name, - pipeline_name=pipeline_name, - event_id=event.zuul_event_id if event else None, - precedence=precedence - ) + with trace.use_span(job_span): + request = MergeRequest( + uuid=uuid, + job_type=job_type, + build_set_uuid=build_set_uuid, + tenant_name=tenant_name, + pipeline_name=pipeline_name, + event_id=event.zuul_event_id if event else None, + precedence=precedence, + span_info=tracing.getSpanInfo(job_span), + ) return self.merger_api.submit(request, data, needs_result=needs_result) @@ -159,9 +177,11 @@ class MergeClient(object): "via result event for %s", merge_request) if merge_request.job_type == MergeRequest.FILES_CHANGES: event = FilesChangesCompletedEvent( + merge_request.uuid, merge_request.build_set_uuid, files=None, elapsed_time=None, + span_info=merge_request.span_info, ) else: event = MergeCompletedEvent( @@ -175,6 +195,7 @@ class MergeClient(object): item_in_branches=None, errors=None, elapsed_time=None, + span_info=merge_request.span_info, ) try: self.result_events[merge_request.tenant_name][ diff --git a/zuul/merger/server.py b/zuul/merger/server.py index fe5b938a1..fec3df5e0 100644 --- a/zuul/merger/server.py +++ b/zuul/merger/server.py @@ -28,6 +28,7 @@ from kazoo.exceptions import NoNodeError from zuul.lib import commandsocket from zuul.lib.config import get_default from zuul.lib.logutil import get_annotated_logger +from zuul.lib import tracing from zuul.merger import merger from zuul.merger.merger import nullcontext from zuul.model import ( @@ -94,6 +95,7 @@ class BaseMergeServer(metaclass=ABCMeta): self.config = config + self.tracing = tracing.Tracing(self.config) self.zk_client = ZooKeeperClient.fromConfig(self.config) self.zk_client.connect() @@ -180,6 +182,7 @@ class BaseMergeServer(metaclass=ABCMeta): self._merger_running = False self.merger_loop_wake_event.set() self.zk_client.disconnect() + self.tracing.stop() def join(self): self.merger_loop_wake_event.set() @@ -204,7 +207,11 @@ class BaseMergeServer(metaclass=ABCMeta): for merge_request in self.merger_api.next(): if not self._merger_running: break - self._runMergeJob(merge_request) + + with tracing.startSpanInContext( + merge_request.span_context, "MergerJob", + attributes={"merger": self.hostname}): + self._runMergeJob(merge_request) except Exception: self.log.exception("Error in merge thread:") time.sleep(5) @@ -411,9 +418,11 @@ class BaseMergeServer(metaclass=ABCMeta): ) if merge_request.job_type == MergeRequest.FILES_CHANGES: event = FilesChangesCompletedEvent( + merge_request.uuid, merge_request.build_set_uuid, files, elapsed_time, + merge_request.span_info, ) else: event = MergeCompletedEvent( @@ -427,6 +436,7 @@ class BaseMergeServer(metaclass=ABCMeta): item_in_branches, errors, elapsed_time, + merge_request.span_info, ) def put_complete_event(log, merge_request, event): diff --git a/zuul/model.py b/zuul/model.py index c87037688..e50594a2d 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -3480,13 +3480,14 @@ class MergeRequest(JobRequest): def __init__(self, uuid, job_type, build_set_uuid, tenant_name, pipeline_name, event_id, precedence=None, state=None, - result_path=None, span_context=None): + result_path=None, span_context=None, span_info=None): super().__init__(uuid, precedence, state, result_path, span_context) self.job_type = job_type self.build_set_uuid = build_set_uuid self.tenant_name = tenant_name self.pipeline_name = pipeline_name self.event_id = event_id + self.span_info = span_info def toDict(self): d = super().toDict() @@ -3496,6 +3497,7 @@ class MergeRequest(JobRequest): "tenant_name": self.tenant_name, "pipeline_name": self.pipeline_name, "event_id": self.event_id, + "span_info": self.span_info, }) return d @@ -3512,6 +3514,7 @@ class MergeRequest(JobRequest): state=data["state"], result_path=data["result_path"], span_context=data.get("span_context"), + span_info=data.get("span_info"), ) def __repr__(self): @@ -6390,7 +6393,7 @@ class MergeCompletedEvent(ResultEvent): def __init__(self, request_uuid, build_set_uuid, merged, updated, commit, files, repo_state, item_in_branches, - errors, elapsed_time): + errors, elapsed_time, span_info=None): self.request_uuid = request_uuid self.build_set_uuid = build_set_uuid self.merged = merged @@ -6401,6 +6404,7 @@ class MergeCompletedEvent(ResultEvent): self.item_in_branches = item_in_branches or [] self.errors = errors or [] self.elapsed_time = elapsed_time + self.span_info = span_info def __repr__(self): return ('<MergeCompletedEvent job: %s buildset: %s merged: %s ' @@ -6421,6 +6425,7 @@ class MergeCompletedEvent(ResultEvent): "item_in_branches": list(self.item_in_branches), "errors": list(self.errors), "elapsed_time": self.elapsed_time, + "span_info": self.span_info, } @classmethod @@ -6436,6 +6441,7 @@ class MergeCompletedEvent(ResultEvent): list(data.get("item_in_branches", [])), list(data.get("errors", [])), data.get("elapsed_time"), + data.get("span_info"), ) @@ -6447,24 +6453,31 @@ class FilesChangesCompletedEvent(ResultEvent): :arg float elapsed_time: Elapsed time of merge op in seconds. """ - def __init__(self, build_set_uuid, files, elapsed_time): + def __init__(self, request_uuid, build_set_uuid, files, elapsed_time, + span_info=None): + self.request_uuid = request_uuid self.build_set_uuid = build_set_uuid self.files = files or [] self.elapsed_time = elapsed_time + self.span_info = span_info def toDict(self): return { + "request_uuid": self.request_uuid, "build_set_uuid": self.build_set_uuid, "files": list(self.files), "elapsed_time": self.elapsed_time, + "span_info": self.span_info, } @classmethod def fromDict(cls, data): return cls( + data.get("request_uuid"), data.get("build_set_uuid"), list(data.get("files", [])), data.get("elapsed_time"), + data.get("span_info"), ) diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 79b73b2a0..9548cb6d2 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -2754,12 +2754,30 @@ class Scheduler(threading.Thread): build_set = self._getBuildSetFromPipeline(event, pipeline) if not build_set: return + + tracing.endSavedSpan( + event.span_info, + attributes={ + "uuid": event.request_uuid, + "buildset_uuid": build_set.uuid, + "zuul_event_id": build_set.item.event.zuul_event_id, + } + ) pipeline.manager.onMergeCompleted(event, build_set) def _doFilesChangesCompletedEvent(self, event, pipeline): build_set = self._getBuildSetFromPipeline(event, pipeline) if not build_set: return + + tracing.endSavedSpan( + event.span_info, + attributes={ + "uuid": event.request_uuid, + "buildset_uuid": build_set.uuid, + "zuul_event_id": build_set.item.event.zuul_event_id, + } + ) pipeline.manager.onFilesChangesCompleted(event, build_set) def _doNodesProvisionedEvent(self, event, pipeline): |