summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-02-06 16:49:15 +0000
committerAlan Conway <aconway@apache.org>2014-02-06 16:49:15 +0000
commit84fb665b89f8f57dff69ad3e9982f7c1fa19ae4d (patch)
treea8436b14790873ea60d9926667e6efc9c927445a
parent2762f18d32aa2aadc66dfccf94bea823df75336e (diff)
downloadqpid-python-84fb665b89f8f57dff69ad3e9982f7c1fa19ae4d.tar.gz
QPID-5544: HA memory leak in backup broker after shutdown.
The memory leaks were due to shared_ptr cycles between the QueueReplicator, Bridge and Link objects. This patch breaks the cycles using weak_ptrs in the appropriate places. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1565340 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp56
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h4
-rw-r--r--qpid/cpp/src/qpid/ha/ErrorListener.h60
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp24
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h2
6 files changed, 45 insertions, 109 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index beae53d85f..9d50b1c665 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -63,18 +63,16 @@ void Backup::setBrokerUrl(const Url& brokers) {
QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
types::Uuid uuid(true);
- std::pair<Link::shared_ptr, bool> result;
- result = broker.getLinks().declare(
+ link = broker.getLinks().declare(
broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
brokers[0].host, brokers[0].port, protocol,
false, // durable
settings.mechanism, settings.username, settings.password,
- false); // no amq.failover - don't want to use client URL.
- link = result.first;
+ false).first; // no amq.failover - don't want to use client URL.
replicator = BrokerReplicator::create(haBroker, link);
broker.getExchanges().registerExchange(replicator);
}
- link->setUrl(brokers); // Outside the lock, once set link doesn't change.
+ link->setUrl(brokers);
}
void Backup::stop(Mutex::ScopedLock&) {
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 7928b6ab71..3957ef5a0c 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -270,7 +270,8 @@ template <class EventType> std::string key() {
}
boost::shared_ptr<BrokerReplicator> BrokerReplicator::create(
- HaBroker& hb, const boost::shared_ptr<broker::Link>& l) {
+ HaBroker& hb, const boost::shared_ptr<broker::Link>& l)
+{
boost::shared_ptr<BrokerReplicator> br(new BrokerReplicator(hb, l));
br->initialize();
return br;
@@ -330,13 +331,21 @@ void BrokerReplicator::initialize() {
BrokerReplicator::~BrokerReplicator() {}
namespace {
-void collectQueueReplicators(
- const boost::shared_ptr<Exchange>& ex,
- set<boost::shared_ptr<QueueReplicator> >& collect)
-{
- boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
- if (qr) collect.insert(qr);
-}
+struct QueueReplicators : public std::deque<boost::shared_ptr<QueueReplicator> > {
+ QueueReplicators(const ExchangeRegistry& er) { addAll(er); }
+
+ /** Add the exchange if it is a QueueReplicator. */
+ void add(const boost::shared_ptr<Exchange>& ex) {
+ boost::shared_ptr<QueueReplicator> qr =
+ boost::dynamic_pointer_cast<QueueReplicator>(ex);
+ if (qr) push_back(qr);
+ }
+ /** Add all QueueReplicator in the ExchangeRegistry. */
+ void addAll(const ExchangeRegistry& er) {
+ // Make copy of exchanges so we can work outside the registry lock.
+ er.eachExchange(boost::bind(&QueueReplicators::add, this, _1));
+ }
+};
} // namespace
void BrokerReplicator::shutdown() {
@@ -877,35 +886,22 @@ void BrokerReplicator::forced(broker::Connection& c, const std::string& message)
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
-void BrokerReplicator::disconnectedQueueReplicator(boost::shared_ptr<Exchange> ex) {
- boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
- if (qr) {
- qr->disconnect();
- if (TxReplicator::isTxQueue(qr->getQueue()->getName())) {
- // Transactions are aborted on failover so clean up tx-queues
- deleteQueue(qr->getQueue()->getName());
- }
+void BrokerReplicator::disconnectedQueueReplicator(
+ const boost::shared_ptr<QueueReplicator>& qr)
+{
+ qr->disconnect();
+ if (TxReplicator::isTxQueue(qr->getQueue()->getName())) {
+ // Transactions are aborted on failover so clean up tx-queues
+ deleteQueue(qr->getQueue()->getName());
}
}
-typedef vector<boost::shared_ptr<Exchange> > ExchangeVector;
-
-// Callback function for accumulating exchange candidates
-namespace {
-void exchangeAccumulatorCallback(ExchangeVector& ev, const Exchange::shared_ptr& i) {
- ev.push_back(i);
-}
-}
-
// Called by ConnectionObserver::disconnected, disconnected from the network side.
void BrokerReplicator::disconnected() {
QPID_LOG(info, logPrefix << "Disconnected from primary " << primary);
connect = 0;
-
- // Make copy of exchanges so we can work outside the registry lock.
- ExchangeVector exs;
- exchanges.eachExchange(boost::bind(&exchangeAccumulatorCallback, boost::ref(exs), _1));
- for_each(exs.begin(), exs.end(),
+ QueueReplicators qrs(broker.getExchanges());
+ for_each(qrs.begin(), qrs.end(),
boost::bind(&BrokerReplicator::disconnectedQueueReplicator, this, _1));
}
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index 445406ad19..1e051878ae 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -108,8 +108,6 @@ class BrokerReplicator : public broker::Exchange,
typedef void (BrokerReplicator::*DispatchFunction)(types::Variant::Map&);
typedef qpid::sys::unordered_map<std::string, DispatchFunction> EventDispatchMap;
- typedef qpid::sys::unordered_map<std::string, QueueReplicatorPtr> QueueReplicatorMap;
-
class UpdateTracker;
class ErrorListener;
@@ -152,7 +150,7 @@ class BrokerReplicator : public broker::Exchange,
void deleteQueue(const std::string& name, bool purge=true);
void deleteExchange(const std::string& name);
- void disconnectedQueueReplicator(boost::shared_ptr<broker::Exchange>);
+ void disconnectedQueueReplicator(const boost::shared_ptr<QueueReplicator>&);
void disconnected();
void setMembership(const types::Variant::List&); // Set membership from list.
diff --git a/qpid/cpp/src/qpid/ha/ErrorListener.h b/qpid/cpp/src/qpid/ha/ErrorListener.h
deleted file mode 100644
index 1ae2078a11..0000000000
--- a/qpid/cpp/src/qpid/ha/ErrorListener.h
+++ /dev/null
@@ -1,60 +0,0 @@
-#ifndef QPID_HA_ERRORLISTENER_H
-#define QPID_HA_ERRORLISTENER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/broker/SessionHandler.h"
-#include "qpid/log/Statement.h"
-#include "qpid/framing/reply_exceptions.h"
-
-namespace qpid {
-namespace ha {
-
-/** Default ErrorListener for HA module */
-class ErrorListener : public broker::SessionHandler::ErrorListener {
- public:
- ErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
-
- void connectionException(framing::connection::CloseCode code, const std::string& msg) {
- QPID_LOG(error, logPrefix << framing::createConnectionException(code, msg).what());
- }
- void channelException(framing::session::DetachCode code, const std::string& msg) {
- QPID_LOG(error, logPrefix << framing::createChannelException(code, msg).what());
- }
- void executionException(framing::execution::ErrorCode code, const std::string& msg) {
- QPID_LOG(error, logPrefix << framing::createSessionException(code, msg).what());
- }
- void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
- }
- void detach() {
- QPID_LOG(error, logPrefix << "Session detached.");
- }
-
- private:
- std::string logPrefix;
-};
-
-
-}} // namespace qpid::ha
-
-#endif /*!QPID_HA_ERRORLISTENER_H*/
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 59b2013f59..6881896f5e 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -39,6 +39,7 @@
#include "qpid/Msg.h"
#include "qpid/assert.h"
#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
#include <boost/bind.hpp>
@@ -90,7 +91,8 @@ class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what());
}
void incomingExecutionException(ErrorCode code, const std::string& msg) {
- if (!queueReplicator->deletedOnPrimary(code, msg))
+ boost::shared_ptr<QueueReplicator> qr = queueReplicator.lock();
+ if (qr && !qr->deletedOnPrimary(code, msg))
QPID_LOG(error, logPrefix << "Incoming "
<< framing::createSessionException(code, msg).what());
}
@@ -98,7 +100,7 @@ class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
QPID_LOG(debug, logPrefix << "Session detached");
}
private:
- boost::shared_ptr<QueueReplicator> queueReplicator;
+ boost::weak_ptr<QueueReplicator> queueReplicator;
std::string logPrefix;
};
@@ -112,9 +114,12 @@ class QueueReplicator::QueueObserver : public broker::QueueObserver {
void consumerAdded( const Consumer& ) {}
void consumerRemoved( const Consumer& ) {}
// Queue observer is destroyed when the queue is.
- void destroy() { queueReplicator->destroy(); }
+ void destroy() {
+ boost::shared_ptr<QueueReplicator> qr = queueReplicator.lock();
+ if (qr) qr->destroy();
+ }
private:
- boost::shared_ptr<QueueReplicator> queueReplicator;
+ boost::weak_ptr<QueueReplicator> queueReplicator;
};
boost::shared_ptr<QueueReplicator> QueueReplicator::create(
@@ -171,8 +176,7 @@ void QueueReplicator::initialize() {
throw Exception(QPID_MSG("Duplicate queue replicator " << getName()));
// Enable callback to initializeBridge
- std::pair<Bridge::shared_ptr, bool> result =
- queue->getBroker()->getLinks().declare(
+ boost::shared_ptr<Bridge> b = queue->getBroker()->getLinks().declare(
bridgeName,
*link,
false, // durable
@@ -189,10 +193,10 @@ void QueueReplicator::initialize() {
// Include shared_ptr to self to ensure we are not deleted
// before initializeBridge is called.
boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2)
- );
- bridge = result.first;
- bridge->setErrorListener(
+ ).first;
+ b->setErrorListener(
boost::shared_ptr<ErrorListener>(new ErrorListener(shared_from_this())));
+ bridge = b; // bridge is a weak_ptr to avoid a cycle.
// Enable callback to destroy()
queue->getObservers().add(
@@ -211,7 +215,7 @@ void QueueReplicator::destroy() {
{
Mutex::ScopedLock l(lock);
if (!queue) return; // Already destroyed
- bridge2 = bridge; // call close outside the lock.
+ bridge2 = bridge.lock(); // !call close outside the lock.
destroy(l);
}
if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock.
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index f94c6de116..757f12c7a9 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -112,7 +112,7 @@ class QueueReplicator : public broker::Exchange,
const BrokerInfo brokerInfo;
DispatchMap dispatch;
boost::shared_ptr<broker::Link> link;
- boost::shared_ptr<broker::Bridge> bridge;
+ boost::weak_ptr<broker::Bridge> bridge;
boost::shared_ptr<broker::Queue> queue;
broker::SessionHandler* sessionHandler;