diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionAdapter.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 128 |
1 files changed, 126 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 5d33e68fab..2091e97584 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -22,6 +22,7 @@ #include "Queue.h" #include "qpid/Exception.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/constants.h" #include <boost/format.hpp> #include <boost/cast.hpp> #include <boost/bind.hpp> @@ -40,7 +41,8 @@ SessionAdapter::SessionAdapter(SemanticState& s) : queueImpl(s), messageImpl(s), executionImpl(s), - txImpl(s) + txImpl(s), + dtxImpl(s) {} @@ -431,7 +433,7 @@ void SessionAdapter::TxHandlerImpl::select() void SessionAdapter::TxHandlerImpl::commit() { - state.commit(&getBroker().getStore()); + state.commit(&getBroker().getStore(), false); } void SessionAdapter::TxHandlerImpl::rollback() @@ -439,6 +441,128 @@ void SessionAdapter::TxHandlerImpl::rollback() state.rollback(); } +std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid010& xid) +{ + std::stringstream out; + out << xid.getFormat() << xid.getGlobalId() << xid.getBranchId(); + return out.str(); +} + + +void SessionAdapter::DtxHandlerImpl::select() +{ + state.selectDtx(); +} + +Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid, + bool fail, + bool suspend) +{ + try { + if (fail) { + state.endDtx(convert(xid), true); + if (suspend) { + throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set.")); + } else { + return Dtx010EndResult(XA_RBROLLBACK); + } + } else { + if (suspend) { + state.suspendDtx(convert(xid)); + } else { + state.endDtx(convert(xid), false); + } + return Dtx010EndResult(XA_OK); + } + } catch (const DtxTimeoutException& e) { + return Dtx010EndResult(XA_RBTIMEOUT); + } +} + +Dtx010StartResult SessionAdapter::DtxHandlerImpl::start(const Xid010& xid, + bool join, + bool resume) +{ + if (join && resume) { + throw CommandInvalidException(QPID_MSG("Join and resume cannot both be set.")); + } + try { + if (resume) { + state.resumeDtx(convert(xid)); + } else { + state.startDtx(convert(xid), getBroker().getDtxManager(), join); + } + return Dtx010StartResult(XA_OK); + } catch (const DtxTimeoutException& e) { + return Dtx010StartResult(XA_RBTIMEOUT); + } +} + +Dtx010PrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid010& xid) +{ + try { + bool ok = getBroker().getDtxManager().prepare(convert(xid)); + return Dtx010PrepareResult(ok ? XA_OK : XA_RBROLLBACK); + } catch (const DtxTimeoutException& e) { + return Dtx010PrepareResult(XA_RBTIMEOUT); + } +} + +Dtx010CommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid010& xid, + bool onePhase) +{ + try { + bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase); + return Dtx010CommitResult(ok ? XA_OK : XA_RBROLLBACK); + } catch (const DtxTimeoutException& e) { + return Dtx010CommitResult(XA_RBTIMEOUT); + } +} + + +Dtx010RollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid010& xid) +{ + try { + getBroker().getDtxManager().rollback(convert(xid)); + return Dtx010RollbackResult(XA_OK); + } catch (const DtxTimeoutException& e) { + return Dtx010RollbackResult(XA_RBTIMEOUT); + } +} + +Dtx010RecoverResult SessionAdapter::DtxHandlerImpl::recover() +{ + std::set<std::string> xids; + getBroker().getStore().collectPreparedXids(xids); + + //TODO: remove the need to copy from one container type to another + std::vector<std::string> data; + for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) { + data.push_back(*i); + } + Array indoubt(data); + return Dtx010RecoverResult(indoubt); +} + +void SessionAdapter::DtxHandlerImpl::forget(const Xid010& xid) +{ + //Currently no heuristic completion is supported, so this should never be used. + throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!")); +} + +Dtx010GetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid010& xid) +{ + uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid)); + return Dtx010GetTimeoutResult(timeout); +} + + +void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid010& xid, + u_int32_t timeout) +{ + getBroker().getDtxManager().setTimeout(convert(xid), timeout); +} + Queue::shared_ptr SessionAdapter::HandlerHelper::getQueue(const string& name) const { Queue::shared_ptr queue; |