summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/perftest.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-06-09 20:53:57 +0000
committerGordon Sim <gsim@apache.org>2008-06-09 20:53:57 +0000
commit139e4884dbc5efb83d1de9b4562ed26fd8f16f34 (patch)
treeeea32f79b1996b1b1e66e67f495e9b2ec16c9356 /qpid/cpp/src/tests/perftest.cpp
parent7df7f79a8a05a64b6f70d64202f628bce21f8d42 (diff)
downloadqpid-python-139e4884dbc5efb83d1de9b4562ed26fd8f16f34.tar.gz
Moved from AccumulatedAck to SequenceSet in managing transactional accepts
Added transactional option to perftest Removed clientid from ConnectionSettings as it appears not to be used git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@665890 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/perftest.cpp')
-rw-r--r--qpid/cpp/src/tests/perftest.cpp29
1 files changed, 20 insertions, 9 deletions
diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp
index e0e947eb74..23e9e565e0 100644
--- a/qpid/cpp/src/tests/perftest.cpp
+++ b/qpid/cpp/src/tests/perftest.cpp
@@ -95,8 +95,9 @@ struct Opts : public TestOptions {
size_t iterations;
Mode mode;
bool summary;
- uint32_t intervalSub;
- uint32_t intervalPub;
+ uint32_t intervalSub;
+ uint32_t intervalPub;
+ size_t tx;
static const std::string helpText;
@@ -106,7 +107,7 @@ struct Opts : public TestOptions {
pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false),
subs(1), ack(0),
qt(1), iterations(1), mode(SHARED), summary(false),
- intervalSub(0), intervalPub(0)
+ intervalSub(0), intervalPub(0), tx(0)
{
addOptions()
("setup", optValue(setup), "Create shared queues.")
@@ -140,7 +141,9 @@ struct Opts : public TestOptions {
("queue-durable", optValue(queueDurable, "N"), "Make queue durable (implied if durable set)")
("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume")
- ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish");
+ ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish")
+
+ ("tx", optValue(tx, "N"), "if non-zero, the transaction batch size");
}
// Computed values
@@ -450,6 +453,7 @@ struct PublishThread : public Client {
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+ if (opts.tx) sync(session).txSelect();
SubscriptionManager subs(session);
LocalQueue lq;
subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true);
@@ -474,6 +478,7 @@ struct PublishThread : public Client {
arg::content=msg,
arg::acceptMode=1);
}
+ if (opts.tx && (j % opts.tx == 0)) sync(session).txCommit();
if (opts.intervalPub) ::usleep(opts.intervalPub*1000);
}
if (opts.confirm) session.sync();
@@ -483,6 +488,7 @@ struct PublishThread : public Client {
// Send result to controller.
Message report(lexical_cast<string>(opts.count/time), "pub_done");
session.messageTransfer(arg::content=report, arg::acceptMode=1);
+ if (opts.tx) sync(session).txCommit();
}
session.close();
}
@@ -523,16 +529,17 @@ struct SubscribeThread : public Client {
}
void run() { // Subscribe
- try {
+ try {
+ if (opts.tx) sync(session).txSelect();
SubscriptionManager subs(session);
- LocalQueue lq(AckPolicy(opts.ack));
- subs.setAcceptMode(opts.ack > 0 ? 0 : 1);
+ LocalQueue lq(AckPolicy(opts.tx ? opts.tx : opts.ack));
+ subs.setAcceptMode(opts.tx || opts.ack ? 0 : 1);
subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED,
false);
subs.subscribe(lq, queue);
// Notify controller we are ready.
session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1);
-
+ if (opts.tx) sync(session).txCommit();
for (size_t j = 0; j < opts.iterations; ++j) {
if (j > 0) {
@@ -544,6 +551,7 @@ struct SubscribeThread : public Client {
size_t expect=0;
for (size_t i = 0; i < opts.subQuota; ++i) {
msg=lq.pop();
+ if (opts.tx && (i % opts.tx == 0)) sync(session).txCommit();
if (opts.intervalSub) ::usleep(opts.intervalSub*1000);
// TODO aconway 2007-11-23: check message order for.
// multiple publishers. Need an array of counters,
@@ -560,14 +568,17 @@ struct SubscribeThread : public Client {
expect = n+1;
}
}
- if (opts.ack !=0)
+ if (opts.ack)
subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch.
+ if (opts.tx)
+ sync(session).txCommit();
AbsTime end=now();
// Report to publisher.
Message result(lexical_cast<string>(opts.subQuota/secs(start,end)),
"sub_done");
session.messageTransfer(arg::content=result, arg::acceptMode=1);
+ if (opts.tx) sync(session).txCommit();
}
session.close();
}