summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-07-02 22:35:33 +0000
committerAlan Conway <aconway@apache.org>2007-07-02 22:35:33 +0000
commit83b4417af81df92cb640de1694488156ba29d85f (patch)
tree630449e321fb571476080b737febd841e605ff2d /cpp/src/qpid
parenta36bef1975b1d273a65dd0e74994106fbaad4389 (diff)
downloadqpid-python-83b4417af81df92cb640de1694488156ba29d85f.tar.gz
2007-06-30 <aconway@redhat.com>
* src/qpid/cluster/Cluster.cpp: Refactor - expose 4 handler points for all traffic to/from cluster. Removed HandlerUpdater functionality, separate class. Cluster only deals with membership and connecting the 4 handler points to CPG multicast. * src/tests/cluster.mk: Dropped newgrp ais wrapper scripts, its much simpler if the user just does "newgrp ais" before building. * src/tests/ais_check: Test script to check if users gid is ais and give clear notice if not. * src/tests/Cluster.cpp: Updated for changes to Cluster. * src/qpid/cluster/Cpg.cpp: Better messages for common errors. * Handler.h: Remove nextHandler() minor convenience is outweighted by risk of undetected errors if handlers that expect next() to be set are called when it's not set. * src/qpid/cluster/Cpg.cpp: Added logging. Replaced boost::function with traditional virtual interface (nasty stack traces.) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@552614 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/cluster/ChannelManager.cpp85
-rw-r--r--cpp/src/qpid/cluster/ChannelManager.h66
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp197
-rw-r--r--cpp/src/qpid/cluster/Cluster.h105
-rw-r--r--cpp/src/qpid/cluster/ClusterPluginProvider.cpp8
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp48
-rw-r--r--cpp/src/qpid/cluster/Cpg.h53
-rw-r--r--cpp/src/qpid/framing/Handler.h9
-rw-r--r--cpp/src/qpid/log/Selector.h2
9 files changed, 245 insertions, 328 deletions
diff --git a/cpp/src/qpid/cluster/ChannelManager.cpp b/cpp/src/qpid/cluster/ChannelManager.cpp
deleted file mode 100644
index f573d78ca1..0000000000
--- a/cpp/src/qpid/cluster/ChannelManager.cpp
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/log/Statement.h"
-#include "qpid/framing/amqp_types.h"
-#include "qpid/framing/AMQFrame.h"
-#include "ChannelManager.h"
-
-namespace qpid {
-namespace cluster {
-
-using namespace framing;
-
-/** Handler to multicast to the cluster */
-struct ClusterHandler : public FrameHandler {
-
- ClusterHandler(FrameHandler::Chain next, ChannelId bitmask_)
- : FrameHandler(next), bitmask(bitmask_) {}
-
- void handle(AMQFrame& frame) {
- frame.channel |= bitmask; // Mark the frame
- nextHandler(frame);
- // TODO aconway 2007-06-28: Right now everything is backed up
- // via multicast. When we have point-to-point backups this
- // function must determine where each frame should be sent: to
- // multicast or only to specific backup(s) via AMQP.
- }
-
- ChannelId bitmask;
-};
-
-ChannelManager::ChannelManager(FrameHandler::Chain mcast) : mcastOut(mcast){}
-
-void ChannelManager::update(ChannelId id, FrameHandler::Chains& chains) {
- // Store the original cluster chains for the channel.
- channels[id] = chains;
-
- // Replace chains with multicast-to-cluster handlers that mark the
- // high-bit of the channel ID on outgoing frames so we can tell
- // them from incoming frames in handle()
- //
- // When handle() receives the frames from the cluster it
- // will forward them to the original channel chains stored in
- // channels map.
- //
- chains.in = make_shared_ptr(new ClusterHandler(mcastOut, 0));
- chains.out= make_shared_ptr(new ClusterHandler(mcastOut, CHANNEL_HIGH_BIT));
-}
-
-void ChannelManager::handle(AMQFrame& frame) {
- bool isOut = frame.channel | CHANNEL_HIGH_BIT;
- frame.channel |= ~CHANNEL_HIGH_BIT; // Clear the bit.
- ChannelMap::iterator i = channels.find(frame.getChannel());
- if (i != channels.end()) {
- Chain& chain = isOut ? i->second.out : i->second.in;
- chain->handle(frame);
- }
- else
- updateFailoverState(frame);
-}
-
-void ChannelManager::updateFailoverState(AMQFrame& ) {
- QPID_LOG(critical, "Failover is not implemented");
- // FIXME aconway 2007-06-28:
- // If the channel is not in my map then I'm not primary so
- // I don't pass the frame to the channel handler but I
- // do need to update the session failover state.
-}
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ChannelManager.h b/cpp/src/qpid/cluster/ChannelManager.h
deleted file mode 100644
index 59fce77957..0000000000
--- a/cpp/src/qpid/cluster/ChannelManager.h
+++ /dev/null
@@ -1,66 +0,0 @@
-#ifndef QPID_CLUSTER_CHANNELMANAGER_H
-#define QPID_CLUSTER_CHANNELMANAGER_H
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/framing/HandlerUpdater.h"
-#include <map>
-
-namespace qpid {
-namespace cluster {
-
-/**
- * Manage channels and handler chains on channels for the cluster.
- *
- * As HandlerUpdater plugin, updates channel handler chains with
- * cluster handlers.
- *
- * As a FrameHandler handles frames coming from the cluster and
- * dispatches them to the appropriate channel handler.
- *
- */
-class ChannelManager : public framing::HandlerUpdater,
- public framing::FrameHandler
-{
- public:
- /** Takes a handler to send frames to the cluster. */
- ChannelManager(framing::FrameHandler::Chain mcastOut);
-
- /** As FrameHandler handle frames from the cluster */
- void handle(framing::AMQFrame& frame);
-
- /** As ChannelUpdater update the handler chains. */
- void update(framing::ChannelId id, framing::FrameHandler::Chains& chains);
-
- private:
- void updateFailoverState(framing::AMQFrame&);
-
- typedef std::map<framing::ChannelId,
- framing::FrameHandler::Chains> ChannelMap;
-
- framing::FrameHandler::Chain mcastOut;
- ChannelMap channels;
-};
-
-
-}} // namespace qpid::cluster
-
-
-
-#endif /*!QPID_CLUSTER_CHANNELMANAGER_H*/
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 8d898eefa3..e691ad357d 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -32,65 +32,89 @@ using namespace qpid::sys;
using namespace std;
ostream& operator <<(ostream& out, const Cluster& cluster) {
- return out << cluster.name.str() << "(" << cluster.self << ")";
+ return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]";
}
-namespace {
-Cluster::Member::Status statusMap[CPG_REASON_PROCDOWN+1];
-struct StatusMapInit {
- StatusMapInit() {
- statusMap[CPG_REASON_JOIN] = Cluster::Member::JOIN;
- statusMap[CPG_REASON_LEAVE] = Cluster::Member::LEAVE;
- statusMap[CPG_REASON_NODEDOWN] = Cluster::Member::NODEDOWN;
- statusMap[CPG_REASON_NODEUP] = Cluster::Member::NODEUP;
- statusMap[CPG_REASON_PROCDOWN] = Cluster::Member::PROCDOWN;
- }
-} instance;
+ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) {
+ return out << m.first << "=" << m.second->url;
}
-Cluster::Member::Member(const cpg_address& addr)
- : status(statusMap[addr.reason]) {}
+ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
+ ostream_iterator<Cluster::MemberMap::value_type> o(out, " ");
+ copy(members.begin(), members.end(), o);
+ return out;
+}
+
+namespace {
+
+/** We mark the high bit of a frame's channel number to know if it's
+ * an incoming or outgoing frame when frames arrive via multicast.
+ */
+bool isOutgoing(AMQFrame& frame) { return frame.channel&CHANNEL_HIGH_BIT; }
+bool isIncoming(AMQFrame& frame) { return !isOutgoing(frame); }
+void markOutgoing(AMQFrame& frame) { frame.channel |= CHANNEL_HIGH_BIT; }
+void markIncoming(AMQFrame&) { /*noop*/ }
+void unMark(AMQFrame& frame) { frame.channel &= ~CHANNEL_HIGH_BIT; }
-void Cluster::notify() {
- ProtocolVersion version;
- // TODO aconway 2007-06-25: Use proxy here.
- AMQFrame frame(version, 0,
- make_shared_ptr(new ClusterNotifyBody(version, url)));
- handle(frame);
}
+struct Cluster::IncomingHandler : public FrameHandler {
+ IncomingHandler(Cluster& c) : cluster(c) {}
+ void handle(AMQFrame& frame) {
+ markIncoming(frame);
+ cluster.mcast(frame);
+ }
+ Cluster& cluster;
+};
+
+struct Cluster::OutgoingHandler : public FrameHandler {
+ OutgoingHandler(Cluster& c) : cluster(c) {}
+ void handle(AMQFrame& frame) {
+ markOutgoing(frame);
+ cluster.mcast(frame);
+ }
+ Cluster& cluster;
+};
+
+
+// TODO aconway 2007-06-28: Right now everything is backed up via
+// multicast. When we have point-to-point backups the
+// Incoming/Outgoing handlers must determine where each frame should
+// be sent: to multicast or only to specific backup(s) via AMQP.
+
Cluster::Cluster(const std::string& name_, const std::string& url_) :
+ cpg(new Cpg(*this)),
name(name_),
url(url_),
- cpg(new Cpg(
- boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6),
- boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))),
- self(cpg->getLocalNoideId(), getpid())
-{}
-
-void Cluster::join(FrameHandler::Chain next) {
+ self(cpg->getLocalNoideId(), getpid()),
+ toChains(new IncomingHandler(*this), new OutgoingHandler(*this))
+{
QPID_LOG(trace, *this << " Joining cluster.");
- next = next;
- dispatcher=Thread(*this);
cpg->join(name);
notify();
+ dispatcher=Thread(*this);
+ // Wait till we show up in the cluster map.
+ {
+ Mutex::ScopedLock l(lock);
+ while (empty())
+ lock.wait();
+ }
}
Cluster::~Cluster() {
- if (cpg) {
- try {
- QPID_LOG(trace, *this << " Leaving cluster.");
- cpg->leave(name);
- cpg.reset();
- dispatcher.join();
- } catch (const std::exception& e) {
- QPID_LOG(error, "Exception leaving cluster " << e.what());
- }
+ QPID_LOG(trace, *this << " Leaving cluster.");
+ try {
+ cpg->leave(name);
+ cpg.reset();
+ dispatcher.join();
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, "Exception leaving cluster " << *this << ": "
+ << e.what());
}
}
-void Cluster::handle(AMQFrame& frame) {
- assert(cpg);
+void Cluster::mcast(AMQFrame& frame) {
QPID_LOG(trace, *this << " SEND: " << frame);
Buffer buf(frame.size());
frame.encode(buf);
@@ -99,11 +123,24 @@ void Cluster::handle(AMQFrame& frame) {
cpg->mcast(name, &iov, 1);
}
+void Cluster::notify() {
+ // TODO aconway 2007-06-25: Use proxy here.
+ ProtocolVersion version;
+ AMQFrame frame(version, 0,
+ make_shared_ptr(new ClusterNotifyBody(version, url)));
+ mcast(frame);
+}
+
size_t Cluster::size() const {
Mutex::ScopedLock l(lock);
return members.size();
}
+void Cluster::setFromChains(const framing::FrameHandler::Chains& chains) {
+ Mutex::ScopedLock l(lock);
+ fromChains = chains;
+}
+
Cluster::MemberList Cluster::getMembers() const {
Mutex::ScopedLock l(lock);
MemberList result(members.size());
@@ -112,7 +149,7 @@ Cluster::MemberList Cluster::getMembers() const {
return result;
}
-void Cluster::cpgDeliver(
+void Cluster::deliver(
cpg_handle_t /*handle*/,
struct cpg_name* /* group */,
uint32_t nodeid,
@@ -124,61 +161,71 @@ void Cluster::cpgDeliver(
Buffer buf(static_cast<char*>(msg), msg_len);
AMQFrame frame;
frame.decode(buf);
- QPID_LOG(trace, *this << " RECV: " << frame);
- // TODO aconway 2007-06-20: use visitor pattern.
+ QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
+ if (!handleClusterFrame(from, frame)) {
+ FrameHandler::Chain chain = isIncoming(frame) ? fromChains.in : fromChains.out;
+ unMark(frame);
+ if (chain)
+ chain->handle(frame);
+ }
+}
+
+bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
+ Duration timeout) const
+{
+ AbsTime deadline(now(), timeout);
+ Mutex::ScopedLock l(lock);
+ while (!predicate(*this) && lock.wait(deadline))
+ ;
+ return (predicate(*this));
+}
+
+bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
+ // TODO aconway 2007-06-20: use visitor pattern here.
ClusterNotifyBody* notifyIn=
dynamic_cast<ClusterNotifyBody*>(frame.getBody().get());
if (notifyIn) {
+ MemberList list;
{
Mutex::ScopedLock l(lock);
- assert(members[from]);
- members[from]->url = notifyIn->getUrl();
- members[from]->status = Member::BROKER;
+ if (!members[from])
+ members[from].reset(new Member(url));
+ else
+ members[from]->url = notifyIn->getUrl();
+ QPID_LOG(trace, *this << ": member update: " << members);
+ lock.notifyAll();
}
- if (callback)
- callback();
+ return true;
}
- else
- next->handle(frame);
+ return false;
}
-void Cluster::cpgConfigChange(
+void Cluster::configChange(
cpg_handle_t /*handle*/,
struct cpg_name */*group*/,
- struct cpg_address *current, int nCurrent,
+ struct cpg_address */*current*/, int /*nCurrent*/,
struct cpg_address *left, int nLeft,
- struct cpg_address *joined, int nJoined
-)
+ struct cpg_address *joined, int nJoined)
{
- QPID_LOG(trace,
- *this << " Configuration change. " << endl
- << " Joined: " << make_pair(joined, nJoined) << endl
- << " Left: " << make_pair(left, nLeft) << endl
- << " Current: " << make_pair(current, nCurrent));
-
- bool needNotify=false;
+ bool newMembers=false;
MemberList updated;
{
Mutex::ScopedLock l(lock);
- for (int i = 0; i < nJoined; ++i) {
- Id id(current[i]);
- members[id].reset(new Member(current[i]));
- if (id != self)
- needNotify = true; // Notify new members other than myself.
+ if (nLeft) {
+ for (int i = 0; i < nLeft; ++i)
+ members.erase(Id(left[i]));
+ QPID_LOG(trace, *this << ": members left: " << members);
+ lock.notifyAll();
}
- for (int i = 0; i < nLeft; ++i)
- members.erase(Id(current[i]));
- } // End of locked scope.
- if (needNotify)
+ newMembers = nJoined > 1 || (nJoined==1 && Id(joined[0]) != self);
+ // We don't record members joining here, we record them when
+ // we get their ClusterNotify message.
+ }
+ if (newMembers)
notify();
- if (callback)
- callback();
}
-void Cluster::setCallback(boost::function<void()> f) { callback=f; }
-
void Cluster::run() {
- assert(cpg);
cpg->dispatchBlocking();
}
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index aff213b6c9..199a93a7c5 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -21,89 +21,80 @@
#include "qpid/cluster/Cpg.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/sys/Thread.h"
+#include "qpid/shared_ptr.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Runnable.h"
-#include "qpid/shared_ptr.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/log/Logger.h"
+
#include <boost/function.hpp>
#include <boost/scoped_ptr.hpp>
+
#include <map>
#include <vector>
-namespace qpid {
-
-namespace broker {
-class HandlerUpdater;
-}
-
-namespace cluster {
-
-class ChannelManager;
+namespace qpid { namespace cluster {
/**
* Represents a cluster, provides access to data about members.
- *
- * Implements a FrameHandler that multicasts frames to the cluster.
- *
- * Requires a handler for frames arriving from the cluster,
- * normally a ChannelManager but other handlers could be interposed
- * for testing, logging etc.
+ * Implements HandlerUpdater to manage handlers that route frames to
+ * and from the cluster.
*/
-class Cluster : public framing::FrameHandler, private sys::Runnable {
+class Cluster : private sys::Runnable, private Cpg::Handler
+{
public:
/** Details of a cluster member */
struct Member {
typedef shared_ptr<const Member> Ptr;
- /** Status of a cluster member. */
- enum Status {
- JOIN, ///< Process joined the group.
- LEAVE, ///< Process left the group cleanly.
- NODEDOWN, ///< Process's node went down.
- NODEUP, ///< Process's node joined the cluster.
- PROCDOWN, ///< Process died without leaving.
- BROKER ///< Broker details are available.
- };
-
- Member(const cpg_address&);
- std::string url;
- Status status;
+ Member(const std::string& url_) : url(url_) {}
+ std::string url; ///< Broker address.
};
typedef std::vector<Member::Ptr> MemberList;
-
+
/**
- * Create a cluster object but do not joing.
+ * Join a cluster.
* @param name of the cluster.
* @param url of this broker, sent to the cluster.
*/
Cluster(const std::string& name, const std::string& url);
- ~Cluster();
-
- /** Join the cluster.
- *@handler is the handler for frames arriving from the cluster.
- */
- void join(framing::FrameHandler::Chain handler);
-
- /** Multicast a frame to the cluster. */
- void handle(framing::AMQFrame&);
+ virtual ~Cluster();
/** Get the current cluster membership. */
MemberList getMembers() const;
- /** Called when membership changes. */
- void setCallback(boost::function<void()>);
-
/** Number of members in the cluster. */
size_t size() const;
+ bool empty() const { return size() == 0; }
+
+ /** Get handler chains to send frames to the cluster */
+ framing::FrameHandler::Chains getToChains() {
+ return toChains;
+ }
+
+ /** Set handler chains for frames received from the cluster */
+ void setFromChains(const framing::FrameHandler::Chains& chains);
+
+ /** Wait for predicate(*this) to be true, up to timeout.
+ *@return True if predicate became true, false if timed out.
+ *Note the predicate may not be true after wait returns,
+ *all the caller can say is it was true at some earlier point.
+ */
+ bool wait(boost::function<bool(const Cluster&)> predicate,
+ sys::Duration timeout=sys::TIME_INFINITE) const;
+
private:
typedef Cpg::Id Id;
typedef std::map<Id, shared_ptr<Member> > MemberMap;
+ typedef std::map<
+ framing::ChannelId, framing::FrameHandler::Chains> ChannelMap;
+
+ void mcast(framing::AMQFrame&); ///< send frame by multicast.
+ void notify(); ///< Notify cluster of my details.
- void run();
- void notify();
- void cpgDeliver(
+ void deliver(
cpg_handle_t /*handle*/,
struct cpg_name *group,
uint32_t /*nodeid*/,
@@ -111,7 +102,7 @@ class Cluster : public framing::FrameHandler, private sys::Runnable {
void* /*msg*/,
int /*msg_len*/);
- void cpgConfigChange(
+ void configChange(
cpg_handle_t /*handle*/,
struct cpg_name */*group*/,
struct cpg_address */*members*/, int /*nMembers*/,
@@ -119,16 +110,30 @@ class Cluster : public framing::FrameHandler, private sys::Runnable {
struct cpg_address */*joined*/, int /*nJoined*/
);
+ void run();
+ bool handleClusterFrame(Id from, framing::AMQFrame&);
+
mutable sys::Monitor lock;
+ boost::scoped_ptr<Cpg> cpg;
Cpg::Name name;
std::string url;
- boost::scoped_ptr<Cpg> cpg;
Id self;
MemberMap members;
+ ChannelMap channels;
sys::Thread dispatcher;
boost::function<void()> callback;
+ framing::FrameHandler::Chains toChains;
+ framing::FrameHandler::Chains fromChains;
+
+ struct IncomingHandler;
+ struct OutgoingHandler;
+
+ friend struct IncomingHandler;
+ friend struct OutgoingHandler;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
+ friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
+ friend std::ostream& operator <<(std::ostream&, const MemberMap&);
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ClusterPluginProvider.cpp b/cpp/src/qpid/cluster/ClusterPluginProvider.cpp
index 3a09a66b81..d48fbadf7b 100644
--- a/cpp/src/qpid/cluster/ClusterPluginProvider.cpp
+++ b/cpp/src/qpid/cluster/ClusterPluginProvider.cpp
@@ -18,7 +18,6 @@
#include "qpid/broker/Broker.h"
#include "qpid/framing/HandlerUpdater.h"
#include "qpid/cluster/Cluster.h"
-#include "qpid/cluster/ChannelManager.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
@@ -51,12 +50,7 @@ struct ClusterPluginProvider : public PluginProvider {
if (broker && !options.clusterName.empty()) {
assert(!cluster); // A process can only belong to one cluster.
cluster.reset(new Cluster(options.clusterName, broker->getUrl()));
-
- // Channel manager is both the next handler for the cluster
- // and the HandlerUpdater plugin for the broker.
- shared_ptr<ChannelManager> manager(new ChannelManager(cluster));
- cluster->join(manager);
- broker->use(manager);
+ // FIXME aconway 2007-06-29: register HandlerUpdater.
}
}
};
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index a979ce1eeb..3148e52789 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -17,7 +17,10 @@
*/
#include "Cpg.h"
+
#include "qpid/sys/Mutex.h"
+#include "qpid/log/Statement.h"
+
#include <vector>
#include <limits>
#include <iterator>
@@ -33,16 +36,15 @@ using namespace std;
class Cpg::Handles
{
public:
- void put(cpg_handle_t handle, Cpg* object) {
+ void put(cpg_handle_t handle, Cpg::Handler* handler) {
sys::Mutex::ScopedLock l(lock);
- assert(object);
uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
if (index >= handles.size())
handles.resize(index+1, 0);
- handles[index] = object;
+ handles[index] = handler;
}
- Cpg* get(cpg_handle_t handle) {
+ Cpg::Handler* get(cpg_handle_t handle) {
sys::Mutex::ScopedLock l(lock);
uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
assert(index < handles.size());
@@ -52,7 +54,7 @@ class Cpg::Handles
private:
sys::Mutex lock;
- vector<Cpg*> handles;
+ vector<Cpg::Handler*> handles;
};
Cpg::Handles Cpg::handles;
@@ -66,7 +68,9 @@ void Cpg::globalDeliver (
void* msg,
int msg_len)
{
- handles.get(handle)->deliver(handle, group, nodeid, pid, msg, msg_len);
+ Cpg::Handler* handler=handles.get(handle);
+ if (handler)
+ handler->deliver(handle, group, nodeid, pid, msg, msg_len);
}
void Cpg::globalConfigChange(
@@ -77,23 +81,35 @@ void Cpg::globalConfigChange(
struct cpg_address *joined, int nJoined
)
{
- handles.get(handle)->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
+ Cpg::Handler* handler=handles.get(handle);
+ if (handler)
+ handler->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
}
-Cpg::Cpg(DeliverFn d, ConfigChangeFn c) : deliver(d), configChange(c)
-{
+Cpg::Cpg(Handler& h) : handler(h) {
cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
- handles.put(handle, this);
+ handles.put(handle, &handler);
}
Cpg::~Cpg() {
try {
- check(cpg_finalize(handle), "Error in shutdown of CPG");
+ shutdown();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, string("Exception in Cpg destructor: ")+e.what());
}
- catch (...) {
- handles.put(handle, 0);
- throw;
+}
+
+struct Cpg::ClearHandleOnExit {
+ ClearHandleOnExit(cpg_handle_t h) : handle(h) {}
+ ~ClearHandleOnExit() { Cpg::handles.put(handle, 0); }
+ cpg_handle_t handle;
+};
+
+void Cpg::shutdown() {
+ if (handles.get(handle)) {
+ ClearHandleOnExit guard(handle); // Exception safe
+ check(cpg_finalize(handle), "Error in shutdown of CPG");
}
}
@@ -102,11 +118,11 @@ string Cpg::errorStr(cpg_error_t err, const std::string& msg) {
case CPG_OK: return msg+": ok";
case CPG_ERR_LIBRARY: return msg+": library";
case CPG_ERR_TIMEOUT: return msg+": timeout";
- case CPG_ERR_TRY_AGAIN: return msg+": try again";
+ case CPG_ERR_TRY_AGAIN: return msg+": timeout. The aisexec daemon may not be running";
case CPG_ERR_INVALID_PARAM: return msg+": invalid param";
case CPG_ERR_NO_MEMORY: return msg+": no memory";
case CPG_ERR_BAD_HANDLE: return msg+": bad handle";
- case CPG_ERR_ACCESS: return msg+": access";
+ case CPG_ERR_ACCESS: return msg+": access denied. You may need to set your group ID to 'ais'";
case CPG_ERR_NOT_EXIST: return msg+": not exist";
case CPG_ERR_EXIST: return msg+": exist";
case CPG_ERR_NOT_SUPPORTED: return msg+": not supported";
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h
index e164ed1215..d616be74e2 100644
--- a/cpp/src/qpid/cluster/Cpg.h
+++ b/cpp/src/qpid/cluster/Cpg.h
@@ -21,8 +21,9 @@
#include "qpid/Exception.h"
#include "qpid/cluster/Dispatchable.h"
-#include <boost/function.hpp>
+
#include <cassert>
+
extern "C" {
#include <openais/cpg.h>
}
@@ -69,31 +70,36 @@ class Cpg : public Dispatchable {
return std::string(n.value, n.length);
}
- typedef boost::function<void (
- cpg_handle_t /*handle*/,
- struct cpg_name *group,
- uint32_t /*nodeid*/,
- uint32_t /*pid*/,
- void* /*msg*/,
- int /*msg_len*/)> DeliverFn;
-
- typedef boost::function<void (
- cpg_handle_t /*handle*/,
- struct cpg_name */*group*/,
- struct cpg_address */*members*/, int /*nMembers*/,
- struct cpg_address */*left*/, int /*nLeft*/,
- struct cpg_address */*joined*/, int /*nJoined*/
- )> ConfigChangeFn;
+ struct Handler {
+ virtual ~Handler() {};
+ virtual void deliver(
+ cpg_handle_t /*handle*/,
+ struct cpg_name *group,
+ uint32_t /*nodeid*/,
+ uint32_t /*pid*/,
+ void* /*msg*/,
+ int /*msg_len*/) = 0;
+
+ virtual void configChange(
+ cpg_handle_t /*handle*/,
+ struct cpg_name */*group*/,
+ struct cpg_address */*members*/, int /*nMembers*/,
+ struct cpg_address */*left*/, int /*nLeft*/,
+ struct cpg_address */*joined*/, int /*nJoined*/
+ ) = 0;
+ };
/** Open a CPG handle.
- *@param deliver - free function called when a message is delivered.
- *@param reconfig - free function called when CPG configuration changes.
+ *@param handler for CPG events.
*/
- Cpg(DeliverFn deliver, ConfigChangeFn reconfig);
-
- /** Disconnect from CPG. */
+ Cpg(Handler&);
+
+ /** Destructor calls shutdown. */
~Cpg();
+ /** Disconnect from CPG */
+ void shutdown();
+
/** Dispatch CPG events.
*@param type one of
* - CPG_DISPATCH_ONE - dispatch exactly one event.
@@ -128,7 +134,9 @@ class Cpg : public Dispatchable {
private:
class Handles;
+ struct ClearHandleOnExit;
friend class Handles;
+ friend struct ClearHandleOnExit;
static std::string errorStr(cpg_error_t err, const std::string& msg);
static std::string cantJoinMsg(const Name&);
@@ -159,8 +167,7 @@ class Cpg : public Dispatchable {
static Handles handles;
cpg_handle_t handle;
- DeliverFn deliver;
- ConfigChangeFn configChange;
+ Handler& handler;
};
std::ostream& operator <<(std::ostream& out, const Cpg::Id& id);
diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h
index f6b59393d9..2f09911325 100644
--- a/cpp/src/qpid/framing/Handler.h
+++ b/cpp/src/qpid/framing/Handler.h
@@ -36,6 +36,9 @@ template <class T> struct Handler {
/** Handler chains for incoming and outgoing traffic. */
struct Chains {
+ Chains() {}
+ Chains(Chain i, Chain o) : in(i), out(o) {}
+ Chains(Handler* i, Handler* o) : in(i), out(o) {}
Chain in;
Chain out;
};
@@ -48,12 +51,6 @@ template <class T> struct Handler {
/** Next handler. Public so chains can be modified by altering next. */
Chain next;
-
- protected:
- /** Derived handle() implementations call nextHandler to invoke the
- * next handler in the chain. */
- void nextHandler(T data) { if (next) next->handle(data); }
-
};
}}
diff --git a/cpp/src/qpid/log/Selector.h b/cpp/src/qpid/log/Selector.h
index 329541b7fc..bba3f05bfc 100644
--- a/cpp/src/qpid/log/Selector.h
+++ b/cpp/src/qpid/log/Selector.h
@@ -43,6 +43,8 @@ class Selector {
Selector(Level l, const std::string& s=std::string()) {
enable(l,s);
}
+
+ Selector(const std::string& enableStr) { enable(enableStr); }
/**
* Enable messages with level in levels where the file
* name contains substring. Empty string matches all.