diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 28 |
1 files changed, 20 insertions, 8 deletions
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index a5662bb2b3..3142b44d71 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -506,7 +506,8 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { std::max(received, ss->receiverGetExpected().command), received, ss->receiverGetUnknownComplete(), - ss->receiverGetIncomplete() + ss->receiverGetIncomplete(), + ss->getSemanticState().getDtxSelected() ); // Send frames for partial message in progress. @@ -552,7 +553,6 @@ void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr, client::AsyncSession& updateSession) { if (!dr.isEnded() && dr.isAcquired()) { - // FIXME aconway 2011-08-19: should this be assert or if? assert(dr.getMessage().payload); // If the message is acquired then it is no longer on the // updatees queue, put it on the update queue for updatee to pick up. @@ -621,22 +621,34 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { ClusterConnectionProxy proxy; }; +void UpdateClient::updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx,bool suspended) +{ + ClusterConnectionProxy proxy(shadowSession); + broker::DtxWorkRecord* record = + updaterBroker.getDtxManager().getWork(dtx->getXid()); + proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx), suspended); + +} + void UpdateClient::updateTransactionState(broker::SemanticState& s) { - broker::TxBuffer::shared_ptr tx = s.getTxBuffer(); - broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer(); ClusterConnectionProxy proxy(shadowSession); proxy.accumulatedAck(s.getAccumulatedAck()); + broker::TxBuffer::shared_ptr tx = s.getTxBuffer(); + broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer(); if (dtx) { - broker::DtxWorkRecord* record = - updaterBroker.getDtxManager().getWork(dtx->getXid()); // throws if not found - proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx)); + updateBufferRef(dtx, false); // Current transaction. } else if (tx) { - ClusterConnectionProxy proxy(shadowSession); proxy.txStart(); TxOpUpdater updater(*this, shadowSession, expiry); tx->accept(updater); proxy.txEnd(); } + for (SemanticState::DtxBufferMap::iterator i = s.getSuspendedXids().begin(); + i != s.getSuspendedXids().end(); + ++i) + { + updateBufferRef(i->second, true); + } } void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) { |