diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 58 |
1 files changed, 38 insertions, 20 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 9549527416..79b76f68be 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -92,6 +92,11 @@ void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { handler->insert(c); } +void Cluster::catchUpClosed(const boost::intrusive_ptr<Connection>& c) { + Mutex::ScopedLock l(lock); + handler->catchUpClosed(c); +} + void Cluster::erase(ConnectionId id) { Mutex::ScopedLock l(lock); connections.erase(id); @@ -119,6 +124,7 @@ void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& con } void Cluster::mcastEvent(const Event& e) { + QPID_LOG(trace, "MCAST " << e); e.mcast(name, cpg); } @@ -170,8 +176,10 @@ void Cluster::deliver( throw Exception(QPID_MSG("Invalid cluster control")); } } - else + else { + QPID_LOG(trace, "DLVR" << (connectionEventQueue.isStopped() ? "(stalled)" : "") << " " << e); handler->deliver(e); + } } catch (const std::exception& e) { QPID_LOG(critical, "Error in cluster deliver: " << e.what()); @@ -181,14 +189,17 @@ void Cluster::deliver( void Cluster::connectionEvent(const Event& e) { Buffer buf(e); - assert(e.getConnection()); - if (e.getType() == DATA) - e.getConnection()->deliverBuffer(buf); + QPID_LOG(trace, "EXEC: " << e); + boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId()); + assert(connection); + if (e.getType() == DATA) { + connection->deliverBuffer(buf); + } else { // control AMQFrame frame; while (frame.decode(buf)) { - QPID_LOG(trace, "DLVR [" << self << "]: " << frame); - e.getConnection()->received(frame); + QPID_LOG(trace, "EXEC [" << *connection << "]: " << frame); + connection->received(frame); } } } @@ -196,26 +207,28 @@ void Cluster::connectionEvent(const Event& e) { struct AddrList { const cpg_address* addrs; int count; - const char* prefix; - AddrList(const cpg_address* a, int n, const char* p=0) : addrs(a), count(n), prefix(p) {} + const char *prefix, *suffix; + AddrList(const cpg_address* a, int n, const char* p="", const char* s="") + : addrs(a), count(n), prefix(p), suffix(s) {} }; ostream& operator<<(ostream& o, const AddrList& a) { - if (a.count && a.prefix) o << a.prefix; + if (!a.count) return o; + o << a.prefix; for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) { const char* reasonString; switch (p->reason) { - case CPG_REASON_JOIN: reasonString = " joined"; break; - case CPG_REASON_LEAVE: reasonString = " left";break; - case CPG_REASON_NODEDOWN: reasonString = " node-down";break; - case CPG_REASON_NODEUP: reasonString = " node-up";break; - case CPG_REASON_PROCDOWN: reasonString = " process-down";break; + case CPG_REASON_JOIN: reasonString = " joined "; break; + case CPG_REASON_LEAVE: reasonString = " left "; break; + case CPG_REASON_NODEDOWN: reasonString = " node-down "; break; + case CPG_REASON_NODEUP: reasonString = " node-up "; break; + case CPG_REASON_PROCDOWN: reasonString = " process-down "; break; default: reasonString = " "; } qpid::cluster::MemberId member(*p); - o << member << reasonString << ((p+1 < a.addrs+a.count) ? ", " : ""); + o << member << reasonString; } - return o; + return o << a.suffix; } void Cluster::dispatch(sys::DispatchHandle& h) { @@ -238,8 +251,8 @@ void Cluster::configChange( cpg_address *joined, int nJoined) { Mutex::ScopedLock l(lock); - QPID_LOG(debug, "Cluster: " << AddrList(current, nCurrent) << ". " - << AddrList(left, nLeft, "Left: ")); + QPID_LOG(debug, "CPG members: " << AddrList(current, nCurrent) + << AddrList(left, nLeft, "( ", ")")); if (find(left, left+nLeft, self) != left+nLeft) { // I have left the group, this is the final config change. @@ -289,9 +302,14 @@ void Cluster::stall() { } void Cluster::ready() { - // Called with lock held - QPID_LOG(info, self << " ready at URL " << url); + QPID_LOG(debug, self << " ready at " << url); + unstall(); mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0); +} + +void Cluster::unstall() { + // Called with lock held + QPID_LOG(debug, self << " un-stalling"); handler = &memberHandler; // Member mode. connectionEventQueue.start(poller); // if (mgmtObject!=0) |