summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/Membership.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-01-23 21:58:03 +0000
committerAlan Conway <aconway@apache.org>2013-01-23 21:58:03 +0000
commit25c6f2104c02054b05d362f517268c1235bc36a2 (patch)
treedde0a9005fc1880654b100bd43729ae2ee966c10 /qpid/cpp/src/qpid/ha/Membership.cpp
parent5705c6575e717d74e6bd2a942b7ee085eb62cffb (diff)
downloadqpid-python-25c6f2104c02054b05d362f517268c1235bc36a2.tar.gz
NO-JIRA: HA refactor, re-organise code for clarity and thread safety.
Introduce Role base class. Primary and Backup are now subclasses of Role. Moved backup/primary specific code from HaBroker to the Backup and Primary roles. HaBroker always holds a single Role, via a thread-safe RoleHolder. RoleHolder ensures atomic transition between roles: the old role is deleted before the new role is created. Membership is now independently thread safe, breaking the potential deadlock between HaBroker and the Roles. Logging improvements and other minor cleanup. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1437771 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/Membership.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.cpp110
1 files changed, 103 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp
index 74580f9b1e..d33d57c37f 100644
--- a/qpid/cpp/src/qpid/ha/Membership.cpp
+++ b/qpid/cpp/src/qpid/ha/Membership.cpp
@@ -19,6 +19,12 @@
*
*/
#include "Membership.h"
+#include "HaBroker.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/types/Variant.h"
+#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
+#include "qmf/org/apache/qpid/ha/HaBroker.h"
#include <boost/bind.hpp>
#include <iostream>
#include <iterator>
@@ -26,37 +32,57 @@
namespace qpid {
namespace ha {
+namespace _qmf = ::qmf::org::apache::qpid::ha;
-void Membership::reset(const BrokerInfo& b) {
+using sys::Mutex;
+using types::Variant;
+
+Membership::Membership(const BrokerInfo& info, HaBroker& b)
+ : haBroker(b), self(info.getSystemId())
+{
+ brokers[self] = info;
+}
+
+void Membership::clear() {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo me = brokers[self];
brokers.clear();
- brokers[b.getSystemId()] = b;
+ brokers[self] = me;
}
void Membership::add(const BrokerInfo& b) {
+ Mutex::ScopedLock l(lock);
brokers[b.getSystemId()] = b;
+ update(l);
}
void Membership::remove(const types::Uuid& id) {
+ Mutex::ScopedLock l(lock);
BrokerInfo::Map::iterator i = brokers.find(id);
if (i != brokers.end()) {
brokers.erase(i);
- }
+ update(l);
+ }
}
bool Membership::contains(const types::Uuid& id) {
+ Mutex::ScopedLock l(lock);
return brokers.find(id) != brokers.end();
}
void Membership::assign(const types::Variant::List& list) {
+ Mutex::ScopedLock l(lock);
brokers.clear();
for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
BrokerInfo b(i->asMap());
brokers[b.getSystemId()] = b;
}
+ update(l);
}
types::Variant::List Membership::asList() const {
+ Mutex::ScopedLock l(lock);
types::Variant::List list;
for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
list.push_back(i->second.asMap());
@@ -64,6 +90,7 @@ types::Variant::List Membership::asList() const {
}
BrokerInfo::Set Membership::otherBackups() const {
+ Mutex::ScopedLock l(lock);
BrokerInfo::Set result;
for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
if (i->second.getStatus() == READY && i->second.getSystemId() != self)
@@ -71,15 +98,84 @@ BrokerInfo::Set Membership::otherBackups() const {
return result;
}
-bool Membership::get(const types::Uuid& id, BrokerInfo& result) {
- BrokerInfo::Map::iterator i = brokers.find(id);
+bool Membership::get(const types::Uuid& id, BrokerInfo& result) const {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo::Map::const_iterator i = brokers.find(id);
if (i == brokers.end()) return false;
result = i->second;
return true;
}
-std::ostream& operator<<(std::ostream& o, const Membership& members) {
- return o << members.brokers;
+void Membership::update(Mutex::ScopedLock& l) {
+ QPID_LOG(info, "Membership: " << brokers);
+ Variant::List brokers = asList();
+ if (mgmtObject) mgmtObject->set_status(printable(getStatus(l)).str());
+ if (mgmtObject) mgmtObject->set_members(brokers);
+ haBroker.getBroker().getManagementAgent()->raiseEvent(
+ _qmf::EventMembersUpdate(brokers));
+}
+
+void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) {
+ Mutex::ScopedLock l(lock);
+ mgmtObject = mo;
+ update(l);
+}
+
+
+namespace {
+bool checkTransition(BrokerStatus from, BrokerStatus to) {
+ // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
+ static const BrokerStatus TRANSITIONS[][2] = {
+ { STANDALONE, JOINING }, // Initialization of backup broker
+ { JOINING, CATCHUP }, // Connected to primary
+ { JOINING, RECOVERING }, // Chosen as initial primary.
+ { CATCHUP, READY }, // Caught up all queues, ready to take over.
+ { READY, RECOVERING }, // Chosen as new primary
+ { READY, CATCHUP }, // Timed out failing over, demoted to catch-up.
+ { RECOVERING, ACTIVE } // All expected backups are ready
+ };
+ static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
+ for (size_t i = 0; i < N; ++i) {
+ if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
+ return true;
+ }
+ return false;
+}
+} // namespace
+
+void Membership::setStatus(BrokerStatus newStatus) {
+ BrokerStatus status = getStatus();
+ QPID_LOG(info, "Status change: "
+ << printable(status) << " -> " << printable(newStatus));
+ bool legal = checkTransition(status, newStatus);
+ if (!legal) {
+ haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(status)
+ << " -> " << printable(newStatus)));
+ }
+
+ Mutex::ScopedLock l(lock);
+ brokers[self].setStatus(newStatus);
+ if (mgmtObject) mgmtObject->set_status(printable(newStatus).str());
+ update(l);
+}
+
+BrokerStatus Membership::getStatus() const {
+ Mutex::ScopedLock l(lock);
+ return getStatus(l);
+}
+
+BrokerStatus Membership::getStatus(sys::Mutex::ScopedLock&) const {
+ BrokerInfo::Map::const_iterator i = brokers.find(self);
+ assert(i != brokers.end());
+ return i->second.getStatus();
+}
+
+BrokerInfo Membership::getInfo() const {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo::Map::const_iterator i = brokers.find(self);
+ assert(i != brokers.end());
+ return i->second;
}
+// FIXME aconway 2013-01-23: move to .h?
}} // namespace qpid::ha