diff options
Diffstat (limited to 'cpp/src/qpid/ha')
-rw-r--r-- | cpp/src/qpid/ha/Backup.cpp | 36 | ||||
-rw-r--r-- | cpp/src/qpid/ha/Backup.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 190 | ||||
-rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.h | 16 | ||||
-rw-r--r-- | cpp/src/qpid/ha/HaBroker.cpp | 95 | ||||
-rw-r--r-- | cpp/src/qpid/ha/HaBroker.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/ha/HaPlugin.cpp | 28 | ||||
-rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 62 | ||||
-rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/ha/ReplicateLevel.cpp | 72 | ||||
-rw-r--r-- | cpp/src/qpid/ha/ReplicateLevel.h | 52 | ||||
-rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.cpp | 71 | ||||
-rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/ha/Settings.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/ha/management-schema.xml | 35 |
15 files changed, 483 insertions, 202 deletions
diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp index 5acbfb9d5f..3f3fa87a01 100644 --- a/cpp/src/qpid/ha/Backup.cpp +++ b/cpp/src/qpid/ha/Backup.cpp @@ -19,10 +19,11 @@ * */ #include "Backup.h" -#include "Settings.h" #include "BrokerReplicator.h" -#include "ReplicatingSubscription.h" #include "ConnectionExcluder.h" +#include "HaBroker.h" +#include "ReplicatingSubscription.h" +#include "Settings.h" #include "qpid/Url.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/broker/Bridge.h" @@ -43,37 +44,44 @@ using namespace broker; using types::Variant; using std::string; -Backup::Backup(broker::Broker& b, const Settings& s) : - broker(b), settings(s), excluder(new ConnectionExcluder()) +Backup::Backup(HaBroker& hb, const Settings& s) : + haBroker(hb), broker(hb.getBroker()), settings(s), excluder(new ConnectionExcluder()) { + // Exclude client connections before starting the link to avoid self-connection. + broker.getConnectionObservers().add(excluder); // Empty brokerUrl means delay initialization until setUrl() is called. if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl)); } void Backup::initialize(const Url& url) { - assert(!url.empty()); - QPID_LOG(notice, "Ha: Backup started: " << url); + if (url.empty()) throw Url::Invalid("HA broker URL is empty"); + QPID_LOG(notice, "HA: Backup initialized: " << url); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; // Declare the link std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare( url[0].host, url[0].port, protocol, false, // durable settings.mechanism, settings.username, settings.password); - assert(result.second); // FIXME aconway 2011-11-23: error handling link = result.first; link->setUrl(url); - - replicator.reset(new BrokerReplicator(link)); + replicator.reset(new BrokerReplicator(haBroker, link)); broker.getExchanges().registerExchange(replicator); - broker.getConnectionObservers().add(excluder); } +Backup::~Backup() { + if (link) link->close(); + if (replicator.get()) broker.getExchanges().destroy(replicator->getName()); + replicator.reset(); + broker.getConnectionObservers().remove(excluder); // This allows client connections. +} + + void Backup::setBrokerUrl(const Url& url) { // Ignore empty URLs seen during start-up for some tests. if (url.empty()) return; sys::Mutex::ScopedLock l(lock); if (link) { // URL changed after we initialized. - QPID_LOG(info, "HA: Backup failover URL set to " << url); + QPID_LOG(info, "HA: Backup broker URL set to " << url); link->setUrl(url); } else { @@ -81,10 +89,4 @@ void Backup::setBrokerUrl(const Url& url) { } } -Backup::~Backup() { - if (link) link->close(); - if (replicator.get()) broker.getExchanges().destroy(replicator->getName()); - broker.getConnectionObservers().remove(excluder); // This allows client connections. -} - }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/Backup.h b/cpp/src/qpid/ha/Backup.h index 526b238b82..6c36996914 100644 --- a/cpp/src/qpid/ha/Backup.h +++ b/cpp/src/qpid/ha/Backup.h @@ -38,6 +38,7 @@ namespace ha { class Settings; class ConnectionExcluder; class BrokerReplicator; +class HaBroker; /** * State associated with a backup broker. Manages connections to primary. @@ -47,7 +48,7 @@ class BrokerReplicator; class Backup { public: - Backup(broker::Broker&, const Settings&); + Backup(HaBroker&, const Settings&); ~Backup(); void setBrokerUrl(const Url&); @@ -55,6 +56,7 @@ class Backup void initialize(const Url&); sys::Mutex lock; + HaBroker& haBroker; broker::Broker& broker; Settings settings; boost::shared_ptr<broker::Link> link; diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index a8f05c1fe3..d0c99cbdb6 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -19,6 +19,7 @@ * */ #include "BrokerReplicator.h" +#include "HaBroker.h" #include "QueueReplicator.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" @@ -37,6 +38,7 @@ #include "qmf/org/apache/qpid/broker/EventQueueDelete.h" #include "qmf/org/apache/qpid/broker/EventSubscribe.h" #include <algorithm> +#include <sstream> namespace qpid { namespace ha { @@ -87,6 +89,7 @@ const string QUEUE("queue"); const string RHOST("rhost"); const string TYPE("type"); const string USER("user"); +const string HA_BROKER("habroker"); const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#"); const string QMF2("qmf2"); @@ -100,6 +103,7 @@ const string _PACKAGE_NAME("_package_name"); const string _SCHEMA_ID("_schema_id"); const string OBJECT("OBJECT"); const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker"); +const string ORG_APACHE_QPID_HA("org.apache.qpid.ha"); const string QMF_DEFAULT_DIRECT("qmf.default.direct"); const string _QUERY_REQUEST("_query_request"); const string BROKER("broker"); @@ -113,36 +117,13 @@ template <class T> bool match(Variant::Map& schema) { return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); } -enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_MESSAGES }; -const string S_NONE="none"; -const string S_CONFIGURATION="configuration"; -const string S_MESSAGES="messages"; - -ReplicateLevel replicateLevel(const string& level) { - if (level == S_NONE) return RL_NONE; - if (level == S_CONFIGURATION) return RL_CONFIGURATION; - if (level == S_MESSAGES) return RL_MESSAGES; - throw Exception("Invalid value for "+QPID_REPLICATE+": "+level); -} - -ReplicateLevel replicateLevel(const framing::FieldTable& f) { - if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE)); - else return RL_NONE; -} - -ReplicateLevel replicateLevel(const Variant::Map& m) { - Variant::Map::const_iterator i = m.find(QPID_REPLICATE); - if (i != m.end()) return replicateLevel(i->second.asString()); - else return RL_NONE; -} - -void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) { +void sendQuery(const string& packageName, const string& className, const string& queueName, SessionHandler& sessionHandler) { framing::AMQP_ServerProxy peer(sessionHandler.out); Variant::Map request; request[_WHAT] = OBJECT; Variant::Map schema; schema[_CLASS_NAME] = className; - schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER; + schema[_PACKAGE_NAME] = packageName; request[_SCHEMA_ID] = schema; AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0))); @@ -181,13 +162,34 @@ Variant::Map asMapVoid(const Variant& value) { } // namespace + +ReplicateLevel BrokerReplicator::replicateLevel(const std::string& str) { + ReplicateLevel rl; + if (qpid::ha::replicateLevel(str, rl)) return rl; + else return haBroker.getSettings().replicateDefault; +} + +ReplicateLevel BrokerReplicator::replicateLevel(const framing::FieldTable& f) { + if (f.isSet(QPID_REPLICATE)) + return replicateLevel(f.getAsString(QPID_REPLICATE)); + else + return haBroker.getSettings().replicateDefault; +} + +ReplicateLevel BrokerReplicator::replicateLevel(const Variant::Map& m) { + Variant::Map::const_iterator i = m.find(QPID_REPLICATE); + if (i != m.end()) + return replicateLevel(i->second.asString()); + else + return haBroker.getSettings().replicateDefault; +} + BrokerReplicator::~BrokerReplicator() {} -BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l) - : Exchange(QPID_CONFIGURATION_REPLICATOR), broker(*l->getBroker()), link(l) +BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l) + : Exchange(QPID_CONFIGURATION_REPLICATOR), + haBroker(hb), broker(hb.getBroker()), link(l) { - QPID_LOG(info, "HA: Backup replicating from " << - link->getTransport() << ":" << link->getHost() << ":" << link->getPort()); broker.getLinks().declare( link->getHost(), link->getPort(), false, // durable @@ -211,22 +213,26 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); //declare and bind an event queue - peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable()); + FieldTable declareArgs; + declareArgs.setString(QPID_REPLICATE, str(RL_NONE)); + peer.getQueue().declare(queueName, "", false, false, true, true, declareArgs); peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable()); //subscribe to the queue peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); - //issue a query request for queues and another for exchanges using event queue as the reply-to address - sendQuery(QUEUE, queueName, sessionHandler); - sendQuery(EXCHANGE, queueName, sessionHandler); - sendQuery(BINDING, queueName, sessionHandler); - QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName); + // Issue a query request for queues, exchanges, bindings and the habroker + // using event queue as the reply-to address + sendQuery(ORG_APACHE_QPID_HA, HA_BROKER, queueName, sessionHandler); + sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler); + sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler); + sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler); + QPID_LOG(debug, "HA: Backup configuration bridge: " << queueName); } -// FIXME aconway 2011-12-02: error handling in route. -void BrokerReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) { +void BrokerReplicator::route(Deliverable& msg) { + const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders(); Variant::List list; try { if (!isQMFv2(msg.getMessage()) || !headers) @@ -238,6 +244,7 @@ void BrokerReplicator::route(Deliverable& msg, const string& /*key*/, const fram if (headers->getAsString(QMF_CONTENT) == EVENT) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); + QPID_LOG(trace, "HA: Backup received event: " << map); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values); @@ -249,19 +256,22 @@ void BrokerReplicator::route(Deliverable& msg, const string& /*key*/, const fram } } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { - string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME]; - Variant::Map& values = i->asMap()[VALUES].asMap(); + Variant::Map& map = i->asMap(); + QPID_LOG(trace, "HA: Backup received event: " << map); + string type = map[SCHEMA_ID].asMap()[CLASS_NAME]; + Variant::Map& values = map[VALUES].asMap(); framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); if (type == QUEUE) doResponseQueue(values); else if (type == EXCHANGE) doResponseExchange(values); else if (type == BINDING) doResponseBind(values); - else QPID_LOG(error, "HA: Backup received unknown response type=" << type - << " values=" << values); + else if (type == HA_BROKER) doResponseHaBroker(values); } - } else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers); + } } catch (const std::exception& e) { - QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list); + QPID_LOG(critical, "HA: Backup configuration failed: " << e.what() + << ": while handling: " << list); + throw; } } @@ -282,15 +292,13 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { values[USER].asString(), values[RHOST].asString()); if (result.second) { - // FIXME aconway 2011-11-22: should delete old queue and - // re-create from event. - // Events are always up to date, whereas responses may be - // out of date. - QPID_LOG(debug, "HA: Backup created queue: " << name); + QPID_LOG(debug, "HA: Backup queue declare event: " << name); startQueueReplicator(result.first); } else { // FIXME aconway 2011-12-02: what's the right way to handle this? - QPID_LOG(warning, "HA: Backup queue already exists: " << name); + // Should we delete the old & re-create form the event? Responses + // may be old but events are always up-to-date. + QPID_LOG(warning, "HA: Backup queue declare event, already exists: " << name); } } } @@ -300,8 +308,11 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { // sessions may be closed by a "queue deleted" exception. string name = values[QNAME].asString(); boost::shared_ptr<Queue> queue = broker.getQueues().find(name); - if (queue && replicateLevel(queue->getSettings())) { - QPID_LOG(debug, "HA: Backup deleting queue: " << name); + if (!queue) { + QPID_LOG(warning, "HA: Backup queue delete event, does not exist: " << name); + } else if (!replicateLevel(queue->getSettings())) { + QPID_LOG(warning, "HA: Backup queue delete event, not replicated: " << name); + } else { string rname = QueueReplicator::replicatorName(name); boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname); boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex); @@ -310,6 +321,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { // actually be destroyed, deleting the exhange broker.getExchanges().destroy(rname); broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString()); + QPID_LOG(debug, "HA: Backup queue delete event: " << name); } } @@ -328,27 +340,29 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { values[USER].asString(), values[RHOST].asString()).second) { - QPID_LOG(debug, "HA: Backup created exchange: " << name); + QPID_LOG(debug, "HA: Backup exchange declare event: " << name); } else { - // FIXME aconway 2011-11-22: should delete pre-exisitng exchange + // FIXME aconway 2011-11-22: should delete pre-existing exchange // and re-create from event. See comment in doEventQueueDeclare. - QPID_LOG(warning, "HA: Backup exchange already exists: " << name); + QPID_LOG(debug, "HA: Backup exchange declare event, already exists: " << name); } } } void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { string name = values[EXNAME].asString(); - try { - boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name); - if (exchange && replicateLevel(exchange->getArgs())) { - QPID_LOG(debug, "HA: Backup deleting exchange:" << name); - broker.deleteExchange( - name, - values[USER].asString(), - values[RHOST].asString()); - } - } catch (const framing::NotFoundException&) {} + boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name); + if (!exchange) { + QPID_LOG(warning, "HA: Backup exchange delete event, does not exist: " << name); + } else if (!replicateLevel(exchange->getArgs())) { + QPID_LOG(warning, "HA: Backup exchange delete event, not replicated: " << name); + } else { + QPID_LOG(debug, "HA: Backup exchange delete event:" << name); + broker.deleteExchange( + name, + values[USER].asString(), + values[RHOST].asString()); + } } void BrokerReplicator::doEventBind(Variant::Map& values) { @@ -364,10 +378,10 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); - QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName() + exchange->bind(queue, key, &args); + QPID_LOG(debug, "HA: Backup bind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); - exchange->bind(queue, key, &args); } } @@ -384,15 +398,14 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); - QPID_LOG(debug, "HA: Backup replicated unbinding exchange=" << exchange->getName() + exchange->unbind(queue, key, &args); + QPID_LOG(debug, "HA: Backup unbind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); - exchange->unbind(queue, key, &args); } } void BrokerReplicator::doResponseQueue(Variant::Map& values) { - // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); if (!replicateLevel(argsMap)) return; framing::FieldTable args; @@ -409,12 +422,12 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/); if (result.second) { - QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]); + QPID_LOG(debug, "HA: Backup queue response: " << name); startQueueReplicator(result.first); } else { // FIXME aconway 2011-11-22: Normal to find queue already // exists if we're failing over. - QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << name); + QPID_LOG(warning, "HA: Backup queue response, already exists: " << name); } } @@ -432,9 +445,10 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/).second) { - QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME]); + QPID_LOG(debug, "HA: Backup exchange response: " << values[NAME].asString()); } else { - QPID_LOG(warning, "HA: Backup catch-up exchange already exists: " << values[QNAME]); + QPID_LOG(warning, "HA: Backup exchange query, already exists: " << + values[QNAME].asString()); } } @@ -464,7 +478,6 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName); boost::shared_ptr<Queue> queue = broker.getQueues().find(qName); - // FIXME aconway 2011-11-24: more flexible configuration for binding replication. // Automatically replicate binding if queue and exchange exist and are replicated if (exchange && replicateLevel(exchange->getArgs()) && @@ -474,16 +487,39 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); string key = values[KEY].asString(); exchange->bind(queue, key, &args); - QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName() + QPID_LOG(debug, "HA: Backup bind response: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); } } +namespace { +const string REPLICATE_DEFAULT="replicateDefault"; +} + +// Received the ha-broker configuration object for the primary broker. +void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { + try { + ReplicateLevel mine = haBroker.getSettings().replicateDefault; + ReplicateLevel primary = replicateLevel(values[REPLICATE_DEFAULT].asString()); + if (mine != primary) { + std::ostringstream os; + os << "Replicate default on backup (" << mine + << ") does not match primary (" << primary << ")"; + haBroker.shutdown(os.str()); + } + } catch (const std::exception& e) { + std::ostringstream os; + os << "Received invalid replicate default from primary: " << e.what(); + haBroker.shutdown(os.str()); + } +} + void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { - if (replicateLevel(queue->getSettings()) == RL_MESSAGES) { + if (replicateLevel(queue->getSettings()) == RL_ALL) { boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); - broker.getExchanges().registerExchange(qr); + if (!broker.getExchanges().registerExchange(qr)) + throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName())); qr->activate(); } } diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h index cfb6cf9a28..c9d7b9f74c 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.h +++ b/cpp/src/qpid/ha/BrokerReplicator.h @@ -22,6 +22,7 @@ * */ +#include "ReplicateLevel.h" #include "qpid/broker/Exchange.h" #include "qpid/types/Variant.h" #include <boost/shared_ptr.hpp> @@ -35,7 +36,12 @@ class Bridge; class SessionHandler; } +namespace framing { +class FieldTable; +} + namespace ha { +class HaBroker; /** * Replicate configuration on a backup broker. @@ -51,19 +57,23 @@ namespace ha { class BrokerReplicator : public broker::Exchange { public: - BrokerReplicator(const boost::shared_ptr<broker::Link>&); + BrokerReplicator(HaBroker&, const boost::shared_ptr<broker::Link>&); ~BrokerReplicator(); std::string getType() const; // Exchange methods bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); - void route(broker::Deliverable&, const std::string&, const framing::FieldTable*); + void route(broker::Deliverable&); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); private: void initializeBridge(broker::Bridge&, broker::SessionHandler&); + ReplicateLevel replicateLevel(const std::string&); + ReplicateLevel replicateLevel(const framing::FieldTable& args); + ReplicateLevel replicateLevel(const types::Variant::Map& args); + void doEventQueueDeclare(types::Variant::Map& values); void doEventQueueDelete(types::Variant::Map& values); void doEventExchangeDeclare(types::Variant::Map& values); @@ -74,9 +84,11 @@ class BrokerReplicator : public broker::Exchange void doResponseQueue(types::Variant::Map& values); void doResponseExchange(types::Variant::Map& values); void doResponseBind(types::Variant::Map& values); + void doResponseHaBroker(types::Variant::Map& values); void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); + HaBroker& haBroker; broker::Broker& broker; boost::shared_ptr<broker::Link> link; }; diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp index 0d3bd51439..7d82fb63bd 100644 --- a/cpp/src/qpid/ha/HaBroker.cpp +++ b/cpp/src/qpid/ha/HaBroker.cpp @@ -25,10 +25,15 @@ #include "ReplicatingSubscription.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Link.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/SignalHandler.h" #include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/ha/Package.h" -#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetClientAddresses.h" -#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokerAddresses.h" +#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h" +#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokers.h" +#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicBrokers.h" +#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetExpectedBackups.h" #include "qpid/log/Statement.h" namespace qpid { @@ -40,8 +45,10 @@ using namespace std; namespace { -const std::string PRIMARY="primary"; +const std::string STANDALONE="standalone"; +const std::string CATCH_UP="catch-up"; const std::string BACKUP="backup"; +const std::string PRIMARY="primary"; } // namespace @@ -49,7 +56,6 @@ const std::string BACKUP="backup"; HaBroker::HaBroker(broker::Broker& b, const Settings& s) : broker(b), settings(s), - backup(new Backup(b, s)), mgmtObject(0) { // Register a factory for replicating subscriptions. @@ -62,15 +68,20 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) ManagementAgent* ma = broker.getManagementAgent(); if (!ma) throw Exception("Cannot start HA: management is disabled"); - if (ma) { - _qmf::Package packageInit(ma); - mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker"); - mgmtObject->set_status(BACKUP); - ma->addObject(mgmtObject); - } + _qmf::Package packageInit(ma); + mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker"); + mgmtObject->set_status(settings.cluster ? BACKUP : STANDALONE); + mgmtObject->set_replicateDefault(str(settings.replicateDefault)); + ma->addObject(mgmtObject); + + // NOTE: lock is not needed in a constructor but we created it just to pass + // to the set functions. sys::Mutex::ScopedLock l(lock); if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l); if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l); + + // If we are in a cluster, we start in backup mode. + if (settings.cluster) backup.reset(new Backup(*this, s)); } HaBroker::~HaBroker() {} @@ -80,26 +91,47 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, switch (methodId) { case _qmf::HaBroker::METHOD_PROMOTE: { if (backup.get()) { // I am a backup - // FIXME aconway 2012-01-26: create primary state before resetting backup - // as that allows client connections. + // NOTE: resetting backup allows client connections, so any + // primary state should be set up here before backup.reset() backup.reset(); - QPID_LOG(notice, "HA: Primary promoted from backup"); + QPID_LOG(notice, "HA: Promoted to primary"); mgmtObject->set_status(PRIMARY); } break; } - case _qmf::HaBroker::METHOD_SETCLIENTADDRESSES: { - setClientUrl( - Url(dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args). - i_clientAddresses), l); + case _qmf::HaBroker::METHOD_SETBROKERS: { + setBrokerUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokers&>(args).i_url), l); + break; + } + case _qmf::HaBroker::METHOD_SETPUBLICBROKERS: { + setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicBrokers&>(args).i_url), l); break; } - case _qmf::HaBroker::METHOD_SETBROKERADDRESSES: { - setBrokerUrl( - Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args) - .i_brokerAddresses), l); + case _qmf::HaBroker::METHOD_SETEXPECTEDBACKUPS: { + setExpectedBackups(dynamic_cast<_qmf::ArgsHaBrokerSetExpectedBackups&>(args).i_expectedBackups, l); + break; + } + case _qmf::HaBroker::METHOD_REPLICATE: { + _qmf::ArgsHaBrokerReplicate& bq_args = + dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args); + QPID_LOG(debug, "HA replicating individual queue "<< bq_args.i_queue << " from " << bq_args.i_broker); + + boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue); + Url url(bq_args.i_broker); + string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; + std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare( + url[0].host, url[0].port, protocol, + false, // durable + settings.mechanism, settings.username, settings.password); + boost::shared_ptr<broker::Link> link = result.first; + link->setUrl(url); + // Create a queue replicator + boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); + qr->activate(); + broker.getExchanges().registerExchange(qr); break; } + default: return Manageable::STATUS_UNKNOWN_METHOD; } @@ -114,24 +146,35 @@ void HaBroker::setClientUrl(const Url& url, const sys::Mutex::ScopedLock& l) { void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) { Url url = clientUrl.empty() ? brokerUrl : clientUrl; - assert(!url.empty()); - mgmtObject->set_clientAddresses(url.str()); + if (url.empty()) throw Url::Invalid("HA client URL is empty"); + mgmtObject->set_publicBrokers(url.str()); knownBrokers.clear(); knownBrokers.push_back(url); - QPID_LOG(debug, "HA: Setting client known-brokers to: " << url); + QPID_LOG(debug, "HA: Setting client URL to: " << url); } void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) { - if (url.empty()) throw Exception("Invalid empty URL for HA broker failover"); + if (url.empty()) throw Url::Invalid("HA broker URL is empty"); + QPID_LOG(debug, "HA: Setting broker URL to: " << url); brokerUrl = url; - mgmtObject->set_brokerAddresses(brokerUrl.str()); + mgmtObject->set_brokers(brokerUrl.str()); if (backup.get()) backup->setBrokerUrl(brokerUrl); // Updating broker URL also updates defaulted client URL: if (clientUrl.empty()) updateClientUrl(l); } +void HaBroker::setExpectedBackups(size_t n, const sys::Mutex::ScopedLock&) { + expectedBackups = n; + mgmtObject->set_expectedBackups(n); +} + std::vector<Url> HaBroker::getKnownBrokers() const { return knownBrokers; } +void HaBroker::shutdown(const std::string& message) { + QPID_LOG(critical, "Shutting down: " << message); + broker.shutdown(); +} + }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/HaBroker.h b/cpp/src/qpid/ha/HaBroker.h index 835a47c749..99b30fd36b 100644 --- a/cpp/src/qpid/ha/HaBroker.h +++ b/cpp/src/qpid/ha/HaBroker.h @@ -52,9 +52,16 @@ class HaBroker : public management::Manageable management::Manageable::status_t ManagementMethod ( uint32_t methodId, management::Args& args, std::string& text); + broker::Broker& getBroker() { return broker; } + const Settings& getSettings() const { return settings; } + + // Log a critical error message and shut down the broker. + void shutdown(const std::string& message); + private: void setClientUrl(const Url&, const sys::Mutex::ScopedLock&); void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&); + void setExpectedBackups(size_t, const sys::Mutex::ScopedLock&); void updateClientUrl(const sys::Mutex::ScopedLock&); bool isPrimary(const sys::Mutex::ScopedLock&) { return !backup.get(); } std::vector<Url> getKnownBrokers() const; @@ -67,6 +74,7 @@ class HaBroker : public management::Manageable qmf::org::apache::qpid::ha::HaBroker* mgmtObject; Url clientUrl, brokerUrl; std::vector<Url> knownBrokers; + size_t expectedBackups; }; }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/HaPlugin.cpp b/cpp/src/qpid/ha/HaPlugin.cpp index fc9e48411d..4da3b0d7d2 100644 --- a/cpp/src/qpid/ha/HaPlugin.cpp +++ b/cpp/src/qpid/ha/HaPlugin.cpp @@ -31,12 +31,23 @@ struct Options : public qpid::Options { Settings& settings; Options(Settings& s) : qpid::Options("HA Options"), settings(s) { addOptions() - ("ha-enable", optValue(settings.enabled, "yes|no"), "Enable High Availability features") - ("ha-client-url", optValue(settings.clientUrl,"URL"), "URL that clients use to connect and fail over.") - ("ha-broker-url", optValue(settings.brokerUrl,"URL"), "URL that backup brokers use to connect and fail over.") - ("ha-username", optValue(settings.username, "USER"), "Username for connections between brokers") - ("ha-password", optValue(settings.password, "PASS"), "Password for connections between brokers") - ("ha-mechanism", optValue(settings.mechanism, "MECH"), "Authentication mechanism for connections between brokers") + ("ha-cluster", optValue(settings.cluster, "yes|no"), + "Join a HA active/passive cluster.") + ("ha-brokers", optValue(settings.brokerUrl,"URL"), + "URL that backup brokers use to connect and fail over.") + ("ha-public-brokers", optValue(settings.clientUrl,"URL"), + "URL that clients use to connect and fail over, defaults to ha-brokers.") + ("ha-replicate", + optValue(settings.replicateDefault, "LEVEL"), + "Replication level for creating queues and exchanges if there is no qpid.replicate argument supplied. LEVEL is 'none', 'configuration' or 'all'") + ("ha-expected-backups", optValue(settings.expectedBackups, "N"), + "Number of backups expected to be active in the HA cluster.") + ("ha-username", optValue(settings.username, "USER"), + "Username for connections between HA brokers") + ("ha-password", optValue(settings.password, "PASS"), + "Password for connections between HA brokers") + ("ha-mechanism", optValue(settings.mechanism, "MECH"), + "Authentication mechanism for connections between HA brokers") ; } }; @@ -55,10 +66,7 @@ struct HaPlugin : public Plugin { void initialize(Plugin::Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); - if (broker && settings.enabled) { - haBroker.reset(new ha::HaBroker(*broker, settings)); - } else - QPID_LOG(notice, "HA: Disabled"); + if (broker) haBroker.reset(new ha::HaBroker(*broker, settings)); } }; diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index 0017cc82cd..633619be13 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -30,8 +30,8 @@ #include "qpid/framing/SequenceSet.h" #include "qpid/framing/FieldTable.h" #include "qpid/log/Statement.h" +#include "qpid/Msg.h" #include <boost/shared_ptr.hpp> -#include <sstream> namespace { const std::string QPID_REPLICATOR_("qpid.replicator-"); @@ -54,10 +54,8 @@ std::string QueueReplicator::replicatorName(const std::string& queueName) { QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l) { - std::stringstream ss; - ss << "HA: Backup " << queue->getName() << ": "; - logPrefix = ss.str(); - QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings()); + logPrefix = "HA: Backup of " + queue->getName() + ": "; + QPID_LOG(info, logPrefix << "Created"); } // This must be separate from the constructor so we can call shared_from_this. @@ -77,7 +75,7 @@ void QueueReplicator::activate() { 0, // sync? // Include shared_ptr to self to ensure we are not deleted // before initializeBridge is called. - boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, shared_from_this()) + boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2) ); } @@ -91,9 +89,7 @@ void QueueReplicator::deactivate() { } // Called in a broker connection thread when the bridge is created. -// shared_ptr to self ensures we are not deleted before initializeBridge is called. -void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler, - boost::shared_ptr<QueueReplicator> /*self*/) { +void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { sys::Mutex::ScopedLock l(lock); bridgeName = bridge.getName(); framing::AMQP_ServerProxy peer(sessionHandler.out); @@ -141,27 +137,35 @@ void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) } // Called in connection thread of the queues bridge to primary. -void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable*) +void QueueReplicator::route(Deliverable& msg) { - sys::Mutex::ScopedLock l(lock); - if (key == DEQUEUE_EVENT_KEY) { - SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage()); - QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues); - //TODO: should be able to optimise the following - for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++) - dequeue(*i, l); - } else if (key == POSITION_EVENT_KEY) { - SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage()); - QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition() - << " to " << position); - assert(queue->getPosition() <= position); - //TODO aconway 2011-12-14: Optimize this? - for (SequenceNumber i = queue->getPosition(); i < position; ++i) - dequeue(i,l); - queue->setPosition(position); - } else { - msg.deliverTo(queue); - QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition()); + try { + const std::string& key = msg.getMessage().getRoutingKey(); + sys::Mutex::ScopedLock l(lock); + if (key == DEQUEUE_EVENT_KEY) { + SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage()); + QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues); + //TODO: should be able to optimise the following + for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++) + dequeue(*i, l); + } else if (key == POSITION_EVENT_KEY) { + SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage()); + QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition() + << " to " << position); + if (queue->getPosition() > position) { + throw Exception( + QPID_MSG(logPrefix << "Invalid position update from " + << queue->getPosition() << " to " << position)); + } + queue->setPosition(position); + } else { + msg.deliverTo(queue); + QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition()); + } + } + catch (const std::exception& e) { + QPID_LOG(critical, logPrefix << "Replication failed: " << e.what()); + throw; } } diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h index 9de7dd480c..bcbac988fa 100644 --- a/cpp/src/qpid/ha/QueueReplicator.h +++ b/cpp/src/qpid/ha/QueueReplicator.h @@ -66,12 +66,11 @@ class QueueReplicator : public broker::Exchange, bool bind(boost::shared_ptr<broker::Queue >, const std::string&, const framing::FieldTable*); bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); - void route(broker::Deliverable&, const std::string&, const framing::FieldTable*); + void route(broker::Deliverable&); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); private: - void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler, - boost::shared_ptr<QueueReplicator> self); + void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler); void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&); std::string logPrefix; diff --git a/cpp/src/qpid/ha/ReplicateLevel.cpp b/cpp/src/qpid/ha/ReplicateLevel.cpp new file mode 100644 index 0000000000..4981577225 --- /dev/null +++ b/cpp/src/qpid/ha/ReplicateLevel.cpp @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ReplicateLevel.h" +#include "qpid/Exception.h" +#include "qpid/Msg.h" +#include <iostream> +#include <assert.h> + +namespace qpid { +namespace ha { + +using namespace std; + +// Note replicateLevel is called during plugin-initialization which +// happens in the static construction phase so these constants need +// to be POD, they can't be class objects +// +namespace { +const char* S_NONE="none"; +const char* S_CONFIGURATION="configuration"; +const char* S_ALL="all"; +} + +bool replicateLevel(const string& level, ReplicateLevel& out) { + if (level == S_NONE) { out = RL_NONE; return true; } + if (level == S_CONFIGURATION) { out = RL_CONFIGURATION; return true; } + if (level == S_ALL) { out = RL_ALL; return true; } + return false; +} + +ReplicateLevel replicateLevel(const string& level) { + ReplicateLevel rl; + if (!replicateLevel(level, rl)) + throw Exception("Invalid value for replication level: "+level); + return rl; +} + +string str(ReplicateLevel l) { + const char* names[] = { S_NONE, S_CONFIGURATION, S_ALL }; + if (l > RL_ALL) + throw Exception(QPID_MSG("Invalid value for replication level: " << l)); + return names[l]; +} + +ostream& operator<<(ostream& o, ReplicateLevel rl) { return o << str(rl); } + +istream& operator>>(istream& i, ReplicateLevel& rl) { + string str; + i >> str; + rl = replicateLevel(str); + return i; +} + +}} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/ReplicateLevel.h b/cpp/src/qpid/ha/ReplicateLevel.h new file mode 100644 index 0000000000..c11e03f0ce --- /dev/null +++ b/cpp/src/qpid/ha/ReplicateLevel.h @@ -0,0 +1,52 @@ +#ifndef QPID_HA_REPLICATELEVEL_H +#define QPID_HA_REPLICATELEVEL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include <iosfwd> + +namespace qpid { +namespace ha { + +enum ReplicateLevel { RL_NONE, RL_CONFIGURATION, RL_ALL }; + +/** + * If str is a valid replicate level, set out and return true. + */ +bool replicateLevel(const std::string& str, ReplicateLevel& out); + +/** + *@return enum corresponding to string level. + *@throw qpid::Exception if level is not a valid replication level. + */ +ReplicateLevel replicateLevel(const std::string& level); + +/**@return string form of replicate level */ +std::string str(ReplicateLevel l); + +std::ostream& operator<<(std::ostream&, ReplicateLevel); +std::istream& operator>>(std::istream&, ReplicateLevel&); + +}} // namespaces qpid::ha + +#endif /*!QPID_HA_REPLICATELEVEL_H*/ diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index af6180305d..91a4538bc4 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -87,10 +87,13 @@ ReplicatingSubscription::ReplicatingSubscription( events(new Queue(mask(name))), consumer(new DelegatingConsumer(*this)) { + // Separate the remote part from a "local-remote" address. + string address = parent->getSession().getConnection().getUrl(); + size_t i = address.find('-'); + if (i != string::npos) address = address.substr(i+1); + logPrefix = "HA: Primary "; stringstream ss; - ss << "HA: Primary: " << getQueue()->getName() << " at " - << parent->getSession().getConnection().getUrl() << ": "; - logPrefix = ss.str(); + logSuffix = " (" + address + ")"; // FIXME aconway 2011-12-09: Failover optimization removed. // There was code here to re-use messages already on the backup @@ -99,7 +102,7 @@ ReplicatingSubscription::ReplicatingSubscription( // can be re-introduced later. Last revision with the optimization: // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. - QPID_LOG(debug, logPrefix << "Created backup subscription " << getName()); + QPID_LOG(debug, logPrefix << "created backup subscription " << getName() << logSuffix); // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0 // so we will start consuming from the lowest numbered message. @@ -109,23 +112,36 @@ ReplicatingSubscription::ReplicatingSubscription( // Message is delivered in the subscription's connection thread. bool ReplicatingSubscription::deliver(QueuedMessage& m) { - // Add position events for the subscribed queue, not for the internal event queue. - if (m.queue && m.queue == getQueue().get()) { - sys::Mutex::ScopedLock l(lock); - assert(position == m.position); - // m.position is the position of the newly enqueued m on the local queue. - // backupPosition is latest position on the backup queue (before enqueueing m.) - assert(m.position > backupPosition); - if (m.position - backupPosition > 1) { - // Position has advanced because of messages dequeued ahead of us. - SequenceNumber send(m.position); - --send; // Send the position before m was enqueued. - sendPositionEvent(send, l); + try { + // Add position events for the subscribed queue, not for the internal event queue. + if (m.queue && m.queue == getQueue().get()) { + sys::Mutex::ScopedLock l(lock); + if (position != m.position) + throw Exception( + QPID_MSG("Expected position " << position + << " but got " << m.position)); + // m.position is the position of the newly enqueued m on the local queue. + // backupPosition is latest position on the backup queue (before enqueueing m.) + if (m.position <= backupPosition) + throw Exception( + QPID_MSG("Expected position > " << backupPosition + << " but got " << m.position)); + + if (m.position - backupPosition > 1) { + // Position has advanced because of messages dequeued ahead of us. + SequenceNumber send(m.position); + --send; // Send the position before m was enqueued. + sendPositionEvent(send, l); + } + backupPosition = m.position; + QPID_LOG(trace, logPrefix << "replicating " << m << logSuffix); } - backupPosition = m.position; - QPID_LOG(trace, logPrefix << "Replicating message " << m.position); + return ConsumerImpl::deliver(m); + } catch (const std::exception& e) { + QPID_LOG(critical, logPrefix << "error replicating " << getQueue()->getName() + << logSuffix << ": " << e.what()); + throw; } - return ConsumerImpl::deliver(m); } ReplicatingSubscription::~ReplicatingSubscription() {} @@ -139,7 +155,7 @@ void ReplicatingSubscription::complete( { // Handle completions for the subscribed queue, not the internal event queue. if (qm.queue && qm.queue == getQueue().get()) { - QPID_LOG(trace, logPrefix << "Completed message " << qm.position); + QPID_LOG(trace, logPrefix << "completed " << qm << logSuffix); Delayed::iterator i= delayed.find(qm.position); // The same message can be completed twice, by acknowledged and // dequeued, remove it from the set so it only gets completed @@ -157,7 +173,7 @@ void ReplicatingSubscription::complete( void ReplicatingSubscription::enqueued(const QueuedMessage& qm) { sys::Mutex::ScopedLock l(lock); // Delay completion - QPID_LOG(trace, logPrefix << "Delaying completion of message " << qm.position); + QPID_LOG(trace, logPrefix << "delaying completion of " << qm << logSuffix); qm.payload->getIngressCompletion().startCompleter(); assert(delayed.find(qm.position) == delayed.end()); delayed[qm.position] = qm; @@ -168,7 +184,7 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& qm) { void ReplicatingSubscription::cancelComplete( const Delayed::value_type& v, const sys::Mutex::ScopedLock&) { - QPID_LOG(trace, logPrefix << "Cancel completed message " << v.second.position); + QPID_LOG(trace, logPrefix << "cancel completed " << v.second << logSuffix); v.second.payload->getIngressCompletion().finishCompleter(); } @@ -179,7 +195,7 @@ void ReplicatingSubscription::cancel() boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); { sys::Mutex::ScopedLock l(lock); - QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName()); + QPID_LOG(debug, logPrefix << "cancel backup subscription " << getName() << logSuffix); for_each(delayed.begin(), delayed.end(), boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l))); delayed.clear(); @@ -201,7 +217,8 @@ bool ReplicatingSubscription::hideDeletedError() { return true; } // Called with lock held. Called in subscription's connection thread. void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l) { - QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); + QPID_LOG(trace, logPrefix << "sending dequeues " << dequeues + << " from " << getQueue()->getName() << logSuffix); string buf(dequeues.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); dequeues.encode(buffer); @@ -216,7 +233,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm) { { sys::Mutex::ScopedLock l(lock); - QPID_LOG(trace, logPrefix << "Dequeued message " << qm.position); + QPID_LOG(trace, logPrefix << "dequeued " << qm << logSuffix); dequeues.add(qm.position); // If we have not yet sent this message to the backup, then // complete it now as it will never be accepted. @@ -229,8 +246,8 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm) void ReplicatingSubscription::sendPositionEvent( SequenceNumber position, const sys::Mutex::ScopedLock&l ) { - QPID_LOG(trace, logPrefix << "Sending position " << position - << ", was " << backupPosition); + QPID_LOG(trace, logPrefix << "sending position " << position + << ", was " << backupPosition << logSuffix); string buf(backupPosition.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); position.encode(buffer); diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.h b/cpp/src/qpid/ha/ReplicatingSubscription.h index e311f9505a..f9176915f6 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -33,7 +33,7 @@ namespace qpid { namespace broker { class Message; class Queue; -class QueuedMessage; +struct QueuedMessage; class OwnershipToken; } @@ -94,7 +94,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, bool doDispatch(); private: typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed; - std::string logPrefix; + std::string logPrefix, logSuffix; boost::shared_ptr<broker::Queue> events; boost::shared_ptr<broker::Consumer> consumer; Delayed delayed; diff --git a/cpp/src/qpid/ha/Settings.h b/cpp/src/qpid/ha/Settings.h index 049c873b9f..bf70c3f3f7 100644 --- a/cpp/src/qpid/ha/Settings.h +++ b/cpp/src/qpid/ha/Settings.h @@ -22,6 +22,7 @@ * */ +#include "ReplicateLevel.h" #include <string> namespace qpid { @@ -33,10 +34,12 @@ namespace ha { class Settings { public: - Settings() : enabled(false) {} - bool enabled; + Settings() : cluster(false), expectedBackups(0), replicateDefault(RL_NONE) {} + bool cluster; // True if we are a cluster member. std::string clientUrl; std::string brokerUrl; + size_t expectedBackups; + ReplicateLevel replicateDefault; std::string username, password, mechanism; private: }; diff --git a/cpp/src/qpid/ha/management-schema.xml b/cpp/src/qpid/ha/management-schema.xml index fe4a14d111..363dff61fb 100644 --- a/cpp/src/qpid/ha/management-schema.xml +++ b/cpp/src/qpid/ha/management-schema.xml @@ -22,16 +22,39 @@ <!-- Monitor and control HA status of a broker. --> <class name="HaBroker"> <property name="name" type="sstr" access="RC" index="y" desc="Primary Key"/> + <property name="status" type="sstr" desc="HA status: primary or backup"/> - <property name="clientAddresses" type="sstr" desc="List of addresses used by clients to connect to the HA cluster."/> - <property name="brokerAddresses" type="sstr" desc="List of addresses used by HA brokers to connect to each other."/> + + <property name="brokers" type="sstr" + desc="Multiple-address URL used by HA brokers to connect to each other."/> + + <property name="publicBrokers" type="sstr" + desc="Multiple-address URL used by clients to connect to the HA brokers."/> + + <property name="expectedBackups" type="uint16" + desc="Number of HA backup brokers expected."/> + + <property + name="replicateDefault" type="sstr" + desc="Replicate value for queues/exchanges without a qpid.replicate argument"/> <method name="promote" desc="Promote a backup broker to primary."/> - <method name="setClientAddresses" desc="Set HA client addresses"> - <arg name="clientAddresses" type="sstr" dir="I"/> + + <method name="setBrokers" desc="Set URL for HA brokers to connect to each other."> + <arg name="url" type="sstr" dir="I"/> </method> - <method name="setBrokerAddresses" desc="Set HA broker addresses"> - <arg name="brokerAddresses" type="sstr" dir="I"/> + + <method name="setPublicBrokers" desc="Set URL for clients to connect to HA brokers"> + <arg name="url" type="sstr" dir="I"/> + </method> + + <method name="setExpectedBackups" desc="Set number of backups expected"> + <arg name="expectedBackups" type="uint16" dir="I"/> + </method> + + <method name="replicate" desc="Replicate individual queue from remote broker."> + <arg name="broker" type="sstr" dir="I"/> + <arg name="queue" type="sstr" dir="I"/> </method> </class> |