summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/windows/AsynchIO.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/windows/AsynchIO.cpp')
-rw-r--r--cpp/src/qpid/sys/windows/AsynchIO.cpp39
1 files changed, 28 insertions, 11 deletions
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp
index 9fdf89c83b..e7e966519d 100644
--- a/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -24,6 +24,7 @@
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Socket.h"
+#include "qpid/sys/windows/WinSocket.h"
#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/Poller.h"
#include "qpid/sys/Thread.h"
@@ -51,8 +52,8 @@ namespace {
* The function pointers for AcceptEx and ConnectEx need to be looked up
* at run time.
*/
-const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) {
- SOCKET h = toSocketHandle(s);
+const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::IOHandle& io) {
+ SOCKET h = io.fd;
GUID guidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes = 0;
LPFN_ACCEPTEX fnAcceptEx;
@@ -94,12 +95,14 @@ private:
AsynchAcceptor::Callback acceptedCallback;
const Socket& socket;
+ const SOCKET wSocket;
const LPFN_ACCEPTEX fnAcceptEx;
};
AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback)
: acceptedCallback(callback),
socket(s),
+ wSocket(IOHandle(s).fd),
fnAcceptEx(lookUpAcceptEx(s)) {
s.setNonblocking();
@@ -122,8 +125,8 @@ void AsynchAcceptor::restart(void) {
this,
socket);
BOOL status;
- status = fnAcceptEx(toSocketHandle(socket),
- toSocketHandle(*result->newSocket),
+ status = fnAcceptEx(wSocket,
+ IOHandle(*result->newSocket).fd,
result->addressBuffer,
0,
AsynchAcceptResult::SOCKADDRMAXLEN,
@@ -134,16 +137,30 @@ void AsynchAcceptor::restart(void) {
}
+Socket* createSameTypeSocket(const Socket& sock) {
+ SOCKET socket = IOHandle(sock).fd;
+ // Socket currently has no actual socket attached
+ if (socket == INVALID_SOCKET)
+ return new WinSocket;
+
+ ::sockaddr_storage sa;
+ ::socklen_t salen = sizeof(sa);
+ QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen));
+ SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
+ if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ return new WinSocket(s);
+}
+
AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb,
AsynchAcceptor *acceptor,
- const Socket& listener)
+ const Socket& lsocket)
: callback(cb), acceptor(acceptor),
- listener(toSocketHandle(listener)),
- newSocket(listener.createSameTypeSocket()) {
+ listener(IOHandle(lsocket).fd),
+ newSocket(createSameTypeSocket(lsocket)) {
}
void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {
- ::setsockopt (toSocketHandle(*newSocket),
+ ::setsockopt (IOHandle(*newSocket).fd,
SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
(char*)&listener,
@@ -363,7 +380,7 @@ class CallbackHandle : public IOHandle {
public:
CallbackHandle(AsynchIoResult::Completer completeCb,
AsynchIO::RequestCallback reqCb = 0) :
- IOHandle(new IOHandlePrivate (INVALID_SOCKET, completeCb, reqCb))
+ IOHandle(INVALID_SOCKET, completeCb, reqCb)
{}
};
@@ -516,7 +533,7 @@ void AsynchIO::startReading() {
DWORD bytesReceived = 0, flags = 0;
InterlockedIncrement(&opsInProgress);
readInProgress = true;
- int status = WSARecv(toSocketHandle(socket),
+ int status = WSARecv(IOHandle(socket).fd,
const_cast<LPWSABUF>(result->getWSABUF()), 1,
&bytesReceived,
&flags,
@@ -614,7 +631,7 @@ void AsynchIO::startWrite(AsynchIO::BufferBase* buff) {
buff,
buff->dataCount);
DWORD bytesSent = 0;
- int status = WSASend(toSocketHandle(socket),
+ int status = WSASend(IOHandle(socket).fd,
const_cast<LPWSABUF>(result->getWSABUF()), 1,
&bytesSent,
0,