summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/Membership.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/Membership.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.cpp228
1 files changed, 228 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp
new file mode 100644
index 0000000000..92a0b7db70
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Membership.cpp
@@ -0,0 +1,228 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionObserver.h"
+#include "HaBroker.h"
+#include "Membership.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/types/Variant.h"
+#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
+#include "qmf/org/apache/qpid/ha/HaBroker.h"
+#include <boost/bind.hpp>
+#include <iostream>
+#include <iterator>
+
+namespace qpid {
+namespace ha {
+
+namespace _qmf = ::qmf::org::apache::qpid::ha;
+
+using sys::Mutex;
+using types::Variant;
+
+Membership::Membership(const BrokerInfo& info, HaBroker& b)
+ : haBroker(b), self(info.getSystemId())
+{
+ brokers[self] = info;
+ setPrefix();
+ oldStatus = info.getStatus();
+}
+
+void Membership::setPrefix() {
+ haBroker.logPrefix = Msg() << shortStr(brokers[self].getSystemId())
+ << "(" << printable(brokers[self].getStatus()) << ") ";
+}
+void Membership::clear() {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo me = brokers[self];
+ brokers.clear();
+ brokers[self] = me;
+}
+
+void Membership::add(const BrokerInfo& b) {
+ Mutex::ScopedLock l(lock);
+ assert(b.getSystemId() != self);
+ brokers[b.getSystemId()] = b;
+ update(true, l);
+}
+
+
+void Membership::remove(const types::Uuid& id) {
+ Mutex::ScopedLock l(lock);
+ if (id == self) return; // Never remove myself
+ BrokerInfo::Map::iterator i = brokers.find(id);
+ if (i != brokers.end()) {
+ brokers.erase(i);
+ update(true, l);
+ }
+}
+
+bool Membership::contains(const types::Uuid& id) {
+ Mutex::ScopedLock l(lock);
+ return brokers.find(id) != brokers.end();
+}
+
+void Membership::assign(const types::Variant::List& list) {
+ Mutex::ScopedLock l(lock);
+ clear();
+ for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+ BrokerInfo b(i->asMap());
+ brokers[b.getSystemId()] = b;
+ }
+ update(true, l);
+}
+
+types::Variant::List Membership::asList() const {
+ Mutex::ScopedLock l(lock);
+ return asList(l);
+}
+
+types::Variant::List Membership::asList(sys::Mutex::ScopedLock&) const {
+ types::Variant::List list;
+ for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
+ list.push_back(i->second.asMap());
+ return list;
+}
+
+BrokerInfo::Set Membership::otherBackups() const {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo::Set result;
+ for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
+ if (i->second.getStatus() == READY && i->second.getSystemId() != self)
+ result.insert(i->second);
+ return result;
+}
+
+BrokerInfo::Set Membership::getBrokers() const {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo::Set result;
+ transform(brokers.begin(), brokers.end(), inserter(result, result.begin()),
+ boost::bind(&BrokerInfo::Map::value_type::second, _1));
+ return result;
+}
+
+bool Membership::get(const types::Uuid& id, BrokerInfo& result) const {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo::Map::const_iterator i = brokers.find(id);
+ if (i == brokers.end()) return false;
+ result = i->second;
+ return true;
+}
+
+namespace {
+bool checkTransition(BrokerStatus from, BrokerStatus to) {
+ // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
+ static const BrokerStatus TRANSITIONS[][2] = {
+ { STANDALONE, JOINING }, // Initialization of backup broker
+ { JOINING, CATCHUP }, // Connected to primary
+ { JOINING, RECOVERING }, // Chosen as initial primary.
+ { CATCHUP, READY }, // Caught up all queues, ready to take over.
+ { READY, RECOVERING }, // Chosen as new primary
+ { READY, CATCHUP }, // Timed out failing over, demoted to catch-up.
+ { RECOVERING, ACTIVE } // All expected backups are ready
+ };
+ static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
+ for (size_t i = 0; i < N; ++i) {
+ if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
+ return true;
+ }
+ return false;
+}
+} // namespace
+
+void Membership::update(bool log, Mutex::ScopedLock& l) {
+ // Update managment and send update event.
+ BrokerStatus newStatus = getStatus(l);
+ Variant::List brokerList = asList(l);
+ if (mgmtObject) {
+ mgmtObject->set_status(printable(newStatus).str());
+ mgmtObject->set_members(brokerList);
+ }
+ haBroker.getBroker().getManagementAgent()->raiseEvent(
+ _qmf::EventMembersUpdate(brokerList));
+
+ // Update link client properties
+ framing::FieldTable linkProperties = haBroker.getBroker().getLinkClientProperties();
+ if (isBackup(newStatus)) {
+ // Set backup tag on outgoing link properties.
+ linkProperties.setTable(
+ ConnectionObserver::BACKUP_TAG, brokers[types::Uuid(self)].asFieldTable());
+ haBroker.getBroker().setLinkClientProperties(linkProperties);
+ } else {
+ // Remove backup tag property from outgoing link properties.
+ linkProperties.erase(ConnectionObserver::BACKUP_TAG);
+ haBroker.getBroker().setLinkClientProperties(linkProperties);
+ }
+
+ // Check status transitions
+ if (oldStatus != newStatus) {
+ QPID_LOG(info, haBroker.logPrefix << "Status change: "
+ << printable(oldStatus) << " -> " << printable(newStatus));
+ if (!checkTransition(oldStatus, newStatus)) {
+ haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(oldStatus)
+ << " -> " << printable(newStatus)));
+ }
+ oldStatus = newStatus;
+ setPrefix();
+ if (newStatus == READY) QPID_LOG(notice, haBroker.logPrefix << "Backup is ready");
+ }
+ if (log) QPID_LOG(info, haBroker.logPrefix << "Membership update: " << brokers);
+}
+
+void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) {
+ Mutex::ScopedLock l(lock);
+ mgmtObject = mo;
+ update(false, l);
+}
+
+
+void Membership::setStatus(BrokerStatus newStatus) {
+ Mutex::ScopedLock l(lock);
+ brokers[self].setStatus(newStatus);
+ update(false, l);
+}
+
+BrokerStatus Membership::getStatus() const {
+ Mutex::ScopedLock l(lock);
+ return getStatus(l);
+}
+
+BrokerStatus Membership::getStatus(sys::Mutex::ScopedLock&) const {
+ BrokerInfo::Map::const_iterator i = brokers.find(self);
+ assert(i != brokers.end());
+ return i->second.getStatus();
+}
+
+BrokerInfo Membership::getSelf() const {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo::Map::const_iterator i = brokers.find(self);
+ assert(i != brokers.end());
+ return i->second;
+}
+
+void Membership::setSelfAddress(const Address& a) {
+ Mutex::ScopedLock l(lock);
+ brokers[self].setAddress(a);
+ update(false, l);
+}
+
+}} // namespace qpid::ha