summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/BrokerReplicator.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp497
1 files changed, 497 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
new file mode 100644
index 0000000000..a8f05c1fe3
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -0,0 +1,497 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "BrokerReplicator.h"
+#include "QueueReplicator.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/Link.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qmf/org/apache/qpid/broker/EventBind.h"
+#include "qmf/org/apache/qpid/broker/EventUnbind.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
+#include <algorithm>
+
+namespace qpid {
+namespace ha {
+
+using qmf::org::apache::qpid::broker::EventBind;
+using qmf::org::apache::qpid::broker::EventUnbind;
+using qmf::org::apache::qpid::broker::EventExchangeDeclare;
+using qmf::org::apache::qpid::broker::EventExchangeDelete;
+using qmf::org::apache::qpid::broker::EventQueueDeclare;
+using qmf::org::apache::qpid::broker::EventQueueDelete;
+using qmf::org::apache::qpid::broker::EventSubscribe;
+using namespace framing;
+using std::string;
+using types::Variant;
+using namespace broker;
+
+namespace {
+
+const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator");
+const string QPID_REPLICATE("qpid.replicate");
+
+const string CLASS_NAME("_class_name");
+const string EVENT("_event");
+const string OBJECT_NAME("_object_name");
+const string PACKAGE_NAME("_package_name");
+const string QUERY_RESPONSE("_query_response");
+const string SCHEMA_ID("_schema_id");
+const string VALUES("_values");
+
+const string ALTEX("altEx");
+const string ARGS("args");
+const string ARGUMENTS("arguments");
+const string AUTODEL("autoDel");
+const string AUTODELETE("autoDelete");
+const string BIND("bind");
+const string UNBIND("unbind");
+const string BINDING("binding");
+const string CREATED("created");
+const string DISP("disp");
+const string DURABLE("durable");
+const string EXCHANGE("exchange");
+const string EXNAME("exName");
+const string EXTYPE("exType");
+const string KEY("key");
+const string NAME("name");
+const string QNAME("qName");
+const string QUEUE("queue");
+const string RHOST("rhost");
+const string TYPE("type");
+const string USER("user");
+
+const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
+const string QMF2("qmf2");
+const string QMF_CONTENT("qmf.content");
+const string QMF_DEFAULT_TOPIC("qmf.default.topic");
+const string QMF_OPCODE("qmf.opcode");
+
+const string _WHAT("_what");
+const string _CLASS_NAME("_class_name");
+const string _PACKAGE_NAME("_package_name");
+const string _SCHEMA_ID("_schema_id");
+const string OBJECT("OBJECT");
+const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
+const string QMF_DEFAULT_DIRECT("qmf.default.direct");
+const string _QUERY_REQUEST("_query_request");
+const string BROKER("broker");
+
+bool isQMFv2(const Message& message) {
+ const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>();
+ return props && props->getAppId() == QMF2;
+}
+
+template <class T> bool match(Variant::Map& schema) {
+ return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
+}
+
+enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_MESSAGES };
+const string S_NONE="none";
+const string S_CONFIGURATION="configuration";
+const string S_MESSAGES="messages";
+
+ReplicateLevel replicateLevel(const string& level) {
+ if (level == S_NONE) return RL_NONE;
+ if (level == S_CONFIGURATION) return RL_CONFIGURATION;
+ if (level == S_MESSAGES) return RL_MESSAGES;
+ throw Exception("Invalid value for "+QPID_REPLICATE+": "+level);
+}
+
+ReplicateLevel replicateLevel(const framing::FieldTable& f) {
+ if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE));
+ else return RL_NONE;
+}
+
+ReplicateLevel replicateLevel(const Variant::Map& m) {
+ Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
+ if (i != m.end()) return replicateLevel(i->second.asString());
+ else return RL_NONE;
+}
+
+void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) {
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ Variant::Map request;
+ request[_WHAT] = OBJECT;
+ Variant::Map schema;
+ schema[_CLASS_NAME] = className;
+ schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
+ request[_SCHEMA_ID] = schema;
+
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0)));
+ method.setBof(true);
+ method.setEof(false);
+ method.setBos(true);
+ method.setEos(true);
+ AMQHeaderBody headerBody;
+ MessageProperties* props = headerBody.get<MessageProperties>(true);
+ props->setReplyTo(qpid::framing::ReplyTo("", queueName));
+ props->setAppId(QMF2);
+ props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
+ headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
+ AMQFrame header(headerBody);
+ header.setBof(false);
+ header.setEof(false);
+ header.setBos(true);
+ header.setEos(true);
+ AMQContentBody data;
+ qpid::amqp_0_10::MapCodec::encode(request, data.getData());
+ AMQFrame content(data);
+ content.setBof(false);
+ content.setEof(true);
+ content.setBos(true);
+ content.setEos(true);
+ sessionHandler.out->handle(method);
+ sessionHandler.out->handle(header);
+ sessionHandler.out->handle(content);
+}
+
+// Like Variant::asMap but treat void value as an empty map.
+Variant::Map asMapVoid(const Variant& value) {
+ if (!value.isVoid()) return value.asMap();
+ else return Variant::Map();
+}
+
+} // namespace
+
+BrokerReplicator::~BrokerReplicator() {}
+
+BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l)
+ : Exchange(QPID_CONFIGURATION_REPLICATOR), broker(*l->getBroker()), link(l)
+{
+ QPID_LOG(info, "HA: Backup replicating from " <<
+ link->getTransport() << ":" << link->getHost() << ":" << link->getPort());
+ broker.getLinks().declare(
+ link->getHost(), link->getPort(),
+ false, // durable
+ QPID_CONFIGURATION_REPLICATOR, // src
+ QPID_CONFIGURATION_REPLICATOR, // dest
+ "", // key
+ false, // isQueue
+ false, // isLocal
+ "", // id/tag
+ "", // excludes
+ false, // dynamic
+ 0, // sync?
+ boost::bind(&BrokerReplicator::initializeBridge, this, _1, _2)
+ );
+}
+
+// This is called in the connection IO thread when the bridge is started.
+void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ string queueName = bridge.getQueueName();
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
+
+ //declare and bind an event queue
+ peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable());
+ peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
+ //subscribe to the queue
+ peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+
+ //issue a query request for queues and another for exchanges using event queue as the reply-to address
+ sendQuery(QUEUE, queueName, sessionHandler);
+ sendQuery(EXCHANGE, queueName, sessionHandler);
+ sendQuery(BINDING, queueName, sessionHandler);
+ QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName);
+}
+
+// FIXME aconway 2011-12-02: error handling in route.
+void BrokerReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) {
+ Variant::List list;
+ try {
+ if (!isQMFv2(msg.getMessage()) || !headers)
+ throw Exception("Unexpected message, not QMF2 event or query response.");
+ // decode as list
+ string content = msg.getMessage().getFrames().getContent();
+ amqp_0_10::ListCodec::decode(content, list);
+
+ if (headers->getAsString(QMF_CONTENT) == EVENT) {
+ for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
+ Variant::Map& map = i->asMap();
+ Variant::Map& schema = map[SCHEMA_ID].asMap();
+ Variant::Map& values = map[VALUES].asMap();
+ if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
+ else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
+ else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
+ else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values);
+ else if (match<EventBind>(schema)) doEventBind(values);
+ else if (match<EventUnbind>(schema)) doEventUnbind(values);
+ }
+ } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
+ for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
+ string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME];
+ Variant::Map& values = i->asMap()[VALUES].asMap();
+ framing::FieldTable args;
+ amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+ if (type == QUEUE) doResponseQueue(values);
+ else if (type == EXCHANGE) doResponseExchange(values);
+ else if (type == BINDING) doResponseBind(values);
+ else QPID_LOG(error, "HA: Backup received unknown response type=" << type
+ << " values=" << values);
+ }
+ } else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers);
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list);
+ }
+}
+
+void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
+ string name = values[QNAME].asString();
+ Variant::Map argsMap = asMapVoid(values[ARGS]);
+ if (values[DISP] == CREATED && replicateLevel(argsMap)) {
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ broker.createQueue(
+ name,
+ values[DURABLE].asBool(),
+ values[AUTODEL].asBool(),
+ 0 /*i.e. no owner regardless of exclusivity on master*/,
+ values[ALTEX].asString(),
+ args,
+ values[USER].asString(),
+ values[RHOST].asString());
+ if (result.second) {
+ // FIXME aconway 2011-11-22: should delete old queue and
+ // re-create from event.
+ // Events are always up to date, whereas responses may be
+ // out of date.
+ QPID_LOG(debug, "HA: Backup created queue: " << name);
+ startQueueReplicator(result.first);
+ } else {
+ // FIXME aconway 2011-12-02: what's the right way to handle this?
+ QPID_LOG(warning, "HA: Backup queue already exists: " << name);
+ }
+ }
+}
+
+void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
+ // The remote queue has already been deleted so replicator
+ // sessions may be closed by a "queue deleted" exception.
+ string name = values[QNAME].asString();
+ boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
+ if (queue && replicateLevel(queue->getSettings())) {
+ QPID_LOG(debug, "HA: Backup deleting queue: " << name);
+ string rname = QueueReplicator::replicatorName(name);
+ boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname);
+ boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex);
+ if (qr) qr->deactivate();
+ // QueueReplicator's bridge is now queued for destruction but may not
+ // actually be destroyed, deleting the exhange
+ broker.getExchanges().destroy(rname);
+ broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString());
+ }
+}
+
+void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
+ Variant::Map argsMap(asMapVoid(values[ARGS]));
+ if (values[DISP] == CREATED && replicateLevel(argsMap)) {
+ string name = values[EXNAME].asString();
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+ if (broker.createExchange(
+ name,
+ values[EXTYPE].asString(),
+ values[DURABLE].asBool(),
+ values[ALTEX].asString(),
+ args,
+ values[USER].asString(),
+ values[RHOST].asString()).second)
+ {
+ QPID_LOG(debug, "HA: Backup created exchange: " << name);
+ } else {
+ // FIXME aconway 2011-11-22: should delete pre-exisitng exchange
+ // and re-create from event. See comment in doEventQueueDeclare.
+ QPID_LOG(warning, "HA: Backup exchange already exists: " << name);
+ }
+ }
+}
+
+void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
+ string name = values[EXNAME].asString();
+ try {
+ boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
+ if (exchange && replicateLevel(exchange->getArgs())) {
+ QPID_LOG(debug, "HA: Backup deleting exchange:" << name);
+ broker.deleteExchange(
+ name,
+ values[USER].asString(),
+ values[RHOST].asString());
+ }
+ } catch (const framing::NotFoundException&) {}
+}
+
+void BrokerReplicator::doEventBind(Variant::Map& values) {
+ boost::shared_ptr<Exchange> exchange =
+ broker.getExchanges().find(values[EXNAME].asString());
+ boost::shared_ptr<Queue> queue =
+ broker.getQueues().find(values[QNAME].asString());
+ // We only replicate binds for a replicated queue to replicated
+ // exchange that both exist locally.
+ if (exchange && replicateLevel(exchange->getArgs()) &&
+ queue && replicateLevel(queue->getSettings()))
+ {
+ framing::FieldTable args;
+ amqp_0_10::translate(asMapVoid(values[ARGS]), args);
+ string key = values[KEY].asString();
+ QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName()
+ << " queue=" << queue->getName()
+ << " key=" << key);
+ exchange->bind(queue, key, &args);
+ }
+}
+
+void BrokerReplicator::doEventUnbind(Variant::Map& values) {
+ boost::shared_ptr<Exchange> exchange =
+ broker.getExchanges().find(values[EXNAME].asString());
+ boost::shared_ptr<Queue> queue =
+ broker.getQueues().find(values[QNAME].asString());
+ // We only replicate unbinds for a replicated queue to replicated
+ // exchange that both exist locally.
+ if (exchange && replicateLevel(exchange->getArgs()) &&
+ queue && replicateLevel(queue->getSettings()))
+ {
+ framing::FieldTable args;
+ amqp_0_10::translate(asMapVoid(values[ARGS]), args);
+ string key = values[KEY].asString();
+ QPID_LOG(debug, "HA: Backup replicated unbinding exchange=" << exchange->getName()
+ << " queue=" << queue->getName()
+ << " key=" << key);
+ exchange->unbind(queue, key, &args);
+ }
+}
+
+void BrokerReplicator::doResponseQueue(Variant::Map& values) {
+ // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication
+ Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
+ if (!replicateLevel(argsMap)) return;
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+ string name(values[NAME].asString());
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ broker.createQueue(
+ name,
+ values[DURABLE].asBool(),
+ values[AUTODELETE].asBool(),
+ 0 /*i.e. no owner regardless of exclusivity on master*/,
+ ""/*TODO: need to include alternate-exchange*/,
+ args,
+ ""/*TODO: who is the user?*/,
+ ""/*TODO: what should we use as connection id?*/);
+ if (result.second) {
+ QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]);
+ startQueueReplicator(result.first);
+ } else {
+ // FIXME aconway 2011-11-22: Normal to find queue already
+ // exists if we're failing over.
+ QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << name);
+ }
+}
+
+void BrokerReplicator::doResponseExchange(Variant::Map& values) {
+ Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
+ if (!replicateLevel(argsMap)) return;
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+ if (broker.createExchange(
+ values[NAME].asString(),
+ values[TYPE].asString(),
+ values[DURABLE].asBool(),
+ ""/*TODO: need to include alternate-exchange*/,
+ args,
+ ""/*TODO: who is the user?*/,
+ ""/*TODO: what should we use as connection id?*/).second)
+ {
+ QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME]);
+ } else {
+ QPID_LOG(warning, "HA: Backup catch-up exchange already exists: " << values[QNAME]);
+ }
+}
+
+namespace {
+const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:");
+const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:");
+
+std::string getRefName(const std::string& prefix, const Variant& ref) {
+ Variant::Map map(ref.asMap());
+ Variant::Map::const_iterator i = map.find(OBJECT_NAME);
+ if (i == map.end())
+ throw Exception(QPID_MSG("Replicator: invalid object reference: " << ref));
+ const std::string name = i->second.asString();
+ if (name.compare(0, prefix.size(), prefix) != 0)
+ throw Exception(QPID_MSG("Replicator: unexpected reference prefix: " << name));
+ std::string ret = name.substr(prefix.size());
+ return ret;
+}
+
+const std::string EXCHANGE_REF("exchangeRef");
+const std::string QUEUE_REF("queueRef");
+
+} // namespace
+
+void BrokerReplicator::doResponseBind(Variant::Map& values) {
+ std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
+ std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
+ boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName);
+ boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
+ // FIXME aconway 2011-11-24: more flexible configuration for binding replication.
+
+ // Automatically replicate binding if queue and exchange exist and are replicated
+ if (exchange && replicateLevel(exchange->getArgs()) &&
+ queue && replicateLevel(queue->getSettings()))
+ {
+ framing::FieldTable args;
+ amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+ string key = values[KEY].asString();
+ exchange->bind(queue, key, &args);
+ QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName()
+ << " queue=" << queue->getName()
+ << " key=" << key);
+ }
+}
+
+void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
+ if (replicateLevel(queue->getSettings()) == RL_MESSAGES) {
+ boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
+ broker.getExchanges().registerExchange(qr);
+ qr->activate();
+ }
+}
+
+bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
+bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
+bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
+
+string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
+
+}} // namespace broker