summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-23 13:08:16 +0000
committerAlan Conway <aconway@apache.org>2007-07-23 13:08:16 +0000
commit1a469b992ef2f28d98f43e63cf4d520c1bf830a4 (patch)
treec743052e2d2ab10a28960234e3efd3534cdb14c1 /cpp/src
parent4ab144d3d0a48a4abc1814e3244ef830344f19b2 (diff)
downloadqpid-python-1a469b992ef2f28d98f43e63cf4d520c1bf830a4.tar.gz
* src/tests/cluster.mk: Enable cluster test.
* src/tests/Cluster.h (class TestHandler): Fixed race in TestHandler::waitFor * src/tests/Cluster.cpp - Allow separate start of parent and child processes. * src/qpid/Options.cpp (parse): Skip argv parsing if argc=0. * src/qpid/cluster/Cluster.cpp (configChange): assert group name. * src/qpid/cluster/Cpg.cpp, .h: Additional logging * src/qpid/framing/AMQFrame.cpp: Initialize all fields in ctor, avoid valgrind warning. * src/qpid/log/Logger.cpp: Initialize singleton automatically from environment so logging can be used on tests. * src/qpid/sys/Time.h: Avoid overflow in AbsTime(t, TIME_INFINITE) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@558710 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/Options.cpp6
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp34
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp18
-rw-r--r--cpp/src/qpid/cluster/Cpg.h5
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp4
-rw-r--r--cpp/src/qpid/log/Logger.cpp11
-rw-r--r--cpp/src/qpid/sys/Time.h48
-rw-r--r--cpp/src/tests/Cluster.cpp29
-rw-r--r--cpp/src/tests/Cluster.h19
-rw-r--r--cpp/src/tests/Cpg.cpp4
-rw-r--r--cpp/src/tests/cluster.mk9
11 files changed, 108 insertions, 79 deletions
diff --git a/cpp/src/qpid/Options.cpp b/cpp/src/qpid/Options.cpp
index 18a3f726f9..7f4536ff7b 100644
--- a/cpp/src/qpid/Options.cpp
+++ b/cpp/src/qpid/Options.cpp
@@ -27,12 +27,13 @@ namespace qpid {
using namespace std;
namespace {
-const std::string prefix("QPID_");
+
char env2optchar(char env) { return (env=='_') ? '-' : tolower(env); }
struct Mapper {
Mapper(const Options& o) : opts(o) {}
string operator()(const string& env) {
+ static const std::string prefix("QPID_");
if (env.substr(0, prefix.size()) == prefix) {
string opt = env.substr(prefix.size());
transform(opt.begin(), opt.end(), opt.begin(), env2optchar);
@@ -58,7 +59,8 @@ void Options::parse(int argc, char** argv, const std::string& configFile)
try {
po::variables_map vm;
parsing="command line options";
- po::store(po::parse_command_line(argc, argv, *this), vm);
+ if (argc > 0 && argv != 0)
+ po::store(po::parse_command_line(argc, argv, *this), vm);
parsing="environment variables";
po::store(po::parse_environment(*this, Mapper(*this)), vm);
po::notify(vm); // configFile may be updated from arg/env options.
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 256378ccd5..b59bfe878d 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -115,13 +115,14 @@ Cluster::MemberList Cluster::getMembers() const {
}
void Cluster::deliver(
- cpg_handle_t /*handle*/,
- struct cpg_name* /* group */,
- uint32_t nodeid,
- uint32_t pid,
- void* msg,
- int msg_len)
+ cpg_handle_t /*handle*/,
+ cpg_name* group,
+ uint32_t nodeid,
+ uint32_t pid,
+ void* msg,
+ int msg_len)
{
+ assert(name == *group);
Id from(nodeid, pid);
Buffer buf(static_cast<char*>(msg), msg_len);
SessionFrame frame;
@@ -149,26 +150,27 @@ void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
ClusterNotifyBody* notifyIn=
dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
assert(notifyIn);
- MemberList list;
- {
- Mutex::ScopedLock l(lock);
+ MemberList list;
+ {
+ Mutex::ScopedLock l(lock);
shared_ptr<Member>& member=members[from];
if (!member)
member.reset(new Member(notifyIn->getUrl()));
- else
+ else
member->url = notifyIn->getUrl();
- lock.notifyAll();
+ lock.notifyAll();
QPID_LOG(trace, *this << ": members joined: " << members);
- }
+ }
}
void Cluster::configChange(
cpg_handle_t /*handle*/,
- struct cpg_name */*group*/,
- struct cpg_address */*current*/, int /*nCurrent*/,
- struct cpg_address *left, int nLeft,
- struct cpg_address *joined, int nJoined)
+ cpg_name *group,
+ cpg_address */*current*/, int /*nCurrent*/,
+ cpg_address *left, int nLeft,
+ cpg_address *joined, int nJoined)
{
+ assert(name == *group);
bool newMembers=false;
MemberList updated;
{
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index 3148e52789..87e483141e 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -90,25 +90,22 @@ Cpg::Cpg(Handler& h) : handler(h) {
cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
handles.put(handle, &handler);
+ QPID_LOG(debug, "Initialize CPG handle " << handle);
}
Cpg::~Cpg() {
try {
shutdown();
} catch (const std::exception& e) {
- QPID_LOG(error, string("Exception in Cpg destructor: ")+e.what());
+ QPID_LOG(error, "Exception in Cpg destructor: " << e.what());
}
}
-struct Cpg::ClearHandleOnExit {
- ClearHandleOnExit(cpg_handle_t h) : handle(h) {}
- ~ClearHandleOnExit() { Cpg::handles.put(handle, 0); }
- cpg_handle_t handle;
-};
-
void Cpg::shutdown() {
+ QPID_LOG(debug, "Shutdown CPG handle " << handle);
if (handles.get(handle)) {
- ClearHandleOnExit guard(handle); // Exception safe
+ QPID_LOG(debug, "Finalize CPG handle " << handle);
+ handles.put(handle, 0);
check(cpg_finalize(handle), "Error in shutdown of CPG");
}
}
@@ -173,8 +170,11 @@ ostream& operator <<(ostream& out, const Cpg::Id& id) {
return out << ":" << id.pid();
}
+ostream& operator <<(ostream& out, const cpg_name& name) {
+ return out << string(name.value, name.length);
+}
-}} // namespace qpid::cpg
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h
index d616be74e2..351b65d56a 100644
--- a/cpp/src/qpid/cluster/Cpg.h
+++ b/cpp/src/qpid/cluster/Cpg.h
@@ -170,9 +170,14 @@ class Cpg : public Dispatchable {
Handler& handler;
};
+std::ostream& operator <<(std::ostream& out, const cpg_name& name);
std::ostream& operator <<(std::ostream& out, const Cpg::Id& id);
std::ostream& operator <<(std::ostream& out, const std::pair<cpg_address*,int> addresses);
+inline bool operator==(const cpg_name& a, const cpg_name& b) {
+ return a.length==b.length && strncmp(a.value, b.value, a.length) == 0;
+}
+inline bool operator!=(const cpg_name& a, const cpg_name& b) { return !(a == b); }
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index 13f1d3cece..778c9ab505 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -33,8 +33,8 @@ namespace framing {
AMQP_MethodVersionMap AMQFrame::versionMap;
-AMQFrame::AMQFrame(ProtocolVersion _version):
-version(_version)
+AMQFrame::AMQFrame(ProtocolVersion _version)
+ : channel(0), type(0), version(_version)
{
assert(version != ProtocolVersion(0,0));
}
diff --git a/cpp/src/qpid/log/Logger.cpp b/cpp/src/qpid/log/Logger.cpp
index ceb3977106..6e8f3a59cc 100644
--- a/cpp/src/qpid/log/Logger.cpp
+++ b/cpp/src/qpid/log/Logger.cpp
@@ -85,7 +85,15 @@ Logger& Logger::instance() {
return boost::details::pool::singleton_default<Logger>::instance();
}
-Logger::Logger() : flags(0) {}
+Logger::Logger() : flags(0) {
+ // Initialize myself from env variables so all programs
+ // (e.g. tests) can use logging even if they don't parse
+ // command line args.
+ Options opts;
+ opts.parse(0, 0);
+ configure(opts,"");
+}
+
Logger::~Logger() {}
void Logger::select(const Selector& s) {
@@ -190,6 +198,7 @@ void Logger::remove(Statement& s) {
void Logger::configure(const Options& opts, const std::string& prog)
{
+ clear();
Options o(opts);
if (o.trace)
o.selectors.push_back("trace+");
diff --git a/cpp/src/qpid/sys/Time.h b/cpp/src/qpid/sys/Time.h
index 25b1606844..cff5b70d8e 100644
--- a/cpp/src/qpid/sys/Time.h
+++ b/cpp/src/qpid/sys/Time.h
@@ -32,41 +32,43 @@ class Duration;
/** Times in nanoseconds */
class AbsTime {
+ static int64_t max() { return std::numeric_limits<int64_t>::max(); }
int64_t time_ns;
-
- friend class Duration;
+
+ friend class Duration;
-public:
- inline AbsTime() {}
- inline AbsTime(const AbsTime& time0, const Duration& duration);
- // Default asignment operation fine
- // Default copy constructor fine
+ public:
+ inline AbsTime() {}
+ inline AbsTime(const AbsTime& time0, const Duration& duration);
+ // Default asignment operation fine
+ // Default copy constructor fine
- static AbsTime now();
- inline static AbsTime FarFuture();
+ static AbsTime now();
+ inline static AbsTime FarFuture();
- friend bool operator<(const AbsTime& a, const AbsTime& b);
- friend bool operator>(const AbsTime& a, const AbsTime& b);
+ friend bool operator<(const AbsTime& a, const AbsTime& b);
+ friend bool operator>(const AbsTime& a, const AbsTime& b);
};
class Duration {
+ static int64_t max() { return std::numeric_limits<int64_t>::max(); }
int64_t nanosecs;
- friend class AbsTime;
+ friend class AbsTime;
-public:
- inline Duration(int64_t time0);
- inline explicit Duration(const AbsTime& time0);
- inline explicit Duration(const AbsTime& start, const AbsTime& finish);
- inline operator int64_t() const;
+ public:
+ inline Duration(int64_t time0);
+ inline explicit Duration(const AbsTime& time0);
+ inline explicit Duration(const AbsTime& start, const AbsTime& finish);
+ inline operator int64_t() const;
};
-AbsTime::AbsTime(const AbsTime& time0, const Duration& duration0) :
- time_ns(time0.time_ns+duration0.nanosecs)
+AbsTime::AbsTime(const AbsTime& t, const Duration& d) :
+ time_ns(d == Duration::max() ? max() : t.time_ns+d.nanosecs)
{}
-AbsTime AbsTime::FarFuture() { AbsTime ff; ff.time_ns = std::numeric_limits<int64_t>::max(); return ff;}
+AbsTime AbsTime::FarFuture() { AbsTime ff; ff.time_ns = max(); return ff;}
inline AbsTime now() { return AbsTime::now(); }
@@ -74,15 +76,15 @@ inline bool operator<(const AbsTime& a, const AbsTime& b) { return a.time_ns < b
inline bool operator>(const AbsTime& a, const AbsTime& b) { return a.time_ns > b.time_ns; }
Duration::Duration(int64_t time0) :
- nanosecs(time0)
+ nanosecs(time0)
{}
Duration::Duration(const AbsTime& time0) :
- nanosecs(time0.time_ns)
+ nanosecs(time0.time_ns)
{}
Duration::Duration(const AbsTime& start, const AbsTime& finish) :
- nanosecs(finish.time_ns - start.time_ns)
+ nanosecs(finish.time_ns - start.time_ns)
{}
Duration::operator int64_t() const
diff --git a/cpp/src/tests/Cluster.cpp b/cpp/src/tests/Cluster.cpp
index 56e17e06db..b22f312038 100644
--- a/cpp/src/tests/Cluster.cpp
+++ b/cpp/src/tests/Cluster.cpp
@@ -30,8 +30,6 @@
static const ProtocolVersion VER;
-using namespace qpid::log;
-
/** Verify membership in a cluster with one member. */
BOOST_AUTO_TEST_CASE(testClusterOne) {
TestCluster cluster("clusterOne", "amqp:one:1");
@@ -55,10 +53,15 @@ BOOST_AUTO_TEST_CASE(testClusterOne) {
/** Fork a process to test a cluster with two members */
BOOST_AUTO_TEST_CASE(testClusterTwo) {
- pid_t pid=fork();
- BOOST_REQUIRE(pid >= 0);
- if (pid) { // Parent, see Cluster_child.cpp for child.
- TestCluster cluster("clusterTwo", "amqp::1");
+ bool nofork=getenv("NOFORK") != 0;
+ pid_t pid=0;
+ if (!nofork) {
+ pid = fork();
+ BOOST_REQUIRE(pid >= 0);
+ }
+ if (pid || nofork) { // Parent
+ BOOST_MESSAGE("Parent start");
+ TestCluster cluster("clusterTwo", "amqp:parent:1");
BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child.
// Exchange frames with child.
@@ -74,12 +77,14 @@ BOOST_AUTO_TEST_CASE(testClusterTwo) {
BOOST_REQUIRE(cluster.received.waitFor(2));
BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.received[1].frame.getBody());
- // Wait for child to exit.
- int status;
- BOOST_CHECK_EQUAL(::wait(&status), pid);
- BOOST_CHECK_EQUAL(0, status);
- BOOST_CHECK(cluster.waitFor(1));
- BOOST_CHECK_EQUAL(1u, cluster.size());
+ if (!nofork) {
+ // Wait for child to exit.
+ int status;
+ BOOST_CHECK_EQUAL(::wait(&status), pid);
+ BOOST_CHECK_EQUAL(0, status);
+ BOOST_CHECK(cluster.waitFor(1));
+ BOOST_CHECK_EQUAL(1u, cluster.size());
+ }
}
else { // Child
BOOST_REQUIRE(execl("./Cluster_child", "./Cluster_child", NULL));
diff --git a/cpp/src/tests/Cluster.h b/cpp/src/tests/Cluster.h
index bf6d1c2a64..c2909e0c1b 100644
--- a/cpp/src/tests/Cluster.h
+++ b/cpp/src/tests/Cluster.h
@@ -48,20 +48,25 @@ using namespace boost;
void null_deleter(void*) {}
template <class T>
-struct TestHandler : public Handler<T&>, public vector<T>, public Monitor
+class TestHandler : public Handler<T&>, public vector<T>
{
+ Monitor lock;
+
+ public:
void handle(T& frame) {
- Mutex::ScopedLock l(*this);
+ Mutex::ScopedLock l(lock);
push_back(frame);
- notifyAll();
+ BOOST_MESSAGE(getpid()<<" TestHandler::handle: " << this->size());
+ lock.notifyAll();
}
bool waitFor(size_t n) {
- Mutex::ScopedLock l(*this);
- AbsTime deadline(now(), 5*TIME_SEC);
- while (vector<T>::size() != n && wait(deadline))
+ Mutex::ScopedLock l(lock);
+ BOOST_MESSAGE(getpid()<<" TestHandler::waitFor("<<n<<") "<<this->size());
+ AbsTime deadline(now(), 2*TIME_SEC);
+ while (vector<T>::size() < n && lock.wait(deadline))
;
- return vector<T>::size() == n;
+ return vector<T>::size() >= n;
}
};
diff --git a/cpp/src/tests/Cpg.cpp b/cpp/src/tests/Cpg.cpp
index ec98ca4fc2..7c0d6f7fd0 100644
--- a/cpp/src/tests/Cpg.cpp
+++ b/cpp/src/tests/Cpg.cpp
@@ -82,11 +82,11 @@ struct Callback : public Cpg::Handler {
}
};
-BOOST_AUTO_TEST_CASE(Cpg_basic) {
+BOOST_AUTO_TEST_CASE(CpgBasic) {
// Verify basic functionality of cpg. This will catch any
// openais configuration or permission errors.
//
- Cpg::Name group("foo");
+ Cpg::Name group("CpgBasic");
Callback cb(group.str());
Cpg cpg(cb);
cpg.join(group);
diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk
index 506624569f..7407565f62 100644
--- a/cpp/src/tests/cluster.mk
+++ b/cpp/src/tests/cluster.mk
@@ -20,11 +20,10 @@ check_PROGRAMS+=Cpg
Cpg_SOURCES=Cpg.cpp
Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework
-# FIXME aconway 2007-07-19:
-# TESTS+=Cluster
-# check_PROGRAMS+=Cluster
-# Cluster_SOURCES=Cluster.cpp Cluster.h
-# Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework
+TESTS+=Cluster
+check_PROGRAMS+=Cluster
+Cluster_SOURCES=Cluster.cpp Cluster.h
+Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework
check_PROGRAMS+=Cluster_child
Cluster_child_SOURCES=Cluster_child.cpp Cluster.h