summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/DtxHandlerImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/DtxHandlerImpl.cpp')
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.cpp95
1 files changed, 72 insertions, 23 deletions
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
index 933d787a8a..1d7c2df5f4 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
@@ -23,6 +23,7 @@
using namespace qpid::broker;
using qpid::framing::AMQP_ClientProxy;
+using qpid::framing::Buffer;
using qpid::framing::FieldTable;
using qpid::framing::MethodContext;
using std::string;
@@ -35,12 +36,22 @@ DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) :
{
}
+const int XA_RBROLLBACK(1);
+const int XA_RBTIMEOUT(2);
+const int XA_HEURHAZ(3);
+const int XA_HEURCOM(4);
+const int XA_HEURRB(5);
+const int XA_HEURMIX(6);
+const int XA_RDONLY(7);
+const int XA_OK(8);
+
// DtxDemarcationHandler:
void DtxHandlerImpl::select(const MethodContext& context )
{
+ channel.selectDtx();
dClient.selectOk(context.getRequestId());
}
@@ -50,52 +61,58 @@ void DtxHandlerImpl::end(const MethodContext& context,
bool fail,
bool suspend)
{
- if (fail && suspend) {
- throw ConnectionException(503, "End and suspend cannot both be set.");
- }
- //TODO: handle fail
- if (suspend) {
- channel.suspendDtx(xid);
+ if (fail) {
+ channel.endDtx(xid, true);
+ if (suspend) {
+ throw ConnectionException(503, "End and suspend cannot both be set.");
+ } else {
+ dClient.endOk(XA_RBROLLBACK, context.getRequestId());
+ }
} else {
- channel.endDtx(xid);
+ if (suspend) {
+ channel.suspendDtx(xid);
+ } else {
+ channel.endDtx(xid, false);
+ }
+ dClient.endOk(XA_OK, context.getRequestId());
}
- dClient.endOk(0/*TODO - set flags*/, context.getRequestId());
}
void DtxHandlerImpl::start(const MethodContext& context,
u_int16_t /*ticket*/,
const string& xid,
- bool /*join*/,
+ bool join,
bool resume)
{
- //TODO: handle join
+ if (join && resume) {
+ throw ConnectionException(503, "Join and resume cannot both be set.");
+ }
if (resume) {
channel.resumeDtx(xid);
} else {
- channel.startDtx(xid, broker.getDtxManager());
+ channel.startDtx(xid, broker.getDtxManager(), join);
}
- dClient.startOk(0/*TODO - set flags*/, context.getRequestId());
+ dClient.startOk(XA_OK, context.getRequestId());
}
// DtxCoordinationHandler:
void DtxHandlerImpl::prepare(const MethodContext& context,
u_int16_t /*ticket*/,
- const string& xid )
+ const string& xid)
{
- broker.getDtxManager().prepare(xid);
- cClient.prepareOk(0/*TODO - set flags*/, context.getRequestId());
+ bool ok = broker.getDtxManager().prepare(xid);
+ cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
}
void DtxHandlerImpl::commit(const MethodContext& context,
u_int16_t /*ticket*/,
const string& xid,
- bool /*onePhase*/ )
+ bool onePhase)
{
- //TODO use onePhase flag to validate correct sequence
- broker.getDtxManager().commit(xid);
- cClient.commitOk(0/*TODO - set flags*/, context.getRequestId());
+ bool ok = broker.getDtxManager().commit(xid, onePhase);
+ cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
}
@@ -104,22 +121,54 @@ void DtxHandlerImpl::rollback(const MethodContext& context,
const string& xid )
{
broker.getDtxManager().rollback(xid);
- cClient.rollbackOk(0/*TODO - set flags*/, context.getRequestId());
+ cClient.rollbackOk(XA_OK, context.getRequestId());
}
-void DtxHandlerImpl::recover(const MethodContext& /*context*/,
+void DtxHandlerImpl::recover(const MethodContext& context,
u_int16_t /*ticket*/,
bool /*startscan*/,
u_int32_t /*endscan*/ )
{
//TODO
+
+ //TODO: what do startscan and endscan actually mean?
+
+ // response should hold on key value pair with key = 'xids' and
+ // value = sequence of xids
+
+ // until sequences are supported (0-10 encoding), an alternate
+ // scheme is used for testing:
+ //
+ // key = 'xids' and value = a longstr containing shortstrs for each xid
+ //
+ // note that this restricts the length of the xids more than is
+ // strictly 'legal', but that is ok for testing
+ std::set<std::string> xids;
+ broker.getStore().collectPreparedXids(xids);
+ uint size(0);
+ for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) {
+ size += i->size() + 1/*shortstr size*/;
+ }
+ Buffer buffer(size + 4/*longstr size*/);
+ buffer.putLong(size);
+ for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) {
+ buffer.putShortString(*i);
+ }
+ buffer.flip();
+ string data;
+ buffer.getLongString(data);
+
+ FieldTable response;
+ response.setString("xids", data);
+ cClient.recoverOk(response, context.getRequestId());
}
void DtxHandlerImpl::forget(const MethodContext& /*context*/,
u_int16_t /*ticket*/,
- const string& /*xid*/ )
+ const string& xid)
{
- //TODO
+ //Currently no heuristic completion is supported, so this should never be used.
+ throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid);
}
void DtxHandlerImpl::getTimeout(const MethodContext& /*context*/,