summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp58
1 files changed, 22 insertions, 36 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 15332c2cac..602933b88b 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -137,48 +137,34 @@ void Cluster::erase(ConnectionId id) {
}
void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& id, uint32_t seq) {
- Lock l(lock);
- mcastControl(body, id, seq, l);
-}
-
-void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& id, uint32_t seq, Lock& l) {
Event e(Event::control(body, id, seq));
QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
- mcast(e, l);
+ mcast(e);
}
-void Cluster::mcastControl(const framing::AMQBody& body, Lock& l) {
+void Cluster::mcastControl(const framing::AMQBody& body) {
Event e(Event::control(body, ConnectionId(myId,0), ++mcastId));
QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
- mcast(e, l);
+ mcast(e);
}
void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id) {
- Lock l(lock);
- mcastBuffer(data, size, connection, id, l);
-}
-
-void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id, Lock&) {
- Lock l(lock);
Event e(DATA, connection, size, id);
memcpy(e.getData(), data, size);
+ {
+ Lock l(lock);
+ if (state <= CATCHUP && e.isConnection()) {
+ // Stall outgoing connection events untill we are fully READY
+ QPID_LOG(trace, *this << " MCAST deferred: " << e );
+ mcastStallQueue.push_back(e);
+ return;
+ }
+ }
QPID_LOG(trace, *this << " MCAST " << e);
- mcast(e, l);
+ mcast(e);
}
-void Cluster::mcast(const Event& e) { Lock l(lock); mcast(e, l); }
-
-void Cluster::mcast(const Event& e, Lock&) {
- if (state == LEFT)
- return;
- if (state <= CATCHUP && e.isConnection()) {
- // Stall outgoing connection events untill we are fully READY
- QPID_LOG(trace, *this << " MCAST deferred: " << e );
- mcastStallQueue.push_back(e);
- }
- else
- mcastQueue.push(e);
-}
+void Cluster::mcast(const Event& e) { mcastQueue.push(e); }
bool Cluster::sendMcast(const Event& e) {
try {
@@ -383,7 +369,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
else { // Joining established group.
state = NEWBIE;
QPID_LOG(info, *this << " joining cluster: " << map);
- mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), l);
+ mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()));
}
}
else if (state >= READY && memberChange)
@@ -393,11 +379,11 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
-void Cluster::tryMakeOffer(const MemberId& id, Lock& l) {
+void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isNewbie(id)) {
state = OFFER;
QPID_LOG(info, *this << " send dump-offer to " << id);
- mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), l);
+ mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId));
}
}
@@ -429,7 +415,7 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
state = READY;
QPID_LOG(notice, *this << " caught up, active cluster member");
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
- for_each(mcastStallQueue.begin(), mcastStallQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
+ for_each(mcastStallQueue.begin(), mcastStallQueue.end(), boost::bind(&Cluster::mcast, this, _1));
mcastStallQueue.clear();
}
}
@@ -482,11 +468,11 @@ void Cluster::dumpInDone(const ClusterMap& m) {
checkDumpIn(l);
}
-void Cluster::checkDumpIn(Lock& l) {
+void Cluster::checkDumpIn(Lock& ) {
if (state == LEFT) return;
if (state == DUMPEE && dumpedMap) {
map = *dumpedMap;
- mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), l);
+ mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()));
// Don't flush the mcast queue till we are READY, on self-deliver.
state = CATCHUP;
QPID_LOG(info, *this << " received dump, starting catch-up");
@@ -536,9 +522,9 @@ void Cluster::stopClusterNode(Lock& l) {
leave(l);
}
-void Cluster::stopFullCluster(Lock& l) {
+void Cluster::stopFullCluster(Lock& ) {
QPID_LOG(notice, *this << " shutting down cluster " << name);
- mcastControl(ClusterShutdownBody(), l);
+ mcastControl(ClusterShutdownBody());
}
void Cluster::memberUpdate(Lock& l) {