From ba8f39a8b818af7773bbdda6a021a4bb8aa4459c Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 22 Jun 2012 19:28:42 +0000 Subject: NO-JIRA: Simplify locking and remove member-update callback in HA code. Get rid of the separate Membership lock and put HaBroker in control of membership changes. Removes a potential deadlock which could explain some observed failures in long-running failover tests. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1353001 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/ha/Backup.cpp | 3 +- qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 2 +- qpid/cpp/src/qpid/ha/HaBroker.cpp | 52 ++++++++++++++++++++++++++----- qpid/cpp/src/qpid/ha/HaBroker.h | 9 ++++-- qpid/cpp/src/qpid/ha/Membership.cpp | 31 +----------------- qpid/cpp/src/qpid/ha/Membership.h | 12 ++----- qpid/cpp/src/qpid/ha/Primary.cpp | 6 ++-- 7 files changed, 60 insertions(+), 55 deletions(-) diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 90c615aaf5..4b9cef05bc 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -83,6 +83,8 @@ void Backup::initialize(const Url& brokers) { false, // durable settings.mechanism, settings.username, settings.password, false); // amq.failover + + sys::Mutex::ScopedLock l(lock); link = result.first; link->setUrl(url); replicator.reset(new BrokerReplicator(haBroker, link)); @@ -93,7 +95,6 @@ void Backup::initialize(const Url& brokers) { Backup::~Backup() { if (link) link->close(); if (replicator.get()) broker.getExchanges().destroy(replicator->getName()); - replicator.reset(); } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 8f09c5db8f..c3521162a1 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -546,7 +546,7 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { if (mine != primary) throw Exception(QPID_MSG("Replicate default on backup (" << mine << ") does not match primary (" << primary << ")")); - haBroker.getMembership().assign(values[MEMBERS].asList()); + haBroker.setMembership(values[MEMBERS].asList()); } catch (const std::exception& e) { QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " << e.what()); haBroker.shutdown(); diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 4752d51190..395a3ce6fa 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -66,7 +66,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) brokerInfo(broker.getSystem()->getNodeName(), // TODO aconway 2012-05-24: other transports? broker.getPort(broker::Broker::TCP_TRANSPORT), systemId), - membership(systemId, boost::bind(&HaBroker::membershipUpdate, this, _1)), + membership(systemId), replicationTest(s.replicateDefault.get()) { // Set up the management object. @@ -271,17 +271,53 @@ void HaBroker::statusChanged(Mutex::ScopedLock& l) { setLinkProperties(l); } -void HaBroker::membershipUpdate(const Variant::List& brokers) { - // FIXME aconway 2012-06-12: nasty callback in callback, clean up. - BrokerInfo info; - if (getStatus() == CATCHUP && getMembership().get(systemId, info) && info.getStatus() == READY) - setStatus(READY); - - // No lock, only calls thread-safe objects. +void HaBroker::membershipUpdated(const Variant::List& brokers) { + // No lock, these are thread-safe. mgmtObject->set_members(brokers); broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers)); } +void HaBroker::setMembership(const Variant::List& brokers) { + Mutex::ScopedLock l(lock); + membership.assign(brokers); + BrokerInfo info; + // Check if my own status has been updated to READY + if (getStatus() == CATCHUP && + membership.get(systemId, info) && info.getStatus() == READY) + setStatus(READY, l); + membershipUpdated(brokers); +} + +void HaBroker::resetMembership(const BrokerInfo& b) { + Variant::List members; + { + Mutex::ScopedLock l(lock); + membership.reset(b); + members = membership.asList(); + } + membershipUpdated(members); +} + +void HaBroker::addBroker(const BrokerInfo& b) { + Variant::List members; + { + Mutex::ScopedLock l(lock); + membership.add(b); + members = membership.asList(); + } + membershipUpdated(members); +} + +void HaBroker::removeBroker(const Uuid& id) { + Variant::List members; + { + Mutex::ScopedLock l(lock); + membership.remove(id); + members = membership.asList(); + } + membershipUpdated(members); +} + void HaBroker::setLinkProperties(Mutex::ScopedLock&) { framing::FieldTable linkProperties = broker.getLinkClientProperties(); if (isBackup(status)) { diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 7ba1599c09..28fe3c755e 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -87,8 +87,11 @@ class HaBroker : public management::Manageable boost::shared_ptr getObserver() { return observer; } const BrokerInfo& getBrokerInfo() const { return brokerInfo; } - Membership& getMembership() { return membership; } - void membershipUpdate(const types::Variant::List&); + + void setMembership(const types::Variant::List&); // Set membership from list. + void resetMembership(const BrokerInfo& b); // Reset to contain just one member. + void addBroker(const BrokerInfo& b); // Add a broker to the membership. + void removeBroker(const types::Uuid& id); // Remove a broker from membership. private: void setClientUrl(const Url&, sys::Mutex::ScopedLock&); @@ -105,6 +108,8 @@ class HaBroker : public management::Manageable std::vector getKnownBrokers() const; + void membershipUpdated(const types::Variant::List&); + std::string logPrefix; broker::Broker& broker; types::Uuid systemId; diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp index 34c1ccb657..b85c4d4164 100644 --- a/qpid/cpp/src/qpid/ha/Membership.cpp +++ b/qpid/cpp/src/qpid/ha/Membership.cpp @@ -28,70 +28,42 @@ namespace ha { void Membership::reset(const BrokerInfo& b) { - sys::Mutex::ScopedLock l(lock); brokers.clear(); brokers[b.getSystemId()] = b; - update(l); } void Membership::add(const BrokerInfo& b) { - sys::Mutex::ScopedLock l(lock); brokers[b.getSystemId()] = b; - update(l); } void Membership::remove(const types::Uuid& id) { - sys::Mutex::ScopedLock l(lock); BrokerInfo::Map::iterator i = brokers.find(id); if (i != brokers.end()) { brokers.erase(i); - update(l); - } + } } bool Membership::contains(const types::Uuid& id) { - sys::Mutex::ScopedLock l(lock); return brokers.find(id) != brokers.end(); } void Membership::assign(const types::Variant::List& list) { - sys::Mutex::ScopedLock l(lock); brokers.clear(); for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { BrokerInfo b(i->asMap()); brokers[b.getSystemId()] = b; } - update(l); } types::Variant::List Membership::asList() const { - sys::Mutex::ScopedLock l(lock); - return asList(l); -} - -types::Variant::List Membership::asList(sys::Mutex::ScopedLock&) const { types::Variant::List list; for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i) list.push_back(i->second.asMap()); return list; } -void Membership::update(sys::Mutex::ScopedLock& l) { - if (updateCallback) { - types::Variant::List list = asList(l); - sys::Mutex::ScopedUnlock u(lock); - updateCallback(list); - } - QPID_LOG(debug, " HA: Membership update: " << brokers); -} - BrokerInfo::Set Membership::otherBackups() const { - sys::Mutex::ScopedLock l(lock); - return otherBackups(l); -} - -BrokerInfo::Set Membership::otherBackups(sys::Mutex::ScopedLock&) const { BrokerInfo::Set result; for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i) if (isBackup(i->second.getStatus()) && i->second.getSystemId() != self) @@ -100,7 +72,6 @@ BrokerInfo::Set Membership::otherBackups(sys::Mutex::ScopedLock&) const { } bool Membership::get(const types::Uuid& id, BrokerInfo& result) { - sys::Mutex::ScopedLock l(lock); BrokerInfo::Map::iterator i = brokers.find(id); if (i == brokers.end()) return false; result = i->second; diff --git a/qpid/cpp/src/qpid/ha/Membership.h b/qpid/cpp/src/qpid/ha/Membership.h index 623d77970c..41245f0e06 100644 --- a/qpid/cpp/src/qpid/ha/Membership.h +++ b/qpid/cpp/src/qpid/ha/Membership.h @@ -36,14 +36,12 @@ namespace ha { /** * Keep track of the brokers in the membership. - * THREAD SAFE: updated in arbitrary connection threads. + * THREAD UNSAFE: caller must serialize */ class Membership { public: - typedef boost::function UpdateCallback; - Membership(const types::Uuid& self_, UpdateCallback updateFn) - : self(self_), updateCallback(updateFn) {} + Membership(const types::Uuid& self_) : self(self_) {} void reset(const BrokerInfo& b); ///< Reset to contain just one member. void add(const BrokerInfo& b); @@ -58,14 +56,8 @@ class Membership bool get(const types::Uuid& id, BrokerInfo& result); private: - BrokerInfo::Set otherBackups(sys::Mutex::ScopedLock&) const; - types::Variant::List asList(sys::Mutex::ScopedLock&) const; - void update(sys::Mutex::ScopedLock&); - - mutable sys::Mutex lock; types::Uuid self; BrokerInfo::Map brokers; - UpdateCallback updateCallback; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 1fa51b6f68..b315142e70 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -114,7 +114,7 @@ void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) { if (i != backups.end() && i->second->isReady()) { BrokerInfo info = i->second->getBrokerInfo(); info.setStatus(READY); - haBroker.getMembership().add(info); + haBroker.addBroker(info); initialBackups.erase(i->second); checkReady(l); } @@ -160,7 +160,7 @@ void Primary::opened(broker::Connection& connection) { else { QPID_LOG(debug, logPrefix << "Known backup connected: " << info); } - haBroker.getMembership().add(info); + haBroker.addBroker(info); } } @@ -168,7 +168,7 @@ void Primary::closed(broker::Connection& connection) { Mutex::ScopedLock l(lock); BrokerInfo info; if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { - haBroker.getMembership().remove(info.getSystemId()); + haBroker.removeBroker(info.getSystemId()); QPID_LOG(debug, logPrefix << "Backup disconnected: " << info); } // NOTE: we do not modify backups here, we only add to the known backups set -- cgit v1.2.1