diff options
author | Alan Conway <aconway@apache.org> | 2008-09-24 17:34:08 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-09-24 17:34:08 +0000 |
commit | a2a56cf9a7483e165fb579d0b519b284d02009e3 (patch) | |
tree | 11264fc87ea6e54c54b476e245ad4ee9c83faaeb /cpp/src/tests/cluster_test.cpp | |
parent | 30be110b6914959a1eaee4803ff8c1c9938db7bb (diff) | |
download | qpid-python-a2a56cf9a7483e165fb579d0b519b284d02009e3.tar.gz |
Cluster replicates session command sequence state and consumers to newcomers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@698666 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/cluster_test.cpp')
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 49 |
1 files changed, 25 insertions, 24 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 6bb5e4a8ca..9573caf61d 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -98,7 +98,7 @@ struct ClusterFixture : public vector<uint16_t> { ClusterFixture::ClusterFixture(size_t n, bool init0_) : name(Uuid(true).str()), init0(init0_) { add(n); - if (!init0) return; // FIXME aconway 2008-09-18: can't use local hack in this case. + if (!init0) return; // Defer initialization of broker0 // Wait for all n members to join the cluster waitFor(n); BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size()); @@ -164,36 +164,35 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { return o; } -#if 0 // FIXME aconway 2008-09-22: enable. QPID_AUTO_TEST_CASE(DumpConsumers) { - ClusterFixture cluster(1); - Client c0(cluster[0]); + ClusterFixture cluster(1); + Client c0(cluster[0], "c0"); c0.session.queueDeclare("q"); - c0.subs.subscribe(c0.lq, "q"); - c0.session.messageTransfer(arg::content=Message("before", "q")); - Message m; - BOOST_CHECK(c0.lq.get(m, TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "before"); + c0.subs.subscribe(c0.lq, "q", FlowControl::zero()); + c0.session.sync(); - // Start new member + // Start new members cluster.add(); - Client c1(cluster[1]); + Client c1(cluster[1], "c1"); + cluster.add(); + Client c2(cluster[2], "c2"); - // Transfer some messages to the subscription by client c0. + // Transfer a message, verify all members see it. c0.session.messageTransfer(arg::content=Message("aaa", "q")); - BOOST_CHECK(c0.lq.get(m, TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "aaa"); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 1u); - c1.session.messageTransfer(arg::content=Message("bbb", "q")); + // Activate the subscription, ensure message removed on all queues. + c0.subs.setFlowControl("q", FlowControl::messageCredit(1)); + Message m; BOOST_CHECK(c0.lq.get(m, TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "bbb"); + BOOST_CHECK_EQUAL(m.getData(), "aaa"); - // Verify that the queue has been drained on both brokers. - // This proves that the consumer was replicated when the second broker joined. BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); + BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u); } -#endif QPID_AUTO_TEST_CASE(testCatchupSharedState) { ClusterFixture cluster(1); @@ -217,7 +216,7 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { cluster.waitFor(2); c0.session.messageTransfer(arg::content=Message("pbar","p")); - // Verify new brokers have all state. + // Verify new brokers have state. Message m; Client c1(cluster[1], "c1"); @@ -228,11 +227,14 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { BOOST_CHECK_EQUAL(m.getData(), "bar"); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); - BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC)); + // Add & verify another broker. + cluster.add(); + Client c2(cluster[2], "c2"); + BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC)); BOOST_CHECK_EQUAL(m.getData(), "pfoo"); - BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC)); BOOST_CHECK_EQUAL(m.getData(), "pbar"); - BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 0u); + BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u); } QPID_AUTO_TEST_CASE(testWiringReplication) { @@ -333,7 +335,6 @@ QPID_AUTO_TEST_CASE(testStall) { c1.session.messageTransfer(arg::content=Message("foo","q")); while (c1.session.queueQuery("q").getMessageCount() != 1) ::usleep(1000); // Wait for message to show up on broker 1. - sleep(2); // FIXME aconway 2008-09-11: remove. // But it should not be on broker 0. boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q"); BOOST_REQUIRE(q0); |