summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-12-14 21:30:55 +0000
committerAlan Conway <aconway@apache.org>2010-12-14 21:30:55 +0000
commitdfd13447e4e26c14ea8c71cd7bdbea886f4f7d4b (patch)
tree4433184663f276b3a23198a94c0574c769c6bdea /cpp/src
parent526b38f7cd4fc17f5ac6418495cfbf122654e38b (diff)
downloadqpid-python-dfd13447e4e26c14ea8c71cd7bdbea886f4f7d4b.tar.gz
Add end-to-end flow control to qpid-send, qpid-receive and qpid-cpp-benchmark.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1049286 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rwxr-xr-xcpp/src/tests/qpid-cpp-benchmark17
-rw-r--r--cpp/src/tests/qpid-receive.cpp14
-rw-r--r--cpp/src/tests/qpid-send.cpp27
3 files changed, 49 insertions, 9 deletions
diff --git a/cpp/src/tests/qpid-cpp-benchmark b/cpp/src/tests/qpid-cpp-benchmark
index d69f28a250..559adee5a7 100755
--- a/cpp/src/tests/qpid-cpp-benchmark
+++ b/cpp/src/tests/qpid-cpp-benchmark
@@ -44,7 +44,7 @@ op.add_option("--receive-rate", default=0, metavar="N",
help="receive rate limited to N messages/second, 0 means no limit (default %default)")
op.add_option("--content-size", default=1024, type="int", metavar="BYTES",
help="message size in bytes (default %default)")
-op.add_option("--ack-frequency", default=0, metavar="N", type="int",
+op.add_option("--ack-frequency", default=100, metavar="N", type="int",
help="receiver ack's every N messages, 0 means unconfirmed (default %default)")
op.add_option("--no-report-header", dest="report_header", default=True,
action="store_false", help="don't print header on report")
@@ -63,8 +63,10 @@ op.add_option("--no-timestamp", dest="timestamp", default=True,
action="store_false", help="don't add a timestamp, no latency results")
op.add_option("--connection-options", type="str",
help="Connection options for senders & receivers")
-single_quote_re = re.compile("'")
+op.add_option("--flow-control", default=0, type="int", metavar="N",
+ help="Flow control each sender to limit queue depth to 2*N. 0 means no flow control.")
+single_quote_re = re.compile("'")
def posix_quote(string):
""" Quote a string for use as an argument in a posix shell"""
return "'" + single_quote_re.sub("\\'", string) + "'";
@@ -85,12 +87,13 @@ def start_receive(queue, opts, ready_queue, broker, host):
"--report-total",
"--ack-frequency", str(opts.ack_frequency),
"--ready-address", ready_queue,
- "--report-header=no"]
+ "--report-header=no"
+ ]
command += opts.receive_arg
if opts.connection_options:
command += ["--connection-options",opts.connection_options]
if host: command = ssh_command(host, command)
- return Popen(command, stdout=PIPE, stderr=STDOUT)
+ return Popen(command, stdout=PIPE)
def start_send(queue, opts, broker, host):
address="%s;{%s}"%(queue,",".join(opts.send_option))
@@ -104,12 +107,14 @@ def start_send(queue, opts, broker, host):
"--report-total",
"--report-header=no",
"--timestamp=%s"%(opts.timestamp and "yes" or "no"),
- "--sequence=no"]
+ "--sequence=no",
+ "--flow-control", str(opts.flow_control)
+ ]
command += opts.send_arg
if opts.connection_options:
command += ["--connection-options",opts.connection_options]
if host: command = ssh_command(host, command)
- return Popen(command, stdout=PIPE, stderr=STDOUT)
+ return Popen(command, stdout=PIPE)
def first_line(p):
out,err=p.communicate()
diff --git a/cpp/src/tests/qpid-receive.cpp b/cpp/src/tests/qpid-receive.cpp
index a85d882a0f..9b84306605 100644
--- a/cpp/src/tests/qpid-receive.cpp
+++ b/cpp/src/tests/qpid-receive.cpp
@@ -191,6 +191,9 @@ int main(int argc, char ** argv)
int64_t interval = 0;
if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate;
+ Address replyToAddress;
+ Sender replyToSender;
+
while (!done && receiver.fetch(msg, timeout)) {
reporter.message(msg);
if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {
@@ -223,12 +226,21 @@ int main(int argc, char ** argv)
} else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
session.acknowledge();
}
+ if (msg.getReplyTo()) { // Echo message back to reply-to address.
+ if (msg.getReplyTo() != replyToAddress) {
+ replyToSender = session.createSender(msg.getReplyTo());
+ replyToSender.setCapacity(opts.capacity);
+ replyToAddress = msg.getReplyTo();
+ }
+ replyToSender.send(msg);
+ }
if (opts.receiveRate) {
qpid::sys::AbsTime waitTill(start, count*interval);
int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
}
- //opts.rejectFrequency??
+ // Clear out message properties & content for next iteration.
+ msg = Message(); // TODO aconway 2010-12-01: should be done by fetch
}
if (opts.reportTotal) reporter.report();
if (opts.tx) {
diff --git a/cpp/src/tests/qpid-send.cpp b/cpp/src/tests/qpid-send.cpp
index 7e96cc1a09..e5ae6a9e4a 100644
--- a/cpp/src/tests/qpid-send.cpp
+++ b/cpp/src/tests/qpid-send.cpp
@@ -23,6 +23,7 @@
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/FailoverUpdates.h>
#include <qpid/sys/Time.h>
@@ -71,6 +72,7 @@ struct Options : public qpid::Options
uint reportEvery;
bool reportHeader;
uint sendRate;
+ uint flowControl;
bool sequence;
bool timestamp;
@@ -94,6 +96,7 @@ struct Options : public qpid::Options
reportEvery(0),
reportHeader(true),
sendRate(0),
+ flowControl(0),
sequence(true),
timestamp(true)
{
@@ -122,6 +125,7 @@ struct Options : public qpid::Options
("report-every", qpid::optValue(reportEvery,"N"), "Report throughput statistics every N messages")
("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.")
("send-rate", qpid::optValue(sendRate,"N"), "Send at rate of N messages/second. 0 means send as fast as possible.")
+ ("flow-control", qpid::optValue(flowControl,"N"), "Do end to end flow control to limit queue depth to 2*N. 0 means no flow control.")
("sequence", qpid::optValue(sequence, "yes|no"), "Add a sequence number messages property (required for duplicate/lost message detection)")
("timestamp", qpid::optValue(timestamp, "yes|no"), "Add a time stamp messages property (required for latency measurement)")
("help", qpid::optValue(help), "print this usage statement");
@@ -286,6 +290,14 @@ int main(int argc, char ** argv)
int64_t interval = 0;
if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate;
+ Receiver flowControlReceiver;
+ Address flowControlAddress(Uuid(true).str()+";{create:always}");
+ uint flowSent = 0;
+ if (opts.flowControl) {
+ flowControlReceiver = session.createReceiver(flowControlAddress);
+ flowControlReceiver.setCapacity(2);
+ }
+
while (contentGen->setContent(msg)) {
++sent;
if (opts.sequence)
@@ -293,6 +305,11 @@ int main(int argc, char ** argv)
if (opts.timestamp)
msg.getProperties()[TS] = int64_t(
qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
+ if (opts.flowControl && ((sent % opts.flowControl) == 0)) {
+ msg.setReplyTo(flowControlAddress);
+ ++flowSent;
+ }
+
sender.send(msg);
reporter.message(msg);
if (opts.tx && (sent % opts.tx == 0)) {
@@ -303,12 +320,18 @@ int main(int argc, char ** argv)
session.commit();
}
if (opts.messages && sent >= opts.messages) break;
+
+ if (opts.flowControl && flowSent == 2) {
+ flowControlReceiver.get(Duration::SECOND*1);
+ --flowSent;
+ }
+
if (opts.sendRate) {
qpid::sys::AbsTime waitTill(start, sent*interval);
int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
- if (delay > 0)
- qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
+ if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
}
+ msg = Message(); // Clear out contents and properties for next iteration
}
if (opts.reportTotal) reporter.report();
for (uint i = opts.sendEos; i > 0; --i) {