diff options
author | Gordon Sim <gsim@apache.org> | 2008-10-24 12:34:29 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-10-24 12:34:29 +0000 |
commit | 8aab8c9e44cdf21762aef1e20fb29811f2272d19 (patch) | |
tree | 17fcab8698b5bb90e6942619b683015c8a7c2511 /qpid/cpp/src/tests/perftest.cpp | |
parent | 7e04e141dfb61fd0477c06a0e77d3c7e7be05761 (diff) | |
download | qpid-python-8aab8c9e44cdf21762aef1e20fb29811f2272d19.tar.gz |
Revised transactional options to perftest as they could not be used on older boost versions due to ambiguity.
Refactored TxAccept to avoid excessive testing and searching for delivery records.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@707615 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/perftest.cpp')
-rw-r--r-- | qpid/cpp/src/tests/perftest.cpp | 74 |
1 files changed, 44 insertions, 30 deletions
diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp index 923405779c..2e15489525 100644 --- a/qpid/cpp/src/tests/perftest.cpp +++ b/qpid/cpp/src/tests/perftest.cpp @@ -97,9 +97,10 @@ struct Opts : public TestOptions { bool summary; uint32_t intervalSub; uint32_t intervalPub; - size_t tx_pub; - bool tx_pub_async; - size_t tx_sub; + size_t tx; + size_t txPub; + size_t txSub; + bool commitAsync; static const std::string helpText; @@ -109,7 +110,7 @@ struct Opts : public TestOptions { pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false), subs(1), ack(0), qt(1), iterations(1), mode(SHARED), summary(false), - intervalSub(0), intervalPub(0), tx_pub(0), tx_pub_async(false), tx_sub(0) + intervalSub(0), intervalPub(0), tx(0), txPub(0), txSub(0), commitAsync(false) { addOptions() ("setup", optValue(setup), "Create shared queues.") @@ -145,9 +146,10 @@ struct Opts : public TestOptions { ("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume") ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish") - ("tx_pub", optValue(tx_pub, "N"), "if non-zero, the transaction batch size for publishing") - ("tx_pub_async", optValue(tx_pub_async, "yes|no"), "Publishing tx commit async") - ("tx_sub", optValue(tx_sub, "N"), "if non-zero, the transaction batch size for consuming"); + ("tx", optValue(tx, "N"), "if non-zero, the transaction batch size for publishing and consuming") + ("pub-tx", optValue(txPub, "N"), "if non-zero, the transaction batch size for publishing") + ("async-commit", optValue(commitAsync, "yes|no"), "Don't wait for completion of commit") + ("sub-tx", optValue(txSub, "N"), "if non-zero, the transaction batch size for consuming"); } // Computed values @@ -184,6 +186,18 @@ struct Opts : public TestOptions { break; } transfers=(totalPubs*count) + (totalSubs*subQuota); + if (tx) { + if (txPub) { + cerr << "WARNING: Using overriden tx value for publishers: " << txPub << std::endl; + } else { + txPub = tx; + } + if (txSub) { + cerr << "WARNING: Using overriden tx value for subscribers: " << txSub << std::endl; + } else { + txSub = tx; + } + } } }; @@ -457,12 +471,8 @@ struct PublishThread : public Client { msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); - if (opts.tx_pub){ - if (opts.tx_pub_async){ - session.txSelect(); - } else { - sync(session).txSelect(); - } + if (opts.txPub){ + session.txSelect(); } SubscriptionManager subs(session); LocalQueue lq; @@ -488,8 +498,8 @@ struct PublishThread : public Client { arg::content=msg, arg::acceptMode=1); } - if (opts.tx_pub && ((i+1) % opts.tx_pub == 0)){ - if (opts.tx_pub_async){ + if (opts.txPub && ((i+1) % opts.txPub == 0)){ + if (opts.commitAsync){ session.txCommit(); } else { sync(session).txCommit(); @@ -504,12 +514,8 @@ struct PublishThread : public Client { // Send result to controller. Message report(lexical_cast<string>(opts.count/time), "pub_done"); session.messageTransfer(arg::content=report, arg::acceptMode=1); - if (opts.tx_pub){ - if (opts.tx_pub_async){ - session.txCommit(); - }else{ - sync(session).txCommit(); - } + if (opts.txPub){ + sync(session).txCommit(); } } session.close(); @@ -552,16 +558,19 @@ struct SubscribeThread : public Client { void run() { // Subscribe try { - if (opts.tx_sub) sync(session).txSelect(); + if (opts.txSub) sync(session).txSelect(); SubscriptionManager subs(session); - LocalQueue lq(AckPolicy(opts.tx_sub ? opts.tx_sub : opts.ack)); - subs.setAcceptMode(opts.tx_sub || opts.ack ? 0 : 1); + LocalQueue lq(AckPolicy(opts.txSub ? opts.txSub : opts.ack)); + subs.setAcceptMode(opts.txSub || opts.ack ? 0 : 1); subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED, false); subs.subscribe(lq, queue); // Notify controller we are ready. session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1); - if (opts.tx_sub) sync(session).txCommit(); + if (opts.txSub) { + if (opts.commitAsync) session.txCommit(); + else sync(session).txCommit(); + } for (size_t j = 0; j < opts.iterations; ++j) { if (j > 0) { @@ -573,7 +582,10 @@ struct SubscribeThread : public Client { size_t expect=0; for (size_t i = 0; i < opts.subQuota; ++i) { msg=lq.pop(); - if (opts.tx_sub && ((i+1) % opts.tx_sub == 0)) sync(session).txCommit(); + if (opts.txSub && ((i+1) % opts.txSub == 0)) { + if (opts.commitAsync) session.txCommit(); + else sync(session).txCommit(); + } if (opts.intervalSub) ::usleep(opts.intervalSub*1000); // TODO aconway 2007-11-23: check message order for. // multiple publishers. Need an array of counters, @@ -590,17 +602,19 @@ struct SubscribeThread : public Client { expect = n+1; } } - if (opts.tx_sub || opts.ack) + if (opts.txSub || opts.ack) lq.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch. - if (opts.tx_sub) - sync(session).txCommit(); + if (opts.txSub) { + if (opts.commitAsync) session.txCommit(); + else sync(session).txCommit(); + } AbsTime end=now(); // Report to publisher. Message result(lexical_cast<string>(opts.subQuota/secs(start,end)), "sub_done"); session.messageTransfer(arg::content=result, arg::acceptMode=1); - if (opts.tx_sub) sync(session).txCommit(); + if (opts.txSub) sync(session).txCommit(); } session.close(); } |