summaryrefslogtreecommitdiff
path: root/cpp/src/tests/qpid-send.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/qpid-send.cpp')
-rw-r--r--cpp/src/tests/qpid-send.cpp27
1 files changed, 25 insertions, 2 deletions
diff --git a/cpp/src/tests/qpid-send.cpp b/cpp/src/tests/qpid-send.cpp
index 7e96cc1a09..e5ae6a9e4a 100644
--- a/cpp/src/tests/qpid-send.cpp
+++ b/cpp/src/tests/qpid-send.cpp
@@ -23,6 +23,7 @@
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/FailoverUpdates.h>
#include <qpid/sys/Time.h>
@@ -71,6 +72,7 @@ struct Options : public qpid::Options
uint reportEvery;
bool reportHeader;
uint sendRate;
+ uint flowControl;
bool sequence;
bool timestamp;
@@ -94,6 +96,7 @@ struct Options : public qpid::Options
reportEvery(0),
reportHeader(true),
sendRate(0),
+ flowControl(0),
sequence(true),
timestamp(true)
{
@@ -122,6 +125,7 @@ struct Options : public qpid::Options
("report-every", qpid::optValue(reportEvery,"N"), "Report throughput statistics every N messages")
("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.")
("send-rate", qpid::optValue(sendRate,"N"), "Send at rate of N messages/second. 0 means send as fast as possible.")
+ ("flow-control", qpid::optValue(flowControl,"N"), "Do end to end flow control to limit queue depth to 2*N. 0 means no flow control.")
("sequence", qpid::optValue(sequence, "yes|no"), "Add a sequence number messages property (required for duplicate/lost message detection)")
("timestamp", qpid::optValue(timestamp, "yes|no"), "Add a time stamp messages property (required for latency measurement)")
("help", qpid::optValue(help), "print this usage statement");
@@ -286,6 +290,14 @@ int main(int argc, char ** argv)
int64_t interval = 0;
if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate;
+ Receiver flowControlReceiver;
+ Address flowControlAddress(Uuid(true).str()+";{create:always}");
+ uint flowSent = 0;
+ if (opts.flowControl) {
+ flowControlReceiver = session.createReceiver(flowControlAddress);
+ flowControlReceiver.setCapacity(2);
+ }
+
while (contentGen->setContent(msg)) {
++sent;
if (opts.sequence)
@@ -293,6 +305,11 @@ int main(int argc, char ** argv)
if (opts.timestamp)
msg.getProperties()[TS] = int64_t(
qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
+ if (opts.flowControl && ((sent % opts.flowControl) == 0)) {
+ msg.setReplyTo(flowControlAddress);
+ ++flowSent;
+ }
+
sender.send(msg);
reporter.message(msg);
if (opts.tx && (sent % opts.tx == 0)) {
@@ -303,12 +320,18 @@ int main(int argc, char ** argv)
session.commit();
}
if (opts.messages && sent >= opts.messages) break;
+
+ if (opts.flowControl && flowSent == 2) {
+ flowControlReceiver.get(Duration::SECOND*1);
+ --flowSent;
+ }
+
if (opts.sendRate) {
qpid::sys::AbsTime waitTill(start, sent*interval);
int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
- if (delay > 0)
- qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
+ if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
}
+ msg = Message(); // Clear out contents and properties for next iteration
}
if (opts.reportTotal) reporter.report();
for (uint i = opts.sendEos; i > 0; --i) {