summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/MemberHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/MemberHandler.cpp')
-rw-r--r--cpp/src/qpid/cluster/MemberHandler.cpp30
1 files changed, 21 insertions, 9 deletions
diff --git a/cpp/src/qpid/cluster/MemberHandler.cpp b/cpp/src/qpid/cluster/MemberHandler.cpp
index e82eaec458..1997ced9b0 100644
--- a/cpp/src/qpid/cluster/MemberHandler.cpp
+++ b/cpp/src/qpid/cluster/MemberHandler.cpp
@@ -23,6 +23,7 @@
#include "DumpClient.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/ClusterUpdateBody.h"
+#include "qpid/framing/enum.h"
namespace qpid {
namespace cluster {
@@ -32,6 +33,10 @@ using namespace framing;
MemberHandler::MemberHandler(Cluster& c) : ClusterHandler(c) {}
+MemberHandler::~MemberHandler() {
+ if (dumpThread.id()) dumpThread.join(); // Join the last dumpthread.
+}
+
void MemberHandler::configChange(
cpg_address */*current*/, int /*nCurrent*/,
cpg_address */*left*/, int /*nLeft*/,
@@ -58,11 +63,10 @@ void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlSt
assert(!cluster.connectionEventQueue.isStopped()); // Not currently stalled.
cluster.stall();
- cluster.ready(); // FIXME aconway 2008-09-18: Bypass dump
- (void)urlStr;
-// dumpThread = Thread(new DumpClient(Url(urlStr), cluster.broker,
-// boost::bind(&MemberHandler::dumpDone, this),
-// boost::bind(&MemberHandler::dumpError, this, _1)));
+ if (dumpThread.id()) dumpThread.join(); // Join the last dumpthread.
+ dumpThread = Thread(new DumpClient(Url(urlStr), cluster.broker,
+ boost::bind(&MemberHandler::dumpSent, this),
+ boost::bind(&MemberHandler::dumpError, this, _1)));
}
void MemberHandler::ready(const MemberId& id, const std::string& url) {
@@ -70,14 +74,22 @@ void MemberHandler::ready(const MemberId& id, const std::string& url) {
}
-void MemberHandler::dumpDone() {
- dumpThread.join(); // Clean up.
+void MemberHandler::dumpSent() {
+ QPID_LOG(debug, "Finished sending state dump.");
+ Mutex::ScopedLock l(cluster.lock);
cluster.ready();
}
void MemberHandler::dumpError(const std::exception& e) {
- QPID_LOG(error, "Error in state dump from " << cluster.self << ": " << e.what());
- dumpDone();
+ QPID_LOG(error, "Error sending state dump from " << cluster.self << ": " << e.what());
+ dumpSent();
+}
+
+void MemberHandler::insert(const boost::intrusive_ptr<Connection>& c) {
+ if (c->isCatchUp()) // Not allowed in member mode
+ c->getBrokerConnection().close(execution::ERROR_CODE_ILLEGAL_STATE, "Not in catch-up mode.");
+ else
+ cluster.connections[c->getId()] = c;
}
}} // namespace qpid::cluster