summaryrefslogtreecommitdiff
path: root/cpp/src/tests/cluster_test.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-21 05:04:04 +0000
committerAlan Conway <aconway@apache.org>2008-09-21 05:04:04 +0000
commit558e92d6c9cf8dfb875c1250ab8fe1cefaf30b05 (patch)
tree9b306597ee07b264fa18580546ed5645f0c3766d /cpp/src/tests/cluster_test.cpp
parent7c70d21ca2d788d4432cfa89851c9b928c9f30aa (diff)
downloadqpid-python-558e92d6c9cf8dfb875c1250ab8fe1cefaf30b05.tar.gz
DumpClient send connections & session IDs to new members.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697446 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/cluster_test.cpp')
-rw-r--r--cpp/src/tests/cluster_test.cpp46
1 files changed, 29 insertions, 17 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 1b44902054..60f85df02d 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -81,21 +81,26 @@ struct ClusterFixture : public vector<uint16_t> {
void add();
void add0(bool force);
void setup();
+
void kill(size_t n) {
if (n) forkedBrokers[n-1].kill();
else broker0->broker->shutdown();
}
+
+ void waitFor(size_t n) {
+ size_t retry=1000; // TODO aconway 2008-07-16: nasty sleeps, clean this up.
+ while (retry && getGlobalCluster().size() != n) {
+ ::usleep(1000);
+ --retry;
+ }
+ }
};
ClusterFixture::ClusterFixture(size_t n, bool init0_) : name(Uuid(true).str()), init0(init0_) {
add(n);
if (!init0) return; // FIXME aconway 2008-09-18: can't use local hack in this case.
// Wait for all n members to join the cluster
- int retry=20; // TODO aconway 2008-07-16: nasty sleeps, clean this up.
- while (retry && getGlobalCluster().size() != n) {
- ::sleep(1);
- --retry;
- }
+ waitFor(n);
BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size());
}
@@ -139,7 +144,7 @@ void ClusterFixture::add0(bool init) {
qpid::log::Logger::instance().setPrefix("main");
broker0.reset(new BrokerFixture(parseOpts(argc, argv)));
- push_back(broker0->getPort());
+ if (size()) front() = broker0->getPort(); else push_back(broker0->getPort());
}
// For debugging: op << for CPG types.
@@ -190,14 +195,12 @@ QPID_AUTO_TEST_CASE(testDumpConsumers) {
BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), (unsigned)0);
}
-
#endif
-
QPID_AUTO_TEST_CASE(testCatchupSharedState) {
ClusterFixture cluster(1);
-
Client c0(cluster[0], "c0");
+
// Create some shared state.
c0.session.queueDeclare("q");
c0.session.messageTransfer(arg::content=Message("foo","q"));
@@ -205,24 +208,33 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) {
while (c0.session.queueQuery("q").getMessageCount() != 2)
::usleep(1000); // Wait for message to show up on broker 0.
- // FIXME aconway 2008-09-18: close session until we catchup session state also.
- c0.session.close();
- c0.connection.close();
-
- // Now join new broker, should catch up.
+ // Add a new broker, it should catch up.
cluster.add();
- // FIXME aconway 2008-09-18: when we do session state try adding
- // further stuff from broker 0, and leaving a subscription active.
+ // Do some work post-add
+ c0.session.queueDeclare("p");
+ c0.session.messageTransfer(arg::content=Message("pfoo","p"));
+ // Do some work post-join
+ cluster.waitFor(2);
+ c0.session.messageTransfer(arg::content=Message("pbar","p"));
+
// Verify new broker has all state.
Message m;
+
Client c1(cluster[1], "c1");
+
BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "foo");
BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "bar");
- BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), (unsigned)0);
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+
+ BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "pfoo");
+ BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "pbar");
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 0u);
}
QPID_AUTO_TEST_CASE(testWiringReplication) {