diff options
Diffstat (limited to 'zuul')
-rw-r--r-- | zuul/driver/git/__init__.py | 7 | ||||
-rw-r--r-- | zuul/driver/git/gitconnection.py | 200 | ||||
-rw-r--r-- | zuul/driver/git/gitmodel.py | 86 | ||||
-rw-r--r-- | zuul/driver/git/gitsource.py | 2 | ||||
-rw-r--r-- | zuul/driver/git/gittrigger.py | 49 | ||||
-rw-r--r-- | zuul/executor/server.py | 17 | ||||
-rw-r--r-- | zuul/merger/client.py | 9 | ||||
-rw-r--r-- | zuul/merger/merger.py | 17 | ||||
-rw-r--r-- | zuul/merger/server.py | 13 |
9 files changed, 396 insertions, 4 deletions
diff --git a/zuul/driver/git/__init__.py b/zuul/driver/git/__init__.py index 0faa0365a..1fe43f643 100644 --- a/zuul/driver/git/__init__.py +++ b/zuul/driver/git/__init__.py @@ -15,6 +15,7 @@ from zuul.driver import Driver, ConnectionInterface, SourceInterface from zuul.driver.git import gitconnection from zuul.driver.git import gitsource +from zuul.driver.git import gittrigger class GitDriver(Driver, ConnectionInterface, SourceInterface): @@ -23,9 +24,15 @@ class GitDriver(Driver, ConnectionInterface, SourceInterface): def getConnection(self, name, config): return gitconnection.GitConnection(self, name, config) + def getTrigger(self, connection, config=None): + return gittrigger.GitTrigger(self, connection, config) + def getSource(self, connection): return gitsource.GitSource(self, connection) + def getTriggerSchema(self): + return gittrigger.getSchema() + def getRequireSchema(self): return {} diff --git a/zuul/driver/git/gitconnection.py b/zuul/driver/git/gitconnection.py index f93824d2f..03b24cadc 100644 --- a/zuul/driver/git/gitconnection.py +++ b/zuul/driver/git/gitconnection.py @@ -13,12 +13,119 @@ # License for the specific language governing permissions and limitations # under the License. +import os +import git +import time import logging import urllib +import threading import voluptuous as v from zuul.connection import BaseConnection +from zuul.driver.git.gitmodel import GitTriggerEvent, EMPTY_GIT_REF +from zuul.model import Ref, Branch + + +class GitWatcher(threading.Thread): + log = logging.getLogger("connection.git.GitWatcher") + + def __init__(self, git_connection, baseurl, poll_delay): + threading.Thread.__init__(self) + self.daemon = True + self.git_connection = git_connection + self.baseurl = baseurl + self.poll_delay = poll_delay + self._stopped = False + self.projects_refs = self.git_connection.projects_refs + + def compareRefs(self, project, refs): + partial_events = [] + # Fetch previous refs state + base_refs = self.projects_refs.get(project) + # Create list of created refs + rcreateds = set(refs.keys()) - set(base_refs.keys()) + # Create list of deleted refs + rdeleteds = set(base_refs.keys()) - set(refs.keys()) + # Create the list of updated refs + updateds = {} + for ref, sha in refs.items(): + if ref in base_refs and base_refs[ref] != sha: + updateds[ref] = sha + for ref in rcreateds: + event = { + 'ref': ref, + 'branch_created': True, + 'oldrev': EMPTY_GIT_REF, + 'newrev': refs[ref] + } + partial_events.append(event) + for ref in rdeleteds: + event = { + 'ref': ref, + 'branch_deleted': True, + 'oldrev': base_refs[ref], + 'newrev': EMPTY_GIT_REF + } + partial_events.append(event) + for ref, sha in updateds.items(): + event = { + 'ref': ref, + 'branch_updated': True, + 'oldrev': base_refs[ref], + 'newrev': sha + } + partial_events.append(event) + events = [] + for pevent in partial_events: + event = GitTriggerEvent() + event.type = 'ref-updated' + event.project_hostname = self.git_connection.canonical_hostname + event.project_name = project + for attr in ('ref', 'oldrev', 'newrev', 'branch_created', + 'branch_deleted', 'branch_updated'): + if attr in pevent: + setattr(event, attr, pevent[attr]) + events.append(event) + return events + + def _run(self): + self.log.debug("Walk through projects refs for connection: %s" % + self.git_connection.connection_name) + try: + for project in self.git_connection.projects: + refs = self.git_connection.lsRemote(project) + self.log.debug("Read refs %s for project %s" % (refs, project)) + if not self.projects_refs.get(project): + # State for this project does not exist yet so add it. + # No event will be triggered in this loop as + # projects_refs['project'] and refs are equal + self.projects_refs[project] = refs + events = self.compareRefs(project, refs) + self.projects_refs[project] = refs + # Send events to the scheduler + for event in events: + self.log.debug("Handling event: %s" % event) + # Force changes cache update before passing + # the event to the scheduler + self.git_connection.getChange(event) + self.git_connection.logEvent(event) + # Pass the event to the scheduler + self.git_connection.sched.addEvent(event) + except Exception as e: + self.log.debug("Unexpected issue in _run loop: %s" % str(e)) + + def run(self): + while not self._stopped: + if not self.git_connection.w_pause: + self._run() + # Polling wait delay + else: + self.log.debug("Watcher is on pause") + time.sleep(self.poll_delay) + + def stop(self): + self._stopped = True class GitConnection(BaseConnection): @@ -32,6 +139,8 @@ class GitConnection(BaseConnection): raise Exception('baseurl is required for git connections in ' '%s' % self.connection_name) self.baseurl = self.connection_config.get('baseurl') + self.poll_timeout = float( + self.connection_config.get('poll_delay', 3600 * 2)) self.canonical_hostname = self.connection_config.get( 'canonical_hostname') if not self.canonical_hostname: @@ -40,7 +149,10 @@ class GitConnection(BaseConnection): self.canonical_hostname = r.hostname else: self.canonical_hostname = 'localhost' + self.w_pause = False self.projects = {} + self.projects_refs = {} + self._change_cache = {} def getProject(self, name): return self.projects.get(name) @@ -48,15 +160,97 @@ class GitConnection(BaseConnection): def addProject(self, project): self.projects[project.name] = project + def getChangeFilesUpdated(self, project_name, branch, tosha): + job = self.sched.merger.getFilesChanges( + self.connection_name, project_name, branch, tosha) + self.log.debug("Waiting for fileschanges job %s" % job) + job.wait() + if not job.updated: + raise Exception("Fileschanges job %s failed" % job) + self.log.debug("Fileschanges job %s got changes on files %s" % + (job, job.files)) + return job.files + + def lsRemote(self, project): + refs = {} + client = git.cmd.Git() + output = client.ls_remote( + os.path.join(self.baseurl, project)) + for line in output.splitlines(): + sha, ref = line.split('\t') + if ref.startswith('refs/'): + refs[ref] = sha + return refs + + def maintainCache(self, relevant): + remove = {} + for branch, refschange in self._change_cache.items(): + for ref, change in refschange.items(): + if change not in relevant: + remove.setdefault(branch, []).append(ref) + for branch, refs in remove.items(): + for ref in refs: + del self._change_cache[branch][ref] + if not self._change_cache[branch]: + del self._change_cache[branch] + + def getChange(self, event, refresh=False): + if event.ref and event.ref.startswith('refs/heads/'): + branch = event.ref[len('refs/heads/'):] + change = self._change_cache.get(branch, {}).get(event.newrev) + if change: + return change + project = self.getProject(event.project_name) + change = Branch(project) + change.branch = branch + for attr in ('ref', 'oldrev', 'newrev'): + setattr(change, attr, getattr(event, attr)) + change.url = "" + change.files = self.getChangeFilesUpdated( + event.project_name, change.branch, event.oldrev) + self._change_cache.setdefault(branch, {})[event.newrev] = change + elif event.ref: + # catch-all ref (ie, not a branch or head) + project = self.getProject(event.project_name) + change = Ref(project) + for attr in ('ref', 'oldrev', 'newrev'): + setattr(change, attr, getattr(event, attr)) + change.url = "" + else: + self.log.warning("Unable to get change for %s" % (event,)) + change = None + return change + def getProjectBranches(self, project, tenant): - # TODO(jeblair): implement; this will need to handle local or - # remote git urls. - return ['master'] + refs = self.lsRemote(project.name) + branches = [ref[len('refs/heads/'):] for ref in + refs if ref.startswith('refs/heads/')] + return branches def getGitUrl(self, project): url = '%s/%s' % (self.baseurl, project.name) return url + def onLoad(self): + self.log.debug("Starting Git Watcher") + self._start_watcher_thread() + + def onStop(self): + self.log.debug("Stopping Git Watcher") + self._stop_watcher_thread() + + def _stop_watcher_thread(self): + if self.watcher_thread: + self.watcher_thread.stop() + self.watcher_thread.join() + + def _start_watcher_thread(self): + self.watcher_thread = GitWatcher( + self, + self.baseurl, + self.poll_timeout) + self.watcher_thread.start() + def getSchema(): git_connection = v.Any(str, v.Schema(dict)) diff --git a/zuul/driver/git/gitmodel.py b/zuul/driver/git/gitmodel.py new file mode 100644 index 000000000..5d12b36da --- /dev/null +++ b/zuul/driver/git/gitmodel.py @@ -0,0 +1,86 @@ +# Copyright 2017 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import re + +from zuul.model import TriggerEvent +from zuul.model import EventFilter + + +EMPTY_GIT_REF = '0' * 40 # git sha of all zeros, used during creates/deletes + + +class GitTriggerEvent(TriggerEvent): + """Incoming event from an external system.""" + + def __repr__(self): + ret = '<GitTriggerEvent %s %s' % (self.type, + self.project_name) + + if self.branch: + ret += " %s" % self.branch + ret += " oldrev:%s" % self.oldrev + ret += " newrev:%s" % self.newrev + ret += '>' + + return ret + + +class GitEventFilter(EventFilter): + def __init__(self, trigger, types=[], refs=[], + ignore_deletes=True): + + super().__init__(trigger) + + self._refs = refs + self.types = types + self.refs = [re.compile(x) for x in refs] + self.ignore_deletes = ignore_deletes + + def __repr__(self): + ret = '<GitEventFilter' + + if self.types: + ret += ' types: %s' % ', '.join(self.types) + if self._refs: + ret += ' refs: %s' % ', '.join(self._refs) + if self.ignore_deletes: + ret += ' ignore_deletes: %s' % self.ignore_deletes + ret += '>' + + return ret + + def matches(self, event, change): + # event types are ORed + matches_type = False + for etype in self.types: + if etype == event.type: + matches_type = True + if self.types and not matches_type: + return False + + # refs are ORed + matches_ref = False + if event.ref is not None: + for ref in self.refs: + if ref.match(event.ref): + matches_ref = True + if self.refs and not matches_ref: + return False + if self.ignore_deletes and event.newrev == EMPTY_GIT_REF: + # If the updated ref has an empty git sha (all 0s), + # then the ref is being deleted + return False + + return True diff --git a/zuul/driver/git/gitsource.py b/zuul/driver/git/gitsource.py index 8d85c082f..78ae04ee7 100644 --- a/zuul/driver/git/gitsource.py +++ b/zuul/driver/git/gitsource.py @@ -36,7 +36,7 @@ class GitSource(BaseSource): raise NotImplemented() def getChange(self, event, refresh=False): - raise NotImplemented() + return self.connection.getChange(event, refresh) def getProject(self, name): p = self.connection.getProject(name) diff --git a/zuul/driver/git/gittrigger.py b/zuul/driver/git/gittrigger.py new file mode 100644 index 000000000..28852307e --- /dev/null +++ b/zuul/driver/git/gittrigger.py @@ -0,0 +1,49 @@ +# Copyright 2017 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import voluptuous as v +from zuul.trigger import BaseTrigger +from zuul.driver.git.gitmodel import GitEventFilter +from zuul.driver.util import scalar_or_list, to_list + + +class GitTrigger(BaseTrigger): + name = 'git' + log = logging.getLogger("zuul.GitTrigger") + + def getEventFilters(self, trigger_conf): + efilters = [] + for trigger in to_list(trigger_conf): + f = GitEventFilter( + trigger=self, + types=to_list(trigger['event']), + refs=to_list(trigger.get('ref')), + ignore_deletes=trigger.get( + 'ignore-deletes', True) + ) + efilters.append(f) + + return efilters + + +def getSchema(): + git_trigger = { + v.Required('event'): + scalar_or_list(v.Any('ref-updated')), + 'ref': scalar_or_list(str), + 'ignore-deletes': bool, + } + + return git_trigger diff --git a/zuul/executor/server.py b/zuul/executor/server.py index 7a93f896b..22dee9aca 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -1706,6 +1706,7 @@ class ExecutorServer(object): self.merger_worker.registerFunction("merger:merge") self.merger_worker.registerFunction("merger:cat") self.merger_worker.registerFunction("merger:refstate") + self.merger_worker.registerFunction("merger:fileschanges") def register_work(self): if self._running: @@ -1859,6 +1860,9 @@ class ExecutorServer(object): elif job.name == 'merger:refstate': self.log.debug("Got refstate job: %s" % job.unique) self.refstate(job) + elif job.name == 'merger:fileschanges': + self.log.debug("Got fileschanges job: %s" % job.unique) + self.fileschanges(job) else: self.log.error("Unable to handle job %s" % job.name) job.sendWorkFail() @@ -1970,6 +1974,19 @@ class ExecutorServer(object): files=files) job.sendWorkComplete(json.dumps(result)) + def fileschanges(self, job): + args = json.loads(job.arguments) + task = self.update(args['connection'], args['project']) + task.wait() + with self.merger_lock: + files = self.merger.getFilesChanges( + args['connection'], args['project'], + args['branch'], + args['tosha']) + result = dict(updated=True, + files=files) + job.sendWorkComplete(json.dumps(result)) + def refstate(self, job): args = json.loads(job.arguments) with self.merger_lock: diff --git a/zuul/merger/client.py b/zuul/merger/client.py index 2614e5887..c89a6fba8 100644 --- a/zuul/merger/client.py +++ b/zuul/merger/client.py @@ -131,6 +131,15 @@ class MergeClient(object): job = self.submitJob('merger:cat', data, None, precedence) return job + def getFilesChanges(self, connection_name, project_name, branch, + tosha=None, precedence=zuul.model.PRECEDENCE_HIGH): + data = dict(connection=connection_name, + project=project_name, + branch=branch, + tosha=tosha) + job = self.submitJob('merger:fileschanges', data, None, precedence) + return job + def onBuildCompleted(self, job): data = getJobData(job) merged = data.get('merged', False) diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py index 06ec4b2b9..bd4ca58ee 100644 --- a/zuul/merger/merger.py +++ b/zuul/merger/merger.py @@ -314,6 +314,18 @@ class Repo(object): 'utf-8') return ret + def getFilesChanges(self, branch, tosha=None): + repo = self.createRepoObject() + files = set() + head = repo.heads[branch].commit + files.update(set(head.stats.files.keys())) + if tosha: + for cmt in head.iter_parents(): + if cmt.hexsha == tosha: + break + files.update(set(cmt.stats.files.keys())) + return list(files) + def deleteRemote(self, remote): repo = self.createRepoObject() repo.delete_remote(repo.remotes[remote]) @@ -581,3 +593,8 @@ class Merger(object): def getFiles(self, connection_name, project_name, branch, files, dirs=[]): repo = self.getRepo(connection_name, project_name) return repo.getFiles(files, dirs, branch=branch) + + def getFilesChanges(self, connection_name, project_name, branch, + tosha=None): + repo = self.getRepo(connection_name, project_name) + return repo.getFilesChanges(branch, tosha) diff --git a/zuul/merger/server.py b/zuul/merger/server.py index 576d41ed5..aa04fc206 100644 --- a/zuul/merger/server.py +++ b/zuul/merger/server.py @@ -81,6 +81,7 @@ class MergeServer(object): self.worker.registerFunction("merger:merge") self.worker.registerFunction("merger:cat") self.worker.registerFunction("merger:refstate") + self.worker.registerFunction("merger:fileschanges") def stop(self): self.log.debug("Stopping") @@ -117,6 +118,9 @@ class MergeServer(object): elif job.name == 'merger:refstate': self.log.debug("Got refstate job: %s" % job.unique) self.refstate(job) + elif job.name == 'merger:fileschanges': + self.log.debug("Got fileschanges job: %s" % job.unique) + self.fileschanges(job) else: self.log.error("Unable to handle job %s" % job.name) job.sendWorkFail() @@ -158,3 +162,12 @@ class MergeServer(object): result = dict(updated=True, files=files) job.sendWorkComplete(json.dumps(result)) + + def fileschanges(self, job): + args = json.loads(job.arguments) + self.merger.updateRepo(args['connection'], args['project']) + files = self.merger.getFilesChanges( + args['connection'], args['project'], args['branch'], args['tosha']) + result = dict(updated=True, + files=files) + job.sendWorkComplete(json.dumps(result)) |