diff options
author | Alan Conway <aconway@apache.org> | 2008-11-18 19:55:59 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-11-18 19:55:59 +0000 |
commit | 970fba7f2422eab256273a610135be33bd37f7d6 (patch) | |
tree | 2e1e66e3bd717c2adc02b9787bb55a0c20679381 /cpp/src | |
parent | b355d0e5b46739c74e9cbef449d8fc50646e4db2 (diff) | |
download | qpid-python-970fba7f2422eab256273a610135be33bd37f7d6.tar.gz |
Optional cluster integration with cman quorum service.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@718693 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/cluster.mk | 17 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Quorum.h | 32 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Quorum_cman.cpp | 53 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Quorum_cman.h | 53 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Quorum_null.h | 39 | ||||
-rw-r--r-- | cpp/src/tests/cluster.mk | 10 | ||||
-rwxr-xr-x | cpp/src/tests/ssl_test | 3 |
10 files changed, 213 insertions, 13 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 0b62140465..a3f32dcf85 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -1,11 +1,20 @@ # # Cluster library makefile fragment, to be included in Makefile.am # -if CPG +# Optional CMAN support +if HAVE_LIBCMAN +CMAN_SOURCES= qpid/cluster/Quorum_cman.h qpid/cluster/Quorum_cman.cpp +libcman = -lcman +else +CMAN_SOURCES= qpid/cluster/Quorum_null.h +endif + +if HAVE_LIBCPG dmodule_LTLIBRARIES += cluster.la cluster_la_SOURCES = \ + $(CMAN_SOURCES) \ qpid/cluster/types.h \ qpid/cluster/Cluster.cpp \ qpid/cluster/Cluster.h \ @@ -32,9 +41,7 @@ cluster_la_SOURCES = \ qpid/cluster/FailoverExchange.h \ qpid/cluster/FailoverExchange.cpp -cluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la - +cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la cluster_la_LDFLAGS = $(PLUGINLDFLAGS) -endif - +endif # HAVE_LIBCPG diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f6022aa5b8..b2650ffa7f 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -86,6 +86,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { }; Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : + isQuorate(isQuorateImpl), broker(b), poller(b.getPoller()), cpg(*this), @@ -591,6 +592,17 @@ broker::Broker& Cluster::getBroker() const { return broker; // Immutable, no need to lock. } +/** Default implementation for isQuorateImpl when there is no quorum service. */ +bool Cluster::isQuorateImpl() { return true; } + +void Cluster::checkQuorum() { + if (!isQuorate()) { + QPID_LOG(critical, *this << " disconnected from cluster quorum, shutting down"); + leave(); + throw Exception(QPID_MSG(*this << " disconnected from cluster quorum.")); + } +} + void Cluster::setClusterId(const Uuid& uuid) { clusterId = uuid; if (mgmtObject) diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index d8b9c958d8..aff3f18c6d 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -92,6 +92,9 @@ class Cluster : private Cpg::Handler, public management::Manageable { MemberId getId() const; broker::Broker& getBroker() const; + boost::function<bool ()> isQuorate; + void checkQuorum(); + private: typedef sys::LockPtr<Cluster,sys::Monitor> LockPtr; typedef sys::LockPtr<const Cluster,sys::Monitor> ConstLockPtr; @@ -173,7 +176,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { void dumpOutDone(Lock&); void setClusterId(const framing::Uuid&); - + static bool isQuorateImpl(); + mutable sys::Monitor lock; broker::Broker& broker; diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index effd2c5bff..68afb9bda0 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -37,6 +37,7 @@ OutputInterceptor::OutputInterceptor(cluster::Connection& p, sys::ConnectionOutp {} void OutputInterceptor::send(framing::AMQFrame& f) { + parent.getCluster().checkQuorum(); Locker l(lock); next->send(f); if (!parent.isCatchUp()) diff --git a/cpp/src/qpid/cluster/Quorum.h b/cpp/src/qpid/cluster/Quorum.h new file mode 100644 index 0000000000..f07b58dfa6 --- /dev/null +++ b/cpp/src/qpid/cluster/Quorum.h @@ -0,0 +1,32 @@ +#ifndef QPID_CLUSTER_QUORUM_H +#define QPID_CLUSTER_QUORUM_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "config.h" + +#if HAVE_LIBCMAN +#include "Quorum_cman.h" +#else +#include "Quorum_null.h" +#endif + +#endif /*!QPID_CLUSTER_QUORUM_H*/ diff --git a/cpp/src/qpid/cluster/Quorum_cman.cpp b/cpp/src/qpid/cluster/Quorum_cman.cpp new file mode 100644 index 0000000000..0d4656b536 --- /dev/null +++ b/cpp/src/qpid/cluster/Quorum_cman.cpp @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Quorum.h" +#include "qpid/log/Statement.h" +#include "qpid/Options.h" +#include "qpid/sys/Time.h" + +namespace qpid { +namespace cluster { + +Quorum::Quorum() : enable(false), cman(0) {} + +Quorum::~Quorum() { if (cman) cman_finish(cman); } + +void Quorum::addOption(Options& opts) { + opts.addOptions()("cluster-cman", optValue(enable), "Enable integration with CMAN Cluster Manager"); +} + +void Quorum::init() { + if (enable) { + cman = cman_init(0); + if (cman == 0) throw ErrnoException("Can't connect to cman service"); + // FIXME aconway 2008-11-13: configure max wait. + for (int retry = 0; !cman_is_quorate(cman) && retry < 30; retry++) { + QPID_LOG(notice, "Waiting for cluster quorum: " << sys::strError(errno)); + sys::sleep(1); + } + if (!cman_is_quorate(cman)) + throw ErrnoException("Timed out waiting for cluster quorum"); + } +} + +bool Quorum::isQuorate() { return cman_is_quorate(cman); } + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Quorum_cman.h b/cpp/src/qpid/cluster/Quorum_cman.h new file mode 100644 index 0000000000..bf02f697b0 --- /dev/null +++ b/cpp/src/qpid/cluster/Quorum_cman.h @@ -0,0 +1,53 @@ +#ifndef QPID_CLUSTER_QUORUM_CMAN_H +#define QPID_CLUSTER_QUORUM_CMAN_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +extern "C" { +#include <libcman.h> +} + +namespace qpid { + +class Options; + +namespace cluster { + +class Quorum { + public: + Quorum(); + ~Quorum(); + void addOption(Options& opts); + void init(); + bool isQuorate(); + + private: + bool enable; + cman_handle_t cman; +}; + + +}} // namespace qpid::cluster + + // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_QUORUM_CMAN_H*/ diff --git a/cpp/src/qpid/cluster/Quorum_null.h b/cpp/src/qpid/cluster/Quorum_null.h new file mode 100644 index 0000000000..96374a5e88 --- /dev/null +++ b/cpp/src/qpid/cluster/Quorum_null.h @@ -0,0 +1,39 @@ +#ifndef QPID_CLUSTER_QUORUM_NULL_H +#define QPID_CLUSTER_QUORUM_NULL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +namespace cluster { + +/** Null implementation of quorum. */ + +class Quorum { + public: + void init(); + bool isQuorate() { return true; } + void addOption(Options& opts) {} +}; + +#endif + + +#endif /*!QPID_CLUSTER_QUORUM_NULL_H*/ diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk index 88574943fd..95d8d0f034 100644 --- a/cpp/src/tests/cluster.mk +++ b/cpp/src/tests/cluster.mk @@ -1,10 +1,10 @@ -if CPG + +if HAVE_LIBCPG + # # Cluster tests makefile fragment, to be included in Makefile.am # -lib_cluster = $(abs_builddir)/../cluster.la - # NOTE: Programs using the openais library must be run with gid=ais # You should do "newgrp ais" before running the tests to run these. # @@ -16,8 +16,8 @@ EXTRA_DIST+=ais_check start_cluster stop_cluster check_PROGRAMS+=cluster_test cluster_test_SOURCES=unit_test.cpp cluster_test.cpp -cluster_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework +cluster_test_LDADD=$(lib_client) ../cluster.la -lboost_unit_test_framework -unit_test_LDADD+=$(lib_cluster) +unit_test_LDADD+=../cluster.la endif diff --git a/cpp/src/tests/ssl_test b/cpp/src/tests/ssl_test index 2a12c79c5b..9fc54008b7 100755 --- a/cpp/src/tests/ssl_test +++ b/cpp/src/tests/ssl_test @@ -19,8 +19,7 @@ create_certs() { } start_broker() { - ../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no --config $CONFIG\ - --load-module ../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE > qpidd.port + ../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module ../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE > qpidd.port PORT=`cat qpidd.port` } |