summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp24
1 files changed, 21 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 5d1ac6d034..e702a6d76d 100644
--- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -305,6 +305,13 @@ private:
* thread processing this handle.
*/
volatile bool writePending;
+ /**
+ * This records whether we've been reading is flow controlled:
+ * it's safe as a simple boolean as the only way to be stopped
+ * is in calls only allowed in the callback context, the only calls
+ * checking it are also in calls only allowed in callback context.
+ */
+ volatile bool readingStopped;
};
AsynchIO::AsynchIO(const Socket& s,
@@ -323,7 +330,8 @@ AsynchIO::AsynchIO(const Socket& s,
idleCallback(iCb),
socket(s),
queuedClose(false),
- writePending(false) {
+ writePending(false),
+ readingStopped(false) {
s.setNonblocking();
}
@@ -351,8 +359,11 @@ void AsynchIO::queueReadBuffer(BufferBase* buff) {
assert(buff);
buff->dataStart = 0;
buff->dataCount = 0;
+
+ bool queueWasEmpty = bufferQueue.empty();
bufferQueue.push_back(buff);
- DispatchHandle::rewatchRead();
+ if (queueWasEmpty && !readingStopped)
+ DispatchHandle::rewatchRead();
}
void AsynchIO::unread(BufferBase* buff) {
@@ -361,8 +372,11 @@ void AsynchIO::unread(BufferBase* buff) {
memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount);
buff->dataStart = 0;
}
+
+ bool queueWasEmpty = bufferQueue.empty();
bufferQueue.push_front(buff);
- DispatchHandle::rewatchRead();
+ if (queueWasEmpty && !readingStopped)
+ DispatchHandle::rewatchRead();
}
void AsynchIO::queueWrite(BufferBase* buff) {
@@ -378,6 +392,7 @@ void AsynchIO::queueWrite(BufferBase* buff) {
DispatchHandle::rewatchWrite();
}
+// This can happen outside the callback context
void AsynchIO::notifyPendingWrite() {
writePending = true;
DispatchHandle::rewatchWrite();
@@ -392,11 +407,14 @@ bool AsynchIO::writeQueueEmpty() {
return writeQueue.empty();
}
+// This can happen outside the callback context
void AsynchIO::startReading() {
+ readingStopped = false;
DispatchHandle::rewatchRead();
}
void AsynchIO::stopReading() {
+ readingStopped = true;
DispatchHandle::unwatchRead();
}