summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp72
1 files changed, 15 insertions, 57 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 222aa07548..aac5bc1dd8 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -90,20 +90,19 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
name(name_),
myUrl(url_),
myId(cpg.self()),
+ readMax(readMax_),
cpgDispatchHandle(
cpg,
boost::bind(&Cluster::dispatch, this, _1), // read
0, // write
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
- deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
- mcastQueue(boost::bind(&Cluster::sendMcast, this, _1), poller),
- mcastId(0),
+ mcast(cpg, poller),
mgmtObject(0),
+ deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
state(INIT),
lastSize(0),
- lastBroker(false),
- readMax(readMax_)
+ lastBroker(false)
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
@@ -116,7 +115,6 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
failoverExchange.reset(new FailoverExchange(this));
cpgDispatchHandle.startWatch(poller);
deliverQueue.start();
- mcastQueue.start();
QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
if (quorum_) quorum.init();
cpg.join(name);
@@ -135,49 +133,6 @@ void Cluster::erase(ConnectionId id) {
connections.erase(id);
}
-void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& id, uint32_t seq) {
- Event e(Event::control(body, id, seq));
- QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
- mcast(e);
-}
-
-void Cluster::mcastControl(const framing::AMQBody& body) {
- Event e(Event::control(body, ConnectionId(myId,0), ++mcastId));
- QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
- mcast(e);
-}
-
-void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id) {
- Event e(DATA, connection, size, id);
- memcpy(e.getData(), data, size);
- {
- Lock l(lock);
- if (state <= CATCHUP && e.isConnection()) {
- // Stall outgoing connection events untill we are fully READY
- QPID_LOG(trace, *this << " MCAST deferred: " << e );
- mcastStallQueue.push_back(e);
- return;
- }
- }
- QPID_LOG(trace, *this << " MCAST " << e);
- mcast(e);
-}
-
-void Cluster::mcast(const Event& e) { mcastQueue.push(e); }
-
-void Cluster::sendMcast(PollableEventQueue::Queue& values) {
- try {
- PollableEventQueue::Queue::iterator i = values.begin();
- while (i != values.end() && i->mcast(cpg))
- ++i;
- values.erase(values.begin(), i);
- }
- catch (const std::exception& e) {
- QPID_LOG(critical, "Multicast failure: " << e.what());
- leave();
- }
-}
-
std::vector<Url> Cluster::getUrls() const {
Lock l(lock);
return getUrls(l);
@@ -315,7 +270,6 @@ ostream& operator<<(ostream& o, const AddrList& a) {
void Cluster::dispatch(sys::DispatchHandle& h) {
try {
cpg.dispatchAll();
- mcastQueue.start(); // In case it was stopped by flow control.
h.rewatch();
} catch (const std::exception& e) {
QPID_LOG(critical, *this << " error in cluster dispatch: " << e.what());
@@ -361,7 +315,9 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
if (state == INIT) { // First configChange
if (map.aliveCount() == 1) {
setClusterId(true);
+ // FIXME aconway 2008-12-11: Centralize transition to READY and associated actions eg mcast.release()
state = READY;
+ mcast.release();
QPID_LOG(notice, *this << " first in cluster");
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
map = ClusterMap(myId, myUrl, true);
@@ -370,7 +326,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
else { // Joining established group.
state = NEWBIE;
QPID_LOG(info, *this << " joining cluster: " << map);
- mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()));
+ mcast.mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), myId);
}
}
else if (state >= READY && memberChange)
@@ -384,7 +340,7 @@ void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isNewbie(id)) {
state = OFFER;
QPID_LOG(info, *this << " send dump-offer to " << id);
- mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId));
+ mcast.mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), myId);
}
}
@@ -414,10 +370,10 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
memberUpdate(l);
if (state == CATCHUP && id == myId) {
state = READY;
+ mcast.release();
QPID_LOG(notice, *this << " caught up, active cluster member");
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
- for_each(mcastStallQueue.begin(), mcastStallQueue.end(), boost::bind(&Cluster::mcast, this, _1));
- mcastStallQueue.clear();
+ mcast.release();
}
}
@@ -432,6 +388,7 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid&
}
else { // Another offer was first.
state = READY;
+ mcast.release();
QPID_LOG(info, *this << " cancelled dump offer to " << dumpee);
tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer.
}
@@ -461,6 +418,7 @@ void Cluster::dumpStart(const MemberId& dumpee, const Url& url, Lock&) {
boost::bind(&Cluster::dumpOutError, this, _1)));
}
+// Called in dump thread.
void Cluster::dumpInDone(const ClusterMap& m) {
Lock l(lock);
dumpedMap = m;
@@ -471,8 +429,7 @@ void Cluster::checkDumpIn(Lock& ) {
if (state == LEFT) return;
if (state == DUMPEE && dumpedMap) {
map = *dumpedMap;
- mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()));
- // Don't flush the mcast queue till we are READY, on self-deliver.
+ mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
state = CATCHUP;
QPID_LOG(info, *this << " received dump, starting catch-up");
deliverQueue.start();
@@ -487,6 +444,7 @@ void Cluster::dumpOutDone() {
void Cluster::dumpOutDone(Lock& l) {
assert(state == DUMPER);
state = READY;
+ mcast.release();
QPID_LOG(info, *this << " sent dump");
deliverQueue.start();
tryMakeOffer(map.firstNewbie(), l); // Try another offer
@@ -523,7 +481,7 @@ void Cluster::stopClusterNode(Lock& l) {
void Cluster::stopFullCluster(Lock& ) {
QPID_LOG(notice, *this << " shutting down cluster " << name);
- mcastControl(ClusterShutdownBody());
+ mcast.mcastControl(ClusterShutdownBody(), myId);
}
void Cluster::memberUpdate(Lock& l) {