diff options
author | Alan Conway <aconway@apache.org> | 2008-08-07 20:46:18 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-08-07 20:46:18 +0000 |
commit | 2100b4498daa1d89e2850196dca19bae1b4a6151 (patch) | |
tree | 638f3fb5435545e176a78e2229bb1f56e856e197 | |
parent | f4dc59ea3028b87c1f8640df02c2a73b5cafcf1a (diff) | |
download | qpid-python-2100b4498daa1d89e2850196dca19bae1b4a6151.tar.gz |
Check CPG flow control.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@683711 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 40 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.h | 22 | ||||
-rw-r--r-- | cpp/src/tests/ForkedBroker.h | 2 |
3 files changed, 48 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 674781ac06..6b01d73197 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -77,6 +77,46 @@ Cpg::~Cpg() { } } +void Cpg::join(const Name& group) { + check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group)); +} + +void Cpg::leave(const Name& group) { + check(cpg_leave(handle,const_cast<Name*>(&group)),cantLeaveMsg(group)); +} + +bool Cpg::isFlowControlEnabled() { + cpg_flow_control_state_t flowState; + check(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status."); + return flowState == CPG_FLOW_CONTROL_ENABLED; +} + +// TODO aconway 2008-08-07: better handling of flow control. +// Wait for flow control to be disabled. +void Cpg::waitForFlowControl() { + int delayNs=1000; // one millisecond + int tries=8; // double the delay on each try. + while (isFlowControlEnabled() && tries > 0) { + QPID_LOG(warning, "CPG flow control enabled, retry in " << delayNs << "ns"); + ::usleep(delayNs); + --tries; + delayNs *= 2; + }; + if (tries == 0) { + // FIXME aconway 2008-08-07: this is a fatal leave-the-cluster condition. + throw Cpg::Exception("CPG flow control enabled, failed to send."); + } +} + +void Cpg::mcast(const Name& group, const iovec* iov, int iovLen) { + waitForFlowControl(); + cpg_error_t result; + do { + result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen); + if (result != CPG_ERR_TRY_AGAIN) check(result, cantMcastMsg(group)); + } while(result == CPG_ERR_TRY_AGAIN); +} + void Cpg::shutdown() { if (!isShutdown) { QPID_LOG(debug,"Shutting down CPG"); diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h index c89bf3e121..ab5af16b3d 100644 --- a/cpp/src/qpid/cluster/Cpg.h +++ b/cpp/src/qpid/cluster/Cpg.h @@ -117,19 +117,9 @@ class Cpg : public Dispatchable { void dispatchAll() { dispatch(CPG_DISPATCH_ALL); } void dispatchBlocking() { dispatch(CPG_DISPATCH_BLOCKING); } - void join(const Name& group) { - check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group)); - }; - - void leave(const Name& group) { - check(cpg_leave(handle,const_cast<Name*>(&group)),cantLeaveMsg(group)); - } - - void mcast(const Name& group, const iovec* iov, int iovLen) { - check(cpg_mcast_joined( - handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen), - cantMcastMsg(group)); - } + void join(const Name& group); + void leave(const Name& group); + void mcast(const Name& group, const iovec* iov, int iovLen); cpg_handle_t getHandle() const { return handle; } @@ -138,8 +128,7 @@ class Cpg : public Dispatchable { private: static std::string errorStr(cpg_error_t err, const std::string& msg); static std::string cantJoinMsg(const Name&); - static std::string cantLeaveMsg(const Name&); - static std::string cantMcastMsg(const Name&); + static std::string cantLeaveMsg(const Name&); std::string cantMcastMsg(const Name&); static void check(cpg_error_t result, const std::string& msg) { if (result != CPG_OK) throw Exception(errorStr(result, msg)); @@ -163,6 +152,9 @@ class Cpg : public Dispatchable { struct cpg_address *joined, int nJoined ); + bool isFlowControlEnabled(); + void waitForFlowControl(); + cpg_handle_t handle; Handler& handler; bool isShutdown; diff --git a/cpp/src/tests/ForkedBroker.h b/cpp/src/tests/ForkedBroker.h index 6c20330c28..a7869ff602 100644 --- a/cpp/src/tests/ForkedBroker.h +++ b/cpp/src/tests/ForkedBroker.h @@ -85,7 +85,7 @@ class ForkedBroker { ::close(pipeFds[1]); FILE* f = ::fdopen(pipeFds[0], "r"); if (!f) throw ErrnoException("fopen failed"); - if (::fscanf(f, "%d", &port) != 1) throw ErrnoException("fscanf failed"); + if (::fscanf(f, "%d", &port) != 1) throw ErrnoException("ill-formatted port"); } else { // child ::close(pipeFds[0]); |