diff options
Diffstat (limited to 'qpid/cpp/src/tests/qpid-cluster-lag.py')
-rwxr-xr-x | qpid/cpp/src/tests/qpid-cluster-lag.py | 93 |
1 files changed, 93 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/qpid-cluster-lag.py b/qpid/cpp/src/tests/qpid-cluster-lag.py new file mode 100755 index 0000000000..5b24353241 --- /dev/null +++ b/qpid/cpp/src/tests/qpid-cluster-lag.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +"""%prog [options] broker... +Check for brokers that lag behind other brokers in a cluster.""" + +import os, os.path, sys, socket, time, re +from qpid.messaging import * +from optparse import OptionParser +from threading import Thread + +class Browser(Thread): + def __init__(self, broker, queue, timeout): + Thread.__init__(self) + self.broker = broker + self.queue = queue + self.timeout = timeout + self.error = None + self.time = None + + def run(self): + try: + self.connection = Connection(self.broker) + self.connection.open() + self.session = self.connection.session() + self.receiver = self.session.receiver("%s;{mode:browse}"%self.queue) + self.msg = self.receiver.fetch(timeout=self.timeout) + self.time = time.time() + if (self.msg.content != self.queue): + raise Exception("Wrong message content, expected '%s' found '%s'"% + (self.queue, self.msg.content)) + except Empty: + self.error = "No message on queue %s"%self.queue + except Exception, e: + self.error = "Error: %s"%e + +def main(argv): + op = OptionParser(usage=__doc__) + op.add_option("--timeout", type="float", default=None, metavar="TIMEOUT", + help="Give up after TIMEOUT milliseconds, default never timeout") + (opts, args) = op.parse_args(argv) + if (len(args) <= 1): op.error("No brokers were specified") + brokers = args[1:] + + # Put a message on a uniquely named queue. + queue = "%s:%s:%s"%(os.path.basename(args[0]), socket.gethostname(), os.getpid()) + connection = Connection(brokers[0]) + connection.open() + session = connection.session() + sender = session.sender( + "%s;{create:always,delete:always,node:{durable:False}}"%queue) + sender.send(Message(content=queue)) + start = time.time() + # Browse for the message on each broker + if opts.timeout: opts.timeout + threads = [Browser(b, queue, opts.timeout) for b in brokers] + for t in threads: t.start() + delays=[] + + for t in threads: + t.join() + if t.error: + delay=t.error + else: + delay = t.time-start + delays.append([delay, t.broker]) + print "%s: %s"%(t.broker,delay) + if delays: + delays.sort() + print "lag: %s (%s-%s)"%(delays[-1][0] - delays[0][0], delays[-1][1], delays[0][1]) + # Clean up + sender.close() + session.close() + connection.close() + +if __name__ == "__main__": sys.exit(main(sys.argv)) |