diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cpg.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 223 |
1 files changed, 154 insertions, 69 deletions
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 2ffd3509bf..3ae0c970c7 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -16,33 +16,85 @@ * */ -#include "Cpg.h" +#include "qpid/cluster/Cpg.h" #include "qpid/sys/Mutex.h" -// Note cpg is currently unix-specific. Refactor if availble on other platforms. +#include "qpid/sys/Time.h" #include "qpid/sys/posix/PrivatePosix.h" #include "qpid/log/Statement.h" #include <vector> #include <limits> #include <iterator> +#include <sstream> #include <unistd.h> +// This is a macro instead of a function because we don't want to +// evaluate the MSG argument unless there is an error. +#define CPG_CHECK(RESULT, MSG) \ + if ((RESULT) != CPG_OK) throw Exception(errorStr((RESULT), (MSG))) + namespace qpid { namespace cluster { using namespace std; + + Cpg* Cpg::cpgFromHandle(cpg_handle_t handle) { void* cpg=0; - check(cpg_context_get(handle, &cpg), "Cannot get CPG instance."); + CPG_CHECK(cpg_context_get(handle, &cpg), "Cannot get CPG instance."); if (!cpg) throw Exception("Cannot get CPG instance."); return reinterpret_cast<Cpg*>(cpg); } +// Applies the same retry-logic to all cpg calls that need it. +void Cpg::callCpg ( CpgOp & c ) { + cpg_error_t result; + unsigned int snooze = 10; + for ( unsigned int nth_try = 0; nth_try < cpgRetries; ++ nth_try ) { + if ( CPG_OK == (result = c.op(handle, & group))) { + QPID_LOG(info, c.opName << " successful."); + break; + } + else if ( result == CPG_ERR_TRY_AGAIN ) { + QPID_LOG(info, "Retrying " << c.opName ); + sys::usleep ( snooze ); + snooze *= 10; + snooze = (snooze <= maxCpgRetrySleep) ? snooze : maxCpgRetrySleep; + } + else break; // Don't retry unless CPG tells us to. + } + + if ( result != CPG_OK ) + CPG_CHECK(result, c.msg(group)); +} + // Global callback functions. void Cpg::globalDeliver ( cpg_handle_t handle, + const struct cpg_name *group, + uint32_t nodeid, + uint32_t pid, + void* msg, + size_t msg_len) +{ + cpgFromHandle(handle)->handler.deliver(handle, group, nodeid, pid, msg, msg_len); +} + +void Cpg::globalConfigChange( + cpg_handle_t handle, + const struct cpg_name *group, + const struct cpg_address *members, size_t nMembers, + const struct cpg_address *left, size_t nLeft, + const struct cpg_address *joined, size_t nJoined +) +{ + cpgFromHandle(handle)->handler.configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined); +} + +void Cpg::globalDeliver ( + cpg_handle_t handle, struct cpg_name *group, uint32_t nodeid, uint32_t pid, @@ -65,103 +117,119 @@ void Cpg::globalConfigChange( int Cpg::getFd() { int fd; - check(cpg_fd_get(handle, &fd), "Cannot get CPG file descriptor"); + CPG_CHECK(cpg_fd_get(handle, &fd), "Cannot get CPG file descriptor"); return fd; } Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdown(false) { - cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange }; - check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG"); - check(cpg_context_set(handle, this), "Cannot set CPG context"); + cpg_callbacks_t callbacks; + ::memset(&callbacks, 0, sizeof(callbacks)); + callbacks.cpg_deliver_fn = &globalDeliver; + callbacks.cpg_confchg_fn = &globalConfigChange; + + QPID_LOG(notice, "Initializing CPG"); + cpg_error_t err = cpg_initialize(&handle, &callbacks); + int retries = 6; // FIXME aconway 2009-08-06: make this configurable. + while (err == CPG_ERR_TRY_AGAIN && --retries) { + QPID_LOG(notice, "Re-trying CPG initialization."); + sys::sleep(5); + err = cpg_initialize(&handle, &callbacks); + } + CPG_CHECK(err, "Failed to initialize CPG."); + CPG_CHECK(cpg_context_set(handle, this), "Cannot set CPG context"); // Note: CPG is currently unix-specific. If CPG is ported to // windows then this needs to be refactored into // qpid::sys::<platform> IOHandle::impl->fd = getFd(); - QPID_LOG(debug, "Initialized CPG handle 0x" << std::hex << handle); } Cpg::~Cpg() { try { shutdown(); } catch (const std::exception& e) { - QPID_LOG(error, "Exception in Cpg destructor: " << e.what()); + QPID_LOG(error, "Error during CPG shutdown: " << e.what()); } } -void Cpg::join(const Name& group) { - check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group)); +void Cpg::join(const std::string& name) { + group = name; + callCpg ( cpgJoinOp ); } -void Cpg::leave(const Name& group) { - check(cpg_leave(handle,const_cast<Name*>(&group)),cantLeaveMsg(group)); +void Cpg::leave() { + callCpg ( cpgLeaveOp ); } -bool Cpg::isFlowControlEnabled() { + + + +bool Cpg::mcast(const iovec* iov, int iovLen) { + // Check for flow control cpg_flow_control_state_t flowState; - check(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status."); - return flowState == CPG_FLOW_CONTROL_ENABLED; -} - -// TODO aconway 2008-08-07: better handling of flow control. -// Wait for flow control to be disabled. -// FIXME aconway 2008-08-08: does flow control check involve a round-trip? If so maybe remove... -void Cpg::waitForFlowControl() { - int delayNs=1000; // one millisecond - int tries=8; // double the delay on each try. - while (isFlowControlEnabled() && tries > 0) { - QPID_LOG(warning, "CPG flow control enabled, retry in " << delayNs << "ns"); - ::usleep(delayNs); - --tries; - delayNs *= 2; - }; - if (tries == 0) { - // FIXME aconway 2008-08-07: this is a fatal leave-the-cluster condition. - throw Cpg::Exception("CPG flow control enabled, failed to send."); - } -} + CPG_CHECK(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status."); + if (flowState == CPG_FLOW_CONTROL_ENABLED) + return false; -void Cpg::mcast(const Name& group, const iovec* iov, int iovLen) { - waitForFlowControl(); cpg_error_t result; do { result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen); - if (result != CPG_ERR_TRY_AGAIN) check(result, cantMcastMsg(group)); + if (result != CPG_ERR_TRY_AGAIN) CPG_CHECK(result, cantMcastMsg(group)); } while(result == CPG_ERR_TRY_AGAIN); + return true; } void Cpg::shutdown() { if (!isShutdown) { QPID_LOG(debug,"Shutting down CPG"); isShutdown=true; - check(cpg_finalize(handle), "Error in shutdown of CPG"); + + callCpg ( cpgFinalizeOp ); } } +void Cpg::dispatchOne() { + CPG_CHECK(cpg_dispatch(handle,CPG_DISPATCH_ONE), "Error in CPG dispatch"); +} + +void Cpg::dispatchAll() { + CPG_CHECK(cpg_dispatch(handle,CPG_DISPATCH_ALL), "Error in CPG dispatch"); +} + +void Cpg::dispatchBlocking() { + CPG_CHECK(cpg_dispatch(handle,CPG_DISPATCH_BLOCKING), "Error in CPG dispatch"); +} + string Cpg::errorStr(cpg_error_t err, const std::string& msg) { + std::ostringstream os; + os << msg << ": "; switch (err) { - case CPG_OK: return msg+": ok"; - case CPG_ERR_LIBRARY: return msg+": library"; - case CPG_ERR_TIMEOUT: return msg+": timeout"; - case CPG_ERR_TRY_AGAIN: return msg+": timeout. The aisexec daemon may not be running"; - case CPG_ERR_INVALID_PARAM: return msg+": invalid param"; - case CPG_ERR_NO_MEMORY: return msg+": no memory"; - case CPG_ERR_BAD_HANDLE: return msg+": bad handle"; - case CPG_ERR_ACCESS: return msg+": access denied. You may need to set your group ID to 'ais'"; - case CPG_ERR_NOT_EXIST: return msg+": not exist"; - case CPG_ERR_EXIST: return msg+": exist"; - case CPG_ERR_NOT_SUPPORTED: return msg+": not supported"; - case CPG_ERR_SECURITY: return msg+": security"; - case CPG_ERR_TOO_MANY_GROUPS: return msg+": too many groups"; - default: - assert(0); - return ": unknown"; + case CPG_OK: os << "ok"; break; + case CPG_ERR_LIBRARY: os << "library"; break; + case CPG_ERR_TIMEOUT: os << "timeout"; break; + case CPG_ERR_TRY_AGAIN: os << "try again"; break; + case CPG_ERR_INVALID_PARAM: os << "invalid param"; break; + case CPG_ERR_NO_MEMORY: os << "no memory"; break; + case CPG_ERR_BAD_HANDLE: os << "bad handle"; break; + case CPG_ERR_ACCESS: os << "access denied. You may need to set your group ID to 'ais'"; break; + case CPG_ERR_NOT_EXIST: os << "not exist"; break; + case CPG_ERR_EXIST: os << "exist"; break; + case CPG_ERR_NOT_SUPPORTED: os << "not supported"; break; + case CPG_ERR_SECURITY: os << "security"; break; + case CPG_ERR_TOO_MANY_GROUPS: os << "too many groups"; break; + default: os << ": unknown cpg error " << err; }; + os << " (" << err << ")"; + return os.str(); } std::string Cpg::cantJoinMsg(const Name& group) { return "Cannot join CPG group "+group.str(); } +std::string Cpg::cantFinalizeMsg(const Name& group) { + return "Cannot finalize CPG group "+group.str(); +} + std::string Cpg::cantLeaveMsg(const Name& group) { return "Cannot leave CPG group "+group.str(); } @@ -170,27 +238,44 @@ std::string Cpg::cantMcastMsg(const Name& group) { return "Cannot mcast to CPG group "+group.str(); } -Cpg::Id Cpg::self() const { +MemberId Cpg::self() const { unsigned int nodeid; - check(cpg_local_get(handle, &nodeid), "Cannot get local CPG identity"); - return Id(nodeid, getpid()); + CPG_CHECK(cpg_local_get(handle, &nodeid), "Cannot get local CPG identity"); + return MemberId(nodeid, getpid()); } -ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) { - ostream_iterator<Cpg::Id> i(o, " "); - std::copy(a.first, a.first+a.second, i); - return o; +namespace { int byte(uint32_t value, int i) { return (value >> (i*8)) & 0xff; } } + +ostream& operator<<(ostream& out, const MemberId& id) { + if (id.first) { + out << byte(id.first, 0) << "." + << byte(id.first, 1) << "." + << byte(id.first, 2) << "." + << byte(id.first, 3) + << ":"; + } + return out << id.second; } -ostream& operator <<(ostream& out, const Cpg::Id& id) { - return out << id.getNodeId() << "-" << id.getPid(); +ostream& operator<<(ostream& o, const ConnectionId& c) { + return o << c.first << "-" << c.second; } -ostream& operator <<(ostream& out, const cpg_name& name) { - return out << string(name.value, name.length); +std::string MemberId::str() const { + char s[8]; + uint32_t x; + x = htonl(first); + ::memcpy(s, &x, 4); + x = htonl(second); + ::memcpy(s+4, &x, 4); + return std::string(s,8); } +MemberId::MemberId(const std::string& s) { + uint32_t x; + memcpy(&x, &s[0], 4); + first = ntohl(x); + memcpy(&x, &s[4], 4); + second = ntohl(x); +} }} // namespace qpid::cluster - - - |