summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xcpp/src/tests/qpid-cpp-benchmark7
-rw-r--r--cpp/src/tests/qpid-receive.cpp18
2 files changed, 22 insertions, 3 deletions
diff --git a/cpp/src/tests/qpid-cpp-benchmark b/cpp/src/tests/qpid-cpp-benchmark
index 7d9d45435f..d69f28a250 100755
--- a/cpp/src/tests/qpid-cpp-benchmark
+++ b/cpp/src/tests/qpid-cpp-benchmark
@@ -38,8 +38,10 @@ op.add_option("-m", "--messages", default=100000, type="int", metavar="N",
help="send N messages per sender (default %default)")
op.add_option("--queue-name", default="benchmark", metavar="NAME",
help="base name for queues (default %default)")
-op.add_option("--send-rate", default=0, metavar="R",
- help="send rate limited to R messages/second, 0 means no limit (default %default)")
+op.add_option("--send-rate", default=0, metavar="N",
+ help="send rate limited to N messages/second, 0 means no limit (default %default)")
+op.add_option("--receive-rate", default=0, metavar="N",
+ help="receive rate limited to N messages/second, 0 means no limit (default %default)")
op.add_option("--content-size", default=1024, type="int", metavar="BYTES",
help="message size in bytes (default %default)")
op.add_option("--ack-frequency", default=0, metavar="N", type="int",
@@ -79,6 +81,7 @@ def start_receive(queue, opts, ready_queue, broker, host):
"-m", str((opts.senders*opts.messages)/opts.receivers),
"--forever",
"--print-content=no",
+ "--receive-rate", str(opts.receive_rate),
"--report-total",
"--ack-frequency", str(opts.ack_frequency),
"--ready-address", ready_queue,
diff --git a/cpp/src/tests/qpid-receive.cpp b/cpp/src/tests/qpid-receive.cpp
index c8bb58ac54..823756268c 100644
--- a/cpp/src/tests/qpid-receive.cpp
+++ b/cpp/src/tests/qpid-receive.cpp
@@ -29,6 +29,7 @@
#include <qpid/Options.h>
#include <qpid/log/Logger.h>
#include <qpid/log/Options.h>
+#include "qpid/sys/Time.h"
#include "TestOptions.h"
#include "Statistics.h"
@@ -64,6 +65,7 @@ struct Options : public qpid::Options
uint reportEvery;
bool reportHeader;
string readyAddress;
+ uint receiveRate;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
@@ -83,7 +85,8 @@ struct Options : public qpid::Options
log(argv0),
reportTotal(false),
reportEvery(0),
- reportHeader(true)
+ reportHeader(true),
+ receiveRate(0)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -104,6 +107,7 @@ struct Options : public qpid::Options
("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.")
("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.")
("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a message to this address when ready to receive")
+ ("receive-rate", qpid::optValue(receiveRate,"N"), "Receive at rate of N messages/second. 0 means receive as fast as possible.")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -181,7 +185,14 @@ int main(int argc, char ** argv)
Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery, opts.reportHeader);
if (!opts.readyAddress.empty())
session.createSender(opts.readyAddress).send(msg);
+
+ uint received = 0;
+ qpid::sys::AbsTime start = qpid::sys::now();
+ int64_t interval = 0;
+ if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate;
+
while (!done && receiver.fetch(msg, timeout)) {
+ ++received;
reporter.message(msg);
if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {
if (msg.getContent() == EOS) {
@@ -213,6 +224,11 @@ int main(int argc, char ** argv)
} else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
session.acknowledge();
}
+ if (opts.receiveRate) {
+ qpid::sys::AbsTime waitTill(start, received*interval);
+ int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
+ if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
+ }
//opts.rejectFrequency??
}
if (opts.reportTotal) reporter.report();