diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cpg.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 48 |
1 files changed, 26 insertions, 22 deletions
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 9b71e4235d..6a9c97139a 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -87,12 +87,13 @@ Cpg::~Cpg() { } } -void Cpg::join(const Name& group) { - check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group)); +void Cpg::join(const std::string& name) { + group = name; + check(cpg_join(handle, &group), cantJoinMsg(group)); } -void Cpg::leave(const Name& group) { - check(cpg_leave(handle,const_cast<Name*>(&group)),cantLeaveMsg(group)); +void Cpg::leave() { + check(cpg_leave(handle, &group), cantLeaveMsg(group)); } bool Cpg::isFlowControlEnabled() { @@ -101,29 +102,22 @@ bool Cpg::isFlowControlEnabled() { return flowState == CPG_FLOW_CONTROL_ENABLED; } -// FIXME aconway 2008-08-07: better handling of cpg flow control, no sleeping. -void Cpg::waitForFlowControl() { - int delayNs=1000; // one millisecond - int tries=8; // double the delay on each try. - while (isFlowControlEnabled() && tries > 0) { - QPID_LOG(warning, "CPG flow control enabled, retry in " << delayNs << "ns"); - ::usleep(delayNs); - --tries; - delayNs *= 2; - }; - if (tries == 0) { - // FIXME aconway 2008-08-07: this is a fatal leave-the-cluster condition. - throw Cpg::Exception("CPG flow control enabled, failed to send."); +bool Cpg::mcast(const iovec* iov, int iovLen) { + // Thread-safety note : the cpg_ calls are thread safe, but there + // is a race below between calling cpg_flow_control_state_get() + // and calling mcast_joined() where N threads could see the state + // as disabled and call mcast, but only M < N messages can be sent + // without exceeding flow control limits. + if (isFlowControlEnabled()) { + QPID_LOG(warning, "CPG flow control enabled") + return false; } -} - -void Cpg::mcast(const Name& group, const iovec* iov, int iovLen) { - waitForFlowControl(); cpg_error_t result; do { result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen); if (result != CPG_ERR_TRY_AGAIN) check(result, cantMcastMsg(group)); } while(result == CPG_ERR_TRY_AGAIN); + return true; } void Cpg::shutdown() { @@ -134,6 +128,10 @@ void Cpg::shutdown() { } } +void Cpg::dispatch(cpg_dispatch_t type) { + check(cpg_dispatch(handle,type), "Error in CPG dispatch"); +} + string Cpg::errorStr(cpg_error_t err, const std::string& msg) { switch (err) { case CPG_OK: return msg+": ok"; @@ -173,8 +171,14 @@ MemberId Cpg::self() const { return MemberId(nodeid, getpid()); } +namespace { int byte(uint32_t value, int i) { return (value >> (i*8)) & 0xff; } } + ostream& operator <<(ostream& out, const MemberId& id) { - return out << std::hex << id.first << ":" << std::dec << id.second; + out << byte(id.first, 0) << "." + << byte(id.first, 1) << "." + << byte(id.first, 2) << "." + << byte(id.first, 3); + return out << ":" << id.second; } ostream& operator<<(ostream& o, const ConnectionId& c) { |