diff options
Diffstat (limited to 'cpp/src/qpid/ha/BrokerReplicator.cpp')
-rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 420 |
1 files changed, 256 insertions, 164 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index d0c99cbdb6..c8c4a42d72 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -22,6 +22,7 @@ #include "HaBroker.h" #include "QueueReplicator.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Connection.h" #include "qpid/broker/Queue.h" #include "qpid/broker/Link.h" #include "qpid/framing/FieldTable.h" @@ -37,8 +38,11 @@ #include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" #include "qmf/org/apache/qpid/broker/EventQueueDelete.h" #include "qmf/org/apache/qpid/broker/EventSubscribe.h" +#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" #include <algorithm> #include <sstream> +#include <iostream> +#include <assert.h> namespace qpid { namespace ha { @@ -50,15 +54,16 @@ using qmf::org::apache::qpid::broker::EventExchangeDelete; using qmf::org::apache::qpid::broker::EventQueueDeclare; using qmf::org::apache::qpid::broker::EventQueueDelete; using qmf::org::apache::qpid::broker::EventSubscribe; +using qmf::org::apache::qpid::ha::EventMembersUpdate; using namespace framing; using std::string; +using std::ostream; using types::Variant; using namespace broker; namespace { const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator"); -const string QPID_REPLICATE("qpid.replicate"); const string CLASS_NAME("_class_name"); const string EVENT("_event"); @@ -69,10 +74,13 @@ const string SCHEMA_ID("_schema_id"); const string VALUES("_values"); const string ALTEX("altEx"); +const string ALTEXCHANGE("altExchange"); const string ARGS("args"); const string ARGUMENTS("arguments"); const string AUTODEL("autoDel"); const string AUTODELETE("autoDelete"); +const string EXCL("excl"); +const string EXCLUSIVE("exclusive"); const string BIND("bind"); const string UNBIND("unbind"); const string BINDING("binding"); @@ -86,12 +94,12 @@ const string KEY("key"); const string NAME("name"); const string QNAME("qName"); const string QUEUE("queue"); -const string RHOST("rhost"); const string TYPE("type"); -const string USER("user"); const string HA_BROKER("habroker"); +const string PARTIAL("partial"); -const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#"); +const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#"); +const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#"); const string QMF2("qmf2"); const string QMF_CONTENT("qmf.content"); const string QMF_DEFAULT_TOPIC("qmf.default.topic"); @@ -107,6 +115,7 @@ const string ORG_APACHE_QPID_HA("org.apache.qpid.ha"); const string QMF_DEFAULT_DIRECT("qmf.default.direct"); const string _QUERY_REQUEST("_query_request"); const string BROKER("broker"); +const string MEMBERS("members"); bool isQMFv2(const Message& message) { const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>(); @@ -117,7 +126,9 @@ template <class T> bool match(Variant::Map& schema) { return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); } -void sendQuery(const string& packageName, const string& className, const string& queueName, SessionHandler& sessionHandler) { +void sendQuery(const string& packageName, const string& className, const string& queueName, + SessionHandler& sessionHandler) +{ framing::AMQP_ServerProxy peer(sessionHandler.out); Variant::Map request; request[_WHAT] = OBJECT; @@ -137,6 +148,7 @@ void sendQuery(const string& packageName, const string& className, const string& props->setAppId(QMF2); props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST); headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER); + headerBody.get<qpid::framing::MessageProperties>(true)->setCorrelationId(className); AMQFrame header(headerBody); header.setBof(false); header.setEof(false); @@ -159,39 +171,23 @@ Variant::Map asMapVoid(const Variant& value) { if (!value.isVoid()) return value.asMap(); else return Variant::Map(); } - } // namespace - -ReplicateLevel BrokerReplicator::replicateLevel(const std::string& str) { - ReplicateLevel rl; - if (qpid::ha::replicateLevel(str, rl)) return rl; - else return haBroker.getSettings().replicateDefault; -} - -ReplicateLevel BrokerReplicator::replicateLevel(const framing::FieldTable& f) { - if (f.isSet(QPID_REPLICATE)) - return replicateLevel(f.getAsString(QPID_REPLICATE)); - else - return haBroker.getSettings().replicateDefault; -} - -ReplicateLevel BrokerReplicator::replicateLevel(const Variant::Map& m) { - Variant::Map::const_iterator i = m.find(QPID_REPLICATE); - if (i != m.end()) - return replicateLevel(i->second.asString()); - else - return haBroker.getSettings().replicateDefault; -} - -BrokerReplicator::~BrokerReplicator() {} - BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l) : Exchange(QPID_CONFIGURATION_REPLICATOR), - haBroker(hb), broker(hb.getBroker()), link(l) -{ + logPrefix("Backup: "), replicationTest(hb.getReplicationTest()), + haBroker(hb), broker(hb.getBroker()), link(l), + initialized(false), + alternates(hb.getBroker().getExchanges()) +{} + +void BrokerReplicator::initialize() { + // Can't do this in the constructor because we need a shared_ptr to this. + types::Uuid uuid(true); + const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str()); broker.getLinks().declare( - link->getHost(), link->getPort(), + name, // name for bridge + *link, // parent false, // durable QPID_CONFIGURATION_REPLICATOR, // src QPID_CONFIGURATION_REPLICATOR, // dest @@ -202,21 +198,41 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& "", // excludes false, // dynamic 0, // sync? - boost::bind(&BrokerReplicator::initializeBridge, this, _1, _2) + // shared_ptr keeps this in memory until outstanding initializeBridge + // calls are run. + boost::bind(&BrokerReplicator::initializeBridge, shared_from_this(), _1, _2) ); } +BrokerReplicator::~BrokerReplicator() { } + // This is called in the connection IO thread when the bridge is started. void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { - framing::AMQP_ServerProxy peer(sessionHandler.out); + // Use the credentials of the outgoing Link connection for creating queues, + // exchanges etc. We know link->getConnection() is non-zero because we are + // being called in the connections thread context. + // + assert(link->getConnection()); + userId = link->getConnection()->getUserId(); + remoteHost = link->getConnection()->getUrl(); + + link->getRemoteAddress(primary); string queueName = bridge.getQueueName(); + + QPID_LOG(info, logPrefix << (initialized ? "Connecting" : "Failing over") + << " to primary " << primary + << " status:" << printable(haBroker.getStatus())); + initialized = true; + + framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); //declare and bind an event queue FieldTable declareArgs; - declareArgs.setString(QPID_REPLICATE, str(RL_NONE)); + declareArgs.setString(QPID_REPLICATE, printable(NONE).str()); peer.getQueue().declare(queueName, "", false, false, true, true, declareArgs); - peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable()); + peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_BROKER, FieldTable()); + peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_HA, FieldTable()); //subscribe to the queue peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); @@ -228,23 +244,30 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler); sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler); sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler); - QPID_LOG(debug, "HA: Backup configuration bridge: " << queueName); } void BrokerReplicator::route(Deliverable& msg) { + // We transition from JOINING->CATCHUP on the first message received from the primary. + // Until now we couldn't be sure if we had a good connection to the primary. + if (haBroker.getStatus() == JOINING) { + haBroker.setStatus(CATCHUP); + QPID_LOG(notice, logPrefix << "Connected to primary " << primary); + } + const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders(); + const MessageProperties* messageProperties = msg.getMessage().getProperties<MessageProperties>(); Variant::List list; try { - if (!isQMFv2(msg.getMessage()) || !headers) + if (!isQMFv2(msg.getMessage()) || !headers || !messageProperties) throw Exception("Unexpected message, not QMF2 event or query response."); // decode as list string content = msg.getMessage().getFrames().getContent(); amqp_0_10::ListCodec::decode(content, list); - + QPID_LOG(trace, "Broker replicator received: " << *messageProperties); if (headers->getAsString(QMF_CONTENT) == EVENT) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); - QPID_LOG(trace, "HA: Backup received event: " << map); + QPID_LOG(trace, "Broker replicator event: " << map); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values); @@ -253,12 +276,13 @@ void BrokerReplicator::route(Deliverable& msg) { else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values); else if (match<EventBind>(schema)) doEventBind(values); else if (match<EventUnbind>(schema)) doEventUnbind(values); + else if (match<EventMembersUpdate>(schema)) doEventMembersUpdate(values); } } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); - QPID_LOG(trace, "HA: Backup received event: " << map); - string type = map[SCHEMA_ID].asMap()[CLASS_NAME]; + QPID_LOG(trace, "Broker replicator response: " << map); + string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString(); Variant::Map& values = map[VALUES].asMap(); framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); @@ -267,85 +291,82 @@ void BrokerReplicator::route(Deliverable& msg) { else if (type == BINDING) doResponseBind(values); else if (type == HA_BROKER) doResponseHaBroker(values); } + if (messageProperties->getCorrelationId() == EXCHANGE && !headers->isSet(PARTIAL)) { + // We have received all of the exchange response. + alternates.clear(); + } } } catch (const std::exception& e) { - QPID_LOG(critical, "HA: Backup configuration failed: " << e.what() + QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what() << ": while handling: " << list); + haBroker.shutdown(); throw; } } + void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { - string name = values[QNAME].asString(); Variant::Map argsMap = asMapVoid(values[ARGS]); - if (values[DISP] == CREATED && replicateLevel(argsMap)) { + bool autoDel = values[AUTODEL].asBool(); + bool excl = values[EXCL].asBool(); + if (values[DISP] == CREATED && + replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) + { + string name = values[QNAME].asString(); + QPID_LOG(debug, logPrefix << "Queue declare event: " << name); framing::FieldTable args; amqp_0_10::translate(argsMap, args); - std::pair<boost::shared_ptr<Queue>, bool> result = - broker.createQueue( - name, - values[DURABLE].asBool(), - values[AUTODEL].asBool(), - 0 /*i.e. no owner regardless of exclusivity on master*/, - values[ALTEX].asString(), - args, - values[USER].asString(), - values[RHOST].asString()); - if (result.second) { - QPID_LOG(debug, "HA: Backup queue declare event: " << name); - startQueueReplicator(result.first); - } else { - // FIXME aconway 2011-12-02: what's the right way to handle this? - // Should we delete the old & re-create form the event? Responses - // may be old but events are always up-to-date. - QPID_LOG(warning, "HA: Backup queue declare event, already exists: " << name); + // If we already have a queue with this name, replace it. + // The queue was definitely created on the primary. + if (broker.getQueues().find(name)) { + QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name); + broker.getQueues().destroy(name); + stopQueueReplicator(name); } + boost::shared_ptr<Queue> queue = createQueue( + name, values[DURABLE].asBool(), autoDel, args, values[ALTEX].asString()); + assert(queue); // Should be created since we destroed the previous queue above. + if (queue) startQueueReplicator(queue); } } +boost::shared_ptr<QueueReplicator> BrokerReplicator::findQueueReplicator( + const std::string& qname) +{ + string rname = QueueReplicator::replicatorName(qname); + boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname); + return boost::dynamic_pointer_cast<QueueReplicator>(ex); +} + void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { // The remote queue has already been deleted so replicator // sessions may be closed by a "queue deleted" exception. string name = values[QNAME].asString(); boost::shared_ptr<Queue> queue = broker.getQueues().find(name); - if (!queue) { - QPID_LOG(warning, "HA: Backup queue delete event, does not exist: " << name); - } else if (!replicateLevel(queue->getSettings())) { - QPID_LOG(warning, "HA: Backup queue delete event, not replicated: " << name); - } else { - string rname = QueueReplicator::replicatorName(name); - boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname); - boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex); - if (qr) qr->deactivate(); - // QueueReplicator's bridge is now queued for destruction but may not - // actually be destroyed, deleting the exhange - broker.getExchanges().destroy(rname); - broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString()); - QPID_LOG(debug, "HA: Backup queue delete event: " << name); + if (queue && replicationTest.replicateLevel(queue->getSettings())) { + QPID_LOG(debug, logPrefix << "Queue delete event: " << name); + stopQueueReplicator(name); + broker.deleteQueue(name, userId, remoteHost); } } void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGS])); - if (values[DISP] == CREATED && replicateLevel(argsMap)) { + if (!replicationTest.replicateLevel(argsMap)) return; // Not a replicated exchange. + if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) { string name = values[EXNAME].asString(); + QPID_LOG(debug, logPrefix << "Exchange declare event: " << name); framing::FieldTable args; amqp_0_10::translate(argsMap, args); - if (broker.createExchange( - name, - values[EXTYPE].asString(), - values[DURABLE].asBool(), - values[ALTEX].asString(), - args, - values[USER].asString(), - values[RHOST].asString()).second) - { - QPID_LOG(debug, "HA: Backup exchange declare event: " << name); - } else { - // FIXME aconway 2011-11-22: should delete pre-existing exchange - // and re-create from event. See comment in doEventQueueDeclare. - QPID_LOG(debug, "HA: Backup exchange declare event, already exists: " << name); + // If we already have a exchange with this name, replace it. + // The exchange was definitely created on the primary. + if (broker.getExchanges().find(name)) { + broker.getExchanges().destroy(name); + QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name); } + boost::shared_ptr<Exchange> exchange = + createExchange(name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, values[ALTEX].asString()); + assert(exchange); } } @@ -353,15 +374,12 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { string name = values[EXNAME].asString(); boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name); if (!exchange) { - QPID_LOG(warning, "HA: Backup exchange delete event, does not exist: " << name); - } else if (!replicateLevel(exchange->getArgs())) { - QPID_LOG(warning, "HA: Backup exchange delete event, not replicated: " << name); + QPID_LOG(warning, logPrefix << "Exchange delete event, does not exist: " << name); + } else if (!replicationTest.replicateLevel(exchange->getArgs())) { + QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name); } else { - QPID_LOG(debug, "HA: Backup exchange delete event:" << name); - broker.deleteExchange( - name, - values[USER].asString(), - values[RHOST].asString()); + QPID_LOG(debug, logPrefix << "Exchange delete event:" << name); + broker.deleteExchange(name, userId, remoteHost); } } @@ -372,16 +390,16 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { broker.getQueues().find(values[QNAME].asString()); // We only replicate binds for a replicated queue to replicated // exchange that both exist locally. - if (exchange && replicateLevel(exchange->getArgs()) && - queue && replicateLevel(queue->getSettings())) + if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && + queue && replicationTest.replicateLevel(queue->getSettings())) { framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); - exchange->bind(queue, key, &args); - QPID_LOG(debug, "HA: Backup bind event: exchange=" << exchange->getName() + QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); + exchange->bind(queue, key, &args); } } @@ -392,64 +410,71 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { broker.getQueues().find(values[QNAME].asString()); // We only replicate unbinds for a replicated queue to replicated // exchange that both exist locally. - if (exchange && replicateLevel(exchange->getArgs()) && - queue && replicateLevel(queue->getSettings())) + if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && + queue && replicationTest.replicateLevel(queue->getSettings())) { framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); - exchange->unbind(queue, key, &args); - QPID_LOG(debug, "HA: Backup unbind event: exchange=" << exchange->getName() + QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); + exchange->unbind(queue, key, &args); } } +void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) { + Variant::List members = values[MEMBERS].asList(); + haBroker.setMembership(members); +} + +namespace { + +// Get the alternate exchange from the exchange field of a queue or exchange response. +static const string EXCHANGE_KEY_PREFIX("org.apache.qpid.broker:exchange:"); + +string getAltExchange(const types::Variant& var) { + if (!var.isVoid()) { + management::ObjectId oid(var); + string key = oid.getV2Key(); + if (key.find(EXCHANGE_KEY_PREFIX) != 0) throw Exception("Invalid exchange reference: "+key); + return key.substr(EXCHANGE_KEY_PREFIX.size()); + } + else return string(); +} +} + void BrokerReplicator::doResponseQueue(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); - if (!replicateLevel(argsMap)) return; + if (!replicationTest.isReplicated( + CONFIGURATION, + values[ARGUMENTS].asMap(), + values[AUTODELETE].asBool(), + values[EXCLUSIVE].asBool())) + return; + string name(values[NAME].asString()); + QPID_LOG(debug, logPrefix << "Queue response: " << name); framing::FieldTable args; amqp_0_10::translate(argsMap, args); - string name(values[NAME].asString()); - std::pair<boost::shared_ptr<Queue>, bool> result = - broker.createQueue( - name, - values[DURABLE].asBool(), - values[AUTODELETE].asBool(), - 0 /*i.e. no owner regardless of exclusivity on master*/, - ""/*TODO: need to include alternate-exchange*/, - args, - ""/*TODO: who is the user?*/, - ""/*TODO: what should we use as connection id?*/); - if (result.second) { - QPID_LOG(debug, "HA: Backup queue response: " << name); - startQueueReplicator(result.first); - } else { - // FIXME aconway 2011-11-22: Normal to find queue already - // exists if we're failing over. - QPID_LOG(warning, "HA: Backup queue response, already exists: " << name); - } + boost::shared_ptr<Queue> queue = + createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args, + getAltExchange(values[ALTEXCHANGE])); + // It is normal for the queue to already exist if we are failing over. + if (queue) startQueueReplicator(queue); + else QPID_LOG(debug, logPrefix << "Queue already replicated: " << name); } void BrokerReplicator::doResponseExchange(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); - if (!replicateLevel(argsMap)) return; + if (!replicationTest.replicateLevel(argsMap)) return; + string name = values[NAME].asString(); + QPID_LOG(debug, logPrefix << "Exchange response: " << name); framing::FieldTable args; amqp_0_10::translate(argsMap, args); - if (broker.createExchange( - values[NAME].asString(), - values[TYPE].asString(), - values[DURABLE].asBool(), - ""/*TODO: need to include alternate-exchange*/, - args, - ""/*TODO: who is the user?*/, - ""/*TODO: what should we use as connection id?*/).second) - { - QPID_LOG(debug, "HA: Backup exchange response: " << values[NAME].asString()); - } else { - QPID_LOG(warning, "HA: Backup exchange query, already exists: " << - values[QNAME].asString()); - } + boost::shared_ptr<Exchange> exchange = createExchange( + name, values[TYPE].asString(), values[DURABLE].asBool(), args, + getAltExchange(values[ALTEXCHANGE])); + QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already exists: " << name); } namespace { @@ -480,16 +505,16 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { boost::shared_ptr<Queue> queue = broker.getQueues().find(qName); // Automatically replicate binding if queue and exchange exist and are replicated - if (exchange && replicateLevel(exchange->getArgs()) && - queue && replicateLevel(queue->getSettings())) + if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && + queue && replicationTest.replicateLevel(queue->getSettings())) { + string key = values[KEY].asString(); + QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName + << " queue:" << qName + << " key:" << key); framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); - string key = values[KEY].asString(); exchange->bind(queue, key, &args); - QPID_LOG(debug, "HA: Backup bind response: exchange=" << exchange->getName() - << " queue=" << queue->getName() - << " key=" << key); } } @@ -500,30 +525,97 @@ const string REPLICATE_DEFAULT="replicateDefault"; // Received the ha-broker configuration object for the primary broker. void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { try { - ReplicateLevel mine = haBroker.getSettings().replicateDefault; - ReplicateLevel primary = replicateLevel(values[REPLICATE_DEFAULT].asString()); - if (mine != primary) { - std::ostringstream os; - os << "Replicate default on backup (" << mine - << ") does not match primary (" << primary << ")"; - haBroker.shutdown(os.str()); - } + QPID_LOG(trace, logPrefix << "HA Broker response: " << values); + ReplicateLevel mine = haBroker.getSettings().replicateDefault.get(); + ReplicateLevel primary = replicationTest.replicateLevel( + values[REPLICATE_DEFAULT].asString()); + if (mine != primary) + throw Exception(QPID_MSG("Replicate default on backup (" << mine + << ") does not match primary (" << primary << ")")); + haBroker.setMembership(values[MEMBERS].asList()); } catch (const std::exception& e) { - std::ostringstream os; - os << "Received invalid replicate default from primary: " << e.what(); - haBroker.shutdown(os.str()); + QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " << e.what() + << ": " << values); + haBroker.shutdown(); + throw; } } -void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { - if (replicateLevel(queue->getSettings()) == RL_ALL) { - boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); +void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) +{ + if (replicationTest.replicateLevel(queue->getSettings()) == ALL) { + boost::shared_ptr<QueueReplicator> qr( + new QueueReplicator(haBroker, queue, link)); if (!broker.getExchanges().registerExchange(qr)) throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName())); qr->activate(); } } +void BrokerReplicator::stopQueueReplicator(const std::string& name) { + boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name); + if (qr) { + qr->deactivate(); + // QueueReplicator's bridge is now queued for destruction but may not + // actually be destroyed. + broker.getExchanges().destroy(qr->getName()); + } +} + +boost::shared_ptr<Queue> BrokerReplicator::createQueue( + const std::string& name, + bool durable, + bool autodelete, + const qpid::framing::FieldTable& arguments, + const std::string& alternateExchange) +{ + std::pair<boost::shared_ptr<Queue>, bool> result = + broker.createQueue( + name, + durable, + autodelete, + 0, // no owner regardless of exclusivity on primary + string(), // Set alternate exchange below + arguments, + userId, + remoteHost); + if (result.second) { + if (!alternateExchange.empty()) { + alternates.setAlternate( + alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1)); + } + return result.first; + } + else return boost::shared_ptr<Queue>(); +} + +boost::shared_ptr<Exchange> BrokerReplicator::createExchange( + const std::string& name, + const std::string& type, + bool durable, + const qpid::framing::FieldTable& args, + const std::string& alternateExchange) +{ + std::pair<boost::shared_ptr<Exchange>, bool> result = + broker.createExchange( + name, + type, + durable, + string(), // Set alternate exchange below + args, + userId, + remoteHost); + if (result.second) { + alternates.addExchange(result.first); + if (!alternateExchange.empty()) { + alternates.setAlternate( + alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1)); + } + return result.first; + } + else return boost::shared_ptr<Exchange>(); +} + bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; } |