diff options
Diffstat (limited to 'cpp/src/qpid/sys/windows/AsynchIO.cpp')
-rw-r--r-- | cpp/src/qpid/sys/windows/AsynchIO.cpp | 39 |
1 files changed, 28 insertions, 11 deletions
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp index 9fdf89c83b..e7e966519d 100644 --- a/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -24,6 +24,7 @@ #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Socket.h" +#include "qpid/sys/windows/WinSocket.h" #include "qpid/sys/SocketAddress.h" #include "qpid/sys/Poller.h" #include "qpid/sys/Thread.h" @@ -51,8 +52,8 @@ namespace { * The function pointers for AcceptEx and ConnectEx need to be looked up * at run time. */ -const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) { - SOCKET h = toSocketHandle(s); +const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::IOHandle& io) { + SOCKET h = io.fd; GUID guidAcceptEx = WSAID_ACCEPTEX; DWORD dwBytes = 0; LPFN_ACCEPTEX fnAcceptEx; @@ -94,12 +95,14 @@ private: AsynchAcceptor::Callback acceptedCallback; const Socket& socket; + const SOCKET wSocket; const LPFN_ACCEPTEX fnAcceptEx; }; AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : acceptedCallback(callback), socket(s), + wSocket(IOHandle(s).fd), fnAcceptEx(lookUpAcceptEx(s)) { s.setNonblocking(); @@ -122,8 +125,8 @@ void AsynchAcceptor::restart(void) { this, socket); BOOL status; - status = fnAcceptEx(toSocketHandle(socket), - toSocketHandle(*result->newSocket), + status = fnAcceptEx(wSocket, + IOHandle(*result->newSocket).fd, result->addressBuffer, 0, AsynchAcceptResult::SOCKADDRMAXLEN, @@ -134,16 +137,30 @@ void AsynchAcceptor::restart(void) { } +Socket* createSameTypeSocket(const Socket& sock) { + SOCKET socket = IOHandle(sock).fd; + // Socket currently has no actual socket attached + if (socket == INVALID_SOCKET) + return new WinSocket; + + ::sockaddr_storage sa; + ::socklen_t salen = sizeof(sa); + QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen)); + SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM + if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); + return new WinSocket(s); +} + AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb, AsynchAcceptor *acceptor, - const Socket& listener) + const Socket& lsocket) : callback(cb), acceptor(acceptor), - listener(toSocketHandle(listener)), - newSocket(listener.createSameTypeSocket()) { + listener(IOHandle(lsocket).fd), + newSocket(createSameTypeSocket(lsocket)) { } void AsynchAcceptResult::success(size_t /*bytesTransferred*/) { - ::setsockopt (toSocketHandle(*newSocket), + ::setsockopt (IOHandle(*newSocket).fd, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&listener, @@ -363,7 +380,7 @@ class CallbackHandle : public IOHandle { public: CallbackHandle(AsynchIoResult::Completer completeCb, AsynchIO::RequestCallback reqCb = 0) : - IOHandle(new IOHandlePrivate (INVALID_SOCKET, completeCb, reqCb)) + IOHandle(INVALID_SOCKET, completeCb, reqCb) {} }; @@ -516,7 +533,7 @@ void AsynchIO::startReading() { DWORD bytesReceived = 0, flags = 0; InterlockedIncrement(&opsInProgress); readInProgress = true; - int status = WSARecv(toSocketHandle(socket), + int status = WSARecv(IOHandle(socket).fd, const_cast<LPWSABUF>(result->getWSABUF()), 1, &bytesReceived, &flags, @@ -614,7 +631,7 @@ void AsynchIO::startWrite(AsynchIO::BufferBase* buff) { buff, buff->dataCount); DWORD bytesSent = 0; - int status = WSASend(toSocketHandle(socket), + int status = WSASend(IOHandle(socket).fd, const_cast<LPWSABUF>(result->getWSABUF()), 1, &bytesSent, 0, |