diff options
author | Andrew Stitcher <astitcher@apache.org> | 2009-09-02 16:24:00 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2009-09-02 16:24:00 +0000 |
commit | 9f8abff662ff73e45954fec11c85edaff17367b1 (patch) | |
tree | 09eb16d16cd2760a147b0a566c351ddd9b184055 | |
parent | 242d32bbd0f84026680c3a8dc0c765b65403d36a (diff) | |
download | qpid-python-9f8abff662ff73e45954fec11c85edaff17367b1.tar.gz |
Change Async buffer returning logic to only watch reads when necessary
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@810591 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp | 24 |
1 files changed, 21 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp index 5d1ac6d034..e702a6d76d 100644 --- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -305,6 +305,13 @@ private: * thread processing this handle. */ volatile bool writePending; + /** + * This records whether we've been reading is flow controlled: + * it's safe as a simple boolean as the only way to be stopped + * is in calls only allowed in the callback context, the only calls + * checking it are also in calls only allowed in callback context. + */ + volatile bool readingStopped; }; AsynchIO::AsynchIO(const Socket& s, @@ -323,7 +330,8 @@ AsynchIO::AsynchIO(const Socket& s, idleCallback(iCb), socket(s), queuedClose(false), - writePending(false) { + writePending(false), + readingStopped(false) { s.setNonblocking(); } @@ -351,8 +359,11 @@ void AsynchIO::queueReadBuffer(BufferBase* buff) { assert(buff); buff->dataStart = 0; buff->dataCount = 0; + + bool queueWasEmpty = bufferQueue.empty(); bufferQueue.push_back(buff); - DispatchHandle::rewatchRead(); + if (queueWasEmpty && !readingStopped) + DispatchHandle::rewatchRead(); } void AsynchIO::unread(BufferBase* buff) { @@ -361,8 +372,11 @@ void AsynchIO::unread(BufferBase* buff) { memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount); buff->dataStart = 0; } + + bool queueWasEmpty = bufferQueue.empty(); bufferQueue.push_front(buff); - DispatchHandle::rewatchRead(); + if (queueWasEmpty && !readingStopped) + DispatchHandle::rewatchRead(); } void AsynchIO::queueWrite(BufferBase* buff) { @@ -378,6 +392,7 @@ void AsynchIO::queueWrite(BufferBase* buff) { DispatchHandle::rewatchWrite(); } +// This can happen outside the callback context void AsynchIO::notifyPendingWrite() { writePending = true; DispatchHandle::rewatchWrite(); @@ -392,11 +407,14 @@ bool AsynchIO::writeQueueEmpty() { return writeQueue.empty(); } +// This can happen outside the callback context void AsynchIO::startReading() { + readingStopped = false; DispatchHandle::rewatchRead(); } void AsynchIO::stopReading() { + readingStopped = true; DispatchHandle::unwatchRead(); } |