diff options
author | Alan Conway <aconway@apache.org> | 2012-04-02 22:15:31 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-04-02 22:15:31 +0000 |
commit | d052286fc446f712f68cce4527ef1bc6d2717092 (patch) | |
tree | 4812c379df5c522f2f1ab346e221cde543713d38 | |
parent | a493aaaa625352344652543be606c1a00f9f685b (diff) | |
download | qpid-python-d052286fc446f712f68cce4527ef1bc6d2717092.tar.gz |
QPID-3603: Broker option --ha-replicate-default to specify default replication.
Takes values 'all', 'configuration', 'all'. This is the replication level to use
if a queue or exchange is created without an explicit 'qpid.replicate' argument.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1308597 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/ha.mk | 16 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 99 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaPlugin.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicateLevel.cpp | 62 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicateLevel.h | 52 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Settings.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/management-schema.xml | 8 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 142 |
14 files changed, 316 insertions, 120 deletions
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk index 8a2cee30c7..be1fb73e89 100644 --- a/qpid/cpp/src/ha.mk +++ b/qpid/cpp/src/ha.mk @@ -25,18 +25,20 @@ dmoduleexec_LTLIBRARIES += ha.la ha_la_SOURCES = \ qpid/ha/Backup.cpp \ qpid/ha/Backup.h \ + qpid/ha/BrokerReplicator.cpp \ + qpid/ha/BrokerReplicator.h \ + qpid/ha/ConnectionExcluder.cpp \ + qpid/ha/ConnectionExcluder.h \ qpid/ha/HaBroker.cpp \ qpid/ha/HaBroker.h \ qpid/ha/HaPlugin.cpp \ - qpid/ha/Settings.h \ - qpid/ha/QueueReplicator.h \ qpid/ha/QueueReplicator.cpp \ - qpid/ha/ReplicatingSubscription.h \ + qpid/ha/QueueReplicator.h \ + qpid/ha/ReplicateLevel.cpp \ + qpid/ha/ReplicateLevel.h \ qpid/ha/ReplicatingSubscription.cpp \ - qpid/ha/BrokerReplicator.cpp \ - qpid/ha/BrokerReplicator.h \ - qpid/ha/ConnectionExcluder.cpp \ - qpid/ha/ConnectionExcluder.h + qpid/ha/ReplicatingSubscription.h \ + qpid/ha/Settings.h ha_la_LIBADD = libqpidbroker.la ha_la_LDFLAGS = $(PLUGINLDFLAGS) diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 3d65e07202..eabf502f8b 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -19,10 +19,11 @@ * */ #include "Backup.h" -#include "Settings.h" #include "BrokerReplicator.h" -#include "ReplicatingSubscription.h" #include "ConnectionExcluder.h" +#include "HaBroker.h" +#include "ReplicatingSubscription.h" +#include "Settings.h" #include "qpid/Url.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/broker/Bridge.h" @@ -43,8 +44,8 @@ using namespace broker; using types::Variant; using std::string; -Backup::Backup(broker::Broker& b, const Settings& s) : - broker(b), settings(s), excluder(new ConnectionExcluder()) +Backup::Backup(HaBroker& hb, const Settings& s) : + haBroker(hb), broker(hb.getBroker()), settings(s), excluder(new ConnectionExcluder()) { // Empty brokerUrl means delay initialization until setUrl() is called. if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl)); @@ -63,7 +64,7 @@ void Backup::initialize(const Url& url) { link = result.first; link->setUrl(url); - replicator.reset(new BrokerReplicator(link)); + replicator.reset(new BrokerReplicator(haBroker, link)); broker.getExchanges().registerExchange(replicator); broker.getConnectionObservers().add(excluder); } diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h index 526b238b82..6c36996914 100644 --- a/qpid/cpp/src/qpid/ha/Backup.h +++ b/qpid/cpp/src/qpid/ha/Backup.h @@ -38,6 +38,7 @@ namespace ha { class Settings; class ConnectionExcluder; class BrokerReplicator; +class HaBroker; /** * State associated with a backup broker. Manages connections to primary. @@ -47,7 +48,7 @@ class BrokerReplicator; class Backup { public: - Backup(broker::Broker&, const Settings&); + Backup(HaBroker&, const Settings&); ~Backup(); void setBrokerUrl(const Url&); @@ -55,6 +56,7 @@ class Backup void initialize(const Url&); sys::Mutex lock; + HaBroker& haBroker; broker::Broker& broker; Settings settings; boost::shared_ptr<broker::Link> link; diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 609a3378ad..9d0043a827 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -19,6 +19,7 @@ * */ #include "BrokerReplicator.h" +#include "HaBroker.h" #include "QueueReplicator.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" @@ -37,6 +38,7 @@ #include "qmf/org/apache/qpid/broker/EventQueueDelete.h" #include "qmf/org/apache/qpid/broker/EventSubscribe.h" #include <algorithm> +#include <sstream> namespace qpid { namespace ha { @@ -87,6 +89,7 @@ const string QUEUE("queue"); const string RHOST("rhost"); const string TYPE("type"); const string USER("user"); +const string HA_BROKER("habroker"); const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#"); const string QMF2("qmf2"); @@ -100,6 +103,7 @@ const string _PACKAGE_NAME("_package_name"); const string _SCHEMA_ID("_schema_id"); const string OBJECT("OBJECT"); const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker"); +const string ORG_APACHE_QPID_HA("org.apache.qpid.ha"); const string QMF_DEFAULT_DIRECT("qmf.default.direct"); const string _QUERY_REQUEST("_query_request"); const string BROKER("broker"); @@ -113,36 +117,13 @@ template <class T> bool match(Variant::Map& schema) { return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); } -enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_ALL }; -const string S_NONE="none"; -const string S_CONFIGURATION="configuration"; -const string S_ALL="all"; - -ReplicateLevel replicateLevel(const string& level) { - if (level == S_NONE) return RL_NONE; - if (level == S_CONFIGURATION) return RL_CONFIGURATION; - if (level == S_ALL) return RL_ALL; - throw Exception("Invalid value for "+QPID_REPLICATE+": "+level); -} - -ReplicateLevel replicateLevel(const framing::FieldTable& f) { - if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE)); - else return RL_NONE; -} - -ReplicateLevel replicateLevel(const Variant::Map& m) { - Variant::Map::const_iterator i = m.find(QPID_REPLICATE); - if (i != m.end()) return replicateLevel(i->second.asString()); - else return RL_NONE; -} - -void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) { +void sendQuery(const string& packageName, const string& className, const string& queueName, SessionHandler& sessionHandler) { framing::AMQP_ServerProxy peer(sessionHandler.out); Variant::Map request; request[_WHAT] = OBJECT; Variant::Map schema; schema[_CLASS_NAME] = className; - schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER; + schema[_PACKAGE_NAME] = packageName; request[_SCHEMA_ID] = schema; AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0))); @@ -181,10 +162,33 @@ Variant::Map asMapVoid(const Variant& value) { } // namespace + +ReplicateLevel BrokerReplicator::replicateLevel(const std::string& str) { + ReplicateLevel rl; + if (qpid::ha::replicateLevel(str, rl)) return rl; + else return haBroker.getSettings().replicateDefault; +} + +ReplicateLevel BrokerReplicator::replicateLevel(const framing::FieldTable& f) { + if (f.isSet(QPID_REPLICATE)) + return replicateLevel(f.getAsString(QPID_REPLICATE)); + else + return haBroker.getSettings().replicateDefault; +} + +ReplicateLevel BrokerReplicator::replicateLevel(const Variant::Map& m) { + Variant::Map::const_iterator i = m.find(QPID_REPLICATE); + if (i != m.end()) + return replicateLevel(i->second.asString()); + else + return haBroker.getSettings().replicateDefault; +} + BrokerReplicator::~BrokerReplicator() {} -BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l) - : Exchange(QPID_CONFIGURATION_REPLICATOR), broker(*l->getBroker()), link(l) +BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l) + : Exchange(QPID_CONFIGURATION_REPLICATOR), + haBroker(hb), broker(hb.getBroker()), link(l) { QPID_LOG(info, "HA: Backup replicating from " << link->getTransport() << ":" << link->getHost() << ":" << link->getPort()); @@ -211,17 +215,21 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); //declare and bind an event queue - peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable()); + FieldTable declareArgs; + declareArgs.setString(QPID_REPLICATE, str(RL_NONE)); + peer.getQueue().declare(queueName, "", false, false, true, true, declareArgs); peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable()); //subscribe to the queue peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); - //issue a query request for queues and another for exchanges using event queue as the reply-to address - sendQuery(QUEUE, queueName, sessionHandler); - sendQuery(EXCHANGE, queueName, sessionHandler); - sendQuery(BINDING, queueName, sessionHandler); + // Issue a query request for queues, exchanges, bindings and the habroker + // using event queue as the reply-to address + sendQuery(ORG_APACHE_QPID_HA, HA_BROKER, queueName, sessionHandler); + sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler); + sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler); + sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler); QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName); } @@ -257,6 +265,7 @@ void BrokerReplicator::route(Deliverable& msg) { if (type == QUEUE) doResponseQueue(values); else if (type == EXCHANGE) doResponseExchange(values); else if (type == BINDING) doResponseBind(values); + else if (type == HA_BROKER) doResponseHaBroker(values); else QPID_LOG(error, "HA: Backup received unknown response type=" << type << " values=" << values); } @@ -288,7 +297,6 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { // re-create from event. // Events are always up to date, whereas responses may be // out of date. - QPID_LOG(debug, "HA: Backup created queue: " << name); startQueueReplicator(result.first); } else { // FIXME aconway 2011-12-02: what's the right way to handle this? @@ -400,7 +408,6 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { void BrokerReplicator::doResponseQueue(Variant::Map& values) { QPID_LOG(debug, "HA: Backup queue response " << values); - // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); if (!replicateLevel(argsMap)) return; framing::FieldTable args; @@ -417,7 +424,6 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/); if (result.second) { - QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]); startQueueReplicator(result.first); } else { // FIXME aconway 2011-11-22: Normal to find queue already @@ -474,7 +480,6 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName); boost::shared_ptr<Queue> queue = broker.getQueues().find(qName); - // FIXME aconway 2011-11-24: more flexible configuration for binding replication. // Automatically replicate binding if queue and exchange exist and are replicated if (exchange && replicateLevel(exchange->getArgs()) && @@ -490,6 +495,28 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { } } +namespace { +const string REPLICATE_DEFAULT="replicateDefault"; +} + +// Received the ha-broker configuration object for the primary broker. +void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { + try { + ReplicateLevel mine = haBroker.getSettings().replicateDefault; + ReplicateLevel primary = replicateLevel(values[REPLICATE_DEFAULT].asString()); + if (mine != primary) { + std::ostringstream os; + os << "Replicate default on backup (" << mine + << ") does not match primary (" << primary << ")"; + haBroker.shutdown(os.str()); + } + } catch (const std::exception& e) { + std::ostringstream os; + os << "Received invalid replicate default from primary: " << e.what(); + haBroker.shutdown(os.str()); + } +} + void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { if (replicateLevel(queue->getSettings()) == RL_ALL) { boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index 483c251126..c9d7b9f74c 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -22,6 +22,7 @@ * */ +#include "ReplicateLevel.h" #include "qpid/broker/Exchange.h" #include "qpid/types/Variant.h" #include <boost/shared_ptr.hpp> @@ -35,7 +36,12 @@ class Bridge; class SessionHandler; } +namespace framing { +class FieldTable; +} + namespace ha { +class HaBroker; /** * Replicate configuration on a backup broker. @@ -51,7 +57,7 @@ namespace ha { class BrokerReplicator : public broker::Exchange { public: - BrokerReplicator(const boost::shared_ptr<broker::Link>&); + BrokerReplicator(HaBroker&, const boost::shared_ptr<broker::Link>&); ~BrokerReplicator(); std::string getType() const; @@ -64,6 +70,10 @@ class BrokerReplicator : public broker::Exchange private: void initializeBridge(broker::Bridge&, broker::SessionHandler&); + ReplicateLevel replicateLevel(const std::string&); + ReplicateLevel replicateLevel(const framing::FieldTable& args); + ReplicateLevel replicateLevel(const types::Variant::Map& args); + void doEventQueueDeclare(types::Variant::Map& values); void doEventQueueDelete(types::Variant::Map& values); void doEventExchangeDeclare(types::Variant::Map& values); @@ -74,9 +84,11 @@ class BrokerReplicator : public broker::Exchange void doResponseQueue(types::Variant::Map& values); void doResponseExchange(types::Variant::Map& values); void doResponseBind(types::Variant::Map& values); + void doResponseHaBroker(types::Variant::Map& values); void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); + HaBroker& haBroker; broker::Broker& broker; boost::shared_ptr<broker::Link> link; }; diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index ad6719f207..39a01ddf6b 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -27,6 +27,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/Link.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/SignalHandler.h" #include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/ha/Package.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h" @@ -69,16 +70,18 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) throw Exception("Cannot start HA: management is disabled"); _qmf::Package packageInit(ma); mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker"); - // FIXME aconway 2012-03-01: should start in catch-up state and move to backup - // only when caught up. mgmtObject->set_status(BACKUP); + mgmtObject->set_replicateDefault(str(settings.replicateDefault)); ma->addObject(mgmtObject); + + // NOTE: lock is not needed in a constructor but we created it just to pass + // to the set functions. sys::Mutex::ScopedLock l(lock); if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l); if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l); // If we are in a cluster, we start in backup mode. - if (settings.cluster) backup.reset(new Backup(b, s)); + if (settings.cluster) backup.reset(new Backup(*this, s)); } HaBroker::~HaBroker() {} @@ -169,4 +172,9 @@ std::vector<Url> HaBroker::getKnownBrokers() const { return knownBrokers; } +void HaBroker::shutdown(const std::string& message) { + QPID_LOG(critical, "Shutting down: " << message); + broker::SignalHandler::shutdown(); +} + }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 4f4ee4c944..65ad3de4a0 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -52,6 +52,11 @@ class HaBroker : public management::Manageable management::Manageable::status_t ManagementMethod ( uint32_t methodId, management::Args& args, std::string& text); + broker::Broker& getBroker() { return broker; } + const Settings& getSettings() const { return settings; } + + // Log a critical error message and shut down the broker. + void shutdown(const std::string& message); private: void setClientUrl(const Url&, const sys::Mutex::ScopedLock&); void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&); diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp index 6a43b591b0..419ee962da 100644 --- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp +++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp @@ -37,6 +37,9 @@ struct Options : public qpid::Options { "URL that backup brokers use to connect and fail over.") ("ha-public-brokers", optValue(settings.clientUrl,"URL"), "URL that clients use to connect and fail over, defaults to ha-brokers.") + ("ha-replicate-default", + optValue(settings.replicateDefault, "LEVEL"), + "Replication level for creating queues and exchanges if there is no qpid.replicate argument supplied. LEVEL is 'none', 'configuration' or 'all'") ("ha-expected-backups", optValue(settings.expectedBackups, "N"), "Number of backups expected to be active in the HA cluster.") ("ha-username", optValue(settings.username, "USER"), diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 6aff4879e3..83f3d28b6d 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -53,7 +53,7 @@ std::string QueueReplicator::replicatorName(const std::string& queueName) { QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l) { - logPrefix = "HA: Backup " + queue->getName() + ": "; + logPrefix = "HA: Backup of queue " + queue->getName() + ": "; QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings()); } diff --git a/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp b/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp new file mode 100644 index 0000000000..48f21a1f66 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ReplicateLevel.h" +#include "qpid/Exception.h" +#include <iostream> + +namespace qpid { +namespace ha { + +using namespace std; + +namespace { +const string S_NONE="none"; +const string S_CONFIGURATION="configuration"; +const string S_ALL="all"; + +string names[] = { S_NONE, S_CONFIGURATION, S_ALL }; +} + +bool replicateLevel(const string& level, ReplicateLevel& out) { + if (level == S_NONE) { out = RL_NONE; return true; } + if (level == S_CONFIGURATION) { out = RL_CONFIGURATION; return true; } + if (level == S_ALL) { out = RL_ALL; return true; } + return false; +} + +ReplicateLevel replicateLevel(const string& level) { + ReplicateLevel rl; + if (!replicateLevel(level, rl)) + throw Exception("Invalid value for replication level: "+level); + return rl; +} + +string str(ReplicateLevel l) { return names[l]; } + +ostream& operator<<(ostream& o, ReplicateLevel rl) { return o << str(rl); } +istream& operator>>(istream& i, ReplicateLevel& rl) { + string str; + i >> str; + rl = replicateLevel(str); + return i; +} + +}} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ReplicateLevel.h b/qpid/cpp/src/qpid/ha/ReplicateLevel.h new file mode 100644 index 0000000000..c11e03f0ce --- /dev/null +++ b/qpid/cpp/src/qpid/ha/ReplicateLevel.h @@ -0,0 +1,52 @@ +#ifndef QPID_HA_REPLICATELEVEL_H +#define QPID_HA_REPLICATELEVEL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include <iosfwd> + +namespace qpid { +namespace ha { + +enum ReplicateLevel { RL_NONE, RL_CONFIGURATION, RL_ALL }; + +/** + * If str is a valid replicate level, set out and return true. + */ +bool replicateLevel(const std::string& str, ReplicateLevel& out); + +/** + *@return enum corresponding to string level. + *@throw qpid::Exception if level is not a valid replication level. + */ +ReplicateLevel replicateLevel(const std::string& level); + +/**@return string form of replicate level */ +std::string str(ReplicateLevel l); + +std::ostream& operator<<(std::ostream&, ReplicateLevel); +std::istream& operator>>(std::istream&, ReplicateLevel&); + +}} // namespaces qpid::ha + +#endif /*!QPID_HA_REPLICATELEVEL_H*/ diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h index 7df18b4ef4..bf70c3f3f7 100644 --- a/qpid/cpp/src/qpid/ha/Settings.h +++ b/qpid/cpp/src/qpid/ha/Settings.h @@ -22,6 +22,7 @@ * */ +#include "ReplicateLevel.h" #include <string> namespace qpid { @@ -33,11 +34,12 @@ namespace ha { class Settings { public: - Settings() : cluster(false), expectedBackups(0) {} + Settings() : cluster(false), expectedBackups(0), replicateDefault(RL_NONE) {} bool cluster; // True if we are a cluster member. std::string clientUrl; std::string brokerUrl; size_t expectedBackups; + ReplicateLevel replicateDefault; std::string username, password, mechanism; private: }; diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml index 9a815b346c..363dff61fb 100644 --- a/qpid/cpp/src/qpid/ha/management-schema.xml +++ b/qpid/cpp/src/qpid/ha/management-schema.xml @@ -32,7 +32,11 @@ desc="Multiple-address URL used by clients to connect to the HA brokers."/> <property name="expectedBackups" type="uint16" - desc="Number of HA backup brokers expected."/>> + desc="Number of HA backup brokers expected."/> + + <property + name="replicateDefault" type="sstr" + desc="Replicate value for queues/exchanges without a qpid.replicate argument"/> <method name="promote" desc="Promote a backup broker to primary."/> @@ -48,7 +52,7 @@ <arg name="expectedBackups" type="uint16" dir="I"/> </method> - <method name="replicate" desc="Replicate from a remote queue to the local broker."> + <method name="replicate" desc="Replicate individual queue from remote broker."> <arg name="broker" type="sstr" dir="I"/> <arg name="queue" type="sstr" dir="I"/> </method> diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index e9d44c21e0..10d5fc0db2 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -29,13 +29,17 @@ from qpidtoollibs import BrokerAgent log = getLogger("qpid.ha-tests") class HaBroker(Broker): - def __init__(self, test, args=[], broker_url=None, ha_cluster=True, **kwargs): + def __init__(self, test, args=[], broker_url=None, ha_cluster=True, + ha_replicate_default="all", **kwargs): assert BrokerTest.ha_lib, "Cannot locate HA plug-in" args = copy(args) - args.extend(["--load-module", BrokerTest.ha_lib, - # FIXME aconway 2012-02-13: workaround slow link failover. - "--link-maintenace-interval=0.1", - "--ha-cluster=%s"%ha_cluster]) + args += ["--load-module", BrokerTest.ha_lib, + "--log-enable=info+", "--log-enable=debug+:ha::", + # FIXME aconway 2012-02-13: workaround slow link failover. + "--link-maintenace-interval=0.1", + "--ha-cluster=%s"%ha_cluster] + if ha_replicate_default is not None: + args += [ "--ha-replicate-default=%s"%ha_replicate_default ] if broker_url: args.extend([ "--ha-brokers", broker_url ]) Broker.__init__(self, test, args, **kwargs) self.commands=os.getenv("PYTHON_COMMANDS") @@ -64,6 +68,10 @@ class HaBroker(Broker): assert os.system( "%s/qpid-config --broker=%s add queue %s --replicate %s"%(self.commands, self.host_port(), queue, replication)) == 0 + def connect_admin(self, **kwargs): + return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) + + class HaCluster(object): _cluster_count = 0 @@ -72,9 +80,9 @@ class HaCluster(object): self.test = test self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)] HaCluster._cluster_count += 1 - self[0].promote() self.url = ",".join([b.host_port() for b in self]) for b in self: b.set_broker_url(self.url) + self[0].promote() def connect(self, i): """Connect with reconnect_urls""" @@ -98,8 +106,6 @@ class HaCluster(object): def __iter__(self): return self._brokers.__iter__() -def qr_node(value="all"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value - class HaTest(BrokerTest): """Base class for HA test cases, defines convenience functions""" @@ -114,13 +120,13 @@ class HaTest(BrokerTest): # Wait for address to become valid on a backup broker. def wait_backup(self, backup, address): - bs = self.connect_admin(backup).session() + bs = backup.connect_admin().session() self.wait(bs, address) bs.connection.close() # Combines wait_backup and assert_browse_retry def assert_browse_backup(self, backup, queue, expected, **kwargs): - bs = self.connect_admin(backup).session() + bs = backup.connect_admin().session() self.wait(bs, queue) self.assert_browse_retry(bs, queue, expected, **kwargs) bs.connection.close() @@ -128,12 +134,9 @@ class HaTest(BrokerTest): def assert_missing(self, session, address): try: session.receiver(address) - self.fail("Should not have been replicated: %s"%(address)) + self.fail("Expected NotFound: %s"%(address)) except NotFound: pass - def connect_admin(self, backup, **kwargs): - """Connect to a backup broker as an admin connection""" - return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs) class ReplicationTests(HaTest): """Correctness tests for HA replication.""" @@ -173,7 +176,6 @@ class ReplicationTests(HaTest): def verify(b, prefix, p): """Verify setup was replicated to backup b""" - # Wait for configuration to replicate. self.wait(b, prefix+"x"); self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"]) @@ -203,7 +205,7 @@ class ReplicationTests(HaTest): setup(p, "2", primary) # Verify the data on the backup - b = self.connect_admin(backup).session() + b = backup.connect_admin().session() verify(b, "1", p) verify(b, "2", p) # Test a series of messages, enqueue all then dequeue all. @@ -230,12 +232,10 @@ class ReplicationTests(HaTest): self.assert_browse_retry(b, "foo", msgs[i+1:]) def test_sync(self): - def queue(name, replicate): - return "%s;{create:always,%s}"%(name, qr_node(replicate)) primary = HaBroker(self, name="primary") primary.promote() p = primary.connect().session() - s = p.sender(queue("q","all")) + s = p.sender("q;{create:always}") for m in [str(i) for i in range(0,10)]: s.send(m) s.sync() backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) @@ -246,46 +246,37 @@ class ReplicationTests(HaTest): s.sync() msgs = [str(i) for i in range(30)] - b1 = self.connect_admin(backup1).session() + b1 = backup1.connect_admin().session() self.wait(b1, "q"); self.assert_browse_retry(b1, "q", msgs) - b2 = self.connect_admin(backup2).session() + b2 = backup2.connect_admin().session() self.wait(b2, "q"); self.assert_browse_retry(b2, "q", msgs) def test_send_receive(self): """Verify sequence numbers of messages sent by qpid-send""" + brokers = HaCluster(self, 3) getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. - primary = HaBroker(self, name="primary") - primary.promote() - backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) - backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port()) sender = self.popen( ["qpid-send", - "--broker", primary.host_port(), - "--address", "q;{create:always,%s}"%(qr_node("all")), + "--broker", brokers[0].host_port(), + "--address", "q;{create:always}", "--messages=1000", "--content-string=x" ]) receiver = self.popen( ["qpid-receive", - "--broker", primary.host_port(), - "--address", "q;{create:always,%s}"%(qr_node("all")), + "--broker", brokers[0].host_port(), + "--address", "q;{create:always}", "--messages=990", "--timeout=10" ]) - try: - self.assertEqual(sender.wait(), 0) - self.assertEqual(receiver.wait(), 0) - expect = [long(i) for i in range(991, 1001)] - sn = lambda m: m.properties["sn"] - self.assert_browse_retry(self.connect_admin(backup1).session(), "q", expect, transform=sn) - self.assert_browse_retry(self.connect_admin(backup2).session(), "q", expect, transform=sn) - except: - print self.browse(primary.connect().session(), "q", transform=sn) - print self.browse(self.connect_admin(backup1).session(), "q", transform=sn) - print self.browse(self.connect_admin(backup2).session(), "q", transform=sn) - raise + self.assertEqual(sender.wait(), 0) + self.assertEqual(receiver.wait(), 0) + expect = [long(i) for i in range(991, 1001)] + sn = lambda m: m.properties["sn"] + self.assert_browse_backup(brokers[1], "q", expect, transform=sn) + self.assert_browse_backup(brokers[2], "q", expect, transform=sn) def test_failover_python(self): """Verify that backups rejects connections and that fail-over works in python client""" @@ -299,12 +290,12 @@ class ReplicationTests(HaTest): self.fail("Expected connection to backup to fail") except ConnectionError: pass # Check that admin connections are allowed to backup. - self.connect_admin(backup).close() + backup.connect_admin().close() # Test discovery: should connect to primary after reject by backup c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True) s = c.session() - sender = s.sender("q;{create:always,%s}"%(qr_node())) + sender = s.sender("q;{create:always}") self.wait_backup(backup, "q") sender.send("foo") primary.kill() @@ -319,7 +310,7 @@ class ReplicationTests(HaTest): primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) url="%s,%s"%(primary.host_port(), backup.host_port()) - primary.connect().session().sender("q;{create:always,%s}"%(qr_node())) + primary.connect().session().sender("q;{create:always}") self.wait_backup(backup, "q") sender = NumberedSender(primary, url=url, queue="q", failover_updates = False) @@ -340,8 +331,7 @@ class ReplicationTests(HaTest): def test_backup_failover(self): """Verify that a backup broker fails over and recovers queue state""" brokers = HaCluster(self, 3) - brokers[0].connect().session().sender( - "q;{create:always,%s}"%(qr_node())).send("a") + brokers[0].connect().session().sender("q;{create:always}").send("a") for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"]) brokers[0].expect = EXPECT_EXIT_FAIL brokers.kill(0) @@ -362,11 +352,11 @@ class ReplicationTests(HaTest): def test_standalone_queue_replica(self): """Test replication of individual queues outside of cluster mode""" getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. - primary = HaBroker(self, name="primary", ha_cluster=False, args=["--log-enable=debug+"]) + primary = HaBroker(self, name="primary", ha_cluster=False) pc = primary.connect() ps = pc.session().sender("q;{create:always}") pr = pc.session().receiver("q;{create:always}") - backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"]) + backup = HaBroker(self, name="backup", ha_cluster=False) br = backup.connect().session().receiver("q;{create:always}") # Set up replication with qpid-ha @@ -392,9 +382,9 @@ class ReplicationTests(HaTest): cluster = HaCluster(self, 2) primary = cluster[0] pc = cluster.connect(0) - ps = pc.session().sender("q;{create:always,%s}"%qr_node("all")) - pr = pc.session().receiver("q;{create:always,%s}"%qr_node("all")) - backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"]) + ps = pc.session().sender("q;{create:always}") + pr = pc.session().receiver("q;{create:always}") + backup = HaBroker(self, name="backup", ha_cluster=False) br = backup.connect().session().receiver("q;{create:always}") backup.replicate(cluster.url, "q") ps.send("a") @@ -413,7 +403,7 @@ class ReplicationTests(HaTest): primary = HaBroker(self, name="primary") primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) - s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key, 'qpid.replicate':all}}}}") + s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}") def send(key,value): s.send(Message(content=value,properties={"lvq-key":key})) for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]: send(*kv) @@ -426,19 +416,21 @@ class ReplicationTests(HaTest): self.assert_browse_backup(backup, "lvq", ["a-3", "b-2", "c-3", "d-1"]) def test_ring(self): + """Test replication with the ring queue policy""" primary = HaBroker(self, name="primary") primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) - s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.replicate':all}}}}") + s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}") for i in range(10): s.send(Message(str(i))) self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)]) def test_reject(self): + """Test replication with the reject queue policy""" getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. primary = HaBroker(self, name="primary") primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) - s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5, 'qpid.replicate':all}}}}") + s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5}}}}") try: for i in range(10): s.send(Message(str(i)), sync=False) except qpid.messaging.exceptions.TargetCapacityExceeded: pass @@ -450,12 +442,12 @@ class ReplicationTests(HaTest): primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) session = primary.connect().session() - s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10, 'qpid.replicate':all}}}}") + s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}}}}") priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2] for p in priorities: s.send(Message(priority=p)) # Can't use browse_backup as browser sees messages in delivery order not priority. self.wait_backup(backup, "priority-queue") - r = self.connect_admin(backup).session().receiver("priority-queue") + r = backup.connect_admin().session().receiver("priority-queue") received = [r.fetch().priority for i in priorities] self.assertEqual(sorted(priorities, reverse=True), received) @@ -469,11 +461,11 @@ class ReplicationTests(HaTest): priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3] limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2} limit_policy = ",".join(["'qpid.fairshare':5"] + ["'qpid.fairshare-%s':%s"%(i[0],i[1]) for i in limits.iteritems()]) - s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s, 'qpid.replicate':all}}}}"%(levels,limit_policy)) + s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s}}}}"%(levels,limit_policy)) messages = [Message(content=str(uuid4()), priority = p) for p in priorities] for m in messages: s.send(m) self.wait_backup(backup, s.target) - r = self.connect_admin(backup).session().receiver("priority-queue") + r = backup.connect_admin().session().receiver("priority-queue") received = [r.fetch().content for i in priorities] sort = sorted(messages, key=lambda m: priority_level(m.priority, levels), reverse=True) fair = [m.content for m in fairshare(sort, lambda l: limits.get(l,0), levels)] @@ -483,13 +475,14 @@ class ReplicationTests(HaTest): primary = HaBroker(self, name="primary") primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) - s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10, 'qpid.replicate':all}}}}") + s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}") priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2] for p in priorities: s.send(Message(priority=p)) - # FIXME aconway 2012-02-22: there is a bug in priority ring queues that allows a low - # priority message to displace a high one. The following commented-out assert_browse - # is for the correct result, the uncommented one is for the actualy buggy result. - # See https://issues.apache.org/jira/browse/QPID-3866 + # FIXME aconway 2012-02-22: there is a bug in priority ring + # queues that allows a low priority message to displace a high + # one. The following commented-out assert_browse is for the + # correct result, the uncommented one is for the actualy buggy + # result. See https://issues.apache.org/jira/browse/QPID-3866 # # self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority) self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority) @@ -539,6 +532,29 @@ class ReplicationTests(HaTest): for t in tests: t.verify(self, backup1) for t in tests: t.verify(self, backup2) + def test_replicate_default(self): + """Make sure we don't replicate if ha-replicate-default is unspecified or none""" + cluster1 = HaCluster(self, 2, ha_replicate_default=None) + c1 = cluster1[0].connect().session().sender("q;{create:always}") + cluster2 = HaCluster(self, 2, ha_replicate_default="none") + cluster2[0].connect().session().sender("q;{create:always}") + time.sleep(.1) # Give replication a chance. + try: + cluster1[1].connect_admin().session().receiver("q") + self.fail("Excpected no-such-queue exception") + except NotFound: pass + try: + cluster2[1].connect_admin().session().receiver("q") + self.fail("Excpected no-such-queue exception") + except NotFound: pass + + def test_invalid_default(self): + """Verify that a queue with an invalid qpid.replicate gets default treatment""" + cluster = HaCluster(self, 2, ha_replicate_default="all") + c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}") + self.wait_backup(cluster[1], "q") + + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit |