diff options
author | Alan Conway <aconway@apache.org> | 2007-11-07 23:07:51 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-11-07 23:07:51 +0000 |
commit | b930ecd07bc7af075bef2f3fa958bfc118ad5f84 (patch) | |
tree | 50249df94aa1864f9b682377edd10d226b073d93 /cpp | |
parent | bf7f244c28c899b90789e77eda5f585edda5bcd9 (diff) | |
download | qpid-python-b930ecd07bc7af075bef2f3fa958bfc118ad5f84.tar.gz |
Fix race condition in perftest.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@592941 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/tests/perftest.cpp | 55 |
1 files changed, 37 insertions, 18 deletions
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index c775407cdf..d9316527bc 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -68,7 +68,24 @@ Mode mode; struct ListenThread : public Runnable { Thread thread; void run(); }; struct PublishThread : public Runnable { Thread thread; void run(); }; - + +// Create and purge the shared queues +void setup() { + cout << "Create shared queues" << endl; + Connection connection; + opts.open(connection); + Session_0_10 session = connection.newSession(); + session.setSynchronous(true); // Make sure this is all completed. + session.queueDeclare(arg::queue="control"); // Control queue + session.queuePurge(arg::queue="control"); + if (mode==SHARED) { + session.queueDeclare(arg::queue="perftest"); // Shared data queue + session.queuePurge(arg::queue="perftest"); + } + session.close(); + connection.close(); +} + int main(int argc, char** argv) { try { opts.parse(argc, argv); @@ -78,6 +95,7 @@ int main(int argc, char** argv) { else throw Exception("Invalid mode"); if (!opts.listen && !opts.publish) opts.listen = opts.publish = true; + setup(); std::vector<ListenThread> listen(opts.consumers); PublishThread publish; if (opts.listen) @@ -122,19 +140,16 @@ void PublishThread::run() { opts.open(connection); Session_0_10 session = connection.newSession(); - session.queueDeclare(arg::queue="control"); // Control queue - session.queuePurge(arg::queue="control"); - if (mode==SHARED) { - session.queueDeclare(arg::queue="perftest"); // Shared data queue - session.queuePurge(arg::queue="perftest"); - } - // Wait for consumers. + cout << "Publisher wating for consumers " << flush; SubscriptionManager subs(session); LocalQueue control; subs.subscribe(control, "control"); - for (int i = 0; i < opts.consumers; ++i) + for (int i = 0; i < opts.consumers; ++i) { + cout << "." << flush; expect(control.pop().getData(), "ready"); + } + cout << endl; // Create test message size_t msgSize=max(opts.size, 32); @@ -167,6 +182,7 @@ void PublishThread::run() { cout << "publish rate:" << (opts.count)/secs(start,end) << endl; // Wait for consumer(s) to finish. + cout << "Publisher wating for consumer reports. " << endl; for (int i = 0; i < opts.consumers; ++i) { string report=control.pop().getData(); if (report.find("consume") != 0) @@ -201,14 +217,11 @@ void ListenThread::run() { Session_0_10 session = connection.newSession(); string consumeQueue; - switch (mode) { - case SHARED: + if (mode == SHARED) { consumeQueue="perftest"; - session.queueDeclare(arg::queue="perftest"); - break; - case FANOUT: - case TOPIC: - consumeQueue=session.getId().str(); // Unique + } + else { + consumeQueue=session.getId().str(); // Unique name. session.queueDeclare(arg::queue=consumeQueue, arg::exclusive=true, arg::autoDelete=true); @@ -217,7 +230,6 @@ void ListenThread::run() { arg::routingKey="perftest"); } // Notify publisher we are ready. - session.queueDeclare(arg::queue="control"); // Control queue session.messageTransfer(arg::content=Message("ready", "control")); SubscriptionManager subs(session); @@ -226,8 +238,15 @@ void ListenThread::run() { int consumed=0; AbsTime start=now(); Message msg; - while ((msg=consume.pop()).getData() != "done") + if (!opts.publish) + cout << "Consuming " << flush; + while ((msg=consume.pop()).getData() != "done") { ++consumed; + if (!opts.publish && (consumed%10000) == 0) + cout << "." << flush; + } + if (!opts.publish) + cout << endl; msg.acknowledge(); // Ack all outstanding messages. AbsTime end=now(); |