summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-01-16 15:18:34 +0000
committerGordon Sim <gsim@apache.org>2008-01-16 15:18:34 +0000
commit40a1f2d87faebcbf2f59f7f9597d6451572d5b7c (patch)
treef7a9c7a75cccd9646e0c001793e9077dba6cb09b /cpp
parent4441461688a301e03e8ed2f69484ae8f0b2c14b7 (diff)
downloadqpid-python-40a1f2d87faebcbf2f59f7f9597d6451572d5b7c.tar.gz
Add option to perftest to run for n iterations and print averages of all reported rates.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@612478 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/tests/perftest.cpp203
1 files changed, 128 insertions, 75 deletions
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp
index f6a7490050..bba4efd8fc 100644
--- a/cpp/src/tests/perftest.cpp
+++ b/cpp/src/tests/perftest.cpp
@@ -87,6 +87,7 @@ struct Opts : public TestOptions {
// General
size_t qt;
+ size_t iterations;
Mode mode;
bool summary;
@@ -97,7 +98,7 @@ struct Opts : public TestOptions {
setup(false), control(false), publish(false), subscribe(false),
pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false),
subs(1), ack(0),
- qt(1), mode(SHARED), summary(false)
+ qt(1), iterations(1), mode(SHARED), summary(false)
{
addOptions()
("setup", optValue(setup), "Create shared queues.")
@@ -122,6 +123,7 @@ struct Opts : public TestOptions {
"N==0: Subscriber uses unconfirmed mode")
("qt", optValue(qt, "N"), "Create N queues or topics.")
+ ("iterations", optValue(iterations, "N"), "Desired number of iterations of the test.")
("summary,s", optValue(summary), "Summary output: pubs/sec subs/sec transfers/sec Mbytes/sec")
("queue_max_count", optValue(queueMaxCount, "N"), "queue policy: count to trigger 'flow to disk'")
@@ -303,6 +305,20 @@ struct Controller : public Client {
if (!opts.summary) cout << " done." << endl;
}
+ void process(size_t n, LocalQueue lq, string queue,
+ boost::function<void (const string&)> msgFn)
+ {
+ session.messageFlow(queue, 0, n);
+ if (!opts.summary)
+ cout << "Processing " << n << " messages from "
+ << queue << " " << flush;
+ for (size_t i = 0; i < n; ++i) {
+ if (!opts.summary) cout << "." << flush;
+ msgFn(lq.pop().getData());
+ }
+ if (!opts.summary) cout << " done." << endl;
+ }
+
void send(size_t n, string queue, string data) {
if (!opts.summary)
cout << "Sending " << data << " " << n << " times to " << queue
@@ -317,35 +333,63 @@ struct Controller : public Client {
// Wait for subscribers to be ready.
process(opts.totalSubs, "sub_ready", bind(expect, _1, "ready"));
- Stats pubRates;
- Stats subRates;
-
- AbsTime start=now();
- send(opts.totalPubs, "pub_start", "start"); // Start publishers
- process(opts.totalPubs, "pub_done", boost::ref(pubRates));
- process(opts.totalSubs, "sub_done", boost::ref(subRates));
- AbsTime end=now();
- double time=secs(start, end);
- double txrate=opts.transfers/time;
- double mbytes=(txrate*opts.size)/(1024*1024);
-
- if (!opts.summary) {
- cout << endl << "Total " << opts.transfers << " transfers of "
- << opts.size << " bytes in "
- << time << " seconds." << endl;
- cout << endl << "Publish transfers/sec: " << endl;
- pubRates.print(cout);
- cout << endl << "Subscribe transfers/sec: " << endl;
- subRates.print(cout);
- cout << endl
- << "Total transfers/sec: " << txrate << endl
- << "Total Mbytes/sec: " << mbytes << endl;
+ LocalQueue pubDone;
+ LocalQueue subDone;
+ subs.setFlowControl(0, SubscriptionManager::UNLIMITED, false);
+ subs.subscribe(pubDone, "pub_done");
+ subs.subscribe(subDone, "sub_done");
+
+ double txrateTotal(0);
+ double mbytesTotal(0);
+ double pubRateTotal(0);
+ double subRateTotal(0);
+
+ for (size_t j = 0; j < opts.iterations; ++j) {
+ AbsTime start=now();
+ send(opts.totalPubs, "pub_start", "start"); // Start publishers
+
+ Stats pubRates;
+ Stats subRates;
+
+ process(opts.totalPubs, pubDone, "pub_done", boost::ref(pubRates));
+ process(opts.totalSubs, subDone, "sub_done", boost::ref(subRates));
+
+ AbsTime end=now();
+
+ double time=secs(start, end);
+ double txrate=opts.transfers/time;
+ double mbytes=(txrate*opts.size)/(1024*1024);
+
+ if (!opts.summary) {
+ cout << endl << "Total " << opts.transfers << " transfers of "
+ << opts.size << " bytes in "
+ << time << " seconds." << endl;
+ cout << endl << "Publish transfers/sec: " << endl;
+ pubRates.print(cout);
+ cout << endl << "Subscribe transfers/sec: " << endl;
+ subRates.print(cout);
+ cout << endl
+ << "Total transfers/sec: " << txrate << endl
+ << "Total Mbytes/sec: " << mbytes << endl;
+ }
+ else {
+ cout << pubRates.mean() << "\t"
+ << subRates.mean() << "\t"
+ << txrate << "\t"
+ << mbytes << endl;
+ }
+
+ txrateTotal += txrate;
+ mbytesTotal += mbytes;
+ pubRateTotal += pubRates.mean();
+ subRateTotal += subRates.mean();
}
- else {
- cout << pubRates.mean() << "\t"
- << subRates.mean() << "\t"
- << txrate << "\t"
- << mbytes << endl;
+ if (opts.iterations > 1) {
+ cout << "Averages: "<< endl
+ << (pubRateTotal / opts.iterations) << "\t"
+ << (subRateTotal / opts.iterations) << "\t"
+ << (txrateTotal / opts.iterations) << "\t"
+ << (mbytesTotal / opts.iterations) << endl;
}
}
catch (const std::exception& e) {
@@ -394,29 +438,31 @@ struct PublishThread : public Client {
SubscriptionManager subs(session);
- LocalQueue lq(AckPolicy(opts.ack));
- subs.setFlowControl(1, SubscriptionManager::UNLIMITED, false);
+ LocalQueue lq;
+ subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true);
subs.subscribe(lq, "pub_start");
- expect(lq.pop().getData(), "start");
- AbsTime start=now();
- for (size_t i=0; i<opts.count; i++) {
- // Stamp the iteration into the message data, avoid
- // any heap allocation.
- const_cast<std::string&>(msg.getData()).replace(offset, sizeof(uint32_t), reinterpret_cast<const char*>(&i), sizeof(uint32_t));
- completion = session.messageTransfer(
- arg::destination=destination,
- arg::content=msg,
- arg::confirmMode=opts.confirm);
+ for (size_t j = 0; j < opts.iterations; ++j) {
+ expect(lq.pop().getData(), "start");
+ AbsTime start=now();
+ for (size_t i=0; i<opts.count; i++) {
+ // Stamp the iteration into the message data, avoid
+ // any heap allocation.
+ const_cast<std::string&>(msg.getData()).replace(offset, sizeof(uint32_t),
+ reinterpret_cast<const char*>(&i), sizeof(uint32_t));
+ completion = session.messageTransfer(
+ arg::destination=destination,
+ arg::content=msg,
+ arg::confirmMode=opts.confirm);
+ }
+ if (opts.confirm) completion.sync();
+ AbsTime end=now();
+ double time=secs(start,end);
+
+ // Send result to controller.
+ Message report(lexical_cast<string>(opts.count/time), "pub_done");
+ session.messageTransfer(arg::content=report);
}
- if (opts.confirm) completion.sync();
- AbsTime end=now();
- double time=secs(start,end);
-
- // Send result to controller.
- msg.setData(lexical_cast<string>(opts.count/time));
- msg.getDeliveryProperties().setRoutingKey("pub_done");
- session.messageTransfer(arg::content=msg);
session.close();
}
catch (const std::exception& e) {
@@ -466,34 +512,41 @@ struct SubscribeThread : public Client {
// Notify controller we are ready.
session.messageTransfer(arg::content=Message("ready", "sub_ready"));
- Message msg;
- AbsTime start=now();
- size_t expect=0;
- for (size_t i = 0; i < opts.subQuota; ++i) {
- msg=lq.pop();
- // TODO aconway 2007-11-23: check message order for.
- // multiple publishers. Need an acorray of counters,
- // one per publisher and a publisher ID in the
- // message. Careful not to introduce a lot of overhead
- // here, e.g. no std::map, std::string etc.
- //
- // For now verify order only for a single publisher.
- size_t offset = opts.uniqueData ? 5 /*marker is 'data:'*/ : 0;
- size_t n = *reinterpret_cast<const uint32_t*>(msg.getData().data() + offset);
- if (opts.pubs == 1) {
- if (opts.subs == 1 || opts.mode == FANOUT) verify(n==expect, "==", expect, n);
- else verify(n>=expect, ">=", expect, n);
- expect = n+1;
+
+ for (size_t j = 0; j < opts.iterations; ++j) {
+ if (j > 0) {
+ //need to allocate some more credit
+ session.messageFlow(queue, 0, opts.subQuota);
+ }
+ Message msg;
+ AbsTime start=now();
+ size_t expect=0;
+ for (size_t i = 0; i < opts.subQuota; ++i) {
+ msg=lq.pop();
+ // TODO aconway 2007-11-23: check message order for.
+ // multiple publishers. Need an acorray of counters,
+ // one per publisher and a publisher ID in the
+ // message. Careful not to introduce a lot of overhead
+ // here, e.g. no std::map, std::string etc.
+ //
+ // For now verify order only for a single publisher.
+ size_t offset = opts.uniqueData ? 5 /*marker is 'data:'*/ : 0;
+ size_t n = *reinterpret_cast<const uint32_t*>(msg.getData().data() + offset);
+ if (opts.pubs == 1) {
+ if (opts.subs == 1 || opts.mode == FANOUT) verify(n==expect, "==", expect, n);
+ else verify(n>=expect, ">=", expect, n);
+ expect = n+1;
+ }
}
+ if (opts.ack !=0)
+ msg.acknowledge(); // Cumulative ack for final batch.
+ AbsTime end=now();
+
+ // Report to publisher.
+ Message result(lexical_cast<string>(opts.subQuota/secs(start,end)),
+ "sub_done");
+ session.messageTransfer(arg::content=result);
}
- if (opts.ack !=0)
- msg.acknowledge(); // Cumulative ack for final batch.
- AbsTime end=now();
-
- // Report to publisher.
- Message result(lexical_cast<string>(opts.subQuota/secs(start,end)),
- "sub_done");
- session.messageTransfer(arg::content=result);
session.close();
}
catch (const std::exception& e) {