From 6efddffd3c04fc16d2e63ccfa9db6c6ad0798825 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Fri, 5 Jun 2009 17:39:07 +0000 Subject: Further fix to new cluster member state transfer to fix a case where unacked messages on ring policy queue cause inconsistencies in queue state between nodes. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@782075 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/tests/cluster_test.cpp | 87 ++++++++++++++++++++++++++++++++---------- 1 file changed, 67 insertions(+), 20 deletions(-) (limited to 'cpp/src/tests/cluster_test.cpp') diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 0c249bdf15..a903d192c2 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -856,6 +856,41 @@ QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) { BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException); } +/** + * Subscribes to specified queue and acquires up to the specified + * number of message but does not accept or release them. These + * message are therefore 'locked' by the clients session. + */ +Subscription lockMessages(Client& client, const std::string& queue, int count) +{ + LocalQueue q; + SubscriptionSettings settings(FlowControl::messageCredit(count)); + settings.autoAck = 0; + Subscription sub = client.subs.subscribe(q, queue, settings); + client.session.messageFlush(sub.getName()); + return sub; +} + +/** + * check that the specified queue contains the expected set of + * messages (matched on content) for all nodes in the cluster + */ +void checkQueue(ClusterFixture& cluster, const std::string& queue, const std::vector& messages) +{ + for (size_t i = 0; i < cluster.size(); i++) { + Client client(cluster[i], (boost::format("%1%_%2%") % "c" % (i+1)).str()); + BOOST_CHECK_EQUAL(browse(client, queue, messages.size()), messages); + client.close(); + } +} + +void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m") +{ + for (int i = 0; i < count; i++) { + client.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag)); + } +} + QPID_AUTO_TEST_CASE(testRingQueueUpdate) { ScopedSuppressLogging allQuiet; //tests that ring queues are accurately replicated on newly @@ -868,31 +903,43 @@ QPID_AUTO_TEST_CASE(testRingQueueUpdate) { QueueOptions options; options.setSizePolicy(RING, 0, 5); c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); - for (int i = 0; i < 5; i++) { - c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag)); - } - //receive but don't ack a message - LocalQueue lq; - SubscriptionSettings lqSettings(FlowControl::messageCredit(1)); - lqSettings.autoAck = 0; - Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings); - c1.session.messageFlush("q"); - + send(c1, "q", 5); + lockMessages(c1, "q", 1); //add new node cluster.add(); - + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined //send one more message - c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % 6).str(), "q", durableFlag)); - - c1.session.close(); - c1.connection.close(); + send(c1, "q", 1, 6); + //release locked message + c1.close(); + //check state of queue on both nodes + checkQueue(cluster, "q", list_of("m_2")("m_3")("m_4")("m_5")("m_6")); +} +QPID_AUTO_TEST_CASE(testRingQueueUpdate2) { + ScopedSuppressLogging allQuiet; + //tests that ring queues are accurately replicated on newly joined + //nodes; just like testRingQueueUpdate, but new node joins after + //the sixth message has been sent. + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + QueueOptions options; + options.setSizePolicy(RING, 0, 5); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + send(c1, "q", 5); + lockMessages(c1, "q", 1); + //send sixth message + send(c1, "q", 1, 6); + //add new node + cluster.add(); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined + //release locked message + c1.close(); //check state of queue on both nodes - vector expected = list_of("m_2")("m_3")("m_4")("m_5")("m_6"); - Client c3(cluster[0], "c3"); - BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected); - Client c2(cluster[1], "c2"); - BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected); + checkQueue(cluster, "q", list_of("m_2")("m_3")("m_4")("m_5")("m_6")); } QPID_AUTO_TEST_CASE(testRelease) { -- cgit v1.2.1