diff options
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 3 |
3 files changed, 22 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 70c73191ee..f6022aa5b8 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -565,6 +565,17 @@ void Cluster::memberUpdate(Lock& l) { } mgmtObject->set_members(urlstr); } + + //close connections belonging to members that have now been excluded + for (ConnectionMap::iterator i = connections.begin(); i != connections.end();) { + MemberId member = i->first.getMember(); + if (member != myId && !map.isMember(member)) { + i->second->left(); + connections.erase(i++); + } else { + i++; + } + } } std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 513816735d..bcdc4ffe27 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -141,6 +141,7 @@ void Connection::delivered(framing::AMQFrame& f) { } } +// A local connection is closed by the network layer. void Connection::closed() { try { if (catchUp) { @@ -165,12 +166,19 @@ void Connection::closed() { } } +// Self-delivery of close message, close the connection. void Connection::deliverClose () { assert(!catchUp); connection.closed(); cluster.erase(self); } +// Member of a shadow connection left the cluster. +void Connection::left() { + assert(isShadow()); + connection.closed(); +} + // Decode data from local clients. size_t Connection::decode(const char* buffer, size_t size) { if (catchUp) { // Handle catch-up locally. diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 2eafa90f32..06176bf81d 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -89,6 +89,9 @@ class Connection : void idleOut() { connection.idleOut(); } void idleIn() { connection.idleIn(); } + /** Called if the connectors member has left the cluster */ + void left(); + // ConnectionCodec methods size_t decode(const char* buffer, size_t size); |