diff options
Diffstat (limited to 'cpp/src/tests/qpid-receive.cpp')
-rw-r--r-- | cpp/src/tests/qpid-receive.cpp | 18 |
1 files changed, 17 insertions, 1 deletions
diff --git a/cpp/src/tests/qpid-receive.cpp b/cpp/src/tests/qpid-receive.cpp index c8bb58ac54..823756268c 100644 --- a/cpp/src/tests/qpid-receive.cpp +++ b/cpp/src/tests/qpid-receive.cpp @@ -29,6 +29,7 @@ #include <qpid/Options.h> #include <qpid/log/Logger.h> #include <qpid/log/Options.h> +#include "qpid/sys/Time.h" #include "TestOptions.h" #include "Statistics.h" @@ -64,6 +65,7 @@ struct Options : public qpid::Options uint reportEvery; bool reportHeader; string readyAddress; + uint receiveRate; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), @@ -83,7 +85,8 @@ struct Options : public qpid::Options log(argv0), reportTotal(false), reportEvery(0), - reportHeader(true) + reportHeader(true), + receiveRate(0) { addOptions() ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") @@ -104,6 +107,7 @@ struct Options : public qpid::Options ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.") ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.") ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a message to this address when ready to receive") + ("receive-rate", qpid::optValue(receiveRate,"N"), "Receive at rate of N messages/second. 0 means receive as fast as possible.") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -181,7 +185,14 @@ int main(int argc, char ** argv) Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery, opts.reportHeader); if (!opts.readyAddress.empty()) session.createSender(opts.readyAddress).send(msg); + + uint received = 0; + qpid::sys::AbsTime start = qpid::sys::now(); + int64_t interval = 0; + if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate; + while (!done && receiver.fetch(msg, timeout)) { + ++received; reporter.message(msg); if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) { if (msg.getContent() == EOS) { @@ -213,6 +224,11 @@ int main(int argc, char ** argv) } else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) { session.acknowledge(); } + if (opts.receiveRate) { + qpid::sys::AbsTime waitTill(start, received*interval); + int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); + if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC); + } //opts.rejectFrequency?? } if (opts.reportTotal) reporter.report(); |