diff options
author | Jason Carey <jcarey@argv.me> | 2016-12-01 14:54:59 -0500 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2016-12-08 11:18:17 -0500 |
commit | 80775cd49db61ec792313da630158a2904faa75f (patch) | |
tree | 43566a1716adaa33bdc7f9291b3619bacd1285f4 | |
parent | cea6463452fb138b4536aed6660be082343dd9de (diff) | |
download | mongo-80775cd49db61ec792313da630158a2904faa75f.tar.gz |
SERVER-27240 Replace ConnectBG with poll
It's unsafe to close a socket from another thread. Also, after
returning EINTR, the connect call converts to an async call. And on
non-linux systems that requires a fallback to poll/select to handle
errors.
Because of that, let's just do the connect without the background thread
at all, starting off with poll.
(cherry picked from commit a1baabeee5694aa8c4ffa1827233684d6c7fcc49)
-rw-r--r-- | src/mongo/util/net/sock.cpp | 166 |
1 files changed, 110 insertions, 56 deletions
diff --git a/src/mongo/util/net/sock.cpp b/src/mongo/util/net/sock.cpp index 43426b6ff03..e2bbc28da83 100644 --- a/src/mongo/util/net/sock.cpp +++ b/src/mongo/util/net/sock.cpp @@ -33,7 +33,10 @@ #include "mongo/util/net/sock.h" +#include <algorithm> + #if !defined(_WIN32) +#include <fcntl.h> #include <sys/socket.h> #include <sys/types.h> #include <sys/un.h> @@ -70,6 +73,38 @@ using std::vector; MONGO_FP_DECLARE(throwSockExcep); +namespace { + +// Provides a cross-platform function for setting a file descriptor/socket to non-blocking mode. +bool setBlock(int fd, bool block) { +#ifdef _WIN32 + u_long ioMode = block ? 0 : 1; + return (NO_ERROR == ::ioctlsocket(fd, FIONBIO, &ioMode)); +#else + int flags = fcntl(fd, F_GETFL, fd); + if (block) { + return (-1 != fcntl(fd, F_SETFL, (flags & ~O_NONBLOCK))); + } else { + return (-1 != fcntl(fd, F_SETFL, (flags | O_NONBLOCK))); + } +#endif +} + +void networkWarnWithDescription(const Socket& socket, StringData call, int errorCode = -1) { +#ifdef _WIN32 + if (errorCode == -1) { + errorCode = WSAGetLastError(); + } +#endif + std::string ewd(errnoWithDescription(errorCode)); + warning() << "Failed to connect to " << socket.remoteAddr().getAddr() << ":" + << socket.remoteAddr().getPort() << ", in(" << call << "), reason: " << ewd; +} + +const double kMaxConnectTimeoutMS = 5000; + +} // namespace + static bool ipv6 = false; void enableIPv6(bool state) { ipv6 = state; @@ -539,53 +574,91 @@ std::string Socket::doSSLHandshake(const char* firstBytes, int len) { } #endif -class ConnectBG : public BackgroundJob { -public: - ConnectBG(int sock, SockAddr remote) : _sock(sock), _remote(remote) {} +bool Socket::connect(SockAddr& remote) { + _remote = remote; - void run() { -#if defined(_WIN32) - if ((_res = _connect()) == SOCKET_ERROR) { - _errnoWithDescription = errnoWithDescription(); + _fd = ::socket(remote.getType(), SOCK_STREAM, 0); + if (_fd == INVALID_SOCKET) { + networkWarnWithDescription(*this, "socket"); + return false; + } + + if (!setBlock(_fd, false)) { + networkWarnWithDescription(*this, "set socket to non-blocking mode"); + return false; + } + + const uint64_t connectTimeoutMillis = + _timeout > 0 ? std::min(kMaxConnectTimeoutMS, _timeout) : kMaxConnectTimeoutMS; + const uint64_t expiration = curTimeMillis64() + connectTimeoutMillis; + + bool connectSucceeded = ::connect(_fd, _remote.raw(), _remote.addressSize) == 0; + + if (!connectSucceeded) { +#ifdef _WIN32 + if (WSAGetLastError() != WSAEWOULDBLOCK) { + networkWarnWithDescription(*this, "connect"); + return false; } #else - while ((_res = _connect()) == -1) { - const int error = errno; - if (error != EINTR) { - _errnoWithDescription = errnoWithDescription(error); - break; - } + if (errno != EINTR && errno != EINPROGRESS) { + networkWarnWithDescription(*this, "connect"); + return false; } #endif - } - std::string name() const { - return "ConnectBG"; - } - std::string getErrnoWithDescription() const { - return _errnoWithDescription; - } - int inError() const { - return _res; - } + pollfd pfd; + pfd.fd = _fd; + pfd.events = POLLOUT; -private: - int _connect() const { - return ::connect(_sock, _remote.raw(), _remote.addressSize); - } + while (true) { + const uint64_t timeout = std::max(0ull, expiration - curTimeMillis64()); - int _sock; - int _res; - SockAddr _remote; - std::string _errnoWithDescription; -}; + int pollReturn = socketPoll(&pfd, 1, timeout); +#ifdef _WIN32 + if (pollReturn == SOCKET_ERROR) { + networkWarnWithDescription(*this, "poll"); + return false; + } +#else + if (pollReturn == -1) { + if (errno != EINTR) { + networkWarnWithDescription(*this, "poll"); + return false; + } -bool Socket::connect(SockAddr& remote) { - _remote = remote; + // EINTR in poll, try again + continue; + } +#endif + // No activity for the full duration of the timeout. + if (pollReturn == 0) { + warning() << "Failed to connect to " << _remote.getAddr() << ":" + << _remote.getPort() << " after " << connectTimeoutMillis + << " milliseconds, giving up."; + return false; + } - _fd = socket(remote.getType(), SOCK_STREAM, 0); - if (_fd == INVALID_SOCKET) { - LOG(_logLevel) << "ERROR: connect invalid socket " << errnoWithDescription() << endl; + // We had a result, see if there's an error on the socket. + int optVal; + socklen_t optLen = sizeof(optVal); + if (::getsockopt( + _fd, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&optVal), &optLen) == -1) { + networkWarnWithDescription(*this, "getsockopt"); + return false; + } + if (optVal != 0) { + networkWarnWithDescription(*this, "checking socket for error after poll", optVal); + return false; + } + + // We had activity and we don't have errors on the socket, we're connected. + break; + } + } + + if (!setBlock(_fd, true)) { + networkWarnWithDescription(*this, "could not set socket to blocking mode"); return false; } @@ -593,25 +666,6 @@ bool Socket::connect(SockAddr& remote) { setTimeout(_timeout); } - static const unsigned int connectTimeoutMillis = 5000; - ConnectBG bg(_fd, remote); - bg.go(); - if (bg.wait(connectTimeoutMillis)) { - if (bg.inError()) { - warning() << "Failed to connect to " << _remote.getAddr() << ":" << _remote.getPort() - << ", reason: " << bg.getErrnoWithDescription() << endl; - close(); - return false; - } - } else { - // time out the connect - close(); - bg.wait(); // so bg stays in scope until bg thread terminates - warning() << "Failed to connect to " << _remote.getAddr() << ":" << _remote.getPort() - << " after " << connectTimeoutMillis << " milliseconds, giving up." << endl; - return false; - } - if (remote.getType() != AF_UNIX) disableNagle(_fd); |