path: root/tools/server-side/svnpubsub/
diff options
Diffstat (limited to 'tools/server-side/svnpubsub/')
1 files changed, 546 insertions, 0 deletions
diff --git a/tools/server-side/svnpubsub/ b/tools/server-side/svnpubsub/
new file mode 100755
index 0000000..366df7c
--- /dev/null
+++ b/tools/server-side/svnpubsub/
@@ -0,0 +1,546 @@
+#!/usr/bin/env python
+# encoding: UTF-8
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# SvnWcSub - Subscribe to a SvnPubSub stream, and keep a set of working copy
+# paths in sync
+# Example:
+# svnwcsub.conf
+# On startup svnwcsub checks the working copy's path, runs a single svn update
+# and then watches for changes to that path.
+# See svnwcsub.conf for more information on its contents.
+# TODO:
+# - bulk update at startup time to avoid backlog warnings
+# - fold BDEC into Daemon
+# - fold WorkingCopy._get_match() into __init__
+# - remove wc_ready(). assume all WorkingCopy instances are usable.
+# place the instances into .watch at creation. the .update_applies()
+# just returns if the wc is disabled (eg. could not find wc dir)
+# - figure out way to avoid the ASF-specific PRODUCTION_RE_FILTER
+# (a base path exclusion list should work for the ASF)
+# - add support for SIGHUP to reread the config and reinitialize working copies
+# - joes will write documentation for svnpubsub as these items become fulfilled
+# - make LOGLEVEL configurable
+import errno
+import subprocess
+import threading
+import sys
+import os
+import re
+import posixpath
+ import ConfigParser
+except ImportError:
+ import configparser as ConfigParser
+import time
+import logging.handlers
+ import Queue
+except ImportError:
+ import queue as Queue
+import optparse
+import functools
+ import urlparse
+except ImportError:
+ import urllib.parse as urlparse
+import daemonize
+import svnpubsub.client
+# check_output() is only available in Python 2.7. Allow us to run with
+# earlier versions
+ check_output = subprocess.check_output
+except AttributeError:
+ def check_output(args, env): # note: we only use these two args
+ pipe = subprocess.Popen(args, stdout=subprocess.PIPE, env=env)
+ output, _ = pipe.communicate()
+ if pipe.returncode:
+ raise subprocess.CalledProcessError(pipe.returncode, args)
+ return output
+assert hasattr(subprocess, 'check_call')
+def check_call(*args, **kwds):
+ """Wrapper around subprocess.check_call() that logs stderr upon failure."""
+ assert 'stderr' not in kwds
+ kwds.update(stderr=subprocess.PIPE)
+ pipe = subprocess.Popen(*args, **kwds)
+ output, errput = pipe.communicate()
+ if pipe.returncode:
+ cmd = args[0] if len(args) else kwds.get('args', '(no command)')
+ # TODO: log stdout too?
+ logging.error('Command failed: returncode=%d command=%r stderr=%r',
+ pipe.returncode, cmd, errput)
+ raise subprocess.CalledProcessError(pipe.returncode, args)
+ return pipe.returncode # is EXIT_OK
+### note: this runs synchronously. within the current Twisted environment,
+### it is called from ._get_match() which is run on a thread so it won't
+### block the Twisted main loop.
+def svn_info(svnbin, env, path):
+ "Run 'svn info' on the target path, returning a dict of info data."
+ args = [svnbin, "info", "--non-interactive", "--", path]
+ output = check_output(args, env=env).strip()
+ info = { }
+ for line in output.split('\n'):
+ idx = line.index(':')
+ info[line[:idx]] = line[idx+1:].strip()
+ return info
+ import glob
+ glob.iglob
+ def is_emptydir(path):
+ # ### If the directory contains only dotfile children, this will readdir()
+ # ### the entire directory. But os.readdir() is not exposed to us...
+ for x in glob.iglob('%s/*' % path):
+ return False
+ for x in glob.iglob('%s/.*' % path):
+ return False
+ return True
+except (ImportError, AttributeError):
+ # Python ≤2.4
+ def is_emptydir(path):
+ # This will read the entire directory list to memory.
+ return not os.listdir(path)
+class WorkingCopy(object):
+ def __init__(self, bdec, path, url):
+ self.path = path
+ self.url = url
+ try:
+ self.match, self.uuid = self._get_match(bdec.svnbin, bdec.env)
+ bdec.wc_ready(self)
+ except:
+ logging.exception('problem with working copy: %s', path)
+ def update_applies(self, uuid, path):
+ if self.uuid != uuid:
+ return False
+ path = str(path)
+ if path == self.match:
+ #print "ua: Simple match"
+ # easy case. woo.
+ return True
+ if len(path) < len(self.match):
+ # path is potentially a parent directory of match?
+ #print "ua: parent check"
+ if self.match[0:len(path)] == path:
+ return True
+ if len(path) > len(self.match):
+ # path is potentially a sub directory of match
+ #print "ua: sub dir check"
+ if path[0:len(self.match)] == self.match:
+ return True
+ return False
+ def _get_match(self, svnbin, env):
+ ### quick little hack to auto-checkout missing working copies
+ dotsvn = os.path.join(self.path, ".svn")
+ if not os.path.isdir(dotsvn) or is_emptydir(dotsvn):
+"autopopulate %s from %s" % (self.path, self.url))
+ check_call([svnbin, 'co', '-q',
+ '--force',
+ '--non-interactive',
+ '--config-option',
+ 'config:miscellany:use-commit-times=on',
+ '--', self.url, self.path],
+ env=env)
+ # Fetch the info for matching dirs_changed against this WC
+ info = svn_info(svnbin, env, self.path)
+ root = info['Repository Root']
+ url = info['URL']
+ relpath = url[len(root):] # also has leading '/'
+ uuid = info['Repository UUID']
+ return str(relpath), uuid
+PRODUCTION_RE_FILTER = re.compile("/websites/production/[^/]+/")
+class BigDoEverythingClasss(object):
+ def __init__(self, config):
+ self.svnbin = config.get_value('svnbin')
+ self.env = config.get_env()
+ self.tracking = config.get_track()
+ self.hook = config.get_optional_value('hook')
+ self.streams = config.get_value('streams').split()
+ self.worker = BackgroundWorker(self.svnbin, self.env, self.hook)
+ = [ ]
+ def start(self):
+ for path, url in self.tracking.items():
+ # working copies auto-register with the BDEC when they are ready.
+ WorkingCopy(self, path, url)
+ def wc_ready(self, wc):
+ # called when a working copy object has its basic info/url,
+ # Add it to our watchers, and trigger an svn update.
+"Watching WC at %s <-> %s" % (wc.path, wc.url))
+ self.worker.add_work(OP_BOOT, wc)
+ def _normalize_path(self, path):
+ if path[0] != '/':
+ return "/" + path
+ return posixpath.abspath(path)
+ def commit(self, url, commit):
+ if commit.type != 'svn' or commit.format != 1:
+"SKIP unknown commit format (%s.%d)",
+ commit.type, commit.format)
+ return
+"COMMIT r%d (%d paths) from %s"
+ % (, len(commit.changed), url))
+ paths = map(self._normalize_path, commit.changed)
+ if len(paths):
+ pre = posixpath.commonprefix(paths)
+ if pre == "/websites/":
+ # special case for svnmucc "dynamic content" buildbot commits
+ # just take the first production path to avoid updating all cms working copies
+ for p in paths:
+ if m:
+ pre =
+ break
+ #print "Common Prefix: %s" % (pre)
+ wcs = [wc for wc in if wc.update_applies(commit.repository, pre)]
+"Updating %d WC for r%d" % (len(wcs),
+ for wc in wcs:
+ self.worker.add_work(OP_UPDATE, wc)
+# Start logging warnings if the work backlog reaches this many items
+OP_BOOT = 'boot'
+OP_UPDATE = 'update'
+OP_CLEANUP = 'cleanup'
+class BackgroundWorker(threading.Thread):
+ def __init__(self, svnbin, env, hook):
+ threading.Thread.__init__(self)
+ # The main thread/process should not wait for this thread to exit.
+ ### compat with Python 2.5
+ self.setDaemon(True)
+ self.svnbin = svnbin
+ self.env = env
+ self.hook = hook
+ self.q = Queue.Queue()
+ self.has_started = False
+ def run(self):
+ while True:
+ # This will block until something arrives
+ operation, wc = self.q.get()
+ # Warn if the queue is too long.
+ # (Note: the other thread might have added entries to self.q
+ # after the .get() and before the .qsize().)
+ qsize = self.q.qsize()+1
+ if operation != OP_BOOT and qsize > BACKLOG_TOO_HIGH:
+ logging.warn('worker backlog is at %d', qsize)
+ try:
+ if operation == OP_UPDATE:
+ self._update(wc)
+ elif operation == OP_BOOT:
+ self._update(wc, boot=True)
+ elif operation == OP_CLEANUP:
+ self._cleanup(wc)
+ else:
+ logging.critical('unknown operation: %s', operation)
+ except:
+ logging.exception('exception in worker')
+ # In case we ever want to .join() against the work queue
+ self.q.task_done()
+ def add_work(self, operation, wc):
+ # Start the thread when work first arrives. Thread-start needs to
+ # be delayed in case the process forks itself to become a daemon.
+ if not self.has_started:
+ self.start()
+ self.has_started = True
+ self.q.put((operation, wc))
+ def _update(self, wc, boot=False):
+ "Update the specified working copy."
+ # For giggles, let's clean up the working copy in case something
+ # happened earlier.
+ self._cleanup(wc)
+"updating: %s", wc.path)
+ ### we need to move some of these args into the config. these are
+ ### still specific to the ASF setup.
+ args = [self.svnbin, 'switch',
+ '--quiet',
+ '--non-interactive',
+ '--trust-server-cert',
+ '--ignore-externals',
+ '--config-option',
+ 'config:miscellany:use-commit-times=on',
+ '--',
+ wc.url,
+ wc.path]
+ check_call(args, env=self.env)
+ ### check the loglevel before running 'svn info'?
+ info = svn_info(self.svnbin, self.env, wc.path)
+"updated: %s now at r%s", wc.path, info['Revision'])
+ ## Run the hook
+ if self.hook:
+ hook_mode = ['post-update', 'boot'][boot]
+'running hook: %s at revision %s due to %s',
+ wc.path, info['Revision'], hook_mode)
+ args = [self.hook, hook_mode,
+ wc.path, info['Revision'], wc.url]
+ check_call(args, env=self.env)
+ def _cleanup(self, wc):
+ "Run a cleanup on the specified working copy."
+ ### we need to move some of these args into the config. these are
+ ### still specific to the ASF setup.
+ args = [self.svnbin, 'cleanup',
+ '--non-interactive',
+ '--trust-server-cert',
+ '--config-option',
+ 'config:miscellany:use-commit-times=on',
+ wc.path]
+ check_call(args, env=self.env)
+class ReloadableConfig(ConfigParser.SafeConfigParser):
+ def __init__(self, fname):
+ ConfigParser.SafeConfigParser.__init__(self)
+ self.fname = fname
+ ### install a signal handler to set SHOULD_RELOAD. BDEC should
+ ### poll this flag, and then adjust its internal structures after
+ ### the reload.
+ self.should_reload = False
+ def reload(self):
+ # Delete everything. Just re-reading would overlay, and would not
+ # remove sections/options. Note that [DEFAULT] will not be removed.
+ for section in self.sections():
+ self.remove_section(section)
+ # Now re-read the configuration file.
+ def get_value(self, which):
+ return self.get(ConfigParser.DEFAULTSECT, which)
+ def get_optional_value(self, which, default=None):
+ if self.has_option(ConfigParser.DEFAULTSECT, which):
+ return self.get(ConfigParser.DEFAULTSECT, which)
+ else:
+ return default
+ def get_env(self):
+ env = os.environ.copy()
+ default_options = self.defaults().keys()
+ for name, value in self.items('env'):
+ if name not in default_options:
+ env[name] = value
+ return env
+ def get_track(self):
+ "Return the {PATH: URL} dictionary of working copies to track."
+ track = dict(self.items('track'))
+ for name in self.defaults().keys():
+ del track[name]
+ return track
+ def optionxform(self, option):
+ # Do not lowercase the option name.
+ return str(option)
+class Daemon(daemonize.Daemon):
+ def __init__(self, logfile, pidfile, umask, bdec):
+ daemonize.Daemon.__init__(self, logfile, pidfile)
+ self.umask = umask
+ self.bdec = bdec
+ def setup(self):
+ # There is no setup which the parent needs to wait for.
+ pass
+ def run(self):
+'svnwcsub started, pid=%d', os.getpid())
+ # Set the umask in the daemon process. Defaults to 000 for
+ # daemonized processes. Foreground processes simply inherit
+ # the value from the parent process.
+ if self.umask is not None:
+ umask = int(self.umask, 8)
+ os.umask(umask)
+'umask set to %03o', umask)
+ # Start the BDEC (on the main thread), then start the client
+ self.bdec.start()
+ mc = svnpubsub.client.MultiClient(self.bdec.streams,
+ self.bdec.commit,
+ self._event)
+ mc.run_forever()
+ def _event(self, url, event_name, event_arg):
+ if event_name == 'error':
+ logging.exception('from %s', url)
+ elif event_name == 'ping':
+ logging.debug('ping from %s', url)
+ else:
+'"%s" from %s', event_name, url)
+def prepare_logging(logfile):
+ "Log to the specified file, or to stdout if None."
+ if logfile:
+ # Rotate logs daily, keeping 7 days worth.
+ handler = logging.handlers.TimedRotatingFileHandler(
+ logfile, when='midnight', backupCount=7,
+ )
+ else:
+ handler = logging.StreamHandler(sys.stdout)
+ # Add a timestamp to the log records
+ formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s',
+ '%Y-%m-%d %H:%M:%S')
+ handler.setFormatter(formatter)
+ # Apply the handler to the root logger
+ root = logging.getLogger()
+ root.addHandler(handler)
+ ### use logging.INFO for now. switch to cmdline option or a config?
+ root.setLevel(logging.INFO)
+def handle_options(options):
+ # Set up the logging, then process the rest of the options.
+ prepare_logging(options.logfile)
+ # In daemon mode, we let the daemonize module handle the pidfile.
+ # Otherwise, we should write this (foreground) PID into the file.
+ if options.pidfile and not options.daemon:
+ pid = os.getpid()
+ # Be wary of symlink attacks
+ try:
+ os.remove(options.pidfile)
+ except OSError:
+ pass
+ fd =, os.O_WRONLY | os.O_CREAT | os.O_EXCL,
+ 0444)
+ os.write(fd, '%d\n' % pid)
+ os.close(fd)
+'pid %d written to %s', pid, options.pidfile)
+ if options.gid:
+ try:
+ gid = int(options.gid)
+ except ValueError:
+ import grp
+ gid = grp.getgrnam(options.gid)[2]
+'setting gid %d', gid)
+ os.setgid(gid)
+ if options.uid:
+ try:
+ uid = int(options.uid)
+ except ValueError:
+ import pwd
+ uid = pwd.getpwnam(options.uid)[2]
+'setting uid %d', uid)
+ os.setuid(uid)
+def main(args):
+ parser = optparse.OptionParser(
+ description='An SvnPubSub client to keep working copies synchronized '
+ 'with a repository.',
+ usage='Usage: %prog [options] CONFIG_FILE',
+ )
+ parser.add_option('--logfile',
+ help='filename for logging')
+ parser.add_option('--pidfile',
+ help="the process' PID will be written to this file")
+ parser.add_option('--uid',
+ help='switch to this UID before running')
+ parser.add_option('--gid',
+ help='switch to this GID before running')
+ parser.add_option('--umask',
+ help='set this (octal) umask before running')
+ parser.add_option('--daemon', action='store_true',
+ help='run as a background daemon')
+ options, extra = parser.parse_args(args)
+ if len(extra) != 1:
+ parser.error('CONFIG_FILE is required')
+ config_file = extra[0]
+ if options.daemon and not options.logfile:
+ parser.error('LOGFILE is required when running as a daemon')
+ if options.daemon and not options.pidfile:
+ parser.error('PIDFILE is required when running as a daemon')
+ # Process any provided options.
+ handle_options(options)
+ c = ReloadableConfig(config_file)
+ bdec = BigDoEverythingClasss(c)
+ # We manage the logfile ourselves (along with possible rotation). The
+ # daemon process can just drop stdout/stderr into /dev/null.
+ d = Daemon('/dev/null', options.pidfile, options.umask, bdec)
+ if options.daemon:
+ # Daemonize the process and call sys.exit() with appropriate code
+ d.daemonize_exit()
+ else:
+ # Just run in the foreground (the default)
+ d.foreground()
+if __name__ == "__main__":
+ main(sys.argv[1:])