diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 197 |
1 files changed, 122 insertions, 75 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 8d898eefa3..e691ad357d 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -32,65 +32,89 @@ using namespace qpid::sys; using namespace std; ostream& operator <<(ostream& out, const Cluster& cluster) { - return out << cluster.name.str() << "(" << cluster.self << ")"; + return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]"; } -namespace { -Cluster::Member::Status statusMap[CPG_REASON_PROCDOWN+1]; -struct StatusMapInit { - StatusMapInit() { - statusMap[CPG_REASON_JOIN] = Cluster::Member::JOIN; - statusMap[CPG_REASON_LEAVE] = Cluster::Member::LEAVE; - statusMap[CPG_REASON_NODEDOWN] = Cluster::Member::NODEDOWN; - statusMap[CPG_REASON_NODEUP] = Cluster::Member::NODEUP; - statusMap[CPG_REASON_PROCDOWN] = Cluster::Member::PROCDOWN; - } -} instance; +ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) { + return out << m.first << "=" << m.second->url; } -Cluster::Member::Member(const cpg_address& addr) - : status(statusMap[addr.reason]) {} +ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { + ostream_iterator<Cluster::MemberMap::value_type> o(out, " "); + copy(members.begin(), members.end(), o); + return out; +} + +namespace { + +/** We mark the high bit of a frame's channel number to know if it's + * an incoming or outgoing frame when frames arrive via multicast. + */ +bool isOutgoing(AMQFrame& frame) { return frame.channel&CHANNEL_HIGH_BIT; } +bool isIncoming(AMQFrame& frame) { return !isOutgoing(frame); } +void markOutgoing(AMQFrame& frame) { frame.channel |= CHANNEL_HIGH_BIT; } +void markIncoming(AMQFrame&) { /*noop*/ } +void unMark(AMQFrame& frame) { frame.channel &= ~CHANNEL_HIGH_BIT; } -void Cluster::notify() { - ProtocolVersion version; - // TODO aconway 2007-06-25: Use proxy here. - AMQFrame frame(version, 0, - make_shared_ptr(new ClusterNotifyBody(version, url))); - handle(frame); } +struct Cluster::IncomingHandler : public FrameHandler { + IncomingHandler(Cluster& c) : cluster(c) {} + void handle(AMQFrame& frame) { + markIncoming(frame); + cluster.mcast(frame); + } + Cluster& cluster; +}; + +struct Cluster::OutgoingHandler : public FrameHandler { + OutgoingHandler(Cluster& c) : cluster(c) {} + void handle(AMQFrame& frame) { + markOutgoing(frame); + cluster.mcast(frame); + } + Cluster& cluster; +}; + + +// TODO aconway 2007-06-28: Right now everything is backed up via +// multicast. When we have point-to-point backups the +// Incoming/Outgoing handlers must determine where each frame should +// be sent: to multicast or only to specific backup(s) via AMQP. + Cluster::Cluster(const std::string& name_, const std::string& url_) : + cpg(new Cpg(*this)), name(name_), url(url_), - cpg(new Cpg( - boost::bind(&Cluster::cpgDeliver, this, _1, _2, _3, _4, _5, _6), - boost::bind(&Cluster::cpgConfigChange, this, _1, _2, _3, _4, _5, _6, _7, _8))), - self(cpg->getLocalNoideId(), getpid()) -{} - -void Cluster::join(FrameHandler::Chain next) { + self(cpg->getLocalNoideId(), getpid()), + toChains(new IncomingHandler(*this), new OutgoingHandler(*this)) +{ QPID_LOG(trace, *this << " Joining cluster."); - next = next; - dispatcher=Thread(*this); cpg->join(name); notify(); + dispatcher=Thread(*this); + // Wait till we show up in the cluster map. + { + Mutex::ScopedLock l(lock); + while (empty()) + lock.wait(); + } } Cluster::~Cluster() { - if (cpg) { - try { - QPID_LOG(trace, *this << " Leaving cluster."); - cpg->leave(name); - cpg.reset(); - dispatcher.join(); - } catch (const std::exception& e) { - QPID_LOG(error, "Exception leaving cluster " << e.what()); - } + QPID_LOG(trace, *this << " Leaving cluster."); + try { + cpg->leave(name); + cpg.reset(); + dispatcher.join(); + } + catch (const std::exception& e) { + QPID_LOG(error, "Exception leaving cluster " << *this << ": " + << e.what()); } } -void Cluster::handle(AMQFrame& frame) { - assert(cpg); +void Cluster::mcast(AMQFrame& frame) { QPID_LOG(trace, *this << " SEND: " << frame); Buffer buf(frame.size()); frame.encode(buf); @@ -99,11 +123,24 @@ void Cluster::handle(AMQFrame& frame) { cpg->mcast(name, &iov, 1); } +void Cluster::notify() { + // TODO aconway 2007-06-25: Use proxy here. + ProtocolVersion version; + AMQFrame frame(version, 0, + make_shared_ptr(new ClusterNotifyBody(version, url))); + mcast(frame); +} + size_t Cluster::size() const { Mutex::ScopedLock l(lock); return members.size(); } +void Cluster::setFromChains(const framing::FrameHandler::Chains& chains) { + Mutex::ScopedLock l(lock); + fromChains = chains; +} + Cluster::MemberList Cluster::getMembers() const { Mutex::ScopedLock l(lock); MemberList result(members.size()); @@ -112,7 +149,7 @@ Cluster::MemberList Cluster::getMembers() const { return result; } -void Cluster::cpgDeliver( +void Cluster::deliver( cpg_handle_t /*handle*/, struct cpg_name* /* group */, uint32_t nodeid, @@ -124,61 +161,71 @@ void Cluster::cpgDeliver( Buffer buf(static_cast<char*>(msg), msg_len); AMQFrame frame; frame.decode(buf); - QPID_LOG(trace, *this << " RECV: " << frame); - // TODO aconway 2007-06-20: use visitor pattern. + QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from); + if (!handleClusterFrame(from, frame)) { + FrameHandler::Chain chain = isIncoming(frame) ? fromChains.in : fromChains.out; + unMark(frame); + if (chain) + chain->handle(frame); + } +} + +bool Cluster::wait(boost::function<bool(const Cluster&)> predicate, + Duration timeout) const +{ + AbsTime deadline(now(), timeout); + Mutex::ScopedLock l(lock); + while (!predicate(*this) && lock.wait(deadline)) + ; + return (predicate(*this)); +} + +bool Cluster::handleClusterFrame(Id from, AMQFrame& frame) { + // TODO aconway 2007-06-20: use visitor pattern here. ClusterNotifyBody* notifyIn= dynamic_cast<ClusterNotifyBody*>(frame.getBody().get()); if (notifyIn) { + MemberList list; { Mutex::ScopedLock l(lock); - assert(members[from]); - members[from]->url = notifyIn->getUrl(); - members[from]->status = Member::BROKER; + if (!members[from]) + members[from].reset(new Member(url)); + else + members[from]->url = notifyIn->getUrl(); + QPID_LOG(trace, *this << ": member update: " << members); + lock.notifyAll(); } - if (callback) - callback(); + return true; } - else - next->handle(frame); + return false; } -void Cluster::cpgConfigChange( +void Cluster::configChange( cpg_handle_t /*handle*/, struct cpg_name */*group*/, - struct cpg_address *current, int nCurrent, + struct cpg_address */*current*/, int /*nCurrent*/, struct cpg_address *left, int nLeft, - struct cpg_address *joined, int nJoined -) + struct cpg_address *joined, int nJoined) { - QPID_LOG(trace, - *this << " Configuration change. " << endl - << " Joined: " << make_pair(joined, nJoined) << endl - << " Left: " << make_pair(left, nLeft) << endl - << " Current: " << make_pair(current, nCurrent)); - - bool needNotify=false; + bool newMembers=false; MemberList updated; { Mutex::ScopedLock l(lock); - for (int i = 0; i < nJoined; ++i) { - Id id(current[i]); - members[id].reset(new Member(current[i])); - if (id != self) - needNotify = true; // Notify new members other than myself. + if (nLeft) { + for (int i = 0; i < nLeft; ++i) + members.erase(Id(left[i])); + QPID_LOG(trace, *this << ": members left: " << members); + lock.notifyAll(); } - for (int i = 0; i < nLeft; ++i) - members.erase(Id(current[i])); - } // End of locked scope. - if (needNotify) + newMembers = nJoined > 1 || (nJoined==1 && Id(joined[0]) != self); + // We don't record members joining here, we record them when + // we get their ClusterNotify message. + } + if (newMembers) notify(); - if (callback) - callback(); } -void Cluster::setCallback(boost::function<void()> f) { callback=f; } - void Cluster::run() { - assert(cpg); cpg->dispatchBlocking(); } |