diff options
author | Alan Conway <aconway@apache.org> | 2008-09-18 20:18:29 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-09-18 20:18:29 +0000 |
commit | dbf2e47baa2047922fa81d3603391695b794a77c (patch) | |
tree | 2d7c86a1aaabacf2033a503001767ba7e1d6ed29 /qpid/cpp/src/tests | |
parent | f77b6753079800bc30600904312db1108dbaafc5 (diff) | |
download | qpid-python-dbf2e47baa2047922fa81d3603391695b794a77c.tar.gz |
Dump shared state to new cluster members.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@696788 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 167 |
1 files changed, 88 insertions, 79 deletions
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 8dec23a09b..1b44902054 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -74,10 +74,12 @@ struct ClusterFixture : public vector<uint16_t> { string name; std::auto_ptr<BrokerFixture> broker0; boost::ptr_vector<ForkedBroker> forkedBrokers; + bool init0; - ClusterFixture(size_t n); + ClusterFixture(size_t n, bool init0=true); void add(size_t n) { for (size_t i=0; i < n; ++i) add(); } void add(); + void add0(bool force); void setup(); void kill(size_t n) { if (n) forkedBrokers[n-1].kill(); @@ -85,8 +87,9 @@ struct ClusterFixture : public vector<uint16_t> { } }; -ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) { +ClusterFixture::ClusterFixture(size_t n, bool init0_) : name(Uuid(true).str()), init0(init0_) { add(n); + if (!init0) return; // FIXME aconway 2008-09-18: can't use local hack in this case. // Wait for all n members to join the cluster int retry=20; // TODO aconway 2008-07-16: nasty sleeps, clean this up. while (retry && getGlobalCluster().size() != n) { @@ -101,24 +104,42 @@ void ClusterFixture::add() { os << "fork" << size(); std::string prefix = os.str(); + if (size()) { // Not the first broker, fork. + + const char* argv[] = { + "qpidd " __FILE__ , + "--load-module=../.libs/cluster.so", + "--cluster-name", name.c_str(), + "--auth=no", "--no-data-dir", + "--log-prefix", prefix.c_str(), + }; + size_t argc = sizeof(argv)/sizeof(argv[0]); + + + forkedBrokers.push_back(new ForkedBroker(argc, argv)); + push_back(forkedBrokers.back().getPort()); + } + else { + add0(init0); // First broker, run in this process. + } +} + +void ClusterFixture::add0(bool init) { + if (!init) { + push_back(0); + return; + } const char* argv[] = { "qpidd " __FILE__ , "--load-module=../.libs/cluster.so", "--cluster-name", name.c_str(), - "--auth=no", "--no-data-dir", - "--log-prefix", prefix.c_str(), + "--auth=no", "--no-data-dir" }; size_t argc = sizeof(argv)/sizeof(argv[0]); - if (size()) { // Not the first broker, fork. - forkedBrokers.push_back(new ForkedBroker(argc, argv)); - push_back(forkedBrokers.back().getPort()); - } - else { // First broker, run in this process. - qpid::log::Logger::instance().setPrefix("main"); - broker0.reset(new BrokerFixture(parseOpts(argc, argv))); - push_back(broker0->getPort()); - } + qpid::log::Logger::instance().setPrefix("main"); + broker0.reset(new BrokerFixture(parseOpts(argc, argv))); + push_back(broker0->getPort()); } // For debugging: op << for CPG types. @@ -140,60 +161,6 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { return o; } -QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchupSharedState, 1) { - ClusterFixture cluster(1); - Client c0(cluster[0], "c0"); - // Create some shared state. - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("foo","q")); - while (c0.session.queueQuery("q").getMessageCount() != 1) - ::usleep(1000); // Wait for message to show up on broker 0. - - // Now join new broker, should catch up. - cluster.add(); - c0.session.messageTransfer(arg::content=Message("bar","q")); - c0.session.queueDeclare("p"); - c0.session.messageTransfer(arg::content=Message("poo","p")); - - // Verify new broker has all state. - Message m; - Client c1(cluster[1], "c1"); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "foo"); - BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "bar"); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), (unsigned)0); - BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "poo"); - BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), (unsigned)0); -} - -QPID_AUTO_TEST_CASE(testStall) { - ClusterFixture cluster(2); - Client c0(cluster[0], "c0"); - Client c1(cluster[1], "c1"); - - // Declare on all to avoid race condition. - c0.session.queueDeclare("q"); - c1.session.queueDeclare("q"); - - // Stall 0, verify it does not process deliverys while stalled. - getGlobalCluster().stall(); - c1.session.messageTransfer(arg::content=Message("foo","q")); - while (c1.session.queueQuery("q").getMessageCount() != 1) - ::usleep(1000); // Wait for message to show up on broker 1. - sleep(2); // FIXME aconway 2008-09-11: remove. - // But it should not be on broker 0. - boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q"); - BOOST_REQUIRE(q0); - BOOST_CHECK_EQUAL(q0->getMessageCount(), (unsigned)0); - // Now unstall and we should get the message. - getGlobalCluster().ready(); - Message m; - BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "foo"); -} - #if 0 // FIXME aconway 2008-09-10: finish & enable QPID_AUTO_TEST_CASE(testDumpConsumers) { ClusterFixture cluster(1); @@ -226,20 +193,36 @@ QPID_AUTO_TEST_CASE(testDumpConsumers) { #endif -QPID_AUTO_TEST_CASE(testForkedBroker) { - // Verify the ForkedBroker works as expected. - const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" }; - ForkedBroker broker(sizeof(argv)/sizeof(argv[0]), argv); - Client c(broker.getPort()); - BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("amq.direct").getType()); -} -QPID_AUTO_TEST_CASE(testSingletonCluster) { - // Test against a singleton cluster, verify basic operation. +QPID_AUTO_TEST_CASE(testCatchupSharedState) { ClusterFixture cluster(1); - Client c(cluster[0]); - BOOST_CHECK(c.session.queueQuery("q").getQueue().empty()); - BOOST_CHECK(c.session.exchangeQuery("ex").getNotFound()); + + Client c0(cluster[0], "c0"); + // Create some shared state. + c0.session.queueDeclare("q"); + c0.session.messageTransfer(arg::content=Message("foo","q")); + c0.session.messageTransfer(arg::content=Message("bar","q")); + while (c0.session.queueQuery("q").getMessageCount() != 2) + ::usleep(1000); // Wait for message to show up on broker 0. + + // FIXME aconway 2008-09-18: close session until we catchup session state also. + c0.session.close(); + c0.connection.close(); + + // Now join new broker, should catch up. + cluster.add(); + + // FIXME aconway 2008-09-18: when we do session state try adding + // further stuff from broker 0, and leaving a subscription active. + + // Verify new broker has all state. + Message m; + Client c1(cluster[1], "c1"); + BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "foo"); + BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "bar"); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), (unsigned)0); } QPID_AUTO_TEST_CASE(testWiringReplication) { @@ -326,4 +309,30 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); } +QPID_AUTO_TEST_CASE(testStall) { + ClusterFixture cluster(2); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + + // Declare on all to avoid race condition. + c0.session.queueDeclare("q"); + c1.session.queueDeclare("q"); + + // Stall 0, verify it does not process deliverys while stalled. + getGlobalCluster().stall(); + c1.session.messageTransfer(arg::content=Message("foo","q")); + while (c1.session.queueQuery("q").getMessageCount() != 1) + ::usleep(1000); // Wait for message to show up on broker 1. + sleep(2); // FIXME aconway 2008-09-11: remove. + // But it should not be on broker 0. + boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q"); + BOOST_REQUIRE(q0); + BOOST_CHECK_EQUAL(q0->getMessageCount(), (unsigned)0); + // Now unstall and we should get the message. + getGlobalCluster().ready(); + Message m; + BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "foo"); +} + QPID_AUTO_TEST_SUITE_END() |