summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/perftest.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-10-24 12:34:29 +0000
committerGordon Sim <gsim@apache.org>2008-10-24 12:34:29 +0000
commit8aab8c9e44cdf21762aef1e20fb29811f2272d19 (patch)
tree17fcab8698b5bb90e6942619b683015c8a7c2511 /qpid/cpp/src/tests/perftest.cpp
parent7e04e141dfb61fd0477c06a0e77d3c7e7be05761 (diff)
downloadqpid-python-8aab8c9e44cdf21762aef1e20fb29811f2272d19.tar.gz
Revised transactional options to perftest as they could not be used on older boost versions due to ambiguity.
Refactored TxAccept to avoid excessive testing and searching for delivery records. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@707615 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/perftest.cpp')
-rw-r--r--qpid/cpp/src/tests/perftest.cpp74
1 files changed, 44 insertions, 30 deletions
diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp
index 923405779c..2e15489525 100644
--- a/qpid/cpp/src/tests/perftest.cpp
+++ b/qpid/cpp/src/tests/perftest.cpp
@@ -97,9 +97,10 @@ struct Opts : public TestOptions {
bool summary;
uint32_t intervalSub;
uint32_t intervalPub;
- size_t tx_pub;
- bool tx_pub_async;
- size_t tx_sub;
+ size_t tx;
+ size_t txPub;
+ size_t txSub;
+ bool commitAsync;
static const std::string helpText;
@@ -109,7 +110,7 @@ struct Opts : public TestOptions {
pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false),
subs(1), ack(0),
qt(1), iterations(1), mode(SHARED), summary(false),
- intervalSub(0), intervalPub(0), tx_pub(0), tx_pub_async(false), tx_sub(0)
+ intervalSub(0), intervalPub(0), tx(0), txPub(0), txSub(0), commitAsync(false)
{
addOptions()
("setup", optValue(setup), "Create shared queues.")
@@ -145,9 +146,10 @@ struct Opts : public TestOptions {
("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume")
("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish")
- ("tx_pub", optValue(tx_pub, "N"), "if non-zero, the transaction batch size for publishing")
- ("tx_pub_async", optValue(tx_pub_async, "yes|no"), "Publishing tx commit async")
- ("tx_sub", optValue(tx_sub, "N"), "if non-zero, the transaction batch size for consuming");
+ ("tx", optValue(tx, "N"), "if non-zero, the transaction batch size for publishing and consuming")
+ ("pub-tx", optValue(txPub, "N"), "if non-zero, the transaction batch size for publishing")
+ ("async-commit", optValue(commitAsync, "yes|no"), "Don't wait for completion of commit")
+ ("sub-tx", optValue(txSub, "N"), "if non-zero, the transaction batch size for consuming");
}
// Computed values
@@ -184,6 +186,18 @@ struct Opts : public TestOptions {
break;
}
transfers=(totalPubs*count) + (totalSubs*subQuota);
+ if (tx) {
+ if (txPub) {
+ cerr << "WARNING: Using overriden tx value for publishers: " << txPub << std::endl;
+ } else {
+ txPub = tx;
+ }
+ if (txSub) {
+ cerr << "WARNING: Using overriden tx value for subscribers: " << txSub << std::endl;
+ } else {
+ txSub = tx;
+ }
+ }
}
};
@@ -457,12 +471,8 @@ struct PublishThread : public Client {
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
- if (opts.tx_pub){
- if (opts.tx_pub_async){
- session.txSelect();
- } else {
- sync(session).txSelect();
- }
+ if (opts.txPub){
+ session.txSelect();
}
SubscriptionManager subs(session);
LocalQueue lq;
@@ -488,8 +498,8 @@ struct PublishThread : public Client {
arg::content=msg,
arg::acceptMode=1);
}
- if (opts.tx_pub && ((i+1) % opts.tx_pub == 0)){
- if (opts.tx_pub_async){
+ if (opts.txPub && ((i+1) % opts.txPub == 0)){
+ if (opts.commitAsync){
session.txCommit();
} else {
sync(session).txCommit();
@@ -504,12 +514,8 @@ struct PublishThread : public Client {
// Send result to controller.
Message report(lexical_cast<string>(opts.count/time), "pub_done");
session.messageTransfer(arg::content=report, arg::acceptMode=1);
- if (opts.tx_pub){
- if (opts.tx_pub_async){
- session.txCommit();
- }else{
- sync(session).txCommit();
- }
+ if (opts.txPub){
+ sync(session).txCommit();
}
}
session.close();
@@ -552,16 +558,19 @@ struct SubscribeThread : public Client {
void run() { // Subscribe
try {
- if (opts.tx_sub) sync(session).txSelect();
+ if (opts.txSub) sync(session).txSelect();
SubscriptionManager subs(session);
- LocalQueue lq(AckPolicy(opts.tx_sub ? opts.tx_sub : opts.ack));
- subs.setAcceptMode(opts.tx_sub || opts.ack ? 0 : 1);
+ LocalQueue lq(AckPolicy(opts.txSub ? opts.txSub : opts.ack));
+ subs.setAcceptMode(opts.txSub || opts.ack ? 0 : 1);
subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED,
false);
subs.subscribe(lq, queue);
// Notify controller we are ready.
session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1);
- if (opts.tx_sub) sync(session).txCommit();
+ if (opts.txSub) {
+ if (opts.commitAsync) session.txCommit();
+ else sync(session).txCommit();
+ }
for (size_t j = 0; j < opts.iterations; ++j) {
if (j > 0) {
@@ -573,7 +582,10 @@ struct SubscribeThread : public Client {
size_t expect=0;
for (size_t i = 0; i < opts.subQuota; ++i) {
msg=lq.pop();
- if (opts.tx_sub && ((i+1) % opts.tx_sub == 0)) sync(session).txCommit();
+ if (opts.txSub && ((i+1) % opts.txSub == 0)) {
+ if (opts.commitAsync) session.txCommit();
+ else sync(session).txCommit();
+ }
if (opts.intervalSub) ::usleep(opts.intervalSub*1000);
// TODO aconway 2007-11-23: check message order for.
// multiple publishers. Need an array of counters,
@@ -590,17 +602,19 @@ struct SubscribeThread : public Client {
expect = n+1;
}
}
- if (opts.tx_sub || opts.ack)
+ if (opts.txSub || opts.ack)
lq.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch.
- if (opts.tx_sub)
- sync(session).txCommit();
+ if (opts.txSub) {
+ if (opts.commitAsync) session.txCommit();
+ else sync(session).txCommit();
+ }
AbsTime end=now();
// Report to publisher.
Message result(lexical_cast<string>(opts.subQuota/secs(start,end)),
"sub_done");
session.messageTransfer(arg::content=result, arg::acceptMode=1);
- if (opts.tx_sub) sync(session).txCommit();
+ if (opts.txSub) sync(session).txCommit();
}
session.close();
}