summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon Westphahl <simon.westphahl@bmw.de>2023-02-21 10:39:03 +0100
committerSimon Westphahl <simon.westphahl@bmw.de>2023-02-22 15:07:33 +0100
commit8ca99e015489f51690af88187f19bd8cf35b06b7 (patch)
treecaa00c2b06bf55d16e13e74aa326da515f567f09
parentd57942a41b9077152b29e0f40f30e904a62a8fbc (diff)
downloadzuul-8ca99e015489f51690af88187f19bd8cf35b06b7.tar.gz
Only store trigger event info on queue item
The event that's currently stored as part of the queue item is not sharded. This means that we can see Zookeeper disconnects when the queue item data exceeds the max. Znode size of 1MB. Since we only need the event's timestamp and the Zuul event-id after an item is enqueued, we can reduce the amount of data we store in Zookeeper and also avoid sharding the event. Change-Id: I13577498e55fd4bb189679836219dea4dc5729fc
-rw-r--r--doc/source/developer/model-changelog.rst7
-rw-r--r--tests/unit/test_model_upgrade.py27
-rw-r--r--zuul/manager/__init__.py15
-rw-r--r--zuul/model.py68
-rw-r--r--zuul/model_api.py2
5 files changed, 101 insertions, 18 deletions
diff --git a/doc/source/developer/model-changelog.rst b/doc/source/developer/model-changelog.rst
index b80979362..f78b3f0a0 100644
--- a/doc/source/developer/model-changelog.rst
+++ b/doc/source/developer/model-changelog.rst
@@ -112,3 +112,10 @@ Version 12
:Prior Zuul version: 8.0.1
:Description: Adds job_versions and build_versions to BuildSet.
Affects schedulers.
+
+Version 13
+----------
+:Prior Zuul version: 8.2.0
+:Description: Stores only the necessary event info as part of a queue item
+ instead of the full trigger event.
+ Affects schedulers.
diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py
index a5a49bed4..c6cdee7ea 100644
--- a/tests/unit/test_model_upgrade.py
+++ b/tests/unit/test_model_upgrade.py
@@ -293,6 +293,33 @@ class TestModelUpgrade(ZuulTestCase):
result='SUCCESS', changes='1,1'),
], ordered=False)
+ @model_version(12)
+ def test_model_12_13(self):
+ # Initially queue items will still have the full trigger event
+ # stored in Zookeeper. The trigger event will be converted to
+ # an event info object after the model API update.
+ self.executor_server.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 1)
+
+ # Upgrade our component
+ self.model_test_component_info.model_api = 13
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertHistory([
+ dict(name='project-merge', result='SUCCESS', changes='1,1'),
+ dict(name='project-test1', result='SUCCESS', changes='1,1'),
+ dict(name='project-test2', result='SUCCESS', changes='1,1'),
+ dict(name='project1-project2-integration',
+ result='SUCCESS', changes='1,1'),
+ ], ordered=False)
+
class TestGithubModelUpgrade(ZuulTestCase):
config_file = 'zuul-github-driver.conf'
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 36361df11..22543b5cd 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -625,7 +625,7 @@ class PipelineManager(metaclass=ABCMeta):
if enqueue_time:
item.enqueue_time = enqueue_time
item.live = live
- self.reportStats(item, added=True)
+ self.reportStats(item, trigger_event=event)
item.quiet = quiet
if item.live:
@@ -2197,7 +2197,7 @@ class PipelineManager(metaclass=ABCMeta):
log.error("Reporting item %s received: %s", item, ret)
return action, (not ret)
- def reportStats(self, item, added=False):
+ def reportStats(self, item, trigger_event=None):
if not self.sched.statsd:
return
try:
@@ -2236,18 +2236,21 @@ class PipelineManager(metaclass=ABCMeta):
if dt:
self.sched.statsd.timing(key + '.resident_time', dt)
self.sched.statsd.incr(key + '.total_changes')
- if added and hasattr(item.event, 'arrived_at_scheduler_timestamp'):
+ if (
+ trigger_event
+ and hasattr(trigger_event, 'arrived_at_scheduler_timestamp')
+ ):
now = time.time()
- arrived = item.event.arrived_at_scheduler_timestamp
+ arrived = trigger_event.arrived_at_scheduler_timestamp
processing = (now - arrived) * 1000
- elapsed = (now - item.event.timestamp) * 1000
+ elapsed = (now - trigger_event.timestamp) * 1000
self.sched.statsd.timing(
basekey + '.event_enqueue_processing_time',
processing)
self.sched.statsd.timing(
basekey + '.event_enqueue_time', elapsed)
self.reportPipelineTiming('event_enqueue_time',
- item.event.timestamp)
+ trigger_event.timestamp)
except Exception:
self.log.exception("Exception reporting pipeline stats")
diff --git a/zuul/model.py b/zuul/model.py
index 1d82b5f2c..e526b749c 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -4666,6 +4666,37 @@ class BuildSet(zkobject.ZKObject):
return Attributes(uuid=self.uuid)
+class EventInfo:
+
+ def __init__(self):
+ self.zuul_event_id = None
+ self.timestamp = time.time()
+ self.span_context = None
+
+ @classmethod
+ def fromEvent(cls, event):
+ tinfo = cls()
+ tinfo.zuul_event_id = event.zuul_event_id
+ tinfo.timestamp = event.timestamp
+ tinfo.span_context = event.span_context
+ return tinfo
+
+ @classmethod
+ def fromDict(cls, d):
+ tinfo = cls()
+ tinfo.zuul_event_id = d["zuul_event_id"]
+ tinfo.timestamp = d["timestamp"]
+ tinfo.span_context = d["span_context"]
+ return tinfo
+
+ def toDict(self):
+ return {
+ "zuul_event_id": self.zuul_event_id,
+ "timestamp": self.timestamp,
+ "span_context": self.span_context,
+ }
+
+
class QueueItem(zkobject.ZKObject):
"""Represents the position of a Change in a ChangeQueue.
@@ -4700,7 +4731,7 @@ class QueueItem(zkobject.ZKObject):
live=True, # Whether an item is intended to be processed at all
layout_uuid=None,
_cached_sql_results={},
- event=None, # The trigger event that lead to this queue item
+ event=None, # Info about the event that lead to this queue item
# Additional container for connection specifig information to be
# used by reporters throughout the lifecycle
@@ -4722,6 +4753,9 @@ class QueueItem(zkobject.ZKObject):
def new(klass, context, **kw):
obj = klass()
obj._set(**kw)
+ if COMPONENT_REGISTRY.model_api >= 13:
+ obj._set(event=obj.event and EventInfo.fromEvent(obj.event))
+
data = obj._trySerialize(context)
obj._save(context, data, create=True)
files_state = (BuildSet.COMPLETE if obj.change.files is not None
@@ -4750,10 +4784,18 @@ class QueueItem(zkobject.ZKObject):
return (tenant, pipeline, uuid)
def serialize(self, context):
- if isinstance(self.event, TriggerEvent):
- event_type = "TriggerEvent"
+ if COMPONENT_REGISTRY.model_api < 13:
+ if isinstance(self.event, TriggerEvent):
+ event_type = "TriggerEvent"
+ else:
+ event_type = self.event.__class__.__name__
else:
- event_type = self.event.__class__.__name__
+ event_type = "EventInfo"
+ if not isinstance(self.event, EventInfo):
+ # Convert our local trigger event to a trigger info
+ # object. This will only happen on the transition to
+ # model API version 13.
+ self._set(event=EventInfo.fromEvent(self.event))
data = {
"uuid": self.uuid,
@@ -4795,14 +4837,18 @@ class QueueItem(zkobject.ZKObject):
# child objects.
self._set(uuid=data["uuid"])
- event_type = data["event"]["type"]
- if event_type == "TriggerEvent":
- event_class = (
- self.pipeline.manager.sched.connections.getTriggerEventClass(
- data["event"]["data"]["driver_name"])
- )
+ if COMPONENT_REGISTRY.model_api < 13:
+ event_type = data["event"]["type"]
+ if event_type == "TriggerEvent":
+ event_class = (
+ self.pipeline.manager.sched.connections
+ .getTriggerEventClass(
+ data["event"]["data"]["driver_name"])
+ )
+ else:
+ event_class = EventTypeIndex.event_type_mapping.get(event_type)
else:
- event_class = EventTypeIndex.event_type_mapping.get(event_type)
+ event_class = EventInfo
if event_class is None:
raise NotImplementedError(
diff --git a/zuul/model_api.py b/zuul/model_api.py
index ccb12077d..0244296dd 100644
--- a/zuul/model_api.py
+++ b/zuul/model_api.py
@@ -14,4 +14,4 @@
# When making ZK schema changes, increment this and add a record to
# doc/source/developer/model-changelog.rst
-MODEL_API = 12
+MODEL_API = 13