diff options
author | Alan Conway <aconway@apache.org> | 2008-10-03 20:56:38 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-03 20:56:38 +0000 |
commit | ff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022 (patch) | |
tree | d8109d15ce3a85a9b6175ba2c9b3c51d8706fe9c /cpp/src | |
parent | 2141967346b884e592a42353ae596d37eb90fe7b (diff) | |
download | qpid-python-ff5c8a9034026a6d3ae437fa89c9f3cb9a1ba022.tar.gz |
Cluster join & brain-dumps working.
cluster: improved join protocol, fixed race conditions.
client/ConnectionHandler,ConnectionImpl: fixed connection close race causing client hang.
src/qpid/sys/PollableQueue.h: fixed incorrect use of startWatch/stopWatch.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@701532 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
29 files changed, 789 insertions, 897 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index c489af901b..f9c6e74bd8 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -595,6 +595,7 @@ nobase_include_HEADERS = \ qpid/sys/Dispatcher.h \ qpid/sys/IntegerTypes.h \ qpid/sys/IOHandle.h \ + qpid/sys/LockPtr.h \ qpid/sys/Monitor.h \ qpid/sys/Mutex.h \ qpid/sys/OutputControl.h \ diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 8060e49b97..443db3fb15 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -28,13 +28,7 @@ cluster_la_SOURCES = \ qpid/cluster/DumpClient.h \ qpid/cluster/DumpClient.cpp \ qpid/cluster/ClusterMap.h \ - qpid/cluster/ClusterMap.cpp \ - qpid/cluster/ClusterHandler.h \ - qpid/cluster/ClusterHandler.cpp \ - qpid/cluster/JoiningHandler.h \ - qpid/cluster/JoiningHandler.cpp \ - qpid/cluster/MemberHandler.h \ - qpid/cluster/MemberHandler.cpp + qpid/cluster/ClusterMap.cpp cluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 320efc42f1..5d4ebad4b9 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -167,6 +167,7 @@ void Connection::setFederationLink(bool b) void Connection::close( ReplyCode code, const string& text, ClassId classId, MethodId methodId) { + QPID_LOG_IF(error, code != 200, "Connection " << mgmtId << " closed by error: " << text << "(" << code << ")"); adapter.close(code, text, classId, methodId); channels.clear(); getOutput().close(); diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index 0321c2e6aa..7f1cd5ce7f 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -198,5 +198,7 @@ bool ConnectionHandler::isOpen() const bool ConnectionHandler::isClosed() const { int s = getState(); - return s == CLOSING || s == CLOSED || s == FAILED; + return s == CLOSED || s == FAILED; } + +bool ConnectionHandler::isClosing() const { return getState() == CLOSING; } diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h index ffb612fae8..40be3a5237 100644 --- a/cpp/src/qpid/client/ConnectionHandler.h +++ b/cpp/src/qpid/client/ConnectionHandler.h @@ -99,6 +99,7 @@ public: // Note that open and closed aren't related by open = !closed bool isOpen() const; bool isClosed() const; + bool isClosing() const; CloseListener onClose; ErrorListener onError; diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index f180c4f23e..d9ac65c1b3 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -143,11 +143,13 @@ static const std::string CONN_CLOSED("Connection closed by broker"); void ConnectionImpl::shutdown() { Mutex::ScopedLock l(lock); - // FIXME aconway 2008-06-06: exception use, connection-forced is incorrect here. - setException(new ConnectionException(CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED)); if (handler.isClosed()) return; + // FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have + // an appropriate close-code. connection-forced is not right. + if (!handler.isClosing()) + closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED)); + setException(new ConnectionException(CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED)); handler.fail(CONN_CLOSED); - closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED)); } void ConnectionImpl::erase(uint16_t ch) { diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index b48443526c..9c503d6d13 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -18,19 +18,24 @@ #include "Cluster.h" #include "Connection.h" +#include "DumpClient.h" #include "qpid/broker/Broker.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/Connection.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AMQP_AllOperations.h" +#include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterDumpRequestBody.h" -#include "qpid/framing/ClusterUpdateBody.h" #include "qpid/framing/ClusterReadyBody.h" +#include "qpid/framing/ClusterDumpOfferBody.h" +#include "qpid/framing/ClusterDumpStartBody.h" #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/log/Statement.h" +#include "qpid/sys/Thread.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" #include "qmf/org/apache/qpid/cluster/Package.h" @@ -55,156 +60,221 @@ using qpid::management::Manageable; using qpid::management::Args; namespace qmf = qmf::org::apache::qpid::cluster; +/**@file + Threading notes: + - Public functions may be called in local connection IO threads. + see .h. +*/ + +struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { + qpid::cluster::Cluster& cluster; + MemberId member; + Cluster::Lock& l; + ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {} + + void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url, l); } + void ready(const std::string& url) { cluster.ready(member, url, l); } + void dumpOffer(uint64_t dumpee) { cluster.dumpOffer(member, dumpee, l); } + void dumpStart(uint64_t dumpee, const std::string& url) { cluster.dumpStart(member, dumpee, url, l); } + void shutdown() { cluster.shutdown(member, l); } + + bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } +}; + Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : broker(b), poller(b.getPoller()), cpg(*this), name(name_), - url(url_), - self(cpg.self()), - cpgDispatchHandle(cpg, - boost::bind(&Cluster::dispatch, this, _1), // read - 0, // write - boost::bind(&Cluster::disconnect, this, _1) // disconnect + myUrl(url_), + memberId(cpg.self()), + cpgDispatchHandle( + cpg, + boost::bind(&Cluster::dispatch, this, _1), // read + 0, // write + boost::bind(&Cluster::disconnect, this, _1) // disconnect ), - connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))), + deliverQueue(boost::bind(&Cluster::process, this, _1), poller), + mcastId(0), mgmtObject(0), - handler(&joiningHandler), - joiningHandler(*this), - memberHandler(*this), - mcastId(), - lastSize(1) + state(INIT), + lastSize(1) { ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent != 0){ qmf::Package packageInit(agent); - mgmtObject = new qmf::Cluster (agent, this, &broker,name.str(),url.str()); + mgmtObject = new qmf::Cluster (agent, this, &broker,name.str(),myUrl.str()); agent->addObject (mgmtObject); mgmtObject->set_status("JOINING"); - // FIXME aconway 2008-09-24: // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined. } - QPID_LOG(notice, self << " joining cluster " << name.str()); broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); cpgDispatchHandle.startWatch(poller); cpg.join(name); + QPID_LOG(notice, *this << " joining cluster " << name.str()); } -Cluster::~Cluster() {} +Cluster::~Cluster() { + if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. +} void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { - Mutex::ScopedLock l(lock); + Lock l(lock); + assert(!c->isCatchUp()); connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); } -void Cluster::dumpComplete() { handler->dumpComplete(); } - void Cluster::erase(ConnectionId id) { - Mutex::ScopedLock l(lock); + Lock l(lock); connections.erase(id); } -void Cluster::leave() { - QPID_LOG(notice, self << " leaving cluster " << name.str()); - cpg.leave(name); - // Defer shut down to the final configChange when the group knows we've left. +void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) { + Lock l(lock); + mcastControl(body, cptr, l); } -void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) { - AMQFrame f(body); - Event e(CONTROL, ConnectionId(self, cptr), f.size(), ++mcastId); - Buffer buf(e); - f.encode(buf); - QPID_LOG(trace, "MCAST " << e << " " << body); - mcastEvent(e); +void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr, Lock&) { + Lock l(lock); + Event e(Event::control(body, ConnectionId(memberId, cptr), ++mcastId)); + QPID_LOG(trace, *this << " MCAST " << e << ": " << body); + mcast(e, l); } void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id) { + Lock l(lock); + mcastBuffer(data, size, connection, id, l); +} + +void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id, Lock&) { + Lock l(lock); Event e(DATA, connection, size, id); memcpy(e.getData(), data, size); - QPID_LOG(trace, "MCAST " << e); - mcastEvent(e); + QPID_LOG(trace, *this << " MCAST " << e); + mcast(e, l); } -void Cluster::mcastEvent(const Event& e) { - e.mcast(name, cpg); -} +void Cluster::mcast(const Event& e) { Lock l(lock); mcast(e, l); } -size_t Cluster::size() const { - Mutex::ScopedLock l(lock); - return map.size(); +void Cluster::mcast(const Event& e, Lock&) { + if (state == LEFT) return; + if (state < READY && e.isConnection()) { + // Stall outgoing connection events. + QPID_LOG(trace, *this << " MCAST deferred: " << e ); + mcastQueue.push_back(e); + } + else + e.mcast(name, cpg); } std::vector<Url> Cluster::getUrls() const { - Mutex::ScopedLock l(lock); + Lock l(lock); + return getUrls(l); +} + +std::vector<Url> Cluster::getUrls(Lock&) const { return map.memberUrls(); } -// FIXME aconway 2008-09-15: volatile for locked/unlocked functions. -// Check locking from Handler functions. -boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) { - Mutex::ScopedLock l(lock); - if (id.getMember() == self) - return boost::intrusive_ptr<Connection>(id.getConnectionPtr()); - ConnectionMap::iterator i = connections.find(id); +void Cluster::leave() { + Lock l(lock); + leave(l); +} + +void Cluster::leave(Lock&) { + if (state != LEFT) { + state = LEFT; + QPID_LOG(notice, *this << " leaving cluster " << name.str()); + + if (!deliverQueue.isStopped()) deliverQueue.stop(); + if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN"); + try { cpg.leave(name); } + catch (const std::exception& e) { + QPID_LOG(critical, *this << " error leaving process group: " << e.what()); + } + try { broker.shutdown(); } + catch (const std::exception& e) { + QPID_LOG(critical, *this << " error during shutdown, aborting: " << e.what()); + abort(); // Big trouble. + } + } +} + +boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId, Lock&) { + if (connectionId.getMember() == memberId) + return boost::intrusive_ptr<Connection>(connectionId.getPointer()); + ConnectionMap::iterator i = connections.find(connectionId); if (i == connections.end()) { // New shadow connection. - assert(id.getMember() != self); + assert(connectionId.getMember() != memberId); std::ostringstream mgmtId; - mgmtId << name.str() << ":" << id; - ConnectionMap::value_type value(id, new Connection(*this, shadowOut, mgmtId.str(), id)); + mgmtId << name.str() << ":" << connectionId; + ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId)); i = connections.insert(value).first; } return i->second; } -void Cluster::deliver( +Cluster::Connections Cluster::getConnections(Lock&) { + Connections result(connections.size()); + std::transform(connections.begin(), connections.end(), result.begin(), + boost::bind(&ConnectionMap::value_type::second, _1)); + return result; +} + +void Cluster::deliver( cpg_handle_t /*handle*/, cpg_name* /*group*/, uint32_t nodeid, uint32_t pid, void* msg, - int msg_len) + int msg_len) { - try { - MemberId from(nodeid, pid); - Event e = Event::delivered(from, msg, msg_len); + Mutex::ScopedLock l(lock); + MemberId from(nodeid, pid); + Event e = Event::delivered(from, msg, msg_len); + if (state == LEFT) return; + QPID_LOG(trace, *this << " DLVR: " << e); + if (e.isCluster() && state != DUMPEE) // Process cluster controls immediately unless in DUMPEE state. + process(e, l); + else if (state != NEWBIE) // Newbie discards events up to the dump offer. + deliverQueue.push(e); +} + +void Cluster::process(const Event& e) { + Lock l(lock); + process(e,l); +} - // Process cluster controls immediately - if (e.getConnectionId().getConnectionPtr() == 0) { // Cluster control - Buffer buf(e); - AMQFrame frame; +void Cluster::process(const Event& e, Lock& l) { + try { + Buffer buf(e); + AMQFrame frame; + if (e.isCluster()) { while (frame.decode(buf)) { - QPID_LOG(trace, "DLVR " << e << " " << frame); - if (!handler->invoke(e.getConnectionId().getMember(), frame)) + QPID_LOG(trace, *this << " PROC: " << e << " " << frame); + ClusterDispatcher dispatch(*this, e.getMemberId(), l); + if (!framing::invoke(dispatch, *frame.getBody()).wasHandled()) throw Exception(QPID_MSG("Invalid cluster control")); } } - else { - QPID_LOG(trace, "DLVR" << (connectionEventQueue.isStopped() ? "(stalled)" : "") << " " << e); - handler->deliver(e); + else { // e.isConnection() + boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l); + if (e.getType() == DATA) { + QPID_LOG(trace, *this << " PROC: " << e); + connection->deliverBuffer(buf); + } + else { // control + while (frame.decode(buf)) { + QPID_LOG(trace, *this << " PROC: " << e << " " << frame); + connection->delivered(frame); + } + } } } catch (const std::exception& e) { - QPID_LOG(critical, "Error in cluster deliver: " << e.what()); - leave(); - } -} - -void Cluster::connectionEvent(const Event& e) { - Buffer buf(e); - boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId()); - assert(connection); - if (e.getType() == DATA) { - QPID_LOG(trace, "EXEC: " << e); - connection->deliverBuffer(buf); - } - else { // control - AMQFrame frame; - while (frame.decode(buf)) { - QPID_LOG(trace, "EXEC " << e << " " << frame); - connection->delivered(frame); - } + QPID_LOG(critical, *this << " error in cluster process: " << e.what()); + leave(l); } } @@ -236,16 +306,22 @@ ostream& operator<<(ostream& o, const AddrList& a) { } void Cluster::dispatch(sys::DispatchHandle& h) { - cpg.dispatchAll(); - h.rewatch(); + try { + cpg.dispatchAll(); + h.rewatch(); + } + catch (const std::exception& e) { + QPID_LOG(critical, *this << " error in cluster deliver: " << e.what()); + leave(); + } } void Cluster::disconnect(sys::DispatchHandle& ) { - QPID_LOG(critical, self << " unexpectedly disconnected from cluster, shutting down"); + QPID_LOG(critical, *this << " disconnected from cluster, shutting down"); broker.shutdown(); } -void Cluster::configChange( +void Cluster::configChange ( cpg_handle_t /*handle*/, cpg_name */*group*/, cpg_address *current, int nCurrent, @@ -253,49 +329,57 @@ void Cluster::configChange( cpg_address *joined, int nJoined) { Mutex::ScopedLock l(lock); - QPID_LOG(debug, "Process members: " << AddrList(current, nCurrent) + QPID_LOG(debug, *this << " configuration change: " << AddrList(current, nCurrent) << AddrList(left, nLeft, "( ", ")")); - - if (find(left, left+nLeft, self) != left+nLeft) { - // I have left the group, this is the final config change. - QPID_LOG(notice, self << " left cluster " << name.str()); - broker.shutdown(); - return; + map.configChange(current, nCurrent, left, nLeft, joined, nJoined); + updateMemberStats(l); + if (state == LEFT) return; + if (!map.isAlive(memberId)) { leave(l); return; } + + if(state == INIT) { // First configChange + if (map.aliveCount() == 1) { + QPID_LOG(info, *this << " first in cluster at " << myUrl); + map = ClusterMap(memberId, myUrl, true); + unstall(l); + } + else { // Joining established group. + state = NEWBIE; + mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), 0, l); + QPID_LOG(debug, *this << " send dump-request " << myUrl); + } } - - if (map.left(left, nLeft)) updateMemberStats(); - handler->configChange(current, nCurrent, left, nLeft, joined, nJoined); } - -broker::Broker& Cluster::getBroker(){ return broker; } - -void Cluster::stall() { - Mutex::ScopedLock l(lock); - QPID_LOG(debug, self << " stalling."); - // Stop processing connection events. We still process config changes - // and cluster controls in deliver() - connectionEventQueue.stop(); - if (mgmtObject!=0) mgmtObject->set_status("STALLED"); - - // FIXME aconway 2008-09-11: Flow control, we should slow down or - // stop reading from local connections while stalled to avoid an - // unbounded queue. +void Cluster::dumpInDone(const ClusterMap& m) { + Lock l(lock); + dumpedMap = m; + checkDumpIn(l); } -void Cluster::ready() { - // Called with lock held - QPID_LOG(debug, self << " ready at " << url); - unstall(); - mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0); +void Cluster::tryMakeOffer(const MemberId& id, Lock& l) { + if (state == READY && map.isNewbie(id)) { + state = OFFER; + QPID_LOG(debug, *this << " send dump-offer to " << id); + mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id), 0, l); + } } -void Cluster::unstall() { +void Cluster::unstall(Lock& l) { // Called with lock held - QPID_LOG(debug, self << " un-stalling"); - handler = &memberHandler; // Member mode. - connectionEventQueue.start(poller); - if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); + switch (state) { + case INIT: case DUMPEE: case DUMPER: + QPID_LOG(debug, *this << " unstall: deliver=" << deliverQueue.size() + << " mcast=" << mcastQueue.size()); + deliverQueue.start(); + state = READY; + for_each(mcastQueue.begin(), mcastQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l))); + mcastQueue.clear(); + if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); + break; + case LEFT: break; + case NEWBIE: case READY: case OFFER: + assert(0); + } } // Called from Broker::~Broker when broker is shut down. At this @@ -303,17 +387,106 @@ void Cluster::unstall() { // invoked. We must ensure that CPG has also shut down so no CPG // callbacks will be invoked. // -void Cluster::brokerShutdown() { - QPID_LOG(notice, self << " shutting down."); - try { cpg.shutdown(); } - catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); } +void Cluster::brokerShutdown() { + QPID_LOG(notice, *this << " shutting down "); + if (state != LEFT) { + try { cpg.shutdown(); } + catch (const std::exception& e) { + QPID_LOG(error, *this << " during shutdown: " << e.what()); + } + } delete this; } -ManagementObject* Cluster::GetManagementObject(void) const { return mgmtObject; } +void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) { + map.dumpRequest(id, url); + tryMakeOffer(id, l); +} + +void Cluster::ready(const MemberId& id, const std::string& url, Lock&) { + map.ready(id, Url(url)); +} + +void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) { + if (state == LEFT) return; + MemberId dumpee(dumpeeInt); + boost::optional<Url> url = map.dumpOffer(dumper, dumpee); + if (dumper == memberId) { + assert(state == OFFER); + if (url) { // My offer was first. + QPID_LOG(debug, *this << " mark dump point for dump to " << dumpee); + // Put dump-start on my own deliver queue to mark the stall point. + // We will stall when it is processed. + deliverQueue.push(Event::control(ClusterDumpStartBody(ProtocolVersion(), dumpee, url->str()), memberId)); + } + else { // Another offer was first. + QPID_LOG(debug, *this << " cancel dump offer to " << dumpee); + state = READY; + tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer. + } + } + else if (dumpee == memberId && url) { + assert(state == NEWBIE); + QPID_LOG(debug, *this << " accepted dump-offer from " << dumper); + state = DUMPEE; + checkDumpIn(l); + } +} + +void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock& l) { + if (state == LEFT) return; + MemberId dumpee(dumpeeInt); + Url url(urlStr); + assert(state == OFFER); + deliverQueue.stop(); + QPID_LOG(debug, *this << " stall and dump to " << dumpee << " at " << urlStr); + state = DUMPER; + if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. + dumpThread = Thread( + new DumpClient(memberId, dumpee, url, broker, map, getConnections(l), + boost::bind(&Cluster::dumpOutDone, this), + boost::bind(&Cluster::dumpOutError, this, _1))); +} + +void Cluster::checkDumpIn(Lock& l) { + if (state == LEFT) return; + assert(state == DUMPEE || state == NEWBIE); + if (state == DUMPEE && dumpedMap) { + map = *dumpedMap; + QPID_LOG(debug, *this << " incoming dump complete. Members: " << map); + unstall(l); + mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 0, l); + } +} + +void Cluster::dumpOutDone() { + Monitor::ScopedLock l(lock); + dumpOutDone(l); +} + +void Cluster::dumpOutDone(Lock& l) { + QPID_LOG(debug, *this << " finished sending dump."); + assert(state == DUMPER); + unstall(l); + tryMakeOffer(map.firstNewbie(), l); // Try another offer +} + +void Cluster::dumpOutError(const std::exception& e) { + Monitor::ScopedLock l(lock); + QPID_LOG(error, *this << " error sending state dump: " << e.what()); + dumpOutDone(l); +} + +void Cluster ::shutdown(const MemberId& id, Lock& l) { + QPID_LOG(notice, *this << " received shutdown from " << id); + leave(l); +} + +ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; } Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) { - QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]"); + Lock l(lock); + QPID_LOG (debug, *this << " managementMethod [id=" << methodId << "]"); switch (methodId) { case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(); break; case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(); break; @@ -322,30 +495,32 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string return Manageable::STATUS_OK; } -void Cluster::stopClusterNode(void) { - QPID_LOG(notice, self << " stopped by admin"); +void Cluster::stopClusterNode() { + QPID_LOG(notice, *this << " stopped by admin"); leave(); } -void Cluster::stopFullCluster(void) { - QPID_LOG(notice, self << " sending shutdown to cluster."); - mcastControl(ClusterShutdownBody(), 0); +void Cluster::stopFullCluster() { + Lock l(lock); + QPID_LOG(notice, *this << " shutting down cluster " << name.str()); + mcastControl(ClusterShutdownBody(), 0, l); } -void Cluster::updateMemberStats() { +void Cluster::updateMemberStats(Lock& l) { if (mgmtObject) { - if (lastSize != size() && size() ==1){ - QPID_LOG(info, "Last node standing, updating queue policies, size:" <<size()); - broker.getQueues().updateQueueClusterState(true); - lastSize = size(); - }else if (lastSize != size() && size() > 1) { - QPID_LOG(info, "Recover back from last node standing, updating queue policies, size:" <<size()); - broker.getQueues().updateQueueClusterState(false); - lastSize = size(); - } + std::vector<Url> vectUrl = getUrls(l); + size_t size = vectUrl.size(); + if (lastSize != size && size == 1){ + QPID_LOG(info, *this << " last node standing, updating queue policies."); + broker.getQueues().updateQueueClusterState(true); + } + else if (lastSize != size && size > 1) { + QPID_LOG(info, *this << " recovered from last node standing, updating queue policies, size:" << size); + broker.getQueues().updateQueueClusterState(false); + } + lastSize = size; - mgmtObject->set_clusterSize(size()); - std::vector<Url> vectUrl = getUrls(); + mgmtObject->set_clusterSize(size); string urlstr; for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) { if (iter != vectUrl.begin()) urlstr += "\n"; @@ -355,4 +530,17 @@ void Cluster::updateMemberStats() { } } +std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { + static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "READY", "OFFER", "DUMPER", "LEFT" }; + return o << cluster.memberId << "(" << STATE[cluster.state] << ")"; +} + +MemberId Cluster::getId() const { + return memberId; // Immutable, no need to lock. +} + +broker::Broker& Cluster::getBroker() const { + return broker; // Immutable, no need to lock. +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index a8c916a99b..d1cf4b752f 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -23,18 +23,18 @@ #include "Event.h" #include "NoOpConnectionOutputHandler.h" #include "ClusterMap.h" -#include "JoiningHandler.h" -#include "MemberHandler.h" #include "qpid/broker/Broker.h" #include "qpid/sys/PollableQueue.h" #include "qpid/sys/Monitor.h" -#include "qpid/Url.h" +#include "qpid/sys/LockPtr.h" #include "qpid/management/Manageable.h" +#include "qpid/Url.h" #include "qmf/org/apache/qpid/cluster/Cluster.h" #include <boost/intrusive_ptr.hpp> #include <boost/bind.hpp> +#include <boost/optional.hpp> #include <algorithm> #include <vector> @@ -47,12 +47,13 @@ class Connection; /** * Connection to the cluster. - * Keeps cluster membership data. + * */ -class Cluster : private Cpg::Handler, public management::Manageable -{ +class Cluster : private Cpg::Handler, public management::Manageable { public: - + typedef boost::intrusive_ptr<Connection> ConnectionPtr; + typedef std::vector<ConnectionPtr> Connections; + /** * Join a cluster. * @param name of the cluster. @@ -62,58 +63,68 @@ class Cluster : private Cpg::Handler, public management::Manageable virtual ~Cluster(); - // FIXME aconway 2008-09-26: thread safety - void insert(const boost::intrusive_ptr<Connection>&); + // Connection map + void insert(const ConnectionPtr&); void erase(ConnectionId); - void dumpComplete(); - - /** Get the URLs of current cluster members. */ - std::vector<Url> getUrls() const; - - /** Number of members in the cluster. */ - size_t size() const; - - bool empty() const { return size() == 0; } - /** Send to the cluster */ - void mcastControl(const framing::AMQBody& controlBody, Connection* cptr); + // Send to the cluster + void mcastControl(const framing::AMQBody& controlBody, Connection* cptr=0); void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id); - void mcastEvent(const Event& e); - - /** Leave the cluster */ - void leave(); + void mcast(const Event& e); - MemberId getSelf() const { return self; } - MemberId getId() const { return self; } + // URLs of current cluster members. + std::vector<Url> getUrls() const; - void ready(); - void stall(); - void unstall(); + // Leave the cluster + void leave(); - void brokerShutdown(); + // Dump completedJo + void dumpInDone(const ClusterMap&); - broker::Broker& getBroker(); + MemberId getId() const; + broker::Broker& getBroker() const; - template <class F> void eachConnection(const F& f) { - for (ConnectionMap::const_iterator i = connections.begin(); i != connections.end(); ++i) - f(i->second); - } - private: + typedef sys::LockPtr<Cluster,sys::Monitor> LockPtr; + typedef sys::LockPtr<const Cluster,sys::Monitor> ConstLockPtr; + typedef sys::Monitor::ScopedLock Lock; + typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap; - typedef sys::PollableQueue<Event> EventQueue; - enum State { - START, // Start state, no cluster update received yet. - DISCARD, // Discard updates up to dump start point. - CATCHUP, // Stalled at catchup point, waiting for dump. - DUMPING, // Stalled while sending a state dump. - READY // Normal processing. - }; - - void connectionEvent(const Event&); - - /** CPG deliver callback. */ - void deliver( + typedef sys::PollableQueue<Event> PollableEventQueue; + typedef std::deque<Event> PlainEventQueue; + + // Unlocked versions of public functions + void mcastControl(const framing::AMQBody& controlBody, Connection* cptr, Lock&); + void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id, Lock&); + void mcast(const Event& e, Lock&); + void leave(Lock&); + std::vector<Url> getUrls(Lock&) const; + + // Called via CPG, deliverQueue or DumpClient threads. + void tryMakeOffer(const MemberId&, Lock&); + + // Called in CPG, connection IO and DumpClient threads. + void unstall(Lock&); + + // Called in main thread in ~Broker. + void brokerShutdown(); + + // Cluster controls implement XML methods from cluster.xml. + // May be called in CPG thread via deliver() OR in deliverQueue thread. + // + void dumpRequest(const MemberId&, const std::string&, Lock&); + void dumpOffer(const MemberId& dumper, uint64_t dumpee, Lock&); + void dumpStart(const MemberId& dumper, uint64_t dumpeeInt, const std::string& urlStr, Lock&); + void ready(const MemberId&, const std::string&, Lock&); + void shutdown(const MemberId&, Lock&); + void process(const Event&); // deliverQueue callback + void process(const Event&, Lock&); // unlocked version + + // CPG callbacks, called in CPG IO thread. + void dispatch(sys::DispatchHandle&); // Dispatch CPG events. + void disconnect(sys::DispatchHandle&); // PG was disconnected + + void deliver( // CPG deliver callback. cpg_handle_t /*handle*/, struct cpg_name *group, uint32_t /*nodeid*/, @@ -121,8 +132,7 @@ class Cluster : private Cpg::Handler, public management::Manageable void* /*msg*/, int /*msg_len*/); - /** CPG config change callback */ - void configChange( + void configChange( // CPG config change callback. cpg_handle_t /*handle*/, struct cpg_name */*group*/, struct cpg_address */*members*/, int /*nMembers*/, @@ -130,45 +140,50 @@ class Cluster : private Cpg::Handler, public management::Manageable struct cpg_address */*joined*/, int /*nJoined*/ ); - /** Callback to dispatch CPG events. */ - void dispatch(sys::DispatchHandle&); - /** Callback if CPG fd is disconnected. */ - void disconnect(sys::DispatchHandle&); + boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&, Lock&); + Connections getConnections(Lock&); - boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&); - - virtual qpid::management::ManagementObject* GetManagementObject(void) const; + virtual qpid::management::ManagementObject* GetManagementObject() const; virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); - void stopClusterNode(void); - void stopFullCluster(void); - void updateMemberStats(void); + void stopClusterNode(); + void stopFullCluster(); + void updateMemberStats(Lock&); + + // Called in connection IO threads . + void checkDumpIn(Lock&); + + // Called in DumpClient thread. + void dumpOutDone(); + void dumpOutError(const std::exception&); + void dumpOutDone(Lock&); + + mutable sys::Monitor lock; - mutable sys::Monitor lock; // Protect access to members. broker::Broker& broker; boost::shared_ptr<sys::Poller> poller; Cpg cpg; - Cpg::Name name; - Url url; - ClusterMap map; - MemberId self; + const Cpg::Name name; + const Url myUrl; + const MemberId memberId; + ConnectionMap connections; NoOpConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; - EventQueue connectionEventQueue; - State state; - qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle + PollableEventQueue deliverQueue; + PlainEventQueue mcastQueue; + uint32_t mcastId; - // Handlers for different states. - ClusterHandler* handler; - JoiningHandler joiningHandler; - MemberHandler memberHandler; + qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle - uint32_t mcastId; - size_t lastSize; + enum { INIT, NEWBIE, DUMPEE, READY, OFFER, DUMPER, LEFT } state; + ClusterMap map; + sys::Thread dumpThread; + boost::optional<ClusterMap> dumpedMap; + + size_t lastSize; - friend class ClusterHandler; - friend class JoiningHandler; - friend class MemberHandler; + friend std::ostream& operator<<(std::ostream&, const Cluster&); + friend class ClusterDispatcher; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ClusterHandler.cpp b/cpp/src/qpid/cluster/ClusterHandler.cpp deleted file mode 100644 index 7413f27192..0000000000 --- a/cpp/src/qpid/cluster/ClusterHandler.cpp +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Cluster.h" -#include "ClusterHandler.h" - -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/AllInvoker.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/log/Statement.h" - - - -namespace qpid { -namespace cluster { - -struct Operations : public framing::AMQP_AllOperations::ClusterHandler { - qpid::cluster::ClusterHandler& handler; - MemberId member; - Operations(qpid::cluster::ClusterHandler& c, const MemberId& id) : handler(c), member(id) {} - - void update(const framing::FieldTable& members, uint64_t dumping) { handler.update(member, members, dumping); } - void dumpRequest(const std::string& url) { handler.dumpRequest(member, url); } - void ready(const std::string& url) { handler.ready(member, url); } - void shutdown() { handler.shutdown(member); } -}; - -ClusterHandler::~ClusterHandler() {} - -ClusterHandler::ClusterHandler(Cluster& c) : cluster (c) {} - -bool ClusterHandler::invoke(const MemberId& id, framing::AMQFrame& frame) { - Operations ops(*this, id); - return framing::invoke(ops, *frame.getBody()).wasHandled(); -} - -void ClusterHandler::shutdown(const MemberId& id) { - QPID_LOG(notice, cluster.self << " received shutdown from " << id); - cluster.leave(); -} - - -}} // namespace qpid::cluster - diff --git a/cpp/src/qpid/cluster/ClusterHandler.h b/cpp/src/qpid/cluster/ClusterHandler.h deleted file mode 100644 index d8bcaa8fe8..0000000000 --- a/cpp/src/qpid/cluster/ClusterHandler.h +++ /dev/null @@ -1,70 +0,0 @@ -#ifndef QPID_CLUSTER_CLUSTERHANDLER_H -#define QPID_CLUSTER_CLUSTERHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Cpg.h" -#include "types.h" -#include <boost/intrusive_ptr.hpp> - -namespace qpid { - -namespace framing { class AMQFrame; } - -namespace cluster { - -class Connection; -class Cluster; -class Event; - -/** - * Interface for handing cluster events. - * Implementations provide different behavior for different states of a member.. - */ -class ClusterHandler -{ - public: - ClusterHandler(Cluster& c); - virtual ~ClusterHandler(); - - bool invoke(const MemberId&, framing::AMQFrame& f); - - virtual void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping) = 0; - virtual void dumpRequest(const MemberId&, const std::string& url) = 0; - virtual void ready(const MemberId&, const std::string& url) = 0; - virtual void shutdown(const MemberId&); - - virtual void deliver(Event& e) = 0; // Deliver a connection event. - - virtual void configChange(cpg_address *current, int nCurrent, - cpg_address *left, int nLeft, - cpg_address *joined, int nJoined) = 0; - - virtual void dumpComplete() = 0; - - protected: - Cluster& cluster; -}; - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_CLUSTERHANDLER_H*/ diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index b5b71cd397..f3b5451afb 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -33,73 +33,118 @@ using namespace framing; namespace cluster { -ClusterMap::ClusterMap() {} +namespace { +void insertSet(ClusterMap::Set& set, const ClusterMap::Map::value_type& v) { set.insert(v.first); } -MemberId ClusterMap::first() const { - return (members.empty()) ? MemberId() : members.begin()->first; +void insertMap(ClusterMap::Map& map, FieldTable::ValueMap::value_type vt) { + map.insert(ClusterMap::Map::value_type(vt.first, Url(vt.second->get<std::string>()))); } -bool ClusterMap::left(const cpg_address* addrs, size_t nLeft) { - bool changed=false; - for (const cpg_address* a = addrs; a < addrs+nLeft; ++a) - changed = members.erase(*a) || changed; - if (dumper && !isMember(dumper)) - dumper = MemberId(); - QPID_LOG_IF(debug, changed, "Members left. " << *this); - return changed; +void assignMap(ClusterMap::Map& map, const FieldTable& ft) { + map.clear(); + std::for_each(ft.begin(), ft.end(), boost::bind(&insertMap, boost::ref(map), _1)); } -framing::ClusterUpdateBody ClusterMap::toControl() const { - framing::ClusterUpdateBody b; - for (Members::const_iterator i = members.begin(); i != members.end(); ++i) - b.getMembers().setString(i->first.str(), i->second.str()); - b.setDumper(dumper); - return b; +void insertFieldTable(FieldTable& ft, const ClusterMap::Map::value_type& vt) { + return ft.setString(vt.first.str(), vt.second.str()); +} + +void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) { + ft.clear(); + std::for_each(map.begin(), map.end(), boost::bind(&insertFieldTable, boost::ref(ft), _1)); +} +} + +ClusterMap::ClusterMap() {} + +ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) { + alive.insert(id); + if (isMember) + members[id] = url; + else + newbies[id] = url; } -bool ClusterMap::update(const framing::FieldTable& ftMembers, uint64_t dumper_) { - dumper = MemberId(dumper_); - bool changed = false; - framing:: FieldTable::ValueMap::const_iterator i; - for (i = ftMembers.begin(); i != ftMembers.end(); ++i) { - MemberId id(i->first); - Url url(i->second->get<std::string>()); - changed = members.insert(Members::value_type(id, url)).second || changed; +ClusterMap::ClusterMap(const FieldTable& newbiesFt, const FieldTable& membersFt) { + assignMap(newbies, newbiesFt); + assignMap(members, membersFt); + std::for_each(newbies.begin(), newbies.end(), boost::bind(&insertSet, boost::ref(alive), _1)); + std::for_each(members.begin(), members.end(), boost::bind(&insertSet, boost::ref(alive), _1)); +} + +void ClusterMap::configChange( + cpg_address *current, int nCurrent, + cpg_address *left, int nLeft, + cpg_address */*joined*/, int /*nJoined*/) +{ + cpg_address* a; + for (a = left; a != left+nLeft; ++a) { + members.erase(*a); + newbies.erase(*a); } - QPID_LOG_IF(debug, changed, "Update: " << *this); - return changed; + alive.clear(); + std::copy(current, current+nCurrent, std::inserter(alive, alive.end())); +} + +Url ClusterMap::getUrl(const Map& map, const MemberId& id) { + Map::const_iterator i = map.find(id); + return i == map.end() ? Url() : i->second; +} + +MemberId ClusterMap::firstNewbie() const { + return newbies.empty() ? MemberId() : newbies.begin()->first; +} + +ClusterConnectionMembershipBody ClusterMap::asMethodBody() const { + framing::ClusterConnectionMembershipBody b; + assignFieldTable(b.getNewbies(), newbies); + assignFieldTable(b.getMembers(), members); + return b; } std::vector<Url> ClusterMap::memberUrls() const { - std::vector<Url> result(size()); - std::transform(members.begin(), members.end(), result.begin(), - boost::bind(&Members::value_type::second, _1)); - return result; + std::vector<Url> urls(members.size()); + std::transform(members.begin(), members.end(), urls.begin(), + boost::bind(&Map::value_type::second, _1)); + return urls; +} + +std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) { + std::ostream_iterator<MemberId> oi(o); + std::transform(m.begin(), m.end(), oi, boost::bind(&ClusterMap::Map::value_type::first, _1)); + return o; } std::ostream& operator<<(std::ostream& o, const ClusterMap& m) { - o << "Broker members:"; - for (ClusterMap::Members::const_iterator i=m.members.begin(); i != m.members.end(); ++i) { - o << " " << i->first; - if (i->first == m.dumper) o << "(dumping)"; + for (ClusterMap::Set::const_iterator i = m.alive.begin(); i != m.alive.end(); ++i) { + o << *i; + if (m.isMember(*i)) o << "(member)"; + if (m.isNewbie(*i)) o << "(newbie)"; + o << " "; } return o; } -bool ClusterMap::sendUpdate(const MemberId& id) const { - return dumper==id || (!dumper && first() == id); +bool ClusterMap::dumpRequest(const MemberId& id, const std::string& url) { + if (isAlive(id)) { + newbies[id] = Url(url); + return true; + } + return false; +} + +void ClusterMap::ready(const MemberId& id, const Url& url) { + if (isAlive(id)) members[id] = url; } -bool ClusterMap::ready(const MemberId& id, const Url& url) { - bool changed = members.insert(Members::value_type(id,url)).second; - if (id == dumper) { - dumper = MemberId(); - QPID_LOG(info, id << " finished dump. " << *this); - } - else { - QPID_LOG(info, id << " joined, url=" << url << ". " << *this); +boost::optional<Url> ClusterMap::dumpOffer(const MemberId& from, const MemberId& to) { + Map::iterator i = newbies.find(to); + if (isAlive(from) && i != newbies.end()) { + Url url= i->second; + newbies.erase(i); // No longer a potential dumpee. + return url; } - return changed; + return boost::none; } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h index 60fef75f0e..79afba7dc0 100644 --- a/cpp/src/qpid/cluster/ClusterMap.h +++ b/cpp/src/qpid/cluster/ClusterMap.h @@ -23,58 +23,68 @@ */ #include "types.h" -#include "qpid/framing/ClusterUpdateBody.h" #include "qpid/Url.h" +#include "qpid/framing/ClusterConnectionMembershipBody.h" + #include <boost/function.hpp> +#include <boost/optional.hpp> + #include <vector> #include <deque> #include <map> +#include <set> #include <iosfwd> namespace qpid { namespace cluster { /** - * Map of established cluster members and brain-dumps in progress. - * A dumper is an established member that is sending catch-up data. - * A dumpee is an aspiring member that is receiving catch-up data. + * Map of established cluster members and newbies waiting for a brain dump. */ class ClusterMap { public: - typedef std::map<MemberId, Url> Members; - Members members; - MemberId dumper; + typedef std::map<MemberId, Url> Map; + typedef std::set<MemberId> Set; ClusterMap(); - - /** First member of the cluster in ID order, gets to perform one-off tasks. */ - MemberId first() const; + ClusterMap(const MemberId& id, const Url& url, bool isReady); + ClusterMap(const framing::FieldTable& urls, const framing::FieldTable& states); - /** Update for members leaving. - *@return true if the cluster membership changed. - */ - bool left(const cpg_address* addrs, size_t size); + /** Update from config change. */ + void configChange( + cpg_address *current, int nCurrent, + cpg_address *left, int nLeft, + cpg_address *joined, int nJoined); - /** Convert map contents to a cluster update body. */ - framing::ClusterUpdateBody toControl() const; + bool isNewbie(const MemberId& id) const { return newbies.find(id) != newbies.end(); } + bool isMember(const MemberId& id) const { return members.find(id) != members.end(); } + bool isAlive(const MemberId& id) const { return alive.find(id) != alive.end(); } + + Url getNewbieUrl(const MemberId& id) { return getUrl(newbies, id); } + Url getMemberUrl(const MemberId& id) { return getUrl(members, id); } - /** Add a new member or dump complete if id == dumper. */ - bool ready(const MemberId& id, const Url& url); + /** First newbie in the cluster in ID order, target for offers */ + MemberId firstNewbie() const; - /** Apply update delivered from cluster. - *@return true if cluster membership changed. - **/ - bool update(const framing::FieldTable& members, uint64_t dumper); + /** Convert map contents to a cluster control body. */ + framing::ClusterConnectionMembershipBody asMethodBody() const; - bool isMember(const MemberId& id) const { return members.find(id) != members.end(); } - - bool sendUpdate(const MemberId& id) const; // True if id should send an update. + size_t aliveCount() const { return alive.size(); } + size_t memberCount() const { return members.size(); } std::vector<Url> memberUrls() const; - size_t size() const { return members.size(); } - - bool empty() const { return members.empty(); } + + bool dumpRequest(const MemberId& id, const std::string& url); + /** Return non-empty Url if accepted */ + boost::optional<Url> dumpOffer(const MemberId& from, const MemberId& to); + void ready(const MemberId& id, const Url&); + private: + Url getUrl(const Map& map, const MemberId& id); + + Map newbies, members; + Set alive; + friend std::ostream& operator<<(std::ostream&, const Map&); friend std::ostream& operator<<(std::ostream&, const ClusterMap&); }; diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 6aab31c177..ae731ed25e 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -77,10 +77,10 @@ void Connection::received(framing::AMQFrame& f) { QPID_LOG(trace, "RECV " << *this << ": " << f); if (isShadow()) { // Intercept the close that completes catch-up for shadow a connection. - if (catchUp && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { + if (isShadow() && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { catchUp = false; - AMQFrame ok(in_place<ConnectionCloseOkBody>()); cluster.insert(boost::intrusive_ptr<Connection>(this)); + AMQFrame ok(in_place<ConnectionCloseOkBody>()); connection.getOutput().send(ok); output.setOutputHandler(discardHandler); } @@ -107,7 +107,11 @@ void Connection::delivered(framing::AMQFrame& f) { void Connection::closed() { try { QPID_LOG(debug, "Connection closed " << *this); - if (catchUp) + if (catchUp) { + QPID_LOG(critical, cluster << " error on catch-up connection " << *this); + cluster.leave(); + } + else if (isDump()) connection.closed(); else if (isLocal()) { // This was a local replicated connection. Multicast a deliver @@ -131,7 +135,7 @@ void Connection::deliverClose () { // Decode data from local clients. size_t Connection::decode(const char* buffer, size_t size) { - if (catchUp) { // Handle catch-up locally. + if (catchUp || isDump()) { // Handle catch-up locally. Buffer buf(const_cast<char*>(buffer), size); while (localDecoder.decode(buf)) received(localDecoder.frame); @@ -179,11 +183,13 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { self = shadow; } -void Connection::dumpComplete() { - cluster.dumpComplete(); +void Connection::membership(const FieldTable& urls, const FieldTable& states) { + cluster.dumpInDone(ClusterMap(urls,states)); + catchUp = false; + self.second = 0; // Mark this as completed dump connection. } -bool Connection::isLocal() const { return self.first == cluster.getSelf() && self.second == this; } +bool Connection::isLocal() const { return self.first == cluster.getId() && self.second == this; } std::ostream& operator<<(std::ostream& o, const Connection& c) { return o << c.getId() << "(" << (c.isLocal() ? "local" : "shadow") diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index df3c035c8a..7d92987e01 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -69,6 +69,8 @@ class Connection : /** True if the connection is in "catch-up" mode: building initial broker state. */ bool isCatchUp() const { return catchUp; } + bool isDump() const { return self.getPointer() == 0; } + Cluster& getCluster() { return cluster; } // ConnectionInputHandler methods @@ -98,7 +100,7 @@ class Connection : void shadowReady(uint64_t memberId, uint64_t connectionId); - void dumpComplete(); + void membership(const framing::FieldTable&, const framing::FieldTable&); private: diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp index 1458a87923..44e40f0591 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -52,17 +52,20 @@ ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp) : codec(out, id, false), - interceptor(new Connection(cluster, codec, id, cluster.getSelf(), catchUp)), - id(interceptor->getId()) + interceptor(new Connection(cluster, codec, id, cluster.getId(), catchUp)), + id(interceptor->getId()), + localId(id) { std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor)); codec.setInputHandler(ih); - cluster.insert(interceptor); + if (!catchUp) // Don't put catchUp connections in the cluster map. + cluster.insert(interceptor); } ConnectionCodec::~ConnectionCodec() {} size_t ConnectionCodec::decode(const char* buffer, size_t size) { + QPID_LOG(trace, "RECVB [" << localId << "]: " << size << " bytes"); return interceptor->decode(buffer, size); } diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h index e6ab7f5ba1..86fac270fa 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.h +++ b/cpp/src/qpid/cluster/ConnectionCodec.h @@ -72,6 +72,7 @@ class ConnectionCodec : public sys::ConnectionCodec { amqp_0_10::Connection codec; boost::intrusive_ptr<cluster::Connection> interceptor; cluster::ConnectionId id; + std::string localId; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index 59542a2e95..ed339b2f85 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -20,6 +20,7 @@ */ #include "DumpClient.h" #include "Cluster.h" +#include "ClusterMap.h" #include "Connection.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/broker/Broker.h" @@ -31,7 +32,7 @@ #include "qpid/broker/SessionHandler.h" #include "qpid/broker/SessionState.h" #include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/ClusterConnectionDumpCompleteBody.h" +#include "qpid/framing/ClusterConnectionMembershipBody.h" #include "qpid/framing/ClusterConnectionShadowReadyBody.h" #include "qpid/framing/enum.h" #include "qpid/framing/ProtocolVersion.h" @@ -63,6 +64,10 @@ struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection { ClusterConnectionProxy(client::Connection& c) : AMQP_AllProxy::ClusterConnection(*client::ConnectionAccess::getImpl(c)) {} }; +struct ClusterProxy : public AMQP_AllProxy::Cluster { + ClusterProxy(client::Connection& c) : + AMQP_AllProxy::Cluster(*client::ConnectionAccess::getImpl(c)) {} +}; // Create a connection with special version that marks it as a catch-up connection. @@ -80,10 +85,11 @@ void send(client::Session& s, const AMQBody& body) { // TODO aconway 2008-09-24: optimization: dump connections/sessions in parallel. -DumpClient::DumpClient(const Url& url, Cluster& c, +DumpClient::DumpClient(const MemberId& from, const MemberId& to, const Url& url, + broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail) - : receiver(url), donor(c), + : dumperId(to), dumpeeId(from), dumpeeUrl(url), dumperBroker(broker), map(m), connections(cons), connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail) { @@ -98,18 +104,19 @@ static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange"; static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS)); void DumpClient::dump() { - QPID_LOG(debug, donor.getSelf() << " starting dump to " << receiver); - Broker& b = donor.getBroker(); + QPID_LOG(debug, dumperId << " dumping state to " << dumpeeId << " at " << dumpeeUrl); + Broker& b = dumperBroker; b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1)); // Catch-up exchange is used to route messages to the proper queue without modifying routing key. session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true); b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1)); session.sync(); session.close(); - donor.eachConnection(boost::bind(&DumpClient::dumpConnection, this, _1)); - ClusterConnectionProxy(connection).dumpComplete(); + std::for_each(connections.begin(), connections.end(), boost::bind(&DumpClient::dumpConnection, this, _1)); + AMQFrame frame(map.asMethodBody()); + client::ConnectionAccess::getImpl(connection)->handle(frame); connection.close(); - QPID_LOG(debug, donor.getSelf() << " dumped all state to " << receiver); + QPID_LOG(debug, dumperId << " dumped state to " << dumpeeId << " at " << dumpeeUrl); } void DumpClient::run() { @@ -160,25 +167,22 @@ void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& bindi } void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) { + QPID_LOG(debug, dumperId << " dumping connection " << *dumpConnection); shadowConnection = catchUpConnection(); broker::Connection& bc = dumpConnection->getBrokerConnection(); // FIXME aconway 2008-09-19: Open with identical settings to dumpConnection: password, vhost, frame size, // authentication etc. See ConnectionSettings. - shadowConnection.open(receiver, bc.getUserId()); + shadowConnection.open(dumpeeUrl, bc.getUserId()); dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1)); ClusterConnectionProxy(shadowConnection).shadowReady( dumpConnection->getId().getMember(), - reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr())); + reinterpret_cast<uint64_t>(dumpConnection->getId().getPointer())); shadowConnection.close(); - QPID_LOG(debug, donor.getId() << " dumped connection " << *dumpConnection); + QPID_LOG(debug, dumperId << " dumped connection " << *dumpConnection); } -// FIXME aconway 2008-09-26: REMOVE -void foo(broker::SemanticState::ConsumerImpl*) {} - - void DumpClient::dumpSession(broker::SessionHandler& sh) { - QPID_LOG(debug, donor.getId() << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = " + QPID_LOG(debug, dumperId << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = " << sh.getSession()->getId()); broker::SessionState* s = sh.getSession(); if (!s) return; // no session. @@ -214,17 +218,18 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) { // FIXME aconway 2008-09-23: session replay list. - QPID_LOG(debug, donor.getId() << " dumped session " << sh.getSession()->getId()); + QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId()); } void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) { + QPID_LOG(debug, dumperId << " dumping consumer " << ci->getName() << " on " << shadowSession.getId()); using namespace message; shadowSession.messageSubscribe( arg::queue = ci->getQueue()->getName(), arg::destination = ci->getName(), arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE, arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED, - arg::exclusive = false , // FIXME aconway 2008-09-23: how to read. + arg::exclusive = false , // FIXME aconway 2008-09-23: duplicate from consumer // TODO aconway 2008-09-23: remaining args not used by current broker. // Update this code when they are. @@ -236,7 +241,7 @@ void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) { shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit()); // FIXME aconway 2008-09-23: need to replicate ConsumerImpl::blocked and notifyEnabled? - QPID_LOG(debug, donor.getId() << " dumped consumer " << ci->getName() << " on " << shadowSession.getId()); + QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " << shadowSession.getId()); } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/DumpClient.h index 6ce41a53a9..d61779319a 100644 --- a/cpp/src/qpid/cluster/DumpClient.h +++ b/cpp/src/qpid/cluster/DumpClient.h @@ -22,6 +22,7 @@ * */ +#include "ClusterMap.h" #include "qpid/client/Connection.h" #include "qpid/client/AsyncSession.h" #include "qpid/broker/SemanticState.h" @@ -49,13 +50,15 @@ namespace cluster { class Cluster; class Connection; +class ClusterMap; /** * A client that dumps the contents of a local broker to a remote one using AMQP. */ class DumpClient : public sys::Runnable { public: - DumpClient(const Url& receiver, Cluster& donor, + DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url&, + broker::Broker& donor, const ClusterMap& map, const std::vector<boost::intrusive_ptr<Connection> >& , const boost::function<void()>& done, const boost::function<void(const std::exception&)>& fail); @@ -73,8 +76,12 @@ class DumpClient : public sys::Runnable { void dumpConsumer(broker::SemanticState::ConsumerImpl*); private: - Url receiver; - Cluster& donor; + MemberId dumperId; + MemberId dumpeeId; + Url dumpeeUrl; + broker::Broker& dumperBroker; + ClusterMap map; + std::vector<boost::intrusive_ptr<Connection> > connections; client::Connection connection, shadowConnection; client::AsyncSession session, shadowSession; boost::function<void()> done; diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index 2531001504..f7389c1922 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -22,6 +22,7 @@ #include "Event.h" #include "Cpg.h" #include "qpid/framing/Buffer.h" +#include "qpid/framing/AMQFrame.h" #include <ostream> #include <iterator> #include <algorithm> @@ -46,12 +47,20 @@ Event Event::delivered(const MemberId& m, void* d, size_t s) { memcpy(e.getData(), static_cast<char*>(d)+OVERHEAD, s-OVERHEAD); return e; } + +Event Event::control(const framing::AMQBody& body, const ConnectionId& cid, uint32_t id) { + framing::AMQFrame f(body); + Event e(CONTROL, cid, f.size(), id); + Buffer buf(e); + f.encode(buf); + return e; +} void Event::mcast (const Cpg::Name& name, Cpg& cpg) const { char header[OVERHEAD]; Buffer b(header, OVERHEAD); b.putOctet(type); - b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getConnectionPtr())); + b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer())); b.putLong(id); iovec iov[] = { { header, OVERHEAD }, { const_cast<char*>(getData()), getSize() } }; cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov)); diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h index 6d8655392e..9a2b12bf05 100644 --- a/cpp/src/qpid/cluster/Event.h +++ b/cpp/src/qpid/cluster/Event.h @@ -47,15 +47,21 @@ class Event { /** Create an event copied from delivered data. */ static Event delivered(const MemberId& m, void* data, size_t size); + + /** Create an event containing a control */ + static Event control(const framing::AMQBody&, const ConnectionId&, uint32_t id=0); void mcast(const Cpg::Name& name, Cpg& cpg) const; EventType getType() const { return type; } ConnectionId getConnectionId() const { return connectionId; } + MemberId getMemberId() const { return connectionId.getMember(); } size_t getSize() const { return size; } char* getData() { return data; } const char* getData() const { return data; } size_t getId() const { return id; } + bool isCluster() const { return connectionId.getPointer() == 0; } + bool isConnection() const { return connectionId.getPointer() != 0; } operator framing::Buffer() const; diff --git a/cpp/src/qpid/cluster/JoiningHandler.cpp b/cpp/src/qpid/cluster/JoiningHandler.cpp deleted file mode 100644 index dbee0ece61..0000000000 --- a/cpp/src/qpid/cluster/JoiningHandler.cpp +++ /dev/null @@ -1,124 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "JoiningHandler.h" -#include "Cluster.h" -#include "qpid/framing/ClusterDumpRequestBody.h" -#include "qpid/framing/ClusterReadyBody.h" -#include "qpid/log/Statement.h" - -namespace qpid { -namespace cluster { - -using namespace sys; -using namespace framing; - -JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START), catchUpConnections(0) {} - -void JoiningHandler::configChange( - cpg_address *current, int nCurrent, - cpg_address */*left*/, int nLeft, - cpg_address */*joined*/, int /*nJoined*/) -{ - // FIXME aconway 2008-09-24: Called with lock held - volatile - if (nLeft == 0 && nCurrent == 1 && *current == cluster.self) { // First in cluster. - QPID_LOG(notice, cluster.self << " first in cluster."); - cluster.map.ready(cluster.self, cluster.url); - cluster.updateMemberStats(); - cluster.unstall(); - } -} - -void JoiningHandler::deliver(Event& e) { - Mutex::ScopedLock l(cluster.lock); - // Discard connection events unless we are stalled to receive a dump. - if (state == STALLED) - cluster.connectionEventQueue.push(e); - else - QPID_LOG(trace, "Discarded pre-join event " << e); -} - -void JoiningHandler::update(const MemberId&, const framing::FieldTable& members, uint64_t dumper) { - Mutex::ScopedLock l(cluster.lock); - if (cluster.map.update(members, dumper)) cluster.updateMemberStats(); - checkDumpRequest(); -} - -void JoiningHandler::checkDumpRequest() { // Call with lock held - if (state == START && !cluster.map.dumper) { - cluster.broker.getPort(); // ensure the broker is listening. - state = DUMP_REQUESTED; - cluster.mcastControl(ClusterDumpRequestBody(framing::ProtocolVersion(), cluster.url.str()), 0); - } -} - -void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) { - Mutex::ScopedLock l(cluster.lock); - if (cluster.map.dumper) { // Already a dump in progress. - if (dumpee == cluster.self && state == DUMP_REQUESTED) - state = START; // Need to make another request. - } - else { // Start a new dump - cluster.map.dumper = cluster.map.first(); - QPID_LOG(debug, "Starting dump, dumper=" << cluster.map.dumper << " dumpee=" << dumpee); - if (dumpee == cluster.self) { // My turn - switch (state) { - case START: - case STALLED: - assert(0); break; - - case DUMP_REQUESTED: - QPID_LOG(debug, cluster.self << " stalling for dump from " << cluster.map.dumper); - state = STALLED; - cluster.stall(); - break; - - case DUMP_COMPLETE: - QPID_LOG(debug, cluster.self << " at start point and dump complete, ready."); - cluster.ready(); - break; - } - } - } -} - -void JoiningHandler::ready(const MemberId& id, const std::string& urlStr) { - Mutex::ScopedLock l(cluster.lock); - if (cluster.map.ready(id, Url(urlStr))) - cluster.updateMemberStats(); - checkDumpRequest(); -} - -void JoiningHandler::dumpComplete() { - Mutex::ScopedLock l(cluster.lock); - if (state == STALLED) { - QPID_LOG(debug, cluster.self << " received dump and stalled at start point, unstalling."); - cluster.ready(); - } - else { - QPID_LOG(debug, cluster.self << " received dump, waiting for start point."); - assert(state == DUMP_REQUESTED); - state = DUMP_COMPLETE; - } - // FIXME aconway 2008-09-18: need to detect incomplete dump. -} - - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/JoiningHandler.h b/cpp/src/qpid/cluster/JoiningHandler.h deleted file mode 100644 index cc47690ac5..0000000000 --- a/cpp/src/qpid/cluster/JoiningHandler.h +++ /dev/null @@ -1,60 +0,0 @@ -#ifndef QPID_CLUSTER_JOININGHANDLER_H -#define QPID_CLUSTER_JOININGHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "ClusterHandler.h" - -namespace qpid { -namespace cluster { - -/** - * Cluster handler for the "joining" phase, before the process is a - * full cluster member. - */ -class JoiningHandler : public ClusterHandler -{ - public: - JoiningHandler(Cluster& c); - - void configChange(struct cpg_address */*members*/, int /*nMembers*/, - struct cpg_address */*left*/, int /*nLeft*/, - struct cpg_address */*joined*/, int /*nJoined*/ - ); - - void deliver(Event& e); - - void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping); - void dumpRequest(const MemberId&, const std::string& url); - void ready(const MemberId&, const std::string& url); - - void dumpComplete(); - - private: - void checkDumpRequest(); - - enum { START, DUMP_REQUESTED, STALLED, DUMP_COMPLETE } state; - size_t catchUpConnections; - -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_JOININGHANDLER_H*/ diff --git a/cpp/src/qpid/cluster/MemberHandler.cpp b/cpp/src/qpid/cluster/MemberHandler.cpp deleted file mode 100644 index 69fe2eec0b..0000000000 --- a/cpp/src/qpid/cluster/MemberHandler.cpp +++ /dev/null @@ -1,96 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "MemberHandler.h" -#include "Cluster.h" -#include "DumpClient.h" -#include "qpid/log/Statement.h" -#include "qpid/framing/ClusterUpdateBody.h" -#include "qpid/framing/enum.h" - -namespace qpid { -namespace cluster { - -using namespace sys; -using namespace framing; - -MemberHandler::MemberHandler(Cluster& c) : ClusterHandler(c) {} - -MemberHandler::~MemberHandler() { - if (dumpThread.id()) - dumpThread.join(); // Join the last dumpthread. -} - -void MemberHandler::configChange( - cpg_address */*current*/, int /*nCurrent*/, - cpg_address */*left*/, int /*nLeft*/, - cpg_address */*joined*/, int nJoined) -{ - // FIXME aconway 2008-09-24: Called with lock held - volatile - if (nJoined && cluster.map.sendUpdate(cluster.self)) // New members need update - cluster.mcastControl(cluster.map.toControl(), 0); -} - -void MemberHandler::deliver(Event& e) { - cluster.connectionEventQueue.push(e); -} - -// Updates are for new joiners. -void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) {} - -void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlStr) { - Mutex::ScopedLock l(cluster.lock); - if (cluster.map.dumper) return; // dump in progress, ignore request. - - cluster.map.dumper = cluster.map.first(); - if (cluster.map.dumper != cluster.self) return; - - QPID_LOG(info, cluster.self << " sending state dump to " << dumpee); - assert(!cluster.connectionEventQueue.isStopped()); // Not currently stalled. - cluster.stall(); - - if (dumpThread.id()) - dumpThread.join(); // Join the previous dumpthread. - dumpThread = Thread(new DumpClient(Url(urlStr), cluster, - boost::bind(&MemberHandler::dumpSent, this), - boost::bind(&MemberHandler::dumpError, this, _1))); -} - -void MemberHandler::ready(const MemberId& id, const std::string& urlStr) { - Mutex::ScopedLock l(cluster.lock); - if (cluster.map.ready(id, Url(urlStr))) - cluster.updateMemberStats(); -} - - -void MemberHandler::dumpSent() { - Mutex::ScopedLock l(cluster.lock); - QPID_LOG(debug, "Finished sending state dump."); - cluster.ready(); -} - -void MemberHandler::dumpError(const std::exception& e) { - QPID_LOG(error, cluster.self << " error sending state dump: " << e.what()); - dumpSent(); -} - -void MemberHandler::dumpComplete() { assert(0); } - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/MemberHandler.h b/cpp/src/qpid/cluster/MemberHandler.h deleted file mode 100644 index 37cf653b7b..0000000000 --- a/cpp/src/qpid/cluster/MemberHandler.h +++ /dev/null @@ -1,63 +0,0 @@ -#ifndef QPID_CLUSTER_MEMBERHANDLER_H -#define QPID_CLUSTER_MEMBERHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "ClusterHandler.h" -#include "qpid/sys/Thread.h" - -namespace qpid { -namespace cluster { - -/** - * Cluster handler for the "member" phase, before the process is a - * full cluster member. - */ -class MemberHandler : public ClusterHandler -{ - public: - MemberHandler(Cluster& c); - ~MemberHandler(); - - void configChange( - struct cpg_address */*members*/, int /*nMembers*/, - struct cpg_address */*left*/, int /*nLeft*/, - struct cpg_address */*joined*/, int /*nJoined*/ - ); - - void deliver(Event& e); - - void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping); - void dumpRequest(const MemberId&, const std::string& url); - void ready(const MemberId&, const std::string& url); - - void dumpSent(); - void dumpError(const std::exception&); - - void dumpComplete(); - - public: - sys::Thread dumpThread; -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_MEMBERHANDLER_H*/ - diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h index f16cc18f41..2154aa89ce 100644 --- a/cpp/src/qpid/cluster/types.h +++ b/cpp/src/qpid/cluster/types.h @@ -21,6 +21,9 @@ * under the License. * */ + +#include <qpid/Url.h> + #include <utility> #include <iosfwd> #include <string> @@ -49,7 +52,7 @@ struct MemberId : std::pair<uint32_t, uint32_t> { uint32_t getPid() const { return second; } operator uint64_t() const { return (uint64_t(first)<<32ull) + second; } - // Encode as string, network byte order. + // AsMethodBody as string, network byte order. std::string str() const; }; @@ -62,7 +65,7 @@ struct ConnectionId : public std::pair<MemberId, Connection*> { ConnectionId(uint64_t m, uint64_t c) : std::pair<MemberId, Connection*>(MemberId(m), reinterpret_cast<Connection*>(c)) {} MemberId getMember() const { return first; } - Connection* getConnectionPtr() const { return second; } + Connection* getPointer() const { return second; } }; std::ostream& operator<<(std::ostream&, const ConnectionId&); diff --git a/cpp/src/qpid/framing/FieldTable.h b/cpp/src/qpid/framing/FieldTable.h index b56e3ce3ba..1b6ef0a124 100644 --- a/cpp/src/qpid/framing/FieldTable.h +++ b/cpp/src/qpid/framing/FieldTable.h @@ -89,6 +89,9 @@ class FieldTable ValueMap::const_iterator end() const { return values.end(); } ValueMap::const_iterator find(const std::string& s) const { return values.find(s); } + std::pair <ValueMap::iterator, bool> insert(const ValueMap::value_type&); + void clear() { values.clear(); } + // ### Hack Alert ValueMap::iterator getValues() { return values.begin(); } diff --git a/cpp/src/qpid/sys/LockPtr.h b/cpp/src/qpid/sys/LockPtr.h new file mode 100644 index 0000000000..738a864317 --- /dev/null +++ b/cpp/src/qpid/sys/LockPtr.h @@ -0,0 +1,89 @@ +#ifndef QPID_SYS_LOCKPTR_H +#define QPID_SYS_LOCKPTR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Mutex.h" +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +class Mutex; + +/** + * LockPtr is a smart pointer to T. It is constructed from a volatile + * T* and a Lock (by default a Mutex). It const_casts away the + * volatile qualifier and locks the Lock for the duration of its + * + * Used in conjuntion with the "volatile" keyword to get the compiler + * to help enforce correct concurrent use of mutli-threaded objects. + * See ochttp://www.ddj.com/cpp/184403766 for a detailed discussion. + * + * To summarize the convention: + * - Declare thread-safe member functions as volatile. + * - Declare instances of the class that may be called concurrently as volatile. + * - Use LockPtr to cast away the volatile qualifier while taking a lock. + * + * This means that code calling on a concurrently-used object + * (declared volatile) can only call thread-safe (volatile) member + * functions. Code that needs to use thread-unsafe members must use a + * LockPtr, thereby acquiring the lock and making it safe to do so. + * + * A good type-safe pattern is the internally-locked object: + * - It has it's own private lock member. + * - All public functions are thread safe and declared volatile. + * - Any thread-unsafe, non-volatile functions are private. + * - Only member function implementations use LockPtr to access private functions. + * + * This encapsulates all the locking logic inside the class. + * + * One nice feature of this convention is the common case where you + * need a public, locked version of some function foo() and also a + * private unlocked version to avoid recursive locks. They can be declared as + * volatile and non-volatile overloads of the same function: + * + * // public + * void Thing::foo() volatile { LockPtr<Thing>(this, myLock)->foo(); } + * // private + * void Thing::foo() { ... do stuff ...} + */ + +template <class T, class Lock> class LockPtr : public boost::noncopyable { + public: + LockPtr(volatile T* p, Lock& l) : ptr(const_cast<T*>(p)), lock(l) { lock.lock(); } + LockPtr(volatile T* p, volatile Lock& l) : ptr(const_cast<T*>(p)), lock(const_cast<Lock&>(l)) { lock.lock(); } + ~LockPtr() { lock.unlock(); } + + T& operator*() { return *ptr; } + T* operator->() { return ptr; } + + private: + T* ptr; + Lock& lock; +}; + + +}} // namespace qpid::sys + + +#endif /*!QPID_SYS_LOCKPTR_H*/ diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h index 3a94c60be0..8313196623 100644 --- a/cpp/src/qpid/sys/PollableQueue.h +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -42,40 +42,31 @@ class Poller; */ template <class T> class PollableQueue { - typedef std::deque<T> Queue; - public: - typedef typename Queue::iterator iterator; - /** Callback to process a range of items. */ - typedef boost::function<void (const iterator&, const iterator&)> Callback; + typedef boost::function<void (const T&)> Callback; - /** @see forEach() */ - template <class F> struct ForEach { - F handleOne; - ForEach(F f) : handleOne(f) {} - void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); } - }; - - /** Create a range callback from a functor that processes a single item. */ - template <class F> static ForEach<F> forEach(const F& f) { return ForEach<F>(f); } - /** When the queue is selected by the poller, values are passed to callback cb. */ - explicit PollableQueue(const Callback& cb); + PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller); + ~PollableQueue(); + /** Push a value onto the queue. Thread safe */ void push(const T& t); /** Start polling. */ - void start(const boost::shared_ptr<sys::Poller>& poller); + void start(); /** Stop polling and wait for the current callback, if any, to complete. */ void stop(); /** Are we currently stopped?*/ - bool isStopped() const; - + bool isStopped() const { ScopedLock l(lock); return stopped; } + + size_t size() { ScopedLock l(lock); return queue.size(); } + bool empty() { ScopedLock l(lock); return queue.empty(); } private: + typedef std::deque<T> Queue; typedef sys::Monitor::ScopedLock ScopedLock; typedef sys::Monitor::ScopedUnlock ScopedUnlock; @@ -83,59 +74,67 @@ class PollableQueue { mutable sys::Monitor lock; Callback callback; + boost::shared_ptr<sys::Poller> poller; PollableCondition condition; - sys::DispatchHandle handle; + DispatchHandle handle; Queue queue; - Queue batch; - bool dispatching, stopped; + Thread dispatcher; + bool stopped; }; -template <class T> PollableQueue<T>::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12: - : callback(cb), - handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0), - dispatching(false), stopped(true) -{} +template <class T> PollableQueue<T>::PollableQueue( + const Callback& cb, const boost::shared_ptr<sys::Poller>& p) + : callback(cb), poller(p), + handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0), stopped(true) +{ + handle.startWatch(poller); + handle.unwatch(); +} -template <class T> void PollableQueue<T>::start(const boost::shared_ptr<sys::Poller>& poller) { +template <class T> void PollableQueue<T>::start() { ScopedLock l(lock); + assert(stopped); stopped = false; - handle.startWatch(poller); + if (!queue.empty()) condition.set(); + handle.rewatch(); +} + +template <class T> PollableQueue<T>::~PollableQueue() { + handle.stopWatch(); } template <class T> void PollableQueue<T>::push(const T& t) { ScopedLock l(lock); + if (queue.empty()) condition.set(); queue.push_back(t); - condition.set(); } template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { - ScopedLock l(lock); - if (stopped) return; - dispatching = true; - condition.clear(); - batch.clear(); - batch.swap(queue); // Snapshot of current queue contents. - { - // Process outside the lock to allow concurrent push. - ScopedUnlock u(lock); - callback(batch.begin(), batch.end()); + ScopedLock l(lock); // Prevent concurrent push + assert(dispatcher.id() == 0 || dispatcher.id() == Thread::current().id()); + dispatcher = Thread::current(); + while (!stopped && !queue.empty()) { + T value = queue.front(); + queue.pop_front(); + { // callback outside the lock to allow concurrent push. + ScopedUnlock u(lock); + callback(value); + } } - batch.clear(); - dispatching = false; + if (queue.empty()) condition.clear(); if (stopped) lock.notifyAll(); - else h.rewatch(); + dispatcher = Thread(); + if (!stopped) h.rewatch(); } template <class T> void PollableQueue<T>::stop() { ScopedLock l(lock); - handle.stopWatch(); + assert(!stopped); + handle.unwatch(); stopped = true; - while (dispatching) lock.wait(); -} - -template <class T> bool PollableQueue<T>::isStopped() const { - ScopedLock l(lock); - return stopped; + // No deadlock if stop is called from the dispatcher thread + while (dispatcher.id() && dispatcher.id() != Thread::current().id()) + lock.wait(); } }} // namespace qpid::sys diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 9573caf61d..7b67fed388 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -89,7 +89,7 @@ struct ClusterFixture : public vector<uint16_t> { void waitFor(size_t n) { size_t retry=1000; // TODO aconway 2008-07-16: nasty sleeps, clean this up. - while (retry && getGlobalCluster().size() != n) { + while (retry && getGlobalCluster().getUrls().size() != n) { ::usleep(1000); --retry; } @@ -101,7 +101,7 @@ ClusterFixture::ClusterFixture(size_t n, bool init0_) : name(Uuid(true).str()), if (!init0) return; // Defer initialization of broker0 // Wait for all n members to join the cluster waitFor(n); - BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size()); + BOOST_REQUIRE_EQUAL(n, getGlobalCluster().getUrls().size()); } void ClusterFixture::add() { @@ -227,7 +227,7 @@ QPID_AUTO_TEST_CASE(testCatchupSharedState) { BOOST_CHECK_EQUAL(m.getData(), "bar"); BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); - // Add & verify another broker. + // Add another broker, don't wait for join - should be stalled till ready. cluster.add(); Client c2(cluster[2], "c2"); BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC)); @@ -321,29 +321,4 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); } -QPID_AUTO_TEST_CASE(testStall) { - ClusterFixture cluster(2); - Client c0(cluster[0], "c0"); - Client c1(cluster[1], "c1"); - - // Declare on all to avoid race condition. - c0.session.queueDeclare("q"); - c1.session.queueDeclare("q"); - - // Stall 0, verify it does not process deliverys while stalled. - getGlobalCluster().stall(); - c1.session.messageTransfer(arg::content=Message("foo","q")); - while (c1.session.queueQuery("q").getMessageCount() != 1) - ::usleep(1000); // Wait for message to show up on broker 1. - // But it should not be on broker 0. - boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q"); - BOOST_REQUIRE(q0); - BOOST_CHECK_EQUAL(q0->getMessageCount(), (unsigned)0); - // Now unstall and we should get the message. - getGlobalCluster().ready(); - Message m; - BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "foo"); -} - QPID_AUTO_TEST_SUITE_END() |