diff options
Diffstat (limited to 'cpp/src/qpid/sys/posix/AsynchIO.cpp')
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 50 |
1 files changed, 29 insertions, 21 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 4600960c6d..e73bbc03ca 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -97,7 +97,8 @@ AsynchIO::AsynchIO(const Socket& s, closedCallback(cCb), emptyCallback(eCb), idleCallback(iCb), - queuedClose(false) { + queuedClose(false), + writePending(false) { s.setNonblocking(); } @@ -139,20 +140,21 @@ void AsynchIO::unread(BufferBase* buff) { DispatchHandle::rewatchRead(); } -// Either queue for writing or announce that there is something to write -// and we should ask for it void AsynchIO::queueWrite(BufferBase* buff) { - // If no buffer then don't queue anything - // (but still wake up for writing) - if (buff) { - // If we've already closed the socket then throw the write away - if (queuedClose) { - bufferQueue.push_front(buff); - return; - } else { - writeQueue.push_front(buff); - } - } + assert(buff); + // If we've already closed the socket then throw the write away + if (queuedClose) { + bufferQueue.push_front(buff); + return; + } else { + writeQueue.push_front(buff); + } + writePending = false; + DispatchHandle::rewatchWrite(); +} + +void AsynchIO::notifyPendingWrite() { + writePending = true; DispatchHandle::rewatchWrite(); } @@ -269,18 +271,24 @@ void AsynchIO::writeable(DispatchHandle& h) { } } } else { - // If we're waiting to close the socket then can do it now as there is nothing to write - if (queuedClose) { - close(h); - return; - } + // If we're waiting to close the socket then can do it now as there is nothing to write + if (queuedClose) { + close(h); + return; + } // Fd is writable, but nothing to write if (idleCallback) { + writePending = false; idleCallback(*this); } // If we still have no buffers to write we can't do anything more - if (writeQueue.empty() && !queuedClose) { + if (writeQueue.empty() && !writePending && !queuedClose) { h.unwatchWrite(); + //the following handles the case where writePending is + //set to true after the test above; in this case its + //possible that the unwatchWrite overwrites the + //desired rewatchWrite so we correct that here + if (writePending) h.rewatchWrite(); return; } } @@ -304,7 +312,7 @@ void AsynchIO::close(DispatchHandle& h) { h.stopWatch(); h.getSocket().close(); if (closedCallback) { - closedCallback(*this, getSocket()); + closedCallback(*this, getSocket()); } } |