summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests')
-rwxr-xr-xqpid/cpp/src/tests/qpid-cluster-benchmark44
-rwxr-xr-xqpid/cpp/src/tests/qpid-cpp-benchmark68
-rw-r--r--qpid/cpp/src/tests/qpid-receive.cpp2
-rw-r--r--qpid/cpp/src/tests/qpid-send.cpp2
4 files changed, 76 insertions, 40 deletions
diff --git a/qpid/cpp/src/tests/qpid-cluster-benchmark b/qpid/cpp/src/tests/qpid-cluster-benchmark
index 4408e63866..5f0e020475 100755
--- a/qpid/cpp/src/tests/qpid-cluster-benchmark
+++ b/qpid/cpp/src/tests/qpid-cluster-benchmark
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,21 +19,37 @@
#
# Benchmark script for comparing cluster performance.
-#PORT=":5555"
-BROKER=`echo $HOSTS | awk '{print $1}'` # Single broker
-BROKERS=`echo $HOSTS | sed "s/\>/$PORT/g;s/ /,/g"` # Broker URL list
-COUNT=100000
-RATE=20000 # Rate to throttle senders for latency results
-run_test() { echo $*; "$@"; echo; echo; echo; }
-# Thruput, unshared queue
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --no-timestamp -m $COUNT
+# Default values
+PORT="5672"
+BROKERS=`echo $HOSTS | sed "s/\>/:$PORT/g;s/ /,/g"` # Broker URL list
+COUNT=10000
+FLOW=100 # Flow control limit on queue depth for latency.
+REPEAT=10
+SCALE=10
-# Latency
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --connection-options '{tcp-nodelay:true}' -m `expr $COUNT / 2` --send-rate $RATE
+while getopts "p:c:f:r:t:b:" opt; do
+ case $opt in
+ p) PORT=$OPTARG;;
+ c) COUNT=$OPTARG;;
+ f) FLOW=$OPTARG;;
+ r) REPEAT=$OPTARG;;
+ s) SCALE=$OPTARG;;
+ b) BROKERS=$OPTARG;;
+ *) echo "Unknown option"; exit 1;;
+ esac
+done
+
+BROKER=`echo $HOSTS | sed 's/,.*//'` # First broker
+
+run_test() { echo $*; shift; "$@"; echo; echo; echo; }
# Multiple pubs/subs connect via multiple brokers (active-active)
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKERS --no-timestamp --summarize -s10 -r10 -m `expr $COUNT / 10`
+run_test "multi-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKERS --no-timestamp --summarize -s$SCALE -r$SCALE -m $COUNT
# Multiple pubs/subs connect via single broker (active-passive)
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --no-timestamp --summarize -s10 -r10 -m `expr $COUNT / 10`
+run_test "single-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --no-timestamp --summarize -s$SCALE -r$SCALE -m $COUNT
+
+# Latency
+run_test "latency" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --connection-options '{tcp-nodelay:true}' -m $COUNT --flow-control $FLOW
+
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark
index 1f77226b4d..6138108558 100755
--- a/qpid/cpp/src/tests/qpid-cpp-benchmark
+++ b/qpid/cpp/src/tests/qpid-cpp-benchmark
@@ -77,6 +77,20 @@ def ssh_command(host, command):
"""Convert command into an ssh command on host with quoting"""
return ["ssh", host] + [posix_quote(arg) for arg in command]
+class Clients:
+ def __init__(self): self.clients=[]
+
+ def add(self, client):
+ self.clients.append(client)
+ return client
+
+ def kill(self):
+ for c in self.clients:
+ try: c.kill()
+ except: pass
+
+clients = Clients()
+
def start_receive(queue, index, opts, ready_queue, broker, host):
address_opts=["create:receiver"] + opts.receive_option
if opts.durable: address_opts += ["node:{durable:true}"]
@@ -101,7 +115,7 @@ def start_receive(queue, index, opts, ready_queue, broker, host):
if opts.connection_options:
command += ["--connection-options",opts.connection_options]
if host: command = ssh_command(host, command)
- return Popen(command, stdout=PIPE)
+ return clients.add(Popen(command, stdout=PIPE))
def start_send(queue, opts, broker, host):
address="%s;{%s}"%(queue,",".join(opts.send_option))
@@ -122,7 +136,7 @@ def start_send(queue, opts, broker, host):
if opts.connection_options:
command += ["--connection-options",opts.connection_options]
if host: command = ssh_command(host, command)
- return Popen(command, stdout=PIPE)
+ return clients.add(Popen(command, stdout=PIPE))
def first_line(p):
out,err=p.communicate()
@@ -133,7 +147,11 @@ def delete_queues(queues, broker):
c = qpid.messaging.Connection(broker)
c.open()
for q in queues:
- try: s = c.session().sender("%s;{delete:always}"%(q))
+ try:
+ s = c.session()
+ snd = s.sender("%s;{delete:always}"%(q))
+ snd.close()
+ s.sync()
except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue"
c.close()
@@ -145,7 +163,6 @@ def print_header(timestamp):
def parse(parser, lines): # Parse sender/receiver output
for l in lines:
fn_val = zip(parser, l)
-
return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines]
def parse_senders(senders):
@@ -156,11 +173,12 @@ def parse_receivers(receivers):
def print_data(send_stats, recv_stats):
for send,recv in map(None, send_stats, recv_stats):
- if send: print send[0],
+ line=""
+ if send: line += "%d"%send[0]
if recv:
- print "\t\t%d"%recv[0],
- if len(recv) == 4: print "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]),
- print
+ line += "\t\t%d"%recv[0]
+ if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:])
+ print line
def print_summary(send_stats, recv_stats):
def avg(s): sum(s) / len(s)
@@ -184,11 +202,11 @@ class ReadyReceiver:
self.receiver = self.connection.session().receiver(
"%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue))
self.receiver.session.sync()
- self.timeout=2
+ self.timeout=10
def wait(self, receivers):
try:
- for i in xrange(len(receivers)): self.receiver.fetch(self.timeout)
+ for i in receivers: self.receiver.fetch(self.timeout)
self.connection.close()
except qpid.messaging.Empty:
for r in receivers:
@@ -221,20 +239,22 @@ def main():
receive_out = ""
ready_queue="%s-ready"%(opts.queue_name)
queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
- for i in xrange(opts.repeat):
- delete_queues(queues, opts.broker[0])
- ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
- receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
- for q in queues for j in xrange(opts.receivers)]
- ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
- senders = [start_send(q, opts,brokers.next(), client_hosts.next())
- for q in queues for j in xrange(opts.senders)]
- if opts.report_header and i == 0: print_header(opts.timestamp)
- send_stats=parse_senders(senders)
- recv_stats=parse_receivers(receivers)
- if opts.summarize: print_summary(send_stats, recv_stats)
- else: print_data(send_stats, recv_stats)
- delete_queues(queues, opts.broker[0])
+ try:
+ for i in xrange(opts.repeat):
+ delete_queues(queues, opts.broker[0])
+ ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
+ receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
+ for q in queues for j in xrange(opts.receivers)]
+ ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
+ senders = [start_send(q, opts,brokers.next(), client_hosts.next())
+ for q in queues for j in xrange(opts.senders)]
+ if opts.report_header and i == 0: print_header(opts.timestamp)
+ send_stats=parse_senders(senders)
+ recv_stats=parse_receivers(receivers)
+ if opts.summarize: print_summary(send_stats, recv_stats)
+ else: print_data(send_stats, recv_stats)
+ delete_queues(queues, opts.broker[0])
+ finally: clients.kill() # No strays
if __name__ == "__main__": main()
diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp
index 5a85da4fd2..9c713e872a 100644
--- a/qpid/cpp/src/tests/qpid-receive.cpp
+++ b/qpid/cpp/src/tests/qpid-receive.cpp
@@ -262,7 +262,7 @@ int main(int argc, char ** argv)
return 0;
}
} catch(const std::exception& error) {
- std::cerr << "Failure: " << error.what() << std::endl;
+ std::cerr << "qpid-receive: " << error.what() << std::endl;
connection.close();
return 1;
}
diff --git a/qpid/cpp/src/tests/qpid-send.cpp b/qpid/cpp/src/tests/qpid-send.cpp
index 15fa284c48..ef5e98e2a0 100644
--- a/qpid/cpp/src/tests/qpid-send.cpp
+++ b/qpid/cpp/src/tests/qpid-send.cpp
@@ -368,7 +368,7 @@ int main(int argc, char ** argv)
return 0;
}
} catch(const std::exception& error) {
- std::cout << "Failed: " << error.what() << std::endl;
+ std::cerr << "qpid-send: " << error.what() << std::endl;
connection.close();
return 1;
}