summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2016-04-05 16:13:24 +0000
committerAlan Conway <aconway@apache.org>2016-04-05 16:13:24 +0000
commitc9f1b7641fa089eca017a543991485be5de35775 (patch)
tree93604b752bdaf1f4415bc6d66ed1a6a9245a9e4a
parent77a319a41c90efeabbfe42b7b85d6c7ac30a68e6 (diff)
downloadqpid-python-c9f1b7641fa089eca017a543991485be5de35775.tar.gz
QPID-7149: Active HA broker memory leak
The leak was caused by the sys::Poller. When two pollers run in the same process and one is idles, PollerHandles accumulate on the idle poller threads and are never released. The HA broker uses the qpid::messaging API to get initial status from other brokers. The messaging API creates a separate Poller from the broker. The fix is to shut down the qpid::messaging poller as soon as initial status checks are complete so it does not interfere with the broker's Poller. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1737852 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/include/qpid/messaging/shutdown.h38
-rw-r--r--qpid/cpp/src/CMakeLists.txt1
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionImpl.cpp3
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionImpl.h3
-rw-r--r--qpid/cpp/src/qpid/ha/StatusCheck.cpp23
-rw-r--r--qpid/cpp/src/qpid/ha/StatusCheck.h5
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp8
-rw-r--r--qpid/cpp/src/qpid/messaging/shutdown.cpp32
8 files changed, 104 insertions, 9 deletions
diff --git a/qpid/cpp/include/qpid/messaging/shutdown.h b/qpid/cpp/include/qpid/messaging/shutdown.h
new file mode 100644
index 0000000000..7744630f13
--- /dev/null
+++ b/qpid/cpp/include/qpid/messaging/shutdown.h
@@ -0,0 +1,38 @@
+#ifndef QPID_MESSAGING_SHUTDOWN_H
+#define QPID_MESSAGING_SHUTDOWN_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qpid/messaging/ImportExport.h"
+
+namespace qpid {
+namespace messaging {
+
+/** Shut down the qpid::messaging library, clean up resources and stop background threads.
+ * Note you cannot use any of the qpid::messaging classes or functions after calling this.
+ *
+ * It is is not normally necessary to call this, the library cleans up automatically on process exit.
+ * You can use it to clean up resources early in unusual situations.
+ */
+QPID_MESSAGING_EXTERN void shutdown();
+
+}}
+
+#endif // QPID_MESSAGING_SHUTDOWN_H
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 45f5987a6c..36a4632e96 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -964,6 +964,7 @@ set (qpidmessaging_SOURCES
qpid/messaging/ReceiverImpl.h
qpid/messaging/SessionImpl.h
qpid/messaging/SenderImpl.h
+ qpid/messaging/shutdown.cpp
qpid/client/amqp0_10/AcceptTracker.h
qpid/client/amqp0_10/AcceptTracker.cpp
qpid/client/amqp0_10/AddressResolution.h
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
index a9d88bdb87..4c2cbc0245 100644
--- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -448,5 +448,8 @@ std::ostream& operator<<(std::ostream& o, const ConnectionImpl& c) {
return o << "Connection <not connected>";
}
+void shutdown() {
+ theIO().poller()->shutdown();
+}
}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.h b/qpid/cpp/src/qpid/client/ConnectionImpl.h
index b07ce142fb..1087204e35 100644
--- a/qpid/cpp/src/qpid/client/ConnectionImpl.h
+++ b/qpid/cpp/src/qpid/client/ConnectionImpl.h
@@ -98,6 +98,9 @@ class ConnectionImpl : public Bounds,
friend std::ostream& operator<<(std::ostream&, const ConnectionImpl&);
};
+// Shut down the poller early. Internal use only.
+void shutdown();
+
}}
diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.cpp b/qpid/cpp/src/qpid/ha/StatusCheck.cpp
index 8acf8d6cdc..b6bce0fd7b 100644
--- a/qpid/cpp/src/qpid/ha/StatusCheck.cpp
+++ b/qpid/cpp/src/qpid/ha/StatusCheck.cpp
@@ -23,6 +23,7 @@
#include "HaBroker.h"
#include "qpid/broker/Broker.h"
#include "qpid/log/Statement.h"
+#include "qpid/messaging/shutdown.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Connection.h"
#include "qpid/messaging/Message.h"
@@ -97,14 +98,17 @@ void StatusCheckThread::run() {
string status = details["status"].getString();
QPID_LOG(debug, logPrefix << status);
if (status != "joining") {
- statusCheck.setPromote(false);
+ statusCheck.noPromote();
QPID_LOG(info, logPrefix << "Joining established cluster");
}
}
else
QPID_LOG(error, logPrefix << "Invalid response " << response.getContent());
- } catch(...) {}
+ } catch(const std::exception& e) {
+ QPID_LOG(info, logPrefix << e.what());
+ }
try { c.close(); } catch(...) {}
+ statusCheck.endThread();
delete this;
}
@@ -117,16 +121,25 @@ StatusCheck::StatusCheck(HaBroker& hb) :
{}
StatusCheck::~StatusCheck() {
- // Join any leftovers
+ // In case canPromote was never called.
for (size_t i = 0; i < threads.size(); ++i) threads[i].join();
}
void StatusCheck::setUrl(const Url& url) {
Mutex::ScopedLock l(lock);
+ threadCount = url.size();
for (size_t i = 0; i < url.size(); ++i)
threads.push_back(Thread(new StatusCheckThread(*this, url[i])));
}
+void StatusCheck::endThread() {
+ // Shut down the client poller ASAP to avoid conflict with the broker's poller.
+ // See https://issues.apache.org/jira/browse/QPID-7149
+ if (--threadCount == 0) {
+ messaging::shutdown();
+ }
+}
+
bool StatusCheck::canPromote() {
Mutex::ScopedLock l(lock);
while (!threads.empty()) {
@@ -138,9 +151,9 @@ bool StatusCheck::canPromote() {
return promote;
}
-void StatusCheck::setPromote(bool p) {
+void StatusCheck::noPromote() {
Mutex::ScopedLock l(lock);
- promote = p;
+ promote = false;
}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.h b/qpid/cpp/src/qpid/ha/StatusCheck.h
index 087e586b2b..2ff6f1ffba 100644
--- a/qpid/cpp/src/qpid/ha/StatusCheck.h
+++ b/qpid/cpp/src/qpid/ha/StatusCheck.h
@@ -25,6 +25,7 @@
#include "BrokerInfo.h"
#include "Settings.h"
#include "qpid/Url.h"
+#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Runnable.h"
@@ -60,10 +61,12 @@ class StatusCheck
bool canPromote();
private:
- void setPromote(bool p);
+ void noPromote();
+ void endThread();
sys::Mutex lock;
std::vector<sys::Thread> threads;
+ sys::AtomicValue<int> threadCount;
bool promote;
const Settings settings;
const sys::Duration heartbeat;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
index ebe3fff1cb..67b2ae2f6f 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
@@ -47,9 +47,11 @@ void DriverImpl::start()
void DriverImpl::stop()
{
QPID_LOG(debug, "Driver stopped");
- poller->shutdown();
- thread.join();
- timer->stop();
+ if (!poller->hasShutdown()) {
+ poller->shutdown();
+ thread.join();
+ timer->stop();
+ }
}
boost::shared_ptr<Transport> DriverImpl::getTransport(const std::string& protocol, TransportContext& connection)
diff --git a/qpid/cpp/src/qpid/messaging/shutdown.cpp b/qpid/cpp/src/qpid/messaging/shutdown.cpp
new file mode 100644
index 0000000000..01c7f56ac0
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/shutdown.cpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/messaging/shutdown.h>
+#include "../client/ConnectionImpl.h"
+#include "amqp/DriverImpl.h"
+
+namespace qpid {
+namespace messaging {
+
+void shutdown() {
+ amqp::DriverImpl::getDefault()->stop();
+ qpid::client::shutdown();
+}
+
+}}