diff options
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 52 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cpg.h | 16 |
3 files changed, 20 insertions, 49 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 2727d5af0a..d97a840f82 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -114,6 +114,7 @@ Cluster::~Cluster() { QPID_LOG(trace, *this << " Leaving cluster."); try { cpg.leave(name); + cpg.shutdown(); dispatcher.join(); } catch (const std::exception& e) { diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 7831f66da1..3118e11e57 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -32,36 +32,14 @@ namespace cluster { using namespace std; -// Global vector of Cpg pointers by handle. -// TODO aconway 2007-06-12: Replace this with cpg_get/set_context, -// coming in in RHEL 5.1. -class Cpg::Handles -{ - public: - void put(cpg_handle_t handle, Cpg::Handler* handler) { - sys::Mutex::ScopedLock l(lock); - uint32_t index=uint32_t(handle); // Lower 32 bits is an array index. - if (index >= handles.size()) - handles.resize(index+1, 0); - handles[index] = handler; - } - - Cpg::Handler* get(cpg_handle_t handle) { - sys::Mutex::ScopedLock l(lock); - uint32_t index=uint32_t(handle); // Lower 32 bits is an array index. - assert(index < handles.size()); - assert(handles[index]); - return handles[index]; - } - - private: - sys::Mutex lock; - vector<Cpg::Handler*> handles; -}; - -Cpg::Handles Cpg::handles; +Cpg* Cpg::cpgFromHandle(cpg_handle_t handle) { + void* cpg=0; + check(cpg_context_get(handle, &cpg), "Cannot get CPG instance."); + if (!cpg) throw Exception("Cannot get CPG instance."); + return reinterpret_cast<Cpg*>(cpg); +} -// Global callback functions call per-object callbacks via handles vector. +// Global callback functions. void Cpg::globalDeliver ( cpg_handle_t handle, struct cpg_name *group, @@ -70,9 +48,7 @@ void Cpg::globalDeliver ( void* msg, int msg_len) { - Cpg::Handler* handler=handles.get(handle); - if (handler) - handler->deliver(handle, group, nodeid, pid, msg, msg_len); + cpgFromHandle(handle)->handler.deliver(handle, group, nodeid, pid, msg, msg_len); } void Cpg::globalConfigChange( @@ -83,15 +59,13 @@ void Cpg::globalConfigChange( struct cpg_address *joined, int nJoined ) { - Cpg::Handler* handler=handles.get(handle); - if (handler) - handler->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined); + cpgFromHandle(handle)->handler.configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined); } Cpg::Cpg(Handler& h) : handler(h) { cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange }; check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG"); - handles.put(handle, &handler); + check(cpg_context_set(handle, this), "Cannot set CPG context"); QPID_LOG(debug, "Initialize CPG handle 0x" << std::hex << handle); } @@ -104,10 +78,10 @@ Cpg::~Cpg() { } void Cpg::shutdown() { - if (handles.get(handle)) { - QPID_LOG(debug, "Finalize CPG handle " << std::hex << handle); - handles.put(handle, 0); + if (handle) { + cpg_context_set(handle, 0); check(cpg_finalize(handle), "Error in shutdown of CPG"); + handle = 0; } } diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h index a918fb0cbf..d3142efcb6 100644 --- a/cpp/src/qpid/cluster/Cpg.h +++ b/cpp/src/qpid/cluster/Cpg.h @@ -38,6 +38,8 @@ namespace cluster { * Lightweight C++ interface to cpg.h operations. * Manages a single CPG handle, initialized in ctor, finialzed in destructor. * On error all functions throw Cpg::Exception + * + * NOTE: only one at a time can exist per process. */ class Cpg : public Dispatchable { public: @@ -95,7 +97,7 @@ class Cpg : public Dispatchable { */ Cpg(Handler&); - /** Destructor calls shutdown. */ + /** Destructor calls shutdown if not already calledx. */ ~Cpg(); /** Disconnect from CPG */ @@ -134,22 +136,17 @@ class Cpg : public Dispatchable { Id self() const; private: - class Handles; - struct ClearHandleOnExit; - friend class Handles; - friend struct ClearHandleOnExit; - static std::string errorStr(cpg_error_t err, const std::string& msg); static std::string cantJoinMsg(const Name&); static std::string cantLeaveMsg(const Name&); static std::string cantMcastMsg(const Name&); static void check(cpg_error_t result, const std::string& msg) { - // TODO aconway 2007-06-01: Logging and exceptions. - if (result != CPG_OK) - throw Exception(errorStr(result, msg)); + if (result != CPG_OK) throw Exception(errorStr(result, msg)); } + static Cpg* cpgFromHandle(cpg_handle_t); + static void globalDeliver( cpg_handle_t handle, struct cpg_name *group, @@ -166,7 +163,6 @@ class Cpg : public Dispatchable { struct cpg_address *joined, int nJoined ); - static Handles handles; cpg_handle_t handle; Handler& handler; }; |