summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2022-08-19 21:33:58 +0000
committerGerrit Code Review <review@openstack.org>2022-08-19 21:33:58 +0000
commit7491e081bd1e08defd98245489e50604d317b6aa (patch)
treeea14483edd1880b77341ce8b04556930146da6c3
parent61a5b7f0934aec9380a80ddca70c24555f472caf (diff)
parente6530d11d058e50c41872f3c2c9ac286b57ed70e (diff)
downloadzuul-7491e081bd1e08defd98245489e50604d317b6aa.tar.gz
Merge "Reduce redundant Gerrit queries"
-rw-r--r--tests/unit/test_gerrit.py56
-rw-r--r--zuul/driver/gerrit/gerritconnection.py27
-rw-r--r--zuul/driver/gerrit/gerritmodel.py4
-rw-r--r--zuul/zk/event_queues.py3
4 files changed, 83 insertions, 7 deletions
diff --git a/tests/unit/test_gerrit.py b/tests/unit/test_gerrit.py
index aa2bb1758..47545b9be 100644
--- a/tests/unit/test_gerrit.py
+++ b/tests/unit/test_gerrit.py
@@ -13,6 +13,7 @@
# under the License.
import os
+import threading
import textwrap
from unittest import mock
@@ -868,3 +869,58 @@ class TestGerritFake(ZuulTestCase):
# The Gerrit connection method filters out the queried change
ret = self.fake_gerrit._getSubmittedTogether(C1, None)
self.assertEqual(ret, [(4, 1)])
+
+
+class TestGerritConnection(ZuulTestCase):
+ config_file = 'zuul-gerrit-web.conf'
+ tenant_config_file = 'config/single-tenant/main.yaml'
+
+ def test_zuul_query_ltime(self):
+ # Add a lock around the event queue iterator so that we can
+ # ensure that multiple events arrive before the first is
+ # processed.
+ lock = threading.Lock()
+
+ orig_iterEvents = self.fake_gerrit.gerrit_event_connector.\
+ event_queue._iterEvents
+
+ def _iterEvents(*args, **kw):
+ with lock:
+ return orig_iterEvents(*args, **kw)
+
+ self.patch(self.fake_gerrit.gerrit_event_connector.event_queue,
+ '_iterEvents', _iterEvents)
+
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ B.setDependsOn(A, 1)
+ # Hold the connection queue processing so these events get
+ # processed together
+ with lock:
+ self.fake_gerrit.addEvent(A.addApproval('Code-Review', 2))
+ self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
+ self.fake_gerrit.addEvent(B.addApproval('Code-Review', 2))
+ self.waitUntilSettled()
+ self.assertHistory([])
+ # One query for each change in the above cluster of events.
+ self.assertEqual(A.queried, 1)
+ self.assertEqual(B.queried, 1)
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ self.waitUntilSettled()
+ self.assertHistory([
+ dict(name="project-merge", result="SUCCESS", changes="1,1"),
+ dict(name="project-test1", result="SUCCESS", changes="1,1"),
+ dict(name="project-test2", result="SUCCESS", changes="1,1"),
+ dict(name="project-merge", result="SUCCESS", changes="1,1 2,1"),
+ dict(name="project-test1", result="SUCCESS", changes="1,1 2,1"),
+ dict(name="project-test2", result="SUCCESS", changes="1,1 2,1"),
+ ], ordered=False)
+ # One query due to the event on change A, followed by a query
+ # to verify the merge.
+ self.assertEqual(A.queried, 3)
+ # No query for change B necessary since our cache is up to
+ # date with respect for the triggering event. One query to
+ # verify the merge.
+ self.assertEqual(B.queried, 2)
+ self.assertEqual(A.data['status'], 'MERGED')
+ self.assertEqual(B.data['status'], 'MERGED')
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 1ec334915..2213293f8 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -103,10 +103,12 @@ class GerritChangeData(object):
SSH = 1
HTTP = 2
- def __init__(self, fmt, data, related=None, files=None):
+ def __init__(self, fmt, data, related=None, files=None,
+ zuul_query_ltime=None):
self.format = fmt
self.data = data
self.files = files
+ self.zuul_query_ltime = zuul_query_ltime
if fmt == self.SSH:
self.parseSSH(data)
@@ -329,19 +331,20 @@ class GerritEventConnector(threading.Thread):
self.connection.clearConnectionCacheOnBranchEvent(event)
- self._getChange(event)
+ self._getChange(event, connection_event.zuul_event_ltime)
self.connection.logEvent(event)
self.connection.sched.addTriggerEvent(
self.connection.driver_name, event
)
- def _getChange(self, event):
+ def _getChange(self, event, connection_event_ltime):
# Grab the change if we are managing the project or if it exists in the
# cache as it may be a dependency
if event.change_number:
refresh = True
change_key = self.connection.source.getChangeKey(event)
- if self.connection._change_cache.get(change_key) is None:
+ change = self.connection._change_cache.get(change_key)
+ if change is None:
refresh = False
for tenant in self.connection.sched.abide.tenants.values():
# TODO(fungi): it would be better to have some simple means
@@ -353,6 +356,13 @@ class GerritEventConnector(threading.Thread):
event.project_name))):
refresh = True
break
+ else:
+ # We have a cache entry for this change Get the
+ # query ltime for the cache entry; if it's after the
+ # event ltime, we don't need to refresh.
+ if (change.zuul_query_ltime and
+ change.zuul_query_ltime > connection_event_ltime):
+ refresh = False
if refresh:
# Call _getChange for the side effect of updating the
@@ -1418,15 +1428,20 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
def queryChange(self, number, event=None):
for attempt in range(3):
+ # Get a query ltime -- any events before this point should be
+ # included in our change data.
+ zuul_query_ltime = self.sched.zk_client.getCurrentLtime()
try:
if self.session:
data, related, files = self.queryChangeHTTP(
number, event=event)
return GerritChangeData(GerritChangeData.HTTP,
- data, related, files)
+ data, related, files,
+ zuul_query_ltime=zuul_query_ltime)
else:
data = self.queryChangeSSH(number, event=event)
- return GerritChangeData(GerritChangeData.SSH, data)
+ return GerritChangeData(GerritChangeData.SSH, data,
+ zuul_query_ltime=zuul_query_ltime)
except Exception:
if attempt >= 3:
raise
diff --git a/zuul/driver/gerrit/gerritmodel.py b/zuul/driver/gerrit/gerritmodel.py
index f0ec32f77..0ac3e7f9d 100644
--- a/zuul/driver/gerrit/gerritmodel.py
+++ b/zuul/driver/gerrit/gerritmodel.py
@@ -35,8 +35,10 @@ class GerritChange(Change):
self.approvals = []
self.missing_labels = set()
self.commit = None
+ self.zuul_query_ltime = None
def update(self, data, connection):
+ self.zuul_query_ltime = data.zuul_query_ltime
if data.format == data.SSH:
self.updateFromSSH(data.data, connection)
else:
@@ -51,6 +53,7 @@ class GerritChange(Change):
"approvals": self.approvals,
"missing_labels": list(self.missing_labels),
"commit": self.commit,
+ "zuul_query_ltime": self.zuul_query_ltime,
})
return d
@@ -62,6 +65,7 @@ class GerritChange(Change):
self.approvals = data["approvals"]
self.missing_labels = set(data["missing_labels"])
self.commit = data.get("commit")
+ self.zuul_query_ltime = data.get("zuul_query_ltime")
def updateFromSSH(self, data, connection):
if self.patchset is None:
diff --git a/zuul/zk/event_queues.py b/zuul/zk/event_queues.py
index 52ffd582e..ad7529791 100644
--- a/zuul/zk/event_queues.py
+++ b/zuul/zk/event_queues.py
@@ -909,7 +909,7 @@ class ConnectionEventQueue(ZooKeeperEventQueue):
self._put({'event_data': data})
def __iter__(self):
- for data, ack_ref, _ in self._iterEvents():
+ for data, ack_ref, zstat in self._iterEvents():
if not data:
self.log.warning("Malformed event found: %s", data)
self._remove(ack_ref.path)
@@ -918,6 +918,7 @@ class ConnectionEventQueue(ZooKeeperEventQueue):
event = model.ConnectionEvent.fromDict(
data.get('event_data', data))
event.ack_ref = ack_ref
+ event.zuul_event_ltime = zstat.creation_transaction_id
yield event