summaryrefslogtreecommitdiff
path: root/zuul
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2017-08-03 22:16:09 +0000
committerGerrit Code Review <review@openstack.org>2017-08-03 22:16:09 +0000
commitb1439a554fb9b1922e411e6a71f9fed42baf2fee (patch)
tree04ac99122b4898f0f77a0e71c466c97750bee1ef /zuul
parent66ed17def4a607372ce8454b12dc50372d23bdd4 (diff)
parent289f5930facc76a9d5f3328a3c0a11be59eca596 (diff)
downloadzuul-b1439a554fb9b1922e411e6a71f9fed42baf2fee.tar.gz
Merge "Ensure ref-updated jobs run with their ref" into feature/zuulv3
Diffstat (limited to 'zuul')
-rw-r--r--zuul/executor/server.py16
-rw-r--r--zuul/manager/__init__.py24
-rw-r--r--zuul/manager/independent.py3
-rw-r--r--zuul/merger/client.py5
-rw-r--r--zuul/merger/merger.py72
-rw-r--r--zuul/merger/server.py12
6 files changed, 112 insertions, 20 deletions
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index b16611154..8d23cb77c 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -625,6 +625,7 @@ class ExecutorServer(object):
self.hostname)
self.merger_worker.registerFunction("merger:merge")
self.merger_worker.registerFunction("merger:cat")
+ self.merger_worker.registerFunction("merger:refstate")
def stop(self):
self.log.debug("Stopping")
@@ -721,6 +722,9 @@ class ExecutorServer(object):
elif job.name == 'merger:merge':
self.log.debug("Got merge job: %s" % job.unique)
self.merge(job)
+ elif job.name == 'merger:refstate':
+ self.log.debug("Got refstate job: %s" % job.unique)
+ self.refstate(job)
else:
self.log.error("Unable to handle job %s" % job.name)
job.sendWorkFail()
@@ -800,6 +804,14 @@ class ExecutorServer(object):
files=files)
job.sendWorkComplete(json.dumps(result))
+ def refstate(self, job):
+ args = json.loads(job.arguments)
+ with self.merger_lock:
+ success, repo_state = self.merger.getRepoState(args['items'])
+ result = dict(updated=success,
+ repo_state=repo_state)
+ job.sendWorkComplete(json.dumps(result))
+
def merge(self, job):
args = json.loads(job.arguments)
with self.merger_lock:
@@ -954,6 +966,10 @@ class AnsibleJob(object):
# a work complete result, don't run any jobs
return
+ state_items = [i for i in args['items'] if not i.get('number')]
+ if state_items:
+ merger.setRepoState(state_items, args['repo_state'])
+
for project in args['projects']:
repo = repos[project['canonical_name']]
# If this project is the Zuul project and this is a ref
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index dfb3238a2..8282f86a4 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -13,6 +13,7 @@
import logging
from zuul import exceptions
+from zuul import model
class DynamicChangeQueueContextManager(object):
@@ -483,20 +484,18 @@ class PipelineManager(object):
def scheduleMerge(self, item, files=None, dirs=None):
build_set = item.current_build_set
- if not hasattr(item.change, 'branch'):
- self.log.debug("Change %s does not have an associated branch, "
- "not scheduling a merge job for item %s" %
- (item.change, item))
- build_set.merge_state = build_set.COMPLETE
- return True
-
self.log.debug("Scheduling merge for item %s (files: %s, dirs: %s)" %
(item, files, dirs))
build_set = item.current_build_set
build_set.merge_state = build_set.PENDING
- self.sched.merger.mergeChanges(build_set.merger_items,
- item.current_build_set, files, dirs,
- precedence=self.pipeline.precedence)
+ if isinstance(item.change, model.Change):
+ self.sched.merger.mergeChanges(build_set.merger_items,
+ item.current_build_set, files, dirs,
+ precedence=self.pipeline.precedence)
+ else:
+ self.sched.merger.getRepoState(build_set.merger_items,
+ item.current_build_set,
+ precedence=self.pipeline.precedence)
return False
def prepareItem(self, item):
@@ -675,12 +674,13 @@ class PipelineManager(object):
build_set = event.build_set
item = build_set.item
build_set.merge_state = build_set.COMPLETE
+ build_set.repo_state = event.repo_state
if event.merged:
build_set.commit = event.commit
build_set.files.setFiles(event.files)
- build_set.repo_state = event.repo_state
elif event.updated:
- build_set.commit = item.change.newrev
+ build_set.commit = (item.change.newrev or
+ '0000000000000000000000000000000000000000')
if not build_set.commit:
self.log.info("Unable to merge change %s" % item.change)
item.setUnableToMerge()
diff --git a/zuul/manager/independent.py b/zuul/manager/independent.py
index 06c9a01a1..7b0a9f53c 100644
--- a/zuul/manager/independent.py
+++ b/zuul/manager/independent.py
@@ -44,6 +44,9 @@ class IndependentPipelineManager(PipelineManager):
if hasattr(change, 'number'):
history = history or []
history.append(change.number)
+ else:
+ # Don't enqueue dependencies ahead of a non-change ref.
+ return True
ret = self.checkForChangesNeededBy(change, change_queue)
if ret in [True, False]:
diff --git a/zuul/merger/client.py b/zuul/merger/client.py
index dd9c8d551..5191a44f9 100644
--- a/zuul/merger/client.py
+++ b/zuul/merger/client.py
@@ -116,6 +116,11 @@ class MergeClient(object):
repo_state=repo_state)
self.submitJob('merger:merge', data, build_set, precedence)
+ def getRepoState(self, items, build_set,
+ precedence=zuul.model.PRECEDENCE_NORMAL):
+ data = dict(items=items)
+ self.submitJob('merger:refstate', data, build_set, precedence)
+
def getFiles(self, connection_name, project_name, branch, files, dirs=[],
precedence=zuul.model.PRECEDENCE_HIGH):
data = dict(connection=connection_name,
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index c5d1f2ad5..ed98696ec 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -20,6 +20,8 @@ import logging
import zuul.model
+NULL_REF = '0000000000000000000000000000000000000000'
+
def reset_repo_to_head(repo):
# This lets us reset the repo even if there is a file in the root
@@ -178,8 +180,13 @@ class Repo(object):
self.setRef(path, hexsha, repo)
unseen.discard(path)
for path in unseen:
- self.log.debug("Delete reference %s", path)
- git.refs.SymbolicReference.delete(repo, ref.path)
+ self.deleteRef(path, repo)
+
+ def deleteRef(self, path, repo=None):
+ if repo is None:
+ repo = self.createRepoObject()
+ self.log.debug("Delete reference %s", path)
+ git.refs.SymbolicReference.delete(repo, path)
def checkout(self, ref):
repo = self.createRepoObject()
@@ -369,6 +376,16 @@ class Merger(object):
recent[key] = ref.object
project[ref.path] = ref.object.hexsha
+ def _alterRepoState(self, connection_name, project_name,
+ repo_state, path, hexsha):
+ projects = repo_state.setdefault(connection_name, {})
+ project = projects.setdefault(project_name, {})
+ if hexsha == NULL_REF:
+ if path in project:
+ del project[path]
+ else:
+ project[path] = hexsha
+
def _restoreRepoState(self, connection_name, project_name, repo,
repo_state):
projects = repo_state.get(connection_name, {})
@@ -470,12 +487,8 @@ class Merger(object):
if repo_state is None:
repo_state = {}
for item in items:
- if item.get("number") and item.get("patchset"):
- self.log.debug("Merging for change %s,%s." %
- (item["number"], item["patchset"]))
- elif item.get("newrev") and item.get("oldrev"):
- self.log.debug("Merging for rev %s with oldrev %s." %
- (item["newrev"], item["oldrev"]))
+ self.log.debug("Merging for change %s,%s" %
+ (item["number"], item["patchset"]))
commit = self._mergeItem(item, recent, repo_state)
if not commit:
return None
@@ -492,6 +505,49 @@ class Merger(object):
ret_recent[k] = v.hexsha
return commit.hexsha, read_files, repo_state, ret_recent
+ def setRepoState(self, items, repo_state):
+ # Sets the repo state for the items
+ seen = set()
+ for item in items:
+ repo = self.getRepo(item['connection'], item['project'])
+ key = (item['connection'], item['project'], item['branch'])
+
+ if key in seen:
+ continue
+
+ repo.reset()
+ self._restoreRepoState(item['connection'], item['project'], repo,
+ repo_state)
+
+ def getRepoState(self, items):
+ # Gets the repo state for items. Generally this will be
+ # called in any non-change pipeline. We will return the repo
+ # state for each item, but manipulated with any information in
+ # the item (eg, if it creates a ref, that will be in the repo
+ # state regardless of the actual state).
+ seen = set()
+ recent = {}
+ repo_state = {}
+ for item in items:
+ repo = self.getRepo(item['connection'], item['project'])
+ key = (item['connection'], item['project'], item['branch'])
+ if key not in seen:
+ try:
+ repo.reset()
+ except Exception:
+ self.log.exception("Unable to reset repo %s" % repo)
+ return (False, {})
+
+ self._saveRepoState(item['connection'], item['project'], repo,
+ repo_state, recent)
+
+ if item.get('newrev'):
+ # This is a ref update rather than a branch tip, so make sure
+ # our returned state includes this change.
+ self._alterRepoState(item['connection'], item['project'],
+ repo_state, item['ref'], item['newrev'])
+ return (True, repo_state)
+
def getFiles(self, connection_name, project_name, branch, files, dirs=[]):
repo = self.getRepo(connection_name, project_name)
return repo.getFiles(files, dirs, branch=branch)
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index c342e1ac8..fc599c117 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -58,6 +58,7 @@ class MergeServer(object):
def register(self):
self.worker.registerFunction("merger:merge")
self.worker.registerFunction("merger:cat")
+ self.worker.registerFunction("merger:refstate")
def stop(self):
self.log.debug("Stopping")
@@ -80,6 +81,9 @@ class MergeServer(object):
elif job.name == 'merger:cat':
self.log.debug("Got cat job: %s" % job.unique)
self.cat(job)
+ elif job.name == 'merger:refstate':
+ self.log.debug("Got refstate job: %s" % job.unique)
+ self.refstate(job)
else:
self.log.error("Unable to handle job %s" % job.name)
job.sendWorkFail()
@@ -104,6 +108,14 @@ class MergeServer(object):
recent) = ret
job.sendWorkComplete(json.dumps(result))
+ def refstate(self, job):
+ args = json.loads(job.arguments)
+
+ success, repo_state = self.merger.getItemRepoState(args['items'])
+ result = dict(updated=success,
+ repo_state=repo_state)
+ job.sendWorkComplete(json.dumps(result))
+
def cat(self, job):
args = json.loads(job.arguments)
self.merger.updateRepo(args['connection'], args['project'])