diff options
author | Alan Conway <aconway@apache.org> | 2008-07-17 00:03:50 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-07-17 00:03:50 +0000 |
commit | e65b0086a2924ff04640b1350393a816249d01b3 (patch) | |
tree | b372c5386cc44e3ad16c4ae585088ed038a629e4 /cpp/src/tests | |
parent | e596837411d54a16dd3cb1e5de717664496c2bd0 (diff) | |
download | qpid-python-e65b0086a2924ff04640b1350393a816249d01b3.tar.gz |
Cluster: shadow connections, fix lifecycle & valgrind issues.
- tests/ForkedBroker: improved broker forking, exec full qpidd.
- Plugin::addFinalizer - more flexible way to shutdown plugins.
- Reworked cluster extension points using boost::function.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@677471 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r-- | cpp/src/tests/ForkedBroker.h | 100 | ||||
-rw-r--r-- | cpp/src/tests/cluster.mk | 5 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 93 | ||||
-rwxr-xr-x | cpp/src/tests/run_test | 4 |
4 files changed, 121 insertions, 81 deletions
diff --git a/cpp/src/tests/ForkedBroker.h b/cpp/src/tests/ForkedBroker.h index e01034c355..6c20330c28 100644 --- a/cpp/src/tests/ForkedBroker.h +++ b/cpp/src/tests/ForkedBroker.h @@ -1,5 +1,5 @@ #ifndef TESTS_FORKEDBROKER_H -#define TESTS_FORKEDBROKER_H + /* * @@ -23,16 +23,11 @@ */ #include "qpid/Exception.h" -#include "qpid/sys/Fork.h" -#include "qpid/log/Logger.h" +#include "qpid/log/Statement.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/SignalHandler.h" - #include <boost/lexical_cast.hpp> - #include <string> - -#include <signal.h> +#include <stdio.h> #include <sys/wait.h> /** @@ -48,63 +43,66 @@ * process.) * */ -class ForkedBroker : public qpid::sys::ForkWithMessage { - pid_t pid; - uint16_t port; - qpid::broker::Broker::Options opts; - std::string prefix; - +class ForkedBroker { public: - struct ChildExit {}; // Thrown in child processes. + ForkedBroker(std::vector<const char*> argv) { init(argv); } - ForkedBroker(const qpid::broker::Broker::Options& opts_=qpid::broker::Broker::Options(), - const std::string& prefix_=std::string()) - : pid(0), port(0), opts(opts_), prefix(prefix_) { fork(); } + ForkedBroker(int argc, const char* const argv[]) { + std::vector<const char*> args(argv, argv+argc); + init(args); + } ~ForkedBroker() { - try { stop(); } - catch(const std::exception& e) { - QPID_LOG(error, e.what()); + try { stop(); } catch(const std::exception& e) { + QPID_LOG(error, QPID_MSG("Stopping forked broker: " << e.what())); } } void stop() { - if (pid > 0) { // I am the parent, clean up children. - if (::kill(pid, SIGINT) < 0) - throw qpid::Exception(QPID_MSG("Can't kill process " << pid << ": " << qpid::strError(errno))); - int status = 0; - if (::waitpid(pid, &status, 0) < 0) - throw qpid::Exception(QPID_MSG("Waiting for process " << pid << ": " << qpid::strError(errno))); - if (WEXITSTATUS(status) != 0) - throw qpid::Exception(QPID_MSG("Process " << pid << " exited with status: " << WEXITSTATUS(status))); - } + using qpid::ErrnoException; + if (pid == 0) return; + if (::kill(pid, SIGINT) < 0) throw ErrnoException("kill failed"); + int status; + if (::waitpid(pid, &status, 0) < 0) throw ErrnoException("wait for forked process failed"); + if (WEXITSTATUS(status) != 0) + throw qpid::Exception(QPID_MSG("forked broker exited with: " << WEXITSTATUS(status))); + pid = 0; } - void parent(pid_t pid_) { - pid = pid_; - qpid::log::Logger::instance().setPrefix("parent"); - std::string portStr = wait(5); - port = boost::lexical_cast<uint16_t>(portStr); - } + uint16_t getPort() { return port; } - void child() { - prefix += boost::lexical_cast<std::string>(long(getpid())); - qpid::log::Logger::instance().setPrefix(prefix); - opts.port = 0; - boost::intrusive_ptr<qpid::broker::Broker> broker(new qpid::broker::Broker(opts)); - qpid::broker::SignalHandler::setBroker(broker); - QPID_LOG(info, "ForkedBroker started on " << broker->getPort()); - ready(boost::lexical_cast<std::string>(broker->getPort())); // Notify parent. - broker->run(); - QPID_LOG(notice, "ForkedBroker exiting."); + private: - // Force exit in the child process, otherwise we will try to - // carry with parent tests. - broker = 0; // Run broker dtor before we exit. - exit(0); + void init(const std::vector<const char*>& args) { + using qpid::ErrnoException; + pid = 0; + port = 0; + int pipeFds[2]; + if(::pipe(pipeFds) < 0) throw ErrnoException("Can't create pipe"); + pid = ::fork(); + if (pid < 0) throw ErrnoException("Fork failed"); + if (pid) { // parent + ::close(pipeFds[1]); + FILE* f = ::fdopen(pipeFds[0], "r"); + if (!f) throw ErrnoException("fopen failed"); + if (::fscanf(f, "%d", &port) != 1) throw ErrnoException("fscanf failed"); + } + else { // child + ::close(pipeFds[0]); + int fd = ::dup2(pipeFds[1], 1); + if (fd < 0) throw ErrnoException("dup2 failed"); + const char* prog = "../qpidd"; + std::vector<const char*> args2(args); + args2.push_back("--port=0"); + args2.push_back("--mgmt-enable=no"); // TODO aconway 2008-07-16: why does mgmt cause problems? + args2.push_back(0); + execv(prog, const_cast<char* const*>(&args2[0])); + throw ErrnoException("execv failed"); + } } - uint16_t getPort() { return port; } + pid_t pid; + int port; }; #endif /*!TESTS_FORKEDBROKER_H*/ diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk index da5b4d6e90..9190eee4e5 100644 --- a/cpp/src/tests/cluster.mk +++ b/cpp/src/tests/cluster.mk @@ -10,9 +10,8 @@ lib_cluster = $(abs_builddir)/../libqpidcluster.la # -# FIXME aconway 2008-07-04: disabled till process leak is plugged. -# ais_check checks conditions for cluster tests and run them if ok. -#TESTS+=ais_check +# ais_check checks pre-requisites for cluster tests and runs them if ok. +TESTS+=ais_check EXTRA_DIST+=ais_check check_PROGRAMS+=cluster_test diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index d361919f0b..cafac489d2 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -16,13 +16,13 @@ * */ - #include "test_tools.h" #include "unit_test.h" #include "ForkedBroker.h" #include "BrokerFixture.h" #include "qpid/cluster/Cpg.h" +#include "qpid/cluster/Cluster.h" #include "qpid/framing/AMQBody.h" #include "qpid/client/Connection.h" #include "qpid/client/Session.h" @@ -37,10 +37,13 @@ #include <vector> #include <algorithm> -#include <signal.h> +namespace qpid { +namespace cluster { +boost::intrusive_ptr<Cluster> getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp +}} // namespace qpid::cluster -QPID_AUTO_TEST_SUITE(CpgTestSuite) +QPID_AUTO_TEST_SUITE(CpgTestSuite) using namespace std; using namespace qpid; @@ -49,27 +52,60 @@ using namespace qpid::framing; using namespace qpid::client; using qpid::broker::Broker; using boost::ptr_vector; +using qpid::cluster::Cluster; +using qpid::cluster::getGlobalCluster; -struct ClusterFixture : public ptr_vector<ForkedBroker> { +/** Cluster fixture is a vector of ports for the replicas. + * Replica 0 is in the current process, all others are forked as children. + */ +struct ClusterFixture : public vector<uint16_t> { string name; + Broker::Options opts; + std::auto_ptr<BrokerFixture> broker0; + boost::ptr_vector<ForkedBroker> forkedBrokers; - ClusterFixture(size_t n=0) : name(Uuid(true).str()) { add(n); } + ClusterFixture(size_t n); void add(size_t n) { for (size_t i=0; i < n; ++i) add(); } void add(); + void setup(); }; +ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) { + add(n); + // Wait for all n members to join the cluster + int retry=20; // TODO aconway 2008-07-16: nasty sleeps, clean this up. + while (retry && getGlobalCluster()->size() != n) { + ::sleep(1); + --retry; + } + BOOST_CHECK_EQUAL(n, getGlobalCluster()->size()); +} + void ClusterFixture::add() { - broker::Broker::Options opts; - Plugin::addOptions(opts); // For cluster options. + std::ostringstream os; + os << "broker" << size(); + std::string prefix = os.str(); + const char* argv[] = { - "", "--cluster-name", name.c_str(), "--auth=no", "--no-data-dir" + "qpidd " __FILE__ , + "--load-module=../.libs/libqpidcluster.so", + "--cluster-name", name.c_str(), + "--auth=no", "--no-data-dir", + "--log-prefix", prefix.c_str(), }; - opts.parse(sizeof(argv)/sizeof(*argv), const_cast<char**>(argv)); - ostringstream prefix; - prefix << "b" << size() << "-"; - QPID_LOG(info, "ClusterFixture adding broker " << prefix.str()); - push_back(new ForkedBroker(opts, prefix.str())); - QPID_LOG(info, "ClusterFixture added broker " << prefix.str()); + size_t argc = sizeof(argv)/sizeof(argv[0]); + + if (size()) { // Not the first broker, fork. + forkedBrokers.push_back(new ForkedBroker(argc, argv)); + push_back(forkedBrokers.back().getPort()); + } + else { // First broker, run in this process. + Broker::Options opts; + Plugin::addOptions(opts); // Pick up cluster options. + opts.parse(argc, argv, "", true); // Allow-unknown for --load-module + broker0.reset(new BrokerFixture(opts)); + push_back(broker0->getPort()); + } } // For debugging: op << for CPG types. @@ -149,26 +185,25 @@ QPID_AUTO_TEST_CASE(CpgBasic) { QPID_AUTO_TEST_CASE(testForkedBroker) { // Verify the ForkedBroker works as expected. - Broker::Options opts; - opts.auth="no"; - opts.noDataDir=true; - ForkedBroker broker(opts); + const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" }; + ForkedBroker broker(sizeof(argv)/sizeof(argv[0]), argv); Client c(broker.getPort()); BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("amq.direct").getType()); } QPID_AUTO_TEST_CASE(testWiringReplication) { - ClusterFixture cluster(2); // FIXME aconway 2008-07-02: 3 brokers - Client c0(cluster[0].getPort()); + ClusterFixture cluster(3); + Client c0(cluster[0]); BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty()); BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty()); c0.session.queueDeclare("q"); c0.session.exchangeDeclare("ex", arg::type="direct"); c0.session.close(); + c0.connection.close(); // Verify all brokers get wiring update. for (size_t i = 0; i < cluster.size(); ++i) { BOOST_MESSAGE("i == "<< i); - Client c(cluster[i].getPort()); + Client c(cluster[i]); BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue()); BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType()); } @@ -177,12 +212,12 @@ QPID_AUTO_TEST_CASE(testWiringReplication) { QPID_AUTO_TEST_CASE(testMessageEnqueue) { // Enqueue on one broker, dequeue on another. ClusterFixture cluster(2); - Client c0(cluster[0].getPort()); + Client c0(cluster[0]); c0.session.queueDeclare("q"); c0.session.messageTransfer(arg::content=TransferContent("foo", "q")); c0.session.messageTransfer(arg::content=TransferContent("bar", "q")); c0.session.close(); - Client c1(cluster[1].getPort()); + Client c1(cluster[1]); Message msg; BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC)); BOOST_CHECK_EQUAL(string("foo"), msg.getData()); @@ -190,10 +225,14 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) { BOOST_CHECK_EQUAL(string("bar"), msg.getData()); } +#if 0 + +// FIXME aconway 2008-07-16: Implement cluster dequeue notification, enable this test. + QPID_AUTO_TEST_CASE(testMessageDequeue) { // Enqueue on one broker, dequeue on two others. ClusterFixture cluster (3); - Client c0(cluster[0].getPort()); + Client c0(cluster[0]); c0.session.queueDeclare("q"); c0.session.messageTransfer(arg::content=TransferContent("foo", "q")); c0.session.messageTransfer(arg::content=TransferContent("bar", "q")); @@ -201,11 +240,11 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) { Message msg; - Client c1(cluster[1].getPort()); + Client c1(cluster[1]); BOOST_CHECK(c1.subs.get(msg, "q")); BOOST_CHECK_EQUAL("foo", msg.getData()); - Client c2(cluster[2].getPort()); + Client c2(cluster[2]); BOOST_CHECK(c1.subs.get(msg, "q")); BOOST_CHECK_EQUAL("bar", msg.getData()); QueueQueryResult r = c2.session.queueQuery("q"); @@ -214,4 +253,6 @@ QPID_AUTO_TEST_CASE(testMessageDequeue) { // TODO aconway 2008-06-25: failover. +#endif + QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/run_test b/cpp/src/tests/run_test index 8fbaaaee07..4d0da15d4c 100755 --- a/cpp/src/tests/run_test +++ b/cpp/src/tests/run_test @@ -38,9 +38,11 @@ VALGRIND_OPTS=" --demangle=yes --suppressions=$srcdir/.valgrind.supp --num-callers=25 ---trace-children=yes --log-file=$VG_LOG -- " +# FIXME aconway 2008-07-16: removed --trace-children=yes, problems with cluster tests forking +# qpidd libtool script. Investigate & restore --trace-children if possible. + ERROR=0 if grep -l "^# Generated by .*libtool" "$1" >/dev/null 2>&1; then # This is a libtool "executable". Valgrind it if VALGRIND specified. |