summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-12-01 15:40:14 +0000
committerAlan Conway <aconway@apache.org>2008-12-01 15:40:14 +0000
commit8f38c8b511bbf232c5a65203b415aff456e61992 (patch)
tree765c803dc8bef2c0c0fd753893cd7938dabc3a78 /cpp/src
parentf2e3b43e71612f1ed8abee1cf8250e37141cd19e (diff)
downloadqpid-python-8f38c8b511bbf232c5a65203b415aff456e61992.tar.gz
cluster: Queue outgoing multicast events.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@722101 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp12
-rw-r--r--cpp/src/qpid/cluster/Cluster.h4
2 files changed, 9 insertions, 7 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index a881516597..d536ac59f2 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -99,6 +99,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
+ mcastQueue(boost::bind(&Event::mcast, _1, boost::cref(name), boost::ref(cpg)), poller),
mcastId(0),
mgmtObject(0),
state(INIT),
@@ -116,6 +117,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
failoverExchange.reset(new FailoverExchange(this));
cpgDispatchHandle.startWatch(poller);
deliverQueue.start();
+ mcastQueue.start();
QPID_LOG(notice, *this << " joining cluster " << name.str() << " with url=" << myUrl);
if (useQuorum) quorum.init();
cpg.join(name);
@@ -176,10 +178,10 @@ void Cluster::mcast(const Event& e, Lock&) {
if (state <= CATCHUP && e.isConnection()) {
// Stall outgoing connection events untill we are fully READY
QPID_LOG(trace, *this << " MCAST deferred: " << e );
- mcastQueue.push_back(e);
+ mcastStallQueue.push_back(e);
}
else
- e.mcast(name, cpg);
+ mcastQueue.push(e);
}
std::vector<Url> Cluster::getUrls() const {
@@ -432,8 +434,8 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
state = READY;
QPID_LOG(notice, *this << " caught up, active cluster member");
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
- for_each(mcastQueue.begin(), mcastQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
- mcastQueue.clear();
+ for_each(mcastStallQueue.begin(), mcastStallQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
+ mcastStallQueue.clear();
}
}
@@ -545,7 +547,7 @@ void Cluster::stopFullCluster(Lock& l) {
}
void Cluster::memberUpdate(Lock& l) {
- QPID_LOG(info, *this << " member update, map=" << map);
+ QPID_LOG(info, *this << map.memberCount() << " members: " << map);
std::vector<Url> urls = getUrls(l);
size_t size = urls.size();
failoverExchange->setUrls(urls);
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index d8be3f101d..81feef4919 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -190,8 +190,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
ConnectionMap connections;
NoOpConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
- PollableEventQueue deliverQueue;
- PlainEventQueue mcastQueue;
+ PollableEventQueue deliverQueue, mcastQueue;
+ PlainEventQueue mcastStallQueue;
uint32_t mcastId;
framing::Uuid clusterId;