diff options
author | Gordon Sim <gsim@apache.org> | 2009-06-05 17:39:07 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2009-06-05 17:39:07 +0000 |
commit | 262dc4f141a94094827b7ccef18c2d210c710f97 (patch) | |
tree | 670ccaf4ffd6308edf3321ce9c44f91090027cff | |
parent | cf08370085e71331c31adc6c51752b0de655e1a2 (diff) | |
download | qpid-python-262dc4f141a94094827b7ccef18c2d210c710f97.tar.gz |
Further fix to new cluster member state transfer to fix a case where unacked messages on ring policy queue cause inconsistencies in queue state between nodes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@782075 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/BrokerFixture.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 87 | ||||
-rw-r--r-- | qpid/cpp/xml/cluster.xml | 1 |
8 files changed, 89 insertions, 23 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index bbdbf19e92..ffe743bac1 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -202,7 +202,7 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ } void Queue::requeue(const QueuedMessage& msg){ - if (policy.get() && !policy->isEnqueued(msg)) return; + if (!isEnqueued(msg)) return; QueueListeners::NotificationSet copy; { @@ -691,7 +691,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) { { Mutex::ScopedLock locker(messageLock); - if (policy.get() && !policy->isEnqueued(msg)) return false; + if (!isEnqueued(msg)) return false; if (!ctxt) { dequeued(msg); } @@ -1019,3 +1019,8 @@ void Queue::enqueued(const QueuedMessage& m) QPID_LOG(warning, "Queue informed of enqueued message that has no payload"); } } + +bool Queue::isEnqueued(const QueuedMessage& msg) +{ + return policy.get() && policy->isEnqueued(msg); +} diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index de60362854..a8b775cba7 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -257,6 +257,15 @@ namespace qpid { * clustered broker. */ void enqueued(const QueuedMessage& msg); + + /** + * Test whether the specified message (identified by its + * sequence/position), is still enqueued (note this + * doesn't mean it is available for delivery as it may + * have been delievered to a subscriber who has not yet + * accepted it). + */ + bool isEnqueued(const QueuedMessage& msg); /** * Gets the next available message diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index f2904c6734..be6281b4e3 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -325,6 +325,7 @@ void Connection::deliveryRecord(const string& qname, bool completed, bool ended, bool windowing, + bool enqueued, uint32_t credit) { broker::QueuedMessage m; @@ -333,7 +334,7 @@ void Connection::deliveryRecord(const string& qname, if (acquired) { // Message is on the update queue m = getUpdateMessage(); m.queue = queue.get(); - queue->enqueued(m); //inform queue of the message + if (enqueued) queue->enqueued(m); //inform queue of the message } else { // Message at original position in original queue m = queue->find(position); } diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index a0be2203e4..8e3b0ad337 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -129,6 +129,7 @@ class Connection : bool completed, bool ended, bool windowing, + bool enqueued, uint32_t credit); void queuePosition(const std::string&, const framing::SequenceNumber&); diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 7fdbe73926..332e74c512 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -372,6 +372,7 @@ void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { dr.isComplete(), dr.isEnded(), dr.isWindowing(), + dr.getQueue()->isEnqueued(dr.getMessage()), dr.getCredit() ); } diff --git a/qpid/cpp/src/tests/BrokerFixture.h b/qpid/cpp/src/tests/BrokerFixture.h index c691e351e7..397045d00b 100644 --- a/qpid/cpp/src/tests/BrokerFixture.h +++ b/qpid/cpp/src/tests/BrokerFixture.h @@ -123,6 +123,7 @@ struct ClientT { : connection(settings), session(connection.newSession(name_)), subs(session), name(name_) {} ~ClientT() { connection.close(); } + void close() { session.close(); connection.close(); } }; typedef ClientT<> Client; diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 0c249bdf15..a903d192c2 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -856,6 +856,41 @@ QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) { BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException); } +/** + * Subscribes to specified queue and acquires up to the specified + * number of message but does not accept or release them. These + * message are therefore 'locked' by the clients session. + */ +Subscription lockMessages(Client& client, const std::string& queue, int count) +{ + LocalQueue q; + SubscriptionSettings settings(FlowControl::messageCredit(count)); + settings.autoAck = 0; + Subscription sub = client.subs.subscribe(q, queue, settings); + client.session.messageFlush(sub.getName()); + return sub; +} + +/** + * check that the specified queue contains the expected set of + * messages (matched on content) for all nodes in the cluster + */ +void checkQueue(ClusterFixture& cluster, const std::string& queue, const std::vector<std::string>& messages) +{ + for (size_t i = 0; i < cluster.size(); i++) { + Client client(cluster[i], (boost::format("%1%_%2%") % "c" % (i+1)).str()); + BOOST_CHECK_EQUAL(browse(client, queue, messages.size()), messages); + client.close(); + } +} + +void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m") +{ + for (int i = 0; i < count; i++) { + client.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag)); + } +} + QPID_AUTO_TEST_CASE(testRingQueueUpdate) { ScopedSuppressLogging allQuiet; //tests that ring queues are accurately replicated on newly @@ -868,31 +903,43 @@ QPID_AUTO_TEST_CASE(testRingQueueUpdate) { QueueOptions options; options.setSizePolicy(RING, 0, 5); c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); - for (int i = 0; i < 5; i++) { - c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag)); - } - //receive but don't ack a message - LocalQueue lq; - SubscriptionSettings lqSettings(FlowControl::messageCredit(1)); - lqSettings.autoAck = 0; - Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings); - c1.session.messageFlush("q"); - + send(c1, "q", 5); + lockMessages(c1, "q", 1); //add new node cluster.add(); - + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined //send one more message - c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % 6).str(), "q", durableFlag)); - - c1.session.close(); - c1.connection.close(); + send(c1, "q", 1, 6); + //release locked message + c1.close(); + //check state of queue on both nodes + checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6")); +} +QPID_AUTO_TEST_CASE(testRingQueueUpdate2) { + ScopedSuppressLogging allQuiet; + //tests that ring queues are accurately replicated on newly joined + //nodes; just like testRingQueueUpdate, but new node joins after + //the sixth message has been sent. + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + QueueOptions options; + options.setSizePolicy(RING, 0, 5); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + send(c1, "q", 5); + lockMessages(c1, "q", 1); + //send sixth message + send(c1, "q", 1, 6); + //add new node + cluster.add(); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined + //release locked message + c1.close(); //check state of queue on both nodes - vector<string> expected = list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"); - Client c3(cluster[0], "c3"); - BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected); - Client c2(cluster[1], "c2"); - BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected); + checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6")); } QPID_AUTO_TEST_CASE(testRelease) { diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index f66c7a7791..58b067a3db 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -105,6 +105,7 @@ <field name="completed" type="bit"/> <field name="ended" type="bit"/> <field name="windowing" type="bit"/> + <field name="enqueued" type="bit"/> <field name="credit" type="uint32"/> </control> |