summaryrefslogtreecommitdiff
path: root/zuul
diff options
context:
space:
mode:
Diffstat (limited to 'zuul')
-rw-r--r--zuul/driver/git/__init__.py7
-rw-r--r--zuul/driver/git/gitconnection.py200
-rw-r--r--zuul/driver/git/gitmodel.py86
-rw-r--r--zuul/driver/git/gitsource.py2
-rw-r--r--zuul/driver/git/gittrigger.py49
-rw-r--r--zuul/executor/server.py17
-rw-r--r--zuul/merger/client.py9
-rw-r--r--zuul/merger/merger.py17
-rw-r--r--zuul/merger/server.py13
9 files changed, 396 insertions, 4 deletions
diff --git a/zuul/driver/git/__init__.py b/zuul/driver/git/__init__.py
index 0faa0365a..1fe43f643 100644
--- a/zuul/driver/git/__init__.py
+++ b/zuul/driver/git/__init__.py
@@ -15,6 +15,7 @@
from zuul.driver import Driver, ConnectionInterface, SourceInterface
from zuul.driver.git import gitconnection
from zuul.driver.git import gitsource
+from zuul.driver.git import gittrigger
class GitDriver(Driver, ConnectionInterface, SourceInterface):
@@ -23,9 +24,15 @@ class GitDriver(Driver, ConnectionInterface, SourceInterface):
def getConnection(self, name, config):
return gitconnection.GitConnection(self, name, config)
+ def getTrigger(self, connection, config=None):
+ return gittrigger.GitTrigger(self, connection, config)
+
def getSource(self, connection):
return gitsource.GitSource(self, connection)
+ def getTriggerSchema(self):
+ return gittrigger.getSchema()
+
def getRequireSchema(self):
return {}
diff --git a/zuul/driver/git/gitconnection.py b/zuul/driver/git/gitconnection.py
index f93824d2f..03b24cadc 100644
--- a/zuul/driver/git/gitconnection.py
+++ b/zuul/driver/git/gitconnection.py
@@ -13,12 +13,119 @@
# License for the specific language governing permissions and limitations
# under the License.
+import os
+import git
+import time
import logging
import urllib
+import threading
import voluptuous as v
from zuul.connection import BaseConnection
+from zuul.driver.git.gitmodel import GitTriggerEvent, EMPTY_GIT_REF
+from zuul.model import Ref, Branch
+
+
+class GitWatcher(threading.Thread):
+ log = logging.getLogger("connection.git.GitWatcher")
+
+ def __init__(self, git_connection, baseurl, poll_delay):
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.git_connection = git_connection
+ self.baseurl = baseurl
+ self.poll_delay = poll_delay
+ self._stopped = False
+ self.projects_refs = self.git_connection.projects_refs
+
+ def compareRefs(self, project, refs):
+ partial_events = []
+ # Fetch previous refs state
+ base_refs = self.projects_refs.get(project)
+ # Create list of created refs
+ rcreateds = set(refs.keys()) - set(base_refs.keys())
+ # Create list of deleted refs
+ rdeleteds = set(base_refs.keys()) - set(refs.keys())
+ # Create the list of updated refs
+ updateds = {}
+ for ref, sha in refs.items():
+ if ref in base_refs and base_refs[ref] != sha:
+ updateds[ref] = sha
+ for ref in rcreateds:
+ event = {
+ 'ref': ref,
+ 'branch_created': True,
+ 'oldrev': EMPTY_GIT_REF,
+ 'newrev': refs[ref]
+ }
+ partial_events.append(event)
+ for ref in rdeleteds:
+ event = {
+ 'ref': ref,
+ 'branch_deleted': True,
+ 'oldrev': base_refs[ref],
+ 'newrev': EMPTY_GIT_REF
+ }
+ partial_events.append(event)
+ for ref, sha in updateds.items():
+ event = {
+ 'ref': ref,
+ 'branch_updated': True,
+ 'oldrev': base_refs[ref],
+ 'newrev': sha
+ }
+ partial_events.append(event)
+ events = []
+ for pevent in partial_events:
+ event = GitTriggerEvent()
+ event.type = 'ref-updated'
+ event.project_hostname = self.git_connection.canonical_hostname
+ event.project_name = project
+ for attr in ('ref', 'oldrev', 'newrev', 'branch_created',
+ 'branch_deleted', 'branch_updated'):
+ if attr in pevent:
+ setattr(event, attr, pevent[attr])
+ events.append(event)
+ return events
+
+ def _run(self):
+ self.log.debug("Walk through projects refs for connection: %s" %
+ self.git_connection.connection_name)
+ try:
+ for project in self.git_connection.projects:
+ refs = self.git_connection.lsRemote(project)
+ self.log.debug("Read refs %s for project %s" % (refs, project))
+ if not self.projects_refs.get(project):
+ # State for this project does not exist yet so add it.
+ # No event will be triggered in this loop as
+ # projects_refs['project'] and refs are equal
+ self.projects_refs[project] = refs
+ events = self.compareRefs(project, refs)
+ self.projects_refs[project] = refs
+ # Send events to the scheduler
+ for event in events:
+ self.log.debug("Handling event: %s" % event)
+ # Force changes cache update before passing
+ # the event to the scheduler
+ self.git_connection.getChange(event)
+ self.git_connection.logEvent(event)
+ # Pass the event to the scheduler
+ self.git_connection.sched.addEvent(event)
+ except Exception as e:
+ self.log.debug("Unexpected issue in _run loop: %s" % str(e))
+
+ def run(self):
+ while not self._stopped:
+ if not self.git_connection.w_pause:
+ self._run()
+ # Polling wait delay
+ else:
+ self.log.debug("Watcher is on pause")
+ time.sleep(self.poll_delay)
+
+ def stop(self):
+ self._stopped = True
class GitConnection(BaseConnection):
@@ -32,6 +139,8 @@ class GitConnection(BaseConnection):
raise Exception('baseurl is required for git connections in '
'%s' % self.connection_name)
self.baseurl = self.connection_config.get('baseurl')
+ self.poll_timeout = float(
+ self.connection_config.get('poll_delay', 3600 * 2))
self.canonical_hostname = self.connection_config.get(
'canonical_hostname')
if not self.canonical_hostname:
@@ -40,7 +149,10 @@ class GitConnection(BaseConnection):
self.canonical_hostname = r.hostname
else:
self.canonical_hostname = 'localhost'
+ self.w_pause = False
self.projects = {}
+ self.projects_refs = {}
+ self._change_cache = {}
def getProject(self, name):
return self.projects.get(name)
@@ -48,15 +160,97 @@ class GitConnection(BaseConnection):
def addProject(self, project):
self.projects[project.name] = project
+ def getChangeFilesUpdated(self, project_name, branch, tosha):
+ job = self.sched.merger.getFilesChanges(
+ self.connection_name, project_name, branch, tosha)
+ self.log.debug("Waiting for fileschanges job %s" % job)
+ job.wait()
+ if not job.updated:
+ raise Exception("Fileschanges job %s failed" % job)
+ self.log.debug("Fileschanges job %s got changes on files %s" %
+ (job, job.files))
+ return job.files
+
+ def lsRemote(self, project):
+ refs = {}
+ client = git.cmd.Git()
+ output = client.ls_remote(
+ os.path.join(self.baseurl, project))
+ for line in output.splitlines():
+ sha, ref = line.split('\t')
+ if ref.startswith('refs/'):
+ refs[ref] = sha
+ return refs
+
+ def maintainCache(self, relevant):
+ remove = {}
+ for branch, refschange in self._change_cache.items():
+ for ref, change in refschange.items():
+ if change not in relevant:
+ remove.setdefault(branch, []).append(ref)
+ for branch, refs in remove.items():
+ for ref in refs:
+ del self._change_cache[branch][ref]
+ if not self._change_cache[branch]:
+ del self._change_cache[branch]
+
+ def getChange(self, event, refresh=False):
+ if event.ref and event.ref.startswith('refs/heads/'):
+ branch = event.ref[len('refs/heads/'):]
+ change = self._change_cache.get(branch, {}).get(event.newrev)
+ if change:
+ return change
+ project = self.getProject(event.project_name)
+ change = Branch(project)
+ change.branch = branch
+ for attr in ('ref', 'oldrev', 'newrev'):
+ setattr(change, attr, getattr(event, attr))
+ change.url = ""
+ change.files = self.getChangeFilesUpdated(
+ event.project_name, change.branch, event.oldrev)
+ self._change_cache.setdefault(branch, {})[event.newrev] = change
+ elif event.ref:
+ # catch-all ref (ie, not a branch or head)
+ project = self.getProject(event.project_name)
+ change = Ref(project)
+ for attr in ('ref', 'oldrev', 'newrev'):
+ setattr(change, attr, getattr(event, attr))
+ change.url = ""
+ else:
+ self.log.warning("Unable to get change for %s" % (event,))
+ change = None
+ return change
+
def getProjectBranches(self, project, tenant):
- # TODO(jeblair): implement; this will need to handle local or
- # remote git urls.
- return ['master']
+ refs = self.lsRemote(project.name)
+ branches = [ref[len('refs/heads/'):] for ref in
+ refs if ref.startswith('refs/heads/')]
+ return branches
def getGitUrl(self, project):
url = '%s/%s' % (self.baseurl, project.name)
return url
+ def onLoad(self):
+ self.log.debug("Starting Git Watcher")
+ self._start_watcher_thread()
+
+ def onStop(self):
+ self.log.debug("Stopping Git Watcher")
+ self._stop_watcher_thread()
+
+ def _stop_watcher_thread(self):
+ if self.watcher_thread:
+ self.watcher_thread.stop()
+ self.watcher_thread.join()
+
+ def _start_watcher_thread(self):
+ self.watcher_thread = GitWatcher(
+ self,
+ self.baseurl,
+ self.poll_timeout)
+ self.watcher_thread.start()
+
def getSchema():
git_connection = v.Any(str, v.Schema(dict))
diff --git a/zuul/driver/git/gitmodel.py b/zuul/driver/git/gitmodel.py
new file mode 100644
index 000000000..5d12b36da
--- /dev/null
+++ b/zuul/driver/git/gitmodel.py
@@ -0,0 +1,86 @@
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import re
+
+from zuul.model import TriggerEvent
+from zuul.model import EventFilter
+
+
+EMPTY_GIT_REF = '0' * 40 # git sha of all zeros, used during creates/deletes
+
+
+class GitTriggerEvent(TriggerEvent):
+ """Incoming event from an external system."""
+
+ def __repr__(self):
+ ret = '<GitTriggerEvent %s %s' % (self.type,
+ self.project_name)
+
+ if self.branch:
+ ret += " %s" % self.branch
+ ret += " oldrev:%s" % self.oldrev
+ ret += " newrev:%s" % self.newrev
+ ret += '>'
+
+ return ret
+
+
+class GitEventFilter(EventFilter):
+ def __init__(self, trigger, types=[], refs=[],
+ ignore_deletes=True):
+
+ super().__init__(trigger)
+
+ self._refs = refs
+ self.types = types
+ self.refs = [re.compile(x) for x in refs]
+ self.ignore_deletes = ignore_deletes
+
+ def __repr__(self):
+ ret = '<GitEventFilter'
+
+ if self.types:
+ ret += ' types: %s' % ', '.join(self.types)
+ if self._refs:
+ ret += ' refs: %s' % ', '.join(self._refs)
+ if self.ignore_deletes:
+ ret += ' ignore_deletes: %s' % self.ignore_deletes
+ ret += '>'
+
+ return ret
+
+ def matches(self, event, change):
+ # event types are ORed
+ matches_type = False
+ for etype in self.types:
+ if etype == event.type:
+ matches_type = True
+ if self.types and not matches_type:
+ return False
+
+ # refs are ORed
+ matches_ref = False
+ if event.ref is not None:
+ for ref in self.refs:
+ if ref.match(event.ref):
+ matches_ref = True
+ if self.refs and not matches_ref:
+ return False
+ if self.ignore_deletes and event.newrev == EMPTY_GIT_REF:
+ # If the updated ref has an empty git sha (all 0s),
+ # then the ref is being deleted
+ return False
+
+ return True
diff --git a/zuul/driver/git/gitsource.py b/zuul/driver/git/gitsource.py
index 8d85c082f..78ae04ee7 100644
--- a/zuul/driver/git/gitsource.py
+++ b/zuul/driver/git/gitsource.py
@@ -36,7 +36,7 @@ class GitSource(BaseSource):
raise NotImplemented()
def getChange(self, event, refresh=False):
- raise NotImplemented()
+ return self.connection.getChange(event, refresh)
def getProject(self, name):
p = self.connection.getProject(name)
diff --git a/zuul/driver/git/gittrigger.py b/zuul/driver/git/gittrigger.py
new file mode 100644
index 000000000..28852307e
--- /dev/null
+++ b/zuul/driver/git/gittrigger.py
@@ -0,0 +1,49 @@
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import voluptuous as v
+from zuul.trigger import BaseTrigger
+from zuul.driver.git.gitmodel import GitEventFilter
+from zuul.driver.util import scalar_or_list, to_list
+
+
+class GitTrigger(BaseTrigger):
+ name = 'git'
+ log = logging.getLogger("zuul.GitTrigger")
+
+ def getEventFilters(self, trigger_conf):
+ efilters = []
+ for trigger in to_list(trigger_conf):
+ f = GitEventFilter(
+ trigger=self,
+ types=to_list(trigger['event']),
+ refs=to_list(trigger.get('ref')),
+ ignore_deletes=trigger.get(
+ 'ignore-deletes', True)
+ )
+ efilters.append(f)
+
+ return efilters
+
+
+def getSchema():
+ git_trigger = {
+ v.Required('event'):
+ scalar_or_list(v.Any('ref-updated')),
+ 'ref': scalar_or_list(str),
+ 'ignore-deletes': bool,
+ }
+
+ return git_trigger
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 7a93f896b..22dee9aca 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -1706,6 +1706,7 @@ class ExecutorServer(object):
self.merger_worker.registerFunction("merger:merge")
self.merger_worker.registerFunction("merger:cat")
self.merger_worker.registerFunction("merger:refstate")
+ self.merger_worker.registerFunction("merger:fileschanges")
def register_work(self):
if self._running:
@@ -1859,6 +1860,9 @@ class ExecutorServer(object):
elif job.name == 'merger:refstate':
self.log.debug("Got refstate job: %s" % job.unique)
self.refstate(job)
+ elif job.name == 'merger:fileschanges':
+ self.log.debug("Got fileschanges job: %s" % job.unique)
+ self.fileschanges(job)
else:
self.log.error("Unable to handle job %s" % job.name)
job.sendWorkFail()
@@ -1970,6 +1974,19 @@ class ExecutorServer(object):
files=files)
job.sendWorkComplete(json.dumps(result))
+ def fileschanges(self, job):
+ args = json.loads(job.arguments)
+ task = self.update(args['connection'], args['project'])
+ task.wait()
+ with self.merger_lock:
+ files = self.merger.getFilesChanges(
+ args['connection'], args['project'],
+ args['branch'],
+ args['tosha'])
+ result = dict(updated=True,
+ files=files)
+ job.sendWorkComplete(json.dumps(result))
+
def refstate(self, job):
args = json.loads(job.arguments)
with self.merger_lock:
diff --git a/zuul/merger/client.py b/zuul/merger/client.py
index 2614e5887..c89a6fba8 100644
--- a/zuul/merger/client.py
+++ b/zuul/merger/client.py
@@ -131,6 +131,15 @@ class MergeClient(object):
job = self.submitJob('merger:cat', data, None, precedence)
return job
+ def getFilesChanges(self, connection_name, project_name, branch,
+ tosha=None, precedence=zuul.model.PRECEDENCE_HIGH):
+ data = dict(connection=connection_name,
+ project=project_name,
+ branch=branch,
+ tosha=tosha)
+ job = self.submitJob('merger:fileschanges', data, None, precedence)
+ return job
+
def onBuildCompleted(self, job):
data = getJobData(job)
merged = data.get('merged', False)
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index 06ec4b2b9..bd4ca58ee 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -314,6 +314,18 @@ class Repo(object):
'utf-8')
return ret
+ def getFilesChanges(self, branch, tosha=None):
+ repo = self.createRepoObject()
+ files = set()
+ head = repo.heads[branch].commit
+ files.update(set(head.stats.files.keys()))
+ if tosha:
+ for cmt in head.iter_parents():
+ if cmt.hexsha == tosha:
+ break
+ files.update(set(cmt.stats.files.keys()))
+ return list(files)
+
def deleteRemote(self, remote):
repo = self.createRepoObject()
repo.delete_remote(repo.remotes[remote])
@@ -581,3 +593,8 @@ class Merger(object):
def getFiles(self, connection_name, project_name, branch, files, dirs=[]):
repo = self.getRepo(connection_name, project_name)
return repo.getFiles(files, dirs, branch=branch)
+
+ def getFilesChanges(self, connection_name, project_name, branch,
+ tosha=None):
+ repo = self.getRepo(connection_name, project_name)
+ return repo.getFilesChanges(branch, tosha)
diff --git a/zuul/merger/server.py b/zuul/merger/server.py
index 576d41ed5..aa04fc206 100644
--- a/zuul/merger/server.py
+++ b/zuul/merger/server.py
@@ -81,6 +81,7 @@ class MergeServer(object):
self.worker.registerFunction("merger:merge")
self.worker.registerFunction("merger:cat")
self.worker.registerFunction("merger:refstate")
+ self.worker.registerFunction("merger:fileschanges")
def stop(self):
self.log.debug("Stopping")
@@ -117,6 +118,9 @@ class MergeServer(object):
elif job.name == 'merger:refstate':
self.log.debug("Got refstate job: %s" % job.unique)
self.refstate(job)
+ elif job.name == 'merger:fileschanges':
+ self.log.debug("Got fileschanges job: %s" % job.unique)
+ self.fileschanges(job)
else:
self.log.error("Unable to handle job %s" % job.name)
job.sendWorkFail()
@@ -158,3 +162,12 @@ class MergeServer(object):
result = dict(updated=True,
files=files)
job.sendWorkComplete(json.dumps(result))
+
+ def fileschanges(self, job):
+ args = json.loads(job.arguments)
+ self.merger.updateRepo(args['connection'], args['project'])
+ files = self.merger.getFilesChanges(
+ args['connection'], args['project'], args['branch'], args['tosha'])
+ result = dict(updated=True,
+ files=files)
+ job.sendWorkComplete(json.dumps(result))