diff options
author | James E. Blair <jim@acmegating.com> | 2022-02-03 14:33:08 -0800 |
---|---|---|
committer | James E. Blair <jim@acmegating.com> | 2022-02-17 13:14:23 -0800 |
commit | df220cd4d6d5dae4e2b62ab2298dd3dbac88640a (patch) | |
tree | 8973a0d5e18117ea5d6ea662436d5198ec6e4fe4 /zuul/driver/github | |
parent | 6e991361accdbd122eae27169332a29a15d5df63 (diff) | |
download | zuul-df220cd4d6d5dae4e2b62ab2298dd3dbac88640a.tar.gz |
Populate missing change cache entries
The drivers are expected to populate the change cache before
passing trigger events to the scheduler so that all the difficult
work is done outside the main loop. Further, the cache cleanup
is designed to accomodate this so that events in-flight don't have
their change cache entries removed early.
However, at several points since moving the change cache into ZK,
programming errors have caused us to encounter enqueued changes
without entries in the cache. This usually causes Zuul to abort
pipeline processing and is unrecoverable.
We should continue to address all incidences of those since they
represent Zuul not working as designed. However, it would be nice
if Zuul was able to recover from this.
To that end, this change allows missing changes to be added to the
change cache.
That is primarily accomplished by adjusting the Source.getChange
method to accept a ChangeKey instead of an Event. Events are only
available when the triggering event happens, whereas a ChangeKey
is available when loading the pipeline state.
A ChangeKey represents the minimal distinguishing characteristics
of a change, and so can be used in all cases. Some drivers obtain
extra information from events, so we still pass it into the getChange
method if available, but it's entirely optional -- we should still
get a workable Change object whether or not it's supplied.
Ref (and derived: Branch, Tag) objects currently only store their
newrev attribute in the ChangeKey, however we need to be able to
create Ref objects with an oldrev as well. Since the old and new
revs of a Ref are not inherent to the ref but rather the generating
event, we can't get that from the source system. So we need to
extend the ChangeKey object to include that. Adding an extra
attribute is troublesome since the ChangeKey is not a ZKObject and
therefore doesn't have access to the model api version. However,
it's not too much of a stretch to say that the "revision" field
(which like all ChangeKey fileds is driver-dependent) should include
the old and new revs. Therefore, in these cases the field is
upgraded in a backwards compatible way to include old and newrev
in the standard "old..new" git encoding format. We also need to
support "None" since that is a valid value in Zuul.
So that we can continue to identify cache errors, any time we encounter
a change key that is not in the cache and we also don't have an
event object, we log an error.
Almost all of this commit is the refactor to accept change keys
instead of events in getChange. The functional change to populate
the cache if it's missing basically consists of just removing
getChangeByKey and replacing it with getChange. A test which deletes
the cache midway through is added.
Change-Id: I4252bea6430cd434dbfaacd583db584cc796dfaa
Diffstat (limited to 'zuul/driver/github')
-rw-r--r-- | zuul/driver/github/githubconnection.py | 154 | ||||
-rw-r--r-- | zuul/driver/github/githubsource.py | 41 |
2 files changed, 110 insertions, 85 deletions
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py index e56063d58..401a39bce 100644 --- a/zuul/driver/github/githubconnection.py +++ b/zuul/driver/github/githubconnection.py @@ -397,12 +397,10 @@ class GithubEventProcessor(object): project = self.connection.source.getProject(event.project_name) change = None if event.change_number: - change = self.connection._getChange( - project, - event.change_number, - event.patch_number, - refresh=True, - event=event) + change_key = self.connection.source.getChangeKey(event) + change = self.connection._getChange(change_key, + refresh=True, + event=event) self.log.debug("Refreshed change %s,%s", event.change_number, event.patch_number) @@ -1296,47 +1294,43 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): return self._github_client_manager.getGithubClient( project_name=project_name, zuul_event_id=zuul_event_id) - def getChange(self, event, refresh=False): - """Get the change representing an event.""" - - project = self.source.getProject(event.project_name) - if event.change_number: - change = self._getChange(project, event.change_number, - event.patch_number, refresh=refresh, - event=event) - else: - if event.ref and event.ref.startswith('refs/tags/'): - change = self._getTag(project, event, refresh=refresh) - elif event.ref and event.ref.startswith('refs/heads/'): - change = self._getBranch(project, event, refresh=refresh) - else: - change = self._getRef(project, event, refresh=refresh) - return change - - def _getChange(self, project, number, patchset=None, refresh=False, - event=None): + def getChange(self, change_key, refresh=False, event=None): + if change_key.connection_name != self.connection_name: + return None + if change_key.change_type == 'PullRequest': + return self._getChange(change_key, refresh=refresh, event=event) + elif change_key.change_type == 'Tag': + return self._getTag(change_key, refresh=refresh, event=event) + elif change_key.change_type == 'Branch': + return self._getBranch(change_key, refresh=refresh, event=event) + elif change_key.change_type == 'Ref': + return self._getRef(change_key, refresh=refresh, event=event) + + def _getChange(self, change_key, refresh=False, event=None): # Note(tobiash): We force the pull request number to int centrally here # because it can originate from different sources (github event, manual # enqueue event) where some might just parse the string and forward it. - number = int(number) - key = ChangeKey(self.connection_name, project.name, - 'PullRequest', str(number), - str(patchset)) - change = self._change_cache.get(key) + number = int(change_key.stable_id) + change = self._change_cache.get(change_key) if change and not refresh: return change + project = self.source.getProject(change_key.project_name) if not change: + if not event: + self.log.error("Change %s not found in cache and no event", + change_key) change = PullRequest(project.name) change.project = project change.number = number - change.patchset = patchset + change.patchset = change_key.revision # This can be called multi-threaded during github event # preprocessing. In order to avoid data races perform locking # by cached key. Try to acquire the lock non-blocking at first. # If the lock is already taken we're currently updating the very # same chnange right now and would likely get the same data again. - lock = self._change_update_lock.setdefault(key, threading.Lock()) + lock = self._change_update_lock.setdefault(change_key, + threading.Lock()) if lock.acquire(blocking=False): try: pull = self.getPull(change.project.name, change.number, @@ -1346,11 +1340,11 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): self._updateChange(c, event, pull) change = self._change_cache.updateChangeWithRetry( - key, change, _update_change) + change_key, change, _update_change) finally: # We need to remove the lock here again so we don't leak # them. - del self._change_update_lock[key] + del self._change_update_lock[change_key] lock.release() else: # We didn't get the lock so we don't need to update the same @@ -1363,75 +1357,81 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): log.debug('Finished updating change %s', change) return change - def _getTag(self, project, event, refresh=False): - tag = event.ref[len('refs/tags/'):] - key = ChangeKey(self.connection_name, project.name, - 'Tag', tag, event.newrev) - change = self._change_cache.get(key) + def _getTag(self, change_key, refresh=False, event=None): + tag = change_key.stable_id + change = self._change_cache.get(change_key) if change: if refresh: self._change_cache.updateChangeWithRetry( - key, change, lambda c: None) + change_key, change, lambda c: None) return change + if not event: + self.log.error("Change %s not found in cache and no event", + change_key) + project = self.source.getProject(change_key.project_name) change = Tag(project) change.tag = tag - change.ref = event.ref - change.oldrev = event.oldrev - change.newrev = event.newrev + change.ref = f'refs/tags/{tag}' + change.oldrev = change_key.oldrev + change.newrev = change_key.newrev # Build the url pointing to this tag/release on GitHub. - change.url = self.getGitwebUrl(project, sha=event.newrev, tag=tag) + change.url = self.getGitwebUrl(project, sha=change.newrev, tag=tag) if hasattr(event, 'commits'): change.files = self.getPushedFileNames(event) try: - self._change_cache.set(key, change) + self._change_cache.set(change_key, change) except ConcurrentUpdateError: - change = self._change_cache.get(key) + change = self._change_cache.get(change_key) return change - def _getBranch(self, project, event, refresh=False): - branch = event.ref[len('refs/heads/'):] - key = ChangeKey(self.connection_name, project.name, - 'Branch', branch, event.newrev) - change = self._change_cache.get(key) + def _getBranch(self, change_key, refresh=False, event=None): + branch = change_key.stable_id + change = self._change_cache.get(change_key) if change: if refresh: self._change_cache.updateChangeWithRetry( - key, change, lambda c: None) + change_key, change, lambda c: None) return change + if not event: + self.log.error("Change %s not found in cache and no event", + change_key) + project = self.source.getProject(change_key.project_name) change = Branch(project) change.branch = branch - change.ref = event.ref - change.oldrev = event.oldrev - change.newrev = event.newrev - change.url = self.getGitwebUrl(project, sha=event.newrev) + change.ref = f'refs/heads/{branch}' + change.oldrev = change_key.oldrev + change.newrev = change_key.newrev + change.url = self.getGitwebUrl(project, sha=change.newrev) if hasattr(event, 'commits'): change.files = self.getPushedFileNames(event) try: - self._change_cache.set(key, change) + self._change_cache.set(change_key, change) except ConcurrentUpdateError: - change = self._change_cache.get(key) + change = self._change_cache.get(change_key) return change - def _getRef(self, project, event, refresh=False): - key = ChangeKey(self.connection_name, project.name, - 'Ref', event.ref, event.newrev) - change = self._change_cache.get(key) + def _getRef(self, change_key, refresh=False, event=None): + change = self._change_cache.get(change_key) if change: if refresh: self._change_cache.updateChangeWithRetry( - key, change, lambda c: None) + change_key, change, lambda c: None) return change + if not event: + self.log.error("Change %s not found in cache and no event", + change_key) + project = self.source.getProject(change_key.project_name) change = Ref(project) - change.ref = event.ref - change.oldrev = event.oldrev - change.newrev = event.newrev - change.url = self.getGitwebUrl(project, sha=event.newrev) + change.ref = change_key.stable_id + change.oldrev = change_key.oldrev + change.newrev = change_key.newrev + change.url = self.getGitwebUrl(project, sha=change.newrev) if hasattr(event, 'commits'): change.files = self.getPushedFileNames(event) try: - self._change_cache.set(key, change) + self._change_cache.set(change_key, change) except ConcurrentUpdateError: - change = self._change_cache.get(key) + change = self._change_cache.get(change_key) return change def getChangesDependingOn(self, change, projects, tenant): @@ -1491,8 +1491,9 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): for key in keys: (proj, num, sha) = key - project = self.source.getProject(proj) - change = self._getChange(project, int(num), patchset=sha) + dep_change_key = ChangeKey(self.connection_name, proj, + 'PullRequest', str(num), str(sha)) + change = self._getChange(dep_change_key) changes.append(change) return changes @@ -1770,10 +1771,11 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): cached_pr_numbers = self._sha_pr_cache.get(project_name, sha) if len(cached_pr_numbers) > 1: raise Exception('Multiple pulls found with head sha %s' % sha) - project = self.getProject(project_name) if len(cached_pr_numbers) == 1: for pr in cached_pr_numbers: - pr_body = self._getChange(project, pr, sha, event=event).pr + pr_change_key = ChangeKey(self.connection_name, project_name, + 'PullRequest', str(pr), str(sha)) + pr_body = self._getChange(pr_change_key, event=event).pr return pr_body github = self.getGithubClient(project_name, zuul_event_id=event) @@ -1789,8 +1791,10 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): # with multiple pulls found. found_pr_body = None for item in issues: - pr_body = self._getChange( - project, item.issue.number, sha, event=event).pr + pr_change_key = ChangeKey(self.connection_name, project_name, + 'PullRequest', str(item.issue.number), + str(sha)) + pr_body = self._getChange(pr_change_key, event=event).pr self._sha_pr_cache.update(project_name, pr_body) if pr_body['head']['sha'] == sha: if found_pr_body: diff --git a/zuul/driver/github/githubsource.py b/zuul/driver/github/githubsource.py index 8c08a8b13..ebf3abae0 100644 --- a/zuul/driver/github/githubsource.py +++ b/zuul/driver/github/githubsource.py @@ -22,6 +22,7 @@ from zuul.source import BaseSource from zuul.model import Project from zuul.driver.github.githubmodel import GithubRefFilter from zuul.driver.util import scalar_or_list, to_list +from zuul.zk.change_cache import ChangeKey class GithubSource(BaseSource): @@ -64,8 +65,31 @@ class GithubSource(BaseSource): """Called after configuration has been processed.""" pass - def getChange(self, event, refresh=False): - return self.connection.getChange(event, refresh) + def getChangeKey(self, event): + connection_name = self.connection.connection_name + if event.change_number: + return ChangeKey(connection_name, event.project_name, + 'PullRequest', + str(event.change_number), + str(event.patch_number)) + revision = f'{event.oldrev}..{event.newrev}' + if event.ref and event.ref.startswith('refs/tags/'): + tag = event.ref[len('refs/tags/'):] + return ChangeKey(connection_name, event.project_name, + 'Tag', tag, revision) + if event.ref and event.ref.startswith('refs/heads/'): + branch = event.ref[len('refs/heads/'):] + return ChangeKey(connection_name, event.project_name, + 'Branch', branch, revision) + if event.ref: + return ChangeKey(connection_name, event.project_name, + 'Ref', event.ref, revision) + + self.log.warning("Unable to format change key for %s" % (self,)) + + def getChange(self, change_key, refresh=False, event=None): + return self.connection.getChange(change_key, refresh=refresh, + event=event) change_re = re.compile(r"/(.*?)/(.*?)/pull/(\d+)[\w]*") @@ -88,16 +112,13 @@ class GithubSource(BaseSource): if not pull: return None proj = pull.get('base').get('repo').get('full_name') - project = self.getProject(proj) - change = self.connection._getChange( - project, num, - patchset=pull.get('head').get('sha'), - event=event) + change_key = ChangeKey(self.connection.connection_name, proj, + 'PullRequest', + str(num), + pull.get('head').get('sha')) + change = self.connection._getChange(change_key, event=event) return change - def getChangeByKey(self, key): - return self.connection.getChangeByKey(key) - def getChangesDependingOn(self, change, projects, tenant): return self.connection.getChangesDependingOn(change, projects, tenant) |