diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/Completion.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionCore.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionCore.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/framing/AMQContentBody.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/framing/TransferContent.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/log/Options.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/perftest.cpp | 636 |
9 files changed, 441 insertions, 241 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 4e22cb7352..e10f33e426 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -74,7 +74,7 @@ Broker::Options::Options(const std::string& name) : storeForce(false), enableMgmt(0), mgmtPubInterval(10), - ack(100) + ack(0) { int c = sys::SystemInfo::concurrency(); if (c > 0) workerThreads=c; @@ -102,7 +102,7 @@ Broker::Options::Options(const std::string& name) : ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") ("ack", optValue(ack, "N"), - "Send ack/solicit-ack at least every N frames. 0 disables voluntary acks/solitict-ack"); + "Send session.ack/solicit-ack at least every N frames. 0 disables voluntary ack/solitict-ack"); } const std::string empty; diff --git a/cpp/src/qpid/client/Completion.h b/cpp/src/qpid/client/Completion.h index a126bc9766..4d324aaf28 100644 --- a/cpp/src/qpid/client/Completion.h +++ b/cpp/src/qpid/client/Completion.h @@ -36,6 +36,8 @@ protected: shared_ptr<SessionCore> session; public: + Completion() {} + Completion(Future f, shared_ptr<SessionCore> s) : future(f), session(s) {} void sync() diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 80d97b10aa..497288bc3f 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -108,7 +108,7 @@ void Connector::send(AMQFrame& frame){ writeFrameQueue.push(frame); aio->queueWrite(); - QPID_LOG(trace, "SENT: " << frame); + QPID_LOG(trace, "SENT [" << this << "]: " << frame); } void Connector::handleClosed() { @@ -180,8 +180,8 @@ void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { AMQFrame frame; while(frame.decode(in)){ - QPID_LOG(trace, "RECV: " << frame); - input->received(frame); + QPID_LOG(trace, "RECV [" << this << "]: " << frame); + input->received(frame); } // TODO: unreading needs to go away, and when we can cope // with multiple sub-buffers in the general buffer scheme, it will diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 8eab54fa62..3a26734892 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -87,7 +87,6 @@ inline void SessionCore::waitFor(State s) { // We can be CLOSED or SUSPENDED by error at any time. state.waitFor(States(s, CLOSED, SUSPENDED)); check(); - assert(state==s); invariant(); } @@ -97,7 +96,8 @@ SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn, sync(false), channel(ch), proxy(channel), - state(OPENING) + state(OPENING), + detachedLifetime(0) { l3.out = &out; attaching(conn); @@ -166,10 +166,11 @@ FrameSet::shared_ptr SessionCore::get() { // user thread static const std::string CANNOT_REOPEN_SESSION="Cannot re-open a session."; -void SessionCore::open(uint32_t detachedLifetime) { // user thread +void SessionCore::open(uint32_t timeout) { // user thread Lock l(state); check(state==OPENING && !session, COMMAND_INVALID, CANNOT_REOPEN_SESSION); + detachedLifetime=timeout; proxy.open(detachedLifetime); waitFor(OPEN); } @@ -364,8 +365,22 @@ Future SessionCore::send(const AMQBody& command, const MethodContent& content) return Future(l3.send(command, content)); } +namespace { +bool isCloseResponse(const AMQFrame& frame) { + return frame.getMethod() && + frame.getMethod()->amqpClassId() == SESSION_CLASS_ID && + frame.getMethod()->amqpMethodId() == SESSION_CLOSED_METHOD_ID; +} +} + // Network thread. void SessionCore::handleIn(AMQFrame& frame) { + { + Lock l(state); + // Ignore frames received while closing other than closed response. + if (state==CLOSING && !isCloseResponse(frame)) + return; + } try { // Cast to expose private SessionHandler functions. if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { @@ -382,7 +397,7 @@ void SessionCore::handleOut(AMQFrame& frame) { Lock l(state); if (state==OPEN) { - if (session->sent(frame)) + if (detachedLifetime > 0 && session->sent(frame)) proxy.solicitAck(); channel.handle(frame); } diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h index 38c72359a3..2bb0f41fbf 100644 --- a/cpp/src/qpid/client/SessionCore.h +++ b/cpp/src/qpid/client/SessionCore.h @@ -133,6 +133,7 @@ class SessionCore : public framing::FrameHandler::InOutHandler, framing::ChannelHandler channel; framing::AMQP_ServerProxy::Session proxy; mutable StateMonitor state; + uint32_t detachedLifetime; }; }} // namespace qpid::client diff --git a/cpp/src/qpid/framing/AMQContentBody.cpp b/cpp/src/qpid/framing/AMQContentBody.cpp index 13491589c4..59f3619ef2 100644 --- a/cpp/src/qpid/framing/AMQContentBody.cpp +++ b/cpp/src/qpid/framing/AMQContentBody.cpp @@ -40,7 +40,5 @@ void qpid::framing::AMQContentBody::decode(Buffer& buffer, uint32_t _size){ void qpid::framing::AMQContentBody::print(std::ostream& out) const { out << "content (" << size() << " bytes)"; -#ifndef NDEBUG - out << " " << data.substr(0,10) << "..."; -#endif + out << " " << data.substr(0,16) << "..."; } diff --git a/cpp/src/qpid/framing/TransferContent.cpp b/cpp/src/qpid/framing/TransferContent.cpp index 1bb69fbca9..99f5d365e8 100644 --- a/cpp/src/qpid/framing/TransferContent.cpp +++ b/cpp/src/qpid/framing/TransferContent.cpp @@ -29,8 +29,8 @@ TransferContent::TransferContent(const std::string& data, const std::string& exchange) { setData(data); - getDeliveryProperties().setRoutingKey(routingKey); - getDeliveryProperties().setExchange(exchange); + if (routingKey.size()) getDeliveryProperties().setRoutingKey(routingKey); + if (exchange.size()) getDeliveryProperties().setExchange(exchange); } AMQHeaderBody TransferContent::getHeader() const diff --git a/cpp/src/qpid/log/Options.cpp b/cpp/src/qpid/log/Options.cpp index 41a15dcf9f..72dbf39ca8 100644 --- a/cpp/src/qpid/log/Options.cpp +++ b/cpp/src/qpid/log/Options.cpp @@ -28,7 +28,7 @@ using namespace std; Options::Options(const std::string& name) : qpid::Options(name), time(true), level(true), thread(false), source(false), function(false), trace(false) { - outputs.push_back("stderr"); + outputs.push_back("stdout"); selectors.push_back("error+"); ostringstream levels; diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index 78712aec5c..b4eec8468d 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -28,109 +28,172 @@ #include "qpid/client/Message.h" #include "qpid/sys/Time.h" +#include <boost/lexical_cast.hpp> +#include <boost/bind.hpp> +#include <boost/function.hpp> +#include <boost/ptr_container/ptr_vector.hpp> + #include <iostream> #include <sstream> +#include <numeric> +#include <algorithm> using namespace std; using namespace qpid; using namespace client; using namespace sys; +using boost::lexical_cast; +using boost::bind; + +enum Mode { SHARED, FANOUT, TOPIC }; +const char* modeNames[] = { "shared", "fanout", "topic" }; + +// istream/ostream ops so Options can read/display Mode. +istream& operator>>(istream& in, Mode& mode) { + string s; + in >> s; + int i = find(modeNames, modeNames+3, s) - modeNames; + if (i >= 3) throw Exception("Invalid mode: "+s); + mode = Mode(i); + return in; +} + +ostream& operator<<(ostream& out, Mode mode) { + return out << modeNames[mode]; +} + struct Opts : public TestOptions { - bool listen; - bool publish; - bool purge; - size_t count; + // Actions + bool setup, control, publish, subscribe; + + // Publisher + size_t pubs; + size_t count ; size_t size; + bool confirm; bool durable; - size_t consumers; - std::string mode; - size_t autoAck; + + // Subscriber + size_t subs; + size_t ack; + + // General + size_t qt; + Mode mode; bool summary; - bool confirmMode; - bool acquireMode; Opts() : - listen(false), publish(false), purge(false), - count(500000), size(64), consumers(1), - mode("shared"), autoAck(100), - summary(false), confirmMode(false), acquireMode(false) + setup(false), control(false), publish(false), subscribe(false), + pubs(1), count(500000), size(64), confirm(false), durable(false), + subs(1), ack(0), + qt(1), mode(SHARED), summary(false) { - addOptions() - ("listen", optValue(listen), "Consume messages.") - ("publish", optValue(publish), "Produce messages.") - ("purge", optValue(purge), "Purge shared queues.") - ("count", optValue(count, "N"), "Messages to send.") - ("size", optValue(size, "BYTES"), "Size of messages.") + addOptions() + ("setup", optValue(setup), "Create shared queues.") + ("control", optValue(control), "Run test, print report.") + ("publish", optValue(publish), "Publish messages.") + ("subscribe", optValue(subscribe), "Subscribe for messages.") + + ("mode", optValue(mode, "shared|fanout|topic"), "Test mode." + "\nshared: --qt queues, --npubs publishers and --nsubs subscribers per queue.\n" + "\nfanout: --npubs publishers, --nsubs subscribers, fanout exchange." + "\ntopic: --qt topics, --npubs publishers and --nsubs subscribers per topic.\n") + + ("npubs", optValue(pubs, "N"), "Create N publishers.") + ("count", optValue(count, "N"), "Each publisher sends N messages.") + ("size", optValue(size, "BYTES"), "Size of messages in bytes.") + ("pub-confirm", optValue(confirm), "Publisher use confirm-mode.") ("durable", optValue(durable, "N"), "Publish messages as durable.") - ("consumers", optValue(consumers, "N"), "Number of consumers.") - ("mode", optValue(mode, "shared|fanout|topic"), "consume mode") - ("auto-ack", optValue(autoAck, "N"), "ack every N messages.") - ("summary,s", optValue(summary), "summary output only") - ("confirm-mode", optValue(confirmMode, "N"), "confirm mode") - ("acquire-mode", optValue(acquireMode, "N"), "acquire mode (N - pre acquire, Y - no acquire"); + + ("nsubs", optValue(subs, "N"), "Create N subscribers.") + ("sub-ack", optValue(ack, "N"), "N>0: Subscriber acks batches of N.\n" + "N==0: Subscriber uses unconfirmed mode") + + ("qt", optValue(qt, "N"), "Create N queues or topics.") + ("summary,s", optValue(summary), "Summary output only."); + } + + // Computed values + size_t totalPubs; + size_t totalSubs; + size_t transfers; + size_t subQuota; + + void parse(int argc, char** argv) { + TestOptions::parse(argc, argv); + switch (mode) { + case SHARED: + if (count % subs) { + count += subs - (count % subs); + cout << "WARNING: Adjusted --count to " << count + << " the nearest multiple of --nsubs" << endl; + } + totalPubs = pubs*qt; + totalSubs = subs*qt; + subQuota = (pubs*count)/subs; + break; + case FANOUT: + if (qt != 1) cerr << "WARNING: Fanout mode, ignoring --qt=" + << qt << endl; + qt=1; + totalPubs = pubs; + totalSubs = subs; + subQuota = totalPubs*count; + break; + case TOPIC: + totalPubs = pubs*qt; + totalSubs = subs*qt; + subQuota = pubs*count; + break; + } + transfers=(totalPubs*count) + (totalSubs*subQuota); } }; -Opts opts; -enum Mode { SHARED, FANOUT, TOPIC }; -Mode mode; -struct ListenThread : public Runnable { Thread thread; void run(); }; -struct PublishThread : public Runnable { Thread thread; void run(); }; +Opts opts; -// Create and purge the shared queues -void setup() { +struct Client : public Runnable { Connection connection; - opts.open(connection); - Session_0_10 session = connection.newSession(); - session.setSynchronous(true); // Make sure this is all completed. - session.queueDeclare(arg::queue="control"); // Control queue - if (opts.purge) { - if (!opts.summary) cout << "Purging shared queues" << endl; - session.queuePurge(arg::queue="control"); - } - if (mode==SHARED) { - session.queueDeclare(arg::queue="perftest", arg::durable=opts.durable); // Shared data queue - if (opts.purge) - session.queuePurge(arg::queue="perftest"); - } - session.close(); - connection.close(); -} + Session_0_10 session; + Thread thread; -int main(int argc, char** argv) { - try { - opts.parse(argc, argv); - if (opts.mode=="shared") mode=SHARED; - else if (opts.mode=="fanout") mode = FANOUT; - else if (opts.mode=="topic") mode = TOPIC; - else throw Exception("Invalid mode"); - if (!opts.listen && !opts.publish && !opts.purge) - opts.listen = opts.publish = opts.purge = true; - setup(); - std::vector<ListenThread> listen(opts.consumers); - PublishThread publish; - if (opts.listen) - for (size_t i = 0; i < opts.consumers; ++i) - listen[i].thread=Thread(listen[i]); - if (opts.publish) - publish.thread=Thread(publish); - if (opts.listen) - for (size_t i = 0; i < opts.consumers; ++i) - listen[i].thread.join(); - if (opts.publish) - publish.thread.join(); + Client() { + opts.open(connection); + session = connection.newSession(); } - catch (const std::exception& e) { - cout << "Unexpected exception: " << e.what() << endl; + + ~Client() { + session.close(); + connection.close(); } -} +}; -double secs(Duration d) { return double(d)/TIME_SEC; } -double secs(AbsTime start, AbsTime finish) { return secs(Duration(start,finish)); } +struct Setup : public Client { + + void queueInit(string name, bool durable=false) { + session.queueDeclare(arg::queue=name, arg::durable=durable); + session.queuePurge(arg::queue=name); + } + void run() { + queueInit("pub_start"); + queueInit("pub_done"); + queueInit("sub_ready"); + queueInit("sub_done"); + if (opts.mode==SHARED) { + for (size_t i = 0; i < opts.qt; ++i) { + ostringstream qname; + qname << "perftest" << i; + queueInit(qname.str(), opts.durable); + } + } + // Make sure this is all completed before we return. + session.execution().sendSyncRequest(); + } +}; void expect(string actual, string expect) { if (expect != actual) @@ -138,176 +201,297 @@ void expect(string actual, string expect) { } -const char* exchange() { - switch (mode) { - case SHARED: return ""; // Deafult exchange. - case FANOUT: return "amq.fanout"; - case TOPIC: return "amq.topic"; - } - assert(0); - return 0; +double secs(Duration d) { return double(d)/TIME_SEC; } +double secs(AbsTime start, AbsTime finish) { + return secs(Duration(start,finish)); } -void PublishThread::run() { - try { - Connection connection; - opts.open(connection); - Session_0_10 session = connection.newSession(); - - // Wait for consumers. - if (!opts.summary) cout << "Waiting for consumers ready " << flush; - SubscriptionManager subs(session); - LocalQueue control; - subs.subscribe(control, "control"); - for (size_t i = 0; i < opts.consumers; ++i) { - if (!opts.summary) cout << "." << flush; - expect(control.pop().getData(), "ready"); + +// Collect rates & print stats. +class Stats { + vector<double> values; + double sum; + + public: + Stats() : sum(0) {} + + // Functor to collect rates. + void operator()(const string& data) { + double d=lexical_cast<double>(data); + values.push_back(d); + sum += d; + } + + double mean() const { + return sum/values.size(); + } + + double stdev() const { + if (values.size() <= 1) return 0; + double avg = mean(); + double ssq = 0; + for (vector<double>::const_iterator i = values.begin(); + i != values.end(); ++i) { + double x=*i; + x -= avg; + ssq += x*x; } - if (!opts.summary) cout << endl; - - size_t msgSize=max(opts.size, sizeof(size_t)); - Message msg(string(msgSize, 'X'), "perftest"); - if (opts.durable) - msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); - - AbsTime start=now(); - if (!opts.summary) cout << "Publishing " << opts.count - << " messages " << flush; - for (size_t i=0; i<opts.count; i++) { - // Stamp the iteration into the message data, careful to avoid - // any heap allocation. - char* data = const_cast<char*>(msg.getData().data()); - *reinterpret_cast<uint32_t*>(data) = i; - session.messageTransfer(arg::destination=exchange(), - arg::content=msg, arg::confirmMode=opts.confirmMode, - arg::acquireMode=opts.acquireMode); - if (!opts.summary && (i%10000)==0){ - cout << "." << flush; - session.execution().sendSyncRequest(); - } + return sqrt(ssq/(values.size()-1)); + } + + ostream& print(ostream& out) { + ostream_iterator<double> o(out, "\n"); + copy(values.begin(), values.end(), o); + out << "Average: " << mean(); + if (values.size() > 1) + out << " (std.dev. " << stdev() << ")"; + return out << endl; + } +}; + + +// Manage control queues, collect and print reports. +struct Controller : public Client { + + SubscriptionManager subs; + + Controller() : subs(session) {} + + /** Process messages from queue by applying a functor. */ + void process(size_t n, string queue, + boost::function<void (const string&)> msgFn) + { + if (!opts.summary) + cout << "Processing " << n << " messages from " + << queue << " " << flush; + LocalQueue lq; + subs.setFlowControl(n, SubscriptionManager::UNLIMITED, false); + subs.subscribe(lq, queue); + for (size_t i = 0; i < n; ++i) { + if (!opts.summary) cout << "." << flush; + msgFn(lq.pop().getData()); } - - //Completion compl; if (!opts.summary) cout << " done." << endl; - msg.setData("done"); // Send done messages. - if (mode==SHARED) - for (size_t i = 0; i < opts.consumers; ++i) - session.messageTransfer(arg::destination=exchange(), arg::content=msg); - else - session.messageTransfer(arg::destination=exchange(), arg::content=msg); - session.execution().sendSyncRequest(); - AbsTime end=now(); + } - // Report - double publish_rate=(opts.count)/secs(start,end); + void send(size_t n, string queue, string data) { if (!opts.summary) - cout << endl - << "publish count:" << opts.count << endl - << "publish secs:" << secs(start,end) << endl - << "publish rate:" << publish_rate << endl; - - double consume_rate = 0; // Average rate for consumers. - // Wait for consumer(s) to finish. - if (!opts.summary) cout << "Waiting for consumers done " << endl; - for (size_t i = 0; i < opts.consumers; ++i) { - string report=control.pop().getData(); - if (!opts.summary) - cout << endl << report; + cout << "Sending " << data << " " << n << " times to " << queue + << endl; + Message msg(data, queue); + for (size_t i = 0; i < n; ++i) + session.messageTransfer(arg::content=msg); + } + + void run() { // Controller + try { + // Wait for subscribers to be ready. + process(opts.totalSubs, "sub_ready", bind(expect, _1, "ready")); + + Stats pubRates; + Stats subRates; + + AbsTime start=now(); + send(opts.totalPubs, "pub_start", "start"); // Start publishers + process(opts.totalPubs, "pub_done", boost::ref(pubRates)); + process(opts.totalSubs, "sub_done", boost::ref(subRates)); + AbsTime end=now(); + double time=secs(start, end); + + if (!opts.summary) { + cout << endl << "Publish rates: " << endl; + pubRates.print(cout); + cout << endl << "Subscribe rates: " << endl; + subRates.print(cout); + cout << endl << "Total transfers: " << opts.transfers << endl; + cout << "Total time (secs): " << time << endl; + cout << "Total rate: " << opts.transfers/time << endl; + } else { - double rate=boost::lexical_cast<double>(report); - consume_rate += rate/opts.consumers; + cout << pubRates.mean() << "\t" + << subRates.mean() << "\t" + << opts.transfers/time << endl; } } - end=now(); - - // Count total transfers from publisher and to subscribers. - int transfers; - if (mode==SHARED) // each message sent/receivd once. - transfers=2*opts.count; - else // sent once, received N times. - transfers=opts.count*(opts.consumers + 1); - double total_rate=transfers/secs(start, end); - if (opts.summary) - cout << opts.mode << '(' << opts.count - << ':' << opts.consumers << ')' - << '\t' << publish_rate - << '\t' << consume_rate - << '\t' << total_rate - << endl; - else - cout << endl - << "total transfers:" << transfers << endl - << "total secs:" << secs(start, end) << endl - << "total rate:" << total_rate << endl; - - connection.close(); + catch (const std::exception& e) { + cout << "Controller exception: " << e.what() << endl; + exit(1); + } } - catch (const std::exception& e) { - cout << "PublishThread exception: " << e.what() << endl; +}; + +struct PublishThread : public Client { + string destination; + string routingKey; + + PublishThread() {}; + + PublishThread(string key, string dest=string()) { + destination=dest; + routingKey=key; + } + + void run() { // Publisher + Completion completion; + try { + size_t msgSize=max(opts.size, sizeof(size_t)); + Message msg(string(msgSize, 'X'), routingKey); + if (opts.durable) + msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + + SubscriptionManager subs(session); + LocalQueue lq(AckPolicy(opts.ack)); + subs.setFlowControl(1, SubscriptionManager::UNLIMITED, false); + subs.subscribe(lq, "pub_start"); + expect(lq.pop().getData(), "start"); + + AbsTime start=now(); + for (size_t i=0; i<opts.count; i++) { + // Stamp the iteration into the message data, avoid + // any heap allocation. + char* data = const_cast<char*>(msg.getData().data()); + *reinterpret_cast<uint32_t*>(data) = i; + completion = session.messageTransfer( + arg::destination=destination, + arg::content=msg, + arg::confirmMode=opts.confirm); + } + if (opts.confirm) completion.sync(); + AbsTime end=now(); + double time=secs(start,end); + + // Send result to controller. + msg.setData(lexical_cast<string>(opts.count/time)); + msg.getDeliveryProperties().setRoutingKey("pub_done"); + session.messageTransfer(arg::content=msg); + session.close(); + } + catch (const std::exception& e) { + cout << "PublishThread exception: " << e.what() << endl; + exit(1); + } + } +}; + +struct SubscribeThread : public Client { + + string queue; + + SubscribeThread() {} + + SubscribeThread(string q) { queue = q; } + + SubscribeThread(string key, string ex) { + queue=session.getId().str(); // Unique name. + session.queueDeclare(arg::queue=queue, + arg::exclusive=true, + arg::autoDelete=true, + arg::durable=opts.durable); + session.queueBind(arg::queue=queue, + arg::exchange=ex, + arg::routingKey=key); + } + + void run() { // Subscribe + try { + SubscriptionManager subs(session); + LocalQueue lq(AckPolicy(opts.ack)); + subs.setConfirmMode(opts.ack > 0); + subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED, + false); + subs.subscribe(lq, queue); + // Notify controller we are ready. + session.messageTransfer(arg::content=Message("ready", "sub_ready")); + + Message msg; + AbsTime start=now(); + for (size_t i = 0; i < opts.subQuota; ++i) { + msg=lq.pop(); + // FIXME aconway 2007-11-23: Verify message sequence numbers. + // Need an array of counters, one per publisher and need + // publisher ID in the message for multiple publishers. + } + if (opts.ack !=0) + msg.acknowledge(); // Cumulative ack for final batch. + AbsTime end=now(); + + // FIXME aconway 2007-11-23: close the subscription, + // release any pending messages. + + // Report to publisher. + Message result(lexical_cast<string>(opts.subQuota/secs(start,end)), + "sub_done"); + session.messageTransfer(arg::content=result); + session.close(); + } + catch (const std::exception& e) { + cout << "Publisher exception: " << e.what() << endl; + exit(1); + } + } +}; + +int main(int argc, char** argv) { + string exchange; + switch (opts.mode) { + case FANOUT: exchange="amq.fanout"; break; + case TOPIC: exchange="amq.topic"; break; + case SHARED: break; } -} -void ListenThread::run() { try { - Connection connection; - opts.open(connection); - Session_0_10 session = connection.newSession(); + opts.parse(argc, argv); + if (!opts.setup && !opts.control && !opts.publish && !opts.subscribe) + opts.setup = opts.control = opts.publish = opts.subscribe = true; + + if (opts.setup) Setup().run(); // Set up queues - string consumeQueue; - if (mode == SHARED) { - consumeQueue="perftest"; + boost::ptr_vector<Client> subs(opts.subs); + boost::ptr_vector<Client> pubs(opts.pubs); + + // Start pubs/subs for each queue/topic. + for (size_t i = 0; i < opts.qt; ++i) { + ostringstream key; + key << "perftest" << i; // Queue or topic name. + if (opts.publish) { + for (size_t j = 0; j < opts.pubs; ++j) { + pubs.push_back(new PublishThread(key.str(), exchange)); + pubs.back().thread=Thread(pubs.back()); + } + } + if (opts.subscribe) { + for (size_t j = 0; j < opts.subs; ++j) { + if (opts.mode==SHARED) + subs.push_back(new SubscribeThread(key.str())); + else + subs.push_back(new SubscribeThread(key.str(),exchange)); + subs.back().thread=Thread(subs.back()); + } + } } - else { - consumeQueue=session.getId().str(); // Unique name. - session.queueDeclare(arg::queue=consumeQueue, - arg::exclusive=true, - arg::autoDelete=true, - arg::durable=opts.durable); - session.queueBind(arg::queue=consumeQueue, - arg::exchange=exchange(), - arg::routingKey="perftest"); + + if (opts.control) Controller().run(); + + + // Wait for started threads. + if (opts.publish) { + for (boost::ptr_vector<Client>::iterator i=pubs.begin(); + i != pubs.end(); + ++i) + i->thread.join(); } - // Notify publisher we are ready. - session.messageTransfer(arg::content=Message("ready", "control")); - - SubscriptionManager subs(session); - LocalQueue consume(AckPolicy(opts.autoAck)); - subs.setConfirmMode(opts.confirmMode); - subs.setAcquireMode(opts.acquireMode); - subs.subscribe(consume, consumeQueue); - int consumed=0; - AbsTime start=now(); - Message msg; - size_t i = 0; - while ((msg=consume.pop()).getData() != "done") { - char* data=const_cast<char*>(msg.getData().data()); - size_t j=*reinterpret_cast<size_t*>(data); - if (i > j) - throw Exception( - QPID_MSG("Messages out of order " << i - << " before " << j)); - else - i = j; - ++consumed; + + + if (opts.subscribe) { + for (boost::ptr_vector<Client>::iterator i=subs.begin(); + i != subs.end(); + ++i) + i->thread.join(); } - msg.acknowledge(); // Ack all outstanding messages -- ?? - AbsTime end=now(); - - // Report to publisher. - ostringstream report; - double consume_rate=consumed/secs(start,end); - if (opts.summary) - report << consume_rate; - else - report << "consume count: " << consumed << endl - << "consume secs: " << secs(start, end) << endl - << "consume rate: " << consume_rate << endl; - - session.messageTransfer(arg::content=Message(report.str(), "control")); - connection.close(); + return 0; } catch (const std::exception& e) { - cout << "PublishThread exception: " << e.what() << endl; + cout << "Unexpected exception: " << e.what() << endl; + return 1; } } - |