From a2a56cf9a7483e165fb579d0b519b284d02009e3 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 24 Sep 2008 17:34:08 +0000 Subject: Cluster replicates session command sequence state and consumers to newcomers. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@698666 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/tests/cluster_test.cpp | 49 +++++++++++++++++++++--------------------- 1 file changed, 25 insertions(+), 24 deletions(-) (limited to 'cpp/src/tests/cluster_test.cpp') diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 6bb5e4a8ca..9573caf61d 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -98,7 +98,7 @@ struct ClusterFixture : public vector { ClusterFixture::ClusterFixture(size_t n, bool init0_) : name(Uuid(true).str()), init0(init0_) { add(n); - if (!init0) return; // FIXME aconway 2008-09-18: can't use local hack in this case. + if (!init0) return; // Defer initialization of broker0 // Wait for all n members to join the cluster waitFor(n); BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size()); @@ -164,36 +164,35 @@ ostream& operator<<(ostream& o, const pair& array) { return o; } -#if 0 // FIXME aconway 2008-09-22: enable. QPID_AUTO_TEST_CASE(DumpConsumers) { - ClusterFixture cluster(1); - Client c0(cluster[0]); + ClusterFixture cluster(1); + Client c0(cluster[0], "c0"); c0.session.queueDeclare("q"); - c0.subs.subscribe(c0.lq, "q"); - c0.session.messageTransfer(arg::content=Message("before", "q")); - Message m; - BOOST_CHECK(c0.lq.get(m, TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "before"); + c0.subs.subscribe(c0.lq, "q", FlowControl::zero()); + c0.session.sync(); - // Start new member + // Start new members cluster.add(); - Client c1(cluster[1]); + Client c1(cluster[1], "c1"); + cluster.add(); + Client c2(cluster[2], "c2"); - // Transfer some messages to the subscription by client c0. + // Transfer a message, verify all members see it. c0.session.messageTransfer(arg::content=Message("aaa", "q")); - BOOST_CHECK(c0.lq.get(m, TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "aaa"); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 1u); - c1.session.messageTransfer(arg::content=Message("bbb", "q")); + // Activate the subscription, ensure message removed on all queues. + c0.subs.setFlowControl("q", FlowControl::messageCredit(1)); + Message m; BOOST_CHECK(c0.lq.get(m, TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "bbb"); + BOOST_CHECK_EQUAL(m.getData(), "aaa"); - // Verify that the queue has been drained on both brokers. - // This proves that the consumer was replicated when the second broker joined. BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); + BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u); } -#endif QPID_AUTO_TEST_CASE(testCatchupSharedState) { ClusterFixture cluster(1); @@ -217,7 +216,7 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { cluster.waitFor(2); c0.session.messageTransfer(arg::content=Message("pbar","p")); - // Verify new brokers have all state. + // Verify new brokers have state. Message m; Client c1(cluster[1], "c1"); @@ -228,11 +227,14 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { BOOST_CHECK_EQUAL(m.getData(), "bar"); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); - BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC)); + // Add & verify another broker. + cluster.add(); + Client c2(cluster[2], "c2"); + BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC)); BOOST_CHECK_EQUAL(m.getData(), "pfoo"); - BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC)); BOOST_CHECK_EQUAL(m.getData(), "pbar"); - BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 0u); + BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u); } QPID_AUTO_TEST_CASE(testWiringReplication) { @@ -333,7 +335,6 @@ QPID_AUTO_TEST_CASE(testStall) { c1.session.messageTransfer(arg::content=Message("foo","q")); while (c1.session.queueQuery("q").getMessageCount() != 1) ::usleep(1000); // Wait for message to show up on broker 1. - sleep(2); // FIXME aconway 2008-09-11: remove. // But it should not be on broker 0. boost::shared_ptr q0 = cluster.broker0->broker->getQueues().find("q"); BOOST_REQUIRE(q0); -- cgit v1.2.1