summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-04-02 22:15:31 +0000
committerAlan Conway <aconway@apache.org>2012-04-02 22:15:31 +0000
commitd052286fc446f712f68cce4527ef1bc6d2717092 (patch)
tree4812c379df5c522f2f1ab346e221cde543713d38
parenta493aaaa625352344652543be606c1a00f9f685b (diff)
downloadqpid-python-d052286fc446f712f68cce4527ef1bc6d2717092.tar.gz
QPID-3603: Broker option --ha-replicate-default to specify default replication.
Takes values 'all', 'configuration', 'all'. This is the replication level to use if a queue or exchange is created without an explicit 'qpid.replicate' argument. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1308597 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/ha.mk16
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp11
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.h4
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp99
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h14
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp14
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h5
-rw-r--r--qpid/cpp/src/qpid/ha/HaPlugin.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicateLevel.cpp62
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicateLevel.h52
-rw-r--r--qpid/cpp/src/qpid/ha/Settings.h4
-rw-r--r--qpid/cpp/src/qpid/ha/management-schema.xml8
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py142
14 files changed, 316 insertions, 120 deletions
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index 8a2cee30c7..be1fb73e89 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -25,18 +25,20 @@ dmoduleexec_LTLIBRARIES += ha.la
ha_la_SOURCES = \
qpid/ha/Backup.cpp \
qpid/ha/Backup.h \
+ qpid/ha/BrokerReplicator.cpp \
+ qpid/ha/BrokerReplicator.h \
+ qpid/ha/ConnectionExcluder.cpp \
+ qpid/ha/ConnectionExcluder.h \
qpid/ha/HaBroker.cpp \
qpid/ha/HaBroker.h \
qpid/ha/HaPlugin.cpp \
- qpid/ha/Settings.h \
- qpid/ha/QueueReplicator.h \
qpid/ha/QueueReplicator.cpp \
- qpid/ha/ReplicatingSubscription.h \
+ qpid/ha/QueueReplicator.h \
+ qpid/ha/ReplicateLevel.cpp \
+ qpid/ha/ReplicateLevel.h \
qpid/ha/ReplicatingSubscription.cpp \
- qpid/ha/BrokerReplicator.cpp \
- qpid/ha/BrokerReplicator.h \
- qpid/ha/ConnectionExcluder.cpp \
- qpid/ha/ConnectionExcluder.h
+ qpid/ha/ReplicatingSubscription.h \
+ qpid/ha/Settings.h
ha_la_LIBADD = libqpidbroker.la
ha_la_LDFLAGS = $(PLUGINLDFLAGS)
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 3d65e07202..eabf502f8b 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -19,10 +19,11 @@
*
*/
#include "Backup.h"
-#include "Settings.h"
#include "BrokerReplicator.h"
-#include "ReplicatingSubscription.h"
#include "ConnectionExcluder.h"
+#include "HaBroker.h"
+#include "ReplicatingSubscription.h"
+#include "Settings.h"
#include "qpid/Url.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/Bridge.h"
@@ -43,8 +44,8 @@ using namespace broker;
using types::Variant;
using std::string;
-Backup::Backup(broker::Broker& b, const Settings& s) :
- broker(b), settings(s), excluder(new ConnectionExcluder())
+Backup::Backup(HaBroker& hb, const Settings& s) :
+ haBroker(hb), broker(hb.getBroker()), settings(s), excluder(new ConnectionExcluder())
{
// Empty brokerUrl means delay initialization until setUrl() is called.
if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
@@ -63,7 +64,7 @@ void Backup::initialize(const Url& url) {
link = result.first;
link->setUrl(url);
- replicator.reset(new BrokerReplicator(link));
+ replicator.reset(new BrokerReplicator(haBroker, link));
broker.getExchanges().registerExchange(replicator);
broker.getConnectionObservers().add(excluder);
}
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
index 526b238b82..6c36996914 100644
--- a/qpid/cpp/src/qpid/ha/Backup.h
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -38,6 +38,7 @@ namespace ha {
class Settings;
class ConnectionExcluder;
class BrokerReplicator;
+class HaBroker;
/**
* State associated with a backup broker. Manages connections to primary.
@@ -47,7 +48,7 @@ class BrokerReplicator;
class Backup
{
public:
- Backup(broker::Broker&, const Settings&);
+ Backup(HaBroker&, const Settings&);
~Backup();
void setBrokerUrl(const Url&);
@@ -55,6 +56,7 @@ class Backup
void initialize(const Url&);
sys::Mutex lock;
+ HaBroker& haBroker;
broker::Broker& broker;
Settings settings;
boost::shared_ptr<broker::Link> link;
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 609a3378ad..9d0043a827 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -19,6 +19,7 @@
*
*/
#include "BrokerReplicator.h"
+#include "HaBroker.h"
#include "QueueReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
@@ -37,6 +38,7 @@
#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
#include <algorithm>
+#include <sstream>
namespace qpid {
namespace ha {
@@ -87,6 +89,7 @@ const string QUEUE("queue");
const string RHOST("rhost");
const string TYPE("type");
const string USER("user");
+const string HA_BROKER("habroker");
const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
const string QMF2("qmf2");
@@ -100,6 +103,7 @@ const string _PACKAGE_NAME("_package_name");
const string _SCHEMA_ID("_schema_id");
const string OBJECT("OBJECT");
const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
+const string ORG_APACHE_QPID_HA("org.apache.qpid.ha");
const string QMF_DEFAULT_DIRECT("qmf.default.direct");
const string _QUERY_REQUEST("_query_request");
const string BROKER("broker");
@@ -113,36 +117,13 @@ template <class T> bool match(Variant::Map& schema) {
return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
}
-enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_ALL };
-const string S_NONE="none";
-const string S_CONFIGURATION="configuration";
-const string S_ALL="all";
-
-ReplicateLevel replicateLevel(const string& level) {
- if (level == S_NONE) return RL_NONE;
- if (level == S_CONFIGURATION) return RL_CONFIGURATION;
- if (level == S_ALL) return RL_ALL;
- throw Exception("Invalid value for "+QPID_REPLICATE+": "+level);
-}
-
-ReplicateLevel replicateLevel(const framing::FieldTable& f) {
- if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE));
- else return RL_NONE;
-}
-
-ReplicateLevel replicateLevel(const Variant::Map& m) {
- Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
- if (i != m.end()) return replicateLevel(i->second.asString());
- else return RL_NONE;
-}
-
-void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) {
+void sendQuery(const string& packageName, const string& className, const string& queueName, SessionHandler& sessionHandler) {
framing::AMQP_ServerProxy peer(sessionHandler.out);
Variant::Map request;
request[_WHAT] = OBJECT;
Variant::Map schema;
schema[_CLASS_NAME] = className;
- schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
+ schema[_PACKAGE_NAME] = packageName;
request[_SCHEMA_ID] = schema;
AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0)));
@@ -181,10 +162,33 @@ Variant::Map asMapVoid(const Variant& value) {
} // namespace
+
+ReplicateLevel BrokerReplicator::replicateLevel(const std::string& str) {
+ ReplicateLevel rl;
+ if (qpid::ha::replicateLevel(str, rl)) return rl;
+ else return haBroker.getSettings().replicateDefault;
+}
+
+ReplicateLevel BrokerReplicator::replicateLevel(const framing::FieldTable& f) {
+ if (f.isSet(QPID_REPLICATE))
+ return replicateLevel(f.getAsString(QPID_REPLICATE));
+ else
+ return haBroker.getSettings().replicateDefault;
+}
+
+ReplicateLevel BrokerReplicator::replicateLevel(const Variant::Map& m) {
+ Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
+ if (i != m.end())
+ return replicateLevel(i->second.asString());
+ else
+ return haBroker.getSettings().replicateDefault;
+}
+
BrokerReplicator::~BrokerReplicator() {}
-BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l)
- : Exchange(QPID_CONFIGURATION_REPLICATOR), broker(*l->getBroker()), link(l)
+BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
+ : Exchange(QPID_CONFIGURATION_REPLICATOR),
+ haBroker(hb), broker(hb.getBroker()), link(l)
{
QPID_LOG(info, "HA: Backup replicating from " <<
link->getTransport() << ":" << link->getHost() << ":" << link->getPort());
@@ -211,17 +215,21 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
//declare and bind an event queue
- peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable());
+ FieldTable declareArgs;
+ declareArgs.setString(QPID_REPLICATE, str(RL_NONE));
+ peer.getQueue().declare(queueName, "", false, false, true, true, declareArgs);
peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
//subscribe to the queue
peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
- //issue a query request for queues and another for exchanges using event queue as the reply-to address
- sendQuery(QUEUE, queueName, sessionHandler);
- sendQuery(EXCHANGE, queueName, sessionHandler);
- sendQuery(BINDING, queueName, sessionHandler);
+ // Issue a query request for queues, exchanges, bindings and the habroker
+ // using event queue as the reply-to address
+ sendQuery(ORG_APACHE_QPID_HA, HA_BROKER, queueName, sessionHandler);
+ sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler);
+ sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler);
+ sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName);
}
@@ -257,6 +265,7 @@ void BrokerReplicator::route(Deliverable& msg) {
if (type == QUEUE) doResponseQueue(values);
else if (type == EXCHANGE) doResponseExchange(values);
else if (type == BINDING) doResponseBind(values);
+ else if (type == HA_BROKER) doResponseHaBroker(values);
else QPID_LOG(error, "HA: Backup received unknown response type=" << type
<< " values=" << values);
}
@@ -288,7 +297,6 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
// re-create from event.
// Events are always up to date, whereas responses may be
// out of date.
- QPID_LOG(debug, "HA: Backup created queue: " << name);
startQueueReplicator(result.first);
} else {
// FIXME aconway 2011-12-02: what's the right way to handle this?
@@ -400,7 +408,6 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
QPID_LOG(debug, "HA: Backup queue response " << values);
- // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicateLevel(argsMap)) return;
framing::FieldTable args;
@@ -417,7 +424,6 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/);
if (result.second) {
- QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]);
startQueueReplicator(result.first);
} else {
// FIXME aconway 2011-11-22: Normal to find queue already
@@ -474,7 +480,6 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName);
boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
- // FIXME aconway 2011-11-24: more flexible configuration for binding replication.
// Automatically replicate binding if queue and exchange exist and are replicated
if (exchange && replicateLevel(exchange->getArgs()) &&
@@ -490,6 +495,28 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
}
}
+namespace {
+const string REPLICATE_DEFAULT="replicateDefault";
+}
+
+// Received the ha-broker configuration object for the primary broker.
+void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
+ try {
+ ReplicateLevel mine = haBroker.getSettings().replicateDefault;
+ ReplicateLevel primary = replicateLevel(values[REPLICATE_DEFAULT].asString());
+ if (mine != primary) {
+ std::ostringstream os;
+ os << "Replicate default on backup (" << mine
+ << ") does not match primary (" << primary << ")";
+ haBroker.shutdown(os.str());
+ }
+ } catch (const std::exception& e) {
+ std::ostringstream os;
+ os << "Received invalid replicate default from primary: " << e.what();
+ haBroker.shutdown(os.str());
+ }
+}
+
void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
if (replicateLevel(queue->getSettings()) == RL_ALL) {
boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index 483c251126..c9d7b9f74c 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -22,6 +22,7 @@
*
*/
+#include "ReplicateLevel.h"
#include "qpid/broker/Exchange.h"
#include "qpid/types/Variant.h"
#include <boost/shared_ptr.hpp>
@@ -35,7 +36,12 @@ class Bridge;
class SessionHandler;
}
+namespace framing {
+class FieldTable;
+}
+
namespace ha {
+class HaBroker;
/**
* Replicate configuration on a backup broker.
@@ -51,7 +57,7 @@ namespace ha {
class BrokerReplicator : public broker::Exchange
{
public:
- BrokerReplicator(const boost::shared_ptr<broker::Link>&);
+ BrokerReplicator(HaBroker&, const boost::shared_ptr<broker::Link>&);
~BrokerReplicator();
std::string getType() const;
@@ -64,6 +70,10 @@ class BrokerReplicator : public broker::Exchange
private:
void initializeBridge(broker::Bridge&, broker::SessionHandler&);
+ ReplicateLevel replicateLevel(const std::string&);
+ ReplicateLevel replicateLevel(const framing::FieldTable& args);
+ ReplicateLevel replicateLevel(const types::Variant::Map& args);
+
void doEventQueueDeclare(types::Variant::Map& values);
void doEventQueueDelete(types::Variant::Map& values);
void doEventExchangeDeclare(types::Variant::Map& values);
@@ -74,9 +84,11 @@ class BrokerReplicator : public broker::Exchange
void doResponseQueue(types::Variant::Map& values);
void doResponseExchange(types::Variant::Map& values);
void doResponseBind(types::Variant::Map& values);
+ void doResponseHaBroker(types::Variant::Map& values);
void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
+ HaBroker& haBroker;
broker::Broker& broker;
boost::shared_ptr<broker::Link> link;
};
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index ad6719f207..39a01ddf6b 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -27,6 +27,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/SignalHandler.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/ha/Package.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
@@ -69,16 +70,18 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
throw Exception("Cannot start HA: management is disabled");
_qmf::Package packageInit(ma);
mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
- // FIXME aconway 2012-03-01: should start in catch-up state and move to backup
- // only when caught up.
mgmtObject->set_status(BACKUP);
+ mgmtObject->set_replicateDefault(str(settings.replicateDefault));
ma->addObject(mgmtObject);
+
+ // NOTE: lock is not needed in a constructor but we created it just to pass
+ // to the set functions.
sys::Mutex::ScopedLock l(lock);
if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l);
if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
// If we are in a cluster, we start in backup mode.
- if (settings.cluster) backup.reset(new Backup(b, s));
+ if (settings.cluster) backup.reset(new Backup(*this, s));
}
HaBroker::~HaBroker() {}
@@ -169,4 +172,9 @@ std::vector<Url> HaBroker::getKnownBrokers() const {
return knownBrokers;
}
+void HaBroker::shutdown(const std::string& message) {
+ QPID_LOG(critical, "Shutting down: " << message);
+ broker::SignalHandler::shutdown();
+}
+
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 4f4ee4c944..65ad3de4a0 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -52,6 +52,11 @@ class HaBroker : public management::Manageable
management::Manageable::status_t ManagementMethod (
uint32_t methodId, management::Args& args, std::string& text);
+ broker::Broker& getBroker() { return broker; }
+ const Settings& getSettings() const { return settings; }
+
+ // Log a critical error message and shut down the broker.
+ void shutdown(const std::string& message);
private:
void setClientUrl(const Url&, const sys::Mutex::ScopedLock&);
void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&);
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
index 6a43b591b0..419ee962da 100644
--- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -37,6 +37,9 @@ struct Options : public qpid::Options {
"URL that backup brokers use to connect and fail over.")
("ha-public-brokers", optValue(settings.clientUrl,"URL"),
"URL that clients use to connect and fail over, defaults to ha-brokers.")
+ ("ha-replicate-default",
+ optValue(settings.replicateDefault, "LEVEL"),
+ "Replication level for creating queues and exchanges if there is no qpid.replicate argument supplied. LEVEL is 'none', 'configuration' or 'all'")
("ha-expected-backups", optValue(settings.expectedBackups, "N"),
"Number of backups expected to be active in the HA cluster.")
("ha-username", optValue(settings.username, "USER"),
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 6aff4879e3..83f3d28b6d 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -53,7 +53,7 @@ std::string QueueReplicator::replicatorName(const std::string& queueName) {
QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l)
{
- logPrefix = "HA: Backup " + queue->getName() + ": ";
+ logPrefix = "HA: Backup of queue " + queue->getName() + ": ";
QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings());
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp b/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp
new file mode 100644
index 0000000000..48f21a1f66
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ReplicateLevel.h"
+#include "qpid/Exception.h"
+#include <iostream>
+
+namespace qpid {
+namespace ha {
+
+using namespace std;
+
+namespace {
+const string S_NONE="none";
+const string S_CONFIGURATION="configuration";
+const string S_ALL="all";
+
+string names[] = { S_NONE, S_CONFIGURATION, S_ALL };
+}
+
+bool replicateLevel(const string& level, ReplicateLevel& out) {
+ if (level == S_NONE) { out = RL_NONE; return true; }
+ if (level == S_CONFIGURATION) { out = RL_CONFIGURATION; return true; }
+ if (level == S_ALL) { out = RL_ALL; return true; }
+ return false;
+}
+
+ReplicateLevel replicateLevel(const string& level) {
+ ReplicateLevel rl;
+ if (!replicateLevel(level, rl))
+ throw Exception("Invalid value for replication level: "+level);
+ return rl;
+}
+
+string str(ReplicateLevel l) { return names[l]; }
+
+ostream& operator<<(ostream& o, ReplicateLevel rl) { return o << str(rl); }
+istream& operator>>(istream& i, ReplicateLevel& rl) {
+ string str;
+ i >> str;
+ rl = replicateLevel(str);
+ return i;
+}
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicateLevel.h b/qpid/cpp/src/qpid/ha/ReplicateLevel.h
new file mode 100644
index 0000000000..c11e03f0ce
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ReplicateLevel.h
@@ -0,0 +1,52 @@
+#ifndef QPID_HA_REPLICATELEVEL_H
+#define QPID_HA_REPLICATELEVEL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <iosfwd>
+
+namespace qpid {
+namespace ha {
+
+enum ReplicateLevel { RL_NONE, RL_CONFIGURATION, RL_ALL };
+
+/**
+ * If str is a valid replicate level, set out and return true.
+ */
+bool replicateLevel(const std::string& str, ReplicateLevel& out);
+
+/**
+ *@return enum corresponding to string level.
+ *@throw qpid::Exception if level is not a valid replication level.
+ */
+ReplicateLevel replicateLevel(const std::string& level);
+
+/**@return string form of replicate level */
+std::string str(ReplicateLevel l);
+
+std::ostream& operator<<(std::ostream&, ReplicateLevel);
+std::istream& operator>>(std::istream&, ReplicateLevel&);
+
+}} // namespaces qpid::ha
+
+#endif /*!QPID_HA_REPLICATELEVEL_H*/
diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h
index 7df18b4ef4..bf70c3f3f7 100644
--- a/qpid/cpp/src/qpid/ha/Settings.h
+++ b/qpid/cpp/src/qpid/ha/Settings.h
@@ -22,6 +22,7 @@
*
*/
+#include "ReplicateLevel.h"
#include <string>
namespace qpid {
@@ -33,11 +34,12 @@ namespace ha {
class Settings
{
public:
- Settings() : cluster(false), expectedBackups(0) {}
+ Settings() : cluster(false), expectedBackups(0), replicateDefault(RL_NONE) {}
bool cluster; // True if we are a cluster member.
std::string clientUrl;
std::string brokerUrl;
size_t expectedBackups;
+ ReplicateLevel replicateDefault;
std::string username, password, mechanism;
private:
};
diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml
index 9a815b346c..363dff61fb 100644
--- a/qpid/cpp/src/qpid/ha/management-schema.xml
+++ b/qpid/cpp/src/qpid/ha/management-schema.xml
@@ -32,7 +32,11 @@
desc="Multiple-address URL used by clients to connect to the HA brokers."/>
<property name="expectedBackups" type="uint16"
- desc="Number of HA backup brokers expected."/>>
+ desc="Number of HA backup brokers expected."/>
+
+ <property
+ name="replicateDefault" type="sstr"
+ desc="Replicate value for queues/exchanges without a qpid.replicate argument"/>
<method name="promote" desc="Promote a backup broker to primary."/>
@@ -48,7 +52,7 @@
<arg name="expectedBackups" type="uint16" dir="I"/>
</method>
- <method name="replicate" desc="Replicate from a remote queue to the local broker.">
+ <method name="replicate" desc="Replicate individual queue from remote broker.">
<arg name="broker" type="sstr" dir="I"/>
<arg name="queue" type="sstr" dir="I"/>
</method>
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index e9d44c21e0..10d5fc0db2 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -29,13 +29,17 @@ from qpidtoollibs import BrokerAgent
log = getLogger("qpid.ha-tests")
class HaBroker(Broker):
- def __init__(self, test, args=[], broker_url=None, ha_cluster=True, **kwargs):
+ def __init__(self, test, args=[], broker_url=None, ha_cluster=True,
+ ha_replicate_default="all", **kwargs):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
args = copy(args)
- args.extend(["--load-module", BrokerTest.ha_lib,
- # FIXME aconway 2012-02-13: workaround slow link failover.
- "--link-maintenace-interval=0.1",
- "--ha-cluster=%s"%ha_cluster])
+ args += ["--load-module", BrokerTest.ha_lib,
+ "--log-enable=info+", "--log-enable=debug+:ha::",
+ # FIXME aconway 2012-02-13: workaround slow link failover.
+ "--link-maintenace-interval=0.1",
+ "--ha-cluster=%s"%ha_cluster]
+ if ha_replicate_default is not None:
+ args += [ "--ha-replicate-default=%s"%ha_replicate_default ]
if broker_url: args.extend([ "--ha-brokers", broker_url ])
Broker.__init__(self, test, args, **kwargs)
self.commands=os.getenv("PYTHON_COMMANDS")
@@ -64,6 +68,10 @@ class HaBroker(Broker):
assert os.system(
"%s/qpid-config --broker=%s add queue %s --replicate %s"%(self.commands, self.host_port(), queue, replication)) == 0
+ def connect_admin(self, **kwargs):
+ return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
+
+
class HaCluster(object):
_cluster_count = 0
@@ -72,9 +80,9 @@ class HaCluster(object):
self.test = test
self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)]
HaCluster._cluster_count += 1
- self[0].promote()
self.url = ",".join([b.host_port() for b in self])
for b in self: b.set_broker_url(self.url)
+ self[0].promote()
def connect(self, i):
"""Connect with reconnect_urls"""
@@ -98,8 +106,6 @@ class HaCluster(object):
def __iter__(self): return self._brokers.__iter__()
-def qr_node(value="all"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
-
class HaTest(BrokerTest):
"""Base class for HA test cases, defines convenience functions"""
@@ -114,13 +120,13 @@ class HaTest(BrokerTest):
# Wait for address to become valid on a backup broker.
def wait_backup(self, backup, address):
- bs = self.connect_admin(backup).session()
+ bs = backup.connect_admin().session()
self.wait(bs, address)
bs.connection.close()
# Combines wait_backup and assert_browse_retry
def assert_browse_backup(self, backup, queue, expected, **kwargs):
- bs = self.connect_admin(backup).session()
+ bs = backup.connect_admin().session()
self.wait(bs, queue)
self.assert_browse_retry(bs, queue, expected, **kwargs)
bs.connection.close()
@@ -128,12 +134,9 @@ class HaTest(BrokerTest):
def assert_missing(self, session, address):
try:
session.receiver(address)
- self.fail("Should not have been replicated: %s"%(address))
+ self.fail("Expected NotFound: %s"%(address))
except NotFound: pass
- def connect_admin(self, backup, **kwargs):
- """Connect to a backup broker as an admin connection"""
- return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs)
class ReplicationTests(HaTest):
"""Correctness tests for HA replication."""
@@ -173,7 +176,6 @@ class ReplicationTests(HaTest):
def verify(b, prefix, p):
"""Verify setup was replicated to backup b"""
-
# Wait for configuration to replicate.
self.wait(b, prefix+"x");
self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
@@ -203,7 +205,7 @@ class ReplicationTests(HaTest):
setup(p, "2", primary)
# Verify the data on the backup
- b = self.connect_admin(backup).session()
+ b = backup.connect_admin().session()
verify(b, "1", p)
verify(b, "2", p)
# Test a series of messages, enqueue all then dequeue all.
@@ -230,12 +232,10 @@ class ReplicationTests(HaTest):
self.assert_browse_retry(b, "foo", msgs[i+1:])
def test_sync(self):
- def queue(name, replicate):
- return "%s;{create:always,%s}"%(name, qr_node(replicate))
primary = HaBroker(self, name="primary")
primary.promote()
p = primary.connect().session()
- s = p.sender(queue("q","all"))
+ s = p.sender("q;{create:always}")
for m in [str(i) for i in range(0,10)]: s.send(m)
s.sync()
backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
@@ -246,46 +246,37 @@ class ReplicationTests(HaTest):
s.sync()
msgs = [str(i) for i in range(30)]
- b1 = self.connect_admin(backup1).session()
+ b1 = backup1.connect_admin().session()
self.wait(b1, "q");
self.assert_browse_retry(b1, "q", msgs)
- b2 = self.connect_admin(backup2).session()
+ b2 = backup2.connect_admin().session()
self.wait(b2, "q");
self.assert_browse_retry(b2, "q", msgs)
def test_send_receive(self):
"""Verify sequence numbers of messages sent by qpid-send"""
+ brokers = HaCluster(self, 3)
getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
- primary = HaBroker(self, name="primary")
- primary.promote()
- backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
- backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
sender = self.popen(
["qpid-send",
- "--broker", primary.host_port(),
- "--address", "q;{create:always,%s}"%(qr_node("all")),
+ "--broker", brokers[0].host_port(),
+ "--address", "q;{create:always}",
"--messages=1000",
"--content-string=x"
])
receiver = self.popen(
["qpid-receive",
- "--broker", primary.host_port(),
- "--address", "q;{create:always,%s}"%(qr_node("all")),
+ "--broker", brokers[0].host_port(),
+ "--address", "q;{create:always}",
"--messages=990",
"--timeout=10"
])
- try:
- self.assertEqual(sender.wait(), 0)
- self.assertEqual(receiver.wait(), 0)
- expect = [long(i) for i in range(991, 1001)]
- sn = lambda m: m.properties["sn"]
- self.assert_browse_retry(self.connect_admin(backup1).session(), "q", expect, transform=sn)
- self.assert_browse_retry(self.connect_admin(backup2).session(), "q", expect, transform=sn)
- except:
- print self.browse(primary.connect().session(), "q", transform=sn)
- print self.browse(self.connect_admin(backup1).session(), "q", transform=sn)
- print self.browse(self.connect_admin(backup2).session(), "q", transform=sn)
- raise
+ self.assertEqual(sender.wait(), 0)
+ self.assertEqual(receiver.wait(), 0)
+ expect = [long(i) for i in range(991, 1001)]
+ sn = lambda m: m.properties["sn"]
+ self.assert_browse_backup(brokers[1], "q", expect, transform=sn)
+ self.assert_browse_backup(brokers[2], "q", expect, transform=sn)
def test_failover_python(self):
"""Verify that backups rejects connections and that fail-over works in python client"""
@@ -299,12 +290,12 @@ class ReplicationTests(HaTest):
self.fail("Expected connection to backup to fail")
except ConnectionError: pass
# Check that admin connections are allowed to backup.
- self.connect_admin(backup).close()
+ backup.connect_admin().close()
# Test discovery: should connect to primary after reject by backup
c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
s = c.session()
- sender = s.sender("q;{create:always,%s}"%(qr_node()))
+ sender = s.sender("q;{create:always}")
self.wait_backup(backup, "q")
sender.send("foo")
primary.kill()
@@ -319,7 +310,7 @@ class ReplicationTests(HaTest):
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
url="%s,%s"%(primary.host_port(), backup.host_port())
- primary.connect().session().sender("q;{create:always,%s}"%(qr_node()))
+ primary.connect().session().sender("q;{create:always}")
self.wait_backup(backup, "q")
sender = NumberedSender(primary, url=url, queue="q", failover_updates = False)
@@ -340,8 +331,7 @@ class ReplicationTests(HaTest):
def test_backup_failover(self):
"""Verify that a backup broker fails over and recovers queue state"""
brokers = HaCluster(self, 3)
- brokers[0].connect().session().sender(
- "q;{create:always,%s}"%(qr_node())).send("a")
+ brokers[0].connect().session().sender("q;{create:always}").send("a")
for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"])
brokers[0].expect = EXPECT_EXIT_FAIL
brokers.kill(0)
@@ -362,11 +352,11 @@ class ReplicationTests(HaTest):
def test_standalone_queue_replica(self):
"""Test replication of individual queues outside of cluster mode"""
getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
- primary = HaBroker(self, name="primary", ha_cluster=False, args=["--log-enable=debug+"])
+ primary = HaBroker(self, name="primary", ha_cluster=False)
pc = primary.connect()
ps = pc.session().sender("q;{create:always}")
pr = pc.session().receiver("q;{create:always}")
- backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"])
+ backup = HaBroker(self, name="backup", ha_cluster=False)
br = backup.connect().session().receiver("q;{create:always}")
# Set up replication with qpid-ha
@@ -392,9 +382,9 @@ class ReplicationTests(HaTest):
cluster = HaCluster(self, 2)
primary = cluster[0]
pc = cluster.connect(0)
- ps = pc.session().sender("q;{create:always,%s}"%qr_node("all"))
- pr = pc.session().receiver("q;{create:always,%s}"%qr_node("all"))
- backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"])
+ ps = pc.session().sender("q;{create:always}")
+ pr = pc.session().receiver("q;{create:always}")
+ backup = HaBroker(self, name="backup", ha_cluster=False)
br = backup.connect().session().receiver("q;{create:always}")
backup.replicate(cluster.url, "q")
ps.send("a")
@@ -413,7 +403,7 @@ class ReplicationTests(HaTest):
primary = HaBroker(self, name="primary")
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
- s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key, 'qpid.replicate':all}}}}")
+ s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}")
def send(key,value): s.send(Message(content=value,properties={"lvq-key":key}))
for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]:
send(*kv)
@@ -426,19 +416,21 @@ class ReplicationTests(HaTest):
self.assert_browse_backup(backup, "lvq", ["a-3", "b-2", "c-3", "d-1"])
def test_ring(self):
+ """Test replication with the ring queue policy"""
primary = HaBroker(self, name="primary")
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
- s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.replicate':all}}}}")
+ s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}")
for i in range(10): s.send(Message(str(i)))
self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)])
def test_reject(self):
+ """Test replication with the reject queue policy"""
getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
primary = HaBroker(self, name="primary")
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
- s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5, 'qpid.replicate':all}}}}")
+ s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5}}}}")
try:
for i in range(10): s.send(Message(str(i)), sync=False)
except qpid.messaging.exceptions.TargetCapacityExceeded: pass
@@ -450,12 +442,12 @@ class ReplicationTests(HaTest):
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
session = primary.connect().session()
- s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10, 'qpid.replicate':all}}}}")
+ s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
for p in priorities: s.send(Message(priority=p))
# Can't use browse_backup as browser sees messages in delivery order not priority.
self.wait_backup(backup, "priority-queue")
- r = self.connect_admin(backup).session().receiver("priority-queue")
+ r = backup.connect_admin().session().receiver("priority-queue")
received = [r.fetch().priority for i in priorities]
self.assertEqual(sorted(priorities, reverse=True), received)
@@ -469,11 +461,11 @@ class ReplicationTests(HaTest):
priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3]
limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2}
limit_policy = ",".join(["'qpid.fairshare':5"] + ["'qpid.fairshare-%s':%s"%(i[0],i[1]) for i in limits.iteritems()])
- s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s, 'qpid.replicate':all}}}}"%(levels,limit_policy))
+ s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s}}}}"%(levels,limit_policy))
messages = [Message(content=str(uuid4()), priority = p) for p in priorities]
for m in messages: s.send(m)
self.wait_backup(backup, s.target)
- r = self.connect_admin(backup).session().receiver("priority-queue")
+ r = backup.connect_admin().session().receiver("priority-queue")
received = [r.fetch().content for i in priorities]
sort = sorted(messages, key=lambda m: priority_level(m.priority, levels), reverse=True)
fair = [m.content for m in fairshare(sort, lambda l: limits.get(l,0), levels)]
@@ -483,13 +475,14 @@ class ReplicationTests(HaTest):
primary = HaBroker(self, name="primary")
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
- s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10, 'qpid.replicate':all}}}}")
+ s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
for p in priorities: s.send(Message(priority=p))
- # FIXME aconway 2012-02-22: there is a bug in priority ring queues that allows a low
- # priority message to displace a high one. The following commented-out assert_browse
- # is for the correct result, the uncommented one is for the actualy buggy result.
- # See https://issues.apache.org/jira/browse/QPID-3866
+ # FIXME aconway 2012-02-22: there is a bug in priority ring
+ # queues that allows a low priority message to displace a high
+ # one. The following commented-out assert_browse is for the
+ # correct result, the uncommented one is for the actualy buggy
+ # result. See https://issues.apache.org/jira/browse/QPID-3866
#
# self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority)
@@ -539,6 +532,29 @@ class ReplicationTests(HaTest):
for t in tests: t.verify(self, backup1)
for t in tests: t.verify(self, backup2)
+ def test_replicate_default(self):
+ """Make sure we don't replicate if ha-replicate-default is unspecified or none"""
+ cluster1 = HaCluster(self, 2, ha_replicate_default=None)
+ c1 = cluster1[0].connect().session().sender("q;{create:always}")
+ cluster2 = HaCluster(self, 2, ha_replicate_default="none")
+ cluster2[0].connect().session().sender("q;{create:always}")
+ time.sleep(.1) # Give replication a chance.
+ try:
+ cluster1[1].connect_admin().session().receiver("q")
+ self.fail("Excpected no-such-queue exception")
+ except NotFound: pass
+ try:
+ cluster2[1].connect_admin().session().receiver("q")
+ self.fail("Excpected no-such-queue exception")
+ except NotFound: pass
+
+ def test_invalid_default(self):
+ """Verify that a queue with an invalid qpid.replicate gets default treatment"""
+ cluster = HaCluster(self, 2, ha_replicate_default="all")
+ c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
+ self.wait_backup(cluster[1], "q")
+
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit