summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-08-06 20:44:58 +0000
committerAlan Conway <aconway@apache.org>2012-08-06 20:44:58 +0000
commitb4bf620d26fcb029186be1c1ab9b73ea05eb0d37 (patch)
tree81831e351264ab5a77dcb6b5faf6b74941a1668e
parente4347cca011e0c7630e835bd96bc66b3e4e9a31c (diff)
downloadqpid-python-b4bf620d26fcb029186be1c1ab9b73ea05eb0d37.tar.gz
QPID-4191: HA removing self address breaks if a VIP is used.
Pre this patch the HA broker removed its own address from the set of cluster addresses to form the set of failover addresses. The goal was avoid useless self-connection attempts. However this was broken with a Virtual IP address where a single address is used for the entire cluster. The remove-self is not essential, self-connection attempts are prevented elsewhere. Backup brokers will be prevented from connecting to self by the same connection-observer as normal clients, and this patch addes self-connection checks ins This patch - removes the code to remove self-addresses - adds self-connection checks in ConnectionObserver - adds & reorders some log statements & comments for greater clarity. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1370002 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp32
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.h2
-rw-r--r--qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h2
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.cpp22
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.h4
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp14
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h2
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp12
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.cpp2
10 files changed, 41 insertions, 54 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index bac6fd23c8..e099554df6 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -51,35 +51,15 @@ Backup::Backup(HaBroker& hb, const Settings& s) :
if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
}
-bool Backup::isSelf(const Address& a) const {
- return sys::SystemInfo::isLocalHost(a.host) &&
- a.port == haBroker.getBroker().getPort(a.protocol);
-}
-
-// Remove my own address from the URL if possible.
-// This isn't 100% reliable given the many ways to specify a host,
-// but should work in most cases. We have additional measures to prevent
-// self-connection in ConnectionObserver
-Url Backup::removeSelf(const Url& brokers) const {
- Url url;
- for (Url::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
- if (!isSelf(*i)) url.push_back(*i);
- if (url.empty())
- throw Url::Invalid(logPrefix+"Failover URL is empty");
- QPID_LOG(debug, logPrefix << "Failover URL (excluding self): " << url);
- return url;
-}
-
void Backup::initialize(const Url& brokers) {
if (brokers.empty()) throw Url::Invalid("HA broker URL is empty");
QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
- Url url = removeSelf(brokers);
- string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+ string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
types::Uuid uuid(true);
// Declare the link
std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
- url[0].host, url[0].port, protocol,
+ brokers[0].host, brokers[0].port, protocol,
false, // durable
settings.mechanism, settings.username, settings.password,
false); // no amq.failover - don't want to use client URL.
@@ -90,7 +70,7 @@ void Backup::initialize(const Url& brokers) {
replicator->initialize();
broker.getExchanges().registerExchange(replicator);
}
- link->setUrl(url); // Outside the lock, once set link doesn't change.
+ link->setUrl(brokers); // Outside the lock, once set link doesn't change.
}
Backup::~Backup() {
@@ -107,10 +87,8 @@ void Backup::setBrokerUrl(const Url& url) {
sys::Mutex::ScopedLock l(lock);
linkSet = link;
}
- if (linkSet) {
- QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
- link->setUrl(removeSelf(url)); // Outside lock, once set link doesn't change
- }
+ if (linkSet)
+ link->setUrl(url); // Outside lock, once set link doesn't change
else
initialize(url); // Deferred initialization
}
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
index 1233a473ec..4f2d5babde 100644
--- a/qpid/cpp/src/qpid/ha/Backup.h
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -53,8 +53,6 @@ class Backup
void setStatus(BrokerStatus);
private:
- bool isSelf(const Address& a) const;
- Url removeSelf(const Url&) const;
void initialize(const Url&);
std::string logPrefix;
diff --git a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
index ef537ab90a..5a67cde922 100644
--- a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
+++ b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
@@ -36,7 +36,7 @@ class BackupConnectionExcluder : public broker::ConnectionObserver
{
public:
void opened(broker::Connection& connection) {
- QPID_LOG(debug, "Backup broker rejected connection "+connection.getMgmtId());
+ QPID_LOG(debug, "Backup: Rejected connection "+connection.getMgmtId());
connection.abort();
}
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index c8c4a42d72..cf413f35d0 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -474,7 +474,8 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
boost::shared_ptr<Exchange> exchange = createExchange(
name, values[TYPE].asString(), values[DURABLE].asBool(), args,
getAltExchange(values[ALTEXCHANGE]));
- QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already exists: " << name);
+ // It is normal for the exchange to already exist if we are failing over.
+ QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already replicated: " << name);
}
namespace {
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
index 3f7a1710d9..81ba3e4301 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
@@ -32,7 +32,7 @@ namespace ha {
ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid)
: haBroker(hb), logPrefix("Connections: "), self(uuid) {}
-bool ConnectionObserver::getBrokerInfo(broker::Connection& connection, BrokerInfo& info) {
+bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) {
framing::FieldTable ft;
if (connection.getClientProperties().getTable(ConnectionObserver::BACKUP_TAG, ft)) {
info = BrokerInfo(ft);
@@ -51,21 +51,23 @@ ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() {
return observer;
}
+bool ConnectionObserver::isSelf(const broker::Connection& connection) {
+ BrokerInfo info;
+ return getBrokerInfo(connection, info) && info.getSystemId() == self;
+}
+
void ConnectionObserver::opened(broker::Connection& connection) {
try {
if (connection.isLink()) return; // Allow outgoing links.
if (connection.getClientProperties().isSet(ADMIN_TAG)) {
- QPID_LOG(debug, logPrefix << "Allowing admin connection: "
+ QPID_LOG(debug, logPrefix << "Accepted admin connection: "
<< connection.getMgmtId());
return; // No need to call observer, always allow admins.
}
- BrokerInfo info; // Avoid self connections.
- if (getBrokerInfo(connection, info)) {
- if (info.getSystemId() == self) {
- QPID_LOG(debug, "HA broker rejected self connection "+connection.getMgmtId());
- connection.abort();
- }
-
+ if (isSelf(connection)) { // Reject self connections
+ QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId());
+ connection.abort();
+ return;
}
ObserverPtr o(getObserver());
if (o) o->opened(connection);
@@ -77,8 +79,8 @@ void ConnectionObserver::opened(broker::Connection& connection) {
}
void ConnectionObserver::closed(broker::Connection& connection) {
+ if (isSelf(connection)) return; // Ignore closing of self connections.
try {
- BrokerInfo info;
ObserverPtr o(getObserver());
if (o) o->closed(connection);
}
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.h b/qpid/cpp/src/qpid/ha/ConnectionObserver.h
index 5c1dabe8f8..e3a6d1154a 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionObserver.h
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.h
@@ -51,7 +51,7 @@ class ConnectionObserver : public broker::ConnectionObserver
static const std::string ADMIN_TAG;
static const std::string BACKUP_TAG;
- static bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info);
+ static bool getBrokerInfo(const broker::Connection& connection, BrokerInfo& info);
ConnectionObserver(HaBroker& haBroker, const types::Uuid& self);
@@ -62,6 +62,8 @@ class ConnectionObserver : public broker::ConnectionObserver
void closed(broker::Connection& connection);
private:
+ bool isSelf(const broker::Connection&);
+
sys::Mutex lock;
HaBroker& haBroker;
std::string logPrefix;
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index d126639813..ffbcb684bc 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -83,7 +83,11 @@ void HaBroker::initialize() {
// FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port.
brokerInfo = BrokerInfo(
- broker.getSystem()->getNodeName(), broker.getPort(broker::Broker::TCP_TRANSPORT), systemId);
+ broker.getSystem()->getNodeName(),
+ broker.getPort(broker::Broker::TCP_TRANSPORT),
+ systemId);
+
+ QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
// Set up the management object.
ManagementAgent* ma = broker.getManagementAgent();
@@ -111,8 +115,6 @@ void HaBroker::initialize() {
if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl));
- QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
-
// NOTE: lock is not needed in a constructor, but create one
// to pass to functions that have a ScopedLock parameter.
Mutex::ScopedLock l(lock);
@@ -226,6 +228,7 @@ void HaBroker::setBrokerUrl(const Url& url) {
if (url.empty()) throw Url::Invalid("HA broker URL is empty");
brokerUrl = url;
mgmtObject->set_brokersUrl(brokerUrl.str());
+ QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
if (backup.get()) backup->setBrokerUrl(brokerUrl);
// Updating broker URL also updates defaulted client URL:
if (clientUrl.empty()) updateClientUrl(l);
@@ -292,6 +295,7 @@ void HaBroker::statusChanged(Mutex::ScopedLock& l) {
}
void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
+ QPID_LOG(info, logPrefix << "Membership changed: " << membership);
Variant::List brokers = membership.asList();
mgmtObject->set_members(brokers);
broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
@@ -321,14 +325,14 @@ void HaBroker::resetMembership(const BrokerInfo& b) {
void HaBroker::addBroker(const BrokerInfo& b) {
Mutex::ScopedLock l(lock);
membership.add(b);
- QPID_LOG(debug, logPrefix << "Membership add: " << b << " now: " << membership);
+ QPID_LOG(debug, logPrefix << "Membership add: " << b);
membershipUpdated(l);
}
void HaBroker::removeBroker(const Uuid& id) {
Mutex::ScopedLock l(lock);
membership.remove(id);
- QPID_LOG(debug, logPrefix << "Membership remove: " << id << " now: " << membership);
+ QPID_LOG(debug, logPrefix << "Membership remove: " << id);
membershipUpdated(l);
}
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 0ffc152097..7dabe6e35b 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -97,6 +97,8 @@ class HaBroker : public management::Manageable
void addBroker(const BrokerInfo& b); // Add a broker to the membership.
void removeBroker(const types::Uuid& id); // Remove a broker from membership.
+ types::Uuid getSystemId() const { return systemId; }
+
private:
void setClientUrl(const Url&);
void setBrokerUrl(const Url&);
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 69c94bfc7d..45a0e246f3 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -225,6 +225,12 @@ void Primary::opened(broker::Connection& connection) {
}
void Primary::closed(broker::Connection& connection) {
+ // NOTE: It is possible for a backup connection to be rejected while we are
+ // a backup, but closed() is called after we have become primary.
+ //
+ // For this reason we do not remove from the backups map here, the backups
+ // map holds all the backups we know about whether connected or not.
+ //
Mutex::ScopedLock l(lock);
BrokerInfo info;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
@@ -233,12 +239,6 @@ void Primary::closed(broker::Connection& connection) {
BackupMap::iterator i = backups.find(info.getSystemId());
if (i != backups.end()) i->second->setConnected(false);
}
- // NOTE: we do not remove from the backups map here, the backups map holds
- // all the backups we know about whether connected or not.
- //
- // It is possible for a backup connection to be rejected while we are a backup,
- // but the closed is seen after we have become primary. Removing the entry
- // from backups in this case would be incorrect.
}
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index a5693fd14e..9c7bf1e694 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -32,7 +32,7 @@ using sys::Mutex;
using boost::bind;
RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) :
- logPrefix("Primary remote backup "+info.getLogId()+": "),
+ logPrefix("Primary: Remote backup "+info.getLogId()+": "),
brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false)
{}