diff options
author | Alan Conway <aconway@apache.org> | 2012-10-05 18:21:45 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-10-05 18:21:45 +0000 |
commit | 3a0d4cd17abf3a39a8e5103cb0d9b5b4fd3bb55e (patch) | |
tree | 075cdd787c0a260c7b57b74bfeaea69e979035aa /cpp/src | |
parent | 67d03ede53fa2ad521bf4769a0fd8cc39e7d38f2 (diff) | |
download | qpid-python-3a0d4cd17abf3a39a8e5103cb0d9b5b4fd3bb55e.tar.gz |
QPID-4360: Non-ready HA broker can be incorrectly promoted to primary
A joining broker now attempts to contact all known members of the cluster and
check their status. If any brokers are in a state other than "joining" the
broker will refuse to promote. This will allow rgmanager to continue to try
addresses till it finds a ready broker.
Note this requires ha-brokers-url to be a list of all known brokers, not a
virtual IP. ha-public-url can still be a VIP.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1394706 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/CMakeLists.txt | 2 | ||||
-rw-r--r-- | cpp/src/ha.mk | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/ha/BrokerInfo.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/ha/HaBroker.cpp | 17 | ||||
-rw-r--r-- | cpp/src/qpid/ha/HaBroker.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/ha/StatusCheck.cpp | 132 | ||||
-rw-r--r-- | cpp/src/qpid/ha/StatusCheck.h | 64 | ||||
-rwxr-xr-x | cpp/src/tests/ha_tests.py | 14 |
11 files changed, 243 insertions, 13 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 299990a6e8..d0097ce5f4 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -654,6 +654,8 @@ if (BUILD_HA) qpid/ha/ReplicatingSubscription.cpp qpid/ha/ReplicatingSubscription.h qpid/ha/Settings.h + qpid/ha/StatusCheck.cpp + qpid/ha/StatusCheck.h qpid/ha/types.cpp qpid/ha/types.h qpid/ha/RemoteBackup.cpp diff --git a/cpp/src/ha.mk b/cpp/src/ha.mk index 96a3d872e4..0cc0760d94 100644 --- a/cpp/src/ha.mk +++ b/cpp/src/ha.mk @@ -50,10 +50,12 @@ ha_la_SOURCES = \ qpid/ha/ReplicationTest.cpp \ qpid/ha/ReplicationTest.h \ qpid/ha/Settings.h \ + qpid/ha/StatusCheck.cpp \ + qpid/ha/StatusCheck.h \ qpid/ha/RemoteBackup.cpp \ qpid/ha/RemoteBackup.h \ qpid/ha/types.cpp \ qpid/ha/types.h -ha_la_LIBADD = libqpidbroker.la +ha_la_LIBADD = libqpidbroker.la libqpidmessaging.la ha_la_LDFLAGS = $(PLUGINLDFLAGS) diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 8104009fa9..18f2f30528 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -362,8 +362,10 @@ class Broker : public sys::Runnable, public Plugin::Target, QPID_BROKER_EXTERN framing::FieldTable getLinkClientProperties() const; QPID_BROKER_EXTERN void setLinkClientProperties(const framing::FieldTable&); + QPID_BROKER_EXTERN uint16_t getLinkHearbeatInterval() { return config.linkHeartbeatInterval; } /** Information identifying this system */ boost::shared_ptr<const System> getSystem() const { return systemObject; } + friend class StatusCheckThread; }; }} diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index 91838d8e8b..4f88cb97ee 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -82,9 +82,11 @@ void ConnectionHandler::Adapter::handle(qpid::framing::AMQFrame& f) handler.out(f); } -ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v, Bounds& b) - : 
StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this, b), proxy(outHandler), - errorCode(CLOSE_CODE_NORMAL), version(v) +ConnectionHandler::ConnectionHandler( + const ConnectionSettings& s, ProtocolVersion& v, Bounds& b) + : StateManager(NOT_STARTED), ConnectionSettings(s), + outHandler(*this, b), proxy(outHandler), errorCode(CLOSE_CODE_NORMAL), version(v), + properties(s.clientProperties) { insist = true; diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index aaebec0720..f43119ea4c 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -26,6 +26,7 @@ #include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" #include "qpid/Url.h" +#include "qpid/amqp_0_10/Codecs.h" #include <boost/intrusive_ptr.hpp> #include <vector> #include <sstream> @@ -156,6 +157,8 @@ void ConnectionImpl::setOption(const std::string& name, const Variant& value) settings.sslCertName = value.asString(); } else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") { reconnectOnLimitExceeded = value; + } else if (name == "client-properties") { + amqp_0_10::translate(value.asMap(), settings.clientProperties); } else { throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised")); } diff --git a/cpp/src/qpid/ha/BrokerInfo.cpp b/cpp/src/qpid/ha/BrokerInfo.cpp index 73e86d94fe..5a8dfa512a 100644 --- a/cpp/src/qpid/ha/BrokerInfo.cpp +++ b/cpp/src/qpid/ha/BrokerInfo.cpp @@ -33,10 +33,10 @@ namespace qpid { namespace ha { namespace { -std::string SYSTEM_ID="system-id"; -std::string HOST_NAME="host-name"; -std::string PORT="port"; -std::string STATUS="status"; +const std::string SYSTEM_ID="system-id"; +const std::string HOST_NAME="host-name"; +const std::string PORT="port"; +const std::string STATUS="status"; } using types::Uuid; diff --git a/cpp/src/qpid/ha/HaBroker.cpp 
b/cpp/src/qpid/ha/HaBroker.cpp index de9880b6db..877fa021d0 100644 --- a/cpp/src/qpid/ha/HaBroker.cpp +++ b/cpp/src/qpid/ha/HaBroker.cpp @@ -26,6 +26,7 @@ #include "QueueReplicator.h" #include "ReplicatingSubscription.h" #include "Settings.h" +#include "StatusCheck.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" @@ -87,7 +88,6 @@ void HaBroker::initialize() { broker.getSystem()->getNodeName(), broker.getPort(broker::Broker::TCP_TRANSPORT), systemId); - QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo); // Set up the management object. @@ -110,6 +110,7 @@ void HaBroker::initialize() { status = JOINING; backup.reset(new Backup(*this, settings)); broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this); + statusCheck.reset(new StatusCheck(logPrefix, broker.getLinkHearbeatInterval(), brokerInfo)); } if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl)); @@ -158,8 +159,15 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, switch (methodId) { case _qmf::HaBroker::METHOD_PROMOTE: { switch (getStatus()) { - case JOINING: recover(); break; - case CATCHUP: + case JOINING: + if (statusCheck->canPromote()) + recover(); + else { + QPID_LOG(error, logPrefix << "Cluster already active, cannot be promoted"); + throw Exception("Cluster already active, cannot be promoted."); + } + break; + case CATCHUP: QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted."); throw Exception("Still catching up, cannot be promoted."); break; @@ -219,7 +227,6 @@ void HaBroker::setClientUrl(const Url& url) { void HaBroker::updateClientUrl(Mutex::ScopedLock&) { Url url = clientUrl.empty() ? 
brokerUrl : clientUrl; - if (url.empty()) throw Url::Invalid("HA client URL is empty"); mgmtObject->set_publicUrl(url.str()); knownBrokers.clear(); knownBrokers.push_back(url); @@ -230,10 +237,10 @@ void HaBroker::setBrokerUrl(const Url& url) { boost::shared_ptr<Backup> b; { Mutex::ScopedLock l(lock); - if (url.empty()) throw Url::Invalid("HA broker URL is empty"); brokerUrl = url; mgmtObject->set_brokersUrl(brokerUrl.str()); QPID_LOG(info, logPrefix << "Broker URL set to: " << url); + if (status == JOINING && statusCheck.get()) statusCheck->setUrl(url); // Updating broker URL also updates defaulted client URL: if (clientUrl.empty()) updateClientUrl(l); b = backup; diff --git a/cpp/src/qpid/ha/HaBroker.h b/cpp/src/qpid/ha/HaBroker.h index 3b39b9ec3d..48db0a8d3c 100644 --- a/cpp/src/qpid/ha/HaBroker.h +++ b/cpp/src/qpid/ha/HaBroker.h @@ -53,6 +53,7 @@ namespace ha { class Backup; class ConnectionObserver; class Primary; +class StatusCheck; /** * HA state and actions associated with a HA broker. Holds all the management info. @@ -130,6 +131,7 @@ class HaBroker : public management::Manageable BrokerInfo brokerInfo; Membership membership; ReplicationTest replicationTest; + std::auto_ptr<StatusCheck> statusCheck; }; }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/StatusCheck.cpp b/cpp/src/qpid/ha/StatusCheck.cpp new file mode 100644 index 0000000000..e4597a5a45 --- /dev/null +++ b/cpp/src/qpid/ha/StatusCheck.cpp @@ -0,0 +1,132 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "StatusCheck.h" +#include "qpid/log/Statement.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Session.h" +#include "qpid/types/Variant.h" + +namespace qpid { +namespace ha { + +using namespace qpid::messaging; +using namespace qpid::types; +using namespace std; +using namespace sys; + +const string HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker"; + +class StatusCheckThread : public sys::Runnable { + public: + StatusCheckThread(StatusCheck& sc, const qpid::Address& addr, const BrokerInfo& self) + : url(addr), statusCheck(sc), brokerInfo(self) {} + void run(); + private: + Url url; + StatusCheck& statusCheck; + uint16_t linkHeartbeatInterval; + BrokerInfo brokerInfo; +}; + +void StatusCheckThread::run() { + QPID_LOG(debug, statusCheck.logPrefix << "Checking status of " << url); + Variant::Map options, clientProperties; + clientProperties = brokerInfo.asMap(); // Detect self connections. + clientProperties["qpid.ha-admin"] = 1; // Allow connection to backups. 
+ + options["client-properties"] = clientProperties; + options["heartbeat"] = statusCheck.linkHeartbeatInterval; + Connection c(url.str(), options); + + try { + c.open(); + Session session = c.createSession(); + messaging::Address responses("#;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.replicate':none}}}}"); + Receiver r = session.createReceiver(responses); + Sender s = session.createSender("qmf.default.direct/broker"); + Message request; + request.setReplyTo(responses); + request.setContentType("amqp/map"); + request.setProperty("x-amqp-0-10.app-id", "qmf2"); + request.setProperty("qmf.opcode", "_query_request"); + Variant::Map oid; + oid["_object_name"] = HA_BROKER; + Variant::Map content; + content["_what"] = "OBJECT"; + content["_object_id"] = oid; + encode(content, request); + s.send(request); + Message response = r.fetch(statusCheck.linkHeartbeatInterval*Duration::SECOND); + session.acknowledge(); + Variant::List contentIn; + decode(response, contentIn); + if (contentIn.size() == 1) { + Variant::Map details = contentIn.front().asMap()["_values"].asMap(); + string status = details["status"].getString(); + if (status != "joining") { + statusCheck.setPromote(false); + QPID_LOG(error, statusCheck.logPrefix << "Broker " << url << " status is " << status + << " this broker will refuse promotion."); + } + QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status); + } + } catch(const exception& error) { + QPID_LOG(warning, "Error checking status of " << url << ": " << error.what()); + } + delete this; +} + +StatusCheck::StatusCheck(const string& lp, uint16_t lh, const BrokerInfo& self) + : logPrefix(lp), promote(true), linkHeartbeatInterval(lh), brokerInfo(self) +{} + +StatusCheck::~StatusCheck() { + // Join any leftovers + for (size_t i = 0; i < threads.size(); ++i) threads[i].join(); +} + +void StatusCheck::setUrl(const Url& url) { + Mutex::ScopedLock l(lock); + for (size_t i = 0; i < url.size(); ++i) + 
threads.push_back(Thread(new StatusCheckThread(*this, url[i], brokerInfo))); +} + +bool StatusCheck::canPromote() { + Mutex::ScopedLock l(lock); + while (!threads.empty()) { + Thread t = threads.back(); + threads.pop_back(); + Mutex::ScopedUnlock u(lock); + t.join(); + } + return promote; +} + +void StatusCheck::setPromote(bool p) { + Mutex::ScopedLock l(lock); + promote = p; +} + +}} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/StatusCheck.h b/cpp/src/qpid/ha/StatusCheck.h new file mode 100644 index 0000000000..3c62c43a22 --- /dev/null +++ b/cpp/src/qpid/ha/StatusCheck.h @@ -0,0 +1,64 @@ +#ifndef QPID_HA_STATUSCHECK_H +#define QPID_HA_STATUSCHECK_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "BrokerInfo.h" +#include "qpid/Url.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Runnable.h" +#include <vector> + +namespace qpid { +namespace ha { + +/** + * Check whether a JOINING broker can be promoted . + * + * A JOINING broker can be promoted as long as all the other brokers are also + * JOINING. If there are READY brokers in the cluster the JOINING broker should + * refuse to promote so that one of the READY brokers can. 
This situation + * only comes about if the primary is dead and no new primary has been promoted. + * + * THREAD SAFE: setUrl and canPromote are called in arbitrary management threads. + */ +class StatusCheck +{ + public: + StatusCheck(const std::string& logPrefix, uint16_t linkHeartbeatInteval, const BrokerInfo& self); + ~StatusCheck(); + void setUrl(const Url&); + bool canPromote(); + void setPromote(bool p); + private: + std::string logPrefix; + sys::Mutex lock; + std::vector<sys::Thread> threads; + bool promote; + uint16_t linkHeartbeatInterval; + BrokerInfo brokerInfo; + friend class StatusCheckThread; +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_STATUSCHECK_H*/ diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index f1620cf55d..a8d16a77c9 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -866,6 +866,20 @@ class RecoveryTests(BrokerTest): s.sync(timeout=1) # And released after the timeout. self.assertEqual(cluster[2].ha_status(), "active") + def test_join_ready_cluster(self): + """If we join a cluster where the primary is dead, the new primary is + not yet promoted and there are ready backups then we should refuse + promotion so that one of the ready backups can be chosen.""" + # FIXME aconway 2012-10-05: smaller timeout + cluster = HaCluster(self, 2, args=["--link-heartbeat-interval", 1]) + cluster[0].wait_status("active") + cluster[1].wait_status("ready") + cluster.bounce(0, promote_next=False) + self.assertRaises(Exception, cluster[0].promote) + os.kill(cluster[1].pid, signal.SIGSTOP) # Test for timeout if unresponsive. + cluster.bounce(0, promote_next=False) + cluster[0].promote() + if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) qpid_ha = os.getenv("QPID_HA_EXEC") |