diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
commit | 633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch) | |
tree | 1391da89470593209466df68c0b40b89c14963b1 /cpp/src/qpid/ha/HaBroker.cpp | |
parent | c73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff) | |
download | qpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz |
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/HaBroker.cpp')
-rw-r--r-- | cpp/src/qpid/ha/HaBroker.cpp | 95 |
1 files changed, 69 insertions, 26 deletions
diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp index 0d3bd51439..7d82fb63bd 100644 --- a/cpp/src/qpid/ha/HaBroker.cpp +++ b/cpp/src/qpid/ha/HaBroker.cpp @@ -25,10 +25,15 @@ #include "ReplicatingSubscription.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Link.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/SignalHandler.h" #include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/ha/Package.h" -#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetClientAddresses.h" -#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokerAddresses.h" +#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h" +#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokers.h" +#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicBrokers.h" +#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetExpectedBackups.h" #include "qpid/log/Statement.h" namespace qpid { @@ -40,8 +45,10 @@ using namespace std; namespace { -const std::string PRIMARY="primary"; +const std::string STANDALONE="standalone"; +const std::string CATCH_UP="catch-up"; const std::string BACKUP="backup"; +const std::string PRIMARY="primary"; } // namespace @@ -49,7 +56,6 @@ const std::string BACKUP="backup"; HaBroker::HaBroker(broker::Broker& b, const Settings& s) : broker(b), settings(s), - backup(new Backup(b, s)), mgmtObject(0) { // Register a factory for replicating subscriptions. @@ -62,15 +68,20 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) ManagementAgent* ma = broker.getManagementAgent(); if (!ma) throw Exception("Cannot start HA: management is disabled"); - if (ma) { - _qmf::Package packageInit(ma); - mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker"); - mgmtObject->set_status(BACKUP); - ma->addObject(mgmtObject); - } + _qmf::Package packageInit(ma); + mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker"); + mgmtObject->set_status(settings.cluster ? BACKUP : STANDALONE); + mgmtObject->set_replicateDefault(str(settings.replicateDefault)); + ma->addObject(mgmtObject); + + // NOTE: lock is not needed in a constructor but we created it just to pass + // to the set functions. sys::Mutex::ScopedLock l(lock); if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l); if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l); + + // If we are in a cluster, we start in backup mode. + if (settings.cluster) backup.reset(new Backup(*this, s)); } HaBroker::~HaBroker() {} @@ -80,26 +91,47 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, switch (methodId) { case _qmf::HaBroker::METHOD_PROMOTE: { if (backup.get()) { // I am a backup - // FIXME aconway 2012-01-26: create primary state before resetting backup - // as that allows client connections. + // NOTE: resetting backup allows client connections, so any + // primary state should be set up here before backup.reset() backup.reset(); - QPID_LOG(notice, "HA: Primary promoted from backup"); + QPID_LOG(notice, "HA: Promoted to primary"); mgmtObject->set_status(PRIMARY); } break; } - case _qmf::HaBroker::METHOD_SETCLIENTADDRESSES: { - setClientUrl( - Url(dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args). - i_clientAddresses), l); + case _qmf::HaBroker::METHOD_SETBROKERS: { + setBrokerUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokers&>(args).i_url), l); + break; + } + case _qmf::HaBroker::METHOD_SETPUBLICBROKERS: { + setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicBrokers&>(args).i_url), l); break; } - case _qmf::HaBroker::METHOD_SETBROKERADDRESSES: { - setBrokerUrl( - Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args) - .i_brokerAddresses), l); + case _qmf::HaBroker::METHOD_SETEXPECTEDBACKUPS: { + setExpectedBackups(dynamic_cast<_qmf::ArgsHaBrokerSetExpectedBackups&>(args).i_expectedBackups, l); + break; + } + case _qmf::HaBroker::METHOD_REPLICATE: { + _qmf::ArgsHaBrokerReplicate& bq_args = + dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args); + QPID_LOG(debug, "HA replicating individual queue "<< bq_args.i_queue << " from " << bq_args.i_broker); + + boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue); + Url url(bq_args.i_broker); + string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; + std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare( + url[0].host, url[0].port, protocol, + false, // durable + settings.mechanism, settings.username, settings.password); + boost::shared_ptr<broker::Link> link = result.first; + link->setUrl(url); + // Create a queue replicator + boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); + qr->activate(); + broker.getExchanges().registerExchange(qr); break; } + default: return Manageable::STATUS_UNKNOWN_METHOD; } @@ -114,24 +146,35 @@ void HaBroker::setClientUrl(const Url& url, const sys::Mutex::ScopedLock& l) { void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) { Url url = clientUrl.empty() ? brokerUrl : clientUrl; - assert(!url.empty()); - mgmtObject->set_clientAddresses(url.str()); + if (url.empty()) throw Url::Invalid("HA client URL is empty"); + mgmtObject->set_publicBrokers(url.str()); knownBrokers.clear(); knownBrokers.push_back(url); - QPID_LOG(debug, "HA: Setting client known-brokers to: " << url); + QPID_LOG(debug, "HA: Setting client URL to: " << url); } void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) { - if (url.empty()) throw Exception("Invalid empty URL for HA broker failover"); + if (url.empty()) throw Url::Invalid("HA broker URL is empty"); + QPID_LOG(debug, "HA: Setting broker URL to: " << url); brokerUrl = url; - mgmtObject->set_brokerAddresses(brokerUrl.str()); + mgmtObject->set_brokers(brokerUrl.str()); if (backup.get()) backup->setBrokerUrl(brokerUrl); // Updating broker URL also updates defaulted client URL: if (clientUrl.empty()) updateClientUrl(l); } +void HaBroker::setExpectedBackups(size_t n, const sys::Mutex::ScopedLock&) { + expectedBackups = n; + mgmtObject->set_expectedBackups(n); +} + std::vector<Url> HaBroker::getKnownBrokers() const { return knownBrokers; } +void HaBroker::shutdown(const std::string& message) { + QPID_LOG(critical, "Shutting down: " << message); + broker.shutdown(); +} + }} // namespace qpid::ha |