summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/cluster_test.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-06-03 16:08:33 +0000
committerGordon Sim <gsim@apache.org>2009-06-03 16:08:33 +0000
commit33a258ce40be54e24f588f43f818efd927c9059a (patch)
tree1bd9c164b9c8497f05d441cd4a9ab4df82a8742c /qpid/cpp/src/tests/cluster_test.cpp
parent3a3e6ad753ed244dc8b60cf67fac0d07e81ec966 (diff)
downloadqpid-python-33a258ce40be54e24f588f43f818efd927c9059a.tar.gz
Ensure that ring queue behaves as expected when replicated to newly joined cluster node.
Altered queueDurabilityPropagationToNewbie test to not use in-process broker to fix error caused by linking change. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@781454 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/cluster_test.cpp')
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp79
1 files changed, 78 insertions, 1 deletions
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index 611911d126..0c249bdf15 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -704,7 +704,7 @@ QPID_AUTO_TEST_CASE(queueDurabilityPropagationToNewbie)
*/
ClusterFixture::Args args;
prepareArgs(args, durableFlag);
- ClusterFixture cluster(1, args);
+ ClusterFixture cluster(1, args, -1);
Client c0(cluster[0]);
c0.session.queueDeclare("durable_queue", arg::durable=true);
c0.session.queueDeclare("non_durable_queue", arg::durable=false);
@@ -712,6 +712,8 @@ QPID_AUTO_TEST_CASE(queueDurabilityPropagationToNewbie)
Client c1(cluster[1]);
QueueQueryResult durable_query = c1.session.queueQuery ( "durable_queue" );
QueueQueryResult non_durable_query = c1.session.queueQuery ( "non_durable_queue" );
+ BOOST_CHECK_EQUAL(durable_query.getQueue(), std::string("durable_queue"));
+ BOOST_CHECK_EQUAL(non_durable_query.getQueue(), std::string("non_durable_queue"));
BOOST_CHECK_EQUAL ( durable_query.getDurable(), true );
BOOST_CHECK_EQUAL ( non_durable_query.getDurable(), false );
@@ -854,4 +856,79 @@ QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) {
BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException);
}
+QPID_AUTO_TEST_CASE(testRingQueueUpdate) {
+ ScopedSuppressLogging allQuiet;
+ //tests that ring queues are accurately replicated on newly
+ //joined nodes
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ QueueOptions options;
+ options.setSizePolicy(RING, 0, 5);
+ c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+ for (int i = 0; i < 5; i++) {
+ c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag));
+ }
+ //receive but don't ack a message
+ LocalQueue lq;
+ SubscriptionSettings lqSettings(FlowControl::messageCredit(1));
+ lqSettings.autoAck = 0;
+ Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings);
+ c1.session.messageFlush("q");
+
+ //add new node
+ cluster.add();
+
+ //send one more message
+ c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % 6).str(), "q", durableFlag));
+
+ c1.session.close();
+ c1.connection.close();
+
+ //check state of queue on both nodes
+ vector<string> expected = list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6");
+ Client c3(cluster[0], "c3");
+ BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected);
+ Client c2(cluster[1], "c2");
+ BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected);
+}
+
+QPID_AUTO_TEST_CASE(testRelease) {
+ ScopedSuppressLogging allQuiet;
+ //tests that releasing a messages that was unacked when one node
+ //joined works correctly
+ ClusterFixture::Args args;
+ args += "--log-enable", "critical";
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c1(cluster[0], "c1");
+ c1.session.queueDeclare("q", arg::durable=durableFlag);
+ for (int i = 0; i < 5; i++) {
+ c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag));
+ }
+ //receive but don't ack a message
+ LocalQueue lq;
+ SubscriptionSettings lqSettings(FlowControl::messageCredit(1));
+ lqSettings.autoAck = 0;
+ Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings);
+ c1.session.messageFlush("q");
+ Message received;
+ BOOST_CHECK(lq.get(received));
+ BOOST_CHECK_EQUAL(received.getData(), std::string("m_1"));
+
+ //add new node
+ cluster.add();
+
+ lqSub.release(lqSub.getUnaccepted());
+
+ //check state of queue on both nodes
+ vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5");
+ Client c3(cluster[0], "c3");
+ BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected);
+ Client c2(cluster[1], "c2");
+ BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected);
+}
+
QPID_AUTO_TEST_SUITE_END()