diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/Membership.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/Membership.cpp | 228 |
1 files changed, 228 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp new file mode 100644 index 0000000000..92a0b7db70 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/Membership.cpp @@ -0,0 +1,228 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionObserver.h" +#include "HaBroker.h" +#include "Membership.h" +#include "qpid/broker/Broker.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/types/Variant.h" +#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" +#include "qmf/org/apache/qpid/ha/HaBroker.h" +#include <boost/bind.hpp> +#include <iostream> +#include <iterator> + +namespace qpid { +namespace ha { + +namespace _qmf = ::qmf::org::apache::qpid::ha; + +using sys::Mutex; +using types::Variant; + +Membership::Membership(const BrokerInfo& info, HaBroker& b) + : haBroker(b), self(info.getSystemId()) +{ + brokers[self] = info; + setPrefix(); + oldStatus = info.getStatus(); +} + +void Membership::setPrefix() { + haBroker.logPrefix = Msg() << shortStr(brokers[self].getSystemId()) + << "(" << printable(brokers[self].getStatus()) << ") "; +} +void Membership::clear() { + Mutex::ScopedLock l(lock); + BrokerInfo me = brokers[self]; + brokers.clear(); + brokers[self] = me; +} + +void Membership::add(const BrokerInfo& b) { + Mutex::ScopedLock l(lock); + assert(b.getSystemId() != self); + brokers[b.getSystemId()] = b; + update(true, l); +} + + +void Membership::remove(const types::Uuid& id) { + Mutex::ScopedLock l(lock); + if (id == self) return; // Never remove myself + BrokerInfo::Map::iterator i = brokers.find(id); + if (i != brokers.end()) { + brokers.erase(i); + update(true, l); + } +} + +bool Membership::contains(const types::Uuid& id) { + Mutex::ScopedLock l(lock); + return brokers.find(id) != brokers.end(); +} + +void Membership::assign(const types::Variant::List& list) { + Mutex::ScopedLock l(lock); + clear(); + for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { + BrokerInfo b(i->asMap()); + brokers[b.getSystemId()] = b; + } + update(true, l); +} + +types::Variant::List Membership::asList() const { + Mutex::ScopedLock l(lock); + return asList(l); +} + +types::Variant::List Membership::asList(sys::Mutex::ScopedLock&) const { + types::Variant::List list; + for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i) + list.push_back(i->second.asMap()); + return list; +} + +BrokerInfo::Set Membership::otherBackups() const { + Mutex::ScopedLock l(lock); + BrokerInfo::Set result; + for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i) + if (i->second.getStatus() == READY && i->second.getSystemId() != self) + result.insert(i->second); + return result; +} + +BrokerInfo::Set Membership::getBrokers() const { + Mutex::ScopedLock l(lock); + BrokerInfo::Set result; + transform(brokers.begin(), brokers.end(), inserter(result, result.begin()), + boost::bind(&BrokerInfo::Map::value_type::second, _1)); + return result; +} + +bool Membership::get(const types::Uuid& id, BrokerInfo& result) const { + Mutex::ScopedLock l(lock); + BrokerInfo::Map::const_iterator i = brokers.find(id); + if (i == brokers.end()) return false; + result = i->second; + return true; +} + +namespace { +bool checkTransition(BrokerStatus from, BrokerStatus to) { + // Legal state transitions. Initial state is JOINING, ACTIVE is terminal. + static const BrokerStatus TRANSITIONS[][2] = { + { STANDALONE, JOINING }, // Initialization of backup broker + { JOINING, CATCHUP }, // Connected to primary + { JOINING, RECOVERING }, // Chosen as initial primary. + { CATCHUP, READY }, // Caught up all queues, ready to take over. + { READY, RECOVERING }, // Chosen as new primary + { READY, CATCHUP }, // Timed out failing over, demoted to catch-up. + { RECOVERING, ACTIVE } // All expected backups are ready + }; + static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]); + for (size_t i = 0; i < N; ++i) { + if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to) + return true; + } + return false; +} +} // namespace + +void Membership::update(bool log, Mutex::ScopedLock& l) { + // Update managment and send update event. + BrokerStatus newStatus = getStatus(l); + Variant::List brokerList = asList(l); + if (mgmtObject) { + mgmtObject->set_status(printable(newStatus).str()); + mgmtObject->set_members(brokerList); + } + haBroker.getBroker().getManagementAgent()->raiseEvent( + _qmf::EventMembersUpdate(brokerList)); + + // Update link client properties + framing::FieldTable linkProperties = haBroker.getBroker().getLinkClientProperties(); + if (isBackup(newStatus)) { + // Set backup tag on outgoing link properties. + linkProperties.setTable( + ConnectionObserver::BACKUP_TAG, brokers[types::Uuid(self)].asFieldTable()); + haBroker.getBroker().setLinkClientProperties(linkProperties); + } else { + // Remove backup tag property from outgoing link properties. + linkProperties.erase(ConnectionObserver::BACKUP_TAG); + haBroker.getBroker().setLinkClientProperties(linkProperties); + } + + // Check status transitions + if (oldStatus != newStatus) { + QPID_LOG(info, haBroker.logPrefix << "Status change: " + << printable(oldStatus) << " -> " << printable(newStatus)); + if (!checkTransition(oldStatus, newStatus)) { + haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(oldStatus) + << " -> " << printable(newStatus))); + } + oldStatus = newStatus; + setPrefix(); + if (newStatus == READY) QPID_LOG(notice, haBroker.logPrefix << "Backup is ready"); + } + if (log) QPID_LOG(info, haBroker.logPrefix << "Membership update: " << brokers); +} + +void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) { + Mutex::ScopedLock l(lock); + mgmtObject = mo; + update(false, l); +} + + +void Membership::setStatus(BrokerStatus newStatus) { + Mutex::ScopedLock l(lock); + brokers[self].setStatus(newStatus); + update(false, l); +} + +BrokerStatus Membership::getStatus() const { + Mutex::ScopedLock l(lock); + return getStatus(l); +} + +BrokerStatus Membership::getStatus(sys::Mutex::ScopedLock&) const { + BrokerInfo::Map::const_iterator i = brokers.find(self); + assert(i != brokers.end()); + return i->second.getStatus(); +} + +BrokerInfo Membership::getSelf() const { + Mutex::ScopedLock l(lock); + BrokerInfo::Map::const_iterator i = brokers.find(self); + assert(i != brokers.end()); + return i->second; +} + +void Membership::setSelfAddress(const Address& a) { + Mutex::ScopedLock l(lock); + brokers[self].setAddress(a); + update(false, l); +} + +}} // namespace qpid::ha |