diff options
author | Alan Conway <aconway@apache.org> | 2007-07-23 13:08:16 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-07-23 13:08:16 +0000 |
commit | 1a469b992ef2f28d98f43e63cf4d520c1bf830a4 (patch) | |
tree | c743052e2d2ab10a28960234e3efd3534cdb14c1 /cpp/src | |
parent | 4ab144d3d0a48a4abc1814e3244ef830344f19b2 (diff) | |
download | qpid-python-1a469b992ef2f28d98f43e63cf4d520c1bf830a4.tar.gz |
* src/tests/cluster.mk: Enable cluster test.
* src/tests/Cluster.h (class TestHandler):
Fixed race in TestHandler::waitFor
* src/tests/Cluster.cpp
- Allow separate start of parent and child processes.
* src/qpid/Options.cpp (parse): Skip argv parsing if argc=0.
* src/qpid/cluster/Cluster.cpp (configChange): assert group name.
* src/qpid/cluster/Cpg.cpp, .h: Additional logging
* src/qpid/framing/AMQFrame.cpp: Initialize all fields in ctor,
avoid valgrind warning.
* src/qpid/log/Logger.cpp: Initialize singleton automatically
from environment so logging can be used on tests.
* src/qpid/sys/Time.h: Avoid overflow in AbsTime(t, TIME_INFINITE)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@558710 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/Options.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 34 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/framing/AMQFrame.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/log/Logger.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Time.h | 48 | ||||
-rw-r--r-- | cpp/src/tests/Cluster.cpp | 29 | ||||
-rw-r--r-- | cpp/src/tests/Cluster.h | 19 | ||||
-rw-r--r-- | cpp/src/tests/Cpg.cpp | 4 | ||||
-rw-r--r-- | cpp/src/tests/cluster.mk | 9 |
11 files changed, 108 insertions, 79 deletions
diff --git a/cpp/src/qpid/Options.cpp b/cpp/src/qpid/Options.cpp index 18a3f726f9..7f4536ff7b 100644 --- a/cpp/src/qpid/Options.cpp +++ b/cpp/src/qpid/Options.cpp @@ -27,12 +27,13 @@ namespace qpid { using namespace std; namespace { -const std::string prefix("QPID_"); + char env2optchar(char env) { return (env=='_') ? '-' : tolower(env); } struct Mapper { Mapper(const Options& o) : opts(o) {} string operator()(const string& env) { + static const std::string prefix("QPID_"); if (env.substr(0, prefix.size()) == prefix) { string opt = env.substr(prefix.size()); transform(opt.begin(), opt.end(), opt.begin(), env2optchar); @@ -58,7 +59,8 @@ void Options::parse(int argc, char** argv, const std::string& configFile) try { po::variables_map vm; parsing="command line options"; - po::store(po::parse_command_line(argc, argv, *this), vm); + if (argc > 0 && argv != 0) + po::store(po::parse_command_line(argc, argv, *this), vm); parsing="environment variables"; po::store(po::parse_environment(*this, Mapper(*this)), vm); po::notify(vm); // configFile may be updated from arg/env options. diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 256378ccd5..b59bfe878d 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -115,13 +115,14 @@ Cluster::MemberList Cluster::getMembers() const { } void Cluster::deliver( - cpg_handle_t /*handle*/, - struct cpg_name* /* group */, - uint32_t nodeid, - uint32_t pid, - void* msg, - int msg_len) + cpg_handle_t /*handle*/, + cpg_name* group, + uint32_t nodeid, + uint32_t pid, + void* msg, + int msg_len) { + assert(name == *group); Id from(nodeid, pid); Buffer buf(static_cast<char*>(msg), msg_len); SessionFrame frame; @@ -149,26 +150,27 @@ void Cluster::handleClusterFrame(Id from, AMQFrame& frame) { ClusterNotifyBody* notifyIn= dynamic_cast<ClusterNotifyBody*>(frame.getBody().get()); assert(notifyIn); - MemberList list; - { - Mutex::ScopedLock l(lock); + MemberList list; + { + Mutex::ScopedLock l(lock); shared_ptr<Member>& member=members[from]; if (!member) member.reset(new Member(notifyIn->getUrl())); - else + else member->url = notifyIn->getUrl(); - lock.notifyAll(); + lock.notifyAll(); QPID_LOG(trace, *this << ": members joined: " << members); - } + } } void Cluster::configChange( cpg_handle_t /*handle*/, - struct cpg_name */*group*/, - struct cpg_address */*current*/, int /*nCurrent*/, - struct cpg_address *left, int nLeft, - struct cpg_address *joined, int nJoined) + cpg_name *group, + cpg_address */*current*/, int /*nCurrent*/, + cpg_address *left, int nLeft, + cpg_address *joined, int nJoined) { + assert(name == *group); bool newMembers=false; MemberList updated; { diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 3148e52789..87e483141e 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -90,25 +90,22 @@ Cpg::Cpg(Handler& h) : handler(h) { cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange }; check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG"); handles.put(handle, &handler); + QPID_LOG(debug, "Initialize CPG handle " << handle); } Cpg::~Cpg() { try { shutdown(); } catch (const std::exception& e) { - QPID_LOG(error, string("Exception in Cpg destructor: ")+e.what()); + QPID_LOG(error, "Exception in Cpg destructor: " << e.what()); } } -struct Cpg::ClearHandleOnExit { - ClearHandleOnExit(cpg_handle_t h) : handle(h) {} - ~ClearHandleOnExit() { Cpg::handles.put(handle, 0); } - cpg_handle_t handle; -}; - void Cpg::shutdown() { + QPID_LOG(debug, "Shutdown CPG handle " << handle); if (handles.get(handle)) { - ClearHandleOnExit guard(handle); // Exception safe + QPID_LOG(debug, "Finalize CPG handle " << handle); + handles.put(handle, 0); check(cpg_finalize(handle), "Error in shutdown of CPG"); } } @@ -173,8 +170,11 @@ ostream& operator <<(ostream& out, const Cpg::Id& id) { return out << ":" << id.pid(); } +ostream& operator <<(ostream& out, const cpg_name& name) { + return out << string(name.value, name.length); +} -}} // namespace qpid::cpg +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h index d616be74e2..351b65d56a 100644 --- a/cpp/src/qpid/cluster/Cpg.h +++ b/cpp/src/qpid/cluster/Cpg.h @@ -170,9 +170,14 @@ class Cpg : public Dispatchable { Handler& handler; }; +std::ostream& operator <<(std::ostream& out, const cpg_name& name); std::ostream& operator <<(std::ostream& out, const Cpg::Id& id); std::ostream& operator <<(std::ostream& out, const std::pair<cpg_address*,int> addresses); +inline bool operator==(const cpg_name& a, const cpg_name& b) { + return a.length==b.length && strncmp(a.value, b.value, a.length) == 0; +} +inline bool operator!=(const cpg_name& a, const cpg_name& b) { return !(a == b); } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index 13f1d3cece..778c9ab505 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -33,8 +33,8 @@ namespace framing { AMQP_MethodVersionMap AMQFrame::versionMap; -AMQFrame::AMQFrame(ProtocolVersion _version): -version(_version) +AMQFrame::AMQFrame(ProtocolVersion _version) + : channel(0), type(0), version(_version) { assert(version != ProtocolVersion(0,0)); } diff --git a/cpp/src/qpid/log/Logger.cpp b/cpp/src/qpid/log/Logger.cpp index ceb3977106..6e8f3a59cc 100644 --- a/cpp/src/qpid/log/Logger.cpp +++ b/cpp/src/qpid/log/Logger.cpp @@ -85,7 +85,15 @@ Logger& Logger::instance() { return boost::details::pool::singleton_default<Logger>::instance(); } -Logger::Logger() : flags(0) {} +Logger::Logger() : flags(0) { + // Initialize myself from env variables so all programs + // (e.g. tests) can use logging even if they don't parse + // command line args. + Options opts; + opts.parse(0, 0); + configure(opts,""); +} + Logger::~Logger() {} void Logger::select(const Selector& s) { @@ -190,6 +198,7 @@ void Logger::remove(Statement& s) { void Logger::configure(const Options& opts, const std::string& prog) { + clear(); Options o(opts); if (o.trace) o.selectors.push_back("trace+"); diff --git a/cpp/src/qpid/sys/Time.h b/cpp/src/qpid/sys/Time.h index 25b1606844..cff5b70d8e 100644 --- a/cpp/src/qpid/sys/Time.h +++ b/cpp/src/qpid/sys/Time.h @@ -32,41 +32,43 @@ class Duration; /** Times in nanoseconds */ class AbsTime { + static int64_t max() { return std::numeric_limits<int64_t>::max(); } int64_t time_ns; - - friend class Duration; + + friend class Duration; -public: - inline AbsTime() {} - inline AbsTime(const AbsTime& time0, const Duration& duration); - // Default asignment operation fine - // Default copy constructor fine + public: + inline AbsTime() {} + inline AbsTime(const AbsTime& time0, const Duration& duration); + // Default asignment operation fine + // Default copy constructor fine - static AbsTime now(); - inline static AbsTime FarFuture(); + static AbsTime now(); + inline static AbsTime FarFuture(); - friend bool operator<(const AbsTime& a, const AbsTime& b); - friend bool operator>(const AbsTime& a, const AbsTime& b); + friend bool operator<(const AbsTime& a, const AbsTime& b); + friend bool operator>(const AbsTime& a, const AbsTime& b); }; class Duration { + static int64_t max() { return std::numeric_limits<int64_t>::max(); } int64_t nanosecs; - friend class AbsTime; + friend class AbsTime; -public: - inline Duration(int64_t time0); - inline explicit Duration(const AbsTime& time0); - inline explicit Duration(const AbsTime& start, const AbsTime& finish); - inline operator int64_t() const; + public: + inline Duration(int64_t time0); + inline explicit Duration(const AbsTime& time0); + inline explicit Duration(const AbsTime& start, const AbsTime& finish); + inline operator int64_t() const; }; -AbsTime::AbsTime(const AbsTime& time0, const Duration& duration0) : - time_ns(time0.time_ns+duration0.nanosecs) +AbsTime::AbsTime(const AbsTime& t, const Duration& d) : + time_ns(d == Duration::max() ? max() : t.time_ns+d.nanosecs) {} -AbsTime AbsTime::FarFuture() { AbsTime ff; ff.time_ns = std::numeric_limits<int64_t>::max(); return ff;} +AbsTime AbsTime::FarFuture() { AbsTime ff; ff.time_ns = max(); return ff;} inline AbsTime now() { return AbsTime::now(); } @@ -74,15 +76,15 @@ inline bool operator<(const AbsTime& a, const AbsTime& b) { return a.time_ns < b inline bool operator>(const AbsTime& a, const AbsTime& b) { return a.time_ns > b.time_ns; } Duration::Duration(int64_t time0) : - nanosecs(time0) + nanosecs(time0) {} Duration::Duration(const AbsTime& time0) : - nanosecs(time0.time_ns) + nanosecs(time0.time_ns) {} Duration::Duration(const AbsTime& start, const AbsTime& finish) : - nanosecs(finish.time_ns - start.time_ns) + nanosecs(finish.time_ns - start.time_ns) {} Duration::operator int64_t() const diff --git a/cpp/src/tests/Cluster.cpp b/cpp/src/tests/Cluster.cpp index 56e17e06db..b22f312038 100644 --- a/cpp/src/tests/Cluster.cpp +++ b/cpp/src/tests/Cluster.cpp @@ -30,8 +30,6 @@ static const ProtocolVersion VER; -using namespace qpid::log; - /** Verify membership in a cluster with one member. */ BOOST_AUTO_TEST_CASE(testClusterOne) { TestCluster cluster("clusterOne", "amqp:one:1"); @@ -55,10 +53,15 @@ BOOST_AUTO_TEST_CASE(testClusterOne) { /** Fork a process to test a cluster with two members */ BOOST_AUTO_TEST_CASE(testClusterTwo) { - pid_t pid=fork(); - BOOST_REQUIRE(pid >= 0); - if (pid) { // Parent, see Cluster_child.cpp for child. - TestCluster cluster("clusterTwo", "amqp::1"); + bool nofork=getenv("NOFORK") != 0; + pid_t pid=0; + if (!nofork) { + pid = fork(); + BOOST_REQUIRE(pid >= 0); + } + if (pid || nofork) { // Parent + BOOST_MESSAGE("Parent start"); + TestCluster cluster("clusterTwo", "amqp:parent:1"); BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child. // Exchange frames with child. @@ -74,12 +77,14 @@ BOOST_AUTO_TEST_CASE(testClusterTwo) { BOOST_REQUIRE(cluster.received.waitFor(2)); BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.received[1].frame.getBody()); - // Wait for child to exit. - int status; - BOOST_CHECK_EQUAL(::wait(&status), pid); - BOOST_CHECK_EQUAL(0, status); - BOOST_CHECK(cluster.waitFor(1)); - BOOST_CHECK_EQUAL(1u, cluster.size()); + if (!nofork) { + // Wait for child to exit. + int status; + BOOST_CHECK_EQUAL(::wait(&status), pid); + BOOST_CHECK_EQUAL(0, status); + BOOST_CHECK(cluster.waitFor(1)); + BOOST_CHECK_EQUAL(1u, cluster.size()); + } } else { // Child BOOST_REQUIRE(execl("./Cluster_child", "./Cluster_child", NULL)); diff --git a/cpp/src/tests/Cluster.h b/cpp/src/tests/Cluster.h index bf6d1c2a64..c2909e0c1b 100644 --- a/cpp/src/tests/Cluster.h +++ b/cpp/src/tests/Cluster.h @@ -48,20 +48,25 @@ using namespace boost; void null_deleter(void*) {} template <class T> -struct TestHandler : public Handler<T&>, public vector<T>, public Monitor +class TestHandler : public Handler<T&>, public vector<T> { + Monitor lock; + + public: void handle(T& frame) { - Mutex::ScopedLock l(*this); + Mutex::ScopedLock l(lock); push_back(frame); - notifyAll(); + BOOST_MESSAGE(getpid()<<" TestHandler::handle: " << this->size()); + lock.notifyAll(); } bool waitFor(size_t n) { - Mutex::ScopedLock l(*this); - AbsTime deadline(now(), 5*TIME_SEC); - while (vector<T>::size() != n && wait(deadline)) + Mutex::ScopedLock l(lock); + BOOST_MESSAGE(getpid()<<" TestHandler::waitFor("<<n<<") "<<this->size()); + AbsTime deadline(now(), 2*TIME_SEC); + while (vector<T>::size() < n && lock.wait(deadline)) ; - return vector<T>::size() == n; + return vector<T>::size() >= n; } }; diff --git a/cpp/src/tests/Cpg.cpp b/cpp/src/tests/Cpg.cpp index ec98ca4fc2..7c0d6f7fd0 100644 --- a/cpp/src/tests/Cpg.cpp +++ b/cpp/src/tests/Cpg.cpp @@ -82,11 +82,11 @@ struct Callback : public Cpg::Handler { } }; -BOOST_AUTO_TEST_CASE(Cpg_basic) { +BOOST_AUTO_TEST_CASE(CpgBasic) { // Verify basic functionality of cpg. This will catch any // openais configuration or permission errors. // - Cpg::Name group("foo"); + Cpg::Name group("CpgBasic"); Callback cb(group.str()); Cpg cpg(cb); cpg.join(group); diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk index 506624569f..7407565f62 100644 --- a/cpp/src/tests/cluster.mk +++ b/cpp/src/tests/cluster.mk @@ -20,11 +20,10 @@ check_PROGRAMS+=Cpg Cpg_SOURCES=Cpg.cpp Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework -# FIXME aconway 2007-07-19: -# TESTS+=Cluster -# check_PROGRAMS+=Cluster -# Cluster_SOURCES=Cluster.cpp Cluster.h -# Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework +TESTS+=Cluster +check_PROGRAMS+=Cluster +Cluster_SOURCES=Cluster.cpp Cluster.h +Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework check_PROGRAMS+=Cluster_child Cluster_child_SOURCES=Cluster_child.cpp Cluster.h |