summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/Cpg.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Cpg.h')
-rw-r--r--qpid/cpp/src/qpid/cluster/Cpg.h236
1 files changed, 236 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cpg.h b/qpid/cpp/src/qpid/cluster/Cpg.h
new file mode 100644
index 0000000000..6b81c602bd
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/Cpg.h
@@ -0,0 +1,236 @@
+#ifndef CPG_H
+#define CPG_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/Exception.h"
+#include "qpid/cluster/types.h"
+#include "qpid/sys/IOHandle.h"
+#include "qpid/sys/Mutex.h"
+
+#include <boost/scoped_ptr.hpp>
+
+#include <cassert>
+#include <string.h>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Lightweight C++ interface to cpg.h operations.
+ *
+ * Manages a single CPG handle, initialized in ctor, finialzed in destructor.
+ * On error all functions throw Cpg::Exception.
+ *
+ */
+
+class Cpg : public sys::IOHandle {
+ public:
+ struct Exception : public ::qpid::Exception {
+ Exception(const std::string& msg) : ::qpid::Exception(msg) {}
+ };
+
+ struct Name : public cpg_name {
+ Name() { length = 0; }
+ Name(const char* s) { copy(s, strlen(s)); }
+ Name(const char* s, size_t n) { copy(s,n); }
+ Name(const std::string& s) { copy(s.data(), s.size()); }
+ void copy(const char* s, size_t n) {
+ assert(n < CPG_MAX_NAME_LENGTH);
+ memcpy(value, s, n);
+ length=n;
+ }
+
+ std::string str() const { return std::string(value, length); }
+ };
+
+ static std::string str(const cpg_name& n) {
+ return std::string(n.value, n.length);
+ }
+
+ struct Handler {
+ virtual ~Handler() {};
+ virtual void deliver(
+ cpg_handle_t /*handle*/,
+ const struct cpg_name *group,
+ uint32_t /*nodeid*/,
+ uint32_t /*pid*/,
+ void* /*msg*/,
+ int /*msg_len*/) = 0;
+
+ virtual void configChange(
+ cpg_handle_t /*handle*/,
+ const struct cpg_name */*group*/,
+ const struct cpg_address */*members*/, int /*nMembers*/,
+ const struct cpg_address */*left*/, int /*nLeft*/,
+ const struct cpg_address */*joined*/, int /*nJoined*/
+ ) = 0;
+ };
+
+ /** Open a CPG handle.
+ *@param handler for CPG events.
+ */
+ Cpg(Handler&);
+
+ /** Destructor calls shutdown if not already calledx. */
+ ~Cpg();
+
+ /** Disconnect from CPG */
+ void shutdown();
+
+ void dispatchOne();
+ void dispatchAll();
+ void dispatchBlocking();
+
+ void join(const std::string& group);
+ void leave();
+
+ /** Multicast to the group. NB: must not be called concurrently.
+ *
+ *@return true if the message was multi-cast, false if
+ * it was not sent due to flow control.
+ */
+ bool mcast(const iovec* iov, int iovLen);
+
+ cpg_handle_t getHandle() const { return handle; }
+
+ MemberId self() const;
+
+ int getFd();
+
+ private:
+
+ // Maximum number of retries for cog functions that can tell
+ // us to "try again later".
+ static const unsigned int cpgRetries = 5;
+
+ // Don't let sleep-time between cpg retries to go above 0.1 second.
+ static const unsigned int maxCpgRetrySleep = 100000;
+
+
+ // Base class for the Cpg operations that need retry capability.
+ struct CpgOp {
+ std::string opName;
+
+ CpgOp ( std::string opName )
+ : opName(opName) { }
+
+ virtual cpg_error_t op ( cpg_handle_t handle, struct cpg_name * ) = 0;
+ virtual std::string msg(const Name&) = 0;
+ virtual ~CpgOp ( ) { }
+ };
+
+
+ struct CpgJoinOp : public CpgOp {
+ CpgJoinOp ( )
+ : CpgOp ( std::string("cpg_join") ) { }
+
+ cpg_error_t op(cpg_handle_t handle, struct cpg_name * group) {
+ return cpg_join ( handle, group );
+ }
+
+ std::string msg(const Name& name) { return cantJoinMsg(name); }
+ };
+
+ struct CpgLeaveOp : public CpgOp {
+ CpgLeaveOp ( )
+ : CpgOp ( std::string("cpg_leave") ) { }
+
+ cpg_error_t op(cpg_handle_t handle, struct cpg_name * group) {
+ return cpg_leave ( handle, group );
+ }
+
+ std::string msg(const Name& name) { return cantLeaveMsg(name); }
+ };
+
+ struct CpgFinalizeOp : public CpgOp {
+ CpgFinalizeOp ( )
+ : CpgOp ( std::string("cpg_finalize") ) { }
+
+ cpg_error_t op(cpg_handle_t handle, struct cpg_name *) {
+ return cpg_finalize ( handle );
+ }
+
+ std::string msg(const Name& name) { return cantFinalizeMsg(name); }
+ };
+
+ // This fn standardizes retry policy across all Cpg ops that need it.
+ void callCpg ( CpgOp & );
+
+ CpgJoinOp cpgJoinOp;
+ CpgLeaveOp cpgLeaveOp;
+ CpgFinalizeOp cpgFinalizeOp;
+
+ static std::string errorStr(cpg_error_t err, const std::string& msg);
+ static std::string cantJoinMsg(const Name&);
+ static std::string cantLeaveMsg(const Name&);
+ static std::string cantMcastMsg(const Name&);
+ static std::string cantFinalizeMsg(const Name&);
+
+ static Cpg* cpgFromHandle(cpg_handle_t);
+
+ // New versions for corosync 1.0 and higher
+ static void globalDeliver(
+ cpg_handle_t handle,
+ const struct cpg_name *group,
+ uint32_t nodeid,
+ uint32_t pid,
+ void* msg,
+ size_t msg_len);
+
+ static void globalConfigChange(
+ cpg_handle_t handle,
+ const struct cpg_name *group,
+ const struct cpg_address *members, size_t nMembers,
+ const struct cpg_address *left, size_t nLeft,
+ const struct cpg_address *joined, size_t nJoined
+ );
+
+ // Old versions for openais
+ static void globalDeliver(
+ cpg_handle_t handle,
+ struct cpg_name *group,
+ uint32_t nodeid,
+ uint32_t pid,
+ void* msg,
+ int msg_len);
+
+ static void globalConfigChange(
+ cpg_handle_t handle,
+ struct cpg_name *group,
+ struct cpg_address *members, int nMembers,
+ struct cpg_address *left, int nLeft,
+ struct cpg_address *joined, int nJoined
+ );
+
+ cpg_handle_t handle;
+ Handler& handler;
+ bool isShutdown;
+ Name group;
+ sys::Mutex dispatchLock;
+};
+
+inline bool operator==(const cpg_name& a, const cpg_name& b) {
+ return a.length==b.length && strncmp(a.value, b.value, a.length) == 0;
+}
+inline bool operator!=(const cpg_name& a, const cpg_name& b) { return !(a == b); }
+
+}} // namespace qpid::cluster
+
+#endif /*!CPG_H*/