diff options
Diffstat (limited to 'cpp/src/qpid/sys/windows/AsynchIO.cpp')
-rw-r--r-- | cpp/src/qpid/sys/windows/AsynchIO.cpp | 755 |
1 files changed, 0 insertions, 755 deletions
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp deleted file mode 100644 index 71138757a5..0000000000 --- a/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ /dev/null @@ -1,755 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/windows/AsynchIoResult.h" -#include "qpid/sys/windows/IoHandlePrivate.h" -#include "qpid/sys/AsynchIO.h" -#include "qpid/sys/Mutex.h" -#include "qpid/sys/Socket.h" -#include "qpid/sys/Poller.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Time.h" -#include "qpid/log/Statement.h" - -#include "qpid/sys/windows/check.h" -#include "qpid/sys/windows/mingw32_compat.h" - -#include <boost/thread/once.hpp> - -#include <queue> -#include <winsock2.h> -#include <mswsock.h> -#include <windows.h> - -#include <boost/bind.hpp> - -namespace { - - typedef qpid::sys::ScopedLock<qpid::sys::Mutex> QLock; - -/* - * The function pointers for AcceptEx and ConnectEx need to be looked up - * at run time. Make sure this is done only once. - */ -boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT; -LPFN_ACCEPTEX fnAcceptEx = 0; -typedef void (*lookUpFunc)(const qpid::sys::Socket &); - -void lookUpAcceptEx() { - SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - GUID guidAcceptEx = WSAID_ACCEPTEX; - DWORD dwBytes = 0; - WSAIoctl(h, - SIO_GET_EXTENSION_FUNCTION_POINTER, - &guidAcceptEx, - sizeof(guidAcceptEx), - &fnAcceptEx, - sizeof(fnAcceptEx), - &dwBytes, - NULL, - NULL); - closesocket(h); - if (fnAcceptEx == 0) - throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx")); -} - -} - -namespace qpid { -namespace sys { -namespace windows { - -/* - * Asynch Acceptor - * - */ -class AsynchAcceptor : public qpid::sys::AsynchAcceptor { - - friend class AsynchAcceptResult; - -public: - AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback); - ~AsynchAcceptor(); - void start(Poller::shared_ptr poller); - -private: - void restart(void); - - AsynchAcceptor::Callback acceptedCallback; - const Socket& socket; -}; - -AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) - : acceptedCallback(callback), - socket(s) { - - s.setNonblocking(); -#if (BOOST_VERSION >= 103500) /* boost 1.35 or later reversed the args */ - boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx); -#else - boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce); -#endif -} - -AsynchAcceptor::~AsynchAcceptor() -{ - socket.close(); -} - -void AsynchAcceptor::start(Poller::shared_ptr poller) { - PollerHandle ph = PollerHandle(socket); - poller->monitorHandle(ph, Poller::INPUT); - restart (); -} - -void AsynchAcceptor::restart(void) { - DWORD bytesReceived = 0; // Not used, needed for AcceptEx API - AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback, - this, - toSocketHandle(socket)); - BOOL status; - status = ::fnAcceptEx(toSocketHandle(socket), - toSocketHandle(*result->newSocket), - result->addressBuffer, - 0, - AsynchAcceptResult::SOCKADDRMAXLEN, - AsynchAcceptResult::SOCKADDRMAXLEN, - &bytesReceived, - result->overlapped()); - QPID_WINDOWS_CHECK_ASYNC_START(status); -} - - -AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb, - AsynchAcceptor *acceptor, - SOCKET listener) - : callback(cb), acceptor(acceptor), listener(listener) { - newSocket.reset (new Socket()); -} - -void AsynchAcceptResult::success(size_t /*bytesTransferred*/) { - ::setsockopt (toSocketHandle(*newSocket), - SOL_SOCKET, - SO_UPDATE_ACCEPT_CONTEXT, - (char*)&listener, - sizeof (listener)); - callback(*(newSocket.release())); - acceptor->restart (); - delete this; -} - -void AsynchAcceptResult::failure(int /*status*/) { - //if (status != WSA_OPERATION_ABORTED) - // Can there be anything else? ; - delete this; -} - -/* - * AsynchConnector does synchronous connects for now... to do asynch the - * IocpPoller will need some extension to register an event handle as a - * CONNECT-type "direction", the connect completion/result will need an - * event handle to associate with the connecting handle. But there's no - * time for that right now... - */ -class AsynchConnector : public qpid::sys::AsynchConnector { -private: - ConnectedCallback connCallback; - FailedCallback failCallback; - const Socket& socket; - const std::string hostname; - const uint16_t port; - -public: - AsynchConnector(const Socket& socket, - std::string hostname, - uint16_t port, - ConnectedCallback connCb, - FailedCallback failCb = 0); - void start(Poller::shared_ptr poller); -}; - -AsynchConnector::AsynchConnector(const Socket& sock, - std::string hname, - uint16_t p, - ConnectedCallback connCb, - FailedCallback failCb) : - connCallback(connCb), failCallback(failCb), socket(sock), - hostname(hname), port(p) -{ -} - -void AsynchConnector::start(Poller::shared_ptr) -{ - try { - socket.connect(hostname, port); - socket.setNonblocking(); - connCallback(socket); - } catch(std::exception& e) { - if (failCallback) - failCallback(socket, -1, std::string(e.what())); - socket.close(); - } -} - -} // namespace windows - -AsynchAcceptor* AsynchAcceptor::create(const Socket& s, - Callback callback) -{ - return new windows::AsynchAcceptor(s, callback); -} - -AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s, - std::string hostname, - uint16_t port, - ConnectedCallback connCb, - FailedCallback failCb) -{ - return new windows::AsynchConnector(s, - hostname, - port, - connCb, - failCb); -} - - -/* - * Asynch reader/writer - */ - -namespace windows { - -class AsynchIO : public qpid::sys::AsynchIO { -public: - AsynchIO(const Socket& s, - ReadCallback rCb, - EofCallback eofCb, - DisconnectCallback disCb, - ClosedCallback cCb = 0, - BuffersEmptyCallback eCb = 0, - IdleCallback iCb = 0); - ~AsynchIO(); - - // Methods inherited from qpid::sys::AsynchIO - - /** - * Notify the object is should delete itself as soon as possible. - */ - virtual void queueForDeletion(); - - /// Take any actions needed to prepare for working with the poller. - virtual void start(Poller::shared_ptr poller); - virtual void queueReadBuffer(BufferBase* buff); - virtual void unread(BufferBase* buff); - virtual void queueWrite(BufferBase* buff); - virtual void notifyPendingWrite(); - virtual void queueWriteClose(); - virtual bool writeQueueEmpty(); - virtual void startReading(); - virtual void stopReading(); - virtual void requestCallback(RequestCallback); - - /** - * getQueuedBuffer returns a buffer from the buffer queue, if one is - * available. - * - * @retval Pointer to BufferBase buffer; 0 if none is available. - */ - virtual BufferBase* getQueuedBuffer(); - -private: - ReadCallback readCallback; - EofCallback eofCallback; - DisconnectCallback disCallback; - ClosedCallback closedCallback; - BuffersEmptyCallback emptyCallback; - IdleCallback idleCallback; - const Socket& socket; - Poller::shared_ptr poller; - - std::deque<BufferBase*> bufferQueue; - std::deque<BufferBase*> writeQueue; - /* The MSVC-supplied deque is not thread-safe; keep locks to serialize - * access to the buffer queue and write queue. - */ - Mutex bufferQueueLock; - - // Number of outstanding I/O operations. - volatile LONG opsInProgress; - // Is there a write in progress? - volatile bool writeInProgress; - // Deletion requested, but there are callbacks in progress. - volatile bool queuedDelete; - // Socket close requested, but there are operations in progress. - volatile bool queuedClose; - -private: - // Dispatch events that have completed. - void notifyEof(void); - void notifyDisconnect(void); - void notifyClosed(void); - void notifyBuffersEmpty(void); - void notifyIdle(void); - - /** - * Initiate a write of the specified buffer. There's no callback for - * write completion to the AsynchIO object. - */ - void startWrite(AsynchIO::BufferBase* buff); - - void close(void); - - /** - * readComplete is called when a read request is complete. - * - * @param result Results of the operation. - */ - void readComplete(AsynchReadResult *result); - - /** - * writeComplete is called when a write request is complete. - * - * @param result Results of the operation. - */ - void writeComplete(AsynchWriteResult *result); - - /** - * Queue of completions to run. This queue enforces the requirement - * from upper layers that only one thread at a time is allowed to act - * on any given connection. Once a thread is busy processing a completion - * on this object, other threads that dispatch completions queue the - * completions here for the in-progress thread to handle when done. - * Thus, any threads can dispatch a completion from the IocpPoller, but - * this class ensures that actual processing at the connection level is - * only on one thread at a time. - */ - std::queue<AsynchIoResult *> completionQueue; - volatile bool working; - Mutex completionLock; - - /** - * Called when there's a completion to process. - */ - void completion(AsynchIoResult *result); -}; - -// This is used to encapsulate pure callbacks into a handle -class CallbackHandle : public IOHandle { -public: - CallbackHandle(AsynchIoResult::Completer completeCb, - AsynchIO::RequestCallback reqCb = 0) : - IOHandle(new IOHandlePrivate (INVALID_SOCKET, completeCb, reqCb)) - {} -}; - -AsynchIO::AsynchIO(const Socket& s, - ReadCallback rCb, - EofCallback eofCb, - DisconnectCallback disCb, - ClosedCallback cCb, - BuffersEmptyCallback eCb, - IdleCallback iCb) : - - readCallback(rCb), - eofCallback(eofCb), - disCallback(disCb), - closedCallback(cCb), - emptyCallback(eCb), - idleCallback(iCb), - socket(s), - opsInProgress(0), - writeInProgress(false), - queuedDelete(false), - queuedClose(false), - working(false) { -} - -struct deleter -{ - template <typename T> - void operator()(T *ptr){ delete ptr;} -}; - -AsynchIO::~AsynchIO() { - std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter()); - std::for_each( writeQueue.begin(), writeQueue.end(), deleter()); -} - -void AsynchIO::queueForDeletion() { - queuedDelete = true; - if (opsInProgress > 0) { - QPID_LOG(info, "Delete AsynchIO queued; ops in progress"); - // AsynchIOHandler calls this then deletes itself; don't do any more - // callbacks. - readCallback = 0; - eofCallback = 0; - disCallback = 0; - closedCallback = 0; - emptyCallback = 0; - idleCallback = 0; - } - else { - delete this; - } -} - -void AsynchIO::start(Poller::shared_ptr poller0) { - PollerHandle ph = PollerHandle(socket); - poller = poller0; - poller->monitorHandle(ph, Poller::INPUT); - if (writeQueue.size() > 0) // Already have data queued for write - notifyPendingWrite(); - startReading(); -} - -void AsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) { - assert(buff); - buff->dataStart = 0; - buff->dataCount = 0; - QLock l(bufferQueueLock); - bufferQueue.push_back(buff); -} - -void AsynchIO::unread(AsynchIO::BufferBase* buff) { - assert(buff); - buff->squish(); - QLock l(bufferQueueLock); - bufferQueue.push_front(buff); -} - -void AsynchIO::queueWrite(AsynchIO::BufferBase* buff) { - assert(buff); - QLock l(bufferQueueLock); - writeQueue.push_back(buff); - if (!writeInProgress) - notifyPendingWrite(); -} - -void AsynchIO::notifyPendingWrite() { - // This method is generally called from a processing thread; transfer - // work on this to an I/O thread. Much of the upper layer code assumes - // that all I/O-related things happen in an I/O thread. - if (poller == 0) // Not really going yet... - return; - - InterlockedIncrement(&opsInProgress); - PollerHandle ph(CallbackHandle(boost::bind(&AsynchIO::completion, this, _1))); - poller->monitorHandle(ph, Poller::OUTPUT); -} - -void AsynchIO::queueWriteClose() { - queuedClose = true; - if (!writeInProgress) - notifyPendingWrite(); -} - -bool AsynchIO::writeQueueEmpty() { - QLock l(bufferQueueLock); - return writeQueue.size() == 0; -} - -/* - * Initiate a read operation. AsynchIO::readComplete() will be - * called when the read is complete and data is available. - */ -void AsynchIO::startReading() { - if (queuedDelete) - return; - - // (Try to) get a buffer; look on the front since there may be an - // "unread" one there with data remaining from last time. - AsynchIO::BufferBase *buff = 0; - { - QLock l(bufferQueueLock); - - if (!bufferQueue.empty()) { - buff = bufferQueue.front(); - assert(buff); - bufferQueue.pop_front(); - } - } - if (buff != 0) { - int readCount = buff->byteCount - buff->dataCount; - AsynchReadResult *result = - new AsynchReadResult(boost::bind(&AsynchIO::completion, this, _1), - buff, - readCount); - DWORD bytesReceived = 0, flags = 0; - InterlockedIncrement(&opsInProgress); - int status = WSARecv(toSocketHandle(socket), - const_cast<LPWSABUF>(result->getWSABUF()), 1, - &bytesReceived, - &flags, - result->overlapped(), - 0); - if (status != 0) { - int error = WSAGetLastError(); - if (error != WSA_IO_PENDING) { - result->failure(error); - result = 0; // result is invalid here - return; - } - } - // On status 0 or WSA_IO_PENDING, completion will handle the rest. - } - else { - notifyBuffersEmpty(); - } - return; -} - -// stopReading was added to prevent a race condition with read-credit on Linux. -// It may or may not be required on windows. -// -// AsynchIOHandler::readbuff() calls stopReading() inside the same -// critical section that protects startReading() in -// AsynchIOHandler::giveReadCredit(). -// -void AsynchIO::stopReading() {} - -// Queue the specified callback for invocation from an I/O thread. -void AsynchIO::requestCallback(RequestCallback callback) { - // This method is generally called from a processing thread; transfer - // work on this to an I/O thread. Much of the upper layer code assumes - // that all I/O-related things happen in an I/O thread. - if (poller == 0) // Not really going yet... - return; - - InterlockedIncrement(&opsInProgress); - PollerHandle ph(CallbackHandle( - boost::bind(&AsynchIO::completion, this, _1), - callback)); - poller->monitorHandle(ph, Poller::INPUT); -} - -/** - * Return a queued buffer if there are enough to spare. - */ -AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() { - QLock l(bufferQueueLock); - // Always keep at least one buffer (it might have data that was - // "unread" in it). - if (bufferQueue.size() <= 1) - return 0; - BufferBase* buff = bufferQueue.back(); - assert(buff); - bufferQueue.pop_back(); - return buff; -} - -void AsynchIO::notifyEof(void) { - if (eofCallback) - eofCallback(*this); -} - -void AsynchIO::notifyDisconnect(void) { - if (disCallback) - disCallback(*this); -} - -void AsynchIO::notifyClosed(void) { - if (closedCallback) - closedCallback(*this, socket); -} - -void AsynchIO::notifyBuffersEmpty(void) { - if (emptyCallback) - emptyCallback(*this); -} - -void AsynchIO::notifyIdle(void) { - if (idleCallback) - idleCallback(*this); -} - -/* - * Asynch reader/writer using overlapped I/O - */ - -void AsynchIO::startWrite(AsynchIO::BufferBase* buff) { - writeInProgress = true; - InterlockedIncrement(&opsInProgress); - AsynchWriteResult *result = - new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1), - buff, - buff->dataCount); - DWORD bytesSent = 0; - int status = WSASend(toSocketHandle(socket), - const_cast<LPWSABUF>(result->getWSABUF()), 1, - &bytesSent, - 0, - result->overlapped(), - 0); - if (status != 0) { - int error = WSAGetLastError(); - if (error != WSA_IO_PENDING) { - result->failure(error); // Also decrements in-progress count - result = 0; // result is invalid here - return; - } - } - // On status 0 or WSA_IO_PENDING, completion will handle the rest. - return; -} - -/* - * Close the socket and callback to say we've done it - */ -void AsynchIO::close(void) { - socket.close(); - notifyClosed(); -} - -void AsynchIO::readComplete(AsynchReadResult *result) { - int status = result->getStatus(); - size_t bytes = result->getTransferred(); - if (status == 0 && bytes > 0) { - bool restartRead = true; // May not if receiver doesn't want more - if (readCallback) - readCallback(*this, result->getBuff()); - if (restartRead) - startReading(); - } - else { - // No data read, so put the buffer back. It may be partially filled, - // so "unread" it back to the front of the queue. - unread(result->getBuff()); - notifyEof(); - if (status != 0) - { - notifyDisconnect(); - } - } -} - -/* - * NOTE - this completion is called for completed writes and also when - * a write is desired. The difference is in the buff - if a write is desired - * the buff is 0. - */ -void AsynchIO::writeComplete(AsynchWriteResult *result) { - int status = result->getStatus(); - size_t bytes = result->getTransferred(); - AsynchIO::BufferBase *buff = result->getBuff(); - if (buff != 0) { - writeInProgress = false; - if (status == 0 && bytes > 0) { - if (bytes < result->getRequested()) // Still more to go; resubmit - startWrite(buff); - else - queueReadBuffer(buff); // All done; back to the pool - } - else { - // An error... if it's a connection close, ignore it - it will be - // noticed and handled on a read completion any moment now. - // What to do with real error??? Save the Buffer? - } - } - - // If there are no writes outstanding, check for more writes to initiate - // (either queued or via idle). The opsInProgress count is handled in - // completion() - if (!writeInProgress) { - bool writing = false; - { - QLock l(bufferQueueLock); - if (writeQueue.size() > 0) { - buff = writeQueue.front(); - assert(buff); - writeQueue.pop_front(); - startWrite(buff); - writing = true; - } - } - if (!writing && !queuedClose) { - notifyIdle(); - } - } - return; -} - -void AsynchIO::completion(AsynchIoResult *result) { - { - ScopedLock<Mutex> l(completionLock); - if (working) { - completionQueue.push(result); - return; - } - - // First thread in with something to do; note we're working then keep - // handling completions. - working = true; - while (result != 0) { - // New scope to unlock temporarily. - { - ScopedUnlock<Mutex> ul(completionLock); - AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result); - if (r != 0) - readComplete(r); - else { - AsynchWriteResult *w = - dynamic_cast<AsynchWriteResult*>(result); - if (w != 0) - writeComplete(w); - else { - AsynchCallbackRequest *req = - dynamic_cast<AsynchCallbackRequest*>(result); - req->reqCallback(*this); - } - } - delete result; - result = 0; - InterlockedDecrement(&opsInProgress); - } - // Lock is held again. - if (completionQueue.empty()) - continue; - result = completionQueue.front(); - completionQueue.pop(); - } - working = false; - } - // Lock released; ok to close if ops are done and close requested. - // Layer above will call back to queueForDeletion() if it hasn't - // already been done. If it already has, go ahead and delete. - if (opsInProgress == 0) { - if (queuedClose) - // close() may cause a delete; don't trust 'this' on return - close(); - else if (queuedDelete) - delete this; - } -} - -} // namespace windows - -AsynchIO* qpid::sys::AsynchIO::create(const Socket& s, - AsynchIO::ReadCallback rCb, - AsynchIO::EofCallback eofCb, - AsynchIO::DisconnectCallback disCb, - AsynchIO::ClosedCallback cCb, - AsynchIO::BuffersEmptyCallback eCb, - AsynchIO::IdleCallback iCb) -{ - return new qpid::sys::windows::AsynchIO(s, rCb, eofCb, disCb, cCb, eCb, iCb); -} - -}} // namespace qpid::sys |