diff options
author | Gordon Sim <gsim@apache.org> | 2008-01-16 15:18:34 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-01-16 15:18:34 +0000 |
commit | 40a1f2d87faebcbf2f59f7f9597d6451572d5b7c (patch) | |
tree | f7a9c7a75cccd9646e0c001793e9077dba6cb09b /cpp | |
parent | 4441461688a301e03e8ed2f69484ae8f0b2c14b7 (diff) | |
download | qpid-python-40a1f2d87faebcbf2f59f7f9597d6451572d5b7c.tar.gz |
Add option to perftest to run for n iterations and print averages of all reported rates.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@612478 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/tests/perftest.cpp | 203 |
1 files changed, 128 insertions, 75 deletions
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index f6a7490050..bba4efd8fc 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -87,6 +87,7 @@ struct Opts : public TestOptions { // General size_t qt; + size_t iterations; Mode mode; bool summary; @@ -97,7 +98,7 @@ struct Opts : public TestOptions { setup(false), control(false), publish(false), subscribe(false), pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), subs(1), ack(0), - qt(1), mode(SHARED), summary(false) + qt(1), iterations(1), mode(SHARED), summary(false) { addOptions() ("setup", optValue(setup), "Create shared queues.") @@ -122,6 +123,7 @@ struct Opts : public TestOptions { "N==0: Subscriber uses unconfirmed mode") ("qt", optValue(qt, "N"), "Create N queues or topics.") + ("iterations", optValue(iterations, "N"), "Desired number of iterations of the test.") ("summary,s", optValue(summary), "Summary output: pubs/sec subs/sec transfers/sec Mbytes/sec") ("queue_max_count", optValue(queueMaxCount, "N"), "queue policy: count to trigger 'flow to disk'") @@ -303,6 +305,20 @@ struct Controller : public Client { if (!opts.summary) cout << " done." << endl; } + void process(size_t n, LocalQueue lq, string queue, + boost::function<void (const string&)> msgFn) + { + session.messageFlow(queue, 0, n); + if (!opts.summary) + cout << "Processing " << n << " messages from " + << queue << " " << flush; + for (size_t i = 0; i < n; ++i) { + if (!opts.summary) cout << "." << flush; + msgFn(lq.pop().getData()); + } + if (!opts.summary) cout << " done." << endl; + } + void send(size_t n, string queue, string data) { if (!opts.summary) cout << "Sending " << data << " " << n << " times to " << queue @@ -317,35 +333,63 @@ struct Controller : public Client { // Wait for subscribers to be ready. process(opts.totalSubs, "sub_ready", bind(expect, _1, "ready")); - Stats pubRates; - Stats subRates; - - AbsTime start=now(); - send(opts.totalPubs, "pub_start", "start"); // Start publishers - process(opts.totalPubs, "pub_done", boost::ref(pubRates)); - process(opts.totalSubs, "sub_done", boost::ref(subRates)); - AbsTime end=now(); - double time=secs(start, end); - double txrate=opts.transfers/time; - double mbytes=(txrate*opts.size)/(1024*1024); - - if (!opts.summary) { - cout << endl << "Total " << opts.transfers << " transfers of " - << opts.size << " bytes in " - << time << " seconds." << endl; - cout << endl << "Publish transfers/sec: " << endl; - pubRates.print(cout); - cout << endl << "Subscribe transfers/sec: " << endl; - subRates.print(cout); - cout << endl - << "Total transfers/sec: " << txrate << endl - << "Total Mbytes/sec: " << mbytes << endl; + LocalQueue pubDone; + LocalQueue subDone; + subs.setFlowControl(0, SubscriptionManager::UNLIMITED, false); + subs.subscribe(pubDone, "pub_done"); + subs.subscribe(subDone, "sub_done"); + + double txrateTotal(0); + double mbytesTotal(0); + double pubRateTotal(0); + double subRateTotal(0); + + for (size_t j = 0; j < opts.iterations; ++j) { + AbsTime start=now(); + send(opts.totalPubs, "pub_start", "start"); // Start publishers + + Stats pubRates; + Stats subRates; + + process(opts.totalPubs, pubDone, "pub_done", boost::ref(pubRates)); + process(opts.totalSubs, subDone, "sub_done", boost::ref(subRates)); + + AbsTime end=now(); + + double time=secs(start, end); + double txrate=opts.transfers/time; + double mbytes=(txrate*opts.size)/(1024*1024); + + if (!opts.summary) { + cout << endl << "Total " << opts.transfers << " transfers of " + << opts.size << " bytes in " + << time << " seconds." << endl; + cout << endl << "Publish transfers/sec: " << endl; + pubRates.print(cout); + cout << endl << "Subscribe transfers/sec: " << endl; + subRates.print(cout); + cout << endl + << "Total transfers/sec: " << txrate << endl + << "Total Mbytes/sec: " << mbytes << endl; + } + else { + cout << pubRates.mean() << "\t" + << subRates.mean() << "\t" + << txrate << "\t" + << mbytes << endl; + } + + txrateTotal += txrate; + mbytesTotal += mbytes; + pubRateTotal += pubRates.mean(); + subRateTotal += subRates.mean(); } - else { - cout << pubRates.mean() << "\t" - << subRates.mean() << "\t" - << txrate << "\t" - << mbytes << endl; + if (opts.iterations > 1) { + cout << "Averages: "<< endl + << (pubRateTotal / opts.iterations) << "\t" + << (subRateTotal / opts.iterations) << "\t" + << (txrateTotal / opts.iterations) << "\t" + << (mbytesTotal / opts.iterations) << endl; } } catch (const std::exception& e) { @@ -394,29 +438,31 @@ struct PublishThread : public Client { SubscriptionManager subs(session); - LocalQueue lq(AckPolicy(opts.ack)); - subs.setFlowControl(1, SubscriptionManager::UNLIMITED, false); + LocalQueue lq; + subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true); subs.subscribe(lq, "pub_start"); - expect(lq.pop().getData(), "start"); - AbsTime start=now(); - for (size_t i=0; i<opts.count; i++) { - // Stamp the iteration into the message data, avoid - // any heap allocation. - const_cast<std::string&>(msg.getData()).replace(offset, sizeof(uint32_t), reinterpret_cast<const char*>(&i), sizeof(uint32_t)); - completion = session.messageTransfer( - arg::destination=destination, - arg::content=msg, - arg::confirmMode=opts.confirm); + for (size_t j = 0; j < opts.iterations; ++j) { + expect(lq.pop().getData(), "start"); + AbsTime start=now(); + for (size_t i=0; i<opts.count; i++) { + // Stamp the iteration into the message data, avoid + // any heap allocation. + const_cast<std::string&>(msg.getData()).replace(offset, sizeof(uint32_t), + reinterpret_cast<const char*>(&i), sizeof(uint32_t)); + completion = session.messageTransfer( + arg::destination=destination, + arg::content=msg, + arg::confirmMode=opts.confirm); + } + if (opts.confirm) completion.sync(); + AbsTime end=now(); + double time=secs(start,end); + + // Send result to controller. + Message report(lexical_cast<string>(opts.count/time), "pub_done"); + session.messageTransfer(arg::content=report); } - if (opts.confirm) completion.sync(); - AbsTime end=now(); - double time=secs(start,end); - - // Send result to controller. - msg.setData(lexical_cast<string>(opts.count/time)); - msg.getDeliveryProperties().setRoutingKey("pub_done"); - session.messageTransfer(arg::content=msg); session.close(); } catch (const std::exception& e) { @@ -466,34 +512,41 @@ struct SubscribeThread : public Client { // Notify controller we are ready. session.messageTransfer(arg::content=Message("ready", "sub_ready")); - Message msg; - AbsTime start=now(); - size_t expect=0; - for (size_t i = 0; i < opts.subQuota; ++i) { - msg=lq.pop(); - // TODO aconway 2007-11-23: check message order for. - // multiple publishers. Need an acorray of counters, - // one per publisher and a publisher ID in the - // message. Careful not to introduce a lot of overhead - // here, e.g. no std::map, std::string etc. - // - // For now verify order only for a single publisher. - size_t offset = opts.uniqueData ? 5 /*marker is 'data:'*/ : 0; - size_t n = *reinterpret_cast<const uint32_t*>(msg.getData().data() + offset); - if (opts.pubs == 1) { - if (opts.subs == 1 || opts.mode == FANOUT) verify(n==expect, "==", expect, n); - else verify(n>=expect, ">=", expect, n); - expect = n+1; + + for (size_t j = 0; j < opts.iterations; ++j) { + if (j > 0) { + //need to allocate some more credit + session.messageFlow(queue, 0, opts.subQuota); + } + Message msg; + AbsTime start=now(); + size_t expect=0; + for (size_t i = 0; i < opts.subQuota; ++i) { + msg=lq.pop(); + // TODO aconway 2007-11-23: check message order for. + // multiple publishers. Need an acorray of counters, + // one per publisher and a publisher ID in the + // message. Careful not to introduce a lot of overhead + // here, e.g. no std::map, std::string etc. + // + // For now verify order only for a single publisher. + size_t offset = opts.uniqueData ? 5 /*marker is 'data:'*/ : 0; + size_t n = *reinterpret_cast<const uint32_t*>(msg.getData().data() + offset); + if (opts.pubs == 1) { + if (opts.subs == 1 || opts.mode == FANOUT) verify(n==expect, "==", expect, n); + else verify(n>=expect, ">=", expect, n); + expect = n+1; + } } + if (opts.ack !=0) + msg.acknowledge(); // Cumulative ack for final batch. + AbsTime end=now(); + + // Report to publisher. + Message result(lexical_cast<string>(opts.subQuota/secs(start,end)), + "sub_done"); + session.messageTransfer(arg::content=result); } - if (opts.ack !=0) - msg.acknowledge(); // Cumulative ack for final batch. - AbsTime end=now(); - - // Report to publisher. - Message result(lexical_cast<string>(opts.subQuota/secs(start,end)), - "sub_done"); - session.messageTransfer(arg::content=result); session.close(); } catch (const std::exception& e) { |