summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cpg.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cpg.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp48
1 files changed, 26 insertions, 22 deletions
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index 9b71e4235d..6a9c97139a 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -87,12 +87,13 @@ Cpg::~Cpg() {
}
}
-void Cpg::join(const Name& group) {
- check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group));
+void Cpg::join(const std::string& name) {
+ group = name;
+ check(cpg_join(handle, &group), cantJoinMsg(group));
}
-void Cpg::leave(const Name& group) {
- check(cpg_leave(handle,const_cast<Name*>(&group)),cantLeaveMsg(group));
+void Cpg::leave() {
+ check(cpg_leave(handle, &group), cantLeaveMsg(group));
}
bool Cpg::isFlowControlEnabled() {
@@ -101,29 +102,22 @@ bool Cpg::isFlowControlEnabled() {
return flowState == CPG_FLOW_CONTROL_ENABLED;
}
-// FIXME aconway 2008-08-07: better handling of cpg flow control, no sleeping.
-void Cpg::waitForFlowControl() {
- int delayNs=1000; // one millisecond
- int tries=8; // double the delay on each try.
- while (isFlowControlEnabled() && tries > 0) {
- QPID_LOG(warning, "CPG flow control enabled, retry in " << delayNs << "ns");
- ::usleep(delayNs);
- --tries;
- delayNs *= 2;
- };
- if (tries == 0) {
- // FIXME aconway 2008-08-07: this is a fatal leave-the-cluster condition.
- throw Cpg::Exception("CPG flow control enabled, failed to send.");
+bool Cpg::mcast(const iovec* iov, int iovLen) {
+ // Thread-safety note : the cpg_ calls are thread safe, but there
+ // is a race below between calling cpg_flow_control_state_get()
+ // and calling mcast_joined() where N threads could see the state
+ // as disabled and call mcast, but only M < N messages can be sent
+ // without exceeding flow control limits.
+ if (isFlowControlEnabled()) {
+ QPID_LOG(warning, "CPG flow control enabled")
+ return false;
}
-}
-
-void Cpg::mcast(const Name& group, const iovec* iov, int iovLen) {
- waitForFlowControl();
cpg_error_t result;
do {
result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen);
if (result != CPG_ERR_TRY_AGAIN) check(result, cantMcastMsg(group));
} while(result == CPG_ERR_TRY_AGAIN);
+ return true;
}
void Cpg::shutdown() {
@@ -134,6 +128,10 @@ void Cpg::shutdown() {
}
}
+void Cpg::dispatch(cpg_dispatch_t type) {
+ check(cpg_dispatch(handle,type), "Error in CPG dispatch");
+}
+
string Cpg::errorStr(cpg_error_t err, const std::string& msg) {
switch (err) {
case CPG_OK: return msg+": ok";
@@ -173,8 +171,14 @@ MemberId Cpg::self() const {
return MemberId(nodeid, getpid());
}
+namespace { int byte(uint32_t value, int i) { return (value >> (i*8)) & 0xff; } }
+
ostream& operator <<(ostream& out, const MemberId& id) {
- return out << std::hex << id.first << ":" << std::dec << id.second;
+ out << byte(id.first, 0) << "."
+ << byte(id.first, 1) << "."
+ << byte(id.first, 2) << "."
+ << byte(id.first, 3);
+ return out << ":" << id.second;
}
ostream& operator<<(ostream& o, const ConnectionId& c) {