diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 29 |
1 files changed, 18 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 0691aae711..e0ce8cf9e0 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -419,7 +419,8 @@ void Connection::sessionState( const SequenceNumber& expected, const SequenceNumber& received, const SequenceSet& unknownCompleted, - const SequenceSet& receivedIncomplete) + const SequenceSet& receivedIncomplete, + bool dtxSelected) { sessionState().setState( replayStart, @@ -429,7 +430,9 @@ void Connection::sessionState( received, unknownCompleted, receivedIncomplete); - QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); + if (dtxSelected) semanticState().selectDtx(); + QPID_LOG(debug, cluster << " received session state update for " + << sessionState().getId()); // The output tasks will be added later in the update process. connection->getOutputTasks().removeAll(); } @@ -459,11 +462,14 @@ void Connection::shadowReady( output.setSendMax(sendMax); } -void Connection::setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &v) { +void Connection::setDtxBuffer(const UpdateReceiver::DtxBufferRef& bufRef) { broker::DtxManager& mgr = cluster.getBroker().getDtxManager(); - broker::DtxWorkRecord* record = mgr.getWork(v.first.first); // XID - uint32_t index = v.first.second; // Index - v.second->setDtxBuffer((*record)[index]); + broker::DtxWorkRecord* record = mgr.getWork(bufRef.xid); + broker::DtxBuffer::shared_ptr buffer = (*record)[bufRef.index]; + if (bufRef.suspended) + bufRef.semanticState->getSuspendedXids()[bufRef.xid] = buffer; + else + bufRef.semanticState->setDtxBuffer(buffer); } // Marks the end of the update. @@ -694,11 +700,12 @@ void Connection::dtxAck() { dtxAckRecords.clear(); } -void Connection::dtxBufferRef(const std::string& xid, uint32_t index) { - // Save the association between DtxBuffer and session so we can - // set the DtxBuffer on the session at the end of the update - // when the DtxManager has been replicated. - updateIn.dtxBuffers[std::make_pair(xid, index)] = &semanticState(); +void Connection::dtxBufferRef(const std::string& xid, uint32_t index, bool suspended) { + // Save the association between DtxBuffers and the session so we + // can set the DtxBuffers at the end of the update when the + // DtxManager has been replicated. + updateIn.dtxBuffers.push_back( + UpdateReceiver::DtxBufferRef(xid, index, suspended, &semanticState())); } // Sent at end of work record. |