summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-12-02 20:41:49 +0000
committerAlan Conway <aconway@apache.org>2008-12-02 20:41:49 +0000
commit7cdb9a9ab688988e596d9fce116a0998decd0972 (patch)
treeaef9d6d0bc837b2eb0116e863c8bc89ed8f45021 /cpp/src
parent0fa4afae5e690b1cf147ebbe60641b448fcb5c31 (diff)
downloadqpid-python-7cdb9a9ab688988e596d9fce116a0998decd0972.tar.gz
Cluster: handle CPG flow-control conditions.
PollableQueue: allow dispatch functions to refuse dispatch. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@722614 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/SaslAuthenticator.cpp2
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp36
-rw-r--r--cpp/src/qpid/cluster/Cluster.h6
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp1
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp48
-rw-r--r--cpp/src/qpid/cluster/Cpg.h35
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp4
-rw-r--r--cpp/src/qpid/cluster/Event.cpp4
-rw-r--r--cpp/src/qpid/cluster/Event.h2
-rw-r--r--cpp/src/qpid/sys/PollableQueue.h30
10 files changed, 101 insertions, 67 deletions
diff --git a/cpp/src/qpid/broker/SaslAuthenticator.cpp b/cpp/src/qpid/broker/SaslAuthenticator.cpp
index 4cbc3898f8..370de8a1d1 100644
--- a/cpp/src/qpid/broker/SaslAuthenticator.cpp
+++ b/cpp/src/qpid/broker/SaslAuthenticator.cpp
@@ -118,10 +118,12 @@ void SaslAuthenticator::fini(void)
std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c)
{
+ static bool needWarning = true;
if (c.getBroker().getOptions().auth) {
return std::auto_ptr<SaslAuthenticator>(new CyrusAuthenticator(c));
} else {
QPID_LOG(warning, "SASL: No Authentication Performed");
+ needWarning = false;
return std::auto_ptr<SaslAuthenticator>(new NullAuthenticator(c));
}
}
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index d536ac59f2..0ac0da2be4 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -99,7 +99,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
- mcastQueue(boost::bind(&Event::mcast, _1, boost::cref(name), boost::ref(cpg)), poller),
+ mcastQueue(boost::bind(&Cluster::sendMcast, this, _1), poller),
mcastId(0),
mgmtObject(0),
state(INIT),
@@ -109,7 +109,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
qmf::Package packageInit(agent);
- mgmtObject = new qmf::Cluster (agent, this, &broker,name.str(),myUrl.str());
+ mgmtObject = new qmf::Cluster (agent, this, &broker,name,myUrl.str());
agent->addObject (mgmtObject);
mgmtObject->set_status("JOINING");
}
@@ -118,7 +118,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
cpgDispatchHandle.startWatch(poller);
deliverQueue.start();
mcastQueue.start();
- QPID_LOG(notice, *this << " joining cluster " << name.str() << " with url=" << myUrl);
+ QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
if (useQuorum) quorum.init();
cpg.join(name);
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety.
@@ -184,6 +184,17 @@ void Cluster::mcast(const Event& e, Lock&) {
mcastQueue.push(e);
}
+bool Cluster::sendMcast(const Event& e) {
+ try {
+ return e.mcast(cpg);
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(critical, "Multicast failure: " << e.what());
+ leave();
+ return false;
+ }
+}
+
std::vector<Url> Cluster::getUrls() const {
Lock l(lock);
return getUrls(l);
@@ -201,10 +212,10 @@ void Cluster::leave() {
void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
- QPID_LOG(notice, *this << " leaving cluster " << name.str());
+ QPID_LOG(notice, *this << " leaving cluster " << name);
if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
if (!deliverQueue.isStopped()) deliverQueue.stop();
- try { cpg.leave(name); }
+ try { cpg.leave(); }
catch (const std::exception& e) {
QPID_LOG(critical, *this << " error leaving process group: " << e.what());
}
@@ -224,7 +235,7 @@ boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& conn
}
else { // New shadow connection
std::ostringstream mgmtId;
- mgmtId << name.str() << ":" << connectionId;
+ mgmtId << name << ":" << connectionId;
ConnectionMap::value_type value(connectionId,
new Connection(*this, shadowOut, mgmtId.str(), connectionId));
i = connections.insert(value).first;
@@ -260,7 +271,7 @@ void Cluster::deliver(const Event& e, Lock&) {
}
// Entry point: called when deliverQueue has events to process.
-void Cluster::delivered(const Event& e) {
+bool Cluster::delivered(const Event& e) {
try {
Lock l(lock);
delivered(e,l);
@@ -268,7 +279,7 @@ void Cluster::delivered(const Event& e) {
QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
leave();
}
-
+ return true;
}
void Cluster::delivered(const Event& e, Lock& l) {
@@ -334,6 +345,7 @@ ostream& operator<<(ostream& o, const AddrList& a) {
void Cluster::dispatch(sys::DispatchHandle& h) {
try {
cpg.dispatchAll();
+ mcastQueue.start(); // In case it was stopped by flow control.
h.rewatch();
} catch (const std::exception& e) {
QPID_LOG(critical, *this << " error in cluster dispatch: " << e.what());
@@ -359,7 +371,7 @@ void Cluster::configChange (
cpg_address */*joined*/, int /*nJoined*/)
{
Mutex::ScopedLock l(lock);
- QPID_LOG(debug, *this << " enqueue config change: " << AddrList(current, nCurrent)
+ QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent)
<< AddrList(left, nLeft, "( ", ")"));
std::string addresses;
for (cpg_address* p = current; p < current+nCurrent; ++p)
@@ -387,7 +399,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
}
else { // Joining established group.
state = NEWBIE;
- QPID_LOG(info, *this << " joining established cluster");
+ QPID_LOG(info, *this << " joining cluster: " << map);
mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), l);
}
}
@@ -542,12 +554,12 @@ void Cluster::stopClusterNode(Lock& l) {
}
void Cluster::stopFullCluster(Lock& l) {
- QPID_LOG(notice, *this << " shutting down cluster " << name.str());
+ QPID_LOG(notice, *this << " shutting down cluster " << name);
mcastControl(ClusterShutdownBody(), l);
}
void Cluster::memberUpdate(Lock& l) {
- QPID_LOG(info, *this << map.memberCount() << " members: " << map);
+ QPID_LOG(info, *this << " member update: " << map);
std::vector<Url> urls = getUrls(l);
size_t size = urls.size();
failoverExchange->setUrls(urls);
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 81feef4919..94f0c6a95f 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -118,6 +118,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void leave(Lock&);
std::vector<Url> getUrls(Lock&) const;
+ bool sendMcast(const Event& e);
+
// Called via CPG, deliverQueue or DumpClient threads.
void tryMakeOffer(const MemberId&, Lock&);
@@ -133,7 +135,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& addresses, Lock& l);
void shutdown(const MemberId&, Lock&);
- void delivered(const Event&); // deliverQueue callback
+ bool delivered(const Event&); // deliverQueue callback
void delivered(const Event&, Lock&); // unlocked version
// CPG callbacks, called in CPG IO thread.
@@ -183,7 +185,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
broker::Broker& broker;
boost::shared_ptr<sys::Poller> poller;
Cpg cpg;
- const Cpg::Name name;
+ const std::string name;
const Url myUrl;
const MemberId myId;
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index eaf2631d03..02e6fffb71 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -29,6 +29,7 @@
#include "qpid/log/Statement.h"
#include <boost/utility/in_place_factory.hpp>
+#include <boost/scoped_ptr.hpp>
namespace qpid {
namespace cluster {
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index 9b71e4235d..6a9c97139a 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -87,12 +87,13 @@ Cpg::~Cpg() {
}
}
-void Cpg::join(const Name& group) {
- check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group));
+void Cpg::join(const std::string& name) {
+ group = name;
+ check(cpg_join(handle, &group), cantJoinMsg(group));
}
-void Cpg::leave(const Name& group) {
- check(cpg_leave(handle,const_cast<Name*>(&group)),cantLeaveMsg(group));
+void Cpg::leave() {
+ check(cpg_leave(handle, &group), cantLeaveMsg(group));
}
bool Cpg::isFlowControlEnabled() {
@@ -101,29 +102,22 @@ bool Cpg::isFlowControlEnabled() {
return flowState == CPG_FLOW_CONTROL_ENABLED;
}
-// FIXME aconway 2008-08-07: better handling of cpg flow control, no sleeping.
-void Cpg::waitForFlowControl() {
- int delayNs=1000; // one millisecond
- int tries=8; // double the delay on each try.
- while (isFlowControlEnabled() && tries > 0) {
- QPID_LOG(warning, "CPG flow control enabled, retry in " << delayNs << "ns");
- ::usleep(delayNs);
- --tries;
- delayNs *= 2;
- };
- if (tries == 0) {
- // FIXME aconway 2008-08-07: this is a fatal leave-the-cluster condition.
- throw Cpg::Exception("CPG flow control enabled, failed to send.");
+bool Cpg::mcast(const iovec* iov, int iovLen) {
+ // Thread-safety note : the cpg_ calls are thread safe, but there
+ // is a race below between calling cpg_flow_control_state_get()
+ // and calling mcast_joined() where N threads could see the state
+ // as disabled and call mcast, but only M < N messages can be sent
+ // without exceeding flow control limits.
+ if (isFlowControlEnabled()) {
+ QPID_LOG(warning, "CPG flow control enabled")
+ return false;
}
-}
-
-void Cpg::mcast(const Name& group, const iovec* iov, int iovLen) {
- waitForFlowControl();
cpg_error_t result;
do {
result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen);
if (result != CPG_ERR_TRY_AGAIN) check(result, cantMcastMsg(group));
} while(result == CPG_ERR_TRY_AGAIN);
+ return true;
}
void Cpg::shutdown() {
@@ -134,6 +128,10 @@ void Cpg::shutdown() {
}
}
+void Cpg::dispatch(cpg_dispatch_t type) {
+ check(cpg_dispatch(handle,type), "Error in CPG dispatch");
+}
+
string Cpg::errorStr(cpg_error_t err, const std::string& msg) {
switch (err) {
case CPG_OK: return msg+": ok";
@@ -173,8 +171,14 @@ MemberId Cpg::self() const {
return MemberId(nodeid, getpid());
}
+namespace { int byte(uint32_t value, int i) { return (value >> (i*8)) & 0xff; } }
+
ostream& operator <<(ostream& out, const MemberId& id) {
- return out << std::hex << id.first << ":" << std::dec << id.second;
+ out << byte(id.first, 0) << "."
+ << byte(id.first, 1) << "."
+ << byte(id.first, 2) << "."
+ << byte(id.first, 3);
+ return out << ":" << id.second;
}
ostream& operator<<(ostream& o, const ConnectionId& c) {
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h
index 5ffd42e12a..2bd58cea1f 100644
--- a/cpp/src/qpid/cluster/Cpg.h
+++ b/cpp/src/qpid/cluster/Cpg.h
@@ -19,16 +19,15 @@
*
*/
-#include "qpid/cluster/types.h"
-#include "qpid/cluster/Dispatchable.h"
-
#include "qpid/Exception.h"
+#include "qpid/cluster/Dispatchable.h"
+#include "qpid/cluster/types.h"
#include "qpid/sys/IOHandle.h"
+#include "qpid/sys/Mutex.h"
#include <boost/scoped_ptr.hpp>
#include <cassert>
-
#include <string.h>
extern "C" {
@@ -38,7 +37,6 @@ extern "C" {
namespace qpid {
namespace cluster {
-
/**
* Lightweight C++ interface to cpg.h operations.
*
@@ -53,6 +51,7 @@ class Cpg : public sys::IOHandle {
};
struct Name : public cpg_name {
+ Name() { length = 0; }
Name(const char* s) { copy(s, strlen(s)); }
Name(const char* s, size_t n) { copy(s,n); }
Name(const std::string& s) { copy(s.data(), s.size()); }
@@ -105,17 +104,21 @@ class Cpg : public sys::IOHandle {
* - CPG_DISPATCH_ALL - dispatch all available events, don't wait.
* - CPG_DISPATCH_BLOCKING - blocking dispatch loop.
*/
- void dispatch(cpg_dispatch_t type) {
- check(cpg_dispatch(handle,type), "Error in CPG dispatch");
- }
+ void dispatch(cpg_dispatch_t type);
void dispatchOne() { dispatch(CPG_DISPATCH_ONE); }
void dispatchAll() { dispatch(CPG_DISPATCH_ALL); }
void dispatchBlocking() { dispatch(CPG_DISPATCH_BLOCKING); }
- void join(const Name& group);
- void leave(const Name& group);
- void mcast(const Name& group, const iovec* iov, int iovLen);
+ void join(const std::string& group);
+ void leave();
+
+ /** Multicast to the group. NB: must not be called concurrently.
+ *
+ *@return true if the message was multi-cast, false if
+ * it was not sent due to flow control.
+ */
+ bool mcast(const iovec* iov, int iovLen);
cpg_handle_t getHandle() const { return handle; }
@@ -123,10 +126,13 @@ class Cpg : public sys::IOHandle {
int getFd();
+ bool isFlowControlEnabled();
+
private:
static std::string errorStr(cpg_error_t err, const std::string& msg);
static std::string cantJoinMsg(const Name&);
- static std::string cantLeaveMsg(const Name&); std::string cantMcastMsg(const Name&);
+ static std::string cantLeaveMsg(const Name&);
+ static std::string cantMcastMsg(const Name&);
static void check(cpg_error_t result, const std::string& msg) {
if (result != CPG_OK) throw Exception(errorStr(result, msg));
@@ -150,12 +156,11 @@ class Cpg : public sys::IOHandle {
struct cpg_address *joined, int nJoined
);
- bool isFlowControlEnabled();
- void waitForFlowControl();
-
cpg_handle_t handle;
Handler& handler;
bool isShutdown;
+ Name group;
+ sys::Mutex dispatchLock;
};
inline bool operator==(const cpg_name& a, const cpg_name& b) {
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index 3a4f217721..3f3212470d 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -85,11 +85,11 @@ void send(client::AsyncSession& s, const AMQBody& body) {
// TODO aconway 2008-09-24: optimization: dump connections/sessions in parallel.
-DumpClient::DumpClient(const MemberId& from, const MemberId& to, const Url& url,
+DumpClient::DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url& url,
broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail)
- : dumperId(to), dumpeeId(from), dumpeeUrl(url), dumperBroker(broker), map(m), connections(cons),
+ : dumperId(dumper), dumpeeId(dumpee), dumpeeUrl(url), dumperBroker(broker), map(m), connections(cons),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail)
{
diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp
index 3f2b5443d2..87cc7e7bd3 100644
--- a/cpp/src/qpid/cluster/Event.cpp
+++ b/cpp/src/qpid/cluster/Event.cpp
@@ -56,14 +56,14 @@ Event Event::control(const framing::AMQBody& body, const ConnectionId& cid, uint
return e;
}
-void Event::mcast (const Cpg::Name& name, Cpg& cpg) const {
+bool Event::mcast (Cpg& cpg) const {
char header[OVERHEAD];
Buffer b(header, OVERHEAD);
b.putOctet(type);
b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer()));
b.putLong(id);
iovec iov[] = { { header, OVERHEAD }, { const_cast<char*>(getData()), getSize() } };
- cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov));
+ return cpg.mcast(iov, sizeof(iov)/sizeof(*iov));
}
Event::operator Buffer() const {
diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h
index 9a2b12bf05..b61ce0e60d 100644
--- a/cpp/src/qpid/cluster/Event.h
+++ b/cpp/src/qpid/cluster/Event.h
@@ -51,7 +51,7 @@ class Event {
/** Create an event containing a control */
static Event control(const framing::AMQBody&, const ConnectionId&, uint32_t id=0);
- void mcast(const Cpg::Name& name, Cpg& cpg) const;
+ bool mcast(Cpg& cpg) const;
EventType getType() const { return type; }
ConnectionId getConnectionId() const { return connectionId; }
diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h
index a594dab86d..2ee29db022 100644
--- a/cpp/src/qpid/sys/PollableQueue.h
+++ b/cpp/src/qpid/sys/PollableQueue.h
@@ -44,8 +44,13 @@ class Poller;
template <class T>
class PollableQueue {
public:
- /** Callback to process a range of items. */
- typedef boost::function<void (const T&)> Callback;
+ /**
+ * Callback to process an item from the queue.
+ *
+ * @return If true the item is removed from the queue else it
+ * remains on the queue and the queue is stopped.
+ */
+ typedef boost::function<bool (const T&)> Callback;
/** When the queue is selected by the poller, values are passed to callback cb. */
PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller);
@@ -66,6 +71,7 @@ class PollableQueue {
size_t size() { ScopedLock l(lock); return queue.size(); }
bool empty() { ScopedLock l(lock); return queue.empty(); }
+
private:
typedef std::deque<T> Queue;
typedef sys::Monitor::ScopedLock ScopedLock;
@@ -94,7 +100,7 @@ template <class T> PollableQueue<T>::PollableQueue(
template <class T> void PollableQueue<T>::start() {
ScopedLock l(lock);
- assert(stopped);
+ if (!stopped) return;
stopped = false;
if (!queue.empty()) condition.set();
handle.rewatch();
@@ -115,25 +121,27 @@ template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
assert(dispatcher.id() == 0 || dispatcher.id() == Thread::current().id());
dispatcher = Thread::current();
while (!stopped && !queue.empty()) {
- T value = queue.front();
- queue.pop_front();
- { // callback outside the lock to allow concurrent push.
+ bool ok = false;
+ { // unlock to allow concurrent push or call to stop() in callback.
ScopedUnlock u(lock);
- callback(value);
+ // FIXME aconway 2008-12-02: exception-safe if callback throws.
+ ok = callback(queue.front());
}
+ if (ok) queue.pop_front();
+ else stopped=true;
}
+ dispatcher = Thread();
if (queue.empty()) condition.clear();
if (stopped) lock.notifyAll();
- dispatcher = Thread();
- if (!stopped) h.rewatch();
+ else h.rewatch();
}
template <class T> void PollableQueue<T>::stop() {
ScopedLock l(lock);
- assert(!stopped);
+ if (stopped) return;
handle.unwatch();
stopped = true;
- // No deadlock if stop is called from the dispatcher thread
+ // Avoid deadlock if stop is called from the dispatch thread
while (dispatcher.id() && dispatcher.id() != Thread::current().id())
lock.wait();
}