diff options
author | Gordon Sim <gsim@apache.org> | 2008-03-26 18:38:35 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-03-26 18:38:35 +0000 |
commit | 719c2529a14527c236e871603136ccbe44f632d3 (patch) | |
tree | 499f5c7b1d2348e46e34cb12d9c9dd5169901022 /cpp | |
parent | 5c8e2d27f805eff9f6a457d895fa38dc495301fd (diff) | |
download | qpid-python-719c2529a14527c236e871603136ccbe44f632d3.tar.gz |
Update to dtx inline with latest spec:
* Updated dtx handling in c++ broker to take account of separation of completion and acceptance.
* Added final dtx method defs to extra xml fragment and implemented appropriate handlers in c++ broker.
* Converted dtx python tests (recover test still requires some work on decoding arrays).
* Allow creation of structs without type codes through a python session method.
* Fixed exception handling in python client for commands with results.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@641464 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rwxr-xr-x | cpp/rubygen/99-0/Proxy.rb | 1 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxAck.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxManager.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 128 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.h | 34 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxAccept.cpp | 61 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxAccept.h | 57 | ||||
-rw-r--r-- | cpp/xml/extra.xml | 101 |
16 files changed, 415 insertions, 17 deletions
diff --git a/cpp/rubygen/99-0/Proxy.rb b/cpp/rubygen/99-0/Proxy.rb index 85db52da5b..87d809d4ad 100755 --- a/cpp/rubygen/99-0/Proxy.rb +++ b/cpp/rubygen/99-0/Proxy.rb @@ -43,6 +43,7 @@ EOS include "qpid/framing/Proxy.h" include "qpid/framing/Array.h" include "qpid/framing/amqp_types.h" + include "qpid/framing/amqp_structs.h" namespace("qpid::framing") { cpp_class(@classname, "public Proxy") { public diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index d40bbf699e..b71bd754f7 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -229,6 +229,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/System.cpp \ qpid/broker/Timer.cpp \ qpid/broker/TopicExchange.cpp \ + qpid/broker/TxAccept.cpp \ qpid/broker/TxAck.cpp \ qpid/broker/TxBuffer.cpp \ qpid/broker/TxPublish.cpp \ @@ -352,6 +353,7 @@ nobase_include_HEADERS = \ qpid/broker/Timer.h \ qpid/broker/TopicExchange.h \ qpid/broker/TransactionalStore.h \ + qpid/broker/TxAccept.h \ qpid/broker/TxAck.h \ qpid/broker/TxBuffer.h \ qpid/broker/TxOp.h \ diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 526b58cb14..d7fd20db1d 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -340,7 +340,7 @@ void BrokerAdapter::TxHandlerImpl::select() void BrokerAdapter::TxHandlerImpl::commit() { - state.commit(&getBroker().getStore()); + state.commit(&getBroker().getStore(), true); } void BrokerAdapter::TxHandlerImpl::rollback() diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index 0e69a03465..b28c4ebdcc 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -84,6 +84,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations Queue010Handler* getQueue010Handler() { throw framing::NotImplementedException("Class not implemented"); } Message010Handler* getMessage010Handler() { throw framing::NotImplementedException("Class not implemented"); } Tx010Handler* getTx010Handler() { throw framing::NotImplementedException("Class not implemented"); } + Dtx010Handler* getDtx010Handler() { throw framing::NotImplementedException("Class not implemented"); } Execution010Handler* getExecution010Handler() { throw framing::NotImplementedException("Class not implemented"); } // Handlers no longer implemented in BrokerAdapter: diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index ca90f32a5d..c5bf325afc 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -67,12 +67,8 @@ void DeliveryRecord::setEnded() ended = true; //reset msg pointer, don't need to hold on to it anymore msg.payload = boost::intrusive_ptr<Message>(); -} -void DeliveryRecord::dequeue(TransactionContext* ctxt) const{ - if (acquired && !ended) { - queue->dequeue(ctxt, msg.payload); - } + QPID_LOG(debug, "DeliveryRecord::setEnded() id=" << id); } bool DeliveryRecord::matches(DeliveryId tag) const{ @@ -136,6 +132,12 @@ void DeliveryRecord::accept(TransactionContext* ctxt) { } } +void DeliveryRecord::dequeue(TransactionContext* ctxt) const{ + if (acquired && !ended) { + queue->dequeue(ctxt, msg.payload); + } +} + void DeliveryRecord::reject() { Exchange::shared_ptr alternate = queue->getAlternateExchange(); diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index b2672345b4..eed7b81748 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -55,8 +55,6 @@ class DeliveryRecord{ bool completed; bool ended; - void setEnded(); - public: DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token, const DeliveryId id, bool acquired, bool confirmed = false); @@ -76,6 +74,7 @@ class DeliveryRecord{ void acquire(DeliveryIds& results); void complete(); void accept(TransactionContext* ctxt); + void setEnded(); bool isAcquired() const { return acquired; } bool isComplete() const { return completed; } diff --git a/cpp/src/qpid/broker/DtxAck.cpp b/cpp/src/qpid/broker/DtxAck.cpp index 25186b4102..0e6f94d4e0 100644 --- a/cpp/src/qpid/broker/DtxAck.cpp +++ b/cpp/src/qpid/broker/DtxAck.cpp @@ -30,7 +30,6 @@ DtxAck::DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& { remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, pending.end()), not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked))); - unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)); } bool DtxAck::prepare(TransactionContext* ctxt) throw() diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index cef3a4b02b..fb6b3f019e 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -54,6 +54,7 @@ void DtxManager::recover(const std::string& xid, std::auto_ptr<TPCTransactionCon bool DtxManager::prepare(const std::string& xid) { + QPID_LOG(debug, "preparing: " << xid); try { return getWork(xid)->prepare(); } catch (DtxTimeoutException& e) { @@ -64,6 +65,7 @@ bool DtxManager::prepare(const std::string& xid) bool DtxManager::commit(const std::string& xid, bool onePhase) { + QPID_LOG(debug, "committing: " << xid); try { bool result = getWork(xid)->commit(onePhase); remove(xid); @@ -76,6 +78,7 @@ bool DtxManager::commit(const std::string& xid, bool onePhase) void DtxManager::rollback(const std::string& xid) { + QPID_LOG(debug, "rolling back: " << xid); try { getWork(xid)->rollback(); remove(xid); diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 1b5bcc2f7e..6f5577de5a 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -29,6 +29,7 @@ #include "Message.h" #include "SemanticHandler.h" #include "SessionHandler.h" +#include "TxAccept.h" #include "TxAck.h" #include "TxPublish.h" #include "qpid/framing/reply_exceptions.h" @@ -119,12 +120,14 @@ void SemanticState::startTx() txBuffer = TxBuffer::shared_ptr(new TxBuffer()); } -void SemanticState::commit(MessageStore* const store) +void SemanticState::commit(MessageStore* const store, bool completeOnCommit) { if (!txBuffer) throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); - TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked)); + TxOp::shared_ptr txAck(completeOnCommit ? + static_cast<TxOp*>(new TxAck(accumulatedAck, unacked)) : + static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked))); txBuffer->enlist(txAck); if (txBuffer->commitLocal(store)) { accumulatedAck.clear(); @@ -402,9 +405,11 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last); if (dtxBuffer.get()) { - //if enlisted in a dtx, remove the relevant slice from - //unacked and record it against that transaction + //if enlisted in a dtx, copy the relevant slice from + //unacked and record it against that transaction: TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); + //then remove that slice from the unacked record: + unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &accumulatedAck)); accumulatedAck.clear(); dtxBuffer->enlist(txAck); } @@ -660,11 +665,19 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last) accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet if (dtxBuffer.get()) { - //if enlisted in a dtx, remove the relevant slice from + //if enlisted in a dtx, copy the relevant slice from //unacked and record it against that transaction TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); accumulatedAck.clear(); dtxBuffer->enlist(txAck); + + //mark the relevant messages as 'ended' in unacked + for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded)); + + //if the messages are already completed, they can be + //removed from the record + unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); + } } else { for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::accept), 0)); diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index f9f343de38..4bcd0dddb3 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -167,7 +167,7 @@ class SemanticState : public framing::FrameHandler::Chains, bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected); void startTx(); - void commit(MessageStore* const store); + void commit(MessageStore* const store, bool completeOnCommit); void rollback(); void selectDtx(); void startDtx(const std::string& xid, DtxManager& mgr, bool join); diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 5d33e68fab..2091e97584 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -22,6 +22,7 @@ #include "Queue.h" #include "qpid/Exception.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/constants.h" #include <boost/format.hpp> #include <boost/cast.hpp> #include <boost/bind.hpp> @@ -40,7 +41,8 @@ SessionAdapter::SessionAdapter(SemanticState& s) : queueImpl(s), messageImpl(s), executionImpl(s), - txImpl(s) + txImpl(s), + dtxImpl(s) {} @@ -431,7 +433,7 @@ void SessionAdapter::TxHandlerImpl::select() void SessionAdapter::TxHandlerImpl::commit() { - state.commit(&getBroker().getStore()); + state.commit(&getBroker().getStore(), false); } void SessionAdapter::TxHandlerImpl::rollback() @@ -439,6 +441,128 @@ void SessionAdapter::TxHandlerImpl::rollback() state.rollback(); } +std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid010& xid) +{ + std::stringstream out; + out << xid.getFormat() << xid.getGlobalId() << xid.getBranchId(); + return out.str(); +} + + +void SessionAdapter::DtxHandlerImpl::select() +{ + state.selectDtx(); +} + +Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid, + bool fail, + bool suspend) +{ + try { + if (fail) { + state.endDtx(convert(xid), true); + if (suspend) { + throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set.")); + } else { + return Dtx010EndResult(XA_RBROLLBACK); + } + } else { + if (suspend) { + state.suspendDtx(convert(xid)); + } else { + state.endDtx(convert(xid), false); + } + return Dtx010EndResult(XA_OK); + } + } catch (const DtxTimeoutException& e) { + return Dtx010EndResult(XA_RBTIMEOUT); + } +} + +Dtx010StartResult SessionAdapter::DtxHandlerImpl::start(const Xid010& xid, + bool join, + bool resume) +{ + if (join && resume) { + throw CommandInvalidException(QPID_MSG("Join and resume cannot both be set.")); + } + try { + if (resume) { + state.resumeDtx(convert(xid)); + } else { + state.startDtx(convert(xid), getBroker().getDtxManager(), join); + } + return Dtx010StartResult(XA_OK); + } catch (const DtxTimeoutException& e) { + return Dtx010StartResult(XA_RBTIMEOUT); + } +} + +Dtx010PrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid010& xid) +{ + try { + bool ok = getBroker().getDtxManager().prepare(convert(xid)); + return Dtx010PrepareResult(ok ? XA_OK : XA_RBROLLBACK); + } catch (const DtxTimeoutException& e) { + return Dtx010PrepareResult(XA_RBTIMEOUT); + } +} + +Dtx010CommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid010& xid, + bool onePhase) +{ + try { + bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase); + return Dtx010CommitResult(ok ? XA_OK : XA_RBROLLBACK); + } catch (const DtxTimeoutException& e) { + return Dtx010CommitResult(XA_RBTIMEOUT); + } +} + + +Dtx010RollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid010& xid) +{ + try { + getBroker().getDtxManager().rollback(convert(xid)); + return Dtx010RollbackResult(XA_OK); + } catch (const DtxTimeoutException& e) { + return Dtx010RollbackResult(XA_RBTIMEOUT); + } +} + +Dtx010RecoverResult SessionAdapter::DtxHandlerImpl::recover() +{ + std::set<std::string> xids; + getBroker().getStore().collectPreparedXids(xids); + + //TODO: remove the need to copy from one container type to another + std::vector<std::string> data; + for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) { + data.push_back(*i); + } + Array indoubt(data); + return Dtx010RecoverResult(indoubt); +} + +void SessionAdapter::DtxHandlerImpl::forget(const Xid010& xid) +{ + //Currently no heuristic completion is supported, so this should never be used. + throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!")); +} + +Dtx010GetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid010& xid) +{ + uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid)); + return Dtx010GetTimeoutResult(timeout); +} + + +void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid010& xid, + u_int32_t timeout) +{ + getBroker().getDtxManager().setTimeout(convert(xid), timeout); +} + Queue::shared_ptr SessionAdapter::HandlerHelper::getQueue(const string& name) const { Queue::shared_ptr queue; diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h index dad89cd123..c8aa9008cc 100644 --- a/cpp/src/qpid/broker/SessionAdapter.h +++ b/cpp/src/qpid/broker/SessionAdapter.h @@ -61,6 +61,7 @@ class Queue; Queue010Handler* getQueue010Handler(){ return &queueImpl; } Execution010Handler* getExecution010Handler(){ return &executionImpl; } Tx010Handler* getTx010Handler(){ return &txImpl; } + Dtx010Handler* getDtx010Handler(){ return &dtxImpl; } BasicHandler* getBasicHandler() { throw framing::NotImplementedException("Class not implemented"); } ExchangeHandler* getExchangeHandler(){ throw framing::NotImplementedException("Class not implemented"); } @@ -218,12 +219,45 @@ class Queue; void rollback(); }; + class DtxHandlerImpl : public Dtx010Handler, public HandlerHelper + { + std::string convert(const framing::Xid010& xid); + + public: + DtxHandlerImpl(SemanticState& session) : HandlerHelper(session) {} + + void select(); + + framing::Dtx010StartResult start(const framing::Xid010& xid, + bool join, + bool resume); + + framing::Dtx010EndResult end(const framing::Xid010& xid, + bool fail, + bool suspend); + + framing::Dtx010CommitResult commit(const framing::Xid010& xid, + bool onePhase); + + void forget(const framing::Xid010& xid); + + framing::Dtx010GetTimeoutResult getTimeout(const framing::Xid010& xid); + + framing::Dtx010PrepareResult prepare(const framing::Xid010& xid); + + framing::Dtx010RecoverResult recover(); + + framing::Dtx010RollbackResult rollback(const framing::Xid010& xid); + + void setTimeout(const framing::Xid010& xid, uint32_t timeout); + }; ExchangeHandlerImpl exchangeImpl; QueueHandlerImpl queueImpl; MessageHandlerImpl messageImpl; ExecutionHandlerImpl executionImpl; TxHandlerImpl txImpl; + DtxHandlerImpl dtxImpl; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 105f946dcb..35ad562a22 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -178,6 +178,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& if (!invocation.wasHandled()) { throw NotImplementedException("Not implemented"); } else if (invocation.hasResult()) { + nextOut++;//execution result is now a command, so the counter must be incremented getProxy().getExecution010().result(id, invocation.getResult()); } if (method->isSync()) { diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp new file mode 100644 index 0000000000..634d066ecc --- /dev/null +++ b/cpp/src/qpid/broker/TxAccept.cpp @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "TxAccept.h" +#include "qpid/log/Statement.h" + +using std::bind1st; +using std::bind2nd; +using std::mem_fun_ref; +using namespace qpid::broker; +using qpid::framing::AccumulatedAck; + +TxAccept::TxAccept(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) : + acked(_acked), unacked(_unacked) {} + +bool TxAccept::prepare(TransactionContext* ctxt) throw() +{ + try{ + //dequeue messages from their respective queues: + for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { + if (i->coveredBy(&acked)) { + i->dequeue(ctxt); + } + } + return true; + }catch(const std::exception& e){ + QPID_LOG(error, "Failed to prepare: " << e.what()); + return false; + }catch(...){ + QPID_LOG(error, "Failed to prepare"); + return false; + } +} + +void TxAccept::commit() throw() +{ + for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { + if (i->coveredBy(&acked)) i->setEnded(); + } + + unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); +} + +void TxAccept::rollback() throw() {} diff --git a/cpp/src/qpid/broker/TxAccept.h b/cpp/src/qpid/broker/TxAccept.h new file mode 100644 index 0000000000..011acf5d9e --- /dev/null +++ b/cpp/src/qpid/broker/TxAccept.h @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TxAccept_ +#define _TxAccept_ + +#include <algorithm> +#include <functional> +#include <list> +#include "qpid/framing/AccumulatedAck.h" +#include "DeliveryRecord.h" +#include "TxOp.h" + +namespace qpid { + namespace broker { + /** + * Defines the transactional behaviour for accepts received by + * a transactional channel. + */ + class TxAccept : public TxOp{ + framing::AccumulatedAck& acked; + std::list<DeliveryRecord>& unacked; + + public: + /** + * @param acked a representation of the accumulation of + * acks received + * @param unacked the record of delivered messages + */ + TxAccept(framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); + virtual bool prepare(TransactionContext* ctxt) throw(); + virtual void commit() throw(); + virtual void rollback() throw(); + virtual ~TxAccept(){} + }; + } +} + + +#endif diff --git a/cpp/xml/extra.xml b/cpp/xml/extra.xml index 0acc2260ad..6080800f60 100644 --- a/cpp/xml/extra.xml +++ b/cpp/xml/extra.xml @@ -33,6 +33,14 @@ </doc> </domain> + <domain name="xid010"> + <struct size="short" pack="short"> + <field name="format" domain="long" /> + <field name="global-id" domain="shortstr" /> + <field name="branch-id" domain="shortstr" /> + </struct> + </domain> + <domain name="delivery-properties-010"> <struct size="long" pack="short" type="1025"> <field name="discard-unroutable" domain="bit" label="controls discard of unroutable messages"/> @@ -739,6 +747,99 @@ </method> </class> +<class name="dtx010" index="6"> + <doc>blah, blah</doc> + <method name = "select" index="1"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + </method> + <method name = "start" index="2"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <field name="xid" domain="xid010"/> + <field name="join" domain="bit"/> + <field name="resume" domain="bit"/> + <result> + <struct size="long" pack="short" type="1"> + <field name="status" domain="short" /> + </struct> + </result> + </method> + <method name = "end" index="3"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <field name="xid" domain="xid010"/> + <field name="fail" domain="bit"/> + <field name="suspend" domain="bit"/> + <result> + <struct size="long" pack="short" type="1"> + <field name="status" domain="short" /> + </struct> + </result> + </method> + <method name = "commit" index="4"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <field name="xid" domain="xid010"/> + <field name="one-phase" domain="bit"/> + <result> + <struct size="long" pack="short" type="1"> + <field name="status" domain="short" /> + </struct> + </result> + </method> + <method name = "forget" index="5"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <field name="xid" domain="xid010"/> + </method> + <method name = "get-timeout" index="6"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <field name="xid" domain="xid010"/> + <result> + <struct size="long" pack="short" type="2"> + <field name="timeout" domain="long" /> + </struct> + </result> + </method> + <method name = "prepare" index="7"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <field name="xid" domain="xid010"/> + <result> + <struct size="long" pack="short" type="1"> + <field name="status" domain="short" /> + </struct> + </result> + </method> + <method name = "recover" index="8"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <result> + <struct size="long" pack="short" type="3"> + <field name="in-doubt" domain="array" /> + </struct> + </result> + </method> + <method name = "rollback" index="9"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <field name="xid" domain="xid010"/> + <result> + <struct size="long" pack="short" type="1"> + <field name="status" domain="short" /> + </struct> + </result> + </method> + <method name = "set-timeout" index="10"> + <doc>blah, blah</doc> + <chassis name="server" implement="MUST" /> + <field name="xid" domain="xid010"/> + <field name="timeout" domain="long"/> + </method> +</class> + <class name="exchange010" index="7"> <doc>blah, blah</doc> <method name = "declare" index="1"> |