From e4540065984e2a791a3869826e0c03d596fce7eb Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Mon, 18 Aug 2008 19:15:43 +0000 Subject: Added --dtx option to txtest for DTX transaction testing git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@686854 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/tests/txtest.cpp | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/cpp/src/tests/txtest.cpp b/cpp/src/tests/txtest.cpp index 6eb812738d..040e7e6613 100644 --- a/cpp/src/tests/txtest.cpp +++ b/cpp/src/tests/txtest.cpp @@ -20,6 +20,7 @@ */ #include +#include #include #include #include @@ -47,10 +48,12 @@ struct Args : public qpid::TestOptions { uint msgsPerTx; uint txCount; uint totalMsgCount; + bool dtx; Args() : init(true), transfer(true), check(true), size(256), durable(true), queues(2), - base("tx-test"), msgsPerTx(1), txCount(1), totalMsgCount(10) + base("tx-test"), msgsPerTx(1), txCount(1), totalMsgCount(10), + dtx(false) { addOptions() @@ -63,7 +66,8 @@ struct Args : public qpid::TestOptions { ("queue-base-name", optValue(base, ""), "base name for queues") ("messages-per-tx", optValue(msgsPerTx, "N"), "number of messages transferred per transaction") ("tx-count", optValue(txCount, "N"), "number of transactions per 'agent'") - ("total-messages", optValue(totalMsgCount, "N"), "total number of messages in 'circulation'"); + ("total-messages", optValue(totalMsgCount, "N"), "total number of messages in 'circulation'") + ("dtx", optValue(dtx, "yes|no"), "use distributed transactions"); } }; @@ -120,14 +124,17 @@ struct Transfer : public Client, public Runnable std::string src; std::string dest; Thread thread; + unsigned long xid_cnt; + framing::Xid xid; - Transfer(const std::string& to, const std::string& from) : src(to), dest(from) {} + Transfer(const std::string& to, const std::string& from) : src(to), dest(from), xid_cnt(0), xid(0x4c414e47, "", from) {} void run() { try { - session.txSelect(); + if (opts.dtx) session.dtxSelect(); + else session.txSelect(); SubscriptionManager subs(session); LocalQueue lq(AckPolicy(0));//manual acking @@ -137,6 +144,10 @@ struct Transfer : public Client, public Runnable for (uint t = 0; t < opts.txCount; t++) { Message in; Message out("", dest); + if (opts.dtx) { + setNextXid(xid); + session.dtxStart(arg::xid=xid); + } for (uint m = 0; m < opts.msgsPerTx; m++) { in = lq.pop(); out.setData(in.getData()); @@ -145,12 +156,24 @@ struct Transfer : public Client, public Runnable session.messageTransfer(arg::content=out, arg::acceptMode=1); } lq.getAckPolicy().ackOutstanding(session); - session.txCommit(); + if (opts.dtx) { + session.dtxEnd(arg::xid=xid); + session.dtxPrepare(arg::xid=xid); + session.dtxCommit(arg::xid=xid); + } else { + session.txCommit(); + } } } catch(const std::exception& e) { std::cout << "Transfer interrupted: " << e.what() << std::endl; } } + + void setNextXid(framing::Xid& xid) { + std::ostringstream oss; + oss << std::setfill('0') << std::hex << "xid-" << std::setw(12) << (++xid_cnt); + xid.setGlobalId(oss.str()); + } }; struct Controller : public Client -- cgit v1.2.1