// @file sock.cpp

/*    Copyright 2009 10gen Inc.
 *
 *    This program is free software: you can redistribute it and/or  modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects
 *    for all of the code used other than as permitted herein. If you modify
 *    file(s) with this exception, you may extend this exception to your
 *    version of the file(s), but you are not obligated to do so. If you do not
 *    wish to do so, delete this exception statement from your version. If you
 *    delete this exception statement from all source files in the program,
 *    then also delete it in the license file.
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork #include "mongo/platform/basic.h" #include "mongo/util/net/sock.h" #if !defined(_WIN32) #include #include #include #include #include #include #include #include #if defined(__OpenBSD__) #include #endif #endif #include "mongo/config.h" #include "mongo/db/server_options.h" #include "mongo/util/background.h" #include "mongo/util/concurrency/value.h" #include "mongo/util/debug_util.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/hex.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/net/message.h" #include "mongo/util/net/socket_exception.h" #include "mongo/util/net/socket_poll.h" #include "mongo/util/net/ssl_manager.h" #include "mongo/util/quick_exit.h" namespace mongo { using std::endl; using std::pair; using std::string; using std::stringstream; using std::vector; MONGO_FP_DECLARE(throwSockExcep); static bool ipv6 = false; void enableIPv6(bool state) { ipv6 = state; } bool IPv6Enabled() { return ipv6; } void setSockTimeouts(int sock, double secs) { bool report = shouldLog(logger::LogSeverity::Debug(4)); DEV report = true; #if defined(_WIN32) DWORD timeout = secs * 1000; // Windows timeout is a DWORD, in milliseconds. 
int status = setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast(&timeout), sizeof(DWORD)); if (report && (status == SOCKET_ERROR)) log() << "unable to set SO_RCVTIMEO: " << errnoWithDescription(WSAGetLastError()) << endl; status = setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast(&timeout), sizeof(DWORD)); DEV if (report && (status == SOCKET_ERROR)) log() << "unable to set SO_SNDTIMEO: " << errnoWithDescription(WSAGetLastError()) << endl; #else struct timeval tv; tv.tv_sec = (int)secs; tv.tv_usec = (int)((long long)(secs * 1000 * 1000) % (1000 * 1000)); bool ok = setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char*)&tv, sizeof(tv)) == 0; if (report && !ok) log() << "unable to set SO_RCVTIMEO" << endl; ok = setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char*)&tv, sizeof(tv)) == 0; DEV if (report && !ok) log() << "unable to set SO_SNDTIMEO" << endl; #endif } #if defined(_WIN32) void disableNagle(int sock) { int x = 1; if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char*)&x, sizeof(x))) error() << "disableNagle failed: " << errnoWithDescription() << endl; if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char*)&x, sizeof(x))) error() << "SO_KEEPALIVE failed: " << errnoWithDescription() << endl; } #else void disableNagle(int sock) { int x = 1; #ifdef SOL_TCP int level = SOL_TCP; #else int level = SOL_SOCKET; #endif if (setsockopt(sock, level, TCP_NODELAY, (char*)&x, sizeof(x))) error() << "disableNagle failed: " << errnoWithDescription() << endl; #ifdef SO_KEEPALIVE if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char*)&x, sizeof(x))) error() << "SO_KEEPALIVE failed: " << errnoWithDescription() << endl; #ifdef __linux__ socklen_t len = sizeof(x); if (getsockopt(sock, level, TCP_KEEPIDLE, (char*)&x, &len)) error() << "can't get TCP_KEEPIDLE: " << errnoWithDescription() << endl; if (x > 300) { x = 300; if (setsockopt(sock, level, TCP_KEEPIDLE, (char*)&x, sizeof(x))) { error() << "can't set TCP_KEEPIDLE: " << errnoWithDescription() << endl; } } len = sizeof(x); 
// just in case it changed if (getsockopt(sock, level, TCP_KEEPINTVL, (char*)&x, &len)) error() << "can't get TCP_KEEPINTVL: " << errnoWithDescription() << endl; if (x > 300) { x = 300; if (setsockopt(sock, level, TCP_KEEPINTVL, (char*)&x, sizeof(x))) { error() << "can't set TCP_KEEPINTVL: " << errnoWithDescription() << endl; } } #endif #endif } #endif string getAddrInfoStrError(int code) { #if !defined(_WIN32) return gai_strerror(code); #else /* gai_strerrorA is not threadsafe on windows. don't use it. */ return errnoWithDescription(code); #endif } // --- SockAddr string makeUnixSockPath(int port) { return mongoutils::str::stream() << serverGlobalParams.socket << "/mongodb-" << port << ".sock"; } // If an ip address is passed in, just return that. If a hostname is passed // in, look up its ip and return that. Returns "" on failure. string hostbyname(const char* hostname) { SockAddr sockAddr(hostname, 0); if (!sockAddr.isValid() || sockAddr.getAddr() == "0.0.0.0") return ""; else return sockAddr.getAddr(); } // --- my -- DiagStr& _hostNameCached = *(new DiagStr); // this is also written to from commands/cloud.cpp string getHostName() { char buf[256]; int ec = gethostname(buf, 127); if (ec || *buf == 0) { log() << "can't get this server's hostname " << errnoWithDescription() << endl; return ""; } return buf; } /** we store our host name once */ string getHostNameCached() { string temp = _hostNameCached.get(); if (_hostNameCached.empty()) { temp = getHostName(); _hostNameCached = temp; } return temp; } string prettyHostName() { StringBuilder s; s << getHostNameCached(); if (serverGlobalParams.port != ServerGlobalParams::DefaultDBPort) s << ':' << mongo::serverGlobalParams.port; return s.str(); } #ifdef MSG_NOSIGNAL const int portSendFlags = MSG_NOSIGNAL; const int portRecvFlags = MSG_NOSIGNAL; #else const int portSendFlags = 0; const int portRecvFlags = 0; #endif // ------------ Socket ----------------- static int socketGetLastError() { #ifdef _WIN32 return 
WSAGetLastError(); #else return errno; #endif } static SockAddr getLocalAddrForBoundSocketFd(int fd) { SockAddr result; int rc = getsockname(fd, result.raw(), &result.addressSize); if (rc != 0) { warning() << "Could not resolve local address for socket with fd " << fd << ": " << getAddrInfoStrError(socketGetLastError()); result = SockAddr(); } return result; } Socket::Socket(int fd, const SockAddr& remote) : _fd(fd), _remote(remote), _timeout(0), _lastValidityCheckAtSecs(time(0)), _logLevel(logger::LogSeverity::Log()) { _init(); if (fd >= 0) { _local = getLocalAddrForBoundSocketFd(_fd); } } Socket::Socket(double timeout, logger::LogSeverity ll) : _logLevel(ll) { _fd = -1; _timeout = timeout; _lastValidityCheckAtSecs = time(0); _init(); } Socket::~Socket() { close(); } void Socket::_init() { _bytesOut = 0; _bytesIn = 0; _awaitingHandshake = true; #ifdef MONGO_CONFIG_SSL _sslManager = 0; #endif } void Socket::close() { if (_fd >= 0) { // Stop any blocking reads/writes, and prevent new reads/writes #if defined(_WIN32) shutdown(_fd, SD_BOTH); #else shutdown(_fd, SHUT_RDWR); #endif closesocket(_fd); _fd = -1; } } #ifdef MONGO_CONFIG_SSL bool Socket::secure(SSLManagerInterface* mgr, const std::string& remoteHost) { fassert(16503, mgr); if (_fd < 0) { return false; } _sslManager = mgr; _sslConnection.reset(_sslManager->connect(this)); mgr->parseAndValidatePeerCertificateDeprecated(_sslConnection.get(), remoteHost); return true; } void Socket::secureAccepted(SSLManagerInterface* ssl) { _sslManager = ssl; } std::string Socket::doSSLHandshake(const char* firstBytes, int len) { if (!_sslManager) return ""; fassert(16506, _fd); if (_sslConnection.get()) { throw SocketException(SocketException::RECV_ERROR, "Attempt to call SSL_accept on already secure Socket from " + remoteString()); } _sslConnection.reset(_sslManager->accept(this, firstBytes, len)); return _sslManager->parseAndValidatePeerCertificateDeprecated(_sslConnection.get(), ""); } #endif class ConnectBG : public 
BackgroundJob { public: ConnectBG(int sock, SockAddr remote) : _sock(sock), _remote(remote) {} void run() { #if defined(_WIN32) if ((_res = _connect()) == SOCKET_ERROR) { _errnoWithDescription = errnoWithDescription(); } #else while ((_res = _connect()) == -1) { const int error = errno; if (error != EINTR) { _errnoWithDescription = errnoWithDescription(error); break; } } #endif } std::string name() const { return "ConnectBG"; } std::string getErrnoWithDescription() const { return _errnoWithDescription; } int inError() const { return _res; } private: int _connect() const { return ::connect(_sock, _remote.raw(), _remote.addressSize); } int _sock; int _res; SockAddr _remote; std::string _errnoWithDescription; }; bool Socket::connect(SockAddr& remote) { _remote = remote; _fd = socket(remote.getType(), SOCK_STREAM, 0); if (_fd == INVALID_SOCKET) { LOG(_logLevel) << "ERROR: connect invalid socket " << errnoWithDescription() << endl; return false; } if (_timeout > 0) { setTimeout(_timeout); } static const unsigned int connectTimeoutMillis = 5000; ConnectBG bg(_fd, remote); bg.go(); if (bg.wait(connectTimeoutMillis)) { if (bg.inError()) { warning() << "Failed to connect to " << _remote.getAddr() << ":" << _remote.getPort() << ", reason: " << bg.getErrnoWithDescription() << endl; close(); return false; } } else { // time out the connect close(); bg.wait(); // so bg stays in scope until bg thread terminates warning() << "Failed to connect to " << _remote.getAddr() << ":" << _remote.getPort() << " after " << connectTimeoutMillis << " milliseconds, giving up." 
<< endl; return false; } if (remote.getType() != AF_UNIX) disableNagle(_fd); #ifdef SO_NOSIGPIPE // ignore SIGPIPE signals on osx, to avoid process exit const int one = 1; setsockopt(_fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(int)); #endif _local = getLocalAddrForBoundSocketFd(_fd); _fdCreationMicroSec = curTimeMicros64(); _awaitingHandshake = false; return true; } // throws if SSL_write or send fails int Socket::_send(const char* data, int len, const char* context) { #ifdef MONGO_CONFIG_SSL if (_sslConnection.get()) { return _sslManager->SSL_write(_sslConnection.get(), data, len); } #endif int ret = ::send(_fd, data, len, portSendFlags); if (ret < 0) { handleSendError(ret, context); } return ret; } // sends all data or throws an exception void Socket::send(const char* data, int len, const char* context) { while (len > 0) { int ret = -1; if (MONGO_FAIL_POINT(throwSockExcep)) { #if defined(_WIN32) WSASetLastError(WSAENETUNREACH); #else errno = ENETUNREACH; #endif handleSendError(ret, context); } else { ret = _send(data, len, context); } _bytesOut += ret; fassert(16507, ret <= len); len -= ret; data += ret; } } void Socket::_send(const vector>& data, const char* context) { for (vector>::const_iterator i = data.begin(); i != data.end(); ++i) { char* data = i->first; int len = i->second; send(data, len, context); } } /** sends all data or throws an exception * @param context descriptive for logging */ void Socket::send(const vector>& data, const char* context) { #ifdef MONGO_CONFIG_SSL if (_sslConnection.get()) { _send(data, context); return; } #endif #if defined(_WIN32) // TODO use scatter/gather api _send(data, context); #else vector d(data.size()); int i = 0; for (vector>::const_iterator j = data.begin(); j != data.end(); ++j) { if (j->second > 0) { d[i].iov_base = j->first; d[i].iov_len = j->second; ++i; _bytesOut += j->second; } } struct msghdr meta; memset(&meta, 0, sizeof(meta)); meta.msg_iov = &d[0]; meta.msg_iovlen = d.size(); while (meta.msg_iovlen > 0) { 
int ret = -1; if (MONGO_FAIL_POINT(throwSockExcep)) { #if defined(_WIN32) WSASetLastError(WSAENETUNREACH); #else errno = ENETUNREACH; #endif } else { ret = ::sendmsg(_fd, &meta, portSendFlags); } if (ret == -1) { if (errno != EAGAIN || _timeout == 0) { LOG(_logLevel) << "Socket " << context << " send() " << errnoWithDescription() << ' ' << remoteString() << endl; throw SocketException(SocketException::SEND_ERROR, remoteString()); } else { LOG(_logLevel) << "Socket " << context << " send() remote timeout " << remoteString() << endl; throw SocketException(SocketException::SEND_TIMEOUT, remoteString()); } } else { struct iovec*& i = meta.msg_iov; while (ret > 0) { if (i->iov_len > unsigned(ret)) { i->iov_len -= ret; i->iov_base = (char*)(i->iov_base) + ret; ret = 0; } else { ret -= i->iov_len; ++i; --(meta.msg_iovlen); } } } } #endif } void Socket::recv(char* buf, int len) { while (len > 0) { int ret = -1; if (MONGO_FAIL_POINT(throwSockExcep)) { #if defined(_WIN32) WSASetLastError(WSAENETUNREACH); #else errno = ENETUNREACH; #endif if (ret <= 0) { handleRecvError(ret, len); continue; } } else { ret = unsafe_recv(buf, len); } fassert(16508, ret <= len); len -= ret; buf += ret; } } int Socket::unsafe_recv(char* buf, int max) { int x = _recv(buf, max); _bytesIn += x; return x; } // throws if SSL_read fails or recv returns an error int Socket::_recv(char* buf, int max) { #ifdef MONGO_CONFIG_SSL if (_sslConnection.get()) { return _sslManager->SSL_read(_sslConnection.get(), buf, max); } #endif int ret = ::recv(_fd, buf, max, portRecvFlags); if (ret <= 0) { handleRecvError(ret, max); // If no throw return and call _recv again return 0; } return ret; } void Socket::handleSendError(int ret, const char* context) { #if defined(_WIN32) const int mongo_errno = WSAGetLastError(); if (mongo_errno == WSAETIMEDOUT && _timeout != 0) { #else const int mongo_errno = errno; if ((mongo_errno == EAGAIN || mongo_errno == EWOULDBLOCK) && _timeout != 0) { #endif LOG(_logLevel) << "Socket " << 
context << " send() timed out " << remoteString() << endl; throw SocketException(SocketException::SEND_TIMEOUT, remoteString()); } else { LOG(_logLevel) << "Socket " << context << " send() " << errnoWithDescription(mongo_errno) << ' ' << remoteString() << endl; throw SocketException(SocketException::SEND_ERROR, remoteString()); } } void Socket::handleRecvError(int ret, int len) { if (ret == 0) { LOG(3) << "Socket recv() conn closed? " << remoteString() << endl; throw SocketException(SocketException::CLOSED, remoteString()); } // ret < 0 #if defined(_WIN32) int e = WSAGetLastError(); #else int e = errno; #if defined(EINTR) if (e == EINTR) { LOG(_logLevel) << "EINTR returned from recv(), retrying"; return; } #endif #endif #if defined(_WIN32) // Windows if ((e == EAGAIN || e == WSAETIMEDOUT) && _timeout > 0) { #else if (e == EAGAIN && _timeout > 0) { #endif // this is a timeout LOG(_logLevel) << "Socket recv() timeout " << remoteString() << endl; throw SocketException(SocketException::RECV_TIMEOUT, remoteString()); } LOG(_logLevel) << "Socket recv() " << errnoWithDescription(e) << " " << remoteString() << endl; throw SocketException(SocketException::RECV_ERROR, remoteString()); } void Socket::setTimeout(double secs) { setSockTimeouts(_fd, secs); } // TODO: allow modification? // // : secs to wait between stillConnected checks // 0 : always check // -1 : never check const int Socket::errorPollIntervalSecs(5); // Patch to allow better tolerance of flaky network connections that get broken // while we aren't looking. // TODO: Remove when better async changes come. // // isStillConnected() polls the socket at max every Socket::errorPollIntervalSecs to determine // if any disconnection-type events have happened on the socket. 
bool Socket::isStillConnected() { if (_fd == -1) { // According to the man page, poll will respond with POLLVNAL for invalid or // unopened descriptors, but it doesn't seem to be properly implemented in // some platforms - it can return 0 events and 0 for revent. Hence this workaround. return false; } if (errorPollIntervalSecs < 0) return true; if (!isPollSupported()) return true; // nothing we can do time_t now = time(0); time_t idleTimeSecs = now - _lastValidityCheckAtSecs; // Only check once every 5 secs if (idleTimeSecs < errorPollIntervalSecs) return true; // Reset our timer, we're checking the connection _lastValidityCheckAtSecs = now; // It's been long enough, poll to see if our socket is still connected pollfd pollInfo; pollInfo.fd = _fd; // We only care about reading the EOF message on clean close (and errors) pollInfo.events = POLLIN; // Poll( info[], size, timeout ) - timeout == 0 => nonblocking int nEvents = socketPoll(&pollInfo, 1, 0); LOG(2) << "polling for status of connection to " << remoteString() << ", " << (nEvents == 0 ? "no events" : nEvents == -1 ? "error detected" : "event detected") << endl; if (nEvents == 0) { // No events incoming, return still connected AFAWK return true; } else if (nEvents < 0) { // Poll itself failed, this is weird, warn and log errno warning() << "Socket poll() failed during connectivity check" << " (idle " << idleTimeSecs << " secs," << " remote host " << remoteString() << ")" << causedBy(errnoWithDescription()) << endl; // Return true since it's not clear that we're disconnected. return true; } dassert(nEvents == 1); dassert(pollInfo.revents > 0); // Return false at this point, some event happened on the socket, but log what the // actual event was. if (pollInfo.revents & POLLIN) { // There shouldn't really be any data to recv here, so make sure this // is a clean hangup. 
const int testBufLength = 1024; char testBuf[testBufLength]; int recvd = ::recv(_fd, testBuf, testBufLength, portRecvFlags); if (recvd < 0) { // An error occurred during recv, warn and log errno warning() << "Socket recv() failed during connectivity check" << " (idle " << idleTimeSecs << " secs," << " remote host " << remoteString() << ")" << causedBy(errnoWithDescription()) << endl; } else if (recvd > 0) { // We got nonzero data from this socket, very weird? // Log and warn at runtime, log and abort at devtime // TODO: Dump the data to the log somehow? error() << "Socket found pending " << recvd << " bytes of data during connectivity check" << " (idle " << idleTimeSecs << " secs," << " remote host " << remoteString() << ")" << endl; DEV { std::string hex = hexdump(testBuf, recvd); error() << "Hex dump of stale log data: " << hex << endl; } dassert(false); } else { // recvd == 0, socket closed remotely, just return false LOG(0) << "Socket closed remotely, no longer connected" << " (idle " << idleTimeSecs << " secs," << " remote host " << remoteString() << ")" << endl; } } else if (pollInfo.revents & POLLHUP) { // A hangup has occurred on this socket LOG(0) << "Socket hangup detected, no longer connected" << " (idle " << idleTimeSecs << " secs," << " remote host " << remoteString() << ")" << endl; } else if (pollInfo.revents & POLLERR) { // An error has occurred on this socket LOG(0) << "Socket error detected, no longer connected" << " (idle " << idleTimeSecs << " secs," << " remote host " << remoteString() << ")" << endl; } else if (pollInfo.revents & POLLNVAL) { // Socket descriptor itself is weird // Log and warn at runtime, log and abort at devtime error() << "Socket descriptor detected as invalid" << " (idle " << idleTimeSecs << " secs," << " remote host " << remoteString() << ")" << endl; dassert(false); } else { // Don't know what poll is saying here // Log and warn at runtime, log and abort at devtime error() << "Socket had unknown event (" << 
static_cast(pollInfo.revents) << ")" << " (idle " << idleTimeSecs << " secs," << " remote host " << remoteString() << ")" << endl; dassert(false); } return false; } #if defined(_WIN32) struct WinsockInit { WinsockInit() { WSADATA d; if (WSAStartup(MAKEWORD(2, 2), &d) != 0) { log() << "ERROR: wsastartup failed " << errnoWithDescription() << endl; quickExit(EXIT_NTSERVICE_ERROR); } } } winsock_init; #endif } // namespace mongo