diff options
Diffstat (limited to 'cpp/src/qpid/ha/Backup.cpp')
-rw-r--r-- | cpp/src/qpid/ha/Backup.cpp | 106 |
1 files changed, 70 insertions, 36 deletions
diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp index 6852a58b0c..2affc12bf6 100644 --- a/cpp/src/qpid/ha/Backup.cpp +++ b/cpp/src/qpid/ha/Backup.cpp @@ -20,9 +20,12 @@ */ #include "Backup.h" #include "BrokerReplicator.h" +#include "ConnectionObserver.h" #include "HaBroker.h" +#include "Primary.h" #include "ReplicatingSubscription.h" #include "Settings.h" +#include "StatusCheck.h" #include "qpid/Url.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/broker/Bridge.h" @@ -44,28 +47,38 @@ using namespace framing; using namespace broker; using types::Variant; using std::string; +using sys::Mutex; Backup::Backup(HaBroker& hb, const Settings& s) : - logPrefix("Backup: "), haBroker(hb), broker(hb.getBroker()), settings(s) + logPrefix("Backup: "), membership(hb.getMembership()), stopped(false), + haBroker(hb), broker(hb.getBroker()), settings(s), + statusCheck( + new StatusCheck( + logPrefix, broker.getLinkHearbeatInterval(), hb.getBrokerInfo())) { - // Empty brokerUrl means delay initialization until seBrokertUrl() is called. - if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl)); + // Set link properties to tag outgoing links. + framing::FieldTable linkProperties = broker.getLinkClientProperties(); + linkProperties.setTable( + ConnectionObserver::BACKUP_TAG, hb.getBrokerInfo().asFieldTable()); + broker.setLinkClientProperties(linkProperties); } -void Backup::initialize(const Url& brokers) { - if (brokers.empty()) throw Url::Invalid("HA broker URL is empty"); - QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers); - string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol; - types::Uuid uuid(true); - // Declare the link - std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare( - broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), - brokers[0].host, brokers[0].port, protocol, - false, // durable - settings.mechanism, settings.username, settings.password, - false); // no amq.failover - don't want to use client URL. - { - sys::Mutex::ScopedLock l(lock); +void Backup::setBrokerUrl(const Url& brokers) { + if (brokers.empty()) return; + Mutex::ScopedLock l(lock); + if (stopped) return; + if (haBroker.getStatus() == JOINING) statusCheck->setUrl(brokers); + if (!link) { // Not yet initialized + QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers); + string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol; + types::Uuid uuid(true); + std::pair<Link::shared_ptr, bool> result; + result = broker.getLinks().declare( + broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), + brokers[0].host, brokers[0].port, protocol, + false, // durable + settings.mechanism, settings.username, settings.password, + false); // no amq.failover - don't want to use client URL. link = result.first; replicator.reset(new BrokerReplicator(haBroker, link)); replicator->initialize(); @@ -74,36 +87,57 @@ void Backup::initialize(const Url& brokers) { link->setUrl(brokers); // Outside the lock, once set link doesn't change. } -Backup::~Backup() { +void Backup::stop(Mutex::ScopedLock&) { + if (stopped) return; + stopped = true; + QPID_LOG(debug, logPrefix << "Leaving backup role."); if (link) link->close(); - if (replicator.get()) broker.getExchanges().destroy(replicator->getName()); + if (replicator.get()) { + replicator->shutdown(); + replicator.reset(); + } } -// Called via management. -void Backup::setBrokerUrl(const Url& url) { - // Ignore empty URLs seen during start-up for some tests. - if (url.empty()) return; - bool linkSet = false; +Role* Backup::recover(Mutex::ScopedLock&) { + BrokerInfo::Set backups; { - sys::Mutex::ScopedLock l(lock); - linkSet = link; + Mutex::ScopedLock l(lock); + if (stopped) return 0; + stop(l); // Stop backup activity before starting primary. + QPID_LOG(notice, "Promoting to primary: " << haBroker.getBrokerInfo()); + // Reset membership before allowing backups to connect. + backups = membership.otherBackups(); + membership.clear(); + return new Primary(haBroker, backups); } - if (linkSet) - link->setUrl(url); // Outside lock, once set link doesn't change - else - initialize(url); // Deferred initialization } -void Backup::setStatus(BrokerStatus status) { - switch (status) { - case READY: - QPID_LOG(notice, logPrefix << "Ready to become primary."); +Role* Backup::promote() { + Mutex::ScopedLock l(lock); + if (stopped) return 0; + switch (haBroker.getStatus()) { + case JOINING: + if (statusCheck->canPromote()) return recover(l); + else { + QPID_LOG(error, + logPrefix << "Joining active cluster, cannot be promoted."); + throw Exception("Joining active cluster, cannot be promoted."); + } break; case CATCHUP: - QPID_LOG(notice, logPrefix << "Catching up on primary, cannot be promoted."); + QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted."); + throw Exception("Still catching up, cannot be promoted."); + break; + case READY: return recover(l); break; default: - assert(0); + assert(0); // Not a valid state for the Backup role.. } + return 0; // Keep compiler happy +} + +Backup::~Backup() { + Mutex::ScopedLock l(lock); + stop(l); } }} // namespace qpid::ha |