diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cpg.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 102 |
1 files changed, 102 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 858d25f37c..a979ce1eeb 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -17,12 +17,86 @@ */ #include "Cpg.h" +#include "qpid/sys/Mutex.h" +#include <vector> +#include <limits> +#include <iterator> namespace qpid { namespace cluster { using namespace std; +// Global vector of Cpg pointers by handle. +// TODO aconway 2007-06-12: Replace this with cpg_get/set_context, +// coming in in RHEL 5.1. +class Cpg::Handles +{ + public: + void put(cpg_handle_t handle, Cpg* object) { + sys::Mutex::ScopedLock l(lock); + assert(object); + uint32_t index=uint32_t(handle); // Lower 32 bits is an array index. + if (index >= handles.size()) + handles.resize(index+1, 0); + handles[index] = object; + } + + Cpg* get(cpg_handle_t handle) { + sys::Mutex::ScopedLock l(lock); + uint32_t index=uint32_t(handle); // Lower 32 bits is an array index. + assert(index < handles.size()); + assert(handles[index]); + return handles[index]; + } + + private: + sys::Mutex lock; + vector<Cpg*> handles; +}; + +Cpg::Handles Cpg::handles; + +// Global callback functions call per-object callbacks via handles vector. +void Cpg::globalDeliver ( + cpg_handle_t handle, + struct cpg_name *group, + uint32_t nodeid, + uint32_t pid, + void* msg, + int msg_len) +{ + handles.get(handle)->deliver(handle, group, nodeid, pid, msg, msg_len); +} + +void Cpg::globalConfigChange( + cpg_handle_t handle, + struct cpg_name *group, + struct cpg_address *members, int nMembers, + struct cpg_address *left, int nLeft, + struct cpg_address *joined, int nJoined +) +{ + handles.get(handle)->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined); +} + +Cpg::Cpg(DeliverFn d, ConfigChangeFn c) : deliver(d), configChange(c) +{ + cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange }; + check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG"); + handles.put(handle, this); +} + +Cpg::~Cpg() { + try { + check(cpg_finalize(handle), "Error in shutdown of CPG"); + } + catch (...) { + handles.put(handle, 0); + throw; + } +} + string Cpg::errorStr(cpg_error_t err, const std::string& msg) { switch (err) { case CPG_OK: return msg+": ok"; @@ -56,6 +130,34 @@ std::string Cpg::cantMcastMsg(const Name& group) { return "Cannot mcast to CPG group "+group.str(); } +uint32_t Cpg::getLocalNoideId() const { + unsigned int nodeid; + check(cpg_local_get(handle, &nodeid), "Cannot get local node ID"); + assert(nodeid <= std::numeric_limits<uint32_t>::max()); + return nodeid; +} + +ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) { + ostream_iterator<Cpg::Id> i(o, " "); + std::copy(a.first, a.first+a.second, i); + return o; +} + +static int popbyte(uint32_t& n) { + uint8_t b=n&0xff; + n>>=8; + return b; +} + +ostream& operator <<(ostream& out, const Cpg::Id& id) { + uint32_t node=id.nodeId(); + out << popbyte(node); + for (int i = 0; i < 3; i++) + out << "." << popbyte(node); + return out << ":" << id.pid(); +} + + }} // namespace qpid::cpg |