summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/dbtests/SConscript1
-rw-r--r--src/mongo/dbtests/dbtests.cpp4
-rw-r--r--src/mongo/transport/asio_utils.h4
-rw-r--r--src/mongo/transport/mock_session.h2
-rw-r--r--src/mongo/transport/service_executor_adaptive.cpp49
-rw-r--r--src/mongo/transport/service_executor_adaptive.h11
-rw-r--r--src/mongo/transport/service_executor_test.cpp59
-rw-r--r--src/mongo/transport/session.h7
-rw-r--r--src/mongo/transport/session_asio.h19
-rw-r--r--src/mongo/transport/transport_layer.h83
-rw-r--r--src/mongo/transport/transport_layer_asio.cpp326
-rw-r--r--src/mongo/transport/transport_layer_asio.h44
-rw-r--r--src/mongo/transport/transport_layer_manager.cpp26
-rw-r--r--src/mongo/transport/transport_layer_manager.h11
-rw-r--r--src/mongo/transport/transport_layer_mock.cpp11
-rw-r--r--src/mongo/transport/transport_layer_mock.h10
-rw-r--r--src/mongo/util/future.h7
17 files changed, 534 insertions, 140 deletions
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript
index 9088d5fb117..0257196aafb 100644
--- a/src/mongo/dbtests/SConscript
+++ b/src/mongo/dbtests/SConscript
@@ -140,6 +140,7 @@ dbtest = env.Program(
"$BUILD_DIR/mongo/db/sessions_collection_standalone",
"$BUILD_DIR/mongo/db/storage/mmap_v1/paths",
"$BUILD_DIR/mongo/db/storage/kv/kv_engine_core",
+ "$BUILD_DIR/mongo/transport/transport_layer_manager",
"$BUILD_DIR/mongo/util/clock_source_mock",
"$BUILD_DIR/mongo/util/net/network",
"$BUILD_DIR/mongo/util/progress_meter",
diff --git a/src/mongo/dbtests/dbtests.cpp b/src/mongo/dbtests/dbtests.cpp
index 07db83f45ac..53ecaaa74e9 100644
--- a/src/mongo/dbtests/dbtests.cpp
+++ b/src/mongo/dbtests/dbtests.cpp
@@ -55,6 +55,7 @@
#include "mongo/dbtests/framework.h"
#include "mongo/scripting/engine.h"
#include "mongo/stdx/memory.h"
+#include "mongo/transport/transport_layer_manager.h"
#include "mongo/util/clock_source_mock.h"
#include "mongo/util/quick_exit.h"
#include "mongo/util/signal_handlers_synchronous.h"
@@ -156,6 +157,9 @@ int dbtestsMain(int argc, char** argv, char** envp) {
preciseClock->advance(Seconds(1));
service->setPreciseClockSource(std::move(preciseClock));
+ service->setTransportLayer(
+ transport::TransportLayerManager::makeAndStartDefaultEgressTransportLayer());
+
repl::ReplicationCoordinator::set(
service,
std::unique_ptr<repl::ReplicationCoordinator>(
diff --git a/src/mongo/transport/asio_utils.h b/src/mongo/transport/asio_utils.h
index 8f29b709fda..be660d8b861 100644
--- a/src/mongo/transport/asio_utils.h
+++ b/src/mongo/transport/asio_utils.h
@@ -60,6 +60,10 @@ inline Status errorCodeToStatus(const std::error_code& ec) {
if (!ec)
return Status::OK();
+ if (ec == asio::error::operation_aborted) {
+ return {ErrorCodes::CallbackCanceled, "Callback was canceled"};
+ }
+
#ifdef _WIN32
if (ec == asio::error::timed_out) {
#else
diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h
index a21c913b98e..19379babb20 100644
--- a/src/mongo/transport/mock_session.h
+++ b/src/mongo/transport/mock_session.h
@@ -103,6 +103,8 @@ public:
return Future<void>::makeReady(sinkMessage(message));
}
+ void cancelAsyncOperations() override {}
+
void setTimeout(boost::optional<Milliseconds>) override {}
bool isConnected() override {
diff --git a/src/mongo/transport/service_executor_adaptive.cpp b/src/mongo/transport/service_executor_adaptive.cpp
index f038d62c57a..d0014485cc4 100644
--- a/src/mongo/transport/service_executor_adaptive.cpp
+++ b/src/mongo/transport/service_executor_adaptive.cpp
@@ -92,7 +92,6 @@ constexpr auto kExecutorName = "adaptive"_sd;
constexpr auto kStuckDetection = "stuckThreadsDetected"_sd;
constexpr auto kStarvation = "starvation"_sd;
constexpr auto kReserveMinimum = "belowReserveMinimum"_sd;
-constexpr auto kBecauseOfError = "replacingCrashedThreads"_sd;
constexpr auto kThreadReasons = "threadCreationCauses"_sd;
int64_t ticksToMicros(TickSource::Tick ticks, TickSource* tickSource) {
@@ -153,14 +152,14 @@ struct ServerParameterOptions : public ServiceExecutorAdaptive::Options {
thread_local ServiceExecutorAdaptive::ThreadState* ServiceExecutorAdaptive::_localThreadState =
nullptr;
-ServiceExecutorAdaptive::ServiceExecutorAdaptive(ServiceContext* ctx,
- std::shared_ptr<asio::io_context> ioCtx)
- : ServiceExecutorAdaptive(ctx, std::move(ioCtx), stdx::make_unique<ServerParameterOptions>()) {}
+ServiceExecutorAdaptive::ServiceExecutorAdaptive(ServiceContext* ctx, ReactorHandle reactor)
+ : ServiceExecutorAdaptive(
+ ctx, std::move(reactor), stdx::make_unique<ServerParameterOptions>()) {}
ServiceExecutorAdaptive::ServiceExecutorAdaptive(ServiceContext* ctx,
- std::shared_ptr<asio::io_context> ioCtx,
+ ReactorHandle reactor,
std::unique_ptr<Options> config)
- : _ioContext(std::move(ioCtx)),
+ : _reactorHandle(reactor),
_config(std::move(config)),
_tickSource(ctx->getTickSource()),
_lastScheduleTimer(_tickSource) {}
@@ -190,7 +189,7 @@ Status ServiceExecutorAdaptive::shutdown(Milliseconds timeout) {
_controllerThread.join();
stdx::unique_lock<stdx::mutex> lk(_threadsMutex);
- _ioContext->stop();
+ _reactorHandle->stop();
bool result =
_deathCondition.wait_for(lk, timeout.toSystemDuration(), [&] { return _threads.empty(); });
@@ -254,9 +253,9 @@ Status ServiceExecutorAdaptive::schedule(ServiceExecutorAdaptive::Task task,
// can be called immediately and recursively.
if ((flags & kMayRecurse) &&
(_localThreadState->recursionDepth + 1 < _config->recursionLimit())) {
- _ioContext->dispatch(std::move(wrappedTask));
+ _reactorHandle->schedule(Reactor::kDispatch, std::move(wrappedTask));
} else {
- _ioContext->post(std::move(wrappedTask));
+ _reactorHandle->schedule(Reactor::kPost, std::move(wrappedTask));
}
_lastScheduleTimer.reset();
@@ -588,31 +587,11 @@ void ServiceExecutorAdaptive::_workerThreadRoutine(
// Reset ticksSpentExecuting timer
state->executingCurRun = 0;
- try {
- asio::io_context::work work(*_ioContext);
- // If we're still "pending" only try to run one task, that way the controller will
- // know that it's okay to start adding threads to avoid starvation again.
- state->running.markRunning();
- _ioContext->run_for(runTime.toSystemDuration());
-
- // _ioContext->run_one() will return when all the scheduled handlers are completed, and
- // you must call restart() to call run_one() again or else it will return immediately.
- // In the case where the server has just started and there has been no work yet, this
- // means this loop will spin until the first client connect. This call to restart avoids
- // that.
- if (_ioContext->stopped())
- _ioContext->restart();
- // If an exception escaped from ASIO, then break from this thread and start a new one.
- } catch (std::exception& e) {
- log() << "Exception escaped worker thread: " << e.what()
- << " Starting new worker thread.";
- _startWorkerThread(ThreadCreationReason::kError);
- break;
- } catch (...) {
- log() << "Unknown exception escaped worker thread. Starting new worker thread.";
- _startWorkerThread(ThreadCreationReason::kError);
- break;
- }
+ // If we're still "pending" only try to run one task, that way the controller will
+ // know that it's okay to start adding threads to avoid starvation again.
+ state->running.markRunning();
+ _reactorHandle->runFor(runTime);
+
auto spentRunning = state->running.markStopped();
// If we spent less than our idle threshold actually running tasks then exit the thread.
@@ -678,8 +657,6 @@ StringData ServiceExecutorAdaptive::_threadStartedByToString(
return kStarvation;
case ThreadCreationReason::kReserveMinimum:
return kReserveMinimum;
- case ThreadCreationReason::kError:
- return kBecauseOfError;
default:
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/transport/service_executor_adaptive.h b/src/mongo/transport/service_executor_adaptive.h
index d300eeaa279..07ae2f8b0cd 100644
--- a/src/mongo/transport/service_executor_adaptive.h
+++ b/src/mongo/transport/service_executor_adaptive.h
@@ -38,10 +38,9 @@
#include "mongo/stdx/thread.h"
#include "mongo/transport/service_executor.h"
#include "mongo/transport/service_executor_task_names.h"
+#include "mongo/transport/transport_layer.h"
#include "mongo/util/tick_source.h"
-#include <asio.hpp>
-
namespace mongo {
namespace transport {
@@ -82,9 +81,9 @@ public:
virtual int recursionLimit() const = 0;
};
- explicit ServiceExecutorAdaptive(ServiceContext* ctx, std::shared_ptr<asio::io_context> ioCtx);
+ explicit ServiceExecutorAdaptive(ServiceContext* ctx, ReactorHandle reactor);
explicit ServiceExecutorAdaptive(ServiceContext* ctx,
- std::shared_ptr<asio::io_context> ioCtx,
+ ReactorHandle reactor,
std::unique_ptr<Options> config);
ServiceExecutorAdaptive(ServiceExecutorAdaptive&&) = default;
@@ -177,7 +176,7 @@ private:
using MetricsArray =
std::array<Metrics, static_cast<size_t>(ServiceExecutorTaskName::kMaxTaskName)>;
- enum class ThreadCreationReason { kStuckDetection, kStarvation, kReserveMinimum, kError, kMax };
+ enum class ThreadCreationReason { kStuckDetection, kStarvation, kReserveMinimum, kMax };
enum class ThreadTimer { kRunning, kExecuting };
struct ThreadState {
@@ -206,7 +205,7 @@ private:
TickSource::Tick _getThreadTimerTotal(ThreadTimer which,
const stdx::unique_lock<stdx::mutex>& lk) const;
- std::shared_ptr<asio::io_context> _ioContext;
+ ReactorHandle _reactorHandle;
std::unique_ptr<Options> _config;
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index 60437101b2d..8e749228631 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -76,17 +76,72 @@ struct TestOptions : public ServiceExecutorAdaptive::Options {
}
};
+/* This implements the portions of the transport::Reactor based on ASIO, but leaves out
+ * the methods not needed by ServiceExecutors.
+ *
+ * TODO Maybe use TransportLayerASIO's Reactor?
+ */
+class ASIOReactor : public transport::Reactor {
+public:
+ ASIOReactor() : _ioContext() {}
+
+ void run() noexcept final {
+ MONGO_UNREACHABLE;
+ }
+
+ void runFor(Milliseconds time) noexcept final {
+ asio::io_context::work work(_ioContext);
+
+ try {
+ _ioContext.run_for(time.toSystemDuration());
+ } catch (...) {
+ severe() << "Uncaught exception in reactor: " << exceptionToStatus();
+ fassertFailed(50476);
+ }
+ }
+
+ void stop() final {
+ _ioContext.stop();
+ }
+
+ std::unique_ptr<ReactorTimer> makeTimer() final {
+ MONGO_UNREACHABLE;
+ }
+
+ Date_t now() final {
+ MONGO_UNREACHABLE;
+ }
+
+ void schedule(ScheduleMode mode, Task task) final {
+ if (mode == kDispatch) {
+ _ioContext.dispatch(std::move(task));
+ } else {
+ _ioContext.post(std::move(task));
+ }
+ }
+
+ bool onReactorThread() const final {
+ return false;
+ }
+
+ operator asio::io_context&() {
+ return _ioContext;
+ }
+
+private:
+ asio::io_context _ioContext;
+};
+
class ServiceExecutorAdaptiveFixture : public unittest::Test {
protected:
void setUp() override {
auto scOwned = stdx::make_unique<ServiceContextNoop>();
setGlobalServiceContext(std::move(scOwned));
- asioIOCtx = std::make_shared<asio::io_context>();
auto configOwned = stdx::make_unique<TestOptions>();
executorConfig = configOwned.get();
executor = stdx::make_unique<ServiceExecutorAdaptive>(
- getGlobalServiceContext(), asioIOCtx, std::move(configOwned));
+ getGlobalServiceContext(), std::make_shared<ASIOReactor>(), std::move(configOwned));
}
ServiceExecutorAdaptive::Options* executorConfig;
diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h
index a017fc6ce42..bfe45a6cbe2 100644
--- a/src/mongo/transport/session.h
+++ b/src/mongo/transport/session.h
@@ -114,6 +114,13 @@ public:
virtual Future<void> asyncSinkMessage(Message message) = 0;
/**
+ * Cancel any outstanding async operations. There is no way to cancel synchronous calls.
+ * Futures will finish with an ErrorCodes::CallbackCancelled error if they haven't already
+ * completed.
+ */
+ virtual void cancelAsyncOperations() = 0;
+
+ /**
* This should only be used to detect when the remote host has disappeared without
* notice. It does NOT work correctly for ensuring that operations complete or fail
* by some deadline.
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index d76f30e6393..ec198bc4dbb 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -101,7 +101,7 @@ public:
void end() override {
if (getSocket().is_open()) {
std::error_code ec;
- getSocket().cancel();
+ cancelAsyncOperations();
getSocket().shutdown(GenericSocket::shutdown_both, ec);
if ((ec) && (ec != asio::error::not_connected)) {
error() << "Error shutting down socket: " << ec.message();
@@ -139,6 +139,11 @@ public:
});
}
+ void cancelAsyncOperations() override {
+ LOG(3) << "Cancelling outstanding I/O operations on connection to " << _remote;
+ getSocket().cancel();
+ }
+
void setTimeout(boost::optional<Milliseconds> timeout) override {
invariant(!timeout || timeout->count() > 0);
_configuredTimeout = timeout;
@@ -180,7 +185,7 @@ protected:
friend class TransportLayerASIO;
#ifdef MONGO_CONFIG_SSL
- Future<void> handshakeSSLForEgress(HostAndPort target) {
+ Future<void> handshakeSSLForEgress(const HostAndPort& target) {
if (!_tl->_egressSSLContext) {
return Future<void>::makeReady(Status(ErrorCodes::SSLHandshakeFailed,
"SSL requested but SSL support is disabled"));
@@ -197,7 +202,7 @@ protected:
return _sslSocket->async_handshake(asio::ssl::stream_base::client, UseFuture{});
}
};
- return doHandshake().then([ this, target = std::move(target) ] {
+ return doHandshake().then([this, target] {
_ranHandshake = true;
auto sslManager = getSSLManager();
@@ -371,7 +376,13 @@ private:
#ifdef MONGO_CONFIG_SSL
_ranHandshake = true;
if (_sslSocket) {
- return opportunisticWrite(*_sslSocket, buffers);
+ if (_blockingMode == Async) {
+ // Opportunistic writes are broken for async egress SSL (switching between blocking
+ // and non-blocking mode corrupts the TLS exchange).
+ return asio::async_write(*_sslSocket, buffers, UseFuture{});
+ } else {
+ return opportunisticWrite(*_sslSocket, buffers);
+ }
}
#endif
return opportunisticWrite(_socket, buffers);
diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h
index 29b4a7bb72c..1b8f502237f 100644
--- a/src/mongo/transport/transport_layer.h
+++ b/src/mongo/transport/transport_layer.h
@@ -28,9 +28,12 @@
#pragma once
+#include <memory>
+
#include "mongo/base/status.h"
#include "mongo/stdx/functional.h"
#include "mongo/transport/session.h"
+#include "mongo/util/future.h"
#include "mongo/util/net/message.h"
#include "mongo/util/time_support.h"
@@ -39,6 +42,9 @@ namespace transport {
enum ConnectSSLMode { kGlobalSSLMode, kEnableSSL, kDisableSSL };
+class Reactor;
+using ReactorHandle = std::shared_ptr<Reactor>;
+
/**
* The TransportLayer moves Messages between transport::Endpoints and the database.
* This class owns an Acceptor that generates new endpoints from which it can
@@ -69,10 +75,9 @@ public:
ConnectSSLMode sslMode,
Milliseconds timeout) = 0;
- virtual void asyncConnect(HostAndPort peer,
- ConnectSSLMode sslMode,
- Milliseconds timeout,
- std::function<void(StatusWith<SessionHandle>)> callback) = 0;
+ virtual Future<SessionHandle> asyncConnect(HostAndPort peer,
+ ConnectSSLMode sslMode,
+ const ReactorHandle& reactor) = 0;
/**
* Start the TransportLayer. After this point, the TransportLayer will begin accepting active
@@ -95,9 +100,79 @@ public:
*/
virtual Status setup() = 0;
+ enum WhichReactor { kIngress, kEgress, kNewReactor };
+ virtual ReactorHandle getReactor(WhichReactor which) = 0;
+
protected:
TransportLayer() = default;
};
+class ReactorTimer {
+public:
+ ReactorTimer() = default;
+ ReactorTimer(const ReactorTimer&) = delete;
+ ReactorTimer& operator=(const ReactorTimer&) = delete;
+
+ virtual ~ReactorTimer() = default;
+
+ /*
+ * Cancel any outstanding calls to waitFor/waitUntil. The future will have
+ * an ErrorCodes::CallbackCancelled status.
+ */
+ virtual void cancel() = 0;
+
+ /*
+ * Returns a future that will be filled with Status::OK after the timeout has
+ * ellapsed or has been cancelled.
+ */
+
+ virtual Future<void> waitFor(Milliseconds timeout) = 0;
+ virtual Future<void> waitUntil(Date_t timeout) = 0;
+};
+
+class Reactor {
+public:
+ Reactor(const Reactor&) = delete;
+ Reactor& operator=(const Reactor&) = delete;
+
+ virtual ~Reactor() = default;
+
+ /*
+ * Run the event loop of the reactor until stop() is called.
+ */
+ virtual void run() noexcept = 0;
+ virtual void runFor(Milliseconds time) noexcept = 0;
+ virtual void stop() = 0;
+
+ using Task = stdx::function<void()>;
+
+ enum ScheduleMode { kDispatch, kPost };
+ virtual void schedule(ScheduleMode mode, Task task) = 0;
+
+ template <typename Callback>
+ Future<FutureContinuationResult<Callback>> execute(Callback&& cb) {
+ Promise<FutureContinuationResult<Callback>> promise;
+ auto future = promise.getFuture();
+
+ schedule(kPost,
+ [ cb = std::forward<Callback>(cb), sp = promise.share() ] { sp.setWith(cb); });
+
+ return future;
+ }
+
+ virtual bool onReactorThread() const = 0;
+
+ /*
+ * Makes a timer tied to this reactor's event loop. Timeout callbacks will be
+ * executed in a thread calling run() or runFor().
+ */
+ virtual std::unique_ptr<ReactorTimer> makeTimer() = 0;
+ virtual Date_t now() = 0;
+
+protected:
+ Reactor() = default;
+};
+
+
} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index ff213b66b43..772a4eb189f 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -33,6 +33,7 @@
#include "mongo/transport/transport_layer_asio.h"
#include <asio.hpp>
+#include <asio/system_timer.hpp>
#include <boost/algorithm/string.hpp>
#include "mongo/config.h"
@@ -60,6 +61,131 @@
namespace mongo {
namespace transport {
+class ASIOReactorTimer final : public ReactorTimer {
+public:
+ explicit ASIOReactorTimer(asio::io_context& ctx)
+ : _timer(std::make_shared<asio::system_timer>(ctx)) {}
+
+ ~ASIOReactorTimer() {
+ // The underlying timer won't get destroyed until the last promise from _asyncWait
+ // has been filled, so cancel the timer so call callbacks get run
+ cancel();
+ }
+
+ void cancel() override {
+ _timer->cancel();
+ }
+
+ Future<void> waitFor(Milliseconds timeout) override {
+ return _asyncWait([&] { _timer->expires_after(timeout.toSystemDuration()); });
+ }
+
+ Future<void> waitUntil(Date_t expiration) override {
+ return _asyncWait([&] { _timer->expires_at(expiration.toSystemTimePoint()); });
+ }
+
+private:
+ template <typename Callback>
+ Future<void> _asyncWait(Callback&& cb) {
+ try {
+ cb();
+ Promise<void> promise;
+ auto ret = promise.getFuture();
+ _timer->async_wait(
+ [ promise = promise.share(), timer = _timer ](const std::error_code& ec) mutable {
+ if (ec) {
+ promise.setError(errorCodeToStatus(ec));
+ } else {
+ promise.emplaceValue();
+ }
+ });
+ return ret;
+ } catch (asio::system_error& ex) {
+ return Future<void>::makeReady(errorCodeToStatus(ex.code()));
+ }
+ }
+
+ // Destroying an asio::system_timer that has outstanding callbacks from async_wait will cause
+ // a broken promise - so this is managed by a shared ptr, so that each callback can extend the
+ // lifetime of the timer to after its been called.
+ std::shared_ptr<asio::system_timer> _timer;
+};
+
+class TransportLayerASIO::ASIOReactor final : public Reactor {
+public:
+ ASIOReactor() : _ioContext() {}
+
+ void run() noexcept override {
+ ThreadIdGuard threadIdGuard(this);
+ asio::io_context::work work(_ioContext);
+ try {
+ _ioContext.run();
+ } catch (...) {
+ severe() << "Uncaught exception in reactor: " << exceptionToStatus();
+ fassertFailed(40491);
+ }
+ }
+
+ void runFor(Milliseconds time) noexcept override {
+ ThreadIdGuard threadIdGuard(this);
+ asio::io_context::work work(_ioContext);
+
+ try {
+ _ioContext.run_for(time.toSystemDuration());
+ } catch (...) {
+ severe() << "Uncaught exception in reactor: " << exceptionToStatus();
+ fassertFailed(50473);
+ }
+ }
+
+ void stop() override {
+ _ioContext.stop();
+ }
+
+ std::unique_ptr<ReactorTimer> makeTimer() override {
+ return std::make_unique<ASIOReactorTimer>(_ioContext);
+ }
+
+ Date_t now() override {
+ return Date_t(asio::system_timer::clock_type::now());
+ }
+
+ void schedule(ScheduleMode mode, Task task) override {
+ if (mode == kDispatch) {
+ _ioContext.dispatch(std::move(task));
+ } else {
+ _ioContext.post(std::move(task));
+ }
+ }
+
+ bool onReactorThread() const override {
+ return this == _reactorForThread;
+ }
+
+ operator asio::io_context&() {
+ return _ioContext;
+ }
+
+private:
+ class ThreadIdGuard {
+ public:
+ ThreadIdGuard(TransportLayerASIO::ASIOReactor* reactor) {
+ _reactorForThread = reactor;
+ }
+
+ ~ThreadIdGuard() {
+ _reactorForThread = nullptr;
+ }
+ };
+
+ static thread_local ASIOReactor* _reactorForThread;
+
+ asio::io_context _ioContext;
+};
+
+thread_local TransportLayerASIO::ASIOReactor* TransportLayerASIO::ASIOReactor::_reactorForThread =
+ nullptr;
+
TransportLayerASIO::Options::Options(const ServerGlobalParams* params)
: port(params->port),
ipList(params->bind_ip),
@@ -72,8 +198,9 @@ TransportLayerASIO::Options::Options(const ServerGlobalParams* params)
TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts,
ServiceEntryPoint* sep)
- : _workerIOContext(std::make_shared<asio::io_context>()),
- _acceptorIOContext(stdx::make_unique<asio::io_context>()),
+ : _ingressReactor(std::make_shared<ASIOReactor>()),
+ _egressReactor(std::make_shared<ASIOReactor>()),
+ _acceptorReactor(std::make_shared<ASIOReactor>()),
#ifdef MONGO_CONFIG_SSL
_ingressSSLContext(nullptr),
_egressSSLContext(nullptr),
@@ -84,11 +211,74 @@ TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts,
TransportLayerASIO::~TransportLayerASIO() = default;
+using Resolver = asio::ip::tcp::resolver;
+class WrappedResolver {
+public:
+ using Flags = Resolver::flags;
+ using Results = Resolver::results_type;
+
+ explicit WrappedResolver(asio::io_context& ioCtx) : _resolver(ioCtx) {}
+
+ Future<Results> resolve(const HostAndPort& peer, Flags flags, bool enableIPv6) {
+ Results results;
+
+ std::error_code ec;
+ auto port = std::to_string(peer.port());
+ if (enableIPv6) {
+ results = _resolver.resolve(peer.host(), port, flags, ec);
+ } else {
+ results = _resolver.resolve(asio::ip::tcp::v4(), peer.host(), port, flags, ec);
+ }
+
+ if (ec) {
+ return _makeFuture(errorCodeToStatus(ec), peer);
+ } else {
+ return _makeFuture(results, peer);
+ }
+ }
+
+ Future<Results> asyncResolve(const HostAndPort& peer, Flags flags, bool enableIPv6) {
+ auto port = std::to_string(peer.port());
+ Future<Results> ret;
+ if (enableIPv6) {
+ ret = _resolver.async_resolve(peer.host(), port, flags, UseFuture{});
+ } else {
+ ret =
+ _resolver.async_resolve(asio::ip::tcp::v4(), peer.host(), port, flags, UseFuture{});
+ }
+
+ return std::move(ret)
+ .onError([this, peer](Status status) { return _makeFuture(status, peer); })
+ .then([this, peer](Results results) { return _makeFuture(results, peer); });
+ }
+
+ void cancel() {
+ _resolver.cancel();
+ }
+
+private:
+ Future<Results> _makeFuture(StatusWith<Results> results, const HostAndPort& peer) {
+ if (!results.isOK()) {
+ return Status{ErrorCodes::HostNotFound,
+ str::stream() << "Could not find address for " << peer << ": "
+ << results.getStatus()};
+ } else if (results.getValue().empty()) {
+ return Status{ErrorCodes::HostNotFound,
+ str::stream() << "Could not find address for " << peer};
+ } else {
+ return std::move(results.getValue());
+ }
+ }
+
+ Resolver _resolver;
+};
+
+
StatusWith<SessionHandle> TransportLayerASIO::connect(HostAndPort peer,
ConnectSSLMode sslMode,
Milliseconds timeout) {
std::error_code ec;
- GenericSocket sock(*_workerIOContext);
+ GenericSocket sock(*_egressReactor);
#ifndef _WIN32
if (mongoutils::str::contains(peer.host(), '/')) {
invariant(!peer.hasPort());
@@ -102,31 +292,7 @@ StatusWith<SessionHandle> TransportLayerASIO::connect(HostAndPort peer,
}
#endif
- using Resolver = asio::ip::tcp::resolver;
- Resolver resolver(*_workerIOContext);
- std::string portNumberStr = std::to_string(peer.port());
- auto doResolve = [&](auto resolverFlags) -> StatusWith<Resolver::iterator> {
- // If IPv6 is disabled, then we should specify that we only want IPv4 addresses, otherwise
- // we should do a normal AF_UNSPEC resolution to get both IPv4/IPv6
- Resolver::iterator resolverIt;
- if (_listenerOptions.enableIPv6) {
- resolverIt = resolver.resolve(peer.host(), portNumberStr, resolverFlags, ec);
- } else {
- resolverIt = resolver.resolve(
- asio::ip::tcp::v4(), peer.host(), portNumberStr, resolverFlags, ec);
- }
-
- if (ec) {
- return {ErrorCodes::HostNotFound,
- str::stream() << "Could not find address for " << peer.host() << ": "
- << ec.message()};
- } else if (resolverIt == Resolver::iterator()) {
- return {ErrorCodes::HostNotFound,
- str::stream() << "Could not find address for " << peer.host()};
- }
-
- return resolverIt;
- };
+ WrappedResolver resolver(*_egressReactor);
// We always want to resolve the "service" (port number) as a numeric.
//
@@ -140,10 +306,13 @@ StatusWith<SessionHandle> TransportLayerASIO::connect(HostAndPort peer,
//
// Then, if the numeric (IP address) lookup failed, we fall back to DNS or return the error
// from the resolver.
- auto swResolverIt = doResolve(resolverFlags | Resolver::numeric_host);
+ auto swResolverIt =
+ resolver.resolve(peer, resolverFlags | Resolver::numeric_host, _listenerOptions.enableIPv6)
+ .getNoThrow();
if (!swResolverIt.isOK()) {
if (swResolverIt == ErrorCodes::HostNotFound) {
- swResolverIt = doResolve(resolverFlags);
+ swResolverIt =
+ resolver.resolve(peer, resolverFlags, _listenerOptions.enableIPv6).getNoThrow();
if (!swResolverIt.isOK()) {
return swResolverIt.getStatus();
}
@@ -183,7 +352,7 @@ StatusWith<SessionHandle> TransportLayerASIO::connect(HostAndPort peer,
template <typename Endpoint>
StatusWith<TransportLayerASIO::ASIOSessionHandle> TransportLayerASIO::_doSyncConnect(
Endpoint endpoint, const HostAndPort& peer, const Milliseconds& timeout) {
- GenericSocket sock(*_workerIOContext);
+ GenericSocket sock(*_egressReactor);
std::error_code ec;
sock.open(endpoint.protocol());
sock.non_blocking(true);
@@ -210,11 +379,69 @@ StatusWith<TransportLayerASIO::ASIOSessionHandle> TransportLayerASIO::_doSyncCon
return std::make_shared<ASIOSession>(this, std::move(sock));
}
-void TransportLayerASIO::asyncConnect(HostAndPort peer,
- ConnectSSLMode sslMode,
- Milliseconds timeout,
- std::function<void(StatusWith<SessionHandle>)> callback) {
- MONGO_UNREACHABLE;
+Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer,
+ ConnectSSLMode sslMode,
+ const ReactorHandle& reactor) {
+ struct AsyncConnectState {
+ AsyncConnectState(HostAndPort peer, asio::io_context& context)
+ : socket(context), resolver(context), peer(std::move(peer)) {}
+
+ Future<SessionHandle> finish() {
+ return SessionHandle(std::move(session));
+ }
+
+ GenericSocket socket;
+ WrappedResolver resolver;
+ const HostAndPort peer;
+ TransportLayerASIO::ASIOSessionHandle session;
+ };
+
+ auto reactorImpl = checked_cast<ASIOReactor*>(reactor.get());
+ auto connector = std::make_shared<AsyncConnectState>(std::move(peer), *reactorImpl);
+
+ if (connector->peer.host().empty()) {
+ return Status{ErrorCodes::HostNotFound, "Hostname or IP address to connect to is empty"};
+ }
+
+ // We always want to resolve the "service" (port number) as a numeric.
+ //
+ // We intentionally don't set the Resolver::address_configured flag because it might prevent us
+ // from connecting to localhost on hosts with only a loopback interface (see SERVER-1579).
+ const auto resolverFlags = Resolver::numeric_service;
+ return connector->resolver
+ .asyncResolve(
+ connector->peer, resolverFlags | Resolver::numeric_host, _listenerOptions.enableIPv6)
+ .onError([this, connector, resolverFlags](Status status) {
+ return connector->resolver.asyncResolve(
+ connector->peer, resolverFlags, _listenerOptions.enableIPv6);
+ })
+ .then([connector](WrappedResolver::Results results) {
+ connector->socket.open(results->endpoint().protocol());
+ connector->socket.non_blocking(true);
+ return connector->socket.async_connect(results->endpoint(), UseFuture{});
+ })
+ .then([this, connector, sslMode]() {
+ connector->session = std::make_shared<ASIOSession>(this, std::move(connector->socket));
+ connector->session->ensureAsync();
+#ifndef MONGO_CONFIG_SSL
+ if (sslMode == kEnableSSL) {
+ uasserted(ErrorCodes::InvalidSSLConfiguration, "SSL requested but not supported");
+ }
+#else
+ auto globalSSLMode = _sslMode();
+ if (sslMode == kEnableSSL ||
+ (sslMode == kGlobalSSLMode && ((globalSSLMode == SSLParams::SSLMode_preferSSL) ||
+ (globalSSLMode == SSLParams::SSLMode_requireSSL)))) {
+ return connector->session->handshakeSSLForEgress(connector->peer).then([connector] {
+ return connector->finish();
+ });
+ }
+#endif
+ return connector->finish();
+ })
+ .onError([connector](Status status) -> Future<SessionHandle> {
+ return status.withContext(str::stream() << "Error connecting to " << connector->peer);
+ });
}
Status TransportLayerASIO::setup() {
@@ -273,7 +500,7 @@ Status TransportLayerASIO::setup() {
fassertFailedNoTrace(40488);
}
- GenericAcceptor acceptor(*_acceptorIOContext);
+ GenericAcceptor acceptor(*_acceptorReactor);
acceptor.open(endpoint.protocol());
acceptor.set_option(GenericAcceptor::reuse_address(true));
if (addr.getType() == AF_INET6) {
@@ -365,13 +592,7 @@ Status TransportLayerASIO::start() {
_listenerThread = stdx::thread([this] {
setThreadName("listener");
while (_running.load()) {
- asio::io_context::work work(*_acceptorIOContext);
- try {
- _acceptorIOContext->run();
- } catch (...) {
- severe() << "Uncaught exception in the listener: " << exceptionToStatus();
- fassertFailed(40491);
- }
+ _acceptorReactor->run();
}
});
@@ -415,13 +636,22 @@ void TransportLayerASIO::shutdown() {
// Otherwise the ServiceExecutor may need to continue running the io_context to drain running
// connections, so we just cancel the acceptors and return.
if (_listenerThread.joinable()) {
- _acceptorIOContext->stop();
+ _acceptorReactor->stop();
_listenerThread.join();
}
}
-const std::shared_ptr<asio::io_context>& TransportLayerASIO::getIOContext() {
- return _workerIOContext;
+ReactorHandle TransportLayerASIO::getReactor(WhichReactor which) {
+ switch (which) {
+ case TransportLayer::kIngress:
+ return _ingressReactor;
+ case TransportLayer::kEgress:
+ return _egressReactor;
+ case TransportLayer::kNewReactor:
+ return std::make_shared<ASIOReactor>();
+ }
+
+ MONGO_UNREACHABLE;
}
void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) {
@@ -442,7 +672,7 @@ void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) {
_acceptConnection(acceptor);
};
- acceptor.async_accept(*_workerIOContext, std::move(acceptCb));
+ acceptor.async_accept(*_ingressReactor, std::move(acceptCb));
}
#ifdef MONGO_CONFIG_SSL
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h
index da2bdab547e..85ead5799b3 100644
--- a/src/mongo/transport/transport_layer_asio.h
+++ b/src/mongo/transport/transport_layer_asio.h
@@ -74,12 +74,12 @@ class TransportLayerASIO final : public TransportLayer {
public:
struct Options {
- explicit Options(const ServerGlobalParams* params);
- Options() = default;
-
constexpr static auto kIngress = 0x1;
constexpr static auto kEgress = 0x10;
+ explicit Options(const ServerGlobalParams* params);
+ Options() = default;
+
int mode = kIngress | kEgress;
bool isIngress() const {
@@ -108,24 +108,26 @@ public:
StatusWith<SessionHandle> connect(HostAndPort peer,
ConnectSSLMode sslMode,
Milliseconds timeout) final;
- void asyncConnect(HostAndPort peer,
- ConnectSSLMode sslMode,
- Milliseconds timeout,
- std::function<void(StatusWith<SessionHandle>)> callback) final;
+
+ Future<SessionHandle> asyncConnect(HostAndPort peer,
+ ConnectSSLMode sslMode,
+ const ReactorHandle& reactor) final;
Status setup() final;
+
+ ReactorHandle getReactor(WhichReactor which) final;
+
Status start() final;
void shutdown() final;
- const std::shared_ptr<asio::io_context>& getIOContext();
-
int listenerPort() const {
return _listenerPort;
}
private:
class ASIOSession;
+ class ASIOReactor;
using ASIOSessionHandle = std::shared_ptr<ASIOSession>;
using ConstASIOSessionHandle = std::shared_ptr<const ASIOSession>;
@@ -144,31 +146,31 @@ private:
stdx::mutex _mutex;
- // There are two IO contexts that are used by TransportLayerASIO. The _workerIOContext
- // contains all the accepted sockets and all normal networking activity. The
- // _acceptorIOContext contains all the sockets in _acceptors.
+ // There are three reactors that are used by TransportLayerASIO. The _ingressReactor contains
+ // all the accepted sockets and all ingress networking activity. The _acceptorReactor contains
+ // all the sockets in _acceptors. The _egressReactor contains egress connections.
//
- // TransportLayerASIO should never call run() on the _workerIOContext.
+ // TransportLayerASIO should never call run() on the _ingressReactor.
// In synchronous mode, this will cause a massive performance degradation due to
// unnecessary wakeups on the asio thread for sockets we don't intend to interact
// with asynchronously. The additional IO context avoids registering those sockets
// with the acceptors epoll set, thus avoiding those wakeups. Calling run will
// undo that benefit.
//
- // TransportLayerASIO should run its own thread that calls run() on the _acceptorIOContext
+ // TransportLayerASIO should run its own thread that calls run() on the _acceptorReactor
// to process calls to async_accept - this is the equivalent of the "listener" thread in
// other TransportLayers.
//
// The underlying problem that caused this is here:
// https://github.com/chriskohlhoff/asio/issues/240
//
- // It is important that the io_context be declared before the
- // vector of acceptors (or any other state that is associated with
- // the io_context), so that we destroy any existing acceptors or
- // other io_service associated state before we drop the refcount
- // on the io_context, which may destroy it.
- std::shared_ptr<asio::io_context> _workerIOContext;
- std::unique_ptr<asio::io_context> _acceptorIOContext;
+ // It is important that the reactors be declared before the vector of acceptors (or any other
+ // state that is associated with the reactors), so that we destroy any existing acceptors or
+ // other reactor associated state before we drop the refcount on the reactor, which may destroy
+ // it.
+ std::shared_ptr<ASIOReactor> _ingressReactor;
+ std::shared_ptr<ASIOReactor> _egressReactor;
+ std::shared_ptr<ASIOReactor> _acceptorReactor;
#ifdef MONGO_CONFIG_SSL
std::unique_ptr<asio::ssl::context> _ingressSSLContext;
diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp
index f53250e91bd..f92538c773a 100644
--- a/src/mongo/transport/transport_layer_manager.cpp
+++ b/src/mongo/transport/transport_layer_manager.cpp
@@ -65,11 +65,14 @@ StatusWith<SessionHandle> TransportLayerManager::connect(HostAndPort peer,
return _tls.front()->connect(peer, sslMode, timeout);
}
-void TransportLayerManager::asyncConnect(HostAndPort peer,
- ConnectSSLMode sslMode,
- Milliseconds timeout,
- std::function<void(StatusWith<SessionHandle>)> callback) {
- MONGO_UNREACHABLE;
+Future<SessionHandle> TransportLayerManager::asyncConnect(HostAndPort peer,
+ ConnectSSLMode sslMode,
+ const ReactorHandle& reactor) {
+ return _tls.front()->asyncConnect(peer, sslMode, reactor);
+}
+
+ReactorHandle TransportLayerManager::getReactor(WhichReactor which) {
+ return _tls.front()->getReactor(which);
}
// TODO Right now this and setup() leave TLs started if there's an error. In practice the server
@@ -112,6 +115,16 @@ Status TransportLayerManager::addAndStartTransportLayer(std::unique_ptr<Transpor
return ptr->start();
}
+std::unique_ptr<TransportLayer> TransportLayerManager::makeAndStartDefaultEgressTransportLayer() {
+ transport::TransportLayerASIO::Options opts(&serverGlobalParams);
+ opts.mode = transport::TransportLayerASIO::Options::kEgress;
+
+ auto ret = stdx::make_unique<transport::TransportLayerASIO>(opts, nullptr);
+ uassertStatusOK(ret->setup());
+ uassertStatusOK(ret->start());
+ return std::unique_ptr<TransportLayer>(std::move(ret));
+}
+
std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(
const ServerGlobalParams* config, ServiceContext* ctx) {
std::unique_ptr<TransportLayer> transportLayer;
@@ -129,8 +142,9 @@ std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(
auto transportLayerASIO = stdx::make_unique<transport::TransportLayerASIO>(opts, sep);
if (config->serviceExecutor == "adaptive") {
+ auto reactor = transportLayerASIO->getReactor(TransportLayer::kIngress);
ctx->setServiceExecutor(
- stdx::make_unique<ServiceExecutorAdaptive>(ctx, transportLayerASIO->getIOContext()));
+ stdx::make_unique<ServiceExecutorAdaptive>(ctx, std::move(reactor)));
} else if (config->serviceExecutor == "synchronous") {
ctx->setServiceExecutor(stdx::make_unique<ServiceExecutorSynchronous>(ctx));
}
diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h
index 70f54b10652..d90386829e2 100644
--- a/src/mongo/transport/transport_layer_manager.h
+++ b/src/mongo/transport/transport_layer_manager.h
@@ -60,15 +60,16 @@ public:
StatusWith<SessionHandle> connect(HostAndPort peer,
ConnectSSLMode sslMode,
Milliseconds timeout) override;
- void asyncConnect(HostAndPort peer,
- ConnectSSLMode sslMode,
- Milliseconds timeout,
- std::function<void(StatusWith<SessionHandle>)> callback) override;
+ Future<SessionHandle> asyncConnect(HostAndPort peer,
+ ConnectSSLMode sslMode,
+ const ReactorHandle& reactor) override;
Status start() override;
void shutdown() override;
Status setup() override;
+ ReactorHandle getReactor(WhichReactor which) override;
+
// TODO This method is not called anymore, but may be useful to add new TransportLayers
// to the manager after it's been created.
Status addAndStartTransportLayer(std::unique_ptr<TransportLayer> tl);
@@ -85,6 +86,8 @@ public:
static std::unique_ptr<TransportLayer> createWithConfig(const ServerGlobalParams* config,
ServiceContext* ctx);
+ static std::unique_ptr<TransportLayer> makeAndStartDefaultEgressTransportLayer();
+
private:
template <typename Callable>
void _foreach(Callable&& cb) const;
diff --git a/src/mongo/transport/transport_layer_mock.cpp b/src/mongo/transport/transport_layer_mock.cpp
index 0fc6ce7964c..6b23127e80f 100644
--- a/src/mongo/transport/transport_layer_mock.cpp
+++ b/src/mongo/transport/transport_layer_mock.cpp
@@ -68,10 +68,9 @@ StatusWith<SessionHandle> TransportLayerMock::connect(HostAndPort peer,
MONGO_UNREACHABLE;
}
-void TransportLayerMock::asyncConnect(HostAndPort peer,
- ConnectSSLMode sslMode,
- Milliseconds timeout,
- std::function<void(StatusWith<SessionHandle>)> callback) {
+Future<SessionHandle> TransportLayerMock::asyncConnect(HostAndPort peer,
+ ConnectSSLMode sslMode,
+ const ReactorHandle& reactor) {
MONGO_UNREACHABLE;
}
@@ -89,6 +88,10 @@ void TransportLayerMock::shutdown() {
}
}
+ReactorHandle TransportLayerMock::getReactor(WhichReactor which) {
+ return nullptr;
+}
+
bool TransportLayerMock::inShutdown() const {
return _shutdown;
}
diff --git a/src/mongo/transport/transport_layer_mock.h b/src/mongo/transport/transport_layer_mock.h
index 06a9cd3f37c..001c2bd76fa 100644
--- a/src/mongo/transport/transport_layer_mock.h
+++ b/src/mongo/transport/transport_layer_mock.h
@@ -56,16 +56,18 @@ public:
StatusWith<SessionHandle> connect(HostAndPort peer,
ConnectSSLMode sslMode,
Milliseconds timeout) override;
- void asyncConnect(HostAndPort peer,
- ConnectSSLMode sslMode,
- Milliseconds timeout,
- std::function<void(StatusWith<SessionHandle>)> callback) override;
+ Future<SessionHandle> asyncConnect(HostAndPort peer,
+ ConnectSSLMode sslMode,
+ const ReactorHandle& reactor) override;
Status setup() override;
Status start() override;
void shutdown() override;
bool inShutdown() const;
+
+ virtual ReactorHandle getReactor(WhichReactor which) override;
+
// Set to a factory function to use your own session type.
std::function<SessionHandle(TransportLayer*)> createSessionHook;
diff --git a/src/mongo/util/future.h b/src/mongo/util/future.h
index 9fa51d173b4..978dca58106 100644
--- a/src/mongo/util/future.h
+++ b/src/mongo/util/future.h
@@ -652,6 +652,10 @@ public:
Future(const Future&) = delete;
Future& operator=(const Future&) = delete;
+ /* implicit */ Future(T val) : Future(makeReady(std::move(val))) {}
+ /* implicit */ Future(Status status) : Future(makeReady(std::move(status))) {}
+ /* implicit */ Future(StatusWith<T> sw) : Future(makeReady(std::move(sw))) {}
+
/**
* Make a ready Future<T> from a value for cases where you don't need to wait asynchronously.
*
@@ -1144,7 +1148,8 @@ class MONGO_WARN_UNUSED_RESULT_CLASS future_details::Future<void> {
public:
using value_type = void;
- Future() = default;
+ /* implicit */ Future() : Future(makeReady()) {}
+ /* implicit */ Future(Status status) : Future(makeReady(std::move(status))) {}
static Future<void> makeReady() {
return Future<FakeVoid>::makeReady(FakeVoid{});