summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/qpid-perftest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/qpid-perftest.cpp')
-rw-r--r--qpid/cpp/src/tests/qpid-perftest.cpp746
1 files changed, 746 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/qpid-perftest.cpp b/qpid/cpp/src/tests/qpid-perftest.cpp
new file mode 100644
index 0000000000..8a5cf05775
--- /dev/null
+++ b/qpid/cpp/src/tests/qpid-perftest.cpp
@@ -0,0 +1,746 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "TestOptions.h"
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Completion.h"
+#include "qpid/client/Message.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/Time.h"
+#include "qpid/sys/Thread.h"
+
+#include <boost/lexical_cast.hpp>
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+
+#include <iostream>
+#include <sstream>
+#include <numeric>
+#include <algorithm>
+#include <math.h>
+
+
+using namespace std;
+using namespace qpid;
+using namespace client;
+using namespace sys;
+using boost::lexical_cast;
+using boost::bind;
+
+namespace qpid {
+namespace tests {
+
+enum Mode { SHARED, FANOUT, TOPIC };
+const char* modeNames[] = { "shared", "fanout", "topic" };
+
+// istream/ostream ops so Options can read/display Mode.
+istream& operator>>(istream& in, Mode& mode) {
+ string s;
+ in >> s;
+ int i = find(modeNames, modeNames+3, s) - modeNames;
+ if (i >= 3) throw Exception("Invalid mode: "+s);
+ mode = Mode(i);
+ return in;
+}
+
+ostream& operator<<(ostream& out, Mode mode) {
+ return out << modeNames[mode];
+}
+
+
+struct Opts : public TestOptions {
+
+ // Actions
+ bool setup, control, publish, subscribe;
+
+ // Queue policy
+ uint32_t queueMaxCount;
+ uint64_t queueMaxSize;
+ std::string baseName;
+ bool queueDurable;
+
+ // Publisher
+ size_t pubs;
+ size_t count ;
+ size_t size;
+ bool confirm;
+ bool durable;
+ bool uniqueData;
+ bool syncPub;
+
+ // Subscriber
+ size_t subs;
+ size_t ack;
+
+ // General
+ size_t qt;
+ bool singleConnect;
+ size_t iterations;
+ Mode mode;
+ bool summary;
+ uint32_t intervalSub;
+ uint32_t intervalPub;
+ size_t tx;
+ size_t txPub;
+ size_t txSub;
+ bool commitAsync;
+
+ static const std::string helpText;
+
+ Opts() :
+ TestOptions(helpText),
+ setup(false), control(false), publish(false), subscribe(false), baseName("qpid-perftest"),
+ pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false),
+ subs(1), ack(0),
+ qt(1),singleConnect(false), iterations(1), mode(SHARED), summary(false),
+ intervalSub(0), intervalPub(0), tx(0), txPub(0), txSub(0), commitAsync(false)
+ {
+ addOptions()
+ ("setup", optValue(setup), "Create shared queues.")
+ ("control", optValue(control), "Run test, print report.")
+ ("publish", optValue(publish), "Publish messages.")
+ ("subscribe", optValue(subscribe), "Subscribe for messages.")
+
+ ("mode", optValue(mode, "shared|fanout|topic"), "Test mode."
+ "\nshared: --qt queues, --npubs publishers and --nsubs subscribers per queue.\n"
+ "\nfanout: --npubs publishers, --nsubs subscribers, fanout exchange."
+ "\ntopic: --qt topics, --npubs publishers and --nsubs subscribers per topic.\n")
+
+ ("npubs", optValue(pubs, "N"), "Create N publishers.")
+ ("count", optValue(count, "N"), "Each publisher sends N messages.")
+ ("size", optValue(size, "BYTES"), "Size of messages in bytes.")
+ ("pub-confirm", optValue(confirm, "yes|no"), "Publisher use confirm-mode.")
+ ("durable", optValue(durable, "yes|no"), "Publish messages as durable.")
+ ("unique-data", optValue(uniqueData, "yes|no"), "Make data for each message unique.")
+ ("sync-publish", optValue(syncPub, "yes|no"), "Wait for confirmation of each message before sending the next one.")
+
+ ("nsubs", optValue(subs, "N"), "Create N subscribers.")
+ ("sub-ack", optValue(ack, "N"), "N>0: Subscriber acks batches of N.\n"
+ "N==0: Subscriber uses unconfirmed mode")
+
+ ("qt", optValue(qt, "N"), "Create N queues or topics.")
+ ("single-connection", optValue(singleConnect, "yes|no"), "Use one connection for multiple sessions.")
+
+ ("iterations", optValue(iterations, "N"), "Desired number of iterations of the test.")
+ ("summary,s", optValue(summary), "Summary output: pubs/sec subs/sec transfers/sec Mbytes/sec")
+
+ ("queue-max-count", optValue(queueMaxCount, "N"), "queue policy: count to trigger 'flow to disk'")
+ ("queue-max-size", optValue(queueMaxSize, "N"), "queue policy: accumulated size to trigger 'flow to disk'")
+ ("base-name", optValue(baseName, "NAME"), "base name used for queues or topics")
+ ("queue-durable", optValue(queueDurable, "N"), "Make queue durable (implied if durable set)")
+
+ ("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume")
+ ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish")
+
+ ("tx", optValue(tx, "N"), "if non-zero, the transaction batch size for publishing and consuming")
+ ("pub-tx", optValue(txPub, "N"), "if non-zero, the transaction batch size for publishing")
+ ("async-commit", optValue(commitAsync, "yes|no"), "Don't wait for completion of commit")
+ ("sub-tx", optValue(txSub, "N"), "if non-zero, the transaction batch size for consuming");
+ }
+
+ // Computed values
+ size_t totalPubs;
+ size_t totalSubs;
+ size_t transfers;
+ size_t subQuota;
+
+ void parse(int argc, char** argv) {
+ TestOptions::parse(argc, argv);
+ switch (mode) {
+ case SHARED:
+ if (count % subs) {
+ count += subs - (count % subs);
+ cout << "WARNING: Adjusted --count to " << count
+ << " the nearest multiple of --nsubs" << endl;
+ }
+ totalPubs = pubs*qt;
+ totalSubs = subs*qt;
+ subQuota = (pubs*count)/subs;
+ break;
+ case FANOUT:
+ if (qt != 1) cerr << "WARNING: Fanout mode, ignoring --qt="
+ << qt << endl;
+ qt=1;
+ totalPubs = pubs;
+ totalSubs = subs;
+ subQuota = totalPubs*count;
+ break;
+ case TOPIC:
+ totalPubs = pubs*qt;
+ totalSubs = subs*qt;
+ subQuota = pubs*count;
+ break;
+ }
+ transfers=(totalPubs*count) + (totalSubs*subQuota);
+ if (tx) {
+ if (txPub) {
+ cerr << "WARNING: Using overriden tx value for publishers: " << txPub << std::endl;
+ } else {
+ txPub = tx;
+ }
+ if (txSub) {
+ cerr << "WARNING: Using overriden tx value for subscribers: " << txSub << std::endl;
+ } else {
+ txSub = tx;
+ }
+ }
+ }
+};
+
+const std::string Opts::helpText=
+"There are two ways to use qpid-perftest: single process or multi-process.\n\n"
+"If none of the --setup, --publish, --subscribe or --control options\n"
+"are given qpid-perftest will run a single-process test.\n"
+"For a multi-process test first run:\n"
+" qpid-perftest --setup <other options>\n"
+"and wait for it to complete. The remaining process should run concurrently::\n"
+"Run --npubs times: qpid-perftest --publish <other options>\n"
+"Run --nsubs times: qpid-perftest --subscribe <other options>\n"
+"Run once: qpid-perftest --control <other options>\n"
+"Note the <other options> must be identical for all processes.\n";
+
+Opts opts;
+Connection globalConnection;
+
+std::string fqn(const std::string& name)
+{
+ ostringstream fqn;
+ fqn << opts.baseName << "_" << name;
+ return fqn.str();
+}
+
+struct Client : public Runnable {
+ Connection* connection;
+ Connection localConnection;
+ AsyncSession session;
+ Thread thread;
+
+ Client() {
+ if (opts.singleConnect){
+ connection = &globalConnection;
+ if (!globalConnection.isOpen()) opts.open(globalConnection);
+ }else{
+ connection = &localConnection;
+ opts.open(localConnection);
+ }
+ session = connection->newSession();
+ }
+
+ ~Client() {
+ try {
+ if (connection->isOpen()) {
+ session.close();
+ connection->close();
+ }
+ } catch (const std::exception& e) {
+ std::cerr << "Error in shutdown: " << e.what() << std::endl;
+ }
+ }
+};
+
+struct Setup : public Client {
+
+ void queueInit(string name, bool durable=false, const framing::FieldTable& settings=framing::FieldTable()) {
+ session.queueDeclare(arg::queue=name, arg::durable=durable, arg::arguments=settings);
+ session.queuePurge(arg::queue=name);
+ session.sync();
+ }
+
+ void run() {
+ queueInit(fqn("pub_start"));
+ queueInit(fqn("pub_done"));
+ queueInit(fqn("sub_ready"));
+ queueInit(fqn("sub_done"));
+ if (opts.iterations > 1) queueInit(fqn("sub_iteration"));
+ if (opts.mode==SHARED) {
+ framing::FieldTable settings;//queue policy settings
+ settings.setInt("qpid.max_count", opts.queueMaxCount);
+ settings.setInt("qpid.max_size", opts.queueMaxSize);
+ for (size_t i = 0; i < opts.qt; ++i) {
+ ostringstream qname;
+ qname << opts.baseName << i;
+ queueInit(qname.str(), opts.durable || opts.queueDurable, settings);
+ }
+ }
+ }
+};
+
+void expect(string actual, string expect) {
+ if (expect != actual)
+ throw Exception("Expecting "+expect+" but received "+actual);
+
+}
+
+double secs(Duration d) { return double(d)/TIME_SEC; }
+double secs(AbsTime start, AbsTime finish) {
+ return secs(Duration(start,finish));
+}
+
+
+// Collect rates & print stats.
+class Stats {
+ vector<double> values;
+ double sum;
+
+ public:
+ Stats() : sum(0) {}
+
+ // Functor to collect rates.
+ void operator()(const string& data) {
+ try {
+ double d=lexical_cast<double>(data);
+ values.push_back(d);
+ sum += d;
+ } catch (const std::exception&) {
+ throw Exception("Bad report: "+data);
+ }
+ }
+
+ double mean() const {
+ return sum/values.size();
+ }
+
+ double stdev() const {
+ if (values.size() <= 1) return 0;
+ double avg = mean();
+ double ssq = 0;
+ for (vector<double>::const_iterator i = values.begin();
+ i != values.end(); ++i) {
+ double x=*i;
+ x -= avg;
+ ssq += x*x;
+ }
+ return sqrt(ssq/(values.size()-1));
+ }
+
+ ostream& print(ostream& out) {
+ ostream_iterator<double> o(out, "\n");
+ copy(values.begin(), values.end(), o);
+ out << "Average: " << mean();
+ if (values.size() > 1)
+ out << " (std.dev. " << stdev() << ")";
+ return out << endl;
+ }
+};
+
+
+// Manage control queues, collect and print reports.
+struct Controller : public Client {
+
+ SubscriptionManager subs;
+
+ Controller() : subs(session) {}
+
+ /** Process messages from queue by applying a functor. */
+ void process(size_t n, string queue,
+ boost::function<void (const string&)> msgFn)
+ {
+ if (!opts.summary)
+ cout << "Processing " << n << " messages from "
+ << queue << " " << flush;
+ LocalQueue lq;
+ subs.setFlowControl(n, SubscriptionManager::UNLIMITED, false);
+ subs.subscribe(lq, queue);
+ for (size_t i = 0; i < n; ++i) {
+ if (!opts.summary) cout << "." << flush;
+ msgFn(lq.pop().getData());
+ }
+ if (!opts.summary) cout << " done." << endl;
+ }
+
+ void process(size_t n, LocalQueue lq, string queue,
+ boost::function<void (const string&)> msgFn)
+ {
+ session.messageFlow(queue, 0, n);
+ if (!opts.summary)
+ cout << "Processing " << n << " messages from "
+ << queue << " " << flush;
+ for (size_t i = 0; i < n; ++i) {
+ if (!opts.summary) cout << "." << flush;
+ msgFn(lq.pop().getData());
+ }
+ if (!opts.summary) cout << " done." << endl;
+ }
+
+ void send(size_t n, string queue, string data) {
+ if (!opts.summary)
+ cout << "Sending " << data << " " << n << " times to " << queue
+ << endl;
+ Message msg(data, queue);
+ for (size_t i = 0; i < n; ++i)
+ session.messageTransfer(arg::content=msg, arg::acceptMode=1);
+ }
+
+ void run() { // Controller
+ try {
+ // Wait for subscribers to be ready.
+ process(opts.totalSubs, fqn("sub_ready"), bind(expect, _1, "ready"));
+
+ LocalQueue pubDone;
+ LocalQueue subDone;
+ subs.setFlowControl(0, SubscriptionManager::UNLIMITED, false);
+ subs.subscribe(pubDone, fqn("pub_done"));
+ subs.subscribe(subDone, fqn("sub_done"));
+
+ double txrateTotal(0);
+ double mbytesTotal(0);
+ double pubRateTotal(0);
+ double subRateTotal(0);
+
+ for (size_t j = 0; j < opts.iterations; ++j) {
+ AbsTime start=now();
+ send(opts.totalPubs, fqn("pub_start"), "start"); // Start publishers
+ if (j) {
+ send(opts.totalPubs, fqn("sub_iteration"), "next"); // Start subscribers on next iteration
+ }
+
+ Stats pubRates;
+ Stats subRates;
+
+ process(opts.totalPubs, pubDone, fqn("pub_done"), boost::ref(pubRates));
+ process(opts.totalSubs, subDone, fqn("sub_done"), boost::ref(subRates));
+
+ AbsTime end=now();
+ double time=secs(start, end);
+ if (time <= 0.0) {
+ throw Exception("ERROR: Test completed in zero seconds. Try again with a larger message count.");
+ }
+ double txrate=opts.transfers/time;
+ double mbytes=(txrate*opts.size)/(1024*1024);
+
+ if (!opts.summary) {
+ cout << endl << "Total " << opts.transfers << " transfers of "
+ << opts.size << " bytes in "
+ << time << " seconds." << endl;
+ cout << endl << "Publish transfers/sec: " << endl;
+ pubRates.print(cout);
+ cout << endl << "Subscribe transfers/sec: " << endl;
+ subRates.print(cout);
+ cout << endl
+ << "Total transfers/sec: " << txrate << endl
+ << "Total Mbytes/sec: " << mbytes << endl;
+ }
+ else {
+ cout << pubRates.mean() << "\t"
+ << subRates.mean() << "\t"
+ << txrate << "\t"
+ << mbytes << endl;
+ }
+
+ txrateTotal += txrate;
+ mbytesTotal += mbytes;
+ pubRateTotal += pubRates.mean();
+ subRateTotal += subRates.mean();
+ }
+ if (opts.iterations > 1) {
+ cout << "Averages: "<< endl
+ << (pubRateTotal / opts.iterations) << "\t"
+ << (subRateTotal / opts.iterations) << "\t"
+ << (txrateTotal / opts.iterations) << "\t"
+ << (mbytesTotal / opts.iterations) << endl;
+ }
+ }
+ catch (const std::exception& e) {
+ cout << "Controller exception: " << e.what() << endl;
+ }
+ }
+};
+
+
+struct PublishThread : public Client {
+ string destination;
+ string routingKey;
+
+ PublishThread() {};
+
+ PublishThread(string key, string dest=string()) {
+ destination=dest;
+ routingKey=key;
+ }
+
+ void run() { // Publisher
+ try {
+ string data;
+ size_t offset(0);
+ if (opts.uniqueData) {
+ offset = 5;
+ data += "data:";//marker (requested for latency testing tool scripts)
+ data += string(sizeof(size_t), 'X');//space for seq no
+ data += session.getId().str();
+ if (opts.size > data.size()) {
+ data += string(opts.size - data.size(), 'X');
+ } else if(opts.size < data.size()) {
+ cout << "WARNING: Increased --size to " << data.size()
+ << " to honour --unique-data" << endl;
+ }
+ } else {
+ size_t msgSize=max(opts.size, sizeof(size_t));
+ data = string(msgSize, 'X');
+ }
+
+ Message msg(data, routingKey);
+ if (opts.durable)
+ msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+
+
+ if (opts.txPub){
+ session.txSelect();
+ }
+ SubscriptionManager subs(session);
+ LocalQueue lq;
+ subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true);
+ subs.subscribe(lq, fqn("pub_start"));
+
+ for (size_t j = 0; j < opts.iterations; ++j) {
+ expect(lq.pop().getData(), "start");
+ AbsTime start=now();
+ for (size_t i=0; i<opts.count; i++) {
+ // Stamp the iteration into the message data, avoid
+ // any heap allocation.
+ const_cast<std::string&>(msg.getData()).replace(offset, sizeof(size_t),
+ reinterpret_cast<const char*>(&i), sizeof(size_t));
+ if (opts.syncPub) {
+ sync(session).messageTransfer(
+ arg::destination=destination,
+ arg::content=msg,
+ arg::acceptMode=1);
+ } else {
+ session.messageTransfer(
+ arg::destination=destination,
+ arg::content=msg,
+ arg::acceptMode=1);
+ }
+ if (opts.txPub && ((i+1) % opts.txPub == 0)){
+ if (opts.commitAsync){
+ session.txCommit();
+ } else {
+ sync(session).txCommit();
+ }
+ }
+ if (opts.intervalPub)
+ qpid::sys::usleep(opts.intervalPub*1000);
+ }
+ if (opts.confirm) session.sync();
+ AbsTime end=now();
+ double time=secs(start,end);
+ if (time <= 0.0) {
+ throw Exception("ERROR: Test completed in zero seconds. Try again with a larger message count.");
+ }
+
+ // Send result to controller.
+ Message report(lexical_cast<string>(opts.count/time), fqn("pub_done"));
+ session.messageTransfer(arg::content=report, arg::acceptMode=1);
+ if (opts.txPub){
+ sync(session).txCommit();
+ }
+ }
+ session.close();
+ }
+ catch (const std::exception& e) {
+ cout << "PublishThread exception: " << e.what() << endl;
+ }
+ }
+};
+
+struct SubscribeThread : public Client {
+
+ string queue;
+
+ SubscribeThread() {}
+
+ SubscribeThread(string q) { queue = q; }
+
+ SubscribeThread(string key, string ex) {
+ queue=session.getId().str(); // Unique name.
+ session.queueDeclare(arg::queue=queue,
+ arg::exclusive=true,
+ arg::autoDelete=true,
+ arg::durable=opts.durable);
+ session.exchangeBind(arg::queue=queue,
+ arg::exchange=ex,
+ arg::bindingKey=key);
+ }
+
+ void verify(bool cond, const char* test, uint32_t expect, uint32_t actual) {
+ if (!cond) {
+ Message error(
+ QPID_MSG("Sequence error: expected n" << test << expect << " but got " << actual),
+ "sub_done");
+ session.messageTransfer(arg::content=error, arg::acceptMode=1);
+ throw Exception(error.getData());
+ }
+ }
+
+ void run() { // Subscribe
+ try {
+ if (opts.txSub) sync(session).txSelect();
+ SubscriptionManager subs(session);
+ SubscriptionSettings settings;
+ settings.autoAck = opts.txSub ? opts.txSub : opts.ack;
+ settings.acceptMode = (opts.txSub || opts.ack ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE);
+ settings.flowControl = FlowControl::messageCredit(opts.subQuota);
+ LocalQueue lq;
+ Subscription subscription = subs.subscribe(lq, queue, settings);
+ // Notify controller we are ready.
+ session.messageTransfer(arg::content=Message("ready", fqn("sub_ready")), arg::acceptMode=1);
+ if (opts.txSub) {
+ if (opts.commitAsync) session.txCommit();
+ else sync(session).txCommit();
+ }
+
+ LocalQueue iterationControl;
+ if (opts.iterations > 1) {
+ subs.subscribe(iterationControl, fqn("sub_iteration"), SubscriptionSettings(FlowControl::messageCredit(0)));
+ }
+
+ for (size_t j = 0; j < opts.iterations; ++j) {
+ if (j > 0) {
+ //need to wait here until all subs are done
+ session.messageFlow(fqn("sub_iteration"), 0, 1);
+ iterationControl.pop();
+
+ //need to allocate some more credit for subscription
+ session.messageFlow(queue, 0, opts.subQuota);
+ }
+ Message msg;
+ AbsTime start=now();
+ size_t expect=0;
+ for (size_t i = 0; i < opts.subQuota; ++i) {
+ msg=lq.pop();
+ if (opts.txSub && ((i+1) % opts.txSub == 0)) {
+ if (opts.commitAsync) session.txCommit();
+ else sync(session).txCommit();
+ }
+ if (opts.intervalSub)
+ qpid::sys::usleep(opts.intervalSub*1000);
+ // TODO aconway 2007-11-23: check message order for.
+ // multiple publishers. Need an array of counters,
+ // one per publisher and a publisher ID in the
+ // message. Careful not to introduce a lot of overhead
+ // here, e.g. no std::map, std::string etc.
+ //
+ // For now verify order only for a single publisher.
+ size_t offset = opts.uniqueData ? 5 /*marker is 'data:'*/ : 0;
+ size_t n = *reinterpret_cast<const size_t*>(msg.getData().data() + offset);
+ if (opts.pubs == 1) {
+ if (opts.subs == 1 || opts.mode == FANOUT) verify(n==expect, "==", expect, n);
+ else verify(n>=expect, ">=", expect, n);
+ expect = n+1;
+ }
+ }
+ if (opts.txSub || opts.ack)
+ subscription.accept(subscription.getUnaccepted());
+ if (opts.txSub) {
+ if (opts.commitAsync) session.txCommit();
+ else sync(session).txCommit();
+ }
+ AbsTime end=now();
+
+ // Report to publisher.
+ Message result(lexical_cast<string>(opts.subQuota/secs(start,end)),
+ fqn("sub_done"));
+ session.messageTransfer(arg::content=result, arg::acceptMode=1);
+ if (opts.txSub) sync(session).txCommit();
+ }
+ session.close();
+ }
+ catch (const std::exception& e) {
+ cout << "SubscribeThread exception: " << e.what() << endl;
+ }
+ }
+};
+
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
+int main(int argc, char** argv) {
+ int exitCode = 0;
+ boost::ptr_vector<Client> subs(opts.subs);
+ boost::ptr_vector<Client> pubs(opts.pubs);
+
+ try {
+ opts.parse(argc, argv);
+
+ string exchange;
+ switch (opts.mode) {
+ case FANOUT: exchange="amq.fanout"; break;
+ case TOPIC: exchange="amq.topic"; break;
+ case SHARED: break;
+ }
+
+ bool singleProcess=
+ (!opts.setup && !opts.control && !opts.publish && !opts.subscribe);
+ if (singleProcess)
+ opts.setup = opts.control = opts.publish = opts.subscribe = true;
+
+ if (opts.setup) Setup().run(); // Set up queues
+
+ // Start pubs/subs for each queue/topic.
+ for (size_t i = 0; i < opts.qt; ++i) {
+ ostringstream key;
+ key << opts.baseName << i; // Queue or topic name.
+ if (opts.publish) {
+ size_t n = singleProcess ? opts.pubs : 1;
+ for (size_t j = 0; j < n; ++j) {
+ pubs.push_back(new PublishThread(key.str(), exchange));
+ pubs.back().thread=Thread(pubs.back());
+ }
+ }
+ if (opts.subscribe) {
+ size_t n = singleProcess ? opts.subs : 1;
+ for (size_t j = 0; j < n; ++j) {
+ if (opts.mode==SHARED)
+ subs.push_back(new SubscribeThread(key.str()));
+ else
+ subs.push_back(new SubscribeThread(key.str(),exchange));
+ subs.back().thread=Thread(subs.back());
+ }
+ }
+ }
+
+ if (opts.control) Controller().run();
+ }
+ catch (const std::exception& e) {
+ cout << endl << e.what() << endl;
+ exitCode = 1;
+ }
+
+ // Wait for started threads.
+ if (opts.publish) {
+ for (boost::ptr_vector<Client>::iterator i=pubs.begin();
+ i != pubs.end();
+ ++i)
+ i->thread.join();
+ }
+
+ if (opts.subscribe) {
+ for (boost::ptr_vector<Client>::iterator i=subs.begin();
+ i != subs.end();
+ ++i)
+ i->thread.join();
+ }
+ return exitCode;
+}