path: root/src/mongo/transport/asio/asio_transport_layer.h
diff options
authorMatt Diener <>2023-01-24 17:34:50 +0000
committerEvergreen Agent <>2023-01-24 19:55:52 +0000
commitb6152817bb33a1f128c54a8a95343dc8f57af64e (patch)
tree44ddd971e04e0de0cc9636e47e555e82d7cef22b /src/mongo/transport/asio/asio_transport_layer.h
parentd6b6e56babc573c0b48c4b6c037086ed0cca59dd (diff)
SERVER-71906 move asio-related transport files to their own directory
Diffstat (limited to 'src/mongo/transport/asio/asio_transport_layer.h')
1 files changed, 327 insertions, 0 deletions
diff --git a/src/mongo/transport/asio/asio_transport_layer.h b/src/mongo/transport/asio/asio_transport_layer.h
new file mode 100644
index 00000000000..c506638e7f6
--- /dev/null
+++ b/src/mongo/transport/asio/asio_transport_layer.h
@@ -0,0 +1,327 @@
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+#include <functional>
+#include <memory>
+#include <string>
+#include "mongo/base/status_with.h"
+#include "mongo/config.h"
+#include "mongo/db/server_options.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/hierarchical_acquisition.h"
+#include "mongo/util/net/hostandport.h"
+#include "mongo/util/net/ssl_options.h"
+#include "mongo/util/net/ssl_types.h"
+namespace asio {
+class io_context;
+template <typename Protocol>
+class basic_socket_acceptor;
+namespace generic {
+class stream_protocol;
+} // namespace generic
+namespace ssl {
+class context;
+} // namespace ssl
+} // namespace asio
+namespace mongo {
+class ServiceContext;
+class ServiceEntryPoint;
+namespace transport {
+// Simulates reads and writes that always return 1 byte and fail with EAGAIN
+extern FailPoint asioTransportLayerShortOpportunisticReadWrite;
+// Cause an asyncConnect to timeout after it's successfully connected to the remote peer
+extern FailPoint asioTransportLayerAsyncConnectTimesOut;
+extern FailPoint asioTransportLayerHangBeforeAcceptCallback;
+extern FailPoint asioTransportLayerHangDuringAcceptCallback;
+class AsioNetworkingBaton;
+class AsioReactor;
+class AsioSession;
+ * A TransportLayer implementation based on ASIO networking primitives.
+ */
+class AsioTransportLayer final : public TransportLayer {
+ AsioTransportLayer(const AsioTransportLayer&) = delete;
+ AsioTransportLayer& operator=(const AsioTransportLayer&) = delete;
+ constexpr static auto kSlowOperationThreshold = Seconds(1);
+ struct Options {
+ constexpr static auto kIngress = 0x1;
+ constexpr static auto kEgress = 0x10;
+ explicit Options(const ServerGlobalParams* params) : Options(params, {}) {}
+ Options(const ServerGlobalParams* params, boost::optional<int> loadBalancerPort);
+ Options() = default;
+ int mode = kIngress | kEgress;
+ bool isIngress() const {
+ return mode & kIngress;
+ }
+ bool isEgress() const {
+ return mode & kEgress;
+ }
+ int port = ServerGlobalParams::DefaultDBPort; // port to bind to
+ boost::optional<int> loadBalancerPort; // accepts load balancer connections
+ std::vector<std::string> ipList; // addresses to bind to
+#ifndef _WIN32
+ bool useUnixSockets = true; // whether to allow UNIX sockets in ipList
+ bool enableIPv6 = false; // whether to allow IPv6 sockets in ipList
+ size_t maxConns = DEFAULT_MAX_CONN; // maximum number of active connections
+ };
+ /**
+ * A service, internal to `AsioTransportLayer`, that allows creating timers and running `Future`
+ * continuations when a timeout occurs. This allows setting up timeouts for synchronous
+ * operations, such as a synchronous SSL handshake. A separate thread is assigned to run these
+ * timers to:
+ * - Ensure there is always a thread running the timers, regardless of using a synchronous or
+ * asynchronous listener.
+ * - Avoid any performance implications on other reactors (e.g., the `egressReactor`).
+ * The public visibility is only for testing purposes and this service is not intended to be
+ * used outside `AsioTransportLayer`.
+ */
+ class TimerService {
+ public:
+ using Spawn = std::function<stdx::thread(std::function<void()>)>;
+ struct Options {
+ Spawn spawn;
+ };
+ explicit TimerService(Options opt);
+ TimerService() : TimerService(Options{}) {}
+ ~TimerService();
+ /**
+ * Spawns a thread to run the reactor.
+ * Immediately returns if the service has already started.
+ * May be called more than once, and concurrently.
+ */
+ void start();
+ /**
+ * Stops the reactor and joins the thread.
+ * Immediately returns if the service is not started, or already stopped.
+ * May be called more than once, and concurrently.
+ */
+ void stop();
+ std::unique_ptr<ReactorTimer> makeTimer();
+ Date_t now();
+ private:
+ Reactor* _getReactor();
+ const std::shared_ptr<Reactor> _reactor;
+ // Serializes invocations of `start()` and `stop()`, and allows updating `_state` and
+ // `_thread` as a single atomic operation.
+ Mutex _mutex = MONGO_MAKE_LATCH("AsioTransportLayer::TimerService::_mutex");
+ // State transitions: `kInitialized` --> `kStarted` --> `kStopped`
+ // |_______________________________^
+ enum class State { kInitialized, kStarted, kStopped };
+ AtomicWord<State> _state;
+ Spawn _spawn = [](std::function<void()> f) { return stdx::thread{std::move(f)}; };
+ stdx::thread _thread;
+ };
+ AsioTransportLayer(const Options& opts,
+ ServiceEntryPoint* sep,
+ const WireSpec& wireSpec = WireSpec::instance());
+ ~AsioTransportLayer() override;
+ StatusWith<SessionHandle> connect(HostAndPort peer,
+ ConnectSSLMode sslMode,
+ Milliseconds timeout,
+ boost::optional<TransientSSLParams> transientSSLParams) final;
+ Future<SessionHandle> asyncConnect(
+ HostAndPort peer,
+ ConnectSSLMode sslMode,
+ const ReactorHandle& reactor,
+ Milliseconds timeout,
+ std::shared_ptr<ConnectionMetrics> connectionMetrics,
+ std::shared_ptr<const SSLConnectionContext> transientSSLContext = nullptr) final;
+ Status setup() final;
+ ReactorHandle getReactor(WhichReactor which) final;
+ Status start() final;
+ void shutdown() final;
+ void appendStatsForServerStatus(BSONObjBuilder* bob) const override;
+ void appendStatsForFTDC(BSONObjBuilder& bob) const override;
+ int listenerPort() const {
+ return _listenerPort;
+ }
+ boost::optional<int> loadBalancerPort() const {
+ return _listenerOptions.loadBalancerPort;
+ }
+ std::vector<std::pair<SockAddr, int>> getListenerSocketBacklogQueueDepths() const;
+#ifdef __linux__
+ BatonHandle makeBaton(OperationContext* opCtx) const override;
+ SSLParams::SSLModes sslMode() const;
+ std::shared_ptr<const SSLConnectionContext> sslContext() const {
+ return _sslContext.get();
+ }
+ Status rotateCertificates(std::shared_ptr<SSLManagerInterface> manager,
+ bool asyncOCSPStaple) override;
+ /**
+ * Creates a transient SSL context using targeted (non default) SSL params.
+ * @param transientSSLParams overrides any value in stored SSLConnectionContext.
+ * @param optionalManager provides an optional SSL manager, otherwise the default one will be
+ * used.
+ */
+ StatusWith<std::shared_ptr<const transport::SSLConnectionContext>> createTransientSSLContext(
+ const TransientSSLParams& transientSSLParams) override;
+ using AsioSessionHandle = std::shared_ptr<AsioSession>;
+ using ConstAsioSessionHandle = std::shared_ptr<const AsioSession>;
+ using GenericAcceptor = asio::basic_socket_acceptor<asio::generic::stream_protocol>;
+ void _acceptConnection(GenericAcceptor& acceptor);
+ template <typename Endpoint>
+ StatusWith<AsioSessionHandle> _doSyncConnect(
+ Endpoint endpoint,
+ const HostAndPort& peer,
+ const Milliseconds& timeout,
+ boost::optional<TransientSSLParams> transientSSLParams);
+ StatusWith<std::shared_ptr<const transport::SSLConnectionContext>> _createSSLContext(
+ std::shared_ptr<SSLManagerInterface>& manager,
+ SSLParams::SSLModes sslMode,
+ bool asyncOCSPStaple) const;
+ void _runListener() noexcept;
+ void _trySetListenerSocketBacklogQueueDepth(GenericAcceptor& acceptor) noexcept;
+ Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "AsioTransportLayer::_mutex");
+ // There are three reactors that are used by AsioTransportLayer. The _ingressReactor contains
+ // all the accepted sockets and all ingress networking activity. The _acceptorReactor contains
+ // all the sockets in _acceptors. The _egressReactor contains egress connections.
+ //
+ // AsioTransportLayer should never call run() on the _ingressReactor.
+ // In synchronous mode, this will cause a massive performance degradation due to
+ // unnecessary wakeups on the asio thread for sockets we don't intend to interact
+ // with asynchronously. The additional IO context avoids registering those sockets
+ // with the acceptors epoll set, thus avoiding those wakeups. Calling run will
+ // undo that benefit.
+ //
+ // AsioTransportLayer should run its own thread that calls run() on the _acceptorReactor
+ // to process calls to async_accept - this is the equivalent of the "listener" thread in
+ // other TransportLayers.
+ //
+ // The underlying problem that caused this is here:
+ //
+ //
+ // It is important that the reactors be declared before the vector of acceptors (or any other
+ // state that is associated with the reactors), so that we destroy any existing acceptors or
+ // other reactor associated state before we drop the refcount on the reactor, which may destroy
+ // it.
+ std::shared_ptr<AsioReactor> _ingressReactor;
+ std::shared_ptr<AsioReactor> _egressReactor;
+ std::shared_ptr<AsioReactor> _acceptorReactor;
+ synchronized_value<std::shared_ptr<const SSLConnectionContext>> _sslContext;
+ struct AcceptorRecord;
+ std::vector<std::unique_ptr<AcceptorRecord>> _acceptorRecords;
+ // Only used if _listenerOptions.async is false.
+ struct Listener {
+ stdx::thread thread;
+ stdx::condition_variable cv;
+ bool active = false;
+ };
+ Listener _listener;
+ ServiceEntryPoint* const _sep = nullptr;
+ Options _listenerOptions;
+ // The real incoming port in case of _listenerOptions.port==0 (ephemeral).
+ int _listenerPort = 0;
+ bool _isShutdown = false;
+ const std::unique_ptr<TimerService> _timerService;
+ // Tracks the cumulative time the listener spends between accepting incoming connections to
+ // handing them off to dedicated connection threads.
+ AtomicWord<Microseconds> _listenerProcessingTime;
+} // namespace transport
+} // namespace mongo