summaryrefslogtreecommitdiff
path: root/zuul/driver
diff options
context:
space:
mode:
authorSimon Westphahl <simon.westphahl@bmw.de>2021-08-27 12:29:15 +0200
committerSimon Westphahl <simon.westphahl@bmw.de>2021-09-16 10:49:17 +0200
commit01606275a1538a8ab566fbb72be6bc0b85b8bd7a (patch)
treeea795c659f6b44b25573e2911e1392b1b09a8252 /zuul/driver
parentcbab8c2775a517d82bee543cc20c4e41598f94a1 (diff)
downloadzuul-01606275a1538a8ab566fbb72be6bc0b85b8bd7a.tar.gz
Cache Pagure refs in Zookeeper
Change-Id: If8ed18679611809a053732e95ad4824e327424c5
Diffstat (limited to 'zuul/driver')
-rw-r--r--zuul/driver/pagure/pagureconnection.py127
-rw-r--r--zuul/driver/pagure/paguremodel.py23
-rw-r--r--zuul/driver/pagure/paguresource.py5
3 files changed, 106 insertions, 49 deletions
diff --git a/zuul/driver/pagure/pagureconnection.py b/zuul/driver/pagure/pagureconnection.py
index 4c7e76207..084585d38 100644
--- a/zuul/driver/pagure/pagureconnection.py
+++ b/zuul/driver/pagure/pagureconnection.py
@@ -26,8 +26,9 @@ import voluptuous as v
from zuul.connection import BaseConnection
from zuul.lib.logutil import get_annotated_logger
from zuul.web.handler import BaseWebController
-from zuul.model import Ref, Branch, Tag, CacheStat
+from zuul.model import Ref, Branch, Tag
from zuul.lib import dependson
+from zuul.zk.change_cache import AbstractChangeCache, ConcurrentUpdateError
from zuul.zk.event_queues import ConnectionEventQueue
from zuul.driver.pagure.paguremodel import PagureTriggerEvent, PullRequest
@@ -96,6 +97,23 @@ def _sign_request(body, secret):
return signature, body
+class PagureChangeCache(AbstractChangeCache):
+ log = logging.getLogger("zuul.driver.PagureChangeCache")
+
+ CHANGE_TYPE_MAP = {
+ "Ref": Ref,
+ "Tag": Tag,
+ "Branch": Branch,
+ "PullRequest": PullRequest,
+ }
+
+ def _getChangeClass(self, change_type):
+ return self.CHANGE_TYPE_MAP[change_type]
+
+ def _getChangeType(self, change):
+ return type(change).__name__
+
+
class PagureEventConnector(threading.Thread):
"""Move events from Pagure into the scheduler"""
@@ -455,7 +473,6 @@ class PagureConnection(BaseConnection):
def __init__(self, driver, connection_name, connection_config):
super(PagureConnection, self).__init__(
driver, connection_name, connection_config)
- self._change_cache = {}
self.project_branch_cache = {}
self.projects = {}
self.server = self.connection_config.get('server', 'pagure.io')
@@ -484,6 +501,8 @@ class PagureConnection(BaseConnection):
self.event_queue = ConnectionEventQueue(
self.sched.zk_client, self.connection_name
)
+ self.log.debug('Creating Zookeeper change cache')
+ self._change_cache = PagureChangeCache(self.sched.zk_client, self)
self.log.info('Starting event connector')
self._start_event_connector()
@@ -527,12 +546,18 @@ class PagureConnection(BaseConnection):
return token
def maintainCache(self, relevant):
- remove = set()
- for key, change in self._change_cache.items():
+ for change in self._change_cache:
if change not in relevant:
- remove.add(key)
- for key in remove:
- del self._change_cache[key]
+ self._change_cache.delete(change.cache_stat.key)
+ # TODO: remove entries older than X
+ self._change_cache.cleanup()
+
+ def updateChangeAttributes(self, change, **attrs):
+ def _update_attrs(c):
+ for name, value in attrs.items():
+ setattr(c, name, value)
+ self._change_cache.updateChangeWithRetry(change.cache_stat.key,
+ change, _update_attrs)
def getWebController(self, zuul_web):
return PagureWebController(zuul_web, self)
@@ -582,36 +607,15 @@ class PagureConnection(BaseConnection):
change = self._getChange(
project, event.change_number, event.patch_number,
refresh=refresh, event=event)
- change.is_current_patchset = (change.pr.get('commit_stop') ==
- event.patch_number)
else:
self.log.info("Getting change for %s ref:%s" % (
project, event.ref))
- if event.ref and event.ref.startswith('refs/tags/'):
- change = Tag(project)
- change.tag = event.tag
- change.branch = None
- elif event.ref and event.ref.startswith('refs/heads/'):
- change = Branch(project)
- change.branch = event.branch
- else:
- change = Ref(project)
- change.branch = None
- change.ref = event.ref
- change.oldrev = event.oldrev
- change.newrev = event.newrev
- change.url = self.getGitwebUrl(project, sha=event.newrev)
-
- # Pagure does not send files details in the git-receive event.
- # Explicitly set files to None and let the pipelines processor
- # call the merger asynchronuously
- change.files = None
-
+ change = self._getNonPRRef(project, event)
return change
def _getChange(self, project, number, patchset=None,
refresh=False, url=None, event=None):
- key = (project.name, number, patchset)
+ key = str((project.name, number, patchset))
change = self._change_cache.get(key)
if change and not refresh:
self.log.debug("Getting change from cache %s" % str(key))
@@ -626,18 +630,53 @@ class PagureConnection(BaseConnection):
change.uris = [
'%s/%s/pull/%s' % (self.baseurl, project, number),
]
- change.cache_stat = CacheStat(key, None, None)
- self._change_cache[key] = change
try:
self.log.debug("Getting change pr#%s from project %s" % (
number, project.name))
- self._updateChange(change, event)
+ self.log.info("Updating change from pagure %s" % change)
+ pull = self.getPull(change.project.name, change.number)
+
+ def _update_change(c):
+ self._updateChange(c, event, pull)
+
+ change = self._change_cache.updateChangeWithRetry(key, change,
+ _update_change)
+
+ if self.sched:
+ self.sched.onChangeUpdated(change, event)
except Exception:
- if key in self._change_cache:
- del self._change_cache[key]
+ self.log.warning("Deleting cache key %s due to exception", key)
+ self._change_cache.delete(key)
raise
return change
+ def _getNonPRRef(self, project, event):
+ key = str((project.name, event.ref, event.newrev))
+ change = self._change_cache.get(key)
+ if change:
+ return change
+ if event.ref and event.ref.startswith('refs/tags/'):
+ change = Tag(project)
+ change.tag = event.tag
+ elif event.ref and event.ref.startswith('refs/heads/'):
+ change = Branch(project)
+ change.branch = event.branch
+ else:
+ change = Ref(project)
+ change.ref = event.ref
+ change.oldrev = event.oldrev
+ change.newrev = event.newrev
+ change.url = self.getGitwebUrl(project, sha=event.newrev)
+ # Pagure does not send files details in the git-receive event.
+ # Explicitly set files to None and let the pipelines processor
+ # call the merger asynchronuously
+ change.files = None
+ try:
+ self._change_cache.set(key, change)
+ except ConcurrentUpdateError:
+ change = self._change_cache.get(key)
+ return change
+
def _hasRequiredStatusChecks(self, change):
pagure = self.get_project_api_client(change.project.name)
flag = pagure.get_pr_flags(change.number, self.username, last=True)
@@ -718,12 +757,12 @@ class PagureConnection(BaseConnection):
score_board[author] -= 1
return sum(score_board.values())
- def _updateChange(self, change, event):
- self.log.info("Updating change from pagure %s" % change)
- change.pr = self.getPull(change.project.name, change.number)
+ def _updateChange(self, change, event, pull):
+ change.pr = pull
change.ref = "refs/pull/%s/head" % change.number
change.branch = change.pr.get('branch')
- change.patchset = change.pr.get('commit_stop')
+ change.is_current_patchset = (change.pr.get('commit_stop') ==
+ change.patchset)
change.files = change.pr.get('files')
change.title = change.pr.get('title')
change.tags = change.pr.get('tags')
@@ -735,10 +774,6 @@ class PagureConnection(BaseConnection):
# last_updated seems to be touch for comment changed/flags - that's OK
change.updated_at = change.pr.get('last_updated')
self.log.info("Updated change from pagure %s" % change)
-
- if self.sched:
- self.sched.onChangeUpdated(change, event)
-
return change
def commentPull(self, project, number, message):
@@ -773,11 +808,11 @@ class PagureConnection(BaseConnection):
# a the depends-on string in PR initial message. Not a blocker
# for now, let's workaround using the local change cache !
changes_dependencies = []
- for cached_change_id, _change in self._change_cache.items():
+ for cached_change in self._change_cache:
for dep_header in dependson.find_dependency_headers(
- _change.message):
+ cached_change.message):
if change.url in dep_header:
- changes_dependencies.append(_change)
+ changes_dependencies.append(cached_change)
return changes_dependencies
def mergePull(self, project, number):
diff --git a/zuul/driver/pagure/paguremodel.py b/zuul/driver/pagure/paguremodel.py
index a0d105880..052f8e3b9 100644
--- a/zuul/driver/pagure/paguremodel.py
+++ b/zuul/driver/pagure/paguremodel.py
@@ -22,12 +22,10 @@ EMPTY_GIT_REF = '0' * 40 # git sha of all zeros, used during creates/deletes
class PullRequest(Change):
def __init__(self, project):
super(PullRequest, self).__init__(project)
- self.project = None
self.pr = None
self.updated_at = None
self.title = None
self.score = 0
- self.files = []
self.tags = []
self.status = None
@@ -53,6 +51,27 @@ class PullRequest(Change):
r.append('state: open')
return ' '.join(r) + '>'
+ def serialize(self):
+ d = super().serialize()
+ d.update({
+ "pr": self.pr,
+ "updated_at": self.updated_at,
+ "title": self.title,
+ "score": self.score,
+ "tags": self.tags,
+ "status": self.status,
+ })
+ return d
+
+ def deserialize(self, data):
+ super().deserialize(data)
+ self.pr = data.get("pr")
+ self.updated_at = data.get("updated_at")
+ self.title = data.get("title")
+ self.score = data.get("score", 0)
+ self.tags = data.get("tags", [])
+ self.status = data.get("status")
+
def isUpdateOf(self, other):
if (self.project == other.project and
hasattr(other, 'number') and self.number == other.number and
diff --git a/zuul/driver/pagure/paguresource.py b/zuul/driver/pagure/paguresource.py
index 4202df070..c7452b0b1 100644
--- a/zuul/driver/pagure/paguresource.py
+++ b/zuul/driver/pagure/paguresource.py
@@ -96,7 +96,7 @@ class PagureSource(BaseSource):
change, projects, tenant)
def getCachedChanges(self):
- return list(self.connection._change_cache.values())
+ yield from self.connection._change_cache
def getProject(self, name):
p = self.connection.getProject(name)
@@ -141,6 +141,9 @@ class PagureSource(BaseSource):
def getRefForChange(self, change):
raise NotImplementedError()
+ def setChangeAttributes(self, change, **attrs):
+ return self.connection.updateChangeAttributes(change, **attrs)
+
# Require model
def getRequireSchema():