diff options
| author | Alan Conway <aconway@apache.org> | 2012-04-02 22:15:31 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-04-02 22:15:31 +0000 |
| commit | 0f747926548fb6ebaa52b6e463aec2916132671e (patch) | |
| tree | addac105bee0beb801e2a90794071f16c0f6879e /cpp/src/qpid | |
| parent | 212e544c4a10e6b54e081eb9ba2c9817c57ef356 (diff) | |
| download | qpid-python-0f747926548fb6ebaa52b6e463aec2916132671e.tar.gz | |
QPID-3603: Broker option --ha-replicate-default to specify default replication.
Takes values 'all', 'configuration', 'all'. This is the replication level to use
if a queue or exchange is created without an explicit 'qpid.replicate' argument.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1308597 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/ha/Backup.cpp | 11 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/Backup.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 99 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.h | 14 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/HaBroker.cpp | 14 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/HaBroker.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/HaPlugin.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/ReplicateLevel.cpp | 62 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/ReplicateLevel.h | 52 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/Settings.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/management-schema.xml | 8 |
12 files changed, 228 insertions, 50 deletions
diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp index 3d65e07202..eabf502f8b 100644 --- a/cpp/src/qpid/ha/Backup.cpp +++ b/cpp/src/qpid/ha/Backup.cpp @@ -19,10 +19,11 @@ * */ #include "Backup.h" -#include "Settings.h" #include "BrokerReplicator.h" -#include "ReplicatingSubscription.h" #include "ConnectionExcluder.h" +#include "HaBroker.h" +#include "ReplicatingSubscription.h" +#include "Settings.h" #include "qpid/Url.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/broker/Bridge.h" @@ -43,8 +44,8 @@ using namespace broker; using types::Variant; using std::string; -Backup::Backup(broker::Broker& b, const Settings& s) : - broker(b), settings(s), excluder(new ConnectionExcluder()) +Backup::Backup(HaBroker& hb, const Settings& s) : + haBroker(hb), broker(hb.getBroker()), settings(s), excluder(new ConnectionExcluder()) { // Empty brokerUrl means delay initialization until setUrl() is called. if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl)); @@ -63,7 +64,7 @@ void Backup::initialize(const Url& url) { link = result.first; link->setUrl(url); - replicator.reset(new BrokerReplicator(link)); + replicator.reset(new BrokerReplicator(haBroker, link)); broker.getExchanges().registerExchange(replicator); broker.getConnectionObservers().add(excluder); } diff --git a/cpp/src/qpid/ha/Backup.h b/cpp/src/qpid/ha/Backup.h index 526b238b82..6c36996914 100644 --- a/cpp/src/qpid/ha/Backup.h +++ b/cpp/src/qpid/ha/Backup.h @@ -38,6 +38,7 @@ namespace ha { class Settings; class ConnectionExcluder; class BrokerReplicator; +class HaBroker; /** * State associated with a backup broker. Manages connections to primary. @@ -47,7 +48,7 @@ class BrokerReplicator; class Backup { public: - Backup(broker::Broker&, const Settings&); + Backup(HaBroker&, const Settings&); ~Backup(); void setBrokerUrl(const Url&); @@ -55,6 +56,7 @@ class Backup void initialize(const Url&); sys::Mutex lock; + HaBroker& haBroker; broker::Broker& broker; Settings settings; boost::shared_ptr<broker::Link> link; diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index 609a3378ad..9d0043a827 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -19,6 +19,7 @@ * */ #include "BrokerReplicator.h" +#include "HaBroker.h" #include "QueueReplicator.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" @@ -37,6 +38,7 @@ #include "qmf/org/apache/qpid/broker/EventQueueDelete.h" #include "qmf/org/apache/qpid/broker/EventSubscribe.h" #include <algorithm> +#include <sstream> namespace qpid { namespace ha { @@ -87,6 +89,7 @@ const string QUEUE("queue"); const string RHOST("rhost"); const string TYPE("type"); const string USER("user"); +const string HA_BROKER("habroker"); const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#"); const string QMF2("qmf2"); @@ -100,6 +103,7 @@ const string _PACKAGE_NAME("_package_name"); const string _SCHEMA_ID("_schema_id"); const string OBJECT("OBJECT"); const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker"); +const string ORG_APACHE_QPID_HA("org.apache.qpid.ha"); const string QMF_DEFAULT_DIRECT("qmf.default.direct"); const string _QUERY_REQUEST("_query_request"); const string BROKER("broker"); @@ -113,36 +117,13 @@ template <class T> bool match(Variant::Map& schema) { return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); } -enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_ALL }; -const string S_NONE="none"; -const string S_CONFIGURATION="configuration"; -const string S_ALL="all"; - -ReplicateLevel replicateLevel(const string& level) { - if (level == S_NONE) return RL_NONE; - if (level == S_CONFIGURATION) return RL_CONFIGURATION; - if (level == S_ALL) return RL_ALL; - throw Exception("Invalid value for "+QPID_REPLICATE+": "+level); -} - -ReplicateLevel replicateLevel(const framing::FieldTable& f) { - if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE)); - else return RL_NONE; -} - -ReplicateLevel replicateLevel(const Variant::Map& m) { - Variant::Map::const_iterator i = m.find(QPID_REPLICATE); - if (i != m.end()) return replicateLevel(i->second.asString()); - else return RL_NONE; -} - -void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) { +void sendQuery(const string& packageName, const string& className, const string& queueName, SessionHandler& sessionHandler) { framing::AMQP_ServerProxy peer(sessionHandler.out); Variant::Map request; request[_WHAT] = OBJECT; Variant::Map schema; schema[_CLASS_NAME] = className; - schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER; + schema[_PACKAGE_NAME] = packageName; request[_SCHEMA_ID] = schema; AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0))); @@ -181,10 +162,33 @@ Variant::Map asMapVoid(const Variant& value) { } // namespace + +ReplicateLevel BrokerReplicator::replicateLevel(const std::string& str) { + ReplicateLevel rl; + if (qpid::ha::replicateLevel(str, rl)) return rl; + else return haBroker.getSettings().replicateDefault; +} + +ReplicateLevel BrokerReplicator::replicateLevel(const framing::FieldTable& f) { + if (f.isSet(QPID_REPLICATE)) + return replicateLevel(f.getAsString(QPID_REPLICATE)); + else + return haBroker.getSettings().replicateDefault; +} + +ReplicateLevel BrokerReplicator::replicateLevel(const Variant::Map& m) { + Variant::Map::const_iterator i = m.find(QPID_REPLICATE); + if (i != m.end()) + return replicateLevel(i->second.asString()); + else + return haBroker.getSettings().replicateDefault; +} + BrokerReplicator::~BrokerReplicator() {} -BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l) - : Exchange(QPID_CONFIGURATION_REPLICATOR), broker(*l->getBroker()), link(l) +BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l) + : Exchange(QPID_CONFIGURATION_REPLICATOR), + haBroker(hb), broker(hb.getBroker()), link(l) { QPID_LOG(info, "HA: Backup replicating from " << link->getTransport() << ":" << link->getHost() << ":" << link->getPort()); @@ -211,17 +215,21 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); //declare and bind an event queue - peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable()); + FieldTable declareArgs; + declareArgs.setString(QPID_REPLICATE, str(RL_NONE)); + peer.getQueue().declare(queueName, "", false, false, true, true, declareArgs); peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable()); //subscribe to the queue peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); - //issue a query request for queues and another for exchanges using event queue as the reply-to address - sendQuery(QUEUE, queueName, sessionHandler); - sendQuery(EXCHANGE, queueName, sessionHandler); - sendQuery(BINDING, queueName, sessionHandler); + // Issue a query request for queues, exchanges, bindings and the habroker + // using event queue as the reply-to address + sendQuery(ORG_APACHE_QPID_HA, HA_BROKER, queueName, sessionHandler); + sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler); + sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler); + sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler); QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName); } @@ -257,6 +265,7 @@ void BrokerReplicator::route(Deliverable& msg) { if (type == QUEUE) doResponseQueue(values); else if (type == EXCHANGE) doResponseExchange(values); else if (type == BINDING) doResponseBind(values); + else if (type == HA_BROKER) doResponseHaBroker(values); else QPID_LOG(error, "HA: Backup received unknown response type=" << type << " values=" << values); } @@ -288,7 +297,6 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { // re-create from event. // Events are always up to date, whereas responses may be // out of date. - QPID_LOG(debug, "HA: Backup created queue: " << name); startQueueReplicator(result.first); } else { // FIXME aconway 2011-12-02: what's the right way to handle this? @@ -400,7 +408,6 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { void BrokerReplicator::doResponseQueue(Variant::Map& values) { QPID_LOG(debug, "HA: Backup queue response " << values); - // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); if (!replicateLevel(argsMap)) return; framing::FieldTable args; @@ -417,7 +424,6 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/); if (result.second) { - QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]); startQueueReplicator(result.first); } else { // FIXME aconway 2011-11-22: Normal to find queue already @@ -474,7 +480,6 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName); boost::shared_ptr<Queue> queue = broker.getQueues().find(qName); - // FIXME aconway 2011-11-24: more flexible configuration for binding replication. // Automatically replicate binding if queue and exchange exist and are replicated if (exchange && replicateLevel(exchange->getArgs()) && @@ -490,6 +495,28 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { } } +namespace { +const string REPLICATE_DEFAULT="replicateDefault"; +} + +// Received the ha-broker configuration object for the primary broker. +void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { + try { + ReplicateLevel mine = haBroker.getSettings().replicateDefault; + ReplicateLevel primary = replicateLevel(values[REPLICATE_DEFAULT].asString()); + if (mine != primary) { + std::ostringstream os; + os << "Replicate default on backup (" << mine + << ") does not match primary (" << primary << ")"; + haBroker.shutdown(os.str()); + } + } catch (const std::exception& e) { + std::ostringstream os; + os << "Received invalid replicate default from primary: " << e.what(); + haBroker.shutdown(os.str()); + } +} + void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { if (replicateLevel(queue->getSettings()) == RL_ALL) { boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h index 483c251126..c9d7b9f74c 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.h +++ b/cpp/src/qpid/ha/BrokerReplicator.h @@ -22,6 +22,7 @@ * */ +#include "ReplicateLevel.h" #include "qpid/broker/Exchange.h" #include "qpid/types/Variant.h" #include <boost/shared_ptr.hpp> @@ -35,7 +36,12 @@ class Bridge; class SessionHandler; } +namespace framing { +class FieldTable; +} + namespace ha { +class HaBroker; /** * Replicate configuration on a backup broker. @@ -51,7 +57,7 @@ namespace ha { class BrokerReplicator : public broker::Exchange { public: - BrokerReplicator(const boost::shared_ptr<broker::Link>&); + BrokerReplicator(HaBroker&, const boost::shared_ptr<broker::Link>&); ~BrokerReplicator(); std::string getType() const; @@ -64,6 +70,10 @@ class BrokerReplicator : public broker::Exchange private: void initializeBridge(broker::Bridge&, broker::SessionHandler&); + ReplicateLevel replicateLevel(const std::string&); + ReplicateLevel replicateLevel(const framing::FieldTable& args); + ReplicateLevel replicateLevel(const types::Variant::Map& args); + void doEventQueueDeclare(types::Variant::Map& values); void doEventQueueDelete(types::Variant::Map& values); void doEventExchangeDeclare(types::Variant::Map& values); @@ -74,9 +84,11 @@ class BrokerReplicator : public broker::Exchange void doResponseQueue(types::Variant::Map& values); void doResponseExchange(types::Variant::Map& values); void doResponseBind(types::Variant::Map& values); + void doResponseHaBroker(types::Variant::Map& values); void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); + HaBroker& haBroker; broker::Broker& broker; boost::shared_ptr<broker::Link> link; }; diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp index ad6719f207..39a01ddf6b 100644 --- a/cpp/src/qpid/ha/HaBroker.cpp +++ b/cpp/src/qpid/ha/HaBroker.cpp @@ -27,6 +27,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/Link.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/SignalHandler.h" #include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/ha/Package.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h" @@ -69,16 +70,18 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) throw Exception("Cannot start HA: management is disabled"); _qmf::Package packageInit(ma); mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker"); - // FIXME aconway 2012-03-01: should start in catch-up state and move to backup - // only when caught up. mgmtObject->set_status(BACKUP); + mgmtObject->set_replicateDefault(str(settings.replicateDefault)); ma->addObject(mgmtObject); + + // NOTE: lock is not needed in a constructor but we created it just to pass + // to the set functions. sys::Mutex::ScopedLock l(lock); if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l); if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l); // If we are in a cluster, we start in backup mode. - if (settings.cluster) backup.reset(new Backup(b, s)); + if (settings.cluster) backup.reset(new Backup(*this, s)); } HaBroker::~HaBroker() {} @@ -169,4 +172,9 @@ std::vector<Url> HaBroker::getKnownBrokers() const { return knownBrokers; } +void HaBroker::shutdown(const std::string& message) { + QPID_LOG(critical, "Shutting down: " << message); + broker::SignalHandler::shutdown(); +} + }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/HaBroker.h b/cpp/src/qpid/ha/HaBroker.h index 4f4ee4c944..65ad3de4a0 100644 --- a/cpp/src/qpid/ha/HaBroker.h +++ b/cpp/src/qpid/ha/HaBroker.h @@ -52,6 +52,11 @@ class HaBroker : public management::Manageable management::Manageable::status_t ManagementMethod ( uint32_t methodId, management::Args& args, std::string& text); + broker::Broker& getBroker() { return broker; } + const Settings& getSettings() const { return settings; } + + // Log a critical error message and shut down the broker. + void shutdown(const std::string& message); private: void setClientUrl(const Url&, const sys::Mutex::ScopedLock&); void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&); diff --git a/cpp/src/qpid/ha/HaPlugin.cpp b/cpp/src/qpid/ha/HaPlugin.cpp index 6a43b591b0..419ee962da 100644 --- a/cpp/src/qpid/ha/HaPlugin.cpp +++ b/cpp/src/qpid/ha/HaPlugin.cpp @@ -37,6 +37,9 @@ struct Options : public qpid::Options { "URL that backup brokers use to connect and fail over.") ("ha-public-brokers", optValue(settings.clientUrl,"URL"), "URL that clients use to connect and fail over, defaults to ha-brokers.") + ("ha-replicate-default", + optValue(settings.replicateDefault, "LEVEL"), + "Replication level for creating queues and exchanges if there is no qpid.replicate argument supplied. LEVEL is 'none', 'configuration' or 'all'") ("ha-expected-backups", optValue(settings.expectedBackups, "N"), "Number of backups expected to be active in the HA cluster.") ("ha-username", optValue(settings.username, "USER"), diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index 6aff4879e3..83f3d28b6d 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -53,7 +53,7 @@ std::string QueueReplicator::replicatorName(const std::string& queueName) { QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l) { - logPrefix = "HA: Backup " + queue->getName() + ": "; + logPrefix = "HA: Backup of queue " + queue->getName() + ": "; QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings()); } diff --git a/cpp/src/qpid/ha/ReplicateLevel.cpp b/cpp/src/qpid/ha/ReplicateLevel.cpp new file mode 100644 index 0000000000..48f21a1f66 --- /dev/null +++ b/cpp/src/qpid/ha/ReplicateLevel.cpp @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ReplicateLevel.h" +#include "qpid/Exception.h" +#include <iostream> + +namespace qpid { +namespace ha { + +using namespace std; + +namespace { +const string S_NONE="none"; +const string S_CONFIGURATION="configuration"; +const string S_ALL="all"; + +string names[] = { S_NONE, S_CONFIGURATION, S_ALL }; +} + +bool replicateLevel(const string& level, ReplicateLevel& out) { + if (level == S_NONE) { out = RL_NONE; return true; } + if (level == S_CONFIGURATION) { out = RL_CONFIGURATION; return true; } + if (level == S_ALL) { out = RL_ALL; return true; } + return false; +} + +ReplicateLevel replicateLevel(const string& level) { + ReplicateLevel rl; + if (!replicateLevel(level, rl)) + throw Exception("Invalid value for replication level: "+level); + return rl; +} + +string str(ReplicateLevel l) { return names[l]; } + +ostream& operator<<(ostream& o, ReplicateLevel rl) { return o << str(rl); } +istream& operator>>(istream& i, ReplicateLevel& rl) { + string str; + i >> str; + rl = replicateLevel(str); + return i; +} + +}} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/ReplicateLevel.h b/cpp/src/qpid/ha/ReplicateLevel.h new file mode 100644 index 0000000000..c11e03f0ce --- /dev/null +++ b/cpp/src/qpid/ha/ReplicateLevel.h @@ -0,0 +1,52 @@ +#ifndef QPID_HA_REPLICATELEVEL_H +#define QPID_HA_REPLICATELEVEL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include <iosfwd> + +namespace qpid { +namespace ha { + +enum ReplicateLevel { RL_NONE, RL_CONFIGURATION, RL_ALL }; + +/** + * If str is a valid replicate level, set out and return true. + */ +bool replicateLevel(const std::string& str, ReplicateLevel& out); + +/** + *@return enum corresponding to string level. + *@throw qpid::Exception if level is not a valid replication level. + */ +ReplicateLevel replicateLevel(const std::string& level); + +/**@return string form of replicate level */ +std::string str(ReplicateLevel l); + +std::ostream& operator<<(std::ostream&, ReplicateLevel); +std::istream& operator>>(std::istream&, ReplicateLevel&); + +}} // namespaces qpid::ha + +#endif /*!QPID_HA_REPLICATELEVEL_H*/ diff --git a/cpp/src/qpid/ha/Settings.h b/cpp/src/qpid/ha/Settings.h index 7df18b4ef4..bf70c3f3f7 100644 --- a/cpp/src/qpid/ha/Settings.h +++ b/cpp/src/qpid/ha/Settings.h @@ -22,6 +22,7 @@ * */ +#include "ReplicateLevel.h" #include <string> namespace qpid { @@ -33,11 +34,12 @@ namespace ha { class Settings { public: - Settings() : cluster(false), expectedBackups(0) {} + Settings() : cluster(false), expectedBackups(0), replicateDefault(RL_NONE) {} bool cluster; // True if we are a cluster member. std::string clientUrl; std::string brokerUrl; size_t expectedBackups; + ReplicateLevel replicateDefault; std::string username, password, mechanism; private: }; diff --git a/cpp/src/qpid/ha/management-schema.xml b/cpp/src/qpid/ha/management-schema.xml index 9a815b346c..363dff61fb 100644 --- a/cpp/src/qpid/ha/management-schema.xml +++ b/cpp/src/qpid/ha/management-schema.xml @@ -32,7 +32,11 @@ desc="Multiple-address URL used by clients to connect to the HA brokers."/> <property name="expectedBackups" type="uint16" - desc="Number of HA backup brokers expected."/>> + desc="Number of HA backup brokers expected."/> + + <property + name="replicateDefault" type="sstr" + desc="Replicate value for queues/exchanges without a qpid.replicate argument"/> <method name="promote" desc="Promote a backup broker to primary."/> @@ -48,7 +52,7 @@ <arg name="expectedBackups" type="uint16" dir="I"/> </method> - <method name="replicate" desc="Replicate from a remote queue to the local broker."> + <method name="replicate" desc="Replicate individual queue from remote broker."> <arg name="broker" type="sstr" dir="I"/> <arg name="queue" type="sstr" dir="I"/> </method> |
