summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp58
-rw-r--r--cpp/src/qpid/cluster/Cluster.h9
2 files changed, 25 insertions, 42 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 15332c2cac..602933b88b 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -137,48 +137,34 @@ void Cluster::erase(ConnectionId id) {
}
void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& id, uint32_t seq) {
- Lock l(lock);
- mcastControl(body, id, seq, l);
-}
-
-void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& id, uint32_t seq, Lock& l) {
Event e(Event::control(body, id, seq));
QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
- mcast(e, l);
+ mcast(e);
}
-void Cluster::mcastControl(const framing::AMQBody& body, Lock& l) {
+void Cluster::mcastControl(const framing::AMQBody& body) {
Event e(Event::control(body, ConnectionId(myId,0), ++mcastId));
QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
- mcast(e, l);
+ mcast(e);
}
void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id) {
- Lock l(lock);
- mcastBuffer(data, size, connection, id, l);
-}
-
-void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id, Lock&) {
- Lock l(lock);
Event e(DATA, connection, size, id);
memcpy(e.getData(), data, size);
+ {
+ Lock l(lock);
+ if (state <= CATCHUP && e.isConnection()) {
+ // Stall outgoing connection events untill we are fully READY
+ QPID_LOG(trace, *this << " MCAST deferred: " << e );
+ mcastStallQueue.push_back(e);
+ return;
+ }
+ }
QPID_LOG(trace, *this << " MCAST " << e);
- mcast(e, l);
+ mcast(e);
}
-void Cluster::mcast(const Event& e) { Lock l(lock); mcast(e, l); }
-
-void Cluster::mcast(const Event& e, Lock&) {
- if (state == LEFT)
- return;
- if (state <= CATCHUP && e.isConnection()) {
- // Stall outgoing connection events untill we are fully READY
- QPID_LOG(trace, *this << " MCAST deferred: " << e );
- mcastStallQueue.push_back(e);
- }
- else
- mcastQueue.push(e);
-}
+void Cluster::mcast(const Event& e) { mcastQueue.push(e); }
bool Cluster::sendMcast(const Event& e) {
try {
@@ -383,7 +369,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
else { // Joining established group.
state = NEWBIE;
QPID_LOG(info, *this << " joining cluster: " << map);
- mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), l);
+ mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()));
}
}
else if (state >= READY && memberChange)
@@ -393,11 +379,11 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
-void Cluster::tryMakeOffer(const MemberId& id, Lock& l) {
+void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isNewbie(id)) {
state = OFFER;
QPID_LOG(info, *this << " send dump-offer to " << id);
- mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), l);
+ mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId));
}
}
@@ -429,7 +415,7 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
state = READY;
QPID_LOG(notice, *this << " caught up, active cluster member");
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
- for_each(mcastStallQueue.begin(), mcastStallQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
+ for_each(mcastStallQueue.begin(), mcastStallQueue.end(), boost::bind(&Cluster::mcast, this, _1));
mcastStallQueue.clear();
}
}
@@ -482,11 +468,11 @@ void Cluster::dumpInDone(const ClusterMap& m) {
checkDumpIn(l);
}
-void Cluster::checkDumpIn(Lock& l) {
+void Cluster::checkDumpIn(Lock& ) {
if (state == LEFT) return;
if (state == DUMPEE && dumpedMap) {
map = *dumpedMap;
- mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), l);
+ mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()));
// Don't flush the mcast queue till we are READY, on self-deliver.
state = CATCHUP;
QPID_LOG(info, *this << " received dump, starting catch-up");
@@ -536,9 +522,9 @@ void Cluster::stopClusterNode(Lock& l) {
leave(l);
}
-void Cluster::stopFullCluster(Lock& l) {
+void Cluster::stopFullCluster(Lock& ) {
QPID_LOG(notice, *this << " shutting down cluster " << name);
- mcastControl(ClusterShutdownBody(), l);
+ mcastControl(ClusterShutdownBody());
}
void Cluster::memberUpdate(Lock& l) {
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 14df4db905..2ab2da6fa8 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -79,7 +79,6 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// Send to the cluster
void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&, uint32_t id);
void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id);
- void mcast(const Event& e);
// URLs of current cluster members.
std::vector<Url> getUrls() const;
@@ -110,11 +109,9 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// The parameter makes it hard to forget since you have to have an instance of
// a Lock to call the unlocked functions.
- // Unlocked versions of public functions
- void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&, uint32_t, Lock&);
- void mcastControl(const framing::AMQBody& controlBody, Lock&);
- void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id, Lock&);
- void mcast(const Event& e, Lock&);
+ void mcastControl(const framing::AMQBody& controlBody);
+ void mcast(const Event& e);
+
void leave(Lock&);
std::vector<Url> getUrls(Lock&) const;