diff options
author | Alan Conway <aconway@apache.org> | 2007-06-26 02:11:55 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-06-26 02:11:55 +0000 |
commit | baae218dacf7b5e56b52602ad4fbf2be0b81a2af (patch) | |
tree | a40207d3c21d96f7df16c7005602fecbd1d90d88 | |
parent | b8c5c8d42d8dc73cfc58acd015e0af2b4eac0dcc (diff) | |
download | qpid-python-baae218dacf7b5e56b52602ad4fbf2be0b81a2af.tar.gz |
2007-06-25 <aconway@redhat.com>
Cluster class implementing cluster membership map.
* src/qpid/cluster/Cluster.cpp: Cluster membership implementation.
* src/qpid/cluster/Cpg.cpp: Support for boost::function callbacks.
* src/tests/Url.cpp: Implements AMQP-95 URL format.
* xml/cluster.xml: Cluster join method.
Build/packaging
* README: Remove mention of openais till clustering is functional.
For now it is optional and we depend on an unpackaged version.
* configure.ac: Check openais has cpg_local_get().
* Makefile.am: Added cluster.xml to EXTRA_DIST.
* src/generate.sh: add cluster.xml to codegen.
* src/tests/Makefile.am:
- Generate individual "sudo -u ais" wrappers for openais tests.
- Drop "unit" directory, all unit tests in "tests" directory
Minor changes:
* src/qpid/sys/posix/Socket.cpp:
* src/qpid/sys/posix/PosixAcceptor.cpp:
* src/qpid/sys/posix/EventChannelAcceptor.cpp:
* src/qpid/sys/apr/APRAcceptor.cpp:
* src/qpid/sys/Acceptor.h (getHost): Added getHost()
* src/tests/.valgrind.supp-default: Suppress benign valgrind
warning in libcpg.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@550658 13f79535-47bb-0310-9956-ffa450edef68
29 files changed, 909 insertions, 117 deletions
diff --git a/qpid/cpp/Makefile.am b/qpid/cpp/Makefile.am index 515b811a29..2d9f982eee 100644 --- a/qpid/cpp/Makefile.am +++ b/qpid/cpp/Makefile.am @@ -7,7 +7,8 @@ EXTRA_DIST = \ LICENSE NOTICE README \ etc/qpidd \ $(SPEC) $(SPEC).in \ - rpm/README.qpidd-devel + rpm/README.qpidd-devel \ + xml/cluster.xml SUBDIRS = src docs/api docs/man examples diff --git a/qpid/cpp/README b/qpid/cpp/README index 95104cf34f..805a42cd86 100644 --- a/qpid/cpp/README +++ b/qpid/cpp/README @@ -39,7 +39,6 @@ Qpid is compiled against libraries: * boost <http://www.boost.org> (1.33.1) * cppunit <http://cppunit.sourceforge.net> (1.11.4) * libdaemon <http://www.stud.uni-hamburg.de/users/lennart/projects/libdaemon> (0.10) - * opeais <http://www.openais.org/> (0.80.2) Using tools: @@ -54,10 +53,7 @@ Using tools: * graphviz <http://www.graphviz.org/> (2.12) * JDK 5.0 <http://java.sun.com/j2se/1.5.0/> (1.5.0.11) -=== Optional tools an libraries === - -If openais is not installed the broker will be built without cluster -functionality. +=== Optional tools and libraries === The following are only required if you generate documentation. (Source distributions contain pre-generated documentation.) @@ -77,10 +73,9 @@ If building from a source distribution you do not need: On linux most packages can be installed using your distribution's package management tool. For example on Fedora: - # yum install apr-devel boost-devel cppunit-devel libdaemon-devel openais-devel + # yum install apr-devel boost-devel cppunit-devel libdaemon-devel # yum install pkgconfig doxygen graphviz help2man - Follow the manual installation instruction below for any packages not available through yum. @@ -96,7 +91,8 @@ It is recommended that you create a directory to install them to, for example, # make install The exceptions to this are boost and JDK 5.0. - To build the boost library: + +==== To build the boost library ==== 1. Unpack boost-jam. 2. Add bjam in the unpacked directory to your path. @@ -105,7 +101,8 @@ It is recommended that you create a directory to install them to, for example, # bjam -sTOOLS=gcc --prefix=~/qpid-tools -To install JDK 5.0 download and run its install script, or whatever +==== To install JDK 5.0 ==== +Download and run its install script, or whatever alternative instructions may be on the sun website. Ensure that all the build tools are available on your path, when they are diff --git a/qpid/cpp/configure.ac b/qpid/cpp/configure.ac index d0d1302ec7..67fe40d84d 100644 --- a/qpid/cpp/configure.ac +++ b/qpid/cpp/configure.ac @@ -162,15 +162,22 @@ AC_CHECK_HEADERS([boost/shared_ptr.hpp],,[missing="$missing boost"]) test -z "$missing" || AC_MSG_ERROR([Missing required headers. Install the folowing packages or -devel rpms: $missing.]) -# Enable/disable cluster functionality based on presence of openais -AC_CHECK_HEADER([openais/cpg.h],[cluster=yes],[cluster=no]) -AM_CONDITIONAL([CLUSTER], test x$cluster = xyes) -if test x$cluster = xyes; then - LDFLAGS="$LDFLAGS -L/usr/lib/openais -L/usr/lib64/openais" +# Enable/disable cluster functionality based on presence of usable openais +# and devel libs. +# cpg_local_get is not yet in a packaged release as of 2007-06-20 +LDFLAGS_save=$LDFLAGS +LDFLAGS="$LDFLAGS -L/usr/lib/openais -L/usr/lib64/openais" +AC_CHECK_LIB([cpg], [cpg_local_get], [libcpg=yes], [libcpg=no]) +AC_CHECK_HEADER([openais/cpg.h],[cpg_h=yes],[cpg_h=no]) +if test x$libcpg = xyes -a x$cpg_h = xyes; then + AM_CONDITIONAL([CLUSTER], true) CPPFLAGS+=-DCLUSTER - AC_CHECK_LIB([cpg], [cpg_initialize], [], - [AC_MSG_ERROR([Cannot find library -lcpg. Install openais.])]) +else + LDFLAGS=LDFLAGS_save fi +if test x$libcpg = xno -a x$cpg_h = xyes; then + AC_MSG_WARN([Found cpg.h but libcpg is missing or does not contain cpg_local_get. Need build of openais whitetank branch head as of 2007-06-20]) +fi AC_CONFIG_FILES([ qpidc.spec diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 5b09172ee2..47e714955c 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -133,6 +133,7 @@ libqpidcommon_la_SOURCES = \ qpid/framing/AMQHeaderBody.cpp \ qpid/framing/AMQHeartbeatBody.cpp \ qpid/framing/AMQMethodBody.cpp \ + qpid/framing/FrameHandler.h \ qpid/framing/MethodContext.cpp \ qpid/framing/BasicHeaderProperties.cpp \ qpid/framing/BodyHandler.cpp \ diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index d5b0bed97f..f97e95c208 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -1,4 +1,4 @@ -#-*-Makefile-*- +# # Cluster library makefile fragment, to be included in Makefile.am # lib_LTLIBRARIES += libqpidcluster.la @@ -6,11 +6,16 @@ lib_LTLIBRARIES += libqpidcluster.la if CLUSTER libqpidcluster_la_SOURCES = \ + qpid/cluster/Cluster.cpp \ + qpid/cluster/Cluster.h \ qpid/cluster/Cpg.cpp \ - qpid/cluster/Cpg.h -libqpidcluster_la_LIBADD= -lcpg + qpid/cluster/Cpg.h \ + qpid/cluster/Dispatchable.h + +libqpidcluster_la_LIBADD= -lcpg libqpidcommon.la else # Empty stub library to satisfy rpm spec file. libqpidcluster_la_SOURCES = + endif diff --git a/qpid/cpp/src/generate.sh b/qpid/cpp/src/generate.sh index a600897cc3..4f97f72684 100755 --- a/qpid/cpp/src/generate.sh +++ b/qpid/cpp/src/generate.sh @@ -7,7 +7,8 @@ set -e gentools_dir="$srcdir/../gentools" specs_dir="$srcdir/../../specs" -specs="$specs_dir/amqp.0-9.xml $specs_dir/amqp-errata.0-9.xml $specs_dir/amqp-dtx-preview.0-9.xml" +specs="$specs_dir/amqp.0-9.xml $specs_dir/amqp-errata.0-9.xml $specs_dir/amqp-dtx-preview.0-9.xml $srcdir/../xml/cluster.xml" + test -z "$JAVA" && JAVA=java ; test -z "$JAVAC" && JAVAC=javac ; diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp new file mode 100644 index 0000000000..30073c4551 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -0,0 +1,159 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Cluster.h" +#include "Cpg.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/ClusterNotifyBody.h" +#include "qpid/log/Statement.h" +#include <boost/bind.hpp> +#include <algorithm> +#include <iterator> + +namespace qpid { +namespace cluster { +using namespace qpid::framing; +using namespace qpid::sys; +using namespace std; + +ostream& operator <<(ostream& out, const Cluster& cluster) { + return out << cluster.name.str() << "(" << cluster.self << ")"; +} + +void Cluster::notify() { + // TODO aconway 2007-06-25: Use proxy here. + AMQFrame frame(version, 0, + make_shared_ptr(new ClusterNotifyBody(version, url))); + handle(frame); +} + +Cluster::Cluster( + const std::string& name_, const std::string& url_, FrameHandler& next_, + ProtocolVersion ver) + : name(name_), url(url_), version(ver), + cpg(new Cpg(boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6), + boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))), + next(next_) +{ + self=Id(cpg->getLocalNoideId(), getpid()); + QPID_LOG(trace, *this << " Joining cluster."); + cpg->join(name); + notify(); + dispatcher=Thread(*this); +} + +Cluster::~Cluster() { + try { + QPID_LOG(trace, *this << " Leaving cluster."); + cpg->leave(name); + cpg.reset(); + dispatcher.join(); + } catch (const std::exception& e) { + QPID_LOG(error, "Exception leaving cluster " << e.what()); + } +} + +void Cluster::handle(AMQFrame& frame) { + QPID_LOG(trace, *this << " SEND: " << frame); + Buffer buf(frame.size()); + frame.encode(buf); + buf.flip(); + iovec iov = { buf.start(), frame.size() }; + cpg->mcast(name, &iov, 1); +} + +size_t Cluster::size() const { + Mutex::ScopedLock l(lock); + return members.size(); +} + +Cluster::MemberList Cluster::getMembers() const { + Mutex::ScopedLock l(lock); + MemberList result(members.size()); + std::transform(members.begin(), members.end(), result.begin(), + boost::bind(&MemberMap::value_type::second, _1)); + return result; +} + +void Cluster::cpgDeliver( + cpg_handle_t /*handle*/, + struct cpg_name* /* group */, + uint32_t nodeid, + uint32_t pid, + void* msg, + int msg_len) +{ + Id from(nodeid, pid); + Buffer buf(static_cast<char*>(msg), msg_len); + AMQFrame frame; + frame.decode(buf); + QPID_LOG(trace, *this << " RECV: " << frame); + // TODO aconway 2007-06-20: use visitor pattern. + ClusterNotifyBody* notifyIn= dynamic_cast<ClusterNotifyBody*>(frame.getBody().get()); + if (notifyIn) { + Mutex::ScopedLock l(lock); + members[from].reset(new Member(notifyIn->getUrl())); + lock.notifyAll(); + } + else + next.handle(frame); +} + +void Cluster::cpgConfigChange( + cpg_handle_t /*handle*/, + struct cpg_name */*group*/, + struct cpg_address *ccMembers, int nMembers, + struct cpg_address *left, int nLeft, + struct cpg_address *joined, int nJoined +) +{ + QPID_LOG( + trace, + *this << " Configuration change. " << endl + << " Joined: " << make_pair(joined, nJoined) << endl + << " Left: " << make_pair(left, nLeft) << endl + << " Current: " << make_pair(ccMembers, nMembers)); + + { + Mutex::ScopedLock l(lock); + // Erase members that left. + for (int i = 0; i < nLeft; ++i) + members.erase(Id(left[i])); + lock.notifyAll(); + } + + // If there are new members (other than myself) then notify. + for (int i=0; i< nJoined; ++i) { + if (Id(joined[i]) != self) { + notify(); + break; + } + } + + // Note: New members are be added to my map when cpgDeliver + // gets a cluster.notify frame. +} + +void Cluster::run() { + cpg->dispatchBlocking(); +} + +}} // namespace qpid::cluster + + + diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h new file mode 100644 index 0000000000..1cbbb249f2 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -0,0 +1,116 @@ +#ifndef QPID_CLUSTER_CLUSTER_H +#define QPID_CLUSTER_CLUSTER_H + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/cluster/Cpg.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Runnable.h" +#include "qpid/shared_ptr.h" +#include "qpid/framing/ProtocolVersion.h" +#include <boost/scoped_ptr.hpp> +#include <map> +#include <vector> + +namespace qpid { +namespace cluster { + +/** + * Represents a cluster. Creating an instance joins current process + * to the cluster. + */ +class Cluster : public framing::FrameHandler, private sys::Runnable { + public: + /** Details of a cluster member */ + struct Member { + Member(const std::string& url_) : url(url_) {} + std::string url; + }; + + typedef std::vector<shared_ptr<const Member> > MemberList; + + /** + * Join a cluster. + * @param name of the cluster. + * @param url of this broker, sent to the cluster. + * @param next handler receives the frame when it has been + * acknowledged by the cluster. + */ + Cluster(const std::string& name, + const std::string& url, + framing::FrameHandler& next, + framing::ProtocolVersion); + + ~Cluster(); + + /** Multicast a frame to the cluster. */ + void handle(framing::AMQFrame&); + + /** Get the current cluster membership. */ + MemberList getMembers() const; + + /** Number of members in the cluster. */ + size_t size() const; + + private: + typedef Cpg::Id Id; + typedef std::map<Id, shared_ptr<Member> > MemberMap; + + void run(); + void notify(); + + void cpgDeliver( + cpg_handle_t /*handle*/, + struct cpg_name *group, + uint32_t /*nodeid*/, + uint32_t /*pid*/, + void* /*msg*/, + int /*msg_len*/); + + void cpgConfigChange( + cpg_handle_t /*handle*/, + struct cpg_name */*group*/, + struct cpg_address */*members*/, int /*nMembers*/, + struct cpg_address */*left*/, int /*nLeft*/, + struct cpg_address */*joined*/, int /*nJoined*/ + ); + + Id self; + Cpg::Name name; + std::string url; + framing::ProtocolVersion version; + boost::scoped_ptr<Cpg> cpg; + framing::FrameHandler& next; + MemberMap members; + sys::Thread dispatcher; + + protected: + // Allow access from ClusterTest subclass. + mutable sys::Monitor lock; + + friend std::ostream& operator <<(std::ostream&, const Cluster&); +}; + +}} // namespace qpid::cluster + + + +#endif /*!QPID_CLUSTER_CLUSTER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp index 858d25f37c..a979ce1eeb 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.cpp +++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp @@ -17,12 +17,86 @@ */ #include "Cpg.h" +#include "qpid/sys/Mutex.h" +#include <vector> +#include <limits> +#include <iterator> namespace qpid { namespace cluster { using namespace std; +// Global vector of Cpg pointers by handle. +// TODO aconway 2007-06-12: Replace this with cpg_get/set_context, +// coming in in RHEL 5.1. +class Cpg::Handles +{ + public: + void put(cpg_handle_t handle, Cpg* object) { + sys::Mutex::ScopedLock l(lock); + assert(object); + uint32_t index=uint32_t(handle); // Lower 32 bits is an array index. + if (index >= handles.size()) + handles.resize(index+1, 0); + handles[index] = object; + } + + Cpg* get(cpg_handle_t handle) { + sys::Mutex::ScopedLock l(lock); + uint32_t index=uint32_t(handle); // Lower 32 bits is an array index. + assert(index < handles.size()); + assert(handles[index]); + return handles[index]; + } + + private: + sys::Mutex lock; + vector<Cpg*> handles; +}; + +Cpg::Handles Cpg::handles; + +// Global callback functions call per-object callbacks via handles vector. +void Cpg::globalDeliver ( + cpg_handle_t handle, + struct cpg_name *group, + uint32_t nodeid, + uint32_t pid, + void* msg, + int msg_len) +{ + handles.get(handle)->deliver(handle, group, nodeid, pid, msg, msg_len); +} + +void Cpg::globalConfigChange( + cpg_handle_t handle, + struct cpg_name *group, + struct cpg_address *members, int nMembers, + struct cpg_address *left, int nLeft, + struct cpg_address *joined, int nJoined +) +{ + handles.get(handle)->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined); +} + +Cpg::Cpg(DeliverFn d, ConfigChangeFn c) : deliver(d), configChange(c) +{ + cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange }; + check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG"); + handles.put(handle, this); +} + +Cpg::~Cpg() { + try { + check(cpg_finalize(handle), "Error in shutdown of CPG"); + } + catch (...) { + handles.put(handle, 0); + throw; + } +} + string Cpg::errorStr(cpg_error_t err, const std::string& msg) { switch (err) { case CPG_OK: return msg+": ok"; @@ -56,6 +130,34 @@ std::string Cpg::cantMcastMsg(const Name& group) { return "Cannot mcast to CPG group "+group.str(); } +uint32_t Cpg::getLocalNoideId() const { + unsigned int nodeid; + check(cpg_local_get(handle, &nodeid), "Cannot get local node ID"); + assert(nodeid <= std::numeric_limits<uint32_t>::max()); + return nodeid; +} + +ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) { + ostream_iterator<Cpg::Id> i(o, " "); + std::copy(a.first, a.first+a.second, i); + return o; +} + +static int popbyte(uint32_t& n) { + uint8_t b=n&0xff; + n>>=8; + return b; +} + +ostream& operator <<(ostream& out, const Cpg::Id& id) { + uint32_t node=id.nodeId(); + out << popbyte(node); + for (int i = 0; i < 3; i++) + out << "." << popbyte(node); + return out << ":" << id.pid(); +} + + }} // namespace qpid::cpg diff --git a/qpid/cpp/src/qpid/cluster/Cpg.h b/qpid/cpp/src/qpid/cluster/Cpg.h index 6e61fa8a6e..6b157301a7 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.h +++ b/qpid/cpp/src/qpid/cluster/Cpg.h @@ -19,7 +19,9 @@ * */ -#include <stdexcept> +#include "qpid/Exception.h" +#include "qpid/cluster/Dispatchable.h" +#include <boost/function.hpp> #include <cassert> #ifdef CLUSTER extern "C" { @@ -34,11 +36,10 @@ namespace cluster { * Manages a single CPG handle, initialized in ctor, finialzed in destructor. * On error all functions throw Cpg::Exception */ -class Cpg { +class Cpg : public Dispatchable { public: - // FIXME aconway 2007-06-01: qpid::Exception - struct Exception : public std::runtime_error { - Exception(const std::string& msg) : runtime_error(msg) {} + struct Exception : public ::qpid::Exception { + Exception(const std::string& msg) : ::qpid::Exception(msg) {} }; struct Name : public cpg_name { @@ -54,26 +55,45 @@ class Cpg { std::string str() const { return std::string(value, length); } }; - static inline std::string str(const cpg_name& n) { + struct Id { + uint64_t id; + Id() : id(0) {} + Id(uint32_t nodeid, uint32_t pid) { id=(uint64_t(nodeid)<<32)+ pid; } + Id(const cpg_address& addr) : id(Id(addr.nodeid, addr.pid)) {} + + operator uint64_t() const { return id; } + uint32_t nodeId() const { return id >> 32; } + pid_t pid() const { return id & 0xFFFF; } + }; + + static std::string str(const cpg_name& n) { return std::string(n.value, n.length); } - // TODO aconway 2007-06-01: when cpg handle supports a context pointer - // use callback objects (boost::function) instead of free functions. - // + typedef boost::function<void ( + cpg_handle_t /*handle*/, + struct cpg_name *group, + uint32_t /*nodeid*/, + uint32_t /*pid*/, + void* /*msg*/, + int /*msg_len*/)> DeliverFn; + + typedef boost::function<void ( + cpg_handle_t /*handle*/, + struct cpg_name */*group*/, + struct cpg_address */*members*/, int /*nMembers*/, + struct cpg_address */*left*/, int /*nLeft*/, + struct cpg_address */*joined*/, int /*nJoined*/ + )> ConfigChangeFn; + /** Open a CPG handle. *@param deliver - free function called when a message is delivered. *@param reconfig - free function called when CPG configuration changes. */ - Cpg(cpg_deliver_fn_t deliver, cpg_confchg_fn_t reconfig) { - cpg_callbacks_t callbacks = { deliver, reconfig }; - check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG"); - } + Cpg(DeliverFn deliver, ConfigChangeFn reconfig); - /** Disconnect */ - ~Cpg() { - check(cpg_finalize(handle), "Cannot finalize CPG"); - } + /** Disconnect from CPG. */ + ~Cpg(); /** Dispatch CPG events. *@param type one of @@ -85,6 +105,10 @@ class Cpg { check(cpg_dispatch(handle,type), "Error in CPG dispatch"); } + void dispatchOne() { dispatch(CPG_DISPATCH_ONE); } + void dispatchAll() { dispatch(CPG_DISPATCH_ALL); } + void dispatchBlocking() { dispatch(CPG_DISPATCH_BLOCKING); } + void join(const Name& group) { check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group)); }; @@ -99,7 +123,14 @@ class Cpg { cantMcastMsg(group)); } + cpg_handle_t getHandle() const { return handle; } + + uint32_t getLocalNoideId() const; + private: + class Handles; + friend class Handles; + static std::string errorStr(cpg_error_t err, const std::string& msg); static std::string cantJoinMsg(const Name&); static std::string cantLeaveMsg(const Name&); @@ -110,9 +141,31 @@ class Cpg { if (result != CPG_OK) throw Exception(errorStr(result, msg)); } + + static void globalDeliver( + cpg_handle_t /*handle*/, + struct cpg_name *group, + uint32_t /*nodeid*/, + uint32_t /*pid*/, + void* /*msg*/, + int /*msg_len*/); + + static void globalConfigChange( + cpg_handle_t /*handle*/, + struct cpg_name */*group*/, + struct cpg_address */*members*/, int /*nMembers*/, + struct cpg_address */*left*/, int /*nLeft*/, + struct cpg_address */*joined*/, int /*nJoined*/ + ); + + static Handles handles; cpg_handle_t handle; + DeliverFn deliver; + ConfigChangeFn configChange; }; +std::ostream& operator <<(std::ostream& out, const Cpg::Id& id); +std::ostream& operator <<(std::ostream& out, const std::pair<cpg_address*,int> addresses); }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Dispatchable.h b/qpid/cpp/src/qpid/cluster/Dispatchable.h new file mode 100644 index 0000000000..e7f0df4218 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/Dispatchable.h @@ -0,0 +1,52 @@ +#ifndef QPID_CLUSTER_DISPATCHABLE_H +#define QPID_CLUSTER_DISPATCHABLE_H + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +namespace qpid { +namespace cluster { + +/** + * Interface for classes that have some "events" that need dispatching + * in a thread. + */ +class Dispatchable +{ + public: + virtual ~Dispatchable() {} + + /** Dispatch one event in current thread. */ + virtual void dispatchOne() = 0; + /** Dispatch all available events, don't block. */ + virtual void dispatchAll() = 0; + /** Blocking loop to dispatch cluster events */ + virtual void dispatchBlocking() = 0; + + /** Wait for at least one event, then dispatch all available events. + * Don't block. Useful for tests. + */ + virtual void dispatchSome() { dispatchOne(); dispatchAll(); } + +}; + +}} // namespace qpid::cluster + + + +#endif /*!QPID_CLUSTER_DISPATCHABLE_H*/ diff --git a/qpid/cpp/src/qpid/framing/FrameHandler.h b/qpid/cpp/src/qpid/framing/FrameHandler.h new file mode 100644 index 0000000000..817c569119 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/FrameHandler.h @@ -0,0 +1,39 @@ +#ifndef QPID_FRAMING_FRAMEHANDLER_H +#define QPID_FRAMING_FRAMEHANDLER_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace framing { +class AMQFrame; + +class FrameHandler : private boost::noncopyable { + public: + virtual ~FrameHandler() {} + virtual void handle(AMQFrame& frame) = 0; +}; + +}} + + +#endif /*!QPID_FRAMING_FRAMEHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/sys/Acceptor.h b/qpid/cpp/src/qpid/sys/Acceptor.h index 5e624a956e..8d6bca8f29 100644 --- a/qpid/cpp/src/qpid/sys/Acceptor.h +++ b/qpid/cpp/src/qpid/sys/Acceptor.h @@ -36,6 +36,7 @@ class Acceptor : public qpid::SharedObject<Acceptor> static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false); virtual ~Acceptor() = 0; virtual uint16_t getPort() const = 0; + virtual std::string getHost() const = 0; virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory) = 0; virtual void shutdown() = 0; }; diff --git a/qpid/cpp/src/qpid/sys/apr/APRAcceptor.cpp b/qpid/cpp/src/qpid/sys/apr/APRAcceptor.cpp index 0f0853b35d..8662e602c2 100644 --- a/qpid/cpp/src/qpid/sys/apr/APRAcceptor.cpp +++ b/qpid/cpp/src/qpid/sys/apr/APRAcceptor.cpp @@ -35,6 +35,7 @@ class APRAcceptor : public Acceptor public: APRAcceptor(int16_t port, int backlog, int threads, bool trace); virtual uint16_t getPort() const; + virtual std::string getHost() const; virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory); virtual void shutdown(); @@ -72,6 +73,12 @@ APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) : CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog)); } +std::string APRAcceptor::getHost() const { + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); + return address->hostname; +} + uint16_t APRAcceptor::getPort() const { apr_sockaddr_t* address; CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); diff --git a/qpid/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp b/qpid/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp index 1a5fceb56e..cbda216cfc 100644 --- a/qpid/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp +++ b/qpid/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp @@ -52,6 +52,7 @@ class EventChannelAcceptor : public Acceptor { ); uint16_t getPort() const; + std::string getHost() const; void run(ConnectionInputHandlerFactory* factory); @@ -100,6 +101,10 @@ uint16_t EventChannelAcceptor::getPort() const { return port; // Immutable no need for lock. } +uint16_t EventChannelAcceptor::getPort() const { + return port; // Immutable no need for lock. +} + void EventChannelAcceptor::run(ConnectionInputHandlerFactory* f) { { Mutex::ScopedLock l(lock); diff --git a/qpid/cpp/src/qpid/sys/posix/PosixAcceptor.cpp b/qpid/cpp/src/qpid/sys/posix/PosixAcceptor.cpp index af200d393d..0575380a14 100644 --- a/qpid/cpp/src/qpid/sys/posix/PosixAcceptor.cpp +++ b/qpid/cpp/src/qpid/sys/posix/PosixAcceptor.cpp @@ -32,6 +32,7 @@ void fail() { throw qpid::Exception("PosixAcceptor not implemented"); } class PosixAcceptor : public Acceptor { public: virtual uint16_t getPort() const { fail(); return 0; } + virtual std::string getPort() const { fail(); return std::string(); } virtual void run(qpid::sys::ConnectionInputHandlerFactory* ) { fail(); } virtual void shutdown() { fail(); } }; diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp index 39651fa821..50cbfa7c4d 100644 --- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp +++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp @@ -112,7 +112,21 @@ int Socket::listen(int port, int backlog) return ntohs(name.sin_port); } - +std::string getHost() const { + // TODO aconway 2007-06-11: Won't work for ip6 + struct sockaddr_in name; + socklen_t namelen = sizeof(name); + if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0) + throw QPID_POSIX_ERROR(errno); + uint32_t addr = name.sin_host.s_addr; + ostringstream os; + os << uint8_t(addr >> 24) << '.' + << uint8_t(addr >> 16) << '.' + << uint8_t(addr >> 8) << '.' + << uint8_t(addr); + return os.str(); +} + int Socket::fd() { return socket; diff --git a/qpid/cpp/src/tests/.valgrind.supp-default b/qpid/cpp/src/tests/.valgrind.supp-default index 8b13789179..21fa58db45 100644 --- a/qpid/cpp/src/tests/.valgrind.supp-default +++ b/qpid/cpp/src/tests/.valgrind.supp-default @@ -1 +1,10 @@ +{ + Benign error in libcpg. + + Memcheck:Param + socketcall.sendmsg(msg.msg_iov[i]) + obj:*/libpthread-2.5.so + obj:*/libcpg.so.2.0.0 +} + diff --git a/qpid/cpp/src/tests/Cluster.cpp b/qpid/cpp/src/tests/Cluster.cpp new file mode 100644 index 0000000000..ed50cc5d7b --- /dev/null +++ b/qpid/cpp/src/tests/Cluster.cpp @@ -0,0 +1,72 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#define BOOST_AUTO_TEST_MAIN // Must come before #include<boost/test/*> +#include <boost/test/auto_unit_test.hpp> +#include "test_tools.h" +#include "Cluster.h" +#include "qpid/framing/ChannelOkBody.h" +#include "qpid/framing/BasicGetOkBody.h" + + + +static const ProtocolVersion VER; + +/** Verify membership ind a cluster with one member. */ +BOOST_AUTO_TEST_CASE(clusterOne) { + VectorFrameHandler received; + Cluster cluster("Test", "amqp:one:1", received, VER); + AMQFrame frame(VER, 1, new ChannelOkBody(VER)); + + cluster.handle(frame); + BOOST_REQUIRE(received.waitFor(1)); + BOOST_CHECK_EQUAL(1u, cluster.size()); + Cluster::MemberList members = cluster.getMembers(); + BOOST_CHECK_EQUAL(1u, members.size()); + BOOST_REQUIRE_EQUAL(members.front()->url, "amqp:one:1"); + BOOST_CHECK_EQUAL(1u, received.size()); + BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *received[0].getBody()); +} + +/** Fork a process to verify membership in a cluster with two members */ +BOOST_AUTO_TEST_CASE(clusterTwo) { + VectorFrameHandler received; + pid_t pid=fork(); + BOOST_REQUIRE(pid >= 0); + if (pid) { // Parent + TestCluster cluster("Test", "amqp::1", received, VER); + BOOST_REQUIRE(cluster.waitFor(2)); + + // Exchange frames with child. + AMQFrame frame(VER, 1, new ChannelOkBody(VER)); + cluster.handle(frame); + BOOST_REQUIRE(received.waitFor(2)); + BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *received[0].getBody()); + BOOST_CHECK_TYPEID_EQUAL(BasicGetOkBody, *received[1].getBody()); + + // Wait for child to exit. + int status; + BOOST_CHECK_EQUAL(::wait(&status), pid); + BOOST_CHECK_EQUAL(0, status); + BOOST_CHECK(cluster.waitFor(1)); + BOOST_CHECK_EQUAL(1u, cluster.size()); + } + else { // Child + BOOST_REQUIRE(execl("Cluster_child", "Cluster_child", NULL)); + } +} diff --git a/qpid/cpp/src/tests/Cluster.h b/qpid/cpp/src/tests/Cluster.h new file mode 100644 index 0000000000..7ca5445e10 --- /dev/null +++ b/qpid/cpp/src/tests/Cluster.h @@ -0,0 +1,83 @@ +#ifndef CLUSTER_H +#define CLUSTER_H + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/cluster/Cluster.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/ChannelOkBody.h" +#include "qpid/framing/BasicGetOkBody.h" +#include "qpid/log/Logger.h" +#include <iostream> +#include <vector> + +/** + * Definitions for the Cluster.cpp and Cluster_child.cpp child program. + */ + +// using namespace in header file is bad manners, but this is strictly for +// the tests. +using namespace std; +using namespace qpid; +using namespace qpid::cluster; +using namespace qpid::framing; +using namespace qpid::sys; + +struct TestCluster : public Cluster { + TestCluster(const std::string& name, + const std::string& url, + framing::FrameHandler& next, + framing::ProtocolVersion ver) : Cluster(name,url,next, ver) {} + + /** Wait for the cluster to be of expected size (exactly) */ + bool waitFor(size_t n) { + Mutex::ScopedLock l(lock); + AbsTime deadline(now(),2*TIME_SEC); + while(size() != n && lock.wait(deadline)) + ; + return size() == n; + } +}; + +struct VectorFrameHandler : + public std::vector<AMQFrame>, public FrameHandler, public Monitor + +{ + void handle(AMQFrame& f) { + ScopedLock l(*this); + push_back(f); + notifyAll(); + } + + /** Wait for vector to reach size n exactly */ + bool waitFor(size_t n) { + ScopedLock l(*this); + AbsTime deadline(now(), 1*TIME_SEC); + while (size() != n && wait(deadline)) + ; + return size() == n; + } +}; + + +// namespace + + + +#endif /*!CLUSTER_H*/ diff --git a/qpid/cpp/src/tests/unit/Cpg.cpp b/qpid/cpp/src/tests/Cpg.cpp index 74c6532338..97b829ea63 100644 --- a/qpid/cpp/src/tests/unit/Cpg.cpp +++ b/qpid/cpp/src/tests/Cpg.cpp @@ -17,8 +17,11 @@ */ #define BOOST_AUTO_TEST_MAIN // Must come before #include<boost/test/*> +#include <boost/test/auto_unit_test.hpp> #include "test_tools.h" #include "qpid/cluster/Cpg.h" +#include "qpid/framing/AMQBody.h" +#include <boost/bind.hpp> #include <string> #include <iostream> #include <iterator> @@ -27,6 +30,7 @@ using namespace std; using namespace qpid::cluster; +using namespace qpid::framing; // For debugging: op << for CPG types. @@ -47,51 +51,57 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { return o; } -const string testGroup("foo"); -vector<string> delivered; -vector<int> configChanges; +struct Callback { + Callback(const string group_) : group(group_) {} + string group; + vector<string> delivered; + vector<int> configChanges; -void testDeliver ( - cpg_handle_t /*handle*/, - struct cpg_name *group, - uint32_t /*nodeid*/, - uint32_t /*pid*/, - void* msg, - int msg_len) -{ - BOOST_CHECK_EQUAL(testGroup, Cpg::str(*group)); - delivered.push_back(string((char*)msg,msg_len)); -} + void deliver ( + cpg_handle_t /*handle*/, + struct cpg_name *grp, + uint32_t /*nodeid*/, + uint32_t /*pid*/, + void* msg, + int msg_len) + { + BOOST_CHECK_EQUAL(group, Cpg::str(*grp)); + delivered.push_back(string((char*)msg,msg_len)); + } -void testConfigChange( - cpg_handle_t /*handle*/, - struct cpg_name *group, - struct cpg_address */*members*/, int nMembers, - struct cpg_address */*left*/, int /*nLeft*/, - struct cpg_address */*joined*/, int /*nJoined*/ -) -{ - BOOST_CHECK_EQUAL(testGroup, Cpg::str(*group)); - configChanges.push_back(nMembers); -} + void configChange( + cpg_handle_t /*handle*/, + struct cpg_name *grp, + struct cpg_address */*members*/, int nMembers, + struct cpg_address */*left*/, int /*nLeft*/, + struct cpg_address */*joined*/, int /*nJoined*/ + ) + { + BOOST_CHECK_EQUAL(group, Cpg::str(*grp)); + configChanges.push_back(nMembers); + } +}; -BOOST_AUTO_TEST_CASE(basic) { +BOOST_AUTO_TEST_CASE(Cpg_basic) { // Verify basic functionality of cpg. This will catch any // openais configuration or permission errors. - // - Cpg cpg(&testDeliver, &testConfigChange); + // Cpg::Name group("foo"); + Callback cb(group.str()); + Cpg::DeliverFn deliver=boost::bind(&Callback::deliver, &cb, _1, _2, _3, _4, _5, _6); + Cpg::ConfigChangeFn reconfig=boost::bind<void>(&Callback::configChange, &cb, _1, _2, _3, _4, _5, _6, _7, _8); + Cpg cpg(deliver, reconfig); cpg.join(group); iovec iov = { (void*)"Hello!", 6 }; cpg.mcast(group, &iov, 1); cpg.leave(group); + cpg.dispatchSome(); - cpg.dispatch(CPG_DISPATCH_ONE); // Wait for at least one. - cpg.dispatch(CPG_DISPATCH_ALL); - BOOST_REQUIRE_EQUAL(1u, delivered.size()); - BOOST_CHECK_EQUAL("Hello!", delivered.front()); - BOOST_REQUIRE_EQUAL(2u, configChanges.size()); - BOOST_CHECK_EQUAL(1, configChanges[0]); - BOOST_CHECK_EQUAL(0, configChanges[1]); + BOOST_REQUIRE_EQUAL(1u, cb.delivered.size()); + BOOST_CHECK_EQUAL("Hello!", cb.delivered.front()); + BOOST_REQUIRE_EQUAL(2u, cb.configChanges.size()); + BOOST_CHECK_EQUAL(1, cb.configChanges[0]); + BOOST_CHECK_EQUAL(0, cb.configChanges[1]); } + diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 76d675ee12..ab772ea744 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -1,25 +1,34 @@ AM_CXXFLAGS = $(WARNING_CFLAGS) $(CPPUNIT_CXXFLAGS) $(APR_CXXFLAGS) INCLUDES = -I$(srcdir)/.. -I$(srcdir)/../gen -I$(top_builddir)/src/gen +abs_builddir=@abs_builddir@ +extra_libs = $(CPPUNIT_LIBS) +lib_client = $(abs_builddir)/../libqpidclient.la +lib_common = $(abs_builddir)/../libqpidcommon.la +lib_broker = $(abs_builddir)/../libqpidbroker.la + # -# Unit test programs. +# Initialize variables that are incremented with += # -UNIT_TESTS= +check_PROGRAMS= +unit_progs= +unit_wrappers= -UNIT_TESTS+=logging -logging_SOURCES=unit/logging.cpp unit/test_tools.h -logging_LDADD=-lboost_unit_test_framework -lboost_regex $(lib_common) +# +# Unit test programs. +# +unit_progs+=logging +logging_SOURCES=logging.cpp test_tools.h +logging_LDADD=-lboost_unit_test_framework -lboost_regex $(lib_common) -UNIT_TESTS+=Url -Url_SOURCES=unit/Url.cpp unit/test_tools.h -Url_LDADD=-lboost_unit_test_framework $(lib_common) +unit_progs+=Url +Url_SOURCES=Url.cpp test_tools.h +Url_LDADD=-lboost_unit_test_framework $(lib_common) -if CLUSTER include cluster.mk -endif # NB: CppUnit test libraries below will be migrated to boost test programs. -# UNIT_TESTS+= ... +# # Unit tests broker_unit_tests = \ @@ -74,7 +83,7 @@ testprogs = \ topic_publisher -check_PROGRAMS = $(UNIT_TESTS) $(AIS_UNIT_TESTS) $(testprogs) interop_runner +check_PROGRAMS += $(unit_progs) $(testprogs) interop_runner # FIXME aconway 2007-05-30: TESTS_ENVIRONMENT should have ./run_test # as below to run valgrind on all test programs. @@ -83,13 +92,12 @@ check_PROGRAMS = $(UNIT_TESTS) $(AIS_UNIT_TESTS) $(testprogs) interop_runner #TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) ./run_test TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) -SYSTEM_TESTS = client_test quick_topictest -TESTS = $(UNIT_TESTS) run-unit-tests $(RUN_AIS_TESTS) start_broker $(SYSTEM_TESTS) python_tests kill_broker daemon_test +system_tests = client_test quick_topictest +TESTS = dummy_test $(unit_progs) $(unit_wrappers) run-unit-tests start_broker $(system_tests) python_tests kill_broker daemon_test EXTRA_DIST = \ test_env run_test \ run-unit-tests start_broker python_tests kill_broker daemon_test \ - ais_unit_tests \ quick_topictest \ topictest \ .valgrind.supp-default \ @@ -106,12 +114,6 @@ check_LTLIBRARIES += libdlclose_noop.la libdlclose_noop_la_LDFLAGS = -module -rpath /home/aconway/svn/qpid/cpp/tests libdlclose_noop_la_SOURCES = dlclose_noop.c -abs_builddir = @abs_builddir@ -extra_libs = $(CPPUNIT_LIBS) -lib_client = $(abs_builddir)/../libqpidclient.la -lib_common = $(abs_builddir)/../libqpidcommon.la -lib_broker = $(abs_builddir)/../libqpidbroker.la - gen.mk: Makefile.am ( \ for i in $(testprogs); do \ @@ -131,11 +133,14 @@ gen.mk: Makefile.am > $@-t mv $@-t $@ -check: .valgrindrc .valgrind.supp - check-unit: $(MAKE) check TESTS=$(UNIT_TESTS) run-unit-tests +# Dummy test to force necessary test files to be generated. +dummy_test: .valgrind.supp .valgrindrc + { echo "#!/bin/sh"; echo "# Dummy test, does nothing. "; } > $@ + chmod a+x $@ + # Create a copy so that can be modified without risk of committing the changes. .valgrindrc: .valgrindrc-default cp $^ $@ @@ -147,7 +152,7 @@ check-unit: # ltmain invocations, one may corrupt the temporaries of the other. .NOTPARALLEL: -CLEANFILES=valgrind.out qpidd.log .valgrindrc .valgrind.supp +CLEANFILES=valgrind.out qpidd.log .valgrindrc .valgrind.supp dummy_test $(unit_wrappers) MAINTAINERCLEANFILES=gen.mk interop_runner_SOURCES = \ diff --git a/qpid/cpp/src/tests/unit/Url.cpp b/qpid/cpp/src/tests/Url.cpp index a8b415e641..09aabb80b3 100644 --- a/qpid/cpp/src/tests/unit/Url.cpp +++ b/qpid/cpp/src/tests/Url.cpp @@ -17,6 +17,7 @@ */ #define BOOST_AUTO_TEST_MAIN // Must come before #include<boost/test/*> +#include <boost/test/auto_unit_test.hpp> #include "test_tools.h" #include "qpid/Url.h" #include <boost/assign.hpp> diff --git a/qpid/cpp/src/tests/ais_unit_tests b/qpid/cpp/src/tests/ais_unit_tests deleted file mode 100755 index 9758891891..0000000000 --- a/qpid/cpp/src/tests/ais_unit_tests +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/sh -make check-ais diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk index 489eec748e..fd8f789363 100644 --- a/qpid/cpp/src/tests/cluster.mk +++ b/qpid/cpp/src/tests/cluster.mk @@ -1,23 +1,35 @@ -#-*-Makefile-*- +if CLUSTER # Cluster tests makefile fragment, to be included in Makefile.am # lib_cluster = $(abs_builddir)/../libqpidcluster.la -# -# AIS_UNIT_TESTS must be called with gid=ais. They are run -# separately under sudo -u ais. -# -AIS_UNIT_TESTS= -AIS_UNIT_TESTS+=Cpg -Cpg_SOURCES=unit/Cpg.cpp -Cpg_LDADD=-lboost_unit_test_framework $(lib_cluster) - -RUN_AIS_TESTS=ais_unit_tests # Run ais unit tests via check-ais. +# NOTE: Programs using the openais library must be run with gid=ais +# Such programs are built as *.ais, with a wrapper script *.sh that +# runs the program under sudo -u ais. +# +# Rule to generate wrappers. # The chmod is a horrible hack to allow libtools annoying wrapers to # relink the executable when run as user ais. -check-ais: $(AIS_UNIT_TESTS) +.ais.sh: + echo sudo -u ais env VALGRIND=$(VALGRIND) srcdir=$(srcdir) $(srcdir)/run_test ./$< >$@; chmod a+x $@ chmod a+rwx . .libs - sudo -u ais $(MAKE) check TESTS=$(AIS_UNIT_TESTS) +# Cluster tests. +# +check_PROGRAMS+=Cpg.ais +Cpg_ais_SOURCES=Cpg.cpp +Cpg_ais_LDADD=$(lib_cluster) -lboost_unit_test_framework +unit_wrappers+=Cpg.sh + +check_PROGRAMS+=Cluster.ais +Cluster_ais_SOURCES=Cluster.cpp Cluster.h +Cluster_ais_LDADD=$(lib_cluster) -lboost_unit_test_framework +unit_wrappers+=Cluster.sh + +check_PROGRAMS+=Cluster_child +Cluster_child_SOURCES=Cluster_child.cpp Cluster.h +Cluster_child_LDADD=$(lib_cluster) -lboost_test_exec_monitor + +endif diff --git a/qpid/cpp/src/tests/unit/logging.cpp b/qpid/cpp/src/tests/logging.cpp index c80bf7b337..ebe8f4d6e8 100644 --- a/qpid/cpp/src/tests/unit/logging.cpp +++ b/qpid/cpp/src/tests/logging.cpp @@ -17,6 +17,7 @@ */ #define BOOST_AUTO_TEST_MAIN // Must come before #include<boost/test/*> +#include <boost/test/auto_unit_test.hpp> #include "test_tools.h" #include "qpid/log/Logger.h" #include "qpid/log/Options.h" diff --git a/qpid/cpp/src/tests/run_test b/qpid/cpp/src/tests/run_test index ef608e55ca..bfd6991481 100755 --- a/qpid/cpp/src/tests/run_test +++ b/qpid/cpp/src/tests/run_test @@ -22,7 +22,7 @@ vg_check() grep -E '^==[0-9]+== ERROR SUMMARY:' $VG_LOG > /dev/null || \ vg_failed "No valgrind ERROR SUMMARY line in $$vg_failed." # Ensure that the number of errors is 0. - grep -E '^==[0-9]+== ERROR SUMMARY: [^0] ' $VG_LOG > /dev/null && \ + grep -E '^==[0-9]+== ERROR SUMMARY: [^0]' $VG_LOG > /dev/null && \ vg_failed "Valgrind reported errors in $vg_out; see above." # Check for leaks. grep -E '^==[0-9]+== +.* lost: [^0]' $VG_LOG && \ @@ -41,14 +41,14 @@ if grep -l "^# Generated by .*libtool" "$1" >/dev/null 2>&1; then # This is a libtool "executable". Valgrind it if VALGRIND specified. test -n "$VALGRIND" && VALGRIND="$VALGRIND --log-file-exactly=$VG_LOG --" # Hide output unless there's an error. - libtool --mode=execute $VALGRIND "$@" >$TEST_LOG 2>&1 || { + libtool --mode=execute "$VALGRIND" "$@" >$TEST_LOG 2>&1 || { ERROR=$? cat $TEST_LOG } test -n "$VALGRIND" && vg_check else # This is a non-libtool shell script, just execute it. - "$@" + exec "$@" fi if test -z "$ERROR"; then diff --git a/qpid/cpp/src/tests/unit/test_tools.h b/qpid/cpp/src/tests/test_tools.h index faa198af9a..e564b9a473 100644 --- a/qpid/cpp/src/tests/unit/test_tools.h +++ b/qpid/cpp/src/tests/test_tools.h @@ -19,7 +19,7 @@ * */ -#include <boost/test/auto_unit_test.hpp> +#include <boost/test/test_tools.hpp> #include <boost/assign/list_of.hpp> #include <boost/regex.hpp> #include <vector> @@ -49,5 +49,8 @@ inline bool regexPredicate(const std::string& re, const std::string& text) { #define BOOST_CHECK_REGEX(re, text) \ BOOST_CHECK_PREDICATE(regexPredicate, (re)(text)) +/** Check if types of two objects (as given by typeinfo::name()) match. */ +#define BOOST_CHECK_TYPEID_EQUAL(a,b) BOOST_CHECK_EQUAL(typeid(a).name(),typeid(b).name()) + #endif /*!TEST_TOOLS_H*/ diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml new file mode 100644 index 0000000000..2c9746e908 --- /dev/null +++ b/qpid/cpp/xml/cluster.xml @@ -0,0 +1,37 @@ +<?xml version="1.0"?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> + +<amqp major="0" minor="9" port="5672" comment="AMQ protocol 0.80"> + +<class name = "cluster" index = "301"> + +<doc>Qpid extension class to allow clustered brokers to communicate.</doc> + +<method name = "notify" index="10"> + <doc>Notify the cluster of a members URL</doc> + <!-- No chassis element, this is handled by separte cluster code for now.--> + <field name = "url" type = "longstr" /> +</method> + +</class> + +</amqp> |