diff options
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 32 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ConnectionObserver.cpp | 48 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ConnectionObserver.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.cpp | 2 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 11 |
11 files changed, 68 insertions, 67 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index bac6fd23c8..e099554df6 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -51,35 +51,15 @@ Backup::Backup(HaBroker& hb, const Settings& s) : if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl)); } -bool Backup::isSelf(const Address& a) const { - return sys::SystemInfo::isLocalHost(a.host) && - a.port == haBroker.getBroker().getPort(a.protocol); -} - -// Remove my own address from the URL if possible. -// This isn't 100% reliable given the many ways to specify a host, -// but should work in most cases. We have additional measures to prevent -// self-connection in ConnectionObserver -Url Backup::removeSelf(const Url& brokers) const { - Url url; - for (Url::const_iterator i = brokers.begin(); i != brokers.end(); ++i) - if (!isSelf(*i)) url.push_back(*i); - if (url.empty()) - throw Url::Invalid(logPrefix+"Failover URL is empty"); - QPID_LOG(debug, logPrefix << "Failover URL (excluding self): " << url); - return url; -} - void Backup::initialize(const Url& brokers) { if (brokers.empty()) throw Url::Invalid("HA broker URL is empty"); QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers); - Url url = removeSelf(brokers); - string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; + string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol; types::Uuid uuid(true); // Declare the link std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare( broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), - url[0].host, url[0].port, protocol, + brokers[0].host, brokers[0].port, protocol, false, // durable settings.mechanism, settings.username, settings.password, false); // no amq.failover - don't want to use client URL. @@ -90,7 +70,7 @@ void Backup::initialize(const Url& brokers) { replicator->initialize(); broker.getExchanges().registerExchange(replicator); } - link->setUrl(url); // Outside the lock, once set link doesn't change. + link->setUrl(brokers); // Outside the lock, once set link doesn't change. } Backup::~Backup() { @@ -107,10 +87,8 @@ void Backup::setBrokerUrl(const Url& url) { sys::Mutex::ScopedLock l(lock); linkSet = link; } - if (linkSet) { - QPID_LOG(info, logPrefix << "Broker URL set to: " << url); - link->setUrl(removeSelf(url)); // Outside lock, once set link doesn't change - } + if (linkSet) + link->setUrl(url); // Outside lock, once set link doesn't change else initialize(url); // Deferred initialization } diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h index 1233a473ec..4f2d5babde 100644 --- a/qpid/cpp/src/qpid/ha/Backup.h +++ b/qpid/cpp/src/qpid/ha/Backup.h @@ -53,8 +53,6 @@ class Backup void setStatus(BrokerStatus); private: - bool isSelf(const Address& a) const; - Url removeSelf(const Url&) const; void initialize(const Url&); std::string logPrefix; diff --git a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h index ef537ab90a..5a67cde922 100644 --- a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h +++ b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h @@ -36,7 +36,7 @@ class BackupConnectionExcluder : public broker::ConnectionObserver { public: void opened(broker::Connection& connection) { - QPID_LOG(debug, "Backup broker rejected connection "+connection.getMgmtId()); + QPID_LOG(debug, "Backup: Rejected connection "+connection.getMgmtId()); connection.abort(); } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 91a4bf242b..b7d36c641b 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -472,7 +472,8 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { boost::shared_ptr<Exchange> exchange = createExchange( name, values[TYPE].asString(), values[DURABLE].asBool(), args, getAltExchange(values[ALTEXCHANGE])); - QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already exists: " << name); + // It is normal for the exchange to already exist if we are failing over. + QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already replicated: " << name); } namespace { diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp index 553f50d802..81ba3e4301 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp +++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp @@ -32,7 +32,7 @@ namespace ha { ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid) : haBroker(hb), logPrefix("Connections: "), self(uuid) {} -bool ConnectionObserver::getBrokerInfo(broker::Connection& connection, BrokerInfo& info) { +bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) { framing::FieldTable ft; if (connection.getClientProperties().getTable(ConnectionObserver::BACKUP_TAG, ft)) { info = BrokerInfo(ft); @@ -51,29 +51,43 @@ ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() { return observer; } +bool ConnectionObserver::isSelf(const broker::Connection& connection) { + BrokerInfo info; + return getBrokerInfo(connection, info) && info.getSystemId() == self; +} + void ConnectionObserver::opened(broker::Connection& connection) { - if (connection.isLink()) return; // Allow outgoing links. - if (connection.getClientProperties().isSet(ADMIN_TAG)) { - QPID_LOG(debug, logPrefix << "Allowing admin connection: " - << connection.getMgmtId()); - return; // No need to call observer, always allow admins. - } - BrokerInfo info; // Avoid self connections. - if (getBrokerInfo(connection, info)) { - if (info.getSystemId() == self) { - QPID_LOG(debug, "HA broker rejected self connection "+connection.getMgmtId()); + try { + if (connection.isLink()) return; // Allow outgoing links. + if (connection.getClientProperties().isSet(ADMIN_TAG)) { + QPID_LOG(debug, logPrefix << "Accepted admin connection: " + << connection.getMgmtId()); + return; // No need to call observer, always allow admins. + } + if (isSelf(connection)) { // Reject self connections + QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId()); connection.abort(); + return; } - + ObserverPtr o(getObserver()); + if (o) o->opened(connection); + } + catch (const std::exception& e) { + QPID_LOG(error, logPrefix << "Open error: " << e.what()); + throw; } - ObserverPtr o(getObserver()); - if (o) o->opened(connection); } void ConnectionObserver::closed(broker::Connection& connection) { - BrokerInfo info; - ObserverPtr o(getObserver()); - if (o) o->closed(connection); + if (isSelf(connection)) return; // Ignore closing of self connections. + try { + ObserverPtr o(getObserver()); + if (o) o->closed(connection); + } + catch (const std::exception& e) { + QPID_LOG(error, logPrefix << "Close error: " << e.what()); + throw; + } } const std::string ConnectionObserver::ADMIN_TAG="qpid.ha-admin"; diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.h b/qpid/cpp/src/qpid/ha/ConnectionObserver.h index 5c1dabe8f8..e3a6d1154a 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionObserver.h +++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.h @@ -51,7 +51,7 @@ class ConnectionObserver : public broker::ConnectionObserver static const std::string ADMIN_TAG; static const std::string BACKUP_TAG; - static bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info); + static bool getBrokerInfo(const broker::Connection& connection, BrokerInfo& info); ConnectionObserver(HaBroker& haBroker, const types::Uuid& self); @@ -62,6 +62,8 @@ class ConnectionObserver : public broker::ConnectionObserver void closed(broker::Connection& connection); private: + bool isSelf(const broker::Connection&); + sys::Mutex lock; HaBroker& haBroker; std::string logPrefix; diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index fc74ce633a..3c984a023a 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -83,7 +83,11 @@ void HaBroker::initialize() { // FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port. brokerInfo = BrokerInfo( - broker.getSystem()->getNodeName(), broker.getPort(broker::Broker::TCP_TRANSPORT), systemId); + broker.getSystem()->getNodeName(), + broker.getPort(broker::Broker::TCP_TRANSPORT), + systemId); + + QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo); // Set up the management object. ManagementAgent* ma = broker.getManagementAgent(); @@ -111,8 +115,6 @@ void HaBroker::initialize() { if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl)); - QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo); - // NOTE: lock is not needed in a constructor, but create one // to pass to functions that have a ScopedLock parameter. Mutex::ScopedLock l(lock); @@ -226,6 +228,7 @@ void HaBroker::setBrokerUrl(const Url& url) { if (url.empty()) throw Url::Invalid("HA broker URL is empty"); brokerUrl = url; mgmtObject->set_brokersUrl(brokerUrl.str()); + QPID_LOG(info, logPrefix << "Broker URL set to: " << url); if (backup.get()) backup->setBrokerUrl(brokerUrl); // Updating broker URL also updates defaulted client URL: if (clientUrl.empty()) updateClientUrl(l); @@ -292,6 +295,7 @@ void HaBroker::statusChanged(Mutex::ScopedLock& l) { } void HaBroker::membershipUpdated(Mutex::ScopedLock&) { + QPID_LOG(info, logPrefix << "Membership changed: " << membership); Variant::List brokers = membership.asList(); mgmtObject->set_members(brokers); broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers)); @@ -321,14 +325,14 @@ void HaBroker::resetMembership(const BrokerInfo& b) { void HaBroker::addBroker(const BrokerInfo& b) { Mutex::ScopedLock l(lock); membership.add(b); - QPID_LOG(debug, logPrefix << "Membership add: " << b << " now: " << membership); + QPID_LOG(debug, logPrefix << "Membership add: " << b); membershipUpdated(l); } void HaBroker::removeBroker(const Uuid& id) { Mutex::ScopedLock l(lock); membership.remove(id); - QPID_LOG(debug, logPrefix << "Membership remove: " << id << " now: " << membership); + QPID_LOG(debug, logPrefix << "Membership remove: " << id); membershipUpdated(l); } diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 0ffc152097..7dabe6e35b 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -97,6 +97,8 @@ class HaBroker : public management::Manageable void addBroker(const BrokerInfo& b); // Add a broker to the membership. void removeBroker(const types::Uuid& id); // Remove a broker from membership. + types::Uuid getSystemId() const { return systemId; } + private: void setClientUrl(const Url&); void setBrokerUrl(const Url&); diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index e7aa4858be..c3261533e6 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -190,11 +190,13 @@ void Primary::queueDestroy(const QueuePtr& q) { } void Primary::opened(broker::Connection& connection) { + QPID_LOG(critical, "FIXME opened " << connection.getMgmtId()); BrokerInfo info; if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { Mutex::ScopedLock l(lock); BackupMap::iterator i = backups.find(info.getSystemId()); if (i == backups.end()) { + QPID_LOG(debug, logPrefix << "New backup connected: " << info); boost::shared_ptr<RemoteBackup> backup( new RemoteBackup(info, haBroker.getReplicationTest(), true)); { @@ -203,7 +205,6 @@ void Primary::opened(broker::Connection& connection) { backup->setInitialQueues(haBroker.getBroker().getQueues(), false); } backups[info.getSystemId()] = backup; - QPID_LOG(debug, logPrefix << "New backup connected: " << info); } else { QPID_LOG(debug, logPrefix << "Known backup connected: " << info); @@ -219,6 +220,12 @@ void Primary::opened(broker::Connection& connection) { } void Primary::closed(broker::Connection& connection) { + // NOTE: It is possible for a backup connection to be rejected while we are + // a backup, but closed() is called after we have become primary. + // + // For this reason we do not remove from the backups map here, the backups + // map holds all the backups we know about whether connected or not. + // Mutex::ScopedLock l(lock); BrokerInfo info; if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { @@ -227,12 +234,6 @@ void Primary::closed(broker::Connection& connection) { BackupMap::iterator i = backups.find(info.getSystemId()); if (i != backups.end()) i->second->setConnected(false); } - // NOTE: we do not remove from the backups map here, the backups map holds - // all the backups we know about whether connected or not. - // - // It is possible for a backup connection to be rejected while we are a backup, - // but the closed is seen after we have become primary. Removing the entry - // from backups in this case would be incorrect. } diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index a5693fd14e..9c7bf1e694 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -32,7 +32,7 @@ using sys::Mutex; using boost::bind; RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) : - logPrefix("Primary remote backup "+info.getLogId()+": "), + logPrefix("Primary: Remote backup "+info.getLogId()+": "), brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false) {} diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 414ede7cca..246b0ed423 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -63,7 +63,6 @@ class HaBroker(Broker): args = copy(args) args += ["--load-module", BrokerTest.ha_lib, "--log-enable=debug+:ha::", - "--log-enable=trace+:ha::", # FIXME aconway 2012-07-12: # FIXME aconway 2012-02-13: workaround slow link failover. "--link-maintenace-interval=0.1", "--ha-cluster=%s"%ha_cluster] @@ -112,9 +111,11 @@ class HaBroker(Broker): def wait_status(self, status): def try_get_status(): # Ignore ConnectionError, the broker may not be up yet. - try: return self.ha_status() == status; + try: + self._status = self.ha_status() + return self._status == status; except ConnectionError: return False - assert retry(try_get_status, timeout=20), "%s status != %r"%(self, status) + assert retry(try_get_status, timeout=20), "%s %r != %r"%(self, self._status, status) # FIXME aconway 2012-05-01: do direct python call to qpid-config code. def qpid_config(self, args): @@ -761,7 +762,7 @@ acl deny all all s1.sender("ex").send("foo"); self.assertEqual(s1.receiver("q").fetch().content, "foo") - def test_alterante_exchange(self): + def test_alternate_exchange(self): """Verify that alternate-exchange on exchanges and queues is propagated to new members of a cluster. """ cluster = HaCluster(self, 2) @@ -964,7 +965,7 @@ class RecoveryTests(BrokerTest): """ cluster = HaCluster(self, 3, args=["--ha-backup-timeout=0.5"]); cluster[0].wait_status("active") # Primary ready - for b in cluster[1:4]: b.wait_status("ready") # Backups ready + for b in cluster[1:3]: b.wait_status("ready") # Backups ready for i in [0,1]: cluster.kill(i, False) cluster[2].promote() # New primary, backups will be 1 and 2 cluster[2].wait_status("recovering") |