From 9809badd6486af7697767b80f123885eb1892e4a Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 26 Sep 2008 21:49:52 +0000 Subject: Clean up end-of-dump protocol for new cluster members. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@699513 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/cluster/Cluster.cpp | 7 +++++-- qpid/cpp/src/qpid/cluster/Cluster.h | 10 ++++------ qpid/cpp/src/qpid/cluster/ClusterHandler.h | 3 +-- qpid/cpp/src/qpid/cluster/ClusterMap.cpp | 9 ++++----- qpid/cpp/src/qpid/cluster/Connection.cpp | 30 +++++++++------------------- qpid/cpp/src/qpid/cluster/DumpClient.cpp | 15 +++++++++----- qpid/cpp/src/qpid/cluster/JoiningHandler.cpp | 22 ++------------------ qpid/cpp/src/qpid/cluster/JoiningHandler.h | 4 +--- qpid/cpp/src/qpid/cluster/MemberHandler.cpp | 14 +------------ qpid/cpp/src/qpid/cluster/MemberHandler.h | 3 +-- 10 files changed, 38 insertions(+), 79 deletions(-) (limited to 'qpid/cpp') diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 9cd8d1842c..19f4318a9c 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -91,9 +91,12 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : Cluster::~Cluster() {} -void Cluster::insert(const boost::intrusive_ptr& c) { handler->insert(c); } +void Cluster::insert(const boost::intrusive_ptr& c) { + Mutex::ScopedLock l(lock); + connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); +} -void Cluster::catchUpClosed(const boost::intrusive_ptr& c) { handler->catchUpClosed(c); } +void Cluster::dumpComplete() { handler->dumpComplete(); } void Cluster::erase(ConnectionId id) { Mutex::ScopedLock l(lock); diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index eaa91202dc..0b56540f9a 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -62,10 +62,10 @@ class Cluster : private Cpg::Handler, public management::Manageable virtual ~Cluster(); - void insert(const boost::intrusive_ptr&); // Insert a local connection - void erase(ConnectionId); // Erase a connection. - - void catchUpClosed(const boost::intrusive_ptr&); // Insert a local connection + // FIXME aconway 2008-09-26: thread safety + void insert(const boost::intrusive_ptr&); + void erase(ConnectionId); + void dumpComplete(); /** Get the URLs of current cluster members. */ std::vector getUrls() const; @@ -94,8 +94,6 @@ class Cluster : private Cpg::Handler, public management::Manageable broker::Broker& getBroker(); - void setDumpComplete(); - template void eachConnection(const F& f) { for (ConnectionMap::const_iterator i = connections.begin(); i != connections.end(); ++i) f(i->second); diff --git a/qpid/cpp/src/qpid/cluster/ClusterHandler.h b/qpid/cpp/src/qpid/cluster/ClusterHandler.h index 37b0ed4d79..d8bcaa8fe8 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterHandler.h +++ b/qpid/cpp/src/qpid/cluster/ClusterHandler.h @@ -59,8 +59,7 @@ class ClusterHandler cpg_address *left, int nLeft, cpg_address *joined, int nJoined) = 0; - virtual void insert(const boost::intrusive_ptr& c) = 0; - virtual void catchUpClosed(const boost::intrusive_ptr& c) = 0; + virtual void dumpComplete() = 0; protected: Cluster& cluster; diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp index c721749aba..b5b71cd397 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp @@ -45,7 +45,7 @@ bool ClusterMap::left(const cpg_address* addrs, size_t nLeft) { changed = members.erase(*a) || changed; if (dumper && !isMember(dumper)) dumper = MemberId(); - QPID_LOG_IF(debug, changed, *this); + QPID_LOG_IF(debug, changed, "Members left. " << *this); return changed; } @@ -66,7 +66,7 @@ bool ClusterMap::update(const framing::FieldTable& ftMembers, uint64_t dumper_) Url url(i->second->get()); changed = members.insert(Members::value_type(id, url)).second || changed; } - QPID_LOG_IF(debug, changed, *this); + QPID_LOG_IF(debug, changed, "Update: " << *this); return changed; } @@ -94,12 +94,11 @@ bool ClusterMap::ready(const MemberId& id, const Url& url) { bool changed = members.insert(Members::value_type(id,url)).second; if (id == dumper) { dumper = MemberId(); - QPID_LOG(info, id << " finished dump."); + QPID_LOG(info, id << " finished dump. " << *this); } else { - QPID_LOG(info, id << " joined cluster, url=" << url); + QPID_LOG(info, id << " joined, url=" << url << ". " << *this); } - QPID_LOG_IF(debug, changed, *this); return changed; } diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 21eec0f86e..6aab31c177 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -76,10 +76,13 @@ void Connection::deliverDoOutput(uint32_t requested) { void Connection::received(framing::AMQFrame& f) { QPID_LOG(trace, "RECV " << *this << ": " << f); if (isShadow()) { - // Final close that completes catch-up for shadow connection. + // Intercept the close that completes catch-up for shadow a connection. if (catchUp && f.getMethod() && f.getMethod()->isA()) { + catchUp = false; AMQFrame ok(in_place()); + cluster.insert(boost::intrusive_ptr(this)); connection.getOutput().send(ok); + output.setOutputHandler(discardHandler); } else QPID_LOG(warning, *this << " ignoring unexpected frame: " << f); @@ -104,27 +107,13 @@ void Connection::delivered(framing::AMQFrame& f) { void Connection::closed() { try { QPID_LOG(debug, "Connection closed " << *this); - - if (catchUp) { - cluster.catchUpClosed(boost::intrusive_ptr(this)); - if (isShadow()) - catchUp = false; - else { - connection.closed(); - return; - } - } - - // Local network connection has closed. We need to keep the - // connection around but replace the output handler with a - // no-op handler as the network output handler will be - // deleted. - output.setOutputHandler(discardHandler); - - if (isLocal() && !catchUp) { + if (catchUp) + connection.closed(); + else if (isLocal()) { // This was a local replicated connection. Multicast a deliver // closed and process any outstanding frames from the cluster // until self-delivery of deliver-close. + output.setOutputHandler(discardHandler); cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this); ++mcastSeq; } @@ -188,11 +177,10 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { ConnectionId shadow = ConnectionId(memberId, connectionId); QPID_LOG(debug, "Catch-up connection " << self << " becomes shadow " << shadow); self = shadow; - assert(isShadow()); } void Connection::dumpComplete() { - // FIXME aconway 2008-09-18: use or remove. + cluster.dumpComplete(); } bool Connection::isLocal() const { return self.first == cluster.getSelf() && self.second == this; } diff --git a/qpid/cpp/src/qpid/cluster/DumpClient.cpp b/qpid/cpp/src/qpid/cluster/DumpClient.cpp index d2d3c9bb15..59542a2e95 100644 --- a/qpid/cpp/src/qpid/cluster/DumpClient.cpp +++ b/qpid/cpp/src/qpid/cluster/DumpClient.cpp @@ -59,6 +59,12 @@ using namespace framing; namespace arg=client::arg; using client::SessionBase_0_10Access; +struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection { + ClusterConnectionProxy(client::Connection& c) : + AMQP_AllProxy::ClusterConnection(*client::ConnectionAccess::getImpl(c)) {} +}; + + // Create a connection with special version that marks it as a catch-up connection. client::Connection catchUpConnection() { client::Connection c; @@ -101,7 +107,7 @@ void DumpClient::dump() { session.sync(); session.close(); donor.eachConnection(boost::bind(&DumpClient::dumpConnection, this, _1)); - // FIXME aconway 2008-09-18: inidicate successful end-of-dump. + ClusterConnectionProxy(connection).dumpComplete(); connection.close(); QPID_LOG(debug, donor.getSelf() << " dumped all state to " << receiver); } @@ -160,10 +166,9 @@ void DumpClient::dumpConnection(const boost::intrusive_ptr& dumpConn // authentication etc. See ConnectionSettings. shadowConnection.open(receiver, bc.getUserId()); dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1)); - boost::shared_ptr impl = client::ConnectionAccess::getImpl(shadowConnection); - AMQP_AllProxy::ClusterConnection proxy(*impl); - proxy.shadowReady(dumpConnection->getId().getMember(), - reinterpret_cast(dumpConnection->getId().getConnectionPtr())); + ClusterConnectionProxy(shadowConnection).shadowReady( + dumpConnection->getId().getMember(), + reinterpret_cast(dumpConnection->getId().getConnectionPtr())); shadowConnection.close(); QPID_LOG(debug, donor.getId() << " dumped connection " << *dumpConnection); } diff --git a/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp b/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp index 58c444ac62..6838313263 100644 --- a/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp @@ -105,27 +105,8 @@ void JoiningHandler::ready(const MemberId& id, const std::string& urlStr) { checkDumpRequest(); } -void JoiningHandler::insert(const boost::intrusive_ptr& c) { - Mutex::ScopedLock l(cluster.lock); - if (c->isCatchUp()) { - ++catchUpConnections; - QPID_LOG(debug, "Catch-up connection " << *c << " started, total " << catchUpConnections); - } - cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); -} - -void JoiningHandler::catchUpClosed(const boost::intrusive_ptr& c) { - Mutex::ScopedLock l(cluster.lock); - QPID_LOG(debug, "Catch-up complete for " << *c << ", remaining catch-ups: " << catchUpConnections-1); - if (c->isShadow()) - cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); - if (--catchUpConnections == 0) - dumpComplete(); -} - void JoiningHandler::dumpComplete() { - // FIXME aconway 2008-09-18: need to detect incomplete dump. - // Called with lock - volatile? + Mutex::ScopedLock l(cluster.lock); if (state == STALLED) { QPID_LOG(debug, cluster.self << " received dump and stalled at start point, unstalling."); cluster.ready(); @@ -135,6 +116,7 @@ void JoiningHandler::dumpComplete() { assert(state == DUMP_REQUESTED); state = DUMP_COMPLETE; } + // FIXME aconway 2008-09-18: need to detect incomplete dump. } diff --git a/qpid/cpp/src/qpid/cluster/JoiningHandler.h b/qpid/cpp/src/qpid/cluster/JoiningHandler.h index 097d765e5e..cc47690ac5 100644 --- a/qpid/cpp/src/qpid/cluster/JoiningHandler.h +++ b/qpid/cpp/src/qpid/cluster/JoiningHandler.h @@ -46,12 +46,10 @@ class JoiningHandler : public ClusterHandler void dumpRequest(const MemberId&, const std::string& url); void ready(const MemberId&, const std::string& url); - void insert(const boost::intrusive_ptr& c); - void catchUpClosed(const boost::intrusive_ptr& c); + void dumpComplete(); private: void checkDumpRequest(); - void dumpComplete(); enum { START, DUMP_REQUESTED, STALLED, DUMP_COMPLETE } state; size_t catchUpConnections; diff --git a/qpid/cpp/src/qpid/cluster/MemberHandler.cpp b/qpid/cpp/src/qpid/cluster/MemberHandler.cpp index 5f02754443..99e7b7d683 100644 --- a/qpid/cpp/src/qpid/cluster/MemberHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/MemberHandler.cpp @@ -90,18 +90,6 @@ void MemberHandler::dumpError(const std::exception& e) { dumpSent(); } -void MemberHandler::insert(const boost::intrusive_ptr& c) { - Mutex::ScopedLock l(cluster.lock); - if (c->isCatchUp()) // Not allowed in member mode - c->getBrokerConnection().close(execution::ERROR_CODE_ILLEGAL_STATE, "Not in catch-up mode."); - else - cluster.connections[c->getId()] = c; -} - -void MemberHandler::catchUpClosed(const boost::intrusive_ptr& c) { - Mutex::ScopedLock l(cluster.lock); - QPID_LOG(warning, "Catch-up connection " << c << " closed in member mode"); - assert(0); -} +void MemberHandler::dumpComplete() { assert(0); } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/MemberHandler.h b/qpid/cpp/src/qpid/cluster/MemberHandler.h index 7655034ed8..37cf653b7b 100644 --- a/qpid/cpp/src/qpid/cluster/MemberHandler.h +++ b/qpid/cpp/src/qpid/cluster/MemberHandler.h @@ -52,8 +52,7 @@ class MemberHandler : public ClusterHandler void dumpSent(); void dumpError(const std::exception&); - void insert(const boost::intrusive_ptr& c); - void catchUpClosed(const boost::intrusive_ptr& ); + void dumpComplete(); public: sys::Thread dumpThread; -- cgit v1.2.1