summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp167
1 files changed, 88 insertions, 79 deletions
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index 8dec23a09b..1b44902054 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -74,10 +74,12 @@ struct ClusterFixture : public vector<uint16_t> {
string name;
std::auto_ptr<BrokerFixture> broker0;
boost::ptr_vector<ForkedBroker> forkedBrokers;
+ bool init0;
- ClusterFixture(size_t n);
+ ClusterFixture(size_t n, bool init0=true);
void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
void add();
+ void add0(bool force);
void setup();
void kill(size_t n) {
if (n) forkedBrokers[n-1].kill();
@@ -85,8 +87,9 @@ struct ClusterFixture : public vector<uint16_t> {
}
};
-ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) {
+ClusterFixture::ClusterFixture(size_t n, bool init0_) : name(Uuid(true).str()), init0(init0_) {
add(n);
+ if (!init0) return; // FIXME aconway 2008-09-18: can't use local hack in this case.
// Wait for all n members to join the cluster
int retry=20; // TODO aconway 2008-07-16: nasty sleeps, clean this up.
while (retry && getGlobalCluster().size() != n) {
@@ -101,24 +104,42 @@ void ClusterFixture::add() {
os << "fork" << size();
std::string prefix = os.str();
+ if (size()) { // Not the first broker, fork.
+
+ const char* argv[] = {
+ "qpidd " __FILE__ ,
+ "--load-module=../.libs/cluster.so",
+ "--cluster-name", name.c_str(),
+ "--auth=no", "--no-data-dir",
+ "--log-prefix", prefix.c_str(),
+ };
+ size_t argc = sizeof(argv)/sizeof(argv[0]);
+
+
+ forkedBrokers.push_back(new ForkedBroker(argc, argv));
+ push_back(forkedBrokers.back().getPort());
+ }
+ else {
+ add0(init0); // First broker, run in this process.
+ }
+}
+
+void ClusterFixture::add0(bool init) {
+ if (!init) {
+ push_back(0);
+ return;
+ }
const char* argv[] = {
"qpidd " __FILE__ ,
"--load-module=../.libs/cluster.so",
"--cluster-name", name.c_str(),
- "--auth=no", "--no-data-dir",
- "--log-prefix", prefix.c_str(),
+ "--auth=no", "--no-data-dir"
};
size_t argc = sizeof(argv)/sizeof(argv[0]);
- if (size()) { // Not the first broker, fork.
- forkedBrokers.push_back(new ForkedBroker(argc, argv));
- push_back(forkedBrokers.back().getPort());
- }
- else { // First broker, run in this process.
- qpid::log::Logger::instance().setPrefix("main");
- broker0.reset(new BrokerFixture(parseOpts(argc, argv)));
- push_back(broker0->getPort());
- }
+ qpid::log::Logger::instance().setPrefix("main");
+ broker0.reset(new BrokerFixture(parseOpts(argc, argv)));
+ push_back(broker0->getPort());
}
// For debugging: op << for CPG types.
@@ -140,60 +161,6 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
return o;
}
-QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchupSharedState, 1) {
- ClusterFixture cluster(1);
- Client c0(cluster[0], "c0");
- // Create some shared state.
- c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=Message("foo","q"));
- while (c0.session.queueQuery("q").getMessageCount() != 1)
- ::usleep(1000); // Wait for message to show up on broker 0.
-
- // Now join new broker, should catch up.
- cluster.add();
- c0.session.messageTransfer(arg::content=Message("bar","q"));
- c0.session.queueDeclare("p");
- c0.session.messageTransfer(arg::content=Message("poo","p"));
-
- // Verify new broker has all state.
- Message m;
- Client c1(cluster[1], "c1");
- BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "foo");
- BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "bar");
- BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), (unsigned)0);
- BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "poo");
- BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), (unsigned)0);
-}
-
-QPID_AUTO_TEST_CASE(testStall) {
- ClusterFixture cluster(2);
- Client c0(cluster[0], "c0");
- Client c1(cluster[1], "c1");
-
- // Declare on all to avoid race condition.
- c0.session.queueDeclare("q");
- c1.session.queueDeclare("q");
-
- // Stall 0, verify it does not process deliverys while stalled.
- getGlobalCluster().stall();
- c1.session.messageTransfer(arg::content=Message("foo","q"));
- while (c1.session.queueQuery("q").getMessageCount() != 1)
- ::usleep(1000); // Wait for message to show up on broker 1.
- sleep(2); // FIXME aconway 2008-09-11: remove.
- // But it should not be on broker 0.
- boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q");
- BOOST_REQUIRE(q0);
- BOOST_CHECK_EQUAL(q0->getMessageCount(), (unsigned)0);
- // Now unstall and we should get the message.
- getGlobalCluster().ready();
- Message m;
- BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "foo");
-}
-
#if 0 // FIXME aconway 2008-09-10: finish & enable
QPID_AUTO_TEST_CASE(testDumpConsumers) {
ClusterFixture cluster(1);
@@ -226,20 +193,36 @@ QPID_AUTO_TEST_CASE(testDumpConsumers) {
#endif
-QPID_AUTO_TEST_CASE(testForkedBroker) {
- // Verify the ForkedBroker works as expected.
- const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" };
- ForkedBroker broker(sizeof(argv)/sizeof(argv[0]), argv);
- Client c(broker.getPort());
- BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("amq.direct").getType());
-}
-QPID_AUTO_TEST_CASE(testSingletonCluster) {
- // Test against a singleton cluster, verify basic operation.
+QPID_AUTO_TEST_CASE(testCatchupSharedState) {
ClusterFixture cluster(1);
- Client c(cluster[0]);
- BOOST_CHECK(c.session.queueQuery("q").getQueue().empty());
- BOOST_CHECK(c.session.exchangeQuery("ex").getNotFound());
+
+ Client c0(cluster[0], "c0");
+ // Create some shared state.
+ c0.session.queueDeclare("q");
+ c0.session.messageTransfer(arg::content=Message("foo","q"));
+ c0.session.messageTransfer(arg::content=Message("bar","q"));
+ while (c0.session.queueQuery("q").getMessageCount() != 2)
+ ::usleep(1000); // Wait for message to show up on broker 0.
+
+ // FIXME aconway 2008-09-18: close session until we catchup session state also.
+ c0.session.close();
+ c0.connection.close();
+
+ // Now join new broker, should catch up.
+ cluster.add();
+
+ // FIXME aconway 2008-09-18: when we do session state try adding
+ // further stuff from broker 0, and leaving a subscription active.
+
+ // Verify new broker has all state.
+ Message m;
+ Client c1(cluster[1], "c1");
+ BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "foo");
+ BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "bar");
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), (unsigned)0);
}
QPID_AUTO_TEST_CASE(testWiringReplication) {
@@ -326,4 +309,30 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
}
+QPID_AUTO_TEST_CASE(testStall) {
+ ClusterFixture cluster(2);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+
+ // Declare on all to avoid race condition.
+ c0.session.queueDeclare("q");
+ c1.session.queueDeclare("q");
+
+ // Stall 0, verify it does not process deliverys while stalled.
+ getGlobalCluster().stall();
+ c1.session.messageTransfer(arg::content=Message("foo","q"));
+ while (c1.session.queueQuery("q").getMessageCount() != 1)
+ ::usleep(1000); // Wait for message to show up on broker 1.
+ sleep(2); // FIXME aconway 2008-09-11: remove.
+ // But it should not be on broker 0.
+ boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q");
+ BOOST_REQUIRE(q0);
+ BOOST_CHECK_EQUAL(q0->getMessageCount(), (unsigned)0);
+ // Now unstall and we should get the message.
+ getGlobalCluster().ready();
+ Message m;
+ BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "foo");
+}
+
QPID_AUTO_TEST_SUITE_END()