summaryrefslogtreecommitdiff
path: root/tools/server-side/svnpubsub/svnpubsub/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/server-side/svnpubsub/svnpubsub/client.py')
-rw-r--r--tools/server-side/svnpubsub/svnpubsub/client.py230
1 files changed, 230 insertions, 0 deletions
diff --git a/tools/server-side/svnpubsub/svnpubsub/client.py b/tools/server-side/svnpubsub/svnpubsub/client.py
new file mode 100644
index 0000000..c1631d6
--- /dev/null
+++ b/tools/server-side/svnpubsub/svnpubsub/client.py
@@ -0,0 +1,230 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Generic client for SvnPubSub
+#
+# ### usage...
+#
+#
+# EVENTS
+#
+# connected: a connection to the server has been opened (though not
+# necessarily established)
+# closed: the connection was closed. reconnect will be attempted.
+# error: an error closed the connection. reconnect will be attempted.
+# ping: the server has sent a keepalive
+# stale: no activity has been seen, so the connection will be closed
+# and reopened
+#
+
+import asyncore
+import asynchat
+import socket
+import functools
+import time
+import json
+try:
+ import urlparse
+except ImportError:
+ import urllib.parse as urlparse
+
+# How long the polling loop should wait for activity before returning.
+TIMEOUT = 30.0
+
+# Always delay a bit when trying to reconnect. This is not precise, but sets
+# a minimum amount of delay. At the moment, there is no further backoff.
+RECONNECT_DELAY = 25.0
+
+# If we don't see anything from the server for this amount time, then we
+# will drop and reconnect. The TCP connection may have gone down without
+# us noticing it somehow.
+STALE_DELAY = 60.0
+
+
+class SvnpubsubClientException(Exception):
+ pass
+
+class Client(asynchat.async_chat):
+
+ def __init__(self, url, commit_callback, event_callback):
+ asynchat.async_chat.__init__(self)
+
+ self.last_activity = time.time()
+ self.ibuffer = []
+
+ self.url = url
+ parsed_url = urlparse.urlsplit(url)
+ if parsed_url.scheme != 'http':
+ raise ValueError("URL scheme must be http: '%s'" % url)
+ host = parsed_url.hostname
+ port = parsed_url.port
+ resource = parsed_url.path
+ if parsed_url.query:
+ resource += "?%s" % parsed_url.query
+ if parsed_url.fragment:
+ resource += "#%s" % parsed_url.fragment
+
+ self.event_callback = event_callback
+
+ self.parser = JSONRecordHandler(commit_callback, event_callback)
+
+ # Wait for the end of headers. Then we start parsing JSON.
+ self.set_terminator(b'\r\n\r\n')
+ self.skipping_headers = True
+
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ self.connect((host, port))
+ except:
+ self.handle_error()
+ return
+
+ self.push(('GET %s HTTP/1.0\r\n\r\n' % resource).encode('ascii'))
+
+ def handle_connect(self):
+ self.event_callback('connected', None)
+
+ def handle_close(self):
+ self.event_callback('closed', None)
+ self.close()
+
+ def handle_error(self):
+ self.event_callback('error', None)
+ self.close()
+
+ def found_terminator(self):
+ if self.skipping_headers:
+ self.skipping_headers = False
+ # Each JSON record is terminated by a null character
+ self.set_terminator(b'\0')
+ else:
+ record = b"".join(self.ibuffer)
+ self.ibuffer = []
+ self.parser.feed(record.decode())
+
+ def collect_incoming_data(self, data):
+ # Remember the last time we saw activity
+ self.last_activity = time.time()
+
+ if not self.skipping_headers:
+ self.ibuffer.append(data)
+
+
+class JSONRecordHandler:
+ def __init__(self, commit_callback, event_callback):
+ self.commit_callback = commit_callback
+ self.event_callback = event_callback
+
+ def feed(self, record):
+ obj = json.loads(record)
+ if 'svnpubsub' in obj:
+ actual_version = obj['svnpubsub'].get('version')
+ EXPECTED_VERSION = 1
+ if actual_version != EXPECTED_VERSION:
+ raise SvnpubsubClientException("Unknown svnpubsub format: %r != %d"
+ % (actual_format, expected_format))
+ self.event_callback('version', obj['svnpubsub']['version'])
+ elif 'commit' in obj:
+ commit = Commit(obj['commit'])
+ self.commit_callback(commit)
+ elif 'stillalive' in obj:
+ self.event_callback('ping', obj['stillalive'])
+
+
+class Commit(object):
+ def __init__(self, commit):
+ self.__dict__.update(commit)
+
+
+class MultiClient(object):
+ def __init__(self, urls, commit_callback, event_callback):
+ self.commit_callback = commit_callback
+ self.event_callback = event_callback
+
+ # No target time, as no work to do
+ self.target_time = 0
+ self.work_items = [ ]
+
+ for url in urls:
+ self._add_channel(url)
+
+ def _reconnect(self, url, event_name, event_arg):
+ if event_name == 'closed' or event_name == 'error':
+ # Stupid connection closed for some reason. Set up a reconnect. Note
+ # that it should have been removed from asyncore.socket_map already.
+ self._reconnect_later(url)
+
+ # Call the user's callback now.
+ self.event_callback(url, event_name, event_arg)
+
+ def _reconnect_later(self, url):
+ # Set up a work item to reconnect in a little while.
+ self.work_items.append(url)
+
+ # Only set a target if one has not been set yet. Otherwise, we could
+ # create a race condition of continually moving out towards the future
+ if not self.target_time:
+ self.target_time = time.time() + RECONNECT_DELAY
+
+ def _add_channel(self, url):
+ # Simply instantiating the client will install it into the global map
+ # for processing in the main event loop.
+ Client(url,
+ functools.partial(self.commit_callback, url),
+ functools.partial(self._reconnect, url))
+
+ def _check_stale(self):
+ now = time.time()
+ for client in asyncore.socket_map.values():
+ if client.last_activity + STALE_DELAY < now:
+ # Whoops. No activity in a while. Signal this fact, Close the
+ # Client, then have it reconnected later on.
+ self.event_callback(client.url, 'stale', client.last_activity)
+
+ # This should remove it from .socket_map.
+ client.close()
+
+ self._reconnect_later(client.url)
+
+ def _maybe_work(self):
+ # If we haven't reach the targetted time, or have no work to do,
+ # then fast-path exit
+ if time.time() < self.target_time or not self.work_items:
+ return
+
+ # We'll take care of all the work items, so no target for future work
+ self.target_time = 0
+
+ # Play a little dance just in case work gets added while we're
+ # currently working on stuff
+ work = self.work_items
+ self.work_items = [ ]
+
+ for url in work:
+ self._add_channel(url)
+
+ def run_forever(self):
+ while True:
+ if asyncore.socket_map:
+ asyncore.loop(timeout=TIMEOUT, count=1)
+ else:
+ time.sleep(TIMEOUT)
+
+ self._check_stale()
+ self._maybe_work()