diff options
Diffstat (limited to 'qpid/cpp/src/tests')
-rwxr-xr-x | qpid/cpp/src/tests/qpid-cluster-benchmark | 44 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/qpid-cpp-benchmark | 68 | ||||
-rw-r--r-- | qpid/cpp/src/tests/qpid-receive.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/qpid-send.cpp | 2 |
4 files changed, 76 insertions, 40 deletions
diff --git a/qpid/cpp/src/tests/qpid-cluster-benchmark b/qpid/cpp/src/tests/qpid-cluster-benchmark index 4408e63866..5f0e020475 100755 --- a/qpid/cpp/src/tests/qpid-cluster-benchmark +++ b/qpid/cpp/src/tests/qpid-cluster-benchmark @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,21 +19,37 @@ # # Benchmark script for comparing cluster performance. -#PORT=":5555" -BROKER=`echo $HOSTS | awk '{print $1}'` # Single broker -BROKERS=`echo $HOSTS | sed "s/\>/$PORT/g;s/ /,/g"` # Broker URL list -COUNT=100000 -RATE=20000 # Rate to throttle senders for latency results -run_test() { echo $*; "$@"; echo; echo; echo; } -# Thruput, unshared queue -run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --no-timestamp -m $COUNT +# Default values +PORT="5672" +BROKERS=`echo $HOSTS | sed "s/\>/:$PORT/g;s/ /,/g"` # Broker URL list +COUNT=10000 +FLOW=100 # Flow control limit on queue depth for latency. +REPEAT=10 +SCALE=10 -# Latency -run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --connection-options '{tcp-nodelay:true}' -m `expr $COUNT / 2` --send-rate $RATE +while getopts "p:c:f:r:t:b:" opt; do + case $opt in + p) PORT=$OPTARG;; + c) COUNT=$OPTARG;; + f) FLOW=$OPTARG;; + r) REPEAT=$OPTARG;; + s) SCALE=$OPTARG;; + b) BROKERS=$OPTARG;; + *) echo "Unknown option"; exit 1;; + esac +done + +BROKER=`echo $HOSTS | sed 's/,.*//'` # First broker + +run_test() { echo $*; shift; "$@"; echo; echo; echo; } # Multiple pubs/subs connect via multiple brokers (active-active) -run_test qpid-cpp-benchmark --repeat 10 -b $BROKERS --no-timestamp --summarize -s10 -r10 -m `expr $COUNT / 10` +run_test "multi-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKERS --no-timestamp --summarize -s$SCALE -r$SCALE -m $COUNT # Multiple pubs/subs connect via single broker (active-passive) -run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --no-timestamp --summarize -s10 -r10 -m `expr $COUNT / 10` +run_test "single-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --no-timestamp --summarize -s$SCALE -r$SCALE -m $COUNT + +# Latency +run_test "latency" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --connection-options '{tcp-nodelay:true}' -m $COUNT --flow-control $FLOW + diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark index 1f77226b4d..6138108558 100755 --- a/qpid/cpp/src/tests/qpid-cpp-benchmark +++ b/qpid/cpp/src/tests/qpid-cpp-benchmark @@ -77,6 +77,20 @@ def ssh_command(host, command): """Convert command into an ssh command on host with quoting""" return ["ssh", host] + [posix_quote(arg) for arg in command] +class Clients: + def __init__(self): self.clients=[] + + def add(self, client): + self.clients.append(client) + return client + + def kill(self): + for c in self.clients: + try: c.kill() + except: pass + +clients = Clients() + def start_receive(queue, index, opts, ready_queue, broker, host): address_opts=["create:receiver"] + opts.receive_option if opts.durable: address_opts += ["node:{durable:true}"] @@ -101,7 +115,7 @@ def start_receive(queue, index, opts, ready_queue, broker, host): if opts.connection_options: command += ["--connection-options",opts.connection_options] if host: command = ssh_command(host, command) - return Popen(command, stdout=PIPE) + return clients.add(Popen(command, stdout=PIPE)) def start_send(queue, opts, broker, host): address="%s;{%s}"%(queue,",".join(opts.send_option)) @@ -122,7 +136,7 @@ def start_send(queue, opts, broker, host): if opts.connection_options: command += ["--connection-options",opts.connection_options] if host: command = ssh_command(host, command) - return Popen(command, stdout=PIPE) + return clients.add(Popen(command, stdout=PIPE)) def first_line(p): out,err=p.communicate() @@ -133,7 +147,11 @@ def delete_queues(queues, broker): c = qpid.messaging.Connection(broker) c.open() for q in queues: - try: s = c.session().sender("%s;{delete:always}"%(q)) + try: + s = c.session() + snd = s.sender("%s;{delete:always}"%(q)) + snd.close() + s.sync() except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue" c.close() @@ -145,7 +163,6 @@ def print_header(timestamp): def parse(parser, lines): # Parse sender/receiver output for l in lines: fn_val = zip(parser, l) - return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines] def parse_senders(senders): @@ -156,11 +173,12 @@ def parse_receivers(receivers): def print_data(send_stats, recv_stats): for send,recv in map(None, send_stats, recv_stats): - if send: print send[0], + line="" + if send: line += "%d"%send[0] if recv: - print "\t\t%d"%recv[0], - if len(recv) == 4: print "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]), - print + line += "\t\t%d"%recv[0] + if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]) + print line def print_summary(send_stats, recv_stats): def avg(s): sum(s) / len(s) @@ -184,11 +202,11 @@ class ReadyReceiver: self.receiver = self.connection.session().receiver( "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue)) self.receiver.session.sync() - self.timeout=2 + self.timeout=10 def wait(self, receivers): try: - for i in xrange(len(receivers)): self.receiver.fetch(self.timeout) + for i in receivers: self.receiver.fetch(self.timeout) self.connection.close() except qpid.messaging.Empty: for r in receivers: @@ -221,20 +239,22 @@ def main(): receive_out = "" ready_queue="%s-ready"%(opts.queue_name) queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] - for i in xrange(opts.repeat): - delete_queues(queues, opts.broker[0]) - ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) - receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) - for q in queues for j in xrange(opts.receivers)] - ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready. - senders = [start_send(q, opts,brokers.next(), client_hosts.next()) - for q in queues for j in xrange(opts.senders)] - if opts.report_header and i == 0: print_header(opts.timestamp) - send_stats=parse_senders(senders) - recv_stats=parse_receivers(receivers) - if opts.summarize: print_summary(send_stats, recv_stats) - else: print_data(send_stats, recv_stats) - delete_queues(queues, opts.broker[0]) + try: + for i in xrange(opts.repeat): + delete_queues(queues, opts.broker[0]) + ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) + receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) + for q in queues for j in xrange(opts.receivers)] + ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready. + senders = [start_send(q, opts,brokers.next(), client_hosts.next()) + for q in queues for j in xrange(opts.senders)] + if opts.report_header and i == 0: print_header(opts.timestamp) + send_stats=parse_senders(senders) + recv_stats=parse_receivers(receivers) + if opts.summarize: print_summary(send_stats, recv_stats) + else: print_data(send_stats, recv_stats) + delete_queues(queues, opts.broker[0]) + finally: clients.kill() # No strays if __name__ == "__main__": main() diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index 5a85da4fd2..9c713e872a 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -262,7 +262,7 @@ int main(int argc, char ** argv) return 0; } } catch(const std::exception& error) { - std::cerr << "Failure: " << error.what() << std::endl; + std::cerr << "qpid-receive: " << error.what() << std::endl; connection.close(); return 1; } diff --git a/qpid/cpp/src/tests/qpid-send.cpp b/qpid/cpp/src/tests/qpid-send.cpp index 15fa284c48..ef5e98e2a0 100644 --- a/qpid/cpp/src/tests/qpid-send.cpp +++ b/qpid/cpp/src/tests/qpid-send.cpp @@ -368,7 +368,7 @@ int main(int argc, char ** argv) return 0; } } catch(const std::exception& error) { - std::cout << "Failed: " << error.what() << std::endl; + std::cerr << "qpid-send: " << error.what() << std::endl; connection.close(); return 1; } |