diff options
author | Gordon Sim <gsim@apache.org> | 2007-05-22 09:50:45 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-05-22 09:50:45 +0000 |
commit | 13a373d975a60d45b2dd6de4c3cc821296330e16 (patch) | |
tree | 2ae3d0c69b5032e595281b283aeb2b254eea2d8b | |
parent | 89d3aef1e2888d415f11b1c47450a5cf5ad32b3e (diff) | |
download | qpid-python-13a373d975a60d45b2dd6de4c3cc821296330e16.tar.gz |
Patch submitted to qpid-dev by ksmith@redhat.com. Fixes concurrency issues arising from previous move to use singleton apr pool.
"My patch does three things:
1) Modifies the APRPool class to use alloc/free semantics for APR memory pools. Each time a caller calls APRPool::get() they'll their own pool reference. I've fixed up all the call sites I can find to also call APRPool::free() at the appropriate time.
2) Caches freed APR memory pools in a STL stack. This cuts down on the number of memory pools created overall.
3) As a result of doing #1 and #2 I've introduced a guard mutex around APRPool::get() and APRPool::free(). This is to prevent concurrent access to the memory pool cache. If it's too heavyweight, the mutex along with the caching mechanism could be removed entirely."
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@540511 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/lib/broker/QueueRegistry.cpp | 4 | ||||
-rw-r--r-- | cpp/lib/common/sys/Module.h | 4 | ||||
-rw-r--r-- | cpp/lib/common/sys/Monitor.h | 4 | ||||
-rw-r--r-- | cpp/lib/common/sys/Mutex.h | 10 | ||||
-rw-r--r-- | cpp/lib/common/sys/Thread.h | 22 | ||||
-rw-r--r-- | cpp/lib/common/sys/apr/APRAcceptor.cpp | 23 | ||||
-rw-r--r-- | cpp/lib/common/sys/apr/APRPool.cpp | 39 | ||||
-rw-r--r-- | cpp/lib/common/sys/apr/APRPool.h | 13 | ||||
-rw-r--r-- | cpp/lib/common/sys/apr/LFProcessor.cpp | 6 | ||||
-rw-r--r-- | cpp/lib/common/sys/apr/LFProcessor.h | 3 | ||||
-rw-r--r-- | cpp/lib/common/sys/apr/LFSessionContext.cpp | 6 | ||||
-rw-r--r-- | cpp/lib/common/sys/apr/LFSessionContext.h | 2 | ||||
-rw-r--r-- | cpp/lib/common/sys/apr/Socket.cpp | 8 | ||||
-rw-r--r-- | cpp/lib/common/sys/apr/Thread.cpp | 4 | ||||
-rw-r--r-- | cpp/lib/common/sys/posix/Thread.cpp | 3 |
15 files changed, 121 insertions, 30 deletions
diff --git a/cpp/lib/broker/QueueRegistry.cpp b/cpp/lib/broker/QueueRegistry.cpp index 2d1382ef09..c69d553b06 100644 --- a/cpp/lib/broker/QueueRegistry.cpp +++ b/cpp/lib/broker/QueueRegistry.cpp @@ -28,7 +28,9 @@ using namespace qpid::sys; QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){} -QueueRegistry::~QueueRegistry(){} +QueueRegistry::~QueueRegistry() +{ +} std::pair<Queue::shared_ptr, bool> QueueRegistry::declare(const string& declareName, bool durable, diff --git a/cpp/lib/common/sys/Module.h b/cpp/lib/common/sys/Module.h index 9bf5d6e1fc..64ed309afb 100644 --- a/cpp/lib/common/sys/Module.h +++ b/cpp/lib/common/sys/Module.h @@ -104,7 +104,9 @@ template <class T> Module<T>::~Module() throw() template <class T> void Module<T>::load(const std::string& name) { - CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), APRPool::get())); + apr_pool_t* pool = APRPool::get(); + CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), pool)); + APRPool::free(pool); } template <class T> void Module<T>::unload() diff --git a/cpp/lib/common/sys/Monitor.h b/cpp/lib/common/sys/Monitor.h index e58931e699..c615a97aa3 100644 --- a/cpp/lib/common/sys/Monitor.h +++ b/cpp/lib/common/sys/Monitor.h @@ -60,7 +60,9 @@ class Monitor : public Mutex #ifdef USE_APR Monitor::Monitor() { - CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get())); + apr_pool_t* pool = APRPool::get(); + CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool)); + APRPool::free(pool); } Monitor::~Monitor() { diff --git a/cpp/lib/common/sys/Mutex.h b/cpp/lib/common/sys/Mutex.h index 4022da2f6e..9b1412aa09 100644 --- a/cpp/lib/common/sys/Mutex.h +++ b/cpp/lib/common/sys/Mutex.h @@ -21,8 +21,9 @@ #ifdef USE_APR # include <apr_thread_mutex.h> -# include <apr/APRBase.h> -# include <apr/APRPool.h> +# include <apr_pools.h> +# include "apr/APRBase.h" +# include "apr/APRPool.h" #else # include <pthread.h> # include <posix/check.h> @@ -62,6 +63,7 @@ class Mutex : private boost::noncopyable { protected: #ifdef USE_APR apr_thread_mutex_t* mutex; + apr_pool_t* pool; #else pthread_mutex_t mutex; #endif @@ -71,11 +73,13 @@ class Mutex : private boost::noncopyable { // APR ================================================================ Mutex::Mutex() { - CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get())); + pool = APRPool::get(); + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); } Mutex::~Mutex(){ CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); + APRPool::free(pool); } void Mutex::lock() { diff --git a/cpp/lib/common/sys/Thread.h b/cpp/lib/common/sys/Thread.h index 47b95b6234..c14c7cc6ad 100644 --- a/cpp/lib/common/sys/Thread.h +++ b/cpp/lib/common/sys/Thread.h @@ -44,11 +44,10 @@ class Thread inline static void yield(); inline Thread(); - inline explicit Thread(qpid::sys::Runnable*); - inline explicit Thread(qpid::sys::Runnable&); - + inline Thread(qpid::sys::Runnable*); + inline Thread(qpid::sys::Runnable&); + ~Thread(); inline void join(); - inline long id(); private: @@ -70,13 +69,17 @@ Thread::Thread() : thread(0) {} #ifdef USE_APR Thread::Thread(Runnable* runnable) { + apr_pool_t* tmp_pool = APRPool::get(); CHECK_APR_SUCCESS( - apr_thread_create(&thread, 0, runRunnable, runnable, APRPool::get())); + apr_thread_create(&thread, 0, runRunnable, runnable, tmp_pool)); + APRPool::free(tmp_pool); } Thread::Thread(Runnable& runnable) { + apr_pool_t* tmp_pool = APRPool::get(); CHECK_APR_SUCCESS( - apr_thread_create(&thread, 0, runRunnable, &runnable, APRPool::get())); + apr_thread_create(&thread, 0, runRunnable, &runnable, tmp_pool)); + APRPool::free(tmp_pool); } void Thread::join(){ @@ -92,9 +95,11 @@ long Thread::id() { Thread::Thread(apr_thread_t* t) : thread(t) {} Thread Thread::current(){ + apr_pool_t* tmp_pool = APRPool::get(); apr_thread_t* thr; apr_os_thread_t osthr = apr_os_thread_current(); - CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, APRPool::get())); + CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, tmp_pool)); + APRPool::free(tmp_pool); return Thread(thr); } @@ -123,6 +128,9 @@ long Thread::id() { return long(thread); } +Thread::~Thread() { +} + Thread::Thread(pthread_t thr) : thread(thr) {} Thread Thread::current() { diff --git a/cpp/lib/common/sys/apr/APRAcceptor.cpp b/cpp/lib/common/sys/apr/APRAcceptor.cpp index 6853833797..a427542fc3 100644 --- a/cpp/lib/common/sys/apr/APRAcceptor.cpp +++ b/cpp/lib/common/sys/apr/APRAcceptor.cpp @@ -20,6 +20,7 @@ */ #include <sys/Acceptor.h> #include <sys/SessionHandlerFactory.h> +#include <apr_pools.h> #include "LFProcessor.h" #include "LFSessionContext.h" #include "APRBase.h" @@ -32,6 +33,7 @@ class APRAcceptor : public Acceptor { public: APRAcceptor(int16_t port, int backlog, int threads, bool trace); + ~APRAcceptor(); virtual int16_t getPort() const; virtual void run(qpid::sys::SessionHandlerFactory* factory); virtual void shutdown(); @@ -46,6 +48,7 @@ class APRAcceptor : public Acceptor apr_socket_t* socket; volatile bool running; Mutex shutdownLock; + apr_pool_t* pool; }; // Define generic Acceptor::create() to return APRAcceptor. @@ -54,16 +57,22 @@ Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bo return Acceptor::shared_ptr(new APRAcceptor(port, backlog, threads, trace)); } // Must define Acceptor virtual dtor. -Acceptor::~Acceptor() {} +Acceptor::~Acceptor() { +} + +APRAcceptor::~APRAcceptor() { + APRPool::free(pool); +} - APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) : +APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) : port(port_), trace(trace_), - processor(APRPool::get(), threads, 1000, 5000000) + processor(threads, 1000, 5000000) { + pool = APRPool::get(); apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get())); - CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get())); + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool)); + CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool)); CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog)); @@ -81,7 +90,7 @@ void APRAcceptor::run(SessionHandlerFactory* factory) { std::cout << "Listening on port " << getPort() << "..." << std::endl; while(running){ apr_socket_t* client; - apr_status_t status = apr_socket_accept(&client, socket, APRPool::get()); + apr_status_t status = apr_socket_accept(&client, socket, pool); if(status == APR_SUCCESS){ //make this socket non-blocking: CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); @@ -89,7 +98,7 @@ void APRAcceptor::run(SessionHandlerFactory* factory) { CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); - LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, trace); + LFSessionContext* session = new LFSessionContext(client, &processor, trace); session->init(factory->create(session)); }else{ Mutex::ScopedLock locker(shutdownLock); diff --git a/cpp/lib/common/sys/apr/APRPool.cpp b/cpp/lib/common/sys/apr/APRPool.cpp index e8b71f6e8a..91481faf09 100644 --- a/cpp/lib/common/sys/apr/APRPool.cpp +++ b/cpp/lib/common/sys/apr/APRPool.cpp @@ -22,20 +22,57 @@ #include "APRPool.h" #include "APRBase.h" #include <boost/pool/detail/singleton.hpp> +#include <iostream> +#include <sstream> + using namespace qpid::sys; APRPool::APRPool(){ APRBase::increment(); + allocated_pools = new std::stack<apr_pool_t*>(); CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); + CHECK_APR_SUCCESS(apr_thread_mutex_create(&poolGuard, APR_THREAD_MUTEX_NESTED, pool)); } APRPool::~APRPool(){ + while(allocated_pools->size() > 0) { + apr_pool_t* pool = allocated_pools->top(); + allocated_pools->pop(); + apr_pool_destroy(pool); + } apr_pool_destroy(pool); + apr_thread_mutex_destroy(poolGuard); + delete allocated_pools; APRBase::decrement(); } +void APRPool::free_pool(apr_pool_t* pool) { + CHECK_APR_SUCCESS(apr_thread_mutex_lock(poolGuard)); + allocated_pools->push(pool); + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(poolGuard)); +} + +apr_pool_t* APRPool::allocate_pool() { + CHECK_APR_SUCCESS(apr_thread_mutex_lock(poolGuard)); + apr_pool_t* retval; + if (allocated_pools->size() == 0) { + CHECK_APR_SUCCESS(apr_pool_create(&retval, pool)); + } + else { + retval = allocated_pools->top(); + allocated_pools->pop(); + } + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(poolGuard)); + return retval; +} + apr_pool_t* APRPool::get() { - return boost::details::pool::singleton_default<APRPool>::instance().pool; + return + boost::details::pool::singleton_default<APRPool>::instance().allocate_pool(); +} + +void APRPool::free(apr_pool_t* pool) { + boost::details::pool::singleton_default<APRPool>::instance().free_pool(pool); } diff --git a/cpp/lib/common/sys/apr/APRPool.h b/cpp/lib/common/sys/apr/APRPool.h index da7661fcfa..c22338599e 100644 --- a/cpp/lib/common/sys/apr/APRPool.h +++ b/cpp/lib/common/sys/apr/APRPool.h @@ -23,6 +23,8 @@ */ #include <boost/noncopyable.hpp> #include <apr_pools.h> +#include <apr_thread_mutex.h> +#include <stack> namespace qpid { namespace sys { @@ -33,12 +35,21 @@ class APRPool : private boost::noncopyable { public: APRPool(); ~APRPool(); + + apr_pool_t* allocate_pool(); + + void free_pool(apr_pool_t* pool); - /** Get singleton instance */ + /** Allocate pool */ static apr_pool_t* get(); + + /** Free pool */ + static void free(apr_pool_t* pool); private: apr_pool_t* pool; + apr_thread_mutex_t* poolGuard; + std::stack<apr_pool_t*>* allocated_pools; }; }} diff --git a/cpp/lib/common/sys/apr/LFProcessor.cpp b/cpp/lib/common/sys/apr/LFProcessor.cpp index 2b6fc92623..22b601e688 100644 --- a/cpp/lib/common/sys/apr/LFProcessor.cpp +++ b/cpp/lib/common/sys/apr/LFProcessor.cpp @@ -22,6 +22,7 @@ #include <QpidError.h> #include "LFProcessor.h" #include "APRBase.h" +#include "APRPool.h" #include "LFSessionContext.h" using namespace qpid::sys; @@ -30,7 +31,7 @@ using qpid::QpidError; // TODO aconway 2006-10-12: stopped is read outside locks. // -LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : +LFProcessor::LFProcessor(int _workers, int _size, int _timeout) : size(_size), timeout(_timeout), signalledCount(0), @@ -41,7 +42,7 @@ LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout workers(new Thread[_workers]), stopped(false) { - + pool = APRPool::get(); CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); } @@ -50,6 +51,7 @@ LFProcessor::~LFProcessor(){ if (!stopped) stop(); delete[] workers; CHECK_APR_SUCCESS(apr_pollset_destroy(pollset)); + APRPool::free(pool); } void LFProcessor::start(){ diff --git a/cpp/lib/common/sys/apr/LFProcessor.h b/cpp/lib/common/sys/apr/LFProcessor.h index de90199472..0f4850ee08 100644 --- a/cpp/lib/common/sys/apr/LFProcessor.h +++ b/cpp/lib/common/sys/apr/LFProcessor.h @@ -57,6 +57,7 @@ namespace sys { qpid::sys::Mutex countLock; std::vector<LFSessionContext*> sessions; volatile bool stopped; + apr_pool_t* pool; const apr_pollfd_t* getNextEvent(); void waitToLead(); @@ -65,7 +66,7 @@ namespace sys { virtual void run(); public: - LFProcessor(apr_pool_t* pool, int workers, int size, int timeout); + LFProcessor(int workers, int size, int timeout); /** * Add the fd to the poll set. Relies on the client_data being * an instance of LFSessionContext. diff --git a/cpp/lib/common/sys/apr/LFSessionContext.cpp b/cpp/lib/common/sys/apr/LFSessionContext.cpp index 9b12747a97..a06b7537ee 100644 --- a/cpp/lib/common/sys/apr/LFSessionContext.cpp +++ b/cpp/lib/common/sys/apr/LFSessionContext.cpp @@ -20,6 +20,7 @@ */ #include "LFSessionContext.h" #include "APRBase.h" +#include "APRPool.h" #include <QpidError.h> #include <assert.h> @@ -27,7 +28,7 @@ using namespace qpid::sys; using namespace qpid::sys; using namespace qpid::framing; -LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, +LFSessionContext::LFSessionContext(apr_socket_t* _socket, LFProcessor* const _processor, bool _debug) : debug(_debug), @@ -40,7 +41,7 @@ LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, closing(false) { - fd.p = _pool; + fd.p = APRPool::get(); fd.desc_type = APR_POLL_SOCKET; fd.reqevents = APR_POLLIN; fd.client_data = this; @@ -156,6 +157,7 @@ void LFSessionContext::close(){ void LFSessionContext::handleClose(){ handler->closed(); + APRPool::free(fd.p); std::cout << "Session closed [" << &socket << "]" << std::endl; delete handler; delete this; diff --git a/cpp/lib/common/sys/apr/LFSessionContext.h b/cpp/lib/common/sys/apr/LFSessionContext.h index 9483cbe590..eeb8279d9a 100644 --- a/cpp/lib/common/sys/apr/LFSessionContext.h +++ b/cpp/lib/common/sys/apr/LFSessionContext.h @@ -66,7 +66,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext public: - LFSessionContext(apr_pool_t* pool, apr_socket_t* socket, + LFSessionContext(apr_socket_t* socket, LFProcessor* const processor, bool debug = false); virtual ~LFSessionContext(); diff --git a/cpp/lib/common/sys/apr/Socket.cpp b/cpp/lib/common/sys/apr/Socket.cpp index 5a5dc2a615..ab98c07479 100644 --- a/cpp/lib/common/sys/apr/Socket.cpp +++ b/cpp/lib/common/sys/apr/Socket.cpp @@ -30,10 +30,12 @@ using namespace qpid::sys; Socket Socket::createTcp() { Socket s; + apr_pool_t* pool = APRPool::get(); CHECK_APR_SUCCESS( apr_socket_create( &s.socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, - APRPool::get())); + pool)); + APRPool::free(pool); return s; } @@ -47,11 +49,13 @@ void Socket::setTimeout(Time interval) { void Socket::connect(const std::string& host, int port) { apr_sockaddr_t* address; + apr_pool_t* pool = APRPool::get(); CHECK_APR_SUCCESS( apr_sockaddr_info_get( &address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, - APRPool::get())); + pool)); CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); + APRPool::free(pool); } void Socket::close() { diff --git a/cpp/lib/common/sys/apr/Thread.cpp b/cpp/lib/common/sys/apr/Thread.cpp index 5c4799aa96..997ff03ab3 100644 --- a/cpp/lib/common/sys/apr/Thread.cpp +++ b/cpp/lib/common/sys/apr/Thread.cpp @@ -20,6 +20,7 @@ */ #include <sys/Thread.h> +#include "APRPool.h" using namespace qpid::sys; using qpid::sys::Runnable; @@ -30,4 +31,7 @@ void* APR_THREAD_FUNC Thread::runRunnable(apr_thread_t* thread, void *data) { return NULL; } +Thread::~Thread() { +} + diff --git a/cpp/lib/common/sys/posix/Thread.cpp b/cpp/lib/common/sys/posix/Thread.cpp index f524799556..7291022dc6 100644 --- a/cpp/lib/common/sys/posix/Thread.cpp +++ b/cpp/lib/common/sys/posix/Thread.cpp @@ -26,3 +26,6 @@ void* qpid::sys::Thread::runRunnable(void* p) static_cast<Runnable*>(p)->run(); return 0; } + +Thread::~Thread() { +} |