diff options
Diffstat (limited to 'cpp/src/qpid/broker/DtxHandlerImpl.cpp')
-rw-r--r-- | cpp/src/qpid/broker/DtxHandlerImpl.cpp | 95 |
1 files changed, 72 insertions, 23 deletions
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp index 933d787a8a..1d7c2df5f4 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -23,6 +23,7 @@ using namespace qpid::broker; using qpid::framing::AMQP_ClientProxy; +using qpid::framing::Buffer; using qpid::framing::FieldTable; using qpid::framing::MethodContext; using std::string; @@ -35,12 +36,22 @@ DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : { } +const int XA_RBROLLBACK(1); +const int XA_RBTIMEOUT(2); +const int XA_HEURHAZ(3); +const int XA_HEURCOM(4); +const int XA_HEURRB(5); +const int XA_HEURMIX(6); +const int XA_RDONLY(7); +const int XA_OK(8); + // DtxDemarcationHandler: void DtxHandlerImpl::select(const MethodContext& context ) { + channel.selectDtx(); dClient.selectOk(context.getRequestId()); } @@ -50,52 +61,58 @@ void DtxHandlerImpl::end(const MethodContext& context, bool fail, bool suspend) { - if (fail && suspend) { - throw ConnectionException(503, "End and suspend cannot both be set."); - } - //TODO: handle fail - if (suspend) { - channel.suspendDtx(xid); + if (fail) { + channel.endDtx(xid, true); + if (suspend) { + throw ConnectionException(503, "End and suspend cannot both be set."); + } else { + dClient.endOk(XA_RBROLLBACK, context.getRequestId()); + } } else { - channel.endDtx(xid); + if (suspend) { + channel.suspendDtx(xid); + } else { + channel.endDtx(xid, false); + } + dClient.endOk(XA_OK, context.getRequestId()); } - dClient.endOk(0/*TODO - set flags*/, context.getRequestId()); } void DtxHandlerImpl::start(const MethodContext& context, u_int16_t /*ticket*/, const string& xid, - bool /*join*/, + bool join, bool resume) { - //TODO: handle join + if (join && resume) { + throw ConnectionException(503, "Join and resume cannot both be set."); + } if (resume) { channel.resumeDtx(xid); } else { - channel.startDtx(xid, broker.getDtxManager()); + channel.startDtx(xid, broker.getDtxManager(), join); } - dClient.startOk(0/*TODO - set flags*/, context.getRequestId()); + dClient.startOk(XA_OK, context.getRequestId()); } // DtxCoordinationHandler: void DtxHandlerImpl::prepare(const MethodContext& context, u_int16_t /*ticket*/, - const string& xid ) + const string& xid) { - broker.getDtxManager().prepare(xid); - cClient.prepareOk(0/*TODO - set flags*/, context.getRequestId()); + bool ok = broker.getDtxManager().prepare(xid); + cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId()); } void DtxHandlerImpl::commit(const MethodContext& context, u_int16_t /*ticket*/, const string& xid, - bool /*onePhase*/ ) + bool onePhase) { - //TODO use onePhase flag to validate correct sequence - broker.getDtxManager().commit(xid); - cClient.commitOk(0/*TODO - set flags*/, context.getRequestId()); + bool ok = broker.getDtxManager().commit(xid, onePhase); + cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId()); } @@ -104,22 +121,54 @@ void DtxHandlerImpl::rollback(const MethodContext& context, const string& xid ) { broker.getDtxManager().rollback(xid); - cClient.rollbackOk(0/*TODO - set flags*/, context.getRequestId()); + cClient.rollbackOk(XA_OK, context.getRequestId()); } -void DtxHandlerImpl::recover(const MethodContext& /*context*/, +void DtxHandlerImpl::recover(const MethodContext& context, u_int16_t /*ticket*/, bool /*startscan*/, u_int32_t /*endscan*/ ) { //TODO + + //TODO: what do startscan and endscan actually mean? + + // response should hold on key value pair with key = 'xids' and + // value = sequence of xids + + // until sequences are supported (0-10 encoding), an alternate + // scheme is used for testing: + // + // key = 'xids' and value = a longstr containing shortstrs for each xid + // + // note that this restricts the length of the xids more than is + // strictly 'legal', but that is ok for testing + std::set<std::string> xids; + broker.getStore().collectPreparedXids(xids); + uint size(0); + for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) { + size += i->size() + 1/*shortstr size*/; + } + Buffer buffer(size + 4/*longstr size*/); + buffer.putLong(size); + for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) { + buffer.putShortString(*i); + } + buffer.flip(); + string data; + buffer.getLongString(data); + + FieldTable response; + response.setString("xids", data); + cClient.recoverOk(response, context.getRequestId()); } void DtxHandlerImpl::forget(const MethodContext& /*context*/, u_int16_t /*ticket*/, - const string& /*xid*/ ) + const string& xid) { - //TODO + //Currently no heuristic completion is supported, so this should never be used. + throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid); } void DtxHandlerImpl::getTimeout(const MethodContext& /*context*/, |