diff options
author | Alan Conway <aconway@apache.org> | 2014-02-06 16:49:15 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2014-02-06 16:49:15 +0000 |
commit | 84fb665b89f8f57dff69ad3e9982f7c1fa19ae4d (patch) | |
tree | a8436b14790873ea60d9926667e6efc9c927445a | |
parent | 2762f18d32aa2aadc66dfccf94bea823df75336e (diff) | |
download | qpid-python-84fb665b89f8f57dff69ad3e9982f7c1fa19ae4d.tar.gz |
QPID-5544: HA memory leak in backup broker after shutdown.
The memory leaks were due to shared_ptr cycles between the QueueReplicator,
Bridge and Link objects. This patch breaks the cycles using weak_ptrs
in the appropriate places.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1565340 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 56 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ErrorListener.h | 60 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 24 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h | 2 |
6 files changed, 45 insertions, 109 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index beae53d85f..9d50b1c665 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -63,18 +63,16 @@ void Backup::setBrokerUrl(const Url& brokers) { QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers); string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol; types::Uuid uuid(true); - std::pair<Link::shared_ptr, bool> result; - result = broker.getLinks().declare( + link = broker.getLinks().declare( broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), brokers[0].host, brokers[0].port, protocol, false, // durable settings.mechanism, settings.username, settings.password, - false); // no amq.failover - don't want to use client URL. - link = result.first; + false).first; // no amq.failover - don't want to use client URL. replicator = BrokerReplicator::create(haBroker, link); broker.getExchanges().registerExchange(replicator); } - link->setUrl(brokers); // Outside the lock, once set link doesn't change. + link->setUrl(brokers); } void Backup::stop(Mutex::ScopedLock&) { diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 7928b6ab71..3957ef5a0c 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -270,7 +270,8 @@ template <class EventType> std::string key() { } boost::shared_ptr<BrokerReplicator> BrokerReplicator::create( - HaBroker& hb, const boost::shared_ptr<broker::Link>& l) { + HaBroker& hb, const boost::shared_ptr<broker::Link>& l) +{ boost::shared_ptr<BrokerReplicator> br(new BrokerReplicator(hb, l)); br->initialize(); return br; @@ -330,13 +331,21 @@ void BrokerReplicator::initialize() { BrokerReplicator::~BrokerReplicator() {} namespace { -void collectQueueReplicators( - const boost::shared_ptr<Exchange>& ex, - set<boost::shared_ptr<QueueReplicator> >& collect) -{ - boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); - if (qr) collect.insert(qr); -} +struct QueueReplicators : public std::deque<boost::shared_ptr<QueueReplicator> > { + QueueReplicators(const ExchangeRegistry& er) { addAll(er); } + + /** Add the exchange if it is a QueueReplicator. */ + void add(const boost::shared_ptr<Exchange>& ex) { + boost::shared_ptr<QueueReplicator> qr = + boost::dynamic_pointer_cast<QueueReplicator>(ex); + if (qr) push_back(qr); + } + /** Add all QueueReplicator in the ExchangeRegistry. */ + void addAll(const ExchangeRegistry& er) { + // Make copy of exchanges so we can work outside the registry lock. + er.eachExchange(boost::bind(&QueueReplicators::add, this, _1)); + } +}; } // namespace void BrokerReplicator::shutdown() { @@ -877,35 +886,22 @@ void BrokerReplicator::forced(broker::Connection& c, const std::string& message) string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } -void BrokerReplicator::disconnectedQueueReplicator(boost::shared_ptr<Exchange> ex) { - boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); - if (qr) { - qr->disconnect(); - if (TxReplicator::isTxQueue(qr->getQueue()->getName())) { - // Transactions are aborted on failover so clean up tx-queues - deleteQueue(qr->getQueue()->getName()); - } +void BrokerReplicator::disconnectedQueueReplicator( + const boost::shared_ptr<QueueReplicator>& qr) +{ + qr->disconnect(); + if (TxReplicator::isTxQueue(qr->getQueue()->getName())) { + // Transactions are aborted on failover so clean up tx-queues + deleteQueue(qr->getQueue()->getName()); } } -typedef vector<boost::shared_ptr<Exchange> > ExchangeVector; - -// Callback function for accumulating exchange candidates -namespace { -void exchangeAccumulatorCallback(ExchangeVector& ev, const Exchange::shared_ptr& i) { - ev.push_back(i); -} -} - // Called by ConnectionObserver::disconnected, disconnected from the network side. void BrokerReplicator::disconnected() { QPID_LOG(info, logPrefix << "Disconnected from primary " << primary); connect = 0; - - // Make copy of exchanges so we can work outside the registry lock. - ExchangeVector exs; - exchanges.eachExchange(boost::bind(&exchangeAccumulatorCallback, boost::ref(exs), _1)); - for_each(exs.begin(), exs.end(), + QueueReplicators qrs(broker.getExchanges()); + for_each(qrs.begin(), qrs.end(), boost::bind(&BrokerReplicator::disconnectedQueueReplicator, this, _1)); } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index 445406ad19..1e051878ae 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -108,8 +108,6 @@ class BrokerReplicator : public broker::Exchange, typedef void (BrokerReplicator::*DispatchFunction)(types::Variant::Map&); typedef qpid::sys::unordered_map<std::string, DispatchFunction> EventDispatchMap; - typedef qpid::sys::unordered_map<std::string, QueueReplicatorPtr> QueueReplicatorMap; - class UpdateTracker; class ErrorListener; @@ -152,7 +150,7 @@ class BrokerReplicator : public broker::Exchange, void deleteQueue(const std::string& name, bool purge=true); void deleteExchange(const std::string& name); - void disconnectedQueueReplicator(boost::shared_ptr<broker::Exchange>); + void disconnectedQueueReplicator(const boost::shared_ptr<QueueReplicator>&); void disconnected(); void setMembership(const types::Variant::List&); // Set membership from list. diff --git a/qpid/cpp/src/qpid/ha/ErrorListener.h b/qpid/cpp/src/qpid/ha/ErrorListener.h deleted file mode 100644 index 1ae2078a11..0000000000 --- a/qpid/cpp/src/qpid/ha/ErrorListener.h +++ /dev/null @@ -1,60 +0,0 @@ -#ifndef QPID_HA_ERRORLISTENER_H -#define QPID_HA_ERRORLISTENER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/broker/SessionHandler.h" -#include "qpid/log/Statement.h" -#include "qpid/framing/reply_exceptions.h" - -namespace qpid { -namespace ha { - -/** Default ErrorListener for HA module */ -class ErrorListener : public broker::SessionHandler::ErrorListener { - public: - ErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {} - - void connectionException(framing::connection::CloseCode code, const std::string& msg) { - QPID_LOG(error, logPrefix << framing::createConnectionException(code, msg).what()); - } - void channelException(framing::session::DetachCode code, const std::string& msg) { - QPID_LOG(error, logPrefix << framing::createChannelException(code, msg).what()); - } - void executionException(framing::execution::ErrorCode code, const std::string& msg) { - QPID_LOG(error, logPrefix << framing::createSessionException(code, msg).what()); - } - void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) { - QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what()); - } - void detach() { - QPID_LOG(error, logPrefix << "Session detached."); - } - - private: - std::string logPrefix; -}; - - -}} // namespace qpid::ha - -#endif /*!QPID_HA_ERRORLISTENER_H*/ diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 59b2013f59..6881896f5e 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -39,6 +39,7 @@ #include "qpid/Msg.h" #include "qpid/assert.h" #include <boost/shared_ptr.hpp> +#include <boost/weak_ptr.hpp> #include <boost/bind.hpp> @@ -90,7 +91,8 @@ class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what()); } void incomingExecutionException(ErrorCode code, const std::string& msg) { - if (!queueReplicator->deletedOnPrimary(code, msg)) + boost::shared_ptr<QueueReplicator> qr = queueReplicator.lock(); + if (qr && !qr->deletedOnPrimary(code, msg)) QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what()); } @@ -98,7 +100,7 @@ class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { QPID_LOG(debug, logPrefix << "Session detached"); } private: - boost::shared_ptr<QueueReplicator> queueReplicator; + boost::weak_ptr<QueueReplicator> queueReplicator; std::string logPrefix; }; @@ -112,9 +114,12 @@ class QueueReplicator::QueueObserver : public broker::QueueObserver { void consumerAdded( const Consumer& ) {} void consumerRemoved( const Consumer& ) {} // Queue observer is destroyed when the queue is. - void destroy() { queueReplicator->destroy(); } + void destroy() { + boost::shared_ptr<QueueReplicator> qr = queueReplicator.lock(); + if (qr) qr->destroy(); + } private: - boost::shared_ptr<QueueReplicator> queueReplicator; + boost::weak_ptr<QueueReplicator> queueReplicator; }; boost::shared_ptr<QueueReplicator> QueueReplicator::create( @@ -171,8 +176,7 @@ void QueueReplicator::initialize() { throw Exception(QPID_MSG("Duplicate queue replicator " << getName())); // Enable callback to initializeBridge - std::pair<Bridge::shared_ptr, bool> result = - queue->getBroker()->getLinks().declare( + boost::shared_ptr<Bridge> b = queue->getBroker()->getLinks().declare( bridgeName, *link, false, // durable @@ -189,10 +193,10 @@ void QueueReplicator::initialize() { // Include shared_ptr to self to ensure we are not deleted // before initializeBridge is called. boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2) - ); - bridge = result.first; - bridge->setErrorListener( + ).first; + b->setErrorListener( boost::shared_ptr<ErrorListener>(new ErrorListener(shared_from_this()))); + bridge = b; // bridge is a weak_ptr to avoid a cycle. // Enable callback to destroy() queue->getObservers().add( @@ -211,7 +215,7 @@ void QueueReplicator::destroy() { { Mutex::ScopedLock l(lock); if (!queue) return; // Already destroyed - bridge2 = bridge; // call close outside the lock. + bridge2 = bridge.lock(); // !call close outside the lock. destroy(l); } if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock. diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index f94c6de116..757f12c7a9 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -112,7 +112,7 @@ class QueueReplicator : public broker::Exchange, const BrokerInfo brokerInfo; DispatchMap dispatch; boost::shared_ptr<broker::Link> link; - boost::shared_ptr<broker::Bridge> bridge; + boost::weak_ptr<broker::Bridge> bridge; boost::shared_ptr<broker::Queue> queue; broker::SessionHandler* sessionHandler; |