summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/posix/EventChannelConnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/posix/EventChannelConnection.cpp')
-rw-r--r--cpp/src/qpid/sys/posix/EventChannelConnection.cpp36
1 files changed, 24 insertions, 12 deletions
diff --git a/cpp/src/qpid/sys/posix/EventChannelConnection.cpp b/cpp/src/qpid/sys/posix/EventChannelConnection.cpp
index 0c1c81b6fe..a36f096a4d 100644
--- a/cpp/src/qpid/sys/posix/EventChannelConnection.cpp
+++ b/cpp/src/qpid/sys/posix/EventChannelConnection.cpp
@@ -24,7 +24,6 @@
#include "EventChannelConnection.h"
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/QpidError.h"
-#include "qpid/log/Statement.h"
using namespace std;
using namespace qpid;
@@ -44,6 +43,8 @@ EventChannelConnection::EventChannelConnection(
) :
readFd(rfd),
writeFd(wfd ? wfd : rfd),
+ readEvent(readFd),
+ writeEvent(writeFd),
readCallback(boost::bind(&EventChannelConnection::closeOnException,
this, &EventChannelConnection::endInitRead)),
@@ -55,8 +56,8 @@ EventChannelConnection::EventChannelConnection(
out(bufferSize),
isTrace(isTrace_)
{
- BOOST_ASSERT(readFd > 0);
- BOOST_ASSERT(writeFd > 0);
+ assert(readFd > 0);
+ assert(writeFd > 0);
closeOnException(&EventChannelConnection::startRead);
}
@@ -133,14 +134,17 @@ void EventChannelConnection::startWrite() {
}
// No need to lock here - only one thread can be writing at a time.
out.clear();
- QPID_LOG(trace, "Send on socket " << writeFd << ": " << *frame);
+ if (isTrace)
+ cout << "Send on socket " << writeFd << ": " << *frame << endl;
frame->encode(out);
out.flip();
+ // TODO: AMS 1/6/07 This only works because we already have the correct fd
+ // in the descriptor - change not to use assigment
writeEvent = WriteEvent(
writeFd, out.start(), out.available(),
boost::bind(&EventChannelConnection::closeOnException,
this, &EventChannelConnection::endWrite));
- threads->postEvent(writeEvent);
+ threads->post(writeEvent);
}
// ScopedBusy ctor increments busyThreads.
@@ -161,12 +165,18 @@ void EventChannelConnection::endWrite() {
ScopedBusy(*this);
{
Monitor::ScopedLock lock(monitor);
+ assert(isWriting);
isWriting = false;
- if (isClosed)
+ if (isClosed)
return;
writeEvent.throwIfException();
+ if (writeEvent.getBytesWritten() < writeEvent.getSize()) {
+ // Keep writing the current event till done.
+ isWriting = true;
+ threads->post(writeEvent);
+ }
}
- // Check if there's more in to write in the write queue.
+ // Continue writing from writeFrames queue.
startWrite();
}
@@ -179,8 +189,8 @@ void EventChannelConnection::endWrite() {
void EventChannelConnection::startRead() {
// Non blocking read, as much as we can swallow.
readEvent = ReadEvent(
- readFd, in.start(), in.available(), readCallback,true);
- threads->postEvent(readEvent);
+ readFd, in.start(), in.available(), readCallback);
+ threads->post(readEvent);
}
// Completion of initial read, expect protocolInit.
@@ -194,7 +204,7 @@ void EventChannelConnection::endInitRead() {
in.flip();
ProtocolInitiation protocolInit;
if(protocolInit.decode(in)){
- handler->initiated(&protocolInit);
+ handler->initiated(protocolInit);
readCallback = boost::bind(
&EventChannelConnection::closeOnException,
this, &EventChannelConnection::endRead);
@@ -215,8 +225,10 @@ void EventChannelConnection::endRead() {
in.flip();
AMQFrame frame;
while (frame.decode(in)) {
- QPID_LOG(trace, "Received on socket " << readFd
- << ": " << frame);
+ // TODO aconway 2006-11-30: received should take Frame&
+ if (isTrace)
+ cout << "Received on socket " << readFd
+ << ": " << frame << endl;
handler->received(&frame);
}
in.compact();