summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/Quorum_cman.cpp')
-rw-r--r--qpid/cpp/src/qpid/cluster/Quorum_cman.cpp105
1 files changed, 105 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp b/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
new file mode 100644
index 0000000000..728f824b16
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/cluster/Quorum_cman.h"
+#include "qpid/cluster/Cluster.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Options.h"
+#include "qpid/sys/Time.h"
+#include "qpid/sys/posix/PrivatePosix.h"
+
+namespace qpid {
+namespace cluster {
+
+namespace {
+
+boost::function<void()> errorFn;
+
+void cmanCallbackFn(cman_handle_t handle, void */*privdata*/, int reason, int /*arg*/) {
+ if (reason == CMAN_REASON_STATECHANGE && !cman_is_quorate(handle)) {
+ QPID_LOG(critical, "Lost contact with cluster quorum.");
+ if (errorFn) errorFn();
+ cman_stop_notification(handle);
+ }
+}
+}
+
+Quorum::Quorum(boost::function<void()> err) : cman(0), cmanFd(0) {
+ errorFn = err;
+}
+
+Quorum::~Quorum() {
+ if (dispatchHandle.get()) dispatchHandle->stopWatch();
+ dispatchHandle.reset();
+ if (cman) cman_finish(cman);
+}
+
+void Quorum::start(boost::shared_ptr<sys::Poller> p) {
+ poller = p;
+ QPID_LOG(debug, "Connecting to quorum service.");
+ cman = cman_init(0);
+ if (cman == 0) throw ErrnoException("Can't connect to cman service");
+ if (!cman_is_quorate(cman)) {
+ QPID_LOG(notice, "Waiting for cluster quorum.");
+ while(!cman_is_quorate(cman)) sys::sleep(5);
+ }
+ int err = cman_start_notification(cman, cmanCallbackFn);
+ if (err != 0) throw ErrnoException("Can't register for cman notifications");
+ watch(getFd());
+}
+
+void Quorum::watch(int fd) {
+ cmanFd = fd;
+ if (dispatchHandle.get()) dispatchHandle->stopWatch();
+ ioHandle.reset(new sys::PosixIOHandle(cmanFd));
+ dispatchHandle.reset(
+ new sys::DispatchHandleRef(
+ *ioHandle, // This must outlive the dispatchHandleRef
+ boost::bind(&Quorum::dispatch, this, _1), // read
+ 0, // write
+ boost::bind(&Quorum::disconnect, this, _1) // disconnect
+ ));
+ dispatchHandle->startWatch(poller);
+}
+
+int Quorum::getFd() {
+ int fd = cman_get_fd(cman);
+ if (fd == 0) throw ErrnoException("Can't get cman file descriptor");
+ return fd;
+}
+
+void Quorum::dispatch(sys::DispatchHandle&) {
+ try {
+ cman_dispatch(cman, CMAN_DISPATCH_ALL);
+ int fd = getFd();
+ if (fd != cmanFd) watch(fd);
+ } catch (const std::exception& e) {
+ QPID_LOG(critical, "Error in quorum dispatch: " << e.what());
+ errorFn();
+ }
+}
+
+void Quorum::disconnect(sys::DispatchHandle&) {
+ QPID_LOG(critical, "Disconnected from quorum service");
+ errorFn();
+}
+
+}} // namespace qpid::cluster