diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverListener.cpp | 43 | ||||
-rw-r--r-- | cpp/src/qpid/client/FailoverListener.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 214 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 31 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.cpp | 69 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 84 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 5 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 63 |
14 files changed, 329 insertions, 230 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index bb3517f839..652d59f448 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -32,17 +32,18 @@ #include <boost/bind.hpp> #include <boost/format.hpp> -using namespace qpid::client; + +namespace qpid { +namespace client { + using namespace qpid::framing; using namespace qpid::framing::connection; using namespace qpid::sys; - using namespace qpid::framing::connection;//for connection error codes ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), - failover(new FailoverListener()), version(v), nextChannel(1) { @@ -51,7 +52,6 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti handler.out = boost::bind(&Connector::send, boost::ref(connector), _1); handler.onClose = boost::bind(&ConnectionImpl::closed, this, CLOSE_CODE_NORMAL, std::string()); - //only set error handler once open handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2); } @@ -69,7 +69,8 @@ void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, u Mutex::ScopedLock l(lock); session->setChannel(channel ? channel : nextChannel++); boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()]; - if (s.lock()) throw SessionBusyException(); + boost::shared_ptr<SessionImpl> ss = s.lock(); + if (ss) throw SessionBusyException(QPID_MSG("Channel " << ss->getChannel() << " attachd to " << ss->getId())); s = session; } @@ -110,7 +111,7 @@ void ConnectionImpl::open() connector->init(); handler.waitForOpen(); - if (failover.get()) failover->start(shared_from_this()); + failover.reset(new FailoverListener(shared_from_this(), handler.knownBrokersUrls)); } void ConnectionImpl::idleIn() @@ -176,7 +177,6 @@ const ConnectionSettings& ConnectionImpl::getNegotiatedSettings() } std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() { - // FIXME aconway 2008-10-08: ensure we never return empty list, always include self Url. return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls; } @@ -187,4 +187,6 @@ boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& na return simpl; } -void ConnectionImpl::stopFailoverListener() { failover.reset(); } +void ConnectionImpl::stopFailoverListener() { failover->stop(); } + +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index a432afff4f..c2cc6f006d 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -89,6 +89,8 @@ class ConnectionImpl : public Bounds, std::vector<Url> getKnownBrokers(); void registerFailureCallback ( boost::function<void ()> fn ) { failureCallback = fn; } void stopFailoverListener(); + + framing::ProtocolVersion getVersion() { return version; } }; }} diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index fd9d8a8ad1..32d0001040 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -72,7 +72,7 @@ void Dispatcher::run() boost::state_saver<bool> reset(running); // Reset to false on exit. running = true; try { - while (!queue->isClosed()) { + while (true) { Mutex::ScopedUnlock u(lock); FrameSet::shared_ptr content = queue->pop(); if (content->isA<MessageTransferBody>()) { @@ -92,12 +92,14 @@ void Dispatcher::run() } } } - session.sync(); // Make sure all our acks are received before returning. } - catch (const ClosedException& e) - { - QPID_LOG(debug, "Ignored exception in client dispatch thread: " << e.what()); - } //ignore it and return + catch (const ClosedException& e) { + QPID_LOG(debug, "Dispatch thread exiting, session closed: " << session.getId()); + try { + session.sync(); // Make sure all our acks are received before returning. + } + catch(...) {} + } catch (const std::exception& e) { QPID_LOG(error, "Exception in client dispatch thread: " << e.what()); if ( failoverHandler ) diff --git a/cpp/src/qpid/client/FailoverListener.cpp b/cpp/src/qpid/client/FailoverListener.cpp index e13f240439..98df12fc57 100644 --- a/cpp/src/qpid/client/FailoverListener.cpp +++ b/cpp/src/qpid/client/FailoverListener.cpp @@ -21,6 +21,7 @@ #include "FailoverListener.h" #include "SessionBase_0_10Access.h" #include "qpid/client/SubscriptionManager.h" +#include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" #include "qpid/log/Helpers.h" @@ -40,40 +41,58 @@ static Session makeSession(boost::shared_ptr<SessionImpl> si) { return s; } -FailoverListener::FailoverListener() {} +FailoverListener::FailoverListener(const boost::shared_ptr<ConnectionImpl>& c, const std::vector<Url>& initUrls) + : knownBrokers(initUrls) + { + // Special versions used to mark cluster catch-up connections + // which do not need a FailoverListener + if (c->getVersion().getMajor() >= 0x80) { + QPID_LOG(debug, "No failover listener for catch-up connection."); + return; + } -void FailoverListener::start(const boost::shared_ptr<ConnectionImpl>& c) { - Session session = makeSession(c->newSession(std::string(), 0)); + Session session = makeSession(c->newSession(AMQ_FAILOVER+framing::Uuid(true).str(), 0)); if (session.exchangeQuery(arg::name=AMQ_FAILOVER).getNotFound()) { session.close(); return; } subscriptions.reset(new SubscriptionManager(session)); - std::string qname=AMQ_FAILOVER + "." + session.getId().getName(); + std::string qname=session.getId().getName(); session.queueDeclare(arg::queue=qname, arg::exclusive=true, arg::autoDelete=true); session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER); subscriptions->subscribe(*this, qname, FlowControl::unlimited()); thread = sys::Thread(*subscriptions); } -void FailoverListener::stop() { - if (subscriptions.get()) subscriptions->stop(); - if (thread.id()) thread.join(); - if (subscriptions.get()) subscriptions->getSession().close(); - thread=sys::Thread(); - subscriptions.reset(); -} FailoverListener::~FailoverListener() { try { stop(); } catch (const std::exception& e) {} } +void FailoverListener::stop() { + if (subscriptions.get()) + subscriptions->stop(); + + if (thread.id() == sys::Thread::current().id()) { + // FIXME aconway 2008-10-16: this can happen if ConnectionImpl + // dtor runs when my session drops its weak pointer lock. + // For now, leak subscriptions to prevent a core if we delete + // without joining. + subscriptions.release(); + } + else if (thread.id()) { + thread.join(); + thread=sys::Thread(); + subscriptions.reset(); // Safe to delete after join. + } +} + void FailoverListener::received(Message& msg) { sys::Mutex::ScopedLock l(lock); knownBrokers.clear(); framing::Array urlArray; msg.getHeaders().getArray("amq.failover", urlArray); - for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i < urlArray.end(); ++i ) + for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i != urlArray.end(); ++i ) knownBrokers.push_back(Url((*i)->get<std::string>())); QPID_LOG(info, "Known-brokers update: " << log::formatList(knownBrokers)); } diff --git a/cpp/src/qpid/client/FailoverListener.h b/cpp/src/qpid/client/FailoverListener.h index c702fed846..fc0cca28f1 100644 --- a/cpp/src/qpid/client/FailoverListener.h +++ b/cpp/src/qpid/client/FailoverListener.h @@ -38,11 +38,10 @@ class SubscriptionManager; */ class FailoverListener : public MessageListener { public: - FailoverListener(); + FailoverListener(const boost::shared_ptr<ConnectionImpl>&, const std::vector<Url>& initUrls); ~FailoverListener(); - void start(const boost::shared_ptr<ConnectionImpl>&); void stop(); - + std::vector<Url> getKnownBrokers() const; void received(Message& msg); diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 811c1c9557..a0bcb9ae02 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -30,6 +30,7 @@ #include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterDumpRequestBody.h" #include "qpid/framing/ClusterReadyBody.h" +#include "qpid/framing/ClusterConfigChangeBody.h" #include "qpid/framing/ClusterDumpOfferBody.h" #include "qpid/framing/ClusterDumpStartBody.h" #include "qpid/framing/ClusterShutdownBody.h" @@ -76,6 +77,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } + void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); } void dumpOffer(uint64_t dumpee) { cluster.dumpOffer(member, dumpee, l); } void dumpStart(uint64_t dumpee, const std::string& url) { cluster.dumpStart(member, dumpee, url, l); } void shutdown() { cluster.shutdown(member, l); } @@ -89,14 +91,14 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : cpg(*this), name(name_), myUrl(url_), - memberId(cpg.self()), + myId(cpg.self()), cpgDispatchHandle( cpg, boost::bind(&Cluster::dispatch, this, _1), // read 0, // write boost::bind(&Cluster::disconnect, this, _1) // disconnect ), - deliverQueue(boost::bind(&Cluster::process, this, _1), poller), + deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller), mcastId(0), mgmtObject(0), state(INIT), @@ -115,20 +117,20 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : failoverExchange.reset(new FailoverExchange(this)); broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); cpgDispatchHandle.startWatch(poller); + deliverQueue.start(); cpg.join(name); - QPID_LOG(notice, *this << " joining cluster " << name.str()); + QPID_LOG(notice, *this << " will join cluster " << name.str()); } Cluster::~Cluster() { if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. } -void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { +bool Cluster::insert(const boost::intrusive_ptr<Connection>& c) { Lock l(lock); - // FIXME aconway 2008-10-08: what keeps catchUp connections in memory if not in map? - // esp shadow connections? See race comment in getConnection. - assert(!c->isCatchUp()); - connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)); + bool result = connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)).second; + assert(result); + return result; } void Cluster::erase(ConnectionId id) { @@ -136,14 +138,19 @@ void Cluster::erase(ConnectionId id) { connections.erase(id); } -void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) { +void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& id, uint32_t seq) { Lock l(lock); - mcastControl(body, cptr, l); + mcastControl(body, id, seq, l); } -void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr, Lock&) { - Lock l(lock); - Event e(Event::control(body, ConnectionId(memberId, cptr), ++mcastId)); +void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& id, uint32_t seq, Lock& l) { + Event e(Event::control(body, id, seq)); + QPID_LOG(trace, *this << " MCAST " << e << ": " << body); + mcast(e, l); +} + +void Cluster::mcastControl(const framing::AMQBody& body, Lock& l) { + Event e(Event::control(body, ConnectionId(myId,0), ++mcastId)); QPID_LOG(trace, *this << " MCAST " << e << ": " << body); mcast(e, l); } @@ -166,8 +173,8 @@ void Cluster::mcast(const Event& e) { Lock l(lock); mcast(e, l); } void Cluster::mcast(const Event& e, Lock&) { if (state == LEFT) return; - if (state < READY && e.isConnection()) { - // Stall outgoing connection events. + if (state <= CATCHUP && e.isConnection()) { + // Stall outgoing connection events untill we are fully READY QPID_LOG(trace, *this << " MCAST deferred: " << e ); mcastQueue.push_back(e); } @@ -192,10 +199,10 @@ void Cluster::leave() { void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; + if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN"); QPID_LOG(notice, *this << " leaving cluster " << name.str()); if (!deliverQueue.isStopped()) deliverQueue.stop(); - if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN"); try { cpg.leave(name); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error leaving process group: " << e.what()); @@ -211,14 +218,15 @@ void Cluster::leave(Lock&) { boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId, Lock&) { ConnectionMap::iterator i = connections.find(connectionId); if (i == connections.end()) { - if (connectionId.getMember() == memberId) { // Closed local connection + if (connectionId.getMember() == myId) { // Closed local connection QPID_LOG(warning, *this << " attempt to use closed connection " << connectionId); return boost::intrusive_ptr<Connection>(); } else { // New shadow connection std::ostringstream mgmtId; mgmtId << name.str() << ":" << connectionId; - ConnectionMap::value_type value(connectionId, new Connection(*this, shadowOut, mgmtId.str(), connectionId)); + ConnectionMap::value_type value(connectionId, + new Connection(*this, shadowOut, mgmtId.str(), connectionId)); i = connections.insert(value).first; } } @@ -242,50 +250,54 @@ void Cluster::deliver( { Mutex::ScopedLock l(lock); MemberId from(nodeid, pid); - Event e = Event::delivered(from, msg, msg_len); + deliver(Event::delivered(from, msg, msg_len), l); +} + +void Cluster::deliver(const Event& e, Lock&) { if (state == LEFT) return; - QPID_LOG(trace, *this << " DLVR: " << e); - if (e.isCluster() && state != DUMPEE) // Process cluster controls immediately unless in DUMPEE state. - process(e, l); - else if (state != NEWBIE) // Newbie discards events up to the dump offer. - deliverQueue.push(e); + QPID_LOG(trace, *this << " PUSH: " << e); + deliverQueue.push(e); // Otherwise enqueue for processing. } -void Cluster::process(const Event& e) { +void Cluster::delivered(const Event& e) { Lock l(lock); - process(e,l); + delivered(e,l); } -void Cluster::process(const Event& e, Lock& l) { +void Cluster::delivered(const Event& e, Lock& l) { try { Buffer buf(e); AMQFrame frame; if (e.isCluster()) { while (frame.decode(buf)) { - QPID_LOG(trace, *this << " PROC: " << e << " " << frame); + QPID_LOG(trace, *this << " DLVR: " << e << " " << frame); ClusterDispatcher dispatch(*this, e.getMemberId(), l); if (!framing::invoke(dispatch, *frame.getBody()).wasHandled()) throw Exception(QPID_MSG("Invalid cluster control")); } } else { // e.isConnection() - boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l); - if (connection) { // Ignore if no connection. - if (e.getType() == DATA) { - QPID_LOG(trace, *this << " PROC: " << e); - connection->deliverBuffer(buf); - } - else { // control + if (state == NEWBIE) { + QPID_LOG(trace, *this << " DROP: " << e); + } + else { + boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId(), l); + if (!connection) return; + if (e.getType() == CONTROL) { while (frame.decode(buf)) { - QPID_LOG(trace, *this << " PROC: " << e << " " << frame); + QPID_LOG(trace, *this << " DLVR: " << e << " " << frame); connection->delivered(frame); } } + else { + QPID_LOG(trace, *this << " DLVR: " << e); + connection->deliverBuffer(buf); + } } } } catch (const std::exception& e) { - QPID_LOG(critical, *this << " error in cluster process: " << e.what()); + QPID_LOG(critical, *this << " error in cluster delivered: " << e.what()); leave(l); } } @@ -304,11 +316,11 @@ ostream& operator<<(ostream& o, const AddrList& a) { for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) { const char* reasonString; switch (p->reason) { - case CPG_REASON_JOIN: reasonString = " joined "; break; - case CPG_REASON_LEAVE: reasonString = " left "; break; - case CPG_REASON_NODEDOWN: reasonString = " node-down "; break; - case CPG_REASON_NODEUP: reasonString = " node-up "; break; - case CPG_REASON_PROCDOWN: reasonString = " process-down "; break; + case CPG_REASON_JOIN: reasonString = " (joined) "; break; + case CPG_REASON_LEAVE: reasonString = " (left) "; break; + case CPG_REASON_NODEDOWN: reasonString = " (node-down) "; break; + case CPG_REASON_NODEUP: reasonString = " (node-up) "; break; + case CPG_REASON_PROCDOWN: reasonString = " (process-down) "; break; default: reasonString = " "; } qpid::cluster::MemberId member(*p); @@ -338,61 +350,52 @@ void Cluster::configChange ( cpg_name */*group*/, cpg_address *current, int nCurrent, cpg_address *left, int nLeft, - cpg_address *joined, int nJoined) + cpg_address */*joined*/, int /*nJoined*/) { Mutex::ScopedLock l(lock); - QPID_LOG(debug, *this << " configuration change: " << AddrList(current, nCurrent) + QPID_LOG(debug, *this << " enqueue config change: " << AddrList(current, nCurrent) << AddrList(left, nLeft, "( ", ")")); - map.configChange(current, nCurrent, left, nLeft, joined, nJoined); + std::string addresses; + for (cpg_address* p = current; p < current+nCurrent; ++p) + addresses.append(MemberId(*p).str()); + deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId), l); +} + +void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) { + bool memberChange = map.configChange(addresses); if (state == LEFT) return; - if (!map.isAlive(memberId)) { leave(l); return; } - if(state == INIT) { // First configChange - if (map.aliveCount() == 1) { + if (!map.isAlive(myId)) { // Final config change. + leave(l); + return; + } + + if (state == INIT) { // First configChange + if (map.aliveCount() == 1) { QPID_LOG(info, *this << " first in cluster at " << myUrl); - map = ClusterMap(memberId, myUrl, true); + state = READY; + if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); + map = ClusterMap(myId, myUrl, true); memberUpdate(l); - unstall(l); } else { // Joining established group. state = NEWBIE; - mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), 0, l); + mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), l); QPID_LOG(debug, *this << " send dump-request " << myUrl); } } - else if (state >= READY) + else if (state >= READY && memberChange) memberUpdate(l); } -void Cluster::dumpInDone(const ClusterMap& m) { - Lock l(lock); - dumpedMap = m; - checkDumpIn(l); -} + + void Cluster::tryMakeOffer(const MemberId& id, Lock& l) { if (state == READY && map.isNewbie(id)) { state = OFFER; QPID_LOG(debug, *this << " send dump-offer to " << id); - mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id), 0, l); - } -} - -void Cluster::unstall(Lock& l) { - // Called with lock held - switch (state) { - case INIT: case DUMPEE: case DUMPER: case READY: - QPID_LOG(debug, *this << " unstall: deliver=" << deliverQueue.size() - << " mcast=" << mcastQueue.size()); - deliverQueue.start(); - state = READY; - for_each(mcastQueue.begin(), mcastQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l))); - mcastQueue.clear(); - if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); - break; - case LEFT: break; - case NEWBIE: case OFFER: - assert(0); + mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id), l); } } @@ -418,23 +421,25 @@ void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) { } void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { - map.ready(id, Url(url)); - if (id == memberId) - unstall(l); - memberUpdate(l); + if (map.ready(id, Url(url))) + memberUpdate(l); + if (state == CATCHUP && id == myId) { + QPID_LOG(debug, *this << " caught-up, going to ready mode."); + state = READY; + if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); + for_each(mcastQueue.begin(), mcastQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l))); + mcastQueue.clear(); + } } void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) { if (state == LEFT) return; MemberId dumpee(dumpeeInt); boost::optional<Url> url = map.dumpOffer(dumper, dumpee); - if (dumper == memberId) { + if (dumper == myId) { assert(state == OFFER); if (url) { // My offer was first. - QPID_LOG(debug, *this << " mark dump point for dump to " << dumpee); - // Put dump-start on my own deliver queue to mark the stall point. - // We will stall when it is processed. - deliverQueue.push(Event::control(ClusterDumpStartBody(ProtocolVersion(), dumpee, url->str()), memberId)); + dumpStart(myId, dumpee, url->str(), l); } else { // Another offer was first. QPID_LOG(debug, *this << " cancel dump offer to " << dumpee); @@ -442,38 +447,47 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) { tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer. } } - else if (dumpee == memberId && url) { + else if (dumpee == myId && url) { assert(state == NEWBIE); QPID_LOG(debug, *this << " accepted dump-offer from " << dumper); state = DUMPEE; + deliverQueue.stop(); checkDumpIn(l); } } +// FIXME aconway 2008-10-15: no longer need a separate control now +// that the dump control is in the deliver queue. void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock& l) { if (state == LEFT) return; MemberId dumpee(dumpeeInt); Url url(urlStr); assert(state == OFFER); + state = DUMPER; deliverQueue.stop(); QPID_LOG(debug, *this << " stall and dump to " << dumpee << " at " << urlStr); - state = DUMPER; if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. dumpThread = Thread( - new DumpClient(memberId, dumpee, url, broker, map, getConnections(l), + new DumpClient(myId, dumpee, url, broker, map, getConnections(l), boost::bind(&Cluster::dumpOutDone, this), boost::bind(&Cluster::dumpOutError, this, _1))); } +void Cluster::dumpInDone(const ClusterMap& m) { + Lock l(lock); + dumpedMap = m; + checkDumpIn(l); +} + void Cluster::checkDumpIn(Lock& l) { if (state == LEFT) return; - assert(state == DUMPEE || state == NEWBIE); if (state == DUMPEE && dumpedMap) { map = *dumpedMap; - QPID_LOG(debug, *this << " incoming dump complete. Members: " << map); - mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 0, l); - state = READY; - // unstall when ready control is self-delivered. + QPID_LOG(debug, *this << " incoming dump complete, start catchup. map=" << map); + mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), l); + // Don't flush the mcast queue till we are READY, on self-deliver. + state = CATCHUP; + deliverQueue.start(); } } @@ -485,7 +499,8 @@ void Cluster::dumpOutDone() { void Cluster::dumpOutDone(Lock& l) { QPID_LOG(debug, *this << " finished sending dump."); assert(state == DUMPER); - unstall(l); + state = READY; + deliverQueue.start(); tryMakeOffer(map.firstNewbie(), l); // Try another offer } @@ -504,7 +519,7 @@ ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; } Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) { Lock l(lock); - QPID_LOG (debug, *this << " managementMethod [id=" << methodId << "]"); + QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]"); switch (methodId) { case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break; case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break; @@ -520,10 +535,11 @@ void Cluster::stopClusterNode(Lock&) { void Cluster::stopFullCluster(Lock& l) { QPID_LOG(notice, *this << " shutting down cluster " << name.str()); - mcastControl(ClusterShutdownBody(), 0, l); + mcastControl(ClusterShutdownBody(), l); } void Cluster::memberUpdate(Lock& l) { + QPID_LOG(debug, *this << " member update, map=" << map); std::vector<Url> vectUrl = getUrls(l); size_t size = vectUrl.size(); @@ -552,12 +568,12 @@ void Cluster::memberUpdate(Lock& l) { } std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { - static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "READY", "OFFER", "DUMPER", "LEFT" }; - return o << cluster.memberId << "(" << STATE[cluster.state] << ")"; + static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "CATCHUP", "READY", "OFFER", "DUMPER", "LEFT" }; + return o << cluster.myId << "(" << STATE[cluster.state] << ")"; } MemberId Cluster::getId() const { - return memberId; // Immutable, no need to lock. + return myId; // Immutable, no need to lock. } broker::Broker& Cluster::getBroker() const { diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 723a23d1bd..8ee55e68f8 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -65,11 +65,11 @@ class Cluster : private Cpg::Handler, public management::Manageable { virtual ~Cluster(); // Connection map - void insert(const ConnectionPtr&); + bool insert(const ConnectionPtr&); void erase(ConnectionId); // Send to the cluster - void mcastControl(const framing::AMQBody& controlBody, Connection* cptr=0); + void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&, uint32_t id); void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id); void mcast(const Event& e); @@ -101,7 +101,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { // a Lock to call the unlocked functions. // Unlocked versions of public functions - void mcastControl(const framing::AMQBody& controlBody, Connection* cptr, Lock&); + void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&, uint32_t, Lock&); + void mcastControl(const framing::AMQBody& controlBody, Lock&); void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id, Lock&); void mcast(const Event& e, Lock&); void leave(Lock&); @@ -110,9 +111,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Called via CPG, deliverQueue or DumpClient threads. void tryMakeOffer(const MemberId&, Lock&); - // Called in CPG, connection IO and DumpClient threads. - void unstall(Lock&); - // Called in main thread in ~Broker. void brokerShutdown(); @@ -123,9 +121,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { void dumpOffer(const MemberId& dumper, uint64_t dumpee, Lock&); void dumpStart(const MemberId& dumper, uint64_t dumpeeInt, const std::string& urlStr, Lock&); void ready(const MemberId&, const std::string&, Lock&); + void configChange(const MemberId&, const std::string& addresses, Lock& l); void shutdown(const MemberId&, Lock&); - void process(const Event&); // deliverQueue callback - void process(const Event&, Lock&); // unlocked version + void delivered(const Event&); // deliverQueue callback + void delivered(const Event&, Lock&); // unlocked version // CPG callbacks, called in CPG IO thread. void dispatch(sys::DispatchHandle&); // Dispatch CPG events. @@ -139,6 +138,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { void* /*msg*/, int /*msg_len*/); + void deliver(const Event& e, Lock&); + void configChange( // CPG config change callback. cpg_handle_t /*handle*/, struct cpg_name */*group*/, @@ -172,7 +173,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { Cpg cpg; const Cpg::Name name; const Url myUrl; - const MemberId memberId; + const MemberId myId; ConnectionMap connections; NoOpConnectionOutputHandler shadowOut; @@ -183,7 +184,17 @@ class Cluster : private Cpg::Handler, public management::Manageable { qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle - enum { INIT, NEWBIE, DUMPEE, READY, OFFER, DUMPER, LEFT } state; + enum { + INIT, ///< Initial state, no CPG messages received. + NEWBIE, ///< Sent dump request, waiting for dump offer. + DUMPEE, ///< Stalled receive queue at dump offer, waiting for dump to complete. + CATCHUP, ///< Dump complete, unstalled but has not yet seen own "ready" event. + READY, ///< Fully operational + OFFER, ///< Sent an offer, waiting for accept/reject. + DUMPER, ///< Offer accepted, sending a state dump. + LEFT ///< Final state, left the cluster. + } state; + ClusterMap map; sys::Thread dumpThread; boost::optional<ClusterMap> dumpedMap; diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index f8c5695b23..873f0be928 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -34,25 +34,24 @@ using namespace framing; namespace cluster { namespace { -void insertSet(ClusterMap::Set& set, const ClusterMap::Map::value_type& v) { set.insert(v.first); } -void insertMap(ClusterMap::Map& map, FieldTable::ValueMap::value_type vt) { - map.insert(ClusterMap::Map::value_type(vt.first, Url(vt.second->get<std::string>()))); +void addFieldTableValue(FieldTable::ValueMap::value_type vt, ClusterMap::Map& map, ClusterMap::Set& set) { + MemberId id(vt.first); + set.insert(id); + std::string url = vt.second->get<std::string>(); + if (!url.empty()) + map.insert(ClusterMap::Map::value_type(id, Url(url))); } -void assignMap(ClusterMap::Map& map, const FieldTable& ft) { - map.clear(); - std::for_each(ft.begin(), ft.end(), boost::bind(&insertMap, boost::ref(map), _1)); -} - -void insertFieldTable(FieldTable& ft, const ClusterMap::Map::value_type& vt) { - return ft.setString(vt.first.str(), vt.second.str()); +void insertFieldTableFromMapValue(FieldTable& ft, const ClusterMap::Map::value_type& vt) { + ft.setString(vt.first.str(), vt.second.str()); } void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) { ft.clear(); - std::for_each(map.begin(), map.end(), boost::bind(&insertFieldTable, boost::ref(ft), _1)); + std::for_each(map.begin(), map.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(ft), _1)); } + } ClusterMap::ClusterMap() {} @@ -66,10 +65,21 @@ ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) { } ClusterMap::ClusterMap(const FieldTable& newbiesFt, const FieldTable& membersFt) { - assignMap(newbies, newbiesFt); - assignMap(members, membersFt); - std::for_each(newbies.begin(), newbies.end(), boost::bind(&insertSet, boost::ref(alive), _1)); - std::for_each(members.begin(), members.end(), boost::bind(&insertSet, boost::ref(alive), _1)); + std::for_each(newbiesFt.begin(), newbiesFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(newbies), boost::ref(alive))); + std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive))); +} + +ClusterConnectionMembershipBody ClusterMap::asMethodBody() const { + framing::ClusterConnectionMembershipBody b; + b.getNewbies().clear(); + std::for_each(newbies.begin(), newbies.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getNewbies()), _1)); + for(Set::const_iterator i = alive.begin(); i != alive.end(); ++i) { + if (!isMember(*i) && !isNewbie(*i)) + b.getNewbies().setString(i->str(), std::string()); + } + b.getMembers().clear(); + std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1)); + return b; } bool ClusterMap::configChange( @@ -80,7 +90,7 @@ bool ClusterMap::configChange( cpg_address* a; bool memberChange=false; for (a = left; a != left+nLeft; ++a) { - memberChange = members.erase(*a); + memberChange = memberChange || members.erase(*a); newbies.erase(*a); } alive.clear(); @@ -97,13 +107,6 @@ MemberId ClusterMap::firstNewbie() const { return newbies.empty() ? MemberId() : newbies.begin()->first; } -ClusterConnectionMembershipBody ClusterMap::asMethodBody() const { - framing::ClusterConnectionMembershipBody b; - assignFieldTable(b.getNewbies(), newbies); - assignFieldTable(b.getMembers(), members); - return b; -} - std::vector<Url> ClusterMap::memberUrls() const { std::vector<Url> urls(members.size()); std::transform(members.begin(), members.end(), urls.begin(), @@ -121,7 +124,8 @@ std::ostream& operator<<(std::ostream& o, const ClusterMap& m) { for (ClusterMap::Set::const_iterator i = m.alive.begin(); i != m.alive.end(); ++i) { o << *i; if (m.isMember(*i)) o << "(member)"; - if (m.isNewbie(*i)) o << "(newbie)"; + else if (m.isNewbie(*i)) o << "(newbie)"; + else o << "(unknown)"; o << " "; } return o; @@ -139,6 +143,23 @@ bool ClusterMap::ready(const MemberId& id, const Url& url) { return isAlive(id) && members.insert(Map::value_type(id,url)).second; } +bool ClusterMap::configChange(const std::string& addresses) { + bool memberChange = false; + Set update; + for (std::string::const_iterator i = addresses.begin(); i < addresses.end(); i += 8) + update.insert(MemberId(std::string(i, i+8))); + Set removed; + std::set_difference(alive.begin(), alive.end(), + update.begin(), update.end(), + std::inserter(removed, removed.begin())); + alive = update; + for (Set::const_iterator i = removed.begin(); i != removed.end(); ++i) { + memberChange = memberChange || members.erase(*i); + newbies.erase(*i); + } + return memberChange; +} + boost::optional<Url> ClusterMap::dumpOffer(const MemberId& from, const MemberId& to) { Map::iterator i = newbies.find(to); if (isAlive(from) && i != newbies.end()) { diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h index c0012facaf..5c1981269f 100644 --- a/cpp/src/qpid/cluster/ClusterMap.h +++ b/cpp/src/qpid/cluster/ClusterMap.h @@ -58,6 +58,8 @@ class ClusterMap { cpg_address *left, int nLeft, cpg_address *joined, int nJoined); + bool configChange(const std::string& addresses); + bool isNewbie(const MemberId& id) const { return newbies.find(id) != newbies.end(); } bool isMember(const MemberId& id) const { return members.find(id) != members.end(); } bool isAlive(const MemberId& id) const { return alive.find(id) != alive.end(); } diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index ae731ed25e..2f1518f871 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -44,7 +44,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, : cluster(c), self(myId), catchUp(false), output(*this, out), connection(&output, cluster.getBroker(), wrappedId) { - QPID_LOG(debug, "New connection: " << *this); + QPID_LOG(debug, cluster << " new connection: " << *this); } // Local connections @@ -53,11 +53,11 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), connection(&output, cluster.getBroker(), wrappedId) { - QPID_LOG(debug, "New connection: " << *this); + QPID_LOG(debug, cluster << " new connection: " << *this); } Connection::~Connection() { - QPID_LOG(debug, "Deleted connection: " << *this); + QPID_LOG(debug, cluster << " deleted connection: " << *this); } bool Connection::doOutput() { @@ -72,32 +72,36 @@ void Connection::deliverDoOutput(uint32_t requested) { output.deliverDoOutput(requested); } +// FIXME aconway 2008-10-15: changes here, dubious. + // Received from a directly connected client. void Connection::received(framing::AMQFrame& f) { - QPID_LOG(trace, "RECV " << *this << ": " << f); - if (isShadow()) { - // Intercept the close that completes catch-up for shadow a connection. - if (isShadow() && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { - catchUp = false; - cluster.insert(boost::intrusive_ptr<Connection>(this)); + QPID_LOG(trace, cluster << " RECV " << *this << ": " << f); + if (isLocal()) { + currentChannel = f.getChannel(); + if (!framing::invoke(*this, *f.getBody()).wasHandled()) + connection.received(f); + } + else { // Shadow or dumped ex catch-up connection. + if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { + if (isShadow()) { + QPID_LOG(debug, cluster << " inserting connection " << *this); + cluster.insert(boost::intrusive_ptr<Connection>(this)); + } AMQFrame ok(in_place<ConnectionCloseOkBody>()); connection.getOutput().send(ok); output.setOutputHandler(discardHandler); + catchUp = false; } else - QPID_LOG(warning, *this << " ignoring unexpected frame: " << f); - } - else { - currentChannel = f.getChannel(); - if (!framing::invoke(*this, *f.getBody()).wasHandled()) - connection.received(f); + QPID_LOG(warning, cluster << " ignoring unexpected frame " << *this << ": " << f); } } // Delivered from cluster. void Connection::delivered(framing::AMQFrame& f) { - QPID_LOG(trace, "DLVR " << *this << ": " << f); - assert(!isCatchUp()); + QPID_LOG(trace, cluster << "DLVR " << *this << ": " << f); + assert(!catchUp); // Handle connection controls, deliver other frames to connection. currentChannel = f.getChannel(); if (!framing::invoke(*this, *f.getBody()).wasHandled()) @@ -106,24 +110,25 @@ void Connection::delivered(framing::AMQFrame& f) { void Connection::closed() { try { - QPID_LOG(debug, "Connection closed " << *this); if (catchUp) { - QPID_LOG(critical, cluster << " error on catch-up connection " << *this); + QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this); cluster.leave(); } - else if (isDump()) + else if (isDumped()) { + QPID_LOG(debug, cluster << " closed dump connection " << *this); connection.closed(); + } else if (isLocal()) { + QPID_LOG(debug, cluster << " local close of replicated connection " << *this); // This was a local replicated connection. Multicast a deliver // closed and process any outstanding frames from the cluster // until self-delivery of deliver-close. output.setOutputHandler(discardHandler); - cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this); - ++mcastSeq; + cluster.mcastControl(ClusterConnectionDeliverCloseBody(), self, ++mcastSeq); } } catch (const std::exception& e) { - QPID_LOG(error, QPID_MSG("While closing connection: " << e.what())); + QPID_LOG(error, cluster << " error closing connection " << *this << ": " << e.what()); } } @@ -135,7 +140,7 @@ void Connection::deliverClose () { // Decode data from local clients. size_t Connection::decode(const char* buffer, size_t size) { - if (catchUp || isDump()) { // Handle catch-up locally. + if (catchUp) { // Handle catch-up locally. Buffer buf(const_cast<char*>(buffer), size); while (localDecoder.decode(buf)) received(localDecoder.frame); @@ -174,26 +179,39 @@ void Connection::sessionState( received, unknownCompleted, receivedIncomplete); - QPID_LOG(debug, "Received session state dump for " << s->getId()); + QPID_LOG(debug, cluster << " received session state dump for " << s->getId()); } void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { ConnectionId shadow = ConnectionId(memberId, connectionId); - QPID_LOG(debug, "Catch-up connection " << self << " becomes shadow " << shadow); + QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadow); self = shadow; } -void Connection::membership(const FieldTable& urls, const FieldTable& states) { - cluster.dumpInDone(ClusterMap(urls,states)); - catchUp = false; - self.second = 0; // Mark this as completed dump connection. +void Connection::membership(const FieldTable& newbies, const FieldTable& members) { + QPID_LOG(debug, cluster << " incoming dump complete on connection " << *this); + cluster.dumpInDone(ClusterMap(newbies, members)); + self.second = 0; // Mark this as completed dump connection. } -bool Connection::isLocal() const { return self.first == cluster.getId() && self.second == this; } +bool Connection::isLocal() const { + return self.first == cluster.getId() && self.second == this; +} + +bool Connection::isShadow() const { + return self.first != cluster.getId(); +} + +bool Connection::isDumped() const { + return self.first == cluster.getId() && self.second == 0; +} std::ostream& operator<<(std::ostream& o, const Connection& c) { - return o << c.getId() << "(" << (c.isLocal() ? "local" : "shadow") - << (c.isCatchUp() ? ",catchup" : "") << ")"; + const char* type="unknown"; + if (c.isLocal()) type = "local"; + else if (c.isShadow()) type = "shadow"; + else if (c.isDumped()) type = "dumped"; + return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")"; } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 7d92987e01..b537470b41 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -64,12 +64,13 @@ class Connection : bool isLocal() const; /** True for connections that are shadowing remote broker connections */ - bool isShadow() const { return !isLocal(); } + bool isShadow() const; /** True if the connection is in "catch-up" mode: building initial broker state. */ bool isCatchUp() const { return catchUp; } - bool isDump() const { return self.getPointer() == 0; } + /** True if the connection is a completed shared dump connection */ + bool isDumped() const; Cluster& getCluster() { return cluster; } @@ -103,6 +104,7 @@ class Connection : void membership(const framing::FieldTable&, const framing::FieldTable&); private: + bool catcUp; void deliverClose(); void deliverDoOutput(uint32_t requested); diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index 2b079a22bc..853eb689c6 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -168,9 +168,6 @@ void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConn // authentication etc. See ConnectionSettings. shadowConnection.open(dumpeeUrl, bc.getUserId()); - // Stop the failover listener as its session will conflict with re-creating-sessions - client::ConnectionAccess::getImpl(shadowConnection)->stopFailoverListener(); - dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1)); ClusterConnectionProxy(shadowConnection).shadowReady( dumpConnection->getId().getMember(), diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index f53b48ec1e..effd2c5bff 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -105,8 +105,9 @@ void OutputInterceptor::sendDoOutput() { // Note we may send 0 size request if there's more than 2*estimate in the buffer. // Send it anyway to keep the doOutput chain going until we are sure there's no more output // (in deliverDoOutput) - // - parent.getCluster().mcastControl(ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), request), &parent); + // + // FIXME aconway 2008-10-16: use ++parent.mcastSeq as sequence no,not 0 + parent.getCluster().mcastControl(ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), request), parent.getId(), 0); QPID_LOG(trace, &parent << "Send doOutput request for " << request); } diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 5b9657c2c7..c8f5fb9083 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -177,46 +177,52 @@ template <class C> set<uint16_t> makeSet(const C& c) { return s; } -template <class T> std::set<uint16_t> knownBrokerPorts(T& source, size_t n) { +template <class T> std::set<uint16_t> knownBrokerPorts(T& source, int n=-1) { vector<Url> urls = source.getKnownBrokers(); - for (size_t retry=1000; urls.size() != n && retry != 0; --retry) { - ::usleep(1000); - urls = source.getKnownBrokers(); + BOOST_MESSAGE("knownBrokerPorts " << n << ": " << urls); + if (n >= 0) { + for (size_t retry=10; urls.size() != unsigned(n) && retry != 0; --retry) { + ::usleep(100000); + urls = source.getKnownBrokers(); + BOOST_MESSAGE("knownBrokerPorts retry: " << urls); + } } set<uint16_t> s; - for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) { - BOOST_MESSAGE("Failover URL: " << *i); - BOOST_CHECK(i->size() >= 1); - BOOST_CHECK((*i)[0].get<TcpAddress>()); + for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) s.insert((*i)[0].get<TcpAddress>()->port); - } return s; } -QPID_AUTO_TEST_CASE(testFailoverListener) { - ClusterFixture cluster(2); +QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { + ClusterFixture cluster(1); Client c0(cluster[0], "c0"); - FailoverListener fl; - fl.start(ConnectionAccess::getImpl(c0.connection)); - set<uint16_t> set0=makeSet(cluster); + set<uint16_t> kb0 = knownBrokerPorts(c0.connection); + BOOST_CHECK_EQUAL(kb0.size(), 1); + BOOST_CHECK_EQUAL(kb0, makeSet(cluster)); - BOOST_CHECK_EQUAL(set0, knownBrokerPorts(fl, 2)); cluster.add(); - BOOST_CHECK_EQUAL(makeSet(cluster), knownBrokerPorts(fl, 3)); - cluster.kill(2); - BOOST_CHECK_EQUAL(set0, knownBrokerPorts(fl, 2)); -} - -QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { - ClusterFixture cluster(2); - Client c0(cluster[0], "c0"); - set<uint16_t> set0=makeSet(cluster); + Client c1(cluster[1], "c1"); + set<uint16_t> kb1 = knownBrokerPorts(c1.connection); + kb0 = knownBrokerPorts(c0.connection, 2); + BOOST_CHECK_EQUAL(kb1.size(), 2); + BOOST_CHECK_EQUAL(kb1, makeSet(cluster)); + BOOST_CHECK_EQUAL(kb1,kb0); - BOOST_CHECK_EQUAL(set0, knownBrokerPorts(c0.connection, 2)); cluster.add(); - BOOST_CHECK_EQUAL(makeSet(cluster), knownBrokerPorts(c0.connection, 3)); - cluster.kill(2); - BOOST_CHECK_EQUAL(set0, knownBrokerPorts(c0.connection, 2)); + Client c2(cluster[2], "c2"); + set<uint16_t> kb2 = knownBrokerPorts(c2.connection); + kb1 = knownBrokerPorts(c1.connection, 3); + kb0 = knownBrokerPorts(c0.connection, 3); + BOOST_CHECK_EQUAL(kb2.size(), 3); + BOOST_CHECK_EQUAL(kb2, makeSet(cluster)); + BOOST_CHECK_EQUAL(kb2,kb0); + BOOST_CHECK_EQUAL(kb2,kb1); + + cluster.kill(1); + kb0 = knownBrokerPorts(c0.connection, 2); + kb2 = knownBrokerPorts(c2.connection, 2); + BOOST_CHECK_EQUAL(kb0.size(), 2); + BOOST_CHECK_EQUAL(kb0, kb2); } QPID_AUTO_TEST_CASE(DumpConsumers) { @@ -238,6 +244,7 @@ QPID_AUTO_TEST_CASE(DumpConsumers) { BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 1u); BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 1u); + // Activate the subscription, ensure message removed on all queues. c0.subs.setFlowControl("q", FlowControl::messageCredit(1)); Message m; |