summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-06-05 17:39:07 +0000
committerGordon Sim <gsim@apache.org>2009-06-05 17:39:07 +0000
commit262dc4f141a94094827b7ccef18c2d210c710f97 (patch)
tree670ccaf4ffd6308edf3321ce9c44f91090027cff
parentcf08370085e71331c31adc6c51752b0de655e1a2 (diff)
downloadqpid-python-262dc4f141a94094827b7ccef18c2d210c710f97.tar.gz
Further fix to new cluster member state transfer to fix a case where unacked messages on ring policy queue cause inconsistencies in queue state between nodes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@782075 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp9
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h9
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp3
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp1
-rw-r--r--qpid/cpp/src/tests/BrokerFixture.h1
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp87
-rw-r--r--qpid/cpp/xml/cluster.xml1
8 files changed, 89 insertions, 23 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index bbdbf19e92..ffe743bac1 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -202,7 +202,7 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){
}
void Queue::requeue(const QueuedMessage& msg){
- if (policy.get() && !policy->isEnqueued(msg)) return;
+ if (!isEnqueued(msg)) return;
QueueListeners::NotificationSet copy;
{
@@ -691,7 +691,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
{
Mutex::ScopedLock locker(messageLock);
- if (policy.get() && !policy->isEnqueued(msg)) return false;
+ if (!isEnqueued(msg)) return false;
if (!ctxt) {
dequeued(msg);
}
@@ -1019,3 +1019,8 @@ void Queue::enqueued(const QueuedMessage& m)
QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
}
}
+
+bool Queue::isEnqueued(const QueuedMessage& msg)
+{
+ return policy.get() && policy->isEnqueued(msg);
+}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index de60362854..a8b775cba7 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -257,6 +257,15 @@ namespace qpid {
* clustered broker.
*/
void enqueued(const QueuedMessage& msg);
+
+ /**
+ * Test whether the specified message (identified by its
+ * sequence/position), is still enqueued (note this
+ * doesn't mean it is available for delivery as it may
+ * have been delievered to a subscriber who has not yet
+ * accepted it).
+ */
+ bool isEnqueued(const QueuedMessage& msg);
/**
* Gets the next available message
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index f2904c6734..be6281b4e3 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -325,6 +325,7 @@ void Connection::deliveryRecord(const string& qname,
bool completed,
bool ended,
bool windowing,
+ bool enqueued,
uint32_t credit)
{
broker::QueuedMessage m;
@@ -333,7 +334,7 @@ void Connection::deliveryRecord(const string& qname,
if (acquired) { // Message is on the update queue
m = getUpdateMessage();
m.queue = queue.get();
- queue->enqueued(m); //inform queue of the message
+ if (enqueued) queue->enqueued(m); //inform queue of the message
} else { // Message at original position in original queue
m = queue->find(position);
}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index a0be2203e4..8e3b0ad337 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -129,6 +129,7 @@ class Connection :
bool completed,
bool ended,
bool windowing,
+ bool enqueued,
uint32_t credit);
void queuePosition(const std::string&, const framing::SequenceNumber&);
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 7fdbe73926..332e74c512 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -372,6 +372,7 @@ void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
dr.isComplete(),
dr.isEnded(),
dr.isWindowing(),
+ dr.getQueue()->isEnqueued(dr.getMessage()),
dr.getCredit()
);
}
diff --git a/qpid/cpp/src/tests/BrokerFixture.h b/qpid/cpp/src/tests/BrokerFixture.h
index c691e351e7..397045d00b 100644
--- a/qpid/cpp/src/tests/BrokerFixture.h
+++ b/qpid/cpp/src/tests/BrokerFixture.h
@@ -123,6 +123,7 @@ struct ClientT {
: connection(settings), session(connection.newSession(name_)), subs(session), name(name_) {}
~ClientT() { connection.close(); }
+ void close() { session.close(); connection.close(); }
};
typedef ClientT<> Client;
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index 0c249bdf15..a903d192c2 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -856,6 +856,41 @@ QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) {
BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException);
}
+/**
+ * Subscribes to specified queue and acquires up to the specified
+ * number of message but does not accept or release them. These
+ * message are therefore 'locked' by the clients session.
+ */
+Subscription lockMessages(Client& client, const std::string& queue, int count)
+{
+ LocalQueue q;
+ SubscriptionSettings settings(FlowControl::messageCredit(count));
+ settings.autoAck = 0;
+ Subscription sub = client.subs.subscribe(q, queue, settings);
+ client.session.messageFlush(sub.getName());
+ return sub;
+}
+
+/**
+ * check that the specified queue contains the expected set of
+ * messages (matched on content) for all nodes in the cluster
+ */
+void checkQueue(ClusterFixture& cluster, const std::string& queue, const std::vector<std::string>& messages)
+{
+ for (size_t i = 0; i < cluster.size(); i++) {
+ Client client(cluster[i], (boost::format("%1%_%2%") % "c" % (i+1)).str());
+ BOOST_CHECK_EQUAL(browse(client, queue, messages.size()), messages);
+ client.close();
+ }
+}
+
+void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m")
+{
+ for (int i = 0; i < count; i++) {
+ client.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag));
+ }
+}
+
QPID_AUTO_TEST_CASE(testRingQueueUpdate) {
ScopedSuppressLogging allQuiet;
//tests that ring queues are accurately replicated on newly
@@ -868,31 +903,43 @@ QPID_AUTO_TEST_CASE(testRingQueueUpdate) {
QueueOptions options;
options.setSizePolicy(RING, 0, 5);
c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
- for (int i = 0; i < 5; i++) {
- c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag));
- }
- //receive but don't ack a message
- LocalQueue lq;
- SubscriptionSettings lqSettings(FlowControl::messageCredit(1));
- lqSettings.autoAck = 0;
- Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings);
- c1.session.messageFlush("q");
-
+ send(c1, "q", 5);
+ lockMessages(c1, "q", 1);
//add new node
cluster.add();
-
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
//send one more message
- c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % 6).str(), "q", durableFlag));
-
- c1.session.close();
- c1.connection.close();
+ send(c1, "q", 1, 6);
+ //release locked message
+ c1.close();
+ //check state of queue on both nodes
+ checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+}
+QPID_AUTO_TEST_CASE(testRingQueueUpdate2) {
+ ScopedSuppressLogging allQuiet;
+ //tests that ring queues are accurately replicated on newly joined
+ //nodes; just like testRingQueueUpdate, but new node joins after
+ //the sixth message has been sent.
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ QueueOptions options;
+ options.setSizePolicy(RING, 0, 5);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+ send(c1, "q", 5);
+ lockMessages(c1, "q", 1);
+ //send sixth message
+ send(c1, "q", 1, 6);
+ //add new node
+ cluster.add();
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+ //release locked message
+ c1.close();
//check state of queue on both nodes
- vector<string> expected = list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6");
- Client c3(cluster[0], "c3");
- BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected);
- Client c2(cluster[1], "c2");
- BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected);
+ checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
}
QPID_AUTO_TEST_CASE(testRelease) {
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index f66c7a7791..58b067a3db 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -105,6 +105,7 @@
<field name="completed" type="bit"/>
<field name="ended" type="bit"/>
<field name="windowing" type="bit"/>
+ <field name="enqueued" type="bit"/>
<field name="credit" type="uint32"/>
</control>