diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 121 |
1 files changed, 75 insertions, 46 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 30073c4551..8d898eefa3 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -17,13 +17,13 @@ */ #include "Cluster.h" -#include "Cpg.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> #include <algorithm> #include <iterator> +#include <map> namespace qpid { namespace cluster { @@ -35,40 +35,62 @@ ostream& operator <<(ostream& out, const Cluster& cluster) { return out << cluster.name.str() << "(" << cluster.self << ")"; } +namespace { +Cluster::Member::Status statusMap[CPG_REASON_PROCDOWN+1]; +struct StatusMapInit { + StatusMapInit() { + statusMap[CPG_REASON_JOIN] = Cluster::Member::JOIN; + statusMap[CPG_REASON_LEAVE] = Cluster::Member::LEAVE; + statusMap[CPG_REASON_NODEDOWN] = Cluster::Member::NODEDOWN; + statusMap[CPG_REASON_NODEUP] = Cluster::Member::NODEUP; + statusMap[CPG_REASON_PROCDOWN] = Cluster::Member::PROCDOWN; + } +} instance; +} + +Cluster::Member::Member(const cpg_address& addr) + : status(statusMap[addr.reason]) {} + void Cluster::notify() { + ProtocolVersion version; // TODO aconway 2007-06-25: Use proxy here. AMQFrame frame(version, 0, make_shared_ptr(new ClusterNotifyBody(version, url))); handle(frame); } -Cluster::Cluster( - const std::string& name_, const std::string& url_, FrameHandler& next_, - ProtocolVersion ver) - : name(name_), url(url_), version(ver), - cpg(new Cpg(boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6), - boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))), - next(next_) -{ - self=Id(cpg->getLocalNoideId(), getpid()); +Cluster::Cluster(const std::string& name_, const std::string& url_) : + name(name_), + url(url_), + cpg(new Cpg( + boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6), + boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))), + self(cpg->getLocalNoideId(), getpid()) +{} + +void Cluster::join(FrameHandler::Chain next) { QPID_LOG(trace, *this << " Joining cluster."); + next = next; + dispatcher=Thread(*this); cpg->join(name); notify(); - dispatcher=Thread(*this); } Cluster::~Cluster() { - try { - QPID_LOG(trace, *this << " Leaving cluster."); - cpg->leave(name); - cpg.reset(); - dispatcher.join(); - } catch (const std::exception& e) { - QPID_LOG(error, "Exception leaving cluster " << e.what()); + if (cpg) { + try { + QPID_LOG(trace, *this << " Leaving cluster."); + cpg->leave(name); + cpg.reset(); + dispatcher.join(); + } catch (const std::exception& e) { + QPID_LOG(error, "Exception leaving cluster " << e.what()); + } } } void Cluster::handle(AMQFrame& frame) { + assert(cpg); QPID_LOG(trace, *this << " SEND: " << frame); Buffer buf(frame.size()); frame.encode(buf); @@ -104,52 +126,59 @@ void Cluster::cpgDeliver( frame.decode(buf); QPID_LOG(trace, *this << " RECV: " << frame); // TODO aconway 2007-06-20: use visitor pattern. - ClusterNotifyBody* notifyIn= dynamic_cast<ClusterNotifyBody*>(frame.getBody().get()); + ClusterNotifyBody* notifyIn= + dynamic_cast<ClusterNotifyBody*>(frame.getBody().get()); if (notifyIn) { - Mutex::ScopedLock l(lock); - members[from].reset(new Member(notifyIn->getUrl())); - lock.notifyAll(); + { + Mutex::ScopedLock l(lock); + assert(members[from]); + members[from]->url = notifyIn->getUrl(); + members[from]->status = Member::BROKER; + } + if (callback) + callback(); } else - next.handle(frame); + next->handle(frame); } void Cluster::cpgConfigChange( cpg_handle_t /*handle*/, struct cpg_name */*group*/, - struct cpg_address *ccMembers, int nMembers, + struct cpg_address *current, int nCurrent, struct cpg_address *left, int nLeft, struct cpg_address *joined, int nJoined ) { - QPID_LOG( - trace, - *this << " Configuration change. " << endl - << " Joined: " << make_pair(joined, nJoined) << endl - << " Left: " << make_pair(left, nLeft) << endl - << " Current: " << make_pair(ccMembers, nMembers)); - + QPID_LOG(trace, + *this << " Configuration change. " << endl + << " Joined: " << make_pair(joined, nJoined) << endl + << " Left: " << make_pair(left, nLeft) << endl + << " Current: " << make_pair(current, nCurrent)); + + bool needNotify=false; + MemberList updated; { Mutex::ScopedLock l(lock); - // Erase members that left. - for (int i = 0; i < nLeft; ++i) - members.erase(Id(left[i])); - lock.notifyAll(); - } - - // If there are new members (other than myself) then notify. - for (int i=0; i< nJoined; ++i) { - if (Id(joined[i]) != self) { - notify(); - break; + for (int i = 0; i < nJoined; ++i) { + Id id(current[i]); + members[id].reset(new Member(current[i])); + if (id != self) + needNotify = true; // Notify new members other than myself. } - } - - // Note: New members are be added to my map when cpgDeliver - // gets a cluster.notify frame. + for (int i = 0; i < nLeft; ++i) + members.erase(Id(current[i])); + } // End of locked scope. + if (needNotify) + notify(); + if (callback) + callback(); } +void Cluster::setCallback(boost::function<void()> f) { callback=f; } + void Cluster::run() { + assert(cpg); cpg->dispatchBlocking(); } |