summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-04-02 22:15:31 +0000
committerAlan Conway <aconway@apache.org>2012-04-02 22:15:31 +0000
commit0f747926548fb6ebaa52b6e463aec2916132671e (patch)
treeaddac105bee0beb801e2a90794071f16c0f6879e /cpp/src/qpid
parent212e544c4a10e6b54e081eb9ba2c9817c57ef356 (diff)
downloadqpid-python-0f747926548fb6ebaa52b6e463aec2916132671e.tar.gz
QPID-3603: Broker option --ha-replicate-default to specify default replication.
Takes values 'all', 'configuration', 'all'. This is the replication level to use if a queue or exchange is created without an explicit 'qpid.replicate' argument. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1308597 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/ha/Backup.cpp11
-rw-r--r--cpp/src/qpid/ha/Backup.h4
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp99
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.h14
-rw-r--r--cpp/src/qpid/ha/HaBroker.cpp14
-rw-r--r--cpp/src/qpid/ha/HaBroker.h5
-rw-r--r--cpp/src/qpid/ha/HaPlugin.cpp3
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp2
-rw-r--r--cpp/src/qpid/ha/ReplicateLevel.cpp62
-rw-r--r--cpp/src/qpid/ha/ReplicateLevel.h52
-rw-r--r--cpp/src/qpid/ha/Settings.h4
-rw-r--r--cpp/src/qpid/ha/management-schema.xml8
12 files changed, 228 insertions, 50 deletions
diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp
index 3d65e07202..eabf502f8b 100644
--- a/cpp/src/qpid/ha/Backup.cpp
+++ b/cpp/src/qpid/ha/Backup.cpp
@@ -19,10 +19,11 @@
*
*/
#include "Backup.h"
-#include "Settings.h"
#include "BrokerReplicator.h"
-#include "ReplicatingSubscription.h"
#include "ConnectionExcluder.h"
+#include "HaBroker.h"
+#include "ReplicatingSubscription.h"
+#include "Settings.h"
#include "qpid/Url.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/Bridge.h"
@@ -43,8 +44,8 @@ using namespace broker;
using types::Variant;
using std::string;
-Backup::Backup(broker::Broker& b, const Settings& s) :
- broker(b), settings(s), excluder(new ConnectionExcluder())
+Backup::Backup(HaBroker& hb, const Settings& s) :
+ haBroker(hb), broker(hb.getBroker()), settings(s), excluder(new ConnectionExcluder())
{
// Empty brokerUrl means delay initialization until setUrl() is called.
if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
@@ -63,7 +64,7 @@ void Backup::initialize(const Url& url) {
link = result.first;
link->setUrl(url);
- replicator.reset(new BrokerReplicator(link));
+ replicator.reset(new BrokerReplicator(haBroker, link));
broker.getExchanges().registerExchange(replicator);
broker.getConnectionObservers().add(excluder);
}
diff --git a/cpp/src/qpid/ha/Backup.h b/cpp/src/qpid/ha/Backup.h
index 526b238b82..6c36996914 100644
--- a/cpp/src/qpid/ha/Backup.h
+++ b/cpp/src/qpid/ha/Backup.h
@@ -38,6 +38,7 @@ namespace ha {
class Settings;
class ConnectionExcluder;
class BrokerReplicator;
+class HaBroker;
/**
* State associated with a backup broker. Manages connections to primary.
@@ -47,7 +48,7 @@ class BrokerReplicator;
class Backup
{
public:
- Backup(broker::Broker&, const Settings&);
+ Backup(HaBroker&, const Settings&);
~Backup();
void setBrokerUrl(const Url&);
@@ -55,6 +56,7 @@ class Backup
void initialize(const Url&);
sys::Mutex lock;
+ HaBroker& haBroker;
broker::Broker& broker;
Settings settings;
boost::shared_ptr<broker::Link> link;
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index 609a3378ad..9d0043a827 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -19,6 +19,7 @@
*
*/
#include "BrokerReplicator.h"
+#include "HaBroker.h"
#include "QueueReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
@@ -37,6 +38,7 @@
#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
#include <algorithm>
+#include <sstream>
namespace qpid {
namespace ha {
@@ -87,6 +89,7 @@ const string QUEUE("queue");
const string RHOST("rhost");
const string TYPE("type");
const string USER("user");
+const string HA_BROKER("habroker");
const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
const string QMF2("qmf2");
@@ -100,6 +103,7 @@ const string _PACKAGE_NAME("_package_name");
const string _SCHEMA_ID("_schema_id");
const string OBJECT("OBJECT");
const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
+const string ORG_APACHE_QPID_HA("org.apache.qpid.ha");
const string QMF_DEFAULT_DIRECT("qmf.default.direct");
const string _QUERY_REQUEST("_query_request");
const string BROKER("broker");
@@ -113,36 +117,13 @@ template <class T> bool match(Variant::Map& schema) {
return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
}
-enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_ALL };
-const string S_NONE="none";
-const string S_CONFIGURATION="configuration";
-const string S_ALL="all";
-
-ReplicateLevel replicateLevel(const string& level) {
- if (level == S_NONE) return RL_NONE;
- if (level == S_CONFIGURATION) return RL_CONFIGURATION;
- if (level == S_ALL) return RL_ALL;
- throw Exception("Invalid value for "+QPID_REPLICATE+": "+level);
-}
-
-ReplicateLevel replicateLevel(const framing::FieldTable& f) {
- if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE));
- else return RL_NONE;
-}
-
-ReplicateLevel replicateLevel(const Variant::Map& m) {
- Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
- if (i != m.end()) return replicateLevel(i->second.asString());
- else return RL_NONE;
-}
-
-void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) {
+void sendQuery(const string& packageName, const string& className, const string& queueName, SessionHandler& sessionHandler) {
framing::AMQP_ServerProxy peer(sessionHandler.out);
Variant::Map request;
request[_WHAT] = OBJECT;
Variant::Map schema;
schema[_CLASS_NAME] = className;
- schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
+ schema[_PACKAGE_NAME] = packageName;
request[_SCHEMA_ID] = schema;
AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0)));
@@ -181,10 +162,33 @@ Variant::Map asMapVoid(const Variant& value) {
} // namespace
+
+ReplicateLevel BrokerReplicator::replicateLevel(const std::string& str) {
+ ReplicateLevel rl;
+ if (qpid::ha::replicateLevel(str, rl)) return rl;
+ else return haBroker.getSettings().replicateDefault;
+}
+
+ReplicateLevel BrokerReplicator::replicateLevel(const framing::FieldTable& f) {
+ if (f.isSet(QPID_REPLICATE))
+ return replicateLevel(f.getAsString(QPID_REPLICATE));
+ else
+ return haBroker.getSettings().replicateDefault;
+}
+
+ReplicateLevel BrokerReplicator::replicateLevel(const Variant::Map& m) {
+ Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
+ if (i != m.end())
+ return replicateLevel(i->second.asString());
+ else
+ return haBroker.getSettings().replicateDefault;
+}
+
BrokerReplicator::~BrokerReplicator() {}
-BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l)
- : Exchange(QPID_CONFIGURATION_REPLICATOR), broker(*l->getBroker()), link(l)
+BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
+ : Exchange(QPID_CONFIGURATION_REPLICATOR),
+ haBroker(hb), broker(hb.getBroker()), link(l)
{
QPID_LOG(info, "HA: Backup replicating from " <<
link->getTransport() << ":" << link->getHost() << ":" << link->getPort());
@@ -211,17 +215,21 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
//declare and bind an event queue
- peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable());
+ FieldTable declareArgs;
+ declareArgs.setString(QPID_REPLICATE, str(RL_NONE));
+ peer.getQueue().declare(queueName, "", false, false, true, true, declareArgs);
peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
//subscribe to the queue
peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
- //issue a query request for queues and another for exchanges using event queue as the reply-to address
- sendQuery(QUEUE, queueName, sessionHandler);
- sendQuery(EXCHANGE, queueName, sessionHandler);
- sendQuery(BINDING, queueName, sessionHandler);
+ // Issue a query request for queues, exchanges, bindings and the habroker
+ // using event queue as the reply-to address
+ sendQuery(ORG_APACHE_QPID_HA, HA_BROKER, queueName, sessionHandler);
+ sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler);
+ sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler);
+ sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName);
}
@@ -257,6 +265,7 @@ void BrokerReplicator::route(Deliverable& msg) {
if (type == QUEUE) doResponseQueue(values);
else if (type == EXCHANGE) doResponseExchange(values);
else if (type == BINDING) doResponseBind(values);
+ else if (type == HA_BROKER) doResponseHaBroker(values);
else QPID_LOG(error, "HA: Backup received unknown response type=" << type
<< " values=" << values);
}
@@ -288,7 +297,6 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
// re-create from event.
// Events are always up to date, whereas responses may be
// out of date.
- QPID_LOG(debug, "HA: Backup created queue: " << name);
startQueueReplicator(result.first);
} else {
// FIXME aconway 2011-12-02: what's the right way to handle this?
@@ -400,7 +408,6 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
QPID_LOG(debug, "HA: Backup queue response " << values);
- // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicateLevel(argsMap)) return;
framing::FieldTable args;
@@ -417,7 +424,6 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/);
if (result.second) {
- QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]);
startQueueReplicator(result.first);
} else {
// FIXME aconway 2011-11-22: Normal to find queue already
@@ -474,7 +480,6 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName);
boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
- // FIXME aconway 2011-11-24: more flexible configuration for binding replication.
// Automatically replicate binding if queue and exchange exist and are replicated
if (exchange && replicateLevel(exchange->getArgs()) &&
@@ -490,6 +495,28 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
}
}
+namespace {
+const string REPLICATE_DEFAULT="replicateDefault";
+}
+
+// Received the ha-broker configuration object for the primary broker.
+void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
+ try {
+ ReplicateLevel mine = haBroker.getSettings().replicateDefault;
+ ReplicateLevel primary = replicateLevel(values[REPLICATE_DEFAULT].asString());
+ if (mine != primary) {
+ std::ostringstream os;
+ os << "Replicate default on backup (" << mine
+ << ") does not match primary (" << primary << ")";
+ haBroker.shutdown(os.str());
+ }
+ } catch (const std::exception& e) {
+ std::ostringstream os;
+ os << "Received invalid replicate default from primary: " << e.what();
+ haBroker.shutdown(os.str());
+ }
+}
+
void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
if (replicateLevel(queue->getSettings()) == RL_ALL) {
boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h
index 483c251126..c9d7b9f74c 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/cpp/src/qpid/ha/BrokerReplicator.h
@@ -22,6 +22,7 @@
*
*/
+#include "ReplicateLevel.h"
#include "qpid/broker/Exchange.h"
#include "qpid/types/Variant.h"
#include <boost/shared_ptr.hpp>
@@ -35,7 +36,12 @@ class Bridge;
class SessionHandler;
}
+namespace framing {
+class FieldTable;
+}
+
namespace ha {
+class HaBroker;
/**
* Replicate configuration on a backup broker.
@@ -51,7 +57,7 @@ namespace ha {
class BrokerReplicator : public broker::Exchange
{
public:
- BrokerReplicator(const boost::shared_ptr<broker::Link>&);
+ BrokerReplicator(HaBroker&, const boost::shared_ptr<broker::Link>&);
~BrokerReplicator();
std::string getType() const;
@@ -64,6 +70,10 @@ class BrokerReplicator : public broker::Exchange
private:
void initializeBridge(broker::Bridge&, broker::SessionHandler&);
+ ReplicateLevel replicateLevel(const std::string&);
+ ReplicateLevel replicateLevel(const framing::FieldTable& args);
+ ReplicateLevel replicateLevel(const types::Variant::Map& args);
+
void doEventQueueDeclare(types::Variant::Map& values);
void doEventQueueDelete(types::Variant::Map& values);
void doEventExchangeDeclare(types::Variant::Map& values);
@@ -74,9 +84,11 @@ class BrokerReplicator : public broker::Exchange
void doResponseQueue(types::Variant::Map& values);
void doResponseExchange(types::Variant::Map& values);
void doResponseBind(types::Variant::Map& values);
+ void doResponseHaBroker(types::Variant::Map& values);
void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
+ HaBroker& haBroker;
broker::Broker& broker;
boost::shared_ptr<broker::Link> link;
};
diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp
index ad6719f207..39a01ddf6b 100644
--- a/cpp/src/qpid/ha/HaBroker.cpp
+++ b/cpp/src/qpid/ha/HaBroker.cpp
@@ -27,6 +27,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/SignalHandler.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/ha/Package.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
@@ -69,16 +70,18 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
throw Exception("Cannot start HA: management is disabled");
_qmf::Package packageInit(ma);
mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
- // FIXME aconway 2012-03-01: should start in catch-up state and move to backup
- // only when caught up.
mgmtObject->set_status(BACKUP);
+ mgmtObject->set_replicateDefault(str(settings.replicateDefault));
ma->addObject(mgmtObject);
+
+ // NOTE: lock is not needed in a constructor but we created it just to pass
+ // to the set functions.
sys::Mutex::ScopedLock l(lock);
if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l);
if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
// If we are in a cluster, we start in backup mode.
- if (settings.cluster) backup.reset(new Backup(b, s));
+ if (settings.cluster) backup.reset(new Backup(*this, s));
}
HaBroker::~HaBroker() {}
@@ -169,4 +172,9 @@ std::vector<Url> HaBroker::getKnownBrokers() const {
return knownBrokers;
}
+void HaBroker::shutdown(const std::string& message) {
+ QPID_LOG(critical, "Shutting down: " << message);
+ broker::SignalHandler::shutdown();
+}
+
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/HaBroker.h b/cpp/src/qpid/ha/HaBroker.h
index 4f4ee4c944..65ad3de4a0 100644
--- a/cpp/src/qpid/ha/HaBroker.h
+++ b/cpp/src/qpid/ha/HaBroker.h
@@ -52,6 +52,11 @@ class HaBroker : public management::Manageable
management::Manageable::status_t ManagementMethod (
uint32_t methodId, management::Args& args, std::string& text);
+ broker::Broker& getBroker() { return broker; }
+ const Settings& getSettings() const { return settings; }
+
+ // Log a critical error message and shut down the broker.
+ void shutdown(const std::string& message);
private:
void setClientUrl(const Url&, const sys::Mutex::ScopedLock&);
void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&);
diff --git a/cpp/src/qpid/ha/HaPlugin.cpp b/cpp/src/qpid/ha/HaPlugin.cpp
index 6a43b591b0..419ee962da 100644
--- a/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/cpp/src/qpid/ha/HaPlugin.cpp
@@ -37,6 +37,9 @@ struct Options : public qpid::Options {
"URL that backup brokers use to connect and fail over.")
("ha-public-brokers", optValue(settings.clientUrl,"URL"),
"URL that clients use to connect and fail over, defaults to ha-brokers.")
+ ("ha-replicate-default",
+ optValue(settings.replicateDefault, "LEVEL"),
+ "Replication level for creating queues and exchanges if there is no qpid.replicate argument supplied. LEVEL is 'none', 'configuration' or 'all'")
("ha-expected-backups", optValue(settings.expectedBackups, "N"),
"Number of backups expected to be active in the HA cluster.")
("ha-username", optValue(settings.username, "USER"),
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index 6aff4879e3..83f3d28b6d 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -53,7 +53,7 @@ std::string QueueReplicator::replicatorName(const std::string& queueName) {
QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l)
{
- logPrefix = "HA: Backup " + queue->getName() + ": ";
+ logPrefix = "HA: Backup of queue " + queue->getName() + ": ";
QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings());
}
diff --git a/cpp/src/qpid/ha/ReplicateLevel.cpp b/cpp/src/qpid/ha/ReplicateLevel.cpp
new file mode 100644
index 0000000000..48f21a1f66
--- /dev/null
+++ b/cpp/src/qpid/ha/ReplicateLevel.cpp
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ReplicateLevel.h"
+#include "qpid/Exception.h"
+#include <iostream>
+
+namespace qpid {
+namespace ha {
+
+using namespace std;
+
+namespace {
+const string S_NONE="none";
+const string S_CONFIGURATION="configuration";
+const string S_ALL="all";
+
+string names[] = { S_NONE, S_CONFIGURATION, S_ALL };
+}
+
+bool replicateLevel(const string& level, ReplicateLevel& out) {
+ if (level == S_NONE) { out = RL_NONE; return true; }
+ if (level == S_CONFIGURATION) { out = RL_CONFIGURATION; return true; }
+ if (level == S_ALL) { out = RL_ALL; return true; }
+ return false;
+}
+
+ReplicateLevel replicateLevel(const string& level) {
+ ReplicateLevel rl;
+ if (!replicateLevel(level, rl))
+ throw Exception("Invalid value for replication level: "+level);
+ return rl;
+}
+
+string str(ReplicateLevel l) { return names[l]; }
+
+ostream& operator<<(ostream& o, ReplicateLevel rl) { return o << str(rl); }
+istream& operator>>(istream& i, ReplicateLevel& rl) {
+ string str;
+ i >> str;
+ rl = replicateLevel(str);
+ return i;
+}
+
+}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/ReplicateLevel.h b/cpp/src/qpid/ha/ReplicateLevel.h
new file mode 100644
index 0000000000..c11e03f0ce
--- /dev/null
+++ b/cpp/src/qpid/ha/ReplicateLevel.h
@@ -0,0 +1,52 @@
+#ifndef QPID_HA_REPLICATELEVEL_H
+#define QPID_HA_REPLICATELEVEL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <iosfwd>
+
+namespace qpid {
+namespace ha {
+
+enum ReplicateLevel { RL_NONE, RL_CONFIGURATION, RL_ALL };
+
+/**
+ * If str is a valid replicate level, set out and return true.
+ */
+bool replicateLevel(const std::string& str, ReplicateLevel& out);
+
+/**
+ *@return enum corresponding to string level.
+ *@throw qpid::Exception if level is not a valid replication level.
+ */
+ReplicateLevel replicateLevel(const std::string& level);
+
+/**@return string form of replicate level */
+std::string str(ReplicateLevel l);
+
+std::ostream& operator<<(std::ostream&, ReplicateLevel);
+std::istream& operator>>(std::istream&, ReplicateLevel&);
+
+}} // namespaces qpid::ha
+
+#endif /*!QPID_HA_REPLICATELEVEL_H*/
diff --git a/cpp/src/qpid/ha/Settings.h b/cpp/src/qpid/ha/Settings.h
index 7df18b4ef4..bf70c3f3f7 100644
--- a/cpp/src/qpid/ha/Settings.h
+++ b/cpp/src/qpid/ha/Settings.h
@@ -22,6 +22,7 @@
*
*/
+#include "ReplicateLevel.h"
#include <string>
namespace qpid {
@@ -33,11 +34,12 @@ namespace ha {
class Settings
{
public:
- Settings() : cluster(false), expectedBackups(0) {}
+ Settings() : cluster(false), expectedBackups(0), replicateDefault(RL_NONE) {}
bool cluster; // True if we are a cluster member.
std::string clientUrl;
std::string brokerUrl;
size_t expectedBackups;
+ ReplicateLevel replicateDefault;
std::string username, password, mechanism;
private:
};
diff --git a/cpp/src/qpid/ha/management-schema.xml b/cpp/src/qpid/ha/management-schema.xml
index 9a815b346c..363dff61fb 100644
--- a/cpp/src/qpid/ha/management-schema.xml
+++ b/cpp/src/qpid/ha/management-schema.xml
@@ -32,7 +32,11 @@
desc="Multiple-address URL used by clients to connect to the HA brokers."/>
<property name="expectedBackups" type="uint16"
- desc="Number of HA backup brokers expected."/>>
+ desc="Number of HA backup brokers expected."/>
+
+ <property
+ name="replicateDefault" type="sstr"
+ desc="Replicate value for queues/exchanges without a qpid.replicate argument"/>
<method name="promote" desc="Promote a backup broker to primary."/>
@@ -48,7 +52,7 @@
<arg name="expectedBackups" type="uint16" dir="I"/>
</method>
- <method name="replicate" desc="Replicate from a remote queue to the local broker.">
+ <method name="replicate" desc="Replicate individual queue from remote broker.">
<arg name="broker" type="sstr" dir="I"/>
<arg name="queue" type="sstr" dir="I"/>
</method>