summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-06-21 16:05:09 +0000
committerAlan Conway <aconway@apache.org>2012-06-21 16:05:09 +0000
commit30133c25a10861ca9e589e22c13bea4dd99f4c0f (patch)
treeaa8ff676fd4467b92c8138e68d6a8a5da1cb535a /cpp/src/qpid/cluster/Connection.cpp
parentf3900f98ccddc80daf4ad18e4acf718504864b7a (diff)
downloadqpid-python-30133c25a10861ca9e589e22c13bea4dd99f4c0f.tar.gz
QPID-4082: cluster de-sync after broker restart & queue replication
Having queue state replication between 2 clusters, restarting a broker in both source+destination clusters sometimes leads to cluster de-sync. No QMF communication is involved, though symptoms are similar to the bug caused by missing propagation of QMF errors within a cluster. The bug is caused by "deliveryCount" in SemanticState::ConsumerImpl (qpid/broker/SemanticState.cpp) not being replicated to a joining cluster node during catch-up. When the elder broker in src.cluster sends session.sync() after sending 5 messages (per --ack 5 in qpid-route), the recently joiner node in src.cluster does not do so, what leads to the cluster de-sync. The patch: - adds to "consumer-state" method (see xml/cluster.xml file change) to update a new joi-ner a new property deliveryCount - updates cluster::Connection::consumerState to send deliveryCount to the method - updates cluster::Connection::consumerState to set the received deliveryCount - add two methods to broker::SemanticState::ConsumerImpl for getting and setting deliveryCount git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1352588 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp3
1 files changed, 2 insertions, 1 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 081d54ab49..ee1bc1feb0 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -406,11 +406,12 @@ void Connection::shadowSetUser(const std::string& userId) {
}
void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position,
- uint32_t usedMsgCredit, uint32_t usedByteCredit)
+ uint32_t usedMsgCredit, uint32_t usedByteCredit, const uint32_t deliveryCount)
{
broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name);
c->setPosition(position);
c->setBlocked(blocked);
+ c->setDeliveryCount(deliveryCount);
if (c->getCredit().isWindowMode()) c->getCredit().consume(usedMsgCredit, usedByteCredit);
if (notifyEnabled) c->enableNotify(); else c->disableNotify();
updateIn.consumerNumbering.add(c);