diff options
author | Eric Milkie <milkie@10gen.com> | 2013-02-15 09:46:25 -0500 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2013-02-15 09:48:30 -0500 |
commit | bf97961c885fc02feecb3bc45bd1f52e23b3d1b8 (patch) | |
tree | dcf4c78ba70bdd058c7b5f3a023c435892691f09 | |
parent | a54647db96c32c1c8c92e44f9c35b1eb2ba6ce89 (diff) | |
download | mongo-bf97961c885fc02feecb3bc45bd1f52e23b3d1b8.tar.gz |
SERVER-8612 alternative select() implementation for Windows
On Windows, socket() can return any SOCKET value, not just a small, incrementing
number. This makes it very hard to use select() since you need to allocate an
enormous fd_set to cover all the possible SOCKET values.
Instead, I've rewritten the listen/accept logic on Windows to avoid calling
select(); instead, we make use of Winsock2 native API calls.
-rw-r--r-- | src/mongo/platform/windows_basic.h | 4 | ||||
-rw-r--r-- | src/mongo/util/net/listen.cpp | 203 |
2 files changed, 200 insertions, 7 deletions
diff --git a/src/mongo/platform/windows_basic.h b/src/mongo/platform/windows_basic.h index 1ca0810b5be..96789116c18 100644 --- a/src/mongo/platform/windows_basic.h +++ b/src/mongo/platform/windows_basic.h @@ -28,10 +28,6 @@ # define WIN32_LEAN_AND_MEAN # include "mongo/targetver.h" -// Override the default of 64, to match Linux -// fd_set is defined in winsock2.h -# define FD_SETSIZE 1024 - # include <winsock2.h> //this must be included before the first windows.h include # include <ws2tcpip.h> # include <wspiapi.h> diff --git a/src/mongo/util/net/listen.cpp b/src/mongo/util/net/listen.cpp index 8d0a140dd60..e8ab3b1adc4 100644 --- a/src/mongo/util/net/listen.cpp +++ b/src/mongo/util/net/listen.cpp @@ -19,6 +19,7 @@ #include "pch.h" #include "listen.h" #include "message_port.h" +#include "mongo/base/owned_pointer_vector.h" #ifndef _WIN32 @@ -183,11 +184,13 @@ namespace mongo { return true; } + +#if !defined(_WIN32) void Listener::initAndListen() { checkTicketNumbers(); vector<SOCKET> socks; - { // normal sockets + { vector<SockAddr> mine = ipToAddrs(_ip.c_str(), _port, (!cmdLine.noUnixSocket && useUnixSockets())); if ( ! _setupSockets( mine , socks ) ) return; @@ -204,7 +207,7 @@ namespace mongo { "; not supported" << warnings; return; } - + #ifdef MONGO_SSL _logListen(_port, _ssl); #else @@ -255,7 +258,6 @@ namespace mongo { for (vector<SOCKET>::iterator it=socks.begin(), end=socks.end(); it != end; ++it) { if (! (FD_ISSET(*it, fds))) continue; - SockAddr from; int s = accept(*it, from.raw(), &from.addressSize); if ( s < 0 ) { @@ -305,6 +307,201 @@ namespace mongo { } } +#else + // Windows + + // Given a SOCKET, turns off nonblocking mode + static void disableNonblockingMode(SOCKET socket) { + unsigned long resultBuffer = 0; + unsigned long resultBufferBytesWritten = 0; + unsigned long newNonblockingEnabled = 0; + const int status = WSAIoctl(socket, + FIONBIO, + &newNonblockingEnabled, + sizeof(unsigned long), + &resultBuffer, + sizeof(resultBuffer), + &resultBufferBytesWritten, + NULL, + NULL); + if (status == SOCKET_ERROR) { + const int mongo_errno = WSAGetLastError(); + error() << "Windows WSAIoctl returned " << errnoWithDescription(mongo_errno) << endl; + fassertFailed(16726); + } + } + + // RAII wrapper class to ensure we do not leak WSAEVENTs. + class EventHolder { + WSAEVENT _socketEventHandle; + public: + EventHolder() { + _socketEventHandle = WSACreateEvent(); + if (_socketEventHandle == WSA_INVALID_EVENT) { + const int mongo_errno = WSAGetLastError(); + error() << "Windows WSACreateEvent returned " << errnoWithDescription(mongo_errno) + << endl; + fassertFailed(16728); + } + } + ~EventHolder() { + BOOL bstatus = WSACloseEvent(_socketEventHandle); + if (bstatus == FALSE) { + const int mongo_errno = WSAGetLastError(); + error() << "Windows WSACloseEvent returned " << errnoWithDescription(mongo_errno) + << endl; + fassertFailed(16725); + } + } + WSAEVENT get() { + return _socketEventHandle; + } + }; + + void Listener::initAndListen() { + checkTicketNumbers(); + vector<SOCKET> socks; + + { + vector<SockAddr> mine = ipToAddrs(_ip.c_str(), _port, false); + if ( ! _setupSockets( mine , socks ) ) + return; + } + +#ifdef MONGO_SSL + _logListen(_port, _ssl); +#else + _logListen(_port, false); +#endif + + OwnedPointerVector<EventHolder> eventHolders; + boost::scoped_array<WSAEVENT> events(new WSAEVENT[socks.size()]); + + + // Populate events array with an event for each socket we are watching + for (size_t count = 0; count < socks.size(); ++count) { + EventHolder* ev(new EventHolder); + eventHolders.mutableVector().push_back(ev); + events[count] = ev->get(); + } + + while ( ! inShutdown() ) { + // Turn on listening for accept-ready sockets + for (size_t count = 0; count < socks.size(); ++count) { + int status = WSAEventSelect(socks[count], events[count], FD_ACCEPT | FD_CLOSE); + if (status == SOCKET_ERROR) { + const int mongo_errno = WSAGetLastError(); + error() << "Windows WSAEventSelect returned " + << errnoWithDescription(mongo_errno) << endl; + fassertFailed(16727); + } + } + + // Wait till one of them goes active, or we time out + DWORD result = WSAWaitForMultipleEvents(socks.size(), + events.get(), + FALSE, // don't wait for all the events + 10, // timeout, in ms + FALSE); // do not allow I/O interruptions + if (result == WSA_WAIT_FAILED) { + const int mongo_errno = WSAGetLastError(); + error() << "Windows WSAWaitForMultipleEvents returned " + << errnoWithDescription(mongo_errno) << endl; + fassertFailed(16723); + } + + if (result == WSA_WAIT_TIMEOUT) { + _elapsedTime += 10; + continue; + } + _elapsedTime += 1; // assume 1ms to grab connection. very rough + + // Determine which socket is ready + DWORD eventIndex = result - WSA_WAIT_EVENT_0; + WSANETWORKEVENTS networkEvents; + // Extract event details, and clear event for next pass + int status = WSAEnumNetworkEvents(socks[eventIndex], + events[eventIndex], + &networkEvents); + if (status == SOCKET_ERROR) { + const int mongo_errno = WSAGetLastError(); + error() << "Windows WSAEnumNetworkEvents returned " + << errnoWithDescription(mongo_errno) << endl; + continue; + } + + if (networkEvents.lNetworkEvents & FD_CLOSE) { + log() << "listen socket closed" << endl; + break; + } + + if (!networkEvents.lNetworkEvents & FD_ACCEPT) { + error() << "Unexpected network event: " << networkEvents.lNetworkEvents << endl; + continue; + } + + int iec = networkEvents.iErrorCode[FD_ACCEPT_BIT]; + if (iec != 0) { + error() << "Windows socket accept did not work:" << errnoWithDescription(iec) + << endl; + continue; + } + + status = WSAEventSelect(socks[eventIndex], NULL, 0); + if (status == SOCKET_ERROR) { + const int mongo_errno = WSAGetLastError(); + error() << "Windows WSAEventSelect returned " + << errnoWithDescription(mongo_errno) << endl; + continue; + } + + disableNonblockingMode(socks[eventIndex]); + + SockAddr from; + int s = accept(socks[eventIndex], from.raw(), &from.addressSize); + if ( s < 0 ) { + int x = errno; // so no global issues + if ( x == ECONNABORTED || x == EBADF ) { + log() << "Listener on port " << _port << " aborted" << endl; + return; + } + if ( x == 0 && inShutdown() ) { + return; // socket closed + } + if( !inShutdown() ) { + log() << "Listener: accept() returns " << s << " " + << errnoWithDescription(x) << endl; + if (x == EMFILE || x == ENFILE) { + // Connection still in listen queue but we can't accept it yet + error() << "Out of file descriptors. Waiting one second before" + " trying to accept more connections." << warnings; + sleepsecs(1); + } + } + continue; + } + if (from.getType() != AF_UNIX) + disableNagle(s); + + long long myConnectionNumber = globalConnectionNumber.addAndFetch(1); + + if ( _logConnect && ! cmdLine.quiet ){ + int conns = globalTicketHolder.used()+1; + const char* word = (conns == 1 ? " connection" : " connections"); + log() << "connection accepted from " << from.toString() << " #" << myConnectionNumber << " (" << conns << word << " now open)" << endl; + } + + boost::shared_ptr<Socket> pnewSock( new Socket(s, from) ); +#ifdef MONGO_SSL + if (_ssl) { + pnewSock->secureAccepted(_ssl); + } +#endif + accepted( pnewSock , myConnectionNumber ); + } + } +#endif + void Listener::_logListen( int port , bool ssl ) { log() << _name << ( _name.size() ? " " : "" ) << "waiting for connections on port " << port << ( ssl ? " ssl" : "" ) << endl; } |