diff options
author | Simon Westphahl <simon.westphahl@bmw.de> | 2021-08-27 12:29:15 +0200 |
---|---|---|
committer | Simon Westphahl <simon.westphahl@bmw.de> | 2021-09-16 10:49:17 +0200 |
commit | 01606275a1538a8ab566fbb72be6bc0b85b8bd7a (patch) | |
tree | ea795c659f6b44b25573e2911e1392b1b09a8252 /zuul/driver | |
parent | cbab8c2775a517d82bee543cc20c4e41598f94a1 (diff) | |
download | zuul-01606275a1538a8ab566fbb72be6bc0b85b8bd7a.tar.gz |
Cache Pagure refs in Zookeeper
Change-Id: If8ed18679611809a053732e95ad4824e327424c5
Diffstat (limited to 'zuul/driver')
-rw-r--r-- | zuul/driver/pagure/pagureconnection.py | 127 | ||||
-rw-r--r-- | zuul/driver/pagure/paguremodel.py | 23 | ||||
-rw-r--r-- | zuul/driver/pagure/paguresource.py | 5 |
3 files changed, 106 insertions, 49 deletions
diff --git a/zuul/driver/pagure/pagureconnection.py b/zuul/driver/pagure/pagureconnection.py index 4c7e76207..084585d38 100644 --- a/zuul/driver/pagure/pagureconnection.py +++ b/zuul/driver/pagure/pagureconnection.py @@ -26,8 +26,9 @@ import voluptuous as v from zuul.connection import BaseConnection from zuul.lib.logutil import get_annotated_logger from zuul.web.handler import BaseWebController -from zuul.model import Ref, Branch, Tag, CacheStat +from zuul.model import Ref, Branch, Tag from zuul.lib import dependson +from zuul.zk.change_cache import AbstractChangeCache, ConcurrentUpdateError from zuul.zk.event_queues import ConnectionEventQueue from zuul.driver.pagure.paguremodel import PagureTriggerEvent, PullRequest @@ -96,6 +97,23 @@ def _sign_request(body, secret): return signature, body +class PagureChangeCache(AbstractChangeCache): + log = logging.getLogger("zuul.driver.PagureChangeCache") + + CHANGE_TYPE_MAP = { + "Ref": Ref, + "Tag": Tag, + "Branch": Branch, + "PullRequest": PullRequest, + } + + def _getChangeClass(self, change_type): + return self.CHANGE_TYPE_MAP[change_type] + + def _getChangeType(self, change): + return type(change).__name__ + + class PagureEventConnector(threading.Thread): """Move events from Pagure into the scheduler""" @@ -455,7 +473,6 @@ class PagureConnection(BaseConnection): def __init__(self, driver, connection_name, connection_config): super(PagureConnection, self).__init__( driver, connection_name, connection_config) - self._change_cache = {} self.project_branch_cache = {} self.projects = {} self.server = self.connection_config.get('server', 'pagure.io') @@ -484,6 +501,8 @@ class PagureConnection(BaseConnection): self.event_queue = ConnectionEventQueue( self.sched.zk_client, self.connection_name ) + self.log.debug('Creating Zookeeper change cache') + self._change_cache = PagureChangeCache(self.sched.zk_client, self) self.log.info('Starting event connector') self._start_event_connector() @@ -527,12 +546,18 @@ class PagureConnection(BaseConnection): return token def maintainCache(self, relevant): - remove = set() - for key, change in self._change_cache.items(): + for change in self._change_cache: if change not in relevant: - remove.add(key) - for key in remove: - del self._change_cache[key] + self._change_cache.delete(change.cache_stat.key) + # TODO: remove entries older than X + self._change_cache.cleanup() + + def updateChangeAttributes(self, change, **attrs): + def _update_attrs(c): + for name, value in attrs.items(): + setattr(c, name, value) + self._change_cache.updateChangeWithRetry(change.cache_stat.key, + change, _update_attrs) def getWebController(self, zuul_web): return PagureWebController(zuul_web, self) @@ -582,36 +607,15 @@ class PagureConnection(BaseConnection): change = self._getChange( project, event.change_number, event.patch_number, refresh=refresh, event=event) - change.is_current_patchset = (change.pr.get('commit_stop') == - event.patch_number) else: self.log.info("Getting change for %s ref:%s" % ( project, event.ref)) - if event.ref and event.ref.startswith('refs/tags/'): - change = Tag(project) - change.tag = event.tag - change.branch = None - elif event.ref and event.ref.startswith('refs/heads/'): - change = Branch(project) - change.branch = event.branch - else: - change = Ref(project) - change.branch = None - change.ref = event.ref - change.oldrev = event.oldrev - change.newrev = event.newrev - change.url = self.getGitwebUrl(project, sha=event.newrev) - - # Pagure does not send files details in the git-receive event. - # Explicitly set files to None and let the pipelines processor - # call the merger asynchronuously - change.files = None - + change = self._getNonPRRef(project, event) return change def _getChange(self, project, number, patchset=None, refresh=False, url=None, event=None): - key = (project.name, number, patchset) + key = str((project.name, number, patchset)) change = self._change_cache.get(key) if change and not refresh: self.log.debug("Getting change from cache %s" % str(key)) @@ -626,18 +630,53 @@ class PagureConnection(BaseConnection): change.uris = [ '%s/%s/pull/%s' % (self.baseurl, project, number), ] - change.cache_stat = CacheStat(key, None, None) - self._change_cache[key] = change try: self.log.debug("Getting change pr#%s from project %s" % ( number, project.name)) - self._updateChange(change, event) + self.log.info("Updating change from pagure %s" % change) + pull = self.getPull(change.project.name, change.number) + + def _update_change(c): + self._updateChange(c, event, pull) + + change = self._change_cache.updateChangeWithRetry(key, change, + _update_change) + + if self.sched: + self.sched.onChangeUpdated(change, event) except Exception: - if key in self._change_cache: - del self._change_cache[key] + self.log.warning("Deleting cache key %s due to exception", key) + self._change_cache.delete(key) raise return change + def _getNonPRRef(self, project, event): + key = str((project.name, event.ref, event.newrev)) + change = self._change_cache.get(key) + if change: + return change + if event.ref and event.ref.startswith('refs/tags/'): + change = Tag(project) + change.tag = event.tag + elif event.ref and event.ref.startswith('refs/heads/'): + change = Branch(project) + change.branch = event.branch + else: + change = Ref(project) + change.ref = event.ref + change.oldrev = event.oldrev + change.newrev = event.newrev + change.url = self.getGitwebUrl(project, sha=event.newrev) + # Pagure does not send files details in the git-receive event. + # Explicitly set files to None and let the pipelines processor + # call the merger asynchronuously + change.files = None + try: + self._change_cache.set(key, change) + except ConcurrentUpdateError: + change = self._change_cache.get(key) + return change + def _hasRequiredStatusChecks(self, change): pagure = self.get_project_api_client(change.project.name) flag = pagure.get_pr_flags(change.number, self.username, last=True) @@ -718,12 +757,12 @@ class PagureConnection(BaseConnection): score_board[author] -= 1 return sum(score_board.values()) - def _updateChange(self, change, event): - self.log.info("Updating change from pagure %s" % change) - change.pr = self.getPull(change.project.name, change.number) + def _updateChange(self, change, event, pull): + change.pr = pull change.ref = "refs/pull/%s/head" % change.number change.branch = change.pr.get('branch') - change.patchset = change.pr.get('commit_stop') + change.is_current_patchset = (change.pr.get('commit_stop') == + change.patchset) change.files = change.pr.get('files') change.title = change.pr.get('title') change.tags = change.pr.get('tags') @@ -735,10 +774,6 @@ class PagureConnection(BaseConnection): # last_updated seems to be touch for comment changed/flags - that's OK change.updated_at = change.pr.get('last_updated') self.log.info("Updated change from pagure %s" % change) - - if self.sched: - self.sched.onChangeUpdated(change, event) - return change def commentPull(self, project, number, message): @@ -773,11 +808,11 @@ class PagureConnection(BaseConnection): # a the depends-on string in PR initial message. Not a blocker # for now, let's workaround using the local change cache ! changes_dependencies = [] - for cached_change_id, _change in self._change_cache.items(): + for cached_change in self._change_cache: for dep_header in dependson.find_dependency_headers( - _change.message): + cached_change.message): if change.url in dep_header: - changes_dependencies.append(_change) + changes_dependencies.append(cached_change) return changes_dependencies def mergePull(self, project, number): diff --git a/zuul/driver/pagure/paguremodel.py b/zuul/driver/pagure/paguremodel.py index a0d105880..052f8e3b9 100644 --- a/zuul/driver/pagure/paguremodel.py +++ b/zuul/driver/pagure/paguremodel.py @@ -22,12 +22,10 @@ EMPTY_GIT_REF = '0' * 40 # git sha of all zeros, used during creates/deletes class PullRequest(Change): def __init__(self, project): super(PullRequest, self).__init__(project) - self.project = None self.pr = None self.updated_at = None self.title = None self.score = 0 - self.files = [] self.tags = [] self.status = None @@ -53,6 +51,27 @@ class PullRequest(Change): r.append('state: open') return ' '.join(r) + '>' + def serialize(self): + d = super().serialize() + d.update({ + "pr": self.pr, + "updated_at": self.updated_at, + "title": self.title, + "score": self.score, + "tags": self.tags, + "status": self.status, + }) + return d + + def deserialize(self, data): + super().deserialize(data) + self.pr = data.get("pr") + self.updated_at = data.get("updated_at") + self.title = data.get("title") + self.score = data.get("score", 0) + self.tags = data.get("tags", []) + self.status = data.get("status") + def isUpdateOf(self, other): if (self.project == other.project and hasattr(other, 'number') and self.number == other.number and diff --git a/zuul/driver/pagure/paguresource.py b/zuul/driver/pagure/paguresource.py index 4202df070..c7452b0b1 100644 --- a/zuul/driver/pagure/paguresource.py +++ b/zuul/driver/pagure/paguresource.py @@ -96,7 +96,7 @@ class PagureSource(BaseSource): change, projects, tenant) def getCachedChanges(self): - return list(self.connection._change_cache.values()) + yield from self.connection._change_cache def getProject(self, name): p = self.connection.getProject(name) @@ -141,6 +141,9 @@ class PagureSource(BaseSource): def getRefForChange(self, change): raise NotImplementedError() + def setChangeAttributes(self, change, **attrs): + return self.connection.updateChangeAttributes(change, **attrs) + # Require model def getRequireSchema(): |