summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp58
1 files changed, 38 insertions, 20 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 9549527416..79b76f68be 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -92,6 +92,11 @@ void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
handler->insert(c);
}
+void Cluster::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
+ Mutex::ScopedLock l(lock);
+ handler->catchUpClosed(c);
+}
+
void Cluster::erase(ConnectionId id) {
Mutex::ScopedLock l(lock);
connections.erase(id);
@@ -119,6 +124,7 @@ void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& con
}
void Cluster::mcastEvent(const Event& e) {
+ QPID_LOG(trace, "MCAST " << e);
e.mcast(name, cpg);
}
@@ -170,8 +176,10 @@ void Cluster::deliver(
throw Exception(QPID_MSG("Invalid cluster control"));
}
}
- else
+ else {
+ QPID_LOG(trace, "DLVR" << (connectionEventQueue.isStopped() ? "(stalled)" : "") << " " << e);
handler->deliver(e);
+ }
}
catch (const std::exception& e) {
QPID_LOG(critical, "Error in cluster deliver: " << e.what());
@@ -181,14 +189,17 @@ void Cluster::deliver(
void Cluster::connectionEvent(const Event& e) {
Buffer buf(e);
- assert(e.getConnection());
- if (e.getType() == DATA)
- e.getConnection()->deliverBuffer(buf);
+ QPID_LOG(trace, "EXEC: " << e);
+ boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId());
+ assert(connection);
+ if (e.getType() == DATA) {
+ connection->deliverBuffer(buf);
+ }
else { // control
AMQFrame frame;
while (frame.decode(buf)) {
- QPID_LOG(trace, "DLVR [" << self << "]: " << frame);
- e.getConnection()->received(frame);
+ QPID_LOG(trace, "EXEC [" << *connection << "]: " << frame);
+ connection->received(frame);
}
}
}
@@ -196,26 +207,28 @@ void Cluster::connectionEvent(const Event& e) {
struct AddrList {
const cpg_address* addrs;
int count;
- const char* prefix;
- AddrList(const cpg_address* a, int n, const char* p=0) : addrs(a), count(n), prefix(p) {}
+ const char *prefix, *suffix;
+ AddrList(const cpg_address* a, int n, const char* p="", const char* s="")
+ : addrs(a), count(n), prefix(p), suffix(s) {}
};
ostream& operator<<(ostream& o, const AddrList& a) {
- if (a.count && a.prefix) o << a.prefix;
+ if (!a.count) return o;
+ o << a.prefix;
for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
const char* reasonString;
switch (p->reason) {
- case CPG_REASON_JOIN: reasonString = " joined"; break;
- case CPG_REASON_LEAVE: reasonString = " left";break;
- case CPG_REASON_NODEDOWN: reasonString = " node-down";break;
- case CPG_REASON_NODEUP: reasonString = " node-up";break;
- case CPG_REASON_PROCDOWN: reasonString = " process-down";break;
+ case CPG_REASON_JOIN: reasonString = " joined "; break;
+ case CPG_REASON_LEAVE: reasonString = " left "; break;
+ case CPG_REASON_NODEDOWN: reasonString = " node-down "; break;
+ case CPG_REASON_NODEUP: reasonString = " node-up "; break;
+ case CPG_REASON_PROCDOWN: reasonString = " process-down "; break;
default: reasonString = " ";
}
qpid::cluster::MemberId member(*p);
- o << member << reasonString << ((p+1 < a.addrs+a.count) ? ", " : "");
+ o << member << reasonString;
}
- return o;
+ return o << a.suffix;
}
void Cluster::dispatch(sys::DispatchHandle& h) {
@@ -238,8 +251,8 @@ void Cluster::configChange(
cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
- QPID_LOG(debug, "Cluster: " << AddrList(current, nCurrent) << ". "
- << AddrList(left, nLeft, "Left: "));
+ QPID_LOG(debug, "CPG members: " << AddrList(current, nCurrent)
+ << AddrList(left, nLeft, "( ", ")"));
if (find(left, left+nLeft, self) != left+nLeft) {
// I have left the group, this is the final config change.
@@ -289,9 +302,14 @@ void Cluster::stall() {
}
void Cluster::ready() {
- // Called with lock held
- QPID_LOG(info, self << " ready at URL " << url);
+ QPID_LOG(debug, self << " ready at " << url);
+ unstall();
mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
+}
+
+void Cluster::unstall() {
+ // Called with lock held
+ QPID_LOG(debug, self << " un-stalling");
handler = &memberHandler; // Member mode.
connectionEventQueue.start(poller);
// if (mgmtObject!=0)