/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include "BrokerReplicator.h"
#include "QueueReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Link.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qmf/org/apache/qpid/broker/EventBind.h"
#include "qmf/org/apache/qpid/broker/EventUnbind.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
// NOTE(review): the name of this system header was missing from the text under
// review; <boost/bind.hpp> is used because boost::bind is called below.
// Confirm against version control.
#include <boost/bind.hpp>

namespace qpid {
namespace ha {

using qmf::org::apache::qpid::broker::EventBind;
using qmf::org::apache::qpid::broker::EventUnbind;
using qmf::org::apache::qpid::broker::EventExchangeDeclare;
using qmf::org::apache::qpid::broker::EventExchangeDelete;
using qmf::org::apache::qpid::broker::EventQueueDeclare;
using qmf::org::apache::qpid::broker::EventQueueDelete;
using qmf::org::apache::qpid::broker::EventSubscribe;
using namespace framing;
using std::string;
using types::Variant;
using namespace broker;

namespace {

// Name (and exchange type string) of the replicator exchange, and the
// queue/exchange argument that selects the replication level.
const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator");
const string QPID_REPLICATE("qpid.replicate");

// Keys read from decoded QMF2 messages.
const string CLASS_NAME("_class_name");
const string EVENT("_event");
const string OBJECT_NAME("_object_name");
const string PACKAGE_NAME("_package_name");
const string QUERY_RESPONSE("_query_response");
const string SCHEMA_ID("_schema_id");
const string VALUES("_values");

// Keys read from the "_values" map of events and query responses.
const string ALTEX("altEx");
const string ARGS("args");
const string ARGUMENTS("arguments");
const string AUTODEL("autoDel");
const string AUTODELETE("autoDelete");
const string BIND("bind");
const string UNBIND("unbind");
const string BINDING("binding");
const string CREATED("created");
const string DISP("disp");
const string DURABLE("durable");
const string EXCHANGE("exchange");
const string EXNAME("exName");
const string EXTYPE("exType");
const string KEY("key");
const string NAME("name");
const string QNAME("qName");
const string QUEUE("queue");
const string RHOST("rhost");
const string TYPE("type");
const string USER("user");

// Binding key, header names and values used when talking QMF2 to the primary.
const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
const string QMF2("qmf2");
const string QMF_CONTENT("qmf.content");
const string QMF_DEFAULT_TOPIC("qmf.default.topic");
const string QMF_OPCODE("qmf.opcode");

// Keys and values used to build an outgoing query request (see sendQuery).
// NOTE(review): identifiers that start with an underscore followed by an
// upper-case letter are reserved to the implementation in C++; consider
// renaming these.
const string _WHAT("_what");
const string _CLASS_NAME("_class_name");
const string _PACKAGE_NAME("_package_name");
const string _SCHEMA_ID("_schema_id");
const string OBJECT("OBJECT");
const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
const string QMF_DEFAULT_DIRECT("qmf.default.direct");
const string _QUERY_REQUEST("_query_request");
const string BROKER("broker");

// True if the message carries message properties whose app-id is "qmf2".
bool isQMFv2(const Message& message) {
    const framing::MessageProperties* props =
        message.getProperties<framing::MessageProperties>();
    return props && props->getAppId() == QMF2;
}

// True if the schema map's class and package names match event type T.
// Uses operator[] on the map, hence the non-const reference.
template <class T> bool match(Variant::Map& schema) {
    return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
}

// Replication level requested through the "qpid.replicate" argument.
// RL_NONE is 0 so that a level can be tested directly in a condition.
enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_MESSAGES };
const string S_NONE="none";
const string S_CONFIGURATION="configuration";
const string S_MESSAGES="messages";

// Convert the textual level to a ReplicateLevel.
// Throws Exception if the text is not one of "none", "configuration", "messages".
ReplicateLevel replicateLevel(const string& level) {
    if (level == S_NONE) return RL_NONE;
    if (level == S_CONFIGURATION) return RL_CONFIGURATION;
    if (level == S_MESSAGES) return RL_MESSAGES;
    throw Exception("Invalid value for "+QPID_REPLICATE+": "+level);
}

// Replication level from a field table; RL_NONE if "qpid.replicate" is not set.
ReplicateLevel replicateLevel(const framing::FieldTable& f) {
    if (f.isSet(QPID_REPLICATE))
        return replicateLevel(f.getAsString(QPID_REPLICATE));
    else
        return RL_NONE;
}

// Replication level from a variant map; RL_NONE if "qpid.replicate" is absent.
ReplicateLevel replicateLevel(const Variant::Map& m) {
    Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
    if (i != m.end())
        return replicateLevel(i->second.asString());
    else
        return RL_NONE;
}

// Send a QMF2 "_query_request" for all objects of class className in package
// "org.apache.qpid.broker". The request is a message transfer to
// "qmf.default.direct" with routing key "broker" and reply-to queueName. It is
// written to sessionHandler.out as three frames: method, header, content.
void sendQuery(const string& className, const string& queueName, SessionHandler& sessionHandler) {
    // Query body: { _what: OBJECT, _schema_id: { _class_name, _package_name } }
    Variant::Map request;
    request[_WHAT] = OBJECT;
    Variant::Map schema;
    schema[_CLASS_NAME] = className;
    schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
    request[_SCHEMA_ID] = schema;

    // Method frame: first frame of the frameset.
    AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0)));
    method.setBof(true);
    method.setEof(false);
    method.setBos(true);
    method.setEos(true);

    // Header frame: reply-to, app-id, opcode and routing key.
    AMQHeaderBody headerBody;
    MessageProperties* props = headerBody.get<MessageProperties>(true);
    props->setReplyTo(qpid::framing::ReplyTo("", queueName));
    props->setAppId(QMF2);
    props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
    headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
    AMQFrame header(headerBody);
    header.setBof(false);
    header.setEof(false);
    header.setBos(true);
    header.setEos(true);

    // Content frame: the encoded request map, last frame of the frameset.
    AMQContentBody data;
    qpid::amqp_0_10::MapCodec::encode(request, data.getData());
    AMQFrame content(data);
    content.setBof(false);
    content.setEof(true);
    content.setBos(true);
    content.setEos(true);

    sessionHandler.out->handle(method);
    sessionHandler.out->handle(header);
    sessionHandler.out->handle(content);
}

// Like Variant::asMap but treat void value as an empty map.
Variant::Map asMapVoid(const Variant& value) {
    if (!value.isVoid()) return value.asMap();
    else return Variant::Map();
}

} // namespace

BrokerReplicator::~BrokerReplicator() {}

// Construct the replicator exchange for link l and declare a bridge on that
// link; initializeBridge is passed as the bridge's initialization callback.
BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l)
    : Exchange(QPID_CONFIGURATION_REPLICATOR), broker(*l->getBroker()), link(l)
{
    QPID_LOG(info, "HA: Backup replicating from " <<
             link->getTransport() << ":" << link->getHost() << ":" << link->getPort());
    broker.getLinks().declare(
        link->getHost(), link->getPort(),
        false,              // durable
        QPID_CONFIGURATION_REPLICATOR, // src
        QPID_CONFIGURATION_REPLICATOR, // dest
        "",                 // key
        false,              // isQueue
        false,              // isLocal
        "",                 // id/tag
        "",                 // excludes
        false,              // dynamic
        0,                  // sync?
        boost::bind(&BrokerReplicator::initializeBridge, this, _1, _2)
    );
}

// This is called in the connection IO thread when the bridge is started.
// Declares and binds an event queue on the peer, subscribes to it, and then
// queries the peer for its existing queues, exchanges and bindings.
void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
    framing::AMQP_ServerProxy peer(sessionHandler.out);
    string queueName = bridge.getQueueName();
    const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());

    // Declare and bind an event queue.
    peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable());
    peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
    // Subscribe to the queue.
    peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
    peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
    peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);

    // Issue query requests for queues, exchanges and bindings, using the
    // event queue as the reply-to address.
    sendQuery(QUEUE, queueName, sessionHandler);
    sendQuery(EXCHANGE, queueName, sessionHandler);
    sendQuery(BINDING, queueName, sessionHandler);
    QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName);
}

// FIXME aconway 2011-12-02: error handling in route.
//
// Handle a message routed to this exchange. The message must be a QMF2 message
// with headers, and its content is decoded as a list. If the "qmf.content"
// header is "_event" each entry is dispatched to the matching doEvent*
// handler; if the "qmf.opcode" header is "_query_response" each entry is
// dispatched to the matching doResponse* handler. The routing key is ignored.
//
// Any std::exception thrown while handling is logged and swallowed. Because
// the try block encloses the loops, a failure on one entry abandons the
// remaining entries of the same message.
void BrokerReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) {
    Variant::List list;
    try {
        if (!isQMFv2(msg.getMessage()) || !headers)
            throw Exception("Unexpected message, not QMF2 event or query response.");
        // decode as list
        string content = msg.getMessage().getFrames().getContent();
        amqp_0_10::ListCodec::decode(content, list);

        if (headers->getAsString(QMF_CONTENT) == EVENT) {
            for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
                Variant::Map& map = i->asMap();
                Variant::Map& schema = map[SCHEMA_ID].asMap();
                Variant::Map& values = map[VALUES].asMap();
                // Events of any other class are ignored.
                if      (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
                else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
                else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
                else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values);
                else if (match<EventBind>(schema)) doEventBind(values);
                else if (match<EventUnbind>(schema)) doEventUnbind(values);
            }
        } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
            for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
                string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME];
                Variant::Map& values = i->asMap()[VALUES].asMap();
                // Each handler translates the arguments it needs itself.
                if      (type == QUEUE) doResponseQueue(values);
                else if (type == EXCHANGE) doResponseExchange(values);
                else if (type == BINDING) doResponseBind(values);
                else QPID_LOG(error, "HA: Backup received unknown response type=" << type
                              << " values=" << values);
            }
        } else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers);
    } catch (const std::exception& e) {
        QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list);
    }
}

// Handle a queue-declare event: if the queue was created (disp == "created")
// and its arguments request replication, create it locally and, when newly
// created here, start a queue replicator for it.
void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
    string name = values[QNAME].asString();
    Variant::Map argsMap = asMapVoid(values[ARGS]);
    if (values[DISP] == CREATED && replicateLevel(argsMap)) {
        framing::FieldTable args;
        amqp_0_10::translate(argsMap, args);
        std::pair<boost::shared_ptr<Queue>, bool> result =
            broker.createQueue(
                name,
                values[DURABLE].asBool(),
                values[AUTODEL].asBool(),
                0 /*i.e. no owner regardless of exclusivity on master*/,
                values[ALTEX].asString(),
                args,
                values[USER].asString(),
                values[RHOST].asString());
        if (result.second) {
            // FIXME aconway 2011-11-22: should delete old queue and
            // re-create from event.
            // Events are always up to date, whereas responses may be
            // out of date.
            QPID_LOG(debug, "HA: Backup created queue: " << name);
            startQueueReplicator(result.first);
        } else {
            // FIXME aconway 2011-12-02: what's the right way to handle this?
            QPID_LOG(warning, "HA: Backup queue already exists: " << name);
        }
    }
}

// Handle a queue-delete event: if the queue exists locally and is replicated,
// deactivate and destroy its queue replicator exchange, then delete the queue.
void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
    // The remote queue has already been deleted so replicator
    // sessions may be closed by a "queue deleted" exception.
    string name = values[QNAME].asString();
    boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
    if (queue && replicateLevel(queue->getSettings())) {
        QPID_LOG(debug, "HA: Backup deleting queue: " << name);
        string rname = QueueReplicator::replicatorName(name);
        boost::shared_ptr<Exchange> ex = broker.getExchanges().find(rname);
        boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex);
        if (qr) qr->deactivate();
        // QueueReplicator's bridge is now queued for destruction but may not
        // actually be destroyed, deleting the exchange
        broker.getExchanges().destroy(rname);
        broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString());
    }
}

// Handle an exchange-declare event: if the exchange was created
// (disp == "created") and its arguments request replication, create it locally.
void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
    Variant::Map argsMap(asMapVoid(values[ARGS]));
    if (values[DISP] == CREATED && replicateLevel(argsMap)) {
        string name = values[EXNAME].asString();
        framing::FieldTable args;
        amqp_0_10::translate(argsMap, args);
        if (broker.createExchange(
                name,
                values[EXTYPE].asString(),
                values[DURABLE].asBool(),
                values[ALTEX].asString(),
                args,
                values[USER].asString(),
                values[RHOST].asString()).second)
        {
            QPID_LOG(debug, "HA: Backup created exchange: " << name);
        } else {
            // FIXME aconway 2011-11-22: should delete pre-existing exchange
            // and re-create from event. See comment in doEventQueueDeclare.
            QPID_LOG(warning, "HA: Backup exchange already exists: " << name);
        }
    }
}

// Handle an exchange-delete event: delete the local exchange if it exists and
// is replicated. A NotFoundException raised while doing so is ignored.
void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
    string name = values[EXNAME].asString();
    try {
        boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
        if (exchange && replicateLevel(exchange->getArgs())) {
            QPID_LOG(debug, "HA: Backup deleting exchange:" << name);
            broker.deleteExchange(
                name,
                values[USER].asString(),
                values[RHOST].asString());
        }
    } catch (const framing::NotFoundException&) {}
}

// Handle a bind event by binding the local queue to the local exchange.
void BrokerReplicator::doEventBind(Variant::Map& values) {
    boost::shared_ptr<Exchange> exchange =
        broker.getExchanges().find(values[EXNAME].asString());
    boost::shared_ptr<Queue> queue =
        broker.getQueues().find(values[QNAME].asString());
    // We only replicate binds for a replicated queue to replicated
    // exchange that both exist locally.
    if (exchange && replicateLevel(exchange->getArgs()) &&
        queue && replicateLevel(queue->getSettings()))
    {
        framing::FieldTable args;
        amqp_0_10::translate(asMapVoid(values[ARGS]), args);
        string key = values[KEY].asString();
        QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName()
                 << " queue=" << queue->getName()
                 << " key=" << key);
        exchange->bind(queue, key, &args);
    }
}

// Handle an unbind event by unbinding the local queue from the local exchange.
void BrokerReplicator::doEventUnbind(Variant::Map& values) {
    boost::shared_ptr<Exchange> exchange =
        broker.getExchanges().find(values[EXNAME].asString());
    boost::shared_ptr<Queue> queue =
        broker.getQueues().find(values[QNAME].asString());
    // We only replicate unbinds for a replicated queue to replicated
    // exchange that both exist locally.
    if (exchange && replicateLevel(exchange->getArgs()) &&
        queue && replicateLevel(queue->getSettings()))
    {
        framing::FieldTable args;
        amqp_0_10::translate(asMapVoid(values[ARGS]), args);
        string key = values[KEY].asString();
        QPID_LOG(debug, "HA: Backup replicated unbinding exchange=" << exchange->getName()
                 << " queue=" << queue->getName()
                 << " key=" << key);
        exchange->unbind(queue, key, &args);
    }
}

// Handle one queue object from a query response: if its arguments request
// replication, create the queue locally and, when newly created here, start a
// queue replicator for it.
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
    // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication
    Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
    if (!replicateLevel(argsMap)) return;
    framing::FieldTable args;
    amqp_0_10::translate(argsMap, args);
    string name(values[NAME].asString());
    std::pair<boost::shared_ptr<Queue>, bool> result =
        broker.createQueue(
            name,
            values[DURABLE].asBool(),
            values[AUTODELETE].asBool(),
            0 /*i.e. no owner regardless of exclusivity on master*/,
            ""/*TODO: need to include alternate-exchange*/,
            args,
            ""/*TODO: who is the user?*/,
            ""/*TODO: what should we use as connection id?*/);
    if (result.second) {
        QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]);
        startQueueReplicator(result.first);
    } else {
        // FIXME aconway 2011-11-22: Normal to find queue already
        // exists if we're failing over.
        QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << name);
    }
}

// Handle one exchange object from a query response: if its arguments request
// replication, create the exchange locally.
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
    Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
    if (!replicateLevel(argsMap)) return;
    framing::FieldTable args;
    amqp_0_10::translate(argsMap, args);
    if (broker.createExchange(
            values[NAME].asString(),
            values[TYPE].asString(),
            values[DURABLE].asBool(),
            ""/*TODO: need to include alternate-exchange*/,
            args,
            ""/*TODO: who is the user?*/,
            ""/*TODO: what should we use as connection id?*/).second)
    {
        QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME]);
    } else {
        // The name is read from "name", the same key passed to createExchange
        // above ("qName" is the key used by events, not by query responses).
        QPID_LOG(warning, "HA: Backup catch-up exchange already exists: " << values[NAME]);
    }
}

namespace {
const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:");
const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:");

// Extract the object name from a reference map: read its "_object_name" entry
// and return what follows prefix.
// Throws Exception if the entry is missing or does not start with prefix.
std::string getRefName(const std::string& prefix, const Variant& ref) {
    const Variant::Map& map = ref.asMap();
    Variant::Map::const_iterator i = map.find(OBJECT_NAME);
    if (i == map.end())
        throw Exception(QPID_MSG("Replicator: invalid object reference: " << ref));
    const std::string name = i->second.asString();
    if (name.compare(0, prefix.size(), prefix) != 0)
        throw Exception(QPID_MSG("Replicator: unexpected reference prefix: " << name));
    return name.substr(prefix.size());
}

const std::string EXCHANGE_REF("exchangeRef");
const std::string QUEUE_REF("queueRef");

} // namespace

// Handle one binding object from a query response by binding the referenced
// local queue to the referenced local exchange.
void BrokerReplicator::doResponseBind(Variant::Map& values) {
    std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
    std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
    boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName);
    boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
    // FIXME aconway 2011-11-24: more flexible configuration for binding replication.

    // Automatically replicate binding if queue and exchange exist and are replicated
    if (exchange && replicateLevel(exchange->getArgs()) &&
        queue && replicateLevel(queue->getSettings()))
    {
        framing::FieldTable args;
        amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
        string key = values[KEY].asString();
        exchange->bind(queue, key, &args);
        QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName()
                 << " queue=" << queue->getName()
                 << " key=" << key);
    }
}

// If the queue's settings request the "messages" replication level, create a
// QueueReplicator for it on this link, register it as an exchange and activate it.
void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
    if (replicateLevel(queue->getSettings()) == RL_MESSAGES) {
        boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
        broker.getExchanges().registerExchange(qr);
        qr->activate();
    }
}

// bind, unbind and isBound always return false without doing anything.
bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }

// The exchange type string is the same as the exchange name.
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }

}} // namespace qpid::ha