summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-06-21 16:05:09 +0000
committerAlan Conway <aconway@apache.org>2012-06-21 16:05:09 +0000
commit317188a84b04b38c4c905ccd4c8b2fa9b8ae46b8 (patch)
tree997e1e32fe1de3733a030bc14e1203b20c42e63c
parent84c38f398788acc88d412f37aa4866c8b01e70e0 (diff)
downloadqpid-python-317188a84b04b38c4c905ccd4c8b2fa9b8ae46b8.tar.gz
QPID-4082: cluster de-sync after broker restart & queue replication
Having queue state replication between 2 clusters, restarting a broker in both source+destination clusters sometimes leads to cluster de-sync. No QMF communication is involved, though symptoms are similar to the bug caused by missing propagation of QMF errors within a cluster. The bug is caused by "deliveryCount" in SemanticState::ConsumerImpl (qpid/broker/SemanticState.cpp) not being replicated to a joining cluster node during catch-up. When the elder broker in src.cluster sends session.sync() after sending 5 messages (per --ack 5 in qpid-route), the recently joiner node in src.cluster does not do so, what leads to the cluster de-sync. The patch: - adds to "consumer-state" method (see xml/cluster.xml file change) to update a new joi-ner a new property deliveryCount - updates cluster::Connection::consumerState to send deliveryCount to the method - updates cluster::Connection::consumerState to set the received deliveryCount - add two methods to broker::SemanticState::ConsumerImpl for getting and setting deliveryCount git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1352588 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp3
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp3
-rw-r--r--qpid/cpp/xml/cluster.xml1
5 files changed, 8 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index e5e1d2da16..a3cced9c67 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -146,6 +146,8 @@ class SemanticState : private boost::noncopyable {
std::string getResumeId() const { return resumeId; };
const std::string& getTag() const { return tag; }
uint64_t getResumeTtl() const { return resumeTtl; }
+ uint32_t getDeliveryCount() const { return deliveryCount; }
+ void setDeliveryCount(uint32_t _deliveryCount) { deliveryCount = _deliveryCount; }
const framing::FieldTable& getArguments() const { return arguments; }
SemanticState& getParent() { return *parent; }
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 081d54ab49..ee1bc1feb0 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -406,11 +406,12 @@ void Connection::shadowSetUser(const std::string& userId) {
}
void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position,
- uint32_t usedMsgCredit, uint32_t usedByteCredit)
+ uint32_t usedMsgCredit, uint32_t usedByteCredit, const uint32_t deliveryCount)
{
broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name);
c->setPosition(position);
c->setBlocked(blocked);
+ c->setDeliveryCount(deliveryCount);
if (c->getCredit().isWindowMode()) c->getCredit().consume(usedMsgCredit, usedByteCredit);
if (notifyEnabled) c->enableNotify(); else c->disableNotify();
updateIn.consumerNumbering.add(c);
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 26514c76e2..d03d6f610a 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -110,7 +110,7 @@ class Connection :
void deliveredFrame(const EventFrame&);
void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position,
- uint32_t usedMsgCredit, uint32_t usedByteCredit);
+ uint32_t usedMsgCredit, uint32_t usedByteCredit, const uint32_t deliveryCount);
// ==== Used in catch-up mode to build initial state.
//
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index d1b9a65e14..53818aed85 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -547,7 +547,8 @@ void UpdateClient::updateConsumer(
ci->isNotifyEnabled(),
ci->getPosition(),
ci->getCredit().used().messages,
- ci->getCredit().used().bytes
+ ci->getCredit().used().bytes,
+ ci->getDeliveryCount()
);
consumerNumbering.add(ci.get());
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index f9b8caf185..09434ea37b 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -179,6 +179,7 @@
<field name="position" type="sequence-no"/>
<field name="used-msg-credit" type="uint32"/>
<field name="used-byte-credit" type="uint32"/>
+ <field name="deliveryCount" type="uint32"/>
</control>
<!-- Delivery-record for outgoing messages sent but not yet accepted. -->