summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-08-07 20:46:18 +0000
committerAlan Conway <aconway@apache.org>2008-08-07 20:46:18 +0000
commit2100b4498daa1d89e2850196dca19bae1b4a6151 (patch)
tree638f3fb5435545e176a78e2229bb1f56e856e197
parentf4dc59ea3028b87c1f8640df02c2a73b5cafcf1a (diff)
downloadqpid-python-2100b4498daa1d89e2850196dca19bae1b4a6151.tar.gz
Check CPG flow control.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@683711 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp40
-rw-r--r--cpp/src/qpid/cluster/Cpg.h22
-rw-r--r--cpp/src/tests/ForkedBroker.h2
3 files changed, 48 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index 674781ac06..6b01d73197 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -77,6 +77,46 @@ Cpg::~Cpg() {
}
}
+void Cpg::join(const Name& group) {
+ check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group));
+}
+
+void Cpg::leave(const Name& group) {
+ check(cpg_leave(handle,const_cast<Name*>(&group)),cantLeaveMsg(group));
+}
+
+bool Cpg::isFlowControlEnabled() {
+ cpg_flow_control_state_t flowState;
+ check(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status.");
+ return flowState == CPG_FLOW_CONTROL_ENABLED;
+}
+
+// TODO aconway 2008-08-07: better handling of flow control.
+// Wait for flow control to be disabled.
+void Cpg::waitForFlowControl() {
+ int delayNs=1000; // one millisecond
+ int tries=8; // double the delay on each try.
+ while (isFlowControlEnabled() && tries > 0) {
+ QPID_LOG(warning, "CPG flow control enabled, retry in " << delayNs << "ns");
+ ::usleep(delayNs);
+ --tries;
+ delayNs *= 2;
+ };
+ if (tries == 0) {
+ // FIXME aconway 2008-08-07: this is a fatal leave-the-cluster condition.
+ throw Cpg::Exception("CPG flow control enabled, failed to send.");
+ }
+}
+
+void Cpg::mcast(const Name& group, const iovec* iov, int iovLen) {
+ waitForFlowControl();
+ cpg_error_t result;
+ do {
+ result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen);
+ if (result != CPG_ERR_TRY_AGAIN) check(result, cantMcastMsg(group));
+ } while(result == CPG_ERR_TRY_AGAIN);
+}
+
void Cpg::shutdown() {
if (!isShutdown) {
QPID_LOG(debug,"Shutting down CPG");
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h
index c89bf3e121..ab5af16b3d 100644
--- a/cpp/src/qpid/cluster/Cpg.h
+++ b/cpp/src/qpid/cluster/Cpg.h
@@ -117,19 +117,9 @@ class Cpg : public Dispatchable {
void dispatchAll() { dispatch(CPG_DISPATCH_ALL); }
void dispatchBlocking() { dispatch(CPG_DISPATCH_BLOCKING); }
- void join(const Name& group) {
- check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group));
- };
-
- void leave(const Name& group) {
- check(cpg_leave(handle,const_cast<Name*>(&group)),cantLeaveMsg(group));
- }
-
- void mcast(const Name& group, const iovec* iov, int iovLen) {
- check(cpg_mcast_joined(
- handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen),
- cantMcastMsg(group));
- }
+ void join(const Name& group);
+ void leave(const Name& group);
+ void mcast(const Name& group, const iovec* iov, int iovLen);
cpg_handle_t getHandle() const { return handle; }
@@ -138,8 +128,7 @@ class Cpg : public Dispatchable {
private:
static std::string errorStr(cpg_error_t err, const std::string& msg);
static std::string cantJoinMsg(const Name&);
- static std::string cantLeaveMsg(const Name&);
- static std::string cantMcastMsg(const Name&);
+ static std::string cantLeaveMsg(const Name&); std::string cantMcastMsg(const Name&);
static void check(cpg_error_t result, const std::string& msg) {
if (result != CPG_OK) throw Exception(errorStr(result, msg));
@@ -163,6 +152,9 @@ class Cpg : public Dispatchable {
struct cpg_address *joined, int nJoined
);
+ bool isFlowControlEnabled();
+ void waitForFlowControl();
+
cpg_handle_t handle;
Handler& handler;
bool isShutdown;
diff --git a/cpp/src/tests/ForkedBroker.h b/cpp/src/tests/ForkedBroker.h
index 6c20330c28..a7869ff602 100644
--- a/cpp/src/tests/ForkedBroker.h
+++ b/cpp/src/tests/ForkedBroker.h
@@ -85,7 +85,7 @@ class ForkedBroker {
::close(pipeFds[1]);
FILE* f = ::fdopen(pipeFds[0], "r");
if (!f) throw ErrnoException("fopen failed");
- if (::fscanf(f, "%d", &port) != 1) throw ErrnoException("fscanf failed");
+ if (::fscanf(f, "%d", &port) != 1) throw ErrnoException("ill-formatted port");
}
else { // child
::close(pipeFds[0]);