diff options
author | Alan Conway <aconway@apache.org> | 2007-11-14 23:05:49 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-11-14 23:05:49 +0000 |
commit | 592e0ceac703fd424033a83845a1ce7a19689b91 (patch) | |
tree | 273d7db932140b78039bee820511a811a578b2b0 /cpp/src/tests/perftest.cpp | |
parent | 3c7bae6c9ebfd0135e3d0dc1269b3f0255c50510 (diff) | |
download | qpid-python-592e0ceac703fd424033a83845a1ce7a19689b91.tar.gz |
perftest.cpp
- Remove heap allocation per message in.
- Verify sequence numbers in message data.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@595115 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/perftest.cpp')
-rw-r--r-- | cpp/src/tests/perftest.cpp | 59 |
1 files changed, 28 insertions, 31 deletions
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index abf090af8d..019b1e1fce 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -41,12 +41,12 @@ struct Opts : public TestOptions { bool listen; bool publish; bool purge; - int count; - int size; + size_t count; + size_t size; bool durable; - int consumers; + size_t consumers; std::string mode; - int autoAck; + size_t autoAck; bool summary; Opts() : @@ -109,12 +109,12 @@ int main(int argc, char** argv) { std::vector<ListenThread> listen(opts.consumers); PublishThread publish; if (opts.listen) - for (int i = 0; i < opts.consumers; ++i) + for (size_t i = 0; i < opts.consumers; ++i) listen[i].thread=Thread(listen[i]); if (opts.publish) publish.thread=Thread(publish); if (opts.listen) - for (int i = 0; i < opts.consumers; ++i) + for (size_t i = 0; i < opts.consumers; ++i) listen[i].thread.join(); if (opts.publish) publish.thread.join(); @@ -155,26 +155,25 @@ void PublishThread::run() { SubscriptionManager subs(session); LocalQueue control; subs.subscribe(control, "control"); - for (int i = 0; i < opts.consumers; ++i) { + for (size_t i = 0; i < opts.consumers; ++i) { if (!opts.summary) cout << "." << flush; expect(control.pop().getData(), "ready"); } if (!opts.summary) cout << endl; - // Create test message - size_t msgSize=max(opts.size, 32); - char* msgBuf = new char[msgSize]; - memset(msgBuf,'X', msgSize); - - Message msg(string(), "perftest"); + size_t msgSize=max(opts.size, sizeof(size_t)); + Message msg(string(msgSize, 'X'), "perftest"); if (opts.durable) msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); - // Time sending message. + AbsTime start=now(); - if (!opts.summary) cout << "Publishing " << opts.count << " messages " << flush; - for (int i=0; i<opts.count; i++) { - sprintf(msgBuf, "%d", i); - msg.setData(string(msgBuf,msgSize)); + if (!opts.summary) cout << "Publishing " << opts.count + << " messages " << flush; + for (size_t i=0; i<opts.count; i++) { + // Stamp the iteration into the message data, careful to avoid + // any heap allocation. + char* data = const_cast<char*>(msg.getData().data()); + *reinterpret_cast<uint32_t*>(data) = i; session.messageTransfer(arg::destination=exchange(), arg::content=msg); if (!opts.summary && (i%10000)==0){ @@ -182,13 +181,12 @@ void PublishThread::run() { session.execution().sendSyncRequest(); } } - delete [] msgBuf; //Completion compl; if (!opts.summary) cout << " done." << endl; msg.setData("done"); // Send done messages. if (mode==SHARED) - for (int i = 0; i < opts.consumers; ++i) + for (size_t i = 0; i < opts.consumers; ++i) session.messageTransfer(arg::destination=exchange(), arg::content=msg); else session.messageTransfer(arg::destination=exchange(), arg::content=msg); @@ -203,20 +201,10 @@ void PublishThread::run() { << "publish secs:" << secs(start,end) << endl << "publish rate:" << publish_rate << endl; - - - // Report -// end=now(); //compl.wait(); (wait for publish confirm of write if durable) -// publish_rate=(opts.count)/secs(start,end); -// if (!opts.summary) -// cout << endl -// << "synced secs:" << secs(start,end) << endl -// << "synced rate:" << publish_rate << endl; - double consume_rate = 0; // Average rate for consumers. // Wait for consumer(s) to finish. if (!opts.summary) cout << "Waiting for consumers done " << endl; - for (int i = 0; i < opts.consumers; ++i) { + for (size_t i = 0; i < opts.consumers; ++i) { string report=control.pop().getData(); if (!opts.summary) cout << endl << report; @@ -283,7 +271,16 @@ void ListenThread::run() { int consumed=0; AbsTime start=now(); Message msg; + size_t i = 0; while ((msg=consume.pop()).getData() != "done") { + char* data=const_cast<char*>(msg.getData().data()); + size_t j=*reinterpret_cast<size_t*>(data); + if (i > j) + throw Exception( + QPID_MSG("Messages out of order " << i + << " before " << j)); + else + i = j; ++consumed; } msg.acknowledge(); // Ack all outstanding messages -- ?? |