summaryrefslogtreecommitdiff
path: root/cpp/src/tests/cluster_test.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-24 17:34:08 +0000
committerAlan Conway <aconway@apache.org>2008-09-24 17:34:08 +0000
commita2a56cf9a7483e165fb579d0b519b284d02009e3 (patch)
tree11264fc87ea6e54c54b476e245ad4ee9c83faaeb /cpp/src/tests/cluster_test.cpp
parent30be110b6914959a1eaee4803ff8c1c9938db7bb (diff)
downloadqpid-python-a2a56cf9a7483e165fb579d0b519b284d02009e3.tar.gz
Cluster replicates session command sequence state and consumers to newcomers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@698666 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/cluster_test.cpp')
-rw-r--r--cpp/src/tests/cluster_test.cpp49
1 files changed, 25 insertions, 24 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 6bb5e4a8ca..9573caf61d 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -98,7 +98,7 @@ struct ClusterFixture : public vector<uint16_t> {
ClusterFixture::ClusterFixture(size_t n, bool init0_) : name(Uuid(true).str()), init0(init0_) {
add(n);
- if (!init0) return; // FIXME aconway 2008-09-18: can't use local hack in this case.
+ if (!init0) return; // Defer initialization of broker0
// Wait for all n members to join the cluster
waitFor(n);
BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size());
@@ -164,36 +164,35 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
return o;
}
-#if 0 // FIXME aconway 2008-09-22: enable.
QPID_AUTO_TEST_CASE(DumpConsumers) {
- ClusterFixture cluster(1);
- Client c0(cluster[0]);
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
c0.session.queueDeclare("q");
- c0.subs.subscribe(c0.lq, "q");
- c0.session.messageTransfer(arg::content=Message("before", "q"));
- Message m;
- BOOST_CHECK(c0.lq.get(m, TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "before");
+ c0.subs.subscribe(c0.lq, "q", FlowControl::zero());
+ c0.session.sync();
- // Start new member
+ // Start new members
cluster.add();
- Client c1(cluster[1]);
+ Client c1(cluster[1], "c1");
+ cluster.add();
+ Client c2(cluster[2], "c2");
- // Transfer some messages to the subscription by client c0.
+ // Transfer a message, verify all members see it.
c0.session.messageTransfer(arg::content=Message("aaa", "q"));
- BOOST_CHECK(c0.lq.get(m, TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "aaa");
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 1u);
- c1.session.messageTransfer(arg::content=Message("bbb", "q"));
+ // Activate the subscription, ensure message removed on all queues.
+ c0.subs.setFlowControl("q", FlowControl::messageCredit(1));
+ Message m;
BOOST_CHECK(c0.lq.get(m, TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "bbb");
+ BOOST_CHECK_EQUAL(m.getData(), "aaa");
- // Verify that the queue has been drained on both brokers.
- // This proves that the consumer was replicated when the second broker joined.
BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u);
}
-#endif
QPID_AUTO_TEST_CASE(testCatchupSharedState) {
ClusterFixture cluster(1);
@@ -217,7 +216,7 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
cluster.waitFor(2);
c0.session.messageTransfer(arg::content=Message("pbar","p"));
- // Verify new brokers have all state.
+ // Verify new brokers have state.
Message m;
Client c1(cluster[1], "c1");
@@ -228,11 +227,14 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
BOOST_CHECK_EQUAL(m.getData(), "bar");
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
- BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+ // Add & verify another broker.
+ cluster.add();
+ Client c2(cluster[2], "c2");
+ BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "pfoo");
- BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "pbar");
- BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 0u);
+ BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u);
}
QPID_AUTO_TEST_CASE(testWiringReplication) {
@@ -333,7 +335,6 @@ QPID_AUTO_TEST_CASE(testStall) {
c1.session.messageTransfer(arg::content=Message("foo","q"));
while (c1.session.queueQuery("q").getMessageCount() != 1)
::usleep(1000); // Wait for message to show up on broker 1.
- sleep(2); // FIXME aconway 2008-09-11: remove.
// But it should not be on broker 0.
boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q");
BOOST_REQUIRE(q0);