summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp29
1 files changed, 18 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 0691aae711..e0ce8cf9e0 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -419,7 +419,8 @@ void Connection::sessionState(
const SequenceNumber& expected,
const SequenceNumber& received,
const SequenceSet& unknownCompleted,
- const SequenceSet& receivedIncomplete)
+ const SequenceSet& receivedIncomplete,
+ bool dtxSelected)
{
sessionState().setState(
replayStart,
@@ -429,7 +430,9 @@ void Connection::sessionState(
received,
unknownCompleted,
receivedIncomplete);
- QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
+ if (dtxSelected) semanticState().selectDtx();
+ QPID_LOG(debug, cluster << " received session state update for "
+ << sessionState().getId());
// The output tasks will be added later in the update process.
connection->getOutputTasks().removeAll();
}
@@ -459,11 +462,14 @@ void Connection::shadowReady(
output.setSendMax(sendMax);
}
-void Connection::setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &v) {
+void Connection::setDtxBuffer(const UpdateReceiver::DtxBufferRef& bufRef) {
broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
- broker::DtxWorkRecord* record = mgr.getWork(v.first.first); // XID
- uint32_t index = v.first.second; // Index
- v.second->setDtxBuffer((*record)[index]);
+ broker::DtxWorkRecord* record = mgr.getWork(bufRef.xid);
+ broker::DtxBuffer::shared_ptr buffer = (*record)[bufRef.index];
+ if (bufRef.suspended)
+ bufRef.semanticState->getSuspendedXids()[bufRef.xid] = buffer;
+ else
+ bufRef.semanticState->setDtxBuffer(buffer);
}
// Marks the end of the update.
@@ -694,11 +700,12 @@ void Connection::dtxAck() {
dtxAckRecords.clear();
}
-void Connection::dtxBufferRef(const std::string& xid, uint32_t index) {
- // Save the association between DtxBuffer and session so we can
- // set the DtxBuffer on the session at the end of the update
- // when the DtxManager has been replicated.
- updateIn.dtxBuffers[std::make_pair(xid, index)] = &semanticState();
+void Connection::dtxBufferRef(const std::string& xid, uint32_t index, bool suspended) {
+ // Save the association between DtxBuffers and the session so we
+ // can set the DtxBuffers at the end of the update when the
+ // DtxManager has been replicated.
+ updateIn.dtxBuffers.push_back(
+ UpdateReceiver::DtxBufferRef(xid, index, suspended, &semanticState()));
}
// Sent at end of work record.