diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/BrokerReplicator.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 194 |
1 files changed, 110 insertions, 84 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 039ec8c623..690337831c 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -19,6 +19,7 @@ * */ #include "BrokerReplicator.h" +#include "HaBroker.h" #include "QueueReplicator.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" @@ -37,6 +38,7 @@ #include "qmf/org/apache/qpid/broker/EventQueueDelete.h" #include "qmf/org/apache/qpid/broker/EventSubscribe.h" #include <algorithm> +#include <sstream> namespace qpid { namespace ha { @@ -87,6 +89,7 @@ const string QUEUE("queue"); const string RHOST("rhost"); const string TYPE("type"); const string USER("user"); +const string HA_BROKER("habroker"); const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#"); const string QMF2("qmf2"); @@ -100,6 +103,7 @@ const string _PACKAGE_NAME("_package_name"); const string _SCHEMA_ID("_schema_id"); const string OBJECT("OBJECT"); const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker"); +const string ORG_APACHE_QPID_HA("org.apache.qpid.ha"); const string QMF_DEFAULT_DIRECT("qmf.default.direct"); const string _QUERY_REQUEST("_query_request"); const string BROKER("broker"); @@ -113,36 +117,13 @@ template <class T> bool match(Variant::Map& schema) { return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); } -enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_ALL }; -const string S_NONE="none"; -const string S_CONFIGURATION="configuration"; -const string S_ALL="all"; - -ReplicateLevel replicateLevel(const string& level) { - if (level == S_NONE) return RL_NONE; - if (level == S_CONFIGURATION) return RL_CONFIGURATION; - if (level == S_ALL) return RL_ALL; - throw Exception("Invalid value for "+QPID_REPLICATE+": "+level); -} - -ReplicateLevel replicateLevel(const framing::FieldTable& f) { - if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE)); - else return RL_NONE; -} - -ReplicateLevel replicateLevel(const Variant::Map& m) { - Variant::Map::const_iterator i = m.find(QPID_REPLICATE); - if (i != m.end()) return replicateLevel(i->second.asString()); - else return RL_NONE; -} - -void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) { +void sendQuery(const string& packageName, const string& className, const string& queueName, SessionHandler& sessionHandler) { framing::AMQP_ServerProxy peer(sessionHandler.out); Variant::Map request; request[_WHAT] = OBJECT; Variant::Map schema; schema[_CLASS_NAME] = className; - schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER; + schema[_PACKAGE_NAME] = packageName; request[_SCHEMA_ID] = schema; AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0))); @@ -181,13 +162,34 @@ Variant::Map asMapVoid(const Variant& value) { } // namespace + +ReplicateLevel BrokerReplicator::replicateLevel(const std::string& str) { + ReplicateLevel rl; + if (qpid::ha::replicateLevel(str, rl)) return rl; + else return haBroker.getSettings().replicateDefault; +} + +ReplicateLevel BrokerReplicator::replicateLevel(const framing::FieldTable& f) { + if (f.isSet(QPID_REPLICATE)) + return replicateLevel(f.getAsString(QPID_REPLICATE)); + else + return haBroker.getSettings().replicateDefault; +} + +ReplicateLevel BrokerReplicator::replicateLevel(const Variant::Map& m) { + Variant::Map::const_iterator i = m.find(QPID_REPLICATE); + if (i != m.end()) + return replicateLevel(i->second.asString()); + else + return haBroker.getSettings().replicateDefault; +} + BrokerReplicator::~BrokerReplicator() {} -BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l) - : Exchange(QPID_CONFIGURATION_REPLICATOR), broker(*l->getBroker()), link(l) +BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l) + : Exchange(QPID_CONFIGURATION_REPLICATOR), + haBroker(hb), broker(hb.getBroker()), link(l) { - QPID_LOG(info, "HA: Backup replicating from " << - link->getTransport() << ":" << link->getHost() << ":" << link->getPort()); framing::Uuid uuid(true); const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str()); broker.getLinks().declare( @@ -214,21 +216,24 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); //declare and bind an event queue - peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable()); + FieldTable declareArgs; + declareArgs.setString(QPID_REPLICATE, str(RL_NONE)); + peer.getQueue().declare(queueName, "", false, false, true, true, declareArgs); peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable()); //subscribe to the queue peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); - //issue a query request for queues and another for exchanges using event queue as the reply-to address - sendQuery(QUEUE, queueName, sessionHandler); - sendQuery(EXCHANGE, queueName, sessionHandler); - sendQuery(BINDING, queueName, sessionHandler); - QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName); + // Issue a query request for queues, exchanges, bindings and the habroker + // using event queue as the reply-to address + sendQuery(ORG_APACHE_QPID_HA, HA_BROKER, queueName, sessionHandler); + sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler); + sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler); + sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler); + QPID_LOG(debug, "HA: Backup configuration bridge: " << queueName); } -// FIXME aconway 2011-12-02: error handling in route. void BrokerReplicator::route(Deliverable& msg) { const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders(); Variant::List list; @@ -242,6 +247,7 @@ void BrokerReplicator::route(Deliverable& msg) { if (headers->getAsString(QMF_CONTENT) == EVENT) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); + QPID_LOG(trace, "HA: Backup received event: " << map); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values); @@ -253,24 +259,26 @@ void BrokerReplicator::route(Deliverable& msg) { } } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { - string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME]; - Variant::Map& values = i->asMap()[VALUES].asMap(); + Variant::Map& map = i->asMap(); + QPID_LOG(trace, "HA: Backup received event: " << map); + string type = map[SCHEMA_ID].asMap()[CLASS_NAME]; + Variant::Map& values = map[VALUES].asMap(); framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); if (type == QUEUE) doResponseQueue(values); else if (type == EXCHANGE) doResponseExchange(values); else if (type == BINDING) doResponseBind(values); - else QPID_LOG(error, "HA: Backup received unknown response type=" << type - << " values=" << values); + else if (type == HA_BROKER) doResponseHaBroker(values); } - } else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers); + } } catch (const std::exception& e) { - QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list); + QPID_LOG(critical, "HA: Backup configuration failed: " << e.what() + << ": while handling: " << list); + throw; } } void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { - QPID_LOG(debug, "HA: Backup queue declare event " << values); string name = values[QNAME].asString(); Variant::Map argsMap = asMapVoid(values[ARGS]); if (values[DISP] == CREATED && replicateLevel(argsMap)) { @@ -287,27 +295,27 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { values[USER].asString(), values[RHOST].asString()); if (result.second) { - // FIXME aconway 2011-11-22: should delete old queue and - // re-create from event. - // Events are always up to date, whereas responses may be - // out of date. - QPID_LOG(debug, "HA: Backup created queue: " << name); + QPID_LOG(debug, "HA: Backup queue declare event: " << name); startQueueReplicator(result.first); } else { // FIXME aconway 2011-12-02: what's the right way to handle this? - QPID_LOG(warning, "HA: Backup queue already exists: " << name); + // Should we delete the old & re-create form the event? Responses + // may be old but events are always up-to-date. + QPID_LOG(warning, "HA: Backup queue declare event, already exists: " << name); } } } void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { - QPID_LOG(debug, "HA: Backup queue delete event " << values); // The remote queue has already been deleted so replicator // sessions may be closed by a "queue deleted" exception. string name = values[QNAME].asString(); boost::shared_ptr<Queue> queue = broker.getQueues().find(name); - if (queue && replicateLevel(queue->getSettings())) { - QPID_LOG(debug, "HA: Backup deleting queue: " << name); + if (!queue) { + QPID_LOG(warning, "HA: Backup queue delete event, does not exist: " << name); + } else if (!replicateLevel(queue->getSettings())) { + QPID_LOG(warning, "HA: Backup queue delete event, not replicated: " << name); + } else { string rname = QueueReplicator::replicatorName(name); boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname); boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex); @@ -316,11 +324,11 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { // actually be destroyed, deleting the exhange broker.getExchanges().destroy(rname); broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString()); + QPID_LOG(debug, "HA: Backup queue delete event: " << name); } } void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { - QPID_LOG(debug, "HA: Backup exchange declare event " << values); Variant::Map argsMap(asMapVoid(values[ARGS])); if (values[DISP] == CREATED && replicateLevel(argsMap)) { string name = values[EXNAME].asString(); @@ -335,32 +343,32 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { values[USER].asString(), values[RHOST].asString()).second) { - QPID_LOG(debug, "HA: Backup created exchange: " << name); + QPID_LOG(debug, "HA: Backup exchange declare event: " << name); } else { - // FIXME aconway 2011-11-22: should delete pre-exisitng exchange + // FIXME aconway 2011-11-22: should delete pre-existing exchange // and re-create from event. See comment in doEventQueueDeclare. - QPID_LOG(warning, "HA: Backup exchange already exists: " << name); + QPID_LOG(debug, "HA: Backup exchange declare event, already exists: " << name); } } } void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { - QPID_LOG(debug, "HA: Backup exchange delete event " << values); string name = values[EXNAME].asString(); - try { - boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name); - if (exchange && replicateLevel(exchange->getArgs())) { - QPID_LOG(debug, "HA: Backup deleting exchange:" << name); - broker.deleteExchange( - name, - values[USER].asString(), - values[RHOST].asString()); - } - } catch (const framing::NotFoundException&) {} + boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name); + if (!exchange) { + QPID_LOG(warning, "HA: Backup exchange delete event, does not exist: " << name); + } else if (!replicateLevel(exchange->getArgs())) { + QPID_LOG(warning, "HA: Backup exchange delete event, not replicated: " << name); + } else { + QPID_LOG(debug, "HA: Backup exchange delete event:" << name); + broker.deleteExchange( + name, + values[USER].asString(), + values[RHOST].asString()); + } } void BrokerReplicator::doEventBind(Variant::Map& values) { - QPID_LOG(debug, "HA: Backup bind event " << values); boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(values[EXNAME].asString()); boost::shared_ptr<Queue> queue = @@ -373,15 +381,14 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); - QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName() + exchange->bind(queue, key, &args); + QPID_LOG(debug, "HA: Backup bind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); - exchange->bind(queue, key, &args); } } void BrokerReplicator::doEventUnbind(Variant::Map& values) { - QPID_LOG(debug, "HA: Backup unbind event " << values); boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(values[EXNAME].asString()); boost::shared_ptr<Queue> queue = @@ -394,16 +401,14 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); - QPID_LOG(debug, "HA: Backup replicated unbinding exchange=" << exchange->getName() + exchange->unbind(queue, key, &args); + QPID_LOG(debug, "HA: Backup unbind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); - exchange->unbind(queue, key, &args); } } void BrokerReplicator::doResponseQueue(Variant::Map& values) { - QPID_LOG(debug, "HA: Backup queue response " << values); - // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); if (!replicateLevel(argsMap)) return; framing::FieldTable args; @@ -420,17 +425,16 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/); if (result.second) { - QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]); + QPID_LOG(debug, "HA: Backup queue response: " << name); startQueueReplicator(result.first); } else { // FIXME aconway 2011-11-22: Normal to find queue already // exists if we're failing over. - QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << name); + QPID_LOG(warning, "HA: Backup queue response, already exists: " << name); } } void BrokerReplicator::doResponseExchange(Variant::Map& values) { - QPID_LOG(debug, "HA: Backup exchange response " << values); Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); if (!replicateLevel(argsMap)) return; framing::FieldTable args; @@ -444,9 +448,10 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/).second) { - QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME]); + QPID_LOG(debug, "HA: Backup exchange response: " << values[NAME].asString()); } else { - QPID_LOG(warning, "HA: Backup catch-up exchange already exists: " << values[QNAME]); + QPID_LOG(warning, "HA: Backup exchange query, already exists: " << + values[QNAME].asString()); } } @@ -472,12 +477,10 @@ const std::string QUEUE_REF("queueRef"); } // namespace void BrokerReplicator::doResponseBind(Variant::Map& values) { - QPID_LOG(debug, "HA: Backup bind response " << values); std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]); std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName); boost::shared_ptr<Queue> queue = broker.getQueues().find(qName); - // FIXME aconway 2011-11-24: more flexible configuration for binding replication. // Automatically replicate binding if queue and exchange exist and are replicated if (exchange && replicateLevel(exchange->getArgs()) && @@ -487,16 +490,39 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); string key = values[KEY].asString(); exchange->bind(queue, key, &args); - QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName() + QPID_LOG(debug, "HA: Backup bind response: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); } } +namespace { +const string REPLICATE_DEFAULT="replicateDefault"; +} + +// Received the ha-broker configuration object for the primary broker. +void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { + try { + ReplicateLevel mine = haBroker.getSettings().replicateDefault; + ReplicateLevel primary = replicateLevel(values[REPLICATE_DEFAULT].asString()); + if (mine != primary) { + std::ostringstream os; + os << "Replicate default on backup (" << mine + << ") does not match primary (" << primary << ")"; + haBroker.shutdown(os.str()); + } + } catch (const std::exception& e) { + std::ostringstream os; + os << "Received invalid replicate default from primary: " << e.what(); + haBroker.shutdown(os.str()); + } +} + void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { if (replicateLevel(queue->getSettings()) == RL_ALL) { boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); - broker.getExchanges().registerExchange(qr); + if (!broker.getExchanges().registerExchange(qr)) + throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName())); qr->activate(); } } |