summaryrefslogtreecommitdiff
path: root/cpp/lib/common/sys/apr
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-05-22 09:50:45 +0000
committerGordon Sim <gsim@apache.org>2007-05-22 09:50:45 +0000
commit13a373d975a60d45b2dd6de4c3cc821296330e16 (patch)
tree2ae3d0c69b5032e595281b283aeb2b254eea2d8b /cpp/lib/common/sys/apr
parent89d3aef1e2888d415f11b1c47450a5cf5ad32b3e (diff)
downloadqpid-python-13a373d975a60d45b2dd6de4c3cc821296330e16.tar.gz
Patch submitted to qpid-dev by ksmith@redhat.com. Fixes concurrency issues arising from previous move to use singleton apr pool.
"My patch does three things: 1) Modifies the APRPool class to use alloc/free semantics for APR memory pools. Each time a caller calls APRPool::get() they'll their own pool reference. I've fixed up all the call sites I can find to also call APRPool::free() at the appropriate time. 2) Caches freed APR memory pools in a STL stack. This cuts down on the number of memory pools created overall. 3) As a result of doing #1 and #2 I've introduced a guard mutex around APRPool::get() and APRPool::free(). This is to prevent concurrent access to the memory pool cache. If it's too heavyweight, the mutex along with the caching mechanism could be removed entirely." git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@540511 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/common/sys/apr')
-rw-r--r--cpp/lib/common/sys/apr/APRAcceptor.cpp23
-rw-r--r--cpp/lib/common/sys/apr/APRPool.cpp39
-rw-r--r--cpp/lib/common/sys/apr/APRPool.h13
-rw-r--r--cpp/lib/common/sys/apr/LFProcessor.cpp6
-rw-r--r--cpp/lib/common/sys/apr/LFProcessor.h3
-rw-r--r--cpp/lib/common/sys/apr/LFSessionContext.cpp6
-rw-r--r--cpp/lib/common/sys/apr/LFSessionContext.h2
-rw-r--r--cpp/lib/common/sys/apr/Socket.cpp8
-rw-r--r--cpp/lib/common/sys/apr/Thread.cpp4
9 files changed, 87 insertions, 17 deletions
diff --git a/cpp/lib/common/sys/apr/APRAcceptor.cpp b/cpp/lib/common/sys/apr/APRAcceptor.cpp
index 6853833797..a427542fc3 100644
--- a/cpp/lib/common/sys/apr/APRAcceptor.cpp
+++ b/cpp/lib/common/sys/apr/APRAcceptor.cpp
@@ -20,6 +20,7 @@
*/
#include <sys/Acceptor.h>
#include <sys/SessionHandlerFactory.h>
+#include <apr_pools.h>
#include "LFProcessor.h"
#include "LFSessionContext.h"
#include "APRBase.h"
@@ -32,6 +33,7 @@ class APRAcceptor : public Acceptor
{
public:
APRAcceptor(int16_t port, int backlog, int threads, bool trace);
+ ~APRAcceptor();
virtual int16_t getPort() const;
virtual void run(qpid::sys::SessionHandlerFactory* factory);
virtual void shutdown();
@@ -46,6 +48,7 @@ class APRAcceptor : public Acceptor
apr_socket_t* socket;
volatile bool running;
Mutex shutdownLock;
+ apr_pool_t* pool;
};
// Define generic Acceptor::create() to return APRAcceptor.
@@ -54,16 +57,22 @@ Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bo
return Acceptor::shared_ptr(new APRAcceptor(port, backlog, threads, trace));
}
// Must define Acceptor virtual dtor.
-Acceptor::~Acceptor() {}
+Acceptor::~Acceptor() {
+}
+
+APRAcceptor::~APRAcceptor() {
+ APRPool::free(pool);
+}
- APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) :
+APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) :
port(port_),
trace(trace_),
- processor(APRPool::get(), threads, 1000, 5000000)
+ processor(threads, 1000, 5000000)
{
+ pool = APRPool::get();
apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get()));
- CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get()));
+ CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool));
+ CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool));
CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog));
@@ -81,7 +90,7 @@ void APRAcceptor::run(SessionHandlerFactory* factory) {
std::cout << "Listening on port " << getPort() << "..." << std::endl;
while(running){
apr_socket_t* client;
- apr_status_t status = apr_socket_accept(&client, socket, APRPool::get());
+ apr_status_t status = apr_socket_accept(&client, socket, pool);
if(status == APR_SUCCESS){
//make this socket non-blocking:
CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
@@ -89,7 +98,7 @@ void APRAcceptor::run(SessionHandlerFactory* factory) {
CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
- LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, trace);
+ LFSessionContext* session = new LFSessionContext(client, &processor, trace);
session->init(factory->create(session));
}else{
Mutex::ScopedLock locker(shutdownLock);
diff --git a/cpp/lib/common/sys/apr/APRPool.cpp b/cpp/lib/common/sys/apr/APRPool.cpp
index e8b71f6e8a..91481faf09 100644
--- a/cpp/lib/common/sys/apr/APRPool.cpp
+++ b/cpp/lib/common/sys/apr/APRPool.cpp
@@ -22,20 +22,57 @@
#include "APRPool.h"
#include "APRBase.h"
#include <boost/pool/detail/singleton.hpp>
+#include <iostream>
+#include <sstream>
+
using namespace qpid::sys;
APRPool::APRPool(){
APRBase::increment();
+ allocated_pools = new std::stack<apr_pool_t*>();
CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&poolGuard, APR_THREAD_MUTEX_NESTED, pool));
}
APRPool::~APRPool(){
+ while(allocated_pools->size() > 0) {
+ apr_pool_t* pool = allocated_pools->top();
+ allocated_pools->pop();
+ apr_pool_destroy(pool);
+ }
apr_pool_destroy(pool);
+ apr_thread_mutex_destroy(poolGuard);
+ delete allocated_pools;
APRBase::decrement();
}
+void APRPool::free_pool(apr_pool_t* pool) {
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(poolGuard));
+ allocated_pools->push(pool);
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(poolGuard));
+}
+
+apr_pool_t* APRPool::allocate_pool() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(poolGuard));
+ apr_pool_t* retval;
+ if (allocated_pools->size() == 0) {
+ CHECK_APR_SUCCESS(apr_pool_create(&retval, pool));
+ }
+ else {
+ retval = allocated_pools->top();
+ allocated_pools->pop();
+ }
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(poolGuard));
+ return retval;
+}
+
apr_pool_t* APRPool::get() {
- return boost::details::pool::singleton_default<APRPool>::instance().pool;
+ return
+ boost::details::pool::singleton_default<APRPool>::instance().allocate_pool();
+}
+
+void APRPool::free(apr_pool_t* pool) {
+ boost::details::pool::singleton_default<APRPool>::instance().free_pool(pool);
}
diff --git a/cpp/lib/common/sys/apr/APRPool.h b/cpp/lib/common/sys/apr/APRPool.h
index da7661fcfa..c22338599e 100644
--- a/cpp/lib/common/sys/apr/APRPool.h
+++ b/cpp/lib/common/sys/apr/APRPool.h
@@ -23,6 +23,8 @@
*/
#include <boost/noncopyable.hpp>
#include <apr_pools.h>
+#include <apr_thread_mutex.h>
+#include <stack>
namespace qpid {
namespace sys {
@@ -33,12 +35,21 @@ class APRPool : private boost::noncopyable {
public:
APRPool();
~APRPool();
+
+ apr_pool_t* allocate_pool();
+
+ void free_pool(apr_pool_t* pool);
- /** Get singleton instance */
+ /** Allocate pool */
static apr_pool_t* get();
+
+ /** Free pool */
+ static void free(apr_pool_t* pool);
private:
apr_pool_t* pool;
+ apr_thread_mutex_t* poolGuard;
+ std::stack<apr_pool_t*>* allocated_pools;
};
}}
diff --git a/cpp/lib/common/sys/apr/LFProcessor.cpp b/cpp/lib/common/sys/apr/LFProcessor.cpp
index 2b6fc92623..22b601e688 100644
--- a/cpp/lib/common/sys/apr/LFProcessor.cpp
+++ b/cpp/lib/common/sys/apr/LFProcessor.cpp
@@ -22,6 +22,7 @@
#include <QpidError.h>
#include "LFProcessor.h"
#include "APRBase.h"
+#include "APRPool.h"
#include "LFSessionContext.h"
using namespace qpid::sys;
@@ -30,7 +31,7 @@ using qpid::QpidError;
// TODO aconway 2006-10-12: stopped is read outside locks.
//
-LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) :
+LFProcessor::LFProcessor(int _workers, int _size, int _timeout) :
size(_size),
timeout(_timeout),
signalledCount(0),
@@ -41,7 +42,7 @@ LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout
workers(new Thread[_workers]),
stopped(false)
{
-
+ pool = APRPool::get();
CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE));
}
@@ -50,6 +51,7 @@ LFProcessor::~LFProcessor(){
if (!stopped) stop();
delete[] workers;
CHECK_APR_SUCCESS(apr_pollset_destroy(pollset));
+ APRPool::free(pool);
}
void LFProcessor::start(){
diff --git a/cpp/lib/common/sys/apr/LFProcessor.h b/cpp/lib/common/sys/apr/LFProcessor.h
index de90199472..0f4850ee08 100644
--- a/cpp/lib/common/sys/apr/LFProcessor.h
+++ b/cpp/lib/common/sys/apr/LFProcessor.h
@@ -57,6 +57,7 @@ namespace sys {
qpid::sys::Mutex countLock;
std::vector<LFSessionContext*> sessions;
volatile bool stopped;
+ apr_pool_t* pool;
const apr_pollfd_t* getNextEvent();
void waitToLead();
@@ -65,7 +66,7 @@ namespace sys {
virtual void run();
public:
- LFProcessor(apr_pool_t* pool, int workers, int size, int timeout);
+ LFProcessor(int workers, int size, int timeout);
/**
* Add the fd to the poll set. Relies on the client_data being
* an instance of LFSessionContext.
diff --git a/cpp/lib/common/sys/apr/LFSessionContext.cpp b/cpp/lib/common/sys/apr/LFSessionContext.cpp
index 9b12747a97..a06b7537ee 100644
--- a/cpp/lib/common/sys/apr/LFSessionContext.cpp
+++ b/cpp/lib/common/sys/apr/LFSessionContext.cpp
@@ -20,6 +20,7 @@
*/
#include "LFSessionContext.h"
#include "APRBase.h"
+#include "APRPool.h"
#include <QpidError.h>
#include <assert.h>
@@ -27,7 +28,7 @@ using namespace qpid::sys;
using namespace qpid::sys;
using namespace qpid::framing;
-LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket,
+LFSessionContext::LFSessionContext(apr_socket_t* _socket,
LFProcessor* const _processor,
bool _debug) :
debug(_debug),
@@ -40,7 +41,7 @@ LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket,
closing(false)
{
- fd.p = _pool;
+ fd.p = APRPool::get();
fd.desc_type = APR_POLL_SOCKET;
fd.reqevents = APR_POLLIN;
fd.client_data = this;
@@ -156,6 +157,7 @@ void LFSessionContext::close(){
void LFSessionContext::handleClose(){
handler->closed();
+ APRPool::free(fd.p);
std::cout << "Session closed [" << &socket << "]" << std::endl;
delete handler;
delete this;
diff --git a/cpp/lib/common/sys/apr/LFSessionContext.h b/cpp/lib/common/sys/apr/LFSessionContext.h
index 9483cbe590..eeb8279d9a 100644
--- a/cpp/lib/common/sys/apr/LFSessionContext.h
+++ b/cpp/lib/common/sys/apr/LFSessionContext.h
@@ -66,7 +66,7 @@ class LFSessionContext : public virtual qpid::sys::SessionContext
public:
- LFSessionContext(apr_pool_t* pool, apr_socket_t* socket,
+ LFSessionContext(apr_socket_t* socket,
LFProcessor* const processor,
bool debug = false);
virtual ~LFSessionContext();
diff --git a/cpp/lib/common/sys/apr/Socket.cpp b/cpp/lib/common/sys/apr/Socket.cpp
index 5a5dc2a615..ab98c07479 100644
--- a/cpp/lib/common/sys/apr/Socket.cpp
+++ b/cpp/lib/common/sys/apr/Socket.cpp
@@ -30,10 +30,12 @@ using namespace qpid::sys;
Socket Socket::createTcp() {
Socket s;
+ apr_pool_t* pool = APRPool::get();
CHECK_APR_SUCCESS(
apr_socket_create(
&s.socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP,
- APRPool::get()));
+ pool));
+ APRPool::free(pool);
return s;
}
@@ -47,11 +49,13 @@ void Socket::setTimeout(Time interval) {
void Socket::connect(const std::string& host, int port) {
apr_sockaddr_t* address;
+ apr_pool_t* pool = APRPool::get();
CHECK_APR_SUCCESS(
apr_sockaddr_info_get(
&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK,
- APRPool::get()));
+ pool));
CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
+ APRPool::free(pool);
}
void Socket::close() {
diff --git a/cpp/lib/common/sys/apr/Thread.cpp b/cpp/lib/common/sys/apr/Thread.cpp
index 5c4799aa96..997ff03ab3 100644
--- a/cpp/lib/common/sys/apr/Thread.cpp
+++ b/cpp/lib/common/sys/apr/Thread.cpp
@@ -20,6 +20,7 @@
*/
#include <sys/Thread.h>
+#include "APRPool.h"
using namespace qpid::sys;
using qpid::sys::Runnable;
@@ -30,4 +31,7 @@ void* APR_THREAD_FUNC Thread::runRunnable(apr_thread_t* thread, void *data) {
return NULL;
}
+Thread::~Thread() {
+}
+