summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-10-05 18:21:45 +0000
committerAlan Conway <aconway@apache.org>2012-10-05 18:21:45 +0000
commit3a0d4cd17abf3a39a8e5103cb0d9b5b4fd3bb55e (patch)
tree075cdd787c0a260c7b57b74bfeaea69e979035aa /cpp/src
parent67d03ede53fa2ad521bf4769a0fd8cc39e7d38f2 (diff)
downloadqpid-python-3a0d4cd17abf3a39a8e5103cb0d9b5b4fd3bb55e.tar.gz
QPID-4360: Non-ready HA broker can be incorrectly promoted to primary
A joining broker now attempts to contact all known members of the cluster and check their status. If any brokers are in a state other than "joining" the broker will refuse to promote. This will allow rgmanager to continue to try addresses till it finds a ready brokers. Note this reqiures ha-brokers-url to be a list of all known brokers, not a virtual IP. ha-public-url can still be a VIP. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1394706 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/CMakeLists.txt2
-rw-r--r--cpp/src/ha.mk4
-rw-r--r--cpp/src/qpid/broker/Broker.h2
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp8
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp3
-rw-r--r--cpp/src/qpid/ha/BrokerInfo.cpp8
-rw-r--r--cpp/src/qpid/ha/HaBroker.cpp17
-rw-r--r--cpp/src/qpid/ha/HaBroker.h2
-rw-r--r--cpp/src/qpid/ha/StatusCheck.cpp132
-rw-r--r--cpp/src/qpid/ha/StatusCheck.h64
-rwxr-xr-xcpp/src/tests/ha_tests.py14
11 files changed, 243 insertions, 13 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index 299990a6e8..d0097ce5f4 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -654,6 +654,8 @@ if (BUILD_HA)
qpid/ha/ReplicatingSubscription.cpp
qpid/ha/ReplicatingSubscription.h
qpid/ha/Settings.h
+ qpid/ha/StatusCheck.cpp
+ qpid/ha/StatusCheck.h
qpid/ha/types.cpp
qpid/ha/types.h
qpid/ha/RemoteBackup.cpp
diff --git a/cpp/src/ha.mk b/cpp/src/ha.mk
index 96a3d872e4..0cc0760d94 100644
--- a/cpp/src/ha.mk
+++ b/cpp/src/ha.mk
@@ -50,10 +50,12 @@ ha_la_SOURCES = \
qpid/ha/ReplicationTest.cpp \
qpid/ha/ReplicationTest.h \
qpid/ha/Settings.h \
+ qpid/ha/StatusCheck.cpp \
+ qpid/ha/StatusCheck.h \
qpid/ha/RemoteBackup.cpp \
qpid/ha/RemoteBackup.h \
qpid/ha/types.cpp \
qpid/ha/types.h
-ha_la_LIBADD = libqpidbroker.la
+ha_la_LIBADD = libqpidbroker.la libqpidmessaging.la
ha_la_LDFLAGS = $(PLUGINLDFLAGS)
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 8104009fa9..18f2f30528 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -362,8 +362,10 @@ class Broker : public sys::Runnable, public Plugin::Target,
QPID_BROKER_EXTERN framing::FieldTable getLinkClientProperties() const;
QPID_BROKER_EXTERN void setLinkClientProperties(const framing::FieldTable&);
+ QPID_BROKER_EXTERN uint16_t getLinkHearbeatInterval() { return config.linkHeartbeatInterval; }
/** Information identifying this system */
boost::shared_ptr<const System> getSystem() const { return systemObject; }
+ friend class StatusCheckThread;
};
}}
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index 91838d8e8b..4f88cb97ee 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -82,9 +82,11 @@ void ConnectionHandler::Adapter::handle(qpid::framing::AMQFrame& f)
handler.out(f);
}
-ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v, Bounds& b)
- : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this, b), proxy(outHandler),
- errorCode(CLOSE_CODE_NORMAL), version(v)
+ConnectionHandler::ConnectionHandler(
+ const ConnectionSettings& s, ProtocolVersion& v, Bounds& b)
+ : StateManager(NOT_STARTED), ConnectionSettings(s),
+ outHandler(*this, b), proxy(outHandler), errorCode(CLOSE_CODE_NORMAL), version(v),
+ properties(s.clientProperties)
{
insist = true;
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index aaebec0720..f43119ea4c 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -26,6 +26,7 @@
#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include "qpid/Url.h"
+#include "qpid/amqp_0_10/Codecs.h"
#include <boost/intrusive_ptr.hpp>
#include <vector>
#include <sstream>
@@ -156,6 +157,8 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value)
settings.sslCertName = value.asString();
} else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") {
reconnectOnLimitExceeded = value;
+ } else if (name == "client-properties") {
+ amqp_0_10::translate(value.asMap(), settings.clientProperties);
} else {
throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
}
diff --git a/cpp/src/qpid/ha/BrokerInfo.cpp b/cpp/src/qpid/ha/BrokerInfo.cpp
index 73e86d94fe..5a8dfa512a 100644
--- a/cpp/src/qpid/ha/BrokerInfo.cpp
+++ b/cpp/src/qpid/ha/BrokerInfo.cpp
@@ -33,10 +33,10 @@ namespace qpid {
namespace ha {
namespace {
-std::string SYSTEM_ID="system-id";
-std::string HOST_NAME="host-name";
-std::string PORT="port";
-std::string STATUS="status";
+const std::string SYSTEM_ID="system-id";
+const std::string HOST_NAME="host-name";
+const std::string PORT="port";
+const std::string STATUS="status";
}
using types::Uuid;
diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp
index de9880b6db..877fa021d0 100644
--- a/cpp/src/qpid/ha/HaBroker.cpp
+++ b/cpp/src/qpid/ha/HaBroker.cpp
@@ -26,6 +26,7 @@
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
+#include "StatusCheck.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
@@ -87,7 +88,6 @@ void HaBroker::initialize() {
broker.getSystem()->getNodeName(),
broker.getPort(broker::Broker::TCP_TRANSPORT),
systemId);
-
QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
// Set up the management object.
@@ -110,6 +110,7 @@ void HaBroker::initialize() {
status = JOINING;
backup.reset(new Backup(*this, settings));
broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
+ statusCheck.reset(new StatusCheck(logPrefix, broker.getLinkHearbeatInterval(), brokerInfo));
}
if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl));
@@ -158,8 +159,15 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
switch (methodId) {
case _qmf::HaBroker::METHOD_PROMOTE: {
switch (getStatus()) {
- case JOINING: recover(); break;
- case CATCHUP:
+ case JOINING:
+ if (statusCheck->canPromote())
+ recover();
+ else {
+ QPID_LOG(error, logPrefix << "Cluster already active, cannot be promoted");
+ throw Exception("Cluster already active, cannot be promoted.");
+ }
+ break;
+ case CATCHUP:
QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
throw Exception("Still catching up, cannot be promoted.");
break;
@@ -219,7 +227,6 @@ void HaBroker::setClientUrl(const Url& url) {
void HaBroker::updateClientUrl(Mutex::ScopedLock&) {
Url url = clientUrl.empty() ? brokerUrl : clientUrl;
- if (url.empty()) throw Url::Invalid("HA client URL is empty");
mgmtObject->set_publicUrl(url.str());
knownBrokers.clear();
knownBrokers.push_back(url);
@@ -230,10 +237,10 @@ void HaBroker::setBrokerUrl(const Url& url) {
boost::shared_ptr<Backup> b;
{
Mutex::ScopedLock l(lock);
- if (url.empty()) throw Url::Invalid("HA broker URL is empty");
brokerUrl = url;
mgmtObject->set_brokersUrl(brokerUrl.str());
QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
+ if (status == JOINING && statusCheck.get()) statusCheck->setUrl(url);
// Updating broker URL also updates defaulted client URL:
if (clientUrl.empty()) updateClientUrl(l);
b = backup;
diff --git a/cpp/src/qpid/ha/HaBroker.h b/cpp/src/qpid/ha/HaBroker.h
index 3b39b9ec3d..48db0a8d3c 100644
--- a/cpp/src/qpid/ha/HaBroker.h
+++ b/cpp/src/qpid/ha/HaBroker.h
@@ -53,6 +53,7 @@ namespace ha {
class Backup;
class ConnectionObserver;
class Primary;
+class StatusCheck;
/**
* HA state and actions associated with a HA broker. Holds all the management info.
@@ -130,6 +131,7 @@ class HaBroker : public management::Manageable
BrokerInfo brokerInfo;
Membership membership;
ReplicationTest replicationTest;
+ std::auto_ptr<StatusCheck> statusCheck;
};
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/StatusCheck.cpp b/cpp/src/qpid/ha/StatusCheck.cpp
new file mode 100644
index 0000000000..e4597a5a45
--- /dev/null
+++ b/cpp/src/qpid/ha/StatusCheck.cpp
@@ -0,0 +1,132 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "StatusCheck.h"
+#include "qpid/log/Statement.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace ha {
+
+using namespace qpid::messaging;
+using namespace qpid::types;
+using namespace std;
+using namespace sys;
+
+const string HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker";
+
+class StatusCheckThread : public sys::Runnable {
+ public:
+ StatusCheckThread(StatusCheck& sc, const qpid::Address& addr, const BrokerInfo& self)
+ : url(addr), statusCheck(sc), brokerInfo(self) {}
+ void run();
+ private:
+ Url url;
+ StatusCheck& statusCheck;
+ uint16_t linkHeartbeatInterval;
+ BrokerInfo brokerInfo;
+};
+
+void StatusCheckThread::run() {
+ QPID_LOG(debug, statusCheck.logPrefix << "Checking status of " << url);
+ Variant::Map options, clientProperties;
+ clientProperties = brokerInfo.asMap(); // Detect self connections.
+ clientProperties["qpid.ha-admin"] = 1; // Allow connection to backups.
+
+ options["client-properties"] = clientProperties;
+ options["heartbeat"] = statusCheck.linkHeartbeatInterval;
+ Connection c(url.str(), options);
+
+ try {
+ c.open();
+ Session session = c.createSession();
+ messaging::Address responses("#;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.replicate':none}}}}");
+ Receiver r = session.createReceiver(responses);
+ Sender s = session.createSender("qmf.default.direct/broker");
+ Message request;
+ request.setReplyTo(responses);
+ request.setContentType("amqp/map");
+ request.setProperty("x-amqp-0-10.app-id", "qmf2");
+ request.setProperty("qmf.opcode", "_query_request");
+ Variant::Map oid;
+ oid["_object_name"] = HA_BROKER;
+ Variant::Map content;
+ content["_what"] = "OBJECT";
+ content["_object_id"] = oid;
+ encode(content, request);
+ s.send(request);
+ Message response = r.fetch(statusCheck.linkHeartbeatInterval*Duration::SECOND);
+ session.acknowledge();
+ Variant::List contentIn;
+ decode(response, contentIn);
+ if (contentIn.size() == 1) {
+ Variant::Map details = contentIn.front().asMap()["_values"].asMap();
+ string status = details["status"].getString();
+ if (status != "joining") {
+ statusCheck.setPromote(false);
+ QPID_LOG(error, statusCheck.logPrefix << "Broker " << url << " status is " << status
+ << " this broker will refuse promotion.");
+ }
+ QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status);
+ }
+ } catch(const exception& error) {
+ QPID_LOG(warning, "Error checking status of " << url << ": " << error.what());
+ }
+ delete this;
+}
+
+StatusCheck::StatusCheck(const string& lp, uint16_t lh, const BrokerInfo& self)
+ : logPrefix(lp), promote(true), linkHeartbeatInterval(lh), brokerInfo(self)
+{}
+
+StatusCheck::~StatusCheck() {
+ // Join any leftovers
+ for (size_t i = 0; i < threads.size(); ++i) threads[i].join();
+}
+
+void StatusCheck::setUrl(const Url& url) {
+ Mutex::ScopedLock l(lock);
+ for (size_t i = 0; i < url.size(); ++i)
+ threads.push_back(Thread(new StatusCheckThread(*this, url[i], brokerInfo)));
+}
+
+bool StatusCheck::canPromote() {
+ Mutex::ScopedLock l(lock);
+ while (!threads.empty()) {
+ Thread t = threads.back();
+ threads.pop_back();
+ Mutex::ScopedUnlock u(lock);
+ t.join();
+ }
+ return promote;
+}
+
+void StatusCheck::setPromote(bool p) {
+ Mutex::ScopedLock l(lock);
+ promote = p;
+}
+
+}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/StatusCheck.h b/cpp/src/qpid/ha/StatusCheck.h
new file mode 100644
index 0000000000..3c62c43a22
--- /dev/null
+++ b/cpp/src/qpid/ha/StatusCheck.h
@@ -0,0 +1,64 @@
+#ifndef QPID_HA_STATUSCHECK_H
+#define QPID_HA_STATUSCHECK_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BrokerInfo.h"
+#include "qpid/Url.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Runnable.h"
+#include <vector>
+
+namespace qpid {
+namespace ha {
+
+/**
+ * Check whether a JOINING broker can be promoted .
+ *
+ * A JOINING broker can be promoted as long as all the other brokers are also
+ * JOINING. If there are READY brokers in the cluster the JOINING broker should
+ * refuse to promote so that one of the READY brokers can. This situation
+ * only comes about if the primary is dead and no new primary has been promoted.
+ *
+ * THREAD SAFE: setUrl and canPromote are called in arbitrary management threads.
+ */
+class StatusCheck
+{
+ public:
+ StatusCheck(const std::string& logPrefix, uint16_t linkHeartbeatInteval, const BrokerInfo& self);
+ ~StatusCheck();
+ void setUrl(const Url&);
+ bool canPromote();
+ void setPromote(bool p);
+ private:
+ std::string logPrefix;
+ sys::Mutex lock;
+ std::vector<sys::Thread> threads;
+ bool promote;
+ uint16_t linkHeartbeatInterval;
+ BrokerInfo brokerInfo;
+ friend class StatusCheckThread;
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_STATUSCHECK_H*/
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index f1620cf55d..a8d16a77c9 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -866,6 +866,20 @@ class RecoveryTests(BrokerTest):
s.sync(timeout=1) # And released after the timeout.
self.assertEqual(cluster[2].ha_status(), "active")
+ def test_join_ready_cluster(self):
+ """If we join a cluster where the primary is dead, the new primary is
+ not yet promoted and there are ready backups then we should refuse
+ promotion so that one of the ready backups can be chosen."""
+ # FIXME aconway 2012-10-05: smaller timeout
+ cluster = HaCluster(self, 2, args=["--link-heartbeat-interval", 1])
+ cluster[0].wait_status("active")
+ cluster[1].wait_status("ready")
+ cluster.bounce(0, promote_next=False)
+ self.assertRaises(Exception, cluster[0].promote)
+ os.kill(cluster[1].pid, signal.SIGSTOP) # Test for timeout if unresponsive.
+ cluster.bounce(0, promote_next=False)
+ cluster[0].promote()
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
qpid_ha = os.getenv("QPID_HA_EXEC")