diff options
author | Alan Conway <aconway@apache.org> | 2010-12-15 18:10:12 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-12-15 18:10:12 +0000 |
commit | 077facba2cddd2c49d14e496dfa942c23a5e66c9 (patch) | |
tree | e9c078351733678236a01056a88ec5bf5e50372c /cpp/src/tests/qpid-cpp-benchmark | |
parent | c50499e4c309e43367c2ff4ab478d85f88c3124c (diff) | |
download | qpid-python-077facba2cddd2c49d14e496dfa942c23a5e66c9.tar.gz |
Fix flow control for qpid-cpp-benchmark with multiple senders.
Ensure senders & receivers agree on number of messages sent/received.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1049656 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/qpid-cpp-benchmark')
-rwxr-xr-x | cpp/src/tests/qpid-cpp-benchmark | 17 |
1 files changed, 10 insertions, 7 deletions
diff --git a/cpp/src/tests/qpid-cpp-benchmark b/cpp/src/tests/qpid-cpp-benchmark index 559adee5a7..e865a49813 100755 --- a/cpp/src/tests/qpid-cpp-benchmark +++ b/cpp/src/tests/qpid-cpp-benchmark @@ -75,12 +75,16 @@ def ssh_command(host, command): """Convert command into an ssh command on host with quoting""" return ["ssh", host] + [posix_quote(arg) for arg in command] -def start_receive(queue, opts, ready_queue, broker, host): +def start_receive(queue, index, opts, ready_queue, broker, host): address="%s;{%s}"%(queue,",".join(["create:always"]+opts.receive_option)) + msg_total=opts.senders*opts.messages + messages = msg_total/opts.receivers; + if (index < msg_total%opts.receivers): messages += 1 + if (messages == 0): return None command = ["qpid-receive", "-b", broker, "-a", address, - "-m", str((opts.senders*opts.messages)/opts.receivers), + "-m", str(messages), "--forever", "--print-content=no", "--receive-rate", str(opts.receive_rate), @@ -101,7 +105,6 @@ def start_send(queue, opts, broker, host): "-b", broker, "-a", address, "--messages", str(opts.messages), - "--send-eos", str(opts.receivers), "--content-size", str(opts.content_size), "--send-rate", str(opts.send_rate), "--report-total", @@ -118,7 +121,7 @@ def start_send(queue, opts, broker, host): def first_line(p): out,err=p.communicate() - if p.returncode != 0: raise Exception("ERROR:\n%s"%(out)) + if p.returncode != 0: raise Exception("Process failed: %s"%(out.strip())) return out.split("\n")[0] def delete_queues(queues, broker): @@ -144,7 +147,7 @@ def parse_senders(senders): return parse([int],[first_line(p) for p in senders]) def parse_receivers(receivers): - return parse([int,float,float,float],[first_line(p) for p in receivers]) + return parse([int,float,float,float],[first_line(p) for p in receivers if p]) def print_data(send_stats, recv_stats): for send,recv in map(None, send_stats, recv_stats): @@ -216,9 +219,9 @@ def main(): for i in xrange(opts.repeat): delete_queues(queues, opts.broker[0]) ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) - receivers = [start_receive(q, opts, ready_queue, brokers.next(), client_hosts.next()) + receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) for q in queues for j in xrange(opts.receivers)] - ready_receiver.wait(receivers) # Wait for receivers to be ready. + ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready. senders = [start_send(q, opts,brokers.next(), client_hosts.next()) for q in queues for j in xrange(opts.senders)] if opts.report_header and i == 0: print_header(opts.timestamp) |