summaryrefslogtreecommitdiff
path: root/cpp/src/tests/latencytest.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-07-14 14:49:33 +0000
committerAlan Conway <aconway@apache.org>2009-07-14 14:49:33 +0000
commit141f7814e093845265b24c47509fb0a9047c1881 (patch)
tree7f4fda51f6bf4eec62f332d4c44bf353c08913a5 /cpp/src/tests/latencytest.cpp
parent8978cf64e20e9cc89aa973ea7cce2ed3c85ec568 (diff)
downloadqpid-python-141f7814e093845265b24c47509fb0a9047c1881.tar.gz
Minor cluster optimizations.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@793917 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/latencytest.cpp')
-rw-r--r--cpp/src/tests/latencytest.cpp57
1 files changed, 44 insertions, 13 deletions
diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp
index 6ad84e1b82..a98aac4855 100644
--- a/cpp/src/tests/latencytest.cpp
+++ b/cpp/src/tests/latencytest.cpp
@@ -49,7 +49,7 @@ struct Args : public qpid::TestOptions {
bool sync;
uint reportFrequency;
uint timeLimit;
- uint queues;
+ uint concurrentConnections;
uint prefetch;
uint ack;
bool cumulative;
@@ -57,17 +57,18 @@ struct Args : public qpid::TestOptions {
bool durable;
string base;
bool singleConnect;
+ uint queues;
Args() : size(256), count(1000), rate(0), reportFrequency(1000),
- timeLimit(0), queues(1),
+ timeLimit(0), concurrentConnections(1),
prefetch(100), ack(0),
- durable(false), base("latency-test"), singleConnect(false)
+ durable(false), base("latency-test"), singleConnect(false), queues(1)
{
addOptions()
("size", optValue(size, "N"), "message size")
- ("queues", optValue(queues, "N"), "number of queues")
+ ("concurrentTests", optValue(concurrentConnections, "N"), "number of concurrent test setup")
("single-connection", optValue(singleConnect, "yes|no"), "Use one connection for multiple sessions.")
("count", optValue(count, "N"), "number of messages to send")
("rate", optValue(rate, "N"), "target message rate (causes count to be ignored)")
@@ -81,7 +82,8 @@ struct Args : public qpid::TestOptions {
("durable", optValue(durable, "yes|no"), "use durable messages")
("csv", optValue(csv), "print stats in csv format (rate,min,max,avg)")
("cumulative", optValue(cumulative), "cumulative stats in csv format")
- ("queue-base-name", optValue(base, "<name>"), "base name for queues");
+ ("queue-base-name", optValue(base, "<name>"), "base name for queues")
+ ("queues", optValue(queues, "N"), "declare N queues & bindings to test routing");
}
};
@@ -246,8 +248,8 @@ void Receiver::test()
void Receiver::received(Message& msg)
{
++count;
- uint64_t sentAt = msg.getDeliveryProperties().getTimestamp();
uint64_t receivedAt = current_time();
+ uint64_t sentAt = msg.getDeliveryProperties().getTimestamp();
stats.update(((double) (receivedAt - sentAt)) / TIME_MSEC);
@@ -357,11 +359,11 @@ void Sender::sendByRate()
++missedRate;
else
sys::usleep(delay / TIME_USEC);
- if (timeLimit != 0 && Duration(start, now()) > timeLimit) {
- session.sync();
- receiver.stop();
- break;
- }
+ if (timeLimit != 0 && Duration(start, now()) > timeLimit) {
+ session.sync();
+ receiver.stop();
+ break;
+ }
}
}
@@ -414,8 +416,32 @@ int main(int argc, char** argv)
opts.parse(argc, argv);
if (opts.cumulative)
opts.csv = true;
- boost::ptr_vector<Test> tests(opts.queues);
- for (uint i = 0; i < opts.queues; i++) {
+
+ Connection localConnection;
+ AsyncSession session;
+ if (opts.queues > 1){
+ opts.open(localConnection);
+ session = localConnection.newSession();
+ std::cout << "More than one queue being used, creating..." << std::endl;
+ // use default binding
+ for (uint i=0;i<opts.queues;i++){
+
+ std::ostringstream out;
+ out << opts.base << "-" << (i+1);
+ session.queueDeclare(arg::queue=out.str(), arg::durable=opts.durable, arg::autoDelete=true);
+ uint msgCount = session.queueQuery(arg::queue=out.str()).get().getMessageCount();
+ if (msgCount) {
+ std::cout << "Warning: found " << msgCount << " msgs on " << out.str() << ". Purging..." << std::endl;
+ session.queuePurge(arg::queue=out.str());
+ }
+
+ }
+ session.sync();
+ std::cout << "Complete..." << std::endl;
+ }
+
+ boost::ptr_vector<Test> tests(opts.concurrentConnections);
+ for (uint i = 0; i < opts.concurrentConnections; i++) {
std::ostringstream out;
out << opts.base << "-" << (i+1);
tests.push_back(new Test(out.str()));
@@ -437,6 +463,11 @@ int main(int argc, char** argv)
}
}
+ if (opts.queues > 1){
+ session.close();
+ localConnection.close();
+ }
+
return 0;
} catch(const std::exception& e) {
std::cout << e.what() << std::endl;