summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-02 22:35:33 +0000
committerAlan Conway <aconway@apache.org>2007-07-02 22:35:33 +0000
commit83b4417af81df92cb640de1694488156ba29d85f (patch)
tree630449e321fb571476080b737febd841e605ff2d /cpp/src
parenta36bef1975b1d273a65dd0e74994106fbaad4389 (diff)
downloadqpid-python-83b4417af81df92cb640de1694488156ba29d85f.tar.gz
2007-06-30 <aconway@redhat.com>
* src/qpid/cluster/Cluster.cpp: Refactor - expose 4 handler points for all traffic to/from cluster. Removed HandlerUpdater functionality, separate class. Cluster only deals with membership and connecting the 4 handler points to CPG multicast. * src/tests/cluster.mk: Dropped newgrp ais wrapper scripts, its much simpler if the user just does "newgrp ais" before building. * src/tests/ais_check: Test script to check if users gid is ais and give clear notice if not. * src/tests/Cluster.cpp: Updated for changes to Cluster. * src/qpid/cluster/Cpg.cpp: Better messages for common errors. * Handler.h: Remove nextHandler() minor convenience is outweighted by risk of undetected errors if handlers that expect next() to be set are called when it's not set. * src/qpid/cluster/Cpg.cpp: Added logging. Replaced boost::function with traditional virtual interface (nasty stack traces.) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@552614 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/cluster.mk2
-rw-r--r--cpp/src/qpid/cluster/ChannelManager.cpp85
-rw-r--r--cpp/src/qpid/cluster/ChannelManager.h66
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp197
-rw-r--r--cpp/src/qpid/cluster/Cluster.h105
-rw-r--r--cpp/src/qpid/cluster/ClusterPluginProvider.cpp8
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp48
-rw-r--r--cpp/src/qpid/cluster/Cpg.h53
-rw-r--r--cpp/src/qpid/framing/Handler.h9
-rw-r--r--cpp/src/qpid/log/Selector.h2
-rw-r--r--cpp/src/tests/Cluster.cpp47
-rw-r--r--cpp/src/tests/Cluster.h52
-rw-r--r--cpp/src/tests/Cluster_child.cpp18
-rw-r--r--cpp/src/tests/Cpg.cpp9
-rw-r--r--cpp/src/tests/Makefile.am29
-rwxr-xr-xcpp/src/tests/ais_check16
-rw-r--r--cpp/src/tests/cluster.mk36
17 files changed, 353 insertions, 429 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index 4eddf4ffe7..07b4f045cc 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -11,8 +11,6 @@ libqpidcluster_la_SOURCES = \
qpid/cluster/Cpg.cpp \
qpid/cluster/Cpg.h \
qpid/cluster/Dispatchable.h \
- qpid/cluster/ChannelManager.h \
- qpid/cluster/ChannelManager.cpp \
qpid/cluster/ClusterPluginProvider.cpp
libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
diff --git a/cpp/src/qpid/cluster/ChannelManager.cpp b/cpp/src/qpid/cluster/ChannelManager.cpp
deleted file mode 100644
index f573d78ca1..0000000000
--- a/cpp/src/qpid/cluster/ChannelManager.cpp
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/log/Statement.h"
-#include "qpid/framing/amqp_types.h"
-#include "qpid/framing/AMQFrame.h"
-#include "ChannelManager.h"
-
-namespace qpid {
-namespace cluster {
-
-using namespace framing;
-
-/** Handler to multicast to the cluster */
-struct ClusterHandler : public FrameHandler {
-
- ClusterHandler(FrameHandler::Chain next, ChannelId bitmask_)
- : FrameHandler(next), bitmask(bitmask_) {}
-
- void handle(AMQFrame& frame) {
- frame.channel |= bitmask; // Mark the frame
- nextHandler(frame);
- // TODO aconway 2007-06-28: Right now everything is backed up
- // via multicast. When we have point-to-point backups this
- // function must determine where each frame should be sent: to
- // multicast or only to specific backup(s) via AMQP.
- }
-
- ChannelId bitmask;
-};
-
-ChannelManager::ChannelManager(FrameHandler::Chain mcast) : mcastOut(mcast){}
-
-void ChannelManager::update(ChannelId id, FrameHandler::Chains& chains) {
- // Store the original cluster chains for the channel.
- channels[id] = chains;
-
- // Replace chains with multicast-to-cluster handlers that mark the
- // high-bit of the channel ID on outgoing frames so we can tell
- // them from incoming frames in handle()
- //
- // When handle() receives the frames from the cluster it
- // will forward them to the original channel chains stored in
- // channels map.
- //
- chains.in = make_shared_ptr(new ClusterHandler(mcastOut, 0));
- chains.out= make_shared_ptr(new ClusterHandler(mcastOut, CHANNEL_HIGH_BIT));
-}
-
-void ChannelManager::handle(AMQFrame& frame) {
- bool isOut = frame.channel | CHANNEL_HIGH_BIT;
- frame.channel |= ~CHANNEL_HIGH_BIT; // Clear the bit.
- ChannelMap::iterator i = channels.find(frame.getChannel());
- if (i != channels.end()) {
- Chain& chain = isOut ? i->second.out : i->second.in;
- chain->handle(frame);
- }
- else
- updateFailoverState(frame);
-}
-
-void ChannelManager::updateFailoverState(AMQFrame& ) {
- QPID_LOG(critical, "Failover is not implemented");
- // FIXME aconway 2007-06-28:
- // If the channel is not in my map then I'm not primary so
- // I don't pass the frame to the channel handler but I
- // do need to update the session failover state.
-}
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ChannelManager.h b/cpp/src/qpid/cluster/ChannelManager.h
deleted file mode 100644
index 59fce77957..0000000000
--- a/cpp/src/qpid/cluster/ChannelManager.h
+++ /dev/null
@@ -1,66 +0,0 @@
-#ifndef QPID_CLUSTER_CHANNELMANAGER_H
-#define QPID_CLUSTER_CHANNELMANAGER_H
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/framing/HandlerUpdater.h"
-#include <map>
-
-namespace qpid {
-namespace cluster {
-
-/**
- * Manage channels and handler chains on channels for the cluster.
- *
- * As HandlerUpdater plugin, updates channel handler chains with
- * cluster handlers.
- *
- * As a FrameHandler handles frames coming from the cluster and
- * dispatches them to the appropriate channel handler.
- *
- */
-class ChannelManager : public framing::HandlerUpdater,
- public framing::FrameHandler
-{
- public:
- /** Takes a handler to send frames to the cluster. */
- ChannelManager(framing::FrameHandler::Chain mcastOut);
-
- /** As FrameHandler handle frames from the cluster */
- void handle(framing::AMQFrame& frame);
-
- /** As ChannelUpdater update the handler chains. */
- void update(framing::ChannelId id, framing::FrameHandler::Chains& chains);
-
- private:
- void updateFailoverState(framing::AMQFrame&);
-
- typedef std::map<framing::ChannelId,
- framing::FrameHandler::Chains> ChannelMap;
-
- framing::FrameHandler::Chain mcastOut;
- ChannelMap channels;
-};
-
-
-}} // namespace qpid::cluster
-
-
-
-#endif /*!QPID_CLUSTER_CHANNELMANAGER_H*/
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 8d898eefa3..e691ad357d 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -32,65 +32,89 @@ using namespace qpid::sys;
using namespace std;
ostream& operator <<(ostream& out, const Cluster& cluster) {
- return out << cluster.name.str() << "(" << cluster.self << ")";
+ return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]";
}
-namespace {
-Cluster::Member::Status statusMap[CPG_REASON_PROCDOWN+1];
-struct StatusMapInit {
- StatusMapInit() {
- statusMap[CPG_REASON_JOIN] = Cluster::Member::JOIN;
- statusMap[CPG_REASON_LEAVE] = Cluster::Member::LEAVE;
- statusMap[CPG_REASON_NODEDOWN] = Cluster::Member::NODEDOWN;
- statusMap[CPG_REASON_NODEUP] = Cluster::Member::NODEUP;
- statusMap[CPG_REASON_PROCDOWN] = Cluster::Member::PROCDOWN;
- }
-} instance;
+ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) {
+ return out << m.first << "=" << m.second->url;
}
-Cluster::Member::Member(const cpg_address& addr)
- : status(statusMap[addr.reason]) {}
+ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
+ ostream_iterator<Cluster::MemberMap::value_type> o(out, " ");
+ copy(members.begin(), members.end(), o);
+ return out;
+}
+
+namespace {
+
+/** We mark the high bit of a frame's channel number to know if it's
+ * an incoming or outgoing frame when frames arrive via multicast.
+ */
+bool isOutgoing(AMQFrame& frame) { return frame.channel&CHANNEL_HIGH_BIT; }
+bool isIncoming(AMQFrame& frame) { return !isOutgoing(frame); }
+void markOutgoing(AMQFrame& frame) { frame.channel |= CHANNEL_HIGH_BIT; }
+void markIncoming(AMQFrame&) { /*noop*/ }
+void unMark(AMQFrame& frame) { frame.channel &= ~CHANNEL_HIGH_BIT; }
-void Cluster::notify() {
- ProtocolVersion version;
- // TODO aconway 2007-06-25: Use proxy here.
- AMQFrame frame(version, 0,
- make_shared_ptr(new ClusterNotifyBody(version, url)));
- handle(frame);
}
+struct Cluster::IncomingHandler : public FrameHandler {
+ IncomingHandler(Cluster& c) : cluster(c) {}
+ void handle(AMQFrame& frame) {
+ markIncoming(frame);
+ cluster.mcast(frame);
+ }
+ Cluster& cluster;
+};
+
+struct Cluster::OutgoingHandler : public FrameHandler {
+ OutgoingHandler(Cluster& c) : cluster(c) {}
+ void handle(AMQFrame& frame) {
+ markOutgoing(frame);
+ cluster.mcast(frame);
+ }
+ Cluster& cluster;
+};
+
+
+// TODO aconway 2007-06-28: Right now everything is backed up via
+// multicast. When we have point-to-point backups the
+// Incoming/Outgoing handlers must determine where each frame should
+// be sent: to multicast or only to specific backup(s) via AMQP.
+
Cluster::Cluster(const std::string& name_, const std::string& url_) :
+ cpg(new Cpg(*this)),
name(name_),
url(url_),
- cpg(new Cpg(
- boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6),
- boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))),
- self(cpg->getLocalNoideId(), getpid())
-{}
-
-void Cluster::join(FrameHandler::Chain next) {
+ self(cpg->getLocalNoideId(), getpid()),
+ toChains(new IncomingHandler(*this), new OutgoingHandler(*this))
+{
QPID_LOG(trace, *this << " Joining cluster.");
- next = next;
- dispatcher=Thread(*this);
cpg->join(name);
notify();
+ dispatcher=Thread(*this);
+ // Wait till we show up in the cluster map.
+ {
+ Mutex::ScopedLock l(lock);
+ while (empty())
+ lock.wait();
+ }
}
Cluster::~Cluster() {
- if (cpg) {
- try {
- QPID_LOG(trace, *this << " Leaving cluster.");
- cpg->leave(name);
- cpg.reset();
- dispatcher.join();
- } catch (const std::exception& e) {
- QPID_LOG(error, "Exception leaving cluster " << e.what());
- }
+ QPID_LOG(trace, *this << " Leaving cluster.");
+ try {
+ cpg->leave(name);
+ cpg.reset();
+ dispatcher.join();
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, "Exception leaving cluster " << *this << ": "
+ << e.what());
}
}
-void Cluster::handle(AMQFrame& frame) {
- assert(cpg);
+void Cluster::mcast(AMQFrame& frame) {
QPID_LOG(trace, *this << " SEND: " << frame);
Buffer buf(frame.size());
frame.encode(buf);
@@ -99,11 +123,24 @@ void Cluster::handle(AMQFrame& frame) {
cpg->mcast(name, &iov, 1);
}
+void Cluster::notify() {
+ // TODO aconway 2007-06-25: Use proxy here.
+ ProtocolVersion version;
+ AMQFrame frame(version, 0,
+ make_shared_ptr(new ClusterNotifyBody(version, url)));
+ mcast(frame);
+}
+
size_t Cluster::size() const {
Mutex::ScopedLock l(lock);
return members.size();
}
+void Cluster::setFromChains(const framing::FrameHandler::Chains& chains) {
+ Mutex::ScopedLock l(lock);
+ fromChains = chains;
+}
+
Cluster::MemberList Cluster::getMembers() const {
Mutex::ScopedLock l(lock);
MemberList result(members.size());
@@ -112,7 +149,7 @@ Cluster::MemberList Cluster::getMembers() const {
return result;
}
-void Cluster::cpgDeliver(
+void Cluster::deliver(
cpg_handle_t /*handle*/,
struct cpg_name* /* group */,
uint32_t nodeid,
@@ -124,61 +161,71 @@ void Cluster::cpgDeliver(
Buffer buf(static_cast<char*>(msg), msg_len);
AMQFrame frame;
frame.decode(buf);
- QPID_LOG(trace, *this << " RECV: " << frame);
- // TODO aconway 2007-06-20: use visitor pattern.
+ QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
+ if (!handleClusterFrame(from, frame)) {
+ FrameHandler::Chain chain = isIncoming(frame) ? fromChains.in : fromChains.out;
+ unMark(frame);
+ if (chain)
+ chain->handle(frame);
+ }
+}
+
+bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
+ Duration timeout) const
+{
+ AbsTime deadline(now(), timeout);
+ Mutex::ScopedLock l(lock);
+ while (!predicate(*this) && lock.wait(deadline))
+ ;
+ return (predicate(*this));
+}
+
+bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
+ // TODO aconway 2007-06-20: use visitor pattern here.
ClusterNotifyBody* notifyIn=
dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
if (notifyIn) {
+ MemberList list;
{
Mutex::ScopedLock l(lock);
- assert(members[from]);
- members[from]->url = notifyIn->getUrl();
- members[from]->status = Member::BROKER;
+ if (!members[from])
+ members[from].reset(new Member(url));
+ else
+ members[from]->url = notifyIn->getUrl();
+ QPID_LOG(trace, *this << ": member update: " << members);
+ lock.notifyAll();
}
- if (callback)
- callback();
+ return true;
}
- else
- next->handle(frame);
+ return false;
}
-void Cluster::cpgConfigChange(
+void Cluster::configChange(
cpg_handle_t /*handle*/,
struct cpg_name */*group*/,
- struct cpg_address *current, int nCurrent,
+ struct cpg_address */*current*/, int /*nCurrent*/,
struct cpg_address *left, int nLeft,
- struct cpg_address *joined, int nJoined
-)
+ struct cpg_address *joined, int nJoined)
{
- QPID_LOG(trace,
- *this << " Configuration change. " << endl
- << " Joined: " << make_pair(joined, nJoined) << endl
- << " Left: " << make_pair(left, nLeft) << endl
- << " Current: " << make_pair(current, nCurrent));
-
- bool needNotify=false;
+ bool newMembers=false;
MemberList updated;
{
Mutex::ScopedLock l(lock);
- for (int i = 0; i < nJoined; ++i) {
- Id id(current[i]);
- members[id].reset(new Member(current[i]));
- if (id != self)
- needNotify = true; // Notify new members other than myself.
+ if (nLeft) {
+ for (int i = 0; i < nLeft; ++i)
+ members.erase(Id(left[i]));
+ QPID_LOG(trace, *this << ": members left: " << members);
+ lock.notifyAll();
}
- for (int i = 0; i < nLeft; ++i)
- members.erase(Id(current[i]));
- } // End of locked scope.
- if (needNotify)
+ newMembers = nJoined > 1 || (nJoined==1 && Id(joined[0]) != self);
+ // We don't record members joining here, we record them when
+ // we get their ClusterNotify message.
+ }
+ if (newMembers)
notify();
- if (callback)
- callback();
}
-void Cluster::setCallback(boost::function<void()> f) { callback=f; }
-
void Cluster::run() {
- assert(cpg);
cpg->dispatchBlocking();
}
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index aff213b6c9..199a93a7c5 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -21,89 +21,80 @@
#include "qpid/cluster/Cpg.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/sys/Thread.h"
+#include "qpid/shared_ptr.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Runnable.h"
-#include "qpid/shared_ptr.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/log/Logger.h"
+
#include <boost/function.hpp>
#include <boost/scoped_ptr.hpp>
+
#include <map>
#include <vector>
-namespace qpid {
-
-namespace broker {
-class HandlerUpdater;
-}
-
-namespace cluster {
-
-class ChannelManager;
+namespace qpid { namespace cluster {
/**
* Represents a cluster, provides access to data about members.
- *
- * Implements a FrameHandler that multicasts frames to the cluster.
- *
- * Requires a handler for frames arriving from the cluster,
- * normally a ChannelManager but other handlers could be interposed
- * for testing, logging etc.
+ * Implements HandlerUpdater to manage handlers that route frames to
+ * and from the cluster.
*/
-class Cluster : public framing::FrameHandler, private sys::Runnable {
+class Cluster : private sys::Runnable, private Cpg::Handler
+{
public:
/** Details of a cluster member */
struct Member {
typedef shared_ptr<const Member> Ptr;
- /** Status of a cluster member. */
- enum Status {
- JOIN, ///< Process joined the group.
- LEAVE, ///< Process left the group cleanly.
- NODEDOWN, ///< Process's node went down.
- NODEUP, ///< Process's node joined the cluster.
- PROCDOWN, ///< Process died without leaving.
- BROKER ///< Broker details are available.
- };
-
- Member(const cpg_address&);
- std::string url;
- Status status;
+ Member(const std::string& url_) : url(url_) {}
+ std::string url; ///< Broker address.
};
typedef std::vector<Member::Ptr> MemberList;
-
+
/**
- * Create a cluster object but do not joing.
+ * Join a cluster.
* @param name of the cluster.
* @param url of this broker, sent to the cluster.
*/
Cluster(const std::string& name, const std::string& url);
- ~Cluster();
-
- /** Join the cluster.
- *@handler is the handler for frames arriving from the cluster.
- */
- void join(framing::FrameHandler::Chain handler);
-
- /** Multicast a frame to the cluster. */
- void handle(framing::AMQFrame&);
+ virtual ~Cluster();
/** Get the current cluster membership. */
MemberList getMembers() const;
- /** Called when membership changes. */
- void setCallback(boost::function<void()>);
-
/** Number of members in the cluster. */
size_t size() const;
+ bool empty() const { return size() == 0; }
+
+ /** Get handler chains to send frames to the cluster */
+ framing::FrameHandler::Chains getToChains() {
+ return toChains;
+ }
+
+ /** Set handler chains for frames received from the cluster */
+ void setFromChains(const framing::FrameHandler::Chains& chains);
+
+ /** Wait for predicate(*this) to be true, up to timeout.
+ *@return True if predicate became true, false if timed out.
+ *Note the predicate may not be true after wait returns,
+ *all the caller can say is it was true at some earlier point.
+ */
+ bool wait(boost::function<bool(const Cluster&)> predicate,
+ sys::Duration timeout=sys::TIME_INFINITE) const;
+
private:
typedef Cpg::Id Id;
typedef std::map<Id, shared_ptr<Member> > MemberMap;
+ typedef std::map<
+ framing::ChannelId, framing::FrameHandler::Chains> ChannelMap;
+
+ void mcast(framing::AMQFrame&); ///< send frame by multicast.
+ void notify(); ///< Notify cluster of my details.
- void run();
- void notify();
- void cpgDeliver(
+ void deliver(
cpg_handle_t /*handle*/,
struct cpg_name *group,
uint32_t /*nodeid*/,
@@ -111,7 +102,7 @@ class Cluster : public framing::FrameHandler, private sys::Runnable {
void* /*msg*/,
int /*msg_len*/);
- void cpgConfigChange(
+ void configChange(
cpg_handle_t /*handle*/,
struct cpg_name */*group*/,
struct cpg_address */*members*/, int /*nMembers*/,
@@ -119,16 +110,30 @@ class Cluster : public framing::FrameHandler, private sys::Runnable {
struct cpg_address */*joined*/, int /*nJoined*/
);
+ void run();
+ bool handleClusterFrame(Id from, framing::AMQFrame&);
+
mutable sys::Monitor lock;
+ boost::scoped_ptr<Cpg> cpg;
Cpg::Name name;
std::string url;
- boost::scoped_ptr<Cpg> cpg;
Id self;
MemberMap members;
+ ChannelMap channels;
sys::Thread dispatcher;
boost::function<void()> callback;
+ framing::FrameHandler::Chains toChains;
+ framing::FrameHandler::Chains fromChains;
+
+ struct IncomingHandler;
+ struct OutgoingHandler;
+
+ friend struct IncomingHandler;
+ friend struct OutgoingHandler;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
+ friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
+ friend std::ostream& operator <<(std::ostream&, const MemberMap&);
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ClusterPluginProvider.cpp b/cpp/src/qpid/cluster/ClusterPluginProvider.cpp
index 3a09a66b81..d48fbadf7b 100644
--- a/cpp/src/qpid/cluster/ClusterPluginProvider.cpp
+++ b/cpp/src/qpid/cluster/ClusterPluginProvider.cpp
@@ -18,7 +18,6 @@
#include "qpid/broker/Broker.h"
#include "qpid/framing/HandlerUpdater.h"
#include "qpid/cluster/Cluster.h"
-#include "qpid/cluster/ChannelManager.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
@@ -51,12 +50,7 @@ struct ClusterPluginProvider : public PluginProvider {
if (broker && !options.clusterName.empty()) {
assert(!cluster); // A process can only belong to one cluster.
cluster.reset(new Cluster(options.clusterName, broker->getUrl()));
-
- // Channel manager is both the next handler for the cluster
- // and the HandlerUpdater plugin for the broker.
- shared_ptr<ChannelManager> manager(new ChannelManager(cluster));
- cluster->join(manager);
- broker->use(manager);
+ // FIXME aconway 2007-06-29: register HandlerUpdater.
}
}
};
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index a979ce1eeb..3148e52789 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -17,7 +17,10 @@
*/
#include "Cpg.h"
+
#include "qpid/sys/Mutex.h"
+#include "qpid/log/Statement.h"
+
#include <vector>
#include <limits>
#include <iterator>
@@ -33,16 +36,15 @@ using namespace std;
class Cpg::Handles
{
public:
- void put(cpg_handle_t handle, Cpg* object) {
+ void put(cpg_handle_t handle, Cpg::Handler* handler) {
sys::Mutex::ScopedLock l(lock);
- assert(object);
uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
if (index >= handles.size())
handles.resize(index+1, 0);
- handles[index] = object;
+ handles[index] = handler;
}
- Cpg* get(cpg_handle_t handle) {
+ Cpg::Handler* get(cpg_handle_t handle) {
sys::Mutex::ScopedLock l(lock);
uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
assert(index < handles.size());
@@ -52,7 +54,7 @@ class Cpg::Handles
private:
sys::Mutex lock;
- vector<Cpg*> handles;
+ vector<Cpg::Handler*> handles;
};
Cpg::Handles Cpg::handles;
@@ -66,7 +68,9 @@ void Cpg::globalDeliver (
void* msg,
int msg_len)
{
- handles.get(handle)->deliver(handle, group, nodeid, pid, msg, msg_len);
+ Cpg::Handler* handler=handles.get(handle);
+ if (handler)
+ handler->deliver(handle, group, nodeid, pid, msg, msg_len);
}
void Cpg::globalConfigChange(
@@ -77,23 +81,35 @@ void Cpg::globalConfigChange(
struct cpg_address *joined, int nJoined
)
{
- handles.get(handle)->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
+ Cpg::Handler* handler=handles.get(handle);
+ if (handler)
+ handler->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
}
-Cpg::Cpg(DeliverFn d, ConfigChangeFn c) : deliver(d), configChange(c)
-{
+Cpg::Cpg(Handler& h) : handler(h) {
cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
- handles.put(handle, this);
+ handles.put(handle, &handler);
}
Cpg::~Cpg() {
try {
- check(cpg_finalize(handle), "Error in shutdown of CPG");
+ shutdown();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, string("Exception in Cpg destructor: ")+e.what());
}
- catch (...) {
- handles.put(handle, 0);
- throw;
+}
+
+struct Cpg::ClearHandleOnExit {
+ ClearHandleOnExit(cpg_handle_t h) : handle(h) {}
+ ~ClearHandleOnExit() { Cpg::handles.put(handle, 0); }
+ cpg_handle_t handle;
+};
+
+void Cpg::shutdown() {
+ if (handles.get(handle)) {
+ ClearHandleOnExit guard(handle); // Exception safe
+ check(cpg_finalize(handle), "Error in shutdown of CPG");
}
}
@@ -102,11 +118,11 @@ string Cpg::errorStr(cpg_error_t err, const std::string& msg) {
case CPG_OK: return msg+": ok";
case CPG_ERR_LIBRARY: return msg+": library";
case CPG_ERR_TIMEOUT: return msg+": timeout";
- case CPG_ERR_TRY_AGAIN: return msg+": try again";
+ case CPG_ERR_TRY_AGAIN: return msg+": timeout. The aisexec daemon may not be running";
case CPG_ERR_INVALID_PARAM: return msg+": invalid param";
case CPG_ERR_NO_MEMORY: return msg+": no memory";
case CPG_ERR_BAD_HANDLE: return msg+": bad handle";
- case CPG_ERR_ACCESS: return msg+": access";
+ case CPG_ERR_ACCESS: return msg+": access denied. You may need to set your group ID to 'ais'";
case CPG_ERR_NOT_EXIST: return msg+": not exist";
case CPG_ERR_EXIST: return msg+": exist";
case CPG_ERR_NOT_SUPPORTED: return msg+": not supported";
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h
index e164ed1215..d616be74e2 100644
--- a/cpp/src/qpid/cluster/Cpg.h
+++ b/cpp/src/qpid/cluster/Cpg.h
@@ -21,8 +21,9 @@
#include "qpid/Exception.h"
#include "qpid/cluster/Dispatchable.h"
-#include <boost/function.hpp>
+
#include <cassert>
+
extern "C" {
#include <openais/cpg.h>
}
@@ -69,31 +70,36 @@ class Cpg : public Dispatchable {
return std::string(n.value, n.length);
}
- typedef boost::function<void (
- cpg_handle_t /*handle*/,
- struct cpg_name *group,
- uint32_t /*nodeid*/,
- uint32_t /*pid*/,
- void* /*msg*/,
- int /*msg_len*/)> DeliverFn;
-
- typedef boost::function<void (
- cpg_handle_t /*handle*/,
- struct cpg_name */*group*/,
- struct cpg_address */*members*/, int /*nMembers*/,
- struct cpg_address */*left*/, int /*nLeft*/,
- struct cpg_address */*joined*/, int /*nJoined*/
- )> ConfigChangeFn;
+ struct Handler {
+ virtual ~Handler() {};
+ virtual void deliver(
+ cpg_handle_t /*handle*/,
+ struct cpg_name *group,
+ uint32_t /*nodeid*/,
+ uint32_t /*pid*/,
+ void* /*msg*/,
+ int /*msg_len*/) = 0;
+
+ virtual void configChange(
+ cpg_handle_t /*handle*/,
+ struct cpg_name */*group*/,
+ struct cpg_address */*members*/, int /*nMembers*/,
+ struct cpg_address */*left*/, int /*nLeft*/,
+ struct cpg_address */*joined*/, int /*nJoined*/
+ ) = 0;
+ };
/** Open a CPG handle.
- *@param deliver - free function called when a message is delivered.
- *@param reconfig - free function called when CPG configuration changes.
+ *@param handler for CPG events.
*/
- Cpg(DeliverFn deliver, ConfigChangeFn reconfig);
-
- /** Disconnect from CPG. */
+ Cpg(Handler&);
+
+ /** Destructor calls shutdown. */
~Cpg();
+ /** Disconnect from CPG */
+ void shutdown();
+
/** Dispatch CPG events.
*@param type one of
* - CPG_DISPATCH_ONE - dispatch exactly one event.
@@ -128,7 +134,9 @@ class Cpg : public Dispatchable {
private:
class Handles;
+ struct ClearHandleOnExit;
friend class Handles;
+ friend struct ClearHandleOnExit;
static std::string errorStr(cpg_error_t err, const std::string& msg);
static std::string cantJoinMsg(const Name&);
@@ -159,8 +167,7 @@ class Cpg : public Dispatchable {
static Handles handles;
cpg_handle_t handle;
- DeliverFn deliver;
- ConfigChangeFn configChange;
+ Handler& handler;
};
std::ostream& operator <<(std::ostream& out, const Cpg::Id& id);
diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h
index f6b59393d9..2f09911325 100644
--- a/cpp/src/qpid/framing/Handler.h
+++ b/cpp/src/qpid/framing/Handler.h
@@ -36,6 +36,9 @@ template <class T> struct Handler {
/** Handler chains for incoming and outgoing traffic. */
struct Chains {
+ Chains() {}
+ Chains(Chain i, Chain o) : in(i), out(o) {}
+ Chains(Handler* i, Handler* o) : in(i), out(o) {}
Chain in;
Chain out;
};
@@ -48,12 +51,6 @@ template <class T> struct Handler {
/** Next handler. Public so chains can be modified by altering next. */
Chain next;
-
- protected:
- /** Derived handle() implementations call nextHandler to invoke the
- * next handler in the chain. */
- void nextHandler(T data) { if (next) next->handle(data); }
-
};
}}
diff --git a/cpp/src/qpid/log/Selector.h b/cpp/src/qpid/log/Selector.h
index 329541b7fc..bba3f05bfc 100644
--- a/cpp/src/qpid/log/Selector.h
+++ b/cpp/src/qpid/log/Selector.h
@@ -43,6 +43,8 @@ class Selector {
Selector(Level l, const std::string& s=std::string()) {
enable(l,s);
}
+
+ Selector(const std::string& enableStr) { enable(enableStr); }
/**
* Enable messages with level in levels where the file
* name contains substring. Empty string matches all.
diff --git a/cpp/src/tests/Cluster.cpp b/cpp/src/tests/Cluster.cpp
index 008575140b..2ec140b924 100644
--- a/cpp/src/tests/Cluster.cpp
+++ b/cpp/src/tests/Cluster.cpp
@@ -20,51 +20,52 @@
#include <boost/test/auto_unit_test.hpp>
#include "test_tools.h"
#include "Cluster.h"
+#include "qpid/framing/ChannelPingBody.h"
#include "qpid/framing/ChannelOkBody.h"
-#include "qpid/framing/BasicGetOkBody.h"
-
static const ProtocolVersion VER;
-/** Verify membership ind a cluster with one member. */
+using namespace qpid::log;
+
+/** Verify membership in a cluster with one member. */
BOOST_AUTO_TEST_CASE(clusterOne) {
- Cluster cluster("Test", "amqp:one:1");
- TestClusterHandler handler(cluster);
- AMQFrame frame(VER, 1, new ChannelOkBody(VER));
- cluster.handle(frame);
- BOOST_REQUIRE(handler.waitFrames(1));
+ TestCluster cluster("clusterOne", "amqp:one:1");
+ AMQFrame frame(VER, 1, new ChannelPingBody(VER));
+ cluster.getToChains().in->handle(frame);
+ BOOST_REQUIRE(cluster.in.waitFor(1));
+
+ BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.in[0].getBody());
BOOST_CHECK_EQUAL(1u, cluster.size());
Cluster::MemberList members = cluster.getMembers();
BOOST_CHECK_EQUAL(1u, members.size());
- BOOST_REQUIRE_EQUAL(members.front()->url, "amqp:one:1");
- BOOST_CHECK_EQUAL(1u, handler.size());
- BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *handler[0].getBody());
+ shared_ptr<const Cluster::Member> me=members.front();
+ BOOST_REQUIRE_EQUAL(me->url, "amqp:one:1");
}
-/** Fork a process to verify membership in a cluster with two members */
+/** Fork a process to test a cluster with two members */
BOOST_AUTO_TEST_CASE(clusterTwo) {
pid_t pid=fork();
BOOST_REQUIRE(pid >= 0);
- if (pid) { // Parent see Cluster_child.cpp for child.
- Cluster cluster("Test", "amqp::1");
- TestClusterHandler handler(cluster);
- BOOST_REQUIRE(handler.waitMembers(2));
+ if (pid) { // Parent, see Cluster_child.cpp for child.
+ TestCluster cluster("clusterTwo", "amqp::1");
+ BOOST_REQUIRE(cluster.waitFor(2)); // Myself and child.
// Exchange frames with child.
- AMQFrame frame(VER, 1, new ChannelOkBody(VER));
- cluster.handle(frame);
- BOOST_REQUIRE(handler.waitFrames(2));
- BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *handler[0].getBody());
- BOOST_CHECK_TYPEID_EQUAL(BasicGetOkBody, *handler[1].getBody());
+ AMQFrame frame(VER, 1, new ChannelPingBody(VER));
+ cluster.getToChains().in->handle(frame);
+ BOOST_REQUIRE(cluster.in.waitFor(1));
+ BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.in[0].getBody());
+ BOOST_REQUIRE(cluster.out.waitFor(1));
+ BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.out[0].getBody());
// Wait for child to exit.
int status;
BOOST_CHECK_EQUAL(::wait(&status), pid);
BOOST_CHECK_EQUAL(0, status);
- BOOST_CHECK(handler.waitMembers(1));
+ BOOST_CHECK(cluster.waitFor(1));
BOOST_CHECK_EQUAL(1u, cluster.size());
}
else { // Child
- BOOST_REQUIRE(execl("Cluster_child", "Cluster_child", NULL));
+ BOOST_REQUIRE(execl("./Cluster_child", "./Cluster_child", NULL));
}
}
diff --git a/cpp/src/tests/Cluster.h b/cpp/src/tests/Cluster.h
index edb1f1524f..f37c87a9ad 100644
--- a/cpp/src/tests/Cluster.h
+++ b/cpp/src/tests/Cluster.h
@@ -27,6 +27,7 @@
#include <boost/bind.hpp>
#include <iostream>
#include <vector>
+#include <functional>
/**
* Definitions for the Cluster.cpp and Cluster_child.cpp child program.
@@ -39,45 +40,48 @@ using namespace qpid;
using namespace qpid::cluster;
using namespace qpid::framing;
using namespace qpid::sys;
+using namespace boost;
void null_deleter(void*) {}
-struct TestClusterHandler :
- public std::vector<AMQFrame>, public FrameHandler, public Monitor
-
+struct TestFrameHandler :
+ public FrameHandler, public vector<AMQFrame>, public Monitor
{
- TestClusterHandler(Cluster& c) : cluster(c) {
- cluster.join(make_shared_ptr(this, &null_deleter));
- cluster.setCallback(boost::bind(&Monitor::notify, this));
- }
-
- void handle(AMQFrame& f) {
- ScopedLock l(*this);
- push_back(f);
+ void handle(AMQFrame& frame) {
+ Mutex::ScopedLock l(*this);
+ push_back(frame);
notifyAll();
}
- /** Wait for the vector to contain n frames. */
- bool waitFrames(size_t n) {
- ScopedLock l(*this);
- AbsTime deadline(now(), TIME_SEC);
+ bool waitFor(size_t n) {
+ Mutex::ScopedLock l(*this);
+ AbsTime deadline(now(), 5*TIME_SEC);
while (size() != n && wait(deadline))
;
return size() == n;
}
+};
- /** Wait for the cluster to have n members */
- bool waitMembers(size_t n) {
- ScopedLock l(*this);
- AbsTime deadline(now(), TIME_SEC);
- while (cluster.size() != n && wait(deadline))
- ;
- return cluster.size() == n;
+void nullDeleter(void*) {}
+
+struct TestCluster : public Cluster
+{
+ TestCluster(string name, string url) : Cluster(name, url)
+ {
+ setFromChains(
+ FrameHandler::Chains(
+ make_shared_ptr(&in, nullDeleter),
+ make_shared_ptr(&out, nullDeleter)
+ ));
}
- Cluster& cluster;
-};
+ /** Wait for cluster to be of size n. */
+ bool waitFor(size_t n) {
+ return wait(boost::bind(equal_to<size_t>(), bind(&Cluster::size,this), n));
+ }
+ TestFrameHandler in, out;
+};
diff --git a/cpp/src/tests/Cluster_child.cpp b/cpp/src/tests/Cluster_child.cpp
index a5ac3e9669..d73d2bdbc7 100644
--- a/cpp/src/tests/Cluster_child.cpp
+++ b/cpp/src/tests/Cluster_child.cpp
@@ -26,20 +26,20 @@ using namespace qpid;
using namespace qpid::cluster;
using namespace qpid::framing;
using namespace qpid::sys;
-
+using namespace qpid::log;
static const ProtocolVersion VER;
/** Chlid part of Cluster::clusterTwo test */
void clusterTwo() {
- Cluster cluster("Test", "amqp::2");
- TestClusterHandler handler(cluster);
- BOOST_REQUIRE(handler.waitFrames(1));
- BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *handler[0].getBody());
- AMQFrame frame(VER, 1, new BasicGetOkBody(VER));
- cluster.handle(frame);
- BOOST_REQUIRE(handler.waitFrames(2));
- BOOST_CHECK_TYPEID_EQUAL(BasicGetOkBody, *handler[1].getBody());
+ TestCluster cluster("clusterTwo", "amqp::2");
+ BOOST_REQUIRE(cluster.in.waitFor(1)); // Frame from parent.
+ BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.in[0].getBody());
+ BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent
+ AMQFrame frame(VER, 1, new ChannelOkBody(VER));
+ cluster.getToChains().out->handle(frame);
+ BOOST_REQUIRE(cluster.out.waitFor(1));
+ BOOST_CHECK_TYPEID_EQUAL(ChannelOkBody, *cluster.out[0].getBody());
}
int test_main(int, char**) {
diff --git a/cpp/src/tests/Cpg.cpp b/cpp/src/tests/Cpg.cpp
index 97b829ea63..ec98ca4fc2 100644
--- a/cpp/src/tests/Cpg.cpp
+++ b/cpp/src/tests/Cpg.cpp
@@ -47,11 +47,11 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) {
o << "{ ";
ostream_iterator<cpg_address> i(o, " ");
copy(array.first, array.first+array.second, i);
- cout << "}";
+ o << "}";
return o;
}
-struct Callback {
+struct Callback : public Cpg::Handler {
Callback(const string group_) : group(group_) {}
string group;
vector<string> delivered;
@@ -88,10 +88,7 @@ BOOST_AUTO_TEST_CASE(Cpg_basic) {
//
Cpg::Name group("foo");
Callback cb(group.str());
- Cpg::DeliverFn deliver=boost::bind(&Callback::deliver, &cb, _1, _2, _3, _4, _5, _6);
- Cpg::ConfigChangeFn reconfig=boost::bind<void>(&Callback::configChange, &cb, _1, _2, _3, _4, _5, _6, _7, _8);
-
- Cpg cpg(deliver, reconfig);
+ Cpg cpg(cb);
cpg.join(group);
iovec iov = { (void*)"Hello!", 6 };
cpg.mcast(group, &iov, 1);
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 3303afa0be..5a8ad1bde6 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -11,17 +11,19 @@ lib_broker = $(abs_builddir)/../libqpidbroker.la
# Initialize variables that are incremented with +=
#
check_PROGRAMS=
-unit_progs=
-unit_wrappers=
+TESTS=
+EXTRA_DIST=
#
# Unit test programs.
#
-unit_progs+=logging
+TESTS+=logging
+check_PROGRAMS+=logging
logging_SOURCES=logging.cpp test_tools.h
logging_LDADD=-lboost_unit_test_framework -lboost_regex $(lib_common)
-unit_progs+=Url
+TESTS+=Url
+check_PROGRAMS+=Url
Url_SOURCES=Url.cpp test_tools.h
Url_LDADD=-lboost_unit_test_framework $(lib_common)
@@ -76,21 +78,20 @@ unit_tests = \
# Executables for client tests
-testprogs = \
+testprogs= \
client_test \
echo_service \
topic_listener \
- topic_publisher
-
-
-check_PROGRAMS += $(unit_progs) $(testprogs) interop_runner
+ topic_publisher \
+ interop_runner
+check_PROGRAMS += $(testprogs)
TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) $(srcdir)/run_test
system_tests = client_test quick_topictest
-TESTS = dummy_test $(unit_progs) $(unit_wrappers) run-unit-tests start_broker $(system_tests) python_tests kill_broker
+TESTS += run-unit-tests start_broker $(system_tests) python_tests kill_broker
-EXTRA_DIST = \
+EXTRA_DIST += \
test_env run_test \
run-unit-tests start_broker python_tests kill_broker \
quick_topictest \
@@ -131,10 +132,8 @@ gen.mk: Makefile.am
check-unit:
$(MAKE) check TESTS=$(UNIT_TESTS) run-unit-tests
-# Dummy test to force necessary test files to be generated.
-dummy_test: .valgrind.supp .valgrindrc
- { echo "#!/bin/sh"; echo "# Dummy test, does nothing. "; } > $@
- chmod a+x $@
+# Make sure valgrind files are generated.
+all: .valgrind.supp .valgrindrc
# Create a copy so that can be modified without risk of committing the changes.
.valgrindrc: .valgrindrc-default
diff --git a/cpp/src/tests/ais_check b/cpp/src/tests/ais_check
new file mode 100755
index 0000000000..df40899065
--- /dev/null
+++ b/cpp/src/tests/ais_check
@@ -0,0 +1,16 @@
+#!/bin/sh
+test `id -ng` = "ais" || {
+ cat <<EOF
+ =========================== NOTICE==============================
+
+ You do not appear to have you group ID set to "ais".
+
+ Cluster tests that require the openais library will fail.Make sure
+ you are a member of group ais and run "newgrp ais" before running
+ the tests.
+
+ ================================================================
+
+EOF
+exit 1;
+}
diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk
index 6f59b13107..33e1569d3c 100644
--- a/cpp/src/tests/cluster.mk
+++ b/cpp/src/tests/cluster.mk
@@ -5,33 +5,25 @@ if CLUSTER
lib_cluster = $(abs_builddir)/../libqpidcluster.la
# NOTE: Programs using the openais library must be run with gid=ais
-# Such programs are built as *.ais, with a wrapper script *.sh that
-# runs the program with newgrp ais.
+# You should do "newgrp ais" before running the tests to run these.
#
-# Rule to generate wrapper scripts for tests that require gid=ais.
-run_test="env VALGRIND=$(VALGRIND) srcdir=$(srcdir) $(srcdir)/run_test"
-.ais.sh:
- echo "if groups | grep '\bais\b' >/dev/null;" > $@_t
- echo "then echo $(run_test) ./$< \"$$@ \"| newgrp ais;" >>$@_t
- echo "else echo WARNING: `whoami` not in group ais, skipping $<.;" >>$@_t
- echo "fi" >> $@_t
- mv $@_t $@
- chmod a+x $@
-
#
# Cluster tests.
#
-check_PROGRAMS+=Cpg.ais
-Cpg_ais_SOURCES=Cpg.cpp
-Cpg_ais_LDADD=$(lib_cluster) -lboost_unit_test_framework
-unit_wrappers+=Cpg.sh
-
-# FIXME aconway 2007-06-29: Fixing problems with the test.
-# check_PROGRAMS+=Cluster.ais
-# Cluster_ais_SOURCES=Cluster.cpp Cluster.h
-# Cluster_ais_LDADD=$(lib_cluster) -lboost_unit_test_framework
-# unit_wrappers+=Cluster.sh
+
+TESTS+=ais_check
+EXTRA_DIST+=ais_check
+
+TESTS+=Cpg
+check_PROGRAMS+=Cpg
+Cpg_SOURCES=Cpg.cpp
+Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework
+
+TESTS+=Cluster
+check_PROGRAMS+=Cluster
+Cluster_SOURCES=Cluster.cpp Cluster.h
+Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework
check_PROGRAMS+=Cluster_child
Cluster_child_SOURCES=Cluster_child.cpp Cluster.h