diff options
Diffstat (limited to 'cpp/src/tests')
-rw-r--r-- | cpp/src/tests/BrokerFixture.h | 10 | ||||
-rw-r--r-- | cpp/src/tests/ClusterFixture.cpp | 32 | ||||
-rw-r--r-- | cpp/src/tests/ClusterFixture.h | 18 | ||||
-rw-r--r-- | cpp/src/tests/ForkedBroker.cpp | 26 | ||||
-rw-r--r-- | cpp/src/tests/ForkedBroker.h | 1 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 8 | ||||
-rw-r--r-- | cpp/src/tests/PartialFailure.cpp | 222 | ||||
-rw-r--r-- | cpp/src/tests/cluster.mk | 6 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 30 | ||||
-rwxr-xr-x | cpp/src/tests/clustered_replication_test | 1 | ||||
-rw-r--r-- | cpp/src/tests/failover_soak.cpp | 61 | ||||
-rwxr-xr-x | cpp/src/tests/run_failover_soak | 1 | ||||
-rwxr-xr-x | cpp/src/tests/start_cluster | 12 | ||||
-rw-r--r-- | cpp/src/tests/test_store.cpp | 146 |
14 files changed, 468 insertions, 106 deletions
diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h index f55560739d..b32b7f44ba 100644 --- a/cpp/src/tests/BrokerFixture.h +++ b/cpp/src/tests/BrokerFixture.h @@ -114,10 +114,12 @@ struct ClientT { SessionType session; qpid::client::SubscriptionManager subs; qpid::client::LocalQueue lq; - ClientT(uint16_t port, const std::string& name=std::string()) - : connection(port), session(connection.newSession(name)), subs(session) {} - ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name=std::string()) - : connection(settings), session(connection.newSession(name)), subs(session) {} + std::string name; + + ClientT(uint16_t port, const std::string& name_=std::string()) + : connection(port), session(connection.newSession(name_)), subs(session), name(name_) {} + ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name_=std::string()) + : connection(settings), session(connection.newSession(name_)), subs(session), name(name_) {} ~ClientT() { connection.close(); } }; diff --git a/cpp/src/tests/ClusterFixture.cpp b/cpp/src/tests/ClusterFixture.cpp index 5658957b48..d49be76f79 100644 --- a/cpp/src/tests/ClusterFixture.cpp +++ b/cpp/src/tests/ClusterFixture.cpp @@ -67,16 +67,23 @@ ClusterFixture::ClusterFixture(size_t n, int localIndex_, const Args& args_) add(n); } +ClusterFixture::ClusterFixture(size_t n, int localIndex_, boost::function<void (Args&, size_t)> updateArgs_) + : name(Uuid(true).str()), localIndex(localIndex_), updateArgs(updateArgs_) +{ + add(n); +} + const ClusterFixture::Args ClusterFixture::DEFAULT_ARGS = list_of<string>("--auth=no")("--no-data-dir"); -ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix) { +ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix, size_t index) { Args args = list_of<string>("qpidd " __FILE__) ("--no-module-dir") ("--load-module=../.libs/cluster.so") ("--cluster-name")(name) ("--log-prefix")(prefix); args.insert(args.end(), userArgs.begin(), userArgs.end()); + if (updateArgs) updateArgs(args, index); return args; } @@ -84,7 +91,7 @@ void ClusterFixture::add() { if (size() != size_t(localIndex)) { // fork a broker process. std::ostringstream os; os << "fork" << size(); std::string prefix = os.str(); - forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(makeArgs(prefix)))); + forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(makeArgs(prefix, size())))); push_back(forkedBrokers.back()->getPort()); } else { // Run in this process @@ -106,7 +113,7 @@ void ClusterFixture::addLocal() { assert(int(size()) == localIndex); ostringstream os; os << "local" << localIndex; string prefix = os.str(); - Args args(makeArgs(prefix)); + Args args(makeArgs(prefix, localIndex)); vector<const char*> argv(args.size()); transform(args.begin(), args.end(), argv.begin(), boost::bind(&string::c_str, _1)); qpid::log::Logger::instance().setPrefix(prefix); @@ -131,3 +138,22 @@ void ClusterFixture::killWithSilencer(size_t n, client::Connection& c, int sig) kill(n,sig); try { c.close(); } catch(...) {} } + +/** + * Get the known broker ports from a Connection. + *@param n if specified wait for the cluster size to be n, up to a timeout. + */ +std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n) { + std::vector<qpid::Url> urls = source.getKnownBrokers(); + if (n >= 0 && unsigned(n) != urls.size()) { + // Retry up to 10 secs in .1 second intervals. + for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) { + qpid::sys::usleep(1000*100); // 0.1 secs + urls = source.getKnownBrokers(); + } + } + std::set<int> s; + for (std::vector<qpid::Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) + s.insert((*i)[0].get<qpid::TcpAddress>()->port); + return s; +} diff --git a/cpp/src/tests/ClusterFixture.h b/cpp/src/tests/ClusterFixture.h index 84fb9f2202..75d39fc7e5 100644 --- a/cpp/src/tests/ClusterFixture.h +++ b/cpp/src/tests/ClusterFixture.h @@ -38,6 +38,7 @@ #include "qpid/log/Logger.h" #include <boost/bind.hpp> +#include <boost/function.hpp> #include <boost/shared_ptr.hpp> #include <string> @@ -69,33 +70,44 @@ using qpid::cluster::Cluster; class ClusterFixture : public vector<uint16_t> { public: typedef std::vector<std::string> Args; + static const Args DEFAULT_ARGS; + /** @param localIndex can be -1 meaning don't automatically start a local broker. * A local broker can be started with addLocal(). */ ClusterFixture(size_t n, int localIndex=0, const Args& args=DEFAULT_ARGS); + + /**@param updateArgs function is passed the index of the cluster member and can update the arguments. */ + ClusterFixture(size_t n, int localIndex, boost::function<void (Args&, size_t)> updateArgs); + void add(size_t n) { for (size_t i=0; i < n; ++i) add(); } void add(); // Add a broker. void setup(); bool hasLocal() const; - /** Kill a forked broker with sig, or shutdown localBroker if n==0. */ + /** Kill a forked broker with sig, or shutdown localBroker. */ void kill(size_t n, int sig=SIGINT); /** Kill a broker and suppressing errors from closing connection c. */ void killWithSilencer(size_t n, client::Connection& c, int sig=SIGINT); private: - static const Args DEFAULT_ARGS; void addLocal(); // Add a local broker. - Args makeArgs(const std::string& prefix); + Args makeArgs(const std::string& prefix, size_t index); string name; std::auto_ptr<BrokerFixture> localBroker; int localIndex; std::vector<shared_ptr<ForkedBroker> > forkedBrokers; Args userArgs; + boost::function<void (Args&, size_t)> updateArgs; }; +/** + * Get the known broker ports from a Connection. + *@param n if specified wait for the cluster size to be n, up to a timeout. + */ +std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n=-1); #endif /*!CLUSTER_FIXTURE_H*/ diff --git a/cpp/src/tests/ForkedBroker.cpp b/cpp/src/tests/ForkedBroker.cpp index f90f76aeb2..383dcc496c 100644 --- a/cpp/src/tests/ForkedBroker.cpp +++ b/cpp/src/tests/ForkedBroker.cpp @@ -20,12 +20,17 @@ */ #include "ForkedBroker.h" +#include "qpid/log/Statement.h" #include <boost/bind.hpp> +#include <boost/algorithm/string.hpp> #include <algorithm> #include <stdlib.h> #include <sys/types.h> #include <signal.h> +using namespace std; +using qpid::ErrnoException; + ForkedBroker::ForkedBroker(const Args& args) { init(args); } ForkedBroker::ForkedBroker(int argc, const char* const argv[]) { init(Args(argv, argc+argv)); } @@ -42,14 +47,25 @@ void ForkedBroker::kill(int sig) { pid = 0; // Reset pid here in case of an exception. using qpid::ErrnoException; if (::kill(savePid, sig) < 0) - throw ErrnoException("kill failed"); + throw ErrnoException("kill failed"); int status; if (::waitpid(savePid, &status, 0) < 0 && sig != 9) throw ErrnoException("wait for forked process failed"); if (WEXITSTATUS(status) != 0 && sig != 9) throw qpid::Exception(QPID_MSG("Forked broker exited with: " << WEXITSTATUS(status))); } + +namespace std { +static ostream& operator<<(ostream& o, const ForkedBroker::Args& a) { + copy(a.begin(), a.end(), ostream_iterator<string>(o, " ")); + return o; +} +bool isLogOption(const std::string& s) { + return boost::starts_with(s, "--log-enable") || boost::starts_with(s, "--trace"); +} + +} void ForkedBroker::init(const Args& userArgs) { using qpid::ErrnoException; @@ -70,17 +86,19 @@ void ForkedBroker::init(const Args& userArgs) { } else { // child ::close(pipeFds[0]); - // FIXME aconway 2009-02-12: int fd = ::dup2(pipeFds[1], 1); // pipe stdout to the parent. if (fd < 0) throw ErrnoException("dup2 failed"); const char* prog = "../qpidd"; Args args(userArgs); args.push_back("--port=0"); - if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")) - args.push_back("--log-enable=error+"); // Keep quiet except for errors. + // Keep quiet except for errors. + if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE") + && find_if(userArgs.begin(), userArgs.end(), isLogOption) == userArgs.end()) + args.push_back("--log-enable=error+"); std::vector<const char*> argv(args.size()); std::transform(args.begin(), args.end(), argv.begin(), boost::bind(&std::string::c_str, _1)); argv.push_back(0); + QPID_LOG(debug, "ForkedBroker exec " << prog << ": " << args); execv(prog, const_cast<char* const*>(&argv[0])); throw ErrnoException("execv failed"); } diff --git a/cpp/src/tests/ForkedBroker.h b/cpp/src/tests/ForkedBroker.h index 6f97fbdc09..e72f421563 100644 --- a/cpp/src/tests/ForkedBroker.h +++ b/cpp/src/tests/ForkedBroker.h @@ -53,6 +53,7 @@ class ForkedBroker { ~ForkedBroker(); void kill(int sig=SIGINT); + int wait(); // Wait for exit, return exit status. uint16_t getPort() { return port; } pid_t getPID() { return pid; } diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 9a81ef18b3..4b59e8ebe9 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -110,11 +110,17 @@ endif # amqp_0_10/Map.cpp \ # amqp_0_10/handlers.cpp +TESTLIBFLAGS = -module -rpath $(abs_builddir) check_LTLIBRARIES += libshlibtest.la -libshlibtest_la_LDFLAGS = -module -rpath $(abs_builddir) +libshlibtest_la_LDFLAGS = $(TESTLIBFLAGS) libshlibtest_la_SOURCES = shlibtest.cpp +check_LTLIBRARIES += test_store.la +test_store_la_SOURCES = test_store.cpp +test_store_la_LIBADD = $(lib_broker) # FIXME aconway 2009-04-03: required? +test_store_la_LDFLAGS = $(TESTLIBFLAGS) + include cluster.mk if SSL include ssl.mk diff --git a/cpp/src/tests/PartialFailure.cpp b/cpp/src/tests/PartialFailure.cpp new file mode 100644 index 0000000000..f7187b2e77 --- /dev/null +++ b/cpp/src/tests/PartialFailure.cpp @@ -0,0 +1,222 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/**@file Tests for partial failure in a cluster. + * Partial failure means some nodes experience a failure while others do not. + * In this case the failed nodes must shut down. + */ + +#include "test_tools.h" +#include "unit_test.h" +#include "ClusterFixture.h" +#include <boost/assign.hpp> +#include <boost/algorithm/string.hpp> +#include <boost/bind.hpp> + +QPID_AUTO_TEST_SUITE(PartialFailureTestSuite) + + using namespace std; +using namespace qpid; +using namespace qpid::cluster; +using namespace qpid::framing; +using namespace qpid::client; +using namespace qpid::client::arg; +using namespace boost::assign; +using broker::Broker; +using boost::shared_ptr; + +// Timeout for tests that wait for messages +const sys::Duration TIMEOUT=sys::TIME_SEC/4; + +static bool isLogOption(const std::string& s) { return boost::starts_with(s, "--log-enable"); } + +void updateArgs(ClusterFixture::Args& args, size_t index) { + ostringstream os; + os << "--test-store-name=s" << index; + args.push_back(os.str()); + args.push_back("--load-module=.libs/test_store.so"); + string dataDir("/tmp/PartialFailure.XXXXXX"); + if (!mkdtemp(const_cast<char*>(dataDir.c_str()))) + throw ErrnoException("Can't create data dir"); + args.push_back("--data-dir="+dataDir); + args.push_back("--auth=no"); + + // These tests generate errors deliberately, disable error logging unless a log env var is set. + if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")) { + remove_if(args.begin(), args.end(), isLogOption); + args.push_back("--log-enable=critical+:DISABLED"); // hacky way to disable logs. + } +} + +Message pMessage(string data, string q) { + Message msg(data, q); + msg.getDeliveryProperties().setDeliveryMode(PERSISTENT); + return msg; +} + +void queueAndSub(Client& c) { + c.session.queueDeclare(c.name, durable=true); + c.subs.subscribe(c.lq, c.name); +} + +// Verify normal cluster-wide errors. +QPID_AUTO_TEST_CASE(testNormalErrors) { + // FIXME aconway 2009-04-10: Would like to put a scope just around + // the statements expected to fail (in BOOST_CHECK_THROW) but that + // sproadically lets out messages, possibly because they're in + // Connection thread. + ScopedSuppressLogging allQuiet; + + ClusterFixture cluster(3, -1, updateArgs); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + Client c2(cluster[2], "c2"); + + queueAndSub(c0); + c0.session.messageTransfer(content=Message("x", "c0")); + BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x"); + + // Session error. + BOOST_CHECK_THROW(c0.session.exchangeBind(), SessionException); + c1.session.messageTransfer(content=Message("stay", "c0")); // Will stay on queue, session c0 is dead. + + // Connection error, kill c1 on all members. + queueAndSub(c1); + BOOST_CHECK_THROW( + c1.session.messageTransfer( + content=pMessage("TEST_STORE_DO: s0[exception] s1[exception] s2[exception] testNormalErrors", "c1")), + ConnectionException); + c2.session.messageTransfer(content=Message("stay", "c1")); // Will stay on queue, session/connection c1 is dead. + + BOOST_CHECK_EQUAL(3u, knownBrokerPorts(c2.connection, 3).size()); + BOOST_CHECK_EQUAL(c2.subs.get("c0", TIMEOUT).getData(), "stay"); + BOOST_CHECK_EQUAL(c2.subs.get("c1", TIMEOUT).getData(), "stay"); +} + + +// Test errors after a new member joins to verify frame-sequence-numbers are ok in update. +QPID_AUTO_TEST_CASE(testErrorAfterJoin) { + ScopedSuppressLogging allQuiet; + + ClusterFixture cluster(1, -1, updateArgs); + Client c0(cluster[0]); + c0.session.queueDeclare("q", durable=true); + c0.session.messageTransfer(content=pMessage("a", "q")); + + // Kill the new guy + cluster.add(); + Client c1(cluster[1]); + c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testErrorAfterJoin", "q")); + BOOST_CHECK_THROW(c1.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure); + BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size()); + + // Kill the old guy + cluster.add(); + Client c2(cluster[2]); + c2.session.messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception] testErrorAfterJoin2", "q")); + BOOST_CHECK_THROW(c0.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure); + + BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c2.connection, 1).size()); +} + +// Test that if one member fails and others do not, the failure leaves the cluster. +QPID_AUTO_TEST_CASE(testSinglePartialFailure) { + ScopedSuppressLogging allQuiet; + + ClusterFixture cluster(3, -1, updateArgs); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + Client c2(cluster[2], "c2"); + + c0.session.queueDeclare("q", durable=true); + c0.session.messageTransfer(content=pMessage("a", "q")); + // Cause partial failure on c1 + c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testSinglePartialFailure", "q")); + BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure); + + c0.session.messageTransfer(content=pMessage("b", "q")); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 3u); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size()); + + // Cause partial failure on c2 + c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s2[exception] testSinglePartialFailure2", "q")); + BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure); + + c0.session.messageTransfer(content=pMessage("c", "q")); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 5u); + BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size()); +} + +// Test multiple partial falures: 2 fail 2 pass +QPID_AUTO_TEST_CASE(testMultiPartialFailure) { + ScopedSuppressLogging allQuiet; + + ClusterFixture cluster(4, -1, updateArgs); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + Client c2(cluster[2], "c2"); + Client c3(cluster[3], "c3"); + + c0.session.queueDeclare("q", durable=true); + c0.session.messageTransfer(content=pMessage("a", "q")); + + // Cause partial failure on c1, c2 + c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] s2[exception] testMultiPartialFailure", "q")); + BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure); + BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure); + + c0.session.messageTransfer(content=pMessage("b", "q")); + c3.session.messageTransfer(content=pMessage("c", "q")); + BOOST_CHECK_EQUAL(c3.session.queueQuery("q").getMessageCount(), 4u); + BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size()); +} + +/** FIXME aconway 2009-04-10: + * The current approach to shutting down a process in test_store + * sometimes leads to assertion failures and errors in the shut-down + * process. Need a cleaner solution + */ +#if 0 +QPID_AUTO_TEST_CASE(testPartialFailureMemberLeaves) { + ScopedSuppressLogging allQuiet; + + ClusterFixture cluster(2, -1, updateArgs); + Client c0(cluster[0], "c0"); + Client c1(cluster[1], "c1"); + + c0.session.queueDeclare("q", durable=true); + c0.session.messageTransfer(content=pMessage("a", "q")); + + // Cause failure on member 0 and simultaneous crash on member 1. + BOOST_CHECK_THROW( + c0.session.messageTransfer( + content=pMessage("TEST_STORE_DO: s0[exception] s1[exit_process] testPartialFailureMemberLeaves", "q")), + ConnectionException); + cluster.wait(1); + + Client c00(cluster[0], "c00"); // Old connection is dead. + BOOST_CHECK_EQUAL(c00.session.queueQuery("q").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c00.connection, 1).size()); +} +#endif + + +QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk index 5d115de5a5..f92bb112e4 100644 --- a/cpp/src/tests/cluster.mk +++ b/cpp/src/tests/cluster.mk @@ -34,8 +34,10 @@ EXTRA_DIST+=ais_check start_cluster stop_cluster restart_cluster cluster_python_ federated_cluster_test clustered_replication_test check_PROGRAMS+=cluster_test -cluster_test_SOURCES=unit_test.cpp cluster_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp -cluster_test_LDADD=$(lib_client) ../cluster.la -lboost_unit_test_framework +cluster_test_SOURCES=unit_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp \ + cluster_test.cpp PartialFailure.cpp + +cluster_test_LDADD=$(lib_client) ../cluster.la test_store.la -lboost_unit_test_framework unit_test_LDADD+=../cluster.la diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index eee2df58cc..98b399c187 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -73,7 +73,7 @@ const sys::Duration TIMEOUT=sys::TIME_SEC/4; ostream& operator<<(ostream& o, const cpg_name* n) { - return o << cluster::Cpg::str(*n); + return o << Cpg::str(*n); } ostream& operator<<(ostream& o, const cpg_address& a) { @@ -89,29 +89,12 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { return o; } -template <class C> set<uint16_t> makeSet(const C& c) { - set<uint16_t> s; +template <class C> set<int> makeSet(const C& c) { + set<int> s; copy(c.begin(), c.end(), inserter(s, s.begin())); return s; } -template <class T> set<uint16_t> knownBrokerPorts(T& source, int n=-1) { - vector<Url> urls = source.getKnownBrokers(); - if (n >= 0 && unsigned(n) != urls.size()) { - BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls); - // Retry up to 10 secs in .1 second intervals. - for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) { - sys::usleep(1000*100); // 0.1 secs - urls = source.getKnownBrokers(); - } - } - BOOST_MESSAGE("knownBrokerPorts expecting " << n << ": " << urls); - set<uint16_t> s; - for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) - s.insert((*i)[0].get<TcpAddress>()->port); - return s; -} - class Sender { public: Sender(boost::shared_ptr<ConnectionImpl> ci, uint16_t ch) : connection(ci), channel(ch) {} @@ -175,7 +158,6 @@ ConnectionSettings aclSettings(int port, const std::string& id) { QPID_AUTO_TEST_CASE(testAcl) { ofstream policyFile("cluster_test.acl"); - // FIXME aconway 2009-02-12: guest -> qpidd? policyFile << "acl allow foo@QPID create queue name=foo" << endl << "acl allow foo@QPID create queue name=foo2" << endl << "acl deny foo@QPID create queue name=bar" << endl @@ -446,13 +428,13 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { ClusterFixture cluster(1); Client c0(cluster[0], "c0"); - set<uint16_t> kb0 = knownBrokerPorts(c0.connection); + set<int> kb0 = knownBrokerPorts(c0.connection); BOOST_CHECK_EQUAL(kb0.size(), 1u); BOOST_CHECK_EQUAL(kb0, makeSet(cluster)); cluster.add(); Client c1(cluster[1], "c1"); - set<uint16_t> kb1 = knownBrokerPorts(c1.connection); + set<int> kb1 = knownBrokerPorts(c1.connection); kb0 = knownBrokerPorts(c0.connection, 2); BOOST_CHECK_EQUAL(kb1.size(), 2u); BOOST_CHECK_EQUAL(kb1, makeSet(cluster)); @@ -460,7 +442,7 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { cluster.add(); Client c2(cluster[2], "c2"); - set<uint16_t> kb2 = knownBrokerPorts(c2.connection); + set<int> kb2 = knownBrokerPorts(c2.connection); kb1 = knownBrokerPorts(c1.connection, 3); kb0 = knownBrokerPorts(c0.connection, 3); BOOST_CHECK_EQUAL(kb2.size(), 3u); diff --git a/cpp/src/tests/clustered_replication_test b/cpp/src/tests/clustered_replication_test index 2a3e742632..7afda87733 100755 --- a/cpp/src/tests/clustered_replication_test +++ b/cpp/src/tests/clustered_replication_test @@ -23,6 +23,7 @@ # failures: srcdir=`dirname $0` PYTHON_DIR=$srcdir/../../../python +export PYTHONPATH=$PYTHON_DIR trap stop_brokers INT EXIT diff --git a/cpp/src/tests/failover_soak.cpp b/cpp/src/tests/failover_soak.cpp index 18d693e5ec..2da60f47b5 100644 --- a/cpp/src/tests/failover_soak.cpp +++ b/cpp/src/tests/failover_soak.cpp @@ -220,63 +220,13 @@ struct children : public vector<child *> cout << "\n\n\n\n"; } - - /* - Only call this if you already know there is at least - one child still running. Supply a time in seconds. - If it has been at least that long since a shild stopped - running, we judge the system to have hung. - */ - int - hanging ( int hangTime ) - { - struct timeval now, - duration; - gettimeofday ( &now, 0 ); - - int how_many_hanging = 0; - - vector<child *>::iterator i; - for ( i = begin(); i != end(); ++ i ) - { - //Not in POSIX - //timersub ( & now, &((*i)->startTime), & duration ); - duration.tv_sec = now.tv_sec - (*i)->startTime.tv_sec; - duration.tv_usec = now.tv_usec - (*i)->startTime.tv_usec; - if (duration.tv_usec < 0) { - --duration.tv_sec; - duration.tv_usec += 1000000; - } - - if ( (COMPLETED != (*i)->status) // child isn't done running - && - ( duration.tv_sec >= hangTime ) // it's been too long - ) - { - std::cerr << "Child of type " - << (*i)->type - << " hanging. " - << "PID is " - << (*i)->pid - << endl; - ++ how_many_hanging; - } - } - - return how_many_hanging; - } - - int verbosity; }; - children allMyChildren; - - void childExit ( int ) { @@ -389,6 +339,7 @@ startNewBroker ( brokerVector & brokers, ("--log-prefix") (prefix.str()) ("--log-to-file") + ("--log-enable=error+") (prefix.str()+".log"); if (endsWith(moduleOrDir, "cluster.so")) { @@ -818,16 +769,6 @@ main ( int argc, char const ** argv ) return ERROR_ON_CHILD; } - // If one is hanging, quit. - if ( allMyChildren.hanging ( 120 ) ) - { - /* - * Don't kill any processes. Leave alive for questioning. - * */ - std::cerr << "END_OF_TEST ERROR_HANGING\n"; - return HANGING; - } - if ( verbosity > 1 ) { std::cerr << "------- next kill-broker loop --------\n"; allMyChildren.print(); diff --git a/cpp/src/tests/run_failover_soak b/cpp/src/tests/run_failover_soak index cf9646ac58..333dd0be23 100755 --- a/cpp/src/tests/run_failover_soak +++ b/cpp/src/tests/run_failover_soak @@ -51,5 +51,6 @@ REPORT_FREQUENCY=${REPORT_FREQUENCY:-`expr $MESSAGES / 20`} VERBOSITY=${VERBOSITY:-1} DURABILITY=${DURABILITY:-0} +rm -f soak-*.log exec ./failover_soak $MODULES ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY $DURABILITY diff --git a/cpp/src/tests/start_cluster b/cpp/src/tests/start_cluster index 053b23da33..585ba082d5 100755 --- a/cpp/src/tests/start_cluster +++ b/cpp/src/tests/start_cluster @@ -28,15 +28,17 @@ with_ais_group() { echo $* | newgrp ais } -rm -f cluster*.log -SIZE=${1:-1}; shift +rm -f cluster*.log cluster.ports qpidd.port + +SIZE=${1:-3}; shift CLUSTER=`pwd` # Cluster name=pwd, avoid clashes. -OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $@" +OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --auth=no $@" for (( i=0; i<SIZE; ++i )); do - PORT=`with_ais_group ../qpidd -p0 --log-to-file=cluster$i.log $OPTS` || exit 1 + DDIR=`mktemp -d /tmp/start_cluster.XXXXXXXXXX` + PORT=`with_ais_group ../qpidd -p0 --log-to-file=cluster$i.log $OPTS --data-dir=$DDIR` || exit 1 echo $PORT >> cluster.ports done -head cluster.ports > qpidd.port # First member's port for tests. +head -n 1 cluster.ports > qpidd.port # First member's port for tests. diff --git a/cpp/src/tests/test_store.cpp b/cpp/src/tests/test_store.cpp new file mode 100644 index 0000000000..da4f03192a --- /dev/null +++ b/cpp/src/tests/test_store.cpp @@ -0,0 +1,146 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/**@file + * Plug-in message store for tests. + * + * Add functionality as required, build up a comprehensive set of + * features to support persistent behavior tests. + * + * Current features special "action" messages can: + * - raise exception from enqueue. + * - force host process to exit. + * - do async completion after a delay. + */ + +#include "qpid/broker/NullMessageStore.h" +#include "qpid/broker/Broker.h" +#include "qpid/log/Statement.h" +#include "qpid/Plugin.h" +#include "qpid/Options.h" +#include <boost/algorithm/string.hpp> +#include <boost/cast.hpp> +#include <boost/lexical_cast.hpp> + +using namespace qpid; +using namespace broker; +using namespace std; +using namespace boost; +using namespace qpid::sys; + +struct TestStoreOptions : public Options { + + string name; + + TestStoreOptions() : Options("Test Store Options") { + addOptions() + ("test-store-name", optValue(name, "NAME"), "Name to identify test store instance."); + } +}; + +struct Completer : public Runnable { + intrusive_ptr<PersistableMessage> message; + int usecs; + Completer(intrusive_ptr<PersistableMessage> m, int u) : message(m), usecs(u) {} + void run() { + qpid::sys::usleep(usecs); + message->enqueueComplete(); + delete this; + } +}; + +class TestStore : public NullMessageStore { + public: + TestStore(const string& name_, Broker& broker_) : name(name_), broker(broker_) {} + + ~TestStore() { + for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1)); + } + + void enqueue(TransactionContext* , + const boost::intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& ) + { + string data = polymorphic_downcast<Message*>(msg.get())->getFrames().getContent(); + + // Check the message for special instructions. + size_t i, j; + if (starts_with(data, TEST_STORE_DO) + && (i = data.find(name+"[")) != string::npos + && (j = data.find("]", i)) != string::npos) + { + size_t start = i+name.size()+1; + string action = data.substr(start, j-start); + + if (action == EXCEPTION) { + throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data)); + } + else if (action == EXIT_PROCESS) { + // FIXME aconway 2009-04-10: this is a dubious way to + // close the process at best, it can cause assertions or seg faults + // rather than clean exit. + QPID_LOG(critical, "TestStore " << name << " forcing process exit for: " << data); + exit(0); + } + else if (starts_with(action, ASYNC)) { + std::string delayStr(action.substr(ASYNC.size())); + int delay = lexical_cast<int>(delayStr); + threads.push_back(Thread(*new Completer(msg, delay))); + } + else { + QPID_LOG(error, "TestStore " << name << " unknown action " << action); + msg->enqueueComplete(); + } + } + else + msg->enqueueComplete(); + } + + private: + static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC; + string name; + Broker& broker; + vector<Thread> threads; +}; + +const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: "; +const string TestStore::EXCEPTION = "exception"; +const string TestStore::EXIT_PROCESS = "exit_process"; +const string TestStore::ASYNC="async "; + +struct TestStorePlugin : public Plugin { + + TestStoreOptions options; + + Options* getOptions() { return &options; } + + void earlyInitialize (Plugin::Target& target) + { + Broker* broker = dynamic_cast<Broker*>(&target); + if (!broker) return; + broker->setStore (new TestStore(options.name, *broker)); + } + + void initialize(qpid::Plugin::Target&) {} +}; + +static TestStorePlugin pluginInstance; |