summaryrefslogtreecommitdiff
path: root/zuul/driver
diff options
context:
space:
mode:
authorJames E. Blair <jim@acmegating.com>2022-02-03 14:33:08 -0800
committerJames E. Blair <jim@acmegating.com>2022-02-17 13:14:23 -0800
commitdf220cd4d6d5dae4e2b62ab2298dd3dbac88640a (patch)
tree8973a0d5e18117ea5d6ea662436d5198ec6e4fe4 /zuul/driver
parent6e991361accdbd122eae27169332a29a15d5df63 (diff)
downloadzuul-df220cd4d6d5dae4e2b62ab2298dd3dbac88640a.tar.gz
Populate missing change cache entries
The drivers are expected to populate the change cache before passing trigger events to the scheduler so that all the difficult work is done outside the main loop. Further, the cache cleanup is designed to accomodate this so that events in-flight don't have their change cache entries removed early. However, at several points since moving the change cache into ZK, programming errors have caused us to encounter enqueued changes without entries in the cache. This usually causes Zuul to abort pipeline processing and is unrecoverable. We should continue to address all incidences of those since they represent Zuul not working as designed. However, it would be nice if Zuul was able to recover from this. To that end, this change allows missing changes to be added to the change cache. That is primarily accomplished by adjusting the Source.getChange method to accept a ChangeKey instead of an Event. Events are only available when the triggering event happens, whereas a ChangeKey is available when loading the pipeline state. A ChangeKey represents the minimal distinguishing characteristics of a change, and so can be used in all cases. Some drivers obtain extra information from events, so we still pass it into the getChange method if available, but it's entirely optional -- we should still get a workable Change object whether or not it's supplied. Ref (and derived: Branch, Tag) objects currently only store their newrev attribute in the ChangeKey, however we need to be able to create Ref objects with an oldrev as well. Since the old and new revs of a Ref are not inherent to the ref but rather the generating event, we can't get that from the source system. So we need to extend the ChangeKey object to include that. Adding an extra attribute is troublesome since the ChangeKey is not a ZKObject and therefore doesn't have access to the model api version. However, it's not too much of a stretch to say that the "revision" field (which like all ChangeKey fileds is driver-dependent) should include the old and new revs. Therefore, in these cases the field is upgraded in a backwards compatible way to include old and newrev in the standard "old..new" git encoding format. We also need to support "None" since that is a valid value in Zuul. So that we can continue to identify cache errors, any time we encounter a change key that is not in the cache and we also don't have an event object, we log an error. Almost all of this commit is the refactor to accept change keys instead of events in getChange. The functional change to populate the cache if it's missing basically consists of just removing getChangeByKey and replacing it with getChange. A test which deletes the cache midway through is added. Change-Id: I4252bea6430cd434dbfaacd583db584cc796dfaa
Diffstat (limited to 'zuul/driver')
-rw-r--r--zuul/driver/gerrit/gerritconnection.py165
-rw-r--r--zuul/driver/gerrit/gerritmodel.py1
-rw-r--r--zuul/driver/gerrit/gerritsource.py51
-rw-r--r--zuul/driver/git/gitconnection.py42
-rw-r--r--zuul/driver/git/gitmodel.py1
-rw-r--r--zuul/driver/git/gitsource.py21
-rw-r--r--zuul/driver/github/githubconnection.py154
-rw-r--r--zuul/driver/github/githubsource.py41
-rw-r--r--zuul/driver/gitlab/gitlabconnection.py114
-rw-r--r--zuul/driver/gitlab/gitlabsource.py39
-rw-r--r--zuul/driver/pagure/pagureconnection.py93
-rw-r--r--zuul/driver/pagure/paguresource.py41
-rw-r--r--zuul/driver/timer/__init__.py4
13 files changed, 439 insertions, 328 deletions
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 18f31c189..9d339cb09 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -324,10 +324,8 @@ class GerritEventConnector(threading.Thread):
# cache as it may be a dependency
if event.change_number:
refresh = True
- key = ChangeKey(self.connection.connection_name, None,
- 'GerritChange', str(event.change_number),
- str(event.patch_number))
- if self.connection._change_cache.get(key) is None:
+ change_key = self.connection.source.getChangeKey(event)
+ if self.connection._change_cache.get(change_key) is None:
refresh = False
for tenant in self.connection.sched.abide.tenants.values():
# TODO(fungi): it would be better to have some simple means
@@ -350,8 +348,7 @@ class GerritEventConnector(threading.Thread):
# we need to update those objects by reference so that they
# have the correct/new information and also avoid hitting
# gerrit multiple times.
- self.connection._getChange(event.change_number,
- event.patch_number,
+ self.connection._getChange(change_key,
refresh=True, event=event)
@@ -743,110 +740,101 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
def addProject(self, project: Project) -> None:
self.projects[project.name] = project
- def getChange(self, event, refresh=False):
- if event.change_number:
- change = self._getChange(event.change_number, event.patch_number,
- refresh=refresh)
- elif event.ref and event.ref.startswith('refs/tags/'):
- change = self._getTag(event, refresh=refresh)
- elif event.ref and not event.ref.startswith('refs/'):
- # Pre 2.13 Gerrit ref-updated events don't have branch prefixes.
- change = self._getBranch(event, branch=event.ref,
- ref=f'refs/heads/{event.ref}',
- refresh=refresh)
- elif event.ref and event.ref.startswith('refs/heads/'):
- # From the timer trigger or Post 2.13 Gerrit
- change = self._getBranch(event,
- branch=event.ref[len('refs/heads/'):],
- ref=event.ref, refresh=refresh)
- elif event.ref:
- # catch-all ref (ie, not a branch or head)
- change = self._getRef(event, refresh=refresh)
- else:
- self.log.warning("Unable to get change for %s" % (event,))
- change = None
- return change
-
- def _getChange(self, number, patchset, refresh=False, history=None,
+ def getChange(self, change_key, refresh=False, event=None):
+ if change_key.connection_name != self.connection_name:
+ return None
+ if change_key.change_type == 'GerritChange':
+ return self._getChange(change_key, refresh=refresh, event=event)
+ elif change_key.change_type == 'Tag':
+ return self._getTag(change_key, refresh=refresh, event=event)
+ elif change_key.change_type == 'Branch':
+ return self._getBranch(change_key, refresh=refresh, event=event)
+ elif change_key.change_type == 'Ref':
+ return self._getRef(change_key, refresh=refresh, event=event)
+
+ def _getChange(self, change_key, refresh=False, history=None,
event=None):
# Ensure number and patchset are str
- number = str(number)
- patchset = str(patchset)
- key = ChangeKey(self.connection_name, None,
- 'GerritChange', number, patchset)
- change = self._change_cache.get(key)
+ change = self._change_cache.get(change_key)
if change and not refresh:
return change
if not change:
+ if not event:
+ self.log.error("Change %s not found in cache and no event",
+ change_key)
change = GerritChange(None)
- change.number = number
- change.patchset = patchset
- return self._updateChange(key, change, event, history)
-
- def _getTag(self, event, refresh=False):
- tag = event.ref[len('refs/tags/'):]
- key = ChangeKey(self.connection_name, event.project_name,
- 'Tag', tag, event.newrev)
- change = self._change_cache.get(key)
+ change.number = change_key.stable_id
+ change.patchset = change_key.revision
+ return self._updateChange(change_key, change, event, history)
+
+ def _getTag(self, change_key, refresh=False, event=None):
+ tag = change_key.stable_id
+ change = self._change_cache.get(change_key)
if change:
if refresh:
self._change_cache.updateChangeWithRetry(
- key, change, lambda c: None)
+ change_key, change, lambda c: None)
return change
- project = self.source.getProject(event.project_name)
+ if not event:
+ self.log.error("Change %s not found in cache and no event",
+ change_key)
+ project = self.source.getProject(change_key.project_name)
change = Tag(project)
change.tag = tag
- change.ref = event.ref
- change.oldrev = event.oldrev
- change.newrev = event.newrev
- change.url = self._getWebUrl(project, sha=event.newrev)
+ change.ref = f'refs/tags/{tag}'
+ change.oldrev = change_key.oldrev
+ change.newrev = change_key.newrev
+ change.url = self._getWebUrl(project, sha=change.newrev)
try:
- self._change_cache.set(key, change)
+ self._change_cache.set(change_key, change)
except ConcurrentUpdateError:
- change = self._change_cache.get(key)
+ change = self._change_cache.get(change_key)
return change
- def _getBranch(self, event, branch, ref, refresh=False):
- key = ChangeKey(self.connection_name, event.project_name,
- 'Branch', branch, event.newrev)
- change = self._change_cache.get(key)
+ def _getBranch(self, change_key, refresh=False, event=None):
+ branch = change_key.stable_id
+ change = self._change_cache.get(change_key)
if change:
if refresh:
self._change_cache.updateChangeWithRetry(
- key, change, lambda c: None)
+ change_key, change, lambda c: None)
return change
- project = self.source.getProject(event.project_name)
+ if not event:
+ self.log.error("Change %s not found in cache and no event",
+ change_key)
+ project = self.source.getProject(change_key.project_name)
change = Branch(project)
change.branch = branch
- change.ref = ref
- change.oldrev = event.oldrev
- change.newrev = event.newrev
- change.url = self._getWebUrl(project, sha=event.newrev)
+ change.ref = f'refs/heads/{branch}'
+ change.oldrev = change_key.oldrev
+ change.newrev = change_key.newrev
+ change.url = self._getWebUrl(project, sha=change.newrev)
try:
- self._change_cache.set(key, change)
+ self._change_cache.set(change_key, change)
except ConcurrentUpdateError:
- change = self._change_cache.get(key)
+ change = self._change_cache.get(change_key)
return change
- def _getRef(self, event, refresh=False):
- key = ChangeKey(self.connection_name, event.project_name,
- 'Ref', event.ref, event.newrev)
- change = self._change_cache.get(key)
+ def _getRef(self, change_key, refresh=False, event=None):
+ change = self._change_cache.get(change_key)
if change:
if refresh:
self._change_cache.updateChangeWithRetry(
- key, change, lambda c: None)
+ change_key, change, lambda c: None)
return change
- project = self.source.getProject(event.project_name)
+ if not event:
+ self.log.error("Change %s not found in cache and no event",
+ change_key)
+ project = self.source.getProject(change_key.project_name)
change = Ref(project)
- change.ref = event.ref
- change.oldrev = event.oldrev
- change.newrev = event.newrev
- change.url = self._getWebUrl(project, sha=event.newrev)
+ change.ref = change_key.stable_id
+ change.oldrev = change_key.oldrev
+ change.newrev = change_key.newrev
+ change.url = self._getWebUrl(project, sha=change.newrev)
try:
- self._change_cache.set(key, change)
+ self._change_cache.set(change_key, change)
except ConcurrentUpdateError:
- change = self._change_cache.get(key)
+ change = self._change_cache.get(change_key)
return change
def _getDependsOnFromCommit(self, message, change, event):
@@ -939,7 +927,9 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
dep_num, dep_ps = data.depends_on
log.debug("Updating %s: Getting git-dependent change %s,%s",
change, dep_num, dep_ps)
- dep = self._getChange(dep_num, dep_ps, history=history,
+ dep_key = ChangeKey(self.connection_name, None,
+ 'GerritChange', str(dep_num), str(dep_ps))
+ dep = self._getChange(dep_key, history=history,
event=event)
# This is a git commit dependency. So we only ignore it if it is
# already merged. So even if it is "ABANDONED", we should not
@@ -953,7 +943,9 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
change.message, change, event):
log.debug("Updating %s: Getting commit-dependent "
"change %s,%s", change, dep_num, dep_ps)
- dep = self._getChange(dep_num, dep_ps, history=history,
+ dep_key = ChangeKey(self.connection_name, None,
+ 'GerritChange', str(dep_num), str(dep_ps))
+ dep = self._getChange(dep_key, history=history,
event=event)
if dep.open and dep not in needs_changes:
compat_needs_changes.append(dep.cache_key)
@@ -965,7 +957,9 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
try:
log.debug("Updating %s: Getting git-needed change %s,%s",
change, dep_num, dep_ps)
- dep = self._getChange(dep_num, dep_ps, history=history,
+ dep_key = ChangeKey(self.connection_name, None,
+ 'GerritChange', str(dep_num), str(dep_ps))
+ dep = self._getChange(dep_key, history=history,
event=event)
if (dep.open and dep.is_current_patchset and
dep not in needed_by_changes):
@@ -987,8 +981,10 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
# change). In case the dep is already in history we already
# refreshed this change so refresh is not needed in this case.
refresh = (dep_num, dep_ps) not in history
+ dep_key = ChangeKey(self.connection_name, None,
+ 'GerritChange', str(dep_num), str(dep_ps))
dep = self._getChange(
- dep_num, dep_ps, refresh=refresh, history=history,
+ dep_key, refresh=refresh, history=history,
event=event)
if (dep.open and dep.is_current_patchset
and dep not in needed_by_changes):
@@ -1094,8 +1090,11 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
changes = [] # type: List[GerritChange]
for record in data:
try:
- changes.append(
- self._getChange(record.number, record.current_patchset))
+ change_key = ChangeKey(self.connection_name, None,
+ 'GerritChange',
+ str(record.number),
+ str(record.current_patchset))
+ changes.append(self._getChange(change_key))
except Exception:
self.log.exception("Unable to query change %s",
record.number)
diff --git a/zuul/driver/gerrit/gerritmodel.py b/zuul/driver/gerrit/gerritmodel.py
index 9fae239f8..1cdf77ccc 100644
--- a/zuul/driver/gerrit/gerritmodel.py
+++ b/zuul/driver/gerrit/gerritmodel.py
@@ -23,7 +23,6 @@ from zuul.model import Change, TriggerEvent
from zuul.driver.util import time_to_seconds
from zuul import exceptions
-
EMPTY_GIT_REF = '0' * 40 # git sha of all zeros, used during creates/deletes
diff --git a/zuul/driver/gerrit/gerritsource.py b/zuul/driver/gerrit/gerritsource.py
index 55a8957fb..086323e3c 100644
--- a/zuul/driver/gerrit/gerritsource.py
+++ b/zuul/driver/gerrit/gerritsource.py
@@ -22,6 +22,7 @@ from zuul.model import Project
from zuul.driver.gerrit.gerritmodel import GerritRefFilter
from zuul.driver.util import scalar_or_list, to_list
from zuul.lib.dependson import find_dependency_headers
+from zuul.zk.change_cache import ChangeKey
class GerritSource(BaseSource):
@@ -54,8 +55,36 @@ class GerritSource(BaseSource):
def postConfig(self):
pass
- def getChange(self, event, refresh=False):
- return self.connection.getChange(event, refresh)
+ def getChangeKey(self, event):
+ connection_name = self.connection.connection_name
+ if event.change_number:
+ return ChangeKey(connection_name, None,
+ 'GerritChange',
+ str(event.change_number),
+ str(event.patch_number))
+ revision = f'{event.oldrev}..{event.newrev}'
+ if event.ref and event.ref.startswith('refs/tags/'):
+ tag = event.ref[len('refs/tags/'):]
+ return ChangeKey(connection_name, event.project_name,
+ 'Tag', tag, revision)
+ if event.ref and not event.ref.startswith('refs/'):
+ # Pre 2.13 Gerrit ref-updated events don't have branch prefixes.
+ return ChangeKey(connection_name, event.project_name,
+ 'Branch', event.ref, revision)
+ if event.ref and event.ref.startswith('refs/heads/'):
+ # From the timer trigger or Post 2.13 Gerrit
+ branch = event.ref[len('refs/heads/'):]
+ return ChangeKey(connection_name, event.project_name,
+ 'Branch', branch, revision)
+ if event.ref:
+ # catch-all ref (ie, not a branch or head)
+ return ChangeKey(connection_name, event.project_name,
+ 'Ref', event.ref, revision)
+ self.log.warning("Unable to format change key for %s" % (event,))
+
+ def getChange(self, change_key, refresh=False, event=None):
+ return self.connection.getChange(change_key, refresh=refresh,
+ event=event)
def getChangeByURL(self, url, event):
try:
@@ -76,14 +105,13 @@ class GerritSource(BaseSource):
results = self.connection.simpleQuery(query, event=event)
if not results:
return None
- change = self.connection._getChange(
- results[0].number, results[0].current_patchset,
- event=event)
+ change_key = ChangeKey(self.connection.connection_name, None,
+ 'GerritChange',
+ str(results[0].number),
+ str(results[0].current_patchset))
+ change = self.connection._getChange(change_key, event=event)
return change
- def getChangeByKey(self, key):
- return self.connection.getChangeByKey(key)
-
def getChangesDependingOn(self, change, projects, tenant):
changes = []
if not change.uris:
@@ -107,8 +135,11 @@ class GerritSource(BaseSource):
if key in seen:
continue
seen.add(key)
- change = self.connection._getChange(
- result.number, result.current_patchset)
+ change_key = ChangeKey(self.connection.connection_name, None,
+ 'GerritChange',
+ str(result.number),
+ str(result.current_patchset))
+ change = self.connection._getChange(change_key)
changes.append(change)
return changes
diff --git a/zuul/driver/git/gitconnection.py b/zuul/driver/git/gitconnection.py
index c0ade80ab..4e477558b 100644
--- a/zuul/driver/git/gitconnection.py
+++ b/zuul/driver/git/gitconnection.py
@@ -25,7 +25,6 @@ from zuul.driver.git.gitwatcher import GitWatcher
from zuul.model import Ref, Branch
from zuul.zk.change_cache import (
AbstractChangeCache,
- ChangeKey,
ConcurrentUpdateError,
)
@@ -102,40 +101,40 @@ class GitConnection(ZKChangeCacheMixin, BaseConnection):
refs[ref] = sha
return refs
- def getChange(self, event, refresh=False):
- key = ChangeKey(self.connection_name, event.project_name,
- 'Ref', event.ref, event.newrev)
- change = self._change_cache.get(key)
+ def getChange(self, change_key, refresh=False, event=None):
+ change = self._change_cache.get(change_key)
if change:
return change
- if event.ref and event.ref.startswith('refs/heads/'):
- branch = event.ref[len('refs/heads/'):]
- project = self.getProject(event.project_name)
+ if not event:
+ self.log.error("Change %s not found in cache and no event",
+ change_key)
+ project = self.source.getProject(change_key.project_name)
+ if change_key.change_type == 'Branch':
+ branch = change_key.stable_id
change = Branch(project)
change.branch = branch
- change.ref = event.ref
- change.oldrev = event.oldrev
- change.newrev = event.newrev
+ change.ref = f'refs/heads/{branch}'
+ change.oldrev = change_key.oldrev
+ change.newrev = change_key.newrev
change.url = ""
change.files = self.getChangeFilesUpdated(
- event.project_name, change.branch, event.oldrev)
- elif event.ref:
+ change_key.project_name, branch, change_key.oldrev)
+ elif change_key.change_type == 'Ref':
# catch-all ref (ie, not a branch or head)
- project = self.getProject(event.project_name)
change = Ref(project)
- change.ref = event.ref
- change.oldrev = event.oldrev
- change.newrev = event.newrev
+ change.ref = change_key.stable_id
+ change.oldrev = change_key.oldrev
+ change.newrev = change_key.newrev
change.url = ""
else:
- self.log.warning("Unable to get change for %s", event)
+ self.log.warning("Unable to get change for %s", change_key)
return None
try:
- self._change_cache.set(key, change)
+ self._change_cache.set(change_key, change)
except ConcurrentUpdateError:
- change = self._change_cache.get(key)
+ change = self._change_cache.get(change_key)
return change
def getProjectBranches(self, project, tenant, min_ltime=-1):
@@ -161,7 +160,8 @@ class GitConnection(ZKChangeCacheMixin, BaseConnection):
# Force changes cache update before passing
# the event to the scheduler
- self.getChange(event)
+ change_key = self.source.getChangeKey(event)
+ self.getChange(change_key)
self.logEvent(event)
# Pass the event to the scheduler
self.sched.addTriggerEvent(self.driver_name, event)
diff --git a/zuul/driver/git/gitmodel.py b/zuul/driver/git/gitmodel.py
index 3c472f3df..b7286ca51 100644
--- a/zuul/driver/git/gitmodel.py
+++ b/zuul/driver/git/gitmodel.py
@@ -17,7 +17,6 @@ import re
from zuul.model import TriggerEvent
from zuul.model import EventFilter
-
EMPTY_GIT_REF = '0' * 40 # git sha of all zeros, used during creates/deletes
diff --git a/zuul/driver/git/gitsource.py b/zuul/driver/git/gitsource.py
index 3c4f88608..bab617084 100644
--- a/zuul/driver/git/gitsource.py
+++ b/zuul/driver/git/gitsource.py
@@ -15,6 +15,7 @@
import logging
from zuul.source import BaseSource
from zuul.model import Project
+from zuul.zk.change_cache import ChangeKey
class GitSource(BaseSource):
@@ -35,15 +36,25 @@ class GitSource(BaseSource):
def canMerge(self, change, allow_needs, event=None, allow_refresh=False):
raise NotImplementedError()
- def getChange(self, event, refresh=False):
- return self.connection.getChange(event, refresh)
+ def getChangeKey(self, event):
+ connection_name = self.connection.connection_name
+ revision = f'{event.oldrev}..{event.newrev}'
+ if event.ref and event.ref.startswith('refs/heads/'):
+ branch = event.ref[len('refs/heads/'):]
+ return ChangeKey(connection_name, event.project_name,
+ 'Branch', branch, revision)
+ if event.ref:
+ return ChangeKey(connection_name, event.project_name,
+ 'Ref', event.ref, revision)
+ self.log.warning("Unable to format change key for %s" % (self,))
+
+ def getChange(self, change_key, refresh=False, event=None):
+ return self.connection.getChange(change_key, refresh=refresh,
+ event=event)
def getChangeByURL(self, url, event):
return None
- def getChangeByKey(self, key):
- return self.connection.getChangeByKey(key)
-
def getChangesDependingOn(self, change, projects, tenant):
return []
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index e56063d58..401a39bce 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -397,12 +397,10 @@ class GithubEventProcessor(object):
project = self.connection.source.getProject(event.project_name)
change = None
if event.change_number:
- change = self.connection._getChange(
- project,
- event.change_number,
- event.patch_number,
- refresh=True,
- event=event)
+ change_key = self.connection.source.getChangeKey(event)
+ change = self.connection._getChange(change_key,
+ refresh=True,
+ event=event)
self.log.debug("Refreshed change %s,%s",
event.change_number, event.patch_number)
@@ -1296,47 +1294,43 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
return self._github_client_manager.getGithubClient(
project_name=project_name, zuul_event_id=zuul_event_id)
- def getChange(self, event, refresh=False):
- """Get the change representing an event."""
-
- project = self.source.getProject(event.project_name)
- if event.change_number:
- change = self._getChange(project, event.change_number,
- event.patch_number, refresh=refresh,
- event=event)
- else:
- if event.ref and event.ref.startswith('refs/tags/'):
- change = self._getTag(project, event, refresh=refresh)
- elif event.ref and event.ref.startswith('refs/heads/'):
- change = self._getBranch(project, event, refresh=refresh)
- else:
- change = self._getRef(project, event, refresh=refresh)
- return change
-
- def _getChange(self, project, number, patchset=None, refresh=False,
- event=None):
+ def getChange(self, change_key, refresh=False, event=None):
+ if change_key.connection_name != self.connection_name:
+ return None
+ if change_key.change_type == 'PullRequest':
+ return self._getChange(change_key, refresh=refresh, event=event)
+ elif change_key.change_type == 'Tag':
+ return self._getTag(change_key, refresh=refresh, event=event)
+ elif change_key.change_type == 'Branch':
+ return self._getBranch(change_key, refresh=refresh, event=event)
+ elif change_key.change_type == 'Ref':
+ return self._getRef(change_key, refresh=refresh, event=event)
+
+ def _getChange(self, change_key, refresh=False, event=None):
# Note(tobiash): We force the pull request number to int centrally here
# because it can originate from different sources (github event, manual
# enqueue event) where some might just parse the string and forward it.
- number = int(number)
- key = ChangeKey(self.connection_name, project.name,
- 'PullRequest', str(number),
- str(patchset))
- change = self._change_cache.get(key)
+ number = int(change_key.stable_id)
+ change = self._change_cache.get(change_key)
if change and not refresh:
return change
+ project = self.source.getProject(change_key.project_name)
if not change:
+ if not event:
+ self.log.error("Change %s not found in cache and no event",
+ change_key)
change = PullRequest(project.name)
change.project = project
change.number = number
- change.patchset = patchset
+ change.patchset = change_key.revision
# This can be called multi-threaded during github event
# preprocessing. In order to avoid data races perform locking
# by cached key. Try to acquire the lock non-blocking at first.
# If the lock is already taken we're currently updating the very
# same chnange right now and would likely get the same data again.
- lock = self._change_update_lock.setdefault(key, threading.Lock())
+ lock = self._change_update_lock.setdefault(change_key,
+ threading.Lock())
if lock.acquire(blocking=False):
try:
pull = self.getPull(change.project.name, change.number,
@@ -1346,11 +1340,11 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
self._updateChange(c, event, pull)
change = self._change_cache.updateChangeWithRetry(
- key, change, _update_change)
+ change_key, change, _update_change)
finally:
# We need to remove the lock here again so we don't leak
# them.
- del self._change_update_lock[key]
+ del self._change_update_lock[change_key]
lock.release()
else:
# We didn't get the lock so we don't need to update the same
@@ -1363,75 +1357,81 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
log.debug('Finished updating change %s', change)
return change
- def _getTag(self, project, event, refresh=False):
- tag = event.ref[len('refs/tags/'):]
- key = ChangeKey(self.connection_name, project.name,
- 'Tag', tag, event.newrev)
- change = self._change_cache.get(key)
+ def _getTag(self, change_key, refresh=False, event=None):
+ tag = change_key.stable_id
+ change = self._change_cache.get(change_key)
if change:
if refresh:
self._change_cache.updateChangeWithRetry(
- key, change, lambda c: None)
+ change_key, change, lambda c: None)
return change
+ if not event:
+ self.log.error("Change %s not found in cache and no event",
+ change_key)
+ project = self.source.getProject(change_key.project_name)
change = Tag(project)
change.tag = tag
- change.ref = event.ref
- change.oldrev = event.oldrev
- change.newrev = event.newrev
+ change.ref = f'refs/tags/{tag}'
+ change.oldrev = change_key.oldrev
+ change.newrev = change_key.newrev
# Build the url pointing to this tag/release on GitHub.
- change.url = self.getGitwebUrl(project, sha=event.newrev, tag=tag)
+ change.url = self.getGitwebUrl(project, sha=change.newrev, tag=tag)
if hasattr(event, 'commits'):
change.files = self.getPushedFileNames(event)
try:
- self._change_cache.set(key, change)
+ self._change_cache.set(change_key, change)
except ConcurrentUpdateError:
- change = self._change_cache.get(key)
+ change = self._change_cache.get(change_key)
return change
- def _getBranch(self, project, event, refresh=False):
- branch = event.ref[len('refs/heads/'):]
- key = ChangeKey(self.connection_name, project.name,
- 'Branch', branch, event.newrev)
- change = self._change_cache.get(key)
+ def _getBranch(self, change_key, refresh=False, event=None):
+ branch = change_key.stable_id
+ change = self._change_cache.get(change_key)
if change:
if refresh:
self._change_cache.updateChangeWithRetry(
- key, change, lambda c: None)
+ change_key, change, lambda c: None)
return change
+ if not event:
+ self.log.error("Change %s not found in cache and no event",
+ change_key)
+ project = self.source.getProject(change_key.project_name)
change = Branch(project)
change.branch = branch
- change.ref = event.ref
- change.oldrev = event.oldrev
- change.newrev = event.newrev
- change.url = self.getGitwebUrl(project, sha=event.newrev)
+ change.ref = f'refs/heads/{branch}'
+ change.oldrev = change_key.oldrev
+ change.newrev = change_key.newrev
+ change.url = self.getGitwebUrl(project, sha=change.newrev)
if hasattr(event, 'commits'):
change.files = self.getPushedFileNames(event)
try:
- self._change_cache.set(key, change)
+ self._change_cache.set(change_key, change)
except ConcurrentUpdateError:
- change = self._change_cache.get(key)
+ change = self._change_cache.get(change_key)
return change
- def _getRef(self, project, event, refresh=False):
- key = ChangeKey(self.connection_name, project.name,
- 'Ref', event.ref, event.newrev)
- change = self._change_cache.get(key)
+ def _getRef(self, change_key, refresh=False, event=None):
+ change = self._change_cache.get(change_key)
if change:
if refresh:
self._change_cache.updateChangeWithRetry(
- key, change, lambda c: None)
+ change_key, change, lambda c: None)
return change
+ if not event:
+ self.log.error("Change %s not found in cache and no event",
+ change_key)
+ project = self.source.getProject(change_key.project_name)
change = Ref(project)
- change.ref = event.ref
- change.oldrev = event.oldrev
- change.newrev = event.newrev
- change.url = self.getGitwebUrl(project, sha=event.newrev)
+ change.ref = change_key.stable_id
+ change.oldrev = change_key.oldrev
+ change.newrev = change_key.newrev
+ change.url = self.getGitwebUrl(project, sha=change.newrev)
if hasattr(event, 'commits'):
change.files = self.getPushedFileNames(event)
try:
- self._change_cache.set(key, change)
+ self._change_cache.set(change_key, change)
except ConcurrentUpdateError:
- change = self._change_cache.get(key)
+ change = self._change_cache.get(change_key)
return change
def getChangesDependingOn(self, change, projects, tenant):
@@ -1491,8 +1491,9 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
for key in keys:
(proj, num, sha) = key
- project = self.source.getProject(proj)
- change = self._getChange(project, int(num), patchset=sha)
+ dep_change_key = ChangeKey(self.connection_name, proj,
+ 'PullRequest', str(num), str(sha))
+ change = self._getChange(dep_change_key)
changes.append(change)
return changes
@@ -1770,10 +1771,11 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
cached_pr_numbers = self._sha_pr_cache.get(project_name, sha)
if len(cached_pr_numbers) > 1:
raise Exception('Multiple pulls found with head sha %s' % sha)
- project = self.getProject(project_name)
if len(cached_pr_numbers) == 1:
for pr in cached_pr_numbers:
- pr_body = self._getChange(project, pr, sha, event=event).pr
+ pr_change_key = ChangeKey(self.connection_name, project_name,
+ 'PullRequest', str(pr), str(sha))
+ pr_body = self._getChange(pr_change_key, event=event).pr
return pr_body
github = self.getGithubClient(project_name, zuul_event_id=event)
@@ -1789,8 +1791,10 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
# with multiple pulls found.
found_pr_body = None
for item in issues:
- pr_body = self._getChange(
- project, item.issue.number, sha, event=event).pr
+ pr_change_key = ChangeKey(self.connection_name, project_name,
+ 'PullRequest', str(item.issue.number),
+ str(sha))
+ pr_body = self._getChange(pr_change_key, event=event).pr
self._sha_pr_cache.update(project_name, pr_body)
if pr_body['head']['sha'] == sha:
if found_pr_body:
diff --git a/zuul/driver/github/githubsource.py b/zuul/driver/github/githubsource.py
index 8c08a8b13..ebf3abae0 100644
--- a/zuul/driver/github/githubsource.py
+++ b/zuul/driver/github/githubsource.py
@@ -22,6 +22,7 @@ from zuul.source import BaseSource
from zuul.model import Project
from zuul.driver.github.githubmodel import GithubRefFilter
from zuul.driver.util import scalar_or_list, to_list
+from zuul.zk.change_cache import ChangeKey
class GithubSource(BaseSource):
@@ -64,8 +65,31 @@ class GithubSource(BaseSource):
"""Called after configuration has been processed."""
pass
- def getChange(self, event, refresh=False):
- return self.connection.getChange(event, refresh)
+ def getChangeKey(self, event):
+ connection_name = self.connection.connection_name
+ if event.change_number:
+ return ChangeKey(connection_name, event.project_name,
+ 'PullRequest',
+ str(event.change_number),
+ str(event.patch_number))
+ revision = f'{event.oldrev}..{event.newrev}'
+ if event.ref and event.ref.startswith('refs/tags/'):
+ tag = event.ref[len('refs/tags/'):]
+ return ChangeKey(connection_name, event.project_name,
+ 'Tag', tag, revision)
+ if event.ref and event.ref.startswith('refs/heads/'):
+ branch = event.ref[len('refs/heads/'):]
+ return ChangeKey(connection_name, event.project_name,
+ 'Branch', branch, revision)
+ if event.ref:
+ return ChangeKey(connection_name, event.project_name,
+ 'Ref', event.ref, revision)
+
+ self.log.warning("Unable to format change key for %s" % (self,))
+
+ def getChange(self, change_key, refresh=False, event=None):
+ return self.connection.getChange(change_key, refresh=refresh,
+ event=event)
change_re = re.compile(r"/(.*?)/(.*?)/pull/(\d+)[\w]*")
@@ -88,16 +112,13 @@ class GithubSource(BaseSource):
if not pull:
return None
proj = pull.get('base').get('repo').get('full_name')
- project = self.getProject(proj)
- change = self.connection._getChange(
- project, num,
- patchset=pull.get('head').get('sha'),
- event=event)
+ change_key = ChangeKey(self.connection.connection_name, proj,
+ 'PullRequest',
+ str(num),
+ pull.get('head').get('sha'))
+ change = self.connection._getChange(change_key, event=event)
return change
- def getChangeByKey(self, key):
- return self.connection.getChangeByKey(key)
-
def getChangesDependingOn(self, change, projects, tenant):
return self.connection.getChangesDependingOn(change, projects, tenant)
diff --git a/zuul/driver/gitlab/gitlabconnection.py b/zuul/driver/gitlab/gitlabconnection.py
index 88a35d65a..bf8ed6250 100644
--- a/zuul/driver/gitlab/gitlabconnection.py
+++ b/zuul/driver/gitlab/gitlabconnection.py
@@ -41,7 +41,6 @@ from zuul.driver.gitlab.gitlabmodel import GitlabTriggerEvent, MergeRequest
from zuul.zk.branch_cache import BranchCache
from zuul.zk.change_cache import (
AbstractChangeCache,
- ChangeKey,
ConcurrentUpdateError,
)
from zuul.zk.event_queues import ConnectionEventQueue
@@ -237,12 +236,8 @@ class GitlabEventConnector(threading.Thread):
event.timestamp = timestamp
event.project_hostname = self.connection.canonical_hostname
if event.change_number:
- project = self.connection.source.getProject(event.project_name)
- self.connection._getChange(project,
- event.change_number,
- event.patch_number,
- refresh=True,
- url=event.change_url,
+ change_key = self.connection.source.getChangeKey(event)
+ self.connection._getChange(change_key, refresh=True,
event=event)
# If this event references a branch and we're excluding
@@ -562,36 +557,39 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
project.name)
return cloneurl
- def getChange(self, event, refresh=False):
- project = self.source.getProject(event.project_name)
- if event.change_number:
+ def getChange(self, change_key, refresh=False, event=None):
+ if change_key.connection_name != self.connection_name:
+ return None
+ if change_key.change_type == 'MergeRequest':
self.log.info("Getting change for %s#%s" % (
- project, event.change_number))
- change = self._getChange(
- project, event.change_number, event.patch_number,
- refresh=refresh, event=event)
+ change_key.project_name, change_key.stable_id))
+ change = self._getChange(change_key,
+ refresh=refresh, event=event)
else:
self.log.info("Getting change for %s ref:%s" % (
- project, event.ref))
- change = self._getNonMRRef(project, event)
+ change_key.project_name, change_key.stable_id))
+ change = self._getNonMRRef(change_key, event=event)
return change
- def _getChange(self, project, number, patch_number=None,
- refresh=False, url=None, event=None):
+ def _getChange(self, change_key, refresh=False, event=None):
log = get_annotated_logger(self.log, event)
- key = ChangeKey(self.connection_name, project.name,
- 'MergeRequest', str(number),
- str(patch_number))
- change = self._change_cache.get(key)
+ number = int(change_key.stable_id)
+ change = self._change_cache.get(change_key)
if change and not refresh:
- log.debug("Getting change from cache %s" % str(key))
+ log.debug("Getting change from cache %s" % str(change_key))
return change
+ project = self.source.getProject(change_key.project_name)
if not change:
+ if not event:
+ self.log.error("Change %s not found in cache and no event",
+ change_key)
+ if event:
+ url = event.change_url
change = MergeRequest(project.name)
change.project = project
change.number = number
# patch_number is the tips commit SHA of the MR
- change.patchset = patch_number
+ change.patchset = change_key.revision
change.url = url or self.getMRUrl(project.name, number)
change.uris = [change.url.split('://', 1)[-1]] # remove scheme
@@ -603,10 +601,46 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
def _update_change(c):
self._updateChange(c, event, mr)
- change = self._change_cache.updateChangeWithRetry(key, change,
+ change = self._change_cache.updateChangeWithRetry(change_key, change,
_update_change)
return change
+ def _getNonMRRef(self, change_key, refresh=False, event=None):
+ change = self._change_cache.get(change_key)
+ if change:
+ if refresh:
+ self._change_cache.updateChangeWithRetry(
+ change_key, change, lambda c: None)
+ return change
+ if not event:
+ self.log.error("Change %s not found in cache and no event",
+ change_key)
+ project = self.source.getProject(change_key.project_name)
+ if change_key.change_type == 'Tag':
+ change = Tag(project)
+ tag = change_key.stable_id
+ change.tag = tag
+ change.ref = f'refs/tags/{tag}'
+ elif change_key.change_type == 'Branch':
+ branch = change_key.stable_id
+ change = Branch(project)
+ change.branch = branch
+ change.ref = f'refs/heads/{branch}'
+ else:
+ change = Ref(project)
+ change.ref = change_key.stable_id
+ change.oldrev = change_key.oldrev
+ change.newrev = change_key.newrev
+ change.url = self.getGitwebUrl(project, sha=change.newrev)
+ # Explicitly set files to None and let the pipelines processor
+ # call the merger asynchronuously
+ change.files = None
+ try:
+ self._change_cache.set(change_key, change)
+ except ConcurrentUpdateError:
+ change = self._change_cache.get(change_key)
+ return change
+
def _updateChange(self, change, event, mr):
log = get_annotated_logger(self.log, event)
change.mr = mr
@@ -632,36 +666,6 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
log.info("Updated change from Gitlab %s" % change)
return change
- def _getNonMRRef(self, project, event, refresh=False):
- key = ChangeKey(self.connection_name, project.name,
- 'Ref', event.ref, event.newrev)
- change = self._change_cache.get(key)
- if change:
- if refresh:
- self._change_cache.updateChangeWithRetry(
- key, change, lambda c: None)
- return change
- if event.ref and event.ref.startswith('refs/tags/'):
- change = Tag(project)
- change.tag = event.tag
- elif event.ref and event.ref.startswith('refs/heads/'):
- change = Branch(project)
- change.branch = event.branch
- else:
- change = Ref(project)
- change.ref = event.ref
- change.oldrev = event.oldrev
- change.newrev = event.newrev
- change.url = self.getGitwebUrl(project, sha=event.newrev)
- # Explicitly set files to None and let the pipelines processor
- # call the merger asynchronuously
- change.files = None
- try:
- self._change_cache.set(key, change)
- except ConcurrentUpdateError:
- change = self._change_cache.get(key)
- return change
-
def canMerge(self, change, allow_needs, event=None):
log = get_annotated_logger(self.log, event)
can_merge = True if change.merge_status == "can_be_merged" else False
diff --git a/zuul/driver/gitlab/gitlabsource.py b/zuul/driver/gitlab/gitlabsource.py
index bda18736a..67bcd41d9 100644
--- a/zuul/driver/gitlab/gitlabsource.py
+++ b/zuul/driver/gitlab/gitlabsource.py
@@ -18,9 +18,9 @@ import urllib
from zuul.model import Project
from zuul.source import BaseSource
-
from zuul.driver.gitlab.gitlabmodel import GitlabRefFilter
from zuul.driver.util import scalar_or_list, to_list
+from zuul.zk.change_cache import ChangeKey
class GitlabSource(BaseSource):
@@ -57,8 +57,30 @@ class GitlabSource(BaseSource):
"""Called after configuration has been processed."""
raise NotImplementedError()
- def getChange(self, event, refresh=False):
- return self.connection.getChange(event, refresh)
+ def getChangeKey(self, event):
+ connection_name = self.connection.connection_name
+ if event.change_number:
+ return ChangeKey(connection_name, event.project_name,
+ 'MergeRequest',
+ str(event.change_number),
+ str(event.patch_number))
+ revision = f'{event.oldrev}..{event.newrev}'
+ if event.ref and event.ref.startswith('refs/tags/'):
+ tag = event.ref[len('refs/tags/'):]
+ return ChangeKey(connection_name, event.project_name,
+ 'Tag', tag, revision)
+ if event.ref and event.ref.startswith('refs/heads/'):
+ branch = event.ref[len('refs/heads/'):]
+ return ChangeKey(connection_name, event.project_name,
+ 'Branch', branch, revision)
+ if event.ref:
+ return ChangeKey(connection_name, event.project_name,
+ 'Ref', event.ref, revision)
+ self.log.warning("Unable to format change key for %s" % (self,))
+
+ def getChange(self, change_key, refresh=False, event=None):
+ return self.connection.getChange(change_key, refresh=refresh,
+ event=event)
def getChangeByURL(self, url, event):
try:
@@ -76,15 +98,12 @@ class GitlabSource(BaseSource):
mr = self.connection.getMR(project_name, num)
if not mr:
return None
- project = self.getProject(project_name)
- change = self.connection._getChange(
- project, num, mr['sha'], url=url,
- event=event)
+ change_key = ChangeKey(self.connection.connection_name, project_name,
+ 'MergeRequest',
+ str(num), mr['sha'])
+ change = self.connection._getChange(change_key, event=event)
return change
- def getChangeByKey(self, key):
- return self.connection.getChangeByKey(key)
-
def getChangesDependingOn(self, change, projects, tenant):
return self.connection.getChangesDependingOn(
change, projects, tenant)
diff --git a/zuul/driver/pagure/pagureconnection.py b/zuul/driver/pagure/pagureconnection.py
index c54546e76..4fba2a27e 100644
--- a/zuul/driver/pagure/pagureconnection.py
+++ b/zuul/driver/pagure/pagureconnection.py
@@ -33,7 +33,6 @@ from zuul.lib import dependson
from zuul.zk.branch_cache import BranchCache
from zuul.zk.change_cache import (
AbstractChangeCache,
- ChangeKey,
ConcurrentUpdateError,
)
from zuul.zk.event_queues import ConnectionEventQueue
@@ -212,12 +211,8 @@ class PagureEventConnector(threading.Thread):
if event:
event.timestamp = timestamp
if event.change_number:
- project = self.connection.source.getProject(event.project_name)
- self.connection._getChange(project,
- event.change_number,
- event.patch_number,
- refresh=True,
- url=event.change_url,
+ change_key = self.connection.source.getChangeKey(event)
+ self.connection._getChange(change_key, refresh=True,
event=event)
event.project_hostname = self.connection.canonical_hostname
self.connection.logEvent(event)
@@ -600,81 +595,91 @@ class PagureConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
def getGitUrl(self, project):
return '%s/%s' % (self.cloneurl, project.name)
- def getChange(self, event, refresh=False):
- project = self.source.getProject(event.project_name)
- if event.change_number:
+ def getChange(self, change_key, refresh=False, event=None):
+ if change_key.connection_name != self.connection_name:
+ return None
+ if change_key.change_type == 'PullRequest':
self.log.info("Getting change for %s#%s" % (
- project, event.change_number))
- change = self._getChange(
- project, event.change_number, event.patch_number,
- refresh=refresh, event=event)
+ change_key.project_name, change_key.stable_id))
+ change = self._getChange(change_key,
+ refresh=refresh, event=event)
else:
self.log.info("Getting change for %s ref:%s" % (
- project, event.ref))
- change = self._getNonPRRef(project, event, refresh=refresh)
+ change_key.project_name, change_key.stable_id))
+ change = self._getNonPRRef(change_key, event=event)
return change
- def _getChange(self, project, number, patchset=None,
- refresh=False, url=None, event=None):
- key = ChangeKey(self.connection_name, project.name,
- 'PullRequest', str(number),
- str(patchset))
- change = self._change_cache.get(key)
+ def _getChange(self, change_key, refresh=False, event=None):
+ log = get_annotated_logger(self.log, event)
+ number = int(change_key.stable_id)
+ change = self._change_cache.get(change_key)
if change and not refresh:
- self.log.debug("Getting change from cache %s" % str(key))
+ log.debug("Getting change from cache %s" % str(change_key))
return change
+ project = self.source.getProject(change_key.project_name)
if not change:
+ if not event:
+ self.log.error("Change %s not found in cache and no event",
+ change_key)
+ if event:
+ url = event.change_url
change = PullRequest(project.name)
change.project = project
change.number = number
# patchset is the tips commit of the PR
- change.patchset = patchset
- change.url = url
+ change.patchset = change_key.revision
+ change.url = url or self.getPullUrl(project.name, number)
change.uris = [
- '%s/%s/pull/%s' % (self.baseurl, project, number),
+ '%s/%s/pull/%s' % (self.baseurl, project.name, number),
]
- self.log.debug("Getting change pr#%s from project %s" % (
+ log.debug("Getting change pr#%s from project %s" % (
number, project.name))
- self.log.info("Updating change from pagure %s" % change)
+ log.info("Updating change from pagure %s" % change)
pull = self.getPull(change.project.name, change.number)
def _update_change(c):
self._updateChange(c, event, pull)
- change = self._change_cache.updateChangeWithRetry(key, change,
+ change = self._change_cache.updateChangeWithRetry(change_key, change,
_update_change)
return change
- def _getNonPRRef(self, project, event, refresh=False):
- key = ChangeKey(self.connection_name, project.name,
- 'Ref', event.ref, event.newrev)
- change = self._change_cache.get(key)
+ def _getNonPRRef(self, change_key, refresh=False, event=None):
+ change = self._change_cache.get(change_key)
if change:
if refresh:
self._change_cache.updateChangeWithRetry(
- key, change, lambda: None)
+ change_key, change, lambda: None)
return change
- if event.ref and event.ref.startswith('refs/tags/'):
+ if not event:
+ self.log.error("Change %s not found in cache and no event",
+ change_key)
+ project = self.source.getProject(change_key.project_name)
+ if change_key.change_type == 'Tag':
change = Tag(project)
- change.tag = event.tag
- elif event.ref and event.ref.startswith('refs/heads/'):
+ tag = change_key.stable_id
+ change.tag = tag
+ change.ref = f'refs/tags/{tag}'
+ elif change_key.change_type == 'Branch':
+ branch = change_key.stable_id
change = Branch(project)
- change.branch = event.branch
+ change.branch = branch
+ change.ref = f'refs/heads/{branch}'
else:
change = Ref(project)
- change.ref = event.ref
- change.oldrev = event.oldrev
- change.newrev = event.newrev
- change.url = self.getGitwebUrl(project, sha=event.newrev)
+ change.ref = change_key.stable_id
+ change.oldrev = change_key.oldrev
+ change.newrev = change_key.newrev
+ change.url = self.getGitwebUrl(project, sha=change.newrev)
# Pagure does not send files details in the git-receive event.
# Explicitly set files to None and let the pipelines processor
# call the merger asynchronuously
change.files = None
try:
- self._change_cache.set(key, change)
+ self._change_cache.set(change_key, change)
except ConcurrentUpdateError:
- change = self._change_cache.get(key)
+ change = self._change_cache.get(change_key)
return change
def _hasRequiredStatusChecks(self, change):
diff --git a/zuul/driver/pagure/paguresource.py b/zuul/driver/pagure/paguresource.py
index 25d7533d7..b97df867e 100644
--- a/zuul/driver/pagure/paguresource.py
+++ b/zuul/driver/pagure/paguresource.py
@@ -18,9 +18,9 @@ import logging
from zuul.source import BaseSource
from zuul.model import Project
-
from zuul.driver.pagure.paguremodel import PagureRefFilter
from zuul.driver.util import scalar_or_list, to_list
+from zuul.zk.change_cache import ChangeKey
class PagureSource(BaseSource):
@@ -61,8 +61,30 @@ class PagureSource(BaseSource):
"""Called after configuration has been processed."""
raise NotImplementedError()
- def getChange(self, event, refresh=False):
- return self.connection.getChange(event, refresh)
+ def getChangeKey(self, event):
+ connection_name = self.connection.connection_name
+ if event.change_number:
+ return ChangeKey(connection_name, event.project_name,
+ 'PullRequest',
+ str(event.change_number),
+ str(event.patch_number))
+ revision = f'{event.oldrev}..{event.newrev}'
+ if event.ref and event.ref.startswith('refs/tags/'):
+ tag = event.ref[len('refs/tags/'):]
+ return ChangeKey(connection_name, event.project_name,
+ 'Tag', tag, revision)
+ if event.ref and event.ref.startswith('refs/heads/'):
+ branch = event.ref[len('refs/heads/'):]
+ return ChangeKey(connection_name, event.project_name,
+ 'Branch', branch, revision)
+ if event.ref:
+ return ChangeKey(connection_name, event.project_name,
+ 'Ref', event.ref, revision)
+ self.log.warning("Unable to format change key for %s" % (self,))
+
+ def getChange(self, change_key, refresh=False, event=None):
+ return self.connection.getChange(change_key, refresh=refresh,
+ event=event)
def getChangeByURL(self, url, event):
try:
@@ -80,17 +102,12 @@ class PagureSource(BaseSource):
pull = self.connection.getPull(project_name, num, event=event)
if not pull:
return None
- project = self.getProject(project_name)
- change = self.connection._getChange(
- project, num,
- patchset=pull.get('commit_stop'),
- url=url,
- event=event)
+ change_key = ChangeKey(self.connection.connection_name, project_name,
+ 'PullRequest',
+ str(num), pull.get('commit_stop'))
+ change = self.connection._getChange(change_key, event=event)
return change
- def getChangeByKey(self, key):
- return self.connection.getChangeByKey(key)
-
def getChangesDependingOn(self, change, projects, tenant):
return self.connection.getChangesDependingOn(
change, projects, tenant)
diff --git a/zuul/driver/timer/__init__.py b/zuul/driver/timer/__init__.py
index e0e3c3aa9..32d875a38 100644
--- a/zuul/driver/timer/__init__.py
+++ b/zuul/driver/timer/__init__.py
@@ -180,8 +180,10 @@ class TimerDriver(Driver, TriggerInterface):
event.timestamp = time.time()
# Refresh the branch in order to update the item in the
# change cache.
+ change_key = project.source.getChangeKey(event)
with self.project_update_locks[project.canonical_name]:
- project.source.getChange(event, refresh=True)
+ project.source.getChange(change_key, refresh=True,
+ event=event)
log = get_annotated_logger(self.log, event)
log.debug("Adding event")
self.sched.addTriggerEvent(self.name, event)