author      Alan Conway <aconway@apache.org>    2008-12-02 20:41:49 +0000
committer   Alan Conway <aconway@apache.org>    2008-12-02 20:41:49 +0000
commit      7cdb9a9ab688988e596d9fce116a0998decd0972 (patch)
tree        aef9d6d0bc837b2eb0116e863c8bc89ed8f45021 /cpp/src
parent      0fa4afae5e690b1cf147ebbe60641b448fcb5c31 (diff)
download    qpid-python-7cdb9a9ab688988e596d9fce116a0998decd0972.tar.gz
Cluster: handle CPG flow-control conditions.
PollableQueue: allow dispatch functions to refuse dispatch.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@722614 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--   cpp/src/qpid/broker/SaslAuthenticator.cpp |  2
-rw-r--r--   cpp/src/qpid/cluster/Cluster.cpp           | 36
-rw-r--r--   cpp/src/qpid/cluster/Cluster.h             |  6
-rw-r--r--   cpp/src/qpid/cluster/ClusterPlugin.cpp     |  1
-rw-r--r--   cpp/src/qpid/cluster/Cpg.cpp               | 48
-rw-r--r--   cpp/src/qpid/cluster/Cpg.h                 | 35
-rw-r--r--   cpp/src/qpid/cluster/DumpClient.cpp        |  4
-rw-r--r--   cpp/src/qpid/cluster/Event.cpp             |  4
-rw-r--r--   cpp/src/qpid/cluster/Event.h               |  2
-rw-r--r--   cpp/src/qpid/sys/PollableQueue.h           | 30
10 files changed, 101 insertions, 67 deletions
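
The commit message covers two coupled changes: Cpg::mcast() now reports CPG flow control to its caller instead of sleeping and retrying, and PollableQueue callbacks return bool so that Cluster::sendMcast can refuse an event, leave it on the queue, and stop the queue until Cluster::dispatch() restarts it. The sketch below is illustrative only and uses standard-library stand-ins; ToyPollableQueue and flowControlEnabled are made-up names, not the Qpid classes, and the single-threaded dispatch loop ignores the locking the real template does.

    // Minimal model of the new PollableQueue contract: the callback returns
    // false to refuse an item, which stops dispatch and leaves the item queued.
    #include <deque>
    #include <functional>
    #include <iostream>
    #include <string>

    class ToyPollableQueue {
      public:
        // Returns true if the item was consumed, false to stop the queue.
        typedef std::function<bool(const std::string&)> Callback;

        explicit ToyPollableQueue(const Callback& cb) : callback(cb), stopped(true) {}

        void push(const std::string& item) { queue.push_back(item); }

        // Restart dispatch, e.g. after flow control clears (cf. Cluster::dispatch()).
        void start() {
            if (!stopped) return;
            stopped = false;
            dispatch();
        }

        // Normally driven by the poller; called directly here for the example.
        void dispatch() {
            while (!stopped && !queue.empty()) {
                if (callback(queue.front())) queue.pop_front(); // consumed
                else stopped = true;   // refused: keep the item, stop the queue
            }
        }

        bool isStopped() const { return stopped; }

      private:
        Callback callback;
        std::deque<std::string> queue;
        bool stopped;
    };

    // Hypothetical stand-in for Cpg::isFlowControlEnabled().
    static bool flowControlEnabled = false;

    int main() {
        // The callback plays the role of Cluster::sendMcast: refuse when CPG
        // flow control is on, otherwise "multicast" the event.
        ToyPollableQueue q([](const std::string& msg) {
            if (flowControlEnabled) return false;
            std::cout << "mcast: " << msg << "\n";
            return true;
        });

        q.push("event-1");
        q.push("event-2");
        q.start();                 // sends event-1 and event-2

        flowControlEnabled = true;
        q.push("event-3");
        q.dispatch();              // event-3 refused, queue stops, item stays queued

        flowControlEnabled = false;
        q.start();                 // restart, as Cluster::dispatch() does; sends event-3
        return 0;
    }

Run top to bottom this prints event-1 and event-2, stops on the refusal, then sends event-3 once flow control clears, mirroring how mcastQueue is stopped by sendMcast and restarted from the CPG dispatch thread.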
diff --git a/cpp/src/qpid/broker/SaslAuthenticator.cpp b/cpp/src/qpid/broker/SaslAuthenticator.cpp
index 4cbc3898f8..370de8a1d1 100644
--- a/cpp/src/qpid/broker/SaslAuthenticator.cpp
+++ b/cpp/src/qpid/broker/SaslAuthenticator.cpp
@@ -118,10 +118,12 @@ void SaslAuthenticator::fini(void)
 
 std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c)
 {
+    static bool needWarning = true;
     if (c.getBroker().getOptions().auth) {
         return std::auto_ptr<SaslAuthenticator>(new CyrusAuthenticator(c));
     } else {
         QPID_LOG(warning, "SASL: No Authentication Performed");
+        needWarning = false;
         return std::auto_ptr<SaslAuthenticator>(new NullAuthenticator(c));
     }
 }
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index d536ac59f2..0ac0da2be4 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -99,7 +99,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
         boost::bind(&Cluster::disconnect, this, _1) // disconnect
     ),
     deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
-    mcastQueue(boost::bind(&Event::mcast, _1, boost::cref(name), boost::ref(cpg)), poller),
+    mcastQueue(boost::bind(&Cluster::sendMcast, this, _1), poller),
    mcastId(0),
    mgmtObject(0),
    state(INIT),
@@ -109,7 +109,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
     ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
     if (agent != 0){
         qmf::Package packageInit(agent);
-        mgmtObject = new qmf::Cluster (agent, this, &broker,name.str(),myUrl.str());
+        mgmtObject = new qmf::Cluster (agent, this, &broker,name,myUrl.str());
         agent->addObject (mgmtObject);
         mgmtObject->set_status("JOINING");
     }
@@ -118,7 +118,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
     cpgDispatchHandle.startWatch(poller);
     deliverQueue.start();
     mcastQueue.start();
-    QPID_LOG(notice, *this << " joining cluster " << name.str() << " with url=" << myUrl);
+    QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
     if (useQuorum) quorum.init();
     cpg.join(name);
     broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety.
@@ -184,6 +184,17 @@ void Cluster::mcast(const Event& e, Lock&) {
     mcastQueue.push(e);
 }
 
+bool Cluster::sendMcast(const Event& e) {
+    try {
+        return e.mcast(cpg);
+    }
+    catch (const std::exception& e) {
+        QPID_LOG(critical, "Multicast failure: " << e.what());
+        leave();
+        return false;
+    }
+}
+
 std::vector<Url> Cluster::getUrls() const {
     Lock l(lock);
     return getUrls(l);
@@ -201,10 +212,10 @@ void Cluster::leave() {
 void Cluster::leave(Lock&) {
     if (state != LEFT) {
         state = LEFT;
-        QPID_LOG(notice, *this << " leaving cluster " << name.str());
+        QPID_LOG(notice, *this << " leaving cluster " << name);
         if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
         if (!deliverQueue.isStopped()) deliverQueue.stop();
-        try { cpg.leave(name); }
+        try { cpg.leave(); }
         catch (const std::exception& e) {
             QPID_LOG(critical, *this << " error leaving process group: " << e.what());
         }
@@ -224,7 +235,7 @@ boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& conn
     }
     else {                      // New shadow connection
         std::ostringstream mgmtId;
-        mgmtId << name.str() << ":" << connectionId;
+        mgmtId << name << ":" << connectionId;
         ConnectionMap::value_type value(connectionId,
             new Connection(*this, shadowOut, mgmtId.str(), connectionId));
         i = connections.insert(value).first;
@@ -260,7 +271,7 @@ void Cluster::deliver(const Event& e, Lock&) {
 }
 
 // Entry point: called when deliverQueue has events to process.
-void Cluster::delivered(const Event& e) {
+bool Cluster::delivered(const Event& e) {
     try {
         Lock l(lock);
         delivered(e,l);
@@ -268,7 +279,7 @@ void Cluster::delivered(const Event& e) {
         QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
         leave();
     }
-
+    return true;
 }
 
 void Cluster::delivered(const Event& e, Lock& l) {
@@ -334,6 +345,7 @@ ostream& operator<<(ostream& o, const AddrList& a) {
 void Cluster::dispatch(sys::DispatchHandle& h) {
     try {
         cpg.dispatchAll();
+        mcastQueue.start();     // In case it was stopped by flow control.
         h.rewatch();
     } catch (const std::exception& e) {
         QPID_LOG(critical, *this << " error in cluster dispatch: " << e.what());
@@ -359,7 +371,7 @@ void Cluster::configChange (
     cpg_address */*joined*/, int /*nJoined*/)
 {
     Mutex::ScopedLock l(lock);
-    QPID_LOG(debug, *this << " enqueue config change: " << AddrList(current, nCurrent)
+    QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent)
              << AddrList(left, nLeft, "( ", ")"));
     std::string addresses;
     for (cpg_address* p = current; p < current+nCurrent; ++p)
@@ -387,7 +399,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
     }
     else {                      // Joining established group.
         state = NEWBIE;
-        QPID_LOG(info, *this << " joining established cluster");
+        QPID_LOG(info, *this << " joining cluster: " << map);
         mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), l);
     }
 }
@@ -542,12 +554,12 @@ void Cluster::stopClusterNode(Lock& l) {
 }
 
 void Cluster::stopFullCluster(Lock& l) {
-    QPID_LOG(notice, *this << " shutting down cluster " << name.str());
+    QPID_LOG(notice, *this << " shutting down cluster " << name);
     mcastControl(ClusterShutdownBody(), l);
 }
 
 void Cluster::memberUpdate(Lock& l) {
-    QPID_LOG(info, *this << map.memberCount() << " members: " << map);
+    QPID_LOG(info, *this << " member update: " << map);
     std::vector<Url> urls = getUrls(l);
     size_t size = urls.size();
     failoverExchange->setUrls(urls);
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 81feef4919..94f0c6a95f 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -118,6 +118,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
     void leave(Lock&);
     std::vector<Url> getUrls(Lock&) const;
 
+    bool sendMcast(const Event& e);
+
     // Called via CPG, deliverQueue or DumpClient threads.
     void tryMakeOffer(const MemberId&, Lock&);
 
@@ -133,7 +135,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
     void ready(const MemberId&, const std::string&, Lock&);
     void configChange(const MemberId&, const std::string& addresses, Lock& l);
     void shutdown(const MemberId&, Lock&);
-    void delivered(const Event&); // deliverQueue callback
+    bool delivered(const Event&); // deliverQueue callback
     void delivered(const Event&, Lock&); // unlocked version
 
     // CPG callbacks, called in CPG IO thread.
@@ -183,7 +185,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
     broker::Broker& broker;
     boost::shared_ptr<sys::Poller> poller;
     Cpg cpg;
-    const Cpg::Name name;
+    const std::string name;
     const Url myUrl;
     const MemberId myId;
 
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index eaf2631d03..02e6fffb71 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -29,6 +29,7 @@
 #include "qpid/log/Statement.h"
 
 #include <boost/utility/in_place_factory.hpp>
+#include <boost/scoped_ptr.hpp>
 
 namespace qpid {
 namespace cluster {
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index 9b71e4235d..6a9c97139a 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -87,12 +87,13 @@ Cpg::~Cpg() {
     }
 }
 
-void Cpg::join(const Name& group) {
-    check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group));
+void Cpg::join(const std::string& name) {
+    group = name;
+    check(cpg_join(handle, &group), cantJoinMsg(group));
 }
 
-void Cpg::leave(const Name& group) {
-    check(cpg_leave(handle,const_cast<Name*>(&group)),cantLeaveMsg(group));
+void Cpg::leave() {
+    check(cpg_leave(handle, &group), cantLeaveMsg(group));
 }
 
 bool Cpg::isFlowControlEnabled() {
@@ -101,29 +102,22 @@ bool Cpg::isFlowControlEnabled() {
     return flowState == CPG_FLOW_CONTROL_ENABLED;
 }
 
-// FIXME aconway 2008-08-07: better handling of cpg flow control, no sleeping.
-void Cpg::waitForFlowControl() {
-    int delayNs=1000;           // one millisecond
-    int tries=8;                // double the delay on each try.
-    while (isFlowControlEnabled() && tries > 0) {
-        QPID_LOG(warning, "CPG flow control enabled, retry in " << delayNs << "ns");
-        ::usleep(delayNs);
-        --tries;
-        delayNs *= 2;
-    };
-    if (tries == 0) {
-        // FIXME aconway 2008-08-07: this is a fatal leave-the-cluster condition.
-        throw Cpg::Exception("CPG flow control enabled, failed to send.");
+bool Cpg::mcast(const iovec* iov, int iovLen) {
+    // Thread-safety note : the cpg_ calls are thread safe, but there
+    // is a race below between calling cpg_flow_control_state_get()
+    // and calling mcast_joined() where N threads could see the state
+    // as disabled and call mcast, but only M < N messages can be sent
+    // without exceeding flow control limits.
+    if (isFlowControlEnabled()) {
+        QPID_LOG(warning, "CPG flow control enabled")
+        return false;
     }
-}
-
-void Cpg::mcast(const Name& group, const iovec* iov, int iovLen) {
-    waitForFlowControl();
     cpg_error_t result;
     do {
         result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen);
         if (result != CPG_ERR_TRY_AGAIN) check(result, cantMcastMsg(group));
     } while(result == CPG_ERR_TRY_AGAIN);
+    return true;
 }
 
 void Cpg::shutdown() {
@@ -134,6 +128,10 @@ void Cpg::shutdown() {
     }
 }
 
+void Cpg::dispatch(cpg_dispatch_t type) {
+    check(cpg_dispatch(handle,type), "Error in CPG dispatch");
+}
+
 string Cpg::errorStr(cpg_error_t err, const std::string& msg) {
     switch (err) {
       case CPG_OK: return msg+": ok";
@@ -173,8 +171,14 @@ MemberId Cpg::self() const {
     return MemberId(nodeid, getpid());
 }
 
+namespace { int byte(uint32_t value, int i) { return (value >> (i*8)) & 0xff; } }
+
 ostream& operator <<(ostream& out, const MemberId& id) {
-    return out << std::hex << id.first << ":" << std::dec << id.second;
+    out << byte(id.first, 0) << "."
+        << byte(id.first, 1) << "."
+        << byte(id.first, 2) << "."
+        << byte(id.first, 3);
+    return out << ":" << id.second;
 }
 
 ostream& operator<<(ostream& o, const ConnectionId& c) {
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h
index 5ffd42e12a..2bd58cea1f 100644
--- a/cpp/src/qpid/cluster/Cpg.h
+++ b/cpp/src/qpid/cluster/Cpg.h
@@ -19,16 +19,15 @@
  *
  */
 
-#include "qpid/cluster/types.h"
-#include "qpid/cluster/Dispatchable.h"
-
 #include "qpid/Exception.h"
+#include "qpid/cluster/Dispatchable.h"
+#include "qpid/cluster/types.h"
 #include "qpid/sys/IOHandle.h"
+#include "qpid/sys/Mutex.h"
 
 #include <boost/scoped_ptr.hpp>
 
 #include <cassert>
-
 #include <string.h>
 
 extern "C" {
@@ -38,7 +37,6 @@ extern "C" {
 
 namespace qpid {
 namespace cluster {
-
 /**
  * Lightweight C++ interface to cpg.h operations.
  *
@@ -53,6 +51,7 @@ class Cpg : public sys::IOHandle {
     };
 
     struct Name : public cpg_name {
+        Name() { length = 0; }
         Name(const char* s) { copy(s, strlen(s)); }
         Name(const char* s, size_t n) { copy(s,n); }
         Name(const std::string& s) { copy(s.data(), s.size()); }
@@ -105,17 +104,21 @@ class Cpg : public sys::IOHandle {
      * - CPG_DISPATCH_ALL - dispatch all available events, don't wait.
     * - CPG_DISPATCH_BLOCKING - blocking dispatch loop.
     */
-    void dispatch(cpg_dispatch_t type) {
-        check(cpg_dispatch(handle,type), "Error in CPG dispatch");
-    }
+    void dispatch(cpg_dispatch_t type);
 
     void dispatchOne() { dispatch(CPG_DISPATCH_ONE); }
     void dispatchAll() { dispatch(CPG_DISPATCH_ALL); }
     void dispatchBlocking() { dispatch(CPG_DISPATCH_BLOCKING); }
 
-    void join(const Name& group);
-    void leave(const Name& group);
-    void mcast(const Name& group, const iovec* iov, int iovLen);
+    void join(const std::string& group);
+    void leave();
+
+    /** Multicast to the group. NB: must not be called concurrently.
+     *
+     *@return true if the message was multi-cast, false if
+     * it was not sent due to flow control.
+     */
+    bool mcast(const iovec* iov, int iovLen);
 
     cpg_handle_t getHandle() const { return handle; }
 
@@ -123,10 +126,13 @@ class Cpg : public sys::IOHandle {
 
     int getFd();
 
+    bool isFlowControlEnabled();
+
   private:
     static std::string errorStr(cpg_error_t err, const std::string& msg);
     static std::string cantJoinMsg(const Name&);
-    static std::string cantLeaveMsg(const Name&); std::string cantMcastMsg(const Name&);
+    static std::string cantLeaveMsg(const Name&);
+    static std::string cantMcastMsg(const Name&);
 
     static void check(cpg_error_t result, const std::string& msg) {
         if (result != CPG_OK) throw Exception(errorStr(result, msg));
@@ -150,12 +156,11 @@ class Cpg : public sys::IOHandle {
         struct cpg_address *joined, int nJoined
     );
 
-    bool isFlowControlEnabled();
-    void waitForFlowControl();
-
     cpg_handle_t handle;
     Handler& handler;
     bool isShutdown;
+    Name group;
+    sys::Mutex dispatchLock;
 };
 
 inline bool operator==(const cpg_name& a, const cpg_name& b) {
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index 3a4f217721..3f3212470d 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -85,11 +85,11 @@ void send(client::AsyncSession& s, const AMQBody& body) {
 
 // TODO aconway 2008-09-24: optimization: dump connections/sessions in parallel.
 
-DumpClient::DumpClient(const MemberId& from, const MemberId& to, const Url& url,
+DumpClient::DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url& url,
                        broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons,
                        const boost::function<void()>& ok,
                        const boost::function<void(const std::exception&)>& fail)
-    : dumperId(to), dumpeeId(from), dumpeeUrl(url), dumperBroker(broker), map(m), connections(cons),
+    : dumperId(dumper), dumpeeId(dumpee), dumpeeUrl(url), dumperBroker(broker), map(m), connections(cons),
       connection(catchUpConnection()), shadowConnection(catchUpConnection()),
       done(ok), failed(fail)
 {
diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp
index 3f2b5443d2..87cc7e7bd3 100644
--- a/cpp/src/qpid/cluster/Event.cpp
+++ b/cpp/src/qpid/cluster/Event.cpp
@@ -56,14 +56,14 @@ Event Event::control(const framing::AMQBody& body, const ConnectionId& cid, uint
     return e;
 }
 
-void Event::mcast (const Cpg::Name& name, Cpg& cpg) const {
+bool Event::mcast (Cpg& cpg) const {
     char header[OVERHEAD];
     Buffer b(header, OVERHEAD);
     b.putOctet(type);
     b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer()));
     b.putLong(id);
     iovec iov[] = { { header, OVERHEAD }, { const_cast<char*>(getData()), getSize() } };
-    cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov));
+    return cpg.mcast(iov, sizeof(iov)/sizeof(*iov));
 }
 
 Event::operator Buffer() const {
diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h
index 9a2b12bf05..b61ce0e60d 100644
--- a/cpp/src/qpid/cluster/Event.h
+++ b/cpp/src/qpid/cluster/Event.h
@@ -51,7 +51,7 @@ class Event {
     /** Create an event containing a control */
     static Event control(const framing::AMQBody&, const ConnectionId&, uint32_t id=0);
 
-    void mcast(const Cpg::Name& name, Cpg& cpg) const;
+    bool mcast(Cpg& cpg) const;
 
     EventType getType() const { return type; }
     ConnectionId getConnectionId() const { return connectionId; }
diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h
index a594dab86d..2ee29db022 100644
--- a/cpp/src/qpid/sys/PollableQueue.h
+++ b/cpp/src/qpid/sys/PollableQueue.h
@@ -44,8 +44,13 @@ class Poller;
 
 template <class T> class PollableQueue {
   public:
-    /** Callback to process a range of items. */
-    typedef boost::function<void (const T&)> Callback;
+    /**
+     * Callback to process an item from the queue.
+     *
+     * @return If true the item is removed from the queue else it
+     * remains on the queue and the queue is stopped.
+     */
+    typedef boost::function<bool (const T&)> Callback;
 
     /** When the queue is selected by the poller, values are passed to callback cb. */
     PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller);
@@ -66,6 +71,7 @@ class PollableQueue {
     size_t size() { ScopedLock l(lock); return queue.size(); }
     bool empty() { ScopedLock l(lock); return queue.empty(); }
 
+
   private:
     typedef std::deque<T> Queue;
     typedef sys::Monitor::ScopedLock ScopedLock;
@@ -94,7 +100,7 @@ template <class T> PollableQueue<T>::PollableQueue(
 
 template <class T> void PollableQueue<T>::start() {
     ScopedLock l(lock);
-    assert(stopped);
+    if (!stopped) return;
     stopped = false;
     if (!queue.empty()) condition.set();
     handle.rewatch();
@@ -115,25 +121,27 @@ template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
     assert(dispatcher.id() == 0 || dispatcher.id() == Thread::current().id());
     dispatcher = Thread::current();
     while (!stopped && !queue.empty()) {
-        T value = queue.front();
-        queue.pop_front();
-        {   // callback outside the lock to allow concurrent push.
+        bool ok = false;
+        {   // unlock to allow concurrent push or call to stop() in callback.
             ScopedUnlock u(lock);
-            callback(value);
+            // FIXME aconway 2008-12-02: exception-safe if callback throws.
+            ok = callback(queue.front());
         }
+        if (ok) queue.pop_front();
+        else stopped=true;
     }
+    dispatcher = Thread();
     if (queue.empty()) condition.clear();
     if (stopped) lock.notifyAll();
-    dispatcher = Thread();
-    if (!stopped) h.rewatch();
+    else h.rewatch();
 }
 
 template <class T> void PollableQueue<T>::stop() {
     ScopedLock l(lock);
-    assert(!stopped);
+    if (stopped) return;
     handle.unwatch();
     stopped = true;
-    // No deadlock if stop is called from the dispatcher thread
+    // Avoid deadlock if stop is called from the dispatch thread
     while (dispatcher.id() && dispatcher.id() != Thread::current().id()) lock.wait();
 }
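
A small side change in Cpg.cpp: the MemberId stream operator now prints the 32-bit node id as a dotted quad rather than hex. The byte() helper added in the diff just shifts and masks; the snippet below reproduces that arithmetic in isolation. formatMember is a hypothetical free function (the real code is operator<< on MemberId), and it assumes the node id packs an IPv4 address least-significant byte first, which is what the dotted-quad output presumes.

    #include <cstdint>
    #include <iostream>
    #include <sstream>
    #include <string>

    // Same shift-and-mask arithmetic as the byte() helper added in Cpg.cpp:
    // byte i of a 32-bit node id, least-significant byte first.
    static int byteOf(uint32_t value, int i) { return (value >> (i * 8)) & 0xff; }

    // Hypothetical stand-in for operator<<(ostream&, const MemberId&).
    std::string formatMember(uint32_t nodeId, int pid) {
        std::ostringstream out;
        out << byteOf(nodeId, 0) << "." << byteOf(nodeId, 1) << "."
            << byteOf(nodeId, 2) << "." << byteOf(nodeId, 3) << ":" << pid;
        return out.str();
    }

    int main() {
        // 0x0164a8c0 holds 192.168.100.1 with the least-significant byte first.
        std::cout << formatMember(0x0164a8c0u, 12345) << "\n"; // prints 192.168.100.1:12345
        return 0;
    }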