summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp23
1 files changed, 18 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 1b5bcc2f7e..6f5577de5a 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -29,6 +29,7 @@
#include "Message.h"
#include "SemanticHandler.h"
#include "SessionHandler.h"
+#include "TxAccept.h"
#include "TxAck.h"
#include "TxPublish.h"
#include "qpid/framing/reply_exceptions.h"
@@ -119,12 +120,14 @@ void SemanticState::startTx()
txBuffer = TxBuffer::shared_ptr(new TxBuffer());
}
-void SemanticState::commit(MessageStore* const store)
+void SemanticState::commit(MessageStore* const store, bool completeOnCommit)
{
if (!txBuffer) throw
CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
- TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
+ TxOp::shared_ptr txAck(completeOnCommit ?
+ static_cast<TxOp*>(new TxAck(accumulatedAck, unacked)) :
+ static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked)));
txBuffer->enlist(txAck);
if (txBuffer->commitLocal(store)) {
accumulatedAck.clear();
@@ -402,9 +405,11 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
if (dtxBuffer.get()) {
- //if enlisted in a dtx, remove the relevant slice from
- //unacked and record it against that transaction
+ //if enlisted in a dtx, copy the relevant slice from
+ //unacked and record it against that transaction:
TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+ //then remove that slice from the unacked record:
+ unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &accumulatedAck));
accumulatedAck.clear();
dtxBuffer->enlist(txAck);
}
@@ -660,11 +665,19 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last)
accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet
if (dtxBuffer.get()) {
- //if enlisted in a dtx, remove the relevant slice from
+ //if enlisted in a dtx, copy the relevant slice from
//unacked and record it against that transaction
TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
accumulatedAck.clear();
dtxBuffer->enlist(txAck);
+
+ //mark the relevant messages as 'ended' in unacked
+ for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded));
+
+ //if the messages are already completed, they can be
+ //removed from the record
+ unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
+
}
} else {
for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::accept), 0));