diff options
author | Zuul <zuul@review.opendev.org> | 2022-08-19 21:33:58 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2022-08-19 21:33:58 +0000 |
commit | 7491e081bd1e08defd98245489e50604d317b6aa (patch) | |
tree | ea14483edd1880b77341ce8b04556930146da6c3 | |
parent | 61a5b7f0934aec9380a80ddca70c24555f472caf (diff) | |
parent | e6530d11d058e50c41872f3c2c9ac286b57ed70e (diff) | |
download | zuul-7491e081bd1e08defd98245489e50604d317b6aa.tar.gz |
Merge "Reduce redundant Gerrit queries"
-rw-r--r-- | tests/unit/test_gerrit.py | 56 | ||||
-rw-r--r-- | zuul/driver/gerrit/gerritconnection.py | 27 | ||||
-rw-r--r-- | zuul/driver/gerrit/gerritmodel.py | 4 | ||||
-rw-r--r-- | zuul/zk/event_queues.py | 3 |
4 files changed, 83 insertions, 7 deletions
diff --git a/tests/unit/test_gerrit.py b/tests/unit/test_gerrit.py index aa2bb1758..47545b9be 100644 --- a/tests/unit/test_gerrit.py +++ b/tests/unit/test_gerrit.py @@ -13,6 +13,7 @@ # under the License. import os +import threading import textwrap from unittest import mock @@ -868,3 +869,58 @@ class TestGerritFake(ZuulTestCase): # The Gerrit connection method filters out the queried change ret = self.fake_gerrit._getSubmittedTogether(C1, None) self.assertEqual(ret, [(4, 1)]) + + +class TestGerritConnection(ZuulTestCase): + config_file = 'zuul-gerrit-web.conf' + tenant_config_file = 'config/single-tenant/main.yaml' + + def test_zuul_query_ltime(self): + # Add a lock around the event queue iterator so that we can + # ensure that multiple events arrive before the first is + # processed. + lock = threading.Lock() + + orig_iterEvents = self.fake_gerrit.gerrit_event_connector.\ + event_queue._iterEvents + + def _iterEvents(*args, **kw): + with lock: + return orig_iterEvents(*args, **kw) + + self.patch(self.fake_gerrit.gerrit_event_connector.event_queue, + '_iterEvents', _iterEvents) + + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + B.setDependsOn(A, 1) + # Hold the connection queue processing so these events get + # processed together + with lock: + self.fake_gerrit.addEvent(A.addApproval('Code-Review', 2)) + self.fake_gerrit.addEvent(B.addApproval('Approved', 1)) + self.fake_gerrit.addEvent(B.addApproval('Code-Review', 2)) + self.waitUntilSettled() + self.assertHistory([]) + # One query for each change in the above cluster of events. + self.assertEqual(A.queried, 1) + self.assertEqual(B.queried, 1) + self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) + self.waitUntilSettled() + self.assertHistory([ + dict(name="project-merge", result="SUCCESS", changes="1,1"), + dict(name="project-test1", result="SUCCESS", changes="1,1"), + dict(name="project-test2", result="SUCCESS", changes="1,1"), + dict(name="project-merge", result="SUCCESS", changes="1,1 2,1"), + dict(name="project-test1", result="SUCCESS", changes="1,1 2,1"), + dict(name="project-test2", result="SUCCESS", changes="1,1 2,1"), + ], ordered=False) + # One query due to the event on change A, followed by a query + # to verify the merge. + self.assertEqual(A.queried, 3) + # No query for change B necessary since our cache is up to + # date with respect for the triggering event. One query to + # verify the merge. + self.assertEqual(B.queried, 2) + self.assertEqual(A.data['status'], 'MERGED') + self.assertEqual(B.data['status'], 'MERGED') diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index 1ec334915..2213293f8 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -103,10 +103,12 @@ class GerritChangeData(object): SSH = 1 HTTP = 2 - def __init__(self, fmt, data, related=None, files=None): + def __init__(self, fmt, data, related=None, files=None, + zuul_query_ltime=None): self.format = fmt self.data = data self.files = files + self.zuul_query_ltime = zuul_query_ltime if fmt == self.SSH: self.parseSSH(data) @@ -329,19 +331,20 @@ class GerritEventConnector(threading.Thread): self.connection.clearConnectionCacheOnBranchEvent(event) - self._getChange(event) + self._getChange(event, connection_event.zuul_event_ltime) self.connection.logEvent(event) self.connection.sched.addTriggerEvent( self.connection.driver_name, event ) - def _getChange(self, event): + def _getChange(self, event, connection_event_ltime): # Grab the change if we are managing the project or if it exists in the # cache as it may be a dependency if event.change_number: refresh = True change_key = self.connection.source.getChangeKey(event) - if self.connection._change_cache.get(change_key) is None: + change = self.connection._change_cache.get(change_key) + if change is None: refresh = False for tenant in self.connection.sched.abide.tenants.values(): # TODO(fungi): it would be better to have some simple means @@ -353,6 +356,13 @@ class GerritEventConnector(threading.Thread): event.project_name))): refresh = True break + else: + # We have a cache entry for this change Get the + # query ltime for the cache entry; if it's after the + # event ltime, we don't need to refresh. + if (change.zuul_query_ltime and + change.zuul_query_ltime > connection_event_ltime): + refresh = False if refresh: # Call _getChange for the side effect of updating the @@ -1418,15 +1428,20 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): def queryChange(self, number, event=None): for attempt in range(3): + # Get a query ltime -- any events before this point should be + # included in our change data. + zuul_query_ltime = self.sched.zk_client.getCurrentLtime() try: if self.session: data, related, files = self.queryChangeHTTP( number, event=event) return GerritChangeData(GerritChangeData.HTTP, - data, related, files) + data, related, files, + zuul_query_ltime=zuul_query_ltime) else: data = self.queryChangeSSH(number, event=event) - return GerritChangeData(GerritChangeData.SSH, data) + return GerritChangeData(GerritChangeData.SSH, data, + zuul_query_ltime=zuul_query_ltime) except Exception: if attempt >= 3: raise diff --git a/zuul/driver/gerrit/gerritmodel.py b/zuul/driver/gerrit/gerritmodel.py index f0ec32f77..0ac3e7f9d 100644 --- a/zuul/driver/gerrit/gerritmodel.py +++ b/zuul/driver/gerrit/gerritmodel.py @@ -35,8 +35,10 @@ class GerritChange(Change): self.approvals = [] self.missing_labels = set() self.commit = None + self.zuul_query_ltime = None def update(self, data, connection): + self.zuul_query_ltime = data.zuul_query_ltime if data.format == data.SSH: self.updateFromSSH(data.data, connection) else: @@ -51,6 +53,7 @@ class GerritChange(Change): "approvals": self.approvals, "missing_labels": list(self.missing_labels), "commit": self.commit, + "zuul_query_ltime": self.zuul_query_ltime, }) return d @@ -62,6 +65,7 @@ class GerritChange(Change): self.approvals = data["approvals"] self.missing_labels = set(data["missing_labels"]) self.commit = data.get("commit") + self.zuul_query_ltime = data.get("zuul_query_ltime") def updateFromSSH(self, data, connection): if self.patchset is None: diff --git a/zuul/zk/event_queues.py b/zuul/zk/event_queues.py index 52ffd582e..ad7529791 100644 --- a/zuul/zk/event_queues.py +++ b/zuul/zk/event_queues.py @@ -909,7 +909,7 @@ class ConnectionEventQueue(ZooKeeperEventQueue): self._put({'event_data': data}) def __iter__(self): - for data, ack_ref, _ in self._iterEvents(): + for data, ack_ref, zstat in self._iterEvents(): if not data: self.log.warning("Malformed event found: %s", data) self._remove(ack_ref.path) @@ -918,6 +918,7 @@ class ConnectionEventQueue(ZooKeeperEventQueue): event = model.ConnectionEvent.fromDict( data.get('event_data', data)) event.ack_ref = ack_ref + event.zuul_event_ltime = zstat.creation_transaction_id yield event |