summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp12
1 files changed, 7 insertions, 5 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index a881516597..d536ac59f2 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -99,6 +99,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
+ mcastQueue(boost::bind(&Event::mcast, _1, boost::cref(name), boost::ref(cpg)), poller),
mcastId(0),
mgmtObject(0),
state(INIT),
@@ -116,6 +117,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
failoverExchange.reset(new FailoverExchange(this));
cpgDispatchHandle.startWatch(poller);
deliverQueue.start();
+ mcastQueue.start();
QPID_LOG(notice, *this << " joining cluster " << name.str() << " with url=" << myUrl);
if (useQuorum) quorum.init();
cpg.join(name);
@@ -176,10 +178,10 @@ void Cluster::mcast(const Event& e, Lock&) {
if (state <= CATCHUP && e.isConnection()) {
// Stall outgoing connection events untill we are fully READY
QPID_LOG(trace, *this << " MCAST deferred: " << e );
- mcastQueue.push_back(e);
+ mcastStallQueue.push_back(e);
}
else
- e.mcast(name, cpg);
+ mcastQueue.push(e);
}
std::vector<Url> Cluster::getUrls() const {
@@ -432,8 +434,8 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
state = READY;
QPID_LOG(notice, *this << " caught up, active cluster member");
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
- for_each(mcastQueue.begin(), mcastQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
- mcastQueue.clear();
+ for_each(mcastStallQueue.begin(), mcastStallQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
+ mcastStallQueue.clear();
}
}
@@ -545,7 +547,7 @@ void Cluster::stopFullCluster(Lock& l) {
}
void Cluster::memberUpdate(Lock& l) {
- QPID_LOG(info, *this << " member update, map=" << map);
+ QPID_LOG(info, *this << map.memberCount() << " members: " << map);
std::vector<Url> urls = getUrls(l);
size_t size = urls.size();
failoverExchange->setUrls(urls);