diff options
Diffstat (limited to 'src/mongo/transport/asio_utils.h')
-rw-r--r-- | src/mongo/transport/asio_utils.h | 93 |
1 files changed, 93 insertions, 0 deletions
diff --git a/src/mongo/transport/asio_utils.h b/src/mongo/transport/asio_utils.h index 89e7821f5b8..e5647a19264 100644 --- a/src/mongo/transport/asio_utils.h +++ b/src/mongo/transport/asio_utils.h @@ -30,9 +30,14 @@ #include "mongo/base/status.h" #include "mongo/base/system_error.h" +#include "mongo/util/errno_util.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/net/sockaddr.h" +#ifndef _WIN32 +#include <sys/poll.h> +#endif // ndef _WIN32 + #include <asio.hpp> namespace mongo { @@ -60,6 +65,9 @@ inline Status errorCodeToStatus(const std::error_code& ec) { if (ec == asio::error::try_again || ec == asio::error::would_block) { #endif return {ErrorCodes::NetworkTimeout, "Socket operation timed out"}; + } else if (ec == asio::error::eof || ec == asio::error::connection_reset || + ec == asio::error::network_reset) { + return {ErrorCodes::HostUnreachable, "Connection was closed"}; } // If the ec.category() is a mongoErrorCategory() then this error was propogated from @@ -73,5 +81,90 @@ inline Status errorCodeToStatus(const std::error_code& ec) { return {errorCode, ec.message()}; } +/* + * The ASIO implementation of poll (i.e. socket.wait()) cannot poll for a mask of events, and + * doesn't support timeouts. + * + * This wraps up ::select/::poll for Windows/POSIX for a single socket and handles EINTR on POSIX + * + * - On timeout: it returns Status(ErrorCodes::NetworkTimeout) + * - On poll returning with an event: it returns the EventsMask for the socket, the caller must + * check whether it matches the expected events mask. + * - On error: it returns a Status(ErrorCodes::InternalError) + */ +template <typename Socket, typename EventsMask> +StatusWith<EventsMask> pollASIOSocket(Socket& socket, EventsMask mask, Milliseconds timeout) { +#ifdef _WIN32 + fd_set readfds; + fd_set writefds; + fd_set errfds; + + FD_ZERO(&readfds); + FD_ZERO(&writefds); + FD_ZERO(&errfds); + + auto fd = socket.native_handle(); + if (mask & POLLIN) { + FD_SET(fd, &readfds); + } + if (mask & POLLOUT) { + FD_SET(fd, &writefds); + } + FD_SET(fd, &errfds); + + timeval timeoutTv{}; + auto timeoutUs = duration_cast<Microseconds>(timeout); + if (timeoutUs >= Seconds{1}) { + auto timeoutSec = duration_cast<Seconds>(timeoutUs); + timeoutTv.tv_sec = timeoutSec.count(); + timeoutUs -= timeoutSec; + } + timeoutTv.tv_usec = timeoutUs.count(); + int result = ::select(1, &readfds, &writefds, &errfds, &timeoutTv); + if (result == SOCKET_ERROR) { + auto errDesc = errnoWithDescription(WSAGetLastError()); + return {ErrorCodes::InternalError, errDesc}; + } + int revents = (FD_ISSET(fd, &readfds) ? POLLIN : 0) | (FD_ISSET(fd, &writefds) ? POLLOUT : 0) | + (FD_ISSET(fd, &errfds) ? POLLERR : 0); +#else + pollfd pollItem; + pollItem.fd = socket.native_handle(); + pollItem.events = mask; + + int result; + boost::optional<Date_t> expiration; + if (timeout.count() > 0) { + expiration = Date_t::now() + timeout; + } + do { + Milliseconds curTimeout; + if (expiration) { + curTimeout = *expiration - Date_t::now(); + if (curTimeout.count() <= 0) { + result = 0; + break; + } + } else { + curTimeout = timeout; + } + result = ::poll(&pollItem, 1, curTimeout.count()); + } while (result == -1 && errno == EINTR); + + if (result == -1) { + int errCode = errno; + return {ErrorCodes::InternalError, errnoWithDescription(errCode)}; + } + int revents = pollItem.revents; +#endif + + if (result == 0) { + return {ErrorCodes::NetworkTimeout, "Timed out waiting for poll"}; + } else { + return revents; + } +} + + } // namespace transport } // namespace mongo |