summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/JoiningHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/JoiningHandler.cpp')
-rw-r--r--cpp/src/qpid/cluster/JoiningHandler.cpp37
1 files changed, 31 insertions, 6 deletions
diff --git a/cpp/src/qpid/cluster/JoiningHandler.cpp b/cpp/src/qpid/cluster/JoiningHandler.cpp
index 3358e3404b..c188fe438e 100644
--- a/cpp/src/qpid/cluster/JoiningHandler.cpp
+++ b/cpp/src/qpid/cluster/JoiningHandler.cpp
@@ -30,7 +30,7 @@ namespace cluster {
using namespace sys;
using namespace framing;
-JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START) {}
+JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START), catchUpConnections(0) {}
void JoiningHandler::configChange(
cpg_address *current, int nCurrent,
@@ -74,21 +74,17 @@ void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) {
else { // Start a new dump
cluster.map.dumper = cluster.map.first();
if (dumpee == cluster.self) { // My turn
-
- state = DUMP_COMPLETE; // FIXME aconway 2008-09-18: bypass dump
-
- QPID_LOG(info, cluster.self << " receiving state dump from " << cluster.map.dumper);
switch (state) {
case START:
case STALLED:
assert(0); break;
case DUMP_REQUESTED:
+ QPID_LOG(info, cluster.self << " stalling for dump from " << cluster.map.dumper);
state = STALLED;
cluster.stall();
break;
- // FIXME aconway 2008-09-17: no transition to DUMP_COMPLETE state.
case DUMP_COMPLETE:
cluster.ready();
break;
@@ -102,5 +98,34 @@ void JoiningHandler::ready(const MemberId& id, const std::string& url) {
checkDumpRequest();
}
+void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) {
+ if (c->isCatchUp()) {
+ ++catchUpConnections;
+ QPID_LOG(debug, "Received " << catchUpConnections << " catch-up connections.");
+ }
+ else if (c->isExCatchUp()) {
+ if (c->getId().getConnectionPtr() != c.get()) // become shadow connection
+ cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
+ QPID_LOG(debug, "Catch-up connection terminated " << catchUpConnections-1 << " remaining");
+ if (--catchUpConnections == 0)
+ dumpComplete();
+ }
+ else // Local connection, will be stalled till dump complete.
+ cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
+}
+
+void JoiningHandler::dumpComplete() {
+ // FIXME aconway 2008-09-18: need to detect incomplete dump.
+ //
+ if (state == STALLED) {
+ QPID_LOG(debug, "Dump complete, unstalling.");
+ cluster.ready();
+ }
+ else {
+ QPID_LOG(debug, "Dump complete, waiting for stall point.");
+ assert(state == DUMP_REQUESTED);
+ state = DUMP_COMPLETE;
+ }
+}
}} // namespace qpid::cluster