summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2008-08-19 19:37:39 +0000
committerKim van der Riet <kpvdr@apache.org>2008-08-19 19:37:39 +0000
commitdd0742736ddf61278df5e73650d1d21eb2f2db12 (patch)
tree7dfc2dfcfc634d481d303ddebf0d453016baadd1
parent95fe018230b23aac426282b12fbf8d6b9b048b75 (diff)
downloadqpid-python-dd0742736ddf61278df5e73650d1d21eb2f2db12.tar.gz
Missing DTX recover code for --dtx mode in txtest
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@687140 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/framing/Array.h2
-rw-r--r--qpid/cpp/src/tests/txtest.cpp29
2 files changed, 30 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/framing/Array.h b/qpid/cpp/src/qpid/framing/Array.h
index 1367a023f2..2cbd4c0b29 100644
--- a/qpid/cpp/src/qpid/framing/Array.h
+++ b/qpid/cpp/src/qpid/framing/Array.h
@@ -54,7 +54,7 @@ class Array
void add(ValuePtr value);
template <class T>
- void collect(std::vector<T>& out)
+ void collect(std::vector<T>& out) const
{
for (ValueVector::const_iterator i = values.begin(); i != values.end(); ++i) {
out.push_back((*i)->get<T>());
diff --git a/qpid/cpp/src/tests/txtest.cpp b/qpid/cpp/src/tests/txtest.cpp
index 040e7e6613..fec7c18b4a 100644
--- a/qpid/cpp/src/tests/txtest.cpp
+++ b/qpid/cpp/src/tests/txtest.cpp
@@ -31,6 +31,8 @@
#include "qpid/client/Message.h"
#include "qpid/client/AsyncSession.h"
#include "qpid/client/SubscriptionManager.h"
+#include "qpid/framing/Array.h"
+#include "qpid/framing/Buffer.h"
using namespace qpid;
using namespace qpid::client;
@@ -231,6 +233,33 @@ struct Controller : public Client
subs.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
subs.setAcceptMode(1/*not-required*/);
+ // Recover DTX transactions (if any)
+ if (opts.dtx) {
+ std::vector<std::string> inDoubtXids;
+ framing::DtxRecoverResult dtxRes = session.dtxRecover().get();
+ const framing::Array& xidArr = dtxRes.getInDoubt();
+ xidArr.collect(inDoubtXids);
+
+ if (inDoubtXids.size()) {
+ std::cout << "Recovering DTX in-doubt transaction(s):" << std::endl;
+ framing::StructHelper decoder;
+ framing::Xid xid;
+ // abort even, commit odd transactions
+// for (std::vector<std::string>::const_iterator i = inDoubtXids.begin(), int cnt = 1; i < inDoubtXids.end(); i++, cnt++) {
+ for (unsigned i = 0; i < inDoubtXids.size(); i++) {
+ decoder.decode(xid, inDoubtXids[i]);
+ std::cout << (i%2 ? " * aborting " : " * committing ");
+ xid.print(std::cout);
+ std::cout << std::endl;
+ if (i%2) {
+ session.dtxRollback(arg::xid=xid);
+ } else {
+ session.dtxCommit(arg::xid=xid);
+ }
+ }
+ }
+ }
+
StringSet drained;
//drain each queue and verify the correct set of messages are available
for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) {