From ee303e9ced411b54d2c03ed3d5d913a0d168d662 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 17 Feb 2012 14:11:58 +0000 Subject: QPID-3603: Rename WiringReplicator to BrokerReplicator. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-7@1245520 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/CMakeLists.txt | 4 +- qpid/cpp/src/ha.mk | 4 +- qpid/cpp/src/qpid/broker/Bridge.cpp | 2 +- qpid/cpp/src/qpid/ha/Backup.cpp | 4 +- qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 468 ++++++++++++++++++++++++++++++ qpid/cpp/src/qpid/ha/BrokerReplicator.h | 81 ++++++ qpid/cpp/src/qpid/ha/WiringReplicator.cpp | 468 ------------------------------ qpid/cpp/src/qpid/ha/WiringReplicator.h | 81 ------ 8 files changed, 556 insertions(+), 556 deletions(-) create mode 100644 qpid/cpp/src/qpid/ha/BrokerReplicator.cpp create mode 100644 qpid/cpp/src/qpid/ha/BrokerReplicator.h delete mode 100644 qpid/cpp/src/qpid/ha/WiringReplicator.cpp delete mode 100644 qpid/cpp/src/qpid/ha/WiringReplicator.h diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 26bc303cc7..1d963850ca 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -611,8 +611,8 @@ if (BUILD_HA) qpid/ha/QueueReplicator.cpp qpid/ha/ReplicatingSubscription.h qpid/ha/ReplicatingSubscription.cpp - qpid/ha/WiringReplicator.cpp - qpid/ha/WiringReplicator.h + qpid/ha/BrokerReplicator.cpp + qpid/ha/BrokerReplicator.h ) add_library (ha MODULE ${ha_SOURCES}) diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk index d367ba2101..272fdbf296 100644 --- a/qpid/cpp/src/ha.mk +++ b/qpid/cpp/src/ha.mk @@ -33,8 +33,8 @@ ha_la_SOURCES = \ qpid/ha/QueueReplicator.cpp \ qpid/ha/ReplicatingSubscription.h \ qpid/ha/ReplicatingSubscription.cpp \ - qpid/ha/WiringReplicator.cpp \ - qpid/ha/WiringReplicator.h + qpid/ha/BrokerReplicator.cpp \ + qpid/ha/BrokerReplicator.h ha_la_LIBADD = libqpidbroker.la ha_la_LDFLAGS = $(PLUGINLDFLAGS) diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 1d75fc0138..9a1f4be468 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -24,7 +24,7 @@ #include "qpid/broker/Connection.h" #include "qpid/broker/Link.h" #include "qpid/broker/LinkRegistry.h" -#include "qpid/ha/WiringReplicator.h" +#include "qpid/ha/BrokerReplicator.h" #include "qpid/broker/SessionState.h" #include "qpid/management/ManagementAgent.h" diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 48ece928b6..3476af3fe3 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -20,7 +20,7 @@ */ #include "Backup.h" #include "Settings.h" -#include "WiringReplicator.h" +#include "BrokerReplicator.h" #include "ReplicatingSubscription.h" #include "qpid/Url.h" #include "qpid/amqp_0_10/Codecs.h" @@ -54,7 +54,7 @@ Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) { s.mechanism, s.username, s.password); assert(result.second); // FIXME aconway 2011-11-23: error handling link = result.first; - boost::shared_ptr wr(new WiringReplicator(link)); + boost::shared_ptr wr(new BrokerReplicator(link)); broker.getExchanges().registerExchange(wr); } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp new file mode 100644 index 0000000000..7fd224d753 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -0,0 +1,468 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "BrokerReplicator.h" +#include "QueueReplicator.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/Link.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/log/Statement.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/broker/SessionHandler.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qmf/org/apache/qpid/broker/EventBind.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" +#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" +#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" +#include "qmf/org/apache/qpid/broker/EventSubscribe.h" + +namespace qpid { +namespace ha { + +using qmf::org::apache::qpid::broker::EventBind; +using qmf::org::apache::qpid::broker::EventExchangeDeclare; +using qmf::org::apache::qpid::broker::EventExchangeDelete; +using qmf::org::apache::qpid::broker::EventQueueDeclare; +using qmf::org::apache::qpid::broker::EventQueueDelete; +using qmf::org::apache::qpid::broker::EventSubscribe; +using namespace framing; +using std::string; +using types::Variant; +using namespace broker; + +namespace { + +const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator"); +const string QPID_REPLICATE("qpid.replicate"); + +const string CLASS_NAME("_class_name"); +const string EVENT("_event"); +const string OBJECT_NAME("_object_name"); +const string PACKAGE_NAME("_package_name"); +const string QUERY_RESPONSE("_query_response"); +const string SCHEMA_ID("_schema_id"); +const string VALUES("_values"); + +const string ALTEX("altEx"); +const string ARGS("args"); +const string ARGUMENTS("arguments"); +const string AUTODEL("autoDel"); +const string AUTODELETE("autoDelete"); +const string BIND("bind"); +const string BINDING("binding"); +const string CREATED("created"); +const string DISP("disp"); +const string DURABLE("durable"); +const string EXCHANGE("exchange"); +const string EXNAME("exName"); +const string EXTYPE("exType"); +const string KEY("key"); +const string NAME("name"); +const string QNAME("qName"); +const string QUEUE("queue"); +const string RHOST("rhost"); +const string TYPE("type"); +const string USER("user"); + +const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#"); +const string QMF2("qmf2"); +const string QMF_CONTENT("qmf.content"); +const string QMF_DEFAULT_TOPIC("qmf.default.topic"); +const string QMF_OPCODE("qmf.opcode"); + +const string _WHAT("_what"); +const string _CLASS_NAME("_class_name"); +const string _PACKAGE_NAME("_package_name"); +const string _SCHEMA_ID("_schema_id"); +const string OBJECT("OBJECT"); +const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker"); +const string QMF_DEFAULT_DIRECT("qmf.default.direct"); +const string _QUERY_REQUEST("_query_request"); +const string BROKER("broker"); + +bool isQMFv2(const Message& message) { + const framing::MessageProperties* props = message.getProperties(); + return props && props->getAppId() == QMF2; +} + +template bool match(Variant::Map& schema) { + return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); +} + +enum ReplicateLevel { RL_NONE=0, RL_WIRING, RL_ALL }; +const string S_NONE="none"; +const string S_WIRING="wiring"; +const string S_ALL="all"; + +ReplicateLevel replicateLevel(const string& str) { + ReplicateLevel rl = RL_NONE; + if (str == S_WIRING) rl = RL_WIRING; + else if (str == S_ALL) rl = RL_ALL; + return rl; +} + +ReplicateLevel replicateLevel(const framing::FieldTable& f) { + if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE)); + else return RL_NONE; +} + +ReplicateLevel replicateLevel(const Variant::Map& m) { + Variant::Map::const_iterator i = m.find(QPID_REPLICATE); + if (i != m.end()) return replicateLevel(i->second.asString()); + else return RL_NONE; +} + +void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) { + framing::AMQP_ServerProxy peer(sessionHandler.out); + Variant::Map request; + request[_WHAT] = OBJECT; + Variant::Map schema; + schema[_CLASS_NAME] = className; + schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER; + request[_SCHEMA_ID] = schema; + + AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0))); + method.setBof(true); + method.setEof(false); + method.setBos(true); + method.setEos(true); + AMQHeaderBody headerBody; + MessageProperties* props = headerBody.get(true); + props->setReplyTo(qpid::framing::ReplyTo("", queueName)); + props->setAppId(QMF2); + props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST); + headerBody.get(true)->setRoutingKey(BROKER); + AMQFrame header(headerBody); + header.setBof(false); + header.setEof(false); + header.setBos(true); + header.setEos(true); + AMQContentBody data; + qpid::amqp_0_10::MapCodec::encode(request, data.getData()); + AMQFrame content(data); + content.setBof(false); + content.setEof(true); + content.setBos(true); + content.setEos(true); + sessionHandler.out->handle(method); + sessionHandler.out->handle(header); + sessionHandler.out->handle(content); +} +} // namespace + +BrokerReplicator::~BrokerReplicator() {} + +BrokerReplicator::BrokerReplicator(const boost::shared_ptr& l) + : Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l) +{ + QPID_LOG(info, "HA: Backup replicating from " << + link->getTransport() << ":" << link->getHost() << ":" << link->getPort()); + broker.getLinks().declare( + link->getHost(), link->getPort(), + false, // durable + QPID_WIRING_REPLICATOR, // src + QPID_WIRING_REPLICATOR, // dest + "", // key + false, // isQueue + false, // isLocal + "", // id/tag + "", // excludes + false, // dynamic + 0, // sync? + boost::bind(&BrokerReplicator::initializeBridge, this, _1, _2) + ); +} + +// This is called in the connection IO thread when the bridge is started. +void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { + framing::AMQP_ServerProxy peer(sessionHandler.out); + string queueName = bridge.getQueueName(); + const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); + + //declare and bind an event queue + peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable()); + peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable()); + //subscribe to the queue + peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + + //issue a query request for queues and another for exchanges using event queue as the reply-to address + sendQuery(QUEUE, queueName, sessionHandler); + sendQuery(EXCHANGE, queueName, sessionHandler); + sendQuery(BINDING, queueName, sessionHandler); + QPID_LOG(debug, "HA: Backup activated wiring bridge: " << queueName); +} + +// FIXME aconway 2011-12-02: error handling in route. +void BrokerReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) { + Variant::List list; + try { + if (!isQMFv2(msg.getMessage()) || !headers) + throw Exception("Unexpected message, not QMF2 event or query response."); + // decode as list + string content = msg.getMessage().getFrames().getContent(); + amqp_0_10::ListCodec::decode(content, list); + + if (headers->getAsString(QMF_CONTENT) == EVENT) { + for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { + Variant::Map& map = i->asMap(); + Variant::Map& schema = map[SCHEMA_ID].asMap(); + Variant::Map& values = map[VALUES].asMap(); + if (match(schema)) doEventQueueDeclare(values); + else if (match(schema)) doEventQueueDelete(values); + else if (match(schema)) doEventExchangeDeclare(values); + else if (match(schema)) doEventExchangeDelete(values); + else if (match(schema)) doEventBind(values); + // FIXME aconway 2011-11-21: handle unbind & all other relevant events. + } + } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) { + for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { + string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME]; + Variant::Map& values = i->asMap()[VALUES].asMap(); + framing::FieldTable args; + amqp_0_10::translate(values[ARGUMENTS].asMap(), args); + if (type == QUEUE) doResponseQueue(values); + else if (type == EXCHANGE) doResponseExchange(values); + else if (type == BINDING) doResponseBind(values); + else QPID_LOG(error, "HA: Backup received unknown response: type=" << type + << " values=" << values); + + // FIXME aconway 2011-12-06: handle all relevant response types. + } + } else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers); + } catch (const std::exception& e) { + QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list); + } +} + +void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { + string name = values[QNAME].asString(); + Variant::Map argsMap = values[ARGS].asMap(); + if (values[DISP] == CREATED && replicateLevel(argsMap)) { + framing::FieldTable args; + amqp_0_10::translate(argsMap, args); + std::pair, bool> result = + broker.createQueue( + name, + values[DURABLE].asBool(), + values[AUTODEL].asBool(), + 0 /*i.e. no owner regardless of exclusivity on master*/, + values[ALTEX].asString(), + args, + values[USER].asString(), + values[RHOST].asString()); + if (result.second) { + // FIXME aconway 2011-11-22: should delete old queue and + // re-create from event. + // Events are always up to date, whereas responses may be + // out of date. + QPID_LOG(debug, "HA: Backup created queue: " << name); + startQueueReplicator(result.first); + } else { + // FIXME aconway 2011-12-02: what's the right way to handle this? + QPID_LOG(warning, "HA: Backup queue already exists: " << name); + } + } +} + +void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { + // The remote queue has already been deleted so replicator + // sessions may be closed by a "queue deleted" exception. + string name = values[QNAME].asString(); + boost::shared_ptr queue = broker.getQueues().find(name); + if (queue && replicateLevel(queue->getSettings())) { + QPID_LOG(debug, "HA: Backup deleting queue: " << name); + string rname = QueueReplicator::replicatorName(name); + boost::shared_ptr ex = broker.getExchanges().find(rname); + boost::shared_ptr qr = boost::dynamic_pointer_cast(ex); + if (qr) qr->deactivate(); + // QueueReplicator's bridge is now queued for destruction but may not + // actually be destroyed, deleting the exhange + broker.getExchanges().destroy(rname); + broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString()); + } +} + +void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { + Variant::Map argsMap(values[ARGS].asMap()); + if (values[DISP] == CREATED && replicateLevel(argsMap)) { + string name = values[EXNAME].asString(); + framing::FieldTable args; + amqp_0_10::translate(argsMap, args); + if (broker.createExchange( + name, + values[EXTYPE].asString(), + values[DURABLE].asBool(), + values[ALTEX].asString(), + args, + values[USER].asString(), + values[RHOST].asString()).second) + { + QPID_LOG(debug, "HA: Backup created exchange: " << name); + } else { + // FIXME aconway 2011-11-22: should delete pre-exisitng exchange + // and re-create from event. See comment in doEventQueueDeclare. + QPID_LOG(warning, "HA: Backup exchange already exists: " << name); + } + } +} + +void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { + string name = values[EXNAME].asString(); + try { + boost::shared_ptr exchange = broker.getExchanges().find(name); + if (exchange && replicateLevel(exchange->getArgs())) { + QPID_LOG(debug, "HA: Backup deleting exchange:" << name); + broker.deleteExchange( + name, + values[USER].asString(), + values[RHOST].asString()); + } + } catch (const framing::NotFoundException&) {} +} + +void BrokerReplicator::doEventBind(Variant::Map& values) { + boost::shared_ptr exchange = + broker.getExchanges().find(values[EXNAME].asString()); + boost::shared_ptr queue = + broker.getQueues().find(values[QNAME].asString()); + // We only replicate binds for a replicated queue to replicated + // exchange that both exist locally. + if (exchange && replicateLevel(exchange->getArgs()) && + queue && replicateLevel(queue->getSettings())) + { + framing::FieldTable args; + amqp_0_10::translate(values[ARGS].asMap(), args); + string key = values[KEY].asString(); + QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName() + << " queue=" << queue->getName() + << " key=" << key); + exchange->bind(queue, key, &args); + } +} + +void BrokerReplicator::doResponseQueue(Variant::Map& values) { + // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication + Variant::Map argsMap(values[ARGUMENTS].asMap()); + if (!replicateLevel(argsMap)) return; + framing::FieldTable args; + amqp_0_10::translate(argsMap, args); + string name(values[NAME].asString()); + std::pair, bool> result = + broker.createQueue( + name, + values[DURABLE].asBool(), + values[AUTODELETE].asBool(), + 0 /*i.e. no owner regardless of exclusivity on master*/, + ""/*TODO: need to include alternate-exchange*/, + args, + ""/*TODO: who is the user?*/, + ""/*TODO: what should we use as connection id?*/); + if (result.second) { + QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]); + startQueueReplicator(result.first); + } else { + // FIXME aconway 2011-11-22: Normal to find queue already + // exists if we're failing over. + QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << name); + } +} + +void BrokerReplicator::doResponseExchange(Variant::Map& values) { + Variant::Map argsMap(values[ARGUMENTS].asMap()); + if (!replicateLevel(argsMap)) return; + framing::FieldTable args; + amqp_0_10::translate(argsMap, args); + if (broker.createExchange( + values[NAME].asString(), + values[TYPE].asString(), + values[DURABLE].asBool(), + ""/*TODO: need to include alternate-exchange*/, + args, + ""/*TODO: who is the user?*/, + ""/*TODO: what should we use as connection id?*/).second) + { + QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME]); + } else { + QPID_LOG(warning, "HA: Backup catch-up exchange already exists: " << values[QNAME]); + } +} + +namespace { +const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:"); +const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:"); + +std::string getRefName(const std::string& prefix, const Variant& ref) { + Variant::Map map(ref.asMap()); + Variant::Map::const_iterator i = map.find(OBJECT_NAME); + if (i == map.end()) + throw Exception(QPID_MSG("Replicator: invalid object reference: " << ref)); + const std::string name = i->second.asString(); + if (name.compare(0, prefix.size(), prefix) != 0) + throw Exception(QPID_MSG("Replicator: unexpected reference prefix: " << name)); + std::string ret = name.substr(prefix.size()); + return ret; +} + +const std::string EXCHANGE_REF("exchangeRef"); +const std::string QUEUE_REF("queueRef"); + +} // namespace + +void BrokerReplicator::doResponseBind(Variant::Map& values) { + std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]); + std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); + boost::shared_ptr exchange = broker.getExchanges().find(exName); + boost::shared_ptr queue = broker.getQueues().find(qName); + // FIXME aconway 2011-11-24: more flexible configuration for binding replication. + + // Automatically replicate binding if queue and exchange exist and are replicated + if (exchange && replicateLevel(exchange->getArgs()) && + queue && replicateLevel(queue->getSettings())) + { + framing::FieldTable args; + amqp_0_10::translate(values[ARGUMENTS].asMap(), args); + string key = values[KEY].asString(); + exchange->bind(queue, key, &args); + QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName() + << " queue=" << queue->getName() + << " key=" << key); + } +} + +void BrokerReplicator::startQueueReplicator(const boost::shared_ptr& queue) { + if (replicateLevel(queue->getSettings()) == RL_ALL) { + boost::shared_ptr qr(new QueueReplicator(queue, link)); + broker.getExchanges().registerExchange(qr); + qr->activate(); + } +} + +bool BrokerReplicator::bind(boost::shared_ptr, const string&, const framing::FieldTable*) { return false; } +bool BrokerReplicator::unbind(boost::shared_ptr, const string&, const framing::FieldTable*) { return false; } +bool BrokerReplicator::isBound(boost::shared_ptr, const string* const, const framing::FieldTable* const) { return false; } + +string BrokerReplicator::getType() const { return QPID_WIRING_REPLICATOR; } + +}} // namespace broker diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h new file mode 100644 index 0000000000..154dd340ac --- /dev/null +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -0,0 +1,81 @@ +#ifndef QPID_HA_REPLICATOR_H +#define QPID_HA_REPLICATOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Exchange.h" +#include "qpid/types/Variant.h" +#include + +namespace qpid { + +namespace broker { +class Broker; +class Link; +class Bridge; +class SessionHandler; +} + +namespace ha { + +/** + * Replicate wiring on a backup broker. + * + * Implemented as an exchange that subscribes to receive QMF + * configuration events from the primary. It configures local queues + * exchanges and bindings to replicate the primary. + * It also creates QueueReplicators for newly replicated queues. + * + * THREAD SAFE: Has no mutable state. + * + */ +class BrokerReplicator : public broker::Exchange +{ + public: + BrokerReplicator(const boost::shared_ptr&); + ~BrokerReplicator(); + std::string getType() const; + + // Exchange methods + bool bind(boost::shared_ptr, const std::string&, const framing::FieldTable*); + bool unbind(boost::shared_ptr, const std::string&, const framing::FieldTable*); + void route(broker::Deliverable&, const std::string&, const framing::FieldTable*); + bool isBound(boost::shared_ptr, const std::string* const, const framing::FieldTable* const); + + private: + void initializeBridge(broker::Bridge&, broker::SessionHandler&); + void doEventQueueDeclare(types::Variant::Map& values); + void doEventQueueDelete(types::Variant::Map& values); + void doEventExchangeDeclare(types::Variant::Map& values); + void doEventExchangeDelete(types::Variant::Map& values); + void doEventBind(types::Variant::Map&); + void doResponseQueue(types::Variant::Map& values); + void doResponseExchange(types::Variant::Map& values); + void doResponseBind(types::Variant::Map& values); + void startQueueReplicator(const boost::shared_ptr&); + + broker::Broker& broker; + boost::shared_ptr link; +}; +}} // namespace qpid::broker + +#endif /*!QPID_HA_REPLICATOR_H*/ diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp deleted file mode 100644 index b86b7cec4a..0000000000 --- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp +++ /dev/null @@ -1,468 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "WiringReplicator.h" -#include "QueueReplicator.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/Link.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/log/Statement.h" -#include "qpid/amqp_0_10/Codecs.h" -#include "qpid/broker/SessionHandler.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qmf/org/apache/qpid/broker/EventBind.h" -#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" -#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" -#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" -#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" -#include "qmf/org/apache/qpid/broker/EventSubscribe.h" - -namespace qpid { -namespace ha { - -using qmf::org::apache::qpid::broker::EventBind; -using qmf::org::apache::qpid::broker::EventExchangeDeclare; -using qmf::org::apache::qpid::broker::EventExchangeDelete; -using qmf::org::apache::qpid::broker::EventQueueDeclare; -using qmf::org::apache::qpid::broker::EventQueueDelete; -using qmf::org::apache::qpid::broker::EventSubscribe; -using namespace framing; -using std::string; -using types::Variant; -using namespace broker; - -namespace { - -const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator"); -const string QPID_REPLICATE("qpid.replicate"); - -const string CLASS_NAME("_class_name"); -const string EVENT("_event"); -const string OBJECT_NAME("_object_name"); -const string PACKAGE_NAME("_package_name"); -const string QUERY_RESPONSE("_query_response"); -const string SCHEMA_ID("_schema_id"); -const string VALUES("_values"); - -const string ALTEX("altEx"); -const string ARGS("args"); -const string ARGUMENTS("arguments"); -const string AUTODEL("autoDel"); -const string AUTODELETE("autoDelete"); -const string BIND("bind"); -const string BINDING("binding"); -const string CREATED("created"); -const string DISP("disp"); -const string DURABLE("durable"); -const string EXCHANGE("exchange"); -const string EXNAME("exName"); -const string EXTYPE("exType"); -const string KEY("key"); -const string NAME("name"); -const string QNAME("qName"); -const string QUEUE("queue"); -const string RHOST("rhost"); -const string TYPE("type"); -const string USER("user"); - -const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#"); -const string QMF2("qmf2"); -const string QMF_CONTENT("qmf.content"); -const string QMF_DEFAULT_TOPIC("qmf.default.topic"); -const string QMF_OPCODE("qmf.opcode"); - -const string _WHAT("_what"); -const string _CLASS_NAME("_class_name"); -const string _PACKAGE_NAME("_package_name"); -const string _SCHEMA_ID("_schema_id"); -const string OBJECT("OBJECT"); -const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker"); -const string QMF_DEFAULT_DIRECT("qmf.default.direct"); -const string _QUERY_REQUEST("_query_request"); -const string BROKER("broker"); - -bool isQMFv2(const Message& message) { - const framing::MessageProperties* props = message.getProperties(); - return props && props->getAppId() == QMF2; -} - -template bool match(Variant::Map& schema) { - return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); -} - -enum ReplicateLevel { RL_NONE=0, RL_WIRING, RL_ALL }; -const string S_NONE="none"; -const string S_WIRING="wiring"; -const string S_ALL="all"; - -ReplicateLevel replicateLevel(const string& str) { - ReplicateLevel rl = RL_NONE; - if (str == S_WIRING) rl = RL_WIRING; - else if (str == S_ALL) rl = RL_ALL; - return rl; -} - -ReplicateLevel replicateLevel(const framing::FieldTable& f) { - if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE)); - else return RL_NONE; -} - -ReplicateLevel replicateLevel(const Variant::Map& m) { - Variant::Map::const_iterator i = m.find(QPID_REPLICATE); - if (i != m.end()) return replicateLevel(i->second.asString()); - else return RL_NONE; -} - -void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) { - framing::AMQP_ServerProxy peer(sessionHandler.out); - Variant::Map request; - request[_WHAT] = OBJECT; - Variant::Map schema; - schema[_CLASS_NAME] = className; - schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER; - request[_SCHEMA_ID] = schema; - - AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0))); - method.setBof(true); - method.setEof(false); - method.setBos(true); - method.setEos(true); - AMQHeaderBody headerBody; - MessageProperties* props = headerBody.get(true); - props->setReplyTo(qpid::framing::ReplyTo("", queueName)); - props->setAppId(QMF2); - props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST); - headerBody.get(true)->setRoutingKey(BROKER); - AMQFrame header(headerBody); - header.setBof(false); - header.setEof(false); - header.setBos(true); - header.setEos(true); - AMQContentBody data; - qpid::amqp_0_10::MapCodec::encode(request, data.getData()); - AMQFrame content(data); - content.setBof(false); - content.setEof(true); - content.setBos(true); - content.setEos(true); - sessionHandler.out->handle(method); - sessionHandler.out->handle(header); - sessionHandler.out->handle(content); -} -} // namespace - -WiringReplicator::~WiringReplicator() {} - -WiringReplicator::WiringReplicator(const boost::shared_ptr& l) - : Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l) -{ - QPID_LOG(info, "HA: Backup replicating from " << - link->getTransport() << ":" << link->getHost() << ":" << link->getPort()); - broker.getLinks().declare( - link->getHost(), link->getPort(), - false, // durable - QPID_WIRING_REPLICATOR, // src - QPID_WIRING_REPLICATOR, // dest - "", // key - false, // isQueue - false, // isLocal - "", // id/tag - "", // excludes - false, // dynamic - 0, // sync? - boost::bind(&WiringReplicator::initializeBridge, this, _1, _2) - ); -} - -// This is called in the connection IO thread when the bridge is started. -void WiringReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { - framing::AMQP_ServerProxy peer(sessionHandler.out); - string queueName = bridge.getQueueName(); - const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); - - //declare and bind an event queue - peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable()); - peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable()); - //subscribe to the queue - peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); - - //issue a query request for queues and another for exchanges using event queue as the reply-to address - sendQuery(QUEUE, queueName, sessionHandler); - sendQuery(EXCHANGE, queueName, sessionHandler); - sendQuery(BINDING, queueName, sessionHandler); - QPID_LOG(debug, "HA: Backup activated wiring bridge: " << queueName); -} - -// FIXME aconway 2011-12-02: error handling in route. -void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) { - Variant::List list; - try { - if (!isQMFv2(msg.getMessage()) || !headers) - throw Exception("Unexpected message, not QMF2 event or query response."); - // decode as list - string content = msg.getMessage().getFrames().getContent(); - amqp_0_10::ListCodec::decode(content, list); - - if (headers->getAsString(QMF_CONTENT) == EVENT) { - for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { - Variant::Map& map = i->asMap(); - Variant::Map& schema = map[SCHEMA_ID].asMap(); - Variant::Map& values = map[VALUES].asMap(); - if (match(schema)) doEventQueueDeclare(values); - else if (match(schema)) doEventQueueDelete(values); - else if (match(schema)) doEventExchangeDeclare(values); - else if (match(schema)) doEventExchangeDelete(values); - else if (match(schema)) doEventBind(values); - // FIXME aconway 2011-11-21: handle unbind & all other relevant events. - } - } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) { - for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { - string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME]; - Variant::Map& values = i->asMap()[VALUES].asMap(); - framing::FieldTable args; - amqp_0_10::translate(values[ARGUMENTS].asMap(), args); - if (type == QUEUE) doResponseQueue(values); - else if (type == EXCHANGE) doResponseExchange(values); - else if (type == BINDING) doResponseBind(values); - else QPID_LOG(error, "HA: Backup received unknown response: type=" << type - << " values=" << values); - - // FIXME aconway 2011-12-06: handle all relevant response types. - } - } else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers); - } catch (const std::exception& e) { - QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list); - } -} - -void WiringReplicator::doEventQueueDeclare(Variant::Map& values) { - string name = values[QNAME].asString(); - Variant::Map argsMap = values[ARGS].asMap(); - if (values[DISP] == CREATED && replicateLevel(argsMap)) { - framing::FieldTable args; - amqp_0_10::translate(argsMap, args); - std::pair, bool> result = - broker.createQueue( - name, - values[DURABLE].asBool(), - values[AUTODEL].asBool(), - 0 /*i.e. no owner regardless of exclusivity on master*/, - values[ALTEX].asString(), - args, - values[USER].asString(), - values[RHOST].asString()); - if (result.second) { - // FIXME aconway 2011-11-22: should delete old queue and - // re-create from event. - // Events are always up to date, whereas responses may be - // out of date. - QPID_LOG(debug, "HA: Backup created queue: " << name); - startQueueReplicator(result.first); - } else { - // FIXME aconway 2011-12-02: what's the right way to handle this? - QPID_LOG(warning, "HA: Backup queue already exists: " << name); - } - } -} - -void WiringReplicator::doEventQueueDelete(Variant::Map& values) { - // The remote queue has already been deleted so replicator - // sessions may be closed by a "queue deleted" exception. - string name = values[QNAME].asString(); - boost::shared_ptr queue = broker.getQueues().find(name); - if (queue && replicateLevel(queue->getSettings())) { - QPID_LOG(debug, "HA: Backup deleting queue: " << name); - string rname = QueueReplicator::replicatorName(name); - boost::shared_ptr ex = broker.getExchanges().find(rname); - boost::shared_ptr qr = boost::dynamic_pointer_cast(ex); - if (qr) qr->deactivate(); - // QueueReplicator's bridge is now queued for destruction but may not - // actually be destroyed, deleting the exhange - broker.getExchanges().destroy(rname); - broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString()); - } -} - -void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) { - Variant::Map argsMap(values[ARGS].asMap()); - if (values[DISP] == CREATED && replicateLevel(argsMap)) { - string name = values[EXNAME].asString(); - framing::FieldTable args; - amqp_0_10::translate(argsMap, args); - if (broker.createExchange( - name, - values[EXTYPE].asString(), - values[DURABLE].asBool(), - values[ALTEX].asString(), - args, - values[USER].asString(), - values[RHOST].asString()).second) - { - QPID_LOG(debug, "HA: Backup created exchange: " << name); - } else { - // FIXME aconway 2011-11-22: should delete pre-exisitng exchange - // and re-create from event. See comment in doEventQueueDeclare. - QPID_LOG(warning, "HA: Backup exchange already exists: " << name); - } - } -} - -void WiringReplicator::doEventExchangeDelete(Variant::Map& values) { - string name = values[EXNAME].asString(); - try { - boost::shared_ptr exchange = broker.getExchanges().find(name); - if (exchange && replicateLevel(exchange->getArgs())) { - QPID_LOG(debug, "HA: Backup deleting exchange:" << name); - broker.deleteExchange( - name, - values[USER].asString(), - values[RHOST].asString()); - } - } catch (const framing::NotFoundException&) {} -} - -void WiringReplicator::doEventBind(Variant::Map& values) { - boost::shared_ptr exchange = - broker.getExchanges().find(values[EXNAME].asString()); - boost::shared_ptr queue = - broker.getQueues().find(values[QNAME].asString()); - // We only replicate binds for a replicated queue to replicated - // exchange that both exist locally. - if (exchange && replicateLevel(exchange->getArgs()) && - queue && replicateLevel(queue->getSettings())) - { - framing::FieldTable args; - amqp_0_10::translate(values[ARGS].asMap(), args); - string key = values[KEY].asString(); - QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName() - << " queue=" << queue->getName() - << " key=" << key); - exchange->bind(queue, key, &args); - } -} - -void WiringReplicator::doResponseQueue(Variant::Map& values) { - // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication - Variant::Map argsMap(values[ARGUMENTS].asMap()); - if (!replicateLevel(argsMap)) return; - framing::FieldTable args; - amqp_0_10::translate(argsMap, args); - string name(values[NAME].asString()); - std::pair, bool> result = - broker.createQueue( - name, - values[DURABLE].asBool(), - values[AUTODELETE].asBool(), - 0 /*i.e. no owner regardless of exclusivity on master*/, - ""/*TODO: need to include alternate-exchange*/, - args, - ""/*TODO: who is the user?*/, - ""/*TODO: what should we use as connection id?*/); - if (result.second) { - QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]); - startQueueReplicator(result.first); - } else { - // FIXME aconway 2011-11-22: Normal to find queue already - // exists if we're failing over. - QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << name); - } -} - -void WiringReplicator::doResponseExchange(Variant::Map& values) { - Variant::Map argsMap(values[ARGUMENTS].asMap()); - if (!replicateLevel(argsMap)) return; - framing::FieldTable args; - amqp_0_10::translate(argsMap, args); - if (broker.createExchange( - values[NAME].asString(), - values[TYPE].asString(), - values[DURABLE].asBool(), - ""/*TODO: need to include alternate-exchange*/, - args, - ""/*TODO: who is the user?*/, - ""/*TODO: what should we use as connection id?*/).second) - { - QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME]); - } else { - QPID_LOG(warning, "HA: Backup catch-up exchange already exists: " << values[QNAME]); - } -} - -namespace { -const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:"); -const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:"); - -std::string getRefName(const std::string& prefix, const Variant& ref) { - Variant::Map map(ref.asMap()); - Variant::Map::const_iterator i = map.find(OBJECT_NAME); - if (i == map.end()) - throw Exception(QPID_MSG("Replicator: invalid object reference: " << ref)); - const std::string name = i->second.asString(); - if (name.compare(0, prefix.size(), prefix) != 0) - throw Exception(QPID_MSG("Replicator: unexpected reference prefix: " << name)); - std::string ret = name.substr(prefix.size()); - return ret; -} - -const std::string EXCHANGE_REF("exchangeRef"); -const std::string QUEUE_REF("queueRef"); - -} // namespace - -void WiringReplicator::doResponseBind(Variant::Map& values) { - std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]); - std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); - boost::shared_ptr exchange = broker.getExchanges().find(exName); - boost::shared_ptr queue = broker.getQueues().find(qName); - // FIXME aconway 2011-11-24: more flexible configuration for binding replication. - - // Automatically replicate binding if queue and exchange exist and are replicated - if (exchange && replicateLevel(exchange->getArgs()) && - queue && replicateLevel(queue->getSettings())) - { - framing::FieldTable args; - amqp_0_10::translate(values[ARGUMENTS].asMap(), args); - string key = values[KEY].asString(); - exchange->bind(queue, key, &args); - QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName() - << " queue=" << queue->getName() - << " key=" << key); - } -} - -void WiringReplicator::startQueueReplicator(const boost::shared_ptr& queue) { - if (replicateLevel(queue->getSettings()) == RL_ALL) { - boost::shared_ptr qr(new QueueReplicator(queue, link)); - broker.getExchanges().registerExchange(qr); - qr->activate(); - } -} - -bool WiringReplicator::bind(boost::shared_ptr, const string&, const framing::FieldTable*) { return false; } -bool WiringReplicator::unbind(boost::shared_ptr, const string&, const framing::FieldTable*) { return false; } -bool WiringReplicator::isBound(boost::shared_ptr, const string* const, const framing::FieldTable* const) { return false; } - -string WiringReplicator::getType() const { return QPID_WIRING_REPLICATOR; } - -}} // namespace broker diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.h b/qpid/cpp/src/qpid/ha/WiringReplicator.h deleted file mode 100644 index 32109d8368..0000000000 --- a/qpid/cpp/src/qpid/ha/WiringReplicator.h +++ /dev/null @@ -1,81 +0,0 @@ -#ifndef QPID_HA_REPLICATOR_H -#define QPID_HA_REPLICATOR_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/broker/Exchange.h" -#include "qpid/types/Variant.h" -#include - -namespace qpid { - -namespace broker { -class Broker; -class Link; -class Bridge; -class SessionHandler; -} - -namespace ha { - -/** - * Replicate wiring on a backup broker. - * - * Implemented as an exchange that subscribes to receive QMF - * configuration events from the primary. It configures local queues - * exchanges and bindings to replicate the primary. - * It also creates QueueReplicators for newly replicated queues. - * - * THREAD SAFE: Has no mutable state. - * - */ -class WiringReplicator : public broker::Exchange -{ - public: - WiringReplicator(const boost::shared_ptr&); - ~WiringReplicator(); - std::string getType() const; - - // Exchange methods - bool bind(boost::shared_ptr, const std::string&, const framing::FieldTable*); - bool unbind(boost::shared_ptr, const std::string&, const framing::FieldTable*); - void route(broker::Deliverable&, const std::string&, const framing::FieldTable*); - bool isBound(boost::shared_ptr, const std::string* const, const framing::FieldTable* const); - - private: - void initializeBridge(broker::Bridge&, broker::SessionHandler&); - void doEventQueueDeclare(types::Variant::Map& values); - void doEventQueueDelete(types::Variant::Map& values); - void doEventExchangeDeclare(types::Variant::Map& values); - void doEventExchangeDelete(types::Variant::Map& values); - void doEventBind(types::Variant::Map&); - void doResponseQueue(types::Variant::Map& values); - void doResponseExchange(types::Variant::Map& values); - void doResponseBind(types::Variant::Map& values); - void startQueueReplicator(const boost::shared_ptr&); - - broker::Broker& broker; - boost::shared_ptr link; -}; -}} // namespace qpid::broker - -#endif /*!QPID_HA_REPLICATOR_H*/ -- cgit v1.2.1