summaryrefslogtreecommitdiff
path: root/tools/server-side/svnpubsub
diff options
context:
space:
mode:
Diffstat (limited to 'tools/server-side/svnpubsub')
-rw-r--r--tools/server-side/svnpubsub/README.txt24
-rwxr-xr-xtools/server-side/svnpubsub/commit-hook.py93
-rw-r--r--tools/server-side/svnpubsub/daemonize.py272
-rwxr-xr-xtools/server-side/svnpubsub/irkerbridge.py322
l---------tools/server-side/svnpubsub/rc.d/svnpubsub1
-rwxr-xr-xtools/server-side/svnpubsub/rc.d/svnpubsub.debian62
-rwxr-xr-xtools/server-side/svnpubsub/rc.d/svnpubsub.freebsd37
-rwxr-xr-xtools/server-side/svnpubsub/rc.d/svnpubsub.solaris53
l---------tools/server-side/svnpubsub/rc.d/svnwcsub1
-rwxr-xr-xtools/server-side/svnpubsub/rc.d/svnwcsub.debian65
-rwxr-xr-xtools/server-side/svnpubsub/rc.d/svnwcsub.freebsd39
-rwxr-xr-xtools/server-side/svnpubsub/rc.d/svnwcsub.solaris56
-rw-r--r--tools/server-side/svnpubsub/svnpubsub.tac33
-rw-r--r--tools/server-side/svnpubsub/svnpubsub/__init__.py1
-rw-r--r--tools/server-side/svnpubsub/svnpubsub/client.py230
-rw-r--r--tools/server-side/svnpubsub/svnpubsub/server.py241
-rwxr-xr-xtools/server-side/svnpubsub/svntweet.py237
-rw-r--r--tools/server-side/svnpubsub/svnwcsub.conf.example16
-rwxr-xr-xtools/server-side/svnpubsub/svnwcsub.py546
-rwxr-xr-xtools/server-side/svnpubsub/testserver.py50
-rwxr-xr-xtools/server-side/svnpubsub/watcher.py55
21 files changed, 2434 insertions, 0 deletions
diff --git a/tools/server-side/svnpubsub/README.txt b/tools/server-side/svnpubsub/README.txt
new file mode 100644
index 0000000..ad4975e
--- /dev/null
+++ b/tools/server-side/svnpubsub/README.txt
@@ -0,0 +1,24 @@
+Installation instructions:
+
+1. Set up an svnpubsub service.
+
+ This directory should be checked out to /usr/local/svnpubsub (or /opt/svnpubsub
+ on Debian).
+
+ There are init scripts for several OSes in the rc.d/ directory; add them
+ to your OS boot process in the usual way for your OS. (For example, via
+ rc.conf(5) or update-rc.d(8).)
+
+2. Run "commit-hook.py $REPOS $REV" from your post-commit hook.
+
+ (As of 1.7, these are the same ordered arguments the post-commmit hook
+ itself receives, so you can just symlink commit-hook.py as hooks/post-commit
+ hook if you don't need any other hooks to run in the server process. (This
+ isn't as insane as it sounds --- post-commit email hooks could also feed of
+ svnpubsub, and thus not be run within the committing server thread, but on
+ any other process or box that listens to the svnpubsub stream!))
+
+3. Set up svnpubsub clients.
+
+ (eg svnwcsub.py, svnpubsub/client.py,
+ 'curl -sN http://${hostname}:2069/commits')
diff --git a/tools/server-side/svnpubsub/commit-hook.py b/tools/server-side/svnpubsub/commit-hook.py
new file mode 100755
index 0000000..4a1a3f3
--- /dev/null
+++ b/tools/server-side/svnpubsub/commit-hook.py
@@ -0,0 +1,93 @@
+#!/usr/local/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+SVNLOOK="/usr/local/svn-install/current/bin/svnlook"
+#SVNLOOK="/usr/local/bin/svnlook"
+
+HOST="127.0.0.1"
+PORT=2069
+
+import sys
+import subprocess
+try:
+ import simplejson as json
+except ImportError:
+ import json
+
+import urllib2
+
+def svncmd(cmd):
+ return subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
+
+def svncmd_uuid(repo):
+ cmd = "%s uuid %s" % (SVNLOOK, repo)
+ p = svncmd(cmd)
+ return p.stdout.read().strip()
+
+def svncmd_info(repo, revision):
+ cmd = "%s info -r %s %s" % (SVNLOOK, revision, repo)
+ p = svncmd(cmd)
+ data = p.stdout.read().split("\n")
+ #print data
+ return {'author': data[0].strip(),
+ 'date': data[1].strip(),
+ 'log': "\n".join(data[3:]).strip()}
+
+def svncmd_changed(repo, revision):
+ cmd = "%s changed -r %s %s" % (SVNLOOK, revision, repo)
+ p = svncmd(cmd)
+ changed = {}
+ while True:
+ line = p.stdout.readline()
+ if not line:
+ break
+ line = line.strip()
+ (flags, filename) = (line[0:3], line[4:])
+ changed[filename] = {'flags': flags}
+ return changed
+
+def do_put(body):
+ opener = urllib2.build_opener(urllib2.HTTPHandler)
+ request = urllib2.Request("http://%s:%d/commits" %(HOST, PORT), data=body)
+ request.add_header('Content-Type', 'application/json')
+ request.get_method = lambda: 'PUT'
+ url = opener.open(request)
+
+
+def main(repo, revision):
+ revision = revision.lstrip('r')
+ i = svncmd_info(repo, revision)
+ data = {'type': 'svn',
+ 'format': 1,
+ 'id': int(revision),
+ 'changed': {},
+ 'repository': svncmd_uuid(repo),
+ 'committer': i['author'],
+ 'log': i['log'],
+ 'date': i['date'],
+ }
+ data['changed'].update(svncmd_changed(repo, revision))
+ body = json.dumps(data)
+ do_put(body)
+
+if __name__ == "__main__":
+ if len(sys.argv) not in (3, 4):
+ sys.stderr.write("invalid args\n")
+ sys.exit(0)
+
+ main(*sys.argv[1:3])
diff --git a/tools/server-side/svnpubsub/daemonize.py b/tools/server-side/svnpubsub/daemonize.py
new file mode 100644
index 0000000..8b85258
--- /dev/null
+++ b/tools/server-side/svnpubsub/daemonize.py
@@ -0,0 +1,272 @@
+# ---------------------------------------------------------------------------
+#
+# Copyright (c) 2005, Greg Stein
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ---------------------------------------------------------------------------
+#
+# This software lives at:
+# http://gstein.googlecode.com/svn/trunk/python/daemonize.py
+#
+
+import os
+import signal
+import sys
+import time
+
+
+# possible return values from Daemon.daemonize()
+DAEMON_RUNNING = 'The daemon is running'
+DAEMON_NOT_RUNNING = 'The daemon is not running'
+DAEMON_COMPLETE = 'The daemon has completed its operations'
+DAEMON_STARTED = 'The daemon has been started'
+
+
+class Daemon(object):
+
+ def __init__(self, logfile, pidfile):
+ self.logfile = logfile
+ self.pidfile = pidfile
+
+ def foreground(self):
+ "Run in the foreground."
+ ### we should probably create a pidfile. other systems may try to detect
+ ### the pidfile to see if this "daemon" is running.
+ self.setup()
+ self.run()
+ ### remove the pidfile
+
+ def daemonize_exit(self):
+ try:
+ result = self.daemonize()
+ except (ChildFailed, DaemonFailed) as e:
+ # duplicate the exit code
+ sys.exit(e.code)
+ except (ChildTerminatedAbnormally, ChildForkFailed,
+ DaemonTerminatedAbnormally, DaemonForkFailed) as e:
+ sys.stderr.write('ERROR: %s\n' % e)
+ sys.exit(1)
+ except ChildResumedIncorrectly:
+ sys.stderr.write('ERROR: continued after receiving unknown signal.\n')
+ sys.exit(1)
+
+ if result == DAEMON_STARTED or result == DAEMON_COMPLETE:
+ sys.exit(0)
+ elif result == DAEMON_NOT_RUNNING:
+ sys.stderr.write('ERROR: the daemon exited with a success code '
+ 'without signalling its startup.\n')
+ sys.exit(1)
+
+ # in original process. daemon is up and running. we're done.
+
+ def daemonize(self):
+ # fork off a child that can detach itself from this process.
+ try:
+ pid = os.fork()
+ except OSError as e:
+ raise ChildForkFailed(e.errno, e.strerror)
+
+ if pid > 0:
+ # we're in the parent. let's wait for the child to finish setting
+ # things up -- on our exit, we want to ensure the child is accepting
+ # connections.
+ cpid, status = os.waitpid(pid, 0)
+ assert pid == cpid
+ if os.WIFEXITED(status):
+ code = os.WEXITSTATUS(status)
+ if code:
+ raise ChildFailed(code)
+ return DAEMON_RUNNING
+
+ # the child did not exit cleanly.
+ raise ChildTerminatedAbnormally(status)
+
+ # we're in the child.
+
+ # decouple from the parent process
+ os.chdir('/')
+ os.umask(0)
+ os.setsid()
+
+ # remember this pid so the second child can signal it.
+ thispid = os.getpid()
+
+ # register a signal handler so the SIGUSR1 doesn't stop the process.
+ # this object will also record whether if got signalled.
+ daemon_accepting = SignalCatcher(signal.SIGUSR1)
+
+ # if the daemon process exits before sending SIGUSR1, then we need to see
+ # the problem. trap SIGCHLD with a SignalCatcher.
+ daemon_exit = SignalCatcher(signal.SIGCHLD)
+
+ # perform the second fork
+ try:
+ pid = os.fork()
+ except OSError as e:
+ raise DaemonForkFailed(e.errno, e.strerror)
+
+ if pid > 0:
+ # in the parent.
+
+ # we want to wait for the daemon to signal that it has created and
+ # bound the socket, and is (thus) ready for connections. if the
+ # daemon improperly exits before serving, we'll see SIGCHLD and the
+ # .pause will return.
+ ### we should add a timeout to this. allow an optional parameter to
+ ### specify the timeout, in case it takes a long time to start up.
+ signal.pause()
+
+ if daemon_exit.signalled:
+ # reap the daemon process, getting its exit code. bubble it up.
+ cpid, status = os.waitpid(pid, 0)
+ assert pid == cpid
+ if os.WIFEXITED(status):
+ code = os.WEXITSTATUS(status)
+ if code:
+ raise DaemonFailed(code)
+ return DAEMON_NOT_RUNNING
+
+ # the daemon did not exit cleanly.
+ raise DaemonTerminatedAbnormally(status)
+
+ if daemon_accepting.signalled:
+ # the daemon is up and running, so save the pid and return success.
+ if self.pidfile:
+ # Be wary of symlink attacks
+ try:
+ os.remove(self.pidfile)
+ except OSError:
+ pass
+ fd = os.open(self.pidfile, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0444)
+ os.write(fd, '%d\n' % pid)
+ os.close(fd)
+ return DAEMON_STARTED
+
+ # some other signal popped us out of the pause. the daemon might not
+ # be running.
+ raise ChildResumedIncorrectly()
+
+ # we're a deamon now. get rid of the final remnants of the parent.
+ # start by restoring default signal handlers
+ signal.signal(signal.SIGUSR1, signal.SIG_DFL)
+ signal.signal(signal.SIGCHLD, signal.SIG_DFL)
+ sys.stdout.flush()
+ sys.stderr.flush()
+ si = open('/dev/null', 'r')
+ so = open(self.logfile, 'a+')
+ se = open(self.logfile, 'a+', 0) # unbuffered
+ os.dup2(si.fileno(), sys.stdin.fileno())
+ os.dup2(so.fileno(), sys.stdout.fileno())
+ os.dup2(se.fileno(), sys.stderr.fileno())
+ # note: we could not inline the open() calls. after the fileno() completed,
+ # the file would be closed, making the fileno invalid. gotta hold them
+ # open until now:
+ si.close()
+ so.close()
+ se.close()
+
+ # TEST: don't release the parent immediately. the whole parent stack
+ # should pause along with this sleep.
+ #time.sleep(10)
+
+ # everything is set up. call the initialization function.
+ self.setup()
+
+ # sleep for one second before signalling. we want to make sure the
+ # parent has called signal.pause()
+ ### we should think of a better wait around the race condition.
+ time.sleep(1)
+
+ # okay. the daemon is ready. signal the parent to tell it we're set.
+ os.kill(thispid, signal.SIGUSR1)
+
+ # start the daemon now.
+ self.run()
+
+ # The daemon is shutting down, so toss the pidfile.
+ try:
+ os.remove(self.pidfile)
+ except OSError:
+ pass
+
+ return DAEMON_COMPLETE
+
+ def setup(self):
+ raise NotImplementedError
+
+ def run(self):
+ raise NotImplementedError
+
+
+class SignalCatcher(object):
+ def __init__(self, signum):
+ self.signalled = False
+ signal.signal(signum, self.sig_handler)
+
+ def sig_handler(self, signum, frame):
+ self.signalled = True
+
+
+class ChildTerminatedAbnormally(Exception):
+ "The child process terminated abnormally."
+ def __init__(self, status):
+ Exception.__init__(self, status)
+ self.status = status
+ def __str__(self):
+ return 'child terminated abnormally (0x%04x)' % self.status
+
+class ChildFailed(Exception):
+ "The child process exited with a failure code."
+ def __init__(self, code):
+ Exception.__init__(self, code)
+ self.code = code
+ def __str__(self):
+ return 'child failed with exit code %d' % self.code
+
+class ChildForkFailed(Exception):
+ "The child process could not be forked."
+ def __init__(self, errno, strerror):
+ Exception.__init__(self, errno, strerror)
+ self.errno = errno
+ self.strerror = strerror
+ def __str__(self):
+ return 'child fork failed with error %d (%s)' % self.args
+
+class ChildResumedIncorrectly(Exception):
+ "The child resumed its operation incorrectly."
+
+class DaemonTerminatedAbnormally(Exception):
+ "The daemon process terminated abnormally."
+ def __init__(self, status):
+ Exception.__init__(self, status)
+ self.status = status
+ def __str__(self):
+ return 'daemon terminated abnormally (0x%04x)' % self.status
+
+class DaemonFailed(Exception):
+ "The daemon process exited with a failure code."
+ def __init__(self, code):
+ Exception.__init__(self, code)
+ self.code = code
+ def __str__(self):
+ return 'daemon failed with exit code %d' % self.code
+
+class DaemonForkFailed(Exception):
+ "The daemon process could not be forked."
+ def __init__(self, errno, strerror):
+ Exception.__init__(self, errno, strerror)
+ self.errno = errno
+ self.strerror = strerror
+ def __str__(self):
+ return 'daemon fork failed with error %d (%s)' % self.args
diff --git a/tools/server-side/svnpubsub/irkerbridge.py b/tools/server-side/svnpubsub/irkerbridge.py
new file mode 100755
index 0000000..04b7ee2
--- /dev/null
+++ b/tools/server-side/svnpubsub/irkerbridge.py
@@ -0,0 +1,322 @@
+#!/usr/bin/env python
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# IrkerBridge - Bridge an SvnPubSub stream to Irker.
+
+# Example:
+# irkerbridge.py --daemon --pidfile pid --logfile log config
+#
+# For detailed option help use:
+# irkerbridge.py --help
+
+# It expects a config file that has the following parameters:
+# streams=url
+# Space separated list of URLs to streams.
+# This option should only be in the DEFAULT section, is ignored in
+# all other sections.
+# irker=hostname:port
+# The hostname/port combination of the irker daemon. If port is
+# omitted it defaults to 6659. Irker is connected to over UDP.
+# match=What to use to decide if the commit should be sent to irker.
+# It consists of the repository UUID followed by a slash and a glob pattern.
+# The UUID may be replaced by a * to match all UUIDs. The glob pattern will
+# be matched against all of the dirs_changed. Both the UUID and the glob
+# pattern must match to send the message to irker.
+# to=url
+# Space separated list of URLs (any URL that Irker will accept) to
+# send the resulting message to. At current Irker only supports IRC.
+# template=string
+# A string to use to format the output. The string is a Python
+# string Template. The following variables are available:
+# $committer, $id, $date, $repository, $log, $log_firstline,
+# $log_firstparagraph, $dirs_changed, $dirs_count, $dirs_count_s,
+# $subdirs_count, $subdirs_count_s, $dirs_root
+# Most of them should be self explanatory. $dirs_count is the number of
+# entries in $dirs_changed, $dirs_count_s is a friendly string version,
+# $dirs_root is the common root of all the $dirs_changed, $subdirs_count
+# is the number of subdirs under the $dirs_root that changed,
+# $subdirs_root_s is a friendly string version. $log_firstparagraph cuts
+# the log message at the first blank line and replaces newlines with spaces.
+#
+# Within the config file you have sections. Any configuration option
+# missing from a given section is found in the [DEFAULT] section.
+#
+# Section names are arbitrary names that mean nothing to the bridge. Each
+# section other than the [DEFAULT] section consists of a configuration that
+# may match and send a message to irker to deliver. All matching sections
+# will generate a message.
+#
+# Interpolation of values within the config file is allowed by including
+# %(name)s within a value. For example I can reference the UUID of a repo
+# repeatedly by doing:
+# [DEFAULT]
+# ASF_REPO=13f79535-47bb-0310-9956-ffa450edef68
+#
+# [#commits]
+# match=%(ASF_REPO)s/
+#
+# You can HUP the process to reload the config file without restarting the
+# process. However, you cannot change the streams it is listening to without
+# restarting the process.
+#
+# TODO: Logging in a better way.
+
+# Messages longer than this will be truncated and ... added to the end such
+# that the resulting message is no longer than this:
+MAX_PRIVMSG = 400
+
+import os
+import sys
+import posixpath
+import socket
+import json
+import urlparse
+import optparse
+import ConfigParser
+import traceback
+import signal
+import re
+import fnmatch
+from string import Template
+
+# Packages that come with svnpubsub
+import svnpubsub.client
+import daemonize
+
+class Daemon(daemonize.Daemon):
+ def __init__(self, logfile, pidfile, bdec):
+ daemonize.Daemon.__init__(self, logfile, pidfile)
+
+ self.bdec = bdec
+
+ def setup(self):
+ # There is no setup which the parent needs to wait for.
+ pass
+
+ def run(self):
+ print 'irkerbridge started, pid=%d' % (os.getpid())
+
+ mc = svnpubsub.client.MultiClient(self.bdec.urls,
+ self.bdec.commit,
+ self.bdec.event)
+ mc.run_forever()
+
+
+class BigDoEverythingClass(object):
+ def __init__(self, config, options):
+ self.config = config
+ self.options = options
+ self.urls = config.get_value('streams').split()
+
+ def locate_matching_configs(self, commit):
+ result = [ ]
+ for section in self.config.sections():
+ match = self.config.get(section, "match").split('/', 1)
+ if len(match) < 2:
+ # No slash so assume all paths
+ match.append('*')
+ match_uuid, match_path = match
+ if commit.repository == match_uuid or match_uuid == "*":
+ for path in commit.changed:
+ if fnmatch.fnmatch(path, match_path):
+ result.append(section)
+ break
+ return result
+
+ def _generate_dirs_changed(self, commit):
+ if hasattr(commit, 'dirs_changed') or not hasattr(commit, 'changed'):
+ return
+
+ dirs_changed = set()
+ for p in commit.changed:
+ if p[-1] == '/' and commit.changed[p]['flags'][1] == 'U':
+ # directory with property changes add the directory itself.
+ dirs_changed.add(p)
+ else:
+ # everything else add the parent of the path
+ # directories have a trailing slash so if it's present remove
+ # it before finding the parent. The result will be a directory
+ # so it needs a trailing slash
+ dirs_changed.add(posixpath.dirname(p.rstrip('/')) + '/')
+
+ commit.dirs_changed = dirs_changed
+ return
+
+ def fill_in_extra_args(self, commit):
+ # Set any empty members to the string "<null>"
+ v = vars(commit)
+ for k in v.keys():
+ if not v[k]:
+ v[k] = '<null>'
+
+ self._generate_dirs_changed(commit)
+ # Add entries to the commit object that are useful for
+ # formatting.
+ commit.log_firstline = commit.log.split("\n",1)[0]
+ commit.log_firstparagraph = re.split("\r?\n\r?\n",commit.log,1)[0]
+ commit.log_firstparagraph = re.sub("\r?\n"," ",commit.log_firstparagraph)
+ if commit.dirs_changed:
+ commit.dirs_root = posixpath.commonprefix(commit.dirs_changed)
+ if commit.dirs_root == '':
+ commit.dirs_root = '/'
+ commit.dirs_count = len(commit.dirs_changed)
+ if commit.dirs_count > 1:
+ commit.dirs_count_s = " (%d dirs)" %(commit.dirs_count)
+ else:
+ commit.dirs_count_s = ""
+
+ commit.subdirs_count = commit.dirs_count
+ if commit.dirs_root in commit.dirs_changed:
+ commit.subdirs_count -= 1
+ if commit.subdirs_count >= 1:
+ commit.subdirs_count_s = " + %d subdirs" % (commit.subdirs_count)
+ else:
+ commit.subdirs_count_s = ""
+
+ def _send(self, irker, msg):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ irker_list = irker.split(':')
+ if len(irker_list) < 2:
+ irker_list.append(6659)
+ json_msg = json.dumps(msg)
+ sock.sendto(json_msg, (irker_list[0],int(irker_list[1])))
+ if self.options.verbose:
+ print "SENT: %s to %s" % (json_msg, irker)
+
+ def join_all(self):
+ # Like self.commit(), but ignores self.config.get(section, "template").
+ for section in self.config.sections():
+ irker = self.config.get(section, "irker")
+ to_list = self.config.get(section, "to").split()
+ if not irker or not to_list:
+ continue
+ for to in to_list:
+ msg = {'to': to, 'privmsg': ''}
+ self._send(irker, msg)
+
+ def commit(self, url, commit):
+ if self.options.verbose:
+ print "RECV: from %s" % url
+ print json.dumps(vars(commit), indent=2)
+
+ try:
+ config_sections = self.locate_matching_configs(commit)
+ if len(config_sections) > 0:
+ self.fill_in_extra_args(commit)
+ for section in config_sections:
+ irker = self.config.get(section, "irker")
+ to_list = self.config.get(section, "to").split()
+ template = self.config.get(section, "template")
+ if not irker or not to_list or not template:
+ continue
+ privmsg = Template(template).safe_substitute(vars(commit))
+ if len(privmsg) > MAX_PRIVMSG:
+ privmsg = privmsg[:MAX_PRIVMSG-3] + '...'
+ for to in to_list:
+ msg = {'to': to, 'privmsg': privmsg}
+ self._send(irker, msg)
+
+ except:
+ print "Unexpected error:"
+ traceback.print_exc()
+ sys.stdout.flush()
+ raise
+
+ def event(self, url, event_name, event_arg):
+ if self.options.verbose or event_name != "ping":
+ print 'EVENT: %s from %s' % (event_name, url)
+ sys.stdout.flush()
+
+
+
+class ReloadableConfig(ConfigParser.SafeConfigParser):
+ def __init__(self, fname):
+ ConfigParser.SafeConfigParser.__init__(self)
+
+ self.fname = fname
+ self.read(fname)
+
+ signal.signal(signal.SIGHUP, self.hangup)
+
+ def hangup(self, signalnum, frame):
+ self.reload()
+
+ def reload(self):
+ print "RELOAD: config file: %s" % self.fname
+ sys.stdout.flush()
+
+ # Delete everything. Just re-reading would overlay, and would not
+ # remove sections/options. Note that [DEFAULT] will not be removed.
+ for section in self.sections():
+ self.remove_section(section)
+
+ # Get rid of [DEFAULT]
+ self.remove_section(ConfigParser.DEFAULTSECT)
+
+ # Now re-read the configuration file.
+ self.read(self.fname)
+
+ def get_value(self, which):
+ return self.get(ConfigParser.DEFAULTSECT, which)
+
+
+def main(args):
+ parser = optparse.OptionParser(
+ description='An SvnPubSub client that bridges the data to irker.',
+ usage='Usage: %prog [options] CONFIG_FILE',
+ )
+ parser.add_option('--logfile',
+ help='filename for logging')
+ parser.add_option('--verbose', action='store_true',
+ help="enable verbose logging")
+ parser.add_option('--pidfile',
+ help="the process' PID will be written to this file")
+ parser.add_option('--daemon', action='store_true',
+ help='run as a background daemon')
+
+ options, extra = parser.parse_args(args)
+
+ if len(extra) != 1:
+ parser.error('CONFIG_FILE is requried')
+ config_file = os.path.abspath(extra[0])
+
+ logfile, pidfile = None, None
+ if options.daemon:
+ if options.logfile:
+ logfile = os.path.abspath(options.logfile)
+ else:
+ parser.error('LOGFILE is required when running as a daemon')
+
+ if options.pidfile:
+ pidfile = os.path.abspath(options.pidfile)
+ else:
+ parser.error('PIDFILE is required when running as a daemon')
+
+
+ config = ReloadableConfig(config_file)
+ bdec = BigDoEverythingClass(config, options)
+
+ d = Daemon(logfile, pidfile, bdec)
+ if options.daemon:
+ d.daemonize_exit()
+ else:
+ d.foreground()
+
+if __name__ == "__main__":
+ main(sys.argv[1:])
diff --git a/tools/server-side/svnpubsub/rc.d/svnpubsub b/tools/server-side/svnpubsub/rc.d/svnpubsub
new file mode 120000
index 0000000..b05e35e
--- /dev/null
+++ b/tools/server-side/svnpubsub/rc.d/svnpubsub
@@ -0,0 +1 @@
+svnpubsub.freebsd \ No newline at end of file
diff --git a/tools/server-side/svnpubsub/rc.d/svnpubsub.debian b/tools/server-side/svnpubsub/rc.d/svnpubsub.debian
new file mode 100755
index 0000000..c61057d
--- /dev/null
+++ b/tools/server-side/svnpubsub/rc.d/svnpubsub.debian
@@ -0,0 +1,62 @@
+#!/bin/bash
+### BEGIN INIT INFO
+# Provides: svnpubsub
+# Required-Start: $remote_fs
+# Required-Stop: $remote_fs
+# Default-Start: 2 3 4 5
+# Default-Stop: 0 1 6
+# Short-Description: SvnPubSub
+# Description: start SvnPubSub daemon
+#### END INIT INFO
+
+. /lib/init/vars.sh
+. /lib/lsb/init-functions
+
+svnpubsub_user=${svnpubsub_user-"daemon"}
+svnpubsub_group=${svnpubsub_group-"daemon"}
+svnpubsub_reactor=${svnpubsub_reactor-"poll"}
+svnpubsub_pidfile=${svnpubsub_pidfile-"/var/run/svnpubsub.pid"}
+pidfile="${svnpubsub_pidfile}"
+
+TWSITD_CMD="/usr/bin/twistd -y /opt/svnpubsub/svnpubsub.tac \
+ --logfile=/var/log/svnpubsub/svnpubsub.log \
+ --pidfile=${pidfile} \
+ --uid=${svnpubsub_user} --gid=${svnpubsub_user} \
+ -r${svnpubsub_reactor}"
+
+RETVAL=0
+
+start() {
+ echo "Starting SvnPubSub Server: "
+ $TWSITD_CMD
+ RETVAL=$?
+ [ $RETVAL -eq 0 ] && echo "ok" || echo "failed"
+ return $RETVAL
+}
+
+stop() {
+ echo "Stopping SvnPubSub Server: "
+ THE_PID=`cat ${pidfile}`
+ kill $THE_PID
+ RETVAL=$?
+ [ $RETVAL -eq 0 ] && echo "ok" || echo "failed"
+ return $RETVAL
+}
+
+case "$1" in
+ start)
+ start
+ ;;
+ stop)
+ stop
+ ;;
+ restart)
+ stop
+ start
+ ;;
+ *)
+ echo "Usage: $0 {start|stop|restart}"
+ exit 1
+esac
+
+exit $RETVAL
diff --git a/tools/server-side/svnpubsub/rc.d/svnpubsub.freebsd b/tools/server-side/svnpubsub/rc.d/svnpubsub.freebsd
new file mode 100755
index 0000000..71fc8c8
--- /dev/null
+++ b/tools/server-side/svnpubsub/rc.d/svnpubsub.freebsd
@@ -0,0 +1,37 @@
+#!/bin/sh
+#
+# PROVIDE: svnpubsub
+# REQUIRE: DAEMON
+# KEYWORD: shutdown
+
+. /etc/rc.subr
+
+name="svnpubsub"
+rcvar=`set_rcvar`
+
+load_rc_config $name
+
+#
+# DO NOT CHANGE THESE DEFAULT VALUES HERE
+# SET THEM IN THE /etc/rc.conf FILE
+#
+svnpubsub_enable=${svnpubsub_enable-"NO"}
+svnpubsub_user=${svnpubsub_user-"svn"}
+svnpubsub_group=${svnpubsub_group-"svn"}
+svnpubsub_reactor=${svnpubsub_reactor-"poll"}
+svnpubsub_pidfile=${svnpubsub_pidfile-"/var/run/svnpubsub/svnpubsub.pid"}
+svnpubsub_cmd_int=${svnpubsub_cmd_int-"python"}
+pidfile="${svnpubsub_pidfile}"
+
+export PYTHON_EGG_CACHE="/home/svn/.python-eggs"
+
+command="/usr/local/bin/twistd"
+command_interpreter="/usr/local/bin/${svnwcsub_cmd_int}"
+command_args="-y /usr/local/svnpubsub/svnpubsub.tac \
+ --logfile=/var/log/vc/svnpubsub.log \
+ --pidfile=${pidfile} \
+ --uid=${svnpubsub_user} --gid=${svnpubsub_user} \
+ -r${svnpubsub_reactor}"
+
+
+run_rc_command "$1"
diff --git a/tools/server-side/svnpubsub/rc.d/svnpubsub.solaris b/tools/server-side/svnpubsub/rc.d/svnpubsub.solaris
new file mode 100755
index 0000000..3a9cf9f
--- /dev/null
+++ b/tools/server-side/svnpubsub/rc.d/svnpubsub.solaris
@@ -0,0 +1,53 @@
+#!/usr/bin/bash
+#
+# a dumb init script for twistd on solaris. cus like, writing XML for SMF is f'ing lame.
+#
+
+svnpubsub_user=${svnpubsub_user-"daemon"}
+svnpubsub_group=${svnpubsub_group-"daemon"}
+svnpubsub_reactor=${svnpubsub_reactor-"poll"}
+svnpubsub_pidfile=${svnpubsub_pidfile-"/var/run/svnpubsub/svnpubsub.pid"}
+pidfile="${svnpubsub_pidfile}"
+
+TWSITD_CMD="/opt/local/bin//twistd -y /usr/local/svnpubsub/svnpubsub.tac \
+ --logfile=/x1/log/svnpubsub.log \
+ --pidfile=${pidfile} \
+ --uid=${svnpubsub_user} --gid=${svnpubsub_user} \
+ -r${svnpubsub_reactor}"
+
+RETVAL=0
+
+start() {
+ echo "Starting SvnPubSub Server: "
+ $TWSITD_CMD
+ RETVAL=$?
+ [ $RETVAL -eq 0 ] && echo "ok" || echo "failed"
+ return $RETVAL
+}
+
+stop() {
+ echo "Stopping SvnPubSub Server: "
+ THE_PID=`cat ${pidfile}`
+ kill $THE_PID
+ RETVAL=$?
+ [ $RETVAL -eq 0 ] && echo "ok" || echo "failed"
+ return $RETVAL
+}
+
+case "$1" in
+ start)
+ start
+ ;;
+ stop)
+ stop
+ ;;
+ restart)
+ stop
+ start
+ ;;
+ *)
+ echo "Usage: $0 {start|stop|restart}"
+ exit 1
+esac
+
+exit $RETVAL
diff --git a/tools/server-side/svnpubsub/rc.d/svnwcsub b/tools/server-side/svnpubsub/rc.d/svnwcsub
new file mode 120000
index 0000000..310fcbe
--- /dev/null
+++ b/tools/server-side/svnpubsub/rc.d/svnwcsub
@@ -0,0 +1 @@
+svnwcsub.freebsd \ No newline at end of file
diff --git a/tools/server-side/svnpubsub/rc.d/svnwcsub.debian b/tools/server-side/svnpubsub/rc.d/svnwcsub.debian
new file mode 100755
index 0000000..caf5511
--- /dev/null
+++ b/tools/server-side/svnpubsub/rc.d/svnwcsub.debian
@@ -0,0 +1,65 @@
+#!/bin/bash
+### BEGIN INIT INFO
+# Provides: svnwcsub
+# Required-Start: $remote_fs
+# Required-Stop: $remote_fs
+# Default-Start: 2 3 4 5
+# Default-Stop: 0 1 6
+# Short-Description: SvnWcSub
+# Description: start SvnWcSub daemon
+#### END INIT INFO
+
+. /lib/init/vars.sh
+. /lib/lsb/init-functions
+
+svnwcsub_user=${svnwcsub_user-"svnwc"}
+svnwcsub_group=${svnwcsub_group-"svnwc"}
+svnwcsub_pidfile=${svnwcsub_pidfile-"/var/run/svnwcsub.pid"}
+svnwcsub_config=${svnwcsub_config-"/etc/svnwcsub.conf"}
+svnwcsub_logfile=${svnwcsub_logfile-"/var/log/svnwcsub/svnwcsub.log"}
+pidfile="${svnwcsub_pidfile}"
+
+SVNWCSUB_CMD="/opt/svnpubsub/svnwcsub.py \
+ --daemon \
+ --logfile=${svnwcsub_logfile} \
+ --pidfile=${pidfile} \
+ --uid=${svnwcsub_user} --gid=${svnwcsub_group} \
+ --umask=002 \
+ ${svnwcsub_config} "
+
+RETVAL=0
+
+start() {
+ echo "Starting SvnWcSub Server: "
+ $SVNWCSUB_CMD
+ RETVAL=$?
+ [ $RETVAL -eq 0 ] && echo "ok" || echo "failed"
+ return $RETVAL
+}
+
+stop() {
+ echo "Stopping SvnWcSub Server: "
+ THE_PID=`cat ${pidfile}`
+ kill $THE_PID
+ RETVAL=$?
+ [ $RETVAL -eq 0 ] && echo "ok" || echo "failed"
+ return $RETVAL
+}
+
+case "$1" in
+ start)
+ start
+ ;;
+ stop)
+ stop
+ ;;
+ restart)
+ stop
+ start
+ ;;
+ *)
+ echo "Usage: $0 {start|stop|restart}"
+ exit 1
+esac
+
+exit $RETVAL
diff --git a/tools/server-side/svnpubsub/rc.d/svnwcsub.freebsd b/tools/server-side/svnpubsub/rc.d/svnwcsub.freebsd
new file mode 100755
index 0000000..58ad386
--- /dev/null
+++ b/tools/server-side/svnpubsub/rc.d/svnwcsub.freebsd
@@ -0,0 +1,39 @@
+#!/bin/sh
+#
+# PROVIDE: svnwcsub
+# REQUIRE: DAEMON
+# KEYWORD: shutdown
+
+. /etc/rc.subr
+
+name="svnwcsub"
+rcvar=`set_rcvar`
+
+load_rc_config $name
+
+#
+# DO NOT CHANGE THESE DEFAULT VALUES HERE
+# SET THEM IN THE /etc/rc.conf FILE
+#
+svnwcsub_enable=${svnwcsub_enable-"NO"}
+svnwcsub_user=${svnwcsub_user-"svnwc"}
+svnwcsub_group=${svnwcsub_group-"svnwc"}
+svnwcsub_pidfile=${svnwcsub_pidfile-"/var/run/svnwcsub/svnwcsub.pub"}
+svnwcsub_env="PYTHON_EGG_CACHE"
+svnwcsub_cmd_int=${svnwcsub_cmd_int-"python"}
+svnwcsub_config=${svnwcsub_config-"/etc/svnwcsub.conf"}
+svnwcsub_logfile=${svnwcsub_logfile-"/var/log/svnwcsub/svnwcsub.log"}
+pidfile="${svnwcsub_pidfile}"
+
+export PYTHON_EGG_CACHE="/var/run/svnwcsub"
+
+command="/usr/local/svnpubsub/svnwcsub.py"
+command_interpreter="/usr/local/bin/${svnwcsub_cmd_int}"
+command_args="--daemon \
+ --logfile=${svnwcsub_logfile} \
+ --pidfile=${pidfile} \
+ --uid=${svnwcsub_user} --gid=${svnwcsub_group} \
+ --umask=002 \
+ ${svnwcsub_config}"
+
+run_rc_command "$1"
diff --git a/tools/server-side/svnpubsub/rc.d/svnwcsub.solaris b/tools/server-side/svnpubsub/rc.d/svnwcsub.solaris
new file mode 100755
index 0000000..bd0c2bd
--- /dev/null
+++ b/tools/server-side/svnpubsub/rc.d/svnwcsub.solaris
@@ -0,0 +1,56 @@
+#!/usr/bin/bash
+#
+# a dumb init script for twistd on solaris. cus like, writing XML for SMF is f'ing lame.
+#
+
+svnwcsub_user=${svnwcsub_user-"svnwc"}
+svnwcsub_group=${svnwcsub_group-"other"}
+svnwcsub_pidfile=${svnwcsub_pidfile-"/var/run/svnwcsub/svnwcsub.pid"}
+svnwcsub_config=${svnwcsub_config-"/etc/svnwcsub.conf"}
+svnwcsub_logfile=${svnwcsub_logfile-"/x1/log/svnwcsub/svnwcsub.log"}
+pidfile="${svnwcsub_pidfile}"
+
+SVNWCSUB_CMD="/usr/local/svnpubsub/svnwcsub.py \
+ --daemon \
+ --logfile=${svnwcsub_logfile} \
+ --pidfile=${pidfile} \
+ --uid=${svnwcsub_user} --gid=${svnwcsub_group} \
+ --umask=002 \
+ ${svnwcsub_config}"
+
+RETVAL=0
+
+start() {
+ echo "Starting SvnWcSub Server: "
+ $SVNWCSUB_CMD
+ RETVAL=$?
+ [ $RETVAL -eq 0 ] && echo "ok" || echo "failed"
+ return $RETVAL
+}
+
+stop() {
+ echo "Stopping SvnWcSub Server: "
+ THE_PID=`cat ${pidfile}`
+ kill $THE_PID
+ RETVAL=$?
+ [ $RETVAL -eq 0 ] && echo "ok" || echo "failed"
+ return $RETVAL
+}
+
+case "$1" in
+ start)
+ start
+ ;;
+ stop)
+ stop
+ ;;
+ restart)
+ stop
+ start
+ ;;
+ *)
+ echo "Usage: $0 {start|stop|restart}"
+ exit 1
+esac
+
+exit $RETVAL
diff --git a/tools/server-side/svnpubsub/svnpubsub.tac b/tools/server-side/svnpubsub/svnpubsub.tac
new file mode 100644
index 0000000..574ad24
--- /dev/null
+++ b/tools/server-side/svnpubsub/svnpubsub.tac
@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+import os
+from twisted.application import service, internet
+
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+
+from svnpubsub.server import svnpubsub_server
+
+application = service.Application("SvnPubSub")
+
+def get_service():
+ return internet.TCPServer(2069, svnpubsub_server())
+
+service = get_service()
+service.setServiceParent(application)
diff --git a/tools/server-side/svnpubsub/svnpubsub/__init__.py b/tools/server-side/svnpubsub/svnpubsub/__init__.py
new file mode 100644
index 0000000..f50e195
--- /dev/null
+++ b/tools/server-side/svnpubsub/svnpubsub/__init__.py
@@ -0,0 +1 @@
+# Turn svnpubsub/ into a package.
diff --git a/tools/server-side/svnpubsub/svnpubsub/client.py b/tools/server-side/svnpubsub/svnpubsub/client.py
new file mode 100644
index 0000000..c1631d6
--- /dev/null
+++ b/tools/server-side/svnpubsub/svnpubsub/client.py
@@ -0,0 +1,230 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Generic client for SvnPubSub
+#
+# ### usage...
+#
+#
+# EVENTS
+#
+# connected: a connection to the server has been opened (though not
+# necessarily established)
+# closed: the connection was closed. reconnect will be attempted.
+# error: an error closed the connection. reconnect will be attempted.
+# ping: the server has sent a keepalive
+# stale: no activity has been seen, so the connection will be closed
+# and reopened
+#
+
+import asyncore
+import asynchat
+import socket
+import functools
+import time
+import json
+try:
+ import urlparse
+except ImportError:
+ import urllib.parse as urlparse
+
+# How long the polling loop should wait for activity before returning.
+TIMEOUT = 30.0
+
+# Always delay a bit when trying to reconnect. This is not precise, but sets
+# a minimum amount of delay. At the moment, there is no further backoff.
+RECONNECT_DELAY = 25.0
+
+# If we don't see anything from the server for this amount time, then we
+# will drop and reconnect. The TCP connection may have gone down without
+# us noticing it somehow.
+STALE_DELAY = 60.0
+
+
+class SvnpubsubClientException(Exception):
+ pass
+
+class Client(asynchat.async_chat):
+
+ def __init__(self, url, commit_callback, event_callback):
+ asynchat.async_chat.__init__(self)
+
+ self.last_activity = time.time()
+ self.ibuffer = []
+
+ self.url = url
+ parsed_url = urlparse.urlsplit(url)
+ if parsed_url.scheme != 'http':
+ raise ValueError("URL scheme must be http: '%s'" % url)
+ host = parsed_url.hostname
+ port = parsed_url.port
+ resource = parsed_url.path
+ if parsed_url.query:
+ resource += "?%s" % parsed_url.query
+ if parsed_url.fragment:
+ resource += "#%s" % parsed_url.fragment
+
+ self.event_callback = event_callback
+
+ self.parser = JSONRecordHandler(commit_callback, event_callback)
+
+ # Wait for the end of headers. Then we start parsing JSON.
+ self.set_terminator(b'\r\n\r\n')
+ self.skipping_headers = True
+
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ self.connect((host, port))
+ except:
+ self.handle_error()
+ return
+
+ self.push(('GET %s HTTP/1.0\r\n\r\n' % resource).encode('ascii'))
+
+ def handle_connect(self):
+ self.event_callback('connected', None)
+
+ def handle_close(self):
+ self.event_callback('closed', None)
+ self.close()
+
+ def handle_error(self):
+ self.event_callback('error', None)
+ self.close()
+
+ def found_terminator(self):
+ if self.skipping_headers:
+ self.skipping_headers = False
+ # Each JSON record is terminated by a null character
+ self.set_terminator(b'\0')
+ else:
+ record = b"".join(self.ibuffer)
+ self.ibuffer = []
+ self.parser.feed(record.decode())
+
+ def collect_incoming_data(self, data):
+ # Remember the last time we saw activity
+ self.last_activity = time.time()
+
+ if not self.skipping_headers:
+ self.ibuffer.append(data)
+
+
+class JSONRecordHandler:
+ def __init__(self, commit_callback, event_callback):
+ self.commit_callback = commit_callback
+ self.event_callback = event_callback
+
+ def feed(self, record):
+ obj = json.loads(record)
+ if 'svnpubsub' in obj:
+ actual_version = obj['svnpubsub'].get('version')
+ EXPECTED_VERSION = 1
+ if actual_version != EXPECTED_VERSION:
+ raise SvnpubsubClientException("Unknown svnpubsub format: %r != %d"
+ % (actual_format, expected_format))
+ self.event_callback('version', obj['svnpubsub']['version'])
+ elif 'commit' in obj:
+ commit = Commit(obj['commit'])
+ self.commit_callback(commit)
+ elif 'stillalive' in obj:
+ self.event_callback('ping', obj['stillalive'])
+
+
+class Commit(object):
+ def __init__(self, commit):
+ self.__dict__.update(commit)
+
+
+class MultiClient(object):
+ def __init__(self, urls, commit_callback, event_callback):
+ self.commit_callback = commit_callback
+ self.event_callback = event_callback
+
+ # No target time, as no work to do
+ self.target_time = 0
+ self.work_items = [ ]
+
+ for url in urls:
+ self._add_channel(url)
+
+ def _reconnect(self, url, event_name, event_arg):
+ if event_name == 'closed' or event_name == 'error':
+ # Stupid connection closed for some reason. Set up a reconnect. Note
+ # that it should have been removed from asyncore.socket_map already.
+ self._reconnect_later(url)
+
+ # Call the user's callback now.
+ self.event_callback(url, event_name, event_arg)
+
+ def _reconnect_later(self, url):
+ # Set up a work item to reconnect in a little while.
+ self.work_items.append(url)
+
+ # Only set a target if one has not been set yet. Otherwise, we could
+ # create a race condition of continually moving out towards the future
+ if not self.target_time:
+ self.target_time = time.time() + RECONNECT_DELAY
+
+ def _add_channel(self, url):
+ # Simply instantiating the client will install it into the global map
+ # for processing in the main event loop.
+ Client(url,
+ functools.partial(self.commit_callback, url),
+ functools.partial(self._reconnect, url))
+
+ def _check_stale(self):
+ now = time.time()
+ for client in asyncore.socket_map.values():
+ if client.last_activity + STALE_DELAY < now:
+ # Whoops. No activity in a while. Signal this fact, Close the
+ # Client, then have it reconnected later on.
+ self.event_callback(client.url, 'stale', client.last_activity)
+
+ # This should remove it from .socket_map.
+ client.close()
+
+ self._reconnect_later(client.url)
+
+ def _maybe_work(self):
+ # If we haven't reach the targetted time, or have no work to do,
+ # then fast-path exit
+ if time.time() < self.target_time or not self.work_items:
+ return
+
+ # We'll take care of all the work items, so no target for future work
+ self.target_time = 0
+
+ # Play a little dance just in case work gets added while we're
+ # currently working on stuff
+ work = self.work_items
+ self.work_items = [ ]
+
+ for url in work:
+ self._add_channel(url)
+
+ def run_forever(self):
+ while True:
+ if asyncore.socket_map:
+ asyncore.loop(timeout=TIMEOUT, count=1)
+ else:
+ time.sleep(TIMEOUT)
+
+ self._check_stale()
+ self._maybe_work()
diff --git a/tools/server-side/svnpubsub/svnpubsub/server.py b/tools/server-side/svnpubsub/svnpubsub/server.py
new file mode 100644
index 0000000..faee423
--- /dev/null
+++ b/tools/server-side/svnpubsub/svnpubsub/server.py
@@ -0,0 +1,241 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# SvnPubSub - Simple Push Notification of Subversion commits
+#
+# Based on the theory behind the Live Journal Atom Streaming Service:
+# <http://atom.services.livejournal.com/>
+#
+# Instead of using a complicated XMPP/AMPQ/JMS/super messaging service,
+# we have simple HTTP GETs and PUTs to get data in and out.
+#
+# Currently supports both XML and JSON serialization.
+#
+# Example Sub clients:
+# curl -sN http://127.0.0.1:2069/commits
+# curl -sN http://127.0.0.1:2069/commits/svn/*
+# curl -sN http://127.0.0.1:2069/commits/svn
+# curl -sN http://127.0.0.1:2069/commits/*/13f79535-47bb-0310-9956-ffa450edef68
+# curl -sN http://127.0.0.1:2069/commits/svn/13f79535-47bb-0310-9956-ffa450edef68
+#
+# URL is built into 2 parts:
+# /commits/${optional_type}/${optional_repository}
+#
+# If the type is included in the URL, you will only get commits of that type.
+# The type can be * and then you will receive commits of any type.
+#
+# If the repository is included in the URL, you will only receive
+# messages about that repository. The repository can be * and then you
+# will receive messages about all repositories.
+#
+# Example Pub clients:
+# curl -T revinfo.json -i http://127.0.0.1:2069/commits
+#
+# TODO:
+# - Add Real access controls (not just 127.0.0.1)
+# - Document PUT format
+# - Convert to twisted.python.log
+
+
+
+
+try:
+ import simplejson as json
+except ImportError:
+ import json
+
+import sys
+
+import twisted
+from twisted.internet import reactor
+from twisted.internet import defer
+from twisted.web import server
+from twisted.web import resource
+from twisted.python import log
+
+import time
+
+class Commit:
+ def __init__(self, r):
+ self.__dict__.update(r)
+ if not self.check_value('repository'):
+ raise ValueError('Invalid Repository Value')
+ if not self.check_value('type'):
+ raise ValueError('Invalid Type Value')
+ if not self.check_value('format'):
+ raise ValueError('Invalid Format Value')
+ if not self.check_value('id'):
+ raise ValueError('Invalid ID Value')
+
+ def check_value(self, k):
+ return hasattr(self, k) and self.__dict__[k]
+
+ def render_commit(self):
+ obj = {'commit': {}}
+ obj['commit'].update(self.__dict__)
+ return json.dumps(obj)
+
+ def render_log(self):
+ try:
+ paths_changed = " %d paths changed" % len(self.changed)
+ except:
+ paths_changed = ""
+ return "%s:%s repo '%s' id '%s'%s" % (self.type,
+ self.format,
+ self.repository,
+ self.id,
+ paths_changed)
+
+
+HEARTBEAT_TIME = 15
+
+class Client(object):
+ def __init__(self, pubsub, r, type, repository):
+ self.pubsub = pubsub
+ r.notifyFinish().addErrback(self.finished)
+ self.r = r
+ self.type = type
+ self.repository = repository
+ self.alive = True
+ log.msg("OPEN: %s:%d (%d clients online)"% (r.getClientIP(), r.client.port, pubsub.cc()+1))
+
+ def finished(self, reason):
+ self.alive = False
+ log.msg("CLOSE: %s:%d (%d clients online)"% (self.r.getClientIP(), self.r.client.port, self.pubsub.cc()))
+ try:
+ self.pubsub.remove(self)
+ except ValueError:
+ pass
+
+ def interested_in(self, commit):
+ if self.type and self.type != commit.type:
+ return False
+
+ if self.repository and self.repository != commit.repository:
+ return False
+
+ return True
+
+ def notify(self, data):
+ self.write(data)
+
+ def start(self):
+ self.write_start()
+ reactor.callLater(HEARTBEAT_TIME, self.heartbeat, None)
+
+ def heartbeat(self, args):
+ if self.alive:
+ self.write_heartbeat()
+ reactor.callLater(HEARTBEAT_TIME, self.heartbeat, None)
+
+ def write_data(self, data):
+ self.write(data + "\n\0")
+
+ """ "Data must not be unicode" is what the interfaces.ITransport says... grr. """
+ def write(self, input):
+ self.r.write(str(input))
+
+ def write_start(self):
+ self.r.setHeader('X-SVNPubSub-Version', '1')
+ self.r.setHeader('content-type', 'application/vnd.apache.vc-notify+json')
+ self.write('{"svnpubsub": {"version": 1}}\n\0')
+
+ def write_heartbeat(self):
+ self.write(json.dumps({"stillalive": time.time()}) + "\n\0")
+
+
+class SvnPubSub(resource.Resource):
+ isLeaf = True
+ clients = []
+
+ def cc(self):
+ return len(self.clients)
+
+ def remove(self, c):
+ self.clients.remove(c)
+
+ def render_GET(self, request):
+ log.msg("REQUEST: %s" % (request.uri))
+ request.setHeader('content-type', 'text/plain')
+
+ repository = None
+ type = None
+
+ uri = request.uri.split('/')
+ uri_len = len(uri)
+ if uri_len < 2 or uri_len > 4:
+ request.setResponseCode(400)
+ return "Invalid path\n"
+
+ if uri_len >= 3:
+ type = uri[2]
+
+ if uri_len == 4:
+ repository = uri[3]
+
+ # Convert wild card to None.
+ if type == '*':
+ type = None
+ if repository == '*':
+ repository = None
+
+ c = Client(self, request, type, repository)
+ self.clients.append(c)
+ c.start()
+ return twisted.web.server.NOT_DONE_YET
+
+ def notifyAll(self, commit):
+ data = commit.render_commit()
+
+ log.msg("COMMIT: %s (%d clients)" % (commit.render_log(), self.cc()))
+ for client in self.clients:
+ if client.interested_in(commit):
+ client.write_data(data)
+
+ def render_PUT(self, request):
+ request.setHeader('content-type', 'text/plain')
+ ip = request.getClientIP()
+ if ip != "127.0.0.1":
+ request.setResponseCode(401)
+ return "Access Denied"
+ input = request.content.read()
+ #import pdb;pdb.set_trace()
+ #print "input: %s" % (input)
+ try:
+ c = json.loads(input)
+ commit = Commit(c)
+ except ValueError as e:
+ request.setResponseCode(400)
+ log.msg("COMMIT: failed due to: %s" % str(e))
+ return str(e)
+ self.notifyAll(commit)
+ return "Ok"
+
+def svnpubsub_server():
+ root = resource.Resource()
+ s = SvnPubSub()
+ root.putChild("commits", s)
+ return server.Site(root)
+
+if __name__ == "__main__":
+ log.startLogging(sys.stdout)
+ # Port 2069 "HTTP Event Port", whatever, sounds good to me
+ reactor.listenTCP(2069, svnpubsub_server())
+ reactor.run()
+
diff --git a/tools/server-side/svnpubsub/svntweet.py b/tools/server-side/svnpubsub/svntweet.py
new file mode 100755
index 0000000..ed426bd
--- /dev/null
+++ b/tools/server-side/svnpubsub/svntweet.py
@@ -0,0 +1,237 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# SvnTweet - Subscribe to a SvnPubSub stream, and Twitter about it!
+#
+# Example:
+# svntweet.py my-config.json
+#
+# With my-config.json containing stream paths and the twitter auth info:
+# {"stream": "http://svn.apache.org:2069/commits",
+# "username": "asfcommits",
+# "password": "MyLuggageComboIs1234"}
+#
+#
+#
+
+import threading
+import sys
+import os
+try:
+ import simplejson as json
+except ImportError:
+ import json
+
+from twisted.internet import defer, reactor, task, threads
+from twisted.python import failure, log
+from twisted.web.client import HTTPClientFactory, HTTPPageDownloader
+
+from urlparse import urlparse
+import time
+import posixpath
+
+sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "twitty-twister", "lib"))
+try:
+ import twitter
+except:
+ print "Get a copy of twitty-twister from <http://github.com/dustin/twitty-twister>"
+ sys.exit(-1)
+class Config(object):
+ def __init__(self, path):
+ self.path = path
+ self.mtime_path = 0
+ self.config = {}
+ self._load_config()
+
+ def _load_config(self):
+ mtime = os.path.getmtime(self.path)
+ if mtime != self.mtime_path:
+ fp = open(self.path, "rb")
+ self.mtime_path = mtime
+ self.config = json.loads(fp.read())
+
+class HTTPStream(HTTPClientFactory):
+ protocol = HTTPPageDownloader
+
+ def __init__(self, url):
+ HTTPClientFactory.__init__(self, url, method="GET", agent="SvnTweet/0.1.0")
+
+ def pageStart(self, partial):
+ pass
+
+ def pagePart(self, data):
+ pass
+
+ def pageEnd(self):
+ pass
+
+class Commit(object):
+ def __init__(self, commit):
+ self.__dict__.update(commit)
+
+class JSONRecordHandler:
+ def __init__(self, bdec):
+ self.bdec = bdec
+
+ def feed(self, record):
+ obj = json.loads(record)
+ if 'svnpubsub' in obj:
+ actual_version = obj['svnpubsub'].get('version')
+ EXPECTED_VERSION = 1
+ if actual_version != EXPECTED_VERSION:
+ raise ValueException("Unknown svnpubsub format: %r != %d"
+ % (actual_format, expected_format))
+ elif 'commit' in obj:
+ commit = Commit(obj['commit'])
+ if not hasattr(commit, 'type'):
+ raise ValueException("Commit object is missing type field.")
+ if not hasattr(commit, 'format'):
+ raise ValueException("Commit object is missing format field.")
+ if commit.type != 'svn' and commit.format != 1:
+ raise ValueException("Unexpected type and/or format: %s:%s"
+ % (commit.type, commit.format))
+ self.bdec.commit(commit)
+ elif 'stillalive' in obj:
+ self.bdec.stillalive()
+
+class JSONHTTPStream(HTTPStream):
+ def __init__(self, url, bdec):
+ HTTPStream.__init__(self, url)
+ self.bdec = bdec
+ self.ibuffer = []
+ self.parser = JSONRecordHandler(bdec)
+
+ def pageStart(self, partial):
+ self.bdec.pageStart()
+
+ def pagePart(self, data):
+ eor = data.find("\0")
+ if eor >= 0:
+ self.ibuffer.append(data[0:eor])
+ self.parser.feed(''.join(self.ibuffer))
+ self.ibuffer = [data[eor+1:]]
+ else:
+ self.ibuffer.append(data)
+
+def connectTo(url, bdec):
+ u = urlparse(url)
+ port = u.port
+ if not port:
+ port = 80
+ s = JSONHTTPStream(url, bdec)
+ conn = reactor.connectTCP(u.hostname, u.port, s)
+ return [s, conn]
+
+
+CHECKBEAT_TIME = 90
+
+class BigDoEverythingClasss(object):
+ def __init__(self, config):
+ self.c = config
+ self.c._load_config()
+ self.url = str(self.c.config.get('stream'))
+ self.failures = 0
+ self.alive = time.time()
+ self.checker = task.LoopingCall(self._checkalive)
+ self.transport = None
+ self.stream = None
+ self._restartStream()
+ self.watch = []
+ self.twit = twitter.Twitter(self.c.config.get('username'), self.c.config.get('password'))
+
+ def pageStart(self):
+ log.msg("Stream Connection Established")
+ self.failures = 0
+
+ def _restartStream(self):
+ (self.stream, self.transport) = connectTo(self.url, self)
+ self.stream.deferred.addBoth(self.streamDead)
+ self.alive = time.time()
+ self.checker.start(CHECKBEAT_TIME)
+
+ def _checkalive(self):
+ n = time.time()
+ if n - self.alive > CHECKBEAT_TIME:
+ log.msg("Stream is dead, reconnecting")
+ self.transport.disconnect()
+
+ def stillalive(self):
+ self.alive = time.time()
+
+ def streamDead(self, v):
+ BACKOFF_SECS = 5
+ BACKOFF_MAX = 60
+ self.checker.stop()
+
+ self.stream = None
+ self.failures += 1
+ backoff = min(self.failures * BACKOFF_SECS, BACKOFF_MAX)
+ log.msg("Stream disconnected, trying again in %d seconds.... %s" % (backoff, self.url))
+ reactor.callLater(backoff, self._restartStream)
+
+ def _normalize_path(self, path):
+ if path[0] != '/':
+ return "/" + path
+ return posixpath.abspath(path)
+
+ def tweet(self, msg):
+ log.msg("SEND TWEET: %s" % (msg))
+ self.twit.update(msg).addCallback(self.tweet_done).addErrback(log.msg)
+
+ def tweet_done(self, x):
+ log.msg("TWEET: Success!")
+
+ def build_tweet(self, commit):
+ maxlen = 144
+ left = maxlen
+ paths = map(self._normalize_path, commit.changed)
+ if not len(paths):
+ return None
+ path = posixpath.commonprefix(paths)
+ if path[0:1] == '/' and len(path) > 1:
+ path = path[1:]
+
+ #TODO: allow URL to be configurable.
+ link = " - http://svn.apache.org/r%d" % (commit.id)
+ left -= len(link)
+ msg = "r%d in %s by %s: " % (commit.id, path, commit.committer)
+ left -= len(msg)
+ if left > 3:
+ msg += commit.log[0:left]
+ msg += link
+ return msg
+
+ def commit(self, commit):
+ log.msg("COMMIT r%d (%d paths)" % (commit.id, len(commit.changed)))
+ msg = self.build_tweet(commit)
+ if msg:
+ self.tweet(msg)
+ #print "Common Prefix: %s" % (pre)
+
+def main(config_file):
+ c = Config(config_file)
+ big = BigDoEverythingClasss(c)
+ reactor.run()
+
+if __name__ == "__main__":
+ if len(sys.argv) != 2:
+ print "invalid args, read source code"
+ sys.exit(0)
+ log.startLogging(sys.stdout)
+ main(sys.argv[1])
diff --git a/tools/server-side/svnpubsub/svnwcsub.conf.example b/tools/server-side/svnpubsub/svnwcsub.conf.example
new file mode 100644
index 0000000..644a3b7
--- /dev/null
+++ b/tools/server-side/svnpubsub/svnwcsub.conf.example
@@ -0,0 +1,16 @@
+[DEFAULT]
+svnbin: /usr/local/bin/svn
+streams: http://svn.example.org:2069/commits/svn
+# hook: /usr/bin/true
+
+## The values below are used by ConfigParser's interpolation syntax.
+## See http://docs.python.org/library/configparser
+SOME_REPOS: svn://svn.example.org/repos/chaos
+
+[env]
+HOME: /home/svn
+LANG: en_US.UTF-8
+
+[track]
+/usr/local/foo/prod: %(SOME_REPOS)s/foo/production
+/usr/local/foo/dev: %(SOME_REPOS)s/foo/trunk
diff --git a/tools/server-side/svnpubsub/svnwcsub.py b/tools/server-side/svnpubsub/svnwcsub.py
new file mode 100755
index 0000000..366df7c
--- /dev/null
+++ b/tools/server-side/svnpubsub/svnwcsub.py
@@ -0,0 +1,546 @@
+#!/usr/bin/env python
+# encoding: UTF-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# SvnWcSub - Subscribe to a SvnPubSub stream, and keep a set of working copy
+# paths in sync
+#
+# Example:
+# svnwcsub.py svnwcsub.conf
+#
+# On startup svnwcsub checks the working copy's path, runs a single svn update
+# and then watches for changes to that path.
+#
+# See svnwcsub.conf for more information on its contents.
+#
+
+# TODO:
+# - bulk update at startup time to avoid backlog warnings
+# - fold BDEC into Daemon
+# - fold WorkingCopy._get_match() into __init__
+# - remove wc_ready(). assume all WorkingCopy instances are usable.
+# place the instances into .watch at creation. the .update_applies()
+# just returns if the wc is disabled (eg. could not find wc dir)
+# - figure out way to avoid the ASF-specific PRODUCTION_RE_FILTER
+# (a base path exclusion list should work for the ASF)
+# - add support for SIGHUP to reread the config and reinitialize working copies
+# - joes will write documentation for svnpubsub as these items become fulfilled
+# - make LOGLEVEL configurable
+
+import errno
+import subprocess
+import threading
+import sys
+import os
+import re
+import posixpath
+try:
+ import ConfigParser
+except ImportError:
+ import configparser as ConfigParser
+import time
+import logging.handlers
+try:
+ import Queue
+except ImportError:
+ import queue as Queue
+import optparse
+import functools
+try:
+ import urlparse
+except ImportError:
+ import urllib.parse as urlparse
+
+import daemonize
+import svnpubsub.client
+
+# check_output() is only available in Python 2.7. Allow us to run with
+# earlier versions
+try:
+ check_output = subprocess.check_output
+except AttributeError:
+ def check_output(args, env): # note: we only use these two args
+ pipe = subprocess.Popen(args, stdout=subprocess.PIPE, env=env)
+ output, _ = pipe.communicate()
+ if pipe.returncode:
+ raise subprocess.CalledProcessError(pipe.returncode, args)
+ return output
+
+assert hasattr(subprocess, 'check_call')
+def check_call(*args, **kwds):
+ """Wrapper around subprocess.check_call() that logs stderr upon failure."""
+ assert 'stderr' not in kwds
+ kwds.update(stderr=subprocess.PIPE)
+ pipe = subprocess.Popen(*args, **kwds)
+ output, errput = pipe.communicate()
+ if pipe.returncode:
+ cmd = args[0] if len(args) else kwds.get('args', '(no command)')
+ # TODO: log stdout too?
+ logging.error('Command failed: returncode=%d command=%r stderr=%r',
+ pipe.returncode, cmd, errput)
+ raise subprocess.CalledProcessError(pipe.returncode, args)
+ return pipe.returncode # is EXIT_OK
+
+### note: this runs synchronously. within the current Twisted environment,
+### it is called from ._get_match() which is run on a thread so it won't
+### block the Twisted main loop.
+def svn_info(svnbin, env, path):
+ "Run 'svn info' on the target path, returning a dict of info data."
+ args = [svnbin, "info", "--non-interactive", "--", path]
+ output = check_output(args, env=env).strip()
+ info = { }
+ for line in output.split('\n'):
+ idx = line.index(':')
+ info[line[:idx]] = line[idx+1:].strip()
+ return info
+
+try:
+ import glob
+ glob.iglob
+ def is_emptydir(path):
+ # ### If the directory contains only dotfile children, this will readdir()
+ # ### the entire directory. But os.readdir() is not exposed to us...
+ for x in glob.iglob('%s/*' % path):
+ return False
+ for x in glob.iglob('%s/.*' % path):
+ return False
+ return True
+except (ImportError, AttributeError):
+ # Python ≤2.4
+ def is_emptydir(path):
+ # This will read the entire directory list to memory.
+ return not os.listdir(path)
+
+class WorkingCopy(object):
+ def __init__(self, bdec, path, url):
+ self.path = path
+ self.url = url
+
+ try:
+ self.match, self.uuid = self._get_match(bdec.svnbin, bdec.env)
+ bdec.wc_ready(self)
+ except:
+ logging.exception('problem with working copy: %s', path)
+
+ def update_applies(self, uuid, path):
+ if self.uuid != uuid:
+ return False
+
+ path = str(path)
+ if path == self.match:
+ #print "ua: Simple match"
+ # easy case. woo.
+ return True
+ if len(path) < len(self.match):
+ # path is potentially a parent directory of match?
+ #print "ua: parent check"
+ if self.match[0:len(path)] == path:
+ return True
+ if len(path) > len(self.match):
+ # path is potentially a sub directory of match
+ #print "ua: sub dir check"
+ if path[0:len(self.match)] == self.match:
+ return True
+ return False
+
+ def _get_match(self, svnbin, env):
+ ### quick little hack to auto-checkout missing working copies
+ dotsvn = os.path.join(self.path, ".svn")
+ if not os.path.isdir(dotsvn) or is_emptydir(dotsvn):
+ logging.info("autopopulate %s from %s" % (self.path, self.url))
+ check_call([svnbin, 'co', '-q',
+ '--force',
+ '--non-interactive',
+ '--config-option',
+ 'config:miscellany:use-commit-times=on',
+ '--', self.url, self.path],
+ env=env)
+
+ # Fetch the info for matching dirs_changed against this WC
+ info = svn_info(svnbin, env, self.path)
+ root = info['Repository Root']
+ url = info['URL']
+ relpath = url[len(root):] # also has leading '/'
+ uuid = info['Repository UUID']
+ return str(relpath), uuid
+
+
+PRODUCTION_RE_FILTER = re.compile("/websites/production/[^/]+/")
+
+class BigDoEverythingClasss(object):
+ def __init__(self, config):
+ self.svnbin = config.get_value('svnbin')
+ self.env = config.get_env()
+ self.tracking = config.get_track()
+ self.hook = config.get_optional_value('hook')
+ self.streams = config.get_value('streams').split()
+ self.worker = BackgroundWorker(self.svnbin, self.env, self.hook)
+ self.watch = [ ]
+
+ def start(self):
+ for path, url in self.tracking.items():
+ # working copies auto-register with the BDEC when they are ready.
+ WorkingCopy(self, path, url)
+
+ def wc_ready(self, wc):
+ # called when a working copy object has its basic info/url,
+ # Add it to our watchers, and trigger an svn update.
+ logging.info("Watching WC at %s <-> %s" % (wc.path, wc.url))
+ self.watch.append(wc)
+ self.worker.add_work(OP_BOOT, wc)
+
+ def _normalize_path(self, path):
+ if path[0] != '/':
+ return "/" + path
+ return posixpath.abspath(path)
+
+ def commit(self, url, commit):
+ if commit.type != 'svn' or commit.format != 1:
+ logging.info("SKIP unknown commit format (%s.%d)",
+ commit.type, commit.format)
+ return
+ logging.info("COMMIT r%d (%d paths) from %s"
+ % (commit.id, len(commit.changed), url))
+
+ paths = map(self._normalize_path, commit.changed)
+ if len(paths):
+ pre = posixpath.commonprefix(paths)
+ if pre == "/websites/":
+ # special case for svnmucc "dynamic content" buildbot commits
+ # just take the first production path to avoid updating all cms working copies
+ for p in paths:
+ m = PRODUCTION_RE_FILTER.match(p)
+ if m:
+ pre = m.group(0)
+ break
+
+ #print "Common Prefix: %s" % (pre)
+ wcs = [wc for wc in self.watch if wc.update_applies(commit.repository, pre)]
+ logging.info("Updating %d WC for r%d" % (len(wcs), commit.id))
+ for wc in wcs:
+ self.worker.add_work(OP_UPDATE, wc)
+
+
+# Start logging warnings if the work backlog reaches this many items
+BACKLOG_TOO_HIGH = 20
+OP_BOOT = 'boot'
+OP_UPDATE = 'update'
+OP_CLEANUP = 'cleanup'
+
+class BackgroundWorker(threading.Thread):
+ def __init__(self, svnbin, env, hook):
+ threading.Thread.__init__(self)
+
+ # The main thread/process should not wait for this thread to exit.
+ ### compat with Python 2.5
+ self.setDaemon(True)
+
+ self.svnbin = svnbin
+ self.env = env
+ self.hook = hook
+ self.q = Queue.Queue()
+
+ self.has_started = False
+
+ def run(self):
+ while True:
+ # This will block until something arrives
+ operation, wc = self.q.get()
+
+ # Warn if the queue is too long.
+ # (Note: the other thread might have added entries to self.q
+ # after the .get() and before the .qsize().)
+ qsize = self.q.qsize()+1
+ if operation != OP_BOOT and qsize > BACKLOG_TOO_HIGH:
+ logging.warn('worker backlog is at %d', qsize)
+
+ try:
+ if operation == OP_UPDATE:
+ self._update(wc)
+ elif operation == OP_BOOT:
+ self._update(wc, boot=True)
+ elif operation == OP_CLEANUP:
+ self._cleanup(wc)
+ else:
+ logging.critical('unknown operation: %s', operation)
+ except:
+ logging.exception('exception in worker')
+
+ # In case we ever want to .join() against the work queue
+ self.q.task_done()
+
+ def add_work(self, operation, wc):
+ # Start the thread when work first arrives. Thread-start needs to
+ # be delayed in case the process forks itself to become a daemon.
+ if not self.has_started:
+ self.start()
+ self.has_started = True
+
+ self.q.put((operation, wc))
+
+ def _update(self, wc, boot=False):
+ "Update the specified working copy."
+
+ # For giggles, let's clean up the working copy in case something
+ # happened earlier.
+ self._cleanup(wc)
+
+ logging.info("updating: %s", wc.path)
+
+ ### we need to move some of these args into the config. these are
+ ### still specific to the ASF setup.
+ args = [self.svnbin, 'switch',
+ '--quiet',
+ '--non-interactive',
+ '--trust-server-cert',
+ '--ignore-externals',
+ '--config-option',
+ 'config:miscellany:use-commit-times=on',
+ '--',
+ wc.url,
+ wc.path]
+ check_call(args, env=self.env)
+
+ ### check the loglevel before running 'svn info'?
+ info = svn_info(self.svnbin, self.env, wc.path)
+ logging.info("updated: %s now at r%s", wc.path, info['Revision'])
+
+ ## Run the hook
+ if self.hook:
+ hook_mode = ['post-update', 'boot'][boot]
+ logging.info('running hook: %s at revision %s due to %s',
+ wc.path, info['Revision'], hook_mode)
+ args = [self.hook, hook_mode,
+ wc.path, info['Revision'], wc.url]
+ check_call(args, env=self.env)
+
+ def _cleanup(self, wc):
+ "Run a cleanup on the specified working copy."
+
+ ### we need to move some of these args into the config. these are
+ ### still specific to the ASF setup.
+ args = [self.svnbin, 'cleanup',
+ '--non-interactive',
+ '--trust-server-cert',
+ '--config-option',
+ 'config:miscellany:use-commit-times=on',
+ wc.path]
+ check_call(args, env=self.env)
+
+
+class ReloadableConfig(ConfigParser.SafeConfigParser):
+ def __init__(self, fname):
+ ConfigParser.SafeConfigParser.__init__(self)
+
+ self.fname = fname
+ self.read(fname)
+
+ ### install a signal handler to set SHOULD_RELOAD. BDEC should
+ ### poll this flag, and then adjust its internal structures after
+ ### the reload.
+ self.should_reload = False
+
+ def reload(self):
+ # Delete everything. Just re-reading would overlay, and would not
+ # remove sections/options. Note that [DEFAULT] will not be removed.
+ for section in self.sections():
+ self.remove_section(section)
+
+ # Now re-read the configuration file.
+ self.read(fname)
+
+ def get_value(self, which):
+ return self.get(ConfigParser.DEFAULTSECT, which)
+
+ def get_optional_value(self, which, default=None):
+ if self.has_option(ConfigParser.DEFAULTSECT, which):
+ return self.get(ConfigParser.DEFAULTSECT, which)
+ else:
+ return default
+
+ def get_env(self):
+ env = os.environ.copy()
+ default_options = self.defaults().keys()
+ for name, value in self.items('env'):
+ if name not in default_options:
+ env[name] = value
+ return env
+
+ def get_track(self):
+ "Return the {PATH: URL} dictionary of working copies to track."
+ track = dict(self.items('track'))
+ for name in self.defaults().keys():
+ del track[name]
+ return track
+
+ def optionxform(self, option):
+ # Do not lowercase the option name.
+ return str(option)
+
+
+class Daemon(daemonize.Daemon):
+ def __init__(self, logfile, pidfile, umask, bdec):
+ daemonize.Daemon.__init__(self, logfile, pidfile)
+
+ self.umask = umask
+ self.bdec = bdec
+
+ def setup(self):
+ # There is no setup which the parent needs to wait for.
+ pass
+
+ def run(self):
+ logging.info('svnwcsub started, pid=%d', os.getpid())
+
+ # Set the umask in the daemon process. Defaults to 000 for
+ # daemonized processes. Foreground processes simply inherit
+ # the value from the parent process.
+ if self.umask is not None:
+ umask = int(self.umask, 8)
+ os.umask(umask)
+ logging.info('umask set to %03o', umask)
+
+ # Start the BDEC (on the main thread), then start the client
+ self.bdec.start()
+
+ mc = svnpubsub.client.MultiClient(self.bdec.streams,
+ self.bdec.commit,
+ self._event)
+ mc.run_forever()
+
+ def _event(self, url, event_name, event_arg):
+ if event_name == 'error':
+ logging.exception('from %s', url)
+ elif event_name == 'ping':
+ logging.debug('ping from %s', url)
+ else:
+ logging.info('"%s" from %s', event_name, url)
+
+
+def prepare_logging(logfile):
+ "Log to the specified file, or to stdout if None."
+
+ if logfile:
+ # Rotate logs daily, keeping 7 days worth.
+ handler = logging.handlers.TimedRotatingFileHandler(
+ logfile, when='midnight', backupCount=7,
+ )
+ else:
+ handler = logging.StreamHandler(sys.stdout)
+
+ # Add a timestamp to the log records
+ formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s',
+ '%Y-%m-%d %H:%M:%S')
+ handler.setFormatter(formatter)
+
+ # Apply the handler to the root logger
+ root = logging.getLogger()
+ root.addHandler(handler)
+
+ ### use logging.INFO for now. switch to cmdline option or a config?
+ root.setLevel(logging.INFO)
+
+
+def handle_options(options):
+ # Set up the logging, then process the rest of the options.
+ prepare_logging(options.logfile)
+
+ # In daemon mode, we let the daemonize module handle the pidfile.
+ # Otherwise, we should write this (foreground) PID into the file.
+ if options.pidfile and not options.daemon:
+ pid = os.getpid()
+ # Be wary of symlink attacks
+ try:
+ os.remove(options.pidfile)
+ except OSError:
+ pass
+ fd = os.open(options.pidfile, os.O_WRONLY | os.O_CREAT | os.O_EXCL,
+ 0444)
+ os.write(fd, '%d\n' % pid)
+ os.close(fd)
+ logging.info('pid %d written to %s', pid, options.pidfile)
+
+ if options.gid:
+ try:
+ gid = int(options.gid)
+ except ValueError:
+ import grp
+ gid = grp.getgrnam(options.gid)[2]
+ logging.info('setting gid %d', gid)
+ os.setgid(gid)
+
+ if options.uid:
+ try:
+ uid = int(options.uid)
+ except ValueError:
+ import pwd
+ uid = pwd.getpwnam(options.uid)[2]
+ logging.info('setting uid %d', uid)
+ os.setuid(uid)
+
+
+def main(args):
+ parser = optparse.OptionParser(
+ description='An SvnPubSub client to keep working copies synchronized '
+ 'with a repository.',
+ usage='Usage: %prog [options] CONFIG_FILE',
+ )
+ parser.add_option('--logfile',
+ help='filename for logging')
+ parser.add_option('--pidfile',
+ help="the process' PID will be written to this file")
+ parser.add_option('--uid',
+ help='switch to this UID before running')
+ parser.add_option('--gid',
+ help='switch to this GID before running')
+ parser.add_option('--umask',
+ help='set this (octal) umask before running')
+ parser.add_option('--daemon', action='store_true',
+ help='run as a background daemon')
+
+ options, extra = parser.parse_args(args)
+
+ if len(extra) != 1:
+ parser.error('CONFIG_FILE is required')
+ config_file = extra[0]
+
+ if options.daemon and not options.logfile:
+ parser.error('LOGFILE is required when running as a daemon')
+ if options.daemon and not options.pidfile:
+ parser.error('PIDFILE is required when running as a daemon')
+
+ # Process any provided options.
+ handle_options(options)
+
+ c = ReloadableConfig(config_file)
+ bdec = BigDoEverythingClasss(c)
+
+ # We manage the logfile ourselves (along with possible rotation). The
+ # daemon process can just drop stdout/stderr into /dev/null.
+ d = Daemon('/dev/null', options.pidfile, options.umask, bdec)
+ if options.daemon:
+ # Daemonize the process and call sys.exit() with appropriate code
+ d.daemonize_exit()
+ else:
+ # Just run in the foreground (the default)
+ d.foreground()
+
+
+if __name__ == "__main__":
+ main(sys.argv[1:])
diff --git a/tools/server-side/svnpubsub/testserver.py b/tools/server-side/svnpubsub/testserver.py
new file mode 100755
index 0000000..8966a95
--- /dev/null
+++ b/tools/server-side/svnpubsub/testserver.py
@@ -0,0 +1,50 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# A simple test server for responding in different ways to SvnPubSub clients.
+# This avoids the complexity of the Twisted framework in order to direct
+# various (abnormal) conditions at the client.
+#
+# ### usage...
+#
+
+import sys
+import BaseHTTPServer
+
+
+PORT = 2069
+
+TEST_BODY = '{"svnpubsub": {"version": 1}}\n\0{"commit": {"type": "svn", "format": 1, "repository": "12345678-1234-1234-1234-123456789012", "id": "1234", "committer": "johndoe", "date": "2012-01-01 01:01:01 +0000 (Sun, 01 Jan 2012)", "log": "Frob the ganoozle with the snookish", "changed": {"one/path/alpha": {"flags": "U "}, "some/other/directory/": {"flags": "_U "}}}}\n\0'
+
+SEND_KEEPALIVE = True
+
+
+class TestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+ def do_GET(self):
+ self.send_response(200)
+ self.send_header('Content-Length', str(len(TEST_BODY)))
+ self.send_header('Connection', 'keep-alive')
+ self.end_headers()
+ self.wfile.write(TEST_BODY)
+
+
+if __name__ == '__main__':
+ server = BaseHTTPServer.HTTPServer(('', PORT), TestHandler)
+ sys.stderr.write('Now listening on port %d...\n' % (PORT,))
+ server.serve_forever()
diff --git a/tools/server-side/svnpubsub/watcher.py b/tools/server-side/svnpubsub/watcher.py
new file mode 100755
index 0000000..340b100
--- /dev/null
+++ b/tools/server-side/svnpubsub/watcher.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Watch for events from SvnPubSub and print them to stdout
+#
+#
+
+import sys
+import pprint
+try:
+ import urlparse
+except ImportError:
+ import urllib.parse as urlparse
+
+import svnpubsub.client
+
+
+def _commit(url, commit):
+ print('COMMIT: from %s' % url)
+ pprint.pprint(vars(commit), indent=2)
+
+
+def _event(url, event_name, event_arg):
+ if event_arg:
+ print('EVENT: from %s "%s" "%s"' % (url, event_name, event_arg))
+ else:
+ print('EVENT: from %s "%s"' % (url, event_name))
+
+
+def main(urls):
+ mc = svnpubsub.client.MultiClient(urls, _commit, _event)
+ mc.run_forever()
+
+
+if __name__ == "__main__":
+ if len(sys.argv) < 2:
+ print("usage: watcher.py URL [URL...]")
+ sys.exit(0)
+ main(sys.argv[1:])