summaryrefslogtreecommitdiff
path: root/src/mongo/transport/asio_utils.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/transport/asio_utils.h')
-rw-r--r--src/mongo/transport/asio_utils.h93
1 files changed, 93 insertions, 0 deletions
diff --git a/src/mongo/transport/asio_utils.h b/src/mongo/transport/asio_utils.h
index 89e7821f5b8..e5647a19264 100644
--- a/src/mongo/transport/asio_utils.h
+++ b/src/mongo/transport/asio_utils.h
@@ -30,9 +30,14 @@
#include "mongo/base/status.h"
#include "mongo/base/system_error.h"
+#include "mongo/util/errno_util.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/net/sockaddr.h"
+#ifndef _WIN32
+#include <sys/poll.h>
+#endif // ndef _WIN32
+
#include <asio.hpp>
namespace mongo {
@@ -60,6 +65,9 @@ inline Status errorCodeToStatus(const std::error_code& ec) {
if (ec == asio::error::try_again || ec == asio::error::would_block) {
#endif
return {ErrorCodes::NetworkTimeout, "Socket operation timed out"};
+ } else if (ec == asio::error::eof || ec == asio::error::connection_reset ||
+ ec == asio::error::network_reset) {
+ return {ErrorCodes::HostUnreachable, "Connection was closed"};
}
// If the ec.category() is a mongoErrorCategory() then this error was propogated from
@@ -73,5 +81,90 @@ inline Status errorCodeToStatus(const std::error_code& ec) {
return {errorCode, ec.message()};
}
+/*
+ * The ASIO implementation of poll (i.e. socket.wait()) cannot poll for a mask of events, and
+ * doesn't support timeouts.
+ *
+ * This wraps up ::select/::poll for Windows/POSIX for a single socket and handles EINTR on POSIX
+ *
+ * - On timeout: it returns Status(ErrorCodes::NetworkTimeout)
+ * - On poll returning with an event: it returns the EventsMask for the socket, the caller must
+ * check whether it matches the expected events mask.
+ * - On error: it returns a Status(ErrorCodes::InternalError)
+ */
+template <typename Socket, typename EventsMask>
+StatusWith<EventsMask> pollASIOSocket(Socket& socket, EventsMask mask, Milliseconds timeout) {
+#ifdef _WIN32
+ fd_set readfds;
+ fd_set writefds;
+ fd_set errfds;
+
+ FD_ZERO(&readfds);
+ FD_ZERO(&writefds);
+ FD_ZERO(&errfds);
+
+ auto fd = socket.native_handle();
+ if (mask & POLLIN) {
+ FD_SET(fd, &readfds);
+ }
+ if (mask & POLLOUT) {
+ FD_SET(fd, &writefds);
+ }
+ FD_SET(fd, &errfds);
+
+ timeval timeoutTv{};
+ auto timeoutUs = duration_cast<Microseconds>(timeout);
+ if (timeoutUs >= Seconds{1}) {
+ auto timeoutSec = duration_cast<Seconds>(timeoutUs);
+ timeoutTv.tv_sec = timeoutSec.count();
+ timeoutUs -= timeoutSec;
+ }
+ timeoutTv.tv_usec = timeoutUs.count();
+ int result = ::select(1, &readfds, &writefds, &errfds, &timeoutTv);
+ if (result == SOCKET_ERROR) {
+ auto errDesc = errnoWithDescription(WSAGetLastError());
+ return {ErrorCodes::InternalError, errDesc};
+ }
+ int revents = (FD_ISSET(fd, &readfds) ? POLLIN : 0) | (FD_ISSET(fd, &writefds) ? POLLOUT : 0) |
+ (FD_ISSET(fd, &errfds) ? POLLERR : 0);
+#else
+ pollfd pollItem;
+ pollItem.fd = socket.native_handle();
+ pollItem.events = mask;
+
+ int result;
+ boost::optional<Date_t> expiration;
+ if (timeout.count() > 0) {
+ expiration = Date_t::now() + timeout;
+ }
+ do {
+ Milliseconds curTimeout;
+ if (expiration) {
+ curTimeout = *expiration - Date_t::now();
+ if (curTimeout.count() <= 0) {
+ result = 0;
+ break;
+ }
+ } else {
+ curTimeout = timeout;
+ }
+ result = ::poll(&pollItem, 1, curTimeout.count());
+ } while (result == -1 && errno == EINTR);
+
+ if (result == -1) {
+ int errCode = errno;
+ return {ErrorCodes::InternalError, errnoWithDescription(errCode)};
+ }
+ int revents = pollItem.revents;
+#endif
+
+ if (result == 0) {
+ return {ErrorCodes::NetworkTimeout, "Timed out waiting for poll"};
+ } else {
+ return revents;
+ }
+}
+
+
} // namespace transport
} // namespace mongo