diff options
| author | Alan Conway <aconway@apache.org> | 2012-02-17 14:54:46 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-02-17 14:54:46 +0000 |
| commit | 0a8773c335509c2b9e9b96df360de190a266dcad (patch) | |
| tree | 288469c17dacc37199b5f77498965fee7e778d95 /cpp/src/qpid/ha | |
| parent | d82ce6836f7f0e4f7d647b2dc603141f549869d3 (diff) | |
| download | qpid-python-0a8773c335509c2b9e9b96df360de190a266dcad.tar.gz | |
QPID-3603: Merge new HA foundations.
Merged from qpid-3603-7. This is basic support for the new HA approach.
For information & limitations see qpid/cpp/design_docs/new-ha-design.txt.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1245587 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha')
| -rw-r--r-- | cpp/src/qpid/ha/Backup.cpp | 90 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/Backup.h | 67 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 497 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.h | 85 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/ConnectionExcluder.cpp | 40 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/ConnectionExcluder.h | 54 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/HaBroker.cpp | 137 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/HaBroker.h | 74 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/HaPlugin.cpp | 67 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 174 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.h | 86 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.cpp | 293 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.h | 134 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/Settings.h | 45 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/management-schema.xml | 38 |
15 files changed, 1881 insertions, 0 deletions
diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp new file mode 100644 index 0000000000..5acbfb9d5f --- /dev/null +++ b/cpp/src/qpid/ha/Backup.cpp @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Backup.h" +#include "Settings.h" +#include "BrokerReplicator.h" +#include "ReplicatingSubscription.h" +#include "ConnectionExcluder.h" +#include "qpid/Url.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/broker/Bridge.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/SessionHandler.h" +#include "qpid/broker/Link.h" +#include "qpid/framing/AMQP_ServerProxy.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/types/Variant.h" + +namespace qpid { +namespace ha { + +using namespace framing; +using namespace broker; +using types::Variant; +using std::string; + +Backup::Backup(broker::Broker& b, const Settings& s) : + broker(b), settings(s), excluder(new ConnectionExcluder()) +{ + // Empty brokerUrl means delay initialization until setUrl() is called. + if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl)); +} + +void Backup::initialize(const Url& url) { + assert(!url.empty()); + QPID_LOG(notice, "Ha: Backup started: " << url); + string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; + // Declare the link + std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare( + url[0].host, url[0].port, protocol, + false, // durable + settings.mechanism, settings.username, settings.password); + assert(result.second); // FIXME aconway 2011-11-23: error handling + link = result.first; + link->setUrl(url); + + replicator.reset(new BrokerReplicator(link)); + broker.getExchanges().registerExchange(replicator); + broker.getConnectionObservers().add(excluder); +} + +void Backup::setBrokerUrl(const Url& url) { + // Ignore empty URLs seen during start-up for some tests. + if (url.empty()) return; + sys::Mutex::ScopedLock l(lock); + if (link) { // URL changed after we initialized. + QPID_LOG(info, "HA: Backup failover URL set to " << url); + link->setUrl(url); + } + else { + initialize(url); // Deferred initialization + } +} + +Backup::~Backup() { + if (link) link->close(); + if (replicator.get()) broker.getExchanges().destroy(replicator->getName()); + broker.getConnectionObservers().remove(excluder); // This allows client connections. +} + +}} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/Backup.h b/cpp/src/qpid/ha/Backup.h new file mode 100644 index 0000000000..526b238b82 --- /dev/null +++ b/cpp/src/qpid/ha/Backup.h @@ -0,0 +1,67 @@ +#ifndef QPID_HA_BACKUP_H +#define QPID_HA_BACKUP_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Settings.h" +#include "qpid/Url.h" +#include "qpid/sys/Mutex.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { + +namespace broker { +class Broker; +class Link; +} + +namespace ha { +class Settings; +class ConnectionExcluder; +class BrokerReplicator; + +/** + * State associated with a backup broker. Manages connections to primary. + * + * THREAD SAFE + */ +class Backup +{ + public: + Backup(broker::Broker&, const Settings&); + ~Backup(); + void setBrokerUrl(const Url&); + + private: + void initialize(const Url&); + + sys::Mutex lock; + broker::Broker& broker; + Settings settings; + boost::shared_ptr<broker::Link> link; + boost::shared_ptr<BrokerReplicator> replicator; + boost::shared_ptr<ConnectionExcluder> excluder; +}; + +}} // namespace qpid::ha + +#endif /*!QPID_HA_BACKUP_H*/ diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp new file mode 100644 index 0000000000..a8f05c1fe3 --- /dev/null +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -0,0 +1,497 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "BrokerReplicator.h" +#include "QueueReplicator.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/Link.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/log/Statement.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/broker/SessionHandler.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qmf/org/apache/qpid/broker/EventBind.h" +#include "qmf/org/apache/qpid/broker/EventUnbind.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" +#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" +#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" +#include "qmf/org/apache/qpid/broker/EventSubscribe.h" +#include <algorithm> + +namespace qpid { +namespace ha { + +using qmf::org::apache::qpid::broker::EventBind; +using qmf::org::apache::qpid::broker::EventUnbind; +using qmf::org::apache::qpid::broker::EventExchangeDeclare; +using qmf::org::apache::qpid::broker::EventExchangeDelete; +using qmf::org::apache::qpid::broker::EventQueueDeclare; +using qmf::org::apache::qpid::broker::EventQueueDelete; +using qmf::org::apache::qpid::broker::EventSubscribe; +using namespace framing; +using std::string; +using types::Variant; +using namespace broker; + +namespace { + +const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator"); +const string QPID_REPLICATE("qpid.replicate"); + +const string CLASS_NAME("_class_name"); +const string EVENT("_event"); +const string OBJECT_NAME("_object_name"); +const string PACKAGE_NAME("_package_name"); +const string QUERY_RESPONSE("_query_response"); +const string SCHEMA_ID("_schema_id"); +const string VALUES("_values"); + +const string ALTEX("altEx"); +const string ARGS("args"); +const string ARGUMENTS("arguments"); +const string AUTODEL("autoDel"); +const string AUTODELETE("autoDelete"); +const string BIND("bind"); +const string UNBIND("unbind"); +const string BINDING("binding"); +const string CREATED("created"); +const string DISP("disp"); +const string DURABLE("durable"); +const string EXCHANGE("exchange"); +const string EXNAME("exName"); +const string EXTYPE("exType"); +const string KEY("key"); +const string NAME("name"); +const string QNAME("qName"); +const string QUEUE("queue"); +const string RHOST("rhost"); +const string TYPE("type"); +const string USER("user"); + +const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#"); +const string QMF2("qmf2"); +const string QMF_CONTENT("qmf.content"); +const string QMF_DEFAULT_TOPIC("qmf.default.topic"); +const string QMF_OPCODE("qmf.opcode"); + +const string _WHAT("_what"); +const string _CLASS_NAME("_class_name"); +const string _PACKAGE_NAME("_package_name"); +const string _SCHEMA_ID("_schema_id"); +const string OBJECT("OBJECT"); +const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker"); +const string QMF_DEFAULT_DIRECT("qmf.default.direct"); +const string _QUERY_REQUEST("_query_request"); +const string BROKER("broker"); + +bool isQMFv2(const Message& message) { + const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>(); + return props && props->getAppId() == QMF2; +} + +template <class T> bool match(Variant::Map& schema) { + return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); +} + +enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_MESSAGES }; +const string S_NONE="none"; +const string S_CONFIGURATION="configuration"; +const string S_MESSAGES="messages"; + +ReplicateLevel replicateLevel(const string& level) { + if (level == S_NONE) return RL_NONE; + if (level == S_CONFIGURATION) return RL_CONFIGURATION; + if (level == S_MESSAGES) return RL_MESSAGES; + throw Exception("Invalid value for "+QPID_REPLICATE+": "+level); +} + +ReplicateLevel replicateLevel(const framing::FieldTable& f) { + if (f.isSet(QPID_REPLICATE)) return replicateLevel(f.getAsString(QPID_REPLICATE)); + else return RL_NONE; +} + +ReplicateLevel replicateLevel(const Variant::Map& m) { + Variant::Map::const_iterator i = m.find(QPID_REPLICATE); + if (i != m.end()) return replicateLevel(i->second.asString()); + else return RL_NONE; +} + +void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) { + framing::AMQP_ServerProxy peer(sessionHandler.out); + Variant::Map request; + request[_WHAT] = OBJECT; + Variant::Map schema; + schema[_CLASS_NAME] = className; + schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER; + request[_SCHEMA_ID] = schema; + + AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0))); + method.setBof(true); + method.setEof(false); + method.setBos(true); + method.setEos(true); + AMQHeaderBody headerBody; + MessageProperties* props = headerBody.get<MessageProperties>(true); + props->setReplyTo(qpid::framing::ReplyTo("", queueName)); + props->setAppId(QMF2); + props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST); + headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER); + AMQFrame header(headerBody); + header.setBof(false); + header.setEof(false); + header.setBos(true); + header.setEos(true); + AMQContentBody data; + qpid::amqp_0_10::MapCodec::encode(request, data.getData()); + AMQFrame content(data); + content.setBof(false); + content.setEof(true); + content.setBos(true); + content.setEos(true); + sessionHandler.out->handle(method); + sessionHandler.out->handle(header); + sessionHandler.out->handle(content); +} + +// Like Variant::asMap but treat void value as an empty map. +Variant::Map asMapVoid(const Variant& value) { + if (!value.isVoid()) return value.asMap(); + else return Variant::Map(); +} + +} // namespace + +BrokerReplicator::~BrokerReplicator() {} + +BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l) + : Exchange(QPID_CONFIGURATION_REPLICATOR), broker(*l->getBroker()), link(l) +{ + QPID_LOG(info, "HA: Backup replicating from " << + link->getTransport() << ":" << link->getHost() << ":" << link->getPort()); + broker.getLinks().declare( + link->getHost(), link->getPort(), + false, // durable + QPID_CONFIGURATION_REPLICATOR, // src + QPID_CONFIGURATION_REPLICATOR, // dest + "", // key + false, // isQueue + false, // isLocal + "", // id/tag + "", // excludes + false, // dynamic + 0, // sync? + boost::bind(&BrokerReplicator::initializeBridge, this, _1, _2) + ); +} + +// This is called in the connection IO thread when the bridge is started. +void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { + framing::AMQP_ServerProxy peer(sessionHandler.out); + string queueName = bridge.getQueueName(); + const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); + + //declare and bind an event queue + peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable()); + peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable()); + //subscribe to the queue + peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + + //issue a query request for queues and another for exchanges using event queue as the reply-to address + sendQuery(QUEUE, queueName, sessionHandler); + sendQuery(EXCHANGE, queueName, sessionHandler); + sendQuery(BINDING, queueName, sessionHandler); + QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName); +} + +// FIXME aconway 2011-12-02: error handling in route. +void BrokerReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) { + Variant::List list; + try { + if (!isQMFv2(msg.getMessage()) || !headers) + throw Exception("Unexpected message, not QMF2 event or query response."); + // decode as list + string content = msg.getMessage().getFrames().getContent(); + amqp_0_10::ListCodec::decode(content, list); + + if (headers->getAsString(QMF_CONTENT) == EVENT) { + for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { + Variant::Map& map = i->asMap(); + Variant::Map& schema = map[SCHEMA_ID].asMap(); + Variant::Map& values = map[VALUES].asMap(); + if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values); + else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values); + else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values); + else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values); + else if (match<EventBind>(schema)) doEventBind(values); + else if (match<EventUnbind>(schema)) doEventUnbind(values); + } + } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) { + for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { + string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME]; + Variant::Map& values = i->asMap()[VALUES].asMap(); + framing::FieldTable args; + amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); + if (type == QUEUE) doResponseQueue(values); + else if (type == EXCHANGE) doResponseExchange(values); + else if (type == BINDING) doResponseBind(values); + else QPID_LOG(error, "HA: Backup received unknown response type=" << type + << " values=" << values); + } + } else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers); + } catch (const std::exception& e) { + QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list); + } +} + +void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { + string name = values[QNAME].asString(); + Variant::Map argsMap = asMapVoid(values[ARGS]); + if (values[DISP] == CREATED && replicateLevel(argsMap)) { + framing::FieldTable args; + amqp_0_10::translate(argsMap, args); + std::pair<boost::shared_ptr<Queue>, bool> result = + broker.createQueue( + name, + values[DURABLE].asBool(), + values[AUTODEL].asBool(), + 0 /*i.e. no owner regardless of exclusivity on master*/, + values[ALTEX].asString(), + args, + values[USER].asString(), + values[RHOST].asString()); + if (result.second) { + // FIXME aconway 2011-11-22: should delete old queue and + // re-create from event. + // Events are always up to date, whereas responses may be + // out of date. + QPID_LOG(debug, "HA: Backup created queue: " << name); + startQueueReplicator(result.first); + } else { + // FIXME aconway 2011-12-02: what's the right way to handle this? + QPID_LOG(warning, "HA: Backup queue already exists: " << name); + } + } +} + +void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { + // The remote queue has already been deleted so replicator + // sessions may be closed by a "queue deleted" exception. + string name = values[QNAME].asString(); + boost::shared_ptr<Queue> queue = broker.getQueues().find(name); + if (queue && replicateLevel(queue->getSettings())) { + QPID_LOG(debug, "HA: Backup deleting queue: " << name); + string rname = QueueReplicator::replicatorName(name); + boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname); + boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex); + if (qr) qr->deactivate(); + // QueueReplicator's bridge is now queued for destruction but may not + // actually be destroyed, deleting the exhange + broker.getExchanges().destroy(rname); + broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString()); + } +} + +void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { + Variant::Map argsMap(asMapVoid(values[ARGS])); + if (values[DISP] == CREATED && replicateLevel(argsMap)) { + string name = values[EXNAME].asString(); + framing::FieldTable args; + amqp_0_10::translate(argsMap, args); + if (broker.createExchange( + name, + values[EXTYPE].asString(), + values[DURABLE].asBool(), + values[ALTEX].asString(), + args, + values[USER].asString(), + values[RHOST].asString()).second) + { + QPID_LOG(debug, "HA: Backup created exchange: " << name); + } else { + // FIXME aconway 2011-11-22: should delete pre-exisitng exchange + // and re-create from event. See comment in doEventQueueDeclare. + QPID_LOG(warning, "HA: Backup exchange already exists: " << name); + } + } +} + +void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { + string name = values[EXNAME].asString(); + try { + boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name); + if (exchange && replicateLevel(exchange->getArgs())) { + QPID_LOG(debug, "HA: Backup deleting exchange:" << name); + broker.deleteExchange( + name, + values[USER].asString(), + values[RHOST].asString()); + } + } catch (const framing::NotFoundException&) {} +} + +void BrokerReplicator::doEventBind(Variant::Map& values) { + boost::shared_ptr<Exchange> exchange = + broker.getExchanges().find(values[EXNAME].asString()); + boost::shared_ptr<Queue> queue = + broker.getQueues().find(values[QNAME].asString()); + // We only replicate binds for a replicated queue to replicated + // exchange that both exist locally. + if (exchange && replicateLevel(exchange->getArgs()) && + queue && replicateLevel(queue->getSettings())) + { + framing::FieldTable args; + amqp_0_10::translate(asMapVoid(values[ARGS]), args); + string key = values[KEY].asString(); + QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName() + << " queue=" << queue->getName() + << " key=" << key); + exchange->bind(queue, key, &args); + } +} + +void BrokerReplicator::doEventUnbind(Variant::Map& values) { + boost::shared_ptr<Exchange> exchange = + broker.getExchanges().find(values[EXNAME].asString()); + boost::shared_ptr<Queue> queue = + broker.getQueues().find(values[QNAME].asString()); + // We only replicate unbinds for a replicated queue to replicated + // exchange that both exist locally. + if (exchange && replicateLevel(exchange->getArgs()) && + queue && replicateLevel(queue->getSettings())) + { + framing::FieldTable args; + amqp_0_10::translate(asMapVoid(values[ARGS]), args); + string key = values[KEY].asString(); + QPID_LOG(debug, "HA: Backup replicated unbinding exchange=" << exchange->getName() + << " queue=" << queue->getName() + << " key=" << key); + exchange->unbind(queue, key, &args); + } +} + +void BrokerReplicator::doResponseQueue(Variant::Map& values) { + // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication + Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); + if (!replicateLevel(argsMap)) return; + framing::FieldTable args; + amqp_0_10::translate(argsMap, args); + string name(values[NAME].asString()); + std::pair<boost::shared_ptr<Queue>, bool> result = + broker.createQueue( + name, + values[DURABLE].asBool(), + values[AUTODELETE].asBool(), + 0 /*i.e. no owner regardless of exclusivity on master*/, + ""/*TODO: need to include alternate-exchange*/, + args, + ""/*TODO: who is the user?*/, + ""/*TODO: what should we use as connection id?*/); + if (result.second) { + QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]); + startQueueReplicator(result.first); + } else { + // FIXME aconway 2011-11-22: Normal to find queue already + // exists if we're failing over. + QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << name); + } +} + +void BrokerReplicator::doResponseExchange(Variant::Map& values) { + Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); + if (!replicateLevel(argsMap)) return; + framing::FieldTable args; + amqp_0_10::translate(argsMap, args); + if (broker.createExchange( + values[NAME].asString(), + values[TYPE].asString(), + values[DURABLE].asBool(), + ""/*TODO: need to include alternate-exchange*/, + args, + ""/*TODO: who is the user?*/, + ""/*TODO: what should we use as connection id?*/).second) + { + QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME]); + } else { + QPID_LOG(warning, "HA: Backup catch-up exchange already exists: " << values[QNAME]); + } +} + +namespace { +const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:"); +const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:"); + +std::string getRefName(const std::string& prefix, const Variant& ref) { + Variant::Map map(ref.asMap()); + Variant::Map::const_iterator i = map.find(OBJECT_NAME); + if (i == map.end()) + throw Exception(QPID_MSG("Replicator: invalid object reference: " << ref)); + const std::string name = i->second.asString(); + if (name.compare(0, prefix.size(), prefix) != 0) + throw Exception(QPID_MSG("Replicator: unexpected reference prefix: " << name)); + std::string ret = name.substr(prefix.size()); + return ret; +} + +const std::string EXCHANGE_REF("exchangeRef"); +const std::string QUEUE_REF("queueRef"); + +} // namespace + +void BrokerReplicator::doResponseBind(Variant::Map& values) { + std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]); + std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); + boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName); + boost::shared_ptr<Queue> queue = broker.getQueues().find(qName); + // FIXME aconway 2011-11-24: more flexible configuration for binding replication. + + // Automatically replicate binding if queue and exchange exist and are replicated + if (exchange && replicateLevel(exchange->getArgs()) && + queue && replicateLevel(queue->getSettings())) + { + framing::FieldTable args; + amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); + string key = values[KEY].asString(); + exchange->bind(queue, key, &args); + QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName() + << " queue=" << queue->getName() + << " key=" << key); + } +} + +void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { + if (replicateLevel(queue->getSettings()) == RL_MESSAGES) { + boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); + broker.getExchanges().registerExchange(qr); + qr->activate(); + } +} + +bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } +bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } +bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; } + +string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } + +}} // namespace broker diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h new file mode 100644 index 0000000000..cfb6cf9a28 --- /dev/null +++ b/cpp/src/qpid/ha/BrokerReplicator.h @@ -0,0 +1,85 @@ +#ifndef QPID_HA_REPLICATOR_H +#define QPID_HA_REPLICATOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Exchange.h" +#include "qpid/types/Variant.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { + +namespace broker { +class Broker; +class Link; +class Bridge; +class SessionHandler; +} + +namespace ha { + +/** + * Replicate configuration on a backup broker. + * + * Implemented as an exchange that subscribes to receive QMF + * configuration events from the primary. It configures local queues + * exchanges and bindings to replicate the primary. + * It also creates QueueReplicators for newly replicated queues. + * + * THREAD SAFE: Has no mutable state. + * + */ +class BrokerReplicator : public broker::Exchange +{ + public: + BrokerReplicator(const boost::shared_ptr<broker::Link>&); + ~BrokerReplicator(); + std::string getType() const; + + // Exchange methods + bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); + bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); + void route(broker::Deliverable&, const std::string&, const framing::FieldTable*); + bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); + + private: + void initializeBridge(broker::Bridge&, broker::SessionHandler&); + + void doEventQueueDeclare(types::Variant::Map& values); + void doEventQueueDelete(types::Variant::Map& values); + void doEventExchangeDeclare(types::Variant::Map& values); + void doEventExchangeDelete(types::Variant::Map& values); + void doEventBind(types::Variant::Map&); + void doEventUnbind(types::Variant::Map&); + + void doResponseQueue(types::Variant::Map& values); + void doResponseExchange(types::Variant::Map& values); + void doResponseBind(types::Variant::Map& values); + + void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); + + broker::Broker& broker; + boost::shared_ptr<broker::Link> link; +}; +}} // namespace qpid::broker + +#endif /*!QPID_HA_REPLICATOR_H*/ diff --git a/cpp/src/qpid/ha/ConnectionExcluder.cpp b/cpp/src/qpid/ha/ConnectionExcluder.cpp new file mode 100644 index 0000000000..67ad7202d6 --- /dev/null +++ b/cpp/src/qpid/ha/ConnectionExcluder.cpp @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ConnectionExcluder.h" +#include "qpid/broker/Connection.h" +#include <boost/function.hpp> +#include <sstream> + +namespace qpid { +namespace ha { + +ConnectionExcluder::ConnectionExcluder() {} + +void ConnectionExcluder::opened(broker::Connection& connection) { + if (!connection.isLink() && !connection.getClientProperties().isSet(ADMIN_TAG)) + throw Exception( + QPID_MSG("HA: Backup broker rejected connection " << connection.getMgmtId())); +} + +const std::string ConnectionExcluder::ADMIN_TAG="qpid.ha-admin"; + +}} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/ConnectionExcluder.h b/cpp/src/qpid/ha/ConnectionExcluder.h new file mode 100644 index 0000000000..f8f2843a0c --- /dev/null +++ b/cpp/src/qpid/ha/ConnectionExcluder.h @@ -0,0 +1,54 @@ +#ifndef QPID_HA_CONNECTIONEXCLUDER_H +#define QPID_HA_CONNECTIONEXCLUDER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/ConnectionObserver.h" +#include <boost/function.hpp> + +namespace qpid { + +namespace broker { +class Connection; +} + +namespace ha { + +/** + * Exclude normal connections to a backup broker. + * Admin connections are identified by a special flag in client-properties + * during connection negotiation. + */ +class ConnectionExcluder : public broker::ConnectionObserver +{ + public: + ConnectionExcluder(); + + void opened(broker::Connection& connection); + + private: + static const std::string ADMIN_TAG; +}; + +}} // namespace qpid::ha + +#endif /*!QPID_HA_CONNECTIONEXCLUDER_H*/ diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp new file mode 100644 index 0000000000..0d3bd51439 --- /dev/null +++ b/cpp/src/qpid/ha/HaBroker.cpp @@ -0,0 +1,137 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Backup.h" +#include "ConnectionExcluder.h" +#include "HaBroker.h" +#include "Settings.h" +#include "ReplicatingSubscription.h" +#include "qpid/Exception.h" +#include "qpid/broker/Broker.h" +#include "qpid/management/ManagementAgent.h" +#include "qmf/org/apache/qpid/ha/Package.h" +#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetClientAddresses.h" +#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokerAddresses.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace ha { + +namespace _qmf = ::qmf::org::apache::qpid::ha; +using namespace management; +using namespace std; + +namespace { + +const std::string PRIMARY="primary"; +const std::string BACKUP="backup"; + +} // namespace + + +HaBroker::HaBroker(broker::Broker& b, const Settings& s) + : broker(b), + settings(s), + backup(new Backup(b, s)), + mgmtObject(0) +{ + // Register a factory for replicating subscriptions. + broker.getConsumerFactories().add( + boost::shared_ptr<ReplicatingSubscription::Factory>( + new ReplicatingSubscription::Factory())); + + broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this); + + ManagementAgent* ma = broker.getManagementAgent(); + if (!ma) + throw Exception("Cannot start HA: management is disabled"); + if (ma) { + _qmf::Package packageInit(ma); + mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker"); + mgmtObject->set_status(BACKUP); + ma->addObject(mgmtObject); + } + sys::Mutex::ScopedLock l(lock); + if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l); + if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l); +} + +HaBroker::~HaBroker() {} + +Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) { + sys::Mutex::ScopedLock l(lock); + switch (methodId) { + case _qmf::HaBroker::METHOD_PROMOTE: { + if (backup.get()) { // I am a backup + // FIXME aconway 2012-01-26: create primary state before resetting backup + // as that allows client connections. + backup.reset(); + QPID_LOG(notice, "HA: Primary promoted from backup"); + mgmtObject->set_status(PRIMARY); + } + break; + } + case _qmf::HaBroker::METHOD_SETCLIENTADDRESSES: { + setClientUrl( + Url(dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args). + i_clientAddresses), l); + break; + } + case _qmf::HaBroker::METHOD_SETBROKERADDRESSES: { + setBrokerUrl( + Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args) + .i_brokerAddresses), l); + break; + } + default: + return Manageable::STATUS_UNKNOWN_METHOD; + } + return Manageable::STATUS_OK; +} + +void HaBroker::setClientUrl(const Url& url, const sys::Mutex::ScopedLock& l) { + if (url.empty()) throw Exception("Invalid empty URL for HA client failover"); + clientUrl = url; + updateClientUrl(l); +} + +void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) { + Url url = clientUrl.empty() ? brokerUrl : clientUrl; + assert(!url.empty()); + mgmtObject->set_clientAddresses(url.str()); + knownBrokers.clear(); + knownBrokers.push_back(url); + QPID_LOG(debug, "HA: Setting client known-brokers to: " << url); +} + +void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) { + if (url.empty()) throw Exception("Invalid empty URL for HA broker failover"); + brokerUrl = url; + mgmtObject->set_brokerAddresses(brokerUrl.str()); + if (backup.get()) backup->setBrokerUrl(brokerUrl); + // Updating broker URL also updates defaulted client URL: + if (clientUrl.empty()) updateClientUrl(l); +} + +std::vector<Url> HaBroker::getKnownBrokers() const { + return knownBrokers; +} + +}} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/HaBroker.h b/cpp/src/qpid/ha/HaBroker.h new file mode 100644 index 0000000000..4d7bf80c90 --- /dev/null +++ b/cpp/src/qpid/ha/HaBroker.h @@ -0,0 +1,74 @@ +#ifndef QPID_HA_BROKER_H +#define QPID_HA_BROKER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Settings.h" +#include "qpid/Url.h" +#include "qpid/sys/Mutex.h" +#include "qmf/org/apache/qpid/ha/HaBroker.h" +#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetStatus.h" +#include "qpid/management/Manageable.h" +#include <memory> + +namespace qpid { +namespace broker { +class Broker; +} +namespace ha { +class Backup; + +/** + * HA state and actions associated with a broker. + * + * THREAD SAFE: may be called in arbitrary broker IO or timer threads. + */ +class HaBroker : public management::Manageable +{ + public: + HaBroker(broker::Broker&, const Settings&); + ~HaBroker(); + + // Implement Manageable. + qpid::management::ManagementObject* GetManagementObject() const { return mgmtObject; } + management::Manageable::status_t ManagementMethod ( + uint32_t methodId, management::Args& args, std::string& text); + + private: + void setClientUrl(const Url&, const sys::Mutex::ScopedLock&); + void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&); + void updateClientUrl(const sys::Mutex::ScopedLock&); + bool isPrimary(const sys::Mutex::ScopedLock&) { return !backup.get(); } + std::vector<Url> getKnownBrokers() const; + + broker::Broker& broker; + const Settings settings; + + sys::Mutex lock; + std::auto_ptr<Backup> backup; + qmf::org::apache::qpid::ha::HaBroker* mgmtObject; + Url clientUrl, brokerUrl; + std::vector<Url> knownBrokers; +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_BROKER_H*/ diff --git a/cpp/src/qpid/ha/HaPlugin.cpp b/cpp/src/qpid/ha/HaPlugin.cpp new file mode 100644 index 0000000000..fc9e48411d --- /dev/null +++ b/cpp/src/qpid/ha/HaPlugin.cpp @@ -0,0 +1,67 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "HaBroker.h" +#include "Settings.h" +#include "qpid/Plugin.h" +#include "qpid/Options.h" +#include "qpid/broker/Broker.h" + + +namespace qpid { +namespace ha { + +using namespace std; + +struct Options : public qpid::Options { + Settings& settings; + Options(Settings& s) : qpid::Options("HA Options"), settings(s) { + addOptions() + ("ha-enable", optValue(settings.enabled, "yes|no"), "Enable High Availability features") + ("ha-client-url", optValue(settings.clientUrl,"URL"), "URL that clients use to connect and fail over.") + ("ha-broker-url", optValue(settings.brokerUrl,"URL"), "URL that backup brokers use to connect and fail over.") + ("ha-username", optValue(settings.username, "USER"), "Username for connections between brokers") + ("ha-password", optValue(settings.password, "PASS"), "Password for connections between brokers") + ("ha-mechanism", optValue(settings.mechanism, "MECH"), "Authentication mechanism for connections between brokers") + ; + } +}; + +struct HaPlugin : public Plugin { + + Settings settings; + Options options; + auto_ptr<HaBroker> haBroker; + + HaPlugin() : options(settings) {} + + Options* getOptions() { return &options; } + + void earlyInitialize(Plugin::Target& ) {} + + void initialize(Plugin::Target& target) { + broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); + if (broker && settings.enabled) { + haBroker.reset(new ha::HaBroker(*broker, settings)); + } else + QPID_LOG(notice, "HA: Disabled"); + } +}; + +static HaPlugin instance; // Static initialization. + +}} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp new file mode 100644 index 0000000000..0017cc82cd --- /dev/null +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -0,0 +1,174 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "QueueReplicator.h" +#include "ReplicatingSubscription.h" +#include "qpid/broker/Bridge.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Link.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/SessionHandler.h" +#include "qpid/framing/SequenceSet.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/log/Statement.h" +#include <boost/shared_ptr.hpp> +#include <sstream> + +namespace { +const std::string QPID_REPLICATOR_("qpid.replicator-"); +const std::string TYPE_NAME("qpid.queue-replicator"); +const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); +} + +namespace qpid { +namespace ha { +using namespace broker; +using namespace framing; + +const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event"); +const std::string QueueReplicator::POSITION_EVENT_KEY("qpid.position-event"); + +std::string QueueReplicator::replicatorName(const std::string& queueName) { + return QPID_REPLICATOR_ + queueName; +} + +QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) + : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l) +{ + std::stringstream ss; + ss << "HA: Backup " << queue->getName() << ": "; + logPrefix = ss.str(); + QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings()); +} + +// This must be separate from the constructor so we can call shared_from_this. +void QueueReplicator::activate() { + // Note this may create a new bridge or use an existing one. + queue->getBroker()->getLinks().declare( + link->getHost(), link->getPort(), + false, // durable + queue->getName(), // src + getName(), // dest + "", // key + false, // isQueue + false, // isLocal + "", // id/tag + "", // excludes + false, // dynamic + 0, // sync? + // Include shared_ptr to self to ensure we are not deleted + // before initializeBridge is called. + boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, shared_from_this()) + ); +} + +QueueReplicator::~QueueReplicator() {} + +void QueueReplicator::deactivate() { + sys::Mutex::ScopedLock l(lock); + queue->getBroker()->getLinks().destroy( + link->getHost(), link->getPort(), queue->getName(), getName(), string()); + QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName); +} + +// Called in a broker connection thread when the bridge is created. +// shared_ptr to self ensures we are not deleted before initializeBridge is called. +void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler, + boost::shared_ptr<QueueReplicator> /*self*/) { + sys::Mutex::ScopedLock l(lock); + bridgeName = bridge.getName(); + framing::AMQP_ServerProxy peer(sessionHandler.out); + const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); + framing::FieldTable settings; + + // FIXME aconway 2011-12-09: Failover optimization removed. + // There was code here to re-use messages already on the backup + // during fail-over. This optimization was removed to simplify + // the logic till we get the basic replication stable, it + // can be re-introduced later. Last revision with the optimization: + // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. + + // Clear out any old messages, reset the queue to start replicating fresh. + queue->purge(); + queue->setPosition(0); + + settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); + // TODO aconway 2011-12-19: optimize. + settings.setInt(QPID_SYNC_FREQUENCY, 1); + peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false/*exclusive*/, "", 0, settings); + peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); + peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); + QPID_LOG(debug, logPrefix << "Activated bridge " << bridgeName); +} + +namespace { +template <class T> T decodeContent(Message& m) { + std::string content; + m.getFrames().getContent(content); + Buffer buffer(const_cast<char*>(content.c_str()), content.size()); + T result; + result.decode(buffer); + return result; +} +} + +void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) { + // Thread safe: only calls thread safe Queue functions. + if (queue->getPosition() >= n) { // Ignore messages we haven't reached yet + QueuedMessage message; + if (queue->acquireMessageAt(n, message)) + queue->dequeue(0, message); + } +} + +// Called in connection thread of the queues bridge to primary. +void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable*) +{ + sys::Mutex::ScopedLock l(lock); + if (key == DEQUEUE_EVENT_KEY) { + SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage()); + QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues); + //TODO: should be able to optimise the following + for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++) + dequeue(*i, l); + } else if (key == POSITION_EVENT_KEY) { + SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage()); + QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition() + << " to " << position); + assert(queue->getPosition() <= position); + //TODO aconway 2011-12-14: Optimize this? + for (SequenceNumber i = queue->getPosition(); i < position; ++i) + dequeue(i,l); + queue->setPosition(position); + } else { + msg.deliverTo(queue); + QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition()); + } +} + +// Unused Exchange methods. +bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; } +bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; } +bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; } +std::string QueueReplicator::getType() const { return TYPE_NAME; } + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h new file mode 100644 index 0000000000..9de7dd480c --- /dev/null +++ b/cpp/src/qpid/ha/QueueReplicator.h @@ -0,0 +1,86 @@ +#ifndef QPID_HA_QUEUEREPLICATOR_H +#define QPID_HA_QUEUEREPLICATOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Exchange.h" +#include "qpid/framing/SequenceSet.h" +#include <boost/enable_shared_from_this.hpp> +#include <iosfwd> + +namespace qpid { + +namespace broker { +class Bridge; +class Link; +class Queue; +class QueueRegistry; +class SessionHandler; +class Deliverable; +} + +namespace ha { + +/** + * Exchange created on a backup broker to replicate a queue on the primary. + * + * Puts replicated messages on the local queue, handles dequeue events. + * Creates a ReplicatingSubscription on the primary by passing special + * arguments to the consume command. + * + * THREAD SAFE: Called in different connection threads. + */ +class QueueReplicator : public broker::Exchange, + public boost::enable_shared_from_this<QueueReplicator> +{ + public: + static const std::string DEQUEUE_EVENT_KEY; + static const std::string POSITION_EVENT_KEY; + static std::string replicatorName(const std::string& queueName); + + QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l); + ~QueueReplicator(); + + void activate(); // Call after ctor + void deactivate(); // Call before dtor + + std::string getType() const; + bool bind(boost::shared_ptr<broker::Queue + >, const std::string&, const framing::FieldTable*); + bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); + void route(broker::Deliverable&, const std::string&, const framing::FieldTable*); + bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); + + private: + void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler, + boost::shared_ptr<QueueReplicator> self); + void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&); + + std::string logPrefix; + std::string bridgeName; + sys::Mutex lock; + boost::shared_ptr<broker::Queue> queue; + boost::shared_ptr<broker::Link> link; +}; + +}} // namespace qpid::ha + +#endif /*!QPID_HA_QUEUEREPLICATOR_H*/ diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp new file mode 100644 index 0000000000..af6180305d --- /dev/null +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -0,0 +1,293 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ReplicatingSubscription.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/SessionContext.h" +#include "qpid/broker/ConnectionState.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/log/Statement.h" +#include <sstream> + +namespace qpid { +namespace ha { + +using namespace framing; +using namespace broker; +using namespace std; + +const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription"); + +namespace { +const string DOLLAR("$"); +const string INTERNAL("-internal"); +} // namespace + +string mask(const string& in) +{ + return DOLLAR + in + INTERNAL; +} + +/* Called by SemanticState::consume to create a consumer */ +boost::shared_ptr<broker::SemanticState::ConsumerImpl> +ReplicatingSubscription::Factory::create( + SemanticState* parent, + const string& name, + Queue::shared_ptr queue, + bool ack, + bool acquire, + bool exclusive, + const string& tag, + const string& resumeId, + uint64_t resumeTtl, + const framing::FieldTable& arguments +) { + boost::shared_ptr<ReplicatingSubscription> rs; + if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) { + rs.reset(new ReplicatingSubscription( + parent, name, queue, ack, acquire, exclusive, tag, + resumeId, resumeTtl, arguments)); + queue->addObserver(rs); + } + return rs; +} + +ReplicatingSubscription::ReplicatingSubscription( + SemanticState* parent, + const string& name, + Queue::shared_ptr queue, + bool ack, + bool acquire, + bool exclusive, + const string& tag, + const string& resumeId, + uint64_t resumeTtl, + const framing::FieldTable& arguments +) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, + resumeId, resumeTtl, arguments), + events(new Queue(mask(name))), + consumer(new DelegatingConsumer(*this)) +{ + stringstream ss; + ss << "HA: Primary: " << getQueue()->getName() << " at " + << parent->getSession().getConnection().getUrl() << ": "; + logPrefix = ss.str(); + + // FIXME aconway 2011-12-09: Failover optimization removed. + // There was code here to re-use messages already on the backup + // during fail-over. This optimization was removed to simplify + // the logic till we get the basic replication stable, it + // can be re-introduced later. Last revision with the optimization: + // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. + + QPID_LOG(debug, logPrefix << "Created backup subscription " << getName()); + + // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0 + // so we will start consuming from the lowest numbered message. + // This is incorrect if the sequence number wraps around, but + // this is what all consumers currently do. +} + +// Message is delivered in the subscription's connection thread. +bool ReplicatingSubscription::deliver(QueuedMessage& m) { + // Add position events for the subscribed queue, not for the internal event queue. + if (m.queue && m.queue == getQueue().get()) { + sys::Mutex::ScopedLock l(lock); + assert(position == m.position); + // m.position is the position of the newly enqueued m on the local queue. + // backupPosition is latest position on the backup queue (before enqueueing m.) + assert(m.position > backupPosition); + if (m.position - backupPosition > 1) { + // Position has advanced because of messages dequeued ahead of us. + SequenceNumber send(m.position); + --send; // Send the position before m was enqueued. + sendPositionEvent(send, l); + } + backupPosition = m.position; + QPID_LOG(trace, logPrefix << "Replicating message " << m.position); + } + return ConsumerImpl::deliver(m); +} + +ReplicatingSubscription::~ReplicatingSubscription() {} + + +// INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg + +// Mark a message completed. May be called by acknowledge or dequeued +void ReplicatingSubscription::complete( + const QueuedMessage& qm, const sys::Mutex::ScopedLock&) +{ + // Handle completions for the subscribed queue, not the internal event queue. + if (qm.queue && qm.queue == getQueue().get()) { + QPID_LOG(trace, logPrefix << "Completed message " << qm.position); + Delayed::iterator i= delayed.find(qm.position); + // The same message can be completed twice, by acknowledged and + // dequeued, remove it from the set so it only gets completed + // once. + if (i != delayed.end()) { + assert(i->second.payload == qm.payload); + qm.payload->getIngressCompletion().finishCompleter(); + delayed.erase(i); + } + } +} + +// Called before we get notified of the message being available and +// under the message lock in the queue. Called in arbitrary connection thread. +void ReplicatingSubscription::enqueued(const QueuedMessage& qm) { + sys::Mutex::ScopedLock l(lock); + // Delay completion + QPID_LOG(trace, logPrefix << "Delaying completion of message " << qm.position); + qm.payload->getIngressCompletion().startCompleter(); + assert(delayed.find(qm.position) == delayed.end()); + delayed[qm.position] = qm; +} + + +// Function to complete a delayed message, called by cancel() +void ReplicatingSubscription::cancelComplete( + const Delayed::value_type& v, const sys::Mutex::ScopedLock&) +{ + QPID_LOG(trace, logPrefix << "Cancel completed message " << v.second.position); + v.second.payload->getIngressCompletion().finishCompleter(); +} + +// Called in the subscription's connection thread. +void ReplicatingSubscription::cancel() +{ + getQueue()->removeObserver( + boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); + { + sys::Mutex::ScopedLock l(lock); + QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName()); + for_each(delayed.begin(), delayed.end(), + boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l))); + delayed.clear(); + } + ConsumerImpl::cancel(); +} + +// Called on primary in the backups IO thread. +void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) { + sys::Mutex::ScopedLock l(lock); + // Finish completion of message, it has been acknowledged by the backup. + complete(msg, l); +} + +// Hide the "queue deleted" error for a ReplicatingSubscription when a +// queue is deleted, this is normal and not an error. +bool ReplicatingSubscription::hideDeletedError() { return true; } + +// Called with lock held. Called in subscription's connection thread. +void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l) +{ + QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); + string buf(dequeues.encodedSize(),'\0'); + framing::Buffer buffer(&buf[0], buf.size()); + dequeues.encode(buffer); + dequeues.clear(); + buffer.reset(); + sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l); +} + +// Called after the message has been removed from the deque and under +// the messageLock in the queue. Called in arbitrary connection threads. +void ReplicatingSubscription::dequeued(const QueuedMessage& qm) +{ + { + sys::Mutex::ScopedLock l(lock); + QPID_LOG(trace, logPrefix << "Dequeued message " << qm.position); + dequeues.add(qm.position); + // If we have not yet sent this message to the backup, then + // complete it now as it will never be accepted. + if (qm.position > position) complete(qm, l); + } + notify(); // Ensure a call to doDispatch +} + +// Called with lock held. Called in subscription's connection thread. +void ReplicatingSubscription::sendPositionEvent( + SequenceNumber position, const sys::Mutex::ScopedLock&l ) +{ + QPID_LOG(trace, logPrefix << "Sending position " << position + << ", was " << backupPosition); + string buf(backupPosition.encodedSize(),'\0'); + framing::Buffer buffer(&buf[0], buf.size()); + position.encode(buffer); + buffer.reset(); + sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l); +} + +void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer, + const sys::Mutex::ScopedLock&) +{ + //generate event message + boost::intrusive_ptr<Message> event = new Message(); + AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0))); + AMQFrame header((AMQHeaderBody())); + AMQFrame content((AMQContentBody())); + content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize()); + header.setBof(false); + header.setEof(false); + header.setBos(true); + header.setEos(true); + content.setBof(false); + content.setEof(true); + content.setBos(true); + content.setEos(true); + event->getFrames().append(method); + event->getFrames().append(header); + event->getFrames().append(content); + + DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true); + props->setRoutingKey(key); + // Send the event using the events queue. Consumer is a + // DelegatingConsumer that delegates to *this for everything but + // has an independnet position. We put an event on events and + // dispatch it through ourselves to send it in line with the + // normal browsing messages. + events->deliver(event); + events->dispatch(consumer); +} + + +// Called in subscription's connection thread. +bool ReplicatingSubscription::doDispatch() +{ + { + sys::Mutex::ScopedLock l(lock); + if (!dequeues.empty()) sendDequeueEvent(l); + } + return ConsumerImpl::doDispatch(); +} + +ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {} +ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {} +bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { return delegate.deliver(m); } +void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); } +bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); } +bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); } +bool ReplicatingSubscription::DelegatingConsumer::browseAcquired() const { return delegate.browseAcquired(); } +OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); } + +}} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.h b/cpp/src/qpid/ha/ReplicatingSubscription.h new file mode 100644 index 0000000000..e311f9505a --- /dev/null +++ b/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -0,0 +1,134 @@ +#ifndef QPID_BROKER_REPLICATINGSUBSCRIPTION_H +#define QPID_BROKER_REPLICATINGSUBSCRIPTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "QueueReplicator.h" // For DEQUEUE_EVENT_KEY +#include "qpid/broker/SemanticState.h" +#include "qpid/broker/QueueObserver.h" +#include "qpid/broker/ConsumerFactory.h" +#include <iosfwd> + +namespace qpid { + +namespace broker { +class Message; +class Queue; +class QueuedMessage; +class OwnershipToken; +} + +namespace framing { +class Buffer; +} + +namespace ha { + +/** + * A susbcription that represents a backup replicating a queue. + * + * Runs on the primary. Delays completion of messages till the backup + * has acknowledged, informs backup of locally dequeued messages. + * + * THREAD SAFE: Used as a consumer in subscription's connection + * thread, and as a QueueObserver in arbitrary connection threads. + */ +class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, + public broker::QueueObserver +{ + public: + struct Factory : public broker::ConsumerFactory { + boost::shared_ptr<broker::SemanticState::ConsumerImpl> create( + broker::SemanticState* parent, + const std::string& name, boost::shared_ptr<broker::Queue> , + bool ack, bool acquire, bool exclusive, const std::string& tag, + const std::string& resumeId, uint64_t resumeTtl, + const framing::FieldTable& arguments); + }; + + // Argument names for consume command. + static const std::string QPID_REPLICATING_SUBSCRIPTION; + + ReplicatingSubscription(broker::SemanticState* parent, + const std::string& name, boost::shared_ptr<broker::Queue> , + bool ack, bool acquire, bool exclusive, const std::string& tag, + const std::string& resumeId, uint64_t resumeTtl, + const framing::FieldTable& arguments); + + ~ReplicatingSubscription(); + + // QueueObserver overrides. + bool deliver(broker::QueuedMessage& msg); + void enqueued(const broker::QueuedMessage&); + void dequeued(const broker::QueuedMessage&); + void acquired(const broker::QueuedMessage&) {} + void requeued(const broker::QueuedMessage&) {} + + // Consumer overrides. + void cancel(); + void acknowledged(const broker::QueuedMessage&); + bool browseAcquired() const { return true; } + + bool hideDeletedError(); + + protected: + bool doDispatch(); + private: + typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed; + std::string logPrefix; + boost::shared_ptr<broker::Queue> events; + boost::shared_ptr<broker::Consumer> consumer; + Delayed delayed; + framing::SequenceSet dequeues; + framing::SequenceNumber backupPosition; + + void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&); + void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&); + void sendDequeueEvent(const sys::Mutex::ScopedLock&); + void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&); + void sendEvent(const std::string& key, framing::Buffer&, + const sys::Mutex::ScopedLock&); + + class DelegatingConsumer : public Consumer + { + public: + DelegatingConsumer(ReplicatingSubscription&); + ~DelegatingConsumer(); + bool deliver(broker::QueuedMessage& msg); + void notify(); + bool filter(boost::intrusive_ptr<broker::Message>); + bool accept(boost::intrusive_ptr<broker::Message>); + void cancel() {} + void acknowledged(const broker::QueuedMessage&) {} + bool browseAcquired() const; + + broker::OwnershipToken* getSession(); + + private: + ReplicatingSubscription& delegate; + }; +}; + + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_REPLICATINGSUBSCRIPTION_H*/ diff --git a/cpp/src/qpid/ha/Settings.h b/cpp/src/qpid/ha/Settings.h new file mode 100644 index 0000000000..049c873b9f --- /dev/null +++ b/cpp/src/qpid/ha/Settings.h @@ -0,0 +1,45 @@ +#ifndef QPID_HA_SETTINGS_H +#define QPID_HA_SETTINGS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> + +namespace qpid { +namespace ha { + +/** + * Configurable settings for HA. + */ +class Settings +{ + public: + Settings() : enabled(false) {} + bool enabled; + std::string clientUrl; + std::string brokerUrl; + std::string username, password, mechanism; + private: +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_SETTINGS_H*/ diff --git a/cpp/src/qpid/ha/management-schema.xml b/cpp/src/qpid/ha/management-schema.xml new file mode 100644 index 0000000000..fe4a14d111 --- /dev/null +++ b/cpp/src/qpid/ha/management-schema.xml @@ -0,0 +1,38 @@ +<schema package="org.apache.qpid.ha"> + + <!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + --> + + <!-- Monitor and control HA status of a broker. --> + <class name="HaBroker"> + <property name="name" type="sstr" access="RC" index="y" desc="Primary Key"/> + <property name="status" type="sstr" desc="HA status: primary or backup"/> + <property name="clientAddresses" type="sstr" desc="List of addresses used by clients to connect to the HA cluster."/> + <property name="brokerAddresses" type="sstr" desc="List of addresses used by HA brokers to connect to each other."/> + + <method name="promote" desc="Promote a backup broker to primary."/> + <method name="setClientAddresses" desc="Set HA client addresses"> + <arg name="clientAddresses" type="sstr" dir="I"/> + </method> + <method name="setBrokerAddresses" desc="Set HA broker addresses"> + <arg name="brokerAddresses" type="sstr" dir="I"/> + </method> + </class> + +</schema> |
