summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp28
1 files changed, 20 insertions, 8 deletions
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index a5662bb2b3..3142b44d71 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -506,7 +506,8 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
std::max(received, ss->receiverGetExpected().command),
received,
ss->receiverGetUnknownComplete(),
- ss->receiverGetIncomplete()
+ ss->receiverGetIncomplete(),
+ ss->getSemanticState().getDtxSelected()
);
// Send frames for partial message in progress.
@@ -552,7 +553,6 @@ void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr,
client::AsyncSession& updateSession)
{
if (!dr.isEnded() && dr.isAcquired()) {
- // FIXME aconway 2011-08-19: should this be assert or if?
assert(dr.getMessage().payload);
// If the message is acquired then it is no longer on the
// updatees queue, put it on the update queue for updatee to pick up.
@@ -621,22 +621,34 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
ClusterConnectionProxy proxy;
};
+void UpdateClient::updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx,bool suspended)
+{
+ ClusterConnectionProxy proxy(shadowSession);
+ broker::DtxWorkRecord* record =
+ updaterBroker.getDtxManager().getWork(dtx->getXid());
+ proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx), suspended);
+
+}
+
void UpdateClient::updateTransactionState(broker::SemanticState& s) {
- broker::TxBuffer::shared_ptr tx = s.getTxBuffer();
- broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer();
ClusterConnectionProxy proxy(shadowSession);
proxy.accumulatedAck(s.getAccumulatedAck());
+ broker::TxBuffer::shared_ptr tx = s.getTxBuffer();
+ broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer();
if (dtx) {
- broker::DtxWorkRecord* record =
- updaterBroker.getDtxManager().getWork(dtx->getXid()); // throws if not found
- proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx));
+ updateBufferRef(dtx, false); // Current transaction.
} else if (tx) {
- ClusterConnectionProxy proxy(shadowSession);
proxy.txStart();
TxOpUpdater updater(*this, shadowSession, expiry);
tx->accept(updater);
proxy.txEnd();
}
+ for (SemanticState::DtxBufferMap::iterator i = s.getSuspendedXids().begin();
+ i != s.getSuspendedXids().end();
+ ++i)
+ {
+ updateBufferRef(i->second, true);
+ }
}
void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) {