diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 58 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 9 |
2 files changed, 25 insertions, 42 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 15332c2cac..602933b88b 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -137,48 +137,34 @@ void Cluster::erase(ConnectionId id) { } void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& id, uint32_t seq) { - Lock l(lock); - mcastControl(body, id, seq, l); -} - -void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& id, uint32_t seq, Lock& l) { Event e(Event::control(body, id, seq)); QPID_LOG(trace, *this << " MCAST " << e << ": " << body); - mcast(e, l); + mcast(e); } -void Cluster::mcastControl(const framing::AMQBody& body, Lock& l) { +void Cluster::mcastControl(const framing::AMQBody& body) { Event e(Event::control(body, ConnectionId(myId,0), ++mcastId)); QPID_LOG(trace, *this << " MCAST " << e << ": " << body); - mcast(e, l); + mcast(e); } void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id) { - Lock l(lock); - mcastBuffer(data, size, connection, id, l); -} - -void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id, Lock&) { - Lock l(lock); Event e(DATA, connection, size, id); memcpy(e.getData(), data, size); + { + Lock l(lock); + if (state <= CATCHUP && e.isConnection()) { + // Stall outgoing connection events untill we are fully READY + QPID_LOG(trace, *this << " MCAST deferred: " << e ); + mcastStallQueue.push_back(e); + return; + } + } QPID_LOG(trace, *this << " MCAST " << e); - mcast(e, l); + mcast(e); } -void Cluster::mcast(const Event& e) { Lock l(lock); mcast(e, l); } - -void Cluster::mcast(const Event& e, Lock&) { - if (state == LEFT) - return; - if (state <= CATCHUP && e.isConnection()) { - // Stall outgoing connection events untill we are fully READY - QPID_LOG(trace, *this << " MCAST deferred: " << e ); - mcastStallQueue.push_back(e); - } - else - mcastQueue.push(e); -} +void Cluster::mcast(const Event& e) { mcastQueue.push(e); } bool Cluster::sendMcast(const Event& e) { try { @@ -383,7 +369,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& else { // Joining established group. state = NEWBIE; QPID_LOG(info, *this << " joining cluster: " << map); - mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), l); + mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str())); } } else if (state >= READY && memberChange) @@ -393,11 +379,11 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& -void Cluster::tryMakeOffer(const MemberId& id, Lock& l) { +void Cluster::tryMakeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isNewbie(id)) { state = OFFER; QPID_LOG(info, *this << " send dump-offer to " << id); - mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), l); + mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId)); } } @@ -429,7 +415,7 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { state = READY; QPID_LOG(notice, *this << " caught up, active cluster member"); if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); - for_each(mcastStallQueue.begin(), mcastStallQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l))); + for_each(mcastStallQueue.begin(), mcastStallQueue.end(), boost::bind(&Cluster::mcast, this, _1)); mcastStallQueue.clear(); } } @@ -482,11 +468,11 @@ void Cluster::dumpInDone(const ClusterMap& m) { checkDumpIn(l); } -void Cluster::checkDumpIn(Lock& l) { +void Cluster::checkDumpIn(Lock& ) { if (state == LEFT) return; if (state == DUMPEE && dumpedMap) { map = *dumpedMap; - mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), l); + mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str())); // Don't flush the mcast queue till we are READY, on self-deliver. state = CATCHUP; QPID_LOG(info, *this << " received dump, starting catch-up"); @@ -536,9 +522,9 @@ void Cluster::stopClusterNode(Lock& l) { leave(l); } -void Cluster::stopFullCluster(Lock& l) { +void Cluster::stopFullCluster(Lock& ) { QPID_LOG(notice, *this << " shutting down cluster " << name); - mcastControl(ClusterShutdownBody(), l); + mcastControl(ClusterShutdownBody()); } void Cluster::memberUpdate(Lock& l) { diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 14df4db905..2ab2da6fa8 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -79,7 +79,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Send to the cluster void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&, uint32_t id); void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id); - void mcast(const Event& e); // URLs of current cluster members. std::vector<Url> getUrls() const; @@ -110,11 +109,9 @@ class Cluster : private Cpg::Handler, public management::Manageable { // The parameter makes it hard to forget since you have to have an instance of // a Lock to call the unlocked functions. - // Unlocked versions of public functions - void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&, uint32_t, Lock&); - void mcastControl(const framing::AMQBody& controlBody, Lock&); - void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id, Lock&); - void mcast(const Event& e, Lock&); + void mcastControl(const framing::AMQBody& controlBody); + void mcast(const Event& e); + void leave(Lock&); std::vector<Url> getUrls(Lock&) const; |