diff options
author | Andrew Stitcher <astitcher@apache.org> | 2009-12-07 15:42:14 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2009-12-07 15:42:14 +0000 |
commit | 7bbd0fdd577b167127633a7b52fe7ea487b1f267 (patch) | |
tree | 5752af0626dad35879b91c96d09d802595f779d8 /cpp/src | |
parent | 6a667d2a64bb124c585ef165821dfd60e3f5de04 (diff) | |
download | qpid-python-7bbd0fdd577b167127633a7b52fe7ea487b1f267.tar.gz |
QPID-2214: Opening and closing client connections causes memory use to grow unboundedly
- Clean up the DeletionManager state for each thread when the thread exits
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@887956 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/sys/DeletionManager.h | 40 | ||||
-rw-r--r-- | cpp/src/qpid/sys/epoll/EpollPoller.cpp | 50 |
2 files changed, 63 insertions, 27 deletions
diff --git a/cpp/src/qpid/sys/DeletionManager.h b/cpp/src/qpid/sys/DeletionManager.h index 43154eb98e..5d8428966e 100644 --- a/cpp/src/qpid/sys/DeletionManager.h +++ b/cpp/src/qpid/sys/DeletionManager.h @@ -54,6 +54,8 @@ struct deleter template <typename H> class DeletionManager { + struct ThreadStatus; + public: // Mark every thread as using the handle - it will be deleted // below after every thread marks the handle as unused @@ -65,6 +67,28 @@ public: // handles get deleted here when no one else // is using them either static void markAllUnusedInThisThread() { + ThreadStatus* threadStatus = getThreadStatus(); + ScopedLock<Mutex> l(threadStatus->lock); + + // The actual deletions will happen here when all the shared_ptr + // ref counts hit 0 (that is when every thread marks the handle unused) + threadStatus->handles.clear(); + } + + static void destroyThreadState() { + ThreadStatus* threadStatus = getThreadStatus(); + { + ScopedLock<Mutex> l(threadStatus->lock); + + allThreadsStatuses.delThreadStatus(threadStatus); + } + delete threadStatus; + threadStatus = 0; + } + +private: + + static ThreadStatus*& getThreadStatus() { static __thread ThreadStatus* threadStatus = 0; // Thread local vars can't be dynamically constructed so we need @@ -75,14 +99,9 @@ public: allThreadsStatuses.addThreadStatus(threadStatus); } - ScopedLock<Mutex> l(threadStatus->lock); - - // The actual deletions will happen here when all the shared_ptr - // ref counts hit 0 (that is when every thread marks the handle unused) - threadStatus->handles.clear(); + return threadStatus; } -private: typedef boost::shared_ptr<H> shared_ptr; // In theory we know that we never need more handles than the number of @@ -125,6 +144,15 @@ private: statuses.push_back(t); } + void delThreadStatus(ThreadStatus* t) { + ScopedLock<Mutex> l(lock); + typename std::vector<ThreadStatus*>::iterator it = + std::find(statuses.begin(),statuses.end(), t); + if (it != statuses.end()) { + statuses.erase(it); + } + } + void addHandle(shared_ptr h) { ScopedLock<Mutex> l(lock); std::for_each(statuses.begin(), statuses.end(), handleAdder(h)); diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp index fd9a4b3468..d7f64f3b4c 100644 --- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -25,6 +25,7 @@ #include "qpid/sys/DeletionManager.h" #include "qpid/sys/posix/check.h" #include "qpid/sys/posix/PrivatePosix.h" +#include "qpid/log/Statement.h" #include <sys/epoll.h> #include <errno.h> @@ -467,28 +468,35 @@ bool Poller::interrupt(PollerHandle& handle) { } void Poller::run() { - // Make sure we can't be interrupted by signals at a bad time - ::sigset_t ss; - ::sigfillset(&ss); - ::pthread_sigmask(SIG_SETMASK, &ss, 0); - - do { - Event event = wait(); - - // If can read/write then dispatch appropriate callbacks - if (event.handle) { - event.process(); - } else { - // Handle shutdown - switch (event.type) { - case SHUTDOWN: - return; - default: - // This should be impossible - assert(false); + // Ensure that we exit thread responsibly under all circumstances + try { + // Make sure we can't be interrupted by signals at a bad time + ::sigset_t ss; + ::sigfillset(&ss); + ::pthread_sigmask(SIG_SETMASK, &ss, 0); + + do { + Event event = wait(); + + // If can read/write then dispatch appropriate callbacks + if (event.handle) { + event.process(); + } else { + // Handle shutdown + switch (event.type) { + case SHUTDOWN: + PollerHandleDeletionManager.destroyThreadState(); + return; + default: + // This should be impossible + assert(false); + } } - } - } while (true); + } while (true); + } catch (const std::exception& e) { + QPID_LOG(error, "IO worker thread exiting with unhandled exception: " << e.what()); + } + PollerHandleDeletionManager.destroyThreadState(); } Poller::Event Poller::wait(Duration timeout) { |