summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-17 14:54:46 +0000
committerAlan Conway <aconway@apache.org>2012-02-17 14:54:46 +0000
commit0a8773c335509c2b9e9b96df360de190a266dcad (patch)
tree288469c17dacc37199b5f77498965fee7e778d95 /cpp/src/qpid/ha
parentd82ce6836f7f0e4f7d647b2dc603141f549869d3 (diff)
downloadqpid-python-0a8773c335509c2b9e9b96df360de190a266dcad.tar.gz
QPID-3603: Merge new HA foundations.
Merged from qpid-3603-7. This is basic support for the new HA approach. For information & limitations see qpid/cpp/design_docs/new-ha-design.txt. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1245587 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha')
-rw-r--r--cpp/src/qpid/ha/Backup.cpp90
-rw-r--r--cpp/src/qpid/ha/Backup.h67
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp497
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.h85
-rw-r--r--cpp/src/qpid/ha/ConnectionExcluder.cpp40
-rw-r--r--cpp/src/qpid/ha/ConnectionExcluder.h54
-rw-r--r--cpp/src/qpid/ha/HaBroker.cpp137
-rw-r--r--cpp/src/qpid/ha/HaBroker.h74
-rw-r--r--cpp/src/qpid/ha/HaPlugin.cpp67
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp174
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.h86
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp293
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.h134
-rw-r--r--cpp/src/qpid/ha/Settings.h45
-rw-r--r--cpp/src/qpid/ha/management-schema.xml38
15 files changed, 1881 insertions, 0 deletions
diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp
new file mode 100644
index 0000000000..5acbfb9d5f
--- /dev/null
+++ b/cpp/src/qpid/ha/Backup.cpp
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Backup.h"
+#include "Settings.h"
+#include "BrokerReplicator.h"
+#include "ReplicatingSubscription.h"
+#include "ConnectionExcluder.h"
+#include "qpid/Url.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/broker/Link.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace ha {
+
+using namespace framing;
+using namespace broker;
+using types::Variant;
+using std::string;
+
+Backup::Backup(broker::Broker& b, const Settings& s) :
+ broker(b), settings(s), excluder(new ConnectionExcluder())
+{
+ // Empty brokerUrl means delay initialization until setUrl() is called.
+ if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
+}
+
+void Backup::initialize(const Url& url) {
+ assert(!url.empty());
+ QPID_LOG(notice, "Ha: Backup started: " << url);
+ string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+ // Declare the link
+ std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
+ url[0].host, url[0].port, protocol,
+ false, // durable
+ settings.mechanism, settings.username, settings.password);
+ assert(result.second); // FIXME aconway 2011-11-23: error handling
+ link = result.first;
+ link->setUrl(url);
+
+ replicator.reset(new BrokerReplicator(link));
+ broker.getExchanges().registerExchange(replicator);
+ broker.getConnectionObservers().add(excluder);
+}
+
+void Backup::setBrokerUrl(const Url& url) {
+ // Ignore empty URLs seen during start-up for some tests.
+ if (url.empty()) return;
+ sys::Mutex::ScopedLock l(lock);
+ if (link) { // URL changed after we initialized.
+ QPID_LOG(info, "HA: Backup failover URL set to " << url);
+ link->setUrl(url);
+ }
+ else {
+ initialize(url); // Deferred initialization
+ }
+}
+
+Backup::~Backup() {
+ if (link) link->close();
+ if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
+ broker.getConnectionObservers().remove(excluder); // This allows client connections.
+}
+
+}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/Backup.h b/cpp/src/qpid/ha/Backup.h
new file mode 100644
index 0000000000..526b238b82
--- /dev/null
+++ b/cpp/src/qpid/ha/Backup.h
@@ -0,0 +1,67 @@
+#ifndef QPID_HA_BACKUP_H
+#define QPID_HA_BACKUP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Settings.h"
+#include "qpid/Url.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+
+namespace broker {
+class Broker;
+class Link;
+}
+
+namespace ha {
+class Settings;
+class ConnectionExcluder;
+class BrokerReplicator;
+
+/**
+ * State associated with a backup broker. Manages connections to primary.
+ *
+ * THREAD SAFE
+ */
+class Backup
+{
+ public:
+ Backup(broker::Broker&, const Settings&);
+ ~Backup();
+ void setBrokerUrl(const Url&);
+
+ private:
+ void initialize(const Url&);
+
+ sys::Mutex lock;
+ broker::Broker& broker;
+ Settings settings;
+ boost::shared_ptr<broker::Link> link;
+ boost::shared_ptr<BrokerReplicator> replicator;
+ boost::shared_ptr<ConnectionExcluder> excluder;
+};
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_BACKUP_H*/
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
new file mode 100644
index 0000000000..a8f05c1fe3
--- /dev/null
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -0,0 +1,497 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "BrokerReplicator.h"
+#include "QueueReplicator.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/Link.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qmf/org/apache/qpid/broker/EventBind.h"
+#include "qmf/org/apache/qpid/broker/EventUnbind.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
+#include <algorithm>
+
+namespace qpid {
+namespace ha {
+
+using qmf::org::apache::qpid::broker::EventBind;
+using qmf::org::apache::qpid::broker::EventUnbind;
+using qmf::org::apache::qpid::broker::EventExchangeDeclare;
+using qmf::org::apache::qpid::broker::EventExchangeDelete;
+using qmf::org::apache::qpid::broker::EventQueueDeclare;
+using qmf::org::apache::qpid::broker::EventQueueDelete;
+using qmf::org::apache::qpid::broker::EventSubscribe;
+using namespace framing;
+using std::string;
+using types::Variant;
+using namespace broker;
+
+namespace {
+
+const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator");
+const string QPID_REPLICATE("qpid.replicate");
+
+const string CLASS_NAME("_class_name");
+const string EVENT("_event");
+const string OBJECT_NAME("_object_name");
+const string PACKAGE_NAME("_package_name");
+const string QUERY_RESPONSE("_query_response");
+const string SCHEMA_ID("_schema_id");
+const string VALUES("_values");
+
+const string ALTEX("altEx");
+const string ARGS("args");
+const string ARGUMENTS("arguments");
+const string AUTODEL("autoDel");
+const string AUTODELETE("autoDelete");
+const string BIND("bind");
+const string UNBIND("unbind");
+const string BINDING("binding");
+const string CREATED("created");
+const string DISP("disp");
+const string DURABLE("durable");
+const string EXCHANGE("exchange");
+const string EXNAME("exName");
+const string EXTYPE("exType");
+const string KEY("key");
+const string NAME("name");
+const string QNAME("qName");
+const string QUEUE("queue");
+const string RHOST("rhost");
+const string TYPE("type");
+const string USER("user");
+
+const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
+const string QMF2("qmf2");
+const string QMF_CONTENT("qmf.content");
+const string QMF_DEFAULT_TOPIC("qmf.default.topic");
+const string QMF_OPCODE("qmf.opcode");
+
+const string _WHAT("_what");
+const string _CLASS_NAME("_class_name");
+const string _PACKAGE_NAME("_package_name");
+const string _SCHEMA_ID("_schema_id");
+const string OBJECT("OBJECT");
+const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
+const string QMF_DEFAULT_DIRECT("qmf.default.direct");
+const string _QUERY_REQUEST("_query_request");
+const string BROKER("broker");
+
+bool isQMFv2(const Message& message) {
+ const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>();
+ return props && props->getAppId() == QMF2;
+}
+
+template <class T> bool match(Variant::Map& schema) {
+ return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
+}
+
+enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_MESSAGES };
+const string S_NONE="none";
+const string S_CONFIGURATION="configuration";
+const string S_MESSAGES="messages";
+
+ReplicateLevel replicateLevel(const string& level) {
+ if (level == S_NONE) return RL_NONE;
+ if (level == S_CONFIGURATION) return RL_CONFIGURATION;
+ if (level == S_MESSAGES) return RL_MESSAGES;
+ throw Exception("Invalid value for "+QPID_REPLICATE+": "+level);
+}
+
+ReplicateLevel replicateLevel(const framing::FieldTable& f) {
+ if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE));
+ else return RL_NONE;
+}
+
+ReplicateLevel replicateLevel(const Variant::Map& m) {
+ Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
+ if (i != m.end()) return replicateLevel(i->second.asString());
+ else return RL_NONE;
+}
+
+void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) {
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ Variant::Map request;
+ request[_WHAT] = OBJECT;
+ Variant::Map schema;
+ schema[_CLASS_NAME] = className;
+ schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
+ request[_SCHEMA_ID] = schema;
+
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0)));
+ method.setBof(true);
+ method.setEof(false);
+ method.setBos(true);
+ method.setEos(true);
+ AMQHeaderBody headerBody;
+ MessageProperties* props = headerBody.get<MessageProperties>(true);
+ props->setReplyTo(qpid::framing::ReplyTo("", queueName));
+ props->setAppId(QMF2);
+ props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
+ headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
+ AMQFrame header(headerBody);
+ header.setBof(false);
+ header.setEof(false);
+ header.setBos(true);
+ header.setEos(true);
+ AMQContentBody data;
+ qpid::amqp_0_10::MapCodec::encode(request, data.getData());
+ AMQFrame content(data);
+ content.setBof(false);
+ content.setEof(true);
+ content.setBos(true);
+ content.setEos(true);
+ sessionHandler.out->handle(method);
+ sessionHandler.out->handle(header);
+ sessionHandler.out->handle(content);
+}
+
+// Like Variant::asMap but treat void value as an empty map.
+Variant::Map asMapVoid(const Variant& value) {
+ if (!value.isVoid()) return value.asMap();
+ else return Variant::Map();
+}
+
+} // namespace
+
+BrokerReplicator::~BrokerReplicator() {}
+
+BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l)
+ : Exchange(QPID_CONFIGURATION_REPLICATOR), broker(*l->getBroker()), link(l)
+{
+ QPID_LOG(info, "HA: Backup replicating from " <<
+ link->getTransport() << ":" << link->getHost() << ":" << link->getPort());
+ broker.getLinks().declare(
+ link->getHost(), link->getPort(),
+ false, // durable
+ QPID_CONFIGURATION_REPLICATOR, // src
+ QPID_CONFIGURATION_REPLICATOR, // dest
+ "", // key
+ false, // isQueue
+ false, // isLocal
+ "", // id/tag
+ "", // excludes
+ false, // dynamic
+ 0, // sync?
+ boost::bind(&BrokerReplicator::initializeBridge, this, _1, _2)
+ );
+}
+
+// This is called in the connection IO thread when the bridge is started.
+void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ string queueName = bridge.getQueueName();
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
+
+ //declare and bind an event queue
+ peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable());
+ peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
+ //subscribe to the queue
+ peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+
+ //issue a query request for queues and another for exchanges using event queue as the reply-to address
+ sendQuery(QUEUE, queueName, sessionHandler);
+ sendQuery(EXCHANGE, queueName, sessionHandler);
+ sendQuery(BINDING, queueName, sessionHandler);
+ QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName);
+}
+
+// FIXME aconway 2011-12-02: error handling in route.
+void BrokerReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) {
+ Variant::List list;
+ try {
+ if (!isQMFv2(msg.getMessage()) || !headers)
+ throw Exception("Unexpected message, not QMF2 event or query response.");
+ // decode as list
+ string content = msg.getMessage().getFrames().getContent();
+ amqp_0_10::ListCodec::decode(content, list);
+
+ if (headers->getAsString(QMF_CONTENT) == EVENT) {
+ for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
+ Variant::Map& map = i->asMap();
+ Variant::Map& schema = map[SCHEMA_ID].asMap();
+ Variant::Map& values = map[VALUES].asMap();
+ if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
+ else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
+ else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
+ else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values);
+ else if (match<EventBind>(schema)) doEventBind(values);
+ else if (match<EventUnbind>(schema)) doEventUnbind(values);
+ }
+ } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
+ for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
+ string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME];
+ Variant::Map& values = i->asMap()[VALUES].asMap();
+ framing::FieldTable args;
+ amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+ if (type == QUEUE) doResponseQueue(values);
+ else if (type == EXCHANGE) doResponseExchange(values);
+ else if (type == BINDING) doResponseBind(values);
+ else QPID_LOG(error, "HA: Backup received unknown response type=" << type
+ << " values=" << values);
+ }
+ } else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers);
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list);
+ }
+}
+
+void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
+ string name = values[QNAME].asString();
+ Variant::Map argsMap = asMapVoid(values[ARGS]);
+ if (values[DISP] == CREATED && replicateLevel(argsMap)) {
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ broker.createQueue(
+ name,
+ values[DURABLE].asBool(),
+ values[AUTODEL].asBool(),
+ 0 /*i.e. no owner regardless of exclusivity on master*/,
+ values[ALTEX].asString(),
+ args,
+ values[USER].asString(),
+ values[RHOST].asString());
+ if (result.second) {
+ // FIXME aconway 2011-11-22: should delete old queue and
+ // re-create from event.
+ // Events are always up to date, whereas responses may be
+ // out of date.
+ QPID_LOG(debug, "HA: Backup created queue: " << name);
+ startQueueReplicator(result.first);
+ } else {
+ // FIXME aconway 2011-12-02: what's the right way to handle this?
+ QPID_LOG(warning, "HA: Backup queue already exists: " << name);
+ }
+ }
+}
+
+void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
+ // The remote queue has already been deleted so replicator
+ // sessions may be closed by a "queue deleted" exception.
+ string name = values[QNAME].asString();
+ boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
+ if (queue && replicateLevel(queue->getSettings())) {
+ QPID_LOG(debug, "HA: Backup deleting queue: " << name);
+ string rname = QueueReplicator::replicatorName(name);
+ boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname);
+ boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex);
+ if (qr) qr->deactivate();
+ // QueueReplicator's bridge is now queued for destruction but may not
+ // actually be destroyed, deleting the exhange
+ broker.getExchanges().destroy(rname);
+ broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString());
+ }
+}
+
+void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
+ Variant::Map argsMap(asMapVoid(values[ARGS]));
+ if (values[DISP] == CREATED && replicateLevel(argsMap)) {
+ string name = values[EXNAME].asString();
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+ if (broker.createExchange(
+ name,
+ values[EXTYPE].asString(),
+ values[DURABLE].asBool(),
+ values[ALTEX].asString(),
+ args,
+ values[USER].asString(),
+ values[RHOST].asString()).second)
+ {
+ QPID_LOG(debug, "HA: Backup created exchange: " << name);
+ } else {
+ // FIXME aconway 2011-11-22: should delete pre-exisitng exchange
+ // and re-create from event. See comment in doEventQueueDeclare.
+ QPID_LOG(warning, "HA: Backup exchange already exists: " << name);
+ }
+ }
+}
+
+void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
+ string name = values[EXNAME].asString();
+ try {
+ boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
+ if (exchange && replicateLevel(exchange->getArgs())) {
+ QPID_LOG(debug, "HA: Backup deleting exchange:" << name);
+ broker.deleteExchange(
+ name,
+ values[USER].asString(),
+ values[RHOST].asString());
+ }
+ } catch (const framing::NotFoundException&) {}
+}
+
+void BrokerReplicator::doEventBind(Variant::Map& values) {
+ boost::shared_ptr<Exchange> exchange =
+ broker.getExchanges().find(values[EXNAME].asString());
+ boost::shared_ptr<Queue> queue =
+ broker.getQueues().find(values[QNAME].asString());
+ // We only replicate binds for a replicated queue to replicated
+ // exchange that both exist locally.
+ if (exchange && replicateLevel(exchange->getArgs()) &&
+ queue && replicateLevel(queue->getSettings()))
+ {
+ framing::FieldTable args;
+ amqp_0_10::translate(asMapVoid(values[ARGS]), args);
+ string key = values[KEY].asString();
+ QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName()
+ << " queue=" << queue->getName()
+ << " key=" << key);
+ exchange->bind(queue, key, &args);
+ }
+}
+
+void BrokerReplicator::doEventUnbind(Variant::Map& values) {
+ boost::shared_ptr<Exchange> exchange =
+ broker.getExchanges().find(values[EXNAME].asString());
+ boost::shared_ptr<Queue> queue =
+ broker.getQueues().find(values[QNAME].asString());
+ // We only replicate unbinds for a replicated queue to replicated
+ // exchange that both exist locally.
+ if (exchange && replicateLevel(exchange->getArgs()) &&
+ queue && replicateLevel(queue->getSettings()))
+ {
+ framing::FieldTable args;
+ amqp_0_10::translate(asMapVoid(values[ARGS]), args);
+ string key = values[KEY].asString();
+ QPID_LOG(debug, "HA: Backup replicated unbinding exchange=" << exchange->getName()
+ << " queue=" << queue->getName()
+ << " key=" << key);
+ exchange->unbind(queue, key, &args);
+ }
+}
+
+void BrokerReplicator::doResponseQueue(Variant::Map& values) {
+ // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication
+ Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
+ if (!replicateLevel(argsMap)) return;
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+ string name(values[NAME].asString());
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ broker.createQueue(
+ name,
+ values[DURABLE].asBool(),
+ values[AUTODELETE].asBool(),
+ 0 /*i.e. no owner regardless of exclusivity on master*/,
+ ""/*TODO: need to include alternate-exchange*/,
+ args,
+ ""/*TODO: who is the user?*/,
+ ""/*TODO: what should we use as connection id?*/);
+ if (result.second) {
+ QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]);
+ startQueueReplicator(result.first);
+ } else {
+ // FIXME aconway 2011-11-22: Normal to find queue already
+ // exists if we're failing over.
+ QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << name);
+ }
+}
+
+void BrokerReplicator::doResponseExchange(Variant::Map& values) {
+ Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
+ if (!replicateLevel(argsMap)) return;
+ framing::FieldTable args;
+ amqp_0_10::translate(argsMap, args);
+ if (broker.createExchange(
+ values[NAME].asString(),
+ values[TYPE].asString(),
+ values[DURABLE].asBool(),
+ ""/*TODO: need to include alternate-exchange*/,
+ args,
+ ""/*TODO: who is the user?*/,
+ ""/*TODO: what should we use as connection id?*/).second)
+ {
+ QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME]);
+ } else {
+ QPID_LOG(warning, "HA: Backup catch-up exchange already exists: " << values[QNAME]);
+ }
+}
+
+namespace {
+const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:");
+const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:");
+
+std::string getRefName(const std::string& prefix, const Variant& ref) {
+ Variant::Map map(ref.asMap());
+ Variant::Map::const_iterator i = map.find(OBJECT_NAME);
+ if (i == map.end())
+ throw Exception(QPID_MSG("Replicator: invalid object reference: " << ref));
+ const std::string name = i->second.asString();
+ if (name.compare(0, prefix.size(), prefix) != 0)
+ throw Exception(QPID_MSG("Replicator: unexpected reference prefix: " << name));
+ std::string ret = name.substr(prefix.size());
+ return ret;
+}
+
+const std::string EXCHANGE_REF("exchangeRef");
+const std::string QUEUE_REF("queueRef");
+
+} // namespace
+
+void BrokerReplicator::doResponseBind(Variant::Map& values) {
+ std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
+ std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
+ boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName);
+ boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
+ // FIXME aconway 2011-11-24: more flexible configuration for binding replication.
+
+ // Automatically replicate binding if queue and exchange exist and are replicated
+ if (exchange && replicateLevel(exchange->getArgs()) &&
+ queue && replicateLevel(queue->getSettings()))
+ {
+ framing::FieldTable args;
+ amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+ string key = values[KEY].asString();
+ exchange->bind(queue, key, &args);
+ QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName()
+ << " queue=" << queue->getName()
+ << " key=" << key);
+ }
+}
+
+void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
+ if (replicateLevel(queue->getSettings()) == RL_MESSAGES) {
+ boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
+ broker.getExchanges().registerExchange(qr);
+ qr->activate();
+ }
+}
+
+bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
+bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
+bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
+
+string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
+
+}} // namespace broker
diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h
new file mode 100644
index 0000000000..cfb6cf9a28
--- /dev/null
+++ b/cpp/src/qpid/ha/BrokerReplicator.h
@@ -0,0 +1,85 @@
+#ifndef QPID_HA_REPLICATOR_H
+#define QPID_HA_REPLICATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Exchange.h"
+#include "qpid/types/Variant.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+
+namespace broker {
+class Broker;
+class Link;
+class Bridge;
+class SessionHandler;
+}
+
+namespace ha {
+
+/**
+ * Replicate configuration on a backup broker.
+ *
+ * Implemented as an exchange that subscribes to receive QMF
+ * configuration events from the primary. It configures local queues
+ * exchanges and bindings to replicate the primary.
+ * It also creates QueueReplicators for newly replicated queues.
+ *
+ * THREAD SAFE: Has no mutable state.
+ *
+ */
+class BrokerReplicator : public broker::Exchange
+{
+ public:
+ BrokerReplicator(const boost::shared_ptr<broker::Link>&);
+ ~BrokerReplicator();
+ std::string getType() const;
+
+ // Exchange methods
+ bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
+ bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+
+ private:
+ void initializeBridge(broker::Bridge&, broker::SessionHandler&);
+
+ void doEventQueueDeclare(types::Variant::Map& values);
+ void doEventQueueDelete(types::Variant::Map& values);
+ void doEventExchangeDeclare(types::Variant::Map& values);
+ void doEventExchangeDelete(types::Variant::Map& values);
+ void doEventBind(types::Variant::Map&);
+ void doEventUnbind(types::Variant::Map&);
+
+ void doResponseQueue(types::Variant::Map& values);
+ void doResponseExchange(types::Variant::Map& values);
+ void doResponseBind(types::Variant::Map& values);
+
+ void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
+
+ broker::Broker& broker;
+ boost::shared_ptr<broker::Link> link;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_HA_REPLICATOR_H*/
diff --git a/cpp/src/qpid/ha/ConnectionExcluder.cpp b/cpp/src/qpid/ha/ConnectionExcluder.cpp
new file mode 100644
index 0000000000..67ad7202d6
--- /dev/null
+++ b/cpp/src/qpid/ha/ConnectionExcluder.cpp
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionExcluder.h"
+#include "qpid/broker/Connection.h"
+#include <boost/function.hpp>
+#include <sstream>
+
+namespace qpid {
+namespace ha {
+
+ConnectionExcluder::ConnectionExcluder() {}
+
+void ConnectionExcluder::opened(broker::Connection& connection) {
+ if (!connection.isLink() && !connection.getClientProperties().isSet(ADMIN_TAG))
+ throw Exception(
+ QPID_MSG("HA: Backup broker rejected connection " << connection.getMgmtId()));
+}
+
+const std::string ConnectionExcluder::ADMIN_TAG="qpid.ha-admin";
+
+}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/ConnectionExcluder.h b/cpp/src/qpid/ha/ConnectionExcluder.h
new file mode 100644
index 0000000000..f8f2843a0c
--- /dev/null
+++ b/cpp/src/qpid/ha/ConnectionExcluder.h
@@ -0,0 +1,54 @@
+#ifndef QPID_HA_CONNECTIONEXCLUDER_H
+#define QPID_HA_CONNECTIONEXCLUDER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/ConnectionObserver.h"
+#include <boost/function.hpp>
+
+namespace qpid {
+
+namespace broker {
+class Connection;
+}
+
+namespace ha {
+
+/**
+ * Exclude normal connections to a backup broker.
+ * Admin connections are identified by a special flag in client-properties
+ * during connection negotiation.
+ */
+class ConnectionExcluder : public broker::ConnectionObserver
+{
+ public:
+ ConnectionExcluder();
+
+ void opened(broker::Connection& connection);
+
+ private:
+ static const std::string ADMIN_TAG;
+};
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_CONNECTIONEXCLUDER_H*/
diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp
new file mode 100644
index 0000000000..0d3bd51439
--- /dev/null
+++ b/cpp/src/qpid/ha/HaBroker.cpp
@@ -0,0 +1,137 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Backup.h"
+#include "ConnectionExcluder.h"
+#include "HaBroker.h"
+#include "Settings.h"
+#include "ReplicatingSubscription.h"
+#include "qpid/Exception.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qmf/org/apache/qpid/ha/Package.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetClientAddresses.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokerAddresses.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace ha {
+
+namespace _qmf = ::qmf::org::apache::qpid::ha;
+using namespace management;
+using namespace std;
+
+namespace {
+
+const std::string PRIMARY="primary";
+const std::string BACKUP="backup";
+
+} // namespace
+
+
+HaBroker::HaBroker(broker::Broker& b, const Settings& s)
+ : broker(b),
+ settings(s),
+ backup(new Backup(b, s)),
+ mgmtObject(0)
+{
+ // Register a factory for replicating subscriptions.
+ broker.getConsumerFactories().add(
+ boost::shared_ptr<ReplicatingSubscription::Factory>(
+ new ReplicatingSubscription::Factory()));
+
+ broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
+
+ ManagementAgent* ma = broker.getManagementAgent();
+ if (!ma)
+ throw Exception("Cannot start HA: management is disabled");
+ if (ma) {
+ _qmf::Package packageInit(ma);
+ mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
+ mgmtObject->set_status(BACKUP);
+ ma->addObject(mgmtObject);
+ }
+ sys::Mutex::ScopedLock l(lock);
+ if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l);
+ if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
+}
+
+HaBroker::~HaBroker() {}
+
+Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
+ sys::Mutex::ScopedLock l(lock);
+ switch (methodId) {
+ case _qmf::HaBroker::METHOD_PROMOTE: {
+ if (backup.get()) { // I am a backup
+ // FIXME aconway 2012-01-26: create primary state before resetting backup
+ // as that allows client connections.
+ backup.reset();
+ QPID_LOG(notice, "HA: Primary promoted from backup");
+ mgmtObject->set_status(PRIMARY);
+ }
+ break;
+ }
+ case _qmf::HaBroker::METHOD_SETCLIENTADDRESSES: {
+ setClientUrl(
+ Url(dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args).
+ i_clientAddresses), l);
+ break;
+ }
+ case _qmf::HaBroker::METHOD_SETBROKERADDRESSES: {
+ setBrokerUrl(
+ Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args)
+ .i_brokerAddresses), l);
+ break;
+ }
+ default:
+ return Manageable::STATUS_UNKNOWN_METHOD;
+ }
+ return Manageable::STATUS_OK;
+}
+
+void HaBroker::setClientUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
+ if (url.empty()) throw Exception("Invalid empty URL for HA client failover");
+ clientUrl = url;
+ updateClientUrl(l);
+}
+
+void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) {
+ Url url = clientUrl.empty() ? brokerUrl : clientUrl;
+ assert(!url.empty());
+ mgmtObject->set_clientAddresses(url.str());
+ knownBrokers.clear();
+ knownBrokers.push_back(url);
+ QPID_LOG(debug, "HA: Setting client known-brokers to: " << url);
+}
+
+void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
+ if (url.empty()) throw Exception("Invalid empty URL for HA broker failover");
+ brokerUrl = url;
+ mgmtObject->set_brokerAddresses(brokerUrl.str());
+ if (backup.get()) backup->setBrokerUrl(brokerUrl);
+ // Updating broker URL also updates defaulted client URL:
+ if (clientUrl.empty()) updateClientUrl(l);
+}
+
+std::vector<Url> HaBroker::getKnownBrokers() const {
+ return knownBrokers;
+}
+
+}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/HaBroker.h b/cpp/src/qpid/ha/HaBroker.h
new file mode 100644
index 0000000000..4d7bf80c90
--- /dev/null
+++ b/cpp/src/qpid/ha/HaBroker.h
@@ -0,0 +1,74 @@
+#ifndef QPID_HA_BROKER_H
+#define QPID_HA_BROKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Settings.h"
+#include "qpid/Url.h"
+#include "qpid/sys/Mutex.h"
+#include "qmf/org/apache/qpid/ha/HaBroker.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetStatus.h"
+#include "qpid/management/Manageable.h"
+#include <memory>
+
+namespace qpid {
+namespace broker {
+class Broker;
+}
+namespace ha {
+class Backup;
+
+/**
+ * HA state and actions associated with a broker.
+ *
+ * THREAD SAFE: may be called in arbitrary broker IO or timer threads.
+ */
+class HaBroker : public management::Manageable
+{
+ public:
+ HaBroker(broker::Broker&, const Settings&);
+ ~HaBroker();
+
+ // Implement Manageable.
+ qpid::management::ManagementObject* GetManagementObject() const { return mgmtObject; }
+ management::Manageable::status_t ManagementMethod (
+ uint32_t methodId, management::Args& args, std::string& text);
+
+ private:
+ void setClientUrl(const Url&, const sys::Mutex::ScopedLock&);
+ void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&);
+ void updateClientUrl(const sys::Mutex::ScopedLock&);
+ bool isPrimary(const sys::Mutex::ScopedLock&) { return !backup.get(); }
+ std::vector<Url> getKnownBrokers() const;
+
+ broker::Broker& broker;
+ const Settings settings;
+
+ sys::Mutex lock;
+ std::auto_ptr<Backup> backup;
+ qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
+ Url clientUrl, brokerUrl;
+ std::vector<Url> knownBrokers;
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_BROKER_H*/
diff --git a/cpp/src/qpid/ha/HaPlugin.cpp b/cpp/src/qpid/ha/HaPlugin.cpp
new file mode 100644
index 0000000000..fc9e48411d
--- /dev/null
+++ b/cpp/src/qpid/ha/HaPlugin.cpp
@@ -0,0 +1,67 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "HaBroker.h"
+#include "Settings.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include "qpid/broker/Broker.h"
+
+
+namespace qpid {
+namespace ha {
+
+using namespace std;
+
+struct Options : public qpid::Options {
+ Settings& settings;
+ Options(Settings& s) : qpid::Options("HA Options"), settings(s) {
+ addOptions()
+ ("ha-enable", optValue(settings.enabled, "yes|no"), "Enable High Availability features")
+ ("ha-client-url", optValue(settings.clientUrl,"URL"), "URL that clients use to connect and fail over.")
+ ("ha-broker-url", optValue(settings.brokerUrl,"URL"), "URL that backup brokers use to connect and fail over.")
+ ("ha-username", optValue(settings.username, "USER"), "Username for connections between brokers")
+ ("ha-password", optValue(settings.password, "PASS"), "Password for connections between brokers")
+ ("ha-mechanism", optValue(settings.mechanism, "MECH"), "Authentication mechanism for connections between brokers")
+ ;
+ }
+};
+
+struct HaPlugin : public Plugin {
+
+ Settings settings;
+ Options options;
+ auto_ptr<HaBroker> haBroker;
+
+ HaPlugin() : options(settings) {}
+
+ Options* getOptions() { return &options; }
+
+ void earlyInitialize(Plugin::Target& ) {}
+
+ void initialize(Plugin::Target& target) {
+ broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+ if (broker && settings.enabled) {
+ haBroker.reset(new ha::HaBroker(*broker, settings));
+ } else
+ QPID_LOG(notice, "HA: Disabled");
+ }
+};
+
+static HaPlugin instance; // Static initialization.
+
+}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
new file mode 100644
index 0000000000..0017cc82cd
--- /dev/null
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -0,0 +1,174 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "QueueReplicator.h"
+#include "ReplicatingSubscription.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+#include <sstream>
+
+namespace {
+const std::string QPID_REPLICATOR_("qpid.replicator-");
+const std::string TYPE_NAME("qpid.queue-replicator");
+const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
+}
+
+namespace qpid {
+namespace ha {
+using namespace broker;
+using namespace framing;
+
+const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event");
+const std::string QueueReplicator::POSITION_EVENT_KEY("qpid.position-event");
+
+std::string QueueReplicator::replicatorName(const std::string& queueName) {
+ return QPID_REPLICATOR_ + queueName;
+}
+
+QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
+ : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l)
+{
+ std::stringstream ss;
+ ss << "HA: Backup " << queue->getName() << ": ";
+ logPrefix = ss.str();
+ QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings());
+}
+
+// This must be separate from the constructor so we can call shared_from_this.
+void QueueReplicator::activate() {
+ // Note this may create a new bridge or use an existing one.
+ queue->getBroker()->getLinks().declare(
+ link->getHost(), link->getPort(),
+ false, // durable
+ queue->getName(), // src
+ getName(), // dest
+ "", // key
+ false, // isQueue
+ false, // isLocal
+ "", // id/tag
+ "", // excludes
+ false, // dynamic
+ 0, // sync?
+ // Include shared_ptr to self to ensure we are not deleted
+ // before initializeBridge is called.
+ boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, shared_from_this())
+ );
+}
+
+QueueReplicator::~QueueReplicator() {}
+
+void QueueReplicator::deactivate() {
+ sys::Mutex::ScopedLock l(lock);
+ queue->getBroker()->getLinks().destroy(
+ link->getHost(), link->getPort(), queue->getName(), getName(), string());
+ QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName);
+}
+
+// Called in a broker connection thread when the bridge is created.
+// shared_ptr to self ensures we are not deleted before initializeBridge is called.
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler,
+ boost::shared_ptr<QueueReplicator> /*self*/) {
+ sys::Mutex::ScopedLock l(lock);
+ bridgeName = bridge.getName();
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
+ framing::FieldTable settings;
+
+ // FIXME aconway 2011-12-09: Failover optimization removed.
+ // There was code here to re-use messages already on the backup
+ // during fail-over. This optimization was removed to simplify
+ // the logic till we get the basic replication stable, it
+ // can be re-introduced later. Last revision with the optimization:
+ // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
+
+ // Clear out any old messages, reset the queue to start replicating fresh.
+ queue->purge();
+ queue->setPosition(0);
+
+ settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
+ // TODO aconway 2011-12-19: optimize.
+ settings.setInt(QPID_SYNC_FREQUENCY, 1);
+ peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false/*exclusive*/, "", 0, settings);
+ peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
+ peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
+ QPID_LOG(debug, logPrefix << "Activated bridge " << bridgeName);
+}
+
+namespace {
+template <class T> T decodeContent(Message& m) {
+ std::string content;
+ m.getFrames().getContent(content);
+ Buffer buffer(const_cast<char*>(content.c_str()), content.size());
+ T result;
+ result.decode(buffer);
+ return result;
+}
+}
+
+void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) {
+ // Thread safe: only calls thread safe Queue functions.
+ if (queue->getPosition() >= n) { // Ignore messages we haven't reached yet
+ QueuedMessage message;
+ if (queue->acquireMessageAt(n, message))
+ queue->dequeue(0, message);
+ }
+}
+
+// Called in connection thread of the queues bridge to primary.
+void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable*)
+{
+ sys::Mutex::ScopedLock l(lock);
+ if (key == DEQUEUE_EVENT_KEY) {
+ SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
+ QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
+ //TODO: should be able to optimise the following
+ for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
+ dequeue(*i, l);
+ } else if (key == POSITION_EVENT_KEY) {
+ SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
+ QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
+ << " to " << position);
+ assert(queue->getPosition() <= position);
+ //TODO aconway 2011-12-14: Optimize this?
+ for (SequenceNumber i = queue->getPosition(); i < position; ++i)
+ dequeue(i,l);
+ queue->setPosition(position);
+ } else {
+ msg.deliverTo(queue);
+ QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
+ }
+}
+
+// Unused Exchange methods.
+bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
+bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
+bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }
+std::string QueueReplicator::getType() const { return TYPE_NAME; }
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h
new file mode 100644
index 0000000000..9de7dd480c
--- /dev/null
+++ b/cpp/src/qpid/ha/QueueReplicator.h
@@ -0,0 +1,86 @@
+#ifndef QPID_HA_QUEUEREPLICATOR_H
+#define QPID_HA_QUEUEREPLICATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Exchange.h"
+#include "qpid/framing/SequenceSet.h"
+#include <boost/enable_shared_from_this.hpp>
+#include <iosfwd>
+
+namespace qpid {
+
+namespace broker {
+class Bridge;
+class Link;
+class Queue;
+class QueueRegistry;
+class SessionHandler;
+class Deliverable;
+}
+
+namespace ha {
+
+/**
+ * Exchange created on a backup broker to replicate a queue on the primary.
+ *
+ * Puts replicated messages on the local queue, handles dequeue events.
+ * Creates a ReplicatingSubscription on the primary by passing special
+ * arguments to the consume command.
+ *
+ * THREAD SAFE: Called in different connection threads.
+ */
+class QueueReplicator : public broker::Exchange,
+ public boost::enable_shared_from_this<QueueReplicator>
+{
+ public:
+ static const std::string DEQUEUE_EVENT_KEY;
+ static const std::string POSITION_EVENT_KEY;
+ static std::string replicatorName(const std::string& queueName);
+
+ QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l);
+ ~QueueReplicator();
+
+ void activate(); // Call after ctor
+ void deactivate(); // Call before dtor
+
+ std::string getType() const;
+ bool bind(boost::shared_ptr<broker::Queue
+ >, const std::string&, const framing::FieldTable*);
+ bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
+ bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+
+ private:
+ void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler,
+ boost::shared_ptr<QueueReplicator> self);
+ void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
+
+ std::string logPrefix;
+ std::string bridgeName;
+ sys::Mutex lock;
+ boost::shared_ptr<broker::Queue> queue;
+ boost::shared_ptr<broker::Link> link;
+};
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_QUEUEREPLICATOR_H*/
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
new file mode 100644
index 0000000000..af6180305d
--- /dev/null
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -0,0 +1,293 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ReplicatingSubscription.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/SessionContext.h"
+#include "qpid/broker/ConnectionState.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Statement.h"
+#include <sstream>
+
+namespace qpid {
+namespace ha {
+
+using namespace framing;
+using namespace broker;
+using namespace std;
+
+const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
+
+namespace {
+const string DOLLAR("$");
+const string INTERNAL("-internal");
+} // namespace
+
+string mask(const string& in)
+{
+ return DOLLAR + in + INTERNAL;
+}
+
+/* Called by SemanticState::consume to create a consumer */
+boost::shared_ptr<broker::SemanticState::ConsumerImpl>
+ReplicatingSubscription::Factory::create(
+ SemanticState* parent,
+ const string& name,
+ Queue::shared_ptr queue,
+ bool ack,
+ bool acquire,
+ bool exclusive,
+ const string& tag,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const framing::FieldTable& arguments
+) {
+ boost::shared_ptr<ReplicatingSubscription> rs;
+ if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) {
+ rs.reset(new ReplicatingSubscription(
+ parent, name, queue, ack, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments));
+ queue->addObserver(rs);
+ }
+ return rs;
+}
+
+ReplicatingSubscription::ReplicatingSubscription(
+ SemanticState* parent,
+ const string& name,
+ Queue::shared_ptr queue,
+ bool ack,
+ bool acquire,
+ bool exclusive,
+ const string& tag,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const framing::FieldTable& arguments
+) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
+ resumeId, resumeTtl, arguments),
+ events(new Queue(mask(name))),
+ consumer(new DelegatingConsumer(*this))
+{
+ stringstream ss;
+ ss << "HA: Primary: " << getQueue()->getName() << " at "
+ << parent->getSession().getConnection().getUrl() << ": ";
+ logPrefix = ss.str();
+
+ // FIXME aconway 2011-12-09: Failover optimization removed.
+ // There was code here to re-use messages already on the backup
+ // during fail-over. This optimization was removed to simplify
+ // the logic till we get the basic replication stable, it
+ // can be re-introduced later. Last revision with the optimization:
+ // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
+
+ QPID_LOG(debug, logPrefix << "Created backup subscription " << getName());
+
+ // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0
+ // so we will start consuming from the lowest numbered message.
+ // This is incorrect if the sequence number wraps around, but
+ // this is what all consumers currently do.
+}
+
+// Message is delivered in the subscription's connection thread.
+bool ReplicatingSubscription::deliver(QueuedMessage& m) {
+ // Add position events for the subscribed queue, not for the internal event queue.
+ if (m.queue && m.queue == getQueue().get()) {
+ sys::Mutex::ScopedLock l(lock);
+ assert(position == m.position);
+ // m.position is the position of the newly enqueued m on the local queue.
+ // backupPosition is latest position on the backup queue (before enqueueing m.)
+ assert(m.position > backupPosition);
+ if (m.position - backupPosition > 1) {
+ // Position has advanced because of messages dequeued ahead of us.
+ SequenceNumber send(m.position);
+ --send; // Send the position before m was enqueued.
+ sendPositionEvent(send, l);
+ }
+ backupPosition = m.position;
+ QPID_LOG(trace, logPrefix << "Replicating message " << m.position);
+ }
+ return ConsumerImpl::deliver(m);
+}
+
+ReplicatingSubscription::~ReplicatingSubscription() {}
+
+
+// INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg
+
+// Mark a message completed. May be called by acknowledge or dequeued
+void ReplicatingSubscription::complete(
+ const QueuedMessage& qm, const sys::Mutex::ScopedLock&)
+{
+ // Handle completions for the subscribed queue, not the internal event queue.
+ if (qm.queue && qm.queue == getQueue().get()) {
+ QPID_LOG(trace, logPrefix << "Completed message " << qm.position);
+ Delayed::iterator i= delayed.find(qm.position);
+ // The same message can be completed twice, by acknowledged and
+ // dequeued, remove it from the set so it only gets completed
+ // once.
+ if (i != delayed.end()) {
+ assert(i->second.payload == qm.payload);
+ qm.payload->getIngressCompletion().finishCompleter();
+ delayed.erase(i);
+ }
+ }
+}
+
+// Called before we get notified of the message being available and
+// under the message lock in the queue. Called in arbitrary connection thread.
+void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
+ sys::Mutex::ScopedLock l(lock);
+ // Delay completion
+ QPID_LOG(trace, logPrefix << "Delaying completion of message " << qm.position);
+ qm.payload->getIngressCompletion().startCompleter();
+ assert(delayed.find(qm.position) == delayed.end());
+ delayed[qm.position] = qm;
+}
+
+
+// Function to complete a delayed message, called by cancel()
+void ReplicatingSubscription::cancelComplete(
+ const Delayed::value_type& v, const sys::Mutex::ScopedLock&)
+{
+ QPID_LOG(trace, logPrefix << "Cancel completed message " << v.second.position);
+ v.second.payload->getIngressCompletion().finishCompleter();
+}
+
+// Called in the subscription's connection thread.
+void ReplicatingSubscription::cancel()
+{
+ getQueue()->removeObserver(
+ boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+ {
+ sys::Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName());
+ for_each(delayed.begin(), delayed.end(),
+ boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l)));
+ delayed.clear();
+ }
+ ConsumerImpl::cancel();
+}
+
+// Called on primary in the backups IO thread.
+void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) {
+ sys::Mutex::ScopedLock l(lock);
+ // Finish completion of message, it has been acknowledged by the backup.
+ complete(msg, l);
+}
+
+// Hide the "queue deleted" error for a ReplicatingSubscription when a
+// queue is deleted, this is normal and not an error.
+bool ReplicatingSubscription::hideDeletedError() { return true; }
+
+// Called with lock held. Called in subscription's connection thread.
+void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
+{
+ QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
+ string buf(dequeues.encodedSize(),'\0');
+ framing::Buffer buffer(&buf[0], buf.size());
+ dequeues.encode(buffer);
+ dequeues.clear();
+ buffer.reset();
+ sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
+}
+
+// Called after the message has been removed from the deque and under
+// the messageLock in the queue. Called in arbitrary connection threads.
+void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
+{
+ {
+ sys::Mutex::ScopedLock l(lock);
+ QPID_LOG(trace, logPrefix << "Dequeued message " << qm.position);
+ dequeues.add(qm.position);
+ // If we have not yet sent this message to the backup, then
+ // complete it now as it will never be accepted.
+ if (qm.position > position) complete(qm, l);
+ }
+ notify(); // Ensure a call to doDispatch
+}
+
+// Called with lock held. Called in subscription's connection thread.
+void ReplicatingSubscription::sendPositionEvent(
+ SequenceNumber position, const sys::Mutex::ScopedLock&l )
+{
+ QPID_LOG(trace, logPrefix << "Sending position " << position
+ << ", was " << backupPosition);
+ string buf(backupPosition.encodedSize(),'\0');
+ framing::Buffer buffer(&buf[0], buf.size());
+ position.encode(buffer);
+ buffer.reset();
+ sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l);
+}
+
+void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer,
+ const sys::Mutex::ScopedLock&)
+{
+ //generate event message
+ boost::intrusive_ptr<Message> event = new Message();
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+ AMQFrame content((AMQContentBody()));
+ content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize());
+ header.setBof(false);
+ header.setEof(false);
+ header.setBos(true);
+ header.setEos(true);
+ content.setBof(false);
+ content.setEof(true);
+ content.setBos(true);
+ content.setEos(true);
+ event->getFrames().append(method);
+ event->getFrames().append(header);
+ event->getFrames().append(content);
+
+ DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ props->setRoutingKey(key);
+ // Send the event using the events queue. Consumer is a
+ // DelegatingConsumer that delegates to *this for everything but
+ // has an independnet position. We put an event on events and
+ // dispatch it through ourselves to send it in line with the
+ // normal browsing messages.
+ events->deliver(event);
+ events->dispatch(consumer);
+}
+
+
+// Called in subscription's connection thread.
+bool ReplicatingSubscription::doDispatch()
+{
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (!dequeues.empty()) sendDequeueEvent(l);
+ }
+ return ConsumerImpl::doDispatch();
+}
+
+ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
+ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
+bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { return delegate.deliver(m); }
+void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
+bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
+bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
+bool ReplicatingSubscription::DelegatingConsumer::browseAcquired() const { return delegate.browseAcquired(); }
+OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
+
+}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.h b/cpp/src/qpid/ha/ReplicatingSubscription.h
new file mode 100644
index 0000000000..e311f9505a
--- /dev/null
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -0,0 +1,134 @@
+#ifndef QPID_BROKER_REPLICATINGSUBSCRIPTION_H
+#define QPID_BROKER_REPLICATINGSUBSCRIPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "QueueReplicator.h" // For DEQUEUE_EVENT_KEY
+#include "qpid/broker/SemanticState.h"
+#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/ConsumerFactory.h"
+#include <iosfwd>
+
+namespace qpid {
+
+namespace broker {
+class Message;
+class Queue;
+class QueuedMessage;
+class OwnershipToken;
+}
+
+namespace framing {
+class Buffer;
+}
+
+namespace ha {
+
+/**
+ * A susbcription that represents a backup replicating a queue.
+ *
+ * Runs on the primary. Delays completion of messages till the backup
+ * has acknowledged, informs backup of locally dequeued messages.
+ *
+ * THREAD SAFE: Used as a consumer in subscription's connection
+ * thread, and as a QueueObserver in arbitrary connection threads.
+ */
+class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
+ public broker::QueueObserver
+{
+ public:
+ struct Factory : public broker::ConsumerFactory {
+ boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
+ broker::SemanticState* parent,
+ const std::string& name, boost::shared_ptr<broker::Queue> ,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl,
+ const framing::FieldTable& arguments);
+ };
+
+ // Argument names for consume command.
+ static const std::string QPID_REPLICATING_SUBSCRIPTION;
+
+ ReplicatingSubscription(broker::SemanticState* parent,
+ const std::string& name, boost::shared_ptr<broker::Queue> ,
+ bool ack, bool acquire, bool exclusive, const std::string& tag,
+ const std::string& resumeId, uint64_t resumeTtl,
+ const framing::FieldTable& arguments);
+
+ ~ReplicatingSubscription();
+
+ // QueueObserver overrides.
+ bool deliver(broker::QueuedMessage& msg);
+ void enqueued(const broker::QueuedMessage&);
+ void dequeued(const broker::QueuedMessage&);
+ void acquired(const broker::QueuedMessage&) {}
+ void requeued(const broker::QueuedMessage&) {}
+
+ // Consumer overrides.
+ void cancel();
+ void acknowledged(const broker::QueuedMessage&);
+ bool browseAcquired() const { return true; }
+
+ bool hideDeletedError();
+
+ protected:
+ bool doDispatch();
+ private:
+ typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed;
+ std::string logPrefix;
+ boost::shared_ptr<broker::Queue> events;
+ boost::shared_ptr<broker::Consumer> consumer;
+ Delayed delayed;
+ framing::SequenceSet dequeues;
+ framing::SequenceNumber backupPosition;
+
+ void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&);
+ void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&);
+ void sendDequeueEvent(const sys::Mutex::ScopedLock&);
+ void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
+ void sendEvent(const std::string& key, framing::Buffer&,
+ const sys::Mutex::ScopedLock&);
+
+ class DelegatingConsumer : public Consumer
+ {
+ public:
+ DelegatingConsumer(ReplicatingSubscription&);
+ ~DelegatingConsumer();
+ bool deliver(broker::QueuedMessage& msg);
+ void notify();
+ bool filter(boost::intrusive_ptr<broker::Message>);
+ bool accept(boost::intrusive_ptr<broker::Message>);
+ void cancel() {}
+ void acknowledged(const broker::QueuedMessage&) {}
+ bool browseAcquired() const;
+
+ broker::OwnershipToken* getSession();
+
+ private:
+ ReplicatingSubscription& delegate;
+ };
+};
+
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_REPLICATINGSUBSCRIPTION_H*/
diff --git a/cpp/src/qpid/ha/Settings.h b/cpp/src/qpid/ha/Settings.h
new file mode 100644
index 0000000000..049c873b9f
--- /dev/null
+++ b/cpp/src/qpid/ha/Settings.h
@@ -0,0 +1,45 @@
+#ifndef QPID_HA_SETTINGS_H
+#define QPID_HA_SETTINGS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+
+namespace qpid {
+namespace ha {
+
+/**
+ * Configurable settings for HA.
+ */
+class Settings
+{
+ public:
+ Settings() : enabled(false) {}
+ bool enabled;
+ std::string clientUrl;
+ std::string brokerUrl;
+ std::string username, password, mechanism;
+ private:
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_SETTINGS_H*/
diff --git a/cpp/src/qpid/ha/management-schema.xml b/cpp/src/qpid/ha/management-schema.xml
new file mode 100644
index 0000000000..fe4a14d111
--- /dev/null
+++ b/cpp/src/qpid/ha/management-schema.xml
@@ -0,0 +1,38 @@
+<schema package="org.apache.qpid.ha">
+
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+ <!-- Monitor and control HA status of a broker. -->
+ <class name="HaBroker">
+ <property name="name" type="sstr" access="RC" index="y" desc="Primary Key"/>
+ <property name="status" type="sstr" desc="HA status: primary or backup"/>
+ <property name="clientAddresses" type="sstr" desc="List of addresses used by clients to connect to the HA cluster."/>
+ <property name="brokerAddresses" type="sstr" desc="List of addresses used by HA brokers to connect to each other."/>
+
+ <method name="promote" desc="Promote a backup broker to primary."/>
+ <method name="setClientAddresses" desc="Set HA client addresses">
+ <arg name="clientAddresses" type="sstr" dir="I"/>
+ </method>
+ <method name="setBrokerAddresses" desc="Set HA broker addresses">
+ <arg name="brokerAddresses" type="sstr" dir="I"/>
+ </method>
+ </class>
+
+</schema>