diff options
Diffstat (limited to 'cpp/src/tests')
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 65 |
1 files changed, 63 insertions, 2 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index b87bbf4aa5..50ca241b5d 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -936,10 +936,13 @@ void checkQueue(ClusterFixture& cluster, const std::string& queue, const std::ve } } -void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m") +void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m", + const std::string& lvqKey="") { for (int i = 0; i < count; i++) { - client.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag)); + Message message = makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag); + if (!lvqKey.empty()) message.getHeaders().setString(QueueOptions::strLVQMatchProperty, lvqKey); + client.session.messageTransfer(arg::content=message); } } @@ -998,6 +1001,64 @@ QPID_AUTO_TEST_CASE(testRingQueueUpdate2) { } } +QPID_AUTO_TEST_CASE(testLvqUpdate) { + //tests that lvqs are accurately replicated on newly joined nodes + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setOrdering(LVQ); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + + send(c1, "q", 5, 1, "a", "a"); + send(c1, "q", 2, 1, "b", "b"); + send(c1, "q", 1, 1, "c", "c"); + send(c1, "q", 1, 3, "b", "b"); + + //add new node + cluster.add(); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined + + //check state of queue on both nodes + checkQueue(cluster, "q", list_of<string>("a_5")("b_3")("c_1")); + } +} + + +QPID_AUTO_TEST_CASE(testBrowsedLvqUpdate) { + //tests that lvqs are accurately replicated on newly joined nodes + //if the lvq state has been affected by browsers + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + { + ScopedSuppressLogging allQuiet; + QueueOptions options; + options.setOrdering(LVQ); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + + send(c1, "q", 1, 1, "a", "a"); + send(c1, "q", 2, 1, "b", "b"); + send(c1, "q", 1, 1, "c", "c"); + checkQueue(cluster, "q", list_of<string>("a_1")("b_2")("c_1")); + send(c1, "q", 4, 2, "a", "a"); + send(c1, "q", 1, 3, "b", "b"); + + //add new node + cluster.add(); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined + + //check state of queue on both nodes + checkQueue(cluster, "q", list_of<string>("a_1")("b_2")("c_1")("a_5")("b_3")); + } +} + QPID_AUTO_TEST_CASE(testRelease) { //tests that releasing a messages that was unacked when one node //joined works correctly |