diff options
author | Zuul <zuul@review.opendev.org> | 2019-05-20 21:45:53 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2019-05-20 21:45:53 +0000 |
commit | b45b375554cf3583a93fdc723f1b67a1b473a767 (patch) | |
tree | 48fa162c7689f2fa8b88a9a77860971da7326e24 /zuul | |
parent | 049a864a2f70356b60b4645fb5d5b1a06456735a (diff) | |
parent | 7639053905f6c94f57b15793cbe2493229828cd6 (diff) | |
download | zuul-b45b375554cf3583a93fdc723f1b67a1b473a767.tar.gz |
Merge "Annotate merger logs with event id"
Diffstat (limited to 'zuul')
-rw-r--r-- | zuul/executor/server.py | 11 | ||||
-rw-r--r-- | zuul/lib/logutil.py | 10 | ||||
-rw-r--r-- | zuul/manager/__init__.py | 9 | ||||
-rw-r--r-- | zuul/merger/client.py | 64 | ||||
-rw-r--r-- | zuul/merger/merger.py | 329 | ||||
-rw-r--r-- | zuul/merger/server.py | 16 | ||||
-rw-r--r-- | zuul/scheduler.py | 1 |
7 files changed, 268 insertions, 172 deletions
diff --git a/zuul/executor/server.py b/zuul/executor/server.py index 317950cb0..e2000f291 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -2732,6 +2732,7 @@ class ExecutorServer(object): def fileschanges(self, job): args = json.loads(job.arguments) + zuul_event_id = args.get('zuul_event_id') task = self.update(args['connection'], args['project']) task.wait() lock = self.repo_locks.getRepoLock( @@ -2740,29 +2741,35 @@ class ExecutorServer(object): files = self.merger.getFilesChanges( args['connection'], args['project'], args['branch'], - args['tosha']) + args['tosha'], zuul_event_id=zuul_event_id) result = dict(updated=True, files=files) + result['zuul_event_id'] = zuul_event_id job.sendWorkComplete(json.dumps(result)) def refstate(self, job): args = json.loads(job.arguments) + zuul_event_id = args.get('zuul_event_id') success, repo_state = self.merger.getRepoState( args['items'], repo_locks=self.repo_locks) result = dict(updated=success, repo_state=repo_state) + result['zuul_event_id'] = zuul_event_id job.sendWorkComplete(json.dumps(result)) def merge(self, job): args = json.loads(job.arguments) + zuul_event_id = args.get('zuul_event_id') ret = self.merger.mergeChanges(args['items'], args.get('files'), args.get('dirs', []), args.get('repo_state'), - repo_locks=self.repo_locks) + repo_locks=self.repo_locks, + zuul_event_id=zuul_event_id) result = dict(merged=(ret is not None)) if ret is None: result['commit'] = result['files'] = result['repo_state'] = None else: (result['commit'], result['files'], result['repo_state'], recent, orig_commit) = ret + result['zuul_event_id'] = zuul_event_id job.sendWorkComplete(json.dumps(result)) diff --git a/zuul/lib/logutil.py b/zuul/lib/logutil.py index f87f45fe6..0bf70f887 100644 --- a/zuul/lib/logutil.py +++ b/zuul/lib/logutil.py @@ -18,7 +18,12 @@ from zuul.model import TriggerEvent def get_annotated_logger(logger, event, build=None): - extra = {} + # Note(tobiash): When running with python 3.5 log adapters cannot be + # stacked. We need to detect this case and modify the original one. + if isinstance(logger, EventIdLogAdapter): + extra = logger.extra + else: + extra = {} if event is not None: if isinstance(event, TriggerEvent): @@ -29,6 +34,9 @@ def get_annotated_logger(logger, event, build=None): if build is not None: extra['build'] = build + if isinstance(logger, EventIdLogAdapter): + return logger + return EventIdLogAdapter(logger, extra) diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 08761da59..e2f2b9bfd 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -623,11 +623,13 @@ class PipelineManager(object): if isinstance(item.change, model.Change): self.sched.merger.mergeChanges(build_set.merger_items, item.current_build_set, files, dirs, - precedence=self.pipeline.precedence) + precedence=self.pipeline.precedence, + event=item.event) else: self.sched.merger.getRepoState(build_set.merger_items, item.current_build_set, - precedence=self.pipeline.precedence) + precedence=self.pipeline.precedence, + event=item.event) return False def scheduleFilesChanges(self, item): @@ -638,7 +640,8 @@ class PipelineManager(object): self.sched.merger.getFilesChanges( item.change.project.connection_name, item.change.project.name, - item.change.ref, item.change.branch, build_set=build_set) + item.change.ref, item.change.branch, build_set=build_set, + event=item.event) return False def prepareItem(self, item): diff --git a/zuul/merger/client.py b/zuul/merger/client.py index b5b05903c..fdc55d447 100644 --- a/zuul/merger/client.py +++ b/zuul/merger/client.py @@ -21,6 +21,7 @@ import gear import zuul.model from zuul.lib.config import get_default +from zuul.lib.logutil import get_annotated_logger def getJobData(job): @@ -100,63 +101,92 @@ class MergeClient(object): return False def submitJob(self, name, data, build_set, - precedence=zuul.model.PRECEDENCE_NORMAL): + precedence=zuul.model.PRECEDENCE_NORMAL, event=None): + log = get_annotated_logger(self.log, event) uuid = str(uuid4().hex) job = MergeJob(name, json.dumps(data), unique=uuid) job.build_set = build_set - self.log.debug("Submitting job %s with data %s" % (job, data)) + log.debug("Submitting job %s with data %s", job, data) self.jobs.add(job) self.gearman.submitJob(job, precedence=precedence, timeout=300) return job def mergeChanges(self, items, build_set, files=None, dirs=None, - repo_state=None, precedence=zuul.model.PRECEDENCE_NORMAL): + repo_state=None, precedence=zuul.model.PRECEDENCE_NORMAL, + event=None): + if event is not None: + zuul_event_id = event.zuul_event_id + else: + zuul_event_id = None data = dict(items=items, files=files, dirs=dirs, - repo_state=repo_state) - self.submitJob('merger:merge', data, build_set, precedence) + repo_state=repo_state, + zuul_event_id=zuul_event_id) + self.submitJob('merger:merge', data, build_set, precedence, + event=event) def getRepoState(self, items, build_set, - precedence=zuul.model.PRECEDENCE_NORMAL): - data = dict(items=items) - self.submitJob('merger:refstate', data, build_set, precedence) + precedence=zuul.model.PRECEDENCE_NORMAL, + event=None): + if event is not None: + zuul_event_id = event.zuul_event_id + else: + zuul_event_id = None + + data = dict(items=items, zuul_event_id=zuul_event_id) + self.submitJob('merger:refstate', data, build_set, precedence, + event=event) def getFiles(self, connection_name, project_name, branch, files, dirs=[], - precedence=zuul.model.PRECEDENCE_HIGH): + precedence=zuul.model.PRECEDENCE_HIGH, event=None): + if event is not None: + zuul_event_id = event.zuul_event_id + else: + zuul_event_id = None + data = dict(connection=connection_name, project=project_name, branch=branch, files=files, - dirs=dirs) - job = self.submitJob('merger:cat', data, None, precedence) + dirs=dirs, + zuul_event_id=zuul_event_id) + job = self.submitJob('merger:cat', data, None, precedence, event=event) return job def getFilesChanges(self, connection_name, project_name, branch, tosha=None, precedence=zuul.model.PRECEDENCE_HIGH, - build_set=None): + build_set=None, event=None): + if event is not None: + zuul_event_id = event.zuul_event_id + else: + zuul_event_id = None + data = dict(connection=connection_name, project=project_name, branch=branch, - tosha=tosha) + tosha=tosha, + zuul_event_id=zuul_event_id) job = self.submitJob('merger:fileschanges', data, build_set, - precedence) + precedence, event=event) return job def onBuildCompleted(self, job): data = getJobData(job) + zuul_event_id = data.get('zuul_event_id') + log = get_annotated_logger(self.log, zuul_event_id) + merged = data.get('merged', False) job.updated = data.get('updated', False) commit = data.get('commit') files = data.get('files', {}) repo_state = data.get('repo_state', {}) job.files = files - self.log.info("Merge %s complete, merged: %s, updated: %s, " - "commit: %s" % - (job, merged, job.updated, commit)) + log.info("Merge %s complete, merged: %s, updated: %s, " + "commit: %s", job, merged, job.updated, commit) job.setComplete() if job.build_set: if job.name == 'merger:fileschanges': diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py index 5519806c3..f7e7d3e66 100644 --- a/zuul/merger/merger.py +++ b/zuul/merger/merger.py @@ -27,6 +27,8 @@ import paramiko import zuul.model +from zuul.lib.logutil import get_annotated_logger + NULL_REF = '0000000000000000000000000000000000000000' @@ -79,11 +81,12 @@ class Repo(object): def __init__(self, remote, local, email, username, speed_limit, speed_time, sshkey=None, cache_path=None, logger=None, git_timeout=300, - retry_attempts=3, retry_interval=30): + retry_attempts=3, retry_interval=30, zuul_event_id=None): if logger is None: self.log = logging.getLogger("zuul.Repo") else: self.log = logger + log = get_annotated_logger(self.log, zuul_event_id) self.env = { 'GIT_HTTP_LOW_SPEED_LIMIT': speed_limit, 'GIT_HTTP_LOW_SPEED_TIME': speed_time, @@ -104,13 +107,13 @@ class Repo(object): try: self._setup_known_hosts() except Exception: - self.log.exception("Unable to set up known_hosts for %s" % remote) + log.exception("Unable to set up known_hosts for %s", remote) try: - self._ensure_cloned() + self._ensure_cloned(zuul_event_id) self._git_set_remote_url( git.Repo(self.local_path), self.remote_url) except Exception: - self.log.exception("Unable to initialize repo for %s" % remote) + log.exception("Unable to initialize repo for %s", remote) def __repr__(self): return "<Repo {} {}>".format(hex(id(self)), self.local_path) @@ -145,7 +148,8 @@ class Repo(object): # connection and DoS Gerrit. client.close() - def _ensure_cloned(self): + def _ensure_cloned(self, zuul_event_id): + log = get_annotated_logger(self.log, zuul_event_id) repo_is_cloned = os.path.exists(os.path.join(self.local_path, '.git')) if self._initialized and repo_is_cloned: try: @@ -167,9 +171,9 @@ class Repo(object): else: clone_url = self.remote_url - self.log.debug("Cloning from %s to %s" % ( - redact_url(clone_url), self.local_path)) - self._git_clone(clone_url) + log.debug("Cloning from %s to %s", + redact_url(clone_url), self.local_path) + self._git_clone(clone_url, zuul_event_id) repo = git.Repo(self.local_path) repo.git.update_environment(**self.env) @@ -193,7 +197,8 @@ class Repo(object): def isInitialized(self): return self._initialized - def _git_clone(self, url): + def _git_clone(self, url, zuul_event_id): + log = get_annotated_logger(self.log, zuul_event_id) mygit = git.cmd.Git(os.getcwd()) mygit.update_environment(**self.env) @@ -206,12 +211,12 @@ class Repo(object): except Exception: if attempt < self.retry_attempts: time.sleep(self.retry_interval) - self.log.warning("Retry %s: Clone %s" % ( - attempt, self.local_path)) + log.warning("Retry %s: Clone %s", attempt, self.local_path) else: raise - def _git_fetch(self, repo, remote, ref=None, **kwargs): + def _git_fetch(self, repo, remote, zuul_event_id, ref=None, **kwargs): + log = get_annotated_logger(self.log, zuul_event_id) for attempt in range(1, self.retry_attempts + 1): try: with timeout_handler(self.local_path): @@ -245,9 +250,9 @@ class Repo(object): shutil.rmtree(self.local_path) else: time.sleep(self.retry_interval) - self.log.exception("Retry %s: Fetch %s %s %s" % ( + log.exception("Retry %s: Fetch %s %s %s" % ( attempt, self.local_path, remote, ref)) - self._ensure_cloned() + self._ensure_cloned(zuul_event_id) else: raise @@ -255,16 +260,17 @@ class Repo(object): with repo.remotes.origin.config_writer as config_writer: config_writer.set('url', url) - def createRepoObject(self): - self._ensure_cloned() + def createRepoObject(self, zuul_event_id): + self._ensure_cloned(zuul_event_id) repo = git.Repo(self.local_path) repo.git.update_environment(**self.env) return repo - def reset(self): - self.log.debug("Resetting repository %s" % self.local_path) - self.update() - repo = self.createRepoObject() + def reset(self, zuul_event_id=None): + log = get_annotated_logger(self.log, zuul_event_id) + log.debug("Resetting repository %s", self.local_path) + self.update(zuul_event_id=zuul_event_id) + repo = self.createRepoObject(zuul_event_id) origin = repo.remotes.origin seen = set() head = None @@ -282,10 +288,10 @@ class Repo(object): seen.add(ref.remote_head) if head is None: head = ref.remote_head - self.log.debug("Reset to %s", head) + log.debug("Reset to %s", head) repo.head.reference = head for ref in stale_refs: - self.log.debug("Delete stale ref %s", ref.remote_head) + log.debug("Delete stale ref %s", ref.remote_head) # A stale ref means the upstream branch (e.g. foobar) was deleted # so we need to delete both our local head (if existing) and the # remote tracking head. Both repo.heads and ref.remote_head @@ -296,89 +302,94 @@ class Repo(object): break git.refs.RemoteReference.delete(repo, ref, force=True) - def prune(self): - repo = self.createRepoObject() + def prune(self, zuul_event_id=None): + log = get_annotated_logger(self.log, zuul_event_id) + repo = self.createRepoObject(zuul_event_id) origin = repo.remotes.origin stale_refs = origin.stale_refs if stale_refs: - self.log.debug("Pruning stale refs: %s", stale_refs) + log.debug("Pruning stale refs: %s", stale_refs) git.refs.RemoteReference.delete(repo, force=True, *stale_refs) - def getBranchHead(self, branch): - repo = self.createRepoObject() + def getBranchHead(self, branch, zuul_event_id=None): + repo = self.createRepoObject(zuul_event_id) branch_head = repo.heads[branch] return branch_head.commit - def hasBranch(self, branch): - repo = self.createRepoObject() + def hasBranch(self, branch, zuul_event_id=None): + repo = self.createRepoObject(zuul_event_id) origin = repo.remotes.origin return branch in origin.refs - def getBranches(self): + def getBranches(self, zuul_event_id=None): # TODO(jeblair): deprecate with override-branch; replaced by # getRefs(). - repo = self.createRepoObject() + repo = self.createRepoObject(zuul_event_id) return [x.name for x in repo.heads] - def getCommitFromRef(self, refname): - repo = self.createRepoObject() + def getCommitFromRef(self, refname, zuul_event_id=None): + repo = self.createRepoObject(zuul_event_id) if refname not in repo.refs: return None ref = repo.refs[refname] return ref.commit - def getRefs(self): - repo = self.createRepoObject() + def getRefs(self, zuul_event_id=None): + repo = self.createRepoObject(zuul_event_id) return repo.refs - def setRef(self, path, hexsha, repo=None): - self.log.debug("Create reference %s at %s in %s", - path, hexsha, self.local_path) + def setRef(self, path, hexsha, repo=None, zuul_event_id=None): + log = get_annotated_logger(self.log, zuul_event_id) + log.debug("Create reference %s at %s in %s", + path, hexsha, self.local_path) if repo is None: - repo = self.createRepoObject() + repo = self.createRepoObject(zuul_event_id) binsha = gitdb.util.to_bin_sha(hexsha) obj = git.objects.Object.new_from_sha(repo, binsha) git.refs.Reference.create(repo, path, obj, force=True) - def setRefs(self, refs, keep_remotes=False): - repo = self.createRepoObject() + def setRefs(self, refs, keep_remotes=False, zuul_event_id=None): + repo = self.createRepoObject(zuul_event_id) current_refs = {} for ref in repo.refs: current_refs[ref.path] = ref unseen = set(current_refs.keys()) for path, hexsha in refs.items(): - self.setRef(path, hexsha, repo) + self.setRef(path, hexsha, repo, zuul_event_id=zuul_event_id) unseen.discard(path) ref = current_refs.get(path) if keep_remotes and ref: unseen.discard('refs/remotes/origin/{}'.format(ref.name)) for path in unseen: - self.deleteRef(path, repo) + self.deleteRef(path, repo, zuul_event_id=zuul_event_id) - def setRemoteRef(self, branch, rev): - repo = self.createRepoObject() + def setRemoteRef(self, branch, rev, zuul_event_id=None): + log = get_annotated_logger(self.log, zuul_event_id) + repo = self.createRepoObject(zuul_event_id) try: origin_ref = repo.remotes.origin.refs[branch] except IndexError: - self.log.warning("No remote ref found for branch %s", branch) + log.warning("No remote ref found for branch %s", branch) return - self.log.debug("Updating remote reference %s to %s", origin_ref, rev) + log.debug("Updating remote reference %s to %s", origin_ref, rev) origin_ref.commit = rev - def deleteRef(self, path, repo=None): + def deleteRef(self, path, repo=None, zuul_event_id=None): + log = get_annotated_logger(self.log, zuul_event_id) if repo is None: - repo = self.createRepoObject() - self.log.debug("Delete reference %s", path) + repo = self.createRepoObject(zuul_event_id) + log.debug("Delete reference %s", path) git.refs.SymbolicReference.delete(repo, path) - def checkout(self, ref): - repo = self.createRepoObject() + def checkout(self, ref, zuul_event_id=None): + log = get_annotated_logger(self.log, zuul_event_id) + repo = self.createRepoObject(zuul_event_id) # NOTE(pabelanger): We need to check for detached repo head, otherwise # gitpython will raise an exception if we access the reference. if not repo.head.is_detached and repo.head.reference == ref: - self.log.debug("Repo is already at %s" % ref) + log.debug("Repo is already at %s" % ref) else: - self.log.debug("Checking out %s" % ref) + log.debug("Checking out %s" % ref) # Perform a hard reset to the correct ref before checking out so # that we clean up anything that might be left over from a merge # while still only preparing the working copy once. @@ -389,26 +400,28 @@ class Repo(object): return repo.head.commit - def cherryPick(self, ref): - repo = self.createRepoObject() - self.log.debug("Cherry-picking %s" % ref) - self.fetch(ref) + def cherryPick(self, ref, zuul_event_id=None): + log = get_annotated_logger(self.log, zuul_event_id) + repo = self.createRepoObject(zuul_event_id) + log.debug("Cherry-picking %s", ref) + self.fetch(ref, zuul_event_id=zuul_event_id) repo.git.cherry_pick("FETCH_HEAD") return repo.head.commit - def merge(self, ref, strategy=None): - repo = self.createRepoObject() + def merge(self, ref, strategy=None, zuul_event_id=None): + log = get_annotated_logger(self.log, zuul_event_id) + repo = self.createRepoObject(zuul_event_id) args = [] if strategy: args += ['-s', strategy] args.append('FETCH_HEAD') - self.fetch(ref) - self.log.debug("Merging %s with args %s" % (ref, args)) + self.fetch(ref, zuul_event_id=zuul_event_id) + log.debug("Merging %s with args %s", ref, args) repo.git.merge(*args) return repo.head.commit - def fetch(self, ref): - repo = self.createRepoObject() + def fetch(self, ref, zuul_event_id=None): + repo = self.createRepoObject(zuul_event_id) # NOTE: The following is currently not applicable, but if we # switch back to fetch methods from GitPython, we need to # consider it: @@ -416,37 +429,39 @@ class Repo(object): # interpret it improperly causing an AssertionError. Because the # data was fetched properly subsequent fetches don't seem to fail. # So try again if an AssertionError is caught. - self._git_fetch(repo, 'origin', ref) + self._git_fetch(repo, 'origin', zuul_event_id, ref=ref) - def revParse(self, ref): - repo = self.createRepoObject() + def revParse(self, ref, zuul_event_id=None): + repo = self.createRepoObject(zuul_event_id) return repo.git.rev_parse(ref) - def fetchFrom(self, repository, ref): - repo = self.createRepoObject() - self._git_fetch(repo, repository, ref) + def fetchFrom(self, repository, ref, zuul_event_id=None): + repo = self.createRepoObject(zuul_event_id) + self._git_fetch(repo, repository, zuul_event_id, ref=ref) - def push(self, local, remote): - repo = self.createRepoObject() - self.log.debug("Pushing %s:%s to %s" % (local, remote, - self.remote_url)) + def push(self, local, remote, zuul_event_id=None): + log = get_annotated_logger(self.log, zuul_event_id) + repo = self.createRepoObject(zuul_event_id) + log.debug("Pushing %s:%s to %s", local, remote, self.remote_url) repo.remotes.origin.push('%s:%s' % (local, remote)) - def update(self): - repo = self.createRepoObject() - self.log.debug("Updating repository %s" % self.local_path) + def update(self, zuul_event_id=None): + log = get_annotated_logger(self.log, zuul_event_id) + repo = self.createRepoObject(zuul_event_id) + log.debug("Updating repository %s" % self.local_path) if repo.git.version_info[:2] < (1, 9): # Before 1.9, 'git fetch --tags' did not include the # behavior covered by 'git --fetch', so we run both # commands in that case. Starting with 1.9, 'git fetch # --tags' is all that is necessary. See # https://github.com/git/git/blob/master/Documentation/RelNotes/1.9.0.txt#L18-L20 - self._git_fetch(repo, 'origin') - self._git_fetch(repo, 'origin', tags=True) + self._git_fetch(repo, 'origin', zuul_event_id) + self._git_fetch(repo, 'origin', zuul_event_id, tags=True) - def getFiles(self, files, dirs=[], branch=None, commit=None): + def getFiles(self, files, dirs=[], branch=None, commit=None, + zuul_event_id=None): ret = {} - repo = self.createRepoObject() + repo = self.createRepoObject(zuul_event_id) if branch: tree = repo.heads[branch].commit.tree else: @@ -466,10 +481,11 @@ class Repo(object): 'utf-8') return ret - def getFilesChanges(self, branch, tosha=None): - repo = self.createRepoObject() - self.fetch(branch) - head = repo.commit(self.revParse('FETCH_HEAD')) + def getFilesChanges(self, branch, tosha=None, zuul_event_id=None): + repo = self.createRepoObject(zuul_event_id) + self.fetch(branch, zuul_event_id=zuul_event_id) + head = repo.commit( + self.revParse('FETCH_HEAD', zuul_event_id=zuul_event_id)) files = set() if tosha: @@ -480,19 +496,22 @@ class Repo(object): files.update(head.stats.files.keys()) return list(files) - def deleteRemote(self, remote): - repo = self.createRepoObject() + def deleteRemote(self, remote, zuul_event_id=None): + repo = self.createRepoObject(zuul_event_id) repo.delete_remote(repo.remotes[remote]) - def setRemoteUrl(self, url): + def setRemoteUrl(self, url, zuul_event_id=None): if self.remote_url == url: return - self.log.debug("Set remote url to %s" % redact_url(url)) + log = get_annotated_logger(self.log, zuul_event_id) + log.debug("Set remote url to %s", redact_url(url)) self.remote_url = url - self._git_set_remote_url(self.createRepoObject(), self.remote_url) + self._git_set_remote_url( + self.createRepoObject(zuul_event_id), + self.remote_url) - def mapLine(self, commit, filename, lineno): - repo = self.createRepoObject() + def mapLine(self, commit, filename, lineno, zuul_event_id=None): + repo = self.createRepoObject(zuul_event_id) # Trace the specified line back to the specified commit and # return the line number in that commit. cur_commit = None @@ -534,7 +553,7 @@ class Merger(object): # behavior e.g. to keep the 'origin' remote intact. self.execution_context = execution_context - def _addProject(self, hostname, project_name, url, sshkey): + def _addProject(self, hostname, project_name, url, sshkey, zuul_event_id): repo = None key = '/'.join([hostname, project_name]) try: @@ -547,15 +566,17 @@ class Merger(object): repo = Repo( url, path, self.email, self.username, self.speed_limit, self.speed_time, sshkey=sshkey, cache_path=cache_path, - logger=self.logger, git_timeout=self.git_timeout) + logger=self.logger, git_timeout=self.git_timeout, + zuul_event_id=zuul_event_id) self.repos[key] = repo except Exception: - self.log.exception("Unable to add project %s/%s" % - (hostname, project_name)) + log = get_annotated_logger(self.log, zuul_event_id) + log.exception("Unable to add project %s/%s", + hostname, project_name) return repo - def getRepo(self, connection_name, project_name): + def getRepo(self, connection_name, project_name, zuul_event_id=None): source = self.connections.getSource(connection_name) project = source.getProject(project_name) hostname = project.canonical_hostname @@ -571,23 +592,29 @@ class Merger(object): raise Exception("Unable to set up repo for project %s/%s" " without a url" % (connection_name, project_name,)) - return self._addProject(hostname, project_name, url, sshkey) + return self._addProject(hostname, project_name, url, sshkey, + zuul_event_id) - def updateRepo(self, connection_name, project_name): - repo = self.getRepo(connection_name, project_name) + def updateRepo(self, connection_name, project_name, zuul_event_id=None): + log = get_annotated_logger(self.log, zuul_event_id) + repo = self.getRepo(connection_name, project_name, + zuul_event_id=zuul_event_id) try: - self.log.info("Updating local repository %s/%s", - connection_name, project_name) + log.info("Updating local repository %s/%s", + connection_name, project_name) repo.reset() except Exception: - self.log.exception("Unable to update %s/%s", - connection_name, project_name) + log.exception("Unable to update %s/%s", + connection_name, project_name) - def checkoutBranch(self, connection_name, project_name, branch): - self.log.info("Checking out %s/%s branch %s", - connection_name, project_name, branch) - repo = self.getRepo(connection_name, project_name) - repo.checkout(branch) + def checkoutBranch(self, connection_name, project_name, branch, + zuul_event_id=None): + log = get_annotated_logger(self.log, zuul_event_id) + log.info("Checking out %s/%s branch %s", + connection_name, project_name, branch) + repo = self.getRepo(connection_name, project_name, + zuul_event_id=zuul_event_id) + repo.checkout(branch, zuul_event_id=zuul_event_id) def _saveRepoState(self, connection_name, project_name, repo, repo_state, recent): @@ -616,51 +643,58 @@ class Merger(object): project[path] = hexsha def _restoreRepoState(self, connection_name, project_name, repo, - repo_state): + repo_state, zuul_event_id): + log = get_annotated_logger(self.log, zuul_event_id) projects = repo_state.get(connection_name, {}) project = projects.get(project_name, {}) if not project: # We don't have a state for this project. return - self.log.debug("Restore repo state for project %s/%s", - connection_name, project_name) - repo.setRefs(project, keep_remotes=self.execution_context) - - def _mergeChange(self, item, ref): - repo = self.getRepo(item['connection'], item['project']) + log.debug("Restore repo state for project %s/%s", + connection_name, project_name) + repo.setRefs(project, keep_remotes=self.execution_context, + zuul_event_id=zuul_event_id) + + def _mergeChange(self, item, ref, zuul_event_id): + log = get_annotated_logger(self.log, zuul_event_id) + repo = self.getRepo(item['connection'], item['project'], + zuul_event_id=zuul_event_id) try: - repo.checkout(ref) + repo.checkout(ref, zuul_event_id=zuul_event_id) except Exception: - self.log.exception("Unable to checkout %s" % ref) + log.exception("Unable to checkout %s", ref) return None, None try: mode = item['merge_mode'] if mode == zuul.model.MERGER_MERGE: - commit = repo.merge(item['ref']) + commit = repo.merge(item['ref'], zuul_event_id=zuul_event_id) elif mode == zuul.model.MERGER_MERGE_RESOLVE: - commit = repo.merge(item['ref'], 'resolve') + commit = repo.merge(item['ref'], 'resolve', + zuul_event_id=zuul_event_id) elif mode == zuul.model.MERGER_CHERRY_PICK: - commit = repo.cherryPick(item['ref']) + commit = repo.cherryPick(item['ref'], + zuul_event_id=zuul_event_id) else: raise Exception("Unsupported merge mode: %s" % mode) except git.GitCommandError: # Log git exceptions at debug level because they are # usually benign merge conflicts - self.log.debug("Unable to merge %s" % item, exc_info=True) + log.debug("Unable to merge %s", item, exc_info=True) return None, None except Exception: - self.log.exception("Exception while merging a change:") + log.exception("Exception while merging a change:") return None, None orig_commit = repo.revParse('FETCH_HEAD') return orig_commit, commit - def _mergeItem(self, item, recent, repo_state): - self.log.debug("Processing ref %s for project %s/%s / %s uuid %s" % - (item['ref'], item['connection'], - item['project'], item['branch'], - item['buildset_uuid'])) + def _mergeItem(self, item, recent, repo_state, zuul_event_id): + log = get_annotated_logger(self.log, zuul_event_id) + log.debug("Processing ref %s for project %s/%s / %s uuid %s" % + (item['ref'], item['connection'], + item['project'], item['branch'], + item['buildset_uuid'])) repo = self.getRepo(item['connection'], item['project']) key = (item['connection'], item['project'], item['branch']) @@ -670,14 +704,14 @@ class Merger(object): if not base: # There is none, so use the branch tip # we need to reset here in order to call getBranchHead - self.log.debug("No base commit found for %s" % (key,)) + log.debug("No base commit found for %s" % (key,)) try: - repo.reset() + repo.reset(zuul_event_id=zuul_event_id) except Exception: - self.log.exception("Unable to reset repo %s" % repo) + log.exception("Unable to reset repo %s" % repo) return None, None self._restoreRepoState(item['connection'], item['project'], repo, - repo_state) + repo_state, zuul_event_id) base = repo.getBranchHead(item['branch']) # Save the repo state so that later mergers can repeat @@ -685,16 +719,17 @@ class Merger(object): self._saveRepoState(item['connection'], item['project'], repo, repo_state, recent) else: - self.log.debug("Found base commit %s for %s" % (base, key,)) + log.debug("Found base commit %s for %s" % (base, key,)) if self.execution_context: # Set origin branch to the rev of the current (speculative) base. # This allows tools to determine the commits that are part of a # change by looking at origin/master..master. - repo.setRemoteRef(item['branch'], base) + repo.setRemoteRef(item['branch'], base, + zuul_event_id=zuul_event_id) # Merge the change - orig_commit, commit = self._mergeChange(item, base) + orig_commit, commit = self._mergeChange(item, base, zuul_event_id) if not commit: return None, None # Store this commit as the most recent for this project-branch @@ -702,7 +737,8 @@ class Merger(object): return orig_commit, commit def mergeChanges(self, items, files=None, dirs=None, repo_state=None, - repo_locks=None): + repo_locks=None, zuul_event_id=None): + log = get_annotated_logger(self.log, zuul_event_id) # connection+project+branch -> commit recent = {} commit = None @@ -719,9 +755,10 @@ class Merger(object): else: lock = nullcontext() with lock: - self.log.debug("Merging for change %s,%s" % - (item["number"], item["patchset"])) - orig_commit, commit = self._mergeItem(item, recent, repo_state) + log.debug("Merging for change %s,%s" % + (item["number"], item["patchset"])) + orig_commit, commit = self._mergeItem( + item, recent, repo_state, zuul_event_id) if not commit: return None if files or dirs: @@ -737,11 +774,12 @@ class Merger(object): ret_recent[k] = v.hexsha return commit.hexsha, read_files, repo_state, ret_recent, orig_commit - def setRepoState(self, items, repo_state): + def setRepoState(self, items, repo_state, zuul_event_id=None): # Sets the repo state for the items seen = set() for item in items: - repo = self.getRepo(item['connection'], item['project']) + repo = self.getRepo(item['connection'], item['project'], + zuul_event_id=zuul_event_id) key = (item['connection'], item['project'], item['branch']) if key in seen: @@ -749,7 +787,7 @@ class Merger(object): repo.reset() self._restoreRepoState(item['connection'], item['project'], repo, - repo_state) + repo_state, zuul_event_id) def getRepoState(self, items, repo_locks=None): # Gets the repo state for items. Generally this will be @@ -794,6 +832,7 @@ class Merger(object): return repo.getFiles(files, dirs, branch=branch) def getFilesChanges(self, connection_name, project_name, branch, - tosha=None): - repo = self.getRepo(connection_name, project_name) - return repo.getFilesChanges(branch, tosha) + tosha=None, zuul_event_id=None): + repo = self.getRepo(connection_name, project_name, + zuul_event_id=zuul_event_id) + return repo.getFilesChanges(branch, tosha, zuul_event_id=zuul_event_id) diff --git a/zuul/merger/server.py b/zuul/merger/server.py index fc8942d94..d8a8feb54 100644 --- a/zuul/merger/server.py +++ b/zuul/merger/server.py @@ -137,23 +137,27 @@ class MergeServer(object): def merge(self, job): args = json.loads(job.arguments) + zuul_event_id = args.get('zuul_event_id') ret = self.merger.mergeChanges( args['items'], args.get('files'), - args.get('dirs'), args.get('repo_state')) + args.get('dirs'), args.get('repo_state'), + zuul_event_id=zuul_event_id) result = dict(merged=(ret is not None)) if ret is None: result['commit'] = result['files'] = result['repo_state'] = None else: (result['commit'], result['files'], result['repo_state'], recent, orig_commit) = ret + result['zuul_event_id'] = zuul_event_id job.sendWorkComplete(json.dumps(result)) def refstate(self, job): args = json.loads(job.arguments) - + zuul_event_id = args.get('zuul_event_id') success, repo_state = self.merger.getRepoState(args['items']) result = dict(updated=success, repo_state=repo_state) + result['zuul_event_id'] = zuul_event_id job.sendWorkComplete(json.dumps(result)) def cat(self, job): @@ -168,9 +172,13 @@ class MergeServer(object): def fileschanges(self, job): args = json.loads(job.arguments) - self.merger.updateRepo(args['connection'], args['project']) + zuul_event_id = args.get('zuul_event_id') + self.merger.updateRepo(args['connection'], args['project'], + zuul_event_id=zuul_event_id) files = self.merger.getFilesChanges( - args['connection'], args['project'], args['branch'], args['tosha']) + args['connection'], args['project'], args['branch'], args['tosha'], + zuul_event_id=zuul_event_id) result = dict(updated=True, files=files) + result['zuul_event_id'] = zuul_event_id job.sendWorkComplete(json.dumps(result)) diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 84378c6e3..b66ed40a8 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -48,6 +48,7 @@ class ManagementEvent(object): def __init__(self): self._wait_event = threading.Event() self._exc_info = None + self.zuul_event_id = None def exception(self, exc_info): self._exc_info = exc_info |