diff options
Diffstat (limited to 'tools/server-side/svnpubsub')
21 files changed, 2434 insertions, 0 deletions
diff --git a/tools/server-side/svnpubsub/README.txt b/tools/server-side/svnpubsub/README.txt new file mode 100644 index 0000000..ad4975e --- /dev/null +++ b/tools/server-side/svnpubsub/README.txt @@ -0,0 +1,24 @@ +Installation instructions: + +1. Set up an svnpubsub service. + + This directory should be checked out to /usr/local/svnpubsub (or /opt/svnpubsub + on Debian). + + There are init scripts for several OSes in the rc.d/ directory; add them + to your OS boot process in the usual way for your OS. (For example, via + rc.conf(5) or update-rc.d(8).) + +2. Run "commit-hook.py $REPOS $REV" from your post-commit hook. + + (As of 1.7, these are the same ordered arguments the post-commmit hook + itself receives, so you can just symlink commit-hook.py as hooks/post-commit + hook if you don't need any other hooks to run in the server process. (This + isn't as insane as it sounds --- post-commit email hooks could also feed of + svnpubsub, and thus not be run within the committing server thread, but on + any other process or box that listens to the svnpubsub stream!)) + +3. Set up svnpubsub clients. + + (eg svnwcsub.py, svnpubsub/client.py, + 'curl -sN http://${hostname}:2069/commits') diff --git a/tools/server-side/svnpubsub/commit-hook.py b/tools/server-side/svnpubsub/commit-hook.py new file mode 100755 index 0000000..4a1a3f3 --- /dev/null +++ b/tools/server-side/svnpubsub/commit-hook.py @@ -0,0 +1,93 @@ +#!/usr/local/bin/python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +SVNLOOK="/usr/local/svn-install/current/bin/svnlook" +#SVNLOOK="/usr/local/bin/svnlook" + +HOST="127.0.0.1" +PORT=2069 + +import sys +import subprocess +try: + import simplejson as json +except ImportError: + import json + +import urllib2 + +def svncmd(cmd): + return subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE) + +def svncmd_uuid(repo): + cmd = "%s uuid %s" % (SVNLOOK, repo) + p = svncmd(cmd) + return p.stdout.read().strip() + +def svncmd_info(repo, revision): + cmd = "%s info -r %s %s" % (SVNLOOK, revision, repo) + p = svncmd(cmd) + data = p.stdout.read().split("\n") + #print data + return {'author': data[0].strip(), + 'date': data[1].strip(), + 'log': "\n".join(data[3:]).strip()} + +def svncmd_changed(repo, revision): + cmd = "%s changed -r %s %s" % (SVNLOOK, revision, repo) + p = svncmd(cmd) + changed = {} + while True: + line = p.stdout.readline() + if not line: + break + line = line.strip() + (flags, filename) = (line[0:3], line[4:]) + changed[filename] = {'flags': flags} + return changed + +def do_put(body): + opener = urllib2.build_opener(urllib2.HTTPHandler) + request = urllib2.Request("http://%s:%d/commits" %(HOST, PORT), data=body) + request.add_header('Content-Type', 'application/json') + request.get_method = lambda: 'PUT' + url = opener.open(request) + + +def main(repo, revision): + revision = revision.lstrip('r') + i = svncmd_info(repo, revision) + data = {'type': 'svn', + 'format': 1, + 'id': int(revision), + 'changed': {}, + 'repository': svncmd_uuid(repo), + 'committer': i['author'], + 'log': i['log'], + 'date': i['date'], + } + data['changed'].update(svncmd_changed(repo, revision)) + body = json.dumps(data) + do_put(body) + +if __name__ == "__main__": + if len(sys.argv) not in (3, 4): + sys.stderr.write("invalid args\n") + sys.exit(0) + + main(*sys.argv[1:3]) diff --git a/tools/server-side/svnpubsub/daemonize.py b/tools/server-side/svnpubsub/daemonize.py new file mode 100644 index 0000000..8b85258 --- /dev/null +++ b/tools/server-side/svnpubsub/daemonize.py @@ -0,0 +1,272 @@ +# --------------------------------------------------------------------------- +# +# Copyright (c) 2005, Greg Stein +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# --------------------------------------------------------------------------- +# +# This software lives at: +# http://gstein.googlecode.com/svn/trunk/python/daemonize.py +# + +import os +import signal +import sys +import time + + +# possible return values from Daemon.daemonize() +DAEMON_RUNNING = 'The daemon is running' +DAEMON_NOT_RUNNING = 'The daemon is not running' +DAEMON_COMPLETE = 'The daemon has completed its operations' +DAEMON_STARTED = 'The daemon has been started' + + +class Daemon(object): + + def __init__(self, logfile, pidfile): + self.logfile = logfile + self.pidfile = pidfile + + def foreground(self): + "Run in the foreground." + ### we should probably create a pidfile. other systems may try to detect + ### the pidfile to see if this "daemon" is running. + self.setup() + self.run() + ### remove the pidfile + + def daemonize_exit(self): + try: + result = self.daemonize() + except (ChildFailed, DaemonFailed) as e: + # duplicate the exit code + sys.exit(e.code) + except (ChildTerminatedAbnormally, ChildForkFailed, + DaemonTerminatedAbnormally, DaemonForkFailed) as e: + sys.stderr.write('ERROR: %s\n' % e) + sys.exit(1) + except ChildResumedIncorrectly: + sys.stderr.write('ERROR: continued after receiving unknown signal.\n') + sys.exit(1) + + if result == DAEMON_STARTED or result == DAEMON_COMPLETE: + sys.exit(0) + elif result == DAEMON_NOT_RUNNING: + sys.stderr.write('ERROR: the daemon exited with a success code ' + 'without signalling its startup.\n') + sys.exit(1) + + # in original process. daemon is up and running. we're done. + + def daemonize(self): + # fork off a child that can detach itself from this process. + try: + pid = os.fork() + except OSError as e: + raise ChildForkFailed(e.errno, e.strerror) + + if pid > 0: + # we're in the parent. let's wait for the child to finish setting + # things up -- on our exit, we want to ensure the child is accepting + # connections. + cpid, status = os.waitpid(pid, 0) + assert pid == cpid + if os.WIFEXITED(status): + code = os.WEXITSTATUS(status) + if code: + raise ChildFailed(code) + return DAEMON_RUNNING + + # the child did not exit cleanly. + raise ChildTerminatedAbnormally(status) + + # we're in the child. + + # decouple from the parent process + os.chdir('/') + os.umask(0) + os.setsid() + + # remember this pid so the second child can signal it. + thispid = os.getpid() + + # register a signal handler so the SIGUSR1 doesn't stop the process. + # this object will also record whether if got signalled. + daemon_accepting = SignalCatcher(signal.SIGUSR1) + + # if the daemon process exits before sending SIGUSR1, then we need to see + # the problem. trap SIGCHLD with a SignalCatcher. + daemon_exit = SignalCatcher(signal.SIGCHLD) + + # perform the second fork + try: + pid = os.fork() + except OSError as e: + raise DaemonForkFailed(e.errno, e.strerror) + + if pid > 0: + # in the parent. + + # we want to wait for the daemon to signal that it has created and + # bound the socket, and is (thus) ready for connections. if the + # daemon improperly exits before serving, we'll see SIGCHLD and the + # .pause will return. + ### we should add a timeout to this. allow an optional parameter to + ### specify the timeout, in case it takes a long time to start up. + signal.pause() + + if daemon_exit.signalled: + # reap the daemon process, getting its exit code. bubble it up. + cpid, status = os.waitpid(pid, 0) + assert pid == cpid + if os.WIFEXITED(status): + code = os.WEXITSTATUS(status) + if code: + raise DaemonFailed(code) + return DAEMON_NOT_RUNNING + + # the daemon did not exit cleanly. + raise DaemonTerminatedAbnormally(status) + + if daemon_accepting.signalled: + # the daemon is up and running, so save the pid and return success. + if self.pidfile: + # Be wary of symlink attacks + try: + os.remove(self.pidfile) + except OSError: + pass + fd = os.open(self.pidfile, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0444) + os.write(fd, '%d\n' % pid) + os.close(fd) + return DAEMON_STARTED + + # some other signal popped us out of the pause. the daemon might not + # be running. + raise ChildResumedIncorrectly() + + # we're a deamon now. get rid of the final remnants of the parent. + # start by restoring default signal handlers + signal.signal(signal.SIGUSR1, signal.SIG_DFL) + signal.signal(signal.SIGCHLD, signal.SIG_DFL) + sys.stdout.flush() + sys.stderr.flush() + si = open('/dev/null', 'r') + so = open(self.logfile, 'a+') + se = open(self.logfile, 'a+', 0) # unbuffered + os.dup2(si.fileno(), sys.stdin.fileno()) + os.dup2(so.fileno(), sys.stdout.fileno()) + os.dup2(se.fileno(), sys.stderr.fileno()) + # note: we could not inline the open() calls. after the fileno() completed, + # the file would be closed, making the fileno invalid. gotta hold them + # open until now: + si.close() + so.close() + se.close() + + # TEST: don't release the parent immediately. the whole parent stack + # should pause along with this sleep. + #time.sleep(10) + + # everything is set up. call the initialization function. + self.setup() + + # sleep for one second before signalling. we want to make sure the + # parent has called signal.pause() + ### we should think of a better wait around the race condition. + time.sleep(1) + + # okay. the daemon is ready. signal the parent to tell it we're set. + os.kill(thispid, signal.SIGUSR1) + + # start the daemon now. + self.run() + + # The daemon is shutting down, so toss the pidfile. + try: + os.remove(self.pidfile) + except OSError: + pass + + return DAEMON_COMPLETE + + def setup(self): + raise NotImplementedError + + def run(self): + raise NotImplementedError + + +class SignalCatcher(object): + def __init__(self, signum): + self.signalled = False + signal.signal(signum, self.sig_handler) + + def sig_handler(self, signum, frame): + self.signalled = True + + +class ChildTerminatedAbnormally(Exception): + "The child process terminated abnormally." + def __init__(self, status): + Exception.__init__(self, status) + self.status = status + def __str__(self): + return 'child terminated abnormally (0x%04x)' % self.status + +class ChildFailed(Exception): + "The child process exited with a failure code." + def __init__(self, code): + Exception.__init__(self, code) + self.code = code + def __str__(self): + return 'child failed with exit code %d' % self.code + +class ChildForkFailed(Exception): + "The child process could not be forked." + def __init__(self, errno, strerror): + Exception.__init__(self, errno, strerror) + self.errno = errno + self.strerror = strerror + def __str__(self): + return 'child fork failed with error %d (%s)' % self.args + +class ChildResumedIncorrectly(Exception): + "The child resumed its operation incorrectly." + +class DaemonTerminatedAbnormally(Exception): + "The daemon process terminated abnormally." + def __init__(self, status): + Exception.__init__(self, status) + self.status = status + def __str__(self): + return 'daemon terminated abnormally (0x%04x)' % self.status + +class DaemonFailed(Exception): + "The daemon process exited with a failure code." + def __init__(self, code): + Exception.__init__(self, code) + self.code = code + def __str__(self): + return 'daemon failed with exit code %d' % self.code + +class DaemonForkFailed(Exception): + "The daemon process could not be forked." + def __init__(self, errno, strerror): + Exception.__init__(self, errno, strerror) + self.errno = errno + self.strerror = strerror + def __str__(self): + return 'daemon fork failed with error %d (%s)' % self.args diff --git a/tools/server-side/svnpubsub/irkerbridge.py b/tools/server-side/svnpubsub/irkerbridge.py new file mode 100755 index 0000000..04b7ee2 --- /dev/null +++ b/tools/server-side/svnpubsub/irkerbridge.py @@ -0,0 +1,322 @@ +#!/usr/bin/env python +# +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# IrkerBridge - Bridge an SvnPubSub stream to Irker. + +# Example: +# irkerbridge.py --daemon --pidfile pid --logfile log config +# +# For detailed option help use: +# irkerbridge.py --help + +# It expects a config file that has the following parameters: +# streams=url +# Space separated list of URLs to streams. +# This option should only be in the DEFAULT section, is ignored in +# all other sections. +# irker=hostname:port +# The hostname/port combination of the irker daemon. If port is +# omitted it defaults to 6659. Irker is connected to over UDP. +# match=What to use to decide if the commit should be sent to irker. +# It consists of the repository UUID followed by a slash and a glob pattern. +# The UUID may be replaced by a * to match all UUIDs. The glob pattern will +# be matched against all of the dirs_changed. Both the UUID and the glob +# pattern must match to send the message to irker. +# to=url +# Space separated list of URLs (any URL that Irker will accept) to +# send the resulting message to. At current Irker only supports IRC. +# template=string +# A string to use to format the output. The string is a Python +# string Template. The following variables are available: +# $committer, $id, $date, $repository, $log, $log_firstline, +# $log_firstparagraph, $dirs_changed, $dirs_count, $dirs_count_s, +# $subdirs_count, $subdirs_count_s, $dirs_root +# Most of them should be self explanatory. $dirs_count is the number of +# entries in $dirs_changed, $dirs_count_s is a friendly string version, +# $dirs_root is the common root of all the $dirs_changed, $subdirs_count +# is the number of subdirs under the $dirs_root that changed, +# $subdirs_root_s is a friendly string version. $log_firstparagraph cuts +# the log message at the first blank line and replaces newlines with spaces. +# +# Within the config file you have sections. Any configuration option +# missing from a given section is found in the [DEFAULT] section. +# +# Section names are arbitrary names that mean nothing to the bridge. Each +# section other than the [DEFAULT] section consists of a configuration that +# may match and send a message to irker to deliver. All matching sections +# will generate a message. +# +# Interpolation of values within the config file is allowed by including +# %(name)s within a value. For example I can reference the UUID of a repo +# repeatedly by doing: +# [DEFAULT] +# ASF_REPO=13f79535-47bb-0310-9956-ffa450edef68 +# +# [#commits] +# match=%(ASF_REPO)s/ +# +# You can HUP the process to reload the config file without restarting the +# process. However, you cannot change the streams it is listening to without +# restarting the process. +# +# TODO: Logging in a better way. + +# Messages longer than this will be truncated and ... added to the end such +# that the resulting message is no longer than this: +MAX_PRIVMSG = 400 + +import os +import sys +import posixpath +import socket +import json +import urlparse +import optparse +import ConfigParser +import traceback +import signal +import re +import fnmatch +from string import Template + +# Packages that come with svnpubsub +import svnpubsub.client +import daemonize + +class Daemon(daemonize.Daemon): + def __init__(self, logfile, pidfile, bdec): + daemonize.Daemon.__init__(self, logfile, pidfile) + + self.bdec = bdec + + def setup(self): + # There is no setup which the parent needs to wait for. + pass + + def run(self): + print 'irkerbridge started, pid=%d' % (os.getpid()) + + mc = svnpubsub.client.MultiClient(self.bdec.urls, + self.bdec.commit, + self.bdec.event) + mc.run_forever() + + +class BigDoEverythingClass(object): + def __init__(self, config, options): + self.config = config + self.options = options + self.urls = config.get_value('streams').split() + + def locate_matching_configs(self, commit): + result = [ ] + for section in self.config.sections(): + match = self.config.get(section, "match").split('/', 1) + if len(match) < 2: + # No slash so assume all paths + match.append('*') + match_uuid, match_path = match + if commit.repository == match_uuid or match_uuid == "*": + for path in commit.changed: + if fnmatch.fnmatch(path, match_path): + result.append(section) + break + return result + + def _generate_dirs_changed(self, commit): + if hasattr(commit, 'dirs_changed') or not hasattr(commit, 'changed'): + return + + dirs_changed = set() + for p in commit.changed: + if p[-1] == '/' and commit.changed[p]['flags'][1] == 'U': + # directory with property changes add the directory itself. + dirs_changed.add(p) + else: + # everything else add the parent of the path + # directories have a trailing slash so if it's present remove + # it before finding the parent. The result will be a directory + # so it needs a trailing slash + dirs_changed.add(posixpath.dirname(p.rstrip('/')) + '/') + + commit.dirs_changed = dirs_changed + return + + def fill_in_extra_args(self, commit): + # Set any empty members to the string "<null>" + v = vars(commit) + for k in v.keys(): + if not v[k]: + v[k] = '<null>' + + self._generate_dirs_changed(commit) + # Add entries to the commit object that are useful for + # formatting. + commit.log_firstline = commit.log.split("\n",1)[0] + commit.log_firstparagraph = re.split("\r?\n\r?\n",commit.log,1)[0] + commit.log_firstparagraph = re.sub("\r?\n"," ",commit.log_firstparagraph) + if commit.dirs_changed: + commit.dirs_root = posixpath.commonprefix(commit.dirs_changed) + if commit.dirs_root == '': + commit.dirs_root = '/' + commit.dirs_count = len(commit.dirs_changed) + if commit.dirs_count > 1: + commit.dirs_count_s = " (%d dirs)" %(commit.dirs_count) + else: + commit.dirs_count_s = "" + + commit.subdirs_count = commit.dirs_count + if commit.dirs_root in commit.dirs_changed: + commit.subdirs_count -= 1 + if commit.subdirs_count >= 1: + commit.subdirs_count_s = " + %d subdirs" % (commit.subdirs_count) + else: + commit.subdirs_count_s = "" + + def _send(self, irker, msg): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + irker_list = irker.split(':') + if len(irker_list) < 2: + irker_list.append(6659) + json_msg = json.dumps(msg) + sock.sendto(json_msg, (irker_list[0],int(irker_list[1]))) + if self.options.verbose: + print "SENT: %s to %s" % (json_msg, irker) + + def join_all(self): + # Like self.commit(), but ignores self.config.get(section, "template"). + for section in self.config.sections(): + irker = self.config.get(section, "irker") + to_list = self.config.get(section, "to").split() + if not irker or not to_list: + continue + for to in to_list: + msg = {'to': to, 'privmsg': ''} + self._send(irker, msg) + + def commit(self, url, commit): + if self.options.verbose: + print "RECV: from %s" % url + print json.dumps(vars(commit), indent=2) + + try: + config_sections = self.locate_matching_configs(commit) + if len(config_sections) > 0: + self.fill_in_extra_args(commit) + for section in config_sections: + irker = self.config.get(section, "irker") + to_list = self.config.get(section, "to").split() + template = self.config.get(section, "template") + if not irker or not to_list or not template: + continue + privmsg = Template(template).safe_substitute(vars(commit)) + if len(privmsg) > MAX_PRIVMSG: + privmsg = privmsg[:MAX_PRIVMSG-3] + '...' + for to in to_list: + msg = {'to': to, 'privmsg': privmsg} + self._send(irker, msg) + + except: + print "Unexpected error:" + traceback.print_exc() + sys.stdout.flush() + raise + + def event(self, url, event_name, event_arg): + if self.options.verbose or event_name != "ping": + print 'EVENT: %s from %s' % (event_name, url) + sys.stdout.flush() + + + +class ReloadableConfig(ConfigParser.SafeConfigParser): + def __init__(self, fname): + ConfigParser.SafeConfigParser.__init__(self) + + self.fname = fname + self.read(fname) + + signal.signal(signal.SIGHUP, self.hangup) + + def hangup(self, signalnum, frame): + self.reload() + + def reload(self): + print "RELOAD: config file: %s" % self.fname + sys.stdout.flush() + + # Delete everything. Just re-reading would overlay, and would not + # remove sections/options. Note that [DEFAULT] will not be removed. + for section in self.sections(): + self.remove_section(section) + + # Get rid of [DEFAULT] + self.remove_section(ConfigParser.DEFAULTSECT) + + # Now re-read the configuration file. + self.read(self.fname) + + def get_value(self, which): + return self.get(ConfigParser.DEFAULTSECT, which) + + +def main(args): + parser = optparse.OptionParser( + description='An SvnPubSub client that bridges the data to irker.', + usage='Usage: %prog [options] CONFIG_FILE', + ) + parser.add_option('--logfile', + help='filename for logging') + parser.add_option('--verbose', action='store_true', + help="enable verbose logging") + parser.add_option('--pidfile', + help="the process' PID will be written to this file") + parser.add_option('--daemon', action='store_true', + help='run as a background daemon') + + options, extra = parser.parse_args(args) + + if len(extra) != 1: + parser.error('CONFIG_FILE is requried') + config_file = os.path.abspath(extra[0]) + + logfile, pidfile = None, None + if options.daemon: + if options.logfile: + logfile = os.path.abspath(options.logfile) + else: + parser.error('LOGFILE is required when running as a daemon') + + if options.pidfile: + pidfile = os.path.abspath(options.pidfile) + else: + parser.error('PIDFILE is required when running as a daemon') + + + config = ReloadableConfig(config_file) + bdec = BigDoEverythingClass(config, options) + + d = Daemon(logfile, pidfile, bdec) + if options.daemon: + d.daemonize_exit() + else: + d.foreground() + +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/tools/server-side/svnpubsub/rc.d/svnpubsub b/tools/server-side/svnpubsub/rc.d/svnpubsub new file mode 120000 index 0000000..b05e35e --- /dev/null +++ b/tools/server-side/svnpubsub/rc.d/svnpubsub @@ -0,0 +1 @@ +svnpubsub.freebsd
\ No newline at end of file diff --git a/tools/server-side/svnpubsub/rc.d/svnpubsub.debian b/tools/server-side/svnpubsub/rc.d/svnpubsub.debian new file mode 100755 index 0000000..c61057d --- /dev/null +++ b/tools/server-side/svnpubsub/rc.d/svnpubsub.debian @@ -0,0 +1,62 @@ +#!/bin/bash +### BEGIN INIT INFO +# Provides: svnpubsub +# Required-Start: $remote_fs +# Required-Stop: $remote_fs +# Default-Start: 2 3 4 5 +# Default-Stop: 0 1 6 +# Short-Description: SvnPubSub +# Description: start SvnPubSub daemon +#### END INIT INFO + +. /lib/init/vars.sh +. /lib/lsb/init-functions + +svnpubsub_user=${svnpubsub_user-"daemon"} +svnpubsub_group=${svnpubsub_group-"daemon"} +svnpubsub_reactor=${svnpubsub_reactor-"poll"} +svnpubsub_pidfile=${svnpubsub_pidfile-"/var/run/svnpubsub.pid"} +pidfile="${svnpubsub_pidfile}" + +TWSITD_CMD="/usr/bin/twistd -y /opt/svnpubsub/svnpubsub.tac \ + --logfile=/var/log/svnpubsub/svnpubsub.log \ + --pidfile=${pidfile} \ + --uid=${svnpubsub_user} --gid=${svnpubsub_user} \ + -r${svnpubsub_reactor}" + +RETVAL=0 + +start() { + echo "Starting SvnPubSub Server: " + $TWSITD_CMD + RETVAL=$? + [ $RETVAL -eq 0 ] && echo "ok" || echo "failed" + return $RETVAL +} + +stop() { + echo "Stopping SvnPubSub Server: " + THE_PID=`cat ${pidfile}` + kill $THE_PID + RETVAL=$? + [ $RETVAL -eq 0 ] && echo "ok" || echo "failed" + return $RETVAL +} + +case "$1" in + start) + start + ;; + stop) + stop + ;; + restart) + stop + start + ;; + *) + echo "Usage: $0 {start|stop|restart}" + exit 1 +esac + +exit $RETVAL diff --git a/tools/server-side/svnpubsub/rc.d/svnpubsub.freebsd b/tools/server-side/svnpubsub/rc.d/svnpubsub.freebsd new file mode 100755 index 0000000..71fc8c8 --- /dev/null +++ b/tools/server-side/svnpubsub/rc.d/svnpubsub.freebsd @@ -0,0 +1,37 @@ +#!/bin/sh +# +# PROVIDE: svnpubsub +# REQUIRE: DAEMON +# KEYWORD: shutdown + +. /etc/rc.subr + +name="svnpubsub" +rcvar=`set_rcvar` + +load_rc_config $name + +# +# DO NOT CHANGE THESE DEFAULT VALUES HERE +# SET THEM IN THE /etc/rc.conf FILE +# +svnpubsub_enable=${svnpubsub_enable-"NO"} +svnpubsub_user=${svnpubsub_user-"svn"} +svnpubsub_group=${svnpubsub_group-"svn"} +svnpubsub_reactor=${svnpubsub_reactor-"poll"} +svnpubsub_pidfile=${svnpubsub_pidfile-"/var/run/svnpubsub/svnpubsub.pid"} +svnpubsub_cmd_int=${svnpubsub_cmd_int-"python"} +pidfile="${svnpubsub_pidfile}" + +export PYTHON_EGG_CACHE="/home/svn/.python-eggs" + +command="/usr/local/bin/twistd" +command_interpreter="/usr/local/bin/${svnwcsub_cmd_int}" +command_args="-y /usr/local/svnpubsub/svnpubsub.tac \ + --logfile=/var/log/vc/svnpubsub.log \ + --pidfile=${pidfile} \ + --uid=${svnpubsub_user} --gid=${svnpubsub_user} \ + -r${svnpubsub_reactor}" + + +run_rc_command "$1" diff --git a/tools/server-side/svnpubsub/rc.d/svnpubsub.solaris b/tools/server-side/svnpubsub/rc.d/svnpubsub.solaris new file mode 100755 index 0000000..3a9cf9f --- /dev/null +++ b/tools/server-side/svnpubsub/rc.d/svnpubsub.solaris @@ -0,0 +1,53 @@ +#!/usr/bin/bash +# +# a dumb init script for twistd on solaris. cus like, writing XML for SMF is f'ing lame. +# + +svnpubsub_user=${svnpubsub_user-"daemon"} +svnpubsub_group=${svnpubsub_group-"daemon"} +svnpubsub_reactor=${svnpubsub_reactor-"poll"} +svnpubsub_pidfile=${svnpubsub_pidfile-"/var/run/svnpubsub/svnpubsub.pid"} +pidfile="${svnpubsub_pidfile}" + +TWSITD_CMD="/opt/local/bin//twistd -y /usr/local/svnpubsub/svnpubsub.tac \ + --logfile=/x1/log/svnpubsub.log \ + --pidfile=${pidfile} \ + --uid=${svnpubsub_user} --gid=${svnpubsub_user} \ + -r${svnpubsub_reactor}" + +RETVAL=0 + +start() { + echo "Starting SvnPubSub Server: " + $TWSITD_CMD + RETVAL=$? + [ $RETVAL -eq 0 ] && echo "ok" || echo "failed" + return $RETVAL +} + +stop() { + echo "Stopping SvnPubSub Server: " + THE_PID=`cat ${pidfile}` + kill $THE_PID + RETVAL=$? + [ $RETVAL -eq 0 ] && echo "ok" || echo "failed" + return $RETVAL +} + +case "$1" in + start) + start + ;; + stop) + stop + ;; + restart) + stop + start + ;; + *) + echo "Usage: $0 {start|stop|restart}" + exit 1 +esac + +exit $RETVAL diff --git a/tools/server-side/svnpubsub/rc.d/svnwcsub b/tools/server-side/svnpubsub/rc.d/svnwcsub new file mode 120000 index 0000000..310fcbe --- /dev/null +++ b/tools/server-side/svnpubsub/rc.d/svnwcsub @@ -0,0 +1 @@ +svnwcsub.freebsd
\ No newline at end of file diff --git a/tools/server-side/svnpubsub/rc.d/svnwcsub.debian b/tools/server-side/svnpubsub/rc.d/svnwcsub.debian new file mode 100755 index 0000000..caf5511 --- /dev/null +++ b/tools/server-side/svnpubsub/rc.d/svnwcsub.debian @@ -0,0 +1,65 @@ +#!/bin/bash +### BEGIN INIT INFO +# Provides: svnwcsub +# Required-Start: $remote_fs +# Required-Stop: $remote_fs +# Default-Start: 2 3 4 5 +# Default-Stop: 0 1 6 +# Short-Description: SvnWcSub +# Description: start SvnWcSub daemon +#### END INIT INFO + +. /lib/init/vars.sh +. /lib/lsb/init-functions + +svnwcsub_user=${svnwcsub_user-"svnwc"} +svnwcsub_group=${svnwcsub_group-"svnwc"} +svnwcsub_pidfile=${svnwcsub_pidfile-"/var/run/svnwcsub.pid"} +svnwcsub_config=${svnwcsub_config-"/etc/svnwcsub.conf"} +svnwcsub_logfile=${svnwcsub_logfile-"/var/log/svnwcsub/svnwcsub.log"} +pidfile="${svnwcsub_pidfile}" + +SVNWCSUB_CMD="/opt/svnpubsub/svnwcsub.py \ + --daemon \ + --logfile=${svnwcsub_logfile} \ + --pidfile=${pidfile} \ + --uid=${svnwcsub_user} --gid=${svnwcsub_group} \ + --umask=002 \ + ${svnwcsub_config} " + +RETVAL=0 + +start() { + echo "Starting SvnWcSub Server: " + $SVNWCSUB_CMD + RETVAL=$? + [ $RETVAL -eq 0 ] && echo "ok" || echo "failed" + return $RETVAL +} + +stop() { + echo "Stopping SvnWcSub Server: " + THE_PID=`cat ${pidfile}` + kill $THE_PID + RETVAL=$? + [ $RETVAL -eq 0 ] && echo "ok" || echo "failed" + return $RETVAL +} + +case "$1" in + start) + start + ;; + stop) + stop + ;; + restart) + stop + start + ;; + *) + echo "Usage: $0 {start|stop|restart}" + exit 1 +esac + +exit $RETVAL diff --git a/tools/server-side/svnpubsub/rc.d/svnwcsub.freebsd b/tools/server-side/svnpubsub/rc.d/svnwcsub.freebsd new file mode 100755 index 0000000..58ad386 --- /dev/null +++ b/tools/server-side/svnpubsub/rc.d/svnwcsub.freebsd @@ -0,0 +1,39 @@ +#!/bin/sh +# +# PROVIDE: svnwcsub +# REQUIRE: DAEMON +# KEYWORD: shutdown + +. /etc/rc.subr + +name="svnwcsub" +rcvar=`set_rcvar` + +load_rc_config $name + +# +# DO NOT CHANGE THESE DEFAULT VALUES HERE +# SET THEM IN THE /etc/rc.conf FILE +# +svnwcsub_enable=${svnwcsub_enable-"NO"} +svnwcsub_user=${svnwcsub_user-"svnwc"} +svnwcsub_group=${svnwcsub_group-"svnwc"} +svnwcsub_pidfile=${svnwcsub_pidfile-"/var/run/svnwcsub/svnwcsub.pub"} +svnwcsub_env="PYTHON_EGG_CACHE" +svnwcsub_cmd_int=${svnwcsub_cmd_int-"python"} +svnwcsub_config=${svnwcsub_config-"/etc/svnwcsub.conf"} +svnwcsub_logfile=${svnwcsub_logfile-"/var/log/svnwcsub/svnwcsub.log"} +pidfile="${svnwcsub_pidfile}" + +export PYTHON_EGG_CACHE="/var/run/svnwcsub" + +command="/usr/local/svnpubsub/svnwcsub.py" +command_interpreter="/usr/local/bin/${svnwcsub_cmd_int}" +command_args="--daemon \ + --logfile=${svnwcsub_logfile} \ + --pidfile=${pidfile} \ + --uid=${svnwcsub_user} --gid=${svnwcsub_group} \ + --umask=002 \ + ${svnwcsub_config}" + +run_rc_command "$1" diff --git a/tools/server-side/svnpubsub/rc.d/svnwcsub.solaris b/tools/server-side/svnpubsub/rc.d/svnwcsub.solaris new file mode 100755 index 0000000..bd0c2bd --- /dev/null +++ b/tools/server-side/svnpubsub/rc.d/svnwcsub.solaris @@ -0,0 +1,56 @@ +#!/usr/bin/bash +# +# a dumb init script for twistd on solaris. cus like, writing XML for SMF is f'ing lame. +# + +svnwcsub_user=${svnwcsub_user-"svnwc"} +svnwcsub_group=${svnwcsub_group-"other"} +svnwcsub_pidfile=${svnwcsub_pidfile-"/var/run/svnwcsub/svnwcsub.pid"} +svnwcsub_config=${svnwcsub_config-"/etc/svnwcsub.conf"} +svnwcsub_logfile=${svnwcsub_logfile-"/x1/log/svnwcsub/svnwcsub.log"} +pidfile="${svnwcsub_pidfile}" + +SVNWCSUB_CMD="/usr/local/svnpubsub/svnwcsub.py \ + --daemon \ + --logfile=${svnwcsub_logfile} \ + --pidfile=${pidfile} \ + --uid=${svnwcsub_user} --gid=${svnwcsub_group} \ + --umask=002 \ + ${svnwcsub_config}" + +RETVAL=0 + +start() { + echo "Starting SvnWcSub Server: " + $SVNWCSUB_CMD + RETVAL=$? + [ $RETVAL -eq 0 ] && echo "ok" || echo "failed" + return $RETVAL +} + +stop() { + echo "Stopping SvnWcSub Server: " + THE_PID=`cat ${pidfile}` + kill $THE_PID + RETVAL=$? + [ $RETVAL -eq 0 ] && echo "ok" || echo "failed" + return $RETVAL +} + +case "$1" in + start) + start + ;; + stop) + stop + ;; + restart) + stop + start + ;; + *) + echo "Usage: $0 {start|stop|restart}" + exit 1 +esac + +exit $RETVAL diff --git a/tools/server-side/svnpubsub/svnpubsub.tac b/tools/server-side/svnpubsub/svnpubsub.tac new file mode 100644 index 0000000..574ad24 --- /dev/null +++ b/tools/server-side/svnpubsub/svnpubsub.tac @@ -0,0 +1,33 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys +import os +from twisted.application import service, internet + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from svnpubsub.server import svnpubsub_server + +application = service.Application("SvnPubSub") + +def get_service(): + return internet.TCPServer(2069, svnpubsub_server()) + +service = get_service() +service.setServiceParent(application) diff --git a/tools/server-side/svnpubsub/svnpubsub/__init__.py b/tools/server-side/svnpubsub/svnpubsub/__init__.py new file mode 100644 index 0000000..f50e195 --- /dev/null +++ b/tools/server-side/svnpubsub/svnpubsub/__init__.py @@ -0,0 +1 @@ +# Turn svnpubsub/ into a package. diff --git a/tools/server-side/svnpubsub/svnpubsub/client.py b/tools/server-side/svnpubsub/svnpubsub/client.py new file mode 100644 index 0000000..c1631d6 --- /dev/null +++ b/tools/server-side/svnpubsub/svnpubsub/client.py @@ -0,0 +1,230 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# Generic client for SvnPubSub +# +# ### usage... +# +# +# EVENTS +# +# connected: a connection to the server has been opened (though not +# necessarily established) +# closed: the connection was closed. reconnect will be attempted. +# error: an error closed the connection. reconnect will be attempted. +# ping: the server has sent a keepalive +# stale: no activity has been seen, so the connection will be closed +# and reopened +# + +import asyncore +import asynchat +import socket +import functools +import time +import json +try: + import urlparse +except ImportError: + import urllib.parse as urlparse + +# How long the polling loop should wait for activity before returning. +TIMEOUT = 30.0 + +# Always delay a bit when trying to reconnect. This is not precise, but sets +# a minimum amount of delay. At the moment, there is no further backoff. +RECONNECT_DELAY = 25.0 + +# If we don't see anything from the server for this amount time, then we +# will drop and reconnect. The TCP connection may have gone down without +# us noticing it somehow. +STALE_DELAY = 60.0 + + +class SvnpubsubClientException(Exception): + pass + +class Client(asynchat.async_chat): + + def __init__(self, url, commit_callback, event_callback): + asynchat.async_chat.__init__(self) + + self.last_activity = time.time() + self.ibuffer = [] + + self.url = url + parsed_url = urlparse.urlsplit(url) + if parsed_url.scheme != 'http': + raise ValueError("URL scheme must be http: '%s'" % url) + host = parsed_url.hostname + port = parsed_url.port + resource = parsed_url.path + if parsed_url.query: + resource += "?%s" % parsed_url.query + if parsed_url.fragment: + resource += "#%s" % parsed_url.fragment + + self.event_callback = event_callback + + self.parser = JSONRecordHandler(commit_callback, event_callback) + + # Wait for the end of headers. Then we start parsing JSON. + self.set_terminator(b'\r\n\r\n') + self.skipping_headers = True + + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + try: + self.connect((host, port)) + except: + self.handle_error() + return + + self.push(('GET %s HTTP/1.0\r\n\r\n' % resource).encode('ascii')) + + def handle_connect(self): + self.event_callback('connected', None) + + def handle_close(self): + self.event_callback('closed', None) + self.close() + + def handle_error(self): + self.event_callback('error', None) + self.close() + + def found_terminator(self): + if self.skipping_headers: + self.skipping_headers = False + # Each JSON record is terminated by a null character + self.set_terminator(b'\0') + else: + record = b"".join(self.ibuffer) + self.ibuffer = [] + self.parser.feed(record.decode()) + + def collect_incoming_data(self, data): + # Remember the last time we saw activity + self.last_activity = time.time() + + if not self.skipping_headers: + self.ibuffer.append(data) + + +class JSONRecordHandler: + def __init__(self, commit_callback, event_callback): + self.commit_callback = commit_callback + self.event_callback = event_callback + + def feed(self, record): + obj = json.loads(record) + if 'svnpubsub' in obj: + actual_version = obj['svnpubsub'].get('version') + EXPECTED_VERSION = 1 + if actual_version != EXPECTED_VERSION: + raise SvnpubsubClientException("Unknown svnpubsub format: %r != %d" + % (actual_format, expected_format)) + self.event_callback('version', obj['svnpubsub']['version']) + elif 'commit' in obj: + commit = Commit(obj['commit']) + self.commit_callback(commit) + elif 'stillalive' in obj: + self.event_callback('ping', obj['stillalive']) + + +class Commit(object): + def __init__(self, commit): + self.__dict__.update(commit) + + +class MultiClient(object): + def __init__(self, urls, commit_callback, event_callback): + self.commit_callback = commit_callback + self.event_callback = event_callback + + # No target time, as no work to do + self.target_time = 0 + self.work_items = [ ] + + for url in urls: + self._add_channel(url) + + def _reconnect(self, url, event_name, event_arg): + if event_name == 'closed' or event_name == 'error': + # Stupid connection closed for some reason. Set up a reconnect. Note + # that it should have been removed from asyncore.socket_map already. + self._reconnect_later(url) + + # Call the user's callback now. + self.event_callback(url, event_name, event_arg) + + def _reconnect_later(self, url): + # Set up a work item to reconnect in a little while. + self.work_items.append(url) + + # Only set a target if one has not been set yet. Otherwise, we could + # create a race condition of continually moving out towards the future + if not self.target_time: + self.target_time = time.time() + RECONNECT_DELAY + + def _add_channel(self, url): + # Simply instantiating the client will install it into the global map + # for processing in the main event loop. + Client(url, + functools.partial(self.commit_callback, url), + functools.partial(self._reconnect, url)) + + def _check_stale(self): + now = time.time() + for client in asyncore.socket_map.values(): + if client.last_activity + STALE_DELAY < now: + # Whoops. No activity in a while. Signal this fact, Close the + # Client, then have it reconnected later on. + self.event_callback(client.url, 'stale', client.last_activity) + + # This should remove it from .socket_map. + client.close() + + self._reconnect_later(client.url) + + def _maybe_work(self): + # If we haven't reach the targetted time, or have no work to do, + # then fast-path exit + if time.time() < self.target_time or not self.work_items: + return + + # We'll take care of all the work items, so no target for future work + self.target_time = 0 + + # Play a little dance just in case work gets added while we're + # currently working on stuff + work = self.work_items + self.work_items = [ ] + + for url in work: + self._add_channel(url) + + def run_forever(self): + while True: + if asyncore.socket_map: + asyncore.loop(timeout=TIMEOUT, count=1) + else: + time.sleep(TIMEOUT) + + self._check_stale() + self._maybe_work() diff --git a/tools/server-side/svnpubsub/svnpubsub/server.py b/tools/server-side/svnpubsub/svnpubsub/server.py new file mode 100644 index 0000000..faee423 --- /dev/null +++ b/tools/server-side/svnpubsub/svnpubsub/server.py @@ -0,0 +1,241 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# SvnPubSub - Simple Push Notification of Subversion commits +# +# Based on the theory behind the Live Journal Atom Streaming Service: +# <http://atom.services.livejournal.com/> +# +# Instead of using a complicated XMPP/AMPQ/JMS/super messaging service, +# we have simple HTTP GETs and PUTs to get data in and out. +# +# Currently supports both XML and JSON serialization. +# +# Example Sub clients: +# curl -sN http://127.0.0.1:2069/commits +# curl -sN http://127.0.0.1:2069/commits/svn/* +# curl -sN http://127.0.0.1:2069/commits/svn +# curl -sN http://127.0.0.1:2069/commits/*/13f79535-47bb-0310-9956-ffa450edef68 +# curl -sN http://127.0.0.1:2069/commits/svn/13f79535-47bb-0310-9956-ffa450edef68 +# +# URL is built into 2 parts: +# /commits/${optional_type}/${optional_repository} +# +# If the type is included in the URL, you will only get commits of that type. +# The type can be * and then you will receive commits of any type. +# +# If the repository is included in the URL, you will only receive +# messages about that repository. The repository can be * and then you +# will receive messages about all repositories. +# +# Example Pub clients: +# curl -T revinfo.json -i http://127.0.0.1:2069/commits +# +# TODO: +# - Add Real access controls (not just 127.0.0.1) +# - Document PUT format +# - Convert to twisted.python.log + + + + +try: + import simplejson as json +except ImportError: + import json + +import sys + +import twisted +from twisted.internet import reactor +from twisted.internet import defer +from twisted.web import server +from twisted.web import resource +from twisted.python import log + +import time + +class Commit: + def __init__(self, r): + self.__dict__.update(r) + if not self.check_value('repository'): + raise ValueError('Invalid Repository Value') + if not self.check_value('type'): + raise ValueError('Invalid Type Value') + if not self.check_value('format'): + raise ValueError('Invalid Format Value') + if not self.check_value('id'): + raise ValueError('Invalid ID Value') + + def check_value(self, k): + return hasattr(self, k) and self.__dict__[k] + + def render_commit(self): + obj = {'commit': {}} + obj['commit'].update(self.__dict__) + return json.dumps(obj) + + def render_log(self): + try: + paths_changed = " %d paths changed" % len(self.changed) + except: + paths_changed = "" + return "%s:%s repo '%s' id '%s'%s" % (self.type, + self.format, + self.repository, + self.id, + paths_changed) + + +HEARTBEAT_TIME = 15 + +class Client(object): + def __init__(self, pubsub, r, type, repository): + self.pubsub = pubsub + r.notifyFinish().addErrback(self.finished) + self.r = r + self.type = type + self.repository = repository + self.alive = True + log.msg("OPEN: %s:%d (%d clients online)"% (r.getClientIP(), r.client.port, pubsub.cc()+1)) + + def finished(self, reason): + self.alive = False + log.msg("CLOSE: %s:%d (%d clients online)"% (self.r.getClientIP(), self.r.client.port, self.pubsub.cc())) + try: + self.pubsub.remove(self) + except ValueError: + pass + + def interested_in(self, commit): + if self.type and self.type != commit.type: + return False + + if self.repository and self.repository != commit.repository: + return False + + return True + + def notify(self, data): + self.write(data) + + def start(self): + self.write_start() + reactor.callLater(HEARTBEAT_TIME, self.heartbeat, None) + + def heartbeat(self, args): + if self.alive: + self.write_heartbeat() + reactor.callLater(HEARTBEAT_TIME, self.heartbeat, None) + + def write_data(self, data): + self.write(data + "\n\0") + + """ "Data must not be unicode" is what the interfaces.ITransport says... grr. """ + def write(self, input): + self.r.write(str(input)) + + def write_start(self): + self.r.setHeader('X-SVNPubSub-Version', '1') + self.r.setHeader('content-type', 'application/vnd.apache.vc-notify+json') + self.write('{"svnpubsub": {"version": 1}}\n\0') + + def write_heartbeat(self): + self.write(json.dumps({"stillalive": time.time()}) + "\n\0") + + +class SvnPubSub(resource.Resource): + isLeaf = True + clients = [] + + def cc(self): + return len(self.clients) + + def remove(self, c): + self.clients.remove(c) + + def render_GET(self, request): + log.msg("REQUEST: %s" % (request.uri)) + request.setHeader('content-type', 'text/plain') + + repository = None + type = None + + uri = request.uri.split('/') + uri_len = len(uri) + if uri_len < 2 or uri_len > 4: + request.setResponseCode(400) + return "Invalid path\n" + + if uri_len >= 3: + type = uri[2] + + if uri_len == 4: + repository = uri[3] + + # Convert wild card to None. + if type == '*': + type = None + if repository == '*': + repository = None + + c = Client(self, request, type, repository) + self.clients.append(c) + c.start() + return twisted.web.server.NOT_DONE_YET + + def notifyAll(self, commit): + data = commit.render_commit() + + log.msg("COMMIT: %s (%d clients)" % (commit.render_log(), self.cc())) + for client in self.clients: + if client.interested_in(commit): + client.write_data(data) + + def render_PUT(self, request): + request.setHeader('content-type', 'text/plain') + ip = request.getClientIP() + if ip != "127.0.0.1": + request.setResponseCode(401) + return "Access Denied" + input = request.content.read() + #import pdb;pdb.set_trace() + #print "input: %s" % (input) + try: + c = json.loads(input) + commit = Commit(c) + except ValueError as e: + request.setResponseCode(400) + log.msg("COMMIT: failed due to: %s" % str(e)) + return str(e) + self.notifyAll(commit) + return "Ok" + +def svnpubsub_server(): + root = resource.Resource() + s = SvnPubSub() + root.putChild("commits", s) + return server.Site(root) + +if __name__ == "__main__": + log.startLogging(sys.stdout) + # Port 2069 "HTTP Event Port", whatever, sounds good to me + reactor.listenTCP(2069, svnpubsub_server()) + reactor.run() + diff --git a/tools/server-side/svnpubsub/svntweet.py b/tools/server-side/svnpubsub/svntweet.py new file mode 100755 index 0000000..ed426bd --- /dev/null +++ b/tools/server-side/svnpubsub/svntweet.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# SvnTweet - Subscribe to a SvnPubSub stream, and Twitter about it! +# +# Example: +# svntweet.py my-config.json +# +# With my-config.json containing stream paths and the twitter auth info: +# {"stream": "http://svn.apache.org:2069/commits", +# "username": "asfcommits", +# "password": "MyLuggageComboIs1234"} +# +# +# + +import threading +import sys +import os +try: + import simplejson as json +except ImportError: + import json + +from twisted.internet import defer, reactor, task, threads +from twisted.python import failure, log +from twisted.web.client import HTTPClientFactory, HTTPPageDownloader + +from urlparse import urlparse +import time +import posixpath + +sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "twitty-twister", "lib")) +try: + import twitter +except: + print "Get a copy of twitty-twister from <http://github.com/dustin/twitty-twister>" + sys.exit(-1) +class Config(object): + def __init__(self, path): + self.path = path + self.mtime_path = 0 + self.config = {} + self._load_config() + + def _load_config(self): + mtime = os.path.getmtime(self.path) + if mtime != self.mtime_path: + fp = open(self.path, "rb") + self.mtime_path = mtime + self.config = json.loads(fp.read()) + +class HTTPStream(HTTPClientFactory): + protocol = HTTPPageDownloader + + def __init__(self, url): + HTTPClientFactory.__init__(self, url, method="GET", agent="SvnTweet/0.1.0") + + def pageStart(self, partial): + pass + + def pagePart(self, data): + pass + + def pageEnd(self): + pass + +class Commit(object): + def __init__(self, commit): + self.__dict__.update(commit) + +class JSONRecordHandler: + def __init__(self, bdec): + self.bdec = bdec + + def feed(self, record): + obj = json.loads(record) + if 'svnpubsub' in obj: + actual_version = obj['svnpubsub'].get('version') + EXPECTED_VERSION = 1 + if actual_version != EXPECTED_VERSION: + raise ValueException("Unknown svnpubsub format: %r != %d" + % (actual_format, expected_format)) + elif 'commit' in obj: + commit = Commit(obj['commit']) + if not hasattr(commit, 'type'): + raise ValueException("Commit object is missing type field.") + if not hasattr(commit, 'format'): + raise ValueException("Commit object is missing format field.") + if commit.type != 'svn' and commit.format != 1: + raise ValueException("Unexpected type and/or format: %s:%s" + % (commit.type, commit.format)) + self.bdec.commit(commit) + elif 'stillalive' in obj: + self.bdec.stillalive() + +class JSONHTTPStream(HTTPStream): + def __init__(self, url, bdec): + HTTPStream.__init__(self, url) + self.bdec = bdec + self.ibuffer = [] + self.parser = JSONRecordHandler(bdec) + + def pageStart(self, partial): + self.bdec.pageStart() + + def pagePart(self, data): + eor = data.find("\0") + if eor >= 0: + self.ibuffer.append(data[0:eor]) + self.parser.feed(''.join(self.ibuffer)) + self.ibuffer = [data[eor+1:]] + else: + self.ibuffer.append(data) + +def connectTo(url, bdec): + u = urlparse(url) + port = u.port + if not port: + port = 80 + s = JSONHTTPStream(url, bdec) + conn = reactor.connectTCP(u.hostname, u.port, s) + return [s, conn] + + +CHECKBEAT_TIME = 90 + +class BigDoEverythingClasss(object): + def __init__(self, config): + self.c = config + self.c._load_config() + self.url = str(self.c.config.get('stream')) + self.failures = 0 + self.alive = time.time() + self.checker = task.LoopingCall(self._checkalive) + self.transport = None + self.stream = None + self._restartStream() + self.watch = [] + self.twit = twitter.Twitter(self.c.config.get('username'), self.c.config.get('password')) + + def pageStart(self): + log.msg("Stream Connection Established") + self.failures = 0 + + def _restartStream(self): + (self.stream, self.transport) = connectTo(self.url, self) + self.stream.deferred.addBoth(self.streamDead) + self.alive = time.time() + self.checker.start(CHECKBEAT_TIME) + + def _checkalive(self): + n = time.time() + if n - self.alive > CHECKBEAT_TIME: + log.msg("Stream is dead, reconnecting") + self.transport.disconnect() + + def stillalive(self): + self.alive = time.time() + + def streamDead(self, v): + BACKOFF_SECS = 5 + BACKOFF_MAX = 60 + self.checker.stop() + + self.stream = None + self.failures += 1 + backoff = min(self.failures * BACKOFF_SECS, BACKOFF_MAX) + log.msg("Stream disconnected, trying again in %d seconds.... %s" % (backoff, self.url)) + reactor.callLater(backoff, self._restartStream) + + def _normalize_path(self, path): + if path[0] != '/': + return "/" + path + return posixpath.abspath(path) + + def tweet(self, msg): + log.msg("SEND TWEET: %s" % (msg)) + self.twit.update(msg).addCallback(self.tweet_done).addErrback(log.msg) + + def tweet_done(self, x): + log.msg("TWEET: Success!") + + def build_tweet(self, commit): + maxlen = 144 + left = maxlen + paths = map(self._normalize_path, commit.changed) + if not len(paths): + return None + path = posixpath.commonprefix(paths) + if path[0:1] == '/' and len(path) > 1: + path = path[1:] + + #TODO: allow URL to be configurable. + link = " - http://svn.apache.org/r%d" % (commit.id) + left -= len(link) + msg = "r%d in %s by %s: " % (commit.id, path, commit.committer) + left -= len(msg) + if left > 3: + msg += commit.log[0:left] + msg += link + return msg + + def commit(self, commit): + log.msg("COMMIT r%d (%d paths)" % (commit.id, len(commit.changed))) + msg = self.build_tweet(commit) + if msg: + self.tweet(msg) + #print "Common Prefix: %s" % (pre) + +def main(config_file): + c = Config(config_file) + big = BigDoEverythingClasss(c) + reactor.run() + +if __name__ == "__main__": + if len(sys.argv) != 2: + print "invalid args, read source code" + sys.exit(0) + log.startLogging(sys.stdout) + main(sys.argv[1]) diff --git a/tools/server-side/svnpubsub/svnwcsub.conf.example b/tools/server-side/svnpubsub/svnwcsub.conf.example new file mode 100644 index 0000000..644a3b7 --- /dev/null +++ b/tools/server-side/svnpubsub/svnwcsub.conf.example @@ -0,0 +1,16 @@ +[DEFAULT] +svnbin: /usr/local/bin/svn +streams: http://svn.example.org:2069/commits/svn +# hook: /usr/bin/true + +## The values below are used by ConfigParser's interpolation syntax. +## See http://docs.python.org/library/configparser +SOME_REPOS: svn://svn.example.org/repos/chaos + +[env] +HOME: /home/svn +LANG: en_US.UTF-8 + +[track] +/usr/local/foo/prod: %(SOME_REPOS)s/foo/production +/usr/local/foo/dev: %(SOME_REPOS)s/foo/trunk diff --git a/tools/server-side/svnpubsub/svnwcsub.py b/tools/server-side/svnpubsub/svnwcsub.py new file mode 100755 index 0000000..366df7c --- /dev/null +++ b/tools/server-side/svnpubsub/svnwcsub.py @@ -0,0 +1,546 @@ +#!/usr/bin/env python +# encoding: UTF-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# SvnWcSub - Subscribe to a SvnPubSub stream, and keep a set of working copy +# paths in sync +# +# Example: +# svnwcsub.py svnwcsub.conf +# +# On startup svnwcsub checks the working copy's path, runs a single svn update +# and then watches for changes to that path. +# +# See svnwcsub.conf for more information on its contents. +# + +# TODO: +# - bulk update at startup time to avoid backlog warnings +# - fold BDEC into Daemon +# - fold WorkingCopy._get_match() into __init__ +# - remove wc_ready(). assume all WorkingCopy instances are usable. +# place the instances into .watch at creation. the .update_applies() +# just returns if the wc is disabled (eg. could not find wc dir) +# - figure out way to avoid the ASF-specific PRODUCTION_RE_FILTER +# (a base path exclusion list should work for the ASF) +# - add support for SIGHUP to reread the config and reinitialize working copies +# - joes will write documentation for svnpubsub as these items become fulfilled +# - make LOGLEVEL configurable + +import errno +import subprocess +import threading +import sys +import os +import re +import posixpath +try: + import ConfigParser +except ImportError: + import configparser as ConfigParser +import time +import logging.handlers +try: + import Queue +except ImportError: + import queue as Queue +import optparse +import functools +try: + import urlparse +except ImportError: + import urllib.parse as urlparse + +import daemonize +import svnpubsub.client + +# check_output() is only available in Python 2.7. Allow us to run with +# earlier versions +try: + check_output = subprocess.check_output +except AttributeError: + def check_output(args, env): # note: we only use these two args + pipe = subprocess.Popen(args, stdout=subprocess.PIPE, env=env) + output, _ = pipe.communicate() + if pipe.returncode: + raise subprocess.CalledProcessError(pipe.returncode, args) + return output + +assert hasattr(subprocess, 'check_call') +def check_call(*args, **kwds): + """Wrapper around subprocess.check_call() that logs stderr upon failure.""" + assert 'stderr' not in kwds + kwds.update(stderr=subprocess.PIPE) + pipe = subprocess.Popen(*args, **kwds) + output, errput = pipe.communicate() + if pipe.returncode: + cmd = args[0] if len(args) else kwds.get('args', '(no command)') + # TODO: log stdout too? + logging.error('Command failed: returncode=%d command=%r stderr=%r', + pipe.returncode, cmd, errput) + raise subprocess.CalledProcessError(pipe.returncode, args) + return pipe.returncode # is EXIT_OK + +### note: this runs synchronously. within the current Twisted environment, +### it is called from ._get_match() which is run on a thread so it won't +### block the Twisted main loop. +def svn_info(svnbin, env, path): + "Run 'svn info' on the target path, returning a dict of info data." + args = [svnbin, "info", "--non-interactive", "--", path] + output = check_output(args, env=env).strip() + info = { } + for line in output.split('\n'): + idx = line.index(':') + info[line[:idx]] = line[idx+1:].strip() + return info + +try: + import glob + glob.iglob + def is_emptydir(path): + # ### If the directory contains only dotfile children, this will readdir() + # ### the entire directory. But os.readdir() is not exposed to us... + for x in glob.iglob('%s/*' % path): + return False + for x in glob.iglob('%s/.*' % path): + return False + return True +except (ImportError, AttributeError): + # Python ≤2.4 + def is_emptydir(path): + # This will read the entire directory list to memory. + return not os.listdir(path) + +class WorkingCopy(object): + def __init__(self, bdec, path, url): + self.path = path + self.url = url + + try: + self.match, self.uuid = self._get_match(bdec.svnbin, bdec.env) + bdec.wc_ready(self) + except: + logging.exception('problem with working copy: %s', path) + + def update_applies(self, uuid, path): + if self.uuid != uuid: + return False + + path = str(path) + if path == self.match: + #print "ua: Simple match" + # easy case. woo. + return True + if len(path) < len(self.match): + # path is potentially a parent directory of match? + #print "ua: parent check" + if self.match[0:len(path)] == path: + return True + if len(path) > len(self.match): + # path is potentially a sub directory of match + #print "ua: sub dir check" + if path[0:len(self.match)] == self.match: + return True + return False + + def _get_match(self, svnbin, env): + ### quick little hack to auto-checkout missing working copies + dotsvn = os.path.join(self.path, ".svn") + if not os.path.isdir(dotsvn) or is_emptydir(dotsvn): + logging.info("autopopulate %s from %s" % (self.path, self.url)) + check_call([svnbin, 'co', '-q', + '--force', + '--non-interactive', + '--config-option', + 'config:miscellany:use-commit-times=on', + '--', self.url, self.path], + env=env) + + # Fetch the info for matching dirs_changed against this WC + info = svn_info(svnbin, env, self.path) + root = info['Repository Root'] + url = info['URL'] + relpath = url[len(root):] # also has leading '/' + uuid = info['Repository UUID'] + return str(relpath), uuid + + +PRODUCTION_RE_FILTER = re.compile("/websites/production/[^/]+/") + +class BigDoEverythingClasss(object): + def __init__(self, config): + self.svnbin = config.get_value('svnbin') + self.env = config.get_env() + self.tracking = config.get_track() + self.hook = config.get_optional_value('hook') + self.streams = config.get_value('streams').split() + self.worker = BackgroundWorker(self.svnbin, self.env, self.hook) + self.watch = [ ] + + def start(self): + for path, url in self.tracking.items(): + # working copies auto-register with the BDEC when they are ready. + WorkingCopy(self, path, url) + + def wc_ready(self, wc): + # called when a working copy object has its basic info/url, + # Add it to our watchers, and trigger an svn update. + logging.info("Watching WC at %s <-> %s" % (wc.path, wc.url)) + self.watch.append(wc) + self.worker.add_work(OP_BOOT, wc) + + def _normalize_path(self, path): + if path[0] != '/': + return "/" + path + return posixpath.abspath(path) + + def commit(self, url, commit): + if commit.type != 'svn' or commit.format != 1: + logging.info("SKIP unknown commit format (%s.%d)", + commit.type, commit.format) + return + logging.info("COMMIT r%d (%d paths) from %s" + % (commit.id, len(commit.changed), url)) + + paths = map(self._normalize_path, commit.changed) + if len(paths): + pre = posixpath.commonprefix(paths) + if pre == "/websites/": + # special case for svnmucc "dynamic content" buildbot commits + # just take the first production path to avoid updating all cms working copies + for p in paths: + m = PRODUCTION_RE_FILTER.match(p) + if m: + pre = m.group(0) + break + + #print "Common Prefix: %s" % (pre) + wcs = [wc for wc in self.watch if wc.update_applies(commit.repository, pre)] + logging.info("Updating %d WC for r%d" % (len(wcs), commit.id)) + for wc in wcs: + self.worker.add_work(OP_UPDATE, wc) + + +# Start logging warnings if the work backlog reaches this many items +BACKLOG_TOO_HIGH = 20 +OP_BOOT = 'boot' +OP_UPDATE = 'update' +OP_CLEANUP = 'cleanup' + +class BackgroundWorker(threading.Thread): + def __init__(self, svnbin, env, hook): + threading.Thread.__init__(self) + + # The main thread/process should not wait for this thread to exit. + ### compat with Python 2.5 + self.setDaemon(True) + + self.svnbin = svnbin + self.env = env + self.hook = hook + self.q = Queue.Queue() + + self.has_started = False + + def run(self): + while True: + # This will block until something arrives + operation, wc = self.q.get() + + # Warn if the queue is too long. + # (Note: the other thread might have added entries to self.q + # after the .get() and before the .qsize().) + qsize = self.q.qsize()+1 + if operation != OP_BOOT and qsize > BACKLOG_TOO_HIGH: + logging.warn('worker backlog is at %d', qsize) + + try: + if operation == OP_UPDATE: + self._update(wc) + elif operation == OP_BOOT: + self._update(wc, boot=True) + elif operation == OP_CLEANUP: + self._cleanup(wc) + else: + logging.critical('unknown operation: %s', operation) + except: + logging.exception('exception in worker') + + # In case we ever want to .join() against the work queue + self.q.task_done() + + def add_work(self, operation, wc): + # Start the thread when work first arrives. Thread-start needs to + # be delayed in case the process forks itself to become a daemon. + if not self.has_started: + self.start() + self.has_started = True + + self.q.put((operation, wc)) + + def _update(self, wc, boot=False): + "Update the specified working copy." + + # For giggles, let's clean up the working copy in case something + # happened earlier. + self._cleanup(wc) + + logging.info("updating: %s", wc.path) + + ### we need to move some of these args into the config. these are + ### still specific to the ASF setup. + args = [self.svnbin, 'switch', + '--quiet', + '--non-interactive', + '--trust-server-cert', + '--ignore-externals', + '--config-option', + 'config:miscellany:use-commit-times=on', + '--', + wc.url, + wc.path] + check_call(args, env=self.env) + + ### check the loglevel before running 'svn info'? + info = svn_info(self.svnbin, self.env, wc.path) + logging.info("updated: %s now at r%s", wc.path, info['Revision']) + + ## Run the hook + if self.hook: + hook_mode = ['post-update', 'boot'][boot] + logging.info('running hook: %s at revision %s due to %s', + wc.path, info['Revision'], hook_mode) + args = [self.hook, hook_mode, + wc.path, info['Revision'], wc.url] + check_call(args, env=self.env) + + def _cleanup(self, wc): + "Run a cleanup on the specified working copy." + + ### we need to move some of these args into the config. these are + ### still specific to the ASF setup. + args = [self.svnbin, 'cleanup', + '--non-interactive', + '--trust-server-cert', + '--config-option', + 'config:miscellany:use-commit-times=on', + wc.path] + check_call(args, env=self.env) + + +class ReloadableConfig(ConfigParser.SafeConfigParser): + def __init__(self, fname): + ConfigParser.SafeConfigParser.__init__(self) + + self.fname = fname + self.read(fname) + + ### install a signal handler to set SHOULD_RELOAD. BDEC should + ### poll this flag, and then adjust its internal structures after + ### the reload. + self.should_reload = False + + def reload(self): + # Delete everything. Just re-reading would overlay, and would not + # remove sections/options. Note that [DEFAULT] will not be removed. + for section in self.sections(): + self.remove_section(section) + + # Now re-read the configuration file. + self.read(fname) + + def get_value(self, which): + return self.get(ConfigParser.DEFAULTSECT, which) + + def get_optional_value(self, which, default=None): + if self.has_option(ConfigParser.DEFAULTSECT, which): + return self.get(ConfigParser.DEFAULTSECT, which) + else: + return default + + def get_env(self): + env = os.environ.copy() + default_options = self.defaults().keys() + for name, value in self.items('env'): + if name not in default_options: + env[name] = value + return env + + def get_track(self): + "Return the {PATH: URL} dictionary of working copies to track." + track = dict(self.items('track')) + for name in self.defaults().keys(): + del track[name] + return track + + def optionxform(self, option): + # Do not lowercase the option name. + return str(option) + + +class Daemon(daemonize.Daemon): + def __init__(self, logfile, pidfile, umask, bdec): + daemonize.Daemon.__init__(self, logfile, pidfile) + + self.umask = umask + self.bdec = bdec + + def setup(self): + # There is no setup which the parent needs to wait for. + pass + + def run(self): + logging.info('svnwcsub started, pid=%d', os.getpid()) + + # Set the umask in the daemon process. Defaults to 000 for + # daemonized processes. Foreground processes simply inherit + # the value from the parent process. + if self.umask is not None: + umask = int(self.umask, 8) + os.umask(umask) + logging.info('umask set to %03o', umask) + + # Start the BDEC (on the main thread), then start the client + self.bdec.start() + + mc = svnpubsub.client.MultiClient(self.bdec.streams, + self.bdec.commit, + self._event) + mc.run_forever() + + def _event(self, url, event_name, event_arg): + if event_name == 'error': + logging.exception('from %s', url) + elif event_name == 'ping': + logging.debug('ping from %s', url) + else: + logging.info('"%s" from %s', event_name, url) + + +def prepare_logging(logfile): + "Log to the specified file, or to stdout if None." + + if logfile: + # Rotate logs daily, keeping 7 days worth. + handler = logging.handlers.TimedRotatingFileHandler( + logfile, when='midnight', backupCount=7, + ) + else: + handler = logging.StreamHandler(sys.stdout) + + # Add a timestamp to the log records + formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', + '%Y-%m-%d %H:%M:%S') + handler.setFormatter(formatter) + + # Apply the handler to the root logger + root = logging.getLogger() + root.addHandler(handler) + + ### use logging.INFO for now. switch to cmdline option or a config? + root.setLevel(logging.INFO) + + +def handle_options(options): + # Set up the logging, then process the rest of the options. + prepare_logging(options.logfile) + + # In daemon mode, we let the daemonize module handle the pidfile. + # Otherwise, we should write this (foreground) PID into the file. + if options.pidfile and not options.daemon: + pid = os.getpid() + # Be wary of symlink attacks + try: + os.remove(options.pidfile) + except OSError: + pass + fd = os.open(options.pidfile, os.O_WRONLY | os.O_CREAT | os.O_EXCL, + 0444) + os.write(fd, '%d\n' % pid) + os.close(fd) + logging.info('pid %d written to %s', pid, options.pidfile) + + if options.gid: + try: + gid = int(options.gid) + except ValueError: + import grp + gid = grp.getgrnam(options.gid)[2] + logging.info('setting gid %d', gid) + os.setgid(gid) + + if options.uid: + try: + uid = int(options.uid) + except ValueError: + import pwd + uid = pwd.getpwnam(options.uid)[2] + logging.info('setting uid %d', uid) + os.setuid(uid) + + +def main(args): + parser = optparse.OptionParser( + description='An SvnPubSub client to keep working copies synchronized ' + 'with a repository.', + usage='Usage: %prog [options] CONFIG_FILE', + ) + parser.add_option('--logfile', + help='filename for logging') + parser.add_option('--pidfile', + help="the process' PID will be written to this file") + parser.add_option('--uid', + help='switch to this UID before running') + parser.add_option('--gid', + help='switch to this GID before running') + parser.add_option('--umask', + help='set this (octal) umask before running') + parser.add_option('--daemon', action='store_true', + help='run as a background daemon') + + options, extra = parser.parse_args(args) + + if len(extra) != 1: + parser.error('CONFIG_FILE is required') + config_file = extra[0] + + if options.daemon and not options.logfile: + parser.error('LOGFILE is required when running as a daemon') + if options.daemon and not options.pidfile: + parser.error('PIDFILE is required when running as a daemon') + + # Process any provided options. + handle_options(options) + + c = ReloadableConfig(config_file) + bdec = BigDoEverythingClasss(c) + + # We manage the logfile ourselves (along with possible rotation). The + # daemon process can just drop stdout/stderr into /dev/null. + d = Daemon('/dev/null', options.pidfile, options.umask, bdec) + if options.daemon: + # Daemonize the process and call sys.exit() with appropriate code + d.daemonize_exit() + else: + # Just run in the foreground (the default) + d.foreground() + + +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/tools/server-side/svnpubsub/testserver.py b/tools/server-side/svnpubsub/testserver.py new file mode 100755 index 0000000..8966a95 --- /dev/null +++ b/tools/server-side/svnpubsub/testserver.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# A simple test server for responding in different ways to SvnPubSub clients. +# This avoids the complexity of the Twisted framework in order to direct +# various (abnormal) conditions at the client. +# +# ### usage... +# + +import sys +import BaseHTTPServer + + +PORT = 2069 + +TEST_BODY = '{"svnpubsub": {"version": 1}}\n\0{"commit": {"type": "svn", "format": 1, "repository": "12345678-1234-1234-1234-123456789012", "id": "1234", "committer": "johndoe", "date": "2012-01-01 01:01:01 +0000 (Sun, 01 Jan 2012)", "log": "Frob the ganoozle with the snookish", "changed": {"one/path/alpha": {"flags": "U "}, "some/other/directory/": {"flags": "_U "}}}}\n\0' + +SEND_KEEPALIVE = True + + +class TestHandler(BaseHTTPServer.BaseHTTPRequestHandler): + def do_GET(self): + self.send_response(200) + self.send_header('Content-Length', str(len(TEST_BODY))) + self.send_header('Connection', 'keep-alive') + self.end_headers() + self.wfile.write(TEST_BODY) + + +if __name__ == '__main__': + server = BaseHTTPServer.HTTPServer(('', PORT), TestHandler) + sys.stderr.write('Now listening on port %d...\n' % (PORT,)) + server.serve_forever() diff --git a/tools/server-side/svnpubsub/watcher.py b/tools/server-side/svnpubsub/watcher.py new file mode 100755 index 0000000..340b100 --- /dev/null +++ b/tools/server-side/svnpubsub/watcher.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# Watch for events from SvnPubSub and print them to stdout +# +# + +import sys +import pprint +try: + import urlparse +except ImportError: + import urllib.parse as urlparse + +import svnpubsub.client + + +def _commit(url, commit): + print('COMMIT: from %s' % url) + pprint.pprint(vars(commit), indent=2) + + +def _event(url, event_name, event_arg): + if event_arg: + print('EVENT: from %s "%s" "%s"' % (url, event_name, event_arg)) + else: + print('EVENT: from %s "%s"' % (url, event_name)) + + +def main(urls): + mc = svnpubsub.client.MultiClient(urls, _commit, _event) + mc.run_forever() + + +if __name__ == "__main__": + if len(sys.argv) < 2: + print("usage: watcher.py URL [URL...]") + sys.exit(0) + main(sys.argv[1:]) |