summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-03-26 18:38:35 +0000
committerGordon Sim <gsim@apache.org>2008-03-26 18:38:35 +0000
commit719c2529a14527c236e871603136ccbe44f632d3 (patch)
tree499f5c7b1d2348e46e34cb12d9c9dd5169901022 /cpp
parent5c8e2d27f805eff9f6a457d895fa38dc495301fd (diff)
downloadqpid-python-719c2529a14527c236e871603136ccbe44f632d3.tar.gz
Update to dtx inline with latest spec:
* Updated dtx handling in c++ broker to take account of separation of completion and acceptance. * Added final dtx method defs to extra xml fragment and implemented appropriate handlers in c++ broker. * Converted dtx python tests (recover test still requires some work on decoding arrays). * Allow creation of structs without type codes through a python session method. * Fixed exception handling in python client for commands with results. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@641464 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rwxr-xr-xcpp/rubygen/99-0/Proxy.rb1
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp2
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h1
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp12
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h3
-rw-r--r--cpp/src/qpid/broker/DtxAck.cpp1
-rw-r--r--cpp/src/qpid/broker/DtxManager.cpp3
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp23
-rw-r--r--cpp/src/qpid/broker/SemanticState.h2
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp128
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h34
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp1
-rw-r--r--cpp/src/qpid/broker/TxAccept.cpp61
-rw-r--r--cpp/src/qpid/broker/TxAccept.h57
-rw-r--r--cpp/xml/extra.xml101
16 files changed, 415 insertions, 17 deletions
diff --git a/cpp/rubygen/99-0/Proxy.rb b/cpp/rubygen/99-0/Proxy.rb
index 85db52da5b..87d809d4ad 100755
--- a/cpp/rubygen/99-0/Proxy.rb
+++ b/cpp/rubygen/99-0/Proxy.rb
@@ -43,6 +43,7 @@ EOS
include "qpid/framing/Proxy.h"
include "qpid/framing/Array.h"
include "qpid/framing/amqp_types.h"
+ include "qpid/framing/amqp_structs.h"
namespace("qpid::framing") {
cpp_class(@classname, "public Proxy") {
public
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index d40bbf699e..b71bd754f7 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -229,6 +229,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/System.cpp \
qpid/broker/Timer.cpp \
qpid/broker/TopicExchange.cpp \
+ qpid/broker/TxAccept.cpp \
qpid/broker/TxAck.cpp \
qpid/broker/TxBuffer.cpp \
qpid/broker/TxPublish.cpp \
@@ -352,6 +353,7 @@ nobase_include_HEADERS = \
qpid/broker/Timer.h \
qpid/broker/TopicExchange.h \
qpid/broker/TransactionalStore.h \
+ qpid/broker/TxAccept.h \
qpid/broker/TxAck.h \
qpid/broker/TxBuffer.h \
qpid/broker/TxOp.h \
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 526b58cb14..d7fd20db1d 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -340,7 +340,7 @@ void BrokerAdapter::TxHandlerImpl::select()
void BrokerAdapter::TxHandlerImpl::commit()
{
- state.commit(&getBroker().getStore());
+ state.commit(&getBroker().getStore(), true);
}
void BrokerAdapter::TxHandlerImpl::rollback()
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index 0e69a03465..b28c4ebdcc 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -84,6 +84,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
Queue010Handler* getQueue010Handler() { throw framing::NotImplementedException("Class not implemented"); }
Message010Handler* getMessage010Handler() { throw framing::NotImplementedException("Class not implemented"); }
Tx010Handler* getTx010Handler() { throw framing::NotImplementedException("Class not implemented"); }
+ Dtx010Handler* getDtx010Handler() { throw framing::NotImplementedException("Class not implemented"); }
Execution010Handler* getExecution010Handler() { throw framing::NotImplementedException("Class not implemented"); }
// Handlers no longer implemented in BrokerAdapter:
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index ca90f32a5d..c5bf325afc 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -67,12 +67,8 @@ void DeliveryRecord::setEnded()
ended = true;
//reset msg pointer, don't need to hold on to it anymore
msg.payload = boost::intrusive_ptr<Message>();
-}
-void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
- if (acquired && !ended) {
- queue->dequeue(ctxt, msg.payload);
- }
+ QPID_LOG(debug, "DeliveryRecord::setEnded() id=" << id);
}
bool DeliveryRecord::matches(DeliveryId tag) const{
@@ -136,6 +132,12 @@ void DeliveryRecord::accept(TransactionContext* ctxt) {
}
}
+void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
+ if (acquired && !ended) {
+ queue->dequeue(ctxt, msg.payload);
+ }
+}
+
void DeliveryRecord::reject()
{
Exchange::shared_ptr alternate = queue->getAlternateExchange();
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index b2672345b4..eed7b81748 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -55,8 +55,6 @@ class DeliveryRecord{
bool completed;
bool ended;
- void setEnded();
-
public:
DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token,
const DeliveryId id, bool acquired, bool confirmed = false);
@@ -76,6 +74,7 @@ class DeliveryRecord{
void acquire(DeliveryIds& results);
void complete();
void accept(TransactionContext* ctxt);
+ void setEnded();
bool isAcquired() const { return acquired; }
bool isComplete() const { return completed; }
diff --git a/cpp/src/qpid/broker/DtxAck.cpp b/cpp/src/qpid/broker/DtxAck.cpp
index 25186b4102..0e6f94d4e0 100644
--- a/cpp/src/qpid/broker/DtxAck.cpp
+++ b/cpp/src/qpid/broker/DtxAck.cpp
@@ -30,7 +30,6 @@ DtxAck::DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>&
{
remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, pending.end()),
not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)));
- unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked));
}
bool DtxAck::prepare(TransactionContext* ctxt) throw()
diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp
index cef3a4b02b..fb6b3f019e 100644
--- a/cpp/src/qpid/broker/DtxManager.cpp
+++ b/cpp/src/qpid/broker/DtxManager.cpp
@@ -54,6 +54,7 @@ void DtxManager::recover(const std::string& xid, std::auto_ptr<TPCTransactionCon
bool DtxManager::prepare(const std::string& xid)
{
+ QPID_LOG(debug, "preparing: " << xid);
try {
return getWork(xid)->prepare();
} catch (DtxTimeoutException& e) {
@@ -64,6 +65,7 @@ bool DtxManager::prepare(const std::string& xid)
bool DtxManager::commit(const std::string& xid, bool onePhase)
{
+ QPID_LOG(debug, "committing: " << xid);
try {
bool result = getWork(xid)->commit(onePhase);
remove(xid);
@@ -76,6 +78,7 @@ bool DtxManager::commit(const std::string& xid, bool onePhase)
void DtxManager::rollback(const std::string& xid)
{
+ QPID_LOG(debug, "rolling back: " << xid);
try {
getWork(xid)->rollback();
remove(xid);
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 1b5bcc2f7e..6f5577de5a 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -29,6 +29,7 @@
#include "Message.h"
#include "SemanticHandler.h"
#include "SessionHandler.h"
+#include "TxAccept.h"
#include "TxAck.h"
#include "TxPublish.h"
#include "qpid/framing/reply_exceptions.h"
@@ -119,12 +120,14 @@ void SemanticState::startTx()
txBuffer = TxBuffer::shared_ptr(new TxBuffer());
}
-void SemanticState::commit(MessageStore* const store)
+void SemanticState::commit(MessageStore* const store, bool completeOnCommit)
{
if (!txBuffer) throw
CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
- TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
+ TxOp::shared_ptr txAck(completeOnCommit ?
+ static_cast<TxOp*>(new TxAck(accumulatedAck, unacked)) :
+ static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked)));
txBuffer->enlist(txAck);
if (txBuffer->commitLocal(store)) {
accumulatedAck.clear();
@@ -402,9 +405,11 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
if (dtxBuffer.get()) {
- //if enlisted in a dtx, remove the relevant slice from
- //unacked and record it against that transaction
+ //if enlisted in a dtx, copy the relevant slice from
+ //unacked and record it against that transaction:
TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+ //then remove that slice from the unacked record:
+ unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &accumulatedAck));
accumulatedAck.clear();
dtxBuffer->enlist(txAck);
}
@@ -660,11 +665,19 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last)
accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet
if (dtxBuffer.get()) {
- //if enlisted in a dtx, remove the relevant slice from
+ //if enlisted in a dtx, copy the relevant slice from
//unacked and record it against that transaction
TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
accumulatedAck.clear();
dtxBuffer->enlist(txAck);
+
+ //mark the relevant messages as 'ended' in unacked
+ for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded));
+
+ //if the messages are already completed, they can be
+ //removed from the record
+ unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
+
}
} else {
for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::accept), 0));
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index f9f343de38..4bcd0dddb3 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -167,7 +167,7 @@ class SemanticState : public framing::FrameHandler::Chains,
bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected);
void startTx();
- void commit(MessageStore* const store);
+ void commit(MessageStore* const store, bool completeOnCommit);
void rollback();
void selectDtx();
void startDtx(const std::string& xid, DtxManager& mgr, bool join);
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 5d33e68fab..2091e97584 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -22,6 +22,7 @@
#include "Queue.h"
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/constants.h"
#include <boost/format.hpp>
#include <boost/cast.hpp>
#include <boost/bind.hpp>
@@ -40,7 +41,8 @@ SessionAdapter::SessionAdapter(SemanticState& s) :
queueImpl(s),
messageImpl(s),
executionImpl(s),
- txImpl(s)
+ txImpl(s),
+ dtxImpl(s)
{}
@@ -431,7 +433,7 @@ void SessionAdapter::TxHandlerImpl::select()
void SessionAdapter::TxHandlerImpl::commit()
{
- state.commit(&getBroker().getStore());
+ state.commit(&getBroker().getStore(), false);
}
void SessionAdapter::TxHandlerImpl::rollback()
@@ -439,6 +441,128 @@ void SessionAdapter::TxHandlerImpl::rollback()
state.rollback();
}
+std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid010& xid)
+{
+ std::stringstream out;
+ out << xid.getFormat() << xid.getGlobalId() << xid.getBranchId();
+ return out.str();
+}
+
+
+void SessionAdapter::DtxHandlerImpl::select()
+{
+ state.selectDtx();
+}
+
+Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid,
+ bool fail,
+ bool suspend)
+{
+ try {
+ if (fail) {
+ state.endDtx(convert(xid), true);
+ if (suspend) {
+ throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
+ } else {
+ return Dtx010EndResult(XA_RBROLLBACK);
+ }
+ } else {
+ if (suspend) {
+ state.suspendDtx(convert(xid));
+ } else {
+ state.endDtx(convert(xid), false);
+ }
+ return Dtx010EndResult(XA_OK);
+ }
+ } catch (const DtxTimeoutException& e) {
+ return Dtx010EndResult(XA_RBTIMEOUT);
+ }
+}
+
+Dtx010StartResult SessionAdapter::DtxHandlerImpl::start(const Xid010& xid,
+ bool join,
+ bool resume)
+{
+ if (join && resume) {
+ throw CommandInvalidException(QPID_MSG("Join and resume cannot both be set."));
+ }
+ try {
+ if (resume) {
+ state.resumeDtx(convert(xid));
+ } else {
+ state.startDtx(convert(xid), getBroker().getDtxManager(), join);
+ }
+ return Dtx010StartResult(XA_OK);
+ } catch (const DtxTimeoutException& e) {
+ return Dtx010StartResult(XA_RBTIMEOUT);
+ }
+}
+
+Dtx010PrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid010& xid)
+{
+ try {
+ bool ok = getBroker().getDtxManager().prepare(convert(xid));
+ return Dtx010PrepareResult(ok ? XA_OK : XA_RBROLLBACK);
+ } catch (const DtxTimeoutException& e) {
+ return Dtx010PrepareResult(XA_RBTIMEOUT);
+ }
+}
+
+Dtx010CommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid010& xid,
+ bool onePhase)
+{
+ try {
+ bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase);
+ return Dtx010CommitResult(ok ? XA_OK : XA_RBROLLBACK);
+ } catch (const DtxTimeoutException& e) {
+ return Dtx010CommitResult(XA_RBTIMEOUT);
+ }
+}
+
+
+Dtx010RollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid010& xid)
+{
+ try {
+ getBroker().getDtxManager().rollback(convert(xid));
+ return Dtx010RollbackResult(XA_OK);
+ } catch (const DtxTimeoutException& e) {
+ return Dtx010RollbackResult(XA_RBTIMEOUT);
+ }
+}
+
+Dtx010RecoverResult SessionAdapter::DtxHandlerImpl::recover()
+{
+ std::set<std::string> xids;
+ getBroker().getStore().collectPreparedXids(xids);
+
+ //TODO: remove the need to copy from one container type to another
+ std::vector<std::string> data;
+ for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) {
+ data.push_back(*i);
+ }
+ Array indoubt(data);
+ return Dtx010RecoverResult(indoubt);
+}
+
+void SessionAdapter::DtxHandlerImpl::forget(const Xid010& xid)
+{
+ //Currently no heuristic completion is supported, so this should never be used.
+ throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!"));
+}
+
+Dtx010GetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid010& xid)
+{
+ uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid));
+ return Dtx010GetTimeoutResult(timeout);
+}
+
+
+void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid010& xid,
+ u_int32_t timeout)
+{
+ getBroker().getDtxManager().setTimeout(convert(xid), timeout);
+}
+
Queue::shared_ptr SessionAdapter::HandlerHelper::getQueue(const string& name) const {
Queue::shared_ptr queue;
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
index dad89cd123..c8aa9008cc 100644
--- a/cpp/src/qpid/broker/SessionAdapter.h
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -61,6 +61,7 @@ class Queue;
Queue010Handler* getQueue010Handler(){ return &queueImpl; }
Execution010Handler* getExecution010Handler(){ return &executionImpl; }
Tx010Handler* getTx010Handler(){ return &txImpl; }
+ Dtx010Handler* getDtx010Handler(){ return &dtxImpl; }
BasicHandler* getBasicHandler() { throw framing::NotImplementedException("Class not implemented"); }
ExchangeHandler* getExchangeHandler(){ throw framing::NotImplementedException("Class not implemented"); }
@@ -218,12 +219,45 @@ class Queue;
void rollback();
};
+ class DtxHandlerImpl : public Dtx010Handler, public HandlerHelper
+ {
+ std::string convert(const framing::Xid010& xid);
+
+ public:
+ DtxHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
+
+ void select();
+
+ framing::Dtx010StartResult start(const framing::Xid010& xid,
+ bool join,
+ bool resume);
+
+ framing::Dtx010EndResult end(const framing::Xid010& xid,
+ bool fail,
+ bool suspend);
+
+ framing::Dtx010CommitResult commit(const framing::Xid010& xid,
+ bool onePhase);
+
+ void forget(const framing::Xid010& xid);
+
+ framing::Dtx010GetTimeoutResult getTimeout(const framing::Xid010& xid);
+
+ framing::Dtx010PrepareResult prepare(const framing::Xid010& xid);
+
+ framing::Dtx010RecoverResult recover();
+
+ framing::Dtx010RollbackResult rollback(const framing::Xid010& xid);
+
+ void setTimeout(const framing::Xid010& xid, uint32_t timeout);
+ };
ExchangeHandlerImpl exchangeImpl;
QueueHandlerImpl queueImpl;
MessageHandlerImpl messageImpl;
ExecutionHandlerImpl executionImpl;
TxHandlerImpl txImpl;
+ DtxHandlerImpl dtxImpl;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 105f946dcb..35ad562a22 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -178,6 +178,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber&
if (!invocation.wasHandled()) {
throw NotImplementedException("Not implemented");
} else if (invocation.hasResult()) {
+ nextOut++;//execution result is now a command, so the counter must be incremented
getProxy().getExecution010().result(id, invocation.getResult());
}
if (method->isSync()) {
diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp
new file mode 100644
index 0000000000..634d066ecc
--- /dev/null
+++ b/cpp/src/qpid/broker/TxAccept.cpp
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "TxAccept.h"
+#include "qpid/log/Statement.h"
+
+using std::bind1st;
+using std::bind2nd;
+using std::mem_fun_ref;
+using namespace qpid::broker;
+using qpid::framing::AccumulatedAck;
+
+TxAccept::TxAccept(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) :
+ acked(_acked), unacked(_unacked) {}
+
+bool TxAccept::prepare(TransactionContext* ctxt) throw()
+{
+ try{
+ //dequeue messages from their respective queues:
+ for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
+ if (i->coveredBy(&acked)) {
+ i->dequeue(ctxt);
+ }
+ }
+ return true;
+ }catch(const std::exception& e){
+ QPID_LOG(error, "Failed to prepare: " << e.what());
+ return false;
+ }catch(...){
+ QPID_LOG(error, "Failed to prepare");
+ return false;
+ }
+}
+
+void TxAccept::commit() throw()
+{
+ for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
+ if (i->coveredBy(&acked)) i->setEnded();
+ }
+
+ unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
+}
+
+void TxAccept::rollback() throw() {}
diff --git a/cpp/src/qpid/broker/TxAccept.h b/cpp/src/qpid/broker/TxAccept.h
new file mode 100644
index 0000000000..011acf5d9e
--- /dev/null
+++ b/cpp/src/qpid/broker/TxAccept.h
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TxAccept_
+#define _TxAccept_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include "qpid/framing/AccumulatedAck.h"
+#include "DeliveryRecord.h"
+#include "TxOp.h"
+
+namespace qpid {
+ namespace broker {
+ /**
+ * Defines the transactional behaviour for accepts received by
+ * a transactional channel.
+ */
+ class TxAccept : public TxOp{
+ framing::AccumulatedAck& acked;
+ std::list<DeliveryRecord>& unacked;
+
+ public:
+ /**
+ * @param acked a representation of the accumulation of
+ * acks received
+ * @param unacked the record of delivered messages
+ */
+ TxAccept(framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
+ virtual bool prepare(TransactionContext* ctxt) throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+ virtual ~TxAccept(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/xml/extra.xml b/cpp/xml/extra.xml
index 0acc2260ad..6080800f60 100644
--- a/cpp/xml/extra.xml
+++ b/cpp/xml/extra.xml
@@ -33,6 +33,14 @@
</doc>
</domain>
+ <domain name="xid010">
+ <struct size="short" pack="short">
+ <field name="format" domain="long" />
+ <field name="global-id" domain="shortstr" />
+ <field name="branch-id" domain="shortstr" />
+ </struct>
+ </domain>
+
<domain name="delivery-properties-010">
<struct size="long" pack="short" type="1025">
<field name="discard-unroutable" domain="bit" label="controls discard of unroutable messages"/>
@@ -739,6 +747,99 @@
</method>
</class>
+<class name="dtx010" index="6">
+ <doc>blah, blah</doc>
+ <method name = "select" index="1">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ </method>
+ <method name = "start" index="2">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="xid" domain="xid010"/>
+ <field name="join" domain="bit"/>
+ <field name="resume" domain="bit"/>
+ <result>
+ <struct size="long" pack="short" type="1">
+ <field name="status" domain="short" />
+ </struct>
+ </result>
+ </method>
+ <method name = "end" index="3">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="xid" domain="xid010"/>
+ <field name="fail" domain="bit"/>
+ <field name="suspend" domain="bit"/>
+ <result>
+ <struct size="long" pack="short" type="1">
+ <field name="status" domain="short" />
+ </struct>
+ </result>
+ </method>
+ <method name = "commit" index="4">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="xid" domain="xid010"/>
+ <field name="one-phase" domain="bit"/>
+ <result>
+ <struct size="long" pack="short" type="1">
+ <field name="status" domain="short" />
+ </struct>
+ </result>
+ </method>
+ <method name = "forget" index="5">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="xid" domain="xid010"/>
+ </method>
+ <method name = "get-timeout" index="6">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="xid" domain="xid010"/>
+ <result>
+ <struct size="long" pack="short" type="2">
+ <field name="timeout" domain="long" />
+ </struct>
+ </result>
+ </method>
+ <method name = "prepare" index="7">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="xid" domain="xid010"/>
+ <result>
+ <struct size="long" pack="short" type="1">
+ <field name="status" domain="short" />
+ </struct>
+ </result>
+ </method>
+ <method name = "recover" index="8">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <result>
+ <struct size="long" pack="short" type="3">
+ <field name="in-doubt" domain="array" />
+ </struct>
+ </result>
+ </method>
+ <method name = "rollback" index="9">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="xid" domain="xid010"/>
+ <result>
+ <struct size="long" pack="short" type="1">
+ <field name="status" domain="short" />
+ </struct>
+ </result>
+ </method>
+ <method name = "set-timeout" index="10">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="xid" domain="xid010"/>
+ <field name="timeout" domain="long"/>
+ </method>
+</class>
+
<class name="exchange010" index="7">
<doc>blah, blah</doc>
<method name = "declare" index="1">