diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 23 |
1 files changed, 18 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 1b5bcc2f7e..6f5577de5a 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -29,6 +29,7 @@ #include "Message.h" #include "SemanticHandler.h" #include "SessionHandler.h" +#include "TxAccept.h" #include "TxAck.h" #include "TxPublish.h" #include "qpid/framing/reply_exceptions.h" @@ -119,12 +120,14 @@ void SemanticState::startTx() txBuffer = TxBuffer::shared_ptr(new TxBuffer()); } -void SemanticState::commit(MessageStore* const store) +void SemanticState::commit(MessageStore* const store, bool completeOnCommit) { if (!txBuffer) throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); - TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked)); + TxOp::shared_ptr txAck(completeOnCommit ? + static_cast<TxOp*>(new TxAck(accumulatedAck, unacked)) : + static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked))); txBuffer->enlist(txAck); if (txBuffer->commitLocal(store)) { accumulatedAck.clear(); @@ -402,9 +405,11 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last); if (dtxBuffer.get()) { - //if enlisted in a dtx, remove the relevant slice from - //unacked and record it against that transaction + //if enlisted in a dtx, copy the relevant slice from + //unacked and record it against that transaction: TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); + //then remove that slice from the unacked record: + unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &accumulatedAck)); accumulatedAck.clear(); dtxBuffer->enlist(txAck); } @@ -660,11 +665,19 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last) accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet if (dtxBuffer.get()) { - //if enlisted in a dtx, remove the relevant slice from + //if enlisted in a dtx, copy the relevant slice from //unacked and record it against that transaction TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); accumulatedAck.clear(); dtxBuffer->enlist(txAck); + + //mark the relevant messages as 'ended' in unacked + for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded)); + + //if the messages are already completed, they can be + //removed from the record + unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); + } } else { for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::accept), 0)); |