diff options
Diffstat (limited to 'qpid/cpp/src/qpid')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueObserver.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 226 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 28 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 38 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 9 |
7 files changed, 238 insertions, 77 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 95cba7bd8e..9cf2f541ce 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -1072,6 +1072,8 @@ void Queue::destroyed() notifyDeleted(); { Mutex::ScopedLock lock(messageLock); + for_each(observers.begin(), observers.end(), + boost::bind(&QueueObserver::destroy, _1)); observers.clear(); } } diff --git a/qpid/cpp/src/qpid/broker/QueueObserver.h b/qpid/cpp/src/qpid/broker/QueueObserver.h index 29e867253e..2ba98f6945 100644 --- a/qpid/cpp/src/qpid/broker/QueueObserver.h +++ b/qpid/cpp/src/qpid/broker/QueueObserver.h @@ -69,6 +69,7 @@ class QueueObserver virtual void requeued(const Message&) = 0; virtual void consumerAdded( const Consumer& ) {}; virtual void consumerRemoved( const Consumer& ) {}; + virtual void destroy() {}; private: }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 14e6e1a5d1..18d2af76c2 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -23,6 +23,7 @@ #include "QueueReplicator.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" +#include "qpid/broker/ConnectionObserver.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueSettings.h" #include "qpid/broker/Link.h" @@ -84,25 +85,27 @@ const string ARGS("args"); const string ARGUMENTS("arguments"); const string AUTODEL("autoDel"); const string AUTODELETE("autoDelete"); -const string EXCL("excl"); -const string EXCLUSIVE("exclusive"); const string BIND("bind"); -const string UNBIND("unbind"); const string BINDING("binding"); +const string BINDING_KEY("bindingKey"); const string CREATED("created"); const string DISP("disp"); +const string DEST("dest"); const string DURABLE("durable"); const string EXCHANGE("exchange"); +const string EXCL("excl"); +const string EXCLUSIVE("exclusive"); const string EXNAME("exName"); const string EXTYPE("exType"); +const string HA_BROKER("habroker"); const string KEY("key"); -const string BINDING_KEY("bindingKey"); const string NAME("name"); +const string PARTIAL("partial"); const string QNAME("qName"); const string QUEUE("queue"); const string TYPE("type"); -const string HA_BROKER("habroker"); -const string PARTIAL("partial"); +const string UNBIND("unbind"); +const string CONSUMER_COUNT("consumerCount"); const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#"); const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#"); @@ -122,10 +125,7 @@ const string QMF_DEFAULT_DIRECT("qmf.default.direct"); const string _QUERY_REQUEST("_query_request"); const string BROKER("broker"); const string MEMBERS("members"); - -template <class T> bool match(Variant::Map& schema) { - return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); -} +const string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout"); void sendQuery(const string& packageName, const string& className, const string& queueName, SessionHandler& sessionHandler) @@ -174,22 +174,78 @@ Variant::Map asMapVoid(const Variant& value) { } } // namespace +// Listens for errors on the bridge session. +class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener { + public: + ErrorListener(const std::string& lp, BrokerReplicator& br) : + logPrefix(lp), brokerReplicator(br) {} + + void connectionException(framing::connection::CloseCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Connection error: " << msg); + } + void channelException(framing::session::DetachCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Channel error: " << msg); + } + void executionException(framing::execution::ErrorCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Execution error: " << msg); + } + void detach() { + QPID_LOG(debug, logPrefix << "Session detached."); + } + + private: + std::string logPrefix; + BrokerReplicator& brokerReplicator; +}; + +class BrokerReplicator::ConnectionObserver : public broker::ConnectionObserver +{ + public: + ConnectionObserver(BrokerReplicator& br) : brokerReplicator(br) {} + virtual void connection(Connection&) {} + virtual void opened(Connection&) {} + + virtual void closed(Connection& c) { + if (brokerReplicator.link && &c == brokerReplicator.connection) + brokerReplicator.disconnected(); + } + virtual void forced(Connection& c, const std::string& /*message*/) { closed(c); } + private: + BrokerReplicator& brokerReplicator; +}; + +template <class E> BrokerReplicator::EventKey eventKey() { + return make_pair(E::PACKAGE_NAME, E::EVENT_NAME); +} + BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l) : Exchange(QPID_CONFIGURATION_REPLICATOR), logPrefix("Backup: "), replicationTest(hb.getReplicationTest()), haBroker(hb), broker(hb.getBroker()), link(l), initialized(false), alternates(hb.getBroker().getExchanges()), - cleaner(*this) + cleaner(*this), + connection(0) { + broker.getConnectionObservers().add( + boost::shared_ptr<broker::ConnectionObserver>(new ConnectionObserver(*this))); getArgs().setString(QPID_REPLICATE, printable(NONE).str()); + + dispatch[eventKey<EventQueueDeclare>()] = &BrokerReplicator::doEventQueueDeclare; + dispatch[eventKey<EventQueueDelete>()] = &BrokerReplicator::doEventQueueDelete; + dispatch[eventKey<EventExchangeDeclare>()] = &BrokerReplicator::doEventExchangeDeclare; + dispatch[eventKey<EventExchangeDelete>()] = &BrokerReplicator::doEventExchangeDelete; + dispatch[eventKey<EventBind>()] = &BrokerReplicator::doEventBind; + dispatch[eventKey<EventUnbind>()] = &BrokerReplicator::doEventUnbind; + dispatch[eventKey<EventMembersUpdate>()] = &BrokerReplicator::doEventMembersUpdate; + dispatch[eventKey<EventSubscribe>()] = &BrokerReplicator::doEventSubscribe; } void BrokerReplicator::initialize() { // Can't do this in the constructor because we need a shared_ptr to this. types::Uuid uuid(true); const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str()); - broker.getLinks().declare( + std::pair<Bridge::shared_ptr, bool> result = broker.getLinks().declare( name, // name for bridge *link, // parent false, // durable @@ -206,13 +262,28 @@ void BrokerReplicator::initialize() { // calls are run. boost::bind(&BrokerReplicator::initializeBridge, shared_from_this(), _1, _2) ); + result.first->setErrorListener( + boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this))); } BrokerReplicator::~BrokerReplicator() { shutdown(); } +namespace { +void collectQueueReplicators( + const boost::shared_ptr<Exchange> ex, set<boost::shared_ptr<QueueReplicator> >& collect) +{ + boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); + if (qr) collect.insert(qr); +} +} // namespace + void BrokerReplicator::shutdown() { QPID_LOG(debug, logPrefix << "BrokerReplicator shutting down."); - broker.getQueues().eachQueue(boost::bind(&BrokerReplicator::deactivate, this, _1)); + set<boost::shared_ptr<QueueReplicator> > collect; + broker.getExchanges().eachExchange( + boost::bind(&collectQueueReplicators, _1, boost::ref(collect))); + for_each(collect.begin(), collect.end(), + boost::bind(&QueueReplicator::deactivate, _1)); } // This is called in the connection IO thread when the bridge is started. @@ -221,7 +292,8 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH // exchanges etc. We know link->getConnection() is non-zero because we are // being called in the connections thread context. // - assert(link->getConnection()); + connection = link->getConnection(); + assert(connection); userId = link->getConnection()->getUserId(); remoteHost = link->getConnection()->getUrl(); @@ -280,13 +352,9 @@ void BrokerReplicator::route(Deliverable& msg) { QPID_LOG(trace, "Broker replicator event: " << map); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); - if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values); - else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values); - else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values); - else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values); - else if (match<EventBind>(schema)) doEventBind(values); - else if (match<EventUnbind>(schema)) doEventUnbind(values); - else if (match<EventMembersUpdate>(schema)) doEventMembersUpdate(values); + EventKey key(schema[PACKAGE_NAME], schema[CLASS_NAME]); + EventDispatchMap::iterator i = dispatch.find(key); + if (i != dispatch.end()) (this->*(i->second))(values); } } else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { @@ -337,10 +405,8 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name); deleteQueue(name); } - settings.populate(args, settings.storeSettings); - CreateQueueResult result = createQueue( - name, values[DURABLE].asBool(), autoDel, args, values[ALTEX].asString()); - assert(result.second); // Should be created since we destroed the previous queue above. + replicateQueue(name, values[DURABLE].asBool(), autoDel, args, + values[ALTEX].asString()); } } @@ -447,6 +513,14 @@ void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) { haBroker.setMembership(members); } +void BrokerReplicator::doEventSubscribe(Variant::Map& values) { + // Ignore queue replicator subscriptions. + if (QueueReplicator::isReplicatorName(values[DEST].asString())) return; + QPID_LOG(debug, logPrefix << "Subscribe event: " << values[QNAME]); + boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(values[QNAME]); + if (qr) qr->setSubscribed(); +} + namespace { // Get the alternate exchange from the exchange field of a queue or exchange response. @@ -484,12 +558,15 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { } framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); - CreateQueueResult result = - createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args, - getAltExchange(values[ALTEXCHANGE])); - // It is normal for the queue to already exist if we are failing over. - if (!result.second) - QPID_LOG(debug, logPrefix << "Queue already replicated: " << name); + boost::shared_ptr<QueueReplicator> qr = replicateQueue( + name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args, + getAltExchange(values[ALTEXCHANGE])); + if (qr) { + Variant::Map::const_iterator i = values.find(CONSUMER_COUNT); + if (i != values.end() && isIntegerType(i->second.getType())) { + if (i->second.asInt64()) qr->setSubscribed(); + } + } } void BrokerReplicator::doResponseExchange(Variant::Map& values) { @@ -580,7 +657,8 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { } } -void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) +boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator( + const boost::shared_ptr<Queue>& queue) { if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) { boost::shared_ptr<QueueReplicator> qr( @@ -588,30 +666,22 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu if (!broker.getExchanges().registerExchange(qr)) throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName())); qr->activate(); + return qr; } -} - -void BrokerReplicator::deactivateQueue(const std::string& queueName) { - boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(queueName); - if (qr) { - qr->deactivate(); - // QueueReplicator's bridge is now queued for destruction but may not - // actually be destroyed. - broker.getExchanges().destroy(qr->getName()); - } -} - -void BrokerReplicator::deactivate(boost::shared_ptr<broker::Queue> q) { - deactivateQueue(q->getName()); -} - -void BrokerReplicator::deleteQueue(const std::string& name) { - deactivateQueue(name); - try { + return boost::shared_ptr<QueueReplicator>(); +} + +void BrokerReplicator::deleteQueue(const std::string& name, bool purge) { + boost::shared_ptr<QueueReplicator> qr(findQueueReplicator(name)); + if (qr) qr->deactivate(); + Queue::shared_ptr queue = broker.getQueues().find(name); + if (queue) { + // Purge before deleting to ensure that we don't reroute any + // messages. Any reroutes will be done at the primary and + // replicated as normal. + if (purge) queue->purge(0, boost::shared_ptr<Exchange>()); broker.deleteQueue(name, userId, remoteHost); QPID_LOG(debug, logPrefix << "Queue deleted: " << name); - } catch (const framing::NotFoundException&) { - QPID_LOG(debug, logPrefix << "Queue not found for deletion: " << name); } } @@ -624,7 +694,7 @@ void BrokerReplicator::deleteExchange(const std::string& name) { } } -BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue( +boost::shared_ptr<QueueReplicator> BrokerReplicator::replicateQueue( const std::string& name, bool durable, bool autodelete, @@ -641,14 +711,15 @@ BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue( string(), // Set alternate exchange below userId, remoteHost); - boost::shared_ptr<Queue> queue = result.first; - if (!findQueueReplicator(queue->getName())) startQueueReplicator(queue); - if (result.second && !alternateExchange.empty()) { - alternates.setAlternate( - alternateExchange, - boost::bind(&Queue::setAlternateExchange, result.first, _1)); + boost::shared_ptr<QueueReplicator> qr; + if (!findQueueReplicator(name)) qr = startQueueReplicator(result.first); + if (result.second) { + if (!alternateExchange.empty()) { + alternates.setAlternate( + alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1)); + } } - return result; + return qr; } BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange( @@ -720,8 +791,39 @@ void BrokerReplicator::Cleaner::cleanExchanges() { void BrokerReplicator::Cleaner::cleanQueues() { for_each(queues.begin(), queues.end(), - boost::bind(&BrokerReplicator::deleteQueue, &brokerReplicator, _1)); + boost::bind(&BrokerReplicator::deleteQueue, &brokerReplicator, _1, true)); queues.clear(); } +void BrokerReplicator::autoDeleteCheck( + boost::shared_ptr<Exchange> ex, set<string>& result) +{ + boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); + if (!qr) return; + assert(qr); + if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) { + if (qr->getQueue()->getSettings().autoDeleteDelay) { + // Start the auto-delete timer + Queue::tryAutoDelete(broker, qr->getQueue(), remoteHost, userId); + } + else { + // Mark for immediate deletion. + result.insert(qr->getQueue()->getName()); + } + } +} + +void BrokerReplicator::disconnected() { + QPID_LOG(info, logPrefix << "Disconnected"); + connection = 0; + // Clean up auto-delete queues + set<string> deleteQueues; + broker.getExchanges().eachExchange(boost::bind(&BrokerReplicator::autoDeleteCheck, + this, _1, boost::ref(deleteQueues))); + // Don't purge before deleting, the primary is gone so we need to + // reroute the deleted messages. + for_each(deleteQueues.begin(), deleteQueues.end(), + boost::bind(&BrokerReplicator::deleteQueue, this, _1, false)); +} + }} // namespace broker diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index bbdf3e2c0e..4845360631 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -40,6 +40,7 @@ class Broker; class Link; class Bridge; class SessionHandler; +class Connection; } namespace framing { @@ -83,6 +84,13 @@ class BrokerReplicator : public broker::Exchange, typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult; typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult; + typedef std::pair<std::string,std::string> EventKey; + typedef void (BrokerReplicator::*DispatchFunction)(types::Variant::Map&); + typedef std::map<EventKey, DispatchFunction> EventDispatchMap; + + typedef std::map<std::string, QueueReplicatorPtr> QueueReplicatorMap; + + /** Keep track of queues and exchanges that need to be cleaned up. */ class Cleaner { public: @@ -94,7 +102,7 @@ class BrokerReplicator : public broker::Exchange, // Forget a queue/exchange that does not need cleaning void forgetExchange(const std::string& name); void forgetQueue(const std::string& name); - + // Clean up queues/exchange that are no longer on primary void cleanExchanges(); void cleanQueues(); @@ -110,6 +118,9 @@ class BrokerReplicator : public broker::Exchange, }; friend class Cleaner; + class ErrorListener; + class ConnectionObserver; + void initializeBridge(broker::Bridge&, broker::SessionHandler&); void doEventQueueDeclare(types::Variant::Map& values); @@ -119,6 +130,7 @@ class BrokerReplicator : public broker::Exchange, void doEventBind(types::Variant::Map&); void doEventUnbind(types::Variant::Map&); void doEventMembersUpdate(types::Variant::Map&); + void doEventSubscribe(types::Variant::Map&); void doResponseQueue(types::Variant::Map& values); void doResponseExchange(types::Variant::Map& values); @@ -126,9 +138,9 @@ class BrokerReplicator : public broker::Exchange, void doResponseHaBroker(types::Variant::Map& values); QueueReplicatorPtr findQueueReplicator(const std::string& qname); - void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); + QueueReplicatorPtr startQueueReplicator(const boost::shared_ptr<broker::Queue>&); - CreateQueueResult createQueue( + QueueReplicatorPtr replicateQueue( const std::string& name, bool durable, bool autodelete, @@ -142,11 +154,13 @@ class BrokerReplicator : public broker::Exchange, const qpid::framing::FieldTable& args, const std::string& alternateExchange); - void deactivateQueue(const std::string& name); - void deactivate(boost::shared_ptr<broker::Queue> q); - void deleteQueue(const std::string& name); + bool deactivate(boost::shared_ptr<broker::Exchange> ex, bool destroy); + void deleteQueue(const std::string& name, bool purge=true); void deleteExchange(const std::string& name); + void autoDeleteCheck(boost::shared_ptr<broker::Exchange>, std::set<std::string>&); + void disconnected(); + std::string logPrefix; std::string userId, remoteHost; ReplicationTest replicationTest; @@ -159,6 +173,8 @@ class BrokerReplicator : public broker::Exchange, typedef std::set<std::string> StringSet; StringSet replicatedExchanges; // exchanges that have been replicated. Cleaner cleaner; + broker::Connection* connection; + EventDispatchMap dispatch; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 69c8a56873..5b9993bd90 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -26,6 +26,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/Link.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/SessionHandler.h" #include "qpid/broker/SessionHandler.h" @@ -55,6 +56,10 @@ std::string QueueReplicator::replicatorName(const std::string& queueName) { return QPID_REPLICATOR_ + queueName; } +bool QueueReplicator::isReplicatorName(const std::string& name) { + return name.compare(0, QPID_REPLICATOR_.size(), QPID_REPLICATOR_) == 0; +} + bool QueueReplicator::isEventKey(const std::string key) { const std::string& prefix = QPID_HA_EVENT_PREFIX; bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0; @@ -74,19 +79,33 @@ class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { QPID_LOG(error, logPrefix << "Execution error: " << msg); } void detach() { - QPID_LOG(error, logPrefix << "Unexpectedly detached."); + QPID_LOG(debug, logPrefix << "Session detached"); } private: std::string logPrefix; }; +class QueueReplicator::QueueObserver : public broker::QueueObserver { + public: + QueueObserver(boost::shared_ptr<QueueReplicator> qr) : queueReplicator(qr) {} + void enqueued(const Message&) {} + void dequeued(const Message&) {} + void acquired(const Message&) {} + void requeued(const Message&) {} + void consumerAdded( const Consumer& ) {} + void consumerRemoved( const Consumer& ) {} + void destroy() { queueReplicator->deactivate(); } + private: + boost::shared_ptr<QueueReplicator> queueReplicator; +}; + QueueReplicator::QueueReplicator(HaBroker& hb, boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), haBroker(hb), logPrefix("Backup queue "+q->getName()+": "), - queue(q), link(l), brokerInfo(hb.getBrokerInfo()) + queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false) { args.setString(QPID_REPLICATE, printable(NONE).str()); Uuid uuid(true); @@ -118,18 +137,21 @@ void QueueReplicator::activate() { bridge = result.first; bridge->setErrorListener( boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix))); + boost::shared_ptr<QueueObserver> observer(new QueueObserver(shared_from_this())); + queue->addObserver(observer); } QueueReplicator::~QueueReplicator() { deactivate(); } void QueueReplicator::deactivate() { - // destroy the route + QPID_LOG(debug, logPrefix << "Deactivated"); sys::Mutex::ScopedLock l(lock); - if (bridge) { - bridge->close(); - bridge.reset(); - QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName); - } + if (bridge) bridge->close(); + // Need to drop shared pointers to avoid pointer cycles keeping this in memory. + queue.reset(); + link.reset(); + bridge.reset(); + getBroker()->getExchanges().destroy(getName()); } // Called in a broker connection thread when the bridge is created. diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index c9eb318aa1..b302162286 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -58,6 +58,8 @@ class QueueReplicator : public broker::Exchange, static const std::string DEQUEUE_EVENT_KEY; static const std::string POSITION_EVENT_KEY; static std::string replicatorName(const std::string& queueName); + static bool isReplicatorName(const std::string&); + /** Test if a string is an event key */ static bool isEventKey(const std::string key); @@ -77,8 +79,16 @@ class QueueReplicator : public broker::Exchange, void route(broker::Deliverable&); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); + // Set if the queue has ever been subscribed to, used for auto-delete cleanup. + void setSubscribed() { subscribed = true; } + bool isSubscribed() { return subscribed; } + + boost::shared_ptr<broker::Queue> getQueue() const { return queue; } + private: class ErrorListener; + class QueueObserver; + void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler); void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&); @@ -90,6 +100,7 @@ class QueueReplicator : public broker::Exchange, boost::shared_ptr<broker::Link> link; boost::shared_ptr<broker::Bridge> bridge; BrokerInfo brokerInfo; + bool subscribed; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index ae6e7181d1..e48db44716 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -377,7 +377,14 @@ bool ReplicatingSubscription::doDispatch() Mutex::ScopedLock l(lock); if (!dequeues.empty()) sendDequeueEvent(l); } - return ConsumerImpl::doDispatch(); + try { + return ConsumerImpl::doDispatch(); + } + catch (const std::exception& e) { + // FIXME aconway 2012-10-05: detect queue deletion, no warning. + QPID_LOG(warning, logPrefix << " exception in dispatch: " << e.what()); + return false; + } } }} // namespace qpid::ha |