diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cpg.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 52 |
1 files changed, 13 insertions, 39 deletions
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 7831f66da1..3118e11e57 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -32,36 +32,14 @@ namespace cluster { using namespace std; -// Global vector of Cpg pointers by handle. -// TODO aconway 2007-06-12: Replace this with cpg_get/set_context, -// coming in in RHEL 5.1. -class Cpg::Handles -{ - public: - void put(cpg_handle_t handle, Cpg::Handler* handler) { - sys::Mutex::ScopedLock l(lock); - uint32_t index=uint32_t(handle); // Lower 32 bits is an array index. - if (index >= handles.size()) - handles.resize(index+1, 0); - handles[index] = handler; - } - - Cpg::Handler* get(cpg_handle_t handle) { - sys::Mutex::ScopedLock l(lock); - uint32_t index=uint32_t(handle); // Lower 32 bits is an array index. - assert(index < handles.size()); - assert(handles[index]); - return handles[index]; - } - - private: - sys::Mutex lock; - vector<Cpg::Handler*> handles; -}; - -Cpg::Handles Cpg::handles; +Cpg* Cpg::cpgFromHandle(cpg_handle_t handle) { + void* cpg=0; + check(cpg_context_get(handle, &cpg), "Cannot get CPG instance."); + if (!cpg) throw Exception("Cannot get CPG instance."); + return reinterpret_cast<Cpg*>(cpg); +} -// Global callback functions call per-object callbacks via handles vector. +// Global callback functions. void Cpg::globalDeliver ( cpg_handle_t handle, struct cpg_name *group, @@ -70,9 +48,7 @@ void Cpg::globalDeliver ( void* msg, int msg_len) { - Cpg::Handler* handler=handles.get(handle); - if (handler) - handler->deliver(handle, group, nodeid, pid, msg, msg_len); + cpgFromHandle(handle)->handler.deliver(handle, group, nodeid, pid, msg, msg_len); } void Cpg::globalConfigChange( @@ -83,15 +59,13 @@ void Cpg::globalConfigChange( struct cpg_address *joined, int nJoined ) { - Cpg::Handler* handler=handles.get(handle); - if (handler) - handler->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined); + cpgFromHandle(handle)->handler.configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined); } Cpg::Cpg(Handler& h) : handler(h) { cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange }; check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG"); - handles.put(handle, &handler); + check(cpg_context_set(handle, this), "Cannot set CPG context"); QPID_LOG(debug, "Initialize CPG handle 0x" << std::hex << handle); } @@ -104,10 +78,10 @@ Cpg::~Cpg() { } void Cpg::shutdown() { - if (handles.get(handle)) { - QPID_LOG(debug, "Finalize CPG handle " << std::hex << handle); - handles.put(handle, 0); + if (handle) { + cpg_context_set(handle, 0); check(cpg_finalize(handle), "Error in shutdown of CPG"); + handle = 0; } } |