diff options
author | Alan Conway <aconway@apache.org> | 2010-12-15 18:10:12 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-12-15 18:10:12 +0000 |
commit | 077facba2cddd2c49d14e496dfa942c23a5e66c9 (patch) | |
tree | e9c078351733678236a01056a88ec5bf5e50372c /cpp/src | |
parent | c50499e4c309e43367c2ff4ab478d85f88c3124c (diff) | |
download | qpid-python-077facba2cddd2c49d14e496dfa942c23a5e66c9.tar.gz |
Fix flow control for qpid-cpp-benchmark with multiple senders.
Ensure senders & receivers agree on number of messages sent/received.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1049656 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rwxr-xr-x | cpp/src/tests/qpid-cpp-benchmark | 17 | ||||
-rw-r--r-- | cpp/src/tests/qpid-receive.cpp | 13 | ||||
-rw-r--r-- | cpp/src/tests/qpid-send.cpp | 8 |
3 files changed, 21 insertions, 17 deletions
diff --git a/cpp/src/tests/qpid-cpp-benchmark b/cpp/src/tests/qpid-cpp-benchmark index 559adee5a7..e865a49813 100755 --- a/cpp/src/tests/qpid-cpp-benchmark +++ b/cpp/src/tests/qpid-cpp-benchmark @@ -75,12 +75,16 @@ def ssh_command(host, command): """Convert command into an ssh command on host with quoting""" return ["ssh", host] + [posix_quote(arg) for arg in command] -def start_receive(queue, opts, ready_queue, broker, host): +def start_receive(queue, index, opts, ready_queue, broker, host): address="%s;{%s}"%(queue,",".join(["create:always"]+opts.receive_option)) + msg_total=opts.senders*opts.messages + messages = msg_total/opts.receivers; + if (index < msg_total%opts.receivers): messages += 1 + if (messages == 0): return None command = ["qpid-receive", "-b", broker, "-a", address, - "-m", str((opts.senders*opts.messages)/opts.receivers), + "-m", str(messages), "--forever", "--print-content=no", "--receive-rate", str(opts.receive_rate), @@ -101,7 +105,6 @@ def start_send(queue, opts, broker, host): "-b", broker, "-a", address, "--messages", str(opts.messages), - "--send-eos", str(opts.receivers), "--content-size", str(opts.content_size), "--send-rate", str(opts.send_rate), "--report-total", @@ -118,7 +121,7 @@ def start_send(queue, opts, broker, host): def first_line(p): out,err=p.communicate() - if p.returncode != 0: raise Exception("ERROR:\n%s"%(out)) + if p.returncode != 0: raise Exception("Process failed: %s"%(out.strip())) return out.split("\n")[0] def delete_queues(queues, broker): @@ -144,7 +147,7 @@ def parse_senders(senders): return parse([int],[first_line(p) for p in senders]) def parse_receivers(receivers): - return parse([int,float,float,float],[first_line(p) for p in receivers]) + return parse([int,float,float,float],[first_line(p) for p in receivers if p]) def print_data(send_stats, recv_stats): for send,recv in map(None, send_stats, recv_stats): @@ -216,9 +219,9 @@ def main(): for i in xrange(opts.repeat): delete_queues(queues, opts.broker[0]) ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) - receivers = [start_receive(q, opts, ready_queue, brokers.next(), client_hosts.next()) + receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) for q in queues for j in xrange(opts.receivers)] - ready_receiver.wait(receivers) # Wait for receivers to be ready. + ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready. senders = [start_send(q, opts,brokers.next(), client_hosts.next()) for q in queues for j in xrange(opts.senders)] if opts.report_header and i == 0: print_header(opts.timestamp) diff --git a/cpp/src/tests/qpid-receive.cpp b/cpp/src/tests/qpid-receive.cpp index 9b84306605..28e229ca27 100644 --- a/cpp/src/tests/qpid-receive.cpp +++ b/cpp/src/tests/qpid-receive.cpp @@ -191,8 +191,7 @@ int main(int argc, char ** argv) int64_t interval = 0; if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate; - Address replyToAddress; - Sender replyToSender; + std::map<std::string,Sender> replyTo; while (!done && receiver.fetch(msg, timeout)) { reporter.message(msg); @@ -227,12 +226,12 @@ int main(int argc, char ** argv) session.acknowledge(); } if (msg.getReplyTo()) { // Echo message back to reply-to address. - if (msg.getReplyTo() != replyToAddress) { - replyToSender = session.createSender(msg.getReplyTo()); - replyToSender.setCapacity(opts.capacity); - replyToAddress = msg.getReplyTo(); + Sender& s = replyTo[msg.getReplyTo().str()]; + if (s.isNull()) { + s = session.createSender(msg.getReplyTo()); + s.setCapacity(opts.capacity); } - replyToSender.send(msg); + s.send(msg); } if (opts.receiveRate) { qpid::sys::AbsTime waitTill(start, count*interval); diff --git a/cpp/src/tests/qpid-send.cpp b/cpp/src/tests/qpid-send.cpp index e5ae6a9e4a..c71cb83f9a 100644 --- a/cpp/src/tests/qpid-send.cpp +++ b/cpp/src/tests/qpid-send.cpp @@ -291,7 +291,7 @@ int main(int argc, char ** argv) if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate; Receiver flowControlReceiver; - Address flowControlAddress(Uuid(true).str()+";{create:always}"); + Address flowControlAddress("flow-"+Uuid(true).str()+";{create:always,delete:always}"); uint flowSent = 0; if (opts.flowControl) { flowControlReceiver = session.createReceiver(flowControlAddress); @@ -322,7 +322,7 @@ int main(int argc, char ** argv) if (opts.messages && sent >= opts.messages) break; if (opts.flowControl && flowSent == 2) { - flowControlReceiver.get(Duration::SECOND*1); + flowControlReceiver.get(Duration::SECOND); --flowSent; } @@ -333,11 +333,13 @@ int main(int argc, char ** argv) } msg = Message(); // Clear out contents and properties for next iteration } + for ( ; flowSent>0; --flowSent) + flowControlReceiver.get(Duration::SECOND); if (opts.reportTotal) reporter.report(); for (uint i = opts.sendEos; i > 0; --i) { if (opts.sequence) msg.getProperties()[SN] = ++sent; - msg.setContent(EOS);//TODO: add in ability to send digest or similar + msg.setContent(EOS); //TODO: add in ability to send digest or similar sender.send(msg); } if (opts.tx) { |