summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-11-18 19:55:59 +0000
committerAlan Conway <aconway@apache.org>2008-11-18 19:55:59 +0000
commit970fba7f2422eab256273a610135be33bd37f7d6 (patch)
tree2e1e66e3bd717c2adc02b9787bb55a0c20679381 /cpp/src
parentb355d0e5b46739c74e9cbef449d8fc50646e4db2 (diff)
downloadqpid-python-970fba7f2422eab256273a610135be33bd37f7d6.tar.gz
Optional cluster integration with cman quorum service.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@718693 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/cluster.mk17
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp12
-rw-r--r--cpp/src/qpid/cluster/Cluster.h6
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.cpp1
-rw-r--r--cpp/src/qpid/cluster/Quorum.h32
-rw-r--r--cpp/src/qpid/cluster/Quorum_cman.cpp53
-rw-r--r--cpp/src/qpid/cluster/Quorum_cman.h53
-rw-r--r--cpp/src/qpid/cluster/Quorum_null.h39
-rw-r--r--cpp/src/tests/cluster.mk10
-rwxr-xr-xcpp/src/tests/ssl_test3
10 files changed, 213 insertions, 13 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index 0b62140465..a3f32dcf85 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -1,11 +1,20 @@
#
# Cluster library makefile fragment, to be included in Makefile.am
#
-if CPG
+# Optional CMAN support
+if HAVE_LIBCMAN
+CMAN_SOURCES= qpid/cluster/Quorum_cman.h qpid/cluster/Quorum_cman.cpp
+libcman = -lcman
+else
+CMAN_SOURCES= qpid/cluster/Quorum_null.h
+endif
+
+if HAVE_LIBCPG
dmodule_LTLIBRARIES += cluster.la
cluster_la_SOURCES = \
+ $(CMAN_SOURCES) \
qpid/cluster/types.h \
qpid/cluster/Cluster.cpp \
qpid/cluster/Cluster.h \
@@ -32,9 +41,7 @@ cluster_la_SOURCES = \
qpid/cluster/FailoverExchange.h \
qpid/cluster/FailoverExchange.cpp
-cluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la
-
+cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la
cluster_la_LDFLAGS = $(PLUGINLDFLAGS)
-endif
-
+endif # HAVE_LIBCPG
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index f6022aa5b8..b2650ffa7f 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -86,6 +86,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
};
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
+ isQuorate(isQuorateImpl),
broker(b),
poller(b.getPoller()),
cpg(*this),
@@ -591,6 +592,17 @@ broker::Broker& Cluster::getBroker() const {
return broker; // Immutable, no need to lock.
}
+/** Default implementation for isQuorateImpl when there is no quorum service. */
+bool Cluster::isQuorateImpl() { return true; }
+
+void Cluster::checkQuorum() {
+ if (!isQuorate()) {
+ QPID_LOG(critical, *this << " disconnected from cluster quorum, shutting down");
+ leave();
+ throw Exception(QPID_MSG(*this << " disconnected from cluster quorum."));
+ }
+}
+
void Cluster::setClusterId(const Uuid& uuid) {
clusterId = uuid;
if (mgmtObject)
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index d8b9c958d8..aff3f18c6d 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -92,6 +92,9 @@ class Cluster : private Cpg::Handler, public management::Manageable {
MemberId getId() const;
broker::Broker& getBroker() const;
+ boost::function<bool ()> isQuorate;
+ void checkQuorum();
+
private:
typedef sys::LockPtr<Cluster,sys::Monitor> LockPtr;
typedef sys::LockPtr<const Cluster,sys::Monitor> ConstLockPtr;
@@ -173,7 +176,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void dumpOutDone(Lock&);
void setClusterId(const framing::Uuid&);
-
+ static bool isQuorateImpl();
+
mutable sys::Monitor lock;
broker::Broker& broker;
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
index effd2c5bff..68afb9bda0 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -37,6 +37,7 @@ OutputInterceptor::OutputInterceptor(cluster::Connection& p, sys::ConnectionOutp
{}
void OutputInterceptor::send(framing::AMQFrame& f) {
+ parent.getCluster().checkQuorum();
Locker l(lock);
next->send(f);
if (!parent.isCatchUp())
diff --git a/cpp/src/qpid/cluster/Quorum.h b/cpp/src/qpid/cluster/Quorum.h
new file mode 100644
index 0000000000..f07b58dfa6
--- /dev/null
+++ b/cpp/src/qpid/cluster/Quorum.h
@@ -0,0 +1,32 @@
+#ifndef QPID_CLUSTER_QUORUM_H
+#define QPID_CLUSTER_QUORUM_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "config.h"
+
+#if HAVE_LIBCMAN
+#include "Quorum_cman.h"
+#else
+#include "Quorum_null.h"
+#endif
+
+#endif /*!QPID_CLUSTER_QUORUM_H*/
diff --git a/cpp/src/qpid/cluster/Quorum_cman.cpp b/cpp/src/qpid/cluster/Quorum_cman.cpp
new file mode 100644
index 0000000000..0d4656b536
--- /dev/null
+++ b/cpp/src/qpid/cluster/Quorum_cman.cpp
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Quorum.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Options.h"
+#include "qpid/sys/Time.h"
+
+namespace qpid {
+namespace cluster {
+
+Quorum::Quorum() : enable(false), cman(0) {}
+
+Quorum::~Quorum() { if (cman) cman_finish(cman); }
+
+void Quorum::addOption(Options& opts) {
+ opts.addOptions()("cluster-cman", optValue(enable), "Enable integration with CMAN Cluster Manager");
+}
+
+void Quorum::init() {
+ if (enable) {
+ cman = cman_init(0);
+ if (cman == 0) throw ErrnoException("Can't connect to cman service");
+ // FIXME aconway 2008-11-13: configure max wait.
+ for (int retry = 0; !cman_is_quorate(cman) && retry < 30; retry++) {
+ QPID_LOG(notice, "Waiting for cluster quorum: " << sys::strError(errno));
+ sys::sleep(1);
+ }
+ if (!cman_is_quorate(cman))
+ throw ErrnoException("Timed out waiting for cluster quorum");
+ }
+}
+
+bool Quorum::isQuorate() { return cman_is_quorate(cman); }
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Quorum_cman.h b/cpp/src/qpid/cluster/Quorum_cman.h
new file mode 100644
index 0000000000..bf02f697b0
--- /dev/null
+++ b/cpp/src/qpid/cluster/Quorum_cman.h
@@ -0,0 +1,53 @@
+#ifndef QPID_CLUSTER_QUORUM_CMAN_H
+#define QPID_CLUSTER_QUORUM_CMAN_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+extern "C" {
+#include <libcman.h>
+}
+
+namespace qpid {
+
+class Options;
+
+namespace cluster {
+
+class Quorum {
+ public:
+ Quorum();
+ ~Quorum();
+ void addOption(Options& opts);
+ void init();
+ bool isQuorate();
+
+ private:
+ bool enable;
+ cman_handle_t cman;
+};
+
+
+}} // namespace qpid::cluster
+
+ // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_QUORUM_CMAN_H*/
diff --git a/cpp/src/qpid/cluster/Quorum_null.h b/cpp/src/qpid/cluster/Quorum_null.h
new file mode 100644
index 0000000000..96374a5e88
--- /dev/null
+++ b/cpp/src/qpid/cluster/Quorum_null.h
@@ -0,0 +1,39 @@
+#ifndef QPID_CLUSTER_QUORUM_NULL_H
+#define QPID_CLUSTER_QUORUM_NULL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace qpid {
+namespace cluster {
+
+/** Null implementation of quorum. */
+
+class Quorum {
+ public:
+ void init();
+ bool isQuorate() { return true; }
+ void addOption(Options& opts) {}
+};
+
+#endif
+
+
+#endif /*!QPID_CLUSTER_QUORUM_NULL_H*/
diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk
index 88574943fd..95d8d0f034 100644
--- a/cpp/src/tests/cluster.mk
+++ b/cpp/src/tests/cluster.mk
@@ -1,10 +1,10 @@
-if CPG
+
+if HAVE_LIBCPG
+
#
# Cluster tests makefile fragment, to be included in Makefile.am
#
-lib_cluster = $(abs_builddir)/../cluster.la
-
# NOTE: Programs using the openais library must be run with gid=ais
# You should do "newgrp ais" before running the tests to run these.
#
@@ -16,8 +16,8 @@ EXTRA_DIST+=ais_check start_cluster stop_cluster
check_PROGRAMS+=cluster_test
cluster_test_SOURCES=unit_test.cpp cluster_test.cpp
-cluster_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework
+cluster_test_LDADD=$(lib_client) ../cluster.la -lboost_unit_test_framework
-unit_test_LDADD+=$(lib_cluster)
+unit_test_LDADD+=../cluster.la
endif
diff --git a/cpp/src/tests/ssl_test b/cpp/src/tests/ssl_test
index 2a12c79c5b..9fc54008b7 100755
--- a/cpp/src/tests/ssl_test
+++ b/cpp/src/tests/ssl_test
@@ -19,8 +19,7 @@ create_certs() {
}
start_broker() {
- ../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no --config $CONFIG\
- --load-module ../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE > qpidd.port
+ ../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module ../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE > qpidd.port
PORT=`cat qpidd.port`
}