diff options
author | Alan Conway <aconway@apache.org> | 2014-01-15 19:28:44 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2014-01-15 19:28:44 +0000 |
commit | 4b6e899e06106dd310810c7813d689cd2a23c42b (patch) | |
tree | 2118d9b3760eec1e5320be3276713d71cb0ab0e7 | |
parent | c21a787325efe24dbc297bf65ada518a8e493845 (diff) | |
download | qpid-python-4b6e899e06106dd310810c7813d689cd2a23c42b.tar.gz |
QPID-5482: HA Backup becomes useless if a connection-forced error is raised.
Backup will now shut-down with critical error if it receives a connection-forced
while trying to connect to the primary so the problem is obvious.
ha/StatusCheck: don't use the HaBroker outside the StatusCheck constructor
as it may be deleted. Avoids core dump if broker shuts down early while
status check is ongoing.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1558538 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Membership.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/StatusCheck.cpp | 28 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/StatusCheck.h | 4 |
5 files changed, 39 insertions, 20 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 33b2d65ba6..e9734170b8 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -863,6 +863,23 @@ bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const fra bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; } bool BrokerReplicator::hasBindings() { return false; } +// ConnectionObserver methods +void BrokerReplicator::connection(broker::Connection&) {} +void BrokerReplicator::opened(broker::Connection&) {} + +void BrokerReplicator::closed(broker::Connection& c) { + if (link && &c == connect) disconnected(); +} + +void BrokerReplicator::forced(broker::Connection& c, const std::string& message) { + if (link && &c == link->getConnection()) { + haBroker.shutdown( + QPID_MSG(logPrefix << "Connection forced, cluster may be misconfigured: " + << message)); + } + closed(c); +} + string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } void BrokerReplicator::disconnectedQueueReplicator(boost::shared_ptr<Exchange> ex) { diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index b3e3fe3223..a6bf02c392 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -90,10 +90,10 @@ class BrokerReplicator : public broker::Exchange, bool hasBindings(); // ConnectionObserver methods - void connection(broker::Connection&) {} - void opened(broker::Connection&) {} - void closed(broker::Connection& c) { if (link && &c == connect) disconnected(); } - void forced(broker::Connection& c, const std::string& /*message*/) { closed(c); } + void connection(broker::Connection&); + void opened(broker::Connection&); + void closed(broker::Connection&); + void forced(broker::Connection&, const std::string& /*message*/); QueueReplicatorPtr findQueueReplicator(const std::string& qname); diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp index 2f8ef1da97..fdb47014d9 100644 --- a/qpid/cpp/src/qpid/ha/Membership.cpp +++ b/qpid/cpp/src/qpid/ha/Membership.cpp @@ -146,7 +146,7 @@ bool checkTransition(BrokerStatus from, BrokerStatus to) { void Membership::update(Mutex::ScopedLock& l) { QPID_LOG(info, "Membership: " << brokers); -// Update managment and send update event. + // Update managment and send update event. BrokerStatus newStatus = getStatus(l); Variant::List brokerList = asList(l); if (mgmtObject) { diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.cpp b/qpid/cpp/src/qpid/ha/StatusCheck.cpp index fdc256d5a2..4962570424 100644 --- a/qpid/cpp/src/qpid/ha/StatusCheck.cpp +++ b/qpid/cpp/src/qpid/ha/StatusCheck.cpp @@ -57,20 +57,19 @@ void StatusCheckThread::run() { try { // Check for self connections Variant::Map options, clientProperties; - clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups. + clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups clientProperties[ConnectionObserver::ADDRESS_TAG] = url.str(); - clientProperties[ConnectionObserver::BACKUP_TAG] = statusCheck.haBroker.getBrokerInfo().asMap(); + clientProperties[ConnectionObserver::BACKUP_TAG] = statusCheck.brokerInfo.asMap(); // Set connection options - Settings settings(statusCheck.haBroker.getSettings()); + const Settings& settings = statusCheck.settings; if (settings.username.size()) options["username"] = settings.username; if (settings.password.size()) options["password"] = settings.password; if (settings.mechanism.size()) options["sasl_mechanisms"] = settings.mechanism; options["client-properties"] = clientProperties; - sys::Duration heartbeat(statusCheck.haBroker.getBroker().getOptions().linkHeartbeatInterval); - options["heartbeat"] = heartbeat/sys::TIME_SEC; - c = Connection(url.str(), options); + options["heartbeat"] = statusCheck.heartbeat/sys::TIME_SEC; + c = Connection(url.str(), options); c.open(); Session session = c.createSession(); messaging::Address responses("#;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.replicate':none}}}}"); @@ -88,7 +87,7 @@ void StatusCheckThread::run() { content["_object_id"] = oid; encode(content, request); s.send(request); - messaging::Duration timeout(heartbeat/sys::TIME_MSEC); + messaging::Duration timeout(statusCheck.heartbeat/sys::TIME_MSEC); Message response = r.fetch(timeout); session.acknowledge(); Variant::List contentIn; @@ -103,17 +102,18 @@ void StatusCheckThread::run() { } } else - QPID_LOG(error, logPrefix << "Invalid response " << response.getContent()) - } catch(const exception& error) { - // Its not an error to fail to connect to self. - if (statusCheck.haBroker.getBrokerInfo().getAddress() != url[0]) - QPID_LOG(warning, logPrefix << error.what()); - } + QPID_LOG(error, logPrefix << "Invalid response " << response.getContent()); + } catch(...) {} try { c.close(); } catch(...) {} delete this; } -StatusCheck::StatusCheck(HaBroker& hb) : promote(true), haBroker(hb) +// Note: Don't use hb outside of the constructor, it may be deleted. +StatusCheck::StatusCheck(HaBroker& hb) : + promote(true), + settings(hb.getSettings()), + heartbeat(hb.getBroker().getOptions().linkHeartbeatInterval), + brokerInfo(hb.getBrokerInfo()) {} StatusCheck::~StatusCheck() { diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.h b/qpid/cpp/src/qpid/ha/StatusCheck.h index 8896969f55..087e586b2b 100644 --- a/qpid/cpp/src/qpid/ha/StatusCheck.h +++ b/qpid/cpp/src/qpid/ha/StatusCheck.h @@ -65,7 +65,9 @@ class StatusCheck sys::Mutex lock; std::vector<sys::Thread> threads; bool promote; - HaBroker& haBroker; + const Settings settings; + const sys::Duration heartbeat; + const BrokerInfo brokerInfo; friend class StatusCheckThread; }; |