diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Quorum_cman.cpp | 28 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Quorum_cman.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Quorum_null.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/PollableCondition.cpp | 4 |
7 files changed, 30 insertions, 38 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index b2650ffa7f..5dffae2b2f 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -85,8 +85,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; -Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : - isQuorate(isQuorateImpl), +Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool useQuorum) : broker(b), poller(b.getPoller()), cpg(*this), @@ -117,8 +116,9 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); cpgDispatchHandle.startWatch(poller); deliverQueue.start(); + QPID_LOG(notice, *this << " joining cluster " << name.str()); + if (useQuorum) quorum.init(); cpg.join(name); - QPID_LOG(notice, *this << " will join cluster " << name.str()); } Cluster::~Cluster() { @@ -592,11 +592,8 @@ broker::Broker& Cluster::getBroker() const { return broker; // Immutable, no need to lock. } -/** Default implementation for isQuorateImpl when there is no quorum service. */ -bool Cluster::isQuorateImpl() { return true; } - void Cluster::checkQuorum() { - if (!isQuorate()) { + if (!quorum.isQuorate()) { QPID_LOG(critical, *this << " disconnected from cluster quorum, shutting down"); leave(); throw Exception(QPID_MSG(*this << " disconnected from cluster quorum.")); diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index aff3f18c6d..2a659be2f1 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -24,6 +24,7 @@ #include "NoOpConnectionOutputHandler.h" #include "ClusterMap.h" #include "FailoverExchange.h" +#include "Quorum.h" #include "qpid/broker/Broker.h" #include "qpid/sys/PollableQueue.h" @@ -66,7 +67,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { * @param name of the cluster. * @param url of this broker, sent to the cluster. */ - Cluster(const std::string& name, const Url& url, broker::Broker&); + Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum); virtual ~Cluster(); @@ -176,7 +177,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { void dumpOutDone(Lock&); void setClusterId(const framing::Uuid&); - static bool isQuorateImpl(); mutable sys::Monitor lock; @@ -215,6 +215,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { size_t lastSize; boost::shared_ptr<FailoverExchange> failoverExchange; + Quorum quorum; + friend std::ostream& operator<<(std::ostream&, const Cluster&); friend class ClusterDispatcher; }; diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index fad0563872..eaf2631d03 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -39,7 +39,10 @@ using broker::Broker; struct ClusterValues { string name; string url; + bool quorum; + ClusterValues() : quorum(false) {} + Url getUrl(uint16_t port) const { if (url.empty()) return Url::getIpAddressesUrl(port); return Url(url); @@ -59,6 +62,9 @@ struct ClusterOptions : public Options { ("cluster-url", optValue(values.url,"URL"), "URL of this broker, advertized to the cluster.\n" "Defaults to a URL listing all the local IP addresses\n") +#if HAVE_LIBCMAN + ("cluster-cman", optValue(values.quorum), "Integrate with Cluster Manager (CMAN) cluster.") +#endif ; } }; @@ -78,7 +84,7 @@ struct ClusterPlugin : public Plugin { if (values.name.empty()) return; // Only if --cluster-name option was specified. Broker* broker = dynamic_cast<Broker*>(&target); if (!broker) return; - cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker); + cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker, values.quorum); broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); diff --git a/cpp/src/qpid/cluster/Quorum_cman.cpp b/cpp/src/qpid/cluster/Quorum_cman.cpp index 0d4656b536..d5df758b40 100644 --- a/cpp/src/qpid/cluster/Quorum_cman.cpp +++ b/cpp/src/qpid/cluster/Quorum_cman.cpp @@ -18,7 +18,7 @@ * under the License. * */ -#include "Quorum.h" +#include "Quorum_cman.h" #include "qpid/log/Statement.h" #include "qpid/Options.h" #include "qpid/sys/Time.h" @@ -30,24 +30,20 @@ Quorum::Quorum() : enable(false), cman(0) {} Quorum::~Quorum() { if (cman) cman_finish(cman); } -void Quorum::addOption(Options& opts) { - opts.addOptions()("cluster-cman", optValue(enable), "Enable integration with CMAN Cluster Manager"); -} - void Quorum::init() { - if (enable) { - cman = cman_init(0); - if (cman == 0) throw ErrnoException("Can't connect to cman service"); - // FIXME aconway 2008-11-13: configure max wait. - for (int retry = 0; !cman_is_quorate(cman) && retry < 30; retry++) { - QPID_LOG(notice, "Waiting for cluster quorum: " << sys::strError(errno)); - sys::sleep(1); - } - if (!cman_is_quorate(cman)) - throw ErrnoException("Timed out waiting for cluster quorum"); + QPID_LOG(info, "Waiting for cluster quorum"); + enable = true; + cman = cman_init(0); + if (cman == 0) throw ErrnoException("Can't connect to cman service"); + // FIXME aconway 2008-11-13: configure max wait. + for (int retry = 0; !cman_is_quorate(cman) && retry < 30; retry++) { + QPID_LOG(info, "Waiting for cluster quorum: " << sys::strError(errno)); + sys::sleep(1); } + if (!cman_is_quorate(cman)) + throw ErrnoException("Timed out waiting for cluster quorum."); } -bool Quorum::isQuorate() { return cman_is_quorate(cman); } +bool Quorum::isQuorate() { return enable ? cman_is_quorate(cman) : true; } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Quorum_cman.h b/cpp/src/qpid/cluster/Quorum_cman.h index bf02f697b0..d0f8b2c954 100644 --- a/cpp/src/qpid/cluster/Quorum_cman.h +++ b/cpp/src/qpid/cluster/Quorum_cman.h @@ -36,7 +36,6 @@ class Quorum { public: Quorum(); ~Quorum(); - void addOption(Options& opts); void init(); bool isQuorate(); @@ -48,6 +47,4 @@ class Quorum { }} // namespace qpid::cluster - // namespace qpid::cluster - #endif /*!QPID_CLUSTER_QUORUM_CMAN_H*/ diff --git a/cpp/src/qpid/cluster/Quorum_null.h b/cpp/src/qpid/cluster/Quorum_null.h index 96374a5e88..cbb6c20708 100644 --- a/cpp/src/qpid/cluster/Quorum_null.h +++ b/cpp/src/qpid/cluster/Quorum_null.h @@ -28,12 +28,10 @@ namespace cluster { class Quorum { public: - void init(); + void init() {} bool isQuorate() { return true; } - void addOption(Options& opts) {} }; -#endif - - #endif /*!QPID_CLUSTER_QUORUM_NULL_H*/ + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/sys/posix/PollableCondition.cpp b/cpp/src/qpid/sys/posix/PollableCondition.cpp index 4ff66d1106..0c55fd3c0d 100644 --- a/cpp/src/qpid/sys/posix/PollableCondition.cpp +++ b/cpp/src/qpid/sys/posix/PollableCondition.cpp @@ -22,10 +22,6 @@ * */ -// FIXME aconway 2008-08-11: this could be of more general interest, -// move to common lib. -// - #include "PollableCondition.h" #include "qpid/sys/posix/PrivatePosix.h" #include "qpid/Exception.h" |