diff options
Diffstat (limited to 'cpp/src/qpid/cluster/MemberHandler.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/MemberHandler.cpp | 30 |
1 files changed, 21 insertions, 9 deletions
diff --git a/cpp/src/qpid/cluster/MemberHandler.cpp b/cpp/src/qpid/cluster/MemberHandler.cpp index e82eaec458..1997ced9b0 100644 --- a/cpp/src/qpid/cluster/MemberHandler.cpp +++ b/cpp/src/qpid/cluster/MemberHandler.cpp @@ -23,6 +23,7 @@ #include "DumpClient.h" #include "qpid/log/Statement.h" #include "qpid/framing/ClusterUpdateBody.h" +#include "qpid/framing/enum.h" namespace qpid { namespace cluster { @@ -32,6 +33,10 @@ using namespace framing; MemberHandler::MemberHandler(Cluster& c) : ClusterHandler(c) {} +MemberHandler::~MemberHandler() { + if (dumpThread.id()) dumpThread.join(); // Join the last dumpthread. +} + void MemberHandler::configChange( cpg_address */*current*/, int /*nCurrent*/, cpg_address */*left*/, int /*nLeft*/, @@ -58,11 +63,10 @@ void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlSt assert(!cluster.connectionEventQueue.isStopped()); // Not currently stalled. cluster.stall(); - cluster.ready(); // FIXME aconway 2008-09-18: Bypass dump - (void)urlStr; -// dumpThread = Thread(new DumpClient(Url(urlStr), cluster.broker, -// boost::bind(&MemberHandler::dumpDone, this), -// boost::bind(&MemberHandler::dumpError, this, _1))); + if (dumpThread.id()) dumpThread.join(); // Join the last dumpthread. + dumpThread = Thread(new DumpClient(Url(urlStr), cluster.broker, + boost::bind(&MemberHandler::dumpSent, this), + boost::bind(&MemberHandler::dumpError, this, _1))); } void MemberHandler::ready(const MemberId& id, const std::string& url) { @@ -70,14 +74,22 @@ void MemberHandler::ready(const MemberId& id, const std::string& url) { } -void MemberHandler::dumpDone() { - dumpThread.join(); // Clean up. +void MemberHandler::dumpSent() { + QPID_LOG(debug, "Finished sending state dump."); + Mutex::ScopedLock l(cluster.lock); cluster.ready(); } void MemberHandler::dumpError(const std::exception& e) { - QPID_LOG(error, "Error in state dump from " << cluster.self << ": " << e.what()); - dumpDone(); + QPID_LOG(error, "Error sending state dump from " << cluster.self << ": " << e.what()); + dumpSent(); +} + +void MemberHandler::insert(const boost::intrusive_ptr<Connection>& c) { + if (c->isCatchUp()) // Not allowed in member mode + c->getBrokerConnection().close(execution::ERROR_CODE_ILLEGAL_STATE, "Not in catch-up mode."); + else + cluster.connections[c->getId()] = c; } }} // namespace qpid::cluster |