From 25c6f2104c02054b05d362f517268c1235bc36a2 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 23 Jan 2013 21:58:03 +0000 Subject: NO-JIRA: HA refactor, re-organise code for clarity and thread safety. Introduce Role base class. Primary and Backup are now subclasses of Role. Moved backup/primary specific code from HaBroker to the Backup and Primary roles. HaBroker always holds a single Role, via a thread-safe RoleHolder. RoleHolder ensures atomic transition between roles: the old role is deleted before the new role is created. Membership is now independently thread safe, breaking the potential deadlock between HaBroker and the Roles. Logging improvements and other minor cleanup. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1437771 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/ha/Membership.cpp | 110 +++++++++++++++++++++++++++++++++--- 1 file changed, 103 insertions(+), 7 deletions(-) (limited to 'qpid/cpp/src/qpid/ha/Membership.cpp') diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp index 74580f9b1e..d33d57c37f 100644 --- a/qpid/cpp/src/qpid/ha/Membership.cpp +++ b/qpid/cpp/src/qpid/ha/Membership.cpp @@ -19,6 +19,12 @@ * */ #include "Membership.h" +#include "HaBroker.h" +#include "qpid/broker/Broker.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/types/Variant.h" +#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" +#include "qmf/org/apache/qpid/ha/HaBroker.h" #include #include #include @@ -26,37 +32,57 @@ namespace qpid { namespace ha { +namespace _qmf = ::qmf::org::apache::qpid::ha; -void Membership::reset(const BrokerInfo& b) { +using sys::Mutex; +using types::Variant; + +Membership::Membership(const BrokerInfo& info, HaBroker& b) + : haBroker(b), self(info.getSystemId()) +{ + brokers[self] = info; +} + +void Membership::clear() { + Mutex::ScopedLock l(lock); + BrokerInfo me = brokers[self]; brokers.clear(); - brokers[b.getSystemId()] = b; + brokers[self] = me; } void Membership::add(const BrokerInfo& b) { + Mutex::ScopedLock l(lock); brokers[b.getSystemId()] = b; + update(l); } void Membership::remove(const types::Uuid& id) { + Mutex::ScopedLock l(lock); BrokerInfo::Map::iterator i = brokers.find(id); if (i != brokers.end()) { brokers.erase(i); - } + update(l); + } } bool Membership::contains(const types::Uuid& id) { + Mutex::ScopedLock l(lock); return brokers.find(id) != brokers.end(); } void Membership::assign(const types::Variant::List& list) { + Mutex::ScopedLock l(lock); brokers.clear(); for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { BrokerInfo b(i->asMap()); brokers[b.getSystemId()] = b; } + update(l); } types::Variant::List Membership::asList() const { + Mutex::ScopedLock l(lock); types::Variant::List list; for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i) list.push_back(i->second.asMap()); @@ -64,6 +90,7 @@ types::Variant::List Membership::asList() const { } BrokerInfo::Set Membership::otherBackups() const { + Mutex::ScopedLock l(lock); BrokerInfo::Set result; for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i) if (i->second.getStatus() == READY && i->second.getSystemId() != self) @@ -71,15 +98,84 @@ BrokerInfo::Set Membership::otherBackups() const { return result; } -bool Membership::get(const types::Uuid& id, BrokerInfo& result) { - BrokerInfo::Map::iterator i = brokers.find(id); +bool Membership::get(const types::Uuid& id, BrokerInfo& result) const { + Mutex::ScopedLock l(lock); + BrokerInfo::Map::const_iterator i = brokers.find(id); if (i == brokers.end()) return false; result = i->second; return true; } -std::ostream& operator<<(std::ostream& o, const Membership& members) { - return o << members.brokers; +void Membership::update(Mutex::ScopedLock& l) { + QPID_LOG(info, "Membership: " << brokers); + Variant::List brokers = asList(); + if (mgmtObject) mgmtObject->set_status(printable(getStatus(l)).str()); + if (mgmtObject) mgmtObject->set_members(brokers); + haBroker.getBroker().getManagementAgent()->raiseEvent( + _qmf::EventMembersUpdate(brokers)); +} + +void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) { + Mutex::ScopedLock l(lock); + mgmtObject = mo; + update(l); +} + + +namespace { +bool checkTransition(BrokerStatus from, BrokerStatus to) { + // Legal state transitions. Initial state is JOINING, ACTIVE is terminal. + static const BrokerStatus TRANSITIONS[][2] = { + { STANDALONE, JOINING }, // Initialization of backup broker + { JOINING, CATCHUP }, // Connected to primary + { JOINING, RECOVERING }, // Chosen as initial primary. + { CATCHUP, READY }, // Caught up all queues, ready to take over. + { READY, RECOVERING }, // Chosen as new primary + { READY, CATCHUP }, // Timed out failing over, demoted to catch-up. + { RECOVERING, ACTIVE } // All expected backups are ready + }; + static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]); + for (size_t i = 0; i < N; ++i) { + if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to) + return true; + } + return false; +} +} // namespace + +void Membership::setStatus(BrokerStatus newStatus) { + BrokerStatus status = getStatus(); + QPID_LOG(info, "Status change: " + << printable(status) << " -> " << printable(newStatus)); + bool legal = checkTransition(status, newStatus); + if (!legal) { + haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(status) + << " -> " << printable(newStatus))); + } + + Mutex::ScopedLock l(lock); + brokers[self].setStatus(newStatus); + if (mgmtObject) mgmtObject->set_status(printable(newStatus).str()); + update(l); +} + +BrokerStatus Membership::getStatus() const { + Mutex::ScopedLock l(lock); + return getStatus(l); +} + +BrokerStatus Membership::getStatus(sys::Mutex::ScopedLock&) const { + BrokerInfo::Map::const_iterator i = brokers.find(self); + assert(i != brokers.end()); + return i->second.getStatus(); +} + +BrokerInfo Membership::getInfo() const { + Mutex::ScopedLock l(lock); + BrokerInfo::Map::const_iterator i = brokers.find(self); + assert(i != brokers.end()); + return i->second; } +// FIXME aconway 2013-01-23: move to .h? }} // namespace qpid::ha -- cgit v1.2.1