diff options
author | Alan Conway <aconway@apache.org> | 2010-11-09 22:14:45 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-11-09 22:14:45 +0000 |
commit | fcccfa34ac3b304d13f2d97b9f0ed0ea95d9af1c (patch) | |
tree | 671edf6f9f87860b536df025388e597f98685296 /cpp/src | |
parent | 4870a01b7976362d3836ba0d56291f01b7f2e6af (diff) | |
download | qpid-python-fcccfa34ac3b304d13f2d97b9f0ed0ea95d9af1c.tar.gz |
Added --receive-rate to qpid-recieve to allow simulation of a slow receiver.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1033264 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rwxr-xr-x | cpp/src/tests/qpid-cpp-benchmark | 7 | ||||
-rw-r--r-- | cpp/src/tests/qpid-receive.cpp | 18 |
2 files changed, 22 insertions, 3 deletions
diff --git a/cpp/src/tests/qpid-cpp-benchmark b/cpp/src/tests/qpid-cpp-benchmark index 7d9d45435f..d69f28a250 100755 --- a/cpp/src/tests/qpid-cpp-benchmark +++ b/cpp/src/tests/qpid-cpp-benchmark @@ -38,8 +38,10 @@ op.add_option("-m", "--messages", default=100000, type="int", metavar="N", help="send N messages per sender (default %default)") op.add_option("--queue-name", default="benchmark", metavar="NAME", help="base name for queues (default %default)") -op.add_option("--send-rate", default=0, metavar="R", - help="send rate limited to R messages/second, 0 means no limit (default %default)") +op.add_option("--send-rate", default=0, metavar="N", + help="send rate limited to N messages/second, 0 means no limit (default %default)") +op.add_option("--receive-rate", default=0, metavar="N", + help="receive rate limited to N messages/second, 0 means no limit (default %default)") op.add_option("--content-size", default=1024, type="int", metavar="BYTES", help="message size in bytes (default %default)") op.add_option("--ack-frequency", default=0, metavar="N", type="int", @@ -79,6 +81,7 @@ def start_receive(queue, opts, ready_queue, broker, host): "-m", str((opts.senders*opts.messages)/opts.receivers), "--forever", "--print-content=no", + "--receive-rate", str(opts.receive_rate), "--report-total", "--ack-frequency", str(opts.ack_frequency), "--ready-address", ready_queue, diff --git a/cpp/src/tests/qpid-receive.cpp b/cpp/src/tests/qpid-receive.cpp index c8bb58ac54..823756268c 100644 --- a/cpp/src/tests/qpid-receive.cpp +++ b/cpp/src/tests/qpid-receive.cpp @@ -29,6 +29,7 @@ #include <qpid/Options.h> #include <qpid/log/Logger.h> #include <qpid/log/Options.h> +#include "qpid/sys/Time.h" #include "TestOptions.h" #include "Statistics.h" @@ -64,6 +65,7 @@ struct Options : public qpid::Options uint reportEvery; bool reportHeader; string readyAddress; + uint receiveRate; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), @@ -83,7 +85,8 @@ struct Options : public qpid::Options log(argv0), reportTotal(false), reportEvery(0), - reportHeader(true) + reportHeader(true), + receiveRate(0) { addOptions() ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") @@ -104,6 +107,7 @@ struct Options : public qpid::Options ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.") ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.") ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a message to this address when ready to receive") + ("receive-rate", qpid::optValue(receiveRate,"N"), "Receive at rate of N messages/second. 0 means receive as fast as possible.") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -181,7 +185,14 @@ int main(int argc, char ** argv) Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery, opts.reportHeader); if (!opts.readyAddress.empty()) session.createSender(opts.readyAddress).send(msg); + + uint received = 0; + qpid::sys::AbsTime start = qpid::sys::now(); + int64_t interval = 0; + if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate; + while (!done && receiver.fetch(msg, timeout)) { + ++received; reporter.message(msg); if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) { if (msg.getContent() == EOS) { @@ -213,6 +224,11 @@ int main(int argc, char ** argv) } else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) { session.acknowledge(); } + if (opts.receiveRate) { + qpid::sys::AbsTime waitTill(start, received*interval); + int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); + if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC); + } //opts.rejectFrequency?? } if (opts.reportTotal) reporter.report(); |