summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-29 23:38:00 +0000
committerAlan Conway <aconway@apache.org>2012-02-29 23:38:00 +0000
commit9da195e712d38ef250b698fc394fef8ab6a936a5 (patch)
treea3801773e75304bdfb8d554b044f4d4e1aa09324 /cpp/src/qpid
parent316f2a9a92780a323b7b2fcbcb695883651b370f (diff)
downloadqpid-python-9da195e712d38ef250b698fc394fef8ab6a936a5.tar.gz
QPID-3603: HA support for stand-alone replication.
- New management method HaBroker.replicate to enable replication. - qpid-ha tool can enable replication of queues. - qpid-config tool can create queues with replication enabled. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1295339 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Link.cpp3
-rw-r--r--cpp/src/qpid/ha/HaBroker.cpp33
-rw-r--r--cpp/src/qpid/ha/HaPlugin.cpp8
-rw-r--r--cpp/src/qpid/ha/Settings.h4
-rw-r--r--cpp/src/qpid/ha/management-schema.xml5
5 files changed, 38 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 4af1e6d6bd..56a90e7fb7 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -206,11 +206,9 @@ void Link::closed(int, std::string text)
QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
connection = 0;
-
if (state == STATE_OPERATIONAL) {
stringstream addr;
addr << host << ":" << port;
- QPID_LOG(warning, "Inter-broker link disconnected from " << addr.str());
if (!hideManagement() && agent)
agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
}
@@ -405,7 +403,6 @@ uint Link::nextChannel()
void Link::notifyConnectionForced(const string text)
{
Mutex::ScopedLock mutex(lock);
-
setStateLH(STATE_FAILED);
if (!hideManagement())
mgmtObject->set_lastError(text);
diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp
index d92749abeb..f909aca44f 100644
--- a/cpp/src/qpid/ha/HaBroker.cpp
+++ b/cpp/src/qpid/ha/HaBroker.cpp
@@ -25,8 +25,11 @@
#include "ReplicatingSubscription.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Queue.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/ha/Package.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokers.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicBrokers.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetExpectedBackups.h"
@@ -50,7 +53,6 @@ const std::string BACKUP="backup";
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
: broker(b),
settings(s),
- backup(new Backup(b, s)),
mgmtObject(0)
{
// Register a factory for replicating subscriptions.
@@ -72,6 +74,9 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
sys::Mutex::ScopedLock l(lock);
if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l);
if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
+
+ // If we are in a cluster, we start in backup mode.
+ if (settings.cluster) backup.reset(new Backup(b, s));
}
HaBroker::~HaBroker() {}
@@ -81,8 +86,8 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
switch (methodId) {
case _qmf::HaBroker::METHOD_PROMOTE: {
if (backup.get()) { // I am a backup
- // FIXME aconway 2012-01-26: create primary state before resetting backup
- // as that allows client connections.
+ // NOTE: resetting backup allows client connections, so any
+ // primary state should be set up here before backup.reset()
backup.reset();
QPID_LOG(notice, "HA: Primary promoted from backup");
mgmtObject->set_status(PRIMARY);
@@ -100,7 +105,27 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
case _qmf::HaBroker::METHOD_SETEXPECTEDBACKUPS: {
setExpectedBackups(dynamic_cast<_qmf::ArgsHaBrokerSetExpectedBackups&>(args).i_expectedBackups, l);
break;
- }
+ }
+ case _qmf::HaBroker::METHOD_REPLICATE: {
+ _qmf::ArgsHaBrokerReplicate& bq_args =
+ dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
+ QPID_LOG(debug, "HA replicating individual queue "<< bq_args.i_queue << " from " << bq_args.i_broker);
+
+ boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
+ Url url(bq_args.i_broker);
+ string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+ std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare(
+ url[0].host, url[0].port, protocol,
+ false, // durable
+ settings.mechanism, settings.username, settings.password);
+ boost::shared_ptr<broker::Link> link = result.first;
+ link->setUrl(url);
+ // Create a queue replicator
+ boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
+ broker.getExchanges().registerExchange(qr);
+ qr->activate();
+ break;
+ }
default:
return Manageable::STATUS_UNKNOWN_METHOD;
diff --git a/cpp/src/qpid/ha/HaPlugin.cpp b/cpp/src/qpid/ha/HaPlugin.cpp
index b3080330fb..6a43b591b0 100644
--- a/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/cpp/src/qpid/ha/HaPlugin.cpp
@@ -31,7 +31,7 @@ struct Options : public qpid::Options {
Settings& settings;
Options(Settings& s) : qpid::Options("HA Options"), settings(s) {
addOptions()
- ("ha-cluster", optValue(settings.enabled, "yes|no"),
+ ("ha-cluster", optValue(settings.cluster, "yes|no"),
"Join a HA active/passive cluster.")
("ha-brokers", optValue(settings.brokerUrl,"URL"),
"URL that backup brokers use to connect and fail over.")
@@ -63,11 +63,7 @@ struct HaPlugin : public Plugin {
void initialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
- if (broker && settings.enabled) {
- QPID_LOG(notice, "HA: Enabled");
- haBroker.reset(new ha::HaBroker(*broker, settings));
- } else
- QPID_LOG(notice, "HA: Disabled");
+ if (broker) haBroker.reset(new ha::HaBroker(*broker, settings));
}
};
diff --git a/cpp/src/qpid/ha/Settings.h b/cpp/src/qpid/ha/Settings.h
index 52a64c8330..7df18b4ef4 100644
--- a/cpp/src/qpid/ha/Settings.h
+++ b/cpp/src/qpid/ha/Settings.h
@@ -33,8 +33,8 @@ namespace ha {
class Settings
{
public:
- Settings() : enabled(false), expectedBackups(0) {}
- bool enabled;
+ Settings() : cluster(false), expectedBackups(0) {}
+ bool cluster; // True if we are a cluster member.
std::string clientUrl;
std::string brokerUrl;
size_t expectedBackups;
diff --git a/cpp/src/qpid/ha/management-schema.xml b/cpp/src/qpid/ha/management-schema.xml
index 05ed5f02ce..9a815b346c 100644
--- a/cpp/src/qpid/ha/management-schema.xml
+++ b/cpp/src/qpid/ha/management-schema.xml
@@ -47,6 +47,11 @@
<method name="setExpectedBackups" desc="Set number of backups expected">
<arg name="expectedBackups" type="uint16" dir="I"/>
</method>
+
+ <method name="replicate" desc="Replicate from a remote queue to the local broker.">
+ <arg name="broker" type="sstr" dir="I"/>
+ <arg name="queue" type="sstr" dir="I"/>
+ </method>
</class>
</schema>