diff options
author | Alan Conway <aconway@apache.org> | 2009-07-14 14:49:33 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-07-14 14:49:33 +0000 |
commit | 141f7814e093845265b24c47509fb0a9047c1881 (patch) | |
tree | 7f4fda51f6bf4eec62f332d4c44bf353c08913a5 /cpp/src/tests/latencytest.cpp | |
parent | 8978cf64e20e9cc89aa973ea7cce2ed3c85ec568 (diff) | |
download | qpid-python-141f7814e093845265b24c47509fb0a9047c1881.tar.gz |
Minor cluster optimizations.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@793917 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/latencytest.cpp')
-rw-r--r-- | cpp/src/tests/latencytest.cpp | 57 |
1 files changed, 44 insertions, 13 deletions
diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp index 6ad84e1b82..a98aac4855 100644 --- a/cpp/src/tests/latencytest.cpp +++ b/cpp/src/tests/latencytest.cpp @@ -49,7 +49,7 @@ struct Args : public qpid::TestOptions { bool sync; uint reportFrequency; uint timeLimit; - uint queues; + uint concurrentConnections; uint prefetch; uint ack; bool cumulative; @@ -57,17 +57,18 @@ struct Args : public qpid::TestOptions { bool durable; string base; bool singleConnect; + uint queues; Args() : size(256), count(1000), rate(0), reportFrequency(1000), - timeLimit(0), queues(1), + timeLimit(0), concurrentConnections(1), prefetch(100), ack(0), - durable(false), base("latency-test"), singleConnect(false) + durable(false), base("latency-test"), singleConnect(false), queues(1) { addOptions() ("size", optValue(size, "N"), "message size") - ("queues", optValue(queues, "N"), "number of queues") + ("concurrentTests", optValue(concurrentConnections, "N"), "number of concurrent test setup") ("single-connection", optValue(singleConnect, "yes|no"), "Use one connection for multiple sessions.") ("count", optValue(count, "N"), "number of messages to send") ("rate", optValue(rate, "N"), "target message rate (causes count to be ignored)") @@ -81,7 +82,8 @@ struct Args : public qpid::TestOptions { ("durable", optValue(durable, "yes|no"), "use durable messages") ("csv", optValue(csv), "print stats in csv format (rate,min,max,avg)") ("cumulative", optValue(cumulative), "cumulative stats in csv format") - ("queue-base-name", optValue(base, "<name>"), "base name for queues"); + ("queue-base-name", optValue(base, "<name>"), "base name for queues") + ("queues", optValue(queues, "N"), "declare N queues & bindings to test routing"); } }; @@ -246,8 +248,8 @@ void Receiver::test() void Receiver::received(Message& msg) { ++count; - uint64_t sentAt = msg.getDeliveryProperties().getTimestamp(); uint64_t receivedAt = current_time(); + uint64_t sentAt = msg.getDeliveryProperties().getTimestamp(); stats.update(((double) (receivedAt - sentAt)) / TIME_MSEC); @@ -357,11 +359,11 @@ void Sender::sendByRate() ++missedRate; else sys::usleep(delay / TIME_USEC); - if (timeLimit != 0 && Duration(start, now()) > timeLimit) { - session.sync(); - receiver.stop(); - break; - } + if (timeLimit != 0 && Duration(start, now()) > timeLimit) { + session.sync(); + receiver.stop(); + break; + } } } @@ -414,8 +416,32 @@ int main(int argc, char** argv) opts.parse(argc, argv); if (opts.cumulative) opts.csv = true; - boost::ptr_vector<Test> tests(opts.queues); - for (uint i = 0; i < opts.queues; i++) { + + Connection localConnection; + AsyncSession session; + if (opts.queues > 1){ + opts.open(localConnection); + session = localConnection.newSession(); + std::cout << "More than one queue being used, creating..." << std::endl; + // use default binding + for (uint i=0;i<opts.queues;i++){ + + std::ostringstream out; + out << opts.base << "-" << (i+1); + session.queueDeclare(arg::queue=out.str(), arg::durable=opts.durable, arg::autoDelete=true); + uint msgCount = session.queueQuery(arg::queue=out.str()).get().getMessageCount(); + if (msgCount) { + std::cout << "Warning: found " << msgCount << " msgs on " << out.str() << ". Purging..." << std::endl; + session.queuePurge(arg::queue=out.str()); + } + + } + session.sync(); + std::cout << "Complete..." << std::endl; + } + + boost::ptr_vector<Test> tests(opts.concurrentConnections); + for (uint i = 0; i < opts.concurrentConnections; i++) { std::ostringstream out; out << opts.base << "-" << (i+1); tests.push_back(new Test(out.str())); @@ -437,6 +463,11 @@ int main(int argc, char** argv) } } + if (opts.queues > 1){ + session.close(); + localConnection.close(); + } + return 0; } catch(const std::exception& e) { std::cout << e.what() << std::endl; |