diff options
author | Gordon Sim <gsim@apache.org> | 2009-08-18 13:11:08 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2009-08-18 13:11:08 +0000 |
commit | b7c4401d26680d19270b33587195548c3383a4c5 (patch) | |
tree | f1545a39c905b8b266798cee0532903b6ea0743b | |
parent | 06d1a8ad0bbfb4580bd7ba9fca8622d951e7b5c9 (diff) | |
download | qpid-python-b7c4401d26680d19270b33587195548c3383a4c5.tar.gz |
QPID-2053: Allow queue names to be controlled for perftest (this allows multiple concurrent instances to be run). Based on a proposal and patch from Frantisek Reznicek.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@805404 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/tests/perftest.cpp | 53 |
1 files changed, 31 insertions, 22 deletions
diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp index f55528fcd5..d383e0eb80 100644 --- a/qpid/cpp/src/tests/perftest.cpp +++ b/qpid/cpp/src/tests/perftest.cpp @@ -75,6 +75,7 @@ struct Opts : public TestOptions { // Queue policy uint32_t queueMaxCount; uint64_t queueMaxSize; + std::string baseName; bool queueDurable; // Publisher @@ -106,8 +107,8 @@ struct Opts : public TestOptions { static const std::string helpText; Opts() : - TestOptions(helpText), - setup(false), control(false), publish(false), subscribe(false), + TestOptions(helpText), + setup(false), control(false), publish(false), subscribe(false), baseName("perftest"), pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false), subs(1), ack(0), qt(1),singleConnect(false), iterations(1), mode(SHARED), summary(false), @@ -144,6 +145,7 @@ struct Opts : public TestOptions { ("queue-max-count", optValue(queueMaxCount, "N"), "queue policy: count to trigger 'flow to disk'") ("queue-max-size", optValue(queueMaxSize, "N"), "queue policy: accumulated size to trigger 'flow to disk'") + ("base-name", optValue(baseName, "NAME"), "base name used for queues or topics") ("queue-durable", optValue(queueDurable, "N"), "Make queue durable (implied if durable set)") ("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume") @@ -219,6 +221,13 @@ const std::string Opts::helpText= Opts opts; Connection globalConnection; +std::string fqn(const std::string& name) +{ + ostringstream fqn; + fqn << opts.baseName << "_" << name; + return fqn.str(); +} + struct Client : public Runnable { Connection* connection; Connection localConnection; @@ -257,18 +266,18 @@ struct Setup : public Client { } void run() { - queueInit("pub_start"); - queueInit("pub_done"); - queueInit("sub_ready"); - queueInit("sub_done"); - if (opts.iterations > 1) queueInit("sub_iteration"); + queueInit(fqn("pub_start")); + queueInit(fqn("pub_done")); + queueInit(fqn("sub_ready")); + queueInit(fqn("sub_done")); + if (opts.iterations > 1) queueInit(fqn("sub_iteration")); if (opts.mode==SHARED) { framing::FieldTable settings;//queue policy settings settings.setInt("qpid.max_count", opts.queueMaxCount); settings.setInt("qpid.max_size", opts.queueMaxSize); for (size_t i = 0; i < opts.qt; ++i) { ostringstream qname; - qname << "perftest" << i; + qname << opts.baseName << i; queueInit(qname.str(), opts.durable || opts.queueDurable, settings); } } @@ -384,13 +393,13 @@ struct Controller : public Client { void run() { // Controller try { // Wait for subscribers to be ready. - process(opts.totalSubs, "sub_ready", bind(expect, _1, "ready")); + process(opts.totalSubs, fqn("sub_ready"), bind(expect, _1, "ready")); LocalQueue pubDone; LocalQueue subDone; subs.setFlowControl(0, SubscriptionManager::UNLIMITED, false); - subs.subscribe(pubDone, "pub_done"); - subs.subscribe(subDone, "sub_done"); + subs.subscribe(pubDone, fqn("pub_done")); + subs.subscribe(subDone, fqn("sub_done")); double txrateTotal(0); double mbytesTotal(0); @@ -399,16 +408,16 @@ struct Controller : public Client { for (size_t j = 0; j < opts.iterations; ++j) { AbsTime start=now(); - send(opts.totalPubs, "pub_start", "start"); // Start publishers + send(opts.totalPubs, fqn("pub_start"), "start"); // Start publishers if (j) { - send(opts.totalPubs, "sub_iteration", "next"); // Start subscribers on next iteration + send(opts.totalPubs, fqn("sub_iteration"), "next"); // Start subscribers on next iteration } Stats pubRates; Stats subRates; - process(opts.totalPubs, pubDone, "pub_done", boost::ref(pubRates)); - process(opts.totalSubs, subDone, "sub_done", boost::ref(subRates)); + process(opts.totalPubs, pubDone, fqn("pub_done"), boost::ref(pubRates)); + process(opts.totalSubs, subDone, fqn("sub_done"), boost::ref(subRates)); AbsTime end=now(); @@ -497,7 +506,7 @@ struct PublishThread : public Client { SubscriptionManager subs(session); LocalQueue lq; subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true); - subs.subscribe(lq, "pub_start"); + subs.subscribe(lq, fqn("pub_start")); for (size_t j = 0; j < opts.iterations; ++j) { expect(lq.pop().getData(), "start"); @@ -533,7 +542,7 @@ struct PublishThread : public Client { double time=secs(start,end); // Send result to controller. - Message report(lexical_cast<string>(opts.count/time), "pub_done"); + Message report(lexical_cast<string>(opts.count/time), fqn("pub_done")); session.messageTransfer(arg::content=report, arg::acceptMode=1); if (opts.txPub){ sync(session).txCommit(); @@ -587,7 +596,7 @@ struct SubscribeThread : public Client { LocalQueue lq; Subscription subscription = subs.subscribe(lq, queue, settings); // Notify controller we are ready. - session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1); + session.messageTransfer(arg::content=Message("ready", fqn("sub_ready")), arg::acceptMode=1); if (opts.txSub) { if (opts.commitAsync) session.txCommit(); else sync(session).txCommit(); @@ -595,13 +604,13 @@ struct SubscribeThread : public Client { LocalQueue iterationControl; if (opts.iterations > 1) { - subs.subscribe(iterationControl, "sub_iteration", SubscriptionSettings(FlowControl::messageCredit(0))); + subs.subscribe(iterationControl, fqn("sub_iteration"), SubscriptionSettings(FlowControl::messageCredit(0))); } for (size_t j = 0; j < opts.iterations; ++j) { if (j > 0) { //need to wait here until all subs are done - session.messageFlow("sub_iteration", 0, 1); + session.messageFlow(fqn("sub_iteration"), 0, 1); iterationControl.pop(); //need to allocate some more credit for subscription @@ -643,7 +652,7 @@ struct SubscribeThread : public Client { // Report to publisher. Message result(lexical_cast<string>(opts.subQuota/secs(start,end)), - "sub_done"); + fqn("sub_done")); session.messageTransfer(arg::content=result, arg::acceptMode=1); if (opts.txSub) sync(session).txCommit(); } @@ -680,7 +689,7 @@ int main(int argc, char** argv) { // Start pubs/subs for each queue/topic. for (size_t i = 0; i < opts.qt; ++i) { ostringstream key; - key << "perftest" << i; // Queue or topic name. + key << opts.baseName << i; // Queue or topic name. if (opts.publish) { size_t n = singleProcess ? opts.pubs : 1; for (size_t j = 0; j < n; ++j) { |