summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2023-02-27 15:18:23 +0000
committerGerrit Code Review <review@openstack.org>2023-02-27 15:18:23 +0000
commit7254a75cd8ec9a34d9eb10e51a4b93de2e4defd8 (patch)
treedaed83ec463dac18254aaa29c8992c2caa768d35
parent48ad958bb4e37e165b422daaaefb59d0ab708306 (diff)
parent8ca99e015489f51690af88187f19bd8cf35b06b7 (diff)
downloadzuul-7254a75cd8ec9a34d9eb10e51a4b93de2e4defd8.tar.gz
Merge "Only store trigger event info on queue item"
-rw-r--r--doc/source/developer/model-changelog.rst7
-rw-r--r--tests/unit/test_model_upgrade.py27
-rw-r--r--zuul/manager/__init__.py15
-rw-r--r--zuul/model.py68
-rw-r--r--zuul/model_api.py2
5 files changed, 101 insertions, 18 deletions
diff --git a/doc/source/developer/model-changelog.rst b/doc/source/developer/model-changelog.rst
index b80979362..f78b3f0a0 100644
--- a/doc/source/developer/model-changelog.rst
+++ b/doc/source/developer/model-changelog.rst
@@ -112,3 +112,10 @@ Version 12
:Prior Zuul version: 8.0.1
:Description: Adds job_versions and build_versions to BuildSet.
Affects schedulers.
+
+Version 13
+----------
+:Prior Zuul version: 8.2.0
+:Description: Stores only the necessary event info as part of a queue item
+ instead of the full trigger event.
+ Affects schedulers.
diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py
index a5a49bed4..c6cdee7ea 100644
--- a/tests/unit/test_model_upgrade.py
+++ b/tests/unit/test_model_upgrade.py
@@ -293,6 +293,33 @@ class TestModelUpgrade(ZuulTestCase):
result='SUCCESS', changes='1,1'),
], ordered=False)
+ @model_version(12)
+ def test_model_12_13(self):
+ # Initially queue items will still have the full trigger event
+ # stored in Zookeeper. The trigger event will be converted to
+ # an event info object after the model API update.
+ self.executor_server.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 1)
+
+ # Upgrade our component
+ self.model_test_component_info.model_api = 13
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertHistory([
+ dict(name='project-merge', result='SUCCESS', changes='1,1'),
+ dict(name='project-test1', result='SUCCESS', changes='1,1'),
+ dict(name='project-test2', result='SUCCESS', changes='1,1'),
+ dict(name='project1-project2-integration',
+ result='SUCCESS', changes='1,1'),
+ ], ordered=False)
+
class TestGithubModelUpgrade(ZuulTestCase):
config_file = 'zuul-github-driver.conf'
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 36361df11..22543b5cd 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -625,7 +625,7 @@ class PipelineManager(metaclass=ABCMeta):
if enqueue_time:
item.enqueue_time = enqueue_time
item.live = live
- self.reportStats(item, added=True)
+ self.reportStats(item, trigger_event=event)
item.quiet = quiet
if item.live:
@@ -2197,7 +2197,7 @@ class PipelineManager(metaclass=ABCMeta):
log.error("Reporting item %s received: %s", item, ret)
return action, (not ret)
- def reportStats(self, item, added=False):
+ def reportStats(self, item, trigger_event=None):
if not self.sched.statsd:
return
try:
@@ -2236,18 +2236,21 @@ class PipelineManager(metaclass=ABCMeta):
if dt:
self.sched.statsd.timing(key + '.resident_time', dt)
self.sched.statsd.incr(key + '.total_changes')
- if added and hasattr(item.event, 'arrived_at_scheduler_timestamp'):
+ if (
+ trigger_event
+ and hasattr(trigger_event, 'arrived_at_scheduler_timestamp')
+ ):
now = time.time()
- arrived = item.event.arrived_at_scheduler_timestamp
+ arrived = trigger_event.arrived_at_scheduler_timestamp
processing = (now - arrived) * 1000
- elapsed = (now - item.event.timestamp) * 1000
+ elapsed = (now - trigger_event.timestamp) * 1000
self.sched.statsd.timing(
basekey + '.event_enqueue_processing_time',
processing)
self.sched.statsd.timing(
basekey + '.event_enqueue_time', elapsed)
self.reportPipelineTiming('event_enqueue_time',
- item.event.timestamp)
+ trigger_event.timestamp)
except Exception:
self.log.exception("Exception reporting pipeline stats")
diff --git a/zuul/model.py b/zuul/model.py
index 1d82b5f2c..e526b749c 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -4666,6 +4666,37 @@ class BuildSet(zkobject.ZKObject):
return Attributes(uuid=self.uuid)
+class EventInfo:
+
+ def __init__(self):
+ self.zuul_event_id = None
+ self.timestamp = time.time()
+ self.span_context = None
+
+ @classmethod
+ def fromEvent(cls, event):
+ tinfo = cls()
+ tinfo.zuul_event_id = event.zuul_event_id
+ tinfo.timestamp = event.timestamp
+ tinfo.span_context = event.span_context
+ return tinfo
+
+ @classmethod
+ def fromDict(cls, d):
+ tinfo = cls()
+ tinfo.zuul_event_id = d["zuul_event_id"]
+ tinfo.timestamp = d["timestamp"]
+ tinfo.span_context = d["span_context"]
+ return tinfo
+
+ def toDict(self):
+ return {
+ "zuul_event_id": self.zuul_event_id,
+ "timestamp": self.timestamp,
+ "span_context": self.span_context,
+ }
+
+
class QueueItem(zkobject.ZKObject):
"""Represents the position of a Change in a ChangeQueue.
@@ -4700,7 +4731,7 @@ class QueueItem(zkobject.ZKObject):
live=True, # Whether an item is intended to be processed at all
layout_uuid=None,
_cached_sql_results={},
- event=None, # The trigger event that lead to this queue item
+ event=None, # Info about the event that lead to this queue item
# Additional container for connection specifig information to be
# used by reporters throughout the lifecycle
@@ -4722,6 +4753,9 @@ class QueueItem(zkobject.ZKObject):
def new(klass, context, **kw):
obj = klass()
obj._set(**kw)
+ if COMPONENT_REGISTRY.model_api >= 13:
+ obj._set(event=obj.event and EventInfo.fromEvent(obj.event))
+
data = obj._trySerialize(context)
obj._save(context, data, create=True)
files_state = (BuildSet.COMPLETE if obj.change.files is not None
@@ -4750,10 +4784,18 @@ class QueueItem(zkobject.ZKObject):
return (tenant, pipeline, uuid)
def serialize(self, context):
- if isinstance(self.event, TriggerEvent):
- event_type = "TriggerEvent"
+ if COMPONENT_REGISTRY.model_api < 13:
+ if isinstance(self.event, TriggerEvent):
+ event_type = "TriggerEvent"
+ else:
+ event_type = self.event.__class__.__name__
else:
- event_type = self.event.__class__.__name__
+ event_type = "EventInfo"
+ if not isinstance(self.event, EventInfo):
+ # Convert our local trigger event to a trigger info
+ # object. This will only happen on the transition to
+ # model API version 13.
+ self._set(event=EventInfo.fromEvent(self.event))
data = {
"uuid": self.uuid,
@@ -4795,14 +4837,18 @@ class QueueItem(zkobject.ZKObject):
# child objects.
self._set(uuid=data["uuid"])
- event_type = data["event"]["type"]
- if event_type == "TriggerEvent":
- event_class = (
- self.pipeline.manager.sched.connections.getTriggerEventClass(
- data["event"]["data"]["driver_name"])
- )
+ if COMPONENT_REGISTRY.model_api < 13:
+ event_type = data["event"]["type"]
+ if event_type == "TriggerEvent":
+ event_class = (
+ self.pipeline.manager.sched.connections
+ .getTriggerEventClass(
+ data["event"]["data"]["driver_name"])
+ )
+ else:
+ event_class = EventTypeIndex.event_type_mapping.get(event_type)
else:
- event_class = EventTypeIndex.event_type_mapping.get(event_type)
+ event_class = EventInfo
if event_class is None:
raise NotImplementedError(
diff --git a/zuul/model_api.py b/zuul/model_api.py
index ccb12077d..0244296dd 100644
--- a/zuul/model_api.py
+++ b/zuul/model_api.py
@@ -14,4 +14,4 @@
# When making ZK schema changes, increment this and add a record to
# doc/source/developer/model-changelog.rst
-MODEL_API = 12
+MODEL_API = 13