diff options
author | Gordon Sim <gsim@apache.org> | 2009-06-05 17:39:07 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2009-06-05 17:39:07 +0000 |
commit | 6efddffd3c04fc16d2e63ccfa9db6c6ad0798825 (patch) | |
tree | 7b9e3b3b7e02b058af5919649cc0831bee99605a /cpp/src/tests | |
parent | db28bbe4b54cef5457e5349f85a154393db70a8d (diff) | |
download | qpid-python-6efddffd3c04fc16d2e63ccfa9db6c6ad0798825.tar.gz |
Further fix to new cluster member state transfer to fix a case where unacked messages on ring policy queue cause inconsistencies in queue state between nodes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@782075 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r-- | cpp/src/tests/BrokerFixture.h | 1 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 87 |
2 files changed, 68 insertions, 20 deletions
diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h index c691e351e7..397045d00b 100644 --- a/cpp/src/tests/BrokerFixture.h +++ b/cpp/src/tests/BrokerFixture.h @@ -123,6 +123,7 @@ struct ClientT { : connection(settings), session(connection.newSession(name_)), subs(session), name(name_) {} ~ClientT() { connection.close(); } + void close() { session.close(); connection.close(); } }; typedef ClientT<> Client; diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 0c249bdf15..a903d192c2 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -856,6 +856,41 @@ QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) { BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException); } +/** + * Subscribes to specified queue and acquires up to the specified + * number of message but does not accept or release them. These + * message are therefore 'locked' by the clients session. + */ +Subscription lockMessages(Client& client, const std::string& queue, int count) +{ + LocalQueue q; + SubscriptionSettings settings(FlowControl::messageCredit(count)); + settings.autoAck = 0; + Subscription sub = client.subs.subscribe(q, queue, settings); + client.session.messageFlush(sub.getName()); + return sub; +} + +/** + * check that the specified queue contains the expected set of + * messages (matched on content) for all nodes in the cluster + */ +void checkQueue(ClusterFixture& cluster, const std::string& queue, const std::vector<std::string>& messages) +{ + for (size_t i = 0; i < cluster.size(); i++) { + Client client(cluster[i], (boost::format("%1%_%2%") % "c" % (i+1)).str()); + BOOST_CHECK_EQUAL(browse(client, queue, messages.size()), messages); + client.close(); + } +} + +void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m") +{ + for (int i = 0; i < count; i++) { + client.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag)); + } +} + QPID_AUTO_TEST_CASE(testRingQueueUpdate) { ScopedSuppressLogging allQuiet; //tests that ring queues are accurately replicated on newly @@ -868,31 +903,43 @@ QPID_AUTO_TEST_CASE(testRingQueueUpdate) { QueueOptions options; options.setSizePolicy(RING, 0, 5); c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); - for (int i = 0; i < 5; i++) { - c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag)); - } - //receive but don't ack a message - LocalQueue lq; - SubscriptionSettings lqSettings(FlowControl::messageCredit(1)); - lqSettings.autoAck = 0; - Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings); - c1.session.messageFlush("q"); - + send(c1, "q", 5); + lockMessages(c1, "q", 1); //add new node cluster.add(); - + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined //send one more message - c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % 6).str(), "q", durableFlag)); - - c1.session.close(); - c1.connection.close(); + send(c1, "q", 1, 6); + //release locked message + c1.close(); + //check state of queue on both nodes + checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6")); +} +QPID_AUTO_TEST_CASE(testRingQueueUpdate2) { + ScopedSuppressLogging allQuiet; + //tests that ring queues are accurately replicated on newly joined + //nodes; just like testRingQueueUpdate, but new node joins after + //the sixth message has been sent. + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + QueueOptions options; + options.setSizePolicy(RING, 0, 5); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + send(c1, "q", 5); + lockMessages(c1, "q", 1); + //send sixth message + send(c1, "q", 1, 6); + //add new node + cluster.add(); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined + //release locked message + c1.close(); //check state of queue on both nodes - vector<string> expected = list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"); - Client c3(cluster[0], "c3"); - BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected); - Client c2(cluster[1], "c2"); - BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected); + checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6")); } QPID_AUTO_TEST_CASE(testRelease) { |