From 691b0b1849413e576d105803fd93adb2457796ac Mon Sep 17 00:00:00 2001 From: "Stephen D. Huston" Date: Sat, 5 May 2012 20:23:48 +0000 Subject: Merge in QPID-3759 from trunk and fixes QPID-3982. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.8-release-candidates@1334484 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp | 158 +++++++++++++++++---------- qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp | 5 +- 2 files changed, 103 insertions(+), 60 deletions(-) diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp index c55d34e875..8f835e3f8c 100644 --- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -46,16 +46,13 @@ namespace { /* * The function pointers for AcceptEx and ConnectEx need to be looked up - * at run time. Make sure this is done only once. + * at run time. */ -boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT; -LPFN_ACCEPTEX fnAcceptEx = 0; -typedef void (*lookUpFunc)(const qpid::sys::Socket &); - -void lookUpAcceptEx() { - SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); +const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) { + SOCKET h = toSocketHandle(s); GUID guidAcceptEx = WSAID_ACCEPTEX; DWORD dwBytes = 0; + LPFN_ACCEPTEX fnAcceptEx; WSAIoctl(h, SIO_GET_EXTENSION_FUNCTION_POINTER, &guidAcceptEx, @@ -65,9 +62,9 @@ void lookUpAcceptEx() { &dwBytes, NULL, NULL); - closesocket(h); if (fnAcceptEx == 0) throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx")); + return fnAcceptEx; } } @@ -94,18 +91,15 @@ private: AsynchAcceptor::Callback acceptedCallback; const Socket& socket; + const LPFN_ACCEPTEX fnAcceptEx; }; AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : acceptedCallback(callback), - socket(s) { + socket(s), + fnAcceptEx(lookUpAcceptEx(s)) { s.setNonblocking(); -#if (BOOST_VERSION >= 103500) /* boost 1.35 or later reversed the args */ - boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx); -#else - boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce); -#endif } AsynchAcceptor::~AsynchAcceptor() @@ -114,7 +108,8 @@ AsynchAcceptor::~AsynchAcceptor() } void AsynchAcceptor::start(Poller::shared_ptr poller) { - poller->monitorHandle(PollerHandle(socket), Poller::INPUT); + PollerHandle ph = PollerHandle(socket); + poller->monitorHandle(ph, Poller::INPUT); restart (); } @@ -124,14 +119,14 @@ void AsynchAcceptor::restart(void) { this, toSocketHandle(socket)); BOOL status; - status = ::fnAcceptEx(toSocketHandle(socket), - toSocketHandle(*result->newSocket), - result->addressBuffer, - 0, - AsynchAcceptResult::SOCKADDRMAXLEN, - AsynchAcceptResult::SOCKADDRMAXLEN, - &bytesReceived, - result->overlapped()); + status = fnAcceptEx(toSocketHandle(socket), + toSocketHandle(*result->newSocket), + result->addressBuffer, + 0, + AsynchAcceptResult::SOCKADDRMAXLEN, + AsynchAcceptResult::SOCKADDRMAXLEN, + &bytesReceived, + result->overlapped()); QPID_WINDOWS_CHECK_ASYNC_START(status); } @@ -154,7 +149,7 @@ void AsynchAcceptResult::success(size_t /*bytesTransferred*/) { delete this; } -void AsynchAcceptResult::failure(int status) { +void AsynchAcceptResult::failure(int /*status*/) { //if (status != WSA_OPERATION_ABORTED) // Can there be anything else? ; delete this; @@ -177,7 +172,7 @@ private: public: AsynchConnector(const Socket& socket, - std::string hostname, + std::string& hostname, uint16_t port, ConnectedCallback connCb, FailedCallback failCb = 0); @@ -185,7 +180,7 @@ public: }; AsynchConnector::AsynchConnector(const Socket& sock, - std::string hname, + std::string& hname, uint16_t p, ConnectedCallback connCb, FailedCallback failCb) : @@ -294,6 +289,8 @@ private: volatile LONG opsInProgress; // Is there a write in progress? volatile bool writeInProgress; + // Or a read? + volatile bool readInProgress; // Deletion requested, but there are callbacks in progress. volatile bool queuedDelete; // Socket close requested, but there are operations in progress. @@ -347,6 +344,11 @@ private: * Called when there's a completion to process. */ void completion(AsynchIoResult *result); + + /** + * Helper function to facilitate the close operation + */ + void cancelRead(); }; // This is used to encapsulate pure callbacks into a handle @@ -375,6 +377,7 @@ AsynchIO::AsynchIO(const Socket& s, socket(s), opsInProgress(0), writeInProgress(false), + readInProgress(false), queuedDelete(false), queuedClose(false), working(false) { @@ -392,26 +395,30 @@ AsynchIO::~AsynchIO() { } void AsynchIO::queueForDeletion() { - queuedDelete = true; - if (opsInProgress > 0) { - QPID_LOG(info, "Delete AsynchIO queued; ops in progress"); - // AsynchIOHandler calls this then deletes itself; don't do any more - // callbacks. - readCallback = 0; - eofCallback = 0; - disCallback = 0; - closedCallback = 0; - emptyCallback = 0; - idleCallback = 0; - } - else { - delete this; + { + ScopedLock l(completionLock); + assert(!queuedDelete); + queuedDelete = true; + if (working || opsInProgress > 0) { + QPID_LOG(info, "Delete AsynchIO queued; ops in progress"); + // AsynchIOHandler calls this then deletes itself; don't do any more + // callbacks. + readCallback = 0; + eofCallback = 0; + disCallback = 0; + closedCallback = 0; + emptyCallback = 0; + idleCallback = 0; + return; + } } + delete this; } void AsynchIO::start(Poller::shared_ptr poller0) { + PollerHandle ph = PollerHandle(socket); poller = poller0; - poller->monitorHandle(PollerHandle(socket), Poller::INPUT); + poller->monitorHandle(ph, Poller::INPUT); if (writeQueue.size() > 0) // Already have data queued for write notifyPendingWrite(); startReading(); @@ -453,9 +460,14 @@ void AsynchIO::notifyPendingWrite() { } void AsynchIO::queueWriteClose() { - queuedClose = true; - if (!writeInProgress) - notifyPendingWrite(); + { + ScopedLock l(completionLock); + queuedClose = true; + if (working || writeInProgress) + // no need to summon an IO thread + return; + } + notifyPendingWrite(); } bool AsynchIO::writeQueueEmpty() { @@ -468,7 +480,7 @@ bool AsynchIO::writeQueueEmpty() { * called when the read is complete and data is available. */ void AsynchIO::startReading() { - if (queuedDelete) + if (queuedDelete || queuedClose) return; // (Try to) get a buffer; look on the front since there may be an @@ -491,6 +503,7 @@ void AsynchIO::startReading() { readCount); DWORD bytesReceived = 0, flags = 0; InterlockedIncrement(&opsInProgress); + readInProgress = true; int status = WSARecv(toSocketHandle(socket), const_cast(result->getWSABUF()), 1, &bytesReceived, @@ -584,7 +597,6 @@ void AsynchIO::notifyIdle(void) { void AsynchIO::startWrite(AsynchIO::BufferBase* buff) { writeInProgress = true; InterlockedIncrement(&opsInProgress); - int writeCount = buff->byteCount-buff->dataCount; AsynchWriteResult *result = new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1), buff, @@ -619,21 +631,24 @@ void AsynchIO::close(void) { void AsynchIO::readComplete(AsynchReadResult *result) { int status = result->getStatus(); size_t bytes = result->getTransferred(); + readInProgress = false; if (status == 0 && bytes > 0) { - bool restartRead = true; // May not if receiver doesn't want more if (readCallback) readCallback(*this, result->getBuff()); - if (restartRead) - startReading(); + startReading(); } else { // No data read, so put the buffer back. It may be partially filled, // so "unread" it back to the front of the queue. unread(result->getBuff()); - if (status == 0) - notifyEof(); - else + if (queuedClose) { + return; // Expected from cancelRead() + } + notifyEof(); + if (status != 0) + { notifyDisconnect(); + } } } @@ -684,6 +699,8 @@ void AsynchIO::writeComplete(AsynchWriteResult *result) { } void AsynchIO::completion(AsynchIoResult *result) { + bool closing = false; + bool deleting = false; { ScopedLock l(completionLock); if (working) { @@ -715,6 +732,8 @@ void AsynchIO::completion(AsynchIoResult *result) { delete result; result = 0; InterlockedDecrement(&opsInProgress); + if (queuedClose && opsInProgress == 1 && readInProgress) + cancelRead(); } // Lock is held again. if (completionQueue.empty()) @@ -723,17 +742,40 @@ void AsynchIO::completion(AsynchIoResult *result) { completionQueue.pop(); } working = false; + if (opsInProgress == 0) { + closing = queuedClose; + deleting = queuedDelete; + } } // Lock released; ok to close if ops are done and close requested. // Layer above will call back to queueForDeletion() if it hasn't // already been done. If it already has, go ahead and delete. - if (opsInProgress == 0) { - if (queuedClose) - // close() may cause a delete; don't trust 'this' on return - close(); - else if (queuedDelete) - delete this; + if (deleting) + delete this; + else if (closing) + // close() may cause a delete; don't trust 'this' on return + close(); +} + +/* + * NOTE - this method must be called in the same context as other completions, + * so that the resulting readComplete, and final AsynchIO::close() is serialized + * after this method returns. + */ +void AsynchIO::cancelRead() { + if (queuedDelete) + return; // socket already deleted + else { + ScopedLock l(completionLock);; + if (!completionQueue.empty()) + return; // process it; come back later if necessary } + // Cancel outstanding read and force to completion. Otherwise, on a faulty + // physical link, the pending read can remain uncompleted indefinitely. + // Draining the pending read will result in the official close (and + // notifyClosed). CancelIoEX() is the natural choice, but not available in + // XP, so we make do with closesocket(). + socket.close(); } } // namespace windows diff --git a/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp b/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp index d326ab02ac..4c0b420eee 100755 --- a/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp +++ b/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp @@ -96,6 +96,7 @@ void Poller::shutdown() { // Allow sloppy code to shut us down more than once. if (impl->isShutdown) return; + impl->isShutdown = true; ULONG_PTR key = 1; // Tell wait() it's a shutdown, not I/O PostQueuedCompletionStatus(impl->iocp, 0, key, 0); } @@ -110,7 +111,7 @@ bool Poller::interrupt(PollerHandle&) { } void Poller::run() { - do { + while (!impl->isShutdown) { Poller::Event event = this->wait(); // Handle shutdown @@ -124,7 +125,7 @@ void Poller::run() { // This should be impossible assert(false); } - } while (true); + } } void Poller::monitorHandle(PollerHandle& handle, Direction dir) { -- cgit v1.2.1