summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp11
-rw-r--r--cpp/src/qpid/cluster/Cluster.h6
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp8
-rw-r--r--cpp/src/qpid/cluster/Quorum_cman.cpp28
-rw-r--r--cpp/src/qpid/cluster/Quorum_cman.h3
-rw-r--r--cpp/src/qpid/cluster/Quorum_null.h8
-rw-r--r--cpp/src/qpid/sys/posix/PollableCondition.cpp4
7 files changed, 30 insertions, 38 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index b2650ffa7f..5dffae2b2f 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -85,8 +85,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
-Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
- isQuorate(isQuorateImpl),
+Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool useQuorum) :
broker(b),
poller(b.getPoller()),
cpg(*this),
@@ -117,8 +116,9 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
cpgDispatchHandle.startWatch(poller);
deliverQueue.start();
+ QPID_LOG(notice, *this << " joining cluster " << name.str());
+ if (useQuorum) quorum.init();
cpg.join(name);
- QPID_LOG(notice, *this << " will join cluster " << name.str());
}
Cluster::~Cluster() {
@@ -592,11 +592,8 @@ broker::Broker& Cluster::getBroker() const {
return broker; // Immutable, no need to lock.
}
-/** Default implementation for isQuorateImpl when there is no quorum service. */
-bool Cluster::isQuorateImpl() { return true; }
-
void Cluster::checkQuorum() {
- if (!isQuorate()) {
+ if (!quorum.isQuorate()) {
QPID_LOG(critical, *this << " disconnected from cluster quorum, shutting down");
leave();
throw Exception(QPID_MSG(*this << " disconnected from cluster quorum."));
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index aff3f18c6d..2a659be2f1 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -24,6 +24,7 @@
#include "NoOpConnectionOutputHandler.h"
#include "ClusterMap.h"
#include "FailoverExchange.h"
+#include "Quorum.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/PollableQueue.h"
@@ -66,7 +67,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
* @param name of the cluster.
* @param url of this broker, sent to the cluster.
*/
- Cluster(const std::string& name, const Url& url, broker::Broker&);
+ Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum);
virtual ~Cluster();
@@ -176,7 +177,6 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void dumpOutDone(Lock&);
void setClusterId(const framing::Uuid&);
- static bool isQuorateImpl();
mutable sys::Monitor lock;
@@ -215,6 +215,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
size_t lastSize;
boost::shared_ptr<FailoverExchange> failoverExchange;
+ Quorum quorum;
+
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;
};
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index fad0563872..eaf2631d03 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -39,7 +39,10 @@ using broker::Broker;
struct ClusterValues {
string name;
string url;
+ bool quorum;
+ ClusterValues() : quorum(false) {}
+
Url getUrl(uint16_t port) const {
if (url.empty()) return Url::getIpAddressesUrl(port);
return Url(url);
@@ -59,6 +62,9 @@ struct ClusterOptions : public Options {
("cluster-url", optValue(values.url,"URL"),
"URL of this broker, advertized to the cluster.\n"
"Defaults to a URL listing all the local IP addresses\n")
+#if HAVE_LIBCMAN
+ ("cluster-cman", optValue(values.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
+#endif
;
}
};
@@ -78,7 +84,7 @@ struct ClusterPlugin : public Plugin {
if (values.name.empty()) return; // Only if --cluster-name option was specified.
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
- cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker);
+ cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker, values.quorum);
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
diff --git a/cpp/src/qpid/cluster/Quorum_cman.cpp b/cpp/src/qpid/cluster/Quorum_cman.cpp
index 0d4656b536..d5df758b40 100644
--- a/cpp/src/qpid/cluster/Quorum_cman.cpp
+++ b/cpp/src/qpid/cluster/Quorum_cman.cpp
@@ -18,7 +18,7 @@
* under the License.
*
*/
-#include "Quorum.h"
+#include "Quorum_cman.h"
#include "qpid/log/Statement.h"
#include "qpid/Options.h"
#include "qpid/sys/Time.h"
@@ -30,24 +30,20 @@ Quorum::Quorum() : enable(false), cman(0) {}
Quorum::~Quorum() { if (cman) cman_finish(cman); }
-void Quorum::addOption(Options& opts) {
- opts.addOptions()("cluster-cman", optValue(enable), "Enable integration with CMAN Cluster Manager");
-}
-
void Quorum::init() {
- if (enable) {
- cman = cman_init(0);
- if (cman == 0) throw ErrnoException("Can't connect to cman service");
- // FIXME aconway 2008-11-13: configure max wait.
- for (int retry = 0; !cman_is_quorate(cman) && retry < 30; retry++) {
- QPID_LOG(notice, "Waiting for cluster quorum: " << sys::strError(errno));
- sys::sleep(1);
- }
- if (!cman_is_quorate(cman))
- throw ErrnoException("Timed out waiting for cluster quorum");
+ QPID_LOG(info, "Waiting for cluster quorum");
+ enable = true;
+ cman = cman_init(0);
+ if (cman == 0) throw ErrnoException("Can't connect to cman service");
+ // FIXME aconway 2008-11-13: configure max wait.
+ for (int retry = 0; !cman_is_quorate(cman) && retry < 30; retry++) {
+ QPID_LOG(info, "Waiting for cluster quorum: " << sys::strError(errno));
+ sys::sleep(1);
}
+ if (!cman_is_quorate(cman))
+ throw ErrnoException("Timed out waiting for cluster quorum.");
}
-bool Quorum::isQuorate() { return cman_is_quorate(cman); }
+bool Quorum::isQuorate() { return enable ? cman_is_quorate(cman) : true; }
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Quorum_cman.h b/cpp/src/qpid/cluster/Quorum_cman.h
index bf02f697b0..d0f8b2c954 100644
--- a/cpp/src/qpid/cluster/Quorum_cman.h
+++ b/cpp/src/qpid/cluster/Quorum_cman.h
@@ -36,7 +36,6 @@ class Quorum {
public:
Quorum();
~Quorum();
- void addOption(Options& opts);
void init();
bool isQuorate();
@@ -48,6 +47,4 @@ class Quorum {
}} // namespace qpid::cluster
- // namespace qpid::cluster
-
#endif /*!QPID_CLUSTER_QUORUM_CMAN_H*/
diff --git a/cpp/src/qpid/cluster/Quorum_null.h b/cpp/src/qpid/cluster/Quorum_null.h
index 96374a5e88..cbb6c20708 100644
--- a/cpp/src/qpid/cluster/Quorum_null.h
+++ b/cpp/src/qpid/cluster/Quorum_null.h
@@ -28,12 +28,10 @@ namespace cluster {
class Quorum {
public:
- void init();
+ void init() {}
bool isQuorate() { return true; }
- void addOption(Options& opts) {}
};
-#endif
-
-
#endif /*!QPID_CLUSTER_QUORUM_NULL_H*/
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/sys/posix/PollableCondition.cpp b/cpp/src/qpid/sys/posix/PollableCondition.cpp
index 4ff66d1106..0c55fd3c0d 100644
--- a/cpp/src/qpid/sys/posix/PollableCondition.cpp
+++ b/cpp/src/qpid/sys/posix/PollableCondition.cpp
@@ -22,10 +22,6 @@
*
*/
-// FIXME aconway 2008-08-11: this could be of more general interest,
-// move to common lib.
-//
-
#include "PollableCondition.h"
#include "qpid/sys/posix/PrivatePosix.h"
#include "qpid/Exception.h"