summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha')
-rw-r--r--cpp/src/qpid/ha/Backup.cpp36
-rw-r--r--cpp/src/qpid/ha/Backup.h4
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp190
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.h16
-rw-r--r--cpp/src/qpid/ha/HaBroker.cpp95
-rw-r--r--cpp/src/qpid/ha/HaBroker.h8
-rw-r--r--cpp/src/qpid/ha/HaPlugin.cpp28
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp62
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.h5
-rw-r--r--cpp/src/qpid/ha/ReplicateLevel.cpp72
-rw-r--r--cpp/src/qpid/ha/ReplicateLevel.h52
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp71
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.h4
-rw-r--r--cpp/src/qpid/ha/Settings.h7
-rw-r--r--cpp/src/qpid/ha/management-schema.xml35
15 files changed, 483 insertions, 202 deletions
diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp
index 5acbfb9d5f..3f3fa87a01 100644
--- a/cpp/src/qpid/ha/Backup.cpp
+++ b/cpp/src/qpid/ha/Backup.cpp
@@ -19,10 +19,11 @@
*
*/
#include "Backup.h"
-#include "Settings.h"
#include "BrokerReplicator.h"
-#include "ReplicatingSubscription.h"
#include "ConnectionExcluder.h"
+#include "HaBroker.h"
+#include "ReplicatingSubscription.h"
+#include "Settings.h"
#include "qpid/Url.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/Bridge.h"
@@ -43,37 +44,44 @@ using namespace broker;
using types::Variant;
using std::string;
-Backup::Backup(broker::Broker& b, const Settings& s) :
- broker(b), settings(s), excluder(new ConnectionExcluder())
+Backup::Backup(HaBroker& hb, const Settings& s) :
+ haBroker(hb), broker(hb.getBroker()), settings(s), excluder(new ConnectionExcluder())
{
+ // Exclude client connections before starting the link to avoid self-connection.
+ broker.getConnectionObservers().add(excluder);
// Empty brokerUrl means delay initialization until setUrl() is called.
if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
}
void Backup::initialize(const Url& url) {
- assert(!url.empty());
- QPID_LOG(notice, "Ha: Backup started: " << url);
+ if (url.empty()) throw Url::Invalid("HA broker URL is empty");
+ QPID_LOG(notice, "HA: Backup initialized: " << url);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
// Declare the link
std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
url[0].host, url[0].port, protocol,
false, // durable
settings.mechanism, settings.username, settings.password);
- assert(result.second); // FIXME aconway 2011-11-23: error handling
link = result.first;
link->setUrl(url);
-
- replicator.reset(new BrokerReplicator(link));
+ replicator.reset(new BrokerReplicator(haBroker, link));
broker.getExchanges().registerExchange(replicator);
- broker.getConnectionObservers().add(excluder);
}
+Backup::~Backup() {
+ if (link) link->close();
+ if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
+ replicator.reset();
+ broker.getConnectionObservers().remove(excluder); // This allows client connections.
+}
+
+
void Backup::setBrokerUrl(const Url& url) {
// Ignore empty URLs seen during start-up for some tests.
if (url.empty()) return;
sys::Mutex::ScopedLock l(lock);
if (link) { // URL changed after we initialized.
- QPID_LOG(info, "HA: Backup failover URL set to " << url);
+ QPID_LOG(info, "HA: Backup broker URL set to " << url);
link->setUrl(url);
}
else {
@@ -81,10 +89,4 @@ void Backup::setBrokerUrl(const Url& url) {
}
}
-Backup::~Backup() {
- if (link) link->close();
- if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
- broker.getConnectionObservers().remove(excluder); // This allows client connections.
-}
-
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/Backup.h b/cpp/src/qpid/ha/Backup.h
index 526b238b82..6c36996914 100644
--- a/cpp/src/qpid/ha/Backup.h
+++ b/cpp/src/qpid/ha/Backup.h
@@ -38,6 +38,7 @@ namespace ha {
class Settings;
class ConnectionExcluder;
class BrokerReplicator;
+class HaBroker;
/**
* State associated with a backup broker. Manages connections to primary.
@@ -47,7 +48,7 @@ class BrokerReplicator;
class Backup
{
public:
- Backup(broker::Broker&, const Settings&);
+ Backup(HaBroker&, const Settings&);
~Backup();
void setBrokerUrl(const Url&);
@@ -55,6 +56,7 @@ class Backup
void initialize(const Url&);
sys::Mutex lock;
+ HaBroker& haBroker;
broker::Broker& broker;
Settings settings;
boost::shared_ptr<broker::Link> link;
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index a8f05c1fe3..d0c99cbdb6 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -19,6 +19,7 @@
*
*/
#include "BrokerReplicator.h"
+#include "HaBroker.h"
#include "QueueReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
@@ -37,6 +38,7 @@
#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
#include <algorithm>
+#include <sstream>
namespace qpid {
namespace ha {
@@ -87,6 +89,7 @@ const string QUEUE("queue");
const string RHOST("rhost");
const string TYPE("type");
const string USER("user");
+const string HA_BROKER("habroker");
const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
const string QMF2("qmf2");
@@ -100,6 +103,7 @@ const string _PACKAGE_NAME("_package_name");
const string _SCHEMA_ID("_schema_id");
const string OBJECT("OBJECT");
const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
+const string ORG_APACHE_QPID_HA("org.apache.qpid.ha");
const string QMF_DEFAULT_DIRECT("qmf.default.direct");
const string _QUERY_REQUEST("_query_request");
const string BROKER("broker");
@@ -113,36 +117,13 @@ template <class T> bool match(Variant::Map& schema) {
return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
}
-enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_MESSAGES };
-const string S_NONE="none";
-const string S_CONFIGURATION="configuration";
-const string S_MESSAGES="messages";
-
-ReplicateLevel replicateLevel(const string& level) {
- if (level == S_NONE) return RL_NONE;
- if (level == S_CONFIGURATION) return RL_CONFIGURATION;
- if (level == S_MESSAGES) return RL_MESSAGES;
- throw Exception("Invalid value for "+QPID_REPLICATE+": "+level);
-}
-
-ReplicateLevel replicateLevel(const framing::FieldTable& f) {
- if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE));
- else return RL_NONE;
-}
-
-ReplicateLevel replicateLevel(const Variant::Map& m) {
- Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
- if (i != m.end()) return replicateLevel(i->second.asString());
- else return RL_NONE;
-}
-
-void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) {
+void sendQuery(const string& packageName, const string& className, const string& queueName, SessionHandler& sessionHandler) {
framing::AMQP_ServerProxy peer(sessionHandler.out);
Variant::Map request;
request[_WHAT] = OBJECT;
Variant::Map schema;
schema[_CLASS_NAME] = className;
- schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
+ schema[_PACKAGE_NAME] = packageName;
request[_SCHEMA_ID] = schema;
AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0)));
@@ -181,13 +162,34 @@ Variant::Map asMapVoid(const Variant& value) {
} // namespace
+
+ReplicateLevel BrokerReplicator::replicateLevel(const std::string& str) {
+ ReplicateLevel rl;
+ if (qpid::ha::replicateLevel(str, rl)) return rl;
+ else return haBroker.getSettings().replicateDefault;
+}
+
+ReplicateLevel BrokerReplicator::replicateLevel(const framing::FieldTable& f) {
+ if (f.isSet(QPID_REPLICATE))
+ return replicateLevel(f.getAsString(QPID_REPLICATE));
+ else
+ return haBroker.getSettings().replicateDefault;
+}
+
+ReplicateLevel BrokerReplicator::replicateLevel(const Variant::Map& m) {
+ Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
+ if (i != m.end())
+ return replicateLevel(i->second.asString());
+ else
+ return haBroker.getSettings().replicateDefault;
+}
+
BrokerReplicator::~BrokerReplicator() {}
-BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l)
- : Exchange(QPID_CONFIGURATION_REPLICATOR), broker(*l->getBroker()), link(l)
+BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
+ : Exchange(QPID_CONFIGURATION_REPLICATOR),
+ haBroker(hb), broker(hb.getBroker()), link(l)
{
- QPID_LOG(info, "HA: Backup replicating from " <<
- link->getTransport() << ":" << link->getHost() << ":" << link->getPort());
broker.getLinks().declare(
link->getHost(), link->getPort(),
false, // durable
@@ -211,22 +213,26 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
//declare and bind an event queue
- peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable());
+ FieldTable declareArgs;
+ declareArgs.setString(QPID_REPLICATE, str(RL_NONE));
+ peer.getQueue().declare(queueName, "", false, false, true, true, declareArgs);
peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
//subscribe to the queue
peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
- //issue a query request for queues and another for exchanges using event queue as the reply-to address
- sendQuery(QUEUE, queueName, sessionHandler);
- sendQuery(EXCHANGE, queueName, sessionHandler);
- sendQuery(BINDING, queueName, sessionHandler);
- QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName);
+ // Issue a query request for queues, exchanges, bindings and the habroker
+ // using event queue as the reply-to address
+ sendQuery(ORG_APACHE_QPID_HA, HA_BROKER, queueName, sessionHandler);
+ sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler);
+ sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler);
+ sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
+ QPID_LOG(debug, "HA: Backup configuration bridge: " << queueName);
}
-// FIXME aconway 2011-12-02: error handling in route.
-void BrokerReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) {
+void BrokerReplicator::route(Deliverable& msg) {
+ const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
Variant::List list;
try {
if (!isQMFv2(msg.getMessage()) || !headers)
@@ -238,6 +244,7 @@ void BrokerReplicator::route(Deliverable& msg, const string& /*key*/, const fram
if (headers->getAsString(QMF_CONTENT) == EVENT) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
+ QPID_LOG(trace, "HA: Backup received event: " << map);
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
@@ -249,19 +256,22 @@ void BrokerReplicator::route(Deliverable& msg, const string& /*key*/, const fram
}
} else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
- string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME];
- Variant::Map& values = i->asMap()[VALUES].asMap();
+ Variant::Map& map = i->asMap();
+ QPID_LOG(trace, "HA: Backup received event: " << map);
+ string type = map[SCHEMA_ID].asMap()[CLASS_NAME];
+ Variant::Map& values = map[VALUES].asMap();
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
if (type == QUEUE) doResponseQueue(values);
else if (type == EXCHANGE) doResponseExchange(values);
else if (type == BINDING) doResponseBind(values);
- else QPID_LOG(error, "HA: Backup received unknown response type=" << type
- << " values=" << values);
+ else if (type == HA_BROKER) doResponseHaBroker(values);
}
- } else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers);
+ }
} catch (const std::exception& e) {
- QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list);
+ QPID_LOG(critical, "HA: Backup configuration failed: " << e.what()
+ << ": while handling: " << list);
+ throw;
}
}
@@ -282,15 +292,13 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
values[USER].asString(),
values[RHOST].asString());
if (result.second) {
- // FIXME aconway 2011-11-22: should delete old queue and
- // re-create from event.
- // Events are always up to date, whereas responses may be
- // out of date.
- QPID_LOG(debug, "HA: Backup created queue: " << name);
+ QPID_LOG(debug, "HA: Backup queue declare event: " << name);
startQueueReplicator(result.first);
} else {
// FIXME aconway 2011-12-02: what's the right way to handle this?
- QPID_LOG(warning, "HA: Backup queue already exists: " << name);
+ // Should we delete the old & re-create form the event? Responses
+ // may be old but events are always up-to-date.
+ QPID_LOG(warning, "HA: Backup queue declare event, already exists: " << name);
}
}
}
@@ -300,8 +308,11 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
// sessions may be closed by a "queue deleted" exception.
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
- if (queue && replicateLevel(queue->getSettings())) {
- QPID_LOG(debug, "HA: Backup deleting queue: " << name);
+ if (!queue) {
+ QPID_LOG(warning, "HA: Backup queue delete event, does not exist: " << name);
+ } else if (!replicateLevel(queue->getSettings())) {
+ QPID_LOG(warning, "HA: Backup queue delete event, not replicated: " << name);
+ } else {
string rname = QueueReplicator::replicatorName(name);
boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname);
boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex);
@@ -310,6 +321,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
// actually be destroyed, deleting the exhange
broker.getExchanges().destroy(rname);
broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString());
+ QPID_LOG(debug, "HA: Backup queue delete event: " << name);
}
}
@@ -328,27 +340,29 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
values[USER].asString(),
values[RHOST].asString()).second)
{
- QPID_LOG(debug, "HA: Backup created exchange: " << name);
+ QPID_LOG(debug, "HA: Backup exchange declare event: " << name);
} else {
- // FIXME aconway 2011-11-22: should delete pre-exisitng exchange
+ // FIXME aconway 2011-11-22: should delete pre-existing exchange
// and re-create from event. See comment in doEventQueueDeclare.
- QPID_LOG(warning, "HA: Backup exchange already exists: " << name);
+ QPID_LOG(debug, "HA: Backup exchange declare event, already exists: " << name);
}
}
}
void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
string name = values[EXNAME].asString();
- try {
- boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
- if (exchange && replicateLevel(exchange->getArgs())) {
- QPID_LOG(debug, "HA: Backup deleting exchange:" << name);
- broker.deleteExchange(
- name,
- values[USER].asString(),
- values[RHOST].asString());
- }
- } catch (const framing::NotFoundException&) {}
+ boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
+ if (!exchange) {
+ QPID_LOG(warning, "HA: Backup exchange delete event, does not exist: " << name);
+ } else if (!replicateLevel(exchange->getArgs())) {
+ QPID_LOG(warning, "HA: Backup exchange delete event, not replicated: " << name);
+ } else {
+ QPID_LOG(debug, "HA: Backup exchange delete event:" << name);
+ broker.deleteExchange(
+ name,
+ values[USER].asString(),
+ values[RHOST].asString());
+ }
}
void BrokerReplicator::doEventBind(Variant::Map& values) {
@@ -364,10 +378,10 @@ void BrokerReplicator::doEventBind(Variant::Map& values) {
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
- QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName()
+ exchange->bind(queue, key, &args);
+ QPID_LOG(debug, "HA: Backup bind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
- exchange->bind(queue, key, &args);
}
}
@@ -384,15 +398,14 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
- QPID_LOG(debug, "HA: Backup replicated unbinding exchange=" << exchange->getName()
+ exchange->unbind(queue, key, &args);
+ QPID_LOG(debug, "HA: Backup unbind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
- exchange->unbind(queue, key, &args);
}
}
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
- // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicateLevel(argsMap)) return;
framing::FieldTable args;
@@ -409,12 +422,12 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/);
if (result.second) {
- QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]);
+ QPID_LOG(debug, "HA: Backup queue response: " << name);
startQueueReplicator(result.first);
} else {
// FIXME aconway 2011-11-22: Normal to find queue already
// exists if we're failing over.
- QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << name);
+ QPID_LOG(warning, "HA: Backup queue response, already exists: " << name);
}
}
@@ -432,9 +445,10 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/).second)
{
- QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME]);
+ QPID_LOG(debug, "HA: Backup exchange response: " << values[NAME].asString());
} else {
- QPID_LOG(warning, "HA: Backup catch-up exchange already exists: " << values[QNAME]);
+ QPID_LOG(warning, "HA: Backup exchange query, already exists: " <<
+ values[QNAME].asString());
}
}
@@ -464,7 +478,6 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName);
boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
- // FIXME aconway 2011-11-24: more flexible configuration for binding replication.
// Automatically replicate binding if queue and exchange exist and are replicated
if (exchange && replicateLevel(exchange->getArgs()) &&
@@ -474,16 +487,39 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
string key = values[KEY].asString();
exchange->bind(queue, key, &args);
- QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName()
+ QPID_LOG(debug, "HA: Backup bind response: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
}
}
+namespace {
+const string REPLICATE_DEFAULT="replicateDefault";
+}
+
+// Received the ha-broker configuration object for the primary broker.
+void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
+ try {
+ ReplicateLevel mine = haBroker.getSettings().replicateDefault;
+ ReplicateLevel primary = replicateLevel(values[REPLICATE_DEFAULT].asString());
+ if (mine != primary) {
+ std::ostringstream os;
+ os << "Replicate default on backup (" << mine
+ << ") does not match primary (" << primary << ")";
+ haBroker.shutdown(os.str());
+ }
+ } catch (const std::exception& e) {
+ std::ostringstream os;
+ os << "Received invalid replicate default from primary: " << e.what();
+ haBroker.shutdown(os.str());
+ }
+}
+
void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
- if (replicateLevel(queue->getSettings()) == RL_MESSAGES) {
+ if (replicateLevel(queue->getSettings()) == RL_ALL) {
boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
- broker.getExchanges().registerExchange(qr);
+ if (!broker.getExchanges().registerExchange(qr))
+ throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
qr->activate();
}
}
diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h
index cfb6cf9a28..c9d7b9f74c 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/cpp/src/qpid/ha/BrokerReplicator.h
@@ -22,6 +22,7 @@
*
*/
+#include "ReplicateLevel.h"
#include "qpid/broker/Exchange.h"
#include "qpid/types/Variant.h"
#include <boost/shared_ptr.hpp>
@@ -35,7 +36,12 @@ class Bridge;
class SessionHandler;
}
+namespace framing {
+class FieldTable;
+}
+
namespace ha {
+class HaBroker;
/**
* Replicate configuration on a backup broker.
@@ -51,19 +57,23 @@ namespace ha {
class BrokerReplicator : public broker::Exchange
{
public:
- BrokerReplicator(const boost::shared_ptr<broker::Link>&);
+ BrokerReplicator(HaBroker&, const boost::shared_ptr<broker::Link>&);
~BrokerReplicator();
std::string getType() const;
// Exchange methods
bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
- void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
+ void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
private:
void initializeBridge(broker::Bridge&, broker::SessionHandler&);
+ ReplicateLevel replicateLevel(const std::string&);
+ ReplicateLevel replicateLevel(const framing::FieldTable& args);
+ ReplicateLevel replicateLevel(const types::Variant::Map& args);
+
void doEventQueueDeclare(types::Variant::Map& values);
void doEventQueueDelete(types::Variant::Map& values);
void doEventExchangeDeclare(types::Variant::Map& values);
@@ -74,9 +84,11 @@ class BrokerReplicator : public broker::Exchange
void doResponseQueue(types::Variant::Map& values);
void doResponseExchange(types::Variant::Map& values);
void doResponseBind(types::Variant::Map& values);
+ void doResponseHaBroker(types::Variant::Map& values);
void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
+ HaBroker& haBroker;
broker::Broker& broker;
boost::shared_ptr<broker::Link> link;
};
diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp
index 0d3bd51439..7d82fb63bd 100644
--- a/cpp/src/qpid/ha/HaBroker.cpp
+++ b/cpp/src/qpid/ha/HaBroker.cpp
@@ -25,10 +25,15 @@
#include "ReplicatingSubscription.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/SignalHandler.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/ha/Package.h"
-#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetClientAddresses.h"
-#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokerAddresses.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokers.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicBrokers.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetExpectedBackups.h"
#include "qpid/log/Statement.h"
namespace qpid {
@@ -40,8 +45,10 @@ using namespace std;
namespace {
-const std::string PRIMARY="primary";
+const std::string STANDALONE="standalone";
+const std::string CATCH_UP="catch-up";
const std::string BACKUP="backup";
+const std::string PRIMARY="primary";
} // namespace
@@ -49,7 +56,6 @@ const std::string BACKUP="backup";
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
: broker(b),
settings(s),
- backup(new Backup(b, s)),
mgmtObject(0)
{
// Register a factory for replicating subscriptions.
@@ -62,15 +68,20 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
ManagementAgent* ma = broker.getManagementAgent();
if (!ma)
throw Exception("Cannot start HA: management is disabled");
- if (ma) {
- _qmf::Package packageInit(ma);
- mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
- mgmtObject->set_status(BACKUP);
- ma->addObject(mgmtObject);
- }
+ _qmf::Package packageInit(ma);
+ mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
+ mgmtObject->set_status(settings.cluster ? BACKUP : STANDALONE);
+ mgmtObject->set_replicateDefault(str(settings.replicateDefault));
+ ma->addObject(mgmtObject);
+
+ // NOTE: lock is not needed in a constructor but we created it just to pass
+ // to the set functions.
sys::Mutex::ScopedLock l(lock);
if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l);
if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
+
+ // If we are in a cluster, we start in backup mode.
+ if (settings.cluster) backup.reset(new Backup(*this, s));
}
HaBroker::~HaBroker() {}
@@ -80,26 +91,47 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
switch (methodId) {
case _qmf::HaBroker::METHOD_PROMOTE: {
if (backup.get()) { // I am a backup
- // FIXME aconway 2012-01-26: create primary state before resetting backup
- // as that allows client connections.
+ // NOTE: resetting backup allows client connections, so any
+ // primary state should be set up here before backup.reset()
backup.reset();
- QPID_LOG(notice, "HA: Primary promoted from backup");
+ QPID_LOG(notice, "HA: Promoted to primary");
mgmtObject->set_status(PRIMARY);
}
break;
}
- case _qmf::HaBroker::METHOD_SETCLIENTADDRESSES: {
- setClientUrl(
- Url(dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args).
- i_clientAddresses), l);
+ case _qmf::HaBroker::METHOD_SETBROKERS: {
+ setBrokerUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokers&>(args).i_url), l);
+ break;
+ }
+ case _qmf::HaBroker::METHOD_SETPUBLICBROKERS: {
+ setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicBrokers&>(args).i_url), l);
break;
}
- case _qmf::HaBroker::METHOD_SETBROKERADDRESSES: {
- setBrokerUrl(
- Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args)
- .i_brokerAddresses), l);
+ case _qmf::HaBroker::METHOD_SETEXPECTEDBACKUPS: {
+ setExpectedBackups(dynamic_cast<_qmf::ArgsHaBrokerSetExpectedBackups&>(args).i_expectedBackups, l);
+ break;
+ }
+ case _qmf::HaBroker::METHOD_REPLICATE: {
+ _qmf::ArgsHaBrokerReplicate& bq_args =
+ dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
+ QPID_LOG(debug, "HA replicating individual queue "<< bq_args.i_queue << " from " << bq_args.i_broker);
+
+ boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
+ Url url(bq_args.i_broker);
+ string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+ std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare(
+ url[0].host, url[0].port, protocol,
+ false, // durable
+ settings.mechanism, settings.username, settings.password);
+ boost::shared_ptr<broker::Link> link = result.first;
+ link->setUrl(url);
+ // Create a queue replicator
+ boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
+ qr->activate();
+ broker.getExchanges().registerExchange(qr);
break;
}
+
default:
return Manageable::STATUS_UNKNOWN_METHOD;
}
@@ -114,24 +146,35 @@ void HaBroker::setClientUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) {
Url url = clientUrl.empty() ? brokerUrl : clientUrl;
- assert(!url.empty());
- mgmtObject->set_clientAddresses(url.str());
+ if (url.empty()) throw Url::Invalid("HA client URL is empty");
+ mgmtObject->set_publicBrokers(url.str());
knownBrokers.clear();
knownBrokers.push_back(url);
- QPID_LOG(debug, "HA: Setting client known-brokers to: " << url);
+ QPID_LOG(debug, "HA: Setting client URL to: " << url);
}
void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
- if (url.empty()) throw Exception("Invalid empty URL for HA broker failover");
+ if (url.empty()) throw Url::Invalid("HA broker URL is empty");
+ QPID_LOG(debug, "HA: Setting broker URL to: " << url);
brokerUrl = url;
- mgmtObject->set_brokerAddresses(brokerUrl.str());
+ mgmtObject->set_brokers(brokerUrl.str());
if (backup.get()) backup->setBrokerUrl(brokerUrl);
// Updating broker URL also updates defaulted client URL:
if (clientUrl.empty()) updateClientUrl(l);
}
+void HaBroker::setExpectedBackups(size_t n, const sys::Mutex::ScopedLock&) {
+ expectedBackups = n;
+ mgmtObject->set_expectedBackups(n);
+}
+
std::vector<Url> HaBroker::getKnownBrokers() const {
return knownBrokers;
}
+void HaBroker::shutdown(const std::string& message) {
+ QPID_LOG(critical, "Shutting down: " << message);
+ broker.shutdown();
+}
+
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/HaBroker.h b/cpp/src/qpid/ha/HaBroker.h
index 835a47c749..99b30fd36b 100644
--- a/cpp/src/qpid/ha/HaBroker.h
+++ b/cpp/src/qpid/ha/HaBroker.h
@@ -52,9 +52,16 @@ class HaBroker : public management::Manageable
management::Manageable::status_t ManagementMethod (
uint32_t methodId, management::Args& args, std::string& text);
+ broker::Broker& getBroker() { return broker; }
+ const Settings& getSettings() const { return settings; }
+
+ // Log a critical error message and shut down the broker.
+ void shutdown(const std::string& message);
+
private:
void setClientUrl(const Url&, const sys::Mutex::ScopedLock&);
void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&);
+ void setExpectedBackups(size_t, const sys::Mutex::ScopedLock&);
void updateClientUrl(const sys::Mutex::ScopedLock&);
bool isPrimary(const sys::Mutex::ScopedLock&) { return !backup.get(); }
std::vector<Url> getKnownBrokers() const;
@@ -67,6 +74,7 @@ class HaBroker : public management::Manageable
qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
Url clientUrl, brokerUrl;
std::vector<Url> knownBrokers;
+ size_t expectedBackups;
};
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/HaPlugin.cpp b/cpp/src/qpid/ha/HaPlugin.cpp
index fc9e48411d..4da3b0d7d2 100644
--- a/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/cpp/src/qpid/ha/HaPlugin.cpp
@@ -31,12 +31,23 @@ struct Options : public qpid::Options {
Settings& settings;
Options(Settings& s) : qpid::Options("HA Options"), settings(s) {
addOptions()
- ("ha-enable", optValue(settings.enabled, "yes|no"), "Enable High Availability features")
- ("ha-client-url", optValue(settings.clientUrl,"URL"), "URL that clients use to connect and fail over.")
- ("ha-broker-url", optValue(settings.brokerUrl,"URL"), "URL that backup brokers use to connect and fail over.")
- ("ha-username", optValue(settings.username, "USER"), "Username for connections between brokers")
- ("ha-password", optValue(settings.password, "PASS"), "Password for connections between brokers")
- ("ha-mechanism", optValue(settings.mechanism, "MECH"), "Authentication mechanism for connections between brokers")
+ ("ha-cluster", optValue(settings.cluster, "yes|no"),
+ "Join a HA active/passive cluster.")
+ ("ha-brokers", optValue(settings.brokerUrl,"URL"),
+ "URL that backup brokers use to connect and fail over.")
+ ("ha-public-brokers", optValue(settings.clientUrl,"URL"),
+ "URL that clients use to connect and fail over, defaults to ha-brokers.")
+ ("ha-replicate",
+ optValue(settings.replicateDefault, "LEVEL"),
+ "Replication level for creating queues and exchanges if there is no qpid.replicate argument supplied. LEVEL is 'none', 'configuration' or 'all'")
+ ("ha-expected-backups", optValue(settings.expectedBackups, "N"),
+ "Number of backups expected to be active in the HA cluster.")
+ ("ha-username", optValue(settings.username, "USER"),
+ "Username for connections between HA brokers")
+ ("ha-password", optValue(settings.password, "PASS"),
+ "Password for connections between HA brokers")
+ ("ha-mechanism", optValue(settings.mechanism, "MECH"),
+ "Authentication mechanism for connections between HA brokers")
;
}
};
@@ -55,10 +66,7 @@ struct HaPlugin : public Plugin {
void initialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
- if (broker && settings.enabled) {
- haBroker.reset(new ha::HaBroker(*broker, settings));
- } else
- QPID_LOG(notice, "HA: Disabled");
+ if (broker) haBroker.reset(new ha::HaBroker(*broker, settings));
}
};
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index 0017cc82cd..633619be13 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -30,8 +30,8 @@
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
+#include "qpid/Msg.h"
#include <boost/shared_ptr.hpp>
-#include <sstream>
namespace {
const std::string QPID_REPLICATOR_("qpid.replicator-");
@@ -54,10 +54,8 @@ std::string QueueReplicator::replicatorName(const std::string& queueName) {
QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l)
{
- std::stringstream ss;
- ss << "HA: Backup " << queue->getName() << ": ";
- logPrefix = ss.str();
- QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings());
+ logPrefix = "HA: Backup of " + queue->getName() + ": ";
+ QPID_LOG(info, logPrefix << "Created");
}
// This must be separate from the constructor so we can call shared_from_this.
@@ -77,7 +75,7 @@ void QueueReplicator::activate() {
0, // sync?
// Include shared_ptr to self to ensure we are not deleted
// before initializeBridge is called.
- boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, shared_from_this())
+ boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2)
);
}
@@ -91,9 +89,7 @@ void QueueReplicator::deactivate() {
}
// Called in a broker connection thread when the bridge is created.
-// shared_ptr to self ensures we are not deleted before initializeBridge is called.
-void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler,
- boost::shared_ptr<QueueReplicator> /*self*/) {
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
sys::Mutex::ScopedLock l(lock);
bridgeName = bridge.getName();
framing::AMQP_ServerProxy peer(sessionHandler.out);
@@ -141,27 +137,35 @@ void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&)
}
// Called in connection thread of the queues bridge to primary.
-void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable*)
+void QueueReplicator::route(Deliverable& msg)
{
- sys::Mutex::ScopedLock l(lock);
- if (key == DEQUEUE_EVENT_KEY) {
- SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
- QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
- //TODO: should be able to optimise the following
- for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
- dequeue(*i, l);
- } else if (key == POSITION_EVENT_KEY) {
- SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
- QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
- << " to " << position);
- assert(queue->getPosition() <= position);
- //TODO aconway 2011-12-14: Optimize this?
- for (SequenceNumber i = queue->getPosition(); i < position; ++i)
- dequeue(i,l);
- queue->setPosition(position);
- } else {
- msg.deliverTo(queue);
- QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
+ try {
+ const std::string& key = msg.getMessage().getRoutingKey();
+ sys::Mutex::ScopedLock l(lock);
+ if (key == DEQUEUE_EVENT_KEY) {
+ SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
+ QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
+ //TODO: should be able to optimise the following
+ for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
+ dequeue(*i, l);
+ } else if (key == POSITION_EVENT_KEY) {
+ SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
+ QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
+ << " to " << position);
+ if (queue->getPosition() > position) {
+ throw Exception(
+ QPID_MSG(logPrefix << "Invalid position update from "
+ << queue->getPosition() << " to " << position));
+ }
+ queue->setPosition(position);
+ } else {
+ msg.deliverTo(queue);
+ QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
+ }
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(critical, logPrefix << "Replication failed: " << e.what());
+ throw;
}
}
diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h
index 9de7dd480c..bcbac988fa 100644
--- a/cpp/src/qpid/ha/QueueReplicator.h
+++ b/cpp/src/qpid/ha/QueueReplicator.h
@@ -66,12 +66,11 @@ class QueueReplicator : public broker::Exchange,
bool bind(boost::shared_ptr<broker::Queue
>, const std::string&, const framing::FieldTable*);
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
- void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
+ void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
private:
- void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler,
- boost::shared_ptr<QueueReplicator> self);
+ void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
std::string logPrefix;
diff --git a/cpp/src/qpid/ha/ReplicateLevel.cpp b/cpp/src/qpid/ha/ReplicateLevel.cpp
new file mode 100644
index 0000000000..4981577225
--- /dev/null
+++ b/cpp/src/qpid/ha/ReplicateLevel.cpp
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ReplicateLevel.h"
+#include "qpid/Exception.h"
+#include "qpid/Msg.h"
+#include <iostream>
+#include <assert.h>
+
+namespace qpid {
+namespace ha {
+
+using namespace std;
+
+// Note replicateLevel is called during plugin-initialization which
+// happens in the static construction phase so these constants need
+// to be POD, they can't be class objects
+//
+namespace {
+const char* S_NONE="none";
+const char* S_CONFIGURATION="configuration";
+const char* S_ALL="all";
+}
+
+bool replicateLevel(const string& level, ReplicateLevel& out) {
+ if (level == S_NONE) { out = RL_NONE; return true; }
+ if (level == S_CONFIGURATION) { out = RL_CONFIGURATION; return true; }
+ if (level == S_ALL) { out = RL_ALL; return true; }
+ return false;
+}
+
+ReplicateLevel replicateLevel(const string& level) {
+ ReplicateLevel rl;
+ if (!replicateLevel(level, rl))
+ throw Exception("Invalid value for replication level: "+level);
+ return rl;
+}
+
+string str(ReplicateLevel l) {
+ const char* names[] = { S_NONE, S_CONFIGURATION, S_ALL };
+ if (l > RL_ALL)
+ throw Exception(QPID_MSG("Invalid value for replication level: " << l));
+ return names[l];
+}
+
+ostream& operator<<(ostream& o, ReplicateLevel rl) { return o << str(rl); }
+
+istream& operator>>(istream& i, ReplicateLevel& rl) {
+ string str;
+ i >> str;
+ rl = replicateLevel(str);
+ return i;
+}
+
+}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/ReplicateLevel.h b/cpp/src/qpid/ha/ReplicateLevel.h
new file mode 100644
index 0000000000..c11e03f0ce
--- /dev/null
+++ b/cpp/src/qpid/ha/ReplicateLevel.h
@@ -0,0 +1,52 @@
+#ifndef QPID_HA_REPLICATELEVEL_H
+#define QPID_HA_REPLICATELEVEL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <iosfwd>
+
+namespace qpid {
+namespace ha {
+
+enum ReplicateLevel { RL_NONE, RL_CONFIGURATION, RL_ALL };
+
+/**
+ * If str is a valid replicate level, set out and return true.
+ */
+bool replicateLevel(const std::string& str, ReplicateLevel& out);
+
+/**
+ *@return enum corresponding to string level.
+ *@throw qpid::Exception if level is not a valid replication level.
+ */
+ReplicateLevel replicateLevel(const std::string& level);
+
+/**@return string form of replicate level */
+std::string str(ReplicateLevel l);
+
+std::ostream& operator<<(std::ostream&, ReplicateLevel);
+std::istream& operator>>(std::istream&, ReplicateLevel&);
+
+}} // namespaces qpid::ha
+
+#endif /*!QPID_HA_REPLICATELEVEL_H*/
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index af6180305d..91a4538bc4 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -87,10 +87,13 @@ ReplicatingSubscription::ReplicatingSubscription(
events(new Queue(mask(name))),
consumer(new DelegatingConsumer(*this))
{
+ // Separate the remote part from a "local-remote" address.
+ string address = parent->getSession().getConnection().getUrl();
+ size_t i = address.find('-');
+ if (i != string::npos) address = address.substr(i+1);
+ logPrefix = "HA: Primary ";
stringstream ss;
- ss << "HA: Primary: " << getQueue()->getName() << " at "
- << parent->getSession().getConnection().getUrl() << ": ";
- logPrefix = ss.str();
+ logSuffix = " (" + address + ")";
// FIXME aconway 2011-12-09: Failover optimization removed.
// There was code here to re-use messages already on the backup
@@ -99,7 +102,7 @@ ReplicatingSubscription::ReplicatingSubscription(
// can be re-introduced later. Last revision with the optimization:
// r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
- QPID_LOG(debug, logPrefix << "Created backup subscription " << getName());
+ QPID_LOG(debug, logPrefix << "created backup subscription " << getName() << logSuffix);
// FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0
// so we will start consuming from the lowest numbered message.
@@ -109,23 +112,36 @@ ReplicatingSubscription::ReplicatingSubscription(
// Message is delivered in the subscription's connection thread.
bool ReplicatingSubscription::deliver(QueuedMessage& m) {
- // Add position events for the subscribed queue, not for the internal event queue.
- if (m.queue && m.queue == getQueue().get()) {
- sys::Mutex::ScopedLock l(lock);
- assert(position == m.position);
- // m.position is the position of the newly enqueued m on the local queue.
- // backupPosition is latest position on the backup queue (before enqueueing m.)
- assert(m.position > backupPosition);
- if (m.position - backupPosition > 1) {
- // Position has advanced because of messages dequeued ahead of us.
- SequenceNumber send(m.position);
- --send; // Send the position before m was enqueued.
- sendPositionEvent(send, l);
+ try {
+ // Add position events for the subscribed queue, not for the internal event queue.
+ if (m.queue && m.queue == getQueue().get()) {
+ sys::Mutex::ScopedLock l(lock);
+ if (position != m.position)
+ throw Exception(
+ QPID_MSG("Expected position " << position
+ << " but got " << m.position));
+ // m.position is the position of the newly enqueued m on the local queue.
+ // backupPosition is latest position on the backup queue (before enqueueing m.)
+ if (m.position <= backupPosition)
+ throw Exception(
+ QPID_MSG("Expected position > " << backupPosition
+ << " but got " << m.position));
+
+ if (m.position - backupPosition > 1) {
+ // Position has advanced because of messages dequeued ahead of us.
+ SequenceNumber send(m.position);
+ --send; // Send the position before m was enqueued.
+ sendPositionEvent(send, l);
+ }
+ backupPosition = m.position;
+ QPID_LOG(trace, logPrefix << "replicating " << m << logSuffix);
}
- backupPosition = m.position;
- QPID_LOG(trace, logPrefix << "Replicating message " << m.position);
+ return ConsumerImpl::deliver(m);
+ } catch (const std::exception& e) {
+ QPID_LOG(critical, logPrefix << "error replicating " << getQueue()->getName()
+ << logSuffix << ": " << e.what());
+ throw;
}
- return ConsumerImpl::deliver(m);
}
ReplicatingSubscription::~ReplicatingSubscription() {}
@@ -139,7 +155,7 @@ void ReplicatingSubscription::complete(
{
// Handle completions for the subscribed queue, not the internal event queue.
if (qm.queue && qm.queue == getQueue().get()) {
- QPID_LOG(trace, logPrefix << "Completed message " << qm.position);
+ QPID_LOG(trace, logPrefix << "completed " << qm << logSuffix);
Delayed::iterator i= delayed.find(qm.position);
// The same message can be completed twice, by acknowledged and
// dequeued, remove it from the set so it only gets completed
@@ -157,7 +173,7 @@ void ReplicatingSubscription::complete(
void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
sys::Mutex::ScopedLock l(lock);
// Delay completion
- QPID_LOG(trace, logPrefix << "Delaying completion of message " << qm.position);
+ QPID_LOG(trace, logPrefix << "delaying completion of " << qm << logSuffix);
qm.payload->getIngressCompletion().startCompleter();
assert(delayed.find(qm.position) == delayed.end());
delayed[qm.position] = qm;
@@ -168,7 +184,7 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
void ReplicatingSubscription::cancelComplete(
const Delayed::value_type& v, const sys::Mutex::ScopedLock&)
{
- QPID_LOG(trace, logPrefix << "Cancel completed message " << v.second.position);
+ QPID_LOG(trace, logPrefix << "cancel completed " << v.second << logSuffix);
v.second.payload->getIngressCompletion().finishCompleter();
}
@@ -179,7 +195,7 @@ void ReplicatingSubscription::cancel()
boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
{
sys::Mutex::ScopedLock l(lock);
- QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName());
+ QPID_LOG(debug, logPrefix << "cancel backup subscription " << getName() << logSuffix);
for_each(delayed.begin(), delayed.end(),
boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l)));
delayed.clear();
@@ -201,7 +217,8 @@ bool ReplicatingSubscription::hideDeletedError() { return true; }
// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
{
- QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
+ QPID_LOG(trace, logPrefix << "sending dequeues " << dequeues
+ << " from " << getQueue()->getName() << logSuffix);
string buf(dequeues.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
dequeues.encode(buffer);
@@ -216,7 +233,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
{
{
sys::Mutex::ScopedLock l(lock);
- QPID_LOG(trace, logPrefix << "Dequeued message " << qm.position);
+ QPID_LOG(trace, logPrefix << "dequeued " << qm << logSuffix);
dequeues.add(qm.position);
// If we have not yet sent this message to the backup, then
// complete it now as it will never be accepted.
@@ -229,8 +246,8 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
void ReplicatingSubscription::sendPositionEvent(
SequenceNumber position, const sys::Mutex::ScopedLock&l )
{
- QPID_LOG(trace, logPrefix << "Sending position " << position
- << ", was " << backupPosition);
+ QPID_LOG(trace, logPrefix << "sending position " << position
+ << ", was " << backupPosition << logSuffix);
string buf(backupPosition.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
position.encode(buffer);
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.h b/cpp/src/qpid/ha/ReplicatingSubscription.h
index e311f9505a..f9176915f6 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -33,7 +33,7 @@ namespace qpid {
namespace broker {
class Message;
class Queue;
-class QueuedMessage;
+struct QueuedMessage;
class OwnershipToken;
}
@@ -94,7 +94,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
bool doDispatch();
private:
typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed;
- std::string logPrefix;
+ std::string logPrefix, logSuffix;
boost::shared_ptr<broker::Queue> events;
boost::shared_ptr<broker::Consumer> consumer;
Delayed delayed;
diff --git a/cpp/src/qpid/ha/Settings.h b/cpp/src/qpid/ha/Settings.h
index 049c873b9f..bf70c3f3f7 100644
--- a/cpp/src/qpid/ha/Settings.h
+++ b/cpp/src/qpid/ha/Settings.h
@@ -22,6 +22,7 @@
*
*/
+#include "ReplicateLevel.h"
#include <string>
namespace qpid {
@@ -33,10 +34,12 @@ namespace ha {
class Settings
{
public:
- Settings() : enabled(false) {}
- bool enabled;
+ Settings() : cluster(false), expectedBackups(0), replicateDefault(RL_NONE) {}
+ bool cluster; // True if we are a cluster member.
std::string clientUrl;
std::string brokerUrl;
+ size_t expectedBackups;
+ ReplicateLevel replicateDefault;
std::string username, password, mechanism;
private:
};
diff --git a/cpp/src/qpid/ha/management-schema.xml b/cpp/src/qpid/ha/management-schema.xml
index fe4a14d111..363dff61fb 100644
--- a/cpp/src/qpid/ha/management-schema.xml
+++ b/cpp/src/qpid/ha/management-schema.xml
@@ -22,16 +22,39 @@
<!-- Monitor and control HA status of a broker. -->
<class name="HaBroker">
<property name="name" type="sstr" access="RC" index="y" desc="Primary Key"/>
+
<property name="status" type="sstr" desc="HA status: primary or backup"/>
- <property name="clientAddresses" type="sstr" desc="List of addresses used by clients to connect to the HA cluster."/>
- <property name="brokerAddresses" type="sstr" desc="List of addresses used by HA brokers to connect to each other."/>
+
+ <property name="brokers" type="sstr"
+ desc="Multiple-address URL used by HA brokers to connect to each other."/>
+
+ <property name="publicBrokers" type="sstr"
+ desc="Multiple-address URL used by clients to connect to the HA brokers."/>
+
+ <property name="expectedBackups" type="uint16"
+ desc="Number of HA backup brokers expected."/>
+
+ <property
+ name="replicateDefault" type="sstr"
+ desc="Replicate value for queues/exchanges without a qpid.replicate argument"/>
<method name="promote" desc="Promote a backup broker to primary."/>
- <method name="setClientAddresses" desc="Set HA client addresses">
- <arg name="clientAddresses" type="sstr" dir="I"/>
+
+ <method name="setBrokers" desc="Set URL for HA brokers to connect to each other.">
+ <arg name="url" type="sstr" dir="I"/>
</method>
- <method name="setBrokerAddresses" desc="Set HA broker addresses">
- <arg name="brokerAddresses" type="sstr" dir="I"/>
+
+ <method name="setPublicBrokers" desc="Set URL for clients to connect to HA brokers">
+ <arg name="url" type="sstr" dir="I"/>
+ </method>
+
+ <method name="setExpectedBackups" desc="Set number of backups expected">
+ <arg name="expectedBackups" type="uint16" dir="I"/>
+ </method>
+
+ <method name="replicate" desc="Replicate individual queue from remote broker.">
+ <arg name="broker" type="sstr" dir="I"/>
+ <arg name="queue" type="sstr" dir="I"/>
</method>
</class>