diff options
Diffstat (limited to 'cpp/src/tests/cluster_test.cpp')
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 93 |
1 files changed, 67 insertions, 26 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index d361919f0b..cafac489d2 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -16,13 +16,13 @@ * */ - #include "test_tools.h" #include "unit_test.h" #include "ForkedBroker.h" #include "BrokerFixture.h" #include "qpid/cluster/Cpg.h" +#include "qpid/cluster/Cluster.h" #include "qpid/framing/AMQBody.h" #include "qpid/client/Connection.h" #include "qpid/client/Session.h" @@ -37,10 +37,13 @@ #include <vector> #include <algorithm> -#include <signal.h> +namespace qpid { +namespace cluster { +boost::intrusive_ptr<Cluster> getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp +}} // namespace qpid::cluster -QPID_AUTO_TEST_SUITE(CpgTestSuite) +QPID_AUTO_TEST_SUITE(CpgTestSuite) using namespace std; using namespace qpid; @@ -49,27 +52,60 @@ using namespace qpid::framing; using namespace qpid::client; using qpid::broker::Broker; using boost::ptr_vector; +using qpid::cluster::Cluster; +using qpid::cluster::getGlobalCluster; -struct ClusterFixture : public ptr_vector<ForkedBroker> { +/** Cluster fixture is a vector of ports for the replicas. + * Replica 0 is in the current process, all others are forked as children. + */ +struct ClusterFixture : public vector<uint16_t> { string name; + Broker::Options opts; + std::auto_ptr<BrokerFixture> broker0; + boost::ptr_vector<ForkedBroker> forkedBrokers; - ClusterFixture(size_t n=0) : name(Uuid(true).str()) { add(n); } + ClusterFixture(size_t n); void add(size_t n) { for (size_t i=0; i < n; ++i) add(); } void add(); + void setup(); }; +ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) { + add(n); + // Wait for all n members to join the cluster + int retry=20; // TODO aconway 2008-07-16: nasty sleeps, clean this up. + while (retry && getGlobalCluster()->size() != n) { + ::sleep(1); + --retry; + } + BOOST_CHECK_EQUAL(n, getGlobalCluster()->size()); +} + void ClusterFixture::add() { - broker::Broker::Options opts; - Plugin::addOptions(opts); // For cluster options. + std::ostringstream os; + os << "broker" << size(); + std::string prefix = os.str(); + const char* argv[] = { - "", "--cluster-name", name.c_str(), "--auth=no", "--no-data-dir" + "qpidd " __FILE__ , + "--load-module=../.libs/libqpidcluster.so", + "--cluster-name", name.c_str(), + "--auth=no", "--no-data-dir", + "--log-prefix", prefix.c_str(), }; - opts.parse(sizeof(argv)/sizeof(*argv), const_cast<char**>(argv)); - ostringstream prefix; - prefix << "b" << size() << "-"; - QPID_LOG(info, "ClusterFixture adding broker " << prefix.str()); - push_back(new ForkedBroker(opts, prefix.str())); - QPID_LOG(info, "ClusterFixture added broker " << prefix.str()); + size_t argc = sizeof(argv)/sizeof(argv[0]); + + if (size()) { // Not the first broker, fork. + forkedBrokers.push_back(new ForkedBroker(argc, argv)); + push_back(forkedBrokers.back().getPort()); + } + else { // First broker, run in this process. + Broker::Options opts; + Plugin::addOptions(opts); // Pick up cluster options. + opts.parse(argc, argv, "", true); // Allow-unknown for --load-module + broker0.reset(new BrokerFixture(opts)); + push_back(broker0->getPort()); + } } // For debugging: op << for CPG types. @@ -149,26 +185,25 @@ QPID_AUTO_TEST_CASE(CpgBasic) { QPID_AUTO_TEST_CASE(testForkedBroker) { // Verify the ForkedBroker works as expected. - Broker::Options opts; - opts.auth="no"; - opts.noDataDir=true; - ForkedBroker broker(opts); + const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" }; + ForkedBroker broker(sizeof(argv)/sizeof(argv[0]), argv); Client c(broker.getPort()); BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("amq.direct").getType()); } QPID_AUTO_TEST_CASE(testWiringReplication) { - ClusterFixture cluster(2); // FIXME aconway 2008-07-02: 3 brokers - Client c0(cluster[0].getPort()); + ClusterFixture cluster(3); + Client c0(cluster[0]); BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty()); BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty()); c0.session.queueDeclare("q"); c0.session.exchangeDeclare("ex", arg::type="direct"); c0.session.close(); + c0.connection.close(); // Verify all brokers get wiring update. for (size_t i = 0; i < cluster.size(); ++i) { BOOST_MESSAGE("i == "<< i); - Client c(cluster[i].getPort()); + Client c(cluster[i]); BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue()); BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType()); } @@ -177,12 +212,12 @@ QPID_AUTO_TEST_CASE(testWiringReplication) { QPID_AUTO_TEST_CASE(testMessageEnqueue) { // Enqueue on one broker, dequeue on another. ClusterFixture cluster(2); - Client c0(cluster[0].getPort()); + Client c0(cluster[0]); c0.session.queueDeclare("q"); c0.session.messageTransfer(arg::content=TransferContent("foo", "q")); c0.session.messageTransfer(arg::content=TransferContent("bar", "q")); c0.session.close(); - Client c1(cluster[1].getPort()); + Client c1(cluster[1]); Message msg; BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC)); BOOST_CHECK_EQUAL(string("foo"), msg.getData()); @@ -190,10 +225,14 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) { BOOST_CHECK_EQUAL(string("bar"), msg.getData()); } +#if 0 + +// FIXME aconway 2008-07-16: Implement cluster dequeue notification, enable this test. + QPID_AUTO_TEST_CASE(testMessageDequeue) { // Enqueue on one broker, dequeue on two others. ClusterFixture cluster (3); - Client c0(cluster[0].getPort()); + Client c0(cluster[0]); c0.session.queueDeclare("q"); c0.session.messageTransfer(arg::content=TransferContent("foo", "q")); c0.session.messageTransfer(arg::content=TransferContent("bar", "q")); @@ -201,11 +240,11 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) { Message msg; - Client c1(cluster[1].getPort()); + Client c1(cluster[1]); BOOST_CHECK(c1.subs.get(msg, "q")); BOOST_CHECK_EQUAL("foo", msg.getData()); - Client c2(cluster[2].getPort()); + Client c2(cluster[2]); BOOST_CHECK(c1.subs.get(msg, "q")); BOOST_CHECK_EQUAL("bar", msg.getData()); QueueQueryResult r = c2.session.queueQuery("q"); @@ -214,4 +253,6 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) { // TODO aconway 2008-06-25: failover. +#endif + QPID_AUTO_TEST_SUITE_END() |