diff options
Diffstat (limited to 'qpid/cpp/src/tests/perftest.cpp')
-rw-r--r-- | qpid/cpp/src/tests/perftest.cpp | 77 |
1 files changed, 42 insertions, 35 deletions
diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp index d383e0eb80..88d9fd15cb 100644 --- a/qpid/cpp/src/tests/perftest.cpp +++ b/qpid/cpp/src/tests/perftest.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -49,6 +49,9 @@ using namespace sys; using boost::lexical_cast; using boost::bind; +namespace qpid { +namespace tests { + enum Mode { SHARED, FANOUT, TOPIC }; const char* modeNames[] = { "shared", "fanout", "topic" }; @@ -105,9 +108,9 @@ struct Opts : public TestOptions { bool commitAsync; static const std::string helpText; - + Opts() : - TestOptions(helpText), + TestOptions(helpText), setup(false), control(false), publish(false), subscribe(false), baseName("perftest"), pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false), subs(1), ack(0), @@ -136,16 +139,16 @@ struct Opts : public TestOptions { ("nsubs", optValue(subs, "N"), "Create N subscribers.") ("sub-ack", optValue(ack, "N"), "N>0: Subscriber acks batches of N.\n" "N==0: Subscriber uses unconfirmed mode") - + ("qt", optValue(qt, "N"), "Create N queues or topics.") ("single-connection", optValue(singleConnect, "yes|no"), "Use one connection for multiple sessions.") - + ("iterations", optValue(iterations, "N"), "Desired number of iterations of the test.") ("summary,s", optValue(summary), "Summary output: pubs/sec subs/sec transfers/sec Mbytes/sec") ("queue-max-count", optValue(queueMaxCount, "N"), "queue policy: count to trigger 'flow to disk'") ("queue-max-size", optValue(queueMaxSize, "N"), "queue policy: accumulated size to trigger 'flow to disk'") - ("base-name", optValue(baseName, "NAME"), "base name used for queues or topics") + ("base-name", optValue(baseName, "NAME"), "base name used for queues or topics") ("queue-durable", optValue(queueDurable, "N"), "Make queue durable (implied if durable set)") ("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume") @@ -171,7 +174,7 @@ struct Opts : public TestOptions { count += subs - (count % subs); cout << "WARNING: Adjusted --count to " << count << " the nearest multiple of --nsubs" << endl; - } + } totalPubs = pubs*qt; totalSubs = subs*qt; subQuota = (pubs*count)/subs; @@ -258,7 +261,7 @@ struct Client : public Runnable { }; struct Setup : public Client { - + void queueInit(string name, bool durable=false, const framing::FieldTable& settings=framing::FieldTable()) { session.queueDeclare(arg::queue=name, arg::durable=durable, arg::arguments=settings); session.queuePurge(arg::queue=name); @@ -278,7 +281,7 @@ struct Setup : public Client { for (size_t i = 0; i < opts.qt; ++i) { ostringstream qname; qname << opts.baseName << i; - queueInit(qname.str(), opts.durable || opts.queueDurable, settings); + queueInit(qname.str(), opts.durable || opts.queueDurable, settings); } } } @@ -303,7 +306,7 @@ class Stats { public: Stats() : sum(0) {} - + // Functor to collect rates. void operator()(const string& data) { try { @@ -314,7 +317,7 @@ class Stats { throw Exception("Bad report: "+data); } } - + double mean() const { return sum/values.size(); } @@ -331,7 +334,7 @@ class Stats { } return sqrt(ssq/(values.size()-1)); } - + ostream& print(ostream& out) { ostream_iterator<double> o(out, "\n"); copy(values.begin(), values.end(), o); @@ -341,11 +344,11 @@ class Stats { return out << endl; } }; - + // Manage control queues, collect and print reports. struct Controller : public Client { - + SubscriptionManager subs; Controller() : subs(session) {} @@ -354,7 +357,7 @@ struct Controller : public Client { void process(size_t n, string queue, boost::function<void (const string&)> msgFn) { - if (!opts.summary) + if (!opts.summary) cout << "Processing " << n << " messages from " << queue << " " << flush; LocalQueue lq; @@ -370,8 +373,8 @@ struct Controller : public Client { void process(size_t n, LocalQueue lq, string queue, boost::function<void (const string&)> msgFn) { - session.messageFlow(queue, 0, n); - if (!opts.summary) + session.messageFlow(queue, 0, n); + if (!opts.summary) cout << "Processing " << n << " messages from " << queue << " " << flush; for (size_t i = 0; i < n; ++i) { @@ -386,7 +389,7 @@ struct Controller : public Client { cout << "Sending " << data << " " << n << " times to " << queue << endl; Message msg(data, queue); - for (size_t i = 0; i < n; ++i) + for (size_t i = 0; i < n; ++i) session.messageTransfer(arg::content=msg, arg::acceptMode=1); } @@ -419,7 +422,7 @@ struct Controller : public Client { process(opts.totalPubs, pubDone, fqn("pub_done"), boost::ref(pubRates)); process(opts.totalSubs, subDone, fqn("sub_done"), boost::ref(subRates)); - AbsTime end=now(); + AbsTime end=now(); double time=secs(start, end); double txrate=opts.transfers/time; @@ -469,12 +472,12 @@ struct PublishThread : public Client { string routingKey; PublishThread() {}; - + PublishThread(string key, string dest=string()) { destination=dest; routingKey=key; } - + void run() { // Publisher try { string data; @@ -492,7 +495,7 @@ struct PublishThread : public Client { } } else { size_t msgSize=max(opts.size, sizeof(size_t)); - data = string(msgSize, 'X'); + data = string(msgSize, 'X'); } Message msg(data, routingKey); @@ -500,21 +503,21 @@ struct PublishThread : public Client { msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); - if (opts.txPub){ + if (opts.txPub){ session.txSelect(); } SubscriptionManager subs(session); LocalQueue lq; - subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true); - subs.subscribe(lq, fqn("pub_start")); - + subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true); + subs.subscribe(lq, fqn("pub_start")); + for (size_t j = 0; j < opts.iterations; ++j) { expect(lq.pop().getData(), "start"); AbsTime start=now(); for (size_t i=0; i<opts.count; i++) { // Stamp the iteration into the message data, avoid // any heap allocation. - const_cast<std::string&>(msg.getData()).replace(offset, sizeof(size_t), + const_cast<std::string&>(msg.getData()).replace(offset, sizeof(size_t), reinterpret_cast<const char*>(&i), sizeof(size_t)); if (opts.syncPub) { sync(session).messageTransfer( @@ -540,7 +543,7 @@ struct PublishThread : public Client { if (opts.confirm) session.sync(); AbsTime end=now(); double time=secs(start,end); - + // Send result to controller. Message report(lexical_cast<string>(opts.count/time), fqn("pub_done")); session.messageTransfer(arg::content=report, arg::acceptMode=1); @@ -561,7 +564,7 @@ struct SubscribeThread : public Client { string queue; SubscribeThread() {} - + SubscribeThread(string q) { queue = q; } SubscribeThread(string key, string ex) { @@ -586,7 +589,7 @@ struct SubscribeThread : public Client { } void run() { // Subscribe - try { + try { if (opts.txSub) sync(session).txSelect(); SubscriptionManager subs(session); SubscriptionSettings settings; @@ -606,15 +609,15 @@ struct SubscribeThread : public Client { if (opts.iterations > 1) { subs.subscribe(iterationControl, fqn("sub_iteration"), SubscriptionSettings(FlowControl::messageCredit(0))); } - + for (size_t j = 0; j < opts.iterations; ++j) { if (j > 0) { //need to wait here until all subs are done - session.messageFlow(fqn("sub_iteration"), 0, 1); + session.messageFlow(fqn("sub_iteration"), 0, 1); iterationControl.pop(); //need to allocate some more credit for subscription - session.messageFlow(queue, 0, opts.subQuota); + session.messageFlow(queue, 0, opts.subQuota); } Message msg; AbsTime start=now(); @@ -627,7 +630,7 @@ struct SubscribeThread : public Client { } if (opts.intervalSub) qpid::sys::usleep(opts.intervalSub*1000); - // TODO aconway 2007-11-23: check message order for. + // TODO aconway 2007-11-23: check message order for. // multiple publishers. Need an array of counters, // one per publisher and a publisher ID in the // message. Careful not to introduce a lot of overhead @@ -664,6 +667,10 @@ struct SubscribeThread : public Client { } }; +}} // namespace qpid::tests + +using namespace qpid::tests; + int main(int argc, char** argv) { int exitCode = 0; boost::ptr_vector<Client> subs(opts.subs); |