diff options
Diffstat (limited to 'cpp/src/tests')
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 46 |
1 files changed, 29 insertions, 17 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 1b44902054..60f85df02d 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -81,21 +81,26 @@ struct ClusterFixture : public vector<uint16_t> { void add(); void add0(bool force); void setup(); + void kill(size_t n) { if (n) forkedBrokers[n-1].kill(); else broker0->broker->shutdown(); } + + void waitFor(size_t n) { + size_t retry=1000; // TODO aconway 2008-07-16: nasty sleeps, clean this up. + while (retry && getGlobalCluster().size() != n) { + ::usleep(1000); + --retry; + } + } }; ClusterFixture::ClusterFixture(size_t n, bool init0_) : name(Uuid(true).str()), init0(init0_) { add(n); if (!init0) return; // FIXME aconway 2008-09-18: can't use local hack in this case. // Wait for all n members to join the cluster - int retry=20; // TODO aconway 2008-07-16: nasty sleeps, clean this up. - while (retry && getGlobalCluster().size() != n) { - ::sleep(1); - --retry; - } + waitFor(n); BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size()); } @@ -139,7 +144,7 @@ void ClusterFixture::add0(bool init) { qpid::log::Logger::instance().setPrefix("main"); broker0.reset(new BrokerFixture(parseOpts(argc, argv))); - push_back(broker0->getPort()); + if (size()) front() = broker0->getPort(); else push_back(broker0->getPort()); } // For debugging: op << for CPG types. @@ -190,14 +195,12 @@ QPID_AUTO_TEST_CASE(testDumpConsumers) { BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), (unsigned)0); } - #endif - QPID_AUTO_TEST_CASE(testCatchupSharedState) { ClusterFixture cluster(1); - Client c0(cluster[0], "c0"); + // Create some shared state. c0.session.queueDeclare("q"); c0.session.messageTransfer(arg::content=Message("foo","q")); @@ -205,24 +208,33 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { while (c0.session.queueQuery("q").getMessageCount() != 2) ::usleep(1000); // Wait for message to show up on broker 0. - // FIXME aconway 2008-09-18: close session until we catchup session state also. - c0.session.close(); - c0.connection.close(); - - // Now join new broker, should catch up. + // Add a new broker, it should catch up. cluster.add(); - // FIXME aconway 2008-09-18: when we do session state try adding - // further stuff from broker 0, and leaving a subscription active. + // Do some work post-add + c0.session.queueDeclare("p"); + c0.session.messageTransfer(arg::content=Message("pfoo","p")); + // Do some work post-join + cluster.waitFor(2); + c0.session.messageTransfer(arg::content=Message("pbar","p")); + // Verify new broker has all state. Message m; + Client c1(cluster[1], "c1"); + BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); BOOST_CHECK_EQUAL(m.getData(), "foo"); BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); BOOST_CHECK_EQUAL(m.getData(), "bar"); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), (unsigned)0); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); + + BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "pfoo"); + BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "pbar"); + BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 0u); } QPID_AUTO_TEST_CASE(testWiringReplication) { |