summaryrefslogtreecommitdiff
path: root/src/mongo/transport/asio_transport_layer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/transport/asio_transport_layer.cpp')
-rw-r--r--src/mongo/transport/asio_transport_layer.cpp1686
1 files changed, 1686 insertions, 0 deletions
diff --git a/src/mongo/transport/asio_transport_layer.cpp b/src/mongo/transport/asio_transport_layer.cpp
new file mode 100644
index 00000000000..e96df2dc36d
--- /dev/null
+++ b/src/mongo/transport/asio_transport_layer.cpp
@@ -0,0 +1,1686 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/transport/asio_transport_layer.h"
+
+#include <fmt/format.h>
+#include <fstream>
+
+#ifdef __linux__
+#include <netinet/tcp.h>
+#endif
+
+#include <asio.hpp>
+#include <asio/system_timer.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/filesystem/operations.hpp>
+
+#include "mongo/config.h"
+
+#include "mongo/base/system_error.h"
+#include "mongo/db/server_feature_flags_gen.h"
+#include "mongo/db/server_options.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/stats/counters.h"
+#include "mongo/logv2/log.h"
+#include "mongo/transport/asio_utils.h"
+#include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/transport_options_gen.h"
+#include "mongo/util/clock_source.h"
+#include "mongo/util/errno_util.h"
+#include "mongo/util/executor_stats.h"
+#include "mongo/util/hierarchical_acquisition.h"
+#include "mongo/util/net/hostandport.h"
+#include "mongo/util/net/sockaddr.h"
+#include "mongo/util/net/ssl_manager.h"
+#include "mongo/util/net/ssl_options.h"
+#include "mongo/util/options_parser/startup_options.h"
+#include "mongo/util/strong_weak_finish_line.h"
+
+#ifdef MONGO_CONFIG_SSL
+#include "mongo/util/net/ssl.hpp"
+#endif
+
+// asio_session.h has some header dependencies that require it to be the last header.
+
+#ifdef __linux__
+#include "mongo/transport/baton_asio_linux.h"
+#endif
+
+#include "mongo/transport/asio_session.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork
+
+
+namespace mongo {
+namespace transport {
+
+namespace {
+
+using TcpKeepaliveOption = SocketOption<SOL_SOCKET, SO_KEEPALIVE>;
+#ifdef __linux__
+using TcpInfoOption = SocketOption<IPPROTO_TCP, TCP_INFO, tcp_info>;
+using TcpKeepaliveCountOption = SocketOption<IPPROTO_TCP, TCP_KEEPCNT>;
+using TcpKeepaliveIdleSecsOption = SocketOption<IPPROTO_TCP, TCP_KEEPIDLE>;
+using TcpKeepaliveIntervalSecsOption = SocketOption<IPPROTO_TCP, TCP_KEEPINTVL>;
+#ifdef TCP_USER_TIMEOUT
+using TcpUserTimeoutMillisOption = SocketOption<IPPROTO_TCP, TCP_USER_TIMEOUT, unsigned>;
+#endif
+#endif // __linux__
+
+#ifdef TCP_FASTOPEN
+using TcpFastOpenOption = SocketOption<IPPROTO_TCP, TCP_FASTOPEN>;
+#endif
+/**
+ * On systems with TCP_FASTOPEN_CONNECT (linux >= 4.11),
+ * we can get TFO "for free" by letting the kernel handle
+ * postponing connect() until the first send() call.
+ *
+ * https://github.com/torvalds/linux/commit/19f6d3f3c8422d65b5e3d2162e30ef07c6e21ea2
+ */
+#ifdef TCP_FASTOPEN_CONNECT
+using TcpFastOpenConnectOption = SocketOption<IPPROTO_TCP, TCP_FASTOPEN_CONNECT>;
+#endif
+
+/**
+ * Set to `true` if any of the following set parameters were explicitly configured.
+ * - tcpFastOpenServer
+ * - tcpFastOpenClient
+ * - tcpFastOpenQueueSize
+ */
+bool tcpFastOpenIsConfigured = false;
+boost::optional<Status> maybeTcpFastOpenStatus;
+} // namespace
+
+MONGO_FAIL_POINT_DEFINE(asioTransportLayerAsyncConnectTimesOut);
+MONGO_FAIL_POINT_DEFINE(asioTransportLayerDelayConnection);
+MONGO_FAIL_POINT_DEFINE(asioTransportLayerHangBeforeAcceptCallback);
+MONGO_FAIL_POINT_DEFINE(asioTransportLayerHangDuringAcceptCallback);
+MONGO_FAIL_POINT_DEFINE(asioTransportLayerAsyncConnectReturnsConnectionError);
+
+#ifdef MONGO_CONFIG_SSL
+SSLConnectionContext::~SSLConnectionContext() = default;
+#endif
+
+/**
+ * ReactorTimer implemented on top of an `asio::system_timer`. The timer is held via
+ * shared_ptr so outstanding `async_wait` continuations keep it alive even after this
+ * ASIOReactorTimer is destroyed; the destructor cancels so those continuations complete.
+ * When a networking baton is supplied, scheduling/cancellation is delegated to it.
+ */
+class ASIOReactorTimer final : public ReactorTimer {
+public:
+    using TimerType = synchronized_value<asio::system_timer, RawSynchronizedValueMutexPolicy>;
+    explicit ASIOReactorTimer(asio::io_context& ctx)
+        : _timer(std::make_shared<TimerType>(asio::system_timer(ctx))) {}
+
+    ~ASIOReactorTimer() {
+        // The underlying timer won't get destroyed until the last promise from _asyncWait
+        // has been filled, so cancel the timer so our promises get fulfilled
+        cancel();
+    }
+
+    // Cancels a pending wait; outstanding futures are resolved with CallbackCanceled.
+    void cancel(const BatonHandle& baton = nullptr) override {
+        // If we have a baton try to cancel that.
+        if (baton && baton->networking() && baton->networking()->cancelTimer(*this)) {
+            LOGV2_DEBUG(23010, 2, "Canceled via baton, skipping asio cancel.");
+            return;
+        }
+
+        // Otherwise there could be a previous timer that was scheduled normally.
+        (**_timer)->cancel();
+    }
+
+
+    // Returns a future that resolves at `expiration`. Prefers the baton's timer facility
+    // when a networking baton is provided; otherwise arms the underlying asio timer.
+    Future<void> waitUntil(Date_t expiration, const BatonHandle& baton = nullptr) override {
+        if (baton && baton->networking()) {
+            return _asyncWait([&] { return baton->networking()->waitUntil(*this, expiration); },
+                              baton);
+        } else {
+            return _asyncWait([&] { (**_timer)->expires_at(expiration.toSystemTimePoint()); });
+        }
+    }
+
+private:
+    // Non-baton path: cancel any previous wait, arm the asio timer, and wait on it.
+    // The shared _timer is captured in tapError to extend its lifetime (see class doc).
+    template <typename ArmTimerCb>
+    Future<void> _asyncWait(ArmTimerCb&& armTimer) {
+        try {
+            cancel();
+
+            armTimer();
+            return (**_timer)
+                ->async_wait(UseFuture{})
+                .tapError([timer = _timer](const Status& status) {
+                    if (status != ErrorCodes::CallbackCanceled) {
+                        LOGV2_DEBUG(23011,
+                                    2,
+                                    "Timer received error: {error}",
+                                    "Timer received error",
+                                    "error"_attr = status);
+                    }
+                });
+
+        } catch (asio::system_error& ex) {
+            return futurize(ex.code());
+        }
+    }
+
+    // Baton path: `armTimer` returns the baton's future, which is adapted onto a promise.
+    template <typename ArmTimerCb>
+    Future<void> _asyncWait(ArmTimerCb&& armTimer, const BatonHandle& baton) {
+        cancel(baton);
+
+        auto pf = makePromiseFuture<void>();
+        armTimer().getAsync([p = std::move(pf.promise)](Status status) mutable {
+            if (status.isOK()) {
+                p.emplaceValue();
+            } else {
+                p.setError(status);
+            }
+        });
+
+        return std::move(pf.future);
+    }
+
+    std::shared_ptr<TimerType> _timer;
+};
+
+/**
+ * Reactor backed by an `asio::io_context`. `run`/`runFor` mark the current thread as the
+ * reactor thread for the duration (see ThreadIdGuard) so `onReactorThread()` works, and
+ * tasks scheduled through it are wrapped for ExecutorStats accounting.
+ */
+class AsioTransportLayer::ASIOReactor final : public Reactor {
+public:
+    ASIOReactor() : _clkSource(this), _stats(&_clkSource), _ioContext() {}
+
+    void run() noexcept override {
+        ThreadIdGuard threadIdGuard(this);
+        // The work guard keeps run() from returning while the io_context is idle.
+        asio::io_context::work work(_ioContext);
+        _ioContext.run();
+    }
+
+    void runFor(Milliseconds time) noexcept override {
+        ThreadIdGuard threadIdGuard(this);
+        asio::io_context::work work(_ioContext);
+        _ioContext.run_for(time.toSystemDuration());
+    }
+
+    void stop() override {
+        _ioContext.stop();
+    }
+
+    // Executes any remaining ready handlers (without blocking), then stops again.
+    void drain() override {
+        ThreadIdGuard threadIdGuard(this);
+        _ioContext.restart();
+        while (_ioContext.poll()) {
+            LOGV2_DEBUG(23012, 2, "Draining remaining work in reactor.");
+        }
+        _ioContext.stop();
+    }
+
+    std::unique_ptr<ReactorTimer> makeTimer() override {
+        return std::make_unique<ASIOReactorTimer>(_ioContext);
+    }
+
+    Date_t now() override {
+        return Date_t(asio::system_timer::clock_type::now());
+    }
+
+    // Queues the task to run on the reactor thread (never inline).
+    void schedule(Task task) override {
+        asio::post(_ioContext, [task = _stats.wrapTask(std::move(task))] { task(Status::OK()); });
+    }
+
+    // May run the task inline if already on the reactor thread (asio::dispatch semantics).
+    void dispatch(Task task) override {
+        asio::dispatch(_ioContext,
+                       [task = _stats.wrapTask(std::move(task))] { task(Status::OK()); });
+    }
+
+    bool onReactorThread() const override {
+        return this == _reactorForThread;
+    }
+
+    operator asio::io_context&() {
+        return _ioContext;
+    }
+
+    void appendStats(BSONObjBuilder& bob) const override {
+        _stats.serialize(&bob);
+    }
+
+private:
+    // Provides `ClockSource` API for the reactor's clock source.
+    class ReactorClockSource final : public ClockSource {
+    public:
+        explicit ReactorClockSource(ASIOReactor* reactor) : _reactor(reactor) {}
+        ~ReactorClockSource() = default;
+
+        // Precision is never queried for this clock source.
+        Milliseconds getPrecision() override {
+            MONGO_UNREACHABLE;
+        }
+
+        Date_t now() override {
+            return _reactor->now();
+        }
+
+    private:
+        ASIOReactor* const _reactor;
+    };
+
+    // RAII marker binding this reactor to the current thread for onReactorThread().
+    class ThreadIdGuard {
+    public:
+        ThreadIdGuard(AsioTransportLayer::ASIOReactor* reactor) {
+            invariant(!_reactorForThread);
+            _reactorForThread = reactor;
+        }
+
+        ~ThreadIdGuard() {
+            invariant(_reactorForThread);
+            _reactorForThread = nullptr;
+        }
+    };
+
+    static thread_local ASIOReactor* _reactorForThread;
+
+    ReactorClockSource _clkSource;
+
+    ExecutorStats _stats;
+
+    asio::io_context _ioContext;
+};
+
+thread_local AsioTransportLayer::ASIOReactor* AsioTransportLayer::ASIOReactor::_reactorForThread =
+ nullptr;
+
+// Builds listener options from the server's global parameters, plus an optional
+// dedicated load-balancer port. Unix sockets are only configurable on non-Windows.
+AsioTransportLayer::Options::Options(const ServerGlobalParams* params,
+                                     boost::optional<int> loadBalancerPort)
+    : port(params->port),
+      loadBalancerPort(loadBalancerPort),
+      ipList(params->bind_ips),
+#ifndef _WIN32
+      useUnixSockets(!params->noUnixSocket),
+#endif
+      enableIPv6(params->enableIPv6),
+      maxConns(params->maxConns) {
+}
+
+// Creates the timer service with its own dedicated reactor. `opt.spawn`, when provided,
+// overrides how the reactor thread is created (see start()).
+AsioTransportLayer::TimerService::TimerService(Options opt)
+    : _reactor(std::make_shared<AsioTransportLayer::ASIOReactor>()) {
+    if (opt.spawn)
+        _spawn = std::move(opt.spawn);
+}
+
+// Stops the reactor and joins its thread; a no-op if the service was never started.
+AsioTransportLayer::TimerService::~TimerService() {
+    stop();
+}
+
+// Idempotently transitions kInitialized -> kStarted and spawns a thread to run the
+// reactor. Safe to call concurrently with itself and with stop().
+void AsioTransportLayer::TimerService::start() {
+    // Skip the expensive lock acquisition and `compareAndSwap` in the common path.
+    if (MONGO_likely(_state.load() != State::kInitialized))
+        return;
+
+    // The following ensures only one thread continues to spawn a thread to run the reactor. It also
+    // ensures concurrent `start()` and `stop()` invocations are serialized. Holding the lock
+    // guarantees that the following runs either before or after running `stop()`. Note that using
+    // `compareAndSwap` while holding the lock is for simplicity and not necessary.
+    auto lk = stdx::lock_guard(_mutex);
+    auto precondition = State::kInitialized;
+    if (_state.compareAndSwap(&precondition, State::kStarted)) {
+        _thread = _spawn([reactor = _reactor] {
+            if (!serverGlobalParams.quiet.load()) {
+                LOGV2_INFO(5490002, "Started a new thread for the timer service");
+            }
+
+            reactor->run();
+
+            if (!serverGlobalParams.quiet.load()) {
+                LOGV2_INFO(5490003, "Returning from the timer service thread");
+            }
+        });
+    }
+}
+
+// Transitions to kStopped; stops the reactor and joins the thread only if start() had
+// actually spawned one. The swap makes a second stop() (or stop-before-start) a no-op.
+void AsioTransportLayer::TimerService::stop() {
+    // It's possible for `stop()` to be called without `start()` having been called (or for them to
+    // be called concurrently), so we only proceed with stopping the reactor and joining the thread
+    // if we've already transitioned to the `kStarted` state.
+    auto lk = stdx::lock_guard(_mutex);
+    if (_state.swap(State::kStopped) != State::kStarted)
+        return;
+
+    _reactor->stop();
+    _thread.join();
+}
+
+// Makes a timer backed by the service's reactor, lazily starting the service.
+std::unique_ptr<ReactorTimer> AsioTransportLayer::TimerService::makeTimer() {
+    return _getReactor()->makeTimer();
+}
+
+// Current time per the service's reactor clock, lazily starting the service.
+Date_t AsioTransportLayer::TimerService::now() {
+    return _getReactor()->now();
+}
+
+Reactor* AsioTransportLayer::TimerService::_getReactor() {
+    // TODO SERVER-57253 We can start this service as part of starting `AsioTransportLayer`.
+    // Then, we can remove the following invocation of `start()`.
+    start();
+    return _reactor.get();
+}
+
+// Constructs the transport layer with three reactors: egress (used by connect() and
+// _doSyncConnect below), plus ingress and acceptor reactors — presumably for established
+// sessions and listening sockets respectively; their use is outside this chunk.
+AsioTransportLayer::AsioTransportLayer(const AsioTransportLayer::Options& opts,
+                                       ServiceEntryPoint* sep,
+                                       const WireSpec& wireSpec)
+    : TransportLayer(wireSpec),
+      _ingressReactor(std::make_shared<ASIOReactor>()),
+      _egressReactor(std::make_shared<ASIOReactor>()),
+      _acceptorReactor(std::make_shared<ASIOReactor>()),
+      _sep(sep),
+      _listenerOptions(opts),
+      _timerService(std::make_unique<TimerService>()) {}
+
+AsioTransportLayer::~AsioTransportLayer() = default;
+
+// Pairs a listening acceptor with the address it is bound to, plus a backlog gauge.
+struct AsioTransportLayer::AcceptorRecord {
+    AcceptorRecord(SockAddr address, GenericAcceptor acceptor)
+        : address(std::move(address)), acceptor(std::move(acceptor)) {}
+
+    SockAddr address;
+    GenericAcceptor acceptor;
+    // Tracks the amount of incoming connections waiting to be accepted by the server on this
+    // acceptor.
+    AtomicWord<int> backlogQueueDepth{0};
+};
+
+/**
+ * Wraps a generic stream-protocol endpoint together with a human-readable string form
+ * ("host:port" for TCP, the socket path for Unix sockets). Orderable so it can be used
+ * as a map key.
+ */
+class WrappedEndpoint {
+public:
+    using Endpoint = asio::generic::stream_protocol::endpoint;
+
+    explicit WrappedEndpoint(const asio::ip::basic_resolver_entry<asio::ip::tcp>& source)
+        : _str(str::stream() << source.endpoint().address().to_string() << ":"
+                             << source.service_name()),
+          _endpoint(source.endpoint()) {}
+
+#ifndef _WIN32
+    explicit WrappedEndpoint(const asio::local::stream_protocol::endpoint& source)
+        : _str(source.path()), _endpoint(source) {}
+#endif
+
+    WrappedEndpoint() = default;
+
+    Endpoint* operator->() noexcept {
+        return &_endpoint;
+    }
+
+    const Endpoint* operator->() const noexcept {
+        return &_endpoint;
+    }
+
+    Endpoint& operator*() noexcept {
+        return _endpoint;
+    }
+
+    const Endpoint& operator*() const noexcept {
+        return _endpoint;
+    }
+
+    bool operator<(const WrappedEndpoint& rhs) const noexcept {
+        return _endpoint < rhs._endpoint;
+    }
+
+    const std::string& toString() const {
+        return _str;
+    }
+
+    // Address family (AF_INET, AF_INET6, AF_UNIX, ...) read from the raw sockaddr.
+    sa_family_t family() const {
+        return _endpoint.data()->sa_family;
+    }
+
+private:
+    std::string _str;
+    Endpoint _endpoint;
+};
+
+using Resolver = asio::ip::tcp::resolver;
+/**
+ * Thin wrapper around asio's TCP resolver producing WrappedEndpoints. Handles Unix
+ * socket "hosts" (paths containing '/') without touching DNS, and resolves TCP hosts in
+ * two steps: numeric (IP literal) first, falling back to DNS on failure.
+ */
+class WrappedResolver {
+public:
+    using Flags = Resolver::flags;
+    using EndpointVector = std::vector<WrappedEndpoint>;
+
+    explicit WrappedResolver(asio::io_context& ioCtx) : _resolver(ioCtx) {}
+
+    // Synchronous resolution of `peer` to one or more endpoints.
+    StatusWith<EndpointVector> resolve(const HostAndPort& peer, bool enableIPv6) {
+        if (auto unixEp = _checkForUnixSocket(peer)) {
+            return *unixEp;
+        }
+
+        // We always want to resolve the "service" (port number) as a numeric.
+        //
+        // We intentionally don't set the Resolver::address_configured flag because it might prevent
+        // us from connecting to localhost on hosts with only a loopback interface
+        // (see SERVER-1579).
+        const auto flags = Resolver::numeric_service;
+
+        // We resolve in two steps, the first step tries to resolve the hostname as an IP address -
+        // that way if there's a DNS timeout, we can still connect to IP addresses quickly.
+        // (See SERVER-1709)
+        //
+        // Then, if the numeric (IP address) lookup failed, we fall back to DNS or return the error
+        // from the resolver.
+        return _resolve(peer, flags | Resolver::numeric_host, enableIPv6)
+            .onError([=](Status) { return _resolve(peer, flags, enableIPv6); })
+            .getNoThrow();
+    }
+
+    // Asynchronous counterpart of resolve(); same Unix-socket and numeric-first handling.
+    Future<EndpointVector> asyncResolve(const HostAndPort& peer, bool enableIPv6) {
+        if (auto unixEp = _checkForUnixSocket(peer)) {
+            return *unixEp;
+        }
+
+        // We follow the same numeric -> hostname fallback procedure as the synchronous resolver
+        // function for setting resolver flags (see above).
+        const auto flags = Resolver::numeric_service;
+        return _asyncResolve(peer, flags | Resolver::numeric_host, enableIPv6).onError([=](Status) {
+            return _asyncResolve(peer, flags, enableIPv6);
+        });
+    }
+
+    void cancel() {
+        _resolver.cancel();
+    }
+
+private:
+    // Returns a single-endpoint vector when `peer` names a Unix socket path; boost::none
+    // otherwise (always none on Windows).
+    boost::optional<EndpointVector> _checkForUnixSocket(const HostAndPort& peer) {
+#ifndef _WIN32
+        if (str::contains(peer.host(), '/')) {
+            asio::local::stream_protocol::endpoint ep(peer.host());
+            return EndpointVector{WrappedEndpoint(ep)};
+        }
+#endif
+        return boost::none;
+    }
+
+    // Blocking resolution; restricts to IPv4 when IPv6 is disabled.
+    Future<EndpointVector> _resolve(const HostAndPort& peer, Flags flags, bool enableIPv6) {
+        std::error_code ec;
+        auto port = std::to_string(peer.port());
+        Results results;
+        if (enableIPv6) {
+            results = _resolver.resolve(peer.host(), port, flags, ec);
+        } else {
+            results = _resolver.resolve(asio::ip::tcp::v4(), peer.host(), port, flags, ec);
+        }
+
+        if (ec) {
+            return _makeFuture(errorCodeToStatus(ec, "resolve"), peer);
+        } else {
+            return _makeFuture(results, peer);
+        }
+    }
+
+    // Non-blocking resolution on the resolver's io_context.
+    Future<EndpointVector> _asyncResolve(const HostAndPort& peer, Flags flags, bool enableIPv6) {
+        auto port = std::to_string(peer.port());
+        Future<Results> ret;
+        if (enableIPv6) {
+            ret = _resolver.async_resolve(peer.host(), port, flags, UseFuture{});
+        } else {
+            ret =
+                _resolver.async_resolve(asio::ip::tcp::v4(), peer.host(), port, flags, UseFuture{});
+        }
+
+        return std::move(ret)
+            .onError([this, peer](Status status) { return _checkResults(status, peer); })
+            .then([this, peer](Results results) { return _makeFuture(results, peer); });
+    }
+
+    using Results = Resolver::results_type;
+    // Normalizes resolver failures and empty result sets into HostNotFound.
+    StatusWith<Results> _checkResults(StatusWith<Results> results, const HostAndPort& peer) {
+        if (!results.isOK()) {
+            return Status{ErrorCodes::HostNotFound,
+                          str::stream() << "Could not find address for " << peer << ": "
+                                        << results.getStatus()};
+        } else if (results.getValue().empty()) {
+            return Status{ErrorCodes::HostNotFound,
+                          str::stream() << "Could not find address for " << peer};
+        } else {
+            return results;
+        }
+    }
+
+    // Converts checked resolver results into a ready Future of WrappedEndpoints.
+    Future<EndpointVector> _makeFuture(StatusWith<Results> results, const HostAndPort& peer) {
+        results = _checkResults(std::move(results), peer);
+        if (!results.isOK()) {
+            return results.getStatus();
+        } else {
+            auto& epl = results.getValue();
+            return EndpointVector(epl.begin(), epl.end());
+        }
+    }
+
+    Resolver _resolver;
+};
+
+// Wraps a connect failure with peer context; includes the resolved endpoint only when it
+// adds information (non-empty and different from the peer's own string form).
+Status makeConnectError(Status status, const HostAndPort& peer, const WrappedEndpoint& endpoint) {
+    std::string errmsg;
+    if (peer.toString() != endpoint.toString() && !endpoint.toString().empty()) {
+        errmsg = str::stream() << "Error connecting to " << peer << " (" << endpoint.toString()
+                               << ")";
+    } else {
+        errmsg = str::stream() << "Error connecting to " << peer;
+    }
+
+    return status.withContext(errmsg);
+}
+
+
+/**
+ * Synchronously connects to `peer` on the egress reactor: resolves the address (tracking
+ * slow DNS), establishes the TCP/Unix connection via _doSyncConnect, and — when SSL is
+ * requested or globally preferred/required — performs the egress SSL handshake, bounded
+ * by `timeout` via a TimerService timer that ends the session on expiry.
+ *
+ * `transientSSLParams` may only be supplied with sslMode == kEnableSSL.
+ * Returns the connected session, or a resolution/connect/handshake/timeout error.
+ */
+StatusWith<SessionHandle> AsioTransportLayer::connect(
+    HostAndPort peer,
+    ConnectSSLMode sslMode,
+    Milliseconds timeout,
+    boost::optional<TransientSSLParams> transientSSLParams) {
+    if (transientSSLParams) {
+        uassert(ErrorCodes::InvalidSSLConfiguration,
+                "Specified transient SSL params but connection SSL mode is not set",
+                sslMode == kEnableSSL);
+        LOGV2_DEBUG(
+            5270701, 2, "Connecting to peer using transient SSL connection", "peer"_attr = peer);
+    }
+
+    // The socket is created inside _doSyncConnect; only the resolver is needed here.
+    WrappedResolver resolver(*_egressReactor);
+
+    Date_t timeBefore = Date_t::now();
+    auto swEndpoints = resolver.resolve(peer, _listenerOptions.enableIPv6);
+    Date_t timeAfter = Date_t::now();
+    if (timeAfter - timeBefore > kSlowOperationThreshold) {
+        networkCounter.incrementNumSlowDNSOperations();
+    }
+
+    if (!swEndpoints.isOK()) {
+        return swEndpoints.getStatus();
+    }
+
+    auto endpoints = std::move(swEndpoints.getValue());
+    auto sws = _doSyncConnect(endpoints.front(), peer, timeout, transientSSLParams);
+    if (!sws.isOK()) {
+        return sws.getStatus();
+    }
+
+    auto session = std::move(sws.getValue());
+    session->ensureSync();
+
+#ifndef _WIN32
+    // Unix-domain sessions never speak SSL; return early.
+    if (endpoints.front().family() == AF_UNIX) {
+        return static_cast<SessionHandle>(std::move(session));
+    }
+#endif
+
+#ifndef MONGO_CONFIG_SSL
+    if (sslMode == kEnableSSL) {
+        return {ErrorCodes::InvalidSSLConfiguration, "SSL requested but not supported"};
+    }
+#else
+    auto globalSSLMode = _sslMode();
+    if (sslMode == kEnableSSL ||
+        (sslMode == kGlobalSSLMode &&
+         ((globalSSLMode == SSLParams::SSLMode_preferSSL) ||
+          (globalSSLMode == SSLParams::SSLMode_requireSSL)))) {
+
+        if (auto sslStatus = session->buildSSLSocket(peer); !sslStatus.isOK()) {
+            return sslStatus;
+        }
+
+        // The handshake is complete once either of the following passes the finish line:
+        // - The thread running the handshake returns from `handshakeSSLForEgress`.
+        // - The thread running `TimerService` cancels the handshake due to a timeout.
+        auto finishLine = std::make_shared<StrongWeakFinishLine>(2);
+
+        // Schedules a task to cancel the synchronous handshake if it does not complete before the
+        // specified timeout.
+        auto timer = _timerService->makeTimer();
+
+#ifndef _WIN32
+        // TODO SERVER-62035: enable the following on Windows.
+        if (timeout > Milliseconds(0)) {
+            timer->waitUntil(_timerService->now() + timeout)
+                .getAsync([finishLine, session](Status status) {
+                    if (status.isOK() && finishLine->arriveStrongly()) {
+                        session->end();
+                    }
+                });
+        }
+#endif
+
+        Date_t timeBefore = Date_t::now();
+        auto sslStatus = session->handshakeSSLForEgress(peer, nullptr).getNoThrow();
+        Date_t timeAfter = Date_t::now();
+
+        if (timeAfter - timeBefore > kSlowOperationThreshold) {
+            networkCounter.incrementNumSlowSSLOperations();
+        }
+
+        if (finishLine->arriveStrongly()) {
+            timer->cancel();
+        } else if (!sslStatus.isOK()) {
+            // We only take this path if the handshake times out. Overwrite the socket exception
+            // with a network timeout.
+            auto errMsg = fmt::format("SSL handshake timed out after {}",
+                                      (timeAfter - timeBefore).toString());
+            sslStatus = Status(ErrorCodes::NetworkTimeout, errMsg);
+            LOGV2(5490001,
+                  "Timed out while running handshake",
+                  "peer"_attr = peer,
+                  "timeout"_attr = timeout);
+        }
+
+        if (!sslStatus.isOK()) {
+            return sslStatus;
+        }
+    }
+#endif
+
+    return static_cast<SessionHandle>(std::move(session));
+}
+
+/**
+ * Synchronously connects a fresh socket to `endpoint` (one resolved address of `peer`)
+ * within `timeout`, then wraps the connected socket in an AsioSession. When
+ * `transientSSLParams` is set, a transient SSL context is built for the session.
+ * Returns a connect/timeout error (with peer context) on failure.
+ */
+template <typename Endpoint>
+StatusWith<AsioTransportLayer::AsioSessionHandle> AsioTransportLayer::_doSyncConnect(
+    Endpoint endpoint,
+    const HostAndPort& peer,
+    const Milliseconds& timeout,
+    boost::optional<TransientSSLParams> transientSSLParams) {
+    AsioSession::GenericSocket sock(*_egressReactor);
+    std::error_code ec;
+
+    const auto protocol = endpoint->protocol();
+    sock.open(protocol);
+
+#ifdef TCP_FASTOPEN_CONNECT
+    const auto family = protocol.family();
+    if ((family == AF_INET) || (family == AF_INET6)) {
+        setSocketOption(sock,
+                        TcpFastOpenConnectOption(gTCPFastOpenClient),
+                        "connect (sync) TCP fast open",
+                        logv2::LogSeverity::Info(),
+                        ec);
+        // Enabling TFO is best-effort unless the user explicitly configured it, in which
+        // case a setsockopt failure is surfaced. The `ec` check is essential: without it,
+        // a successful setsockopt would return an OK Status into this StatusWith (an
+        // invariant violation) and abort every TFO-configured sync connect.
+        if (ec && tcpFastOpenIsConfigured) {
+            return errorCodeToStatus(ec, "syncConnect tcpFastOpenIsConfigured");
+        }
+        ec = std::error_code();
+    }
+#endif
+
+    sock.non_blocking(true);
+
+    auto now = Date_t::now();
+    auto expiration = now + timeout;
+    do {
+        auto curTimeout = expiration - now;
+        sock.connect(*endpoint, curTimeout.toSystemDuration(), ec);
+        if (ec) {
+            now = Date_t::now();
+        }
+        // We loop below if ec == interrupted to deal with EINTR failures, otherwise we handle
+        // the error/timeout below.
+    } while (ec == asio::error::interrupted && now < expiration);
+
+    auto status = [&] {
+        if (ec) {
+            return errorCodeToStatus(ec, "syncConnect connect error");
+        } else if (now >= expiration) {
+            return Status(ErrorCodes::NetworkTimeout, "Timed out");
+        } else {
+            return Status::OK();
+        }
+    }();
+
+    if (!status.isOK()) {
+        return makeConnectError(status, peer, endpoint);
+    }
+
+    sock.non_blocking(false);
+    try {
+        std::shared_ptr<const transport::SSLConnectionContext> transientSSLContext;
+#ifdef MONGO_CONFIG_SSL
+        if (transientSSLParams) {
+            auto statusOrContext = createTransientSSLContext(transientSSLParams.value());
+            uassertStatusOK(statusOrContext.getStatus());
+            transientSSLContext = std::move(statusOrContext.getValue());
+        }
+#endif
+        return std::make_shared<AsioSession>(
+            this, std::move(sock), false, *endpoint, transientSSLContext);
+    } catch (const asio::system_error& e) {
+        return errorCodeToStatus(e.code(), "syncConnect AsioSession constructor");
+    } catch (const DBException& e) {
+        return e.toStatus();
+    }
+}
+
+/**
+ * Asynchronously connects to `peer` on the given reactor: resolve -> TCP connect ->
+ * (optional) egress SSL handshake. Progress is reported through `connectionMetrics`.
+ * A `timeout` in (0, max) arms a timer that races the chain via `done`/StrongWeak-style
+ * arbitration: whichever side flips `done` first settles the promise; the loser backs
+ * off. `transientSSLContext` may only be supplied with sslMode == kEnableSSL.
+ */
+Future<SessionHandle> AsioTransportLayer::asyncConnect(
+    HostAndPort peer,
+    ConnectSSLMode sslMode,
+    const ReactorHandle& reactor,
+    Milliseconds timeout,
+    std::shared_ptr<ConnectionMetrics> connectionMetrics,
+    std::shared_ptr<const SSLConnectionContext> transientSSLContext) {
+    if (MONGO_unlikely(asioTransportLayerAsyncConnectReturnsConnectionError.shouldFail()))
+        return Status{ErrorCodes::ConnectionError, "Failing asyncConnect due to fail-point"};
+
+    invariant(connectionMetrics);
+    connectionMetrics->onConnectionStarted();
+
+    if (transientSSLContext) {
+        uassert(ErrorCodes::InvalidSSLConfiguration,
+                "Specified transient SSL context but connection SSL mode is not set",
+                sslMode == kEnableSSL);
+        LOGV2_DEBUG(
+            5270601, 2, "Connecting to peer using transient SSL connection", "peer"_attr = peer);
+    }
+
+    // Shared state for the whole connect chain; kept alive by the continuations and the
+    // timeout callback. `mutex` guards socket/session against the racing timeout path.
+    struct AsyncConnectState {
+        AsyncConnectState(HostAndPort peer,
+                          asio::io_context& context,
+                          Promise<SessionHandle> promise_,
+                          const ReactorHandle& reactor)
+            : promise(std::move(promise_)),
+              socket(context),
+              timeoutTimer(context),
+              resolver(context),
+              peer(std::move(peer)),
+              reactor(reactor) {}
+
+        AtomicWord<bool> done{false};
+        Promise<SessionHandle> promise;
+
+        Mutex mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "AsyncConnectState::mutex");
+        AsioSession::GenericSocket socket;
+        ASIOReactorTimer timeoutTimer;
+        WrappedResolver resolver;
+        WrappedEndpoint resolvedEndpoint;
+        const HostAndPort peer;
+        AsioTransportLayer::AsioSessionHandle session;
+        ReactorHandle reactor;
+    };
+
+    auto reactorImpl = checked_cast<ASIOReactor*>(reactor.get());
+    auto pf = makePromiseFuture<SessionHandle>();
+    auto connector = std::make_shared<AsyncConnectState>(
+        std::move(peer), *reactorImpl, std::move(pf.promise), reactor);
+    Future<SessionHandle> mergedFuture = std::move(pf.future);
+
+    if (connector->peer.host().empty()) {
+        return Status{ErrorCodes::HostNotFound, "Hostname or IP address to connect to is empty"};
+    }
+
+    if (timeout > Milliseconds{0} && timeout < Milliseconds::max()) {
+        connector->timeoutTimer.waitUntil(reactor->now() + timeout)
+            .getAsync([connector](Status status) {
+                // Lose the race (timer canceled, or chain already finished) -> no-op.
+                if (status == ErrorCodes::CallbackCanceled || connector->done.swap(true)) {
+                    return;
+                }
+
+                connector->promise.setError(
+                    makeConnectError({ErrorCodes::NetworkTimeout, "Connecting timed out"},
+                                     connector->peer,
+                                     connector->resolvedEndpoint));
+
+                // Abort whichever stage is in flight: resolution, handshake, or connect.
+                std::error_code ec;
+                stdx::lock_guard<Latch> lk(connector->mutex);
+                connector->resolver.cancel();
+                if (connector->session) {
+                    connector->session->end();
+                } else {
+                    connector->socket.cancel(ec);
+                }
+            });
+    }
+
+    Date_t timeBefore = Date_t::now();
+
+    auto resolverFuture = [&]() {
+        if (auto sfp = asioTransportLayerDelayConnection.scoped(); MONGO_unlikely(sfp.isActive())) {
+            Milliseconds delay{sfp.getData()["millis"].safeNumberInt()};
+            Date_t deadline = reactor->now() + delay;
+            if ((delay > Milliseconds(0)) && (deadline < Date_t::max())) {
+                LOGV2(6885900,
+                      "delayConnection fail point is active, delaying connection establishment",
+                      "delay"_attr = delay);
+
+                // Normally, the unique_ptr returned by makeTimer() is stored somewhere where we can
+                // ensure its validity. Here, we have to make it a shared_ptr and capture it so it
+                // remains valid until the timer fires.
+                std::shared_ptr<ReactorTimer> delayTimer = reactor->makeTimer();
+                return delayTimer->waitUntil(deadline).then(
+                    [delayTimer, connector, enableIPv6 = _listenerOptions.enableIPv6] {
+                        LOGV2(6885901, "finished delaying the connection");
+                        return connector->resolver.asyncResolve(connector->peer, enableIPv6);
+                    });
+            }
+        }
+        return connector->resolver.asyncResolve(connector->peer, _listenerOptions.enableIPv6);
+    }();
+
+    std::move(resolverFuture)
+        .then([connector, timeBefore, connectionMetrics](WrappedResolver::EndpointVector results) {
+            try {
+                connectionMetrics->onDNSResolved();
+
+                Date_t timeAfter = Date_t::now();
+                if (timeAfter - timeBefore > kSlowOperationThreshold) {
+                    LOGV2_WARNING(23019,
+                                  "DNS resolution while connecting to {peer} took {duration}",
+                                  "DNS resolution while connecting to peer was slow",
+                                  "peer"_attr = connector->peer,
+                                  "duration"_attr = timeAfter - timeBefore);
+                    networkCounter.incrementNumSlowDNSOperations();
+                }
+
+                stdx::lock_guard<Latch> lk(connector->mutex);
+
+                connector->resolvedEndpoint = results.front();
+                connector->socket.open(connector->resolvedEndpoint->protocol());
+                connector->socket.non_blocking(true);
+            } catch (asio::system_error& ex) {
+                return futurize(ex.code());
+            }
+
+#ifdef TCP_FASTOPEN_CONNECT
+            std::error_code ec;
+            setSocketOption(connector->socket,
+                            TcpFastOpenConnectOption(gTCPFastOpenClient),
+                            "connect (async) TCP fast open",
+                            logv2::LogSeverity::Info(),
+                            ec);
+            // Only fail when TFO was explicitly configured and enabling it failed;
+            // otherwise fall through and connect normally. The `ec` check is essential:
+            // without it, a successful setsockopt would short-circuit with an OK future
+            // and skip async_connect entirely, leaving the socket unconnected.
+            if (ec && tcpFastOpenIsConfigured) {
+                return futurize(ec);
+            }
+#endif
+            return connector->socket.async_connect(*connector->resolvedEndpoint, UseFuture{});
+        })
+        .then([this, connector, sslMode, transientSSLContext, connectionMetrics]() -> Future<void> {
+            connectionMetrics->onTCPConnectionEstablished();
+
+            stdx::unique_lock<Latch> lk(connector->mutex);
+            connector->session = [&] {
+                try {
+                    return std::make_shared<AsioSession>(this,
+                                                         std::move(connector->socket),
+                                                         false,
+                                                         *connector->resolvedEndpoint,
+                                                         transientSSLContext);
+                } catch (const asio::system_error& e) {
+                    iasserted(errorCodeToStatus(e.code(), "asyncConnect AsioSession constructor"));
+                }
+            }();
+            connector->session->ensureAsync();
+
+#ifndef MONGO_CONFIG_SSL
+            if (sslMode == kEnableSSL) {
+                uasserted(ErrorCodes::InvalidSSLConfiguration, "SSL requested but not supported");
+            }
+#else
+            auto globalSSLMode = _sslMode();
+            if (sslMode == kEnableSSL ||
+                (sslMode == kGlobalSSLMode &&
+                 ((globalSSLMode == SSLParams::SSLMode_preferSSL) ||
+                  (globalSSLMode == SSLParams::SSLMode_requireSSL)))) {
+                if (const auto sslStatus = connector->session->buildSSLSocket(connector->peer);
+                    !sslStatus.isOK()) {
+                    return sslStatus;
+                }
+                Date_t timeBefore = Date_t::now();
+                return connector->session
+                    ->handshakeSSLForEgress(connector->peer, connector->reactor)
+                    .then([connector, timeBefore, connectionMetrics] {
+                        connectionMetrics->onTLSHandshakeFinished();
+
+                        Date_t timeAfter = Date_t::now();
+                        if (timeAfter - timeBefore > kSlowOperationThreshold) {
+                            networkCounter.incrementNumSlowSSLOperations();
+                        }
+                        return Status::OK();
+                    });
+            }
+#endif
+            return Status::OK();
+        })
+        .onError([connector](Status status) -> Future<void> {
+            return makeConnectError(status, connector->peer, connector->resolvedEndpoint);
+        })
+        .getAsync([connector](Status connectResult) {
+            if (MONGO_unlikely(asioTransportLayerAsyncConnectTimesOut.shouldFail())) {
+                LOGV2(23013, "asyncConnectTimesOut fail point is active. simulating timeout.");
+                return;
+            }
+
+            // Win the race against the timeout callback; if it already fired, back off.
+            if (connector->done.swap(true)) {
+                return;
+            }
+
+            connector->timeoutTimer.cancel();
+            if (connectResult.isOK()) {
+                connector->promise.emplaceValue(std::move(connector->session));
+            } else {
+                connector->promise.setError(connectResult);
+            }
+        });
+
+    return mergedFuture;
+}
+
+namespace {
+#if defined(TCP_FASTOPEN) || defined(TCP_FASTOPEN_CONNECT)
+/**
+ * Attempt to set an option on a dummy SOCK_STREAM/AF_INET socket
+ * and report success/failure.
+ */
+bool trySetSockOpt(int level, int opt, int val) {
+    auto sock = ::socket(AF_INET, SOCK_STREAM, 0);
+    if (sock == -1) {
+        auto ec = lastSocketError();
+        LOGV2_WARNING(5128700, "socket() failed", "error"_attr = errorMessage(ec));
+        return false;
+    }
+
+#ifdef _WIN32
+    // Winsock's setsockopt takes `const char*` rather than `const void*`.
+    char* pval = reinterpret_cast<char*>(&val);
+#else
+    void* pval = &val;
+#endif
+
+    const auto ret = ::setsockopt(sock, level, opt, pval, sizeof(val));
+
+    // Close the probe socket on both platforms regardless of the setsockopt outcome.
+#ifdef _WIN32
+    closesocket(sock);
+#else
+    close(sock);
+#endif
+
+    return ret == 0;
+}
+#endif
+
+Status validateFastOpen() noexcept {
+ namespace moe = optionenvironment;
+ if (moe::startupOptionsParsed.count("setParameter")) {
+ const auto params =
+ moe::startupOptionsParsed["setParameter"].as<std::map<std::string, std::string>>();
+ tcpFastOpenIsConfigured = (params.find("tcpFastOpenServer") != params.end()) ||
+ (params.find("tcpFastOpenClient") != params.end()) ||
+ (params.find("tcpFastOpenQueueSize") != params.end());
+ }
+
+#ifndef TCP_FASTOPEN
+ if (tcpFastOpenIsConfigured && gTCPFastOpenServer) {
+ return {ErrorCodes::BadValue,
+ "TCP FastOpen server support unavailable in this build of MongoDB"};
+ }
+#else
+ networkCounter.setTFOServerSupport(trySetSockOpt(IPPROTO_TCP, TCP_FASTOPEN, 1));
+#endif
+
+#ifndef TCP_FASTOPEN_CONNECT
+ if (tcpFastOpenIsConfigured && gTCPFastOpenClient) {
+ return {ErrorCodes::BadValue,
+ "TCP FastOpen client support unavailable in this build of MongoDB"};
+ }
+#else
+ networkCounter.setTFOClientSupport(trySetSockOpt(IPPROTO_TCP, TCP_FASTOPEN_CONNECT, 1));
+#endif
+
+#if defined(TCP_FASTOPEN) && defined(__linux)
+ if (!gTCPFastOpenServer && !gTCPFastOpenClient) {
+ return Status::OK();
+ }
+
+ std::string procfile("/proc/sys/net/ipv4/tcp_fastopen");
+ boost::system::error_code ec;
+ if (!boost::filesystem::exists(procfile, ec)) {
+ return {ErrorCodes::BadValue,
+ str::stream() << "Unable to locate " << procfile << ": " << errorCodeToStatus(ec)};
+ }
+
+ std::fstream f(procfile, std::ifstream::in);
+ if (!f.is_open()) {
+ return {ErrorCodes::BadValue, str::stream() << "Unable to read " << procfile};
+ }
+
+ std::int64_t val;
+ f >> val;
+ networkCounter.setTFOKernelSetting(val);
+
+ constexpr std::int64_t kTFOClientBit = (1 << 0);
+ constexpr std::int64_t kTFOServerBit = (1 << 1);
+
+ // Future proof this setting by allowing extra bits to stay set in help output.
+ std::int64_t wantval = val;
+ if (gTCPFastOpenClient) {
+ wantval |= kTFOClientBit;
+ }
+ if (gTCPFastOpenServer) {
+ wantval |= kTFOServerBit;
+ }
+
+ if (val != wantval) {
+ return {ErrorCodes::BadValue,
+ str::stream() << "TCP FastOpen disabled in kernel. "
+ << "Set " << procfile << " to " << std::to_string(wantval)};
+ }
+#endif
+
+ return Status::OK();
+}
+
+Status validateFastOpenOnce() noexcept {
+ if (!maybeTcpFastOpenStatus) {
+ // If we haven't validated the TCP FastOpen situation yet, do so.
+ maybeTcpFastOpenStatus = validateFastOpen();
+
+ if (!maybeTcpFastOpenStatus->isOK()) {
+ // This has to be a char[] because that's what logv2 understands
+ static constexpr char kPrefixString[] = "Unable to enable TCP FastOpen";
+
+ if (tcpFastOpenIsConfigured) {
+ // If the user asked for TCP FastOpen and we couldn't provide it, log a startup
+ // warning in addition to the hard failure.
+ LOGV2_WARNING_OPTIONS(23014,
+ {logv2::LogTag::kStartupWarnings},
+ kPrefixString,
+ "reason"_attr = maybeTcpFastOpenStatus->reason());
+ } else {
+ LOGV2(4648601,
+ "Implicit TCP FastOpen unavailable. "
+ "If TCP FastOpen is required, set tcpFastOpenServer, tcpFastOpenClient, "
+ "and tcpFastOpenQueueSize.");
+ }
+
+ maybeTcpFastOpenStatus->addContext(kPrefixString);
+ } else {
+ if (!tcpFastOpenIsConfigured) {
+ LOGV2(4648602, "Implicit TCP FastOpen in use.");
+ }
+ }
+ }
+
+ if (!tcpFastOpenIsConfigured) {
+ // If nobody asked for TCP FastOpen, no one will miss it.
+ return Status::OK();
+ }
+
+ // TCP FastOpen was requested. It's either there or it's not.
+ return *maybeTcpFastOpenStatus;
+}
+} // namespace
+
+Status AsioTransportLayer::setup() {
+ std::vector<std::string> listenAddrs;
+ if (_listenerOptions.ipList.empty() && _listenerOptions.isIngress()) {
+ listenAddrs = {"127.0.0.1"};
+ if (_listenerOptions.enableIPv6) {
+ listenAddrs.emplace_back("::1");
+ }
+ } else if (!_listenerOptions.ipList.empty()) {
+ listenAddrs = _listenerOptions.ipList;
+ }
+
+#ifndef _WIN32
+ if (_listenerOptions.useUnixSockets && _listenerOptions.isIngress()) {
+ listenAddrs.push_back(makeUnixSockPath(_listenerOptions.port));
+
+ if (_listenerOptions.loadBalancerPort) {
+ listenAddrs.push_back(makeUnixSockPath(*_listenerOptions.loadBalancerPort));
+ }
+ }
+#endif
+
+ if (auto foStatus = validateFastOpenOnce(); !foStatus.isOK()) {
+ return foStatus;
+ }
+
+ if (!(_listenerOptions.isIngress()) && !listenAddrs.empty()) {
+ return {ErrorCodes::BadValue,
+ "Cannot bind to listening sockets with ingress networking is disabled"};
+ }
+
+ _listenerPort = _listenerOptions.port;
+ WrappedResolver resolver(*_acceptorReactor);
+
+ std::vector<int> ports = {_listenerPort};
+ if (_listenerOptions.loadBalancerPort) {
+ ports.push_back(*_listenerOptions.loadBalancerPort);
+ }
+
+ // Self-deduplicating list of unique endpoint addresses.
+ std::set<WrappedEndpoint> endpoints;
+ for (const auto& port : ports) {
+ for (const auto& listenAddr : listenAddrs) {
+ if (listenAddr.empty()) {
+ LOGV2_WARNING(23020, "Skipping empty bind address");
+ continue;
+ }
+
+ const auto& swAddrs =
+ resolver.resolve(HostAndPort(listenAddr, port), _listenerOptions.enableIPv6);
+ if (!swAddrs.isOK()) {
+ LOGV2_WARNING(23021,
+ "Found no addresses for {peer}",
+ "Found no addresses for peer",
+ "peer"_attr = swAddrs.getStatus());
+ continue;
+ }
+ const auto& addrs = swAddrs.getValue();
+ endpoints.insert(addrs.begin(), addrs.end());
+ }
+ }
+
+ for (const auto& addr : endpoints) {
+#ifndef _WIN32
+ if (addr.family() == AF_UNIX) {
+ if (::unlink(addr.toString().c_str()) == -1) {
+ auto ec = lastPosixError();
+ if (ec != posixError(ENOENT)) {
+ LOGV2_ERROR(23024,
+ "Failed to unlink socket file {path} {error}",
+ "Failed to unlink socket file",
+ "path"_attr = addr.toString().c_str(),
+ "error"_attr = errorMessage(ec));
+ fassertFailedNoTrace(40486);
+ }
+ }
+ }
+#endif
+ if (addr.family() == AF_INET6 && !_listenerOptions.enableIPv6) {
+ LOGV2_ERROR(23025, "Specified ipv6 bind address, but ipv6 is disabled");
+ fassertFailedNoTrace(40488);
+ }
+
+ GenericAcceptor acceptor(*_acceptorReactor);
+ try {
+ acceptor.open(addr->protocol());
+ } catch (std::exception&) {
+ // Allow the server to start when "ipv6: true" and "bindIpAll: true", but the platform
+ // does not support ipv6 (e.g., ipv6 kernel module is not loaded in Linux).
+ auto bindAllFmt = [](auto p) { return fmt::format(":::{}", p); };
+ bool addrIsBindAll = addr.toString() == bindAllFmt(_listenerPort);
+
+ if (!addrIsBindAll && _listenerOptions.loadBalancerPort) {
+ addrIsBindAll = (addr.toString() == bindAllFmt(*_listenerOptions.loadBalancerPort));
+ }
+
+ if (errno == EAFNOSUPPORT && _listenerOptions.enableIPv6 && addr.family() == AF_INET6 &&
+ addrIsBindAll) {
+ LOGV2_WARNING(4206501,
+ "Failed to bind to address as the platform does not support ipv6",
+ "Failed to bind to {address} as the platform does not support ipv6",
+ "address"_attr = addr.toString());
+ continue;
+ }
+
+ throw;
+ }
+ setSocketOption(acceptor,
+ GenericAcceptor::reuse_address(true),
+ "acceptor reuse address",
+ logv2::LogSeverity::Info());
+
+ std::error_code ec;
+#ifdef TCP_FASTOPEN
+ if (gTCPFastOpenServer && ((addr.family() == AF_INET) || (addr.family() == AF_INET6))) {
+ setSocketOption(acceptor,
+ TcpFastOpenOption(gTCPFastOpenQueueSize),
+ "acceptor TCP fast open",
+ logv2::LogSeverity::Info(),
+ ec);
+ if (tcpFastOpenIsConfigured) {
+ return errorCodeToStatus(ec, "setup tcpFastOpenIsConfigured");
+ }
+ ec = std::error_code();
+ }
+#endif
+ if (addr.family() == AF_INET6) {
+ setSocketOption(
+ acceptor, asio::ip::v6_only(true), "acceptor v6 only", logv2::LogSeverity::Info());
+ }
+
+ acceptor.non_blocking(true, ec);
+ if (ec) {
+ return errorCodeToStatus(ec, "setup non_blocking");
+ }
+
+ acceptor.bind(*addr, ec);
+ if (ec) {
+ return errorCodeToStatus(ec, "setup bind");
+ }
+
+#ifndef _WIN32
+ if (addr.family() == AF_UNIX) {
+ if (::chmod(addr.toString().c_str(), serverGlobalParams.unixSocketPermissions) == -1) {
+ auto ec = lastPosixError();
+ LOGV2_ERROR(23026,
+ "Failed to chmod socket file {path} {error}",
+ "Failed to chmod socket file",
+ "path"_attr = addr.toString().c_str(),
+ "error"_attr = errorMessage(ec));
+ fassertFailedNoTrace(40487);
+ }
+ }
+#endif
+ auto endpoint = acceptor.local_endpoint(ec);
+ if (ec) {
+ return errorCodeToStatus(ec);
+ }
+ auto hostAndPort = endpointToHostAndPort(endpoint);
+
+ auto record = std::make_unique<AcceptorRecord>(SockAddr(addr->data(), addr->size()),
+ std::move(acceptor));
+
+ if (_listenerOptions.port == 0 && (addr.family() == AF_INET || addr.family() == AF_INET6)) {
+ if (_listenerPort != _listenerOptions.port) {
+ return Status(ErrorCodes::BadValue,
+ "Port 0 (ephemeral port) is not allowed when"
+ " listening on multiple IP interfaces");
+ }
+ _listenerPort = hostAndPort.port();
+ record->address.setPort(_listenerPort);
+ }
+
+ _acceptorRecords.push_back(std::move(record));
+ }
+
+ if (_acceptorRecords.empty() && _listenerOptions.isIngress()) {
+ return Status(ErrorCodes::SocketException, "No available addresses/ports to bind to");
+ }
+
+#ifdef MONGO_CONFIG_SSL
+ std::shared_ptr<SSLManagerInterface> manager = nullptr;
+ if (SSLManagerCoordinator::get()) {
+ manager = SSLManagerCoordinator::get()->getSSLManager();
+ }
+ return rotateCertificates(manager, true);
+#endif
+
+ return Status::OK();
+}
+
+std::vector<std::pair<SockAddr, int>> AsioTransportLayer::getListenerSocketBacklogQueueDepths()
+ const {
+ std::vector<std::pair<SockAddr, int>> queueDepths;
+ for (auto&& record : _acceptorRecords) {
+ queueDepths.push_back({SockAddr(record->address), record->backlogQueueDepth.load()});
+ }
+ return queueDepths;
+}
+
+void AsioTransportLayer::appendStatsForServerStatus(BSONObjBuilder* bob) const {
+ if (gFeatureFlagConnHealthMetrics.isEnabledAndIgnoreFCV()) {
+ bob->append("listenerProcessingTime", _listenerProcessingTime.load().toBSON());
+ }
+}
+
+void AsioTransportLayer::appendStatsForFTDC(BSONObjBuilder& bob) const {
+ if (gFeatureFlagConnHealthMetrics.isEnabledAndIgnoreFCV()) {
+ BSONArrayBuilder queueDepthsArrayBuilder(
+ bob.subarrayStart("listenerSocketBacklogQueueDepths"));
+ for (const auto& record : _acceptorRecords) {
+ BSONObjBuilder{queueDepthsArrayBuilder.subobjStart()}.append(
+ record->address.toString(), record->backlogQueueDepth.load());
+ }
+ queueDepthsArrayBuilder.done();
+ }
+}
+
+void AsioTransportLayer::_runListener() noexcept {
+ setThreadName("listener");
+
+ stdx::unique_lock lk(_mutex);
+ if (_isShutdown) {
+ return;
+ }
+
+ for (auto& acceptorRecord : _acceptorRecords) {
+ asio::error_code ec;
+ acceptorRecord->acceptor.listen(serverGlobalParams.listenBacklog, ec);
+ if (ec) {
+ LOGV2_FATAL(31339,
+ "Error listening for new connections on {listenAddress}: {error}",
+ "Error listening for new connections on listen address",
+ "listenAddrs"_attr = acceptorRecord->address,
+ "error"_attr = ec.message());
+ }
+
+ _acceptConnection(acceptorRecord->acceptor);
+ LOGV2(23015, "Listening on", "address"_attr = acceptorRecord->address.getAddr());
+ }
+
+ const char* ssl = "off";
+#ifdef MONGO_CONFIG_SSL
+ if (_sslMode() != SSLParams::SSLMode_disabled) {
+ ssl = "on";
+ }
+#endif
+ LOGV2(23016, "Waiting for connections", "port"_attr = _listenerPort, "ssl"_attr = ssl);
+
+ _listener.active = true;
+ _listener.cv.notify_all();
+ ON_BLOCK_EXIT([&] {
+ _listener.active = false;
+ _listener.cv.notify_all();
+ });
+
+ while (!_isShutdown) {
+ lk.unlock();
+ _acceptorReactor->run();
+ lk.lock();
+ }
+
+ // Loop through the acceptors and cancel their calls to async_accept. This will prevent new
+ // connections from being opened.
+ for (auto& acceptorRecord : _acceptorRecords) {
+ acceptorRecord->acceptor.cancel();
+ auto& addr = acceptorRecord->address;
+ if (addr.getType() == AF_UNIX && !addr.isAnonymousUNIXSocket()) {
+ auto path = addr.getAddr();
+ LOGV2(
+ 23017, "removing socket file: {path}", "removing socket file", "path"_attr = path);
+ if (::unlink(path.c_str()) != 0) {
+ auto ec = lastPosixError();
+ LOGV2_WARNING(23022,
+ "Unable to remove UNIX socket {path}: {error}",
+ "Unable to remove UNIX socket",
+ "path"_attr = path,
+ "error"_attr = errorMessage(ec));
+ }
+ }
+ }
+}
+
+Status AsioTransportLayer::start() {
+ stdx::unique_lock lk(_mutex);
+ if (_isShutdown) {
+ LOGV2(6986801, "Cannot start an already shutdown TransportLayer");
+ return ShutdownStatus;
+ }
+
+ if (_listenerOptions.isIngress()) {
+ _listener.thread = stdx::thread([this] { _runListener(); });
+ _listener.cv.wait(lk, [&] { return _isShutdown || _listener.active; });
+ return Status::OK();
+ }
+
+ invariant(_acceptorRecords.empty());
+ return Status::OK();
+}
+
/**
 * Idempotently shuts down the transport layer: stops the timer service,
 * stops the acceptor reactor until the listener thread exits its run loop,
 * and joins the listener thread. Safe to call more than once.
 */
void AsioTransportLayer::shutdown() {
    stdx::unique_lock lk(_mutex);

    // exchange() makes this idempotent: only the first caller proceeds.
    if (std::exchange(_isShutdown, true)) {
        // We were already stopped
        return;
    }
    // NOTE(review): the timer service is stopped with _mutex released —
    // presumably because stop() can block or re-enter code that takes
    // _mutex; confirm before reordering.
    lk.unlock();
    _timerService->stop();
    lk.lock();

    if (!_listenerOptions.isIngress()) {
        // Egress only reactors never start a listener
        return;
    }

    // Take ownership of the thread handle so a concurrent observer sees it cleared.
    auto thread = std::exchange(_listener.thread, {});
    if (!thread.joinable()) {
        // If the listener never started, then we can return now
        return;
    }

    // Spam stop() on the reactor, it interrupts run()
    // (repeated because the listener re-enters run() until it observes
    // _isShutdown under the lock).
    while (_listener.active) {
        lk.unlock();
        _acceptorReactor->stop();
        lk.lock();
    }

    // Release the lock and wait for the thread to die
    lk.unlock();
    thread.join();
}
+
+ReactorHandle AsioTransportLayer::getReactor(WhichReactor which) {
+ switch (which) {
+ case TransportLayer::kIngress:
+ return _ingressReactor;
+ case TransportLayer::kEgress:
+ return _egressReactor;
+ case TransportLayer::kNewReactor:
+ return std::make_shared<ASIOReactor>();
+ }
+
+ MONGO_UNREACHABLE;
+}
+
+namespace {
+bool isConnectionResetError(const std::error_code& ec) {
+ // Connection reset errors classically present as asio::error::eof, but can bubble up as
+ // asio::error::invalid_argument when calling into socket.set_option().
+ return ec == asio::error::eof || ec == asio::error::invalid_argument;
+}
+
/**
 * Returns true iff the protocol object describes a TCP stream socket.
 * Tricky: TCP can be represented by IPPROTO_IP or IPPROTO_TCP.
 */
template <typename Protocol>
bool isTcp(Protocol&& p) {
    const auto family = p.family();
    if (family != AF_INET && family != AF_INET6) {
        return false;
    }
    if (p.type() != SOCK_STREAM) {
        return false;
    }
    const auto protocol = p.protocol();
    return protocol == IPPROTO_IP || protocol == IPPROTO_TCP;
}
+} // namespace
+
/**
 * Arms one async_accept on `acceptor`, whose completion callback hands the new
 * socket to the ServiceEntryPoint and then re-arms itself, forming the accept
 * loop. Called once per acceptor from _runListener(), then self-perpetuating
 * on the listener/reactor thread until shutdown.
 */
void AsioTransportLayer::_acceptConnection(GenericAcceptor& acceptor) {
    auto acceptCb = [this, &acceptor](const std::error_code& ec,
                                      AsioSession::GenericSocket peerSocket) mutable {
        Timer timer;
        asioTransportLayerHangDuringAcceptCallback.pauseWhileSet();

        // Stop the accept loop permanently once shutdown has begun.
        if (auto lk = stdx::lock_guard(_mutex); _isShutdown) {
            return;
        }

        if (ec) {
            // Accept failed; log and re-arm so one bad accept doesn't kill the loop.
            LOGV2(23018,
                  "Error accepting new connection on {localEndpoint}: {error}",
                  "Error accepting new connection on local endpoint",
                  "localEndpoint"_attr = endpointToHostAndPort(acceptor.local_endpoint()),
                  "error"_attr = ec.message());
            _acceptConnection(acceptor);
            return;
        }

#ifdef TCPI_OPT_SYN_DATA
        // Best-effort: count connections whose SYN carried data (TCP FastOpen).
        // Failure to read TCP_INFO is deliberately ignored.
        try {
            TcpInfoOption tcpi{};
            peerSocket.get_option(tcpi);
            if (tcpi->tcpi_options & TCPI_OPT_SYN_DATA)
                networkCounter.acceptedTFOIngress();
        } catch (const asio::system_error&) {
        }
#endif

        try {
            std::shared_ptr<AsioSession> session(
                new AsioSession(this, std::move(peerSocket), true));
            if (session->isFromLoadBalancer()) {
                // Load-balanced connections carry a proxy protocol header that must
                // be consumed before the session is handed to the entry point.
                session->parseProxyProtocolHeader(_acceptorReactor)
                    .getAsync([this, session = std::move(session)](Status s) {
                        if (s.isOK()) {
                            _sep->startSession(std::move(session));
                        }
                    });
            } else {
                _sep->startSession(std::move(session));
            }
        } catch (const asio::system_error& e) {
            // Swallow connection reset errors.
            if (!isConnectionResetError(e.code())) {
                LOGV2_WARNING(5746600,
                              "Error accepting new connection: {error}",
                              "Error accepting new connection",
                              "error"_attr = e.code().message());
            }
        } catch (const DBException& e) {
            LOGV2_WARNING(23023,
                          "Error accepting new connection: {error}",
                          "Error accepting new connection",
                          "error"_attr = e);
        }

        // _acceptConnection() is accessed by only one thread (i.e. the listener thread), so an
        // atomic increment is not required here
        _listenerProcessingTime.store(_listenerProcessingTime.load() + timer.elapsed());
        _acceptConnection(acceptor);
    };

    asioTransportLayerHangBeforeAcceptCallback.pauseWhileSet();

    // Refresh the backlog-depth gauge for this acceptor before re-arming.
    _trySetListenerSocketBacklogQueueDepth(acceptor);

    acceptor.async_accept(*_ingressReactor, std::move(acceptCb));
}
+
+void AsioTransportLayer::_trySetListenerSocketBacklogQueueDepth(
+ GenericAcceptor& acceptor) noexcept {
+#ifdef __linux__
+ try {
+ if (!isTcp(acceptor.local_endpoint().protocol()))
+ return;
+ auto matchingRecord =
+ std::find_if(begin(_acceptorRecords), end(_acceptorRecords), [&](const auto& record) {
+ return acceptor.local_endpoint() == record->acceptor.local_endpoint();
+ });
+ invariant(matchingRecord != std::end(_acceptorRecords));
+ TcpInfoOption tcpi;
+ acceptor.get_option(tcpi);
+ (*matchingRecord)->backlogQueueDepth.store(tcpi->tcpi_unacked);
+ } catch (const asio::system_error& e) {
+ // Swallow connection reset errors.
+ if (!isConnectionResetError(e.code())) {
+ LOGV2_WARNING(7006800,
+ "Error retrieving tcp acceptor socket queue length",
+ "error"_attr = e.code().message());
+ }
+ }
+#endif
+}
+
+#ifdef MONGO_CONFIG_SSL
+SSLParams::SSLModes AsioTransportLayer::_sslMode() const {
+ return static_cast<SSLParams::SSLModes>(getSSLGlobalParams().sslMode.load());
+}
+
+Status AsioTransportLayer::rotateCertificates(std::shared_ptr<SSLManagerInterface> manager,
+ bool asyncOCSPStaple) {
+ if (manager && manager->isTransient()) {
+ return Status(ErrorCodes::InternalError,
+ "Should not rotate transient SSL manager's certificates");
+ }
+ auto contextOrStatus = _createSSLContext(manager, _sslMode(), asyncOCSPStaple);
+ if (!contextOrStatus.isOK()) {
+ return contextOrStatus.getStatus();
+ }
+ _sslContext = std::move(contextOrStatus.getValue());
+ return Status::OK();
+}
+
/**
 * Builds a new SSLConnectionContext holding asio SSL contexts for ingress
 * and/or egress, initialized from `manager` and the global SSL parameters.
 *
 * Ingress contexts are created whenever SSL is not disabled and this layer
 * accepts connections; an OCSP response is stapled to them. Egress contexts
 * are created whenever this layer makes outgoing connections and a manager
 * is available. Returns the context, or a non-OK status on any init failure.
 */
StatusWith<std::shared_ptr<const transport::SSLConnectionContext>>
AsioTransportLayer::_createSSLContext(std::shared_ptr<SSLManagerInterface>& manager,
                                      SSLParams::SSLModes sslMode,
                                      bool asyncOCSPStaple) const {

    std::shared_ptr<SSLConnectionContext> newSSLContext = std::make_shared<SSLConnectionContext>();
    newSSLContext->manager = manager;
    const auto& sslParams = getSSLGlobalParams();

    if (sslMode != SSLParams::SSLMode_disabled && _listenerOptions.isIngress()) {
        // sslv23 selects the version-flexible "any TLS" method; actual protocol
        // restrictions are applied by initSSLContext.
        newSSLContext->ingress = std::make_unique<asio::ssl::context>(asio::ssl::context::sslv23);

        Status status = newSSLContext->manager->initSSLContext(
            newSSLContext->ingress->native_handle(),
            sslParams,
            SSLManagerInterface::ConnectionDirection::kIncoming);
        if (!status.isOK()) {
            return status;
        }

        // Let the manager observe this context's lifetime without owning it.
        std::weak_ptr<const SSLConnectionContext> weakContextPtr = newSSLContext;
        manager->registerOwnedBySSLContext(weakContextPtr);
        auto resp = newSSLContext->manager->stapleOCSPResponse(
            newSSLContext->ingress->native_handle(), asyncOCSPStaple);

        if (!resp.isOK()) {
            return Status(ErrorCodes::InvalidSSLConfiguration,
                          str::stream()
                              << "Can not staple OCSP Response. Reason: " << resp.reason());
        }
    }

    if (_listenerOptions.isEgress() && newSSLContext->manager) {
        newSSLContext->egress = std::make_unique<asio::ssl::context>(asio::ssl::context::sslv23);
        Status status = newSSLContext->manager->initSSLContext(
            newSSLContext->egress->native_handle(),
            sslParams,
            SSLManagerInterface::ConnectionDirection::kOutgoing);
        if (!status.isOK()) {
            return status;
        }
        // Transient managers target a specific cluster; record its URI on the context.
        if (newSSLContext->manager->isTransient()) {
            newSSLContext->targetClusterURI =
                newSSLContext->manager->getTargetedClusterConnectionString();
        }
    }
    return newSSLContext;
}
+
+StatusWith<std::shared_ptr<const transport::SSLConnectionContext>>
+AsioTransportLayer::createTransientSSLContext(const TransientSSLParams& transientSSLParams) {
+ auto coordinator = SSLManagerCoordinator::get();
+ if (!coordinator) {
+ return Status(ErrorCodes::InvalidSSLConfiguration,
+ "SSLManagerCoordinator is not initialized");
+ }
+ auto manager = coordinator->createTransientSSLManager(transientSSLParams);
+ invariant(manager);
+
+ return _createSSLContext(manager, _sslMode(), true /* asyncOCSPStaple */);
+}
+
+#endif
+
+#ifdef __linux__
+BatonHandle AsioTransportLayer::makeBaton(OperationContext* opCtx) const {
+ invariant(!opCtx->getBaton());
+
+ auto baton = std::make_shared<BatonASIO>(opCtx);
+ opCtx->setBaton(baton);
+
+ return baton;
+}
+#endif
+
+} // namespace transport
+} // namespace mongo