diff options
author | Alan Conway <aconway@apache.org> | 2007-11-07 19:57:46 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-11-07 19:57:46 +0000 |
commit | 710b8a1f1285b9aa5bccee5b1906500667dd7bc5 (patch) | |
tree | 83005778c44cf7d897cef882ced2330bc8bd2228 /cpp/src/tests/perftest.cpp | |
parent | d19657d82321b2b5e2cac386c49aa99f82b976fb (diff) | |
download | qpid-python-710b8a1f1285b9aa5bccee5b1906500667dd7bc5.tar.gz |
client::SubscriptionManager:
- Added autoStop support.
- Added LocalQueue subscriptions.
- Expose AckPolicy settings to user.
client::Message:
- incoming Messages carry their session for acknowledge
perftest: (see perftest --help for details...)
- allow multiple consumers.
- 3 queue modes: shared, fanout, topic.
- set size of messages
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@592869 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/perftest.cpp')
-rw-r--r-- | cpp/src/tests/perftest.cpp | 380 |
1 files changed, 158 insertions, 222 deletions
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index bc816f6597..80157da7f4 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -21,44 +21,48 @@ #include "TestOptions.h" -#include "qpid/client/Channel.h" -#include "qpid/client/Exchange.h" -#include "qpid/client/Queue.h" +#include "qpid/client/Session_0_10.h" +#include "qpid/client/SubscriptionManager.h" #include "qpid/client/Connection.h" -#include "qpid/client/MessageListener.h" #include "qpid/client/Message.h" -#include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" #include <iostream> -#include <cstdlib> -#include <iomanip> -#include <time.h> -#include <unistd.h> +#include <sstream> - -using namespace qpid; -using namespace qpid::client; -using namespace qpid::sys; using namespace std; +using namespace qpid; +using namespace client; +using namespace sys; struct Opts : public TestOptions { bool listen; bool publish; int count; - bool durable; + int size; + bool durable; + int consumers; + std::string mode; - Opts() : listen(false), publish(false), count(500000) { + Opts() : + listen(false), publish(false), count(500000), size(64), consumers(1), + mode("shared") + { addOptions() ("listen", optValue(listen), "Consume messages.") ("publish", optValue(publish), "Produce messages.") - ("count", optValue(count, "N"), "Messages to send/receive.") - ("durable", optValue(durable, "N"), "Publish messages as durable."); + ("count", optValue(count, "N"), "Messages to send.") + ("size", optValue(size, "BYTES"), "Size of messages.") + ("durable", optValue(durable, "N"), "Publish messages as durable.") + ("consumers", optValue(consumers, "N"), "Number of consumers.") + ("mode", optValue(mode, "shared|fanout|topic"), "consume mode"); } }; Opts opts; +enum Mode { SHARED, FANOUT, TOPIC }; +Mode mode; struct ListenThread : public Runnable { Thread thread; void run(); }; struct PublishThread : public Runnable { Thread thread; void run(); }; @@ -66,16 +70,22 @@ struct PublishThread : public Runnable { Thread thread; void run(); }; int main(int argc, char** argv) { try { opts.parse(argc, argv); + if (opts.mode=="shared") mode=SHARED; + else if (opts.mode=="fanout") mode = FANOUT; + else if (opts.mode=="topic") mode = TOPIC; + else throw Exception("Invalid mode"); if (!opts.listen && !opts.publish) opts.listen = opts.publish = true; - ListenThread listen; + std::vector<ListenThread> listen(opts.consumers); PublishThread publish; - if (opts.listen) - listen.thread=Thread(listen); + if (opts.listen) + for (int i = 0; i < opts.consumers; ++i) + listen[i].thread=Thread(listen[i]); if (opts.publish) publish.thread=Thread(publish); if (opts.listen) - listen.thread.join(); + for (int i = 0; i < opts.consumers; ++i) + listen[i].thread.join(); if (opts.publish) publish.thread.join(); } @@ -84,223 +94,149 @@ int main(int argc, char** argv) { } } -// ================================================================ -// Publish client -// +double secs(Duration d) { return double(d)/TIME_SEC; } +double secs(AbsTime start, AbsTime finish) { return secs(Duration(start,finish)); } -struct timespec operator-(const struct timespec& lhs, const struct timespec& rhs) { - timespec r; - r.tv_nsec = lhs.tv_nsec - rhs.tv_nsec; - r.tv_sec = lhs.tv_sec - rhs.tv_sec; - if (r.tv_nsec < 0) { - r.tv_nsec += 1000000000; - r.tv_sec -= 1; - } - return r; -} -ostream& operator<<(ostream& o, const struct timespec& ts) { - o << ts.tv_sec << "." << setw(9) << setfill('0') << right << ts.tv_nsec; - return o; -} +void expect(string actual, string expect) { + if (expect != actual) + throw Exception("Expecting "+expect+" but received "+actual); -double toDouble(const struct timespec& ts) { - return double(ts.tv_nsec)/1000000000 + ts.tv_sec; } -class PublishListener : public MessageListener { - - void set_time() { - timespec ts; - if (::clock_gettime(CLOCK_REALTIME, &ts)) - throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); - startTime = ts; - } - - void print_time() { - timespec ts; - if (::clock_gettime(CLOCK_REALTIME, &ts)) - throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); - cout << "Total Time:" << ts-startTime << endl; - double rate = messageCount*2/toDouble(ts-startTime); - cout << "returned Messages:" << messageCount << endl; - cout << "round trip Rate:" << rate << endl; - } - - struct timespec startTime; - int messageCount; - bool done; - Monitor lock; - - public: - - PublishListener(int mcount): messageCount(mcount), done(false) { - set_time(); - } - - void received(Message& msg) { - print_time(); - QPID_LOG(info, "Publisher: received: " << msg.getData()); - Mutex::ScopedLock l(lock); - QPID_LOG(info, "Publisher: done."); - done = true; - lock.notify(); +const char* exchange() { + switch (mode) { + case SHARED: return ""; // Deafult exchange. + case FANOUT: return "amq.fanout"; + case TOPIC: return "amq.topic"; } - - void wait() { - Mutex::ScopedLock l(lock); - while (!done) - lock.wait(); - } -}; - + assert(0); + return 0; +} void PublishThread::run() { - Connection connection; - Channel channel; - Message msg; - opts.open(connection); - connection.openChannel(channel); - channel.start(); - - cout << "Started publisher." << endl; - string queueControl = "control"; - Queue response(queueControl); - channel.declareQueue(response); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl); + try { + Connection connection; + opts.open(connection); + Session_0_10 session = connection.newSession(); + + session.queueDeclare(arg::queue="control"); // Control queue + session.queuePurge(arg::queue="control"); + if (mode==SHARED) { + session.queueDeclare(arg::queue="perftest"); // Shared data queue + session.queuePurge(arg::queue="perftest"); + } - string queueName ="queue01"; - string queueNameC =queueName+"-1"; - - // create publish queue - Queue publish(queueName); - channel.declareQueue(publish); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, publish, queueName); - - // create completion queue - Queue completion(queueNameC); - channel.declareQueue(completion); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC); - - // pass queue name - msg.setData(queueName); - channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueControl); - - QPID_LOG(info, "Publisher: setup return queue: "<< queueNameC); - - int count = opts.count; - PublishListener listener(count); - channel.consume(completion, queueNameC, &listener); - QPID_LOG(info, "Publisher setup consumer: "<< queueNameC); - - struct timespec startTime; - if (::clock_gettime(CLOCK_REALTIME, &startTime)) - throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); - - bool durable = opts.durable; - if (durable) + // Wait for consumers. + SubscriptionManager subs(session); + LocalQueue control; + subs.subscribe(control, "control"); + for (int i = 0; i < opts.consumers; ++i) + expect(control.pop().getData(), "ready"); + + // Create test message + size_t msgSize=max(opts.size, 32); + Message msg(string(msgSize, 'X'), "perftest"); + char* msgBuf = const_cast<char*>(msg.getData().data()); + if (opts.durable) msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + // Time sending message. + AbsTime start=now(); + cout << "Publishing " << opts.count << " messages " << flush; + for (int i=0; i<opts.count; i++) { + sprintf(msgBuf, "%d", i); + session.messageTransfer(arg::destination=exchange(), + arg::content=msg); + if ((i%10000)==0) cout << "." << flush; + } + cout << " done." << endl; + msg.setData("done"); // Send done messages. + if (mode==SHARED) + for (int i = 0; i < opts.consumers; ++i) + session.messageTransfer(arg::destination=exchange(), arg::content=msg); + else + session.messageTransfer(arg::destination=exchange(), arg::content=msg); + AbsTime end=now(); + + // Report + cout << endl; + cout << "publish count:" << opts.count << endl; + cout << "publish secs:" << secs(start,end) << endl; + cout << "publish rate:" << (opts.count)/secs(start,end) << endl; + + // Wait for consumer(s) to finish. + for (int i = 0; i < opts.consumers; ++i) { + string report=control.pop().getData(); + if (report.find("consume") != 0) + throw Exception("Expected consumer report, got: "+report); + cout << endl << report; + } + end=now(); + + // Count total transfers from publisher and to subscribers. + int transfers; + if (mode==SHARED) // each message sent/receivd once. + transfers=2*opts.count; + else // sent once, received N times. + transfers=opts.count*(opts.consumers + 1); + + cout << endl + << "total transfers:" << transfers << endl + << "total secs:" << secs(start, end) << endl + << "total transfers/sec:" << transfers/secs(start, end) << endl; - for (int i=0; i<count; i++) { - msg.setData("Message 0123456789 "); - channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName); + connection.close(); + } + catch (const std::exception& e) { + cout << "PublishThread exception: " << e.what() << endl; } - - struct timespec endTime; - if (::clock_gettime(CLOCK_REALTIME, &endTime)) - throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); - - cout << "publish Time:" << endTime-startTime << endl; - double rate = count/toDouble(endTime-startTime); - cout << "publish Messages:" << count << endl; - cout << "publish Rate:" << rate << endl; - - msg.setData(queueName); // last message to queue. - channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName); - - listener.wait(); - - channel.close(); - connection.close(); } - - -// ================================================================ -// Listen client -// - -class Listener : public MessageListener{ - string queueName; - Monitor lock; - bool done; - - public: - Listener (string& _queueName): queueName(_queueName), done(false) {}; - - void received(Message& msg) { - if (msg.getData() == queueName) - { - Mutex::ScopedLock l(lock); - QPID_LOG(info, "Listener: done. " << queueName); - done = true; - lock.notify(); +void ListenThread::run() { + try { + Connection connection; + opts.open(connection); + Session_0_10 session = connection.newSession(); + + string consumeQueue; + switch (mode) { + case SHARED: + consumeQueue="perftest"; + session.queueDeclare(arg::queue="perftest"); + break; + case FANOUT: + case TOPIC: + consumeQueue=session.getId().str(); // Unique + session.queueDeclare(arg::queue=consumeQueue, + arg::exclusive=true, + arg::autoDelete=true); + session.queueBind(arg::queue=consumeQueue, + arg::exchange=exchange(), + arg::routingKey="perftest"); } + // Notify publisher we are ready. + session.queueDeclare(arg::queue="control"); // Control queue + session.messageTransfer(arg::content=Message("ready", "control")); + + SubscriptionManager subs(session); + LocalQueue consume; + subs.subscribe(consume, consumeQueue); + int consumed=0; + AbsTime start=now(); + while (consume.pop().getData() != "done") + ++consumed; + AbsTime end=now(); + + // Report to publisher. + ostringstream report; + report << "consume count: " << consumed << endl + << "consume secs: " << secs(start, end) << endl + << "consume rate: " << consumed/secs(start,end) << endl; + session.messageTransfer(arg::content=Message(report.str(), "control")); + connection.close(); } - - void wait() { - Mutex::ScopedLock l(lock); - while (!done) - lock.wait(); - } -}; - -void ListenThread::run() { - Connection connection; - Channel channel; - Message msg; - Message msg1; - cout << "Started listener." << endl;; - opts.open(connection); - connection.openChannel(channel); - channel.start(); - - string queueControl = "control"; - Queue response(queueControl); - channel.declareQueue(response); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl); - while (!channel.get(msg, response, AUTO_ACK)) { - QPID_LOG(info, "Listener: waiting for queue name."); - sleep(1); + catch (const std::exception& e) { + cout << "PublishThread exception: " << e.what() << endl; } - string queueName =msg.getData(); - string queueNameC =queueName+ "-1"; - - QPID_LOG(info, "Listener: Using Queue:" << queueName); - QPID_LOG(info, "Listener: Reply Queue:" << queueNameC); - // create consume queue - Queue consume(queueName); - channel.declareQueue(consume); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, consume, queueName); - - // create completion queue - Queue completion(queueNameC); - channel.declareQueue(completion); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC); - - Listener listener(queueName); - channel.consume(consume, queueName, &listener); - QPID_LOG(info, "Listener: consuming..."); - - listener.wait(); - - QPID_LOG(info, "Listener: send final message."); - // complete. - msg1.setData(queueName); - channel.publish(msg1, Exchange::STANDARD_TOPIC_EXCHANGE, queueNameC); - - channel.close(); - connection.close(); } - |