summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2008-08-18 19:15:43 +0000
committerKim van der Riet <kpvdr@apache.org>2008-08-18 19:15:43 +0000
commite4540065984e2a791a3869826e0c03d596fce7eb (patch)
treedc42bf1b43fcfcb34424bdf7a9cd1092a5ae278a
parentfe569210176b355cc161561160ec8b2b23920919 (diff)
downloadqpid-python-e4540065984e2a791a3869826e0c03d596fce7eb.tar.gz
Added --dtx option to txtest for DTX transaction testing
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@686854 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/tests/txtest.cpp33
1 files changed, 28 insertions, 5 deletions
diff --git a/cpp/src/tests/txtest.cpp b/cpp/src/tests/txtest.cpp
index 6eb812738d..040e7e6613 100644
--- a/cpp/src/tests/txtest.cpp
+++ b/cpp/src/tests/txtest.cpp
@@ -20,6 +20,7 @@
*/
#include <algorithm>
+#include <iomanip>
#include <iostream>
#include <memory>
#include <sstream>
@@ -47,10 +48,12 @@ struct Args : public qpid::TestOptions {
uint msgsPerTx;
uint txCount;
uint totalMsgCount;
+ bool dtx;
Args() : init(true), transfer(true), check(true),
size(256), durable(true), queues(2),
- base("tx-test"), msgsPerTx(1), txCount(1), totalMsgCount(10)
+ base("tx-test"), msgsPerTx(1), txCount(1), totalMsgCount(10),
+ dtx(false)
{
addOptions()
@@ -63,7 +66,8 @@ struct Args : public qpid::TestOptions {
("queue-base-name", optValue(base, "<name>"), "base name for queues")
("messages-per-tx", optValue(msgsPerTx, "N"), "number of messages transferred per transaction")
("tx-count", optValue(txCount, "N"), "number of transactions per 'agent'")
- ("total-messages", optValue(totalMsgCount, "N"), "total number of messages in 'circulation'");
+ ("total-messages", optValue(totalMsgCount, "N"), "total number of messages in 'circulation'")
+ ("dtx", optValue(dtx, "yes|no"), "use distributed transactions");
}
};
@@ -120,14 +124,17 @@ struct Transfer : public Client, public Runnable
std::string src;
std::string dest;
Thread thread;
+ unsigned long xid_cnt;
+ framing::Xid xid;
- Transfer(const std::string& to, const std::string& from) : src(to), dest(from) {}
+ Transfer(const std::string& to, const std::string& from) : src(to), dest(from), xid_cnt(0), xid(0x4c414e47, "", from) {}
void run()
{
try {
- session.txSelect();
+ if (opts.dtx) session.dtxSelect();
+ else session.txSelect();
SubscriptionManager subs(session);
LocalQueue lq(AckPolicy(0));//manual acking
@@ -137,6 +144,10 @@ struct Transfer : public Client, public Runnable
for (uint t = 0; t < opts.txCount; t++) {
Message in;
Message out("", dest);
+ if (opts.dtx) {
+ setNextXid(xid);
+ session.dtxStart(arg::xid=xid);
+ }
for (uint m = 0; m < opts.msgsPerTx; m++) {
in = lq.pop();
out.setData(in.getData());
@@ -145,12 +156,24 @@ struct Transfer : public Client, public Runnable
session.messageTransfer(arg::content=out, arg::acceptMode=1);
}
lq.getAckPolicy().ackOutstanding(session);
- session.txCommit();
+ if (opts.dtx) {
+ session.dtxEnd(arg::xid=xid);
+ session.dtxPrepare(arg::xid=xid);
+ session.dtxCommit(arg::xid=xid);
+ } else {
+ session.txCommit();
+ }
}
} catch(const std::exception& e) {
std::cout << "Transfer interrupted: " << e.what() << std::endl;
}
}
+
+ void setNextXid(framing::Xid& xid) {
+ std::ostringstream oss;
+ oss << std::setfill('0') << std::hex << "xid-" << std::setw(12) << (++xid_cnt);
+ xid.setGlobalId(oss.str());
+ }
};
struct Controller : public Client