summaryrefslogtreecommitdiff
path: root/cpp/src/tests/qpid-cpp-benchmark
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-12-15 18:10:12 +0000
committerAlan Conway <aconway@apache.org>2010-12-15 18:10:12 +0000
commit077facba2cddd2c49d14e496dfa942c23a5e66c9 (patch)
treee9c078351733678236a01056a88ec5bf5e50372c /cpp/src/tests/qpid-cpp-benchmark
parentc50499e4c309e43367c2ff4ab478d85f88c3124c (diff)
downloadqpid-python-077facba2cddd2c49d14e496dfa942c23a5e66c9.tar.gz
Fix flow control for qpid-cpp-benchmark with multiple senders.
Ensure senders & receivers agree on number of messages sent/received. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1049656 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/qpid-cpp-benchmark')
-rwxr-xr-xcpp/src/tests/qpid-cpp-benchmark17
1 files changed, 10 insertions, 7 deletions
diff --git a/cpp/src/tests/qpid-cpp-benchmark b/cpp/src/tests/qpid-cpp-benchmark
index 559adee5a7..e865a49813 100755
--- a/cpp/src/tests/qpid-cpp-benchmark
+++ b/cpp/src/tests/qpid-cpp-benchmark
@@ -75,12 +75,16 @@ def ssh_command(host, command):
"""Convert command into an ssh command on host with quoting"""
return ["ssh", host] + [posix_quote(arg) for arg in command]
-def start_receive(queue, opts, ready_queue, broker, host):
+def start_receive(queue, index, opts, ready_queue, broker, host):
address="%s;{%s}"%(queue,",".join(["create:always"]+opts.receive_option))
+ msg_total=opts.senders*opts.messages
+ messages = msg_total/opts.receivers;
+ if (index < msg_total%opts.receivers): messages += 1
+ if (messages == 0): return None
command = ["qpid-receive",
"-b", broker,
"-a", address,
- "-m", str((opts.senders*opts.messages)/opts.receivers),
+ "-m", str(messages),
"--forever",
"--print-content=no",
"--receive-rate", str(opts.receive_rate),
@@ -101,7 +105,6 @@ def start_send(queue, opts, broker, host):
"-b", broker,
"-a", address,
"--messages", str(opts.messages),
- "--send-eos", str(opts.receivers),
"--content-size", str(opts.content_size),
"--send-rate", str(opts.send_rate),
"--report-total",
@@ -118,7 +121,7 @@ def start_send(queue, opts, broker, host):
def first_line(p):
out,err=p.communicate()
- if p.returncode != 0: raise Exception("ERROR:\n%s"%(out))
+ if p.returncode != 0: raise Exception("Process failed: %s"%(out.strip()))
return out.split("\n")[0]
def delete_queues(queues, broker):
@@ -144,7 +147,7 @@ def parse_senders(senders):
return parse([int],[first_line(p) for p in senders])
def parse_receivers(receivers):
- return parse([int,float,float,float],[first_line(p) for p in receivers])
+ return parse([int,float,float,float],[first_line(p) for p in receivers if p])
def print_data(send_stats, recv_stats):
for send,recv in map(None, send_stats, recv_stats):
@@ -216,9 +219,9 @@ def main():
for i in xrange(opts.repeat):
delete_queues(queues, opts.broker[0])
ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
- receivers = [start_receive(q, opts, ready_queue, brokers.next(), client_hosts.next())
+ receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
for q in queues for j in xrange(opts.receivers)]
- ready_receiver.wait(receivers) # Wait for receivers to be ready.
+ ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
senders = [start_send(q, opts,brokers.next(), client_hosts.next())
for q in queues for j in xrange(opts.senders)]
if opts.report_header and i == 0: print_header(opts.timestamp)