summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/BrokerFixture.h10
-rw-r--r--cpp/src/tests/ClusterFixture.cpp32
-rw-r--r--cpp/src/tests/ClusterFixture.h18
-rw-r--r--cpp/src/tests/ForkedBroker.cpp26
-rw-r--r--cpp/src/tests/ForkedBroker.h1
-rw-r--r--cpp/src/tests/Makefile.am8
-rw-r--r--cpp/src/tests/PartialFailure.cpp222
-rw-r--r--cpp/src/tests/cluster.mk6
-rw-r--r--cpp/src/tests/cluster_test.cpp30
-rwxr-xr-xcpp/src/tests/clustered_replication_test1
-rw-r--r--cpp/src/tests/failover_soak.cpp61
-rwxr-xr-xcpp/src/tests/run_failover_soak1
-rwxr-xr-xcpp/src/tests/start_cluster12
-rw-r--r--cpp/src/tests/test_store.cpp146
14 files changed, 468 insertions, 106 deletions
diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h
index f55560739d..b32b7f44ba 100644
--- a/cpp/src/tests/BrokerFixture.h
+++ b/cpp/src/tests/BrokerFixture.h
@@ -114,10 +114,12 @@ struct ClientT {
SessionType session;
qpid::client::SubscriptionManager subs;
qpid::client::LocalQueue lq;
- ClientT(uint16_t port, const std::string& name=std::string())
- : connection(port), session(connection.newSession(name)), subs(session) {}
- ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name=std::string())
- : connection(settings), session(connection.newSession(name)), subs(session) {}
+ std::string name;
+
+ ClientT(uint16_t port, const std::string& name_=std::string())
+ : connection(port), session(connection.newSession(name_)), subs(session), name(name_) {}
+ ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name_=std::string())
+ : connection(settings), session(connection.newSession(name_)), subs(session), name(name_) {}
~ClientT() { connection.close(); }
};
diff --git a/cpp/src/tests/ClusterFixture.cpp b/cpp/src/tests/ClusterFixture.cpp
index 5658957b48..d49be76f79 100644
--- a/cpp/src/tests/ClusterFixture.cpp
+++ b/cpp/src/tests/ClusterFixture.cpp
@@ -67,16 +67,23 @@ ClusterFixture::ClusterFixture(size_t n, int localIndex_, const Args& args_)
add(n);
}
+ClusterFixture::ClusterFixture(size_t n, int localIndex_, boost::function<void (Args&, size_t)> updateArgs_)
+ : name(Uuid(true).str()), localIndex(localIndex_), updateArgs(updateArgs_)
+{
+ add(n);
+}
+
const ClusterFixture::Args ClusterFixture::DEFAULT_ARGS =
list_of<string>("--auth=no")("--no-data-dir");
-ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix) {
+ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix, size_t index) {
Args args = list_of<string>("qpidd " __FILE__)
("--no-module-dir")
("--load-module=../.libs/cluster.so")
("--cluster-name")(name)
("--log-prefix")(prefix);
args.insert(args.end(), userArgs.begin(), userArgs.end());
+ if (updateArgs) updateArgs(args, index);
return args;
}
@@ -84,7 +91,7 @@ void ClusterFixture::add() {
if (size() != size_t(localIndex)) { // fork a broker process.
std::ostringstream os; os << "fork" << size();
std::string prefix = os.str();
- forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(makeArgs(prefix))));
+ forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(makeArgs(prefix, size()))));
push_back(forkedBrokers.back()->getPort());
}
else { // Run in this process
@@ -106,7 +113,7 @@ void ClusterFixture::addLocal() {
assert(int(size()) == localIndex);
ostringstream os; os << "local" << localIndex;
string prefix = os.str();
- Args args(makeArgs(prefix));
+ Args args(makeArgs(prefix, localIndex));
vector<const char*> argv(args.size());
transform(args.begin(), args.end(), argv.begin(), boost::bind(&string::c_str, _1));
qpid::log::Logger::instance().setPrefix(prefix);
@@ -131,3 +138,22 @@ void ClusterFixture::killWithSilencer(size_t n, client::Connection& c, int sig)
kill(n,sig);
try { c.close(); } catch(...) {}
}
+
+/**
+ * Get the known broker ports from a Connection.
+ *@param n if specified wait for the cluster size to be n, up to a timeout.
+ */
+std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n) {
+ std::vector<qpid::Url> urls = source.getKnownBrokers();
+ if (n >= 0 && unsigned(n) != urls.size()) {
+ // Retry up to 10 secs in .1 second intervals.
+ for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) {
+ qpid::sys::usleep(1000*100); // 0.1 secs
+ urls = source.getKnownBrokers();
+ }
+ }
+ std::set<int> s;
+ for (std::vector<qpid::Url>::const_iterator i = urls.begin(); i != urls.end(); ++i)
+ s.insert((*i)[0].get<qpid::TcpAddress>()->port);
+ return s;
+}
diff --git a/cpp/src/tests/ClusterFixture.h b/cpp/src/tests/ClusterFixture.h
index 84fb9f2202..75d39fc7e5 100644
--- a/cpp/src/tests/ClusterFixture.h
+++ b/cpp/src/tests/ClusterFixture.h
@@ -38,6 +38,7 @@
#include "qpid/log/Logger.h"
#include <boost/bind.hpp>
+#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include <string>
@@ -69,33 +70,44 @@ using qpid::cluster::Cluster;
class ClusterFixture : public vector<uint16_t> {
public:
typedef std::vector<std::string> Args;
+ static const Args DEFAULT_ARGS;
+
/** @param localIndex can be -1 meaning don't automatically start a local broker.
* A local broker can be started with addLocal().
*/
ClusterFixture(size_t n, int localIndex=0, const Args& args=DEFAULT_ARGS);
+
+ /**@param updateArgs function is passed the index of the cluster member and can update the arguments. */
+ ClusterFixture(size_t n, int localIndex, boost::function<void (Args&, size_t)> updateArgs);
+
void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
void add(); // Add a broker.
void setup();
bool hasLocal() const;
- /** Kill a forked broker with sig, or shutdown localBroker if n==0. */
+ /** Kill a forked broker with sig, or shutdown localBroker. */
void kill(size_t n, int sig=SIGINT);
/** Kill a broker and suppressing errors from closing connection c. */
void killWithSilencer(size_t n, client::Connection& c, int sig=SIGINT);
private:
- static const Args DEFAULT_ARGS;
void addLocal(); // Add a local broker.
- Args makeArgs(const std::string& prefix);
+ Args makeArgs(const std::string& prefix, size_t index);
string name;
std::auto_ptr<BrokerFixture> localBroker;
int localIndex;
std::vector<shared_ptr<ForkedBroker> > forkedBrokers;
Args userArgs;
+ boost::function<void (Args&, size_t)> updateArgs;
};
+/**
+ * Get the known broker ports from a Connection.
+ *@param n if specified wait for the cluster size to be n, up to a timeout.
+ */
+std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n=-1);
#endif /*!CLUSTER_FIXTURE_H*/
diff --git a/cpp/src/tests/ForkedBroker.cpp b/cpp/src/tests/ForkedBroker.cpp
index f90f76aeb2..383dcc496c 100644
--- a/cpp/src/tests/ForkedBroker.cpp
+++ b/cpp/src/tests/ForkedBroker.cpp
@@ -20,12 +20,17 @@
*/
#include "ForkedBroker.h"
+#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
+#include <boost/algorithm/string.hpp>
#include <algorithm>
#include <stdlib.h>
#include <sys/types.h>
#include <signal.h>
+using namespace std;
+using qpid::ErrnoException;
+
ForkedBroker::ForkedBroker(const Args& args) { init(args); }
ForkedBroker::ForkedBroker(int argc, const char* const argv[]) { init(Args(argv, argc+argv)); }
@@ -42,14 +47,25 @@ void ForkedBroker::kill(int sig) {
pid = 0; // Reset pid here in case of an exception.
using qpid::ErrnoException;
if (::kill(savePid, sig) < 0)
- throw ErrnoException("kill failed");
+ throw ErrnoException("kill failed");
int status;
if (::waitpid(savePid, &status, 0) < 0 && sig != 9)
throw ErrnoException("wait for forked process failed");
if (WEXITSTATUS(status) != 0 && sig != 9)
throw qpid::Exception(QPID_MSG("Forked broker exited with: " << WEXITSTATUS(status)));
}
+
+namespace std {
+static ostream& operator<<(ostream& o, const ForkedBroker::Args& a) {
+ copy(a.begin(), a.end(), ostream_iterator<string>(o, " "));
+ return o;
+}
+bool isLogOption(const std::string& s) {
+ return boost::starts_with(s, "--log-enable") || boost::starts_with(s, "--trace");
+}
+
+}
void ForkedBroker::init(const Args& userArgs) {
using qpid::ErrnoException;
@@ -70,17 +86,19 @@ void ForkedBroker::init(const Args& userArgs) {
}
else { // child
::close(pipeFds[0]);
- // FIXME aconway 2009-02-12:
int fd = ::dup2(pipeFds[1], 1); // pipe stdout to the parent.
if (fd < 0) throw ErrnoException("dup2 failed");
const char* prog = "../qpidd";
Args args(userArgs);
args.push_back("--port=0");
- if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE"))
- args.push_back("--log-enable=error+"); // Keep quiet except for errors.
+ // Keep quiet except for errors.
+ if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")
+ && find_if(userArgs.begin(), userArgs.end(), isLogOption) == userArgs.end())
+ args.push_back("--log-enable=error+");
std::vector<const char*> argv(args.size());
std::transform(args.begin(), args.end(), argv.begin(), boost::bind(&std::string::c_str, _1));
argv.push_back(0);
+ QPID_LOG(debug, "ForkedBroker exec " << prog << ": " << args);
execv(prog, const_cast<char* const*>(&argv[0]));
throw ErrnoException("execv failed");
}
diff --git a/cpp/src/tests/ForkedBroker.h b/cpp/src/tests/ForkedBroker.h
index 6f97fbdc09..e72f421563 100644
--- a/cpp/src/tests/ForkedBroker.h
+++ b/cpp/src/tests/ForkedBroker.h
@@ -53,6 +53,7 @@ class ForkedBroker {
~ForkedBroker();
void kill(int sig=SIGINT);
+ int wait(); // Wait for exit, return exit status.
uint16_t getPort() { return port; }
pid_t getPID() { return pid; }
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 9a81ef18b3..4b59e8ebe9 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -110,11 +110,17 @@ endif
# amqp_0_10/Map.cpp \
# amqp_0_10/handlers.cpp
+TESTLIBFLAGS = -module -rpath $(abs_builddir)
check_LTLIBRARIES += libshlibtest.la
-libshlibtest_la_LDFLAGS = -module -rpath $(abs_builddir)
+libshlibtest_la_LDFLAGS = $(TESTLIBFLAGS)
libshlibtest_la_SOURCES = shlibtest.cpp
+check_LTLIBRARIES += test_store.la
+test_store_la_SOURCES = test_store.cpp
+test_store_la_LIBADD = $(lib_broker) # FIXME aconway 2009-04-03: required?
+test_store_la_LDFLAGS = $(TESTLIBFLAGS)
+
include cluster.mk
if SSL
include ssl.mk
diff --git a/cpp/src/tests/PartialFailure.cpp b/cpp/src/tests/PartialFailure.cpp
new file mode 100644
index 0000000000..f7187b2e77
--- /dev/null
+++ b/cpp/src/tests/PartialFailure.cpp
@@ -0,0 +1,222 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**@file Tests for partial failure in a cluster.
+ * Partial failure means some nodes experience a failure while others do not.
+ * In this case the failed nodes must shut down.
+ */
+
+#include "test_tools.h"
+#include "unit_test.h"
+#include "ClusterFixture.h"
+#include <boost/assign.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/bind.hpp>
+
+QPID_AUTO_TEST_SUITE(PartialFailureTestSuite)
+
+ using namespace std;
+using namespace qpid;
+using namespace qpid::cluster;
+using namespace qpid::framing;
+using namespace qpid::client;
+using namespace qpid::client::arg;
+using namespace boost::assign;
+using broker::Broker;
+using boost::shared_ptr;
+
+// Timeout for tests that wait for messages
+const sys::Duration TIMEOUT=sys::TIME_SEC/4;
+
+static bool isLogOption(const std::string& s) { return boost::starts_with(s, "--log-enable"); }
+
+void updateArgs(ClusterFixture::Args& args, size_t index) {
+ ostringstream os;
+ os << "--test-store-name=s" << index;
+ args.push_back(os.str());
+ args.push_back("--load-module=.libs/test_store.so");
+ string dataDir("/tmp/PartialFailure.XXXXXX");
+ if (!mkdtemp(const_cast<char*>(dataDir.c_str())))
+ throw ErrnoException("Can't create data dir");
+ args.push_back("--data-dir="+dataDir);
+ args.push_back("--auth=no");
+
+ // These tests generate errors deliberately, disable error logging unless a log env var is set.
+ if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")) {
+ remove_if(args.begin(), args.end(), isLogOption);
+ args.push_back("--log-enable=critical+:DISABLED"); // hacky way to disable logs.
+ }
+}
+
+Message pMessage(string data, string q) {
+ Message msg(data, q);
+ msg.getDeliveryProperties().setDeliveryMode(PERSISTENT);
+ return msg;
+}
+
+void queueAndSub(Client& c) {
+ c.session.queueDeclare(c.name, durable=true);
+ c.subs.subscribe(c.lq, c.name);
+}
+
+// Verify normal cluster-wide errors.
+QPID_AUTO_TEST_CASE(testNormalErrors) {
+ // FIXME aconway 2009-04-10: Would like to put a scope just around
+ // the statements expected to fail (in BOOST_CHECK_THROW) but that
+ // sproadically lets out messages, possibly because they're in
+ // Connection thread.
+ ScopedSuppressLogging allQuiet;
+
+ ClusterFixture cluster(3, -1, updateArgs);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+ Client c2(cluster[2], "c2");
+
+ queueAndSub(c0);
+ c0.session.messageTransfer(content=Message("x", "c0"));
+ BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x");
+
+ // Session error.
+ BOOST_CHECK_THROW(c0.session.exchangeBind(), SessionException);
+ c1.session.messageTransfer(content=Message("stay", "c0")); // Will stay on queue, session c0 is dead.
+
+ // Connection error, kill c1 on all members.
+ queueAndSub(c1);
+ BOOST_CHECK_THROW(
+ c1.session.messageTransfer(
+ content=pMessage("TEST_STORE_DO: s0[exception] s1[exception] s2[exception] testNormalErrors", "c1")),
+ ConnectionException);
+ c2.session.messageTransfer(content=Message("stay", "c1")); // Will stay on queue, session/connection c1 is dead.
+
+ BOOST_CHECK_EQUAL(3u, knownBrokerPorts(c2.connection, 3).size());
+ BOOST_CHECK_EQUAL(c2.subs.get("c0", TIMEOUT).getData(), "stay");
+ BOOST_CHECK_EQUAL(c2.subs.get("c1", TIMEOUT).getData(), "stay");
+}
+
+
+// Test errors after a new member joins to verify frame-sequence-numbers are ok in update.
+QPID_AUTO_TEST_CASE(testErrorAfterJoin) {
+ ScopedSuppressLogging allQuiet;
+
+ ClusterFixture cluster(1, -1, updateArgs);
+ Client c0(cluster[0]);
+ c0.session.queueDeclare("q", durable=true);
+ c0.session.messageTransfer(content=pMessage("a", "q"));
+
+ // Kill the new guy
+ cluster.add();
+ Client c1(cluster[1]);
+ c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testErrorAfterJoin", "q"));
+ BOOST_CHECK_THROW(c1.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);
+ BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());
+
+ // Kill the old guy
+ cluster.add();
+ Client c2(cluster[2]);
+ c2.session.messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception] testErrorAfterJoin2", "q"));
+ BOOST_CHECK_THROW(c0.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);
+
+ BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c2.connection, 1).size());
+}
+
+// Test that if one member fails and others do not, the failure leaves the cluster.
+QPID_AUTO_TEST_CASE(testSinglePartialFailure) {
+ ScopedSuppressLogging allQuiet;
+
+ ClusterFixture cluster(3, -1, updateArgs);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+ Client c2(cluster[2], "c2");
+
+ c0.session.queueDeclare("q", durable=true);
+ c0.session.messageTransfer(content=pMessage("a", "q"));
+ // Cause partial failure on c1
+ c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testSinglePartialFailure", "q"));
+ BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);
+
+ c0.session.messageTransfer(content=pMessage("b", "q"));
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 3u);
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());
+
+ // Cause partial failure on c2
+ c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s2[exception] testSinglePartialFailure2", "q"));
+ BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);
+
+ c0.session.messageTransfer(content=pMessage("c", "q"));
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 5u);
+ BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());
+}
+
+// Test multiple partial falures: 2 fail 2 pass
+QPID_AUTO_TEST_CASE(testMultiPartialFailure) {
+ ScopedSuppressLogging allQuiet;
+
+ ClusterFixture cluster(4, -1, updateArgs);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+ Client c2(cluster[2], "c2");
+ Client c3(cluster[3], "c3");
+
+ c0.session.queueDeclare("q", durable=true);
+ c0.session.messageTransfer(content=pMessage("a", "q"));
+
+ // Cause partial failure on c1, c2
+ c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] s2[exception] testMultiPartialFailure", "q"));
+ BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);
+ BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);
+
+ c0.session.messageTransfer(content=pMessage("b", "q"));
+ c3.session.messageTransfer(content=pMessage("c", "q"));
+ BOOST_CHECK_EQUAL(c3.session.queueQuery("q").getMessageCount(), 4u);
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());
+}
+
+/** FIXME aconway 2009-04-10:
+ * The current approach to shutting down a process in test_store
+ * sometimes leads to assertion failures and errors in the shut-down
+ * process. Need a cleaner solution
+ */
+#if 0
+QPID_AUTO_TEST_CASE(testPartialFailureMemberLeaves) {
+ ScopedSuppressLogging allQuiet;
+
+ ClusterFixture cluster(2, -1, updateArgs);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+
+ c0.session.queueDeclare("q", durable=true);
+ c0.session.messageTransfer(content=pMessage("a", "q"));
+
+ // Cause failure on member 0 and simultaneous crash on member 1.
+ BOOST_CHECK_THROW(
+ c0.session.messageTransfer(
+ content=pMessage("TEST_STORE_DO: s0[exception] s1[exit_process] testPartialFailureMemberLeaves", "q")),
+ ConnectionException);
+ cluster.wait(1);
+
+ Client c00(cluster[0], "c00"); // Old connection is dead.
+ BOOST_CHECK_EQUAL(c00.session.queueQuery("q").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c00.connection, 1).size());
+}
+#endif
+
+
+QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk
index 5d115de5a5..f92bb112e4 100644
--- a/cpp/src/tests/cluster.mk
+++ b/cpp/src/tests/cluster.mk
@@ -34,8 +34,10 @@ EXTRA_DIST+=ais_check start_cluster stop_cluster restart_cluster cluster_python_
federated_cluster_test clustered_replication_test
check_PROGRAMS+=cluster_test
-cluster_test_SOURCES=unit_test.cpp cluster_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp
-cluster_test_LDADD=$(lib_client) ../cluster.la -lboost_unit_test_framework
+cluster_test_SOURCES=unit_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp \
+ cluster_test.cpp PartialFailure.cpp
+
+cluster_test_LDADD=$(lib_client) ../cluster.la test_store.la -lboost_unit_test_framework
unit_test_LDADD+=../cluster.la
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index eee2df58cc..98b399c187 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -73,7 +73,7 @@ const sys::Duration TIMEOUT=sys::TIME_SEC/4;
ostream& operator<<(ostream& o, const cpg_name* n) {
- return o << cluster::Cpg::str(*n);
+ return o << Cpg::str(*n);
}
ostream& operator<<(ostream& o, const cpg_address& a) {
@@ -89,29 +89,12 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
return o;
}
-template <class C> set<uint16_t> makeSet(const C& c) {
- set<uint16_t> s;
+template <class C> set<int> makeSet(const C& c) {
+ set<int> s;
copy(c.begin(), c.end(), inserter(s, s.begin()));
return s;
}
-template <class T> set<uint16_t> knownBrokerPorts(T& source, int n=-1) {
- vector<Url> urls = source.getKnownBrokers();
- if (n >= 0 && unsigned(n) != urls.size()) {
- BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls);
- // Retry up to 10 secs in .1 second intervals.
- for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) {
- sys::usleep(1000*100); // 0.1 secs
- urls = source.getKnownBrokers();
- }
- }
- BOOST_MESSAGE("knownBrokerPorts expecting " << n << ": " << urls);
- set<uint16_t> s;
- for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i)
- s.insert((*i)[0].get<TcpAddress>()->port);
- return s;
-}
-
class Sender {
public:
Sender(boost::shared_ptr<ConnectionImpl> ci, uint16_t ch) : connection(ci), channel(ch) {}
@@ -175,7 +158,6 @@ ConnectionSettings aclSettings(int port, const std::string& id) {
QPID_AUTO_TEST_CASE(testAcl) {
ofstream policyFile("cluster_test.acl");
- // FIXME aconway 2009-02-12: guest -> qpidd?
policyFile << "acl allow foo@QPID create queue name=foo" << endl
<< "acl allow foo@QPID create queue name=foo2" << endl
<< "acl deny foo@QPID create queue name=bar" << endl
@@ -446,13 +428,13 @@ QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
ClusterFixture cluster(1);
Client c0(cluster[0], "c0");
- set<uint16_t> kb0 = knownBrokerPorts(c0.connection);
+ set<int> kb0 = knownBrokerPorts(c0.connection);
BOOST_CHECK_EQUAL(kb0.size(), 1u);
BOOST_CHECK_EQUAL(kb0, makeSet(cluster));
cluster.add();
Client c1(cluster[1], "c1");
- set<uint16_t> kb1 = knownBrokerPorts(c1.connection);
+ set<int> kb1 = knownBrokerPorts(c1.connection);
kb0 = knownBrokerPorts(c0.connection, 2);
BOOST_CHECK_EQUAL(kb1.size(), 2u);
BOOST_CHECK_EQUAL(kb1, makeSet(cluster));
@@ -460,7 +442,7 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
cluster.add();
Client c2(cluster[2], "c2");
- set<uint16_t> kb2 = knownBrokerPorts(c2.connection);
+ set<int> kb2 = knownBrokerPorts(c2.connection);
kb1 = knownBrokerPorts(c1.connection, 3);
kb0 = knownBrokerPorts(c0.connection, 3);
BOOST_CHECK_EQUAL(kb2.size(), 3u);
diff --git a/cpp/src/tests/clustered_replication_test b/cpp/src/tests/clustered_replication_test
index 2a3e742632..7afda87733 100755
--- a/cpp/src/tests/clustered_replication_test
+++ b/cpp/src/tests/clustered_replication_test
@@ -23,6 +23,7 @@
# failures:
srcdir=`dirname $0`
PYTHON_DIR=$srcdir/../../../python
+export PYTHONPATH=$PYTHON_DIR
trap stop_brokers INT EXIT
diff --git a/cpp/src/tests/failover_soak.cpp b/cpp/src/tests/failover_soak.cpp
index 18d693e5ec..2da60f47b5 100644
--- a/cpp/src/tests/failover_soak.cpp
+++ b/cpp/src/tests/failover_soak.cpp
@@ -220,63 +220,13 @@ struct children : public vector<child *>
cout << "\n\n\n\n";
}
-
- /*
- Only call this if you already know there is at least
- one child still running. Supply a time in seconds.
- If it has been at least that long since a shild stopped
- running, we judge the system to have hung.
- */
- int
- hanging ( int hangTime )
- {
- struct timeval now,
- duration;
- gettimeofday ( &now, 0 );
-
- int how_many_hanging = 0;
-
- vector<child *>::iterator i;
- for ( i = begin(); i != end(); ++ i )
- {
- //Not in POSIX
- //timersub ( & now, &((*i)->startTime), & duration );
- duration.tv_sec = now.tv_sec - (*i)->startTime.tv_sec;
- duration.tv_usec = now.tv_usec - (*i)->startTime.tv_usec;
- if (duration.tv_usec < 0) {
- --duration.tv_sec;
- duration.tv_usec += 1000000;
- }
-
- if ( (COMPLETED != (*i)->status) // child isn't done running
- &&
- ( duration.tv_sec >= hangTime ) // it's been too long
- )
- {
- std::cerr << "Child of type "
- << (*i)->type
- << " hanging. "
- << "PID is "
- << (*i)->pid
- << endl;
- ++ how_many_hanging;
- }
- }
-
- return how_many_hanging;
- }
-
-
int verbosity;
};
-
children allMyChildren;
-
-
void
childExit ( int )
{
@@ -389,6 +339,7 @@ startNewBroker ( brokerVector & brokers,
("--log-prefix")
(prefix.str())
("--log-to-file")
+ ("--log-enable=error+")
(prefix.str()+".log");
if (endsWith(moduleOrDir, "cluster.so")) {
@@ -818,16 +769,6 @@ main ( int argc, char const ** argv )
return ERROR_ON_CHILD;
}
- // If one is hanging, quit.
- if ( allMyChildren.hanging ( 120 ) )
- {
- /*
- * Don't kill any processes. Leave alive for questioning.
- * */
- std::cerr << "END_OF_TEST ERROR_HANGING\n";
- return HANGING;
- }
-
if ( verbosity > 1 ) {
std::cerr << "------- next kill-broker loop --------\n";
allMyChildren.print();
diff --git a/cpp/src/tests/run_failover_soak b/cpp/src/tests/run_failover_soak
index cf9646ac58..333dd0be23 100755
--- a/cpp/src/tests/run_failover_soak
+++ b/cpp/src/tests/run_failover_soak
@@ -51,5 +51,6 @@ REPORT_FREQUENCY=${REPORT_FREQUENCY:-`expr $MESSAGES / 20`}
VERBOSITY=${VERBOSITY:-1}
DURABILITY=${DURABILITY:-0}
+rm -f soak-*.log
exec ./failover_soak $MODULES ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY $DURABILITY
diff --git a/cpp/src/tests/start_cluster b/cpp/src/tests/start_cluster
index 053b23da33..585ba082d5 100755
--- a/cpp/src/tests/start_cluster
+++ b/cpp/src/tests/start_cluster
@@ -28,15 +28,17 @@ with_ais_group() {
echo $* | newgrp ais
}
-rm -f cluster*.log
-SIZE=${1:-1}; shift
+rm -f cluster*.log cluster.ports qpidd.port
+
+SIZE=${1:-3}; shift
CLUSTER=`pwd` # Cluster name=pwd, avoid clashes.
-OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $@"
+OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --auth=no $@"
for (( i=0; i<SIZE; ++i )); do
- PORT=`with_ais_group ../qpidd -p0 --log-to-file=cluster$i.log $OPTS` || exit 1
+ DDIR=`mktemp -d /tmp/start_cluster.XXXXXXXXXX`
+ PORT=`with_ais_group ../qpidd -p0 --log-to-file=cluster$i.log $OPTS --data-dir=$DDIR` || exit 1
echo $PORT >> cluster.ports
done
-head cluster.ports > qpidd.port # First member's port for tests.
+head -n 1 cluster.ports > qpidd.port # First member's port for tests.
diff --git a/cpp/src/tests/test_store.cpp b/cpp/src/tests/test_store.cpp
new file mode 100644
index 0000000000..da4f03192a
--- /dev/null
+++ b/cpp/src/tests/test_store.cpp
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+/**@file
+ * Plug-in message store for tests.
+ *
+ * Add functionality as required, build up a comprehensive set of
+ * features to support persistent behavior tests.
+ *
+ * Current features special "action" messages can:
+ * - raise exception from enqueue.
+ * - force host process to exit.
+ * - do async completion after a delay.
+ */
+
+#include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include <boost/algorithm/string.hpp>
+#include <boost/cast.hpp>
+#include <boost/lexical_cast.hpp>
+
+using namespace qpid;
+using namespace broker;
+using namespace std;
+using namespace boost;
+using namespace qpid::sys;
+
+struct TestStoreOptions : public Options {
+
+ string name;
+
+ TestStoreOptions() : Options("Test Store Options") {
+ addOptions()
+ ("test-store-name", optValue(name, "NAME"), "Name to identify test store instance.");
+ }
+};
+
+struct Completer : public Runnable {
+ intrusive_ptr<PersistableMessage> message;
+ int usecs;
+ Completer(intrusive_ptr<PersistableMessage> m, int u) : message(m), usecs(u) {}
+ void run() {
+ qpid::sys::usleep(usecs);
+ message->enqueueComplete();
+ delete this;
+ }
+};
+
+class TestStore : public NullMessageStore {
+ public:
+ TestStore(const string& name_, Broker& broker_) : name(name_), broker(broker_) {}
+
+ ~TestStore() {
+ for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1));
+ }
+
+ void enqueue(TransactionContext* ,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& )
+ {
+ string data = polymorphic_downcast<Message*>(msg.get())->getFrames().getContent();
+
+ // Check the message for special instructions.
+ size_t i, j;
+ if (starts_with(data, TEST_STORE_DO)
+ && (i = data.find(name+"[")) != string::npos
+ && (j = data.find("]", i)) != string::npos)
+ {
+ size_t start = i+name.size()+1;
+ string action = data.substr(start, j-start);
+
+ if (action == EXCEPTION) {
+ throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data));
+ }
+ else if (action == EXIT_PROCESS) {
+ // FIXME aconway 2009-04-10: this is a dubious way to
+ // close the process at best, it can cause assertions or seg faults
+ // rather than clean exit.
+ QPID_LOG(critical, "TestStore " << name << " forcing process exit for: " << data);
+ exit(0);
+ }
+ else if (starts_with(action, ASYNC)) {
+ std::string delayStr(action.substr(ASYNC.size()));
+ int delay = lexical_cast<int>(delayStr);
+ threads.push_back(Thread(*new Completer(msg, delay)));
+ }
+ else {
+ QPID_LOG(error, "TestStore " << name << " unknown action " << action);
+ msg->enqueueComplete();
+ }
+ }
+ else
+ msg->enqueueComplete();
+ }
+
+ private:
+ static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC;
+ string name;
+ Broker& broker;
+ vector<Thread> threads;
+};
+
+const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: ";
+const string TestStore::EXCEPTION = "exception";
+const string TestStore::EXIT_PROCESS = "exit_process";
+const string TestStore::ASYNC="async ";
+
+struct TestStorePlugin : public Plugin {
+
+ TestStoreOptions options;
+
+ Options* getOptions() { return &options; }
+
+ void earlyInitialize (Plugin::Target& target)
+ {
+ Broker* broker = dynamic_cast<Broker*>(&target);
+ if (!broker) return;
+ broker->setStore (new TestStore(options.name, *broker));
+ }
+
+ void initialize(qpid::Plugin::Target&) {}
+};
+
+static TestStorePlugin pluginInstance;