diff options
Diffstat (limited to 'cpp/src/qpid/sys/posix/EventChannelConnection.cpp')
-rw-r--r-- | cpp/src/qpid/sys/posix/EventChannelConnection.cpp | 36 |
1 files changed, 24 insertions, 12 deletions
diff --git a/cpp/src/qpid/sys/posix/EventChannelConnection.cpp b/cpp/src/qpid/sys/posix/EventChannelConnection.cpp index 0c1c81b6fe..a36f096a4d 100644 --- a/cpp/src/qpid/sys/posix/EventChannelConnection.cpp +++ b/cpp/src/qpid/sys/posix/EventChannelConnection.cpp @@ -24,7 +24,6 @@ #include "EventChannelConnection.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/QpidError.h" -#include "qpid/log/Statement.h" using namespace std; using namespace qpid; @@ -44,6 +43,8 @@ EventChannelConnection::EventChannelConnection( ) : readFd(rfd), writeFd(wfd ? wfd : rfd), + readEvent(readFd), + writeEvent(writeFd), readCallback(boost::bind(&EventChannelConnection::closeOnException, this, &EventChannelConnection::endInitRead)), @@ -55,8 +56,8 @@ EventChannelConnection::EventChannelConnection( out(bufferSize), isTrace(isTrace_) { - BOOST_ASSERT(readFd > 0); - BOOST_ASSERT(writeFd > 0); + assert(readFd > 0); + assert(writeFd > 0); closeOnException(&EventChannelConnection::startRead); } @@ -133,14 +134,17 @@ void EventChannelConnection::startWrite() { } // No need to lock here - only one thread can be writing at a time. out.clear(); - QPID_LOG(trace, "Send on socket " << writeFd << ": " << *frame); + if (isTrace) + cout << "Send on socket " << writeFd << ": " << *frame << endl; frame->encode(out); out.flip(); + // TODO: AMS 1/6/07 This only works because we already have the correct fd + // in the descriptor - change not to use assigment writeEvent = WriteEvent( writeFd, out.start(), out.available(), boost::bind(&EventChannelConnection::closeOnException, this, &EventChannelConnection::endWrite)); - threads->postEvent(writeEvent); + threads->post(writeEvent); } // ScopedBusy ctor increments busyThreads. @@ -161,12 +165,18 @@ void EventChannelConnection::endWrite() { ScopedBusy(*this); { Monitor::ScopedLock lock(monitor); + assert(isWriting); isWriting = false; - if (isClosed) + if (isClosed) return; writeEvent.throwIfException(); + if (writeEvent.getBytesWritten() < writeEvent.getSize()) { + // Keep writing the current event till done. + isWriting = true; + threads->post(writeEvent); + } } - // Check if there's more in to write in the write queue. + // Continue writing from writeFrames queue. startWrite(); } @@ -179,8 +189,8 @@ void EventChannelConnection::endWrite() { void EventChannelConnection::startRead() { // Non blocking read, as much as we can swallow. readEvent = ReadEvent( - readFd, in.start(), in.available(), readCallback,true); - threads->postEvent(readEvent); + readFd, in.start(), in.available(), readCallback); + threads->post(readEvent); } // Completion of initial read, expect protocolInit. @@ -194,7 +204,7 @@ void EventChannelConnection::endInitRead() { in.flip(); ProtocolInitiation protocolInit; if(protocolInit.decode(in)){ - handler->initiated(&protocolInit); + handler->initiated(protocolInit); readCallback = boost::bind( &EventChannelConnection::closeOnException, this, &EventChannelConnection::endRead); @@ -215,8 +225,10 @@ void EventChannelConnection::endRead() { in.flip(); AMQFrame frame; while (frame.decode(in)) { - QPID_LOG(trace, "Received on socket " << readFd - << ": " << frame); + // TODO aconway 2006-11-30: received should take Frame& + if (isTrace) + cout << "Received on socket " << readFd + << ": " << frame << endl; handler->received(&frame); } in.compact(); |