diff options
Diffstat (limited to 'src/mongo/util/net/listen.cpp')
-rw-r--r-- | src/mongo/util/net/listen.cpp | 927 |
1 files changed, 463 insertions, 464 deletions
diff --git a/src/mongo/util/net/listen.cpp b/src/mongo/util/net/listen.cpp index 249fe6f878e..be77c6fc485 100644 --- a/src/mongo/util/net/listen.cpp +++ b/src/mongo/util/net/listen.cpp @@ -46,11 +46,11 @@ #ifndef _WIN32 -# ifndef __sun -# include <ifaddrs.h> -# endif -# include <sys/resource.h> -# include <sys/stat.h> +#ifndef __sun +#include <ifaddrs.h> +#endif +#include <sys/resource.h> +#include <sys/stat.h> #include <sys/types.h> #include <sys/socket.h> @@ -61,7 +61,7 @@ #include <errno.h> #include <netdb.h> #ifdef __OpenBSD__ -# include <sys/uio.h> +#include <sys/uio.h> #endif #else @@ -74,594 +74,593 @@ namespace mongo { - using std::shared_ptr; - using std::endl; - using std::string; - using std::vector; +using std::shared_ptr; +using std::endl; +using std::string; +using std::vector; - // ----- Listener ------- +// ----- Listener ------- - const Listener* Listener::_timeTracker; +const Listener* Listener::_timeTracker; - vector<SockAddr> ipToAddrs(const char* ips, int port, bool useUnixSockets) { - vector<SockAddr> out; - if (*ips == '\0') { - out.push_back(SockAddr("0.0.0.0", port)); // IPv4 all +vector<SockAddr> ipToAddrs(const char* ips, int port, bool useUnixSockets) { + vector<SockAddr> out; + if (*ips == '\0') { + out.push_back(SockAddr("0.0.0.0", port)); // IPv4 all - if (IPv6Enabled()) - out.push_back(SockAddr("::", port)); // IPv6 all + if (IPv6Enabled()) + out.push_back(SockAddr("::", port)); // IPv6 all #ifndef _WIN32 - if (useUnixSockets) - out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port)); // Unix socket + if (useUnixSockets) + out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port)); // Unix socket #endif - return out; - } + return out; + } - while(*ips) { - string ip; - const char * comma = strchr(ips, ','); - if (comma) { - ip = string(ips, comma - ips); - ips = comma + 1; - } - else { - ip = string(ips); - ips = ""; - } + while (*ips) { + string ip; + const char* comma = strchr(ips, ','); + if (comma) { + ip = string(ips, comma - ips); + ips = comma + 1; + } else { + ip = string(ips); + ips = ""; + } - SockAddr sa(ip.c_str(), port); - out.push_back(sa); + SockAddr sa(ip.c_str(), port); + out.push_back(sa); #ifndef _WIN32 - if (sa.isValid() && useUnixSockets && - (sa.getAddr() == "127.0.0.1" || sa.getAddr() == "0.0.0.0")) // only IPv4 - out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port)); + if (sa.isValid() && useUnixSockets && + (sa.getAddr() == "127.0.0.1" || sa.getAddr() == "0.0.0.0")) // only IPv4 + out.push_back(SockAddr(makeUnixSockPath(port).c_str(), port)); #endif - } - return out; - } - - Listener::Listener(const string& name, const string &ip, int port, bool logConnect ) - : _port(port), _name(name), _ip(ip), _setupSocketsSuccessful(false), - _logConnect(logConnect), _elapsedTime(0) { + return out; +} + +Listener::Listener(const string& name, const string& ip, int port, bool logConnect) + : _port(port), + _name(name), + _ip(ip), + _setupSocketsSuccessful(false), + _logConnect(logConnect), + _elapsedTime(0) { #ifdef MONGO_CONFIG_SSL - _ssl = getSSLManager(); + _ssl = getSSLManager(); #endif - } - - Listener::~Listener() { - if ( _timeTracker == this ) - _timeTracker = 0; - } +} + +Listener::~Listener() { + if (_timeTracker == this) + _timeTracker = 0; +} - void Listener::setupSockets() { - checkTicketNumbers(); +void Listener::setupSockets() { + checkTicketNumbers(); #if !defined(_WIN32) - _mine = ipToAddrs(_ip.c_str(), _port, (!serverGlobalParams.noUnixSocket && - useUnixSockets())); + _mine = ipToAddrs(_ip.c_str(), _port, (!serverGlobalParams.noUnixSocket && useUnixSockets())); #else - _mine = ipToAddrs(_ip.c_str(), _port, false); + _mine = ipToAddrs(_ip.c_str(), _port, false); #endif - for (std::vector<SockAddr>::const_iterator it=_mine.begin(), end=_mine.end(); - it != end; - ++it) { - - const SockAddr& me = *it; + for (std::vector<SockAddr>::const_iterator it = _mine.begin(), end = _mine.end(); it != end; + ++it) { + const SockAddr& me = *it; - if (!me.isValid()) { - error() << "listen(): socket is invalid." << endl; - return; - } + if (!me.isValid()) { + error() << "listen(): socket is invalid." << endl; + return; + } - SOCKET sock = ::socket(me.getType(), SOCK_STREAM, 0); - ScopeGuard socketGuard = MakeGuard(&closesocket, sock); - massert( 15863 , str::stream() << "listen(): invalid socket? " << errnoWithDescription() , sock >= 0 ); + SOCKET sock = ::socket(me.getType(), SOCK_STREAM, 0); + ScopeGuard socketGuard = MakeGuard(&closesocket, sock); + massert(15863, + str::stream() << "listen(): invalid socket? " << errnoWithDescription(), + sock >= 0); - if (me.getType() == AF_UNIX) { + if (me.getType() == AF_UNIX) { #if !defined(_WIN32) - if (unlink(me.getAddr().c_str()) == -1) { - if (errno != ENOENT) { - error() << "Failed to unlink socket file " << me << " " - << errnoWithDescription(errno); - fassertFailedNoTrace(28578); - } + if (unlink(me.getAddr().c_str()) == -1) { + if (errno != ENOENT) { + error() << "Failed to unlink socket file " << me << " " + << errnoWithDescription(errno); + fassertFailedNoTrace(28578); } -#endif - } - else if (me.getType() == AF_INET6) { - // IPv6 can also accept IPv4 connections as mapped addresses (::ffff:127.0.0.1) - // That causes a conflict if we don't do set it to IPV6_ONLY - const int one = 1; - setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*) &one, sizeof(one)); } +#endif + } else if (me.getType() == AF_INET6) { + // IPv6 can also accept IPv4 connections as mapped addresses (::ffff:127.0.0.1) + // That causes a conflict if we don't do set it to IPV6_ONLY + const int one = 1; + setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&one, sizeof(one)); + } #if !defined(_WIN32) - { - const int one = 1; - if ( setsockopt( sock , SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0 ) - log() << "Failed to set socket opt, SO_REUSEADDR" << endl; - } + { + const int one = 1; + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) + log() << "Failed to set socket opt, SO_REUSEADDR" << endl; + } #endif - if ( ::bind(sock, me.raw(), me.addressSize) != 0 ) { - int x = errno; - error() << "listen(): bind() failed " << errnoWithDescription(x) << " for socket: " << me.toString() << endl; - if ( x == EADDRINUSE ) - error() << " addr already in use" << endl; - return; - } + if (::bind(sock, me.raw(), me.addressSize) != 0) { + int x = errno; + error() << "listen(): bind() failed " << errnoWithDescription(x) + << " for socket: " << me.toString() << endl; + if (x == EADDRINUSE) + error() << " addr already in use" << endl; + return; + } #if !defined(_WIN32) - if (me.getType() == AF_UNIX) { - if (chmod(me.getAddr().c_str(), serverGlobalParams.unixSocketPermissions) == -1) { - error() << "Failed to chmod socket file " << me << " " - << errnoWithDescription(errno); - fassertFailedNoTrace(28582); - } - ListeningSockets::get()->addPath( me.getAddr() ); + if (me.getType() == AF_UNIX) { + if (chmod(me.getAddr().c_str(), serverGlobalParams.unixSocketPermissions) == -1) { + error() << "Failed to chmod socket file " << me << " " + << errnoWithDescription(errno); + fassertFailedNoTrace(28582); } + ListeningSockets::get()->addPath(me.getAddr()); + } #endif - _socks.push_back(sock); - socketGuard.Dismiss(); - } - - _setupSocketsSuccessful = true; + _socks.push_back(sock); + socketGuard.Dismiss(); } - - + + _setupSocketsSuccessful = true; +} + + #if !defined(_WIN32) - void Listener::initAndListen() { - if (!_setupSocketsSuccessful) { +void Listener::initAndListen() { + if (!_setupSocketsSuccessful) { + return; + } + + SOCKET maxfd = 0; // needed for select() + for (unsigned i = 0; i < _socks.size(); i++) { + if (::listen(_socks[i], 128) != 0) { + error() << "listen(): listen() failed " << errnoWithDescription() << endl; return; } - SOCKET maxfd = 0; // needed for select() - for (unsigned i = 0; i < _socks.size(); i++) { - if (::listen(_socks[i], 128) != 0) { - error() << "listen(): listen() failed " << errnoWithDescription() << endl; - return; - } - - ListeningSockets::get()->add(_socks[i]); + ListeningSockets::get()->add(_socks[i]); - if (_socks[i] > maxfd) { - maxfd = _socks[i]; - } + if (_socks[i] > maxfd) { + maxfd = _socks[i]; } + } - if ( maxfd >= FD_SETSIZE ) { - error() << "socket " << maxfd << " is higher than " << FD_SETSIZE-1 << - "; not supported" << warnings; - return; - } + if (maxfd >= FD_SETSIZE) { + error() << "socket " << maxfd << " is higher than " << FD_SETSIZE - 1 << "; not supported" + << warnings; + return; + } #ifdef MONGO_CONFIG_SSL - _logListen(_port, _ssl); + _logListen(_port, _ssl); #else - _logListen(_port, false); + _logListen(_port, false); #endif - { - // Wake up any threads blocked in waitUntilListening() - stdx::lock_guard<stdx::mutex> lock(_readyMutex); - _ready = true; - _readyCondition.notify_all(); - } + { + // Wake up any threads blocked in waitUntilListening() + stdx::lock_guard<stdx::mutex> lock(_readyMutex); + _ready = true; + _readyCondition.notify_all(); + } - struct timeval maxSelectTime; - while ( ! inShutdown() ) { - fd_set fds[1]; - FD_ZERO(fds); - - for (vector<SOCKET>::iterator it=_socks.begin(), end=_socks.end(); it != end; ++it) { - FD_SET(*it, fds); - } + struct timeval maxSelectTime; + while (!inShutdown()) { + fd_set fds[1]; + FD_ZERO(fds); + + for (vector<SOCKET>::iterator it = _socks.begin(), end = _socks.end(); it != end; ++it) { + FD_SET(*it, fds); + } - maxSelectTime.tv_sec = 0; - maxSelectTime.tv_usec = 10000; - const int ret = select(maxfd+1, fds, NULL, NULL, &maxSelectTime); + maxSelectTime.tv_sec = 0; + maxSelectTime.tv_usec = 10000; + const int ret = select(maxfd + 1, fds, NULL, NULL, &maxSelectTime); - if (ret == 0) { + if (ret == 0) { #if defined(__linux__) - _elapsedTime += ( 10000 - maxSelectTime.tv_usec ) / 1000; + _elapsedTime += (10000 - maxSelectTime.tv_usec) / 1000; #else - _elapsedTime += 10; + _elapsedTime += 10; #endif - continue; - } + continue; + } - if (ret < 0) { - int x = errno; + if (ret < 0) { + int x = errno; #ifdef EINTR - if ( x == EINTR ) { - log() << "select() signal caught, continuing" << endl; - continue; - } -#endif - if ( ! inShutdown() ) - log() << "select() failure: ret=" << ret << " " << errnoWithDescription(x) << endl; - return; + if (x == EINTR) { + log() << "select() signal caught, continuing" << endl; + continue; } +#endif + if (!inShutdown()) + log() << "select() failure: ret=" << ret << " " << errnoWithDescription(x) << endl; + return; + } #if defined(__linux__) - _elapsedTime += std::max(ret, (int)(( 10000 - maxSelectTime.tv_usec ) / 1000)); + _elapsedTime += std::max(ret, (int)((10000 - maxSelectTime.tv_usec) / 1000)); #else - _elapsedTime += ret; // assume 1ms to grab connection. very rough + _elapsedTime += ret; // assume 1ms to grab connection. very rough #endif - for (vector<SOCKET>::iterator it=_socks.begin(), end=_socks.end(); it != end; ++it) { - if (! (FD_ISSET(*it, fds))) + for (vector<SOCKET>::iterator it = _socks.begin(), end = _socks.end(); it != end; ++it) { + if (!(FD_ISSET(*it, fds))) + continue; + SockAddr from; + int s = accept(*it, from.raw(), &from.addressSize); + if (s < 0) { + int x = errno; // so no global issues + if (x == EBADF) { + log() << "Port " << _port << " is no longer valid" << endl; + return; + } else if (x == ECONNABORTED) { + log() << "Connection on port " << _port << " aborted" << endl; continue; - SockAddr from; - int s = accept(*it, from.raw(), &from.addressSize); - if ( s < 0 ) { - int x = errno; // so no global issues - if (x == EBADF) { - log() << "Port " << _port << " is no longer valid" << endl; - return; - } - else if (x == ECONNABORTED) { - log() << "Connection on port " << _port << " aborted" << endl; - continue; - } - if ( x == 0 && inShutdown() ) { - return; // socket closed - } - if( !inShutdown() ) { - log() << "Listener: accept() returns " << s << " " << errnoWithDescription(x) << endl; - if (x == EMFILE || x == ENFILE) { - // Connection still in listen queue but we can't accept it yet - error() << "Out of file descriptors. Waiting one second before trying to accept more connections." << warnings; - sleepsecs(1); - } + } + if (x == 0 && inShutdown()) { + return; // socket closed + } + if (!inShutdown()) { + log() << "Listener: accept() returns " << s << " " << errnoWithDescription(x) + << endl; + if (x == EMFILE || x == ENFILE) { + // Connection still in listen queue but we can't accept it yet + error() << "Out of file descriptors. Waiting one second before trying to " + "accept more connections." << warnings; + sleepsecs(1); } - continue; } - if (from.getType() != AF_UNIX) - disableNagle(s); + continue; + } + if (from.getType() != AF_UNIX) + disableNagle(s); #ifdef SO_NOSIGPIPE - // ignore SIGPIPE signals on osx, to avoid process exit - const int one = 1; - setsockopt( s , SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(int)); + // ignore SIGPIPE signals on osx, to avoid process exit + const int one = 1; + setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(int)); #endif - long long myConnectionNumber = globalConnectionNumber.addAndFetch(1); + long long myConnectionNumber = globalConnectionNumber.addAndFetch(1); + + if (_logConnect && !serverGlobalParams.quiet) { + int conns = globalTicketHolder.used() + 1; + const char* word = (conns == 1 ? " connection" : " connections"); + log() << "connection accepted from " << from.toString() << " #" + << myConnectionNumber << " (" << conns << word << " now open)" << endl; + } - if (_logConnect && !serverGlobalParams.quiet) { - int conns = globalTicketHolder.used()+1; - const char* word = (conns == 1 ? " connection" : " connections"); - log() << "connection accepted from " << from.toString() << " #" << myConnectionNumber << " (" << conns << word << " now open)" << endl; - } - - std::shared_ptr<Socket> pnewSock( new Socket(s, from) ); + std::shared_ptr<Socket> pnewSock(new Socket(s, from)); #ifdef MONGO_CONFIG_SSL - if (_ssl) { - pnewSock->secureAccepted(_ssl); - } -#endif - accepted( pnewSock , myConnectionNumber ); + if (_ssl) { + pnewSock->secureAccepted(_ssl); } +#endif + accepted(pnewSock, myConnectionNumber); } } +} -#else - // Windows - - // Given a SOCKET, turns off nonblocking mode - static void disableNonblockingMode(SOCKET socket) { - unsigned long resultBuffer = 0; - unsigned long resultBufferBytesWritten = 0; - unsigned long newNonblockingEnabled = 0; - const int status = WSAIoctl(socket, - FIONBIO, - &newNonblockingEnabled, - sizeof(unsigned long), - &resultBuffer, - sizeof(resultBuffer), - &resultBufferBytesWritten, - NULL, - NULL); - if (status == SOCKET_ERROR) { - const int mongo_errno = WSAGetLastError(); - error() << "Windows WSAIoctl returned " << errnoWithDescription(mongo_errno) << endl; - fassertFailed(16726); - } +#else +// Windows + +// Given a SOCKET, turns off nonblocking mode +static void disableNonblockingMode(SOCKET socket) { + unsigned long resultBuffer = 0; + unsigned long resultBufferBytesWritten = 0; + unsigned long newNonblockingEnabled = 0; + const int status = WSAIoctl(socket, + FIONBIO, + &newNonblockingEnabled, + sizeof(unsigned long), + &resultBuffer, + sizeof(resultBuffer), + &resultBufferBytesWritten, + NULL, + NULL); + if (status == SOCKET_ERROR) { + const int mongo_errno = WSAGetLastError(); + error() << "Windows WSAIoctl returned " << errnoWithDescription(mongo_errno) << endl; + fassertFailed(16726); } +} - // RAII wrapper class to ensure we do not leak WSAEVENTs. - class EventHolder { - WSAEVENT _socketEventHandle; - public: - EventHolder() { - _socketEventHandle = WSACreateEvent(); - if (_socketEventHandle == WSA_INVALID_EVENT) { - const int mongo_errno = WSAGetLastError(); - error() << "Windows WSACreateEvent returned " << errnoWithDescription(mongo_errno) +// RAII wrapper class to ensure we do not leak WSAEVENTs. +class EventHolder { + WSAEVENT _socketEventHandle; + +public: + EventHolder() { + _socketEventHandle = WSACreateEvent(); + if (_socketEventHandle == WSA_INVALID_EVENT) { + const int mongo_errno = WSAGetLastError(); + error() << "Windows WSACreateEvent returned " << errnoWithDescription(mongo_errno) << endl; - fassertFailed(16728); - } + fassertFailed(16728); } - ~EventHolder() { - BOOL bstatus = WSACloseEvent(_socketEventHandle); - if (bstatus == FALSE) { - const int mongo_errno = WSAGetLastError(); - error() << "Windows WSACloseEvent returned " << errnoWithDescription(mongo_errno) + } + ~EventHolder() { + BOOL bstatus = WSACloseEvent(_socketEventHandle); + if (bstatus == FALSE) { + const int mongo_errno = WSAGetLastError(); + error() << "Windows WSACloseEvent returned " << errnoWithDescription(mongo_errno) << endl; - fassertFailed(16725); - } - } - WSAEVENT get() { - return _socketEventHandle; - } - }; - - void Listener::initAndListen() { - if (!_setupSocketsSuccessful) { - return; + fassertFailed(16725); } + } + WSAEVENT get() { + return _socketEventHandle; + } +}; - for (unsigned i = 0; i < _socks.size(); i++) { - if (::listen(_socks[i], 128) != 0) { - error() << "listen(): listen() failed " << errnoWithDescription() << endl; - return; - } +void Listener::initAndListen() { + if (!_setupSocketsSuccessful) { + return; + } - ListeningSockets::get()->add(_socks[i]); + for (unsigned i = 0; i < _socks.size(); i++) { + if (::listen(_socks[i], 128) != 0) { + error() << "listen(): listen() failed " << errnoWithDescription() << endl; + return; } + ListeningSockets::get()->add(_socks[i]); + } + #ifdef MONGO_CONFIG_SSL - _logListen(_port, _ssl); + _logListen(_port, _ssl); #else - _logListen(_port, false); + _logListen(_port, false); #endif - { - // Wake up any threads blocked in waitUntilListening() - stdx::lock_guard<stdx::mutex> lock(_readyMutex); - _ready = true; - _readyCondition.notify_all(); - } + { + // Wake up any threads blocked in waitUntilListening() + stdx::lock_guard<stdx::mutex> lock(_readyMutex); + _ready = true; + _readyCondition.notify_all(); + } + + OwnedPointerVector<EventHolder> eventHolders; + std::unique_ptr<WSAEVENT[]> events(new WSAEVENT[_socks.size()]); - OwnedPointerVector<EventHolder> eventHolders; - std::unique_ptr<WSAEVENT[]> events(new WSAEVENT[_socks.size()]); - - - // Populate events array with an event for each socket we are watching + + // Populate events array with an event for each socket we are watching + for (size_t count = 0; count < _socks.size(); ++count) { + EventHolder* ev(new EventHolder); + eventHolders.mutableVector().push_back(ev); + events[count] = ev->get(); + } + + while (!inShutdown()) { + // Turn on listening for accept-ready sockets for (size_t count = 0; count < _socks.size(); ++count) { - EventHolder* ev(new EventHolder); - eventHolders.mutableVector().push_back(ev); - events[count] = ev->get(); - } - - while ( ! inShutdown() ) { - // Turn on listening for accept-ready sockets - for (size_t count = 0; count < _socks.size(); ++count) { - int status = WSAEventSelect(_socks[count], events[count], FD_ACCEPT | FD_CLOSE); - if (status == SOCKET_ERROR) { - const int mongo_errno = WSAGetLastError(); - - // During shutdown, we may fail to listen on the socket if it has already - // been closed - if (inShutdown()) { - return; - } + int status = WSAEventSelect(_socks[count], events[count], FD_ACCEPT | FD_CLOSE); + if (status == SOCKET_ERROR) { + const int mongo_errno = WSAGetLastError(); - error() << "Windows WSAEventSelect returned " - << errnoWithDescription(mongo_errno) << endl; - fassertFailed(16727); + // During shutdown, we may fail to listen on the socket if it has already + // been closed + if (inShutdown()) { + return; } + + error() << "Windows WSAEventSelect returned " << errnoWithDescription(mongo_errno) + << endl; + fassertFailed(16727); } - - // Wait till one of them goes active, or we time out - DWORD result = WSAWaitForMultipleEvents(_socks.size(), - events.get(), - FALSE, // don't wait for all the events - 10, // timeout, in ms - FALSE); // do not allow I/O interruptions - if (result == WSA_WAIT_FAILED) { - const int mongo_errno = WSAGetLastError(); - error() << "Windows WSAWaitForMultipleEvents returned " - << errnoWithDescription(mongo_errno) << endl; - fassertFailed(16723); - } - - if (result == WSA_WAIT_TIMEOUT) { - _elapsedTime += 10; - continue; - } - _elapsedTime += 1; // assume 1ms to grab connection. very rough - - // Determine which socket is ready - DWORD eventIndex = result - WSA_WAIT_EVENT_0; - WSANETWORKEVENTS networkEvents; - // Extract event details, and clear event for next pass - int status = WSAEnumNetworkEvents(_socks[eventIndex], - events[eventIndex], - &networkEvents); - if (status == SOCKET_ERROR) { - const int mongo_errno = WSAGetLastError(); - error() << "Windows WSAEnumNetworkEvents returned " + } + + // Wait till one of them goes active, or we time out + DWORD result = WSAWaitForMultipleEvents(_socks.size(), + events.get(), + FALSE, // don't wait for all the events + 10, // timeout, in ms + FALSE); // do not allow I/O interruptions + if (result == WSA_WAIT_FAILED) { + const int mongo_errno = WSAGetLastError(); + error() << "Windows WSAWaitForMultipleEvents returned " << errnoWithDescription(mongo_errno) << endl; + fassertFailed(16723); + } + + if (result == WSA_WAIT_TIMEOUT) { + _elapsedTime += 10; + continue; + } + _elapsedTime += 1; // assume 1ms to grab connection. very rough + + // Determine which socket is ready + DWORD eventIndex = result - WSA_WAIT_EVENT_0; + WSANETWORKEVENTS networkEvents; + // Extract event details, and clear event for next pass + int status = WSAEnumNetworkEvents(_socks[eventIndex], events[eventIndex], &networkEvents); + if (status == SOCKET_ERROR) { + const int mongo_errno = WSAGetLastError(); + error() << "Windows WSAEnumNetworkEvents returned " << errnoWithDescription(mongo_errno) + << endl; + continue; + } + + if (networkEvents.lNetworkEvents & FD_CLOSE) { + log() << "listen socket closed" << endl; + break; + } + + if (!(networkEvents.lNetworkEvents & FD_ACCEPT)) { + error() << "Unexpected network event: " << networkEvents.lNetworkEvents << endl; + continue; + } + + int iec = networkEvents.iErrorCode[FD_ACCEPT_BIT]; + if (iec != 0) { + error() << "Windows socket accept did not work:" << errnoWithDescription(iec) << endl; + continue; + } + + status = WSAEventSelect(_socks[eventIndex], NULL, 0); + if (status == SOCKET_ERROR) { + const int mongo_errno = WSAGetLastError(); + error() << "Windows WSAEventSelect returned " << errnoWithDescription(mongo_errno) + << endl; + continue; + } + + disableNonblockingMode(_socks[eventIndex]); + + SockAddr from; + int s = accept(_socks[eventIndex], from.raw(), &from.addressSize); + if (s < 0) { + int x = errno; // so no global issues + if (x == EBADF) { + log() << "Port " << _port << " is no longer valid" << endl; continue; - } - - if (networkEvents.lNetworkEvents & FD_CLOSE) { - log() << "listen socket closed" << endl; - break; - } - - if (!(networkEvents.lNetworkEvents & FD_ACCEPT)) { - error() << "Unexpected network event: " << networkEvents.lNetworkEvents << endl; - continue; - } - - int iec = networkEvents.iErrorCode[FD_ACCEPT_BIT]; - if (iec != 0) { - error() << "Windows socket accept did not work:" << errnoWithDescription(iec) - << endl; + } else if (x == ECONNABORTED) { + log() << "Listener on port " << _port << " aborted" << endl; continue; } - - status = WSAEventSelect(_socks[eventIndex], NULL, 0); - if (status == SOCKET_ERROR) { - const int mongo_errno = WSAGetLastError(); - error() << "Windows WSAEventSelect returned " - << errnoWithDescription(mongo_errno) << endl; - continue; + if (x == 0 && inShutdown()) { + return; // socket closed } - - disableNonblockingMode(_socks[eventIndex]); - - SockAddr from; - int s = accept(_socks[eventIndex], from.raw(), &from.addressSize); - if ( s < 0 ) { - int x = errno; // so no global issues - if (x == EBADF) { - log() << "Port " << _port << " is no longer valid" << endl; - continue; - } - else if (x == ECONNABORTED) { - log() << "Listener on port " << _port << " aborted" << endl; - continue; - } - if ( x == 0 && inShutdown() ) { - return; // socket closed - } - if( !inShutdown() ) { - log() << "Listener: accept() returns " << s << " " - << errnoWithDescription(x) << endl; - if (x == EMFILE || x == ENFILE) { - // Connection still in listen queue but we can't accept it yet - error() << "Out of file descriptors. Waiting one second before" - " trying to accept more connections." << warnings; - sleepsecs(1); - } + if (!inShutdown()) { + log() << "Listener: accept() returns " << s << " " << errnoWithDescription(x) + << endl; + if (x == EMFILE || x == ENFILE) { + // Connection still in listen queue but we can't accept it yet + error() << "Out of file descriptors. Waiting one second before" + " trying to accept more connections." << warnings; + sleepsecs(1); } - continue; } - if (from.getType() != AF_UNIX) - disableNagle(s); + continue; + } + if (from.getType() != AF_UNIX) + disableNagle(s); - long long myConnectionNumber = globalConnectionNumber.addAndFetch(1); + long long myConnectionNumber = globalConnectionNumber.addAndFetch(1); - if (_logConnect && !serverGlobalParams.quiet) { - int conns = globalTicketHolder.used()+1; - const char* word = (conns == 1 ? " connection" : " connections"); - log() << "connection accepted from " << from.toString() << " #" << myConnectionNumber << " (" << conns << word << " now open)" << endl; - } - - std::shared_ptr<Socket> pnewSock( new Socket(s, from) ); + if (_logConnect && !serverGlobalParams.quiet) { + int conns = globalTicketHolder.used() + 1; + const char* word = (conns == 1 ? " connection" : " connections"); + log() << "connection accepted from " << from.toString() << " #" << myConnectionNumber + << " (" << conns << word << " now open)" << endl; + } + + std::shared_ptr<Socket> pnewSock(new Socket(s, from)); #ifdef MONGO_CONFIG_SSL - if (_ssl) { - pnewSock->secureAccepted(_ssl); - } -#endif - accepted( pnewSock , myConnectionNumber ); + if (_ssl) { + pnewSock->secureAccepted(_ssl); } +#endif + accepted(pnewSock, myConnectionNumber); } +} #endif - void Listener::_logListen( int port , bool ssl ) { - log() << _name << ( _name.size() ? " " : "" ) << "waiting for connections on port " << port << ( ssl ? " ssl" : "" ) << endl; - } +void Listener::_logListen(int port, bool ssl) { + log() << _name << (_name.size() ? " " : "") << "waiting for connections on port " << port + << (ssl ? " ssl" : "") << endl; +} - void Listener::waitUntilListening() const { - stdx::unique_lock<stdx::mutex> lock(_readyMutex); - while (!_ready) { - _readyCondition.wait(lock); - } +void Listener::waitUntilListening() const { + stdx::unique_lock<stdx::mutex> lock(_readyMutex); + while (!_ready) { + _readyCondition.wait(lock); } +} - void Listener::accepted(std::shared_ptr<Socket> psocket, long long connectionId ) { - MessagingPort* port = new MessagingPort(psocket); - port->setConnectionId( connectionId ); - acceptedMP( port ); - } - - void Listener::acceptedMP(MessagingPort *mp) { - verify(!"You must overwrite one of the accepted methods"); - } +void Listener::accepted(std::shared_ptr<Socket> psocket, long long connectionId) { + MessagingPort* port = new MessagingPort(psocket); + port->setConnectionId(connectionId); + acceptedMP(port); +} - // ----- ListeningSockets ------- +void Listener::acceptedMP(MessagingPort* mp) { + verify(!"You must overwrite one of the accepted methods"); +} - ListeningSockets* ListeningSockets::_instance = new ListeningSockets(); +// ----- ListeningSockets ------- - ListeningSockets* ListeningSockets::get() { - return _instance; - } +ListeningSockets* ListeningSockets::_instance = new ListeningSockets(); - // ------ connection ticket and control ------ +ListeningSockets* ListeningSockets::get() { + return _instance; +} + +// ------ connection ticket and control ------ - int getMaxConnections() { +int getMaxConnections() { #ifdef _WIN32 - return DEFAULT_MAX_CONN; + return DEFAULT_MAX_CONN; #else - struct rlimit limit; - verify( getrlimit(RLIMIT_NOFILE,&limit) == 0 ); + struct rlimit limit; + verify(getrlimit(RLIMIT_NOFILE, &limit) == 0); - int max = (int)(limit.rlim_cur * .8); + int max = (int)(limit.rlim_cur * .8); - LOG(1) << "fd limit" - << " hard:" << limit.rlim_max - << " soft:" << limit.rlim_cur - << " max conn: " << max - << endl; + LOG(1) << "fd limit" + << " hard:" << limit.rlim_max << " soft:" << limit.rlim_cur << " max conn: " << max + << endl; - return max; + return max; #endif - } +} - void Listener::checkTicketNumbers() { - int want = getMaxConnections(); - int current = globalTicketHolder.outof(); - if ( current != DEFAULT_MAX_CONN ) { - if ( current < want ) { - // they want fewer than they can handle - // which is fine - LOG(1) << " only allowing " << current << " connections" << endl; - return; - } - if ( current > want ) { - log() << " --maxConns too high, can only handle " << want << endl; - } +void Listener::checkTicketNumbers() { + int want = getMaxConnections(); + int current = globalTicketHolder.outof(); + if (current != DEFAULT_MAX_CONN) { + if (current < want) { + // they want fewer than they can handle + // which is fine + LOG(1) << " only allowing " << current << " connections" << endl; + return; + } + if (current > want) { + log() << " --maxConns too high, can only handle " << want << endl; } - globalTicketHolder.resize( want ); } + globalTicketHolder.resize(want); +} - TicketHolder Listener::globalTicketHolder(DEFAULT_MAX_CONN); - AtomicInt64 Listener::globalConnectionNumber; - - void ListeningSockets::closeAll() { - std::set<int>* sockets; - std::set<std::string>* paths; +TicketHolder Listener::globalTicketHolder(DEFAULT_MAX_CONN); +AtomicInt64 Listener::globalConnectionNumber; - { - stdx::lock_guard<stdx::mutex> lk( _mutex ); - sockets = _sockets; - _sockets = new std::set<int>(); - paths = _socketPaths; - _socketPaths = new std::set<std::string>(); - } +void ListeningSockets::closeAll() { + std::set<int>* sockets; + std::set<std::string>* paths; - for ( std::set<int>::iterator i=sockets->begin(); i!=sockets->end(); i++ ) { - int sock = *i; - log() << "closing listening socket: " << sock << std::endl; - closesocket( sock ); - } - delete sockets; + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + sockets = _sockets; + _sockets = new std::set<int>(); + paths = _socketPaths; + _socketPaths = new std::set<std::string>(); + } - for ( std::set<std::string>::iterator i=paths->begin(); i!=paths->end(); i++ ) { - std::string path = *i; - log() << "removing socket file: " << path << std::endl; - ::remove( path.c_str() ); - } - delete paths; + for (std::set<int>::iterator i = sockets->begin(); i != sockets->end(); i++) { + int sock = *i; + log() << "closing listening socket: " << sock << std::endl; + closesocket(sock); } + delete sockets; + for (std::set<std::string>::iterator i = paths->begin(); i != paths->end(); i++) { + std::string path = *i; + log() << "removing socket file: " << path << std::endl; + ::remove(path.c_str()); + } + delete paths; +} } |