diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cpg.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 35 |
1 files changed, 32 insertions, 3 deletions
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 5efa91f11c..3ae0c970c7 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -39,6 +39,8 @@ namespace cluster { using namespace std; + + Cpg* Cpg::cpgFromHandle(cpg_handle_t handle) { void* cpg=0; CPG_CHECK(cpg_context_get(handle, &cpg), "Cannot get CPG instance."); @@ -46,6 +48,28 @@ Cpg* Cpg::cpgFromHandle(cpg_handle_t handle) { return reinterpret_cast<Cpg*>(cpg); } +// Applies the same retry-logic to all cpg calls that need it. +void Cpg::callCpg ( CpgOp & c ) { + cpg_error_t result; + unsigned int snooze = 10; + for ( unsigned int nth_try = 0; nth_try < cpgRetries; ++ nth_try ) { + if ( CPG_OK == (result = c.op(handle, & group))) { + QPID_LOG(info, c.opName << " successful."); + break; + } + else if ( result == CPG_ERR_TRY_AGAIN ) { + QPID_LOG(info, "Retrying " << c.opName ); + sys::usleep ( snooze ); + snooze *= 10; + snooze = (snooze <= maxCpgRetrySleep) ? snooze : maxCpgRetrySleep; + } + else break; // Don't retry unless CPG tells us to. + } + + if ( result != CPG_OK ) + CPG_CHECK(result, c.msg(group)); +} + // Global callback functions. void Cpg::globalDeliver ( cpg_handle_t handle, @@ -129,11 +153,11 @@ Cpg::~Cpg() { void Cpg::join(const std::string& name) { group = name; - CPG_CHECK(cpg_join(handle, &group), cantJoinMsg(group)); + callCpg ( cpgJoinOp ); } void Cpg::leave() { - CPG_CHECK(cpg_leave(handle, &group), cantLeaveMsg(group)); + callCpg ( cpgLeaveOp ); } @@ -158,7 +182,8 @@ void Cpg::shutdown() { if (!isShutdown) { QPID_LOG(debug,"Shutting down CPG"); isShutdown=true; - CPG_CHECK(cpg_finalize(handle), "Error in shutdown of CPG"); + + callCpg ( cpgFinalizeOp ); } } @@ -201,6 +226,10 @@ std::string Cpg::cantJoinMsg(const Name& group) { return "Cannot join CPG group "+group.str(); } +std::string Cpg::cantFinalizeMsg(const Name& group) { + return "Cannot finalize CPG group "+group.str(); +} + std::string Cpg::cantLeaveMsg(const Name& group) { return "Cannot leave CPG group "+group.str(); } |