diff options
Diffstat (limited to 'cpp/common/io/src')
-rw-r--r-- | cpp/common/io/src/APRConnector.cpp | 23 | ||||
-rw-r--r-- | cpp/common/io/src/APRSocket.cpp | 4 | ||||
-rw-r--r-- | cpp/common/io/src/BlockingAPRAcceptor.cpp | 9 | ||||
-rw-r--r-- | cpp/common/io/src/BlockingAPRSessionContext.cpp | 17 | ||||
-rw-r--r-- | cpp/common/io/src/LFAcceptor.cpp | 12 | ||||
-rw-r--r-- | cpp/common/io/src/LFProcessor.cpp | 32 | ||||
-rw-r--r-- | cpp/common/io/src/LFSessionContext.cpp | 34 |
7 files changed, 69 insertions, 62 deletions
diff --git a/cpp/common/io/src/APRConnector.cpp b/cpp/common/io/src/APRConnector.cpp index 0e022a8c73..5f3bfd6957 100644 --- a/cpp/common/io/src/APRConnector.cpp +++ b/cpp/common/io/src/APRConnector.cpp @@ -26,15 +26,18 @@ using namespace qpid::concurrent; using namespace qpid::framing; using qpid::QpidError; -APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) : closed(true), debug(_debug), - idleIn(0), idleOut(0), timeout(0), - timeoutHandler(0), - shutdownHandler(0), - lastIn(0), lastOut(0), - receive_buffer_size(buffer_size), - send_buffer_size(buffer_size), - inbuf(receive_buffer_size), - outbuf(send_buffer_size){ +APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) : + debug(_debug), + receive_buffer_size(buffer_size), + send_buffer_size(buffer_size), + closed(true), + lastIn(0), lastOut(0), + timeout(0), + idleIn(0), idleOut(0), + timeoutHandler(0), + shutdownHandler(0), + inbuf(receive_buffer_size), + outbuf(send_buffer_size){ APRBase::increment(); @@ -104,7 +107,7 @@ void APRConnector::writeBlock(AMQDataBlock* data){ writeLock->release(); } -void APRConnector::writeToSocket(char* data, int available){ +void APRConnector::writeToSocket(char* data, size_t available){ apr_size_t bytes(available); apr_size_t written(0); while(written < available && !closed){ diff --git a/cpp/common/io/src/APRSocket.cpp b/cpp/common/io/src/APRSocket.cpp index 32861ea442..1ef7e270a3 100644 --- a/cpp/common/io/src/APRSocket.cpp +++ b/cpp/common/io/src/APRSocket.cpp @@ -17,7 +17,7 @@ */ #include "APRBase.h" #include "APRSocket.h" - +#include <assert.h> #include <iostream> using namespace qpid::io; @@ -45,6 +45,8 @@ void APRSocket::write(qpid::framing::Buffer& buffer){ do{ bytes = buffer.available(); apr_status_t s = apr_socket_send(socket, buffer.start(), &bytes); + // TODO aconway 2006-10-05: better error handling + assert(s == 0); buffer.move(bytes); }while(bytes > 0); } diff --git a/cpp/common/io/src/BlockingAPRAcceptor.cpp b/cpp/common/io/src/BlockingAPRAcceptor.cpp index 380318bcfa..bf74260a55 100644 --- a/cpp/common/io/src/BlockingAPRAcceptor.cpp +++ b/cpp/common/io/src/BlockingAPRAcceptor.cpp @@ -24,10 +24,11 @@ using namespace qpid::concurrent; using namespace qpid::framing; using namespace qpid::io; -BlockingAPRAcceptor::BlockingAPRAcceptor(bool _debug, int c) : connectionBacklog(c), - threadFactory(new APRThreadFactory()), - debug(_debug){ - +BlockingAPRAcceptor::BlockingAPRAcceptor(bool _debug, int c) : + debug(_debug), + threadFactory(new APRThreadFactory()), + connectionBacklog(c) +{ APRBase::increment(); CHECK_APR_SUCCESS(apr_pool_create(&apr_pool, NULL)); } diff --git a/cpp/common/io/src/BlockingAPRSessionContext.cpp b/cpp/common/io/src/BlockingAPRSessionContext.cpp index 99352c90d5..6d1dc3470c 100644 --- a/cpp/common/io/src/BlockingAPRSessionContext.cpp +++ b/cpp/common/io/src/BlockingAPRSessionContext.cpp @@ -15,6 +15,7 @@ * limitations under the License. * */ +#include <assert.h> #include <iostream> #include "BlockingAPRSessionContext.h" #include "BlockingAPRAcceptor.h" @@ -32,10 +33,10 @@ BlockingAPRSessionContext::BlockingAPRSessionContext(apr_socket_t* _socket, bool _debug) : socket(_socket), debug(_debug), - inbuf(65536), - outbuf(65536), handler(0), acceptor(_acceptor), + inbuf(65536), + outbuf(65536), closed(false){ reader = new Reader(this); @@ -73,9 +74,9 @@ void BlockingAPRSessionContext::read(){ inbuf.flip(); if(!initiated){ - ProtocolInitiation* init = new ProtocolInitiation(); - if(init->decode(inbuf)){ - handler->initiated(init); + ProtocolInitiation* protocolInit = new ProtocolInitiation(); + if(protocolInit->decode(inbuf)){ + handler->initiated(protocolInit); if(debug) std::cout << "RECV: [" << &socket << "]: Initialised " << std::endl; initiated = true; } @@ -122,6 +123,7 @@ void BlockingAPRSessionContext::write(){ apr_size_t bytes = available; while(available > written){ apr_status_t s = apr_socket_send(socket, data + written, &bytes); + assert(s == 0); // TODO aconway 2006-10-05: Error Handling. written += bytes; bytes = available - written; } @@ -146,9 +148,8 @@ void BlockingAPRSessionContext::send(AMQFrame* frame){ } } -void BlockingAPRSessionContext::init(SessionHandler* handler){ - this->handler = handler; - //start the threads +void BlockingAPRSessionContext::init(SessionHandler* _handler){ + handler = _handler; rThread->start(); wThread->start(); } diff --git a/cpp/common/io/src/LFAcceptor.cpp b/cpp/common/io/src/LFAcceptor.cpp index 6653e926db..bb5164f457 100644 --- a/cpp/common/io/src/LFAcceptor.cpp +++ b/cpp/common/io/src/LFAcceptor.cpp @@ -21,12 +21,12 @@ using namespace qpid::concurrent; using namespace qpid::io; -LFAcceptor::LFAcceptor(bool _debug, int c, int worker_threads, int m) : processor(aprPool.pool, worker_threads, 1000, 5000000), - connectionBacklog(c), - max_connections_per_processor(m), - debug(_debug){ - -} +LFAcceptor::LFAcceptor(bool _debug, int c, int worker_threads, int m) : + processor(aprPool.pool, worker_threads, 1000, 5000000), + max_connections_per_processor(m), + debug(_debug), + connectionBacklog(c) +{ } void LFAcceptor::bind(int port, SessionHandlerFactory* factory){ diff --git a/cpp/common/io/src/LFProcessor.cpp b/cpp/common/io/src/LFProcessor.cpp index 8ef3543b8f..3ac66576e3 100644 --- a/cpp/common/io/src/LFProcessor.cpp +++ b/cpp/common/io/src/LFProcessor.cpp @@ -25,15 +25,17 @@ using namespace qpid::io; using namespace qpid::concurrent; using qpid::QpidError; -LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : size(_size), - timeout(_timeout), - signalledCount(0), - current(0), - count(0), - hasLeader(false), - workerCount(_workers), - workers(new Thread*[_workers]), - stopped(false){ +LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : + size(_size), + timeout(_timeout), + signalledCount(0), + current(0), + count(0), + workerCount(_workers), + hasLeader(false), + workers(new Thread*[_workers]), + stopped(false) +{ CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); //create & start the required number of threads @@ -87,17 +89,13 @@ void LFProcessor::update(const apr_pollfd_t* const fd){ } bool LFProcessor::full(){ - countLock.acquire(); - bool full = count == size; - countLock.release(); - return full; + Locker locker(countLock); + return count == size; } bool LFProcessor::empty(){ - countLock.acquire(); - bool empty = count == 0; - countLock.release(); - return empty; + Locker locker(countLock); + return count == 0; } void LFProcessor::poll(){ diff --git a/cpp/common/io/src/LFSessionContext.cpp b/cpp/common/io/src/LFSessionContext.cpp index d786cb5e98..7b8208f704 100644 --- a/cpp/common/io/src/LFSessionContext.cpp +++ b/cpp/common/io/src/LFSessionContext.cpp @@ -26,17 +26,19 @@ using namespace qpid::framing; LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, LFProcessor* const _processor, - bool _debug) : socket(_socket), - processor(_processor), - initiated(false), - processing(false), - closing(false), - in(32768), - out(32768), - reading(0), - writing(0), - debug(_debug){ - + bool _debug) : + debug(_debug), + socket(_socket), + initiated(false), + in(32768), + out(32768), + processor(_processor), + processing(false), + closing(false), + reading(0), + writing(0) +{ + fd.p = _pool; fd.desc_type = APR_POLL_SOCKET; fd.reqevents = APR_POLLIN; @@ -63,9 +65,9 @@ void LFSessionContext::read(){ handler->received(&frame); } }else{ - ProtocolInitiation init; - if(init.decode(in)){ - handler->initiated(&init); + ProtocolInitiation protocolInit; + if(protocolInit.decode(in)){ + handler->initiated(&protocolInit); initiated = true; if(debug) std::cout << "INIT [" << &socket << "]" << std::endl; } @@ -173,8 +175,8 @@ void LFSessionContext::shutdown(){ handleClose(); } -void LFSessionContext::init(SessionHandler* handler){ - this->handler = handler; +void LFSessionContext::init(SessionHandler* _handler){ + handler = _handler; processor->add(&fd); } |