summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-05-22 09:50:45 +0000
committerGordon Sim <gsim@apache.org>2007-05-22 09:50:45 +0000
commit13a373d975a60d45b2dd6de4c3cc821296330e16 (patch)
tree2ae3d0c69b5032e595281b283aeb2b254eea2d8b
parent89d3aef1e2888d415f11b1c47450a5cf5ad32b3e (diff)
downloadqpid-python-13a373d975a60d45b2dd6de4c3cc821296330e16.tar.gz
Patch submitted to qpid-dev by ksmith@redhat.com. Fixes concurrency issues arising from previous move to use singleton apr pool.
"My patch does three things: 1) Modifies the APRPool class to use alloc/free semantics for APR memory pools. Each time a caller calls APRPool::get() they'll their own pool reference. I've fixed up all the call sites I can find to also call APRPool::free() at the appropriate time. 2) Caches freed APR memory pools in a STL stack. This cuts down on the number of memory pools created overall. 3) As a result of doing #1 and #2 I've introduced a guard mutex around APRPool::get() and APRPool::free(). This is to prevent concurrent access to the memory pool cache. If it's too heavyweight, the mutex along with the caching mechanism could be removed entirely." git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@540511 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/lib/broker/QueueRegistry.cpp4
-rw-r--r--cpp/lib/common/sys/Module.h4
-rw-r--r--cpp/lib/common/sys/Monitor.h4
-rw-r--r--cpp/lib/common/sys/Mutex.h10
-rw-r--r--cpp/lib/common/sys/Thread.h22
-rw-r--r--cpp/lib/common/sys/apr/APRAcceptor.cpp23
-rw-r--r--cpp/lib/common/sys/apr/APRPool.cpp39
-rw-r--r--cpp/lib/common/sys/apr/APRPool.h13
-rw-r--r--cpp/lib/common/sys/apr/LFProcessor.cpp6
-rw-r--r--cpp/lib/common/sys/apr/LFProcessor.h3
-rw-r--r--cpp/lib/common/sys/apr/LFSessionContext.cpp6
-rw-r--r--cpp/lib/common/sys/apr/LFSessionContext.h2
-rw-r--r--cpp/lib/common/sys/apr/Socket.cpp8
-rw-r--r--cpp/lib/common/sys/apr/Thread.cpp4
-rw-r--r--cpp/lib/common/sys/posix/Thread.cpp3
15 files changed, 121 insertions, 30 deletions
diff --git a/cpp/lib/broker/QueueRegistry.cpp b/cpp/lib/broker/QueueRegistry.cpp
index 2d1382ef09..c69d553b06 100644
--- a/cpp/lib/broker/QueueRegistry.cpp
+++ b/cpp/lib/broker/QueueRegistry.cpp
@@ -28,7 +28,9 @@ using namespace qpid::sys;
QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){}
-QueueRegistry::~QueueRegistry(){}
+QueueRegistry::~QueueRegistry()
+{
+}
std::pair<Queue::shared_ptr, bool>
QueueRegistry::declare(const string& declareName, bool durable,
diff --git a/cpp/lib/common/sys/Module.h b/cpp/lib/common/sys/Module.h
index 9bf5d6e1fc..64ed309afb 100644
--- a/cpp/lib/common/sys/Module.h
+++ b/cpp/lib/common/sys/Module.h
@@ -104,7 +104,9 @@ template <class T> Module<T>::~Module() throw()
template <class T> void Module<T>::load(const std::string& name)
{
- CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), APRPool::get()));
+ apr_pool_t* pool = APRPool::get();
+ CHECK_APR_SUCCESS(apr_dso_load(&handle, name.c_str(), pool));
+ APRPool::free(pool);
}
template <class T> void Module<T>::unload()
diff --git a/cpp/lib/common/sys/Monitor.h b/cpp/lib/common/sys/Monitor.h
index e58931e699..c615a97aa3 100644
--- a/cpp/lib/common/sys/Monitor.h
+++ b/cpp/lib/common/sys/Monitor.h
@@ -60,7 +60,9 @@ class Monitor : public Mutex
#ifdef USE_APR
Monitor::Monitor() {
- CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get()));
+ apr_pool_t* pool = APRPool::get();
+ CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool));
+ APRPool::free(pool);
}
Monitor::~Monitor() {
diff --git a/cpp/lib/common/sys/Mutex.h b/cpp/lib/common/sys/Mutex.h
index 4022da2f6e..9b1412aa09 100644
--- a/cpp/lib/common/sys/Mutex.h
+++ b/cpp/lib/common/sys/Mutex.h
@@ -21,8 +21,9 @@
#ifdef USE_APR
# include <apr_thread_mutex.h>
-# include <apr/APRBase.h>
-# include <apr/APRPool.h>
+# include <apr_pools.h>
+# include "apr/APRBase.h"
+# include "apr/APRPool.h"
#else
# include <pthread.h>
# include <posix/check.h>
@@ -62,6 +63,7 @@ class Mutex : private boost::noncopyable {
protected:
#ifdef USE_APR
apr_thread_mutex_t* mutex;
+ apr_pool_t* pool;
#else
pthread_mutex_t mutex;
#endif
@@ -71,11 +73,13 @@ class Mutex : private boost::noncopyable {
// APR ================================================================
Mutex::Mutex() {
- CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get()));
+ pool = APRPool::get();
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));
}
Mutex::~Mutex(){
CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+ APRPool::free(pool);
}
void Mutex::lock() {
diff --git a/cpp/lib/common/sys/Thread.h b/cpp/lib/common/sys/Thread.h
index 47b95b6234..c14c7cc6ad 100644
--- a/cpp/lib/common/sys/Thread.h
+++ b/cpp/lib/common/sys/Thread.h
@@ -44,11 +44,10 @@ class Thread
inline static void yield();
inline Thread();
- inline explicit Thread(qpid::sys::Runnable*);
- inline explicit Thread(qpid::sys::Runnable&);
-
+ inline Thread(qpid::sys::Runnable*);
+ inline Thread(qpid::sys::Runnable&);
+ ~Thread();
inline void join();
-
inline long id();
private:
@@ -70,13 +69,17 @@ Thread::Thread() : thread(0) {}
#ifdef USE_APR
Thread::Thread(Runnable* runnable) {
+ apr_pool_t* tmp_pool = APRPool::get();
CHECK_APR_SUCCESS(
- apr_thread_create(&thread, 0, runRunnable, runnable, APRPool::get()));
+ apr_thread_create(&thread, 0, runRunnable, runnable, tmp_pool));
+ APRPool::free(tmp_pool);
}
Thread::Thread(Runnable& runnable) {
+ apr_pool_t* tmp_pool = APRPool::get();
CHECK_APR_SUCCESS(
- apr_thread_create(&thread, 0, runRunnable, &runnable, APRPool::get()));
+ apr_thread_create(&thread, 0, runRunnable, &runnable, tmp_pool));
+ APRPool::free(tmp_pool);
}
void Thread::join(){
@@ -92,9 +95,11 @@ long Thread::id() {
Thread::Thread(apr_thread_t* t) : thread(t) {}
Thread Thread::current(){
+ apr_pool_t* tmp_pool = APRPool::get();
apr_thread_t* thr;
apr_os_thread_t osthr = apr_os_thread_current();
- CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, APRPool::get()));
+ CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, tmp_pool));
+ APRPool::free(tmp_pool);
return Thread(thr);
}
@@ -123,6 +128,9 @@ long Thread::id() {
return long(thread);
}
+Thread::~Thread() {
+}
+
Thread::Thread(pthread_t thr) : thread(thr) {}
Thread Thread::current() {
diff --git a/cpp/lib/common/sys/apr/APRAcceptor.cpp b/cpp/lib/common/sys/apr/APRAcceptor.cpp
index 6853833797..a427542fc3 100644
--- a/cpp/lib/common/sys/apr/APRAcceptor.cpp
+++ b/cpp/lib/common/sys/apr/APRAcceptor.cpp
@@ -20,6 +20,7 @@
*/
#include <sys/Acceptor.h>
#include <sys/SessionHandlerFactory.h>
+#include <apr_pools.h>
#include "LFProcessor.h"
#include "LFSessionContext.h"
#include "APRBase.h"
@@ -32,6 +33,7 @@ class APRAcceptor : public Acceptor
{
public:
APRAcceptor(int16_t port, int backlog, int threads, bool trace);
+ ~APRAcceptor();
virtual int16_t getPort() const;
virtual void run(qpid::sys::SessionHandlerFactory* factory);
virtual void shutdown();
@@ -46,6 +48,7 @@ class APRAcceptor : public Acceptor
apr_socket_t* socket;
volatile bool running;
Mutex shutdownLock;
+ apr_pool_t* pool;
};
// Define generic Acceptor::create() to return APRAcceptor.
@@ -54,16 +57,22 @@ Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bo
return Acceptor::shared_ptr(new APRAcceptor(port, backlog, threads, trace));
}
// Must define Acceptor virtual dtor.
-Acceptor::~Acceptor() {}
+Acceptor::~Acceptor() {
+}
+
+APRAcceptor::~APRAcceptor() {
+ APRPool::free(pool);
+}
- APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) :
+APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) :
port(port_),
trace(trace_),
- processor(APRPool::get(), threads, 1000, 5000000)
+ processor(threads, 1000, 5000000)
{
+ pool = APRPool::get();
apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get()));
- CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get()));
+ CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool));
+ CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool));
CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog));
@@ -81,7 +90,7 @@ void APRAcceptor::run(SessionHandlerFactory* factory) {
std::cout << "Listening on port " << getPort() << "..." << std::endl;
while(running){
apr_socket_t* client;
- apr_status_t status = apr_socket_accept(&client, socket, APRPool::get());
+ apr_status_t status = apr_socket_accept(&client, socket, pool);
if(status == APR_SUCCESS){
//make this socket non-blocking:
CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
@@ -89,7 +98,7 @@ void APRAcceptor::run(SessionHandlerFactory* factory) {
CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
- LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, trace);
+ LFSessionContext* session = new LFSessionContext(client, &processor, trace);
session->init(factory->create(session));
}else{
Mutex::ScopedLock locker(shutdownLock);
diff --git a/cpp/lib/common/sys/apr/APRPool.cpp b/cpp/lib/common/sys/apr/APRPool.cpp
index e8b71f6e8a..91481faf09 100644
--- a/cpp/lib/common/sys/apr/APRPool.cpp
+++ b/cpp/lib/common/sys/apr/APRPool.cpp
@@ -22,20 +22,57 @@
#include "APRPool.h"
#include "APRBase.h"
#include <boost/pool/detail/singleton.hpp>
+#include <iostream>
+#include <sstream>
+
using namespace qpid::sys;
APRPool::APRPool(){
APRBase::increment();
+ allocated_pools = new std::stack<apr_pool_t*>();
CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&poolGuard, APR_THREAD_MUTEX_NESTED, pool));
}
APRPool::~APRPool(){
+ while(allocated_pools->size() > 0) {
+ apr_pool_t* pool = allocated_pools->top();
+ allocated_pools->pop();
+ apr_pool_destroy(pool);
+ }
apr_pool_destroy(pool);
+ apr_thread_mutex_destroy(poolGuard);
+ delete allocated_pools;
APRBase::decrement();
}
+void APRPool::free_pool(apr_pool_t* pool) {
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(poolGuard));
+ allocated_pools->push(pool);
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(poolGuard));
+}
+
+apr_pool_t* APRPool::allocate_pool() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(poolGuard));
+ apr_pool_t* retval;
+ if (allocated_pools->size() == 0) {
+ CHECK_APR_SUCCESS(apr_pool_create(&retval, pool));
+ }
+ else {
+ retval = allocated_pools->top();
+ allocated_pools->pop();
+ }
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(poolGuard));
+ return retval;
+}
+
apr_pool_t* APRPool::get() {
- return boost::details::pool::singleton_default<APRPool>::instance().pool;
+ return
+ boost::details::pool::singleton_default<APRPool>::instance().allocate_pool();
+}
+
+void APRPool::free(apr_pool_t* pool) {
+ boost::details::pool::singleton_default<APRPool>::instance().free_pool(pool);
}
diff --git a/cpp/lib/common/sys/apr/APRPool.h b/cpp/lib/common/sys/apr/APRPool.h
index da7661fcfa..c22338599e 100644
--- a/cpp/lib/common/sys/apr/APRPool.h
+++ b/cpp/lib/common/sys/apr/APRPool.h
@@ -23,6 +23,8 @@
*/
#include <boost/noncopyable.hpp>
#include <apr_pools.h>
+#include <apr_thread_mutex.h>
+#include <stack>
namespace qpid {
namespace sys {
@@ -33,12 +35,21 @@ class APRPool : private boost::noncopyable {
public:
APRPool();
~APRPool();
+
+ apr_pool_t* allocate_pool();
+
+ void free_pool(apr_pool_t* pool);
- /** Get singleton instance */
+ /** Allocate pool */
static apr_pool_t* get();
+
+ /** Free pool */
+ static void free(apr_pool_t* pool);
private:
apr_pool_t* pool;
+ apr_thread_mutex_t* poolGuard;
+ std::stack<apr_pool_t*>* allocated_pools;
};
}}
diff --git a/cpp/lib/common/sys/apr/LFProcessor.cpp b/cpp/lib/common/sys/apr/LFProcessor.cpp
index 2b6fc92623..22b601e688 100644
--- a/cpp/lib/common/sys/apr/LFProcessor.cpp
+++ b/cpp/lib/common/sys/apr/LFProcessor.cpp
@@ -22,6 +22,7 @@
#include <QpidError.h>
#include "LFProcessor.h"
#include "APRBase.h"
+#include "APRPool.h"
#include "LFSessionContext.h"
using namespace qpid::sys;
@@ -30,7 +31,7 @@ using qpid::QpidError;
// TODO aconway 2006-10-12: stopped is read outside locks.
//
-LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) :
+LFProcessor::LFProcessor(int _workers, int _size, int _timeout) :
size(_size),
timeout(_timeout),
signalledCount(0),
@@ -41,7 +42,7 @@ LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout
workers(new Thread[_workers]),
stopped(false)
{
-
+ pool = APRPool::get();
CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE));
}
@@ -50,6 +51,7 @@ LFProcessor::~LFProcessor(){
if (!stopped) stop();
delete[] workers;
CHECK_APR_SUCCESS(apr_pollset_destroy(pollset));
+ APRPool::free(pool);
}
void LFProcessor::start(){
diff --git a/cpp/lib/common/sys/apr/LFProcessor.h b/cpp/lib/common/sys/apr/LFProcessor.h
index de90199472..0f4850ee08 100644
--- a/cpp/lib/common/sys/apr/LFProcessor.h
+++ b/cpp/lib/common/sys/apr/LFProcessor.h
@@ -57,6 +57,7 @@ namespace sys {
qpid::sys::Mutex countLock;
std::vector<LFSessionContext*> sessions;
volatile bool stopped;
+ apr_pool_t* pool;
const apr_pollfd_t* getNextEvent();
void waitToLead();
@@ -65,7 +66,7 @@ namespace sys {
virtual void run();
public:
- LFProcessor(apr_pool_t* pool, int workers, int size, int timeout);
+ LFProcessor(int workers, int size, int timeout);
/**
* Add the fd to the poll set. Relies on the client_data being
* an instance of LFSessionContext.
diff --git a/cpp/lib/common/sys/apr/LFSessionContext.cpp b/cpp/lib/common/sys/apr/LFSessionContext.cpp
index 9b12747a97..a06b7537ee 100644
--- a/cpp/lib/common/sys/apr/LFSessionContext.cpp
+++ b/cpp/lib/common/sys/apr/LFSessionContext.cpp
@@ -20,6 +20,7 @@
*/
#include "LFSessionContext.h"
#include "APRBase.h"
+#include "APRPool.h"
#include <QpidError.h>
#include <assert.h>
@@ -27,7 +28,7 @@ using namespace qpid::sys;
using namespace qpid::sys;
using namespace qpid::framing;
-LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket,
+LFSessionContext::LFSessionContext(apr_socket_t* _socket,
LFProcessor* const _processor,
bool _debug) :
debug(_debug),
@@ -40,7 +41,7 @@ LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket,
closing(false)
{
- fd.p = _pool;
+ fd.p = APRPool::get();
fd.desc_type = APR_POLL_SOCKET;
fd.reqevents = APR_POLLIN;
fd.client_data = this;
@@ -156,6 +157,7 @@ void LFSessionContext::close(){
void LFSessionContext::handleClose(){
handler->closed();
+ APRPool::free(fd.p);
std::cout << "Session closed [" << &socket << "]" << std::endl;
delete handler;
delete this;
diff --git a/cpp/lib/common/sys/apr/LFSessionContext.h b/cpp/lib/common/sys/apr/LFSessionContext.h
index 9483cbe590..eeb8279d9a 100644
--- a/cpp/lib/common/sys/apr/LFSessionContext.h
+++ b/cpp/lib/common/sys/apr/LFSessionContext.h
@@ -66,7 +66,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext
public:
- LFSessionContext(apr_pool_t* pool, apr_socket_t* socket,
+ LFSessionContext(apr_socket_t* socket,
LFProcessor* const processor,
bool debug = false);
virtual ~LFSessionContext();
diff --git a/cpp/lib/common/sys/apr/Socket.cpp b/cpp/lib/common/sys/apr/Socket.cpp
index 5a5dc2a615..ab98c07479 100644
--- a/cpp/lib/common/sys/apr/Socket.cpp
+++ b/cpp/lib/common/sys/apr/Socket.cpp
@@ -30,10 +30,12 @@ using namespace qpid::sys;
Socket Socket::createTcp() {
Socket s;
+ apr_pool_t* pool = APRPool::get();
CHECK_APR_SUCCESS(
apr_socket_create(
&s.socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP,
- APRPool::get()));
+ pool));
+ APRPool::free(pool);
return s;
}
@@ -47,11 +49,13 @@ void Socket::setTimeout(Time interval) {
void Socket::connect(const std::string& host, int port) {
apr_sockaddr_t* address;
+ apr_pool_t* pool = APRPool::get();
CHECK_APR_SUCCESS(
apr_sockaddr_info_get(
&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK,
- APRPool::get()));
+ pool));
CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
+ APRPool::free(pool);
}
void Socket::close() {
diff --git a/cpp/lib/common/sys/apr/Thread.cpp b/cpp/lib/common/sys/apr/Thread.cpp
index 5c4799aa96..997ff03ab3 100644
--- a/cpp/lib/common/sys/apr/Thread.cpp
+++ b/cpp/lib/common/sys/apr/Thread.cpp
@@ -20,6 +20,7 @@
*/
#include <sys/Thread.h>
+#include "APRPool.h"
using namespace qpid::sys;
using qpid::sys::Runnable;
@@ -30,4 +31,7 @@ void* APR_THREAD_FUNC Thread::runRunnable(apr_thread_t* thread, void *data) {
return NULL;
}
+Thread::~Thread() {
+}
+
diff --git a/cpp/lib/common/sys/posix/Thread.cpp b/cpp/lib/common/sys/posix/Thread.cpp
index f524799556..7291022dc6 100644
--- a/cpp/lib/common/sys/posix/Thread.cpp
+++ b/cpp/lib/common/sys/posix/Thread.cpp
@@ -26,3 +26,6 @@ void* qpid::sys::Thread::runRunnable(void* p)
static_cast<Runnable*>(p)->run();
return 0;
}
+
+Thread::~Thread() {
+}