summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp2
-rw-r--r--cpp/src/tests/perftest.cpp380
2 files changed, 159 insertions, 223 deletions
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index ed3d733c20..369477131c 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -48,7 +48,7 @@ struct DummyListener : public MessageListener
void listen()
{
- dispatcher.listen(name, this, true, 1);
+ dispatcher.listen(name, this);
dispatcher.run();
}
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp
index bc816f6597..80157da7f4 100644
--- a/cpp/src/tests/perftest.cpp
+++ b/cpp/src/tests/perftest.cpp
@@ -21,44 +21,48 @@
#include "TestOptions.h"
-#include "qpid/client/Channel.h"
-#include "qpid/client/Exchange.h"
-#include "qpid/client/Queue.h"
+#include "qpid/client/Session_0_10.h"
+#include "qpid/client/SubscriptionManager.h"
#include "qpid/client/Connection.h"
-#include "qpid/client/MessageListener.h"
#include "qpid/client/Message.h"
-#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
#include <iostream>
-#include <cstdlib>
-#include <iomanip>
-#include <time.h>
-#include <unistd.h>
+#include <sstream>
-
-using namespace qpid;
-using namespace qpid::client;
-using namespace qpid::sys;
using namespace std;
+using namespace qpid;
+using namespace client;
+using namespace sys;
struct Opts : public TestOptions {
bool listen;
bool publish;
int count;
- bool durable;
+ int size;
+ bool durable;
+ int consumers;
+ std::string mode;
- Opts() : listen(false), publish(false), count(500000) {
+ Opts() :
+ listen(false), publish(false), count(500000), size(64), consumers(1),
+ mode("shared")
+ {
addOptions()
("listen", optValue(listen), "Consume messages.")
("publish", optValue(publish), "Produce messages.")
- ("count", optValue(count, "N"), "Messages to send/receive.")
- ("durable", optValue(durable, "N"), "Publish messages as durable.");
+ ("count", optValue(count, "N"), "Messages to send.")
+ ("size", optValue(size, "BYTES"), "Size of messages.")
+ ("durable", optValue(durable, "N"), "Publish messages as durable.")
+ ("consumers", optValue(consumers, "N"), "Number of consumers.")
+ ("mode", optValue(mode, "shared|fanout|topic"), "consume mode");
}
};
Opts opts;
+enum Mode { SHARED, FANOUT, TOPIC };
+Mode mode;
struct ListenThread : public Runnable { Thread thread; void run(); };
struct PublishThread : public Runnable { Thread thread; void run(); };
@@ -66,16 +70,22 @@ struct PublishThread : public Runnable { Thread thread; void run(); };
int main(int argc, char** argv) {
try {
opts.parse(argc, argv);
+ if (opts.mode=="shared") mode=SHARED;
+ else if (opts.mode=="fanout") mode = FANOUT;
+ else if (opts.mode=="topic") mode = TOPIC;
+ else throw Exception("Invalid mode");
if (!opts.listen && !opts.publish)
opts.listen = opts.publish = true;
- ListenThread listen;
+ std::vector<ListenThread> listen(opts.consumers);
PublishThread publish;
- if (opts.listen)
- listen.thread=Thread(listen);
+ if (opts.listen)
+ for (int i = 0; i < opts.consumers; ++i)
+ listen[i].thread=Thread(listen[i]);
if (opts.publish)
publish.thread=Thread(publish);
if (opts.listen)
- listen.thread.join();
+ for (int i = 0; i < opts.consumers; ++i)
+ listen[i].thread.join();
if (opts.publish)
publish.thread.join();
}
@@ -84,223 +94,149 @@ int main(int argc, char** argv) {
}
}
-// ================================================================
-// Publish client
-//
+double secs(Duration d) { return double(d)/TIME_SEC; }
+double secs(AbsTime start, AbsTime finish) { return secs(Duration(start,finish)); }
-struct timespec operator-(const struct timespec& lhs, const struct timespec& rhs) {
- timespec r;
- r.tv_nsec = lhs.tv_nsec - rhs.tv_nsec;
- r.tv_sec = lhs.tv_sec - rhs.tv_sec;
- if (r.tv_nsec < 0) {
- r.tv_nsec += 1000000000;
- r.tv_sec -= 1;
- }
- return r;
-}
-ostream& operator<<(ostream& o, const struct timespec& ts) {
- o << ts.tv_sec << "." << setw(9) << setfill('0') << right << ts.tv_nsec;
- return o;
-}
+void expect(string actual, string expect) {
+ if (expect != actual)
+ throw Exception("Expecting "+expect+" but received "+actual);
-double toDouble(const struct timespec& ts) {
- return double(ts.tv_nsec)/1000000000 + ts.tv_sec;
}
-class PublishListener : public MessageListener {
-
- void set_time() {
- timespec ts;
- if (::clock_gettime(CLOCK_REALTIME, &ts))
- throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno)));
- startTime = ts;
- }
-
- void print_time() {
- timespec ts;
- if (::clock_gettime(CLOCK_REALTIME, &ts))
- throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno)));
- cout << "Total Time:" << ts-startTime << endl;
- double rate = messageCount*2/toDouble(ts-startTime);
- cout << "returned Messages:" << messageCount << endl;
- cout << "round trip Rate:" << rate << endl;
- }
-
- struct timespec startTime;
- int messageCount;
- bool done;
- Monitor lock;
-
- public:
-
- PublishListener(int mcount): messageCount(mcount), done(false) {
- set_time();
- }
-
- void received(Message& msg) {
- print_time();
- QPID_LOG(info, "Publisher: received: " << msg.getData());
- Mutex::ScopedLock l(lock);
- QPID_LOG(info, "Publisher: done.");
- done = true;
- lock.notify();
+const char* exchange() {
+ switch (mode) {
+ case SHARED: return ""; // Deafult exchange.
+ case FANOUT: return "amq.fanout";
+ case TOPIC: return "amq.topic";
}
-
- void wait() {
- Mutex::ScopedLock l(lock);
- while (!done)
- lock.wait();
- }
-};
-
+ assert(0);
+ return 0;
+}
void PublishThread::run() {
- Connection connection;
- Channel channel;
- Message msg;
- opts.open(connection);
- connection.openChannel(channel);
- channel.start();
-
- cout << "Started publisher." << endl;
- string queueControl = "control";
- Queue response(queueControl);
- channel.declareQueue(response);
- channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl);
+ try {
+ Connection connection;
+ opts.open(connection);
+ Session_0_10 session = connection.newSession();
+
+ session.queueDeclare(arg::queue="control"); // Control queue
+ session.queuePurge(arg::queue="control");
+ if (mode==SHARED) {
+ session.queueDeclare(arg::queue="perftest"); // Shared data queue
+ session.queuePurge(arg::queue="perftest");
+ }
- string queueName ="queue01";
- string queueNameC =queueName+"-1";
-
- // create publish queue
- Queue publish(queueName);
- channel.declareQueue(publish);
- channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, publish, queueName);
-
- // create completion queue
- Queue completion(queueNameC);
- channel.declareQueue(completion);
- channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC);
-
- // pass queue name
- msg.setData(queueName);
- channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueControl);
-
- QPID_LOG(info, "Publisher: setup return queue: "<< queueNameC);
-
- int count = opts.count;
- PublishListener listener(count);
- channel.consume(completion, queueNameC, &listener);
- QPID_LOG(info, "Publisher setup consumer: "<< queueNameC);
-
- struct timespec startTime;
- if (::clock_gettime(CLOCK_REALTIME, &startTime))
- throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno)));
-
- bool durable = opts.durable;
- if (durable)
+ // Wait for consumers.
+ SubscriptionManager subs(session);
+ LocalQueue control;
+ subs.subscribe(control, "control");
+ for (int i = 0; i < opts.consumers; ++i)
+ expect(control.pop().getData(), "ready");
+
+ // Create test message
+ size_t msgSize=max(opts.size, 32);
+ Message msg(string(msgSize, 'X'), "perftest");
+ char* msgBuf = const_cast<char*>(msg.getData().data());
+ if (opts.durable)
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+ // Time sending message.
+ AbsTime start=now();
+ cout << "Publishing " << opts.count << " messages " << flush;
+ for (int i=0; i<opts.count; i++) {
+ sprintf(msgBuf, "%d", i);
+ session.messageTransfer(arg::destination=exchange(),
+ arg::content=msg);
+ if ((i%10000)==0) cout << "." << flush;
+ }
+ cout << " done." << endl;
+ msg.setData("done"); // Send done messages.
+ if (mode==SHARED)
+ for (int i = 0; i < opts.consumers; ++i)
+ session.messageTransfer(arg::destination=exchange(), arg::content=msg);
+ else
+ session.messageTransfer(arg::destination=exchange(), arg::content=msg);
+ AbsTime end=now();
+
+ // Report
+ cout << endl;
+ cout << "publish count:" << opts.count << endl;
+ cout << "publish secs:" << secs(start,end) << endl;
+ cout << "publish rate:" << (opts.count)/secs(start,end) << endl;
+
+ // Wait for consumer(s) to finish.
+ for (int i = 0; i < opts.consumers; ++i) {
+ string report=control.pop().getData();
+ if (report.find("consume") != 0)
+ throw Exception("Expected consumer report, got: "+report);
+ cout << endl << report;
+ }
+ end=now();
+
+ // Count total transfers from publisher and to subscribers.
+ int transfers;
+ if (mode==SHARED) // each message sent/receivd once.
+ transfers=2*opts.count;
+ else // sent once, received N times.
+ transfers=opts.count*(opts.consumers + 1);
+
+ cout << endl
+ << "total transfers:" << transfers << endl
+ << "total secs:" << secs(start, end) << endl
+ << "total transfers/sec:" << transfers/secs(start, end) << endl;
- for (int i=0; i<count; i++) {
- msg.setData("Message 0123456789 ");
- channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName);
+ connection.close();
+ }
+ catch (const std::exception& e) {
+ cout << "PublishThread exception: " << e.what() << endl;
}
-
- struct timespec endTime;
- if (::clock_gettime(CLOCK_REALTIME, &endTime))
- throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno)));
-
- cout << "publish Time:" << endTime-startTime << endl;
- double rate = count/toDouble(endTime-startTime);
- cout << "publish Messages:" << count << endl;
- cout << "publish Rate:" << rate << endl;
-
- msg.setData(queueName); // last message to queue.
- channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName);
-
- listener.wait();
-
- channel.close();
- connection.close();
}
-
-
-// ================================================================
-// Listen client
-//
-
-class Listener : public MessageListener{
- string queueName;
- Monitor lock;
- bool done;
-
- public:
- Listener (string& _queueName): queueName(_queueName), done(false) {};
-
- void received(Message& msg) {
- if (msg.getData() == queueName)
- {
- Mutex::ScopedLock l(lock);
- QPID_LOG(info, "Listener: done. " << queueName);
- done = true;
- lock.notify();
+void ListenThread::run() {
+ try {
+ Connection connection;
+ opts.open(connection);
+ Session_0_10 session = connection.newSession();
+
+ string consumeQueue;
+ switch (mode) {
+ case SHARED:
+ consumeQueue="perftest";
+ session.queueDeclare(arg::queue="perftest");
+ break;
+ case FANOUT:
+ case TOPIC:
+ consumeQueue=session.getId().str(); // Unique
+ session.queueDeclare(arg::queue=consumeQueue,
+ arg::exclusive=true,
+ arg::autoDelete=true);
+ session.queueBind(arg::queue=consumeQueue,
+ arg::exchange=exchange(),
+ arg::routingKey="perftest");
}
+ // Notify publisher we are ready.
+ session.queueDeclare(arg::queue="control"); // Control queue
+ session.messageTransfer(arg::content=Message("ready", "control"));
+
+ SubscriptionManager subs(session);
+ LocalQueue consume;
+ subs.subscribe(consume, consumeQueue);
+ int consumed=0;
+ AbsTime start=now();
+ while (consume.pop().getData() != "done")
+ ++consumed;
+ AbsTime end=now();
+
+ // Report to publisher.
+ ostringstream report;
+ report << "consume count: " << consumed << endl
+ << "consume secs: " << secs(start, end) << endl
+ << "consume rate: " << consumed/secs(start,end) << endl;
+ session.messageTransfer(arg::content=Message(report.str(), "control"));
+ connection.close();
}
-
- void wait() {
- Mutex::ScopedLock l(lock);
- while (!done)
- lock.wait();
- }
-};
-
-void ListenThread::run() {
- Connection connection;
- Channel channel;
- Message msg;
- Message msg1;
- cout << "Started listener." << endl;;
- opts.open(connection);
- connection.openChannel(channel);
- channel.start();
-
- string queueControl = "control";
- Queue response(queueControl);
- channel.declareQueue(response);
- channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl);
- while (!channel.get(msg, response, AUTO_ACK)) {
- QPID_LOG(info, "Listener: waiting for queue name.");
- sleep(1);
+ catch (const std::exception& e) {
+ cout << "PublishThread exception: " << e.what() << endl;
}
- string queueName =msg.getData();
- string queueNameC =queueName+ "-1";
-
- QPID_LOG(info, "Listener: Using Queue:" << queueName);
- QPID_LOG(info, "Listener: Reply Queue:" << queueNameC);
- // create consume queue
- Queue consume(queueName);
- channel.declareQueue(consume);
- channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, consume, queueName);
-
- // create completion queue
- Queue completion(queueNameC);
- channel.declareQueue(completion);
- channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC);
-
- Listener listener(queueName);
- channel.consume(consume, queueName, &listener);
- QPID_LOG(info, "Listener: consuming...");
-
- listener.wait();
-
- QPID_LOG(info, "Listener: send final message.");
- // complete.
- msg1.setData(queueName);
- channel.publish(msg1, Exchange::STANDARD_TOPIC_EXCHANGE, queueNameC);
-
- channel.close();
- connection.close();
}
-