summaryrefslogtreecommitdiff
path: root/cpp/src/tests/perftest.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-11-14 23:05:49 +0000
committerAlan Conway <aconway@apache.org>2007-11-14 23:05:49 +0000
commit592e0ceac703fd424033a83845a1ce7a19689b91 (patch)
tree273d7db932140b78039bee820511a811a578b2b0 /cpp/src/tests/perftest.cpp
parent3c7bae6c9ebfd0135e3d0dc1269b3f0255c50510 (diff)
downloadqpid-python-592e0ceac703fd424033a83845a1ce7a19689b91.tar.gz
perftest.cpp
- Remove heap allocation per message in. - Verify sequence numbers in message data. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@595115 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/perftest.cpp')
-rw-r--r--cpp/src/tests/perftest.cpp59
1 files changed, 28 insertions, 31 deletions
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp
index abf090af8d..019b1e1fce 100644
--- a/cpp/src/tests/perftest.cpp
+++ b/cpp/src/tests/perftest.cpp
@@ -41,12 +41,12 @@ struct Opts : public TestOptions {
bool listen;
bool publish;
bool purge;
- int count;
- int size;
+ size_t count;
+ size_t size;
bool durable;
- int consumers;
+ size_t consumers;
std::string mode;
- int autoAck;
+ size_t autoAck;
bool summary;
Opts() :
@@ -109,12 +109,12 @@ int main(int argc, char** argv) {
std::vector<ListenThread> listen(opts.consumers);
PublishThread publish;
if (opts.listen)
- for (int i = 0; i < opts.consumers; ++i)
+ for (size_t i = 0; i < opts.consumers; ++i)
listen[i].thread=Thread(listen[i]);
if (opts.publish)
publish.thread=Thread(publish);
if (opts.listen)
- for (int i = 0; i < opts.consumers; ++i)
+ for (size_t i = 0; i < opts.consumers; ++i)
listen[i].thread.join();
if (opts.publish)
publish.thread.join();
@@ -155,26 +155,25 @@ void PublishThread::run() {
SubscriptionManager subs(session);
LocalQueue control;
subs.subscribe(control, "control");
- for (int i = 0; i < opts.consumers; ++i) {
+ for (size_t i = 0; i < opts.consumers; ++i) {
if (!opts.summary) cout << "." << flush;
expect(control.pop().getData(), "ready");
}
if (!opts.summary) cout << endl;
- // Create test message
- size_t msgSize=max(opts.size, 32);
- char* msgBuf = new char[msgSize];
- memset(msgBuf,'X', msgSize);
-
- Message msg(string(), "perftest");
+ size_t msgSize=max(opts.size, sizeof(size_t));
+ Message msg(string(msgSize, 'X'), "perftest");
if (opts.durable)
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
- // Time sending message.
+
AbsTime start=now();
- if (!opts.summary) cout << "Publishing " << opts.count << " messages " << flush;
- for (int i=0; i<opts.count; i++) {
- sprintf(msgBuf, "%d", i);
- msg.setData(string(msgBuf,msgSize));
+ if (!opts.summary) cout << "Publishing " << opts.count
+ << " messages " << flush;
+ for (size_t i=0; i<opts.count; i++) {
+ // Stamp the iteration into the message data, careful to avoid
+ // any heap allocation.
+ char* data = const_cast<char*>(msg.getData().data());
+ *reinterpret_cast<uint32_t*>(data) = i;
session.messageTransfer(arg::destination=exchange(),
arg::content=msg);
if (!opts.summary && (i%10000)==0){
@@ -182,13 +181,12 @@ void PublishThread::run() {
session.execution().sendSyncRequest();
}
}
- delete [] msgBuf;
//Completion compl;
if (!opts.summary) cout << " done." << endl;
msg.setData("done"); // Send done messages.
if (mode==SHARED)
- for (int i = 0; i < opts.consumers; ++i)
+ for (size_t i = 0; i < opts.consumers; ++i)
session.messageTransfer(arg::destination=exchange(), arg::content=msg);
else
session.messageTransfer(arg::destination=exchange(), arg::content=msg);
@@ -203,20 +201,10 @@ void PublishThread::run() {
<< "publish secs:" << secs(start,end) << endl
<< "publish rate:" << publish_rate << endl;
-
-
- // Report
-// end=now(); //compl.wait(); (wait for publish confirm of write if durable)
-// publish_rate=(opts.count)/secs(start,end);
-// if (!opts.summary)
-// cout << endl
-// << "synced secs:" << secs(start,end) << endl
-// << "synced rate:" << publish_rate << endl;
-
double consume_rate = 0; // Average rate for consumers.
// Wait for consumer(s) to finish.
if (!opts.summary) cout << "Waiting for consumers done " << endl;
- for (int i = 0; i < opts.consumers; ++i) {
+ for (size_t i = 0; i < opts.consumers; ++i) {
string report=control.pop().getData();
if (!opts.summary)
cout << endl << report;
@@ -283,7 +271,16 @@ void ListenThread::run() {
int consumed=0;
AbsTime start=now();
Message msg;
+ size_t i = 0;
while ((msg=consume.pop()).getData() != "done") {
+ char* data=const_cast<char*>(msg.getData().data());
+ size_t j=*reinterpret_cast<size_t*>(data);
+ if (i > j)
+ throw Exception(
+ QPID_MSG("Messages out of order " << i
+ << " before " << j));
+ else
+ i = j;
++consumed;
}
msg.acknowledge(); // Ack all outstanding messages -- ??