summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-12-22 14:22:17 +0000
committerAlan Conway <aconway@apache.org>2009-12-22 14:22:17 +0000
commitda669e5182ba84da83817313073777d4f8a3bd00 (patch)
tree29efe0c2a386c5a4f82fbefc605d194f10e519e6
parentb18369bc119468c0a320a191f7a6b9f76a9389b9 (diff)
downloadqpid-python-da669e5182ba84da83817313073777d4f8a3bd00.tar.gz
QPID-2296: Cluster errors when using acquire-mode-not-acquired
Replicate consumer's queue position to new cluster nodes. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@893175 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp3
-rw-r--r--cpp/src/qpid/cluster/Connection.h2
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp3
-rw-r--r--cpp/src/tests/cluster_test.cpp37
-rw-r--r--cpp/xml/cluster.xml1
5 files changed, 43 insertions, 3 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 2eda84ae11..d223244f15 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -269,9 +269,10 @@ broker::SemanticState& Connection::semanticState() {
return sessionState().getSemanticState();
}
-void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled)
+void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position)
{
broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
+ c.position = position;
c.setBlocked(blocked);
if (notifyEnabled) c.enableNotify(); else c.disableNotify();
consumerNumbering.add(c.shared_from_this());
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 57cca865db..7f94338348 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -108,7 +108,7 @@ class Connection :
// Called for data delivered from the cluster.
void deliveredFrame(const EventFrame&);
- void consumerState(const std::string& name, bool blocked, bool notifyEnabled);
+ void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position);
// ==== Used in catch-up mode to build initial state.
//
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 279284da2c..b20cc907a2 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -381,7 +381,8 @@ void UpdateClient::updateConsumer(
ClusterConnectionProxy(shadowSession).consumerState(
ci->getName(),
ci->isBlocked(),
- ci->isNotifyEnabled()
+ ci->isNotifyEnabled(),
+ ci->position
);
consumerNumbering.add(ci);
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index a2e18b8f78..33f23aa5b3 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -1149,6 +1149,43 @@ QPID_AUTO_TEST_CASE(testExactByteCredit) {
BOOST_CHECK(!browseByteCredit(c1, "q", size-1, m));
}
+// Test that consumer positions are updated correctly.
+// Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=541927
+//
+QPID_AUTO_TEST_CASE(testUpdateConsumerPosition) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+
+ c0.session.queueDeclare("q", arg::durable=durableFlag);
+ SubscriptionSettings settings;
+ settings.autoAck = 0;
+ // Set the acquire mode to 'not-acquired' the consumer moves along the queue
+ // but does not acquire (remove) messages.
+ settings.acquireMode = ACQUIRE_MODE_NOT_ACQUIRED;
+ Subscription s = c0.subs.subscribe(c0.lq, "q", settings);
+ c0.session.messageTransfer(arg::content=makeMessage("1", "q", durableFlag));
+ BOOST_CHECK_EQUAL("1", c0.lq.get(TIMEOUT).getData());
+
+ // Add another member, send/receive another message and acquire
+ // the messages. With the bug, this creates an inconsistency
+ // because the browse position was not updated to the new member.
+ cluster.add();
+ c0.session.messageTransfer(arg::content=makeMessage("2", "q", durableFlag));
+ BOOST_CHECK_EQUAL("2", c0.lq.get(TIMEOUT).getData());
+ s.acquire(s.getUnacquired());
+ s.accept(s.getUnaccepted());
+
+ // In the bug we now have 0 messages on cluster[0] and 1 message on cluster[1]
+ // Subscribing on cluster[1] provokes an error that shuts down cluster[0]
+ Client c1(cluster[1], "c1");
+ Subscription s1 = c1.subs.subscribe(c1.lq, "q"); // Default auto-ack=1
+ Message m;
+ BOOST_CHECK(!c1.lq.get(m, TIMEOUT/10));
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+}
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index c0b7127f68..e2c69d33fd 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -137,6 +137,7 @@
<field name="name" type="str8"/>
<field name="blocked" type="bit"/>
<field name="notifyEnabled" type="bit"/>
+ <field name="position" type="sequence-no"/>
</control>
<!-- Delivery-record for outgoing messages sent but not yet accepted. -->