diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
commit | d43d1912b376322e27fdcda551a73f9ff5487972 (patch) | |
tree | ce493e10baa95f44be8beb5778ce51783463196d /cpp/src/qpid/ha/HaBroker.cpp | |
parent | 04877fec0c6346edec67072d7f2d247740cf2af5 (diff) | |
download | qpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz |
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/HaBroker.cpp')
-rw-r--r-- | cpp/src/qpid/ha/HaBroker.cpp | 300 |
1 files changed, 235 insertions, 65 deletions
diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp index 7d82fb63bd..d126639813 100644 --- a/cpp/src/qpid/ha/HaBroker.cpp +++ b/cpp/src/qpid/ha/HaBroker.cpp @@ -19,22 +19,31 @@ * */ #include "Backup.h" -#include "ConnectionExcluder.h" +#include "BackupConnectionExcluder.h" +#include "ConnectionObserver.h" #include "HaBroker.h" -#include "Settings.h" +#include "Primary.h" +#include "QueueReplicator.h" #include "ReplicatingSubscription.h" +#include "Settings.h" +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Link.h" #include "qpid/broker/Queue.h" #include "qpid/broker/SignalHandler.h" +#include "qpid/framing/FieldTable.h" #include "qpid/management/ManagementAgent.h" +#include "qpid/sys/SystemInfo.h" +#include "qpid/types/Uuid.h" +#include "qpid/framing/Uuid.h" #include "qmf/org/apache/qpid/ha/Package.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h" -#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokers.h" -#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicBrokers.h" -#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetExpectedBackups.h" +#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokersUrl.h" +#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicUrl.h" +#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" #include "qpid/log/Statement.h" +#include <boost/shared_ptr.hpp> namespace qpid { namespace ha { @@ -42,91 +51,149 @@ namespace ha { namespace _qmf = ::qmf::org::apache::qpid::ha; using namespace management; using namespace std; +using types::Variant; +using types::Uuid; +using sys::Mutex; -namespace { - -const std::string STANDALONE="standalone"; -const std::string CATCH_UP="catch-up"; -const std::string BACKUP="backup"; -const std::string PRIMARY="primary"; - -} // namespace - - +// Called in Plugin::earlyInitialize HaBroker::HaBroker(broker::Broker& b, const Settings& s) - : broker(b), + : logPrefix("Broker: "), + broker(b), + systemId(broker.getSystem()->getSystemId().data()), settings(s), - mgmtObject(0) + observer(new ConnectionObserver(*this, systemId)), + mgmtObject(0), + status(STANDALONE), + membership(systemId), + replicationTest(s.replicateDefault.get()) { - // Register a factory for replicating subscriptions. - broker.getConsumerFactories().add( - boost::shared_ptr<ReplicatingSubscription::Factory>( - new ReplicatingSubscription::Factory())); + // If we are joining a cluster we must start excluding clients now, + // otherwise there's a window for a client to connect before we get to + // initialize() + if (settings.cluster) { + QPID_LOG(debug, logPrefix << "Rejecting client connections."); + observer->setObserver(boost::shared_ptr<broker::ConnectionObserver>( + new BackupConnectionExcluder)); + broker.getConnectionObservers().add(observer); + } +} - broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this); +// Called in Plugin::initialize +void HaBroker::initialize() { + // FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port. + brokerInfo = BrokerInfo( + broker.getSystem()->getNodeName(), broker.getPort(broker::Broker::TCP_TRANSPORT), systemId); + + // Set up the management object. ManagementAgent* ma = broker.getManagementAgent(); - if (!ma) + if (settings.cluster && !ma) throw Exception("Cannot start HA: management is disabled"); _qmf::Package packageInit(ma); mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker"); - mgmtObject->set_status(settings.cluster ? BACKUP : STANDALONE); - mgmtObject->set_replicateDefault(str(settings.replicateDefault)); + mgmtObject->set_replicateDefault(settings.replicateDefault.str()); + mgmtObject->set_systemId(systemId); ma->addObject(mgmtObject); - // NOTE: lock is not needed in a constructor but we created it just to pass - // to the set functions. - sys::Mutex::ScopedLock l(lock); - if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l); - if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l); + // Register a factory for replicating subscriptions. + broker.getConsumerFactories().add( + boost::shared_ptr<ReplicatingSubscription::Factory>( + new ReplicatingSubscription::Factory())); + + // If we are in a cluster, start as backup in joining state. + if (settings.cluster) { + status = JOINING; + backup.reset(new Backup(*this, settings)); + broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this); + } + + if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl)); + if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl)); + + + QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo); - // If we are in a cluster, we start in backup mode. - if (settings.cluster) backup.reset(new Backup(*this, s)); + // NOTE: lock is not needed in a constructor, but create one + // to pass to functions that have a ScopedLock parameter. + Mutex::ScopedLock l(lock); + statusChanged(l); } -HaBroker::~HaBroker() {} +HaBroker::~HaBroker() { + QPID_LOG(notice, logPrefix << "Shut down: " << brokerInfo); + broker.getConnectionObservers().remove(observer); +} + +void HaBroker::recover() { + auto_ptr<Backup> b; + { + Mutex::ScopedLock l(lock); + // No longer replicating, close link. Note: link must be closed before we + // setStatus(RECOVERING) as that will remove our broker info from the + // outgoing link properties so we won't recognize self-connects. + b = backup; + } + b.reset(); // Call destructor outside of lock. + BrokerInfo::Set backups; + { + Mutex::ScopedLock l(lock); + setStatus(RECOVERING, l); + backups = membership.otherBackups(); + membership.reset(brokerInfo); + // Drop the lock, new Primary may call back on activate. + } + // Outside of lock, may call back on activate() + primary.reset(new Primary(*this, backups)); // Starts primary-ready check. +} + +// Called back from Primary active check. +void HaBroker::activate() { setStatus(ACTIVE); } Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) { - sys::Mutex::ScopedLock l(lock); switch (methodId) { case _qmf::HaBroker::METHOD_PROMOTE: { - if (backup.get()) { // I am a backup - // NOTE: resetting backup allows client connections, so any - // primary state should be set up here before backup.reset() - backup.reset(); - QPID_LOG(notice, "HA: Promoted to primary"); - mgmtObject->set_status(PRIMARY); + switch (getStatus()) { + case JOINING: recover(); break; + case CATCHUP: + QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted."); + throw Exception("Still catching up, cannot be promoted."); + break; + case READY: recover(); break; + case RECOVERING: break; + case ACTIVE: break; + case STANDALONE: break; } break; } - case _qmf::HaBroker::METHOD_SETBROKERS: { - setBrokerUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokers&>(args).i_url), l); + case _qmf::HaBroker::METHOD_SETBROKERSURL: { + setBrokerUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokersUrl&>(args).i_url)); break; } - case _qmf::HaBroker::METHOD_SETPUBLICBROKERS: { - setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicBrokers&>(args).i_url), l); + case _qmf::HaBroker::METHOD_SETPUBLICURL: { + setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url)); break; } - case _qmf::HaBroker::METHOD_SETEXPECTEDBACKUPS: { - setExpectedBackups(dynamic_cast<_qmf::ArgsHaBrokerSetExpectedBackups&>(args).i_expectedBackups, l); - break; - } case _qmf::HaBroker::METHOD_REPLICATE: { _qmf::ArgsHaBrokerReplicate& bq_args = dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args); - QPID_LOG(debug, "HA replicating individual queue "<< bq_args.i_queue << " from " << bq_args.i_broker); + QPID_LOG(debug, logPrefix << "Replicate individual queue " + << bq_args.i_queue << " from " << bq_args.i_broker); boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue); Url url(bq_args.i_broker); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; + Uuid uuid(true); std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare( + broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), url[0].host, url[0].port, protocol, false, // durable - settings.mechanism, settings.username, settings.password); + settings.mechanism, settings.username, settings.password, + false); // no amq.failover - don't want to use client URL. boost::shared_ptr<broker::Link> link = result.first; link->setUrl(url); // Create a queue replicator - boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); + boost::shared_ptr<QueueReplicator> qr( + new QueueReplicator(*this, queue, link)); qr->activate(); broker.getExchanges().registerExchange(qr); break; @@ -138,43 +205,146 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, return Manageable::STATUS_OK; } -void HaBroker::setClientUrl(const Url& url, const sys::Mutex::ScopedLock& l) { +void HaBroker::setClientUrl(const Url& url) { + Mutex::ScopedLock l(lock); if (url.empty()) throw Exception("Invalid empty URL for HA client failover"); clientUrl = url; updateClientUrl(l); } -void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) { +void HaBroker::updateClientUrl(Mutex::ScopedLock&) { Url url = clientUrl.empty() ? brokerUrl : clientUrl; if (url.empty()) throw Url::Invalid("HA client URL is empty"); - mgmtObject->set_publicBrokers(url.str()); + mgmtObject->set_publicUrl(url.str()); knownBrokers.clear(); knownBrokers.push_back(url); - QPID_LOG(debug, "HA: Setting client URL to: " << url); + QPID_LOG(debug, logPrefix << "Setting client URL to: " << url); } -void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) { +void HaBroker::setBrokerUrl(const Url& url) { + Mutex::ScopedLock l(lock); if (url.empty()) throw Url::Invalid("HA broker URL is empty"); - QPID_LOG(debug, "HA: Setting broker URL to: " << url); brokerUrl = url; - mgmtObject->set_brokers(brokerUrl.str()); + mgmtObject->set_brokersUrl(brokerUrl.str()); if (backup.get()) backup->setBrokerUrl(brokerUrl); // Updating broker URL also updates defaulted client URL: if (clientUrl.empty()) updateClientUrl(l); } -void HaBroker::setExpectedBackups(size_t n, const sys::Mutex::ScopedLock&) { - expectedBackups = n; - mgmtObject->set_expectedBackups(n); -} - std::vector<Url> HaBroker::getKnownBrokers() const { + Mutex::ScopedLock l(lock); return knownBrokers; } -void HaBroker::shutdown(const std::string& message) { - QPID_LOG(critical, "Shutting down: " << message); +void HaBroker::shutdown() { + QPID_LOG(critical, logPrefix << "Critical error, shutting down."); broker.shutdown(); } +BrokerStatus HaBroker::getStatus() const { + Mutex::ScopedLock l(lock); + return status; +} + +void HaBroker::setStatus(BrokerStatus newStatus) { + Mutex::ScopedLock l(lock); + setStatus(newStatus, l); +} + +namespace { +bool checkTransition(BrokerStatus from, BrokerStatus to) { + // Legal state transitions. Initial state is JOINING, ACTIVE is terminal. + static const BrokerStatus TRANSITIONS[][2] = { + { JOINING, CATCHUP }, // Connected to primary + { JOINING, RECOVERING }, // Chosen as initial primary. + { CATCHUP, READY }, // Caught up all queues, ready to take over. + { READY, RECOVERING }, // Chosen as new primary + { READY, CATCHUP }, // Timed out failing over, demoted to catch-up. + { RECOVERING, ACTIVE } // All expected backups are ready + }; + static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]); + for (size_t i = 0; i < N; ++i) { + if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to) + return true; + } + return false; +} +} // namespace + +void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) { + QPID_LOG(info, logPrefix << "Status change: " + << printable(status) << " -> " << printable(newStatus)); + bool legal = checkTransition(status, newStatus); + assert(legal); + if (!legal) { + QPID_LOG(critical, logPrefix << "Illegal state transition: " + << printable(status) << " -> " << printable(newStatus)); + shutdown(); + } + status = newStatus; + statusChanged(l); +} + +void HaBroker::statusChanged(Mutex::ScopedLock& l) { + mgmtObject->set_status(printable(status).str()); + brokerInfo.setStatus(status); + setLinkProperties(l); +} + +void HaBroker::membershipUpdated(Mutex::ScopedLock&) { + Variant::List brokers = membership.asList(); + mgmtObject->set_members(brokers); + broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers)); +} + +void HaBroker::setMembership(const Variant::List& brokers) { + Mutex::ScopedLock l(lock); + membership.assign(brokers); + QPID_LOG(info, logPrefix << "Membership update: " << membership); + BrokerInfo info; + // Update my status to what the primary says it is. The primary can toggle + // status between READY and CATCHUP based on the state of our subscriptions. + if (membership.get(systemId, info) && status != info.getStatus()) { + setStatus(info.getStatus(), l); + if (backup.get()) backup->setStatus(status); + } + membershipUpdated(l); +} + +void HaBroker::resetMembership(const BrokerInfo& b) { + Mutex::ScopedLock l(lock); + membership.reset(b); + QPID_LOG(debug, logPrefix << "Membership reset to: " << membership); + membershipUpdated(l); +} + +void HaBroker::addBroker(const BrokerInfo& b) { + Mutex::ScopedLock l(lock); + membership.add(b); + QPID_LOG(debug, logPrefix << "Membership add: " << b << " now: " << membership); + membershipUpdated(l); +} + +void HaBroker::removeBroker(const Uuid& id) { + Mutex::ScopedLock l(lock); + membership.remove(id); + QPID_LOG(debug, logPrefix << "Membership remove: " << id << " now: " << membership); + membershipUpdated(l); +} + +void HaBroker::setLinkProperties(Mutex::ScopedLock&) { + framing::FieldTable linkProperties = broker.getLinkClientProperties(); + if (isBackup(status)) { + // If this is a backup then any outgoing links are backup + // links and need to be tagged. + linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable()); + } + else { + // If this is a primary then any outgoing links are federation links + // and should not be tagged. + linkProperties.erase(ConnectionObserver::BACKUP_TAG); + } + broker.setLinkClientProperties(linkProperties); +} + }} // namespace qpid::ha |