diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 66 |
1 files changed, 38 insertions, 28 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index ce156e85e4..07ed4596e0 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -61,7 +61,7 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { }; Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : - broker(&b), + broker(b), poller(b.getPoller()), cpg(*this), name(name_), @@ -74,15 +74,17 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : ), deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, _1))) { - broker->addFinalizer(boost::bind(&Cluster::leave, this)); - QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self); + QPID_LOG(notice, "Cluster member " << self << " joining cluster " << name.str()); + broker.addFinalizer(boost::bind(&Cluster::shutdown, this)); cpg.join(name); deliverQueue.start(poller); cpgDispatchHandle.startWatch(poller); } -Cluster::~Cluster() {} +Cluster::~Cluster() { + QPID_LOG(debug, "~Cluster()"); +} void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { Mutex::ScopedLock l(lock); @@ -94,20 +96,13 @@ void Cluster::erase(ConnectionId id) { connections.erase(id); } +// FIXME aconway 2008-09-10: leave is currently not called, +// It should be called if we are shut down by a cluster admin command. +// Any other type of exit is caught in disconnect(). +// void Cluster::leave() { - Mutex::ScopedLock l(lock); - if (!broker) return; // Already left. - // Leave is called by from Broker destructor after the poller has - // been shut down. No dispatches can occur. - - QPID_LOG(notice, "Leaving cluster " << name.str()); + QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << name.str()); cpg.leave(name); - // broker= is set to 0 when the final config-change is delivered. - while(broker) { - Mutex::ScopedUnlock u(lock); - cpg.dispatchAll(); - } - cpg.shutdown(); } template <class T> void decodePtr(Buffer& buf, T*& ptr) { @@ -177,6 +172,7 @@ void Cluster::deliver( { try { MemberId from(nodeid, pid); + QPID_LOG(debug, "Cluster::deliver from " << from << " to " << self); // FIXME aconway 2008-09-10: deliverQueue.push(Event::delivered(from, msg, msg_len)); } catch (const std::exception& e) { @@ -238,7 +234,7 @@ void Cluster::configChange( cpg_address *left, int nLeft, cpg_address *joined, int nJoined) { - QPID_LOG(notice, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: " + QPID_LOG(info, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: " << AddrList(joined, nJoined) << AddrList(left, nLeft)); if (nJoined) // Notfiy new members of my URL. @@ -246,13 +242,14 @@ void Cluster::configChange( AMQFrame(in_place<ClusterJoiningBody>(ProtocolVersion(), url.str())), ConnectionId(self,0)); - + if (find(left, left+nLeft, self) != left+nLeft) { + // We have left the group, this is the final config change. + QPID_LOG(notice, "Cluster member " << self << " left cluster " << name.str()); + broker.shutdown(); + } Mutex::ScopedLock l(lock); for (int i = 0; i < nLeft; ++i) urls.erase(left[i]); // Add new members when their URL notice arraives. - - if (find(left, left+nLeft, self) != left+nLeft) - broker = 0; // We have left the group, this is the final config change. lock.notifyAll(); // Threads waiting for membership changes. } @@ -261,22 +258,35 @@ void Cluster::dispatch(sys::DispatchHandle& h) { h.rewatch(); } -void Cluster::disconnect(sys::DispatchHandle& h) { - h.stopWatch(); - QPID_LOG(critical, "Disconnected from cluster, shutting down"); - broker->shutdown(); +void Cluster::disconnect(sys::DispatchHandle& ) { + // FIXME aconway 2008-09-11: this should be logged as critical, + // when we provide admin option to shut down cluster and let + // members leave cleanly. + QPID_LOG(notice, "Cluster member " << self << " disconnected from cluster " << name.str()); + broker.shutdown(); } void Cluster::joining(const MemberId& m, const string& url) { - QPID_LOG(notice, "Cluster member " << m << " has URL " << url); + QPID_LOG(info, "Cluster member " << m << " has URL " << url); urls.insert(UrlMap::value_type(m,Url(url))); } void Cluster::ready(const MemberId& ) { // FIXME aconway 2008-09-08: TODO } - -}} // namespace qpid::cluster +// Called from Broker::~Broker when broker is shut down. At this +// point we know the poller has stopped so no poller callbacks will be +// invoked. We must ensure that CPG has also shut down so no CPG +// callbacks will be invoked. +// +void Cluster::shutdown() { + QPID_LOG(notice, "Cluster member " << self << " shutting down."); + try { cpg.shutdown(); } + catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); } + delete this; +} +broker::Broker& Cluster::getBroker(){ return broker; } +}} // namespace qpid::cluster |