summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2009-12-07 15:42:14 +0000
committerAndrew Stitcher <astitcher@apache.org>2009-12-07 15:42:14 +0000
commit7bbd0fdd577b167127633a7b52fe7ea487b1f267 (patch)
tree5752af0626dad35879b91c96d09d802595f779d8 /cpp/src
parent6a667d2a64bb124c585ef165821dfd60e3f5de04 (diff)
downloadqpid-python-7bbd0fdd577b167127633a7b52fe7ea487b1f267.tar.gz
QPID-2214: Opening and closing client connections causes memory use to grow unboundedly
- Clean up the DeletionManager state for each thread when the thread exits git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@887956 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/sys/DeletionManager.h40
-rw-r--r--cpp/src/qpid/sys/epoll/EpollPoller.cpp50
2 files changed, 63 insertions, 27 deletions
diff --git a/cpp/src/qpid/sys/DeletionManager.h b/cpp/src/qpid/sys/DeletionManager.h
index 43154eb98e..5d8428966e 100644
--- a/cpp/src/qpid/sys/DeletionManager.h
+++ b/cpp/src/qpid/sys/DeletionManager.h
@@ -54,6 +54,8 @@ struct deleter
template <typename H>
class DeletionManager
{
+ struct ThreadStatus;
+
public:
// Mark every thread as using the handle - it will be deleted
// below after every thread marks the handle as unused
@@ -65,6 +67,28 @@ public:
// handles get deleted here when no one else
// is using them either
static void markAllUnusedInThisThread() {
+ ThreadStatus* threadStatus = getThreadStatus();
+ ScopedLock<Mutex> l(threadStatus->lock);
+
+ // The actual deletions will happen here when all the shared_ptr
+ // ref counts hit 0 (that is when every thread marks the handle unused)
+ threadStatus->handles.clear();
+ }
+
+ static void destroyThreadState() {
+ ThreadStatus* threadStatus = getThreadStatus();
+ {
+ ScopedLock<Mutex> l(threadStatus->lock);
+
+ allThreadsStatuses.delThreadStatus(threadStatus);
+ }
+ delete threadStatus;
+ threadStatus = 0;
+ }
+
+private:
+
+ static ThreadStatus*& getThreadStatus() {
static __thread ThreadStatus* threadStatus = 0;
// Thread local vars can't be dynamically constructed so we need
@@ -75,14 +99,9 @@ public:
allThreadsStatuses.addThreadStatus(threadStatus);
}
- ScopedLock<Mutex> l(threadStatus->lock);
-
- // The actual deletions will happen here when all the shared_ptr
- // ref counts hit 0 (that is when every thread marks the handle unused)
- threadStatus->handles.clear();
+ return threadStatus;
}
-private:
typedef boost::shared_ptr<H> shared_ptr;
// In theory we know that we never need more handles than the number of
@@ -125,6 +144,15 @@ private:
statuses.push_back(t);
}
+ void delThreadStatus(ThreadStatus* t) {
+ ScopedLock<Mutex> l(lock);
+ typename std::vector<ThreadStatus*>::iterator it =
+ std::find(statuses.begin(),statuses.end(), t);
+ if (it != statuses.end()) {
+ statuses.erase(it);
+ }
+ }
+
void addHandle(shared_ptr h) {
ScopedLock<Mutex> l(lock);
std::for_each(statuses.begin(), statuses.end(), handleAdder(h));
diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
index fd9a4b3468..d7f64f3b4c 100644
--- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp
+++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
@@ -25,6 +25,7 @@
#include "qpid/sys/DeletionManager.h"
#include "qpid/sys/posix/check.h"
#include "qpid/sys/posix/PrivatePosix.h"
+#include "qpid/log/Statement.h"
#include <sys/epoll.h>
#include <errno.h>
@@ -467,28 +468,35 @@ bool Poller::interrupt(PollerHandle& handle) {
}
void Poller::run() {
- // Make sure we can't be interrupted by signals at a bad time
- ::sigset_t ss;
- ::sigfillset(&ss);
- ::pthread_sigmask(SIG_SETMASK, &ss, 0);
-
- do {
- Event event = wait();
-
- // If can read/write then dispatch appropriate callbacks
- if (event.handle) {
- event.process();
- } else {
- // Handle shutdown
- switch (event.type) {
- case SHUTDOWN:
- return;
- default:
- // This should be impossible
- assert(false);
+ // Ensure that we exit thread responsibly under all circumstances
+ try {
+ // Make sure we can't be interrupted by signals at a bad time
+ ::sigset_t ss;
+ ::sigfillset(&ss);
+ ::pthread_sigmask(SIG_SETMASK, &ss, 0);
+
+ do {
+ Event event = wait();
+
+ // If can read/write then dispatch appropriate callbacks
+ if (event.handle) {
+ event.process();
+ } else {
+ // Handle shutdown
+ switch (event.type) {
+ case SHUTDOWN:
+ PollerHandleDeletionManager.destroyThreadState();
+ return;
+ default:
+ // This should be impossible
+ assert(false);
+ }
}
- }
- } while (true);
+ } while (true);
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "IO worker thread exiting with unhandled exception: " << e.what());
+ }
+ PollerHandleDeletionManager.destroyThreadState();
}
Poller::Event Poller::wait(Duration timeout) {