diff options
author | Alan Conway <aconway@apache.org> | 2011-11-08 14:41:43 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-11-08 14:41:43 +0000 |
commit | 0b1f6cbb66c19a7bd620e24d1a7e5dff284518cf (patch) | |
tree | 271c3802bf9422ea756bff96d236f6ce81656c64 | |
parent | 1c90ca7456e04263635ef3d626808bf8a1118ad8 (diff) | |
download | qpid-python-0b1f6cbb66c19a7bd620e24d1a7e5dff284518cf.tar.gz |
NO-JIRA: Improvements to benchmark scripts.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1199265 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-x | qpid/cpp/src/tests/qpid-cluster-benchmark | 56 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/qpid-cpp-benchmark | 120 |
2 files changed, 113 insertions, 63 deletions
diff --git a/qpid/cpp/src/tests/qpid-cluster-benchmark b/qpid/cpp/src/tests/qpid-cluster-benchmark index 662f604919..4d6eaba223 100755 --- a/qpid/cpp/src/tests/qpid-cluster-benchmark +++ b/qpid/cpp/src/tests/qpid-cluster-benchmark @@ -1,4 +1,4 @@ -#!/bin/bash +#!/bin/sh # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -20,39 +20,35 @@ # Benchmark script for comparing cluster performance. -# Default values -PORT="5672" -COUNT=10000 -FLOW=100 # Flow control limit on queue depth for latency. -REPEAT=10 -QUEUES=4 -CLIENTS=3 - -while getopts "p:c:f:r:t:b:q:c" opt; do +# Default options +MESSAGES="-m 10000" +FLOW="--flow-control 100" # Flow control limit on queue depth for latency. +REPEAT="--repeat 10" +QUEUES="-q 6" +SENDERS="-s 3" +RECEIVERS="-r 3" +BROKERS= # Local broker +CLIENT_HOSTS= # No ssh, all clients are local + +while getopts "m:f:n:b:q:s:r:c:x:t" opt; do case $opt in - p) PORT=$OPTARG;; - c) COUNT=$OPTARG;; - f) FLOW=$OPTARG;; - r) REPEAT=$OPTARG;; - s) SCALE=$OPTARG;; - b) BROKERS=$OPTARG;; - q) QUEUES=$OPTARG;; - c) CLIENTS=$OPTARG;; + m) MESSAGES="-m $OPTARG";; + f) FLOW="--flow-control $OPTARG";; + n) REPEAT="--repeat $OPTARG";; + b) BROKERS="-b $OPTARG";; + q) QUEUES="-q $OPTARG";; + s) SENDERS="-s $OPTARG";; + r) RECEIVERS="-r $OPTARG";; + c) CLIENT_HOSTS="-c $OPTARG";; + x) SAVE_RECEIVED="--save-received";; + t) TCP_NODELAY="--connection-options {tcp-nodelay:true}";; *) echo "Unknown option"; exit 1;; esac done - -BROKERS=${BROKERS:-$(echo $HOSTS | sed "s/\>/:$PORT/g;s/ /,/g")} # Broker URL list -BROKER=`echo $BROKERS | awk -F, '{print $1}'` # First broker - +BROKER=$(echo $BROKERS | sed s/,.*//) run_test() { echo $*; shift; "$@"; echo; echo; echo; } -# Multiple pubs/subs connect via multiple brokers (active-active) -run_test "multi-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKERS --no-timestamp --summarize -q$QUEUES -s$CLIENTS -r$CLIENTS -m $COUNT - -# Multiple pubs/subs connect via single broker (active-passive) -run_test "single-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --no-timestamp --summarize -q$QUEUES -s$CLIENTS -r$CLIENTS -m $COUNT - -# Latency -run_test "latency" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --connection-options '{tcp-nodelay:true}' -m $COUNT --flow-control $FLOW +OPTS="$REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS $SAVE_RECEIVED $TCP_NODELAY" +run_test "Queue contention:" qpid-cpp-benchmark $OPTS +run_test "No queue contention: :" qpid-cpp-benchmark $OPTS --group-receivers diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark index 300d34774f..9a0ee6b384 100755 --- a/qpid/cpp/src/tests/qpid-cpp-benchmark +++ b/qpid/cpp/src/tests/qpid-cpp-benchmark @@ -67,14 +67,19 @@ op.add_option("--flow-control", default=0, type="int", metavar="N", help="Flow control each sender to limit queue depth to 2*N. 0 means no flow control.") op.add_option("--durable", default=False, action="store_true", help="Use durable queues and messages") - +op.add_option("--save-received", default=False, action="store_true", + help="Save received message content to files <queuename>-receiver-<n>.msg") +op.add_option("--group-receivers", default=False, action="store_true", + help="Run receivers for the same queue on the same host.") +op.add_option("--verbose", default=False, action="store_true", + help="Show commands executed") single_quote_re = re.compile("'") def posix_quote(string): """ Quote a string for use as an argument in a posix shell""" return "'" + single_quote_re.sub("\\'", string) + "'"; def ssh_command(host, command): - """Convert command into an ssh command on host with quoting""" + """ Convert command into an ssh command on host with quoting""" return ["ssh", host] + [posix_quote(arg) for arg in command] class Clients: @@ -89,10 +94,16 @@ class Clients: try: c.kill() except: pass +class PopenCommand(Popen): + """Like Popen but you can query for the command""" + def __init__(self, command, *args, **kwargs): + self.command = command + Popen.__init__(self, command, *args, **kwargs) + clients = Clients() def start_receive(queue, index, opts, ready_queue, broker, host): - address_opts=["create:receiver"] + opts.receive_option + address_opts=opts.receive_option if opts.durable: address_opts += ["node:{durable:true}"] address="%s;{%s}"%(queue,",".join(address_opts)) msg_total=opts.senders*opts.messages @@ -108,17 +119,20 @@ def start_receive(queue, index, opts, ready_queue, broker, host): "--receive-rate", str(opts.receive_rate), "--report-total", "--ack-frequency", str(opts.ack_frequency), - "--ready-address", ready_queue, + "--ready-address", "%s;{create:always}"%ready_queue, "--report-header=no" ] + if opts.save_received: + command += ["--save-content=%s-receiver-%s.msg"%(queue,index)] command += opts.receive_arg if opts.connection_options: command += ["--connection-options",opts.connection_options] if host: command = ssh_command(host, command) - return clients.add(Popen(command, stdout=PIPE)) + if opts.verbose: print "Receiver: ", command + return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE)) def start_send(queue, opts, broker, host): - address="%s;{%s}"%(queue,",".join(opts.send_option)) + address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"])) command = ["qpid-send", "-b", broker, "-a", address, @@ -136,33 +150,52 @@ def start_send(queue, opts, broker, host): if opts.connection_options: command += ["--connection-options",opts.connection_options] if host: command = ssh_command(host, command) - return clients.add(Popen(command, stdout=PIPE)) + if opts.verbose: print "Sender: ", command + return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE)) + +def error_msg(out, err): + return ("\n[stdout]\n%s\n[stderr]\n%s[end]"%(out, err)) def first_line(p): out,err=p.communicate() - if p.returncode != 0: raise Exception("Process failed: %s"%(out.strip())) + if p.returncode != 0: + raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err))) return out.split("\n")[0] -def delete_queues(queues, broker): +def queue_exists(queue,broker): c = qpid.messaging.Connection(broker) c.open() - for q in queues: + try: + s = c.session() try: - s = c.session() - snd = s.sender("%s;{delete:always}"%(q)) - snd.close() - s.sync() - except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue" + s.sender(queue) + return True + except qpid.messaging.exceptions.NotFound: + return False + finally: c.close() + +def recreate_queues(queues, brokers): + c = qpid.messaging.Connection(brokers[0]) + c.open() + s = c.session() + for q in queues: + try: s.sender("%s;{delete:always}"%(q)).close() + except qpid.messaging.exceptions.NotFound: pass + # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate + for b in brokers: + while queue_exists(q,b): time.sleep(0.1); + s.sender("%s;{create:always}"%q) + # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate + for b in brokers: + while not queue_exists(q,b): time.sleep(0.1); c.close() def print_header(timestamp): - if timestamp: latency_header="\tl-min\tl-max\tl-avg" + if timestamp: latency_header="\tl-min\tl-max\tl-avg\ttotal-tp" else: latency_header="" - print "send-tp\t\trecv-tp%s"%latency_header + print "send-tp\trecv-tp%s"%latency_header def parse(parser, lines): # Parse sender/receiver output - for l in lines: - fn_val = zip(parser, l) return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines] def parse_senders(senders): @@ -171,32 +204,35 @@ def parse_senders(senders): def parse_receivers(receivers): return parse([int,float,float,float],[first_line(p) for p in receivers if p]) -def print_data(send_stats, recv_stats): +def print_data(send_stats, recv_stats, total_tp): for send,recv in map(None, send_stats, recv_stats): line="" if send: line += "%d"%send[0] if recv: - line += "\t\t%d"%recv[0] + line += "\t%d"%recv[0] if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]) + if total_tp is not None: + line += "\t%d"%total_tp + total_tp = None print line -def print_summary(send_stats, recv_stats): +def print_summary(send_stats, recv_stats, total_tp): def avg(s): sum(s) / len(s) send_tp = sum([l[0] for l in send_stats]) recv_tp = sum([l[0] for l in recv_stats]) - summary = "%d\t\t%d"%(send_tp, recv_tp) + summary = "%d\t%d"%(send_tp, recv_tp) if recv_stats and len(recv_stats[0]) == 4: l_min = sum(l[1] for l in recv_stats)/len(recv_stats) l_max = sum(l[2] for l in recv_stats)/len(recv_stats) l_avg = sum(l[3] for l in recv_stats)/len(recv_stats) summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg) + summary += "\t%d"%total_tp print summary class ReadyReceiver: """A receiver for ready messages""" def __init__(self, queue, broker): - delete_queues([queue], broker) self.connection = qpid.messaging.Connection(broker) self.connection.open() self.receiver = self.connection.session().receiver( @@ -212,7 +248,8 @@ class ReadyReceiver: for r in receivers: if (r.poll() is not None): out,err=r.communicate() - raise Exception("Receiver error: %s"%(out)) + raise Exception("Receiver error: %s\n%s" % + (" ".join(r.command), error_msg(out,err))) raise Exception("Timed out waiting for receivers to be ready") def flatten(l): @@ -231,9 +268,12 @@ class RoundRobin: def main(): opts, args = op.parse_args() - if not opts.broker: opts.broker = ["127.0.0.1"] # Deafult to local broker - opts.broker = flatten(opts.broker) opts.client_host = flatten(opts.client_host) + if not opts.broker: + if opts.client_host: + raise Exception("--broker must be specified if --client_host is.") + opts.broker = ["127.0.0.1"] # Deafult to local broker + opts.broker = flatten(opts.broker) brokers = RoundRobin(opts.broker) client_hosts = RoundRobin(opts.client_host) send_out = "" @@ -242,19 +282,33 @@ def main(): queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] try: for i in xrange(opts.repeat): - delete_queues(queues, opts.broker[0]) + recreate_queues(queues, opts.broker) ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) - receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) - for q in queues for j in xrange(opts.receivers)] + + if opts.group_receivers: # Run receivers for same queue against same broker. + receivers = [] + for q in queues: + b = brokers.next() + for j in xrange(opts.receivers): + receivers.append( + start_receive(q, j, opts, ready_queue, b, client_hosts.next())) + else: # Don't group receivers + receivers = [start_receive(q, j, opts, ready_queue, + brokers.next(), client_hosts.next()) + for q in queues for j in xrange(opts.receivers)] + ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready. + start = time.time() senders = [start_send(q, opts,brokers.next(), client_hosts.next()) for q in queues for j in xrange(opts.senders)] if opts.report_header and i == 0: print_header(opts.timestamp) + for p in senders + receivers: p.wait() + total_sent = opts.queues * opts.senders * opts.messages + total_tp = total_sent / (time.time()-start) send_stats=parse_senders(senders) recv_stats=parse_receivers(receivers) - if opts.summarize: print_summary(send_stats, recv_stats) - else: print_data(send_stats, recv_stats) - delete_queues(queues, opts.broker[0]) + if opts.summarize: print_summary(send_stats, recv_stats, total_tp) + else: print_data(send_stats, recv_stats, total_tp) finally: clients.kill() # No strays if __name__ == "__main__": main() |