diff options
Diffstat (limited to 'zuul/driver/git/gitconnection.py')
-rw-r--r-- | zuul/driver/git/gitconnection.py | 203 |
1 files changed, 200 insertions, 3 deletions
diff --git a/zuul/driver/git/gitconnection.py b/zuul/driver/git/gitconnection.py index f93824d2f..1886cfcca 100644 --- a/zuul/driver/git/gitconnection.py +++ b/zuul/driver/git/gitconnection.py @@ -13,12 +13,122 @@ # License for the specific language governing permissions and limitations # under the License. +import os +import git +import time import logging import urllib +import threading import voluptuous as v from zuul.connection import BaseConnection +from zuul.driver.git.gitmodel import GitTriggerEvent, EMPTY_GIT_REF +from zuul.model import Ref, Branch + + +class GitWatcher(threading.Thread): + log = logging.getLogger("connection.git.GitWatcher") + + def __init__(self, git_connection, baseurl, poll_delay): + threading.Thread.__init__(self) + self.daemon = True + self.git_connection = git_connection + self.baseurl = baseurl + self.poll_delay = poll_delay + self._stopped = False + self.projects_refs = self.git_connection.projects_refs + # This is used by the test framework + self._event_count = 0 + + def compareRefs(self, project, refs): + partial_events = [] + # Fetch previous refs state + base_refs = self.projects_refs.get(project) + # Create list of created refs + rcreateds = set(refs.keys()) - set(base_refs.keys()) + # Create list of deleted refs + rdeleteds = set(base_refs.keys()) - set(refs.keys()) + # Create the list of updated refs + updateds = {} + for ref, sha in refs.items(): + if ref in base_refs and base_refs[ref] != sha: + updateds[ref] = sha + for ref in rcreateds: + event = { + 'ref': ref, + 'branch_created': True, + 'oldrev': EMPTY_GIT_REF, + 'newrev': refs[ref] + } + partial_events.append(event) + for ref in rdeleteds: + event = { + 'ref': ref, + 'branch_deleted': True, + 'oldrev': base_refs[ref], + 'newrev': EMPTY_GIT_REF + } + partial_events.append(event) + for ref, sha in updateds.items(): + event = { + 'ref': ref, + 'branch_updated': True, + 'oldrev': base_refs[ref], + 'newrev': sha + } + partial_events.append(event) + events = [] + for pevent in partial_events: + event = GitTriggerEvent() + event.type = 'ref-updated' + event.project_hostname = self.git_connection.canonical_hostname + event.project_name = project + for attr in ('ref', 'oldrev', 'newrev', 'branch_created', + 'branch_deleted', 'branch_updated'): + if attr in pevent: + setattr(event, attr, pevent[attr]) + events.append(event) + return events + + def _run(self): + self.log.debug("Walk through projects refs for connection: %s" % + self.git_connection.connection_name) + try: + for project in self.git_connection.projects: + refs = self.git_connection.lsRemote(project) + self.log.debug("Read refs %s for project %s" % (refs, project)) + if not self.projects_refs.get(project): + # State for this project does not exist yet so add it. + # No event will be triggered in this loop as + # projects_refs['project'] and refs are equal + self.projects_refs[project] = refs + events = self.compareRefs(project, refs) + self.projects_refs[project] = refs + # Send events to the scheduler + for event in events: + self.log.debug("Handling event: %s" % event) + # Force changes cache update before passing + # the event to the scheduler + self.git_connection.getChange(event) + self.git_connection.logEvent(event) + # Pass the event to the scheduler + self.git_connection.sched.addEvent(event) + self._event_count += 1 + except Exception as e: + self.log.debug("Unexpected issue in _run loop: %s" % str(e)) + + def run(self): + while not self._stopped: + if not self.git_connection.w_pause: + self._run() + # Polling wait delay + else: + self.log.debug("Watcher is on pause") + time.sleep(self.poll_delay) + + def stop(self): + self._stopped = True class GitConnection(BaseConnection): @@ -32,6 +142,8 @@ class GitConnection(BaseConnection): raise Exception('baseurl is required for git connections in ' '%s' % self.connection_name) self.baseurl = self.connection_config.get('baseurl') + self.poll_timeout = float( + self.connection_config.get('poll_delay', 3600 * 2)) self.canonical_hostname = self.connection_config.get( 'canonical_hostname') if not self.canonical_hostname: @@ -40,7 +152,10 @@ class GitConnection(BaseConnection): self.canonical_hostname = r.hostname else: self.canonical_hostname = 'localhost' + self.w_pause = False self.projects = {} + self.projects_refs = {} + self._change_cache = {} def getProject(self, name): return self.projects.get(name) @@ -48,15 +163,97 @@ class GitConnection(BaseConnection): def addProject(self, project): self.projects[project.name] = project + def getChangeFilesUpdated(self, project_name, branch, tosha): + job = self.sched.merger.getFilesChanges( + self.connection_name, project_name, branch, tosha) + self.log.debug("Waiting for fileschanges job %s" % job) + job.wait() + if not job.updated: + raise Exception("Fileschanges job %s failed" % job) + self.log.debug("Fileschanges job %s got changes on files %s" % + (job, job.files)) + return job.files + + def lsRemote(self, project): + refs = {} + client = git.cmd.Git() + output = client.ls_remote( + os.path.join(self.baseurl, project)) + for line in output.splitlines(): + sha, ref = line.split('\t') + if ref.startswith('refs/'): + refs[ref] = sha + return refs + + def maintainCache(self, relevant): + remove = {} + for branch, refschange in self._change_cache.items(): + for ref, change in refschange.items(): + if change not in relevant: + remove.setdefault(branch, []).append(ref) + for branch, refs in remove.items(): + for ref in refs: + del self._change_cache[branch][ref] + if not self._change_cache[branch]: + del self._change_cache[branch] + + def getChange(self, event, refresh=False): + if event.ref and event.ref.startswith('refs/heads/'): + branch = event.ref[len('refs/heads/'):] + change = self._change_cache.get(branch, {}).get(event.newrev) + if change: + return change + project = self.getProject(event.project_name) + change = Branch(project) + change.branch = branch + for attr in ('ref', 'oldrev', 'newrev'): + setattr(change, attr, getattr(event, attr)) + change.url = "" + change.files = self.getChangeFilesUpdated( + event.project_name, change.branch, event.oldrev) + self._change_cache.setdefault(branch, {})[event.newrev] = change + elif event.ref: + # catch-all ref (ie, not a branch or head) + project = self.getProject(event.project_name) + change = Ref(project) + for attr in ('ref', 'oldrev', 'newrev'): + setattr(change, attr, getattr(event, attr)) + change.url = "" + else: + self.log.warning("Unable to get change for %s" % (event,)) + change = None + return change + def getProjectBranches(self, project, tenant): - # TODO(jeblair): implement; this will need to handle local or - # remote git urls. - return ['master'] + refs = self.lsRemote(project.name) + branches = [ref[len('refs/heads/'):] for ref in + refs if ref.startswith('refs/heads/')] + return branches def getGitUrl(self, project): url = '%s/%s' % (self.baseurl, project.name) return url + def onLoad(self): + self.log.debug("Starting Git Watcher") + self._start_watcher_thread() + + def onStop(self): + self.log.debug("Stopping Git Watcher") + self._stop_watcher_thread() + + def _stop_watcher_thread(self): + if self.watcher_thread: + self.watcher_thread.stop() + self.watcher_thread.join() + + def _start_watcher_thread(self): + self.watcher_thread = GitWatcher( + self, + self.baseurl, + self.poll_timeout) + self.watcher_thread.start() + def getSchema(): git_connection = v.Any(str, v.Schema(dict)) |