diff options
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/src/tests/perftest.cpp | 28 |
1 files changed, 24 insertions, 4 deletions
diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp index 8d970a0a6f..8e16844720 100644 --- a/qpid/cpp/src/tests/perftest.cpp +++ b/qpid/cpp/src/tests/perftest.cpp @@ -24,6 +24,7 @@ #include "qpid/client/Session_0_10.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/client/Connection.h" +#include "qpid/client/Completion.h" #include "qpid/client/Message.h" #include "qpid/sys/Time.h" @@ -173,15 +174,22 @@ void PublishThread::run() { sprintf(msgBuf, "%d", i); session.messageTransfer(arg::destination=exchange(), arg::content=msg); - if (!opts.summary && (i%10000)==0) cout << "." << flush; + if (!opts.summary && (i%10000)==0){ + cout << "." << flush; + session.execution().sendSyncRequest(); + } } + session.execution().sendSyncRequest(); + + //Completion compl; if (!opts.summary) cout << " done." << endl; msg.setData("done"); // Send done messages. if (mode==SHARED) for (int i = 0; i < opts.consumers; ++i) - session.messageTransfer(arg::destination=exchange(), arg::content=msg); + session.messageTransfer(arg::destination=exchange(), arg::content=msg); else session.messageTransfer(arg::destination=exchange(), arg::content=msg); + session.execution().sendSyncRequest(); AbsTime end=now(); // Report @@ -191,6 +199,16 @@ void PublishThread::run() { << "publish count:" << opts.count << endl << "publish secs:" << secs(start,end) << endl << "publish rate:" << publish_rate << endl; + + + + // Report +// end=now(); //compl.wait(); (wait for publish confirm of write if durable) +// publish_rate=(opts.count)/secs(start,end); +// if (!opts.summary) +// cout << endl +// << "synced secs:" << secs(start,end) << endl +// << "synced rate:" << publish_rate << endl; double consume_rate = 0; // Average rate for consumers. // Wait for consumer(s) to finish. @@ -247,7 +265,8 @@ void ListenThread::run() { consumeQueue=session.getId().str(); // Unique name. session.queueDeclare(arg::queue=consumeQueue, arg::exclusive=true, - arg::autoDelete=true); + arg::autoDelete=true, + arg::durable=opts.durable); session.queueBind(arg::queue=consumeQueue, arg::exchange=exchange(), arg::routingKey="perftest"); @@ -263,8 +282,9 @@ void ListenThread::run() { Message msg; while ((msg=consume.pop()).getData() != "done") { ++consumed; + } - msg.acknowledge(); // Ack all outstanding messages. + msg.acknowledge(); // Ack all outstanding messages -- ?? AbsTime end=now(); // Report to publisher. |