summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: James E. Blair <jim@acmegating.com> 2022-07-02 10:29:28 -0700
committer: James E. Blair <jim@acmegating.com> 2022-08-19 10:08:57 -0700
commit: e6530d11d058e50c41872f3c2c9ac286b57ed70e (patch)
tree: 10446e5d5736ad1f2459a77570c1db4fc0fb3076
parent: 87ea63eee39b061e6cb5e11697ac7c4df8247c6d (diff)
download: zuul-e6530d11d058e50c41872f3c2c9ac286b57ed70e.tar.gz
Reduce redundant Gerrit queries
Sometimes Gerrit events may arrive in batches (for example, an automated process modifies several related changes nearly simultaneously). Because of our inbuilt delay (10 seconds by default), it's possible that in these cases, many or all of the updates represented by these events will have settled on the Gerrit server before we even start processing the first event. In these cases, we don't need to query the same changes multiple times. Take for example a stack of 10 changes. Someone approves all 10 simultaneously. That would produce (at least) 10 events for Zuul to process. Each event would cause Zuul to query all 10 changes in the series (since they are related). That's 100 change queries (and each change query requires 2 or 3 HTTP queries). But if we know that all the events arrived before our first set of change queries, we can reduce that set of 100 queries to 10 by suppressing any queries after the first. This change generates a logical timestamp (ltime) immediately before querying Gerrit for a change, and stores that ltime in the change cache. Whenever an event arrives for processing with an ltime earlier than the cached query ltime (that is, the event was enqueued before the query was made), we assume the change is already up to date with that event and do not perform another query. Change-Id: Ib1b9245cc84ab3f5a0624697f4e3fc73bc8e03bd
-rw-r--r-- tests/unit/test_gerrit.py 56
-rw-r--r-- zuul/driver/gerrit/gerritconnection.py 27
-rw-r--r-- zuul/driver/gerrit/gerritmodel.py 4
-rw-r--r-- zuul/zk/event_queues.py 3
4 files changed, 83 insertions, 7 deletions
diff --git a/tests/unit/test_gerrit.py b/tests/unit/test_gerrit.py
index f0f9027bd..0f129ae7a 100644
--- a/tests/unit/test_gerrit.py
+++ b/tests/unit/test_gerrit.py
@@ -13,6 +13,7 @@
# under the License.
import os
+import threading
import textwrap
from unittest import mock
@@ -867,3 +868,58 @@ class TestGerritFake(ZuulTestCase):
# The Gerrit connection method filters out the queried change
ret = self.fake_gerrit._getSubmittedTogether(C1, None)
self.assertEqual(ret, [(4, 1)])
+
+
+class TestGerritConnection(ZuulTestCase):
+ config_file = 'zuul-gerrit-web.conf'
+ tenant_config_file = 'config/single-tenant/main.yaml'
+
+ def test_zuul_query_ltime(self):
+ # Add a lock around the event queue iterator so that we can
+ # ensure that multiple events arrive before the first is
+ # processed.
+ lock = threading.Lock()
+
+ orig_iterEvents = self.fake_gerrit.gerrit_event_connector.\
+ event_queue._iterEvents
+
+ def _iterEvents(*args, **kw):
+ with lock:
+ return orig_iterEvents(*args, **kw)
+
+ self.patch(self.fake_gerrit.gerrit_event_connector.event_queue,
+ '_iterEvents', _iterEvents)
+
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ B.setDependsOn(A, 1)
+ # Hold the connection queue processing so these events get
+ # processed together
+ with lock:
+ self.fake_gerrit.addEvent(A.addApproval('Code-Review', 2))
+ self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
+ self.fake_gerrit.addEvent(B.addApproval('Code-Review', 2))
+ self.waitUntilSettled()
+ self.assertHistory([])
+ # One query for each change in the above cluster of events.
+ self.assertEqual(A.queried, 1)
+ self.assertEqual(B.queried, 1)
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.waitUntilSettled()
+ self.assertHistory([
+ dict(name="project-merge", result="SUCCESS", changes="1,1"),
+ dict(name="project-test1", result="SUCCESS", changes="1,1"),
+ dict(name="project-test2", result="SUCCESS", changes="1,1"),
+ dict(name="project-merge", result="SUCCESS", changes="1,1 2,1"),
+ dict(name="project-test1", result="SUCCESS", changes="1,1 2,1"),
+ dict(name="project-test2", result="SUCCESS", changes="1,1 2,1"),
+ ], ordered=False)
+ # One query due to the event on change A, followed by a query
+ # to verify the merge.
+ self.assertEqual(A.queried, 3)
+ # No query for change B necessary since our cache is up to
+ # date with respect to the triggering event. One query to
+ # verify the merge.
+ self.assertEqual(B.queried, 2)
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(B.data['status'], 'MERGED')
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 6aea4388b..12fbedeed 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -103,10 +103,12 @@ class GerritChangeData(object):
SSH = 1
HTTP = 2
- def __init__(self, fmt, data, related=None, files=None):
+ def __init__(self, fmt, data, related=None, files=None,
+ zuul_query_ltime=None):
self.format = fmt
self.data = data
self.files = files
+ self.zuul_query_ltime = zuul_query_ltime
if fmt == self.SSH:
self.parseSSH(data)
@@ -329,19 +331,20 @@ class GerritEventConnector(threading.Thread):
self.connection.clearConnectionCacheOnBranchEvent(event)
- self._getChange(event)
+ self._getChange(event, connection_event.zuul_event_ltime)
self.connection.logEvent(event)
self.connection.sched.addTriggerEvent(
self.connection.driver_name, event
)
- def _getChange(self, event):
+ def _getChange(self, event, connection_event_ltime):
# Grab the change if we are managing the project or if it exists in the
# cache as it may be a dependency
if event.change_number:
refresh = True
change_key = self.connection.source.getChangeKey(event)
- if self.connection._change_cache.get(change_key) is None:
+ change = self.connection._change_cache.get(change_key)
+ if change is None:
refresh = False
for tenant in self.connection.sched.abide.tenants.values():
# TODO(fungi): it would be better to have some simple means
@@ -353,6 +356,13 @@ class GerritEventConnector(threading.Thread):
event.project_name))):
refresh = True
break
+ else:
+ # We have a cache entry for this change. Get the
+ # query ltime for the cache entry; if it's after the
+ # event ltime, we don't need to refresh.
+ if (change.zuul_query_ltime and
+ change.zuul_query_ltime > connection_event_ltime):
+ refresh = False
if refresh:
# Call _getChange for the side effect of updating the
@@ -1418,15 +1428,20 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
def queryChange(self, number, event=None):
for attempt in range(3):
+ # Get a query ltime -- any events before this point should be
+ # included in our change data.
+ zuul_query_ltime = self.sched.zk_client.getCurrentLtime()
try:
if self.session:
data, related, files = self.queryChangeHTTP(
number, event=event)
return GerritChangeData(GerritChangeData.HTTP,
- data, related, files)
+ data, related, files,
+ zuul_query_ltime=zuul_query_ltime)
else:
data = self.queryChangeSSH(number, event=event)
- return GerritChangeData(GerritChangeData.SSH, data)
+ return GerritChangeData(GerritChangeData.SSH, data,
+ zuul_query_ltime=zuul_query_ltime)
except Exception:
if attempt >= 3:
raise
diff --git a/zuul/driver/gerrit/gerritmodel.py b/zuul/driver/gerrit/gerritmodel.py
index f0ec32f77..0ac3e7f9d 100644
--- a/zuul/driver/gerrit/gerritmodel.py
+++ b/zuul/driver/gerrit/gerritmodel.py
@@ -35,8 +35,10 @@ class GerritChange(Change):
self.approvals = []
self.missing_labels = set()
self.commit = None
+ self.zuul_query_ltime = None
def update(self, data, connection):
+ self.zuul_query_ltime = data.zuul_query_ltime
if data.format == data.SSH:
self.updateFromSSH(data.data, connection)
else:
@@ -51,6 +53,7 @@ class GerritChange(Change):
"approvals": self.approvals,
"missing_labels": list(self.missing_labels),
"commit": self.commit,
+ "zuul_query_ltime": self.zuul_query_ltime,
})
return d
@@ -62,6 +65,7 @@ class GerritChange(Change):
self.approvals = data["approvals"]
self.missing_labels = set(data["missing_labels"])
self.commit = data.get("commit")
+ self.zuul_query_ltime = data.get("zuul_query_ltime")
def updateFromSSH(self, data, connection):
if self.patchset is None:
diff --git a/zuul/zk/event_queues.py b/zuul/zk/event_queues.py
index 8718f609c..fa4e06bd0 100644
--- a/zuul/zk/event_queues.py
+++ b/zuul/zk/event_queues.py
@@ -884,7 +884,7 @@ class ConnectionEventQueue(ZooKeeperEventQueue):
self._put({'event_data': data})
def __iter__(self):
- for data, ack_ref, _ in self._iterEvents():
+ for data, ack_ref, zstat in self._iterEvents():
if not data:
self.log.warning("Malformed event found: %s", data)
self._remove(ack_ref.path)
@@ -893,6 +893,7 @@ class ConnectionEventQueue(ZooKeeperEventQueue):
event = model.ConnectionEvent.fromDict(
data.get('event_data', data))
event.ack_ref = ack_ref
+ event.zuul_event_ltime = zstat.creation_transaction_id
yield event