diff options
author | Michael Goulish <mgoulish@apache.org> | 2009-12-11 15:29:00 +0000 |
---|---|---|
committer | Michael Goulish <mgoulish@apache.org> | 2009-12-11 15:29:00 +0000 |
commit | ae56849be17361e90a7cd7074fc31674df2d724a (patch) | |
tree | 4e906993e2ad8f1a1ae53f726ef35b4f7202bb3b /cpp/src | |
parent | 2e43ee118c7a7e1076bcba10afa3ecaa903cb537 (diff) | |
download | qpid-python-ae56849be17361e90a7cd7074fc31674df2d724a.tar.gz |
Add retry capability to several cpg calls.
First retry is immediate, next one after 10 usec,
then 100 usec, etc ... for 5 retries.
Retry pause maxes out at 0.1 second.
Then give up and report error.
The lack of retry on one of these calls must have
been responsible for several hard-to-reproduce
failures seen over the last month or so.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@889657 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 35 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.h | 64 |
2 files changed, 96 insertions, 3 deletions
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 5efa91f11c..3ae0c970c7 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -39,6 +39,8 @@ namespace cluster { using namespace std; + + Cpg* Cpg::cpgFromHandle(cpg_handle_t handle) { void* cpg=0; CPG_CHECK(cpg_context_get(handle, &cpg), "Cannot get CPG instance."); @@ -46,6 +48,28 @@ Cpg* Cpg::cpgFromHandle(cpg_handle_t handle) { return reinterpret_cast<Cpg*>(cpg); } +// Applies the same retry-logic to all cpg calls that need it. +void Cpg::callCpg ( CpgOp & c ) { + cpg_error_t result; + unsigned int snooze = 10; + for ( unsigned int nth_try = 0; nth_try < cpgRetries; ++ nth_try ) { + if ( CPG_OK == (result = c.op(handle, & group))) { + QPID_LOG(info, c.opName << " successful."); + break; + } + else if ( result == CPG_ERR_TRY_AGAIN ) { + QPID_LOG(info, "Retrying " << c.opName ); + sys::usleep ( snooze ); + snooze *= 10; + snooze = (snooze <= maxCpgRetrySleep) ? snooze : maxCpgRetrySleep; + } + else break; // Don't retry unless CPG tells us to. + } + + if ( result != CPG_OK ) + CPG_CHECK(result, c.msg(group)); +} + // Global callback functions. void Cpg::globalDeliver ( cpg_handle_t handle, @@ -129,11 +153,11 @@ Cpg::~Cpg() { void Cpg::join(const std::string& name) { group = name; - CPG_CHECK(cpg_join(handle, &group), cantJoinMsg(group)); + callCpg ( cpgJoinOp ); } void Cpg::leave() { - CPG_CHECK(cpg_leave(handle, &group), cantLeaveMsg(group)); + callCpg ( cpgLeaveOp ); } @@ -158,7 +182,8 @@ void Cpg::shutdown() { if (!isShutdown) { QPID_LOG(debug,"Shutting down CPG"); isShutdown=true; - CPG_CHECK(cpg_finalize(handle), "Error in shutdown of CPG"); + + callCpg ( cpgFinalizeOp ); } } @@ -201,6 +226,10 @@ std::string Cpg::cantJoinMsg(const Name& group) { return "Cannot join CPG group "+group.str(); } +std::string Cpg::cantFinalizeMsg(const Name& group) { + return "Cannot finalize CPG group "+group.str(); +} + std::string Cpg::cantLeaveMsg(const Name& group) { return "Cannot leave CPG group "+group.str(); } diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h index cffbf0bdb3..6b81c602bd 100644 --- a/cpp/src/qpid/cluster/Cpg.h +++ b/cpp/src/qpid/cluster/Cpg.h @@ -39,6 +39,7 @@ namespace cluster { * On error all functions throw Cpg::Exception. * */ + class Cpg : public sys::IOHandle { public: struct Exception : public ::qpid::Exception { @@ -114,10 +115,73 @@ class Cpg : public sys::IOHandle { int getFd(); private: + + // Maximum number of retries for cog functions that can tell + // us to "try again later". + static const unsigned int cpgRetries = 5; + + // Don't let sleep-time between cpg retries to go above 0.1 second. + static const unsigned int maxCpgRetrySleep = 100000; + + + // Base class for the Cpg operations that need retry capability. + struct CpgOp { + std::string opName; + + CpgOp ( std::string opName ) + : opName(opName) { } + + virtual cpg_error_t op ( cpg_handle_t handle, struct cpg_name * ) = 0; + virtual std::string msg(const Name&) = 0; + virtual ~CpgOp ( ) { } + }; + + + struct CpgJoinOp : public CpgOp { + CpgJoinOp ( ) + : CpgOp ( std::string("cpg_join") ) { } + + cpg_error_t op(cpg_handle_t handle, struct cpg_name * group) { + return cpg_join ( handle, group ); + } + + std::string msg(const Name& name) { return cantJoinMsg(name); } + }; + + struct CpgLeaveOp : public CpgOp { + CpgLeaveOp ( ) + : CpgOp ( std::string("cpg_leave") ) { } + + cpg_error_t op(cpg_handle_t handle, struct cpg_name * group) { + return cpg_leave ( handle, group ); + } + + std::string msg(const Name& name) { return cantLeaveMsg(name); } + }; + + struct CpgFinalizeOp : public CpgOp { + CpgFinalizeOp ( ) + : CpgOp ( std::string("cpg_finalize") ) { } + + cpg_error_t op(cpg_handle_t handle, struct cpg_name *) { + return cpg_finalize ( handle ); + } + + std::string msg(const Name& name) { return cantFinalizeMsg(name); } + }; + + // This fn standardizes retry policy across all Cpg ops that need it. + void callCpg ( CpgOp & ); + + CpgJoinOp cpgJoinOp; + CpgLeaveOp cpgLeaveOp; + CpgFinalizeOp cpgFinalizeOp; + static std::string errorStr(cpg_error_t err, const std::string& msg); static std::string cantJoinMsg(const Name&); static std::string cantLeaveMsg(const Name&); static std::string cantMcastMsg(const Name&); + static std::string cantFinalizeMsg(const Name&); static Cpg* cpgFromHandle(cpg_handle_t); |