diff options
Diffstat (limited to 'qpid/cpp/src/tests/cluster_test.cpp')
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 144 |
1 files changed, 107 insertions, 37 deletions
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 1f9aac8fc5..da77d7405e 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -59,12 +59,19 @@ using boost::ptr_vector; using qpid::cluster::Cluster; using qpid::cluster::getGlobalCluster; +/** Parse broker & cluster options */ +Broker::Options parseOpts(size_t argc, const char* argv[]) { + Broker::Options opts; + Plugin::addOptions(opts); // Pick up cluster options. + opts.parse(argc, argv, "", true); // Allow-unknown for --load-module + return opts; +} + /** Cluster fixture is a vector of ports for the replicas. * Replica 0 is in the current process, all others are forked as children. */ struct ClusterFixture : public vector<uint16_t> { string name; - Broker::Options opts; std::auto_ptr<BrokerFixture> broker0; boost::ptr_vector<ForkedBroker> forkedBrokers; @@ -96,7 +103,7 @@ void ClusterFixture::add() { const char* argv[] = { "qpidd " __FILE__ , - "--load-module=../.libs/libqpidcluster.so", + "--load-module=../.libs/cluster.so", "--cluster-name", name.c_str(), "--auth=no", "--no-data-dir", "--log-prefix", prefix.c_str(), @@ -108,11 +115,8 @@ void ClusterFixture::add() { push_back(forkedBrokers.back().getPort()); } else { // First broker, run in this process. - Broker::Options opts; qpid::log::Logger::instance().setPrefix("main"); - Plugin::addOptions(opts); // Pick up cluster options. - opts.parse(argc, argv, "", true); // Allow-unknown for --load-module - broker0.reset(new BrokerFixture(opts)); + broker0.reset(new BrokerFixture(parseOpts(argc, argv))); push_back(broker0->getPort()); } } @@ -136,38 +140,16 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { return o; } -#if 0 // FIXME aconway 2008-09-10: finish & enable -QPID_AUTO_TEST_CASE(testDumpConsumers) { - ClusterFixture cluster(1); - Client a(cluster[0]); - a.session.queueDeclare("q"); - a.subs.subscribe(a.lq, "q"); - - cluster.add(); - Client b(cluster[1]); - try { - b.connection.newSession(a.session.getId().getName()); - BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName()); - } catch (const SessionBusyException&) {} - - // Transfer some messages to the subscription by client a. - Message m; - a.session.messageTransfer(arg::bindingKey="q", arg::content=Message("aaa", "q")); - BOOST_CHECK(a.lq.get(m, TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "aaa"); - - b.session.messageTransfer(arg::bindingKey="q", arg::content=Message("bbb", "q")); - BOOST_CHECK(a.lq.get(m, TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "bbb"); - - // Verify that the queue has been drained on both brokers. - // This proves that the consumer was replicated when the second broker joined. - BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0); -} -#endif - +// FIXME aconway 2008-09-11: This test has to be first otherwise +// it picks up the cluster name from a previous test and runs the +// brokers as cluster nodes. Something wrong with option parsing... +// QPID_AUTO_TEST_CASE(testDumpClientSharedState) { - BrokerFixture donor, receiver; + // In this test we don't want the cluster plugin to initialize, so set --cluster-name="" + const char* argv[] = { "--cluster-name", "" }; + Broker::Options opts = parseOpts(sizeof(argv)/sizeof(*argv), argv); + + BrokerFixture donor(opts), receiver(opts); { Client c(donor.getPort()); FieldTable args; @@ -246,6 +228,94 @@ QPID_AUTO_TEST_CASE(testDumpClientSharedState) { } } + +// FIXME aconway 2008-09-12: finish the new join protocol. +QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchUpSharedState, 1) { + ClusterFixture cluster(1); + Client c0(cluster[0], "c0"); + // Create some shared state. + c0.session.queueDeclare("q"); + c0.session.messageTransfer(arg::content=Message("foo","q")); + while (c0.session.queueQuery("q").getMessageCount() != 1) + ::usleep(1000); // Wait for message to show up on broker 0. + + // Now join new broker, should catch up. + cluster.add(); + c0.session.messageTransfer(arg::content=Message("bar","q")); + c0.session.queueDeclare("p"); + c0.session.messageTransfer(arg::content=Message("poo","p")); + + // Verify new broker has all state. + Message m; + Client c1(cluster[1], "c1"); + BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "foo"); + BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "bar"); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0); + BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "poo"); + BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 0); +} + +QPID_AUTO_TEST_CASE(testStall) { + ClusterFixture cluster(2); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + + // Declare on all to avoid race condition. + c0.session.queueDeclare("q"); + c1.session.queueDeclare("q"); + + // Stall 0, verify it does not process deliverys while stalled. + getGlobalCluster().stall(); + c1.session.messageTransfer(arg::content=Message("foo","q")); + while (c1.session.queueQuery("q").getMessageCount() != 1) + ::usleep(1000); // Wait for message to show up on broker 1. + sleep(2); // FIXME aconway 2008-09-11: remove. + // But it should not be on broker 0. + boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q"); + BOOST_REQUIRE(q0); + BOOST_CHECK_EQUAL(q0->getMessageCount(), 0); + // Now unstall and we should get the message. + getGlobalCluster().unStall(); + Message m; + BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "foo"); +} + +#if 0 // FIXME aconway 2008-09-10: finish & enable +QPID_AUTO_TEST_CASE(testDumpConsumers) { + ClusterFixture cluster(1); + Client a(cluster[0]); + a.session.queueDeclare("q"); + a.subs.subscribe(a.lq, "q"); + + cluster.add(); + Client b(cluster[1]); + try { + b.connection.newSession(a.session.getId().getName()); + BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName()); + } catch (const SessionBusyException&) {} + + // Transfer some messages to the subscription by client a. + Message m; + a.session.messageTransfer(arg::content=Message("aaa", "q")); + BOOST_CHECK(a.lq.get(m, TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "aaa"); + + b.session.messageTransfer(arg::content=Message("bbb", "q")); + BOOST_CHECK(a.lq.get(m, TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "bbb"); + + // Verify that the queue has been drained on both brokers. + // This proves that the consumer was replicated when the second broker joined. + BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0); +} + + +#endif + QPID_AUTO_TEST_CASE(testForkedBroker) { // Verify the ForkedBroker works as expected. const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" }; |