diff options
Diffstat (limited to 'cpp/common/io')
-rw-r--r-- | cpp/common/io/Makefile | 4 | ||||
-rw-r--r-- | cpp/common/io/inc/APRConnector.h | 2 | ||||
-rw-r--r-- | cpp/common/io/inc/ConnectorImpl.h | 4 | ||||
-rw-r--r-- | cpp/common/io/inc/LFProcessor.h | 2 | ||||
-rw-r--r-- | cpp/common/io/src/APRConnector.cpp | 23 | ||||
-rw-r--r-- | cpp/common/io/src/APRSocket.cpp | 4 | ||||
-rw-r--r-- | cpp/common/io/src/BlockingAPRAcceptor.cpp | 9 | ||||
-rw-r--r-- | cpp/common/io/src/BlockingAPRSessionContext.cpp | 17 | ||||
-rw-r--r-- | cpp/common/io/src/LFAcceptor.cpp | 12 | ||||
-rw-r--r-- | cpp/common/io/src/LFProcessor.cpp | 32 | ||||
-rw-r--r-- | cpp/common/io/src/LFSessionContext.cpp | 34 |
11 files changed, 73 insertions, 70 deletions
diff --git a/cpp/common/io/Makefile b/cpp/common/io/Makefile index e94e802afa..617b91448a 100644 --- a/cpp/common/io/Makefile +++ b/cpp/common/io/Makefile @@ -16,10 +16,6 @@ QPID_HOME = ../../.. include ${QPID_HOME}/cpp/options.mk - -# Compiler flags -CXXFLAGS = ${DEBUG} ${OPT} -MMD -I inc -I ../concurrent/inc -I ../error/inc -I ../framing/inc -I ../framing/generated -I ${APR_HOME}/include/apr-1/ - SOURCES := $(wildcard src/*.cpp) OBJECTS := $(subst .cpp,.o,$(SOURCES)) DEPS := $(subst .cpp,.d,$(SOURCES)) diff --git a/cpp/common/io/inc/APRConnector.h b/cpp/common/io/inc/APRConnector.h index c292c4d7e0..c6ed887f78 100644 --- a/cpp/common/io/inc/APRConnector.h +++ b/cpp/common/io/inc/APRConnector.h @@ -68,7 +68,7 @@ namespace io { void checkIdle(apr_status_t status); void writeBlock(qpid::framing::AMQDataBlock* data); - void writeToSocket(char* data, int available); + void writeToSocket(char* data, size_t available); void setSocketTimeout(); void run(); diff --git a/cpp/common/io/inc/ConnectorImpl.h b/cpp/common/io/inc/ConnectorImpl.h index 242f3aed49..1abb72f32a 100644 --- a/cpp/common/io/inc/ConnectorImpl.h +++ b/cpp/common/io/inc/ConnectorImpl.h @@ -32,7 +32,7 @@ namespace io { { public: - ConnectorImpl(bool debug = false, u_int32_t buffer_size = 1024):APRConnector(debug,buffer_size){}; + ConnectorImpl(bool _debug = false, u_int32_t buffer_size = 1024):APRConnector(_debug,buffer_size){}; virtual ~ConnectorImpl(){}; }; #else @@ -40,7 +40,7 @@ namespace io { { public: - ConnectorImpl(bool debug = false, u_int32_t buffer_size = 1024):LConnector(debug, buffer_size){}; + ConnectorImpl(bool _debug = false, u_int32_t buffer_size = 1024):LConnector(_debug, buffer_size){}; virtual ~ConnectorImpl(){}; }; diff --git a/cpp/common/io/inc/LFProcessor.h b/cpp/common/io/inc/LFProcessor.h index 8cfbd237a3..6e67268906 100644 --- a/cpp/common/io/inc/LFProcessor.h +++ b/cpp/common/io/inc/LFProcessor.h @@ -48,12 +48,12 @@ namespace io { const apr_pollfd_t* signalledFDs; int count; const int workerCount; + bool hasLeader; qpid::concurrent::Thread** const workers; qpid::concurrent::APRMonitor leadLock; qpid::concurrent::APRMonitor countLock; qpid::concurrent::APRThreadFactory factory; std::vector<LFSessionContext*> sessions; - bool hasLeader; volatile bool stopped; const apr_pollfd_t* getNextEvent(); diff --git a/cpp/common/io/src/APRConnector.cpp b/cpp/common/io/src/APRConnector.cpp index 0e022a8c73..5f3bfd6957 100644 --- a/cpp/common/io/src/APRConnector.cpp +++ b/cpp/common/io/src/APRConnector.cpp @@ -26,15 +26,18 @@ using namespace qpid::concurrent; using namespace qpid::framing; using qpid::QpidError; -APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) : closed(true), debug(_debug), - idleIn(0), idleOut(0), timeout(0), - timeoutHandler(0), - shutdownHandler(0), - lastIn(0), lastOut(0), - receive_buffer_size(buffer_size), - send_buffer_size(buffer_size), - inbuf(receive_buffer_size), - outbuf(send_buffer_size){ +APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) : + debug(_debug), + receive_buffer_size(buffer_size), + send_buffer_size(buffer_size), + closed(true), + lastIn(0), lastOut(0), + timeout(0), + idleIn(0), idleOut(0), + timeoutHandler(0), + shutdownHandler(0), + inbuf(receive_buffer_size), + outbuf(send_buffer_size){ APRBase::increment(); @@ -104,7 +107,7 @@ void APRConnector::writeBlock(AMQDataBlock* data){ writeLock->release(); } -void APRConnector::writeToSocket(char* data, int available){ +void APRConnector::writeToSocket(char* data, size_t available){ apr_size_t bytes(available); apr_size_t written(0); while(written < available && !closed){ diff --git a/cpp/common/io/src/APRSocket.cpp b/cpp/common/io/src/APRSocket.cpp index 32861ea442..1ef7e270a3 100644 --- a/cpp/common/io/src/APRSocket.cpp +++ b/cpp/common/io/src/APRSocket.cpp @@ -17,7 +17,7 @@ */ #include "APRBase.h" #include "APRSocket.h" - +#include <assert.h> #include <iostream> using namespace qpid::io; @@ -45,6 +45,8 @@ void APRSocket::write(qpid::framing::Buffer& buffer){ do{ bytes = buffer.available(); apr_status_t s = apr_socket_send(socket, buffer.start(), &bytes); + // TODO aconway 2006-10-05: better error handling + assert(s == 0); buffer.move(bytes); }while(bytes > 0); } diff --git a/cpp/common/io/src/BlockingAPRAcceptor.cpp b/cpp/common/io/src/BlockingAPRAcceptor.cpp index 380318bcfa..bf74260a55 100644 --- a/cpp/common/io/src/BlockingAPRAcceptor.cpp +++ b/cpp/common/io/src/BlockingAPRAcceptor.cpp @@ -24,10 +24,11 @@ using namespace qpid::concurrent; using namespace qpid::framing; using namespace qpid::io; -BlockingAPRAcceptor::BlockingAPRAcceptor(bool _debug, int c) : connectionBacklog(c), - threadFactory(new APRThreadFactory()), - debug(_debug){ - +BlockingAPRAcceptor::BlockingAPRAcceptor(bool _debug, int c) : + debug(_debug), + threadFactory(new APRThreadFactory()), + connectionBacklog(c) +{ APRBase::increment(); CHECK_APR_SUCCESS(apr_pool_create(&apr_pool, NULL)); } diff --git a/cpp/common/io/src/BlockingAPRSessionContext.cpp b/cpp/common/io/src/BlockingAPRSessionContext.cpp index 99352c90d5..6d1dc3470c 100644 --- a/cpp/common/io/src/BlockingAPRSessionContext.cpp +++ b/cpp/common/io/src/BlockingAPRSessionContext.cpp @@ -15,6 +15,7 @@ * limitations under the License. * */ +#include <assert.h> #include <iostream> #include "BlockingAPRSessionContext.h" #include "BlockingAPRAcceptor.h" @@ -32,10 +33,10 @@ BlockingAPRSessionContext::BlockingAPRSessionContext(apr_socket_t* _socket, bool _debug) : socket(_socket), debug(_debug), - inbuf(65536), - outbuf(65536), handler(0), acceptor(_acceptor), + inbuf(65536), + outbuf(65536), closed(false){ reader = new Reader(this); @@ -73,9 +74,9 @@ void BlockingAPRSessionContext::read(){ inbuf.flip(); if(!initiated){ - ProtocolInitiation* init = new ProtocolInitiation(); - if(init->decode(inbuf)){ - handler->initiated(init); + ProtocolInitiation* protocolInit = new ProtocolInitiation(); + if(protocolInit->decode(inbuf)){ + handler->initiated(protocolInit); if(debug) std::cout << "RECV: [" << &socket << "]: Initialised " << std::endl; initiated = true; } @@ -122,6 +123,7 @@ void BlockingAPRSessionContext::write(){ apr_size_t bytes = available; while(available > written){ apr_status_t s = apr_socket_send(socket, data + written, &bytes); + assert(s == 0); // TODO aconway 2006-10-05: Error Handling. written += bytes; bytes = available - written; } @@ -146,9 +148,8 @@ void BlockingAPRSessionContext::send(AMQFrame* frame){ } } -void BlockingAPRSessionContext::init(SessionHandler* handler){ - this->handler = handler; - //start the threads +void BlockingAPRSessionContext::init(SessionHandler* _handler){ + handler = _handler; rThread->start(); wThread->start(); } diff --git a/cpp/common/io/src/LFAcceptor.cpp b/cpp/common/io/src/LFAcceptor.cpp index 6653e926db..bb5164f457 100644 --- a/cpp/common/io/src/LFAcceptor.cpp +++ b/cpp/common/io/src/LFAcceptor.cpp @@ -21,12 +21,12 @@ using namespace qpid::concurrent; using namespace qpid::io; -LFAcceptor::LFAcceptor(bool _debug, int c, int worker_threads, int m) : processor(aprPool.pool, worker_threads, 1000, 5000000), - connectionBacklog(c), - max_connections_per_processor(m), - debug(_debug){ - -} +LFAcceptor::LFAcceptor(bool _debug, int c, int worker_threads, int m) : + processor(aprPool.pool, worker_threads, 1000, 5000000), + max_connections_per_processor(m), + debug(_debug), + connectionBacklog(c) +{ } void LFAcceptor::bind(int port, SessionHandlerFactory* factory){ diff --git a/cpp/common/io/src/LFProcessor.cpp b/cpp/common/io/src/LFProcessor.cpp index 8ef3543b8f..3ac66576e3 100644 --- a/cpp/common/io/src/LFProcessor.cpp +++ b/cpp/common/io/src/LFProcessor.cpp @@ -25,15 +25,17 @@ using namespace qpid::io; using namespace qpid::concurrent; using qpid::QpidError; -LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : size(_size), - timeout(_timeout), - signalledCount(0), - current(0), - count(0), - hasLeader(false), - workerCount(_workers), - workers(new Thread*[_workers]), - stopped(false){ +LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : + size(_size), + timeout(_timeout), + signalledCount(0), + current(0), + count(0), + workerCount(_workers), + hasLeader(false), + workers(new Thread*[_workers]), + stopped(false) +{ CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); //create & start the required number of threads @@ -87,17 +89,13 @@ void LFProcessor::update(const apr_pollfd_t* const fd){ } bool LFProcessor::full(){ - countLock.acquire(); - bool full = count == size; - countLock.release(); - return full; + Locker locker(countLock); + return count == size; } bool LFProcessor::empty(){ - countLock.acquire(); - bool empty = count == 0; - countLock.release(); - return empty; + Locker locker(countLock); + return count == 0; } void LFProcessor::poll(){ diff --git a/cpp/common/io/src/LFSessionContext.cpp b/cpp/common/io/src/LFSessionContext.cpp index d786cb5e98..7b8208f704 100644 --- a/cpp/common/io/src/LFSessionContext.cpp +++ b/cpp/common/io/src/LFSessionContext.cpp @@ -26,17 +26,19 @@ using namespace qpid::framing; LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, LFProcessor* const _processor, - bool _debug) : socket(_socket), - processor(_processor), - initiated(false), - processing(false), - closing(false), - in(32768), - out(32768), - reading(0), - writing(0), - debug(_debug){ - + bool _debug) : + debug(_debug), + socket(_socket), + initiated(false), + in(32768), + out(32768), + processor(_processor), + processing(false), + closing(false), + reading(0), + writing(0) +{ + fd.p = _pool; fd.desc_type = APR_POLL_SOCKET; fd.reqevents = APR_POLLIN; @@ -63,9 +65,9 @@ void LFSessionContext::read(){ handler->received(&frame); } }else{ - ProtocolInitiation init; - if(init.decode(in)){ - handler->initiated(&init); + ProtocolInitiation protocolInit; + if(protocolInit.decode(in)){ + handler->initiated(&protocolInit); initiated = true; if(debug) std::cout << "INIT [" << &socket << "]" << std::endl; } @@ -173,8 +175,8 @@ void LFSessionContext::shutdown(){ handleClose(); } -void LFSessionContext::init(SessionHandler* handler){ - this->handler = handler; +void LFSessionContext::init(SessionHandler* _handler){ + handler = _handler; processor->add(&fd); } |