diff options
author | Gordon Sim <gsim@apache.org> | 2009-06-03 16:08:33 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2009-06-03 16:08:33 +0000 |
commit | 33a258ce40be54e24f588f43f818efd927c9059a (patch) | |
tree | 1bd9c164b9c8497f05d441cd4a9ab4df82a8742c /qpid/cpp/src/tests/cluster_test.cpp | |
parent | 3a3e6ad753ed244dc8b60cf67fac0d07e81ec966 (diff) | |
download | qpid-python-33a258ce40be54e24f588f43f818efd927c9059a.tar.gz |
Ensure that ring queue behaves as expected when replicated to newly joined cluster node.
Altered queueDurabilityPropagationToNewbie test to not use in-process broker to fix error caused by linking change.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@781454 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/cluster_test.cpp')
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 79 |
1 files changed, 78 insertions, 1 deletions
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 611911d126..0c249bdf15 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -704,7 +704,7 @@ QPID_AUTO_TEST_CASE(queueDurabilityPropagationToNewbie) */ ClusterFixture::Args args; prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args); + ClusterFixture cluster(1, args, -1); Client c0(cluster[0]); c0.session.queueDeclare("durable_queue", arg::durable=true); c0.session.queueDeclare("non_durable_queue", arg::durable=false); @@ -712,6 +712,8 @@ QPID_AUTO_TEST_CASE(queueDurabilityPropagationToNewbie) Client c1(cluster[1]); QueueQueryResult durable_query = c1.session.queueQuery ( "durable_queue" ); QueueQueryResult non_durable_query = c1.session.queueQuery ( "non_durable_queue" ); + BOOST_CHECK_EQUAL(durable_query.getQueue(), std::string("durable_queue")); + BOOST_CHECK_EQUAL(non_durable_query.getQueue(), std::string("non_durable_queue")); BOOST_CHECK_EQUAL ( durable_query.getDurable(), true ); BOOST_CHECK_EQUAL ( non_durable_query.getDurable(), false ); @@ -854,4 +856,79 @@ QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) { BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException); } +QPID_AUTO_TEST_CASE(testRingQueueUpdate) { + ScopedSuppressLogging allQuiet; + //tests that ring queues are accurately replicated on newly + //joined nodes + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + QueueOptions options; + options.setSizePolicy(RING, 0, 5); + c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); + for (int i = 0; i < 5; i++) { + c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag)); + } + //receive but don't ack a message + LocalQueue lq; + SubscriptionSettings lqSettings(FlowControl::messageCredit(1)); + lqSettings.autoAck = 0; + Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings); + c1.session.messageFlush("q"); + + //add new node + cluster.add(); + + //send one more message + c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % 6).str(), "q", durableFlag)); + + c1.session.close(); + c1.connection.close(); + + //check state of queue on both nodes + vector<string> expected = list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"); + Client c3(cluster[0], "c3"); + BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected); + Client c2(cluster[1], "c2"); + BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected); +} + +QPID_AUTO_TEST_CASE(testRelease) { + ScopedSuppressLogging allQuiet; + //tests that releasing a messages that was unacked when one node + //joined works correctly + ClusterFixture::Args args; + args += "--log-enable", "critical"; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c1(cluster[0], "c1"); + c1.session.queueDeclare("q", arg::durable=durableFlag); + for (int i = 0; i < 5; i++) { + c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag)); + } + //receive but don't ack a message + LocalQueue lq; + SubscriptionSettings lqSettings(FlowControl::messageCredit(1)); + lqSettings.autoAck = 0; + Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings); + c1.session.messageFlush("q"); + Message received; + BOOST_CHECK(lq.get(received)); + BOOST_CHECK_EQUAL(received.getData(), std::string("m_1")); + + //add new node + cluster.add(); + + lqSub.release(lqSub.getUnaccepted()); + + //check state of queue on both nodes + vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5"); + Client c3(cluster[0], "c3"); + BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected); + Client c2(cluster[1], "c2"); + BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected); +} + QPID_AUTO_TEST_SUITE_END() |