diff options
author | Gordon Sim <gsim@apache.org> | 2008-06-09 20:53:57 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-06-09 20:53:57 +0000 |
commit | 139e4884dbc5efb83d1de9b4562ed26fd8f16f34 (patch) | |
tree | eea32f79b1996b1b1e66e67f495e9b2ec16c9356 /qpid/cpp/src/tests/perftest.cpp | |
parent | 7df7f79a8a05a64b6f70d64202f628bce21f8d42 (diff) | |
download | qpid-python-139e4884dbc5efb83d1de9b4562ed26fd8f16f34.tar.gz |
Moved from AccumulatedAck to SequenceSet in managing transactional accepts
Added transactional option to perftest
Removed clientid from ConnectionSettings as it appears not to be used
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@665890 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/perftest.cpp')
-rw-r--r-- | qpid/cpp/src/tests/perftest.cpp | 29 |
1 files changed, 20 insertions, 9 deletions
diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp index e0e947eb74..23e9e565e0 100644 --- a/qpid/cpp/src/tests/perftest.cpp +++ b/qpid/cpp/src/tests/perftest.cpp @@ -95,8 +95,9 @@ struct Opts : public TestOptions { size_t iterations; Mode mode; bool summary; - uint32_t intervalSub; - uint32_t intervalPub; + uint32_t intervalSub; + uint32_t intervalPub; + size_t tx; static const std::string helpText; @@ -106,7 +107,7 @@ struct Opts : public TestOptions { pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false), subs(1), ack(0), qt(1), iterations(1), mode(SHARED), summary(false), - intervalSub(0), intervalPub(0) + intervalSub(0), intervalPub(0), tx(0) { addOptions() ("setup", optValue(setup), "Create shared queues.") @@ -140,7 +141,9 @@ struct Opts : public TestOptions { ("queue-durable", optValue(queueDurable, "N"), "Make queue durable (implied if durable set)") ("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume") - ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish"); + ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish") + + ("tx", optValue(tx, "N"), "if non-zero, the transaction batch size"); } // Computed values @@ -450,6 +453,7 @@ struct PublishThread : public Client { msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + if (opts.tx) sync(session).txSelect(); SubscriptionManager subs(session); LocalQueue lq; subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true); @@ -474,6 +478,7 @@ struct PublishThread : public Client { arg::content=msg, arg::acceptMode=1); } + if (opts.tx && (j % opts.tx == 0)) sync(session).txCommit(); if (opts.intervalPub) ::usleep(opts.intervalPub*1000); } if (opts.confirm) session.sync(); @@ -483,6 +488,7 @@ struct PublishThread : public Client { // Send result to controller. Message report(lexical_cast<string>(opts.count/time), "pub_done"); session.messageTransfer(arg::content=report, arg::acceptMode=1); + if (opts.tx) sync(session).txCommit(); } session.close(); } @@ -523,16 +529,17 @@ struct SubscribeThread : public Client { } void run() { // Subscribe - try { + try { + if (opts.tx) sync(session).txSelect(); SubscriptionManager subs(session); - LocalQueue lq(AckPolicy(opts.ack)); - subs.setAcceptMode(opts.ack > 0 ? 0 : 1); + LocalQueue lq(AckPolicy(opts.tx ? opts.tx : opts.ack)); + subs.setAcceptMode(opts.tx || opts.ack ? 0 : 1); subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED, false); subs.subscribe(lq, queue); // Notify controller we are ready. session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1); - + if (opts.tx) sync(session).txCommit(); for (size_t j = 0; j < opts.iterations; ++j) { if (j > 0) { @@ -544,6 +551,7 @@ struct SubscribeThread : public Client { size_t expect=0; for (size_t i = 0; i < opts.subQuota; ++i) { msg=lq.pop(); + if (opts.tx && (i % opts.tx == 0)) sync(session).txCommit(); if (opts.intervalSub) ::usleep(opts.intervalSub*1000); // TODO aconway 2007-11-23: check message order for. // multiple publishers. Need an array of counters, @@ -560,14 +568,17 @@ struct SubscribeThread : public Client { expect = n+1; } } - if (opts.ack !=0) + if (opts.ack) subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch. + if (opts.tx) + sync(session).txCommit(); AbsTime end=now(); // Report to publisher. Message result(lexical_cast<string>(opts.subQuota/secs(start,end)), "sub_done"); session.messageTransfer(arg::content=result, arg::acceptMode=1); + if (opts.tx) sync(session).txCommit(); } session.close(); } |