diff options
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ConnectionMap.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/AsynchIO.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp | 10 |
6 files changed, 22 insertions, 3 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index d97a1b6c68..4a13d24499 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -210,6 +210,7 @@ size_t Connection::decode(const char* buffer, size_t size) { remainingSize = size - pi.encodedSize(); } else { QPID_LOG(debug, "Not enough data for protocol header on outgoing clustered link"); + giveReadCredit(1); // We're not going to mcast so give read credit now. return 0; } } diff --git a/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp b/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp index 064e3cd252..b412bb13cc 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp +++ b/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp @@ -66,6 +66,7 @@ ConnectionMap::ConnectionPtr ConnectionMap::get(const ConnectionId& id) { ConnectionMap::ConnectionPtr ConnectionMap::getLocal(const ConnectionId& id) { if (id.getMember() != cluster.getId()) return 0; Map::const_iterator i = map.find(id); + assert(i != map.end()); // FIXME aconway 2009-02-11: remove or exception. return i == map.end() ? 0 : i->second; } diff --git a/qpid/cpp/src/qpid/sys/AsynchIO.h b/qpid/cpp/src/qpid/sys/AsynchIO.h index 6507589f2c..ffd4436c2a 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIO.h +++ b/qpid/cpp/src/qpid/sys/AsynchIO.h @@ -139,6 +139,7 @@ public: virtual void queueWriteClose() = 0; virtual bool writeQueueEmpty() = 0; virtual void startReading() = 0; + virtual void stopReading() = 0; virtual void requestCallback(RequestCallback) = 0; virtual BufferBase* getQueuedBuffer() = 0; diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp index 35b75c1fe8..3bc05e4bf9 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -145,8 +145,10 @@ bool AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { // Lock and retest credit to make sure we don't race with increasing credit ScopedLock<Mutex> l(creditLock); assert(readCredit.get() >= 0); - if (readCredit.get() == 0) + if (readCredit.get() == 0) { + aio->stopReading(); return false; + } } } return true; diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp index 8b87039dc8..a356a72650 100644 --- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -267,6 +267,7 @@ public: virtual void queueWriteClose(); virtual bool writeQueueEmpty(); virtual void startReading(); + virtual void stopReading(); virtual void requestCallback(RequestCallback); virtual BufferBase* getQueuedBuffer(); @@ -389,6 +390,10 @@ void AsynchIO::startReading() { DispatchHandle::rewatchRead(); } +void AsynchIO::stopReading() { + DispatchHandle::unwatchRead(); +} + void AsynchIO::requestCallback(RequestCallback callback) { // TODO creating a function object every time isn't all that // efficient - if this becomes heavily used do something better (what?) @@ -439,8 +444,7 @@ void AsynchIO::readable(DispatchHandle& h) { readTotal += rc; if (!readCallback(*this, buff)) { - // We were told to flow control reading at this point - h.unwatchRead(); + // We have been flow controlled. break; } diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp index 5ae9a4bfef..37d87947a2 100644 --- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -286,6 +286,7 @@ public: virtual void queueWriteClose(); virtual bool writeQueueEmpty(); virtual void startReading(); + virtual void stopReading(); virtual void requestCallback(RequestCallback); /** @@ -534,6 +535,15 @@ void AsynchIO::startReading() { return; } +// stopReading was added to prevent a race condition with read-credit on Linux. +// It may or may not be required on windows. +// +// AsynchIOHandler::readbuff() calls stopReading() inside the same +// critical section that protects startReading() in +// AsynchIOHandler::giveReadCredit(). +// +void AsynchIO::stopReading() {} + // Queue the specified callback for invocation from an I/O thread. void AsynchIO::requestCallback(RequestCallback callback) { // This method is generally called from a processing thread; transfer |