diff options
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 32 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ConnectionObserver.cpp | 22 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ConnectionObserver.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.cpp | 2 |
10 files changed, 41 insertions, 54 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index bac6fd23c8..e099554df6 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -51,35 +51,15 @@ Backup::Backup(HaBroker& hb, const Settings& s) : if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl)); } -bool Backup::isSelf(const Address& a) const { - return sys::SystemInfo::isLocalHost(a.host) && - a.port == haBroker.getBroker().getPort(a.protocol); -} - -// Remove my own address from the URL if possible. -// This isn't 100% reliable given the many ways to specify a host, -// but should work in most cases. We have additional measures to prevent -// self-connection in ConnectionObserver -Url Backup::removeSelf(const Url& brokers) const { - Url url; - for (Url::const_iterator i = brokers.begin(); i != brokers.end(); ++i) - if (!isSelf(*i)) url.push_back(*i); - if (url.empty()) - throw Url::Invalid(logPrefix+"Failover URL is empty"); - QPID_LOG(debug, logPrefix << "Failover URL (excluding self): " << url); - return url; -} - void Backup::initialize(const Url& brokers) { if (brokers.empty()) throw Url::Invalid("HA broker URL is empty"); QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers); - Url url = removeSelf(brokers); - string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; + string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol; types::Uuid uuid(true); // Declare the link std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare( broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), - url[0].host, url[0].port, protocol, + brokers[0].host, brokers[0].port, protocol, false, // durable settings.mechanism, settings.username, settings.password, false); // no amq.failover - don't want to use client URL. @@ -90,7 +70,7 @@ void Backup::initialize(const Url& brokers) { replicator->initialize(); broker.getExchanges().registerExchange(replicator); } - link->setUrl(url); // Outside the lock, once set link doesn't change. + link->setUrl(brokers); // Outside the lock, once set link doesn't change. } Backup::~Backup() { @@ -107,10 +87,8 @@ void Backup::setBrokerUrl(const Url& url) { sys::Mutex::ScopedLock l(lock); linkSet = link; } - if (linkSet) { - QPID_LOG(info, logPrefix << "Broker URL set to: " << url); - link->setUrl(removeSelf(url)); // Outside lock, once set link doesn't change - } + if (linkSet) + link->setUrl(url); // Outside lock, once set link doesn't change else initialize(url); // Deferred initialization } diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h index 1233a473ec..4f2d5babde 100644 --- a/qpid/cpp/src/qpid/ha/Backup.h +++ b/qpid/cpp/src/qpid/ha/Backup.h @@ -53,8 +53,6 @@ class Backup void setStatus(BrokerStatus); private: - bool isSelf(const Address& a) const; - Url removeSelf(const Url&) const; void initialize(const Url&); std::string logPrefix; diff --git a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h index ef537ab90a..5a67cde922 100644 --- a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h +++ b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h @@ -36,7 +36,7 @@ class BackupConnectionExcluder : public broker::ConnectionObserver { public: void opened(broker::Connection& connection) { - QPID_LOG(debug, "Backup broker rejected connection "+connection.getMgmtId()); + QPID_LOG(debug, "Backup: Rejected connection "+connection.getMgmtId()); connection.abort(); } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index c8c4a42d72..cf413f35d0 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -474,7 +474,8 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { boost::shared_ptr<Exchange> exchange = createExchange( name, values[TYPE].asString(), values[DURABLE].asBool(), args, getAltExchange(values[ALTEXCHANGE])); - QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already exists: " << name); + // It is normal for the exchange to already exist if we are failing over. + QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already replicated: " << name); } namespace { diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp index 3f7a1710d9..81ba3e4301 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp +++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp @@ -32,7 +32,7 @@ namespace ha { ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid) : haBroker(hb), logPrefix("Connections: "), self(uuid) {} -bool ConnectionObserver::getBrokerInfo(broker::Connection& connection, BrokerInfo& info) { +bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) { framing::FieldTable ft; if (connection.getClientProperties().getTable(ConnectionObserver::BACKUP_TAG, ft)) { info = BrokerInfo(ft); @@ -51,21 +51,23 @@ ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() { return observer; } +bool ConnectionObserver::isSelf(const broker::Connection& connection) { + BrokerInfo info; + return getBrokerInfo(connection, info) && info.getSystemId() == self; +} + void ConnectionObserver::opened(broker::Connection& connection) { try { if (connection.isLink()) return; // Allow outgoing links. if (connection.getClientProperties().isSet(ADMIN_TAG)) { - QPID_LOG(debug, logPrefix << "Allowing admin connection: " + QPID_LOG(debug, logPrefix << "Accepted admin connection: " << connection.getMgmtId()); return; // No need to call observer, always allow admins. } - BrokerInfo info; // Avoid self connections. - if (getBrokerInfo(connection, info)) { - if (info.getSystemId() == self) { - QPID_LOG(debug, "HA broker rejected self connection "+connection.getMgmtId()); - connection.abort(); - } - + if (isSelf(connection)) { // Reject self connections + QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId()); + connection.abort(); + return; } ObserverPtr o(getObserver()); if (o) o->opened(connection); @@ -77,8 +79,8 @@ void ConnectionObserver::opened(broker::Connection& connection) { } void ConnectionObserver::closed(broker::Connection& connection) { + if (isSelf(connection)) return; // Ignore closing of self connections. try { - BrokerInfo info; ObserverPtr o(getObserver()); if (o) o->closed(connection); } diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.h b/qpid/cpp/src/qpid/ha/ConnectionObserver.h index 5c1dabe8f8..e3a6d1154a 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionObserver.h +++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.h @@ -51,7 +51,7 @@ class ConnectionObserver : public broker::ConnectionObserver static const std::string ADMIN_TAG; static const std::string BACKUP_TAG; - static bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info); + static bool getBrokerInfo(const broker::Connection& connection, BrokerInfo& info); ConnectionObserver(HaBroker& haBroker, const types::Uuid& self); @@ -62,6 +62,8 @@ class ConnectionObserver : public broker::ConnectionObserver void closed(broker::Connection& connection); private: + bool isSelf(const broker::Connection&); + sys::Mutex lock; HaBroker& haBroker; std::string logPrefix; diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index d126639813..ffbcb684bc 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -83,7 +83,11 @@ void HaBroker::initialize() { // FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port. brokerInfo = BrokerInfo( - broker.getSystem()->getNodeName(), broker.getPort(broker::Broker::TCP_TRANSPORT), systemId); + broker.getSystem()->getNodeName(), + broker.getPort(broker::Broker::TCP_TRANSPORT), + systemId); + + QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo); // Set up the management object. ManagementAgent* ma = broker.getManagementAgent(); @@ -111,8 +115,6 @@ void HaBroker::initialize() { if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl)); - QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo); - // NOTE: lock is not needed in a constructor, but create one // to pass to functions that have a ScopedLock parameter. Mutex::ScopedLock l(lock); @@ -226,6 +228,7 @@ void HaBroker::setBrokerUrl(const Url& url) { if (url.empty()) throw Url::Invalid("HA broker URL is empty"); brokerUrl = url; mgmtObject->set_brokersUrl(brokerUrl.str()); + QPID_LOG(info, logPrefix << "Broker URL set to: " << url); if (backup.get()) backup->setBrokerUrl(brokerUrl); // Updating broker URL also updates defaulted client URL: if (clientUrl.empty()) updateClientUrl(l); @@ -292,6 +295,7 @@ void HaBroker::statusChanged(Mutex::ScopedLock& l) { } void HaBroker::membershipUpdated(Mutex::ScopedLock&) { + QPID_LOG(info, logPrefix << "Membership changed: " << membership); Variant::List brokers = membership.asList(); mgmtObject->set_members(brokers); broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers)); @@ -321,14 +325,14 @@ void HaBroker::resetMembership(const BrokerInfo& b) { void HaBroker::addBroker(const BrokerInfo& b) { Mutex::ScopedLock l(lock); membership.add(b); - QPID_LOG(debug, logPrefix << "Membership add: " << b << " now: " << membership); + QPID_LOG(debug, logPrefix << "Membership add: " << b); membershipUpdated(l); } void HaBroker::removeBroker(const Uuid& id) { Mutex::ScopedLock l(lock); membership.remove(id); - QPID_LOG(debug, logPrefix << "Membership remove: " << id << " now: " << membership); + QPID_LOG(debug, logPrefix << "Membership remove: " << id); membershipUpdated(l); } diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 0ffc152097..7dabe6e35b 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -97,6 +97,8 @@ class HaBroker : public management::Manageable void addBroker(const BrokerInfo& b); // Add a broker to the membership. void removeBroker(const types::Uuid& id); // Remove a broker from membership. + types::Uuid getSystemId() const { return systemId; } + private: void setClientUrl(const Url&); void setBrokerUrl(const Url&); diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 69c94bfc7d..45a0e246f3 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -225,6 +225,12 @@ void Primary::opened(broker::Connection& connection) { } void Primary::closed(broker::Connection& connection) { + // NOTE: It is possible for a backup connection to be rejected while we are + // a backup, but closed() is called after we have become primary. + // + // For this reason we do not remove from the backups map here, the backups + // map holds all the backups we know about whether connected or not. + // Mutex::ScopedLock l(lock); BrokerInfo info; if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { @@ -233,12 +239,6 @@ void Primary::closed(broker::Connection& connection) { BackupMap::iterator i = backups.find(info.getSystemId()); if (i != backups.end()) i->second->setConnected(false); } - // NOTE: we do not remove from the backups map here, the backups map holds - // all the backups we know about whether connected or not. - // - // It is possible for a backup connection to be rejected while we are a backup, - // but the closed is seen after we have become primary. Removing the entry - // from backups in this case would be incorrect. } diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index a5693fd14e..9c7bf1e694 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -32,7 +32,7 @@ using sys::Mutex; using boost::bind; RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) : - logPrefix("Primary remote backup "+info.getLogId()+": "), + logPrefix("Primary: Remote backup "+info.getLogId()+": "), brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false) {} |