summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/cluster_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/cluster_test.cpp')
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp144
1 files changed, 107 insertions, 37 deletions
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index 1f9aac8fc5..da77d7405e 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -59,12 +59,19 @@ using boost::ptr_vector;
using qpid::cluster::Cluster;
using qpid::cluster::getGlobalCluster;
+/** Parse broker & cluster options */
+Broker::Options parseOpts(size_t argc, const char* argv[]) {
+ Broker::Options opts;
+ Plugin::addOptions(opts); // Pick up cluster options.
+ opts.parse(argc, argv, "", true); // Allow-unknown for --load-module
+ return opts;
+}
+
/** Cluster fixture is a vector of ports for the replicas.
* Replica 0 is in the current process, all others are forked as children.
*/
struct ClusterFixture : public vector<uint16_t> {
string name;
- Broker::Options opts;
std::auto_ptr<BrokerFixture> broker0;
boost::ptr_vector<ForkedBroker> forkedBrokers;
@@ -96,7 +103,7 @@ void ClusterFixture::add() {
const char* argv[] = {
"qpidd " __FILE__ ,
- "--load-module=../.libs/libqpidcluster.so",
+ "--load-module=../.libs/cluster.so",
"--cluster-name", name.c_str(),
"--auth=no", "--no-data-dir",
"--log-prefix", prefix.c_str(),
@@ -108,11 +115,8 @@ void ClusterFixture::add() {
push_back(forkedBrokers.back().getPort());
}
else { // First broker, run in this process.
- Broker::Options opts;
qpid::log::Logger::instance().setPrefix("main");
- Plugin::addOptions(opts); // Pick up cluster options.
- opts.parse(argc, argv, "", true); // Allow-unknown for --load-module
- broker0.reset(new BrokerFixture(opts));
+ broker0.reset(new BrokerFixture(parseOpts(argc, argv)));
push_back(broker0->getPort());
}
}
@@ -136,38 +140,16 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
return o;
}
-#if 0 // FIXME aconway 2008-09-10: finish & enable
-QPID_AUTO_TEST_CASE(testDumpConsumers) {
- ClusterFixture cluster(1);
- Client a(cluster[0]);
- a.session.queueDeclare("q");
- a.subs.subscribe(a.lq, "q");
-
- cluster.add();
- Client b(cluster[1]);
- try {
- b.connection.newSession(a.session.getId().getName());
- BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName());
- } catch (const SessionBusyException&) {}
-
- // Transfer some messages to the subscription by client a.
- Message m;
- a.session.messageTransfer(arg::bindingKey="q", arg::content=Message("aaa", "q"));
- BOOST_CHECK(a.lq.get(m, TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "aaa");
-
- b.session.messageTransfer(arg::bindingKey="q", arg::content=Message("bbb", "q"));
- BOOST_CHECK(a.lq.get(m, TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "bbb");
-
- // Verify that the queue has been drained on both brokers.
- // This proves that the consumer was replicated when the second broker joined.
- BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0);
-}
-#endif
-
+// FIXME aconway 2008-09-11: This test has to be first otherwise
+// it picks up the cluster name from a previous test and runs the
+// brokers as cluster nodes. Something wrong with option parsing...
+//
QPID_AUTO_TEST_CASE(testDumpClientSharedState) {
- BrokerFixture donor, receiver;
+ // In this test we don't want the cluster plugin to initialize, so set --cluster-name=""
+ const char* argv[] = { "--cluster-name", "" };
+ Broker::Options opts = parseOpts(sizeof(argv)/sizeof(*argv), argv);
+
+ BrokerFixture donor(opts), receiver(opts);
{
Client c(donor.getPort());
FieldTable args;
@@ -246,6 +228,94 @@ QPID_AUTO_TEST_CASE(testDumpClientSharedState) {
}
}
+
+// FIXME aconway 2008-09-12: finish the new join protocol.
+QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchUpSharedState, 1) {
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
+ // Create some shared state.
+ c0.session.queueDeclare("q");
+ c0.session.messageTransfer(arg::content=Message("foo","q"));
+ while (c0.session.queueQuery("q").getMessageCount() != 1)
+ ::usleep(1000); // Wait for message to show up on broker 0.
+
+ // Now join new broker, should catch up.
+ cluster.add();
+ c0.session.messageTransfer(arg::content=Message("bar","q"));
+ c0.session.queueDeclare("p");
+ c0.session.messageTransfer(arg::content=Message("poo","p"));
+
+ // Verify new broker has all state.
+ Message m;
+ Client c1(cluster[1], "c1");
+ BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "foo");
+ BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "bar");
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0);
+ BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "poo");
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 0);
+}
+
+QPID_AUTO_TEST_CASE(testStall) {
+ ClusterFixture cluster(2);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+
+ // Declare on all to avoid race condition.
+ c0.session.queueDeclare("q");
+ c1.session.queueDeclare("q");
+
+ // Stall 0, verify it does not process deliverys while stalled.
+ getGlobalCluster().stall();
+ c1.session.messageTransfer(arg::content=Message("foo","q"));
+ while (c1.session.queueQuery("q").getMessageCount() != 1)
+ ::usleep(1000); // Wait for message to show up on broker 1.
+ sleep(2); // FIXME aconway 2008-09-11: remove.
+ // But it should not be on broker 0.
+ boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q");
+ BOOST_REQUIRE(q0);
+ BOOST_CHECK_EQUAL(q0->getMessageCount(), 0);
+ // Now unstall and we should get the message.
+ getGlobalCluster().unStall();
+ Message m;
+ BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "foo");
+}
+
+#if 0 // FIXME aconway 2008-09-10: finish & enable
+QPID_AUTO_TEST_CASE(testDumpConsumers) {
+ ClusterFixture cluster(1);
+ Client a(cluster[0]);
+ a.session.queueDeclare("q");
+ a.subs.subscribe(a.lq, "q");
+
+ cluster.add();
+ Client b(cluster[1]);
+ try {
+ b.connection.newSession(a.session.getId().getName());
+ BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName());
+ } catch (const SessionBusyException&) {}
+
+ // Transfer some messages to the subscription by client a.
+ Message m;
+ a.session.messageTransfer(arg::content=Message("aaa", "q"));
+ BOOST_CHECK(a.lq.get(m, TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "aaa");
+
+ b.session.messageTransfer(arg::content=Message("bbb", "q"));
+ BOOST_CHECK(a.lq.get(m, TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "bbb");
+
+ // Verify that the queue has been drained on both brokers.
+ // This proves that the consumer was replicated when the second broker joined.
+ BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0);
+}
+
+
+#endif
+
QPID_AUTO_TEST_CASE(testForkedBroker) {
// Verify the ForkedBroker works as expected.
const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" };