diff options
| author | Alan Conway <aconway@apache.org> | 2012-02-29 23:38:00 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-02-29 23:38:00 +0000 |
| commit | 9da195e712d38ef250b698fc394fef8ab6a936a5 (patch) | |
| tree | a3801773e75304bdfb8d554b044f4d4e1aa09324 /cpp/src/qpid | |
| parent | 316f2a9a92780a323b7b2fcbcb695883651b370f (diff) | |
| download | qpid-python-9da195e712d38ef250b698fc394fef8ab6a936a5.tar.gz | |
QPID-3603: HA support for stand-alone replication.
- New management method HaBroker.replicate to enable replication.
- qpid-ha tool can enable replication of queues.
- qpid-config tool can create queues with replication enabled.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1295339 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/HaBroker.cpp | 33 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/HaPlugin.cpp | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/Settings.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/management-schema.xml | 5 |
5 files changed, 38 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 4af1e6d6bd..56a90e7fb7 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -206,11 +206,9 @@ void Link::closed(int, std::string text) QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text); connection = 0; - if (state == STATE_OPERATIONAL) { stringstream addr; addr << host << ":" << port; - QPID_LOG(warning, "Inter-broker link disconnected from " << addr.str()); if (!hideManagement() && agent) agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); } @@ -405,7 +403,6 @@ uint Link::nextChannel() void Link::notifyConnectionForced(const string text) { Mutex::ScopedLock mutex(lock); - setStateLH(STATE_FAILED); if (!hideManagement()) mgmtObject->set_lastError(text); diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp index d92749abeb..f909aca44f 100644 --- a/cpp/src/qpid/ha/HaBroker.cpp +++ b/cpp/src/qpid/ha/HaBroker.cpp @@ -25,8 +25,11 @@ #include "ReplicatingSubscription.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Link.h" +#include "qpid/broker/Queue.h" #include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/ha/Package.h" +#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokers.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicBrokers.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetExpectedBackups.h" @@ -50,7 +53,6 @@ const std::string BACKUP="backup"; HaBroker::HaBroker(broker::Broker& b, const Settings& s) : broker(b), settings(s), - backup(new Backup(b, s)), mgmtObject(0) { // Register a factory for replicating subscriptions. @@ -72,6 +74,9 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) sys::Mutex::ScopedLock l(lock); if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l); if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l); + + // If we are in a cluster, we start in backup mode. + if (settings.cluster) backup.reset(new Backup(b, s)); } HaBroker::~HaBroker() {} @@ -81,8 +86,8 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, switch (methodId) { case _qmf::HaBroker::METHOD_PROMOTE: { if (backup.get()) { // I am a backup - // FIXME aconway 2012-01-26: create primary state before resetting backup - // as that allows client connections. + // NOTE: resetting backup allows client connections, so any + // primary state should be set up here before backup.reset() backup.reset(); QPID_LOG(notice, "HA: Primary promoted from backup"); mgmtObject->set_status(PRIMARY); @@ -100,7 +105,27 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, case _qmf::HaBroker::METHOD_SETEXPECTEDBACKUPS: { setExpectedBackups(dynamic_cast<_qmf::ArgsHaBrokerSetExpectedBackups&>(args).i_expectedBackups, l); break; - } + } + case _qmf::HaBroker::METHOD_REPLICATE: { + _qmf::ArgsHaBrokerReplicate& bq_args = + dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args); + QPID_LOG(debug, "HA replicating individual queue "<< bq_args.i_queue << " from " << bq_args.i_broker); + + boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue); + Url url(bq_args.i_broker); + string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; + std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare( + url[0].host, url[0].port, protocol, + false, // durable + settings.mechanism, settings.username, settings.password); + boost::shared_ptr<broker::Link> link = result.first; + link->setUrl(url); + // Create a queue replicator + boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); + broker.getExchanges().registerExchange(qr); + qr->activate(); + break; + } default: return Manageable::STATUS_UNKNOWN_METHOD; diff --git a/cpp/src/qpid/ha/HaPlugin.cpp b/cpp/src/qpid/ha/HaPlugin.cpp index b3080330fb..6a43b591b0 100644 --- a/cpp/src/qpid/ha/HaPlugin.cpp +++ b/cpp/src/qpid/ha/HaPlugin.cpp @@ -31,7 +31,7 @@ struct Options : public qpid::Options { Settings& settings; Options(Settings& s) : qpid::Options("HA Options"), settings(s) { addOptions() - ("ha-cluster", optValue(settings.enabled, "yes|no"), + ("ha-cluster", optValue(settings.cluster, "yes|no"), "Join a HA active/passive cluster.") ("ha-brokers", optValue(settings.brokerUrl,"URL"), "URL that backup brokers use to connect and fail over.") @@ -63,11 +63,7 @@ struct HaPlugin : public Plugin { void initialize(Plugin::Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); - if (broker && settings.enabled) { - QPID_LOG(notice, "HA: Enabled"); - haBroker.reset(new ha::HaBroker(*broker, settings)); - } else - QPID_LOG(notice, "HA: Disabled"); + if (broker) haBroker.reset(new ha::HaBroker(*broker, settings)); } }; diff --git a/cpp/src/qpid/ha/Settings.h b/cpp/src/qpid/ha/Settings.h index 52a64c8330..7df18b4ef4 100644 --- a/cpp/src/qpid/ha/Settings.h +++ b/cpp/src/qpid/ha/Settings.h @@ -33,8 +33,8 @@ namespace ha { class Settings { public: - Settings() : enabled(false), expectedBackups(0) {} - bool enabled; + Settings() : cluster(false), expectedBackups(0) {} + bool cluster; // True if we are a cluster member. std::string clientUrl; std::string brokerUrl; size_t expectedBackups; diff --git a/cpp/src/qpid/ha/management-schema.xml b/cpp/src/qpid/ha/management-schema.xml index 05ed5f02ce..9a815b346c 100644 --- a/cpp/src/qpid/ha/management-schema.xml +++ b/cpp/src/qpid/ha/management-schema.xml @@ -47,6 +47,11 @@ <method name="setExpectedBackups" desc="Set number of backups expected"> <arg name="expectedBackups" type="uint16" dir="I"/> </method> + + <method name="replicate" desc="Replicate from a remote queue to the local broker."> + <arg name="broker" type="sstr" dir="I"/> + <arg name="queue" type="sstr" dir="I"/> + </method> </class> </schema> |
