From bf08e0296d434464bcc6f4af9c4a32a2397089cb Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 26 Jul 2012 21:26:37 +0000 Subject: QPID-4107 HA does not replicate alternate-exchange Set alternate exchange on replicated queues and exchanges. If the exchange is available, set it immediately. Otherwise remember what needs to be set so it can be set when the exchange becomes available. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1366206 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/CMakeLists.txt | 1 + qpid/cpp/src/ha.mk | 1 + qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h | 73 ++++++++++++ qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 156 ++++++++++++++++--------- qpid/cpp/src/qpid/ha/BrokerReplicator.h | 17 +++ qpid/cpp/src/tests/ha_tests.py | 51 +++++++- 6 files changed, 244 insertions(+), 55 deletions(-) create mode 100644 qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index e27aca0630..bb8a1a7227 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -626,6 +626,7 @@ set (ha_default ON) option(BUILD_HA "Build Active-Passive HA plugin" ${ha_default}) if (BUILD_HA) set (ha_SOURCES + qpid/ha/AltExchangeSetter.h qpid/ha/BackupConnectionExcluder.h qpid/ha/BrokerInfo.cpp qpid/ha/BrokerInfo.h diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk index 906dd775bd..9ec46a5156 100644 --- a/qpid/cpp/src/ha.mk +++ b/qpid/cpp/src/ha.mk @@ -23,6 +23,7 @@ dmoduleexec_LTLIBRARIES += ha.la ha_la_SOURCES = \ + qpid/ha/AltExchangeSetter.h \ qpid/ha/Backup.cpp \ qpid/ha/Backup.h \ qpid/ha/BackupConnectionExcluder.h \ diff --git a/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h b/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h new file mode 100644 index 0000000000..08690e68bc --- /dev/null +++ b/qpid/cpp/src/qpid/ha/AlternateExchangeSetter.h @@ -0,0 +1,73 @@ +#ifndef QPID_HA_ALTERNATEEXCHANGESETTER_H +#define QPID_HA_ALTERNATEEXCHANGESETTER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/log/Statement.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/ExchangeRegistry.h" +#include "boost/function.hpp" +#include + +namespace qpid { +namespace ha { + +/** + * Sets the alternate exchange on queues and exchanges. + * Holds onto queues/exchanges if necessary till the alternate exchange is available. + * THREAD UNSAFE + */ +class AlternateExchangeSetter +{ + public: + typedef boost::function)> SetFunction; + + AlternateExchangeSetter(broker::ExchangeRegistry& er) : exchanges(er) {} + + void setAlternate(const std::string& altEx, const SetFunction& setter) { + broker::Exchange::shared_ptr ex = exchanges.find(altEx); + if (ex) setter(ex); // Set immediately. + else setters.insert(Setters::value_type(altEx, setter)); // Save for later. + } + + void addExchange(boost::shared_ptr exchange) { + // Update the setters for this exchange + std::pair range = setters.equal_range(exchange->getName()); + for (Setters::iterator i = range.first; i != range.second; ++i) + i->second(exchange); + setters.erase(range.first, range.second); + } + + void clear() { + if (!setters.empty()) + QPID_LOG(warning, "Some alternate exchanges were not resolved."); + setters.clear(); + } + + private: + typedef std::multimap Setters; + broker::ExchangeRegistry& exchanges; + Setters setters; +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_ALTERNATEEXCHANGESETTER_H*/ diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 1a1cad1e42..9214bc2f87 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -41,6 +41,7 @@ #include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" #include #include +#include #include namespace qpid { @@ -56,6 +57,7 @@ using qmf::org::apache::qpid::broker::EventSubscribe; using qmf::org::apache::qpid::ha::EventMembersUpdate; using namespace framing; using std::string; +using std::ostream; using types::Variant; using namespace broker; @@ -72,6 +74,7 @@ const string SCHEMA_ID("_schema_id"); const string VALUES("_values"); const string ALTEX("altEx"); +const string ALTEXCHANGE("altExchange"); const string ARGS("args"); const string ARGUMENTS("arguments"); const string AUTODEL("autoDel"); @@ -93,6 +96,7 @@ const string QNAME("qName"); const string QUEUE("queue"); const string TYPE("type"); const string HA_BROKER("habroker"); +const string PARTIAL("partial"); const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#"); const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#"); @@ -122,7 +126,9 @@ template bool match(Variant::Map& schema) { return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); } -void sendQuery(const string& packageName, const string& className, const string& queueName, SessionHandler& sessionHandler) { +void sendQuery(const string& packageName, const string& className, const string& queueName, + SessionHandler& sessionHandler) +{ framing::AMQP_ServerProxy peer(sessionHandler.out); Variant::Map request; request[_WHAT] = OBJECT; @@ -142,6 +148,7 @@ void sendQuery(const string& packageName, const string& className, const string& props->setAppId(QMF2); props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST); headerBody.get(true)->setRoutingKey(BROKER); + headerBody.get(true)->setCorrelationId(className); AMQFrame header(headerBody); header.setBof(false); header.setEof(false); @@ -164,14 +171,14 @@ Variant::Map asMapVoid(const Variant& value) { if (!value.isVoid()) return value.asMap(); else return Variant::Map(); } - } // namespace BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr& l) : Exchange(QPID_CONFIGURATION_REPLICATOR), logPrefix("Backup: "), replicationTest(hb.getReplicationTest()), haBroker(hb), broker(hb.getBroker()), link(l), - initialized(false) + initialized(false), + alternates(hb.getBroker().getExchanges()) {} void BrokerReplicator::initialize() { @@ -249,13 +256,15 @@ void BrokerReplicator::route(Deliverable& msg) { if (haBroker.getStatus() == JOINING) haBroker.setStatus(CATCHUP); const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders(); + const MessageProperties* messageProperties = msg.getMessage().getProperties(); Variant::List list; try { - if (!isQMFv2(msg.getMessage()) || !headers) + if (!isQMFv2(msg.getMessage()) || !headers || !messageProperties) throw Exception("Unexpected message, not QMF2 event or query response."); // decode as list string content = msg.getMessage().getFrames().getContent(); amqp_0_10::ListCodec::decode(content, list); + QPID_LOG(trace, "Broker replicator received: " << *messageProperties); if (headers->getAsString(QMF_CONTENT) == EVENT) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); @@ -283,6 +292,10 @@ void BrokerReplicator::route(Deliverable& msg) { else if (type == BINDING) doResponseBind(values); else if (type == HA_BROKER) doResponseHaBroker(values); } + if (messageProperties->getCorrelationId() == EXCHANGE && !headers->isSet(PARTIAL)) { + // We have received all of the exchange response. + alternates.clear(); + } } } catch (const std::exception& e) { QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what() @@ -309,19 +322,10 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { broker.getQueues().destroy(name); stopQueueReplicator(name); } - std::pair, bool> result = - broker.createQueue( - name, - values[DURABLE].asBool(), - autoDel, - 0, // no owner regardless of exclusivity on primary - // FIXME aconway 2012-07-06: handle alternate exchange - values[ALTEX].asString(), - args, - userId, - remoteHost); - assert(result.second); // Should be true since we destroyed existing queue above - startQueueReplicator(result.first); + boost::shared_ptr queue = createQueue( + name, values[DURABLE].asBool(), autoDel, args, values[ALTEX].asString()); + assert(queue); // Should be created since we destroed the previous queue above. + if (queue) startQueueReplicator(queue); } } @@ -359,17 +363,9 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { broker.getExchanges().destroy(name); QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name); } - std::pair, bool> result = - broker.createExchange( - name, - values[EXTYPE].asString(), - values[DURABLE].asBool(), - // FIXME aconway 2012-07-06: handle alternate exchanges - values[ALTEX].asString(), - args, - userId, - remoteHost); - assert(result.second); + boost::shared_ptr exchange = + createExchange(name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, values[ALTEX].asString()); + assert(exchange); } } @@ -431,6 +427,22 @@ void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) { haBroker.setMembership(members); } +namespace { + +// Get the alternate exchange from the exchange field of a queue or exchange response. +static const string EXCHANGE_KEY_PREFIX("org.apache.qpid.broker:exchange:"); + +string getAltExchange(const types::Variant& var) { + if (!var.isVoid()) { + management::ObjectId oid(var); + string key = oid.getV2Key(); + if (key.find(EXCHANGE_KEY_PREFIX) != 0) throw Exception("Invalid exchange reference: "+key); + return key.substr(EXCHANGE_KEY_PREFIX.size()); + } + else return string(); +} +} + void BrokerReplicator::doResponseQueue(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); if (!replicationTest.isReplicated( @@ -443,22 +455,12 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { QPID_LOG(debug, logPrefix << "Queue response: " << name); framing::FieldTable args; amqp_0_10::translate(argsMap, args); - std::pair, bool> result = - broker.createQueue( - name, - values[DURABLE].asBool(), - values[AUTODELETE].asBool(), - 0 /*i.e. no owner regardless of exclusivity on master*/, - ""/*TODO: need to include alternate-exchange*/, - args, - userId, - remoteHost); - + boost::shared_ptr queue = + createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args, + getAltExchange(values[ALTEXCHANGE])); // It is normal for the queue to already exist if we are failing over. - if (result.second) - startQueueReplicator(result.first); - else - QPID_LOG(debug, logPrefix << "Queue already replicated: " << name); + if (queue) startQueueReplicator(queue); + else QPID_LOG(debug, logPrefix << "Queue already replicated: " << name); } void BrokerReplicator::doResponseExchange(Variant::Map& values) { @@ -468,16 +470,10 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { QPID_LOG(debug, logPrefix << "Exchange response: " << name); framing::FieldTable args; amqp_0_10::translate(argsMap, args); - bool created = broker.createExchange( - name, - values[TYPE].asString(), - values[DURABLE].asBool(), - "", // FIXME aconway 2012-07-09: need to include alternate-exchange - args, - userId, - remoteHost - ).second; - QPID_LOG_IF(debug, !created, logPrefix << "Exchange already exists: " << name); + boost::shared_ptr exchange = createExchange( + name, values[TYPE].asString(), values[DURABLE].asBool(), args, + getAltExchange(values[ALTEXCHANGE])); + QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already exists: " << name); } namespace { @@ -564,6 +560,60 @@ void BrokerReplicator::stopQueueReplicator(const std::string& name) { } } +boost::shared_ptr BrokerReplicator::createQueue( + const std::string& name, + bool durable, + bool autodelete, + const qpid::framing::FieldTable& arguments, + const std::string& alternateExchange) +{ + std::pair, bool> result = + broker.createQueue( + name, + durable, + autodelete, + 0, // no owner regardless of exclusivity on primary + string(), // Set alternate exchange below + arguments, + userId, + remoteHost); + if (result.second) { + if (!alternateExchange.empty()) { + alternates.setAlternate( + alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1)); + } + return result.first; + } + else return boost::shared_ptr(); +} + +boost::shared_ptr BrokerReplicator::createExchange( + const std::string& name, + const std::string& type, + bool durable, + const qpid::framing::FieldTable& args, + const std::string& alternateExchange) +{ + std::pair, bool> result = + broker.createExchange( + name, + type, + durable, + string(), // Set alternate exchange below + args, + userId, + remoteHost); + if (result.second) { + alternates.addExchange(result.first); + if (!alternateExchange.empty()) { + alternates.setAlternate( + alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1)); + } + return result.first; + } + else return boost::shared_ptr(); +} + bool BrokerReplicator::bind(boost::shared_ptr, const string&, const framing::FieldTable*) { return false; } bool BrokerReplicator::unbind(boost::shared_ptr, const string&, const framing::FieldTable*) { return false; } bool BrokerReplicator::isBound(boost::shared_ptr, const string* const, const framing::FieldTable* const) { return false; } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index e2ca8f9e14..dbe4822d74 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -24,8 +24,10 @@ #include "types.h" #include "ReplicationTest.h" +#include "AlternateExchangeSetter.h" #include "qpid/broker/Exchange.h" #include "qpid/types/Variant.h" +#include "qpid/management/ManagementObject.h" #include #include @@ -95,6 +97,20 @@ class BrokerReplicator : public broker::Exchange, void startQueueReplicator(const boost::shared_ptr&); void stopQueueReplicator(const std::string& name); + boost::shared_ptr createQueue( + const std::string& name, + bool durable, + bool autodelete, + const qpid::framing::FieldTable& arguments, + const std::string& alternateExchange); + + boost::shared_ptr createExchange( + const std::string& name, + const std::string& type, + bool durable, + const qpid::framing::FieldTable& args, + const std::string& alternateExchange); + std::string logPrefix; std::string userId, remoteHost; ReplicationTest replicationTest; @@ -102,6 +118,7 @@ class BrokerReplicator : public broker::Exchange, broker::Broker& broker; boost::shared_ptr link; bool initialized; + AlternateExchangeSetter alternates; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index cf2ab3508f..414ede7cca 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -20,7 +20,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest import traceback -from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout +from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED from qpid.datatypes import uuid4 from brokertest import * from threading import Thread, Lock, Condition @@ -63,6 +63,7 @@ class HaBroker(Broker): args = copy(args) args += ["--load-module", BrokerTest.ha_lib, "--log-enable=debug+:ha::", + "--log-enable=trace+:ha::", # FIXME aconway 2012-07-12: # FIXME aconway 2012-02-13: workaround slow link failover. "--link-maintenace-interval=0.1", "--ha-cluster=%s"%ha_cluster] @@ -188,7 +189,7 @@ class HaCluster(object): self.broker_id += 1 return name - def start(self, update_urls=True): + def start(self, update_urls=True, args=[]): """Start a new broker in the cluster""" b = HaBroker(self.test, name=self.next_name(), **self.kwargs) self._brokers.append(b) @@ -760,6 +761,52 @@ acl deny all all s1.sender("ex").send("foo"); self.assertEqual(s1.receiver("q").fetch().content, "foo") + def test_alterante_exchange(self): + """Verify that alternate-exchange on exchanges and queues is propagated + to new members of a cluster. """ + cluster = HaCluster(self, 2) + s = cluster[0].connect().session() + # altex exchange: acts as alternate exchange + s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}") + # altq queue bound to altex, collect re-routed messages. + s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}") + # 0ex exchange with alternate-exchange altex and no queues bound + s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}") + # create queue q with alternate-exchange altex + s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}") + # create a bunch of exchanges to ensure we don't clean up prematurely if the + # response comes in multiple fragments. + for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i) + + def verify(broker): + s = broker.connect().session() + # Verify unmatched message goes to ex's alternate. + s.sender("0ex").send("foo") + altq = s.receiver("altq") + self.assertEqual("foo", altq.fetch(timeout=0).content) + s.acknowledge() + # Verify rejected message goes to q's alternate. + s.sender("q").send("bar") + msg = s.receiver("q").fetch(timeout=0) + self.assertEqual("bar", msg.content) + s.acknowledge(msg, Disposition(REJECTED)) # Reject the message + self.assertEqual("bar", altq.fetch(timeout=0).content) + s.acknowledge() + + # Sanity check: alternate exchanges on original broker + verify(cluster[0]) + # Check backup that was connected during setup. + cluster[1].wait_backup("0ex") + cluster[1].wait_backup("q") + cluster.bounce(0) + verify(cluster[1]) + # Check a newly started backup. + cluster.start() + cluster[2].wait_backup("0ex") + cluster[2].wait_backup("q") + cluster.bounce(1) + verify(cluster[2]) + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit -- cgit v1.2.1