summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-06-26 02:11:55 +0000
committerAlan Conway <aconway@apache.org>2007-06-26 02:11:55 +0000
commite6566439f627e375f12f77044819bbb37b585348 (patch)
tree18c52172d536b53df57e82a274a31bcfabc35f7b /cpp/src
parent87c376ebc8fe6af86dc8aef8dcec03510ff5dcc0 (diff)
downloadqpid-python-e6566439f627e375f12f77044819bbb37b585348.tar.gz
2007-06-25 <aconway@redhat.com>
Cluster class implementing cluster membership map. * src/qpid/cluster/Cluster.cpp: Cluster membership implementation. * src/qpid/cluster/Cpg.cpp: Support for boost::function callbacks. * src/tests/Url.cpp: Implements AMQP-95 URL format. * xml/cluster.xml: Cluster join method. Build/packaging * README: Remove mention of openais till clustering is functional. For now it is optional and we depend on an unpackaged version. * configure.ac: Check openais has cpg_local_get(). * Makefile.am: Added cluster.xml to EXTRA_DIST. * src/generate.sh: add cluster.xml to codegen. * src/tests/Makefile.am: - Generate individual "sudo -u ais" wrappers for openais tests. - Drop "unit" directory, all unit tests in "tests" directory Minor changes: * src/qpid/sys/posix/Socket.cpp: * src/qpid/sys/posix/PosixAcceptor.cpp: * src/qpid/sys/posix/EventChannelAcceptor.cpp: * src/qpid/sys/apr/APRAcceptor.cpp: * src/qpid/sys/Acceptor.h (getHost): Added getHost() * src/tests/.valgrind.supp-default: Suppress benign valgrind warning in libcpg. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@550658 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am1
-rw-r--r--cpp/src/cluster.mk11
-rwxr-xr-xcpp/src/generate.sh3
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp159
-rw-r--r--cpp/src/qpid/cluster/Cluster.h116
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp102
-rw-r--r--cpp/src/qpid/cluster/Cpg.h87
-rw-r--r--cpp/src/qpid/cluster/Dispatchable.h52
-rw-r--r--cpp/src/qpid/framing/FrameHandler.h39
-rw-r--r--cpp/src/qpid/sys/Acceptor.h1
-rw-r--r--cpp/src/qpid/sys/apr/APRAcceptor.cpp7
-rw-r--r--cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp5
-rw-r--r--cpp/src/qpid/sys/posix/PosixAcceptor.cpp1
-rw-r--r--cpp/src/qpid/sys/posix/Socket.cpp16
-rw-r--r--cpp/src/tests/.valgrind.supp-default9
-rw-r--r--cpp/src/tests/Cluster.cpp72
-rw-r--r--cpp/src/tests/Cluster.h83
-rw-r--r--cpp/src/tests/Cpg.cpp (renamed from cpp/src/tests/unit/Cpg.cpp)80
-rw-r--r--cpp/src/tests/Makefile.am53
-rw-r--r--cpp/src/tests/Url.cpp (renamed from cpp/src/tests/unit/Url.cpp)1
-rwxr-xr-xcpp/src/tests/ais_unit_tests2
-rw-r--r--cpp/src/tests/cluster.mk38
-rw-r--r--cpp/src/tests/logging.cpp (renamed from cpp/src/tests/unit/logging.cpp)1
-rwxr-xr-xcpp/src/tests/run_test6
-rw-r--r--cpp/src/tests/test_tools.h (renamed from cpp/src/tests/unit/test_tools.h)5
25 files changed, 850 insertions, 100 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 5b09172ee2..47e714955c 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -133,6 +133,7 @@ libqpidcommon_la_SOURCES = \
qpid/framing/AMQHeaderBody.cpp \
qpid/framing/AMQHeartbeatBody.cpp \
qpid/framing/AMQMethodBody.cpp \
+ qpid/framing/FrameHandler.h \
qpid/framing/MethodContext.cpp \
qpid/framing/BasicHeaderProperties.cpp \
qpid/framing/BodyHandler.cpp \
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index d5b0bed97f..f97e95c208 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -1,4 +1,4 @@
-#-*-Makefile-*-
+#
# Cluster library makefile fragment, to be included in Makefile.am
#
lib_LTLIBRARIES += libqpidcluster.la
@@ -6,11 +6,16 @@ lib_LTLIBRARIES += libqpidcluster.la
if CLUSTER
libqpidcluster_la_SOURCES = \
+ qpid/cluster/Cluster.cpp \
+ qpid/cluster/Cluster.h \
qpid/cluster/Cpg.cpp \
- qpid/cluster/Cpg.h
-libqpidcluster_la_LIBADD= -lcpg
+ qpid/cluster/Cpg.h \
+ qpid/cluster/Dispatchable.h
+
+libqpidcluster_la_LIBADD= -lcpg libqpidcommon.la
else
# Empty stub library to satisfy rpm spec file.
libqpidcluster_la_SOURCES =
+
endif
diff --git a/cpp/src/generate.sh b/cpp/src/generate.sh
index a600897cc3..4f97f72684 100755
--- a/cpp/src/generate.sh
+++ b/cpp/src/generate.sh
@@ -7,7 +7,8 @@ set -e
gentools_dir="$srcdir/../gentools"
specs_dir="$srcdir/../../specs"
-specs="$specs_dir/amqp.0-9.xml $specs_dir/amqp-errata.0-9.xml $specs_dir/amqp-dtx-preview.0-9.xml"
+specs="$specs_dir/amqp.0-9.xml $specs_dir/amqp-errata.0-9.xml $specs_dir/amqp-dtx-preview.0-9.xml $srcdir/../xml/cluster.xml"
+
test -z "$JAVA" && JAVA=java ;
test -z "$JAVAC" && JAVAC=javac ;
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
new file mode 100644
index 0000000000..30073c4551
--- /dev/null
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -0,0 +1,159 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "Cluster.h"
+#include "Cpg.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/ClusterNotifyBody.h"
+#include "qpid/log/Statement.h"
+#include <boost/bind.hpp>
+#include <algorithm>
+#include <iterator>
+
+namespace qpid {
+namespace cluster {
+using namespace qpid::framing;
+using namespace qpid::sys;
+using namespace std;
+
+ostream& operator <<(ostream& out, const Cluster& cluster) {
+ return out << cluster.name.str() << "(" << cluster.self << ")";
+}
+
+void Cluster::notify() {
+ // TODO aconway 2007-06-25: Use proxy here.
+ AMQFrame frame(version, 0,
+ make_shared_ptr(new ClusterNotifyBody(version, url)));
+ handle(frame);
+}
+
+Cluster::Cluster(
+ const std::string& name_, const std::string& url_, FrameHandler& next_,
+ ProtocolVersion ver)
+ : name(name_), url(url_), version(ver),
+ cpg(new Cpg(boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6),
+ boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))),
+ next(next_)
+{
+ self=Id(cpg->getLocalNoideId(), getpid());
+ QPID_LOG(trace, *this << " Joining cluster.");
+ cpg->join(name);
+ notify();
+ dispatcher=Thread(*this);
+}
+
+Cluster::~Cluster() {
+ try {
+ QPID_LOG(trace, *this << " Leaving cluster.");
+ cpg->leave(name);
+ cpg.reset();
+ dispatcher.join();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Exception leaving cluster " << e.what());
+ }
+}
+
+void Cluster::handle(AMQFrame& frame) {
+ QPID_LOG(trace, *this << " SEND: " << frame);
+ Buffer buf(frame.size());
+ frame.encode(buf);
+ buf.flip();
+ iovec iov = { buf.start(), frame.size() };
+ cpg->mcast(name, &iov, 1);
+}
+
+size_t Cluster::size() const {
+ Mutex::ScopedLock l(lock);
+ return members.size();
+}
+
+Cluster::MemberList Cluster::getMembers() const {
+ Mutex::ScopedLock l(lock);
+ MemberList result(members.size());
+ std::transform(members.begin(), members.end(), result.begin(),
+ boost::bind(&MemberMap::value_type::second, _1));
+ return result;
+}
+
+void Cluster::cpgDeliver(
+ cpg_handle_t /*handle*/,
+ struct cpg_name* /* group */,
+ uint32_t nodeid,
+ uint32_t pid,
+ void* msg,
+ int msg_len)
+{
+ Id from(nodeid, pid);
+ Buffer buf(static_cast<char*>(msg), msg_len);
+ AMQFrame frame;
+ frame.decode(buf);
+ QPID_LOG(trace, *this << " RECV: " << frame);
+ // TODO aconway 2007-06-20: use visitor pattern.
+ ClusterNotifyBody* notifyIn= dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
+ if (notifyIn) {
+ Mutex::ScopedLock l(lock);
+ members[from].reset(new Member(notifyIn->getUrl()));
+ lock.notifyAll();
+ }
+ else
+ next.handle(frame);
+}
+
+void Cluster::cpgConfigChange(
+ cpg_handle_t /*handle*/,
+ struct cpg_name */*group*/,
+ struct cpg_address *ccMembers, int nMembers,
+ struct cpg_address *left, int nLeft,
+ struct cpg_address *joined, int nJoined
+)
+{
+ QPID_LOG(
+ trace,
+ *this << " Configuration change. " << endl
+ << " Joined: " << make_pair(joined, nJoined) << endl
+ << " Left: " << make_pair(left, nLeft) << endl
+ << " Current: " << make_pair(ccMembers, nMembers));
+
+ {
+ Mutex::ScopedLock l(lock);
+ // Erase members that left.
+ for (int i = 0; i < nLeft; ++i)
+ members.erase(Id(left[i]));
+ lock.notifyAll();
+ }
+
+ // If there are new members (other than myself) then notify.
+ for (int i=0; i< nJoined; ++i) {
+ if (Id(joined[i]) != self) {
+ notify();
+ break;
+ }
+ }
+
+ // Note: New members are be added to my map when cpgDeliver
+ // gets a cluster.notify frame.
+}
+
+void Cluster::run() {
+ cpg->dispatchBlocking();
+}
+
+}} // namespace qpid::cluster
+
+
+
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
new file mode 100644
index 0000000000..1cbbb249f2
--- /dev/null
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -0,0 +1,116 @@
+#ifndef QPID_CLUSTER_CLUSTER_H
+#define QPID_CLUSTER_CLUSTER_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/cluster/Cpg.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/shared_ptr.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include <boost/scoped_ptr.hpp>
+#include <map>
+#include <vector>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Represents a cluster. Creating an instance joins current process
+ * to the cluster.
+ */
+class Cluster : public framing::FrameHandler, private sys::Runnable {
+ public:
+ /** Details of a cluster member */
+ struct Member {
+ Member(const std::string& url_) : url(url_) {}
+ std::string url;
+ };
+
+ typedef std::vector<shared_ptr<const Member> > MemberList;
+
+ /**
+ * Join a cluster.
+ * @param name of the cluster.
+ * @param url of this broker, sent to the cluster.
+ * @param next handler receives the frame when it has been
+ * acknowledged by the cluster.
+ */
+ Cluster(const std::string& name,
+ const std::string& url,
+ framing::FrameHandler& next,
+ framing::ProtocolVersion);
+
+ ~Cluster();
+
+ /** Multicast a frame to the cluster. */
+ void handle(framing::AMQFrame&);
+
+ /** Get the current cluster membership. */
+ MemberList getMembers() const;
+
+ /** Number of members in the cluster. */
+ size_t size() const;
+
+ private:
+ typedef Cpg::Id Id;
+ typedef std::map<Id, shared_ptr<Member> > MemberMap;
+
+ void run();
+ void notify();
+
+ void cpgDeliver(
+ cpg_handle_t /*handle*/,
+ struct cpg_name *group,
+ uint32_t /*nodeid*/,
+ uint32_t /*pid*/,
+ void* /*msg*/,
+ int /*msg_len*/);
+
+ void cpgConfigChange(
+ cpg_handle_t /*handle*/,
+ struct cpg_name */*group*/,
+ struct cpg_address */*members*/, int /*nMembers*/,
+ struct cpg_address */*left*/, int /*nLeft*/,
+ struct cpg_address */*joined*/, int /*nJoined*/
+ );
+
+ Id self;
+ Cpg::Name name;
+ std::string url;
+ framing::ProtocolVersion version;
+ boost::scoped_ptr<Cpg> cpg;
+ framing::FrameHandler& next;
+ MemberMap members;
+ sys::Thread dispatcher;
+
+ protected:
+ // Allow access from ClusterTest subclass.
+ mutable sys::Monitor lock;
+
+ friend std::ostream& operator <<(std::ostream&, const Cluster&);
+};
+
+}} // namespace qpid::cluster
+
+
+
+#endif /*!QPID_CLUSTER_CLUSTER_H*/
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index 858d25f37c..a979ce1eeb 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -17,12 +17,86 @@
*/
#include "Cpg.h"
+#include "qpid/sys/Mutex.h"
+#include <vector>
+#include <limits>
+#include <iterator>
namespace qpid {
namespace cluster {
using namespace std;
+// Global vector of Cpg pointers by handle.
+// TODO aconway 2007-06-12: Replace this with cpg_get/set_context,
+// coming in in RHEL 5.1.
+class Cpg::Handles
+{
+ public:
+ void put(cpg_handle_t handle, Cpg* object) {
+ sys::Mutex::ScopedLock l(lock);
+ assert(object);
+ uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
+ if (index >= handles.size())
+ handles.resize(index+1, 0);
+ handles[index] = object;
+ }
+
+ Cpg* get(cpg_handle_t handle) {
+ sys::Mutex::ScopedLock l(lock);
+ uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
+ assert(index < handles.size());
+ assert(handles[index]);
+ return handles[index];
+ }
+
+ private:
+ sys::Mutex lock;
+ vector<Cpg*> handles;
+};
+
+Cpg::Handles Cpg::handles;
+
+// Global callback functions call per-object callbacks via handles vector.
+void Cpg::globalDeliver (
+ cpg_handle_t handle,
+ struct cpg_name *group,
+ uint32_t nodeid,
+ uint32_t pid,
+ void* msg,
+ int msg_len)
+{
+ handles.get(handle)->deliver(handle, group, nodeid, pid, msg, msg_len);
+}
+
+void Cpg::globalConfigChange(
+ cpg_handle_t handle,
+ struct cpg_name *group,
+ struct cpg_address *members, int nMembers,
+ struct cpg_address *left, int nLeft,
+ struct cpg_address *joined, int nJoined
+)
+{
+ handles.get(handle)->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
+}
+
+Cpg::Cpg(DeliverFn d, ConfigChangeFn c) : deliver(d), configChange(c)
+{
+ cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
+ check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
+ handles.put(handle, this);
+}
+
+Cpg::~Cpg() {
+ try {
+ check(cpg_finalize(handle), "Error in shutdown of CPG");
+ }
+ catch (...) {
+ handles.put(handle, 0);
+ throw;
+ }
+}
+
string Cpg::errorStr(cpg_error_t err, const std::string& msg) {
switch (err) {
case CPG_OK: return msg+": ok";
@@ -56,6 +130,34 @@ std::string Cpg::cantMcastMsg(const Name& group) {
return "Cannot mcast to CPG group "+group.str();
}
+uint32_t Cpg::getLocalNoideId() const {
+ unsigned int nodeid;
+ check(cpg_local_get(handle, &nodeid), "Cannot get local node ID");
+ assert(nodeid <= std::numeric_limits<uint32_t>::max());
+ return nodeid;
+}
+
+ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) {
+ ostream_iterator<Cpg::Id> i(o, " ");
+ std::copy(a.first, a.first+a.second, i);
+ return o;
+}
+
+static int popbyte(uint32_t& n) {
+ uint8_t b=n&0xff;
+ n>>=8;
+ return b;
+}
+
+ostream& operator <<(ostream& out, const Cpg::Id& id) {
+ uint32_t node=id.nodeId();
+ out << popbyte(node);
+ for (int i = 0; i < 3; i++)
+ out << "." << popbyte(node);
+ return out << ":" << id.pid();
+}
+
+
}} // namespace qpid::cpg
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h
index 6e61fa8a6e..6b157301a7 100644
--- a/cpp/src/qpid/cluster/Cpg.h
+++ b/cpp/src/qpid/cluster/Cpg.h
@@ -19,7 +19,9 @@
*
*/
-#include <stdexcept>
+#include "qpid/Exception.h"
+#include "qpid/cluster/Dispatchable.h"
+#include <boost/function.hpp>
#include <cassert>
#ifdef CLUSTER
extern "C" {
@@ -34,11 +36,10 @@ namespace cluster {
* Manages a single CPG handle, initialized in ctor, finialzed in destructor.
* On error all functions throw Cpg::Exception
*/
-class Cpg {
+class Cpg : public Dispatchable {
public:
- // FIXME aconway 2007-06-01: qpid::Exception
- struct Exception : public std::runtime_error {
- Exception(const std::string& msg) : runtime_error(msg) {}
+ struct Exception : public ::qpid::Exception {
+ Exception(const std::string& msg) : ::qpid::Exception(msg) {}
};
struct Name : public cpg_name {
@@ -54,26 +55,45 @@ class Cpg {
std::string str() const { return std::string(value, length); }
};
- static inline std::string str(const cpg_name& n) {
+ struct Id {
+ uint64_t id;
+ Id() : id(0) {}
+ Id(uint32_t nodeid, uint32_t pid) { id=(uint64_t(nodeid)<<32)+ pid; }
+ Id(const cpg_address& addr) : id(Id(addr.nodeid, addr.pid)) {}
+
+ operator uint64_t() const { return id; }
+ uint32_t nodeId() const { return id >> 32; }
+ pid_t pid() const { return id & 0xFFFF; }
+ };
+
+ static std::string str(const cpg_name& n) {
return std::string(n.value, n.length);
}
- // TODO aconway 2007-06-01: when cpg handle supports a context pointer
- // use callback objects (boost::function) instead of free functions.
- //
+ typedef boost::function<void (
+ cpg_handle_t /*handle*/,
+ struct cpg_name *group,
+ uint32_t /*nodeid*/,
+ uint32_t /*pid*/,
+ void* /*msg*/,
+ int /*msg_len*/)> DeliverFn;
+
+ typedef boost::function<void (
+ cpg_handle_t /*handle*/,
+ struct cpg_name */*group*/,
+ struct cpg_address */*members*/, int /*nMembers*/,
+ struct cpg_address */*left*/, int /*nLeft*/,
+ struct cpg_address */*joined*/, int /*nJoined*/
+ )> ConfigChangeFn;
+
/** Open a CPG handle.
*@param deliver - free function called when a message is delivered.
*@param reconfig - free function called when CPG configuration changes.
*/
- Cpg(cpg_deliver_fn_t deliver, cpg_confchg_fn_t reconfig) {
- cpg_callbacks_t callbacks = { deliver, reconfig };
- check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
- }
+ Cpg(DeliverFn deliver, ConfigChangeFn reconfig);
- /** Disconnect */
- ~Cpg() {
- check(cpg_finalize(handle), "Cannot finalize CPG");
- }
+ /** Disconnect from CPG. */
+ ~Cpg();
/** Dispatch CPG events.
*@param type one of
@@ -85,6 +105,10 @@ class Cpg {
check(cpg_dispatch(handle,type), "Error in CPG dispatch");
}
+ void dispatchOne() { dispatch(CPG_DISPATCH_ONE); }
+ void dispatchAll() { dispatch(CPG_DISPATCH_ALL); }
+ void dispatchBlocking() { dispatch(CPG_DISPATCH_BLOCKING); }
+
void join(const Name& group) {
check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group));
};
@@ -99,7 +123,14 @@ class Cpg {
cantMcastMsg(group));
}
+ cpg_handle_t getHandle() const { return handle; }
+
+ uint32_t getLocalNoideId() const;
+
private:
+ class Handles;
+ friend class Handles;
+
static std::string errorStr(cpg_error_t err, const std::string& msg);
static std::string cantJoinMsg(const Name&);
static std::string cantLeaveMsg(const Name&);
@@ -110,9 +141,31 @@ class Cpg {
if (result != CPG_OK)
throw Exception(errorStr(result, msg));
}
+
+ static void globalDeliver(
+ cpg_handle_t /*handle*/,
+ struct cpg_name *group,
+ uint32_t /*nodeid*/,
+ uint32_t /*pid*/,
+ void* /*msg*/,
+ int /*msg_len*/);
+
+ static void globalConfigChange(
+ cpg_handle_t /*handle*/,
+ struct cpg_name */*group*/,
+ struct cpg_address */*members*/, int /*nMembers*/,
+ struct cpg_address */*left*/, int /*nLeft*/,
+ struct cpg_address */*joined*/, int /*nJoined*/
+ );
+
+ static Handles handles;
cpg_handle_t handle;
+ DeliverFn deliver;
+ ConfigChangeFn configChange;
};
+std::ostream& operator <<(std::ostream& out, const Cpg::Id& id);
+std::ostream& operator <<(std::ostream& out, const std::pair<cpg_address*,int> addresses);
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Dispatchable.h b/cpp/src/qpid/cluster/Dispatchable.h
new file mode 100644
index 0000000000..e7f0df4218
--- /dev/null
+++ b/cpp/src/qpid/cluster/Dispatchable.h
@@ -0,0 +1,52 @@
+#ifndef QPID_CLUSTER_DISPATCHABLE_H
+#define QPID_CLUSTER_DISPATCHABLE_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Interface for classes that have some "events" that need dispatching
+ * in a thread.
+ */
+class Dispatchable
+{
+ public:
+ virtual ~Dispatchable() {}
+
+ /** Dispatch one event in current thread. */
+ virtual void dispatchOne() = 0;
+ /** Dispatch all available events, don't block. */
+ virtual void dispatchAll() = 0;
+ /** Blocking loop to dispatch cluster events */
+ virtual void dispatchBlocking() = 0;
+
+ /** Wait for at least one event, then dispatch all available events.
+ * Don't block. Useful for tests.
+ */
+ virtual void dispatchSome() { dispatchOne(); dispatchAll(); }
+
+};
+
+}} // namespace qpid::cluster
+
+
+
+#endif /*!QPID_CLUSTER_DISPATCHABLE_H*/
diff --git a/cpp/src/qpid/framing/FrameHandler.h b/cpp/src/qpid/framing/FrameHandler.h
new file mode 100644
index 0000000000..817c569119
--- /dev/null
+++ b/cpp/src/qpid/framing/FrameHandler.h
@@ -0,0 +1,39 @@
+#ifndef QPID_FRAMING_FRAMEHANDLER_H
+#define QPID_FRAMING_FRAMEHANDLER_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace framing {
+class AMQFrame;
+
+class FrameHandler : private boost::noncopyable {
+ public:
+ virtual ~FrameHandler() {}
+ virtual void handle(AMQFrame& frame) = 0;
+};
+
+}}
+
+
+#endif /*!QPID_FRAMING_FRAMEHANDLER_H*/
diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h
index 5e624a956e..8d6bca8f29 100644
--- a/cpp/src/qpid/sys/Acceptor.h
+++ b/cpp/src/qpid/sys/Acceptor.h
@@ -36,6 +36,7 @@ class Acceptor : public qpid::SharedObject<Acceptor>
static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false);
virtual ~Acceptor() = 0;
virtual uint16_t getPort() const = 0;
+ virtual std::string getHost() const = 0;
virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory) = 0;
virtual void shutdown() = 0;
};
diff --git a/cpp/src/qpid/sys/apr/APRAcceptor.cpp b/cpp/src/qpid/sys/apr/APRAcceptor.cpp
index 0f0853b35d..8662e602c2 100644
--- a/cpp/src/qpid/sys/apr/APRAcceptor.cpp
+++ b/cpp/src/qpid/sys/apr/APRAcceptor.cpp
@@ -35,6 +35,7 @@ class APRAcceptor : public Acceptor
public:
APRAcceptor(int16_t port, int backlog, int threads, bool trace);
virtual uint16_t getPort() const;
+ virtual std::string getHost() const;
virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory);
virtual void shutdown();
@@ -72,6 +73,12 @@ APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) :
CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog));
}
+std::string APRAcceptor::getHost() const {
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
+ return address->hostname;
+}
+
uint16_t APRAcceptor::getPort() const {
apr_sockaddr_t* address;
CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
diff --git a/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp b/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp
index 1a5fceb56e..cbda216cfc 100644
--- a/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp
+++ b/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp
@@ -52,6 +52,7 @@ class EventChannelAcceptor : public Acceptor {
);
uint16_t getPort() const;
+ std::string getHost() const;
void run(ConnectionInputHandlerFactory* factory);
@@ -100,6 +101,10 @@ uint16_t EventChannelAcceptor::getPort() const {
return port; // Immutable no need for lock.
}
+uint16_t EventChannelAcceptor::getPort() const {
+ return port; // Immutable no need for lock.
+}
+
void EventChannelAcceptor::run(ConnectionInputHandlerFactory* f) {
{
Mutex::ScopedLock l(lock);
diff --git a/cpp/src/qpid/sys/posix/PosixAcceptor.cpp b/cpp/src/qpid/sys/posix/PosixAcceptor.cpp
index af200d393d..0575380a14 100644
--- a/cpp/src/qpid/sys/posix/PosixAcceptor.cpp
+++ b/cpp/src/qpid/sys/posix/PosixAcceptor.cpp
@@ -32,6 +32,7 @@ void fail() { throw qpid::Exception("PosixAcceptor not implemented"); }
class PosixAcceptor : public Acceptor {
public:
virtual uint16_t getPort() const { fail(); return 0; }
+ virtual std::string getPort() const { fail(); return std::string(); }
virtual void run(qpid::sys::ConnectionInputHandlerFactory* ) { fail(); }
virtual void shutdown() { fail(); }
};
diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp
index 39651fa821..50cbfa7c4d 100644
--- a/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/cpp/src/qpid/sys/posix/Socket.cpp
@@ -112,7 +112,21 @@ int Socket::listen(int port, int backlog)
return ntohs(name.sin_port);
}
-
+std::string getHost() const {
+ // TODO aconway 2007-06-11: Won't work for ip6
+ struct sockaddr_in name;
+ socklen_t namelen = sizeof(name);
+ if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
+ throw QPID_POSIX_ERROR(errno);
+ uint32_t addr = name.sin_host.s_addr;
+ ostringstream os;
+ os << uint8_t(addr >> 24) << '.'
+ << uint8_t(addr >> 16) << '.'
+ << uint8_t(addr >> 8) << '.'
+ << uint8_t(addr);
+ return os.str();
+}
+
int Socket::fd()
{
return socket;
diff --git a/cpp/src/tests/.valgrind.supp-default b/cpp/src/tests/.valgrind.supp-default
index 8b13789179..21fa58db45 100644
--- a/cpp/src/tests/.valgrind.supp-default
+++ b/cpp/src/tests/.valgrind.supp-default
@@ -1 +1,10 @@
+{
+ Benign error in libcpg.
+
+ Memcheck:Param
+ socketcall.sendmsg(msg.msg_iov[i])
+ obj:*/libpthread-2.5.so
+ obj:*/libcpg.so.2.0.0
+}
+
diff --git a/cpp/src/tests/Cluster.cpp b/cpp/src/tests/Cluster.cpp
new file mode 100644
index 0000000000..ed50cc5d7b
--- /dev/null
+++ b/cpp/src/tests/Cluster.cpp
@@ -0,0 +1,72 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#define BOOST_AUTO_TEST_MAIN // Must come before #include<boost/test/*>
+#include <boost/test/auto_unit_test.hpp>
+#include "test_tools.h"
+#include "Cluster.h"
+#include "qpid/framing/ChannelOkBody.h"
+#include "qpid/framing/BasicGetOkBody.h"
+
+
+
+static const ProtocolVersion VER;
+
+/** Verify membership ind a cluster with one member. */
+BOOST_AUTO_TEST_CASE(clusterOne) {
+ VectorFrameHandler received;
+ Cluster cluster("Test", "amqp:one:1", received, VER);
+ AMQFrame frame(VER, 1, new ChannelOkBody(VER));
+
+ cluster.handle(frame);
+ BOOST_REQUIRE(received.waitFor(1));
+ BOOST_CHECK_EQUAL(1u, cluster.size());
+ Cluster::MemberList members = cluster.getMembers();
+ BOOST_CHECK_EQUAL(1u, members.size());
+ BOOST_REQUIRE_EQUAL(members.front()->url, "amqp:one:1");
+ BOOST_CHECK_EQUAL(1u, received.size());
+ BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *received[0].getBody());
+}
+
+/** Fork a process to verify membership in a cluster with two members */
+BOOST_AUTO_TEST_CASE(clusterTwo) {
+ VectorFrameHandler received;
+ pid_t pid=fork();
+ BOOST_REQUIRE(pid >= 0);
+ if (pid) { // Parent
+ TestCluster cluster("Test", "amqp::1", received, VER);
+ BOOST_REQUIRE(cluster.waitFor(2));
+
+ // Exchange frames with child.
+ AMQFrame frame(VER, 1, new ChannelOkBody(VER));
+ cluster.handle(frame);
+ BOOST_REQUIRE(received.waitFor(2));
+ BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *received[0].getBody());
+ BOOST_CHECK_TYPEID_EQUAL(BasicGetOkBody, *received[1].getBody());
+
+ // Wait for child to exit.
+ int status;
+ BOOST_CHECK_EQUAL(::wait(&status), pid);
+ BOOST_CHECK_EQUAL(0, status);
+ BOOST_CHECK(cluster.waitFor(1));
+ BOOST_CHECK_EQUAL(1u, cluster.size());
+ }
+ else { // Child
+ BOOST_REQUIRE(execl("Cluster_child", "Cluster_child", NULL));
+ }
+}
diff --git a/cpp/src/tests/Cluster.h b/cpp/src/tests/Cluster.h
new file mode 100644
index 0000000000..7ca5445e10
--- /dev/null
+++ b/cpp/src/tests/Cluster.h
@@ -0,0 +1,83 @@
+#ifndef CLUSTER_H
+#define CLUSTER_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/cluster/Cluster.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/ChannelOkBody.h"
+#include "qpid/framing/BasicGetOkBody.h"
+#include "qpid/log/Logger.h"
+#include <iostream>
+#include <vector>
+
+/**
+ * Definitions for the Cluster.cpp and Cluster_child.cpp child program.
+ */
+
+// using namespace in header file is bad manners, but this is strictly for
+// the tests.
+using namespace std;
+using namespace qpid;
+using namespace qpid::cluster;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+struct TestCluster : public Cluster {
+ TestCluster(const std::string& name,
+ const std::string& url,
+ framing::FrameHandler& next,
+ framing::ProtocolVersion ver) : Cluster(name,url,next, ver) {}
+
+ /** Wait for the cluster to be of expected size (exactly) */
+ bool waitFor(size_t n) {
+ Mutex::ScopedLock l(lock);
+ AbsTime deadline(now(),2*TIME_SEC);
+ while(size() != n && lock.wait(deadline))
+ ;
+ return size() == n;
+ }
+};
+
+struct VectorFrameHandler :
+ public std::vector<AMQFrame>, public FrameHandler, public Monitor
+
+{
+ void handle(AMQFrame& f) {
+ ScopedLock l(*this);
+ push_back(f);
+ notifyAll();
+ }
+
+ /** Wait for vector to reach size n exactly */
+ bool waitFor(size_t n) {
+ ScopedLock l(*this);
+ AbsTime deadline(now(), 1*TIME_SEC);
+ while (size() != n && wait(deadline))
+ ;
+ return size() == n;
+ }
+};
+
+
+// namespace
+
+
+
+#endif /*!CLUSTER_H*/
diff --git a/cpp/src/tests/unit/Cpg.cpp b/cpp/src/tests/Cpg.cpp
index 74c6532338..97b829ea63 100644
--- a/cpp/src/tests/unit/Cpg.cpp
+++ b/cpp/src/tests/Cpg.cpp
@@ -17,8 +17,11 @@
*/
#define BOOST_AUTO_TEST_MAIN // Must come before #include<boost/test/*>
+#include <boost/test/auto_unit_test.hpp>
#include "test_tools.h"
#include "qpid/cluster/Cpg.h"
+#include "qpid/framing/AMQBody.h"
+#include <boost/bind.hpp>
#include <string>
#include <iostream>
#include <iterator>
@@ -27,6 +30,7 @@
using namespace std;
using namespace qpid::cluster;
+using namespace qpid::framing;
// For debugging: op << for CPG types.
@@ -47,51 +51,57 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
return o;
}
-const string testGroup("foo");
-vector<string> delivered;
-vector<int> configChanges;
+struct Callback {
+ Callback(const string group_) : group(group_) {}
+ string group;
+ vector<string> delivered;
+ vector<int> configChanges;
-void testDeliver (
- cpg_handle_t /*handle*/,
- struct cpg_name *group,
- uint32_t /*nodeid*/,
- uint32_t /*pid*/,
- void* msg,
- int msg_len)
-{
- BOOST_CHECK_EQUAL(testGroup, Cpg::str(*group));
- delivered.push_back(string((char*)msg,msg_len));
-}
+ void deliver (
+ cpg_handle_t /*handle*/,
+ struct cpg_name *grp,
+ uint32_t /*nodeid*/,
+ uint32_t /*pid*/,
+ void* msg,
+ int msg_len)
+ {
+ BOOST_CHECK_EQUAL(group, Cpg::str(*grp));
+ delivered.push_back(string((char*)msg,msg_len));
+ }
-void testConfigChange(
- cpg_handle_t /*handle*/,
- struct cpg_name *group,
- struct cpg_address */*members*/, int nMembers,
- struct cpg_address */*left*/, int /*nLeft*/,
- struct cpg_address */*joined*/, int /*nJoined*/
-)
-{
- BOOST_CHECK_EQUAL(testGroup, Cpg::str(*group));
- configChanges.push_back(nMembers);
-}
+ void configChange(
+ cpg_handle_t /*handle*/,
+ struct cpg_name *grp,
+ struct cpg_address */*members*/, int nMembers,
+ struct cpg_address */*left*/, int /*nLeft*/,
+ struct cpg_address */*joined*/, int /*nJoined*/
+ )
+ {
+ BOOST_CHECK_EQUAL(group, Cpg::str(*grp));
+ configChanges.push_back(nMembers);
+ }
+};
-BOOST_AUTO_TEST_CASE(basic) {
+BOOST_AUTO_TEST_CASE(Cpg_basic) {
// Verify basic functionality of cpg. This will catch any
// openais configuration or permission errors.
- //
- Cpg cpg(&testDeliver, &testConfigChange);
+ //
Cpg::Name group("foo");
+ Callback cb(group.str());
+ Cpg::DeliverFn deliver=boost::bind(&Callback::deliver, &cb, _1, _2, _3, _4, _5, _6);
+ Cpg::ConfigChangeFn reconfig=boost::bind<void>(&Callback::configChange, &cb, _1, _2, _3, _4, _5, _6, _7, _8);
+ Cpg cpg(deliver, reconfig);
cpg.join(group);
iovec iov = { (void*)"Hello!", 6 };
cpg.mcast(group, &iov, 1);
cpg.leave(group);
+ cpg.dispatchSome();
- cpg.dispatch(CPG_DISPATCH_ONE); // Wait for at least one.
- cpg.dispatch(CPG_DISPATCH_ALL);
- BOOST_REQUIRE_EQUAL(1u, delivered.size());
- BOOST_CHECK_EQUAL("Hello!", delivered.front());
- BOOST_REQUIRE_EQUAL(2u, configChanges.size());
- BOOST_CHECK_EQUAL(1, configChanges[0]);
- BOOST_CHECK_EQUAL(0, configChanges[1]);
+ BOOST_REQUIRE_EQUAL(1u, cb.delivered.size());
+ BOOST_CHECK_EQUAL("Hello!", cb.delivered.front());
+ BOOST_REQUIRE_EQUAL(2u, cb.configChanges.size());
+ BOOST_CHECK_EQUAL(1, cb.configChanges[0]);
+ BOOST_CHECK_EQUAL(0, cb.configChanges[1]);
}
+
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 76d675ee12..ab772ea744 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -1,25 +1,34 @@
AM_CXXFLAGS = $(WARNING_CFLAGS) $(CPPUNIT_CXXFLAGS) $(APR_CXXFLAGS)
INCLUDES = -I$(srcdir)/.. -I$(srcdir)/../gen -I$(top_builddir)/src/gen
+abs_builddir=@abs_builddir@
+extra_libs = $(CPPUNIT_LIBS)
+lib_client = $(abs_builddir)/../libqpidclient.la
+lib_common = $(abs_builddir)/../libqpidcommon.la
+lib_broker = $(abs_builddir)/../libqpidbroker.la
+
#
-# Unit test programs.
+# Initialize variables that are incremented with +=
#
-UNIT_TESTS=
+check_PROGRAMS=
+unit_progs=
+unit_wrappers=
-UNIT_TESTS+=logging
-logging_SOURCES=unit/logging.cpp unit/test_tools.h
-logging_LDADD=-lboost_unit_test_framework -lboost_regex $(lib_common)
+#
+# Unit test programs.
+#
+unit_progs+=logging
+logging_SOURCES=logging.cpp test_tools.h
+logging_LDADD=-lboost_unit_test_framework -lboost_regex $(lib_common)
-UNIT_TESTS+=Url
-Url_SOURCES=unit/Url.cpp unit/test_tools.h
-Url_LDADD=-lboost_unit_test_framework $(lib_common)
+unit_progs+=Url
+Url_SOURCES=Url.cpp test_tools.h
+Url_LDADD=-lboost_unit_test_framework $(lib_common)
-if CLUSTER
include cluster.mk
-endif
# NB: CppUnit test libraries below will be migrated to boost test programs.
-# UNIT_TESTS+= ...
+#
# Unit tests
broker_unit_tests = \
@@ -74,7 +83,7 @@ testprogs = \
topic_publisher
-check_PROGRAMS = $(UNIT_TESTS) $(AIS_UNIT_TESTS) $(testprogs) interop_runner
+check_PROGRAMS += $(unit_progs) $(testprogs) interop_runner
# FIXME aconway 2007-05-30: TESTS_ENVIRONMENT should have ./run_test
# as below to run valgrind on all test programs.
@@ -83,13 +92,12 @@ check_PROGRAMS = $(UNIT_TESTS) $(AIS_UNIT_TESTS) $(testprogs) interop_runner
#TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) ./run_test
TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir)
-SYSTEM_TESTS = client_test quick_topictest
-TESTS = $(UNIT_TESTS) run-unit-tests $(RUN_AIS_TESTS) start_broker $(SYSTEM_TESTS) python_tests kill_broker daemon_test
+system_tests = client_test quick_topictest
+TESTS = dummy_test $(unit_progs) $(unit_wrappers) run-unit-tests start_broker $(system_tests) python_tests kill_broker daemon_test
EXTRA_DIST = \
test_env run_test \
run-unit-tests start_broker python_tests kill_broker daemon_test \
- ais_unit_tests \
quick_topictest \
topictest \
.valgrind.supp-default \
@@ -106,12 +114,6 @@ check_LTLIBRARIES += libdlclose_noop.la
libdlclose_noop_la_LDFLAGS = -module -rpath /home/aconway/svn/qpid/cpp/tests
libdlclose_noop_la_SOURCES = dlclose_noop.c
-abs_builddir = @abs_builddir@
-extra_libs = $(CPPUNIT_LIBS)
-lib_client = $(abs_builddir)/../libqpidclient.la
-lib_common = $(abs_builddir)/../libqpidcommon.la
-lib_broker = $(abs_builddir)/../libqpidbroker.la
-
gen.mk: Makefile.am
( \
for i in $(testprogs); do \
@@ -131,11 +133,14 @@ gen.mk: Makefile.am
> $@-t
mv $@-t $@
-check: .valgrindrc .valgrind.supp
-
check-unit:
$(MAKE) check TESTS=$(UNIT_TESTS) run-unit-tests
+# Dummy test to force necessary test files to be generated.
+dummy_test: .valgrind.supp .valgrindrc
+ { echo "#!/bin/sh"; echo "# Dummy test, does nothing. "; } > $@
+ chmod a+x $@
+
# Create a copy so that can be modified without risk of committing the changes.
.valgrindrc: .valgrindrc-default
cp $^ $@
@@ -147,7 +152,7 @@ check-unit:
# ltmain invocations, one may corrupt the temporaries of the other.
.NOTPARALLEL:
-CLEANFILES=valgrind.out qpidd.log .valgrindrc .valgrind.supp
+CLEANFILES=valgrind.out qpidd.log .valgrindrc .valgrind.supp dummy_test $(unit_wrappers)
MAINTAINERCLEANFILES=gen.mk
interop_runner_SOURCES = \
diff --git a/cpp/src/tests/unit/Url.cpp b/cpp/src/tests/Url.cpp
index a8b415e641..09aabb80b3 100644
--- a/cpp/src/tests/unit/Url.cpp
+++ b/cpp/src/tests/Url.cpp
@@ -17,6 +17,7 @@
*/
#define BOOST_AUTO_TEST_MAIN // Must come before #include<boost/test/*>
+#include <boost/test/auto_unit_test.hpp>
#include "test_tools.h"
#include "qpid/Url.h"
#include <boost/assign.hpp>
diff --git a/cpp/src/tests/ais_unit_tests b/cpp/src/tests/ais_unit_tests
deleted file mode 100755
index 9758891891..0000000000
--- a/cpp/src/tests/ais_unit_tests
+++ /dev/null
@@ -1,2 +0,0 @@
-#!/bin/sh
-make check-ais
diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk
index 489eec748e..fd8f789363 100644
--- a/cpp/src/tests/cluster.mk
+++ b/cpp/src/tests/cluster.mk
@@ -1,23 +1,35 @@
-#-*-Makefile-*-
+if CLUSTER
# Cluster tests makefile fragment, to be included in Makefile.am
#
lib_cluster = $(abs_builddir)/../libqpidcluster.la
-#
-# AIS_UNIT_TESTS must be called with gid=ais. They are run
-# separately under sudo -u ais.
-#
-AIS_UNIT_TESTS=
-AIS_UNIT_TESTS+=Cpg
-Cpg_SOURCES=unit/Cpg.cpp
-Cpg_LDADD=-lboost_unit_test_framework $(lib_cluster)
-
-RUN_AIS_TESTS=ais_unit_tests # Run ais unit tests via check-ais.
+# NOTE: Programs using the openais library must be run with gid=ais
+# Such programs are built as *.ais, with a wrapper script *.sh that
+# runs the program under sudo -u ais.
+#
+# Rule to generate wrappers.
# The chmod is a horrible hack to allow libtools annoying wrapers to
# relink the executable when run as user ais.
-check-ais: $(AIS_UNIT_TESTS)
+.ais.sh:
+ echo sudo -u ais env VALGRIND=$(VALGRIND) srcdir=$(srcdir) $(srcdir)/run_test ./$< >$@; chmod a+x $@
chmod a+rwx . .libs
- sudo -u ais $(MAKE) check TESTS=$(AIS_UNIT_TESTS)
+# Cluster tests.
+#
+check_PROGRAMS+=Cpg.ais
+Cpg_ais_SOURCES=Cpg.cpp
+Cpg_ais_LDADD=$(lib_cluster) -lboost_unit_test_framework
+unit_wrappers+=Cpg.sh
+
+check_PROGRAMS+=Cluster.ais
+Cluster_ais_SOURCES=Cluster.cpp Cluster.h
+Cluster_ais_LDADD=$(lib_cluster) -lboost_unit_test_framework
+unit_wrappers+=Cluster.sh
+
+check_PROGRAMS+=Cluster_child
+Cluster_child_SOURCES=Cluster_child.cpp Cluster.h
+Cluster_child_LDADD=$(lib_cluster) -lboost_test_exec_monitor
+
+endif
diff --git a/cpp/src/tests/unit/logging.cpp b/cpp/src/tests/logging.cpp
index c80bf7b337..ebe8f4d6e8 100644
--- a/cpp/src/tests/unit/logging.cpp
+++ b/cpp/src/tests/logging.cpp
@@ -17,6 +17,7 @@
*/
#define BOOST_AUTO_TEST_MAIN // Must come before #include<boost/test/*>
+#include <boost/test/auto_unit_test.hpp>
#include "test_tools.h"
#include "qpid/log/Logger.h"
#include "qpid/log/Options.h"
diff --git a/cpp/src/tests/run_test b/cpp/src/tests/run_test
index ef608e55ca..bfd6991481 100755
--- a/cpp/src/tests/run_test
+++ b/cpp/src/tests/run_test
@@ -22,7 +22,7 @@ vg_check()
grep -E '^==[0-9]+== ERROR SUMMARY:' $VG_LOG > /dev/null || \
vg_failed "No valgrind ERROR SUMMARY line in $$vg_failed."
# Ensure that the number of errors is 0.
- grep -E '^==[0-9]+== ERROR SUMMARY: [^0] ' $VG_LOG > /dev/null && \
+ grep -E '^==[0-9]+== ERROR SUMMARY: [^0]' $VG_LOG > /dev/null && \
vg_failed "Valgrind reported errors in $vg_out; see above."
# Check for leaks.
grep -E '^==[0-9]+== +.* lost: [^0]' $VG_LOG && \
@@ -41,14 +41,14 @@ if grep -l "^# Generated by .*libtool" "$1" >/dev/null 2>&1; then
# This is a libtool "executable". Valgrind it if VALGRIND specified.
test -n "$VALGRIND" && VALGRIND="$VALGRIND --log-file-exactly=$VG_LOG --"
# Hide output unless there's an error.
- libtool --mode=execute $VALGRIND "$@" >$TEST_LOG 2>&1 || {
+ libtool --mode=execute "$VALGRIND" "$@" >$TEST_LOG 2>&1 || {
ERROR=$?
cat $TEST_LOG
}
test -n "$VALGRIND" && vg_check
else
# This is a non-libtool shell script, just execute it.
- "$@"
+ exec "$@"
fi
if test -z "$ERROR"; then
diff --git a/cpp/src/tests/unit/test_tools.h b/cpp/src/tests/test_tools.h
index faa198af9a..e564b9a473 100644
--- a/cpp/src/tests/unit/test_tools.h
+++ b/cpp/src/tests/test_tools.h
@@ -19,7 +19,7 @@
*
*/
-#include <boost/test/auto_unit_test.hpp>
+#include <boost/test/test_tools.hpp>
#include <boost/assign/list_of.hpp>
#include <boost/regex.hpp>
#include <vector>
@@ -49,5 +49,8 @@ inline bool regexPredicate(const std::string& re, const std::string& text) {
#define BOOST_CHECK_REGEX(re, text) \
BOOST_CHECK_PREDICATE(regexPredicate, (re)(text))
+/** Check if types of two objects (as given by typeinfo::name()) match. */
+#define BOOST_CHECK_TYPEID_EQUAL(a,b) BOOST_CHECK_EQUAL(typeid(a).name(),typeid(b).name())
+
#endif /*!TEST_TOOLS_H*/