summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/JoiningHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/JoiningHandler.cpp')
-rw-r--r--cpp/src/qpid/cluster/JoiningHandler.cpp25
1 files changed, 12 insertions, 13 deletions
diff --git a/cpp/src/qpid/cluster/JoiningHandler.cpp b/cpp/src/qpid/cluster/JoiningHandler.cpp
index c188fe438e..8f08cb615f 100644
--- a/cpp/src/qpid/cluster/JoiningHandler.cpp
+++ b/cpp/src/qpid/cluster/JoiningHandler.cpp
@@ -40,14 +40,13 @@ void JoiningHandler::configChange(
if (nLeft == 0 && nCurrent == 1 && *current == cluster.self) { // First in cluster.
QPID_LOG(notice, cluster.self << " first in cluster.");
cluster.map.ready(cluster.self, cluster.url);
- cluster.ready();
+ cluster.unstall();
}
}
void JoiningHandler::deliver(Event& e) {
- // Discard connection events unless we are stalled and getting a dump.
+ // Discard connection events unless we are stalled to receive a dump.
if (state == STALLED) {
- e.setConnection(cluster.getConnection(e.getConnectionId()));
cluster.connectionEventQueue.push(e);
}
}
@@ -73,6 +72,7 @@ void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) {
}
else { // Start a new dump
cluster.map.dumper = cluster.map.first();
+ QPID_LOG(debug, "Starting dump, dumper=" << cluster.map.dumper << " dumpee=" << dumpee);
if (dumpee == cluster.self) { // My turn
switch (state) {
case START:
@@ -101,24 +101,23 @@ void JoiningHandler::ready(const MemberId& id, const std::string& url) {
void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) {
if (c->isCatchUp()) {
++catchUpConnections;
- QPID_LOG(debug, "Received " << catchUpConnections << " catch-up connections.");
- }
- else if (c->isExCatchUp()) {
- if (c->getId().getConnectionPtr() != c.get()) // become shadow connection
- cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
- QPID_LOG(debug, "Catch-up connection terminated " << catchUpConnections-1 << " remaining");
- if (--catchUpConnections == 0)
- dumpComplete();
+ QPID_LOG(debug, "Catch-up connection " << *c << " started, total " << catchUpConnections);
}
- else // Local connection, will be stalled till dump complete.
+ cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
+}
+
+void JoiningHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
+ QPID_LOG(debug, "Catch-up connection " << *c << " finished, remaining " << catchUpConnections-1);
+ if (c->isShadow())
cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
+ if (--catchUpConnections == 0)
+ dumpComplete();
}
void JoiningHandler::dumpComplete() {
// FIXME aconway 2008-09-18: need to detect incomplete dump.
//
if (state == STALLED) {
- QPID_LOG(debug, "Dump complete, unstalling.");
cluster.ready();
}
else {