diff options
author | Jenkins <jenkins@review.openstack.org> | 2017-08-03 22:16:09 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2017-08-03 22:16:09 +0000 |
commit | b1439a554fb9b1922e411e6a71f9fed42baf2fee (patch) | |
tree | 04ac99122b4898f0f77a0e71c466c97750bee1ef /zuul | |
parent | 66ed17def4a607372ce8454b12dc50372d23bdd4 (diff) | |
parent | 289f5930facc76a9d5f3328a3c0a11be59eca596 (diff) | |
download | zuul-b1439a554fb9b1922e411e6a71f9fed42baf2fee.tar.gz |
Merge "Ensure ref-updated jobs run with their ref" into feature/zuulv3
Diffstat (limited to 'zuul')
-rw-r--r-- | zuul/executor/server.py | 16 | ||||
-rw-r--r-- | zuul/manager/__init__.py | 24 | ||||
-rw-r--r-- | zuul/manager/independent.py | 3 | ||||
-rw-r--r-- | zuul/merger/client.py | 5 | ||||
-rw-r--r-- | zuul/merger/merger.py | 72 | ||||
-rw-r--r-- | zuul/merger/server.py | 12 |
6 files changed, 112 insertions, 20 deletions
diff --git a/zuul/executor/server.py b/zuul/executor/server.py index b16611154..8d23cb77c 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -625,6 +625,7 @@ class ExecutorServer(object): self.hostname) self.merger_worker.registerFunction("merger:merge") self.merger_worker.registerFunction("merger:cat") + self.merger_worker.registerFunction("merger:refstate") def stop(self): self.log.debug("Stopping") @@ -721,6 +722,9 @@ class ExecutorServer(object): elif job.name == 'merger:merge': self.log.debug("Got merge job: %s" % job.unique) self.merge(job) + elif job.name == 'merger:refstate': + self.log.debug("Got refstate job: %s" % job.unique) + self.refstate(job) else: self.log.error("Unable to handle job %s" % job.name) job.sendWorkFail() @@ -800,6 +804,14 @@ class ExecutorServer(object): files=files) job.sendWorkComplete(json.dumps(result)) + def refstate(self, job): + args = json.loads(job.arguments) + with self.merger_lock: + success, repo_state = self.merger.getRepoState(args['items']) + result = dict(updated=success, + repo_state=repo_state) + job.sendWorkComplete(json.dumps(result)) + def merge(self, job): args = json.loads(job.arguments) with self.merger_lock: @@ -954,6 +966,10 @@ class AnsibleJob(object): # a work complete result, don't run any jobs return + state_items = [i for i in args['items'] if not i.get('number')] + if state_items: + merger.setRepoState(state_items, args['repo_state']) + for project in args['projects']: repo = repos[project['canonical_name']] # If this project is the Zuul project and this is a ref diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index dfb3238a2..8282f86a4 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -13,6 +13,7 @@ import logging from zuul import exceptions +from zuul import model class DynamicChangeQueueContextManager(object): @@ -483,20 +484,18 @@ class PipelineManager(object): def scheduleMerge(self, item, files=None, dirs=None): build_set = item.current_build_set - if not hasattr(item.change, 'branch'): - self.log.debug("Change %s does not have an associated branch, " - "not scheduling a merge job for item %s" % - (item.change, item)) - build_set.merge_state = build_set.COMPLETE - return True - self.log.debug("Scheduling merge for item %s (files: %s, dirs: %s)" % (item, files, dirs)) build_set = item.current_build_set build_set.merge_state = build_set.PENDING - self.sched.merger.mergeChanges(build_set.merger_items, - item.current_build_set, files, dirs, - precedence=self.pipeline.precedence) + if isinstance(item.change, model.Change): + self.sched.merger.mergeChanges(build_set.merger_items, + item.current_build_set, files, dirs, + precedence=self.pipeline.precedence) + else: + self.sched.merger.getRepoState(build_set.merger_items, + item.current_build_set, + precedence=self.pipeline.precedence) return False def prepareItem(self, item): @@ -675,12 +674,13 @@ class PipelineManager(object): build_set = event.build_set item = build_set.item build_set.merge_state = build_set.COMPLETE + build_set.repo_state = event.repo_state if event.merged: build_set.commit = event.commit build_set.files.setFiles(event.files) - build_set.repo_state = event.repo_state elif event.updated: - build_set.commit = item.change.newrev + build_set.commit = (item.change.newrev or + '0000000000000000000000000000000000000000') if not build_set.commit: self.log.info("Unable to merge change %s" % item.change) item.setUnableToMerge() diff --git a/zuul/manager/independent.py b/zuul/manager/independent.py index 06c9a01a1..7b0a9f53c 100644 --- a/zuul/manager/independent.py +++ b/zuul/manager/independent.py @@ -44,6 +44,9 @@ class IndependentPipelineManager(PipelineManager): if hasattr(change, 'number'): history = history or [] history.append(change.number) + else: + # Don't enqueue dependencies ahead of a non-change ref. + return True ret = self.checkForChangesNeededBy(change, change_queue) if ret in [True, False]: diff --git a/zuul/merger/client.py b/zuul/merger/client.py index dd9c8d551..5191a44f9 100644 --- a/zuul/merger/client.py +++ b/zuul/merger/client.py @@ -116,6 +116,11 @@ class MergeClient(object): repo_state=repo_state) self.submitJob('merger:merge', data, build_set, precedence) + def getRepoState(self, items, build_set, + precedence=zuul.model.PRECEDENCE_NORMAL): + data = dict(items=items) + self.submitJob('merger:refstate', data, build_set, precedence) + def getFiles(self, connection_name, project_name, branch, files, dirs=[], precedence=zuul.model.PRECEDENCE_HIGH): data = dict(connection=connection_name, diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py index c5d1f2ad5..ed98696ec 100644 --- a/zuul/merger/merger.py +++ b/zuul/merger/merger.py @@ -20,6 +20,8 @@ import logging import zuul.model +NULL_REF = '0000000000000000000000000000000000000000' + def reset_repo_to_head(repo): # This lets us reset the repo even if there is a file in the root @@ -178,8 +180,13 @@ class Repo(object): self.setRef(path, hexsha, repo) unseen.discard(path) for path in unseen: - self.log.debug("Delete reference %s", path) - git.refs.SymbolicReference.delete(repo, ref.path) + self.deleteRef(path, repo) + + def deleteRef(self, path, repo=None): + if repo is None: + repo = self.createRepoObject() + self.log.debug("Delete reference %s", path) + git.refs.SymbolicReference.delete(repo, path) def checkout(self, ref): repo = self.createRepoObject() @@ -369,6 +376,16 @@ class Merger(object): recent[key] = ref.object project[ref.path] = ref.object.hexsha + def _alterRepoState(self, connection_name, project_name, + repo_state, path, hexsha): + projects = repo_state.setdefault(connection_name, {}) + project = projects.setdefault(project_name, {}) + if hexsha == NULL_REF: + if path in project: + del project[path] + else: + project[path] = hexsha + def _restoreRepoState(self, connection_name, project_name, repo, repo_state): projects = repo_state.get(connection_name, {}) @@ -470,12 +487,8 @@ class Merger(object): if repo_state is None: repo_state = {} for item in items: - if item.get("number") and item.get("patchset"): - self.log.debug("Merging for change %s,%s." % - (item["number"], item["patchset"])) - elif item.get("newrev") and item.get("oldrev"): - self.log.debug("Merging for rev %s with oldrev %s." % - (item["newrev"], item["oldrev"])) + self.log.debug("Merging for change %s,%s" % + (item["number"], item["patchset"])) commit = self._mergeItem(item, recent, repo_state) if not commit: return None @@ -492,6 +505,49 @@ class Merger(object): ret_recent[k] = v.hexsha return commit.hexsha, read_files, repo_state, ret_recent + def setRepoState(self, items, repo_state): + # Sets the repo state for the items + seen = set() + for item in items: + repo = self.getRepo(item['connection'], item['project']) + key = (item['connection'], item['project'], item['branch']) + + if key in seen: + continue + + repo.reset() + self._restoreRepoState(item['connection'], item['project'], repo, + repo_state) + + def getRepoState(self, items): + # Gets the repo state for items. Generally this will be + # called in any non-change pipeline. We will return the repo + # state for each item, but manipulated with any information in + # the item (eg, if it creates a ref, that will be in the repo + # state regardless of the actual state). + seen = set() + recent = {} + repo_state = {} + for item in items: + repo = self.getRepo(item['connection'], item['project']) + key = (item['connection'], item['project'], item['branch']) + if key not in seen: + try: + repo.reset() + except Exception: + self.log.exception("Unable to reset repo %s" % repo) + return (False, {}) + + self._saveRepoState(item['connection'], item['project'], repo, + repo_state, recent) + + if item.get('newrev'): + # This is a ref update rather than a branch tip, so make sure + # our returned state includes this change. + self._alterRepoState(item['connection'], item['project'], + repo_state, item['ref'], item['newrev']) + return (True, repo_state) + def getFiles(self, connection_name, project_name, branch, files, dirs=[]): repo = self.getRepo(connection_name, project_name) return repo.getFiles(files, dirs, branch=branch) diff --git a/zuul/merger/server.py b/zuul/merger/server.py index c342e1ac8..fc599c117 100644 --- a/zuul/merger/server.py +++ b/zuul/merger/server.py @@ -58,6 +58,7 @@ class MergeServer(object): def register(self): self.worker.registerFunction("merger:merge") self.worker.registerFunction("merger:cat") + self.worker.registerFunction("merger:refstate") def stop(self): self.log.debug("Stopping") @@ -80,6 +81,9 @@ class MergeServer(object): elif job.name == 'merger:cat': self.log.debug("Got cat job: %s" % job.unique) self.cat(job) + elif job.name == 'merger:refstate': + self.log.debug("Got refstate job: %s" % job.unique) + self.refstate(job) else: self.log.error("Unable to handle job %s" % job.name) job.sendWorkFail() @@ -104,6 +108,14 @@ class MergeServer(object): recent) = ret job.sendWorkComplete(json.dumps(result)) + def refstate(self, job): + args = json.loads(job.arguments) + + success, repo_state = self.merger.getItemRepoState(args['items']) + result = dict(updated=success, + repo_state=repo_state) + job.sendWorkComplete(json.dumps(result)) + def cat(self, job): args = json.loads(job.arguments) self.merger.updateRepo(args['connection'], args['project']) |