summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-08-18 13:11:08 +0000
committerGordon Sim <gsim@apache.org>2009-08-18 13:11:08 +0000
commitb7c4401d26680d19270b33587195548c3383a4c5 (patch)
treef1545a39c905b8b266798cee0532903b6ea0743b
parent06d1a8ad0bbfb4580bd7ba9fca8622d951e7b5c9 (diff)
downloadqpid-python-b7c4401d26680d19270b33587195548c3383a4c5.tar.gz
QPID-2053: Allow queue names to be controlled for perftest (this allows multiple concurrent instances to be run). Based on a proposal and patch from Frantisek Reznicek.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@805404 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/tests/perftest.cpp53
1 files changed, 31 insertions, 22 deletions
diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp
index f55528fcd5..d383e0eb80 100644
--- a/qpid/cpp/src/tests/perftest.cpp
+++ b/qpid/cpp/src/tests/perftest.cpp
@@ -75,6 +75,7 @@ struct Opts : public TestOptions {
// Queue policy
uint32_t queueMaxCount;
uint64_t queueMaxSize;
+ std::string baseName;
bool queueDurable;
// Publisher
@@ -106,8 +107,8 @@ struct Opts : public TestOptions {
static const std::string helpText;
Opts() :
- TestOptions(helpText),
- setup(false), control(false), publish(false), subscribe(false),
+ TestOptions(helpText),
+ setup(false), control(false), publish(false), subscribe(false), baseName("perftest"),
pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false),
subs(1), ack(0),
qt(1),singleConnect(false), iterations(1), mode(SHARED), summary(false),
@@ -144,6 +145,7 @@ struct Opts : public TestOptions {
("queue-max-count", optValue(queueMaxCount, "N"), "queue policy: count to trigger 'flow to disk'")
("queue-max-size", optValue(queueMaxSize, "N"), "queue policy: accumulated size to trigger 'flow to disk'")
+ ("base-name", optValue(baseName, "NAME"), "base name used for queues or topics")
("queue-durable", optValue(queueDurable, "N"), "Make queue durable (implied if durable set)")
("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume")
@@ -219,6 +221,13 @@ const std::string Opts::helpText=
Opts opts;
Connection globalConnection;
+std::string fqn(const std::string& name)
+{
+ ostringstream fqn;
+ fqn << opts.baseName << "_" << name;
+ return fqn.str();
+}
+
struct Client : public Runnable {
Connection* connection;
Connection localConnection;
@@ -257,18 +266,18 @@ struct Setup : public Client {
}
void run() {
- queueInit("pub_start");
- queueInit("pub_done");
- queueInit("sub_ready");
- queueInit("sub_done");
- if (opts.iterations > 1) queueInit("sub_iteration");
+ queueInit(fqn("pub_start"));
+ queueInit(fqn("pub_done"));
+ queueInit(fqn("sub_ready"));
+ queueInit(fqn("sub_done"));
+ if (opts.iterations > 1) queueInit(fqn("sub_iteration"));
if (opts.mode==SHARED) {
framing::FieldTable settings;//queue policy settings
settings.setInt("qpid.max_count", opts.queueMaxCount);
settings.setInt("qpid.max_size", opts.queueMaxSize);
for (size_t i = 0; i < opts.qt; ++i) {
ostringstream qname;
- qname << "perftest" << i;
+ qname << opts.baseName << i;
queueInit(qname.str(), opts.durable || opts.queueDurable, settings);
}
}
@@ -384,13 +393,13 @@ struct Controller : public Client {
void run() { // Controller
try {
// Wait for subscribers to be ready.
- process(opts.totalSubs, "sub_ready", bind(expect, _1, "ready"));
+ process(opts.totalSubs, fqn("sub_ready"), bind(expect, _1, "ready"));
LocalQueue pubDone;
LocalQueue subDone;
subs.setFlowControl(0, SubscriptionManager::UNLIMITED, false);
- subs.subscribe(pubDone, "pub_done");
- subs.subscribe(subDone, "sub_done");
+ subs.subscribe(pubDone, fqn("pub_done"));
+ subs.subscribe(subDone, fqn("sub_done"));
double txrateTotal(0);
double mbytesTotal(0);
@@ -399,16 +408,16 @@ struct Controller : public Client {
for (size_t j = 0; j < opts.iterations; ++j) {
AbsTime start=now();
- send(opts.totalPubs, "pub_start", "start"); // Start publishers
+ send(opts.totalPubs, fqn("pub_start"), "start"); // Start publishers
if (j) {
- send(opts.totalPubs, "sub_iteration", "next"); // Start subscribers on next iteration
+ send(opts.totalPubs, fqn("sub_iteration"), "next"); // Start subscribers on next iteration
}
Stats pubRates;
Stats subRates;
- process(opts.totalPubs, pubDone, "pub_done", boost::ref(pubRates));
- process(opts.totalSubs, subDone, "sub_done", boost::ref(subRates));
+ process(opts.totalPubs, pubDone, fqn("pub_done"), boost::ref(pubRates));
+ process(opts.totalSubs, subDone, fqn("sub_done"), boost::ref(subRates));
AbsTime end=now();
@@ -497,7 +506,7 @@ struct PublishThread : public Client {
SubscriptionManager subs(session);
LocalQueue lq;
subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true);
- subs.subscribe(lq, "pub_start");
+ subs.subscribe(lq, fqn("pub_start"));
for (size_t j = 0; j < opts.iterations; ++j) {
expect(lq.pop().getData(), "start");
@@ -533,7 +542,7 @@ struct PublishThread : public Client {
double time=secs(start,end);
// Send result to controller.
- Message report(lexical_cast<string>(opts.count/time), "pub_done");
+ Message report(lexical_cast<string>(opts.count/time), fqn("pub_done"));
session.messageTransfer(arg::content=report, arg::acceptMode=1);
if (opts.txPub){
sync(session).txCommit();
@@ -587,7 +596,7 @@ struct SubscribeThread : public Client {
LocalQueue lq;
Subscription subscription = subs.subscribe(lq, queue, settings);
// Notify controller we are ready.
- session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1);
+ session.messageTransfer(arg::content=Message("ready", fqn("sub_ready")), arg::acceptMode=1);
if (opts.txSub) {
if (opts.commitAsync) session.txCommit();
else sync(session).txCommit();
@@ -595,13 +604,13 @@ struct SubscribeThread : public Client {
LocalQueue iterationControl;
if (opts.iterations > 1) {
- subs.subscribe(iterationControl, "sub_iteration", SubscriptionSettings(FlowControl::messageCredit(0)));
+ subs.subscribe(iterationControl, fqn("sub_iteration"), SubscriptionSettings(FlowControl::messageCredit(0)));
}
for (size_t j = 0; j < opts.iterations; ++j) {
if (j > 0) {
//need to wait here until all subs are done
- session.messageFlow("sub_iteration", 0, 1);
+ session.messageFlow(fqn("sub_iteration"), 0, 1);
iterationControl.pop();
//need to allocate some more credit for subscription
@@ -643,7 +652,7 @@ struct SubscribeThread : public Client {
// Report to publisher.
Message result(lexical_cast<string>(opts.subQuota/secs(start,end)),
- "sub_done");
+ fqn("sub_done"));
session.messageTransfer(arg::content=result, arg::acceptMode=1);
if (opts.txSub) sync(session).txCommit();
}
@@ -680,7 +689,7 @@ int main(int argc, char** argv) {
// Start pubs/subs for each queue/topic.
for (size_t i = 0; i < opts.qt; ++i) {
ostringstream key;
- key << "perftest" << i; // Queue or topic name.
+ key << opts.baseName << i; // Queue or topic name.
if (opts.publish) {
size_t n = singleProcess ? opts.pubs : 1;
for (size_t j = 0; j < n; ++j) {