diff options
Diffstat (limited to 'cpp')
-rwxr-xr-x | cpp/src/tests/perfdist | 33 | ||||
-rw-r--r-- | cpp/src/tests/perftest.cpp | 94 |
2 files changed, 91 insertions, 36 deletions
diff --git a/cpp/src/tests/perfdist b/cpp/src/tests/perfdist new file mode 100755 index 0000000000..07338f6822 --- /dev/null +++ b/cpp/src/tests/perfdist @@ -0,0 +1,33 @@ +#!/bin/bash +# +echo $_ +usage() { +cat <<EOF +Distributed perftest with ssh. +Arguments before -- are passed to perftest. +Arguments after -- are treated as host names. +Publisher runs on first host. One listener runs on each +Remaining host. To run multiple liseners on a host, list it more than once. +Do not pass --consumers, --publish or --listen options, they are computed +from the host list. +EOF +exit 1 +} + +while test "$HOSTS" != "$*"; do + case $1 in + --consumers|--publish|--listen) usage ;; + --) shift; PUB="$1" ; shift ; HOSTS="$*" ;; + *) ARGS="$ARGS $1"; shift ;; + esac +done +test -n "$HOSTS" || { echo "No -- found"; usage; } +N=`echo $HOSTS | wc -w` +PERFTEST=`PATH=$PWD:$PATH which perftest` +$PERFTEST --purge $ARGS +ssh $PUB $PERFTEST $ARGS --publish --consumers $N& +for h in $HOSTS; do + ssh $h $PERFTEST $ARGS --listen& +done +wait + diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index 4db90fd7be..8d970a0a6f 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -39,26 +39,32 @@ struct Opts : public TestOptions { bool listen; bool publish; + bool purge; int count; int size; bool durable; int consumers; std::string mode; int autoAck; + bool summary; Opts() : - listen(false), publish(false), count(500000), size(64), consumers(1), - mode("shared"), autoAck(100) + listen(false), publish(false), purge(false), + count(500000), size(64), consumers(1), + mode("shared"), autoAck(100), + summary(false) { addOptions() ("listen", optValue(listen), "Consume messages.") ("publish", optValue(publish), "Produce messages.") + ("purge", optValue(purge), "Purge shared queues.") ("count", optValue(count, "N"), "Messages to send.") ("size", optValue(size, "BYTES"), "Size of messages.") ("durable", optValue(durable, "N"), "Publish messages as durable.") ("consumers", optValue(consumers, "N"), "Number of consumers.") ("mode", optValue(mode, "shared|fanout|topic"), "consume mode") - ("auto-ack", optValue(autoAck, "N"), "ack every N messages."); + ("auto-ack", optValue(autoAck, "N"), "ack every N messages.") + ("summary,s", optValue(summary), "summary output only"); } }; @@ -71,16 +77,19 @@ struct PublishThread : public Runnable { Thread thread; void run(); }; // Create and purge the shared queues void setup() { - cout << "Create shared queues" << endl; Connection connection; opts.open(connection); Session_0_10 session = connection.newSession(); session.setSynchronous(true); // Make sure this is all completed. session.queueDeclare(arg::queue="control"); // Control queue - session.queuePurge(arg::queue="control"); + if (opts.purge) { + if (!opts.summary) cout << "Purging shared queues" << endl; + session.queuePurge(arg::queue="control"); + } if (mode==SHARED) { session.queueDeclare(arg::queue="perftest", arg::durable=opts.durable); // Shared data queue - session.queuePurge(arg::queue="perftest"); + if (opts.purge) + session.queuePurge(arg::queue="perftest"); } session.close(); connection.close(); @@ -93,8 +102,8 @@ int main(int argc, char** argv) { else if (opts.mode=="fanout") mode = FANOUT; else if (opts.mode=="topic") mode = TOPIC; else throw Exception("Invalid mode"); - if (!opts.listen && !opts.publish) - opts.listen = opts.publish = true; + if (!opts.listen && !opts.publish && !opts.purge) + opts.listen = opts.publish = opts.purge = true; setup(); std::vector<ListenThread> listen(opts.consumers); PublishThread publish; @@ -141,15 +150,15 @@ void PublishThread::run() { Session_0_10 session = connection.newSession(); // Wait for consumers. - cout << "Publisher wating for consumers " << flush; + if (!opts.summary) cout << "Waiting for consumers ready " << flush; SubscriptionManager subs(session); LocalQueue control; subs.subscribe(control, "control"); for (int i = 0; i < opts.consumers; ++i) { - cout << "." << flush; + if (!opts.summary) cout << "." << flush; expect(control.pop().getData(), "ready"); } - cout << endl; + if (!opts.summary) cout << endl; // Create test message size_t msgSize=max(opts.size, 32); @@ -159,14 +168,14 @@ void PublishThread::run() { msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); // Time sending message. AbsTime start=now(); - cout << "Publishing " << opts.count << " messages " << flush; + if (!opts.summary) cout << "Publishing " << opts.count << " messages " << flush; for (int i=0; i<opts.count; i++) { sprintf(msgBuf, "%d", i); session.messageTransfer(arg::destination=exchange(), arg::content=msg); - if ((i%10000)==0) cout << "." << flush; + if (!opts.summary && (i%10000)==0) cout << "." << flush; } - cout << " done." << endl; + if (!opts.summary) cout << " done." << endl; msg.setData("done"); // Send done messages. if (mode==SHARED) for (int i = 0; i < opts.consumers; ++i) @@ -176,18 +185,24 @@ void PublishThread::run() { AbsTime end=now(); // Report - cout << endl; - cout << "publish count:" << opts.count << endl; - cout << "publish secs:" << secs(start,end) << endl; - cout << "publish rate:" << (opts.count)/secs(start,end) << endl; + double publish_rate=(opts.count)/secs(start,end); + if (!opts.summary) + cout << endl + << "publish count:" << opts.count << endl + << "publish secs:" << secs(start,end) << endl + << "publish rate:" << publish_rate << endl; + double consume_rate = 0; // Average rate for consumers. // Wait for consumer(s) to finish. - cout << "Publisher wating for consumer reports. " << endl; + if (!opts.summary) cout << "Waiting for consumers done " << endl; for (int i = 0; i < opts.consumers; ++i) { string report=control.pop().getData(); - if (report.find("consume") != 0) - throw Exception("Expected consumer report, got: "+report); - cout << endl << report; + if (!opts.summary) + cout << endl << report; + else { + double rate=boost::lexical_cast<double>(report); + consume_rate += rate/opts.consumers; + } } end=now(); @@ -197,11 +212,19 @@ void PublishThread::run() { transfers=2*opts.count; else // sent once, received N times. transfers=opts.count*(opts.consumers + 1); - - cout << endl - << "total transfers:" << transfers << endl - << "total secs:" << secs(start, end) << endl - << "total transfers/sec:" << transfers/secs(start, end) << endl; + double total_rate=transfers/secs(start, end); + if (opts.summary) + cout << opts.mode << '(' << opts.count + << ':' << opts.consumers << ')' + << '\t' << publish_rate + << '\t' << consume_rate + << '\t' << total_rate + << endl; + else + cout << endl + << "total transfers:" << transfers << endl + << "total secs:" << secs(start, end) << endl + << "total rate:" << total_rate << endl; connection.close(); } @@ -238,23 +261,22 @@ void ListenThread::run() { int consumed=0; AbsTime start=now(); Message msg; - if (!opts.publish) - cout << "Consuming " << flush; while ((msg=consume.pop()).getData() != "done") { ++consumed; - if (!opts.publish && (consumed%10000) == 0) - cout << "." << flush; } - if (!opts.publish) - cout << endl; msg.acknowledge(); // Ack all outstanding messages. AbsTime end=now(); // Report to publisher. ostringstream report; - report << "consume count: " << consumed << endl - << "consume secs: " << secs(start, end) << endl - << "consume rate: " << consumed/secs(start,end) << endl; + double consume_rate=consumed/secs(start,end); + if (opts.summary) + report << consume_rate; + else + report << "consume count: " << consumed << endl + << "consume secs: " << secs(start, end) << endl + << "consume rate: " << consume_rate << endl; + session.messageTransfer(arg::content=Message(report.str(), "control")); connection.close(); } |