summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionAdapter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionAdapter.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp128
1 files changed, 126 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 5d33e68fab..2091e97584 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -22,6 +22,7 @@
#include "Queue.h"
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/constants.h"
#include <boost/format.hpp>
#include <boost/cast.hpp>
#include <boost/bind.hpp>
@@ -40,7 +41,8 @@ SessionAdapter::SessionAdapter(SemanticState& s) :
queueImpl(s),
messageImpl(s),
executionImpl(s),
- txImpl(s)
+ txImpl(s),
+ dtxImpl(s)
{}
@@ -431,7 +433,7 @@ void SessionAdapter::TxHandlerImpl::select()
void SessionAdapter::TxHandlerImpl::commit()
{
- state.commit(&getBroker().getStore());
+ state.commit(&getBroker().getStore(), false);
}
void SessionAdapter::TxHandlerImpl::rollback()
@@ -439,6 +441,128 @@ void SessionAdapter::TxHandlerImpl::rollback()
state.rollback();
}
+std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid010& xid)
+{
+ std::stringstream out;
+ out << xid.getFormat() << xid.getGlobalId() << xid.getBranchId();
+ return out.str();
+}
+
+
+void SessionAdapter::DtxHandlerImpl::select()
+{
+ state.selectDtx();
+}
+
+Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid,
+ bool fail,
+ bool suspend)
+{
+ try {
+ if (fail) {
+ state.endDtx(convert(xid), true);
+ if (suspend) {
+ throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
+ } else {
+ return Dtx010EndResult(XA_RBROLLBACK);
+ }
+ } else {
+ if (suspend) {
+ state.suspendDtx(convert(xid));
+ } else {
+ state.endDtx(convert(xid), false);
+ }
+ return Dtx010EndResult(XA_OK);
+ }
+ } catch (const DtxTimeoutException& e) {
+ return Dtx010EndResult(XA_RBTIMEOUT);
+ }
+}
+
+Dtx010StartResult SessionAdapter::DtxHandlerImpl::start(const Xid010& xid,
+ bool join,
+ bool resume)
+{
+ if (join && resume) {
+ throw CommandInvalidException(QPID_MSG("Join and resume cannot both be set."));
+ }
+ try {
+ if (resume) {
+ state.resumeDtx(convert(xid));
+ } else {
+ state.startDtx(convert(xid), getBroker().getDtxManager(), join);
+ }
+ return Dtx010StartResult(XA_OK);
+ } catch (const DtxTimeoutException& e) {
+ return Dtx010StartResult(XA_RBTIMEOUT);
+ }
+}
+
+Dtx010PrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid010& xid)
+{
+ try {
+ bool ok = getBroker().getDtxManager().prepare(convert(xid));
+ return Dtx010PrepareResult(ok ? XA_OK : XA_RBROLLBACK);
+ } catch (const DtxTimeoutException& e) {
+ return Dtx010PrepareResult(XA_RBTIMEOUT);
+ }
+}
+
+Dtx010CommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid010& xid,
+ bool onePhase)
+{
+ try {
+ bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase);
+ return Dtx010CommitResult(ok ? XA_OK : XA_RBROLLBACK);
+ } catch (const DtxTimeoutException& e) {
+ return Dtx010CommitResult(XA_RBTIMEOUT);
+ }
+}
+
+
+Dtx010RollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid010& xid)
+{
+ try {
+ getBroker().getDtxManager().rollback(convert(xid));
+ return Dtx010RollbackResult(XA_OK);
+ } catch (const DtxTimeoutException& e) {
+ return Dtx010RollbackResult(XA_RBTIMEOUT);
+ }
+}
+
+Dtx010RecoverResult SessionAdapter::DtxHandlerImpl::recover()
+{
+ std::set<std::string> xids;
+ getBroker().getStore().collectPreparedXids(xids);
+
+ //TODO: remove the need to copy from one container type to another
+ std::vector<std::string> data;
+ for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) {
+ data.push_back(*i);
+ }
+ Array indoubt(data);
+ return Dtx010RecoverResult(indoubt);
+}
+
+void SessionAdapter::DtxHandlerImpl::forget(const Xid010& xid)
+{
+ //Currently no heuristic completion is supported, so this should never be used.
+ throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!"));
+}
+
+Dtx010GetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid010& xid)
+{
+ uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid));
+ return Dtx010GetTimeoutResult(timeout);
+}
+
+
+void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid010& xid,
+ u_int32_t timeout)
+{
+ getBroker().getDtxManager().setTimeout(convert(xid), timeout);
+}
+
Queue::shared_ptr SessionAdapter::HandlerHelper::getQueue(const string& name) const {
Queue::shared_ptr queue;