diff options
Diffstat (limited to 'qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp | 25 |
1 files changed, 21 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp index 30378d4c5f..fb8df5ddf8 100644 --- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -295,6 +295,8 @@ private: volatile bool queuedDelete; // Socket close requested, but there are operations in progress. volatile bool queuedClose; + // Most recent asynch read request + volatile AsynchReadResult* pendingRead; private: // Dispatch events that have completed. @@ -374,6 +376,7 @@ AsynchIO::AsynchIO(const Socket& s, writeInProgress(false), queuedDelete(false), queuedClose(false), + pendingRead(0), working(false) { } @@ -504,6 +507,7 @@ void AsynchIO::startReading() { } } // On status 0 or WSA_IO_PENDING, completion will handle the rest. + pendingRead = result; } else { notifyBuffersEmpty(); @@ -617,16 +621,17 @@ void AsynchIO::readComplete(AsynchReadResult *result) { int status = result->getStatus(); size_t bytes = result->getTransferred(); if (status == 0 && bytes > 0) { - bool restartRead = true; // May not if receiver doesn't want more if (readCallback) readCallback(*this, result->getBuff()); - if (restartRead) - startReading(); + startReading(); } else { // No data read, so put the buffer back. It may be partially filled, // so "unread" it back to the front of the queue. unread(result->getBuff()); + if (queuedClose && status == ERROR_OPERATION_ABORTED) { + return; // Expected reap from CancelIoEx + } notifyEof(); if (status != 0) { @@ -697,8 +702,11 @@ void AsynchIO::completion(AsynchIoResult *result) { { ScopedUnlock<Mutex> ul(completionLock); AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result); - if (r != 0) + if (r != 0) { readComplete(r); + // Set pendingRead to 0 if it's still pointing to (newly completed) r + InterlockedCompareExchangePointer((void * volatile *)&pendingRead, 0, r); + } else { AsynchWriteResult *w = dynamic_cast<AsynchWriteResult*>(result); @@ -732,6 +740,15 @@ void AsynchIO::completion(AsynchIoResult *result) { else if (queuedDelete) delete this; } + else { + if (queuedClose && pendingRead) { + // Force outstanding read to completion. Layer above will + // call back. + CancelIoEx((HANDLE)toSocketHandle(socket), + ((AsynchReadResult *)pendingRead)->overlapped()); + pendingRead = 0; + } + } } } // namespace windows |