diff options
Diffstat (limited to 'cpp/src/qpid/cluster/JoiningHandler.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/JoiningHandler.cpp | 37 |
1 files changed, 31 insertions, 6 deletions
diff --git a/cpp/src/qpid/cluster/JoiningHandler.cpp b/cpp/src/qpid/cluster/JoiningHandler.cpp index 3358e3404b..c188fe438e 100644 --- a/cpp/src/qpid/cluster/JoiningHandler.cpp +++ b/cpp/src/qpid/cluster/JoiningHandler.cpp @@ -30,7 +30,7 @@ namespace cluster { using namespace sys; using namespace framing; -JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START) {} +JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START), catchUpConnections(0) {} void JoiningHandler::configChange( cpg_address *current, int nCurrent, @@ -74,21 +74,17 @@ void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) { else { // Start a new dump cluster.map.dumper = cluster.map.first(); if (dumpee == cluster.self) { // My turn - - state = DUMP_COMPLETE; // FIXME aconway 2008-09-18: bypass dump - - QPID_LOG(info, cluster.self << " receiving state dump from " << cluster.map.dumper); switch (state) { case START: case STALLED: assert(0); break; case DUMP_REQUESTED: + QPID_LOG(info, cluster.self << " stalling for dump from " << cluster.map.dumper); state = STALLED; cluster.stall(); break; - // FIXME aconway 2008-09-17: no transition to DUMP_COMPLETE state. case DUMP_COMPLETE: cluster.ready(); break; @@ -102,5 +98,34 @@ void JoiningHandler::ready(const MemberId& id, const std::string& url) { checkDumpRequest(); } +void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) { + if (c->isCatchUp()) { + ++catchUpConnections; + QPID_LOG(debug, "Received " << catchUpConnections << " catch-up connections."); + } + else if (c->isExCatchUp()) { + if (c->getId().getConnectionPtr() != c.get()) // become shadow connection + cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); + QPID_LOG(debug, "Catch-up connection terminated " << catchUpConnections-1 << " remaining"); + if (--catchUpConnections == 0) + dumpComplete(); + } + else // Local connection, will be stalled till dump complete. + cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); +} + +void JoiningHandler::dumpComplete() { + // FIXME aconway 2008-09-18: need to detect incomplete dump. + // + if (state == STALLED) { + QPID_LOG(debug, "Dump complete, unstalling."); + cluster.ready(); + } + else { + QPID_LOG(debug, "Dump complete, waiting for stall point."); + assert(state == DUMP_REQUESTED); + state = DUMP_COMPLETE; + } +} }} // namespace qpid::cluster |