summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/posix/AsynchIO.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/posix/AsynchIO.cpp')
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp50
1 files changed, 29 insertions, 21 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 4600960c6d..e73bbc03ca 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -97,7 +97,8 @@ AsynchIO::AsynchIO(const Socket& s,
closedCallback(cCb),
emptyCallback(eCb),
idleCallback(iCb),
- queuedClose(false) {
+ queuedClose(false),
+ writePending(false) {
s.setNonblocking();
}
@@ -139,20 +140,21 @@ void AsynchIO::unread(BufferBase* buff) {
DispatchHandle::rewatchRead();
}
-// Either queue for writing or announce that there is something to write
-// and we should ask for it
void AsynchIO::queueWrite(BufferBase* buff) {
- // If no buffer then don't queue anything
- // (but still wake up for writing)
- if (buff) {
- // If we've already closed the socket then throw the write away
- if (queuedClose) {
- bufferQueue.push_front(buff);
- return;
- } else {
- writeQueue.push_front(buff);
- }
- }
+ assert(buff);
+ // If we've already closed the socket then throw the write away
+ if (queuedClose) {
+ bufferQueue.push_front(buff);
+ return;
+ } else {
+ writeQueue.push_front(buff);
+ }
+ writePending = false;
+ DispatchHandle::rewatchWrite();
+}
+
+void AsynchIO::notifyPendingWrite() {
+ writePending = true;
DispatchHandle::rewatchWrite();
}
@@ -269,18 +271,24 @@ void AsynchIO::writeable(DispatchHandle& h) {
}
}
} else {
- // If we're waiting to close the socket then can do it now as there is nothing to write
- if (queuedClose) {
- close(h);
- return;
- }
+ // If we're waiting to close the socket then can do it now as there is nothing to write
+ if (queuedClose) {
+ close(h);
+ return;
+ }
// Fd is writable, but nothing to write
if (idleCallback) {
+ writePending = false;
idleCallback(*this);
}
// If we still have no buffers to write we can't do anything more
- if (writeQueue.empty() && !queuedClose) {
+ if (writeQueue.empty() && !writePending && !queuedClose) {
h.unwatchWrite();
+ //the following handles the case where writePending is
+ //set to true after the test above; in this case its
+ //possible that the unwatchWrite overwrites the
+ //desired rewatchWrite so we correct that here
+ if (writePending) h.rewatchWrite();
return;
}
}
@@ -304,7 +312,7 @@ void AsynchIO::close(DispatchHandle& h) {
h.stopWatch();
h.getSocket().close();
if (closedCallback) {
- closedCallback(*this, getSocket());
+ closedCallback(*this, getSocket());
}
}