summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-12-02 20:41:49 +0000
committerAlan Conway <aconway@apache.org>2008-12-02 20:41:49 +0000
commit7cdb9a9ab688988e596d9fce116a0998decd0972 (patch)
treeaef9d6d0bc837b2eb0116e863c8bc89ed8f45021 /cpp/src/qpid/cluster/Cluster.cpp
parent0fa4afae5e690b1cf147ebbe60641b448fcb5c31 (diff)
downloadqpid-python-7cdb9a9ab688988e596d9fce116a0998decd0972.tar.gz
Cluster: handle CPG flow-control conditions.
PollableQueue: allow dispatch functions to refuse dispatch. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@722614 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp36
1 files changed, 24 insertions, 12 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index d536ac59f2..0ac0da2be4 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -99,7 +99,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
- mcastQueue(boost::bind(&Event::mcast, _1, boost::cref(name), boost::ref(cpg)), poller),
+ mcastQueue(boost::bind(&Cluster::sendMcast, this, _1), poller),
mcastId(0),
mgmtObject(0),
state(INIT),
@@ -109,7 +109,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
qmf::Package packageInit(agent);
- mgmtObject = new qmf::Cluster (agent, this, &broker,name.str(),myUrl.str());
+ mgmtObject = new qmf::Cluster (agent, this, &broker,name,myUrl.str());
agent->addObject (mgmtObject);
mgmtObject->set_status("JOINING");
}
@@ -118,7 +118,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
cpgDispatchHandle.startWatch(poller);
deliverQueue.start();
mcastQueue.start();
- QPID_LOG(notice, *this << " joining cluster " << name.str() << " with url=" << myUrl);
+ QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
if (useQuorum) quorum.init();
cpg.join(name);
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety.
@@ -184,6 +184,17 @@ void Cluster::mcast(const Event& e, Lock&) {
mcastQueue.push(e);
}
+bool Cluster::sendMcast(const Event& e) {
+ try {
+ return e.mcast(cpg);
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(critical, "Multicast failure: " << e.what());
+ leave();
+ return false;
+ }
+}
+
std::vector<Url> Cluster::getUrls() const {
Lock l(lock);
return getUrls(l);
@@ -201,10 +212,10 @@ void Cluster::leave() {
void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
- QPID_LOG(notice, *this << " leaving cluster " << name.str());
+ QPID_LOG(notice, *this << " leaving cluster " << name);
if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
if (!deliverQueue.isStopped()) deliverQueue.stop();
- try { cpg.leave(name); }
+ try { cpg.leave(); }
catch (const std::exception& e) {
QPID_LOG(critical, *this << " error leaving process group: " << e.what());
}
@@ -224,7 +235,7 @@ boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& conn
}
else { // New shadow connection
std::ostringstream mgmtId;
- mgmtId << name.str() << ":" << connectionId;
+ mgmtId << name << ":" << connectionId;
ConnectionMap::value_type value(connectionId,
new Connection(*this, shadowOut, mgmtId.str(), connectionId));
i = connections.insert(value).first;
@@ -260,7 +271,7 @@ void Cluster::deliver(const Event& e, Lock&) {
}
// Entry point: called when deliverQueue has events to process.
-void Cluster::delivered(const Event& e) {
+bool Cluster::delivered(const Event& e) {
try {
Lock l(lock);
delivered(e,l);
@@ -268,7 +279,7 @@ void Cluster::delivered(const Event& e) {
QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
leave();
}
-
+ return true;
}
void Cluster::delivered(const Event& e, Lock& l) {
@@ -334,6 +345,7 @@ ostream& operator<<(ostream& o, const AddrList& a) {
void Cluster::dispatch(sys::DispatchHandle& h) {
try {
cpg.dispatchAll();
+ mcastQueue.start(); // In case it was stopped by flow control.
h.rewatch();
} catch (const std::exception& e) {
QPID_LOG(critical, *this << " error in cluster dispatch: " << e.what());
@@ -359,7 +371,7 @@ void Cluster::configChange (
cpg_address */*joined*/, int /*nJoined*/)
{
Mutex::ScopedLock l(lock);
- QPID_LOG(debug, *this << " enqueue config change: " << AddrList(current, nCurrent)
+ QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent)
<< AddrList(left, nLeft, "( ", ")"));
std::string addresses;
for (cpg_address* p = current; p < current+nCurrent; ++p)
@@ -387,7 +399,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
}
else { // Joining established group.
state = NEWBIE;
- QPID_LOG(info, *this << " joining established cluster");
+ QPID_LOG(info, *this << " joining cluster: " << map);
mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), l);
}
}
@@ -542,12 +554,12 @@ void Cluster::stopClusterNode(Lock& l) {
}
void Cluster::stopFullCluster(Lock& l) {
- QPID_LOG(notice, *this << " shutting down cluster " << name.str());
+ QPID_LOG(notice, *this << " shutting down cluster " << name);
mcastControl(ClusterShutdownBody(), l);
}
void Cluster::memberUpdate(Lock& l) {
- QPID_LOG(info, *this << map.memberCount() << " members: " << map);
+ QPID_LOG(info, *this << " member update: " << map);
std::vector<Url> urls = getUrls(l);
size_t size = urls.size();
failoverExchange->setUrls(urls);