diff options
Diffstat (limited to 'qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp | 113 |
1 files changed, 71 insertions, 42 deletions
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp index fb8df5ddf8..ae53414e52 100644 --- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -291,12 +291,12 @@ private: volatile LONG opsInProgress; // Is there a write in progress? volatile bool writeInProgress; + // Or a read? + volatile bool readInProgress; // Deletion requested, but there are callbacks in progress. volatile bool queuedDelete; // Socket close requested, but there are operations in progress. volatile bool queuedClose; - // Most recent asynch read request - volatile AsynchReadResult* pendingRead; private: // Dispatch events that have completed. @@ -346,6 +346,11 @@ private: * Called when there's a completion to process. */ void completion(AsynchIoResult *result); + + /** + * Helper function to facilitate the close operation + */ + void cancelRead(); }; // This is used to encapsulate pure callbacks into a handle @@ -374,9 +379,9 @@ AsynchIO::AsynchIO(const Socket& s, socket(s), opsInProgress(0), writeInProgress(false), + readInProgress(false), queuedDelete(false), queuedClose(false), - pendingRead(0), working(false) { } @@ -392,21 +397,24 @@ AsynchIO::~AsynchIO() { } void AsynchIO::queueForDeletion() { - queuedDelete = true; - if (opsInProgress > 0) { - QPID_LOG(info, "Delete AsynchIO queued; ops in progress"); - // AsynchIOHandler calls this then deletes itself; don't do any more - // callbacks. - readCallback = 0; - eofCallback = 0; - disCallback = 0; - closedCallback = 0; - emptyCallback = 0; - idleCallback = 0; - } - else { - delete this; + { + ScopedLock<Mutex> l(completionLock); + assert(!queuedDelete); + queuedDelete = true; + if (working || opsInProgress > 0) { + QPID_LOG(info, "Delete AsynchIO queued; ops in progress"); + // AsynchIOHandler calls this then deletes itself; don't do any more + // callbacks. + readCallback = 0; + eofCallback = 0; + disCallback = 0; + closedCallback = 0; + emptyCallback = 0; + idleCallback = 0; + return; + } } + delete this; } void AsynchIO::start(Poller::shared_ptr poller0) { @@ -454,9 +462,14 @@ void AsynchIO::notifyPendingWrite() { } void AsynchIO::queueWriteClose() { - queuedClose = true; - if (!writeInProgress) - notifyPendingWrite(); + { + ScopedLock<Mutex> l(completionLock); + queuedClose = true; + if (working || writeInProgress) + // no need to summon an IO thread + return; + } + notifyPendingWrite(); } bool AsynchIO::writeQueueEmpty() { @@ -469,7 +482,7 @@ bool AsynchIO::writeQueueEmpty() { * called when the read is complete and data is available. */ void AsynchIO::startReading() { - if (queuedDelete) + if (queuedDelete || queuedClose) return; // (Try to) get a buffer; look on the front since there may be an @@ -492,6 +505,7 @@ void AsynchIO::startReading() { readCount); DWORD bytesReceived = 0, flags = 0; InterlockedIncrement(&opsInProgress); + readInProgress = true; int status = WSARecv(toSocketHandle(socket), const_cast<LPWSABUF>(result->getWSABUF()), 1, &bytesReceived, @@ -507,7 +521,6 @@ void AsynchIO::startReading() { } } // On status 0 or WSA_IO_PENDING, completion will handle the rest. - pendingRead = result; } else { notifyBuffersEmpty(); @@ -620,6 +633,7 @@ void AsynchIO::close(void) { void AsynchIO::readComplete(AsynchReadResult *result) { int status = result->getStatus(); size_t bytes = result->getTransferred(); + readInProgress = false; if (status == 0 && bytes > 0) { if (readCallback) readCallback(*this, result->getBuff()); @@ -629,8 +643,8 @@ void AsynchIO::readComplete(AsynchReadResult *result) { // No data read, so put the buffer back. It may be partially filled, // so "unread" it back to the front of the queue. unread(result->getBuff()); - if (queuedClose && status == ERROR_OPERATION_ABORTED) { - return; // Expected reap from CancelIoEx + if (queuedClose) { + return; // Expected from cancelRead() } notifyEof(); if (status != 0) @@ -687,6 +701,8 @@ void AsynchIO::writeComplete(AsynchWriteResult *result) { } void AsynchIO::completion(AsynchIoResult *result) { + bool closing = false; + bool deleting = false; { ScopedLock<Mutex> l(completionLock); if (working) { @@ -702,11 +718,8 @@ void AsynchIO::completion(AsynchIoResult *result) { { ScopedUnlock<Mutex> ul(completionLock); AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result); - if (r != 0) { + if (r != 0) readComplete(r); - // Set pendingRead to 0 if it's still pointing to (newly completed) r - InterlockedCompareExchangePointer((void * volatile *)&pendingRead, 0, r); - } else { AsynchWriteResult *w = dynamic_cast<AsynchWriteResult*>(result); @@ -721,6 +734,8 @@ void AsynchIO::completion(AsynchIoResult *result) { delete result; result = 0; InterlockedDecrement(&opsInProgress); + if (queuedClose && opsInProgress == 1 && readInProgress) + cancelRead(); } // Lock is held again. if (completionQueue.empty()) @@ -729,26 +744,40 @@ void AsynchIO::completion(AsynchIoResult *result) { completionQueue.pop(); } working = false; + if (opsInProgress == 0) { + closing = queuedClose; + deleting = queuedDelete; + } } // Lock released; ok to close if ops are done and close requested. // Layer above will call back to queueForDeletion() if it hasn't // already been done. If it already has, go ahead and delete. - if (opsInProgress == 0) { - if (queuedClose) - // close() may cause a delete; don't trust 'this' on return - close(); - else if (queuedDelete) - delete this; - } + if (deleting) + delete this; + else if (closing) + // close() may cause a delete; don't trust 'this' on return + close(); +} + +/* + * NOTE - this method must be called in the same context as other completions, + * so that the resulting readComplete, and final AsynchIO::close() is serialized + * after this method returns. + */ +void AsynchIO::cancelRead() { + if (queuedDelete) + return; // socket already deleted else { - if (queuedClose && pendingRead) { - // Force outstanding read to completion. Layer above will - // call back. - CancelIoEx((HANDLE)toSocketHandle(socket), - ((AsynchReadResult *)pendingRead)->overlapped()); - pendingRead = 0; - } + ScopedLock<Mutex> l(completionLock);; + if (!completionQueue.empty()) + return; // process it; come back later if necessary } + // Cancel outstanding read and force to completion. Otherwise, on a faulty + // physical link, the pending read can remain uncompleted indefinitely. + // Draining the pending read will result in the official close (and + // notifyClosed). CancelIoEX() is the natural choice, but not available in + // XP, so we make do with closesocket(). + socket.close(); } } // namespace windows |