diff options
| author | Alan Conway <aconway@apache.org> | 2012-06-21 16:05:09 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-06-21 16:05:09 +0000 |
| commit | 30133c25a10861ca9e589e22c13bea4dd99f4c0f (patch) | |
| tree | aa8ff676fd4467b92c8138e68d6a8a5da1cb535a /cpp/src/qpid/cluster/Connection.cpp | |
| parent | f3900f98ccddc80daf4ad18e4acf718504864b7a (diff) | |
| download | qpid-python-30133c25a10861ca9e589e22c13bea4dd99f4c0f.tar.gz | |
QPID-4082: cluster de-sync after broker restart & queue replication
Having queue state replication between 2 clusters, restarting a broker in both
source+destination clusters sometimes leads to cluster de-sync. No QMF
communication is involved, though symptoms are similar to the bug caused by
missing propagation of QMF errors within a cluster.
The bug is caused by "deliveryCount" in SemanticState::ConsumerImpl
(qpid/broker/SemanticState.cpp) not being replicated to a joining cluster node
during catch-up. When the elder broker in src.cluster sends session.sync() after
sending 5 messages (per --ack 5 in qpid-route), the recently joiner node in
src.cluster does not do so, what leads to the cluster de-sync.
The patch:
- adds to "consumer-state" method (see xml/cluster.xml file change) to update a new joi-ner a new property deliveryCount
- updates cluster::Connection::consumerState to send deliveryCount to the method
- updates cluster::Connection::consumerState to set the received deliveryCount
- add two methods to broker::SemanticState::ConsumerImpl for getting and setting deliveryCount
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1352588 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 3 |
1 files changed, 2 insertions, 1 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 081d54ab49..ee1bc1feb0 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -406,11 +406,12 @@ void Connection::shadowSetUser(const std::string& userId) { } void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position, - uint32_t usedMsgCredit, uint32_t usedByteCredit) + uint32_t usedMsgCredit, uint32_t usedByteCredit, const uint32_t deliveryCount) { broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name); c->setPosition(position); c->setBlocked(blocked); + c->setDeliveryCount(deliveryCount); if (c->getCredit().isWindowMode()) c->getCredit().consume(usedMsgCredit, usedByteCredit); if (notifyEnabled) c->enableNotify(); else c->disableNotify(); updateIn.consumerNumbering.add(c); |
