diff options
author | Alan Conway <aconway@apache.org> | 2012-10-11 19:23:30 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-10-11 19:23:30 +0000 |
commit | f75678b1f215af028b25802c46e274e2d10551b5 (patch) | |
tree | 89267ad6025ae3884d8de9a2d783051014db2472 /qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | |
parent | 12eb9e230a50eee63378d1a84f62e57bce4e2e92 (diff) | |
download | qpid-python-f75678b1f215af028b25802c46e274e2d10551b5.tar.gz |
Bug 860701 - QPID-4350: HA handle auto-delete queues
Subscribed auto-delete queues are deleted by the backup.
Timed auto-delete queues are deleted after the timeout.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1397243 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/BrokerReplicator.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 226 |
1 files changed, 164 insertions, 62 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 14e6e1a5d1..18d2af76c2 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -23,6 +23,7 @@ #include "QueueReplicator.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" +#include "qpid/broker/ConnectionObserver.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueSettings.h" #include "qpid/broker/Link.h" @@ -84,25 +85,27 @@ const string ARGS("args"); const string ARGUMENTS("arguments"); const string AUTODEL("autoDel"); const string AUTODELETE("autoDelete"); -const string EXCL("excl"); -const string EXCLUSIVE("exclusive"); const string BIND("bind"); -const string UNBIND("unbind"); const string BINDING("binding"); +const string BINDING_KEY("bindingKey"); const string CREATED("created"); const string DISP("disp"); +const string DEST("dest"); const string DURABLE("durable"); const string EXCHANGE("exchange"); +const string EXCL("excl"); +const string EXCLUSIVE("exclusive"); const string EXNAME("exName"); const string EXTYPE("exType"); +const string HA_BROKER("habroker"); const string KEY("key"); -const string BINDING_KEY("bindingKey"); const string NAME("name"); +const string PARTIAL("partial"); const string QNAME("qName"); const string QUEUE("queue"); const string TYPE("type"); -const string HA_BROKER("habroker"); -const string PARTIAL("partial"); +const string UNBIND("unbind"); +const string CONSUMER_COUNT("consumerCount"); const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#"); const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#"); @@ -122,10 +125,7 @@ const string QMF_DEFAULT_DIRECT("qmf.default.direct"); const string _QUERY_REQUEST("_query_request"); const string BROKER("broker"); const string MEMBERS("members"); - -template <class T> bool match(Variant::Map& schema) { - return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); -} +const string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout"); void sendQuery(const string& packageName, const string& className, const string& queueName, SessionHandler& sessionHandler) @@ -174,22 +174,78 @@ Variant::Map asMapVoid(const Variant& value) { } } // namespace +// Listens for errors on the bridge session. +class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener { + public: + ErrorListener(const std::string& lp, BrokerReplicator& br) : + logPrefix(lp), brokerReplicator(br) {} + + void connectionException(framing::connection::CloseCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Connection error: " << msg); + } + void channelException(framing::session::DetachCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Channel error: " << msg); + } + void executionException(framing::execution::ErrorCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Execution error: " << msg); + } + void detach() { + QPID_LOG(debug, logPrefix << "Session detached."); + } + + private: + std::string logPrefix; + BrokerReplicator& brokerReplicator; +}; + +class BrokerReplicator::ConnectionObserver : public broker::ConnectionObserver +{ + public: + ConnectionObserver(BrokerReplicator& br) : brokerReplicator(br) {} + virtual void connection(Connection&) {} + virtual void opened(Connection&) {} + + virtual void closed(Connection& c) { + if (brokerReplicator.link && &c == brokerReplicator.connection) + brokerReplicator.disconnected(); + } + virtual void forced(Connection& c, const std::string& /*message*/) { closed(c); } + private: + BrokerReplicator& brokerReplicator; +}; + +template <class E> BrokerReplicator::EventKey eventKey() { + return make_pair(E::PACKAGE_NAME, E::EVENT_NAME); +} + BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l) : Exchange(QPID_CONFIGURATION_REPLICATOR), logPrefix("Backup: "), replicationTest(hb.getReplicationTest()), haBroker(hb), broker(hb.getBroker()), link(l), initialized(false), alternates(hb.getBroker().getExchanges()), - cleaner(*this) + cleaner(*this), + connection(0) { + broker.getConnectionObservers().add( + boost::shared_ptr<broker::ConnectionObserver>(new ConnectionObserver(*this))); getArgs().setString(QPID_REPLICATE, printable(NONE).str()); + + dispatch[eventKey<EventQueueDeclare>()] = &BrokerReplicator::doEventQueueDeclare; + dispatch[eventKey<EventQueueDelete>()] = &BrokerReplicator::doEventQueueDelete; + dispatch[eventKey<EventExchangeDeclare>()] = &BrokerReplicator::doEventExchangeDeclare; + dispatch[eventKey<EventExchangeDelete>()] = &BrokerReplicator::doEventExchangeDelete; + dispatch[eventKey<EventBind>()] = &BrokerReplicator::doEventBind; + dispatch[eventKey<EventUnbind>()] = &BrokerReplicator::doEventUnbind; + dispatch[eventKey<EventMembersUpdate>()] = &BrokerReplicator::doEventMembersUpdate; + dispatch[eventKey<EventSubscribe>()] = &BrokerReplicator::doEventSubscribe; } void BrokerReplicator::initialize() { // Can't do this in the constructor because we need a shared_ptr to this. types::Uuid uuid(true); const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str()); - broker.getLinks().declare( + std::pair<Bridge::shared_ptr, bool> result = broker.getLinks().declare( name, // name for bridge *link, // parent false, // durable @@ -206,13 +262,28 @@ void BrokerReplicator::initialize() { // calls are run. boost::bind(&BrokerReplicator::initializeBridge, shared_from_this(), _1, _2) ); + result.first->setErrorListener( + boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this))); } BrokerReplicator::~BrokerReplicator() { shutdown(); } +namespace { +void collectQueueReplicators( + const boost::shared_ptr<Exchange> ex, set<boost::shared_ptr<QueueReplicator> >& collect) +{ + boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); + if (qr) collect.insert(qr); +} +} // namespace + void BrokerReplicator::shutdown() { QPID_LOG(debug, logPrefix << "BrokerReplicator shutting down."); - broker.getQueues().eachQueue(boost::bind(&BrokerReplicator::deactivate, this, _1)); + set<boost::shared_ptr<QueueReplicator> > collect; + broker.getExchanges().eachExchange( + boost::bind(&collectQueueReplicators, _1, boost::ref(collect))); + for_each(collect.begin(), collect.end(), + boost::bind(&QueueReplicator::deactivate, _1)); } // This is called in the connection IO thread when the bridge is started. @@ -221,7 +292,8 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH // exchanges etc. We know link->getConnection() is non-zero because we are // being called in the connections thread context. // - assert(link->getConnection()); + connection = link->getConnection(); + assert(connection); userId = link->getConnection()->getUserId(); remoteHost = link->getConnection()->getUrl(); @@ -280,13 +352,9 @@ void BrokerReplicator::route(Deliverable& msg) { QPID_LOG(trace, "Broker replicator event: " << map); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); - if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values); - else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values); - else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values); - else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values); - else if (match<EventBind>(schema)) doEventBind(values); - else if (match<EventUnbind>(schema)) doEventUnbind(values); - else if (match<EventMembersUpdate>(schema)) doEventMembersUpdate(values); + EventKey key(schema[PACKAGE_NAME], schema[CLASS_NAME]); + EventDispatchMap::iterator i = dispatch.find(key); + if (i != dispatch.end()) (this->*(i->second))(values); } } else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { @@ -337,10 +405,8 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name); deleteQueue(name); } - settings.populate(args, settings.storeSettings); - CreateQueueResult result = createQueue( - name, values[DURABLE].asBool(), autoDel, args, values[ALTEX].asString()); - assert(result.second); // Should be created since we destroed the previous queue above. + replicateQueue(name, values[DURABLE].asBool(), autoDel, args, + values[ALTEX].asString()); } } @@ -447,6 +513,14 @@ void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) { haBroker.setMembership(members); } +void BrokerReplicator::doEventSubscribe(Variant::Map& values) { + // Ignore queue replicator subscriptions. + if (QueueReplicator::isReplicatorName(values[DEST].asString())) return; + QPID_LOG(debug, logPrefix << "Subscribe event: " << values[QNAME]); + boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(values[QNAME]); + if (qr) qr->setSubscribed(); +} + namespace { // Get the alternate exchange from the exchange field of a queue or exchange response. @@ -484,12 +558,15 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { } framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); - CreateQueueResult result = - createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args, - getAltExchange(values[ALTEXCHANGE])); - // It is normal for the queue to already exist if we are failing over. - if (!result.second) - QPID_LOG(debug, logPrefix << "Queue already replicated: " << name); + boost::shared_ptr<QueueReplicator> qr = replicateQueue( + name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args, + getAltExchange(values[ALTEXCHANGE])); + if (qr) { + Variant::Map::const_iterator i = values.find(CONSUMER_COUNT); + if (i != values.end() && isIntegerType(i->second.getType())) { + if (i->second.asInt64()) qr->setSubscribed(); + } + } } void BrokerReplicator::doResponseExchange(Variant::Map& values) { @@ -580,7 +657,8 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { } } -void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) +boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator( + const boost::shared_ptr<Queue>& queue) { if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) { boost::shared_ptr<QueueReplicator> qr( @@ -588,30 +666,22 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu if (!broker.getExchanges().registerExchange(qr)) throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName())); qr->activate(); + return qr; } -} - -void BrokerReplicator::deactivateQueue(const std::string& queueName) { - boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(queueName); - if (qr) { - qr->deactivate(); - // QueueReplicator's bridge is now queued for destruction but may not - // actually be destroyed. - broker.getExchanges().destroy(qr->getName()); - } -} - -void BrokerReplicator::deactivate(boost::shared_ptr<broker::Queue> q) { - deactivateQueue(q->getName()); -} - -void BrokerReplicator::deleteQueue(const std::string& name) { - deactivateQueue(name); - try { + return boost::shared_ptr<QueueReplicator>(); +} + +void BrokerReplicator::deleteQueue(const std::string& name, bool purge) { + boost::shared_ptr<QueueReplicator> qr(findQueueReplicator(name)); + if (qr) qr->deactivate(); + Queue::shared_ptr queue = broker.getQueues().find(name); + if (queue) { + // Purge before deleting to ensure that we don't reroute any + // messages. Any reroutes will be done at the primary and + // replicated as normal. + if (purge) queue->purge(0, boost::shared_ptr<Exchange>()); broker.deleteQueue(name, userId, remoteHost); QPID_LOG(debug, logPrefix << "Queue deleted: " << name); - } catch (const framing::NotFoundException&) { - QPID_LOG(debug, logPrefix << "Queue not found for deletion: " << name); } } @@ -624,7 +694,7 @@ void BrokerReplicator::deleteExchange(const std::string& name) { } } -BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue( +boost::shared_ptr<QueueReplicator> BrokerReplicator::replicateQueue( const std::string& name, bool durable, bool autodelete, @@ -641,14 +711,15 @@ BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue( string(), // Set alternate exchange below userId, remoteHost); - boost::shared_ptr<Queue> queue = result.first; - if (!findQueueReplicator(queue->getName())) startQueueReplicator(queue); - if (result.second && !alternateExchange.empty()) { - alternates.setAlternate( - alternateExchange, - boost::bind(&Queue::setAlternateExchange, result.first, _1)); + boost::shared_ptr<QueueReplicator> qr; + if (!findQueueReplicator(name)) qr = startQueueReplicator(result.first); + if (result.second) { + if (!alternateExchange.empty()) { + alternates.setAlternate( + alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1)); + } } - return result; + return qr; } BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange( @@ -720,8 +791,39 @@ void BrokerReplicator::Cleaner::cleanExchanges() { void BrokerReplicator::Cleaner::cleanQueues() { for_each(queues.begin(), queues.end(), - boost::bind(&BrokerReplicator::deleteQueue, &brokerReplicator, _1)); + boost::bind(&BrokerReplicator::deleteQueue, &brokerReplicator, _1, true)); queues.clear(); } +void BrokerReplicator::autoDeleteCheck( + boost::shared_ptr<Exchange> ex, set<string>& result) +{ + boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); + if (!qr) return; + assert(qr); + if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) { + if (qr->getQueue()->getSettings().autoDeleteDelay) { + // Start the auto-delete timer + Queue::tryAutoDelete(broker, qr->getQueue(), remoteHost, userId); + } + else { + // Mark for immediate deletion. + result.insert(qr->getQueue()->getName()); + } + } +} + +void BrokerReplicator::disconnected() { + QPID_LOG(info, logPrefix << "Disconnected"); + connection = 0; + // Clean up auto-delete queues + set<string> deleteQueues; + broker.getExchanges().eachExchange(boost::bind(&BrokerReplicator::autoDeleteCheck, + this, _1, boost::ref(deleteQueues))); + // Don't purge before deleting, the primary is gone so we need to + // reroute the deleted messages. + for_each(deleteQueues.begin(), deleteQueues.end(), + boost::bind(&BrokerReplicator::deleteQueue, this, _1, false)); +} + }} // namespace broker |