diff options
author | Alan Conway <aconway@apache.org> | 2010-12-14 21:30:55 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-12-14 21:30:55 +0000 |
commit | dfd13447e4e26c14ea8c71cd7bdbea886f4f7d4b (patch) | |
tree | 4433184663f276b3a23198a94c0574c769c6bdea /cpp/src | |
parent | 526b38f7cd4fc17f5ac6418495cfbf122654e38b (diff) | |
download | qpid-python-dfd13447e4e26c14ea8c71cd7bdbea886f4f7d4b.tar.gz |
Add end-to-end flow control to qpid-send, qpid-receive and qpid-cpp-benchmark.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1049286 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rwxr-xr-x | cpp/src/tests/qpid-cpp-benchmark | 17 | ||||
-rw-r--r-- | cpp/src/tests/qpid-receive.cpp | 14 | ||||
-rw-r--r-- | cpp/src/tests/qpid-send.cpp | 27 |
3 files changed, 49 insertions, 9 deletions
diff --git a/cpp/src/tests/qpid-cpp-benchmark b/cpp/src/tests/qpid-cpp-benchmark index d69f28a250..559adee5a7 100755 --- a/cpp/src/tests/qpid-cpp-benchmark +++ b/cpp/src/tests/qpid-cpp-benchmark @@ -44,7 +44,7 @@ op.add_option("--receive-rate", default=0, metavar="N", help="receive rate limited to N messages/second, 0 means no limit (default %default)") op.add_option("--content-size", default=1024, type="int", metavar="BYTES", help="message size in bytes (default %default)") -op.add_option("--ack-frequency", default=0, metavar="N", type="int", +op.add_option("--ack-frequency", default=100, metavar="N", type="int", help="receiver ack's every N messages, 0 means unconfirmed (default %default)") op.add_option("--no-report-header", dest="report_header", default=True, action="store_false", help="don't print header on report") @@ -63,8 +63,10 @@ op.add_option("--no-timestamp", dest="timestamp", default=True, action="store_false", help="don't add a timestamp, no latency results") op.add_option("--connection-options", type="str", help="Connection options for senders & receivers") -single_quote_re = re.compile("'") +op.add_option("--flow-control", default=0, type="int", metavar="N", + help="Flow control each sender to limit queue depth to 2*N. 0 means no flow control.") +single_quote_re = re.compile("'") def posix_quote(string): """ Quote a string for use as an argument in a posix shell""" return "'" + single_quote_re.sub("\\'", string) + "'"; @@ -85,12 +87,13 @@ def start_receive(queue, opts, ready_queue, broker, host): "--report-total", "--ack-frequency", str(opts.ack_frequency), "--ready-address", ready_queue, - "--report-header=no"] + "--report-header=no" + ] command += opts.receive_arg if opts.connection_options: command += ["--connection-options",opts.connection_options] if host: command = ssh_command(host, command) - return Popen(command, stdout=PIPE, stderr=STDOUT) + return Popen(command, stdout=PIPE) def start_send(queue, opts, broker, host): address="%s;{%s}"%(queue,",".join(opts.send_option)) @@ -104,12 +107,14 @@ def start_send(queue, opts, broker, host): "--report-total", "--report-header=no", "--timestamp=%s"%(opts.timestamp and "yes" or "no"), - "--sequence=no"] + "--sequence=no", + "--flow-control", str(opts.flow_control) + ] command += opts.send_arg if opts.connection_options: command += ["--connection-options",opts.connection_options] if host: command = ssh_command(host, command) - return Popen(command, stdout=PIPE, stderr=STDOUT) + return Popen(command, stdout=PIPE) def first_line(p): out,err=p.communicate() diff --git a/cpp/src/tests/qpid-receive.cpp b/cpp/src/tests/qpid-receive.cpp index a85d882a0f..9b84306605 100644 --- a/cpp/src/tests/qpid-receive.cpp +++ b/cpp/src/tests/qpid-receive.cpp @@ -191,6 +191,9 @@ int main(int argc, char ** argv) int64_t interval = 0; if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate; + Address replyToAddress; + Sender replyToSender; + while (!done && receiver.fetch(msg, timeout)) { reporter.message(msg); if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) { @@ -223,12 +226,21 @@ int main(int argc, char ** argv) } else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) { session.acknowledge(); } + if (msg.getReplyTo()) { // Echo message back to reply-to address. + if (msg.getReplyTo() != replyToAddress) { + replyToSender = session.createSender(msg.getReplyTo()); + replyToSender.setCapacity(opts.capacity); + replyToAddress = msg.getReplyTo(); + } + replyToSender.send(msg); + } if (opts.receiveRate) { qpid::sys::AbsTime waitTill(start, count*interval); int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC); } - //opts.rejectFrequency?? + // Clear out message properties & content for next iteration. + msg = Message(); // TODO aconway 2010-12-01: should be done by fetch } if (opts.reportTotal) reporter.report(); if (opts.tx) { diff --git a/cpp/src/tests/qpid-send.cpp b/cpp/src/tests/qpid-send.cpp index 7e96cc1a09..e5ae6a9e4a 100644 --- a/cpp/src/tests/qpid-send.cpp +++ b/cpp/src/tests/qpid-send.cpp @@ -23,6 +23,7 @@ #include <qpid/messaging/Connection.h> #include <qpid/messaging/Message.h> #include <qpid/messaging/Sender.h> +#include <qpid/messaging/Receiver.h> #include <qpid/messaging/Session.h> #include <qpid/messaging/FailoverUpdates.h> #include <qpid/sys/Time.h> @@ -71,6 +72,7 @@ struct Options : public qpid::Options uint reportEvery; bool reportHeader; uint sendRate; + uint flowControl; bool sequence; bool timestamp; @@ -94,6 +96,7 @@ struct Options : public qpid::Options reportEvery(0), reportHeader(true), sendRate(0), + flowControl(0), sequence(true), timestamp(true) { @@ -122,6 +125,7 @@ struct Options : public qpid::Options ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput statistics every N messages") ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.") ("send-rate", qpid::optValue(sendRate,"N"), "Send at rate of N messages/second. 0 means send as fast as possible.") + ("flow-control", qpid::optValue(flowControl,"N"), "Do end to end flow control to limit queue depth to 2*N. 0 means no flow control.") ("sequence", qpid::optValue(sequence, "yes|no"), "Add a sequence number messages property (required for duplicate/lost message detection)") ("timestamp", qpid::optValue(timestamp, "yes|no"), "Add a time stamp messages property (required for latency measurement)") ("help", qpid::optValue(help), "print this usage statement"); @@ -286,6 +290,14 @@ int main(int argc, char ** argv) int64_t interval = 0; if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate; + Receiver flowControlReceiver; + Address flowControlAddress(Uuid(true).str()+";{create:always}"); + uint flowSent = 0; + if (opts.flowControl) { + flowControlReceiver = session.createReceiver(flowControlAddress); + flowControlReceiver.setCapacity(2); + } + while (contentGen->setContent(msg)) { ++sent; if (opts.sequence) @@ -293,6 +305,11 @@ int main(int argc, char ** argv) if (opts.timestamp) msg.getProperties()[TS] = int64_t( qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + if (opts.flowControl && ((sent % opts.flowControl) == 0)) { + msg.setReplyTo(flowControlAddress); + ++flowSent; + } + sender.send(msg); reporter.message(msg); if (opts.tx && (sent % opts.tx == 0)) { @@ -303,12 +320,18 @@ int main(int argc, char ** argv) session.commit(); } if (opts.messages && sent >= opts.messages) break; + + if (opts.flowControl && flowSent == 2) { + flowControlReceiver.get(Duration::SECOND*1); + --flowSent; + } + if (opts.sendRate) { qpid::sys::AbsTime waitTill(start, sent*interval); int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); - if (delay > 0) - qpid::sys::usleep(delay/qpid::sys::TIME_USEC); + if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC); } + msg = Message(); // Clear out contents and properties for next iteration } if (opts.reportTotal) reporter.report(); for (uint i = opts.sendEos; i > 0; --i) { |