diff options
author | Alan Conway <aconway@apache.org> | 2008-11-04 16:03:03 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-11-04 16:03:03 +0000 |
commit | cc9bb48246251fa3941368be66731a533cdb385e (patch) | |
tree | 60e5abb847dd6c7380be2eb565609a13ed421fb3 /cpp/src | |
parent | 99b6a9983bf013af72bcb41a68466b343d3e1513 (diff) | |
download | qpid-python-cc9bb48246251fa3941368be66731a533cdb385e.tar.gz |
Allow local broker to be run in any position in a ClusterFixture.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711283 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 161 |
1 files changed, 77 insertions, 84 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 5f18d0ff90..eeedbf5ec5 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -35,7 +35,7 @@ #include "qpid/log/Logger.h" #include <boost/bind.hpp> -#include <boost/ptr_container/ptr_vector.hpp> +#include <boost/shared_ptr.hpp> #include <string> #include <iostream> @@ -47,6 +47,7 @@ namespace qpid { namespace cluster { +// FIXME aconway 2008-11-04: remove. Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp }} // namespace qpid::cluster @@ -66,7 +67,7 @@ using namespace qpid::framing; using namespace qpid::client; using qpid::sys::TIME_SEC; using qpid::broker::Broker; -using boost::ptr_vector; +using boost::shared_ptr; using qpid::cluster::Cluster; using qpid::cluster::getGlobalCluster; @@ -79,24 +80,34 @@ Broker::Options parseOpts(size_t argc, const char* argv[]) { } /** Cluster fixture is a vector of ports for the replicas. - * Replica 0 is in the current process, all others are forked as children. + * + * At most one replica (by default replica 0) is in the current + * process, all others are forked as children. */ -struct ClusterFixture : public vector<uint16_t> { +class ClusterFixture : public vector<uint16_t> { string name; - std::auto_ptr<BrokerFixture> broker0; - boost::ptr_vector<ForkedBroker> forkedBrokers; - bool init0; + std::auto_ptr<BrokerFixture> localBroker; + int localIndex; + std::vector<shared_ptr<ForkedBroker> > forkedBrokers; - ClusterFixture(size_t n, bool init0=true); + public: + /** @param localIndex can be -1 meaning don't automatically start a local broker. + * A local broker can be started with addLocal(). + */ + ClusterFixture(size_t n, int localIndex=0); void add(size_t n) { for (size_t i=0; i < n; ++i) add(); } - void add(); - void add0(bool force); + void add(); // Add a broker. + void addLocal(); // Add a local broker. void setup(); - /** Kill a forked broker with sig, or shutdown broker0 if n==0. */ + bool hasLocal() const { return localIndex >= 0 && size_t(localIndex) < size(); } + + /** Kill a forked broker with sig, or shutdown localBroker if n==0. */ void kill(size_t n, int sig=SIGINT) { - if (n) forkedBrokers[n-1].kill(sig); - else broker0->broker->shutdown(); + if (n == size_t(localIndex)) + localBroker->broker->shutdown(); + else + forkedBrokers[n]->kill(sig); } /** Kill a broker and suppress errors from connection. */ @@ -105,51 +116,34 @@ struct ClusterFixture : public vector<uint16_t> { kill(n,sig); try { c.close(); } catch(...) {} } - - void waitFor(size_t n) { - for (size_t retry = 1000; retry && getGlobalCluster().getUrls().size() != n; --retry) - ::usleep(1000); - } }; -ClusterFixture::ClusterFixture(size_t n, bool init0_) : name(Uuid(true).str()), init0(init0_) { +ClusterFixture::ClusterFixture(size_t n, int localIndex_) : name(Uuid(true).str()), localIndex(localIndex_) { add(n); - if (!init0) return; // Defer initialization of broker0 - // Wait for all n members to join the cluster - waitFor(n); - BOOST_REQUIRE_EQUAL(n, getGlobalCluster().getUrls().size()); } void ClusterFixture::add() { - std::ostringstream os; - os << "fork" << size(); - std::string prefix = os.str(); - - if (size()) { // Not the first broker, fork. - + if (size() != size_t(localIndex)) { // fork a broker process. + std::ostringstream os; os << "fork" << size(); const char* argv[] = { "qpidd " __FILE__ , "--load-module=../.libs/cluster.so", "--cluster-name", name.c_str(), "--auth=no", "--no-data-dir", - "--log-prefix", prefix.c_str(), + "--log-prefix", os.str().c_str(), }; size_t argc = sizeof(argv)/sizeof(argv[0]); - - - forkedBrokers.push_back(new ForkedBroker(argc, argv)); - push_back(forkedBrokers.back().getPort()); + forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(argc, argv))); + push_back(forkedBrokers.back()->getPort()); } - else { - add0(init0); // First broker, run in this process. + else { // Run in this process + addLocal(); } } -void ClusterFixture::add0(bool init) { - if (!init) { - push_back(0); - return; - } +void ClusterFixture::addLocal() { + assert(int(size()) == localIndex || localIndex == -1); + localIndex = size(); const char* argv[] = { "qpidd " __FILE__ , "--load-module=../.libs/cluster.so", @@ -157,10 +151,11 @@ void ClusterFixture::add0(bool init) { "--auth=no", "--no-data-dir" }; size_t argc = sizeof(argv)/sizeof(argv[0]); - - qpid::log::Logger::instance().setPrefix("main"); - broker0.reset(new BrokerFixture(parseOpts(argc, argv))); - if (size()) front() = broker0->getPort(); else push_back(broker0->getPort()); + ostringstream os; os << "local" << localIndex; + qpid::log::Logger::instance().setPrefix(os.str()); + localBroker.reset(new BrokerFixture(parseOpts(argc, argv))); + push_back(localBroker->getPort()); + forkedBrokers.push_back(shared_ptr<ForkedBroker>()); } ostream& operator<<(ostream& o, const cpg_name* n) { @@ -188,14 +183,15 @@ template <class C> set<uint16_t> makeSet(const C& c) { template <class T> std::set<uint16_t> knownBrokerPorts(T& source, int n=-1) { vector<Url> urls = source.getKnownBrokers(); - BOOST_MESSAGE("knownBrokerPorts " << n << ": " << urls); - if (n >= 0) { - for (size_t retry=10; urls.size() != unsigned(n) && retry != 0; --retry) { - ::usleep(100000); + if (n >= 0 && unsigned(n) != urls.size()) { + BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls); + // Retry up to 10 secs in .1 second intervals. + for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) { + ::usleep(1000*100); // 0.1 secs urls = source.getKnownBrokers(); - BOOST_MESSAGE("knownBrokerPorts retry: " << urls); } } + BOOST_MESSAGE("knownBrokerPorts expecting " << n << ": " << urls); set<uint16_t> s; for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) s.insert((*i)[0].get<TcpAddress>()->port); @@ -266,7 +262,6 @@ QPID_AUTO_TEST_CASE(testUnacked) { // Add new member while there are unacked messages. cluster.add(); - cluster.waitFor(2); Client c1(cluster[1], "c1"); // Check queue counts @@ -336,7 +331,7 @@ QPID_AUTO_TEST_CASE(testDumpMessageBuilder) { // No reliable way to ensure the partial message has arrived // before we start the new broker, so we sleep. - ::usleep(250); + ::usleep(2500); cluster.add(); // Send final 2 frames of message. @@ -378,7 +373,6 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { BOOST_CHECK_EQUAL(kb2,kb1); cluster.killWithSilencer(1,c1.connection,9); - cluster.waitFor(2); kb0 = knownBrokerPorts(c0.connection, 2); kb2 = knownBrokerPorts(c2.connection, 2); BOOST_CHECK_EQUAL(kb0.size(), 2u); @@ -386,58 +380,54 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { } QPID_AUTO_TEST_CASE(DumpConsumers) { - ClusterFixture cluster(1, false); // Don't init broker 0 + ClusterFixture cluster(1, 1); - cluster.add(); - Client c1(cluster[1], "c1"); - c1.session.queueDeclare("p"); - c1.session.queueDeclare("q"); - c1.subs.subscribe(c1.lq, "q", FlowControl::zero()); + Client c0(cluster[0], "c0"); + c0.session.queueDeclare("p"); + c0.session.queueDeclare("q"); + c0.subs.subscribe(c0.lq, "q", FlowControl::zero()); LocalQueue lp; - c1.subs.subscribe(lp, "p", FlowControl::messageCredit(1)); - c1.session.sync(); + c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1)); + c0.session.sync(); // Start new members - cluster.add0(true); - Client c0(cluster[0], "c0"); + cluster.add(); // Local + Client c1(cluster[1], "c1"); cluster.add(); Client c2(cluster[2], "c2"); // Transfer messages - c1.session.messageTransfer(arg::content=Message("aaa", "q")); - BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 1u); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 1u); - BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 1u); + c0.session.messageTransfer(arg::content=Message("aaa", "q")); + + c0.session.messageTransfer(arg::content=Message("bbb", "p")); + c0.session.messageTransfer(arg::content=Message("ccc", "p")); - c1.session.messageTransfer(arg::content=Message("bbb", "p")); - c1.session.messageTransfer(arg::content=Message("ccc", "p")); - // Activate the subscription, ensure message removed on all queues. - c1.subs.setFlowControl("q", FlowControl::unlimited()); + c0.subs.setFlowControl("q", FlowControl::unlimited()); Message m; - BOOST_CHECK(c1.lq.get(m, TIME_SEC)); + BOOST_CHECK(c0.lq.get(m, TIME_SEC)); BOOST_CHECK_EQUAL(m.getData(), "aaa"); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u); - // Check second subscription's flow control: getsnn first message, not second. + // Check second subscription's flow control: gets first message, not second. BOOST_CHECK(lp.get(m, TIME_SEC)); BOOST_CHECK_EQUAL(m.getData(), "bbb"); - BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u); BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u); BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u); - BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC)); + BOOST_CHECK(c0.subs.get(m, "p", TIME_SEC)); BOOST_CHECK_EQUAL(m.getData(), "ccc"); // Kill the subscribing member, ensure further messages are not removed. - cluster.killWithSilencer(1,c1.connection,9); - cluster.waitFor(2); + cluster.killWithSilencer(0,c0.connection,9); + BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2); for (int i = 0; i < 10; ++i) { - c0.session.messageTransfer(arg::content=Message("bbb", "q")); - BOOST_REQUIRE(c0.subs.get(m, "q", TIME_SEC)); - BOOST_REQUIRE_EQUAL(m.getData(), "bbb"); + c1.session.messageTransfer(arg::content=Message("xxx", "q")); + BOOST_REQUIRE(c1.subs.get(m, "q", TIME_SEC)); + BOOST_REQUIRE_EQUAL(m.getData(), "xxx"); } } @@ -450,7 +440,7 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { c0.session.messageTransfer(arg::content=Message("foo","q")); c0.session.messageTransfer(arg::content=Message("bar","q")); while (c0.session.queueQuery("q").getMessageCount() != 2) - ::usleep(1000); // Wait for message to show up on broker 0. + ::usleep(1000); // Wait for message to show up on broker 0. // Add a new broker, it should catch up. cluster.add(); @@ -460,7 +450,7 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { c0.session.messageTransfer(arg::content=Message("pfoo","p")); // Do some work post-join - cluster.waitFor(2); + BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2); c0.session.messageTransfer(arg::content=Message("pbar","p")); // Verify new brokers have state. @@ -545,10 +535,13 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) { QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { ClusterFixture cluster(3); - // First start a subscription. Client c0(cluster[0]); + BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers. + + // First start a subscription. c0.session.queueDeclare("q"); c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2)); + // Now send messages Client c1(cluster[1]); c1.session.messageTransfer(arg::content=Message("foo", "q")); |