diff options
Diffstat (limited to 'cpp/src/qpid/cluster/JoiningHandler.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/JoiningHandler.cpp | 25 |
1 files changed, 12 insertions, 13 deletions
diff --git a/cpp/src/qpid/cluster/JoiningHandler.cpp b/cpp/src/qpid/cluster/JoiningHandler.cpp index c188fe438e..8f08cb615f 100644 --- a/cpp/src/qpid/cluster/JoiningHandler.cpp +++ b/cpp/src/qpid/cluster/JoiningHandler.cpp @@ -40,14 +40,13 @@ void JoiningHandler::configChange( if (nLeft == 0 && nCurrent == 1 && *current == cluster.self) { // First in cluster. QPID_LOG(notice, cluster.self << " first in cluster."); cluster.map.ready(cluster.self, cluster.url); - cluster.ready(); + cluster.unstall(); } } void JoiningHandler::deliver(Event& e) { - // Discard connection events unless we are stalled and getting a dump. + // Discard connection events unless we are stalled to receive a dump. if (state == STALLED) { - e.setConnection(cluster.getConnection(e.getConnectionId())); cluster.connectionEventQueue.push(e); } } @@ -73,6 +72,7 @@ void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) { } else { // Start a new dump cluster.map.dumper = cluster.map.first(); + QPID_LOG(debug, "Starting dump, dumper=" << cluster.map.dumper << " dumpee=" << dumpee); if (dumpee == cluster.self) { // My turn switch (state) { case START: @@ -101,24 +101,23 @@ void JoiningHandler::ready(const MemberId& id, const std::string& url) { void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) { if (c->isCatchUp()) { ++catchUpConnections; - QPID_LOG(debug, "Received " << catchUpConnections << " catch-up connections."); - } - else if (c->isExCatchUp()) { - if (c->getId().getConnectionPtr() != c.get()) // become shadow connection - cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); - QPID_LOG(debug, "Catch-up connection terminated " << catchUpConnections-1 << " remaining"); - if (--catchUpConnections == 0) - dumpComplete(); + QPID_LOG(debug, "Catch-up connection " << *c << " started, total " << catchUpConnections); } - else // Local connection, will be stalled till dump complete. + cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); +} + +void JoiningHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) { + QPID_LOG(debug, "Catch-up connection " << *c << " finished, remaining " << catchUpConnections-1); + if (c->isShadow()) cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); + if (--catchUpConnections == 0) + dumpComplete(); } void JoiningHandler::dumpComplete() { // FIXME aconway 2008-09-18: need to detect incomplete dump. // if (state == STALLED) { - QPID_LOG(debug, "Dump complete, unstalling."); cluster.ready(); } else { |