diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/BrokerReplicator.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 497 |
1 files changed, 497 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp new file mode 100644 index 0000000000..a8f05c1fe3 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -0,0 +1,497 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "BrokerReplicator.h" +#include "QueueReplicator.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/Link.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/log/Statement.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/broker/SessionHandler.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qmf/org/apache/qpid/broker/EventBind.h" +#include "qmf/org/apache/qpid/broker/EventUnbind.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" +#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" +#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" +#include "qmf/org/apache/qpid/broker/EventSubscribe.h" +#include <algorithm> + +namespace qpid { +namespace ha { + +using qmf::org::apache::qpid::broker::EventBind; +using qmf::org::apache::qpid::broker::EventUnbind; +using qmf::org::apache::qpid::broker::EventExchangeDeclare; +using qmf::org::apache::qpid::broker::EventExchangeDelete; +using qmf::org::apache::qpid::broker::EventQueueDeclare; +using qmf::org::apache::qpid::broker::EventQueueDelete; +using qmf::org::apache::qpid::broker::EventSubscribe; +using namespace framing; +using std::string; +using types::Variant; +using namespace broker; + +namespace { + +const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator"); +const string QPID_REPLICATE("qpid.replicate"); + +const string CLASS_NAME("_class_name"); +const string EVENT("_event"); +const string OBJECT_NAME("_object_name"); +const string PACKAGE_NAME("_package_name"); +const string QUERY_RESPONSE("_query_response"); +const string SCHEMA_ID("_schema_id"); +const string VALUES("_values"); + +const string ALTEX("altEx"); +const string ARGS("args"); +const string ARGUMENTS("arguments"); +const string AUTODEL("autoDel"); +const string AUTODELETE("autoDelete"); +const string BIND("bind"); +const string UNBIND("unbind"); +const string BINDING("binding"); +const string CREATED("created"); +const string DISP("disp"); +const string DURABLE("durable"); +const string EXCHANGE("exchange"); +const string EXNAME("exName"); +const string EXTYPE("exType"); +const string KEY("key"); +const string NAME("name"); +const string QNAME("qName"); +const string QUEUE("queue"); +const string RHOST("rhost"); +const string TYPE("type"); +const string USER("user"); + +const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#"); +const string QMF2("qmf2"); +const string QMF_CONTENT("qmf.content"); +const string QMF_DEFAULT_TOPIC("qmf.default.topic"); +const string QMF_OPCODE("qmf.opcode"); + +const string _WHAT("_what"); +const string _CLASS_NAME("_class_name"); +const string _PACKAGE_NAME("_package_name"); +const string _SCHEMA_ID("_schema_id"); +const string OBJECT("OBJECT"); +const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker"); +const string QMF_DEFAULT_DIRECT("qmf.default.direct"); +const string _QUERY_REQUEST("_query_request"); +const string BROKER("broker"); + +bool isQMFv2(const Message& message) { + const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>(); + return props && props->getAppId() == QMF2; +} + +template <class T> bool match(Variant::Map& schema) { + return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); +} + +enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_MESSAGES }; +const string S_NONE="none"; +const string S_CONFIGURATION="configuration"; +const string S_MESSAGES="messages"; + +ReplicateLevel replicateLevel(const string& level) { + if (level == S_NONE) return RL_NONE; + if (level == S_CONFIGURATION) return RL_CONFIGURATION; + if (level == S_MESSAGES) return RL_MESSAGES; + throw Exception("Invalid value for "+QPID_REPLICATE+": "+level); +} + +ReplicateLevel replicateLevel(const framing::FieldTable& f) { + if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE)); + else return RL_NONE; +} + +ReplicateLevel replicateLevel(const Variant::Map& m) { + Variant::Map::const_iterator i = m.find(QPID_REPLICATE); + if (i != m.end()) return replicateLevel(i->second.asString()); + else return RL_NONE; +} + +void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) { + framing::AMQP_ServerProxy peer(sessionHandler.out); + Variant::Map request; + request[_WHAT] = OBJECT; + Variant::Map schema; + schema[_CLASS_NAME] = className; + schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER; + request[_SCHEMA_ID] = schema; + + AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0))); + method.setBof(true); + method.setEof(false); + method.setBos(true); + method.setEos(true); + AMQHeaderBody headerBody; + MessageProperties* props = headerBody.get<MessageProperties>(true); + props->setReplyTo(qpid::framing::ReplyTo("", queueName)); + props->setAppId(QMF2); + props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST); + headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER); + AMQFrame header(headerBody); + header.setBof(false); + header.setEof(false); + header.setBos(true); + header.setEos(true); + AMQContentBody data; + qpid::amqp_0_10::MapCodec::encode(request, data.getData()); + AMQFrame content(data); + content.setBof(false); + content.setEof(true); + content.setBos(true); + content.setEos(true); + sessionHandler.out->handle(method); + sessionHandler.out->handle(header); + sessionHandler.out->handle(content); +} + +// Like Variant::asMap but treat void value as an empty map. +Variant::Map asMapVoid(const Variant& value) { + if (!value.isVoid()) return value.asMap(); + else return Variant::Map(); +} + +} // namespace + +BrokerReplicator::~BrokerReplicator() {} + +BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l) + : Exchange(QPID_CONFIGURATION_REPLICATOR), broker(*l->getBroker()), link(l) +{ + QPID_LOG(info, "HA: Backup replicating from " << + link->getTransport() << ":" << link->getHost() << ":" << link->getPort()); + broker.getLinks().declare( + link->getHost(), link->getPort(), + false, // durable + QPID_CONFIGURATION_REPLICATOR, // src + QPID_CONFIGURATION_REPLICATOR, // dest + "", // key + false, // isQueue + false, // isLocal + "", // id/tag + "", // excludes + false, // dynamic + 0, // sync? + boost::bind(&BrokerReplicator::initializeBridge, this, _1, _2) + ); +} + +// This is called in the connection IO thread when the bridge is started. +void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { + framing::AMQP_ServerProxy peer(sessionHandler.out); + string queueName = bridge.getQueueName(); + const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); + + //declare and bind an event queue + peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable()); + peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable()); + //subscribe to the queue + peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + + //issue a query request for queues and another for exchanges using event queue as the reply-to address + sendQuery(QUEUE, queueName, sessionHandler); + sendQuery(EXCHANGE, queueName, sessionHandler); + sendQuery(BINDING, queueName, sessionHandler); + QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName); +} + +// FIXME aconway 2011-12-02: error handling in route. +void BrokerReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) { + Variant::List list; + try { + if (!isQMFv2(msg.getMessage()) || !headers) + throw Exception("Unexpected message, not QMF2 event or query response."); + // decode as list + string content = msg.getMessage().getFrames().getContent(); + amqp_0_10::ListCodec::decode(content, list); + + if (headers->getAsString(QMF_CONTENT) == EVENT) { + for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { + Variant::Map& map = i->asMap(); + Variant::Map& schema = map[SCHEMA_ID].asMap(); + Variant::Map& values = map[VALUES].asMap(); + if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values); + else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values); + else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values); + else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values); + else if (match<EventBind>(schema)) doEventBind(values); + else if (match<EventUnbind>(schema)) doEventUnbind(values); + } + } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) { + for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { + string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME]; + Variant::Map& values = i->asMap()[VALUES].asMap(); + framing::FieldTable args; + amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); + if (type == QUEUE) doResponseQueue(values); + else if (type == EXCHANGE) doResponseExchange(values); + else if (type == BINDING) doResponseBind(values); + else QPID_LOG(error, "HA: Backup received unknown response type=" << type + << " values=" << values); + } + } else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers); + } catch (const std::exception& e) { + QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list); + } +} + +void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { + string name = values[QNAME].asString(); + Variant::Map argsMap = asMapVoid(values[ARGS]); + if (values[DISP] == CREATED && replicateLevel(argsMap)) { + framing::FieldTable args; + amqp_0_10::translate(argsMap, args); + std::pair<boost::shared_ptr<Queue>, bool> result = + broker.createQueue( + name, + values[DURABLE].asBool(), + values[AUTODEL].asBool(), + 0 /*i.e. no owner regardless of exclusivity on master*/, + values[ALTEX].asString(), + args, + values[USER].asString(), + values[RHOST].asString()); + if (result.second) { + // FIXME aconway 2011-11-22: should delete old queue and + // re-create from event. + // Events are always up to date, whereas responses may be + // out of date. + QPID_LOG(debug, "HA: Backup created queue: " << name); + startQueueReplicator(result.first); + } else { + // FIXME aconway 2011-12-02: what's the right way to handle this? + QPID_LOG(warning, "HA: Backup queue already exists: " << name); + } + } +} + +void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { + // The remote queue has already been deleted so replicator + // sessions may be closed by a "queue deleted" exception. + string name = values[QNAME].asString(); + boost::shared_ptr<Queue> queue = broker.getQueues().find(name); + if (queue && replicateLevel(queue->getSettings())) { + QPID_LOG(debug, "HA: Backup deleting queue: " << name); + string rname = QueueReplicator::replicatorName(name); + boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname); + boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex); + if (qr) qr->deactivate(); + // QueueReplicator's bridge is now queued for destruction but may not + // actually be destroyed, deleting the exhange + broker.getExchanges().destroy(rname); + broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString()); + } +} + +void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { + Variant::Map argsMap(asMapVoid(values[ARGS])); + if (values[DISP] == CREATED && replicateLevel(argsMap)) { + string name = values[EXNAME].asString(); + framing::FieldTable args; + amqp_0_10::translate(argsMap, args); + if (broker.createExchange( + name, + values[EXTYPE].asString(), + values[DURABLE].asBool(), + values[ALTEX].asString(), + args, + values[USER].asString(), + values[RHOST].asString()).second) + { + QPID_LOG(debug, "HA: Backup created exchange: " << name); + } else { + // FIXME aconway 2011-11-22: should delete pre-exisitng exchange + // and re-create from event. See comment in doEventQueueDeclare. + QPID_LOG(warning, "HA: Backup exchange already exists: " << name); + } + } +} + +void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { + string name = values[EXNAME].asString(); + try { + boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name); + if (exchange && replicateLevel(exchange->getArgs())) { + QPID_LOG(debug, "HA: Backup deleting exchange:" << name); + broker.deleteExchange( + name, + values[USER].asString(), + values[RHOST].asString()); + } + } catch (const framing::NotFoundException&) {} +} + +void BrokerReplicator::doEventBind(Variant::Map& values) { + boost::shared_ptr<Exchange> exchange = + broker.getExchanges().find(values[EXNAME].asString()); + boost::shared_ptr<Queue> queue = + broker.getQueues().find(values[QNAME].asString()); + // We only replicate binds for a replicated queue to replicated + // exchange that both exist locally. + if (exchange && replicateLevel(exchange->getArgs()) && + queue && replicateLevel(queue->getSettings())) + { + framing::FieldTable args; + amqp_0_10::translate(asMapVoid(values[ARGS]), args); + string key = values[KEY].asString(); + QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName() + << " queue=" << queue->getName() + << " key=" << key); + exchange->bind(queue, key, &args); + } +} + +void BrokerReplicator::doEventUnbind(Variant::Map& values) { + boost::shared_ptr<Exchange> exchange = + broker.getExchanges().find(values[EXNAME].asString()); + boost::shared_ptr<Queue> queue = + broker.getQueues().find(values[QNAME].asString()); + // We only replicate unbinds for a replicated queue to replicated + // exchange that both exist locally. + if (exchange && replicateLevel(exchange->getArgs()) && + queue && replicateLevel(queue->getSettings())) + { + framing::FieldTable args; + amqp_0_10::translate(asMapVoid(values[ARGS]), args); + string key = values[KEY].asString(); + QPID_LOG(debug, "HA: Backup replicated unbinding exchange=" << exchange->getName() + << " queue=" << queue->getName() + << " key=" << key); + exchange->unbind(queue, key, &args); + } +} + +void BrokerReplicator::doResponseQueue(Variant::Map& values) { + // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication + Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); + if (!replicateLevel(argsMap)) return; + framing::FieldTable args; + amqp_0_10::translate(argsMap, args); + string name(values[NAME].asString()); + std::pair<boost::shared_ptr<Queue>, bool> result = + broker.createQueue( + name, + values[DURABLE].asBool(), + values[AUTODELETE].asBool(), + 0 /*i.e. no owner regardless of exclusivity on master*/, + ""/*TODO: need to include alternate-exchange*/, + args, + ""/*TODO: who is the user?*/, + ""/*TODO: what should we use as connection id?*/); + if (result.second) { + QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]); + startQueueReplicator(result.first); + } else { + // FIXME aconway 2011-11-22: Normal to find queue already + // exists if we're failing over. + QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << name); + } +} + +void BrokerReplicator::doResponseExchange(Variant::Map& values) { + Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); + if (!replicateLevel(argsMap)) return; + framing::FieldTable args; + amqp_0_10::translate(argsMap, args); + if (broker.createExchange( + values[NAME].asString(), + values[TYPE].asString(), + values[DURABLE].asBool(), + ""/*TODO: need to include alternate-exchange*/, + args, + ""/*TODO: who is the user?*/, + ""/*TODO: what should we use as connection id?*/).second) + { + QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME]); + } else { + QPID_LOG(warning, "HA: Backup catch-up exchange already exists: " << values[QNAME]); + } +} + +namespace { +const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:"); +const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:"); + +std::string getRefName(const std::string& prefix, const Variant& ref) { + Variant::Map map(ref.asMap()); + Variant::Map::const_iterator i = map.find(OBJECT_NAME); + if (i == map.end()) + throw Exception(QPID_MSG("Replicator: invalid object reference: " << ref)); + const std::string name = i->second.asString(); + if (name.compare(0, prefix.size(), prefix) != 0) + throw Exception(QPID_MSG("Replicator: unexpected reference prefix: " << name)); + std::string ret = name.substr(prefix.size()); + return ret; +} + +const std::string EXCHANGE_REF("exchangeRef"); +const std::string QUEUE_REF("queueRef"); + +} // namespace + +void BrokerReplicator::doResponseBind(Variant::Map& values) { + std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]); + std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); + boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName); + boost::shared_ptr<Queue> queue = broker.getQueues().find(qName); + // FIXME aconway 2011-11-24: more flexible configuration for binding replication. + + // Automatically replicate binding if queue and exchange exist and are replicated + if (exchange && replicateLevel(exchange->getArgs()) && + queue && replicateLevel(queue->getSettings())) + { + framing::FieldTable args; + amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); + string key = values[KEY].asString(); + exchange->bind(queue, key, &args); + QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName() + << " queue=" << queue->getName() + << " key=" << key); + } +} + +void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { + if (replicateLevel(queue->getSettings()) == RL_MESSAGES) { + boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); + broker.getExchanges().registerExchange(qr); + qr->activate(); + } +} + +bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } +bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } +bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; } + +string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } + +}} // namespace broker |