diff options
author | Alan Conway <aconway@apache.org> | 2012-08-06 20:44:58 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-08-06 20:44:58 +0000 |
commit | b4bf620d26fcb029186be1c1ab9b73ea05eb0d37 (patch) | |
tree | 81831e351264ab5a77dcb6b5faf6b74941a1668e | |
parent | e4347cca011e0c7630e835bd96bc66b3e4e9a31c (diff) | |
download | qpid-python-b4bf620d26fcb029186be1c1ab9b73ea05eb0d37.tar.gz |
QPID-4191: HA removing self address breaks if a VIP is used.
Pre this patch the HA broker removed its own address from the set of cluster
addresses to form the set of failover addresses. The goal was avoid useless
self-connection attempts. However this was broken with a Virtual IP address
where a single address is used for the entire cluster.
The remove-self is not essential, self-connection attempts are prevented
elsewhere. Backup brokers will be prevented from connecting to self by the same
connection-observer as normal clients, and this patch addes self-connection
checks ins
This patch
- removes the code to remove self-addresses
- adds self-connection checks in ConnectionObserver
- adds & reorders some log statements & comments for greater clarity.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1370002 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 32 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ConnectionObserver.cpp | 22 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ConnectionObserver.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.cpp | 2 |
10 files changed, 41 insertions, 54 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index bac6fd23c8..e099554df6 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -51,35 +51,15 @@ Backup::Backup(HaBroker& hb, const Settings& s) : if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl)); } -bool Backup::isSelf(const Address& a) const { - return sys::SystemInfo::isLocalHost(a.host) && - a.port == haBroker.getBroker().getPort(a.protocol); -} - -// Remove my own address from the URL if possible. -// This isn't 100% reliable given the many ways to specify a host, -// but should work in most cases. We have additional measures to prevent -// self-connection in ConnectionObserver -Url Backup::removeSelf(const Url& brokers) const { - Url url; - for (Url::const_iterator i = brokers.begin(); i != brokers.end(); ++i) - if (!isSelf(*i)) url.push_back(*i); - if (url.empty()) - throw Url::Invalid(logPrefix+"Failover URL is empty"); - QPID_LOG(debug, logPrefix << "Failover URL (excluding self): " << url); - return url; -} - void Backup::initialize(const Url& brokers) { if (brokers.empty()) throw Url::Invalid("HA broker URL is empty"); QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers); - Url url = removeSelf(brokers); - string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; + string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol; types::Uuid uuid(true); // Declare the link std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare( broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), - url[0].host, url[0].port, protocol, + brokers[0].host, brokers[0].port, protocol, false, // durable settings.mechanism, settings.username, settings.password, false); // no amq.failover - don't want to use client URL. @@ -90,7 +70,7 @@ void Backup::initialize(const Url& brokers) { replicator->initialize(); broker.getExchanges().registerExchange(replicator); } - link->setUrl(url); // Outside the lock, once set link doesn't change. + link->setUrl(brokers); // Outside the lock, once set link doesn't change. } Backup::~Backup() { @@ -107,10 +87,8 @@ void Backup::setBrokerUrl(const Url& url) { sys::Mutex::ScopedLock l(lock); linkSet = link; } - if (linkSet) { - QPID_LOG(info, logPrefix << "Broker URL set to: " << url); - link->setUrl(removeSelf(url)); // Outside lock, once set link doesn't change - } + if (linkSet) + link->setUrl(url); // Outside lock, once set link doesn't change else initialize(url); // Deferred initialization } diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h index 1233a473ec..4f2d5babde 100644 --- a/qpid/cpp/src/qpid/ha/Backup.h +++ b/qpid/cpp/src/qpid/ha/Backup.h @@ -53,8 +53,6 @@ class Backup void setStatus(BrokerStatus); private: - bool isSelf(const Address& a) const; - Url removeSelf(const Url&) const; void initialize(const Url&); std::string logPrefix; diff --git a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h index ef537ab90a..5a67cde922 100644 --- a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h +++ b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h @@ -36,7 +36,7 @@ class BackupConnectionExcluder : public broker::ConnectionObserver { public: void opened(broker::Connection& connection) { - QPID_LOG(debug, "Backup broker rejected connection "+connection.getMgmtId()); + QPID_LOG(debug, "Backup: Rejected connection "+connection.getMgmtId()); connection.abort(); } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index c8c4a42d72..cf413f35d0 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -474,7 +474,8 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { boost::shared_ptr<Exchange> exchange = createExchange( name, values[TYPE].asString(), values[DURABLE].asBool(), args, getAltExchange(values[ALTEXCHANGE])); - QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already exists: " << name); + // It is normal for the exchange to already exist if we are failing over. + QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already replicated: " << name); } namespace { diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp index 3f7a1710d9..81ba3e4301 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp +++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp @@ -32,7 +32,7 @@ namespace ha { ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid) : haBroker(hb), logPrefix("Connections: "), self(uuid) {} -bool ConnectionObserver::getBrokerInfo(broker::Connection& connection, BrokerInfo& info) { +bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) { framing::FieldTable ft; if (connection.getClientProperties().getTable(ConnectionObserver::BACKUP_TAG, ft)) { info = BrokerInfo(ft); @@ -51,21 +51,23 @@ ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() { return observer; } +bool ConnectionObserver::isSelf(const broker::Connection& connection) { + BrokerInfo info; + return getBrokerInfo(connection, info) && info.getSystemId() == self; +} + void ConnectionObserver::opened(broker::Connection& connection) { try { if (connection.isLink()) return; // Allow outgoing links. if (connection.getClientProperties().isSet(ADMIN_TAG)) { - QPID_LOG(debug, logPrefix << "Allowing admin connection: " + QPID_LOG(debug, logPrefix << "Accepted admin connection: " << connection.getMgmtId()); return; // No need to call observer, always allow admins. } - BrokerInfo info; // Avoid self connections. - if (getBrokerInfo(connection, info)) { - if (info.getSystemId() == self) { - QPID_LOG(debug, "HA broker rejected self connection "+connection.getMgmtId()); - connection.abort(); - } - + if (isSelf(connection)) { // Reject self connections + QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId()); + connection.abort(); + return; } ObserverPtr o(getObserver()); if (o) o->opened(connection); @@ -77,8 +79,8 @@ void ConnectionObserver::opened(broker::Connection& connection) { } void ConnectionObserver::closed(broker::Connection& connection) { + if (isSelf(connection)) return; // Ignore closing of self connections. try { - BrokerInfo info; ObserverPtr o(getObserver()); if (o) o->closed(connection); } diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.h b/qpid/cpp/src/qpid/ha/ConnectionObserver.h index 5c1dabe8f8..e3a6d1154a 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionObserver.h +++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.h @@ -51,7 +51,7 @@ class ConnectionObserver : public broker::ConnectionObserver static const std::string ADMIN_TAG; static const std::string BACKUP_TAG; - static bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info); + static bool getBrokerInfo(const broker::Connection& connection, BrokerInfo& info); ConnectionObserver(HaBroker& haBroker, const types::Uuid& self); @@ -62,6 +62,8 @@ class ConnectionObserver : public broker::ConnectionObserver void closed(broker::Connection& connection); private: + bool isSelf(const broker::Connection&); + sys::Mutex lock; HaBroker& haBroker; std::string logPrefix; diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index d126639813..ffbcb684bc 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -83,7 +83,11 @@ void HaBroker::initialize() { // FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port. brokerInfo = BrokerInfo( - broker.getSystem()->getNodeName(), broker.getPort(broker::Broker::TCP_TRANSPORT), systemId); + broker.getSystem()->getNodeName(), + broker.getPort(broker::Broker::TCP_TRANSPORT), + systemId); + + QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo); // Set up the management object. ManagementAgent* ma = broker.getManagementAgent(); @@ -111,8 +115,6 @@ void HaBroker::initialize() { if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl)); - QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo); - // NOTE: lock is not needed in a constructor, but create one // to pass to functions that have a ScopedLock parameter. Mutex::ScopedLock l(lock); @@ -226,6 +228,7 @@ void HaBroker::setBrokerUrl(const Url& url) { if (url.empty()) throw Url::Invalid("HA broker URL is empty"); brokerUrl = url; mgmtObject->set_brokersUrl(brokerUrl.str()); + QPID_LOG(info, logPrefix << "Broker URL set to: " << url); if (backup.get()) backup->setBrokerUrl(brokerUrl); // Updating broker URL also updates defaulted client URL: if (clientUrl.empty()) updateClientUrl(l); @@ -292,6 +295,7 @@ void HaBroker::statusChanged(Mutex::ScopedLock& l) { } void HaBroker::membershipUpdated(Mutex::ScopedLock&) { + QPID_LOG(info, logPrefix << "Membership changed: " << membership); Variant::List brokers = membership.asList(); mgmtObject->set_members(brokers); broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers)); @@ -321,14 +325,14 @@ void HaBroker::resetMembership(const BrokerInfo& b) { void HaBroker::addBroker(const BrokerInfo& b) { Mutex::ScopedLock l(lock); membership.add(b); - QPID_LOG(debug, logPrefix << "Membership add: " << b << " now: " << membership); + QPID_LOG(debug, logPrefix << "Membership add: " << b); membershipUpdated(l); } void HaBroker::removeBroker(const Uuid& id) { Mutex::ScopedLock l(lock); membership.remove(id); - QPID_LOG(debug, logPrefix << "Membership remove: " << id << " now: " << membership); + QPID_LOG(debug, logPrefix << "Membership remove: " << id); membershipUpdated(l); } diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 0ffc152097..7dabe6e35b 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -97,6 +97,8 @@ class HaBroker : public management::Manageable void addBroker(const BrokerInfo& b); // Add a broker to the membership. void removeBroker(const types::Uuid& id); // Remove a broker from membership. + types::Uuid getSystemId() const { return systemId; } + private: void setClientUrl(const Url&); void setBrokerUrl(const Url&); diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 69c94bfc7d..45a0e246f3 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -225,6 +225,12 @@ void Primary::opened(broker::Connection& connection) { } void Primary::closed(broker::Connection& connection) { + // NOTE: It is possible for a backup connection to be rejected while we are + // a backup, but closed() is called after we have become primary. + // + // For this reason we do not remove from the backups map here, the backups + // map holds all the backups we know about whether connected or not. + // Mutex::ScopedLock l(lock); BrokerInfo info; if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { @@ -233,12 +239,6 @@ void Primary::closed(broker::Connection& connection) { BackupMap::iterator i = backups.find(info.getSystemId()); if (i != backups.end()) i->second->setConnected(false); } - // NOTE: we do not remove from the backups map here, the backups map holds - // all the backups we know about whether connected or not. - // - // It is possible for a backup connection to be rejected while we are a backup, - // but the closed is seen after we have become primary. Removing the entry - // from backups in this case would be incorrect. } diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index a5693fd14e..9c7bf1e694 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -32,7 +32,7 @@ using sys::Mutex; using boost::bind; RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) : - logPrefix("Primary remote backup "+info.getLogId()+": "), + logPrefix("Primary: Remote backup "+info.getLogId()+": "), brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false) {} |