summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp66
1 files changed, 38 insertions, 28 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index ce156e85e4..07ed4596e0 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -61,7 +61,7 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
};
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
- broker(&b),
+ broker(b),
poller(b.getPoller()),
cpg(*this),
name(name_),
@@ -74,15 +74,17 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
),
deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, _1)))
{
- broker->addFinalizer(boost::bind(&Cluster::leave, this));
- QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self);
+ QPID_LOG(notice, "Cluster member " << self << " joining cluster " << name.str());
+ broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
cpg.join(name);
deliverQueue.start(poller);
cpgDispatchHandle.startWatch(poller);
}
-Cluster::~Cluster() {}
+Cluster::~Cluster() {
+ QPID_LOG(debug, "~Cluster()");
+}
void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
Mutex::ScopedLock l(lock);
@@ -94,20 +96,13 @@ void Cluster::erase(ConnectionId id) {
connections.erase(id);
}
+// FIXME aconway 2008-09-10: leave is currently not called,
+// It should be called if we are shut down by a cluster admin command.
+// Any other type of exit is caught in disconnect().
+//
void Cluster::leave() {
- Mutex::ScopedLock l(lock);
- if (!broker) return; // Already left.
- // Leave is called by from Broker destructor after the poller has
- // been shut down. No dispatches can occur.
-
- QPID_LOG(notice, "Leaving cluster " << name.str());
+ QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << name.str());
cpg.leave(name);
- // broker= is set to 0 when the final config-change is delivered.
- while(broker) {
- Mutex::ScopedUnlock u(lock);
- cpg.dispatchAll();
- }
- cpg.shutdown();
}
template <class T> void decodePtr(Buffer& buf, T*& ptr) {
@@ -177,6 +172,7 @@ void Cluster::deliver(
{
try {
MemberId from(nodeid, pid);
+ QPID_LOG(debug, "Cluster::deliver from " << from << " to " << self); // FIXME aconway 2008-09-10:
deliverQueue.push(Event::delivered(from, msg, msg_len));
}
catch (const std::exception& e) {
@@ -238,7 +234,7 @@ void Cluster::configChange(
cpg_address *left, int nLeft,
cpg_address *joined, int nJoined)
{
- QPID_LOG(notice, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: "
+ QPID_LOG(info, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: "
<< AddrList(joined, nJoined) << AddrList(left, nLeft));
if (nJoined) // Notfiy new members of my URL.
@@ -246,13 +242,14 @@ void Cluster::configChange(
AMQFrame(in_place<ClusterJoiningBody>(ProtocolVersion(), url.str())),
ConnectionId(self,0));
-
+ if (find(left, left+nLeft, self) != left+nLeft) {
+ // We have left the group, this is the final config change.
+ QPID_LOG(notice, "Cluster member " << self << " left cluster " << name.str());
+ broker.shutdown();
+ }
Mutex::ScopedLock l(lock);
for (int i = 0; i < nLeft; ++i) urls.erase(left[i]);
// Add new members when their URL notice arraives.
-
- if (find(left, left+nLeft, self) != left+nLeft)
- broker = 0; // We have left the group, this is the final config change.
lock.notifyAll(); // Threads waiting for membership changes.
}
@@ -261,22 +258,35 @@ void Cluster::dispatch(sys::DispatchHandle& h) {
h.rewatch();
}
-void Cluster::disconnect(sys::DispatchHandle& h) {
- h.stopWatch();
- QPID_LOG(critical, "Disconnected from cluster, shutting down");
- broker->shutdown();
+void Cluster::disconnect(sys::DispatchHandle& ) {
+ // FIXME aconway 2008-09-11: this should be logged as critical,
+ // when we provide admin option to shut down cluster and let
+ // members leave cleanly.
+ QPID_LOG(notice, "Cluster member " << self << " disconnected from cluster " << name.str());
+ broker.shutdown();
}
void Cluster::joining(const MemberId& m, const string& url) {
- QPID_LOG(notice, "Cluster member " << m << " has URL " << url);
+ QPID_LOG(info, "Cluster member " << m << " has URL " << url);
urls.insert(UrlMap::value_type(m,Url(url)));
}
void Cluster::ready(const MemberId& ) {
// FIXME aconway 2008-09-08: TODO
}
-
-}} // namespace qpid::cluster
+// Called from Broker::~Broker when broker is shut down. At this
+// point we know the poller has stopped so no poller callbacks will be
+// invoked. We must ensure that CPG has also shut down so no CPG
+// callbacks will be invoked.
+//
+void Cluster::shutdown() {
+ QPID_LOG(notice, "Cluster member " << self << " shutting down.");
+ try { cpg.shutdown(); }
+ catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); }
+ delete this;
+}
+broker::Broker& Cluster::getBroker(){ return broker; }
+}} // namespace qpid::cluster