diff options
author | Andrew Stitcher <astitcher@apache.org> | 2009-09-02 16:24:00 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2009-09-02 16:24:00 +0000 |
commit | ea3640c656951f3f152f30e5bfb012d520ed2b8a (patch) | |
tree | 4028b056415ce8f72535defe8ec265873c1ed002 /cpp/src | |
parent | f2cfa0f81023d24db62a45084be4fabe80114ba6 (diff) | |
download | qpid-python-ea3640c656951f3f152f30e5bfb012d520ed2b8a.tar.gz |
Change Async buffer returning logic to only watch reads when necessary
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@810591 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 24 |
1 files changed, 21 insertions, 3 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 5d1ac6d034..e702a6d76d 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -305,6 +305,13 @@ private: * thread processing this handle. */ volatile bool writePending; + /** + * This records whether we've been reading is flow controlled: + * it's safe as a simple boolean as the only way to be stopped + * is in calls only allowed in the callback context, the only calls + * checking it are also in calls only allowed in callback context. + */ + volatile bool readingStopped; }; AsynchIO::AsynchIO(const Socket& s, @@ -323,7 +330,8 @@ AsynchIO::AsynchIO(const Socket& s, idleCallback(iCb), socket(s), queuedClose(false), - writePending(false) { + writePending(false), + readingStopped(false) { s.setNonblocking(); } @@ -351,8 +359,11 @@ void AsynchIO::queueReadBuffer(BufferBase* buff) { assert(buff); buff->dataStart = 0; buff->dataCount = 0; + + bool queueWasEmpty = bufferQueue.empty(); bufferQueue.push_back(buff); - DispatchHandle::rewatchRead(); + if (queueWasEmpty && !readingStopped) + DispatchHandle::rewatchRead(); } void AsynchIO::unread(BufferBase* buff) { @@ -361,8 +372,11 @@ void AsynchIO::unread(BufferBase* buff) { memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount); buff->dataStart = 0; } + + bool queueWasEmpty = bufferQueue.empty(); bufferQueue.push_front(buff); - DispatchHandle::rewatchRead(); + if (queueWasEmpty && !readingStopped) + DispatchHandle::rewatchRead(); } void AsynchIO::queueWrite(BufferBase* buff) { @@ -378,6 +392,7 @@ void AsynchIO::queueWrite(BufferBase* buff) { DispatchHandle::rewatchWrite(); } +// This can happen outside the callback context void AsynchIO::notifyPendingWrite() { writePending = true; DispatchHandle::rewatchWrite(); @@ -392,11 +407,14 @@ bool AsynchIO::writeQueueEmpty() { return writeQueue.empty(); } +// This can happen outside the callback context void AsynchIO::startReading() { + readingStopped = false; DispatchHandle::rewatchRead(); } void AsynchIO::stopReading() { + readingStopped = true; DispatchHandle::unwatchRead(); } |