summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cpg.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cpg.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp102
1 files changed, 102 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index 858d25f37c..a979ce1eeb 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -17,12 +17,86 @@
*/
#include "Cpg.h"
+#include "qpid/sys/Mutex.h"
+#include <vector>
+#include <limits>
+#include <iterator>
namespace qpid {
namespace cluster {
using namespace std;
+// Global vector of Cpg pointers by handle.
+// TODO aconway 2007-06-12: Replace this with cpg_get/set_context,
+// coming in in RHEL 5.1.
+class Cpg::Handles
+{
+ public:
+ void put(cpg_handle_t handle, Cpg* object) {
+ sys::Mutex::ScopedLock l(lock);
+ assert(object);
+ uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
+ if (index >= handles.size())
+ handles.resize(index+1, 0);
+ handles[index] = object;
+ }
+
+ Cpg* get(cpg_handle_t handle) {
+ sys::Mutex::ScopedLock l(lock);
+ uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
+ assert(index < handles.size());
+ assert(handles[index]);
+ return handles[index];
+ }
+
+ private:
+ sys::Mutex lock;
+ vector<Cpg*> handles;
+};
+
+Cpg::Handles Cpg::handles;
+
+// Global callback functions call per-object callbacks via handles vector.
+void Cpg::globalDeliver (
+ cpg_handle_t handle,
+ struct cpg_name *group,
+ uint32_t nodeid,
+ uint32_t pid,
+ void* msg,
+ int msg_len)
+{
+ handles.get(handle)->deliver(handle, group, nodeid, pid, msg, msg_len);
+}
+
+void Cpg::globalConfigChange(
+ cpg_handle_t handle,
+ struct cpg_name *group,
+ struct cpg_address *members, int nMembers,
+ struct cpg_address *left, int nLeft,
+ struct cpg_address *joined, int nJoined
+)
+{
+ handles.get(handle)->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
+}
+
+Cpg::Cpg(DeliverFn d, ConfigChangeFn c) : deliver(d), configChange(c)
+{
+ cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
+ check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
+ handles.put(handle, this);
+}
+
+Cpg::~Cpg() {
+ try {
+ check(cpg_finalize(handle), "Error in shutdown of CPG");
+ }
+ catch (...) {
+ handles.put(handle, 0);
+ throw;
+ }
+}
+
string Cpg::errorStr(cpg_error_t err, const std::string& msg) {
switch (err) {
case CPG_OK: return msg+": ok";
@@ -56,6 +130,34 @@ std::string Cpg::cantMcastMsg(const Name& group) {
return "Cannot mcast to CPG group "+group.str();
}
+uint32_t Cpg::getLocalNoideId() const {
+ unsigned int nodeid;
+ check(cpg_local_get(handle, &nodeid), "Cannot get local node ID");
+ assert(nodeid <= std::numeric_limits<uint32_t>::max());
+ return nodeid;
+}
+
+ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) {
+ ostream_iterator<Cpg::Id> i(o, " ");
+ std::copy(a.first, a.first+a.second, i);
+ return o;
+}
+
+static int popbyte(uint32_t& n) {
+ uint8_t b=n&0xff;
+ n>>=8;
+ return b;
+}
+
+ostream& operator <<(ostream& out, const Cpg::Id& id) {
+ uint32_t node=id.nodeId();
+ out << popbyte(node);
+ for (int i = 0; i < 3; i++)
+ out << "." << popbyte(node);
+ return out << ":" << id.pid();
+}
+
+
}} // namespace qpid::cpg