summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-11-08 14:41:43 +0000
committerAlan Conway <aconway@apache.org>2011-11-08 14:41:43 +0000
commit0b1f6cbb66c19a7bd620e24d1a7e5dff284518cf (patch)
tree271c3802bf9422ea756bff96d236f6ce81656c64
parent1c90ca7456e04263635ef3d626808bf8a1118ad8 (diff)
downloadqpid-python-0b1f6cbb66c19a7bd620e24d1a7e5dff284518cf.tar.gz
NO-JIRA: Improvements to benchmark scripts.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1199265 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/cpp/src/tests/qpid-cluster-benchmark56
-rwxr-xr-xqpid/cpp/src/tests/qpid-cpp-benchmark120
2 files changed, 113 insertions, 63 deletions
diff --git a/qpid/cpp/src/tests/qpid-cluster-benchmark b/qpid/cpp/src/tests/qpid-cluster-benchmark
index 662f604919..4d6eaba223 100755
--- a/qpid/cpp/src/tests/qpid-cluster-benchmark
+++ b/qpid/cpp/src/tests/qpid-cluster-benchmark
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/bin/sh
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -20,39 +20,35 @@
# Benchmark script for comparing cluster performance.
-# Default values
-PORT="5672"
-COUNT=10000
-FLOW=100 # Flow control limit on queue depth for latency.
-REPEAT=10
-QUEUES=4
-CLIENTS=3
-
-while getopts "p:c:f:r:t:b:q:c" opt; do
+# Default options
+MESSAGES="-m 10000"
+FLOW="--flow-control 100" # Flow control limit on queue depth for latency.
+REPEAT="--repeat 10"
+QUEUES="-q 6"
+SENDERS="-s 3"
+RECEIVERS="-r 3"
+BROKERS= # Local broker
+CLIENT_HOSTS= # No ssh, all clients are local
+
+while getopts "m:f:n:b:q:s:r:c:x:t" opt; do
case $opt in
- p) PORT=$OPTARG;;
- c) COUNT=$OPTARG;;
- f) FLOW=$OPTARG;;
- r) REPEAT=$OPTARG;;
- s) SCALE=$OPTARG;;
- b) BROKERS=$OPTARG;;
- q) QUEUES=$OPTARG;;
- c) CLIENTS=$OPTARG;;
+ m) MESSAGES="-m $OPTARG";;
+ f) FLOW="--flow-control $OPTARG";;
+ n) REPEAT="--repeat $OPTARG";;
+ b) BROKERS="-b $OPTARG";;
+ q) QUEUES="-q $OPTARG";;
+ s) SENDERS="-s $OPTARG";;
+ r) RECEIVERS="-r $OPTARG";;
+ c) CLIENT_HOSTS="-c $OPTARG";;
+ x) SAVE_RECEIVED="--save-received";;
+ t) TCP_NODELAY="--connection-options {tcp-nodelay:true}";;
*) echo "Unknown option"; exit 1;;
esac
done
-
-BROKERS=${BROKERS:-$(echo $HOSTS | sed "s/\>/:$PORT/g;s/ /,/g")} # Broker URL list
-BROKER=`echo $BROKERS | awk -F, '{print $1}'` # First broker
-
+BROKER=$(echo $BROKERS | sed s/,.*//)
run_test() { echo $*; shift; "$@"; echo; echo; echo; }
-# Multiple pubs/subs connect via multiple brokers (active-active)
-run_test "multi-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKERS --no-timestamp --summarize -q$QUEUES -s$CLIENTS -r$CLIENTS -m $COUNT
-
-# Multiple pubs/subs connect via single broker (active-passive)
-run_test "single-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --no-timestamp --summarize -q$QUEUES -s$CLIENTS -r$CLIENTS -m $COUNT
-
-# Latency
-run_test "latency" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --connection-options '{tcp-nodelay:true}' -m $COUNT --flow-control $FLOW
+OPTS="$REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS $SAVE_RECEIVED $TCP_NODELAY"
+run_test "Queue contention:" qpid-cpp-benchmark $OPTS
+run_test "No queue contention: :" qpid-cpp-benchmark $OPTS --group-receivers
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark
index 300d34774f..9a0ee6b384 100755
--- a/qpid/cpp/src/tests/qpid-cpp-benchmark
+++ b/qpid/cpp/src/tests/qpid-cpp-benchmark
@@ -67,14 +67,19 @@ op.add_option("--flow-control", default=0, type="int", metavar="N",
help="Flow control each sender to limit queue depth to 2*N. 0 means no flow control.")
op.add_option("--durable", default=False, action="store_true",
help="Use durable queues and messages")
-
+op.add_option("--save-received", default=False, action="store_true",
+ help="Save received message content to files <queuename>-receiver-<n>.msg")
+op.add_option("--group-receivers", default=False, action="store_true",
+ help="Run receivers for the same queue on the same host.")
+op.add_option("--verbose", default=False, action="store_true",
+ help="Show commands executed")
single_quote_re = re.compile("'")
def posix_quote(string):
""" Quote a string for use as an argument in a posix shell"""
return "'" + single_quote_re.sub("\\'", string) + "'";
def ssh_command(host, command):
- """Convert command into an ssh command on host with quoting"""
+ """ Convert command into an ssh command on host with quoting"""
return ["ssh", host] + [posix_quote(arg) for arg in command]
class Clients:
@@ -89,10 +94,16 @@ class Clients:
try: c.kill()
except: pass
+class PopenCommand(Popen):
+ """Like Popen but you can query for the command"""
+ def __init__(self, command, *args, **kwargs):
+ self.command = command
+ Popen.__init__(self, command, *args, **kwargs)
+
clients = Clients()
def start_receive(queue, index, opts, ready_queue, broker, host):
- address_opts=["create:receiver"] + opts.receive_option
+ address_opts=opts.receive_option
if opts.durable: address_opts += ["node:{durable:true}"]
address="%s;{%s}"%(queue,",".join(address_opts))
msg_total=opts.senders*opts.messages
@@ -108,17 +119,20 @@ def start_receive(queue, index, opts, ready_queue, broker, host):
"--receive-rate", str(opts.receive_rate),
"--report-total",
"--ack-frequency", str(opts.ack_frequency),
- "--ready-address", ready_queue,
+ "--ready-address", "%s;{create:always}"%ready_queue,
"--report-header=no"
]
+ if opts.save_received:
+ command += ["--save-content=%s-receiver-%s.msg"%(queue,index)]
command += opts.receive_arg
if opts.connection_options:
command += ["--connection-options",opts.connection_options]
if host: command = ssh_command(host, command)
- return clients.add(Popen(command, stdout=PIPE))
+ if opts.verbose: print "Receiver: ", command
+ return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE))
def start_send(queue, opts, broker, host):
- address="%s;{%s}"%(queue,",".join(opts.send_option))
+ address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"]))
command = ["qpid-send",
"-b", broker,
"-a", address,
@@ -136,33 +150,52 @@ def start_send(queue, opts, broker, host):
if opts.connection_options:
command += ["--connection-options",opts.connection_options]
if host: command = ssh_command(host, command)
- return clients.add(Popen(command, stdout=PIPE))
+ if opts.verbose: print "Sender: ", command
+ return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE))
+
+def error_msg(out, err):
+ return ("\n[stdout]\n%s\n[stderr]\n%s[end]"%(out, err))
def first_line(p):
out,err=p.communicate()
- if p.returncode != 0: raise Exception("Process failed: %s"%(out.strip()))
+ if p.returncode != 0:
+ raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err)))
return out.split("\n")[0]
-def delete_queues(queues, broker):
+def queue_exists(queue,broker):
c = qpid.messaging.Connection(broker)
c.open()
- for q in queues:
+ try:
+ s = c.session()
try:
- s = c.session()
- snd = s.sender("%s;{delete:always}"%(q))
- snd.close()
- s.sync()
- except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue"
+ s.sender(queue)
+ return True
+ except qpid.messaging.exceptions.NotFound:
+ return False
+ finally: c.close()
+
+def recreate_queues(queues, brokers):
+ c = qpid.messaging.Connection(brokers[0])
+ c.open()
+ s = c.session()
+ for q in queues:
+ try: s.sender("%s;{delete:always}"%(q)).close()
+ except qpid.messaging.exceptions.NotFound: pass
+ # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate
+ for b in brokers:
+ while queue_exists(q,b): time.sleep(0.1);
+ s.sender("%s;{create:always}"%q)
+ # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate
+ for b in brokers:
+ while not queue_exists(q,b): time.sleep(0.1);
c.close()
def print_header(timestamp):
- if timestamp: latency_header="\tl-min\tl-max\tl-avg"
+ if timestamp: latency_header="\tl-min\tl-max\tl-avg\ttotal-tp"
else: latency_header=""
- print "send-tp\t\trecv-tp%s"%latency_header
+ print "send-tp\trecv-tp%s"%latency_header
def parse(parser, lines): # Parse sender/receiver output
- for l in lines:
- fn_val = zip(parser, l)
return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines]
def parse_senders(senders):
@@ -171,32 +204,35 @@ def parse_senders(senders):
def parse_receivers(receivers):
return parse([int,float,float,float],[first_line(p) for p in receivers if p])
-def print_data(send_stats, recv_stats):
+def print_data(send_stats, recv_stats, total_tp):
for send,recv in map(None, send_stats, recv_stats):
line=""
if send: line += "%d"%send[0]
if recv:
- line += "\t\t%d"%recv[0]
+ line += "\t%d"%recv[0]
if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:])
+ if total_tp is not None:
+ line += "\t%d"%total_tp
+ total_tp = None
print line
-def print_summary(send_stats, recv_stats):
+def print_summary(send_stats, recv_stats, total_tp):
def avg(s): sum(s) / len(s)
send_tp = sum([l[0] for l in send_stats])
recv_tp = sum([l[0] for l in recv_stats])
- summary = "%d\t\t%d"%(send_tp, recv_tp)
+ summary = "%d\t%d"%(send_tp, recv_tp)
if recv_stats and len(recv_stats[0]) == 4:
l_min = sum(l[1] for l in recv_stats)/len(recv_stats)
l_max = sum(l[2] for l in recv_stats)/len(recv_stats)
l_avg = sum(l[3] for l in recv_stats)/len(recv_stats)
summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg)
+ summary += "\t%d"%total_tp
print summary
class ReadyReceiver:
"""A receiver for ready messages"""
def __init__(self, queue, broker):
- delete_queues([queue], broker)
self.connection = qpid.messaging.Connection(broker)
self.connection.open()
self.receiver = self.connection.session().receiver(
@@ -212,7 +248,8 @@ class ReadyReceiver:
for r in receivers:
if (r.poll() is not None):
out,err=r.communicate()
- raise Exception("Receiver error: %s"%(out))
+ raise Exception("Receiver error: %s\n%s" %
+ (" ".join(r.command), error_msg(out,err)))
raise Exception("Timed out waiting for receivers to be ready")
def flatten(l):
@@ -231,9 +268,12 @@ class RoundRobin:
def main():
opts, args = op.parse_args()
- if not opts.broker: opts.broker = ["127.0.0.1"] # Deafult to local broker
- opts.broker = flatten(opts.broker)
opts.client_host = flatten(opts.client_host)
+ if not opts.broker:
+ if opts.client_host:
+ raise Exception("--broker must be specified if --client_host is.")
+ opts.broker = ["127.0.0.1"] # Deafult to local broker
+ opts.broker = flatten(opts.broker)
brokers = RoundRobin(opts.broker)
client_hosts = RoundRobin(opts.client_host)
send_out = ""
@@ -242,19 +282,33 @@ def main():
queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
try:
for i in xrange(opts.repeat):
- delete_queues(queues, opts.broker[0])
+ recreate_queues(queues, opts.broker)
ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
- receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
- for q in queues for j in xrange(opts.receivers)]
+
+ if opts.group_receivers: # Run receivers for same queue against same broker.
+ receivers = []
+ for q in queues:
+ b = brokers.next()
+ for j in xrange(opts.receivers):
+ receivers.append(
+ start_receive(q, j, opts, ready_queue, b, client_hosts.next()))
+ else: # Don't group receivers
+ receivers = [start_receive(q, j, opts, ready_queue,
+ brokers.next(), client_hosts.next())
+ for q in queues for j in xrange(opts.receivers)]
+
ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
+ start = time.time()
senders = [start_send(q, opts,brokers.next(), client_hosts.next())
for q in queues for j in xrange(opts.senders)]
if opts.report_header and i == 0: print_header(opts.timestamp)
+ for p in senders + receivers: p.wait()
+ total_sent = opts.queues * opts.senders * opts.messages
+ total_tp = total_sent / (time.time()-start)
send_stats=parse_senders(senders)
recv_stats=parse_receivers(receivers)
- if opts.summarize: print_summary(send_stats, recv_stats)
- else: print_data(send_stats, recv_stats)
- delete_queues(queues, opts.broker[0])
+ if opts.summarize: print_summary(send_stats, recv_stats, total_tp)
+ else: print_data(send_stats, recv_stats, total_tp)
finally: clients.kill() # No strays
if __name__ == "__main__": main()