summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-01-15 19:28:44 +0000
committerAlan Conway <aconway@apache.org>2014-01-15 19:28:44 +0000
commit4b6e899e06106dd310810c7813d689cd2a23c42b (patch)
tree2118d9b3760eec1e5320be3276713d71cb0ab0e7
parentc21a787325efe24dbc297bf65ada518a8e493845 (diff)
downloadqpid-python-4b6e899e06106dd310810c7813d689cd2a23c42b.tar.gz
QPID-5482: HA Backup becomes useless if a connection-forced error is raised.
Backup will now shut-down with critical error if it receives a connection-forced while trying to connect to the primary so the problem is obvious. ha/StatusCheck: don't use the HaBroker outside the StatusCheck constructor as it may be deleted. Avoids core dump if broker shuts down early while status check is ongoing. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1558538 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp17
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h8
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/StatusCheck.cpp28
-rw-r--r--qpid/cpp/src/qpid/ha/StatusCheck.h4
5 files changed, 39 insertions, 20 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 33b2d65ba6..e9734170b8 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -863,6 +863,23 @@ bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const fra
bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
bool BrokerReplicator::hasBindings() { return false; }
+// ConnectionObserver methods
+void BrokerReplicator::connection(broker::Connection&) {}
+void BrokerReplicator::opened(broker::Connection&) {}
+
+void BrokerReplicator::closed(broker::Connection& c) {
+ if (link && &c == connect) disconnected();
+}
+
+void BrokerReplicator::forced(broker::Connection& c, const std::string& message) {
+ if (link && &c == link->getConnection()) {
+ haBroker.shutdown(
+ QPID_MSG(logPrefix << "Connection forced, cluster may be misconfigured: "
+ << message));
+ }
+ closed(c);
+}
+
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
void BrokerReplicator::disconnectedQueueReplicator(boost::shared_ptr<Exchange> ex) {
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index b3e3fe3223..a6bf02c392 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -90,10 +90,10 @@ class BrokerReplicator : public broker::Exchange,
bool hasBindings();
// ConnectionObserver methods
- void connection(broker::Connection&) {}
- void opened(broker::Connection&) {}
- void closed(broker::Connection& c) { if (link && &c == connect) disconnected(); }
- void forced(broker::Connection& c, const std::string& /*message*/) { closed(c); }
+ void connection(broker::Connection&);
+ void opened(broker::Connection&);
+ void closed(broker::Connection&);
+ void forced(broker::Connection&, const std::string& /*message*/);
QueueReplicatorPtr findQueueReplicator(const std::string& qname);
diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp
index 2f8ef1da97..fdb47014d9 100644
--- a/qpid/cpp/src/qpid/ha/Membership.cpp
+++ b/qpid/cpp/src/qpid/ha/Membership.cpp
@@ -146,7 +146,7 @@ bool checkTransition(BrokerStatus from, BrokerStatus to) {
void Membership::update(Mutex::ScopedLock& l) {
QPID_LOG(info, "Membership: " << brokers);
-// Update managment and send update event.
+ // Update managment and send update event.
BrokerStatus newStatus = getStatus(l);
Variant::List brokerList = asList(l);
if (mgmtObject) {
diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.cpp b/qpid/cpp/src/qpid/ha/StatusCheck.cpp
index fdc256d5a2..4962570424 100644
--- a/qpid/cpp/src/qpid/ha/StatusCheck.cpp
+++ b/qpid/cpp/src/qpid/ha/StatusCheck.cpp
@@ -57,20 +57,19 @@ void StatusCheckThread::run() {
try {
// Check for self connections
Variant::Map options, clientProperties;
- clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups.
+ clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups
clientProperties[ConnectionObserver::ADDRESS_TAG] = url.str();
- clientProperties[ConnectionObserver::BACKUP_TAG] = statusCheck.haBroker.getBrokerInfo().asMap();
+ clientProperties[ConnectionObserver::BACKUP_TAG] = statusCheck.brokerInfo.asMap();
// Set connection options
- Settings settings(statusCheck.haBroker.getSettings());
+ const Settings& settings = statusCheck.settings;
if (settings.username.size()) options["username"] = settings.username;
if (settings.password.size()) options["password"] = settings.password;
if (settings.mechanism.size()) options["sasl_mechanisms"] = settings.mechanism;
options["client-properties"] = clientProperties;
- sys::Duration heartbeat(statusCheck.haBroker.getBroker().getOptions().linkHeartbeatInterval);
- options["heartbeat"] = heartbeat/sys::TIME_SEC;
- c = Connection(url.str(), options);
+ options["heartbeat"] = statusCheck.heartbeat/sys::TIME_SEC;
+ c = Connection(url.str(), options);
c.open();
Session session = c.createSession();
messaging::Address responses("#;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.replicate':none}}}}");
@@ -88,7 +87,7 @@ void StatusCheckThread::run() {
content["_object_id"] = oid;
encode(content, request);
s.send(request);
- messaging::Duration timeout(heartbeat/sys::TIME_MSEC);
+ messaging::Duration timeout(statusCheck.heartbeat/sys::TIME_MSEC);
Message response = r.fetch(timeout);
session.acknowledge();
Variant::List contentIn;
@@ -103,17 +102,18 @@ void StatusCheckThread::run() {
}
}
else
- QPID_LOG(error, logPrefix << "Invalid response " << response.getContent())
- } catch(const exception& error) {
- // Its not an error to fail to connect to self.
- if (statusCheck.haBroker.getBrokerInfo().getAddress() != url[0])
- QPID_LOG(warning, logPrefix << error.what());
- }
+ QPID_LOG(error, logPrefix << "Invalid response " << response.getContent());
+ } catch(...) {}
try { c.close(); } catch(...) {}
delete this;
}
-StatusCheck::StatusCheck(HaBroker& hb) : promote(true), haBroker(hb)
+// Note: Don't use hb outside of the constructor, it may be deleted.
+StatusCheck::StatusCheck(HaBroker& hb) :
+ promote(true),
+ settings(hb.getSettings()),
+ heartbeat(hb.getBroker().getOptions().linkHeartbeatInterval),
+ brokerInfo(hb.getBrokerInfo())
{}
StatusCheck::~StatusCheck() {
diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.h b/qpid/cpp/src/qpid/ha/StatusCheck.h
index 8896969f55..087e586b2b 100644
--- a/qpid/cpp/src/qpid/ha/StatusCheck.h
+++ b/qpid/cpp/src/qpid/ha/StatusCheck.h
@@ -65,7 +65,9 @@ class StatusCheck
sys::Mutex lock;
std::vector<sys::Thread> threads;
bool promote;
- HaBroker& haBroker;
+ const Settings settings;
+ const sys::Duration heartbeat;
+ const BrokerInfo brokerInfo;
friend class StatusCheckThread;
};