diff options
Diffstat (limited to 'cpp/src/qpid/ha/BrokerReplicator.cpp')
-rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 606 |
1 files changed, 448 insertions, 158 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index 3a3c9c2954..983b976d76 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -23,16 +23,19 @@ #include "QueueReplicator.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" +#include "qpid/broker/ConnectionObserver.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueSettings.h" #include "qpid/broker/Link.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/framing/FieldTable.h" +#include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/broker/SessionHandler.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/reply_exceptions.h" #include "qmf/org/apache/qpid/broker/EventBind.h" #include "qmf/org/apache/qpid/broker/EventUnbind.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" @@ -41,6 +44,7 @@ #include "qmf/org/apache/qpid/broker/EventQueueDelete.h" #include "qmf/org/apache/qpid/broker/EventSubscribe.h" #include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" +#include <boost/bind.hpp> #include <algorithm> #include <sstream> #include <iostream> @@ -57,23 +61,25 @@ using qmf::org::apache::qpid::broker::EventQueueDeclare; using qmf::org::apache::qpid::broker::EventQueueDelete; using qmf::org::apache::qpid::broker::EventSubscribe; using qmf::org::apache::qpid::ha::EventMembersUpdate; +using qpid::broker::amqp_0_10::MessageTransfer; using namespace framing; -using std::string; +using namespace std; using std::ostream; using types::Variant; using namespace broker; namespace { -const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator"); +const string QPID_CONFIGURATION_REPLICATOR("qpid.broker-replicator"); const string CLASS_NAME("_class_name"); const string EVENT("_event"); const string OBJECT_NAME("_object_name"); const string PACKAGE_NAME("_package_name"); const string QUERY_RESPONSE("_query_response"); -const string SCHEMA_ID("_schema_id"); const string VALUES("_values"); +const string SCHEMA_ID("_schema_id"); +const string WHAT("_what"); const string ALTEX("altEx"); const string ALTEXCHANGE("altExchange"); @@ -81,24 +87,27 @@ const string ARGS("args"); const string ARGUMENTS("arguments"); const string AUTODEL("autoDel"); const string AUTODELETE("autoDelete"); -const string EXCL("excl"); -const string EXCLUSIVE("exclusive"); const string BIND("bind"); -const string UNBIND("unbind"); const string BINDING("binding"); +const string BINDING_KEY("bindingKey"); const string CREATED("created"); const string DISP("disp"); +const string DEST("dest"); const string DURABLE("durable"); const string EXCHANGE("exchange"); +const string EXCL("excl"); +const string EXCLUSIVE("exclusive"); const string EXNAME("exName"); const string EXTYPE("exType"); +const string HA_BROKER("habroker"); const string KEY("key"); const string NAME("name"); +const string PARTIAL("partial"); const string QNAME("qName"); const string QUEUE("queue"); const string TYPE("type"); -const string HA_BROKER("habroker"); -const string PARTIAL("partial"); +const string UNBIND("unbind"); +const string CONSUMER_COUNT("consumerCount"); const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#"); const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#"); @@ -107,10 +116,6 @@ const string QMF_CONTENT("qmf.content"); const string QMF_DEFAULT_TOPIC("qmf.default.topic"); const string QMF_OPCODE("qmf.opcode"); -const string _WHAT("_what"); -const string _CLASS_NAME("_class_name"); -const string _PACKAGE_NAME("_package_name"); -const string _SCHEMA_ID("_schema_id"); const string OBJECT("OBJECT"); const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker"); const string ORG_APACHE_QPID_HA("org.apache.qpid.ha"); @@ -118,21 +123,18 @@ const string QMF_DEFAULT_DIRECT("qmf.default.direct"); const string _QUERY_REQUEST("_query_request"); const string BROKER("broker"); const string MEMBERS("members"); - -template <class T> bool match(Variant::Map& schema) { - return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); -} +const string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout"); void sendQuery(const string& packageName, const string& className, const string& queueName, SessionHandler& sessionHandler) { framing::AMQP_ServerProxy peer(sessionHandler.out); Variant::Map request; - request[_WHAT] = OBJECT; + request[WHAT] = OBJECT; Variant::Map schema; - schema[_CLASS_NAME] = className; - schema[_PACKAGE_NAME] = packageName; - request[_SCHEMA_ID] = schema; + schema[CLASS_NAME] = className; + schema[PACKAGE_NAME] = packageName; + request[SCHEMA_ID] = schema; AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0))); method.setBof(true); @@ -170,19 +172,144 @@ Variant::Map asMapVoid(const Variant& value) { } } // namespace +// Listens for errors on the bridge session. +class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener { + public: + ErrorListener(const std::string& lp, BrokerReplicator& br) : + logPrefix(lp), brokerReplicator(br) {} + + void connectionException(framing::connection::CloseCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Connection error: " << msg); + } + void channelException(framing::session::DetachCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Channel error: " << msg); + } + void executionException(framing::execution::ErrorCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Execution error: " << msg); + } + void detach() { + QPID_LOG(debug, logPrefix << "Session detached."); + } + + private: + std::string logPrefix; + BrokerReplicator& brokerReplicator; +}; + +class BrokerReplicator::ConnectionObserver : public broker::ConnectionObserver +{ + public: + ConnectionObserver(BrokerReplicator& br) : brokerReplicator(br) {} + virtual void connection(Connection&) {} + virtual void opened(Connection&) {} + + virtual void closed(Connection& c) { + if (brokerReplicator.link && &c == brokerReplicator.connection) + brokerReplicator.disconnected(); + } + virtual void forced(Connection& c, const std::string& /*message*/) { closed(c); } + private: + BrokerReplicator& brokerReplicator; +}; + +/** Keep track of queues or exchanges during the update process to solve 2 + * problems. + * + * 1. Once all responses are processed, remove any queues/exchanges + * that were not mentioned as they no longer exist on the primary. + * + * 2. During the update if we see an event for an object we should + * ignore any subsequent responses for that object as they are out + * of date. + */ +class BrokerReplicator::UpdateTracker { + public: + typedef std::set<std::string> Names; + typedef boost::function<void (const std::string&)> CleanFn; + + UpdateTracker(const std::string& type_, // "queue" or "exchange" + CleanFn f, const ReplicationTest& rt) + : type(type_), cleanFn(f), repTest(rt) {} + + /** Destructor cleans up remaining initial queues. */ + ~UpdateTracker() { + // Don't throw in a destructor. + try { for_each(initial.begin(), initial.end(), cleanFn); } + catch (const std::exception& e) { + QPID_LOG(error, "Error in cleanup of lost objects: " << e.what()); + } + } + + /** Add an exchange name */ + void addExchange(Exchange::shared_ptr ex) { + if (repTest.getLevel(*ex)) + initial.insert(ex->getName()); + } + + /** Add a queue name. */ + void addQueue(Queue::shared_ptr q) { + if (repTest.getLevel(*q)) + initial.insert(q->getName()); + } + + /** Received an event for name */ + void event(const std::string& name) { + initial.erase(name); // no longer a candidate for deleting + events.insert(name); // we have seen an event for this name + } + + /** Received a response for name. + *@return true if this response should be processed, false if we have + *already seen an event for this object. + */ + bool response(const std::string& name) { + initial.erase(name); // no longer a candidate for deleting + return events.find(name) == events.end(); // true if no event seen yet. + } + + private: + void clean(const std::string& name) { + QPID_LOG(info, "Backup updated, deleting " << type << " " << name); + cleanFn(name); + } + + std::string type; + Names initial, events; + CleanFn cleanFn; + ReplicationTest repTest; +}; + BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l) : Exchange(QPID_CONFIGURATION_REPLICATOR), - logPrefix("Backup: "), replicationTest(hb.getReplicationTest()), - haBroker(hb), broker(hb.getBroker()), link(l), + logPrefix("Backup: "), replicationTest(NONE), + haBroker(hb), broker(hb.getBroker()), + exchanges(broker.getExchanges()), queues(broker.getQueues()), + link(l), initialized(false), - alternates(hb.getBroker().getExchanges()) -{} + alternates(hb.getBroker().getExchanges()), + connection(0) +{ + connectionObserver.reset(new ConnectionObserver(*this)); + broker.getConnectionObservers().add(connectionObserver); + framing::FieldTable args = getArgs(); + args.setString(QPID_REPLICATE, printable(NONE).str()); + setArgs(args); + + dispatch[EventQueueDeclare::getFullName()] = &BrokerReplicator::doEventQueueDeclare; + dispatch[EventQueueDelete::getFullName()] = &BrokerReplicator::doEventQueueDelete; + dispatch[EventExchangeDeclare::getFullName()] = &BrokerReplicator::doEventExchangeDeclare; + dispatch[EventExchangeDelete::getFullName()] = &BrokerReplicator::doEventExchangeDelete; + dispatch[EventBind::getFullName()] = &BrokerReplicator::doEventBind; + dispatch[EventUnbind::getFullName()] = &BrokerReplicator::doEventUnbind; + dispatch[EventMembersUpdate::getFullName()] = &BrokerReplicator::doEventMembersUpdate; + dispatch[EventSubscribe::getFullName()] = &BrokerReplicator::doEventSubscribe; +} void BrokerReplicator::initialize() { // Can't do this in the constructor because we need a shared_ptr to this. types::Uuid uuid(true); const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str()); - broker.getLinks().declare( + std::pair<Bridge::shared_ptr, bool> result = broker.getLinks().declare( name, // name for bridge *link, // parent false, // durable @@ -195,21 +322,47 @@ void BrokerReplicator::initialize() { "", // excludes false, // dynamic 0, // sync? - // shared_ptr keeps this in memory until outstanding initializeBridge + // shared_ptr keeps this in memory until outstanding connected // calls are run. - boost::bind(&BrokerReplicator::initializeBridge, shared_from_this(), _1, _2) + boost::bind(&BrokerReplicator::connected, shared_from_this(), _1, _2) ); + assert(result.second); + result.first->setErrorListener( + boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this))); } -BrokerReplicator::~BrokerReplicator() { } +BrokerReplicator::~BrokerReplicator() { shutdown(); } + +namespace { +void collectQueueReplicators( + const boost::shared_ptr<Exchange> ex, set<boost::shared_ptr<QueueReplicator> >& collect) +{ + boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); + if (qr) collect.insert(qr); +} +} // namespace + +void BrokerReplicator::shutdown() { + // NOTE: this is called in a QMF dispatch thread, not the Link's connection + // thread. It's OK to be unlocked because it doesn't use any mutable state, + // it only calls thread safe functions objects belonging to the Broker. + + // Unregister with broker objects: + if (connectionObserver) { + broker.getConnectionObservers().remove(connectionObserver); + connectionObserver.reset(); + } + broker.getExchanges().destroy(getName()); +} // This is called in the connection IO thread when the bridge is started. -void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { +void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler) { // Use the credentials of the outgoing Link connection for creating queues, // exchanges etc. We know link->getConnection() is non-zero because we are // being called in the connections thread context. // - assert(link->getConnection()); + connection = link->getConnection(); + assert(connection); userId = link->getConnection()->getUserId(); remoteHost = link->getConnection()->getUrl(); @@ -221,6 +374,19 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH << " status:" << printable(haBroker.getStatus())); initialized = true; + exchangeTracker.reset( + new UpdateTracker("exchange", + boost::bind(&BrokerReplicator::deleteExchange, this, _1), + replicationTest)); + exchanges.eachExchange( + boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1)); + + queueTracker.reset( + new UpdateTracker("queue", + boost::bind(&BrokerReplicator::deleteQueue, this, _1, true), + replicationTest)); + queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), _1)); + framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); @@ -231,9 +397,14 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_BROKER, FieldTable()); peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_HA, FieldTable()); //subscribe to the queue - peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + FieldTable arguments; + arguments.setInt(QueueReplicator::QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize? + peer.getMessage().subscribe( + queueName, args.i_dest, 1/*accept-none*/, 0/*pre-acquired*/, + false/*exclusive*/, "", 0, arguments); + peer.getMessage().setFlowMode(args.i_dest, 1); // Window + peer.getMessage().flow(args.i_dest, 0, haBroker.getSettings().getFlowMessages()); + peer.getMessage().flow(args.i_dest, 1, haBroker.getSettings().getFlowBytes()); // Issue a query request for queues, exchanges, bindings and the habroker // using event queue as the reply-to address @@ -247,12 +418,12 @@ void BrokerReplicator::route(Deliverable& msg) { // We transition from JOINING->CATCHUP on the first message received from the primary. // Until now we couldn't be sure if we had a good connection to the primary. if (haBroker.getStatus() == JOINING) { - haBroker.setStatus(CATCHUP); + haBroker.getMembership().setStatus(CATCHUP); QPID_LOG(notice, logPrefix << "Connected to primary " << primary); } Variant::List list; try { - if (!qpid::broker::amqp_0_10::MessageTransfer::isQMFv2(msg.getMessage())) + if (!MessageTransfer::isQMFv2(msg.getMessage())) throw Exception("Unexpected message, not QMF2 event or query response."); // decode as list string content = msg.getMessage().getContent(); @@ -264,13 +435,9 @@ void BrokerReplicator::route(Deliverable& msg) { QPID_LOG(trace, "Broker replicator event: " << map); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); - if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values); - else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values); - else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values); - else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values); - else if (match<EventBind>(schema)) doEventBind(values); - else if (match<EventUnbind>(schema)) doEventUnbind(values); - else if (match<EventMembersUpdate>(schema)) doEventMembersUpdate(values); + EventKey key(schema[PACKAGE_NAME], schema[CLASS_NAME]); + EventDispatchMap::iterator j = dispatch.find(key); + if (j != dispatch.end()) (this->*(j->second))(values); } } else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { @@ -285,15 +452,21 @@ void BrokerReplicator::route(Deliverable& msg) { else if (type == BINDING) doResponseBind(values); else if (type == HA_BROKER) doResponseHaBroker(values); } - if (qpid::broker::amqp_0_10::MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) { - // We have received all of the exchange response. + if (MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) { + QPID_LOG(debug, logPrefix << "All exchange responses received.") + exchangeTracker.reset(); // Clean up exchanges that no longer exist in the primary alternates.clear(); } + if (MessageTransfer::isLastQMFResponse(msg.getMessage(), QUEUE)) { + QPID_LOG(debug, logPrefix << "All queue responses received."); + queueTracker.reset(); // Clean up queues that no longer exist in the primary + } } } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what() - << ": while handling: " << list); - haBroker.shutdown(); +; + haBroker.shutdown( + QPID_MSG(logPrefix << "Configuration replication failed: " + << e.what() << ": while handling: " << list)); throw; } } @@ -301,31 +474,22 @@ void BrokerReplicator::route(Deliverable& msg) { void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { Variant::Map argsMap = asMapVoid(values[ARGS]); - bool autoDel = values[AUTODEL].asBool(); - bool excl = values[EXCL].asBool(); - if (values[DISP] == CREATED && replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) { + if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) { string name = values[QNAME].asString(); QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool()); + QPID_LOG(debug, logPrefix << "Queue declare event: " << name); + if (queueTracker.get()) queueTracker->event(name); framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); // If we already have a queue with this name, replace it. // The queue was definitely created on the primary. - if (broker.getQueues().find(name)) { - QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name); - broker.getQueues().destroy(name); - stopQueueReplicator(name); + if (queues.find(name)) { + QPID_LOG(warning, logPrefix << "Declare event, replacing exsiting queue: " + << name); + deleteQueue(name); } - settings.populate(args, settings.storeSettings); - std::pair<boost::shared_ptr<Queue>, bool> result = - broker.createQueue( - name, - settings, - 0 /*i.e. no owner regardless of exclusivity on master*/, - values[ALTEX].asString(), - userId, - remoteHost); - assert(result.second); // Should be true since we destroyed existing queue above - startQueueReplicator(result.first); + replicateQueue(name, values[DURABLE].asBool(), values[AUTODEL].asBool(), args, + values[ALTEX].asString()); } } @@ -333,7 +497,7 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::findQueueReplicator( const std::string& qname) { string rname = QueueReplicator::replicatorName(qname); - boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname); + boost::shared_ptr<broker::Exchange> ex = exchanges.find(rname); return boost::dynamic_pointer_cast<QueueReplicator>(ex); } @@ -341,79 +505,85 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { // The remote queue has already been deleted so replicator // sessions may be closed by a "queue deleted" exception. string name = values[QNAME].asString(); - boost::shared_ptr<Queue> queue = broker.getQueues().find(name); - if (queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) { + boost::shared_ptr<Queue> queue = queues.find(name); + if (queue && replicationTest.getLevel(*queue)) { QPID_LOG(debug, logPrefix << "Queue delete event: " << name); - stopQueueReplicator(name); - broker.deleteQueue(name, userId, remoteHost); + if (queueTracker.get()) queueTracker->event(name); + deleteQueue(name); } } void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGS])); - if (!replicationTest.replicateLevel(argsMap)) return; // Not a replicated exchange. - if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) { + if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) { string name = values[EXNAME].asString(); QPID_LOG(debug, logPrefix << "Exchange declare event: " << name); + if (exchangeTracker.get()) exchangeTracker->event(name); framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); // If we already have a exchange with this name, replace it. // The exchange was definitely created on the primary. - if (broker.getExchanges().find(name)) { - broker.getExchanges().destroy(name); - QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name); + if (exchanges.find(name)) { + deleteExchange(name); + QPID_LOG(warning, logPrefix << "Declare event, replacing existing exchange: " + << name); } - boost::shared_ptr<Exchange> exchange = - createExchange(name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, values[ALTEX].asString()); - assert(exchange); + CreateExchangeResult result = createExchange( + name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, + values[ALTEX].asString()); + replicatedExchanges.insert(name); + assert(result.second); } } void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { string name = values[EXNAME].asString(); - boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name); + boost::shared_ptr<Exchange> exchange = exchanges.find(name); if (!exchange) { - QPID_LOG(warning, logPrefix << "Exchange delete event, does not exist: " << name); - } else if (!replicationTest.replicateLevel(exchange->getArgs())) { + QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name); + } else if (!replicationTest.getLevel(*exchange)) { QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name); } else { QPID_LOG(debug, logPrefix << "Exchange delete event:" << name); - broker.deleteExchange(name, userId, remoteHost); + if (exchangeTracker.get()) exchangeTracker->event(name); + deleteExchange(name); + replicatedExchanges.erase(name); } } void BrokerReplicator::doEventBind(Variant::Map& values) { boost::shared_ptr<Exchange> exchange = - broker.getExchanges().find(values[EXNAME].asString()); + exchanges.find(values[EXNAME].asString()); boost::shared_ptr<Queue> queue = - broker.getQueues().find(values[QNAME].asString()); - // We only replicate binds for a replicated queue to replicated - // exchange that both exist locally. - if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && - queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) + queues.find(values[QNAME].asString()); + framing::FieldTable args; + qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args); + // We only replicate binds for a replicated queue to replicated exchange + // that both exist locally. Respect the replication level set in the + // bind arguments, but replicate by default. + if (exchange && replicationTest.getLevel(*exchange) && + queue && replicationTest.getLevel(*queue) && + ReplicationTest(ALL).getLevel(args)) { - framing::FieldTable args; - qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName() << " queue=" << queue->getName() - << " key=" << key); + << " key=" << key + << " args=" << args); exchange->bind(queue, key, &args, 0); } } void BrokerReplicator::doEventUnbind(Variant::Map& values) { boost::shared_ptr<Exchange> exchange = - broker.getExchanges().find(values[EXNAME].asString()); + exchanges.find(values[EXNAME].asString()); boost::shared_ptr<Queue> queue = - broker.getQueues().find(values[QNAME].asString()); + queues.find(values[QNAME].asString()); // We only replicate unbinds for a replicated queue to replicated // exchange that both exist locally. - if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && - queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) + if (exchange && replicationTest.getLevel(*exchange) && + queue && replicationTest.getLevel(*queue)) { - framing::FieldTable args; - qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName() << " queue=" << queue->getName() @@ -424,7 +594,17 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) { Variant::List members = values[MEMBERS].asList(); - haBroker.setMembership(members); + setMembership(members); +} + +void BrokerReplicator::doEventSubscribe(Variant::Map& values) { + // Ignore queue replicator subscriptions. + if (QueueReplicator::isReplicatorName(values[DEST].asString())) return; + boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(values[QNAME]); + if (qr) { + qr->setSubscribed(); + QPID_LOG(debug, logPrefix << "Subscribe event: " << values[QNAME]); + } } namespace { @@ -441,40 +621,68 @@ string getAltExchange(const types::Variant& var) { } else return string(); } + +Variant getHaUuid(const Variant::Map& map) { + Variant::Map::const_iterator i = map.find(QPID_HA_UUID); + return i == map.end() ? Variant() : i->second; } +} // namespace + + void BrokerReplicator::doResponseQueue(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); - if (!replicationTest.isReplicated( - CONFIGURATION, - values[ARGUMENTS].asMap(), - values[AUTODELETE].asBool(), - values[EXCLUSIVE].asBool())) - return; + if (!replicationTest.getLevel(argsMap)) return; string name(values[NAME].asString()); + if (!queueTracker.get()) + throw Exception(QPID_MSG("Unexpected queue response: " << values)); + if (!queueTracker->response(name)) return; // Response is out-of-date QPID_LOG(debug, logPrefix << "Queue response: " << name); + // If we see a queue with the same name as one we have, but not the same UUID, + // then replace the one we have. + boost::shared_ptr<Queue> queue = queues.find(name); + if (queue && getHaUuid(queue->getSettings().original) != getHaUuid(argsMap)) { + QPID_LOG(warning, logPrefix << "UUID mismatch, replacing queue: " + << name); + deleteQueue(name); + } framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); - boost::shared_ptr<Queue> queue = - createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args, - getAltExchange(values[ALTEXCHANGE])); - // It is normal for the queue to already exist if we are failing over. - if (queue) startQueueReplicator(queue); - else QPID_LOG(debug, logPrefix << "Queue already replicated: " << name); + boost::shared_ptr<QueueReplicator> qr = replicateQueue( + name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args, + getAltExchange(values[ALTEXCHANGE])); + if (qr) { + Variant::Map::const_iterator i = values.find(CONSUMER_COUNT); + if (i != values.end() && isIntegerType(i->second.getType())) { + if (i->second.asInt64()) qr->setSubscribed(); + } + } } void BrokerReplicator::doResponseExchange(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); - if (!replicationTest.replicateLevel(argsMap)) return; + if (!replicationTest.getLevel(argsMap)) return; string name = values[NAME].asString(); + if (!exchangeTracker.get()) + throw Exception(QPID_MSG("Unexpected exchange response: " << values)); + if (!exchangeTracker->response(name)) return; // Response is out of date. QPID_LOG(debug, logPrefix << "Exchange response: " << name); framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); - boost::shared_ptr<Exchange> exchange = createExchange( + // If we see an exchange with the same name as one we have, but not the same UUID, + // then replace the one we have. + boost::shared_ptr<Exchange> exchange = exchanges.find(name); + if (exchange && + exchange->getArgs().getAsString(QPID_HA_UUID) != args.getAsString(QPID_HA_UUID)) + { + QPID_LOG(warning, logPrefix << "UUID mismatch, replacing exchange: " + << name); + deleteExchange(name); + } + CreateExchangeResult result = createExchange( name, values[TYPE].asString(), values[DURABLE].asBool(), args, getAltExchange(values[ALTEXCHANGE])); - // It is normal for the exchange to already exist if we are failing over. - QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already replicated: " << name); + replicatedExchanges.insert(name); } namespace { @@ -501,19 +709,25 @@ const std::string QUEUE_REF("queueRef"); void BrokerReplicator::doResponseBind(Variant::Map& values) { std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]); std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); - boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName); - boost::shared_ptr<Queue> queue = broker.getQueues().find(qName); + boost::shared_ptr<Exchange> exchange = exchanges.find(exName); + boost::shared_ptr<Queue> queue = queues.find(qName); - // Automatically replicate binding if queue and exchange exist and are replicated - if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && - queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) + framing::FieldTable args; + qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); + + // Automatically replicate binding if queue and exchange exist and are replicated. + // Respect replicate setting in binding args but default to replicated. + if (exchange && replicationTest.getLevel(*exchange) && + queue && replicationTest.getLevel(*queue) && + ReplicationTest(ALL).getLevel(args)) { - string key = values[KEY].asString(); + string key = values[BINDING_KEY].asString(); QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName << " queue:" << qName - << " key:" << key); - framing::FieldTable args; - qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); + << " key:" << key + << " args:" << args); +// framing::FieldTable args; +// qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); exchange->bind(queue, key, &args, 0); } } @@ -527,42 +741,65 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { try { QPID_LOG(trace, logPrefix << "HA Broker response: " << values); ReplicateLevel mine = haBroker.getSettings().replicateDefault.get(); - ReplicateLevel primary = replicationTest.replicateLevel( - values[REPLICATE_DEFAULT].asString()); + ReplicateLevel primary = replicationTest.getLevel(values[REPLICATE_DEFAULT].asString()); if (mine != primary) throw Exception(QPID_MSG("Replicate default on backup (" << mine << ") does not match primary (" << primary << ")")); - haBroker.setMembership(values[MEMBERS].asList()); + setMembership(values[MEMBERS].asList()); } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " << e.what() - << ": " << values); - haBroker.shutdown(); + haBroker.shutdown( + QPID_MSG(logPrefix << "Invalid HA Broker response: " << e.what() + << ": " << values)); + throw; } } -void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) +boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator( + const boost::shared_ptr<Queue>& queue) { - if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) { + if (replicationTest.getLevel(*queue) == ALL) { boost::shared_ptr<QueueReplicator> qr( new QueueReplicator(haBroker, queue, link)); - if (!broker.getExchanges().registerExchange(qr)) + if (!exchanges.registerExchange(qr)) throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName())); qr->activate(); + return qr; } + return boost::shared_ptr<QueueReplicator>(); } -void BrokerReplicator::stopQueueReplicator(const std::string& name) { - boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name); - if (qr) { - qr->deactivate(); - // QueueReplicator's bridge is now queued for destruction but may not - // actually be destroyed. - broker.getExchanges().destroy(qr->getName()); +void BrokerReplicator::deleteQueue(const std::string& name, bool purge) { + Queue::shared_ptr queue = queues.find(name); + if (queue) { + // Purge before deleting to ensure that we don't reroute any + // messages. Any reroutes will be done at the primary and + // replicated as normal. + if (purge) queue->purge(0, boost::shared_ptr<Exchange>()); + broker.deleteQueue(name, userId, remoteHost); + QPID_LOG(debug, logPrefix << "Queue deleted: " << name); + } +} + +void BrokerReplicator::deleteExchange(const std::string& name) { + try { + boost::shared_ptr<broker::Exchange> exchange = exchanges.find(name); + if (!exchange) { + QPID_LOG(warning, logPrefix << "Cannot delete exchange, not found: " << name); + return; + } + if (exchange->inUseAsAlternate()) { + QPID_LOG(warning, "Cannot delete exchange, in use as alternate: " << name); + return; + } + broker.deleteExchange(name, userId, remoteHost); + QPID_LOG(debug, logPrefix << "Exchange deleted: " << name); + } catch (const framing::NotFoundException&) { + QPID_LOG(debug, logPrefix << "Exchange not found for deletion: " << name); } } -boost::shared_ptr<Queue> BrokerReplicator::createQueue( +boost::shared_ptr<QueueReplicator> BrokerReplicator::replicateQueue( const std::string& name, bool durable, bool autodelete, @@ -571,7 +808,7 @@ boost::shared_ptr<Queue> BrokerReplicator::createQueue( { QueueSettings settings(durable, autodelete); settings.populate(arguments, settings.storeSettings); - std::pair<boost::shared_ptr<Queue>, bool> result = + CreateQueueResult result = broker.createQueue( name, settings, @@ -579,24 +816,23 @@ boost::shared_ptr<Queue> BrokerReplicator::createQueue( string(), // Set alternate exchange below userId, remoteHost); - if (result.second) { - if (!alternateExchange.empty()) { - alternates.setAlternate( - alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1)); - } - return result.first; + boost::shared_ptr<QueueReplicator> qr; + if (!findQueueReplicator(name)) qr = startQueueReplicator(result.first); + if (result.second && !alternateExchange.empty()) { + alternates.setAlternate( + alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1)); } - else return boost::shared_ptr<Queue>(); + return qr; } -boost::shared_ptr<Exchange> BrokerReplicator::createExchange( +BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange( const std::string& name, const std::string& type, bool durable, const qpid::framing::FieldTable& args, const std::string& alternateExchange) { - std::pair<boost::shared_ptr<Exchange>, bool> result = + CreateExchangeResult result = broker.createExchange( name, type, @@ -605,15 +841,12 @@ boost::shared_ptr<Exchange> BrokerReplicator::createExchange( args, userId, remoteHost); - if (result.second) { - alternates.addExchange(result.first); - if (!alternateExchange.empty()) { - alternates.setAlternate( - alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1)); - } - return result.first; + alternates.addExchange(result.first); + if (!alternateExchange.empty()) { + alternates.setAlternate( + alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1)); } - else return boost::shared_ptr<Exchange>(); + return result; } bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*, qpid::broker::AsyncStore* const) { return false; } @@ -626,4 +859,61 @@ void BrokerReplicator::write(char* /*target*/) {} string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } +void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) { + boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); + if (!qr) return; + assert(qr); + if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) { + if (qr->getQueue()->getSettings().autoDeleteDelay) { + // Start the auto-delete timer + Queue::tryAutoDelete(broker, qr->getQueue(), remoteHost, userId); + } + else { + // Delete immediately. Don't purge, the primary is gone so we need + // to reroute the deleted messages. + deleteQueue(qr->getQueue()->getName(), false); + } + } +} + +// Callback function for accumulating exchange candidates +namespace { + void exchangeAccumulatorCallback(vector<boost::shared_ptr<Exchange> >& c, const Exchange::shared_ptr& i) { + c.push_back(i); + } +} + +void BrokerReplicator::disconnected() { + QPID_LOG(info, logPrefix << "Disconnected from " << primary); + connection = 0; + // Clean up auto-delete queues + vector<boost::shared_ptr<Exchange> > collect; + // Make a copy so we can work outside the ExchangeRegistry lock + exchanges.eachExchange( + boost::bind(&exchangeAccumulatorCallback, boost::ref(collect), _1)); + for_each(collect.begin(), collect.end(), + boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1)); +} + +void BrokerReplicator::setMembership(const Variant::List& brokers) { + Membership& membership(haBroker.getMembership()); + membership.assign(brokers); + // Check if the primary has signalled a change in my status: + // from CATCHUP to READY when we are caught up. + // from READY TO CATCHUP if we are timed out during fail-over. + BrokerInfo info; + if (membership.get(membership.getSelf(), info)) { + BrokerStatus oldStatus = haBroker.getStatus(); + BrokerStatus newStatus = info.getStatus(); + if (oldStatus == CATCHUP && newStatus == READY) { + QPID_LOG(info, logPrefix << logPrefix << "Caught-up and ready"); + haBroker.getMembership().setStatus(READY); + } + else if (oldStatus == READY && newStatus == CATCHUP) { + QPID_LOG(info, logPrefix << logPrefix << "No longer ready, catching up"); + haBroker.getMembership().setStatus(CATCHUP); + } + } +} + }} // namespace broker |