Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/dbtests/SConscript                        |   1
-rw-r--r--  src/mongo/dbtests/dbtests.cpp                       |   4
-rw-r--r--  src/mongo/transport/asio_utils.h                    |   4
-rw-r--r--  src/mongo/transport/mock_session.h                  |   2
-rw-r--r--  src/mongo/transport/service_executor_adaptive.cpp   |  49
-rw-r--r--  src/mongo/transport/service_executor_adaptive.h     |  11
-rw-r--r--  src/mongo/transport/service_executor_test.cpp       |  59
-rw-r--r--  src/mongo/transport/session.h                       |   7
-rw-r--r--  src/mongo/transport/session_asio.h                  |  19
-rw-r--r--  src/mongo/transport/transport_layer.h               |  83
-rw-r--r--  src/mongo/transport/transport_layer_asio.cpp        | 326
-rw-r--r--  src/mongo/transport/transport_layer_asio.h          |  44
-rw-r--r--  src/mongo/transport/transport_layer_manager.cpp     |  26
-rw-r--r--  src/mongo/transport/transport_layer_manager.h       |  11
-rw-r--r--  src/mongo/transport/transport_layer_mock.cpp        |  11
-rw-r--r--  src/mongo/transport/transport_layer_mock.h          |  10
-rw-r--r--  src/mongo/util/future.h                             |   7
17 files changed, 534 insertions(+), 140 deletions(-)
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript index 9088d5fb117..0257196aafb 100644 --- a/src/mongo/dbtests/SConscript +++ b/src/mongo/dbtests/SConscript @@ -140,6 +140,7 @@ dbtest = env.Program( "$BUILD_DIR/mongo/db/sessions_collection_standalone", "$BUILD_DIR/mongo/db/storage/mmap_v1/paths", "$BUILD_DIR/mongo/db/storage/kv/kv_engine_core", + "$BUILD_DIR/mongo/transport/transport_layer_manager", "$BUILD_DIR/mongo/util/clock_source_mock", "$BUILD_DIR/mongo/util/net/network", "$BUILD_DIR/mongo/util/progress_meter", diff --git a/src/mongo/dbtests/dbtests.cpp b/src/mongo/dbtests/dbtests.cpp index 07db83f45ac..53ecaaa74e9 100644 --- a/src/mongo/dbtests/dbtests.cpp +++ b/src/mongo/dbtests/dbtests.cpp @@ -55,6 +55,7 @@ #include "mongo/dbtests/framework.h" #include "mongo/scripting/engine.h" #include "mongo/stdx/memory.h" +#include "mongo/transport/transport_layer_manager.h" #include "mongo/util/clock_source_mock.h" #include "mongo/util/quick_exit.h" #include "mongo/util/signal_handlers_synchronous.h" @@ -156,6 +157,9 @@ int dbtestsMain(int argc, char** argv, char** envp) { preciseClock->advance(Seconds(1)); service->setPreciseClockSource(std::move(preciseClock)); + service->setTransportLayer( + transport::TransportLayerManager::makeAndStartDefaultEgressTransportLayer()); + repl::ReplicationCoordinator::set( service, std::unique_ptr<repl::ReplicationCoordinator>( diff --git a/src/mongo/transport/asio_utils.h b/src/mongo/transport/asio_utils.h index 8f29b709fda..be660d8b861 100644 --- a/src/mongo/transport/asio_utils.h +++ b/src/mongo/transport/asio_utils.h @@ -60,6 +60,10 @@ inline Status errorCodeToStatus(const std::error_code& ec) { if (!ec) return Status::OK(); + if (ec == asio::error::operation_aborted) { + return {ErrorCodes::CallbackCanceled, "Callback was canceled"}; + } + #ifdef _WIN32 if (ec == asio::error::timed_out) { #else diff --git a/src/mongo/transport/mock_session.h b/src/mongo/transport/mock_session.h index a21c913b98e..19379babb20 100644 --- a/src/mongo/transport/mock_session.h +++ b/src/mongo/transport/mock_session.h @@ -103,6 +103,8 @@ public: return Future<void>::makeReady(sinkMessage(message)); } + void cancelAsyncOperations() override {} + void setTimeout(boost::optional<Milliseconds>) override {} bool isConnected() override { diff --git a/src/mongo/transport/service_executor_adaptive.cpp b/src/mongo/transport/service_executor_adaptive.cpp index f038d62c57a..d0014485cc4 100644 --- a/src/mongo/transport/service_executor_adaptive.cpp +++ b/src/mongo/transport/service_executor_adaptive.cpp @@ -92,7 +92,6 @@ constexpr auto kExecutorName = "adaptive"_sd; constexpr auto kStuckDetection = "stuckThreadsDetected"_sd; constexpr auto kStarvation = "starvation"_sd; constexpr auto kReserveMinimum = "belowReserveMinimum"_sd; -constexpr auto kBecauseOfError = "replacingCrashedThreads"_sd; constexpr auto kThreadReasons = "threadCreationCauses"_sd; int64_t ticksToMicros(TickSource::Tick ticks, TickSource* tickSource) { @@ -153,14 +152,14 @@ struct ServerParameterOptions : public ServiceExecutorAdaptive::Options { thread_local ServiceExecutorAdaptive::ThreadState* ServiceExecutorAdaptive::_localThreadState = nullptr; -ServiceExecutorAdaptive::ServiceExecutorAdaptive(ServiceContext* ctx, - std::shared_ptr<asio::io_context> ioCtx) - : ServiceExecutorAdaptive(ctx, std::move(ioCtx), stdx::make_unique<ServerParameterOptions>()) {} +ServiceExecutorAdaptive::ServiceExecutorAdaptive(ServiceContext* ctx, ReactorHandle reactor) + : ServiceExecutorAdaptive( + ctx, 
std::move(reactor), stdx::make_unique<ServerParameterOptions>()) {} ServiceExecutorAdaptive::ServiceExecutorAdaptive(ServiceContext* ctx, - std::shared_ptr<asio::io_context> ioCtx, + ReactorHandle reactor, std::unique_ptr<Options> config) - : _ioContext(std::move(ioCtx)), + : _reactorHandle(reactor), _config(std::move(config)), _tickSource(ctx->getTickSource()), _lastScheduleTimer(_tickSource) {} @@ -190,7 +189,7 @@ Status ServiceExecutorAdaptive::shutdown(Milliseconds timeout) { _controllerThread.join(); stdx::unique_lock<stdx::mutex> lk(_threadsMutex); - _ioContext->stop(); + _reactorHandle->stop(); bool result = _deathCondition.wait_for(lk, timeout.toSystemDuration(), [&] { return _threads.empty(); }); @@ -254,9 +253,9 @@ Status ServiceExecutorAdaptive::schedule(ServiceExecutorAdaptive::Task task, // can be called immediately and recursively. if ((flags & kMayRecurse) && (_localThreadState->recursionDepth + 1 < _config->recursionLimit())) { - _ioContext->dispatch(std::move(wrappedTask)); + _reactorHandle->schedule(Reactor::kDispatch, std::move(wrappedTask)); } else { - _ioContext->post(std::move(wrappedTask)); + _reactorHandle->schedule(Reactor::kPost, std::move(wrappedTask)); } _lastScheduleTimer.reset(); @@ -588,31 +587,11 @@ void ServiceExecutorAdaptive::_workerThreadRoutine( // Reset ticksSpentExecuting timer state->executingCurRun = 0; - try { - asio::io_context::work work(*_ioContext); - // If we're still "pending" only try to run one task, that way the controller will - // know that it's okay to start adding threads to avoid starvation again. - state->running.markRunning(); - _ioContext->run_for(runTime.toSystemDuration()); - - // _ioContext->run_one() will return when all the scheduled handlers are completed, and - // you must call restart() to call run_one() again or else it will return immediately. - // In the case where the server has just started and there has been no work yet, this - // means this loop will spin until the first client connect. This call to restart avoids - // that. - if (_ioContext->stopped()) - _ioContext->restart(); - // If an exception escaped from ASIO, then break from this thread and start a new one. - } catch (std::exception& e) { - log() << "Exception escaped worker thread: " << e.what() - << " Starting new worker thread."; - _startWorkerThread(ThreadCreationReason::kError); - break; - } catch (...) { - log() << "Unknown exception escaped worker thread. Starting new worker thread."; - _startWorkerThread(ThreadCreationReason::kError); - break; - } + // If we're still "pending" only try to run one task, that way the controller will + // know that it's okay to start adding threads to avoid starvation again. + state->running.markRunning(); + _reactorHandle->runFor(runTime); + auto spentRunning = state->running.markStopped(); // If we spent less than our idle threshold actually running tasks then exit the thread. 
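The adaptive executor above now routes all work through the Reactor interface instead of touching asio::io_context directly, picking Reactor::kDispatch while recursion is still allowed and Reactor::kPost otherwise. The standalone sketch below (not part of this patch; it uses a bare asio::io_context and invented output strings) illustrates the dispatch-vs-post distinction that choice relies on:

```cpp
// Illustrative only: dispatch() may run a handler inline when called from the
// reactor thread, while post() always defers it until the current handler
// returns. ServiceExecutorAdaptive bounds the inline case with recursionLimit().
#include <asio.hpp>
#include <iostream>

int main() {
    asio::io_context ctx;

    ctx.post([&ctx] {
        // We are on the thread running ctx.run(), so dispatch() may execute
        // the nested handler immediately, deepening the stack.
        ctx.dispatch([] { std::cout << "dispatched (may run inline)\n"; });

        // post() always queues the handler, which is what the executor falls
        // back to once the recursion limit is reached.
        ctx.post([] { std::cout << "posted (runs later)\n"; });

        std::cout << "outer task done\n";
    });

    ctx.run();
    return 0;
}
```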
@@ -678,8 +657,6 @@ StringData ServiceExecutorAdaptive::_threadStartedByToString( return kStarvation; case ThreadCreationReason::kReserveMinimum: return kReserveMinimum; - case ThreadCreationReason::kError: - return kBecauseOfError; default: MONGO_UNREACHABLE; } diff --git a/src/mongo/transport/service_executor_adaptive.h b/src/mongo/transport/service_executor_adaptive.h index d300eeaa279..07ae2f8b0cd 100644 --- a/src/mongo/transport/service_executor_adaptive.h +++ b/src/mongo/transport/service_executor_adaptive.h @@ -38,10 +38,9 @@ #include "mongo/stdx/thread.h" #include "mongo/transport/service_executor.h" #include "mongo/transport/service_executor_task_names.h" +#include "mongo/transport/transport_layer.h" #include "mongo/util/tick_source.h" -#include <asio.hpp> - namespace mongo { namespace transport { @@ -82,9 +81,9 @@ public: virtual int recursionLimit() const = 0; }; - explicit ServiceExecutorAdaptive(ServiceContext* ctx, std::shared_ptr<asio::io_context> ioCtx); + explicit ServiceExecutorAdaptive(ServiceContext* ctx, ReactorHandle reactor); explicit ServiceExecutorAdaptive(ServiceContext* ctx, - std::shared_ptr<asio::io_context> ioCtx, + ReactorHandle reactor, std::unique_ptr<Options> config); ServiceExecutorAdaptive(ServiceExecutorAdaptive&&) = default; @@ -177,7 +176,7 @@ private: using MetricsArray = std::array<Metrics, static_cast<size_t>(ServiceExecutorTaskName::kMaxTaskName)>; - enum class ThreadCreationReason { kStuckDetection, kStarvation, kReserveMinimum, kError, kMax }; + enum class ThreadCreationReason { kStuckDetection, kStarvation, kReserveMinimum, kMax }; enum class ThreadTimer { kRunning, kExecuting }; struct ThreadState { @@ -206,7 +205,7 @@ private: TickSource::Tick _getThreadTimerTotal(ThreadTimer which, const stdx::unique_lock<stdx::mutex>& lk) const; - std::shared_ptr<asio::io_context> _ioContext; + ReactorHandle _reactorHandle; std::unique_ptr<Options> _config; diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp index 60437101b2d..8e749228631 100644 --- a/src/mongo/transport/service_executor_test.cpp +++ b/src/mongo/transport/service_executor_test.cpp @@ -76,17 +76,72 @@ struct TestOptions : public ServiceExecutorAdaptive::Options { } }; +/* This implements the portions of the transport::Reactor based on ASIO, but leaves out + * the methods not needed by ServiceExecutors. + * + * TODO Maybe use TransportLayerASIO's Reactor? + */ +class ASIOReactor : public transport::Reactor { +public: + ASIOReactor() : _ioContext() {} + + void run() noexcept final { + MONGO_UNREACHABLE; + } + + void runFor(Milliseconds time) noexcept final { + asio::io_context::work work(_ioContext); + + try { + _ioContext.run_for(time.toSystemDuration()); + } catch (...) 
{ + severe() << "Uncaught exception in reactor: " << exceptionToStatus(); + fassertFailed(50476); + } + } + + void stop() final { + _ioContext.stop(); + } + + std::unique_ptr<ReactorTimer> makeTimer() final { + MONGO_UNREACHABLE; + } + + Date_t now() final { + MONGO_UNREACHABLE; + } + + void schedule(ScheduleMode mode, Task task) final { + if (mode == kDispatch) { + _ioContext.dispatch(std::move(task)); + } else { + _ioContext.post(std::move(task)); + } + } + + bool onReactorThread() const final { + return false; + } + + operator asio::io_context&() { + return _ioContext; + } + +private: + asio::io_context _ioContext; +}; + class ServiceExecutorAdaptiveFixture : public unittest::Test { protected: void setUp() override { auto scOwned = stdx::make_unique<ServiceContextNoop>(); setGlobalServiceContext(std::move(scOwned)); - asioIOCtx = std::make_shared<asio::io_context>(); auto configOwned = stdx::make_unique<TestOptions>(); executorConfig = configOwned.get(); executor = stdx::make_unique<ServiceExecutorAdaptive>( - getGlobalServiceContext(), asioIOCtx, std::move(configOwned)); + getGlobalServiceContext(), std::make_shared<ASIOReactor>(), std::move(configOwned)); } ServiceExecutorAdaptive::Options* executorConfig; diff --git a/src/mongo/transport/session.h b/src/mongo/transport/session.h index a017fc6ce42..bfe45a6cbe2 100644 --- a/src/mongo/transport/session.h +++ b/src/mongo/transport/session.h @@ -114,6 +114,13 @@ public: virtual Future<void> asyncSinkMessage(Message message) = 0; /** + * Cancel any outstanding async operations. There is no way to cancel synchronous calls. + * Futures will finish with an ErrorCodes::CallbackCancelled error if they haven't already + * completed. + */ + virtual void cancelAsyncOperations() = 0; + + /** * This should only be used to detect when the remote host has disappeared without * notice. It does NOT work correctly for ensuring that operations complete or fail * by some deadline. 
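session.h now pairs the Future-returning async I/O calls with cancelAsyncOperations(), and asio_utils.h maps asio::error::operation_aborted to ErrorCodes::CallbackCanceled, so a cancelled future completes with that code. A rough sketch of how a caller might combine the two with a ReactorTimer to put a deadline on an async read; this is hypothetical glue code that assumes `session` and `reactor` are live handles, and none of it appears in this patch:

```cpp
// Hypothetical usage sketch: race an async read against a ReactorTimer and
// cancel the session's outstanding I/O if the deadline fires first.
auto timer = reactor->makeTimer();

timer->waitFor(Seconds(30)).getAsync([session](Status s) {
    if (s.isOK()) {
        // Deadline hit before the read finished: cancel outstanding async
        // ops; the pending read future then completes with CallbackCanceled.
        session->cancelAsyncOperations();
    }
    // A CallbackCanceled status here means the read won and cancelled us.
});

session->asyncSourceMessage().getAsync(
    [timer = std::shared_ptr<ReactorTimer>(std::move(timer))](StatusWith<Message> swm) {
        timer->cancel();  // stop the deadline timer either way
        if (!swm.isOK()) {
            return;  // includes CallbackCanceled when the deadline fired first
        }
        // ... handle swm.getValue() ...
    });
```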
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h index d76f30e6393..ec198bc4dbb 100644 --- a/src/mongo/transport/session_asio.h +++ b/src/mongo/transport/session_asio.h @@ -101,7 +101,7 @@ public: void end() override { if (getSocket().is_open()) { std::error_code ec; - getSocket().cancel(); + cancelAsyncOperations(); getSocket().shutdown(GenericSocket::shutdown_both, ec); if ((ec) && (ec != asio::error::not_connected)) { error() << "Error shutting down socket: " << ec.message(); @@ -139,6 +139,11 @@ public: }); } + void cancelAsyncOperations() override { + LOG(3) << "Cancelling outstanding I/O operations on connection to " << _remote; + getSocket().cancel(); + } + void setTimeout(boost::optional<Milliseconds> timeout) override { invariant(!timeout || timeout->count() > 0); _configuredTimeout = timeout; @@ -180,7 +185,7 @@ protected: friend class TransportLayerASIO; #ifdef MONGO_CONFIG_SSL - Future<void> handshakeSSLForEgress(HostAndPort target) { + Future<void> handshakeSSLForEgress(const HostAndPort& target) { if (!_tl->_egressSSLContext) { return Future<void>::makeReady(Status(ErrorCodes::SSLHandshakeFailed, "SSL requested but SSL support is disabled")); @@ -197,7 +202,7 @@ protected: return _sslSocket->async_handshake(asio::ssl::stream_base::client, UseFuture{}); } }; - return doHandshake().then([ this, target = std::move(target) ] { + return doHandshake().then([this, target] { _ranHandshake = true; auto sslManager = getSSLManager(); @@ -371,7 +376,13 @@ private: #ifdef MONGO_CONFIG_SSL _ranHandshake = true; if (_sslSocket) { - return opportunisticWrite(*_sslSocket, buffers); + if (_blockingMode == Async) { + // Opportunistic writes are broken for async egress SSL (switching between blocking + // and non-blocking mode corrupts the TLS exchange). + return asio::async_write(*_sslSocket, buffers, UseFuture{}); + } else { + return opportunisticWrite(*_sslSocket, buffers); + } } #endif return opportunisticWrite(_socket, buffers); diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h index 29b4a7bb72c..1b8f502237f 100644 --- a/src/mongo/transport/transport_layer.h +++ b/src/mongo/transport/transport_layer.h @@ -28,9 +28,12 @@ #pragma once +#include <memory> + #include "mongo/base/status.h" #include "mongo/stdx/functional.h" #include "mongo/transport/session.h" +#include "mongo/util/future.h" #include "mongo/util/net/message.h" #include "mongo/util/time_support.h" @@ -39,6 +42,9 @@ namespace transport { enum ConnectSSLMode { kGlobalSSLMode, kEnableSSL, kDisableSSL }; +class Reactor; +using ReactorHandle = std::shared_ptr<Reactor>; + /** * The TransportLayer moves Messages between transport::Endpoints and the database. * This class owns an Acceptor that generates new endpoints from which it can @@ -69,10 +75,9 @@ public: ConnectSSLMode sslMode, Milliseconds timeout) = 0; - virtual void asyncConnect(HostAndPort peer, - ConnectSSLMode sslMode, - Milliseconds timeout, - std::function<void(StatusWith<SessionHandle>)> callback) = 0; + virtual Future<SessionHandle> asyncConnect(HostAndPort peer, + ConnectSSLMode sslMode, + const ReactorHandle& reactor) = 0; /** * Start the TransportLayer. 
After this point, the TransportLayer will begin accepting active @@ -95,9 +100,79 @@ public: */ virtual Status setup() = 0; + enum WhichReactor { kIngress, kEgress, kNewReactor }; + virtual ReactorHandle getReactor(WhichReactor which) = 0; + protected: TransportLayer() = default; }; +class ReactorTimer { +public: + ReactorTimer() = default; + ReactorTimer(const ReactorTimer&) = delete; + ReactorTimer& operator=(const ReactorTimer&) = delete; + + virtual ~ReactorTimer() = default; + + /* + * Cancel any outstanding calls to waitFor/waitUntil. The future will have + * an ErrorCodes::CallbackCancelled status. + */ + virtual void cancel() = 0; + + /* + * Returns a future that will be filled with Status::OK after the timeout has + * ellapsed or has been cancelled. + */ + + virtual Future<void> waitFor(Milliseconds timeout) = 0; + virtual Future<void> waitUntil(Date_t timeout) = 0; +}; + +class Reactor { +public: + Reactor(const Reactor&) = delete; + Reactor& operator=(const Reactor&) = delete; + + virtual ~Reactor() = default; + + /* + * Run the event loop of the reactor until stop() is called. + */ + virtual void run() noexcept = 0; + virtual void runFor(Milliseconds time) noexcept = 0; + virtual void stop() = 0; + + using Task = stdx::function<void()>; + + enum ScheduleMode { kDispatch, kPost }; + virtual void schedule(ScheduleMode mode, Task task) = 0; + + template <typename Callback> + Future<FutureContinuationResult<Callback>> execute(Callback&& cb) { + Promise<FutureContinuationResult<Callback>> promise; + auto future = promise.getFuture(); + + schedule(kPost, + [ cb = std::forward<Callback>(cb), sp = promise.share() ] { sp.setWith(cb); }); + + return future; + } + + virtual bool onReactorThread() const = 0; + + /* + * Makes a timer tied to this reactor's event loop. Timeout callbacks will be + * executed in a thread calling run() or runFor(). 
+ */ + virtual std::unique_ptr<ReactorTimer> makeTimer() = 0; + virtual Date_t now() = 0; + +protected: + Reactor() = default; +}; + + } // namespace transport } // namespace mongo diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index ff213b66b43..772a4eb189f 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -33,6 +33,7 @@ #include "mongo/transport/transport_layer_asio.h" #include <asio.hpp> +#include <asio/system_timer.hpp> #include <boost/algorithm/string.hpp> #include "mongo/config.h" @@ -60,6 +61,131 @@ namespace mongo { namespace transport { +class ASIOReactorTimer final : public ReactorTimer { +public: + explicit ASIOReactorTimer(asio::io_context& ctx) + : _timer(std::make_shared<asio::system_timer>(ctx)) {} + + ~ASIOReactorTimer() { + // The underlying timer won't get destroyed until the last promise from _asyncWait + // has been filled, so cancel the timer so call callbacks get run + cancel(); + } + + void cancel() override { + _timer->cancel(); + } + + Future<void> waitFor(Milliseconds timeout) override { + return _asyncWait([&] { _timer->expires_after(timeout.toSystemDuration()); }); + } + + Future<void> waitUntil(Date_t expiration) override { + return _asyncWait([&] { _timer->expires_at(expiration.toSystemTimePoint()); }); + } + +private: + template <typename Callback> + Future<void> _asyncWait(Callback&& cb) { + try { + cb(); + Promise<void> promise; + auto ret = promise.getFuture(); + _timer->async_wait( + [ promise = promise.share(), timer = _timer ](const std::error_code& ec) mutable { + if (ec) { + promise.setError(errorCodeToStatus(ec)); + } else { + promise.emplaceValue(); + } + }); + return ret; + } catch (asio::system_error& ex) { + return Future<void>::makeReady(errorCodeToStatus(ex.code())); + } + } + + // Destroying an asio::system_timer that has outstanding callbacks from async_wait will cause + // a broken promise - so this is managed by a shared ptr, so that each callback can extend the + // lifetime of the timer to after its been called. + std::shared_ptr<asio::system_timer> _timer; +}; + +class TransportLayerASIO::ASIOReactor final : public Reactor { +public: + ASIOReactor() : _ioContext() {} + + void run() noexcept override { + ThreadIdGuard threadIdGuard(this); + asio::io_context::work work(_ioContext); + try { + _ioContext.run(); + } catch (...) { + severe() << "Uncaught exception in reactor: " << exceptionToStatus(); + fassertFailed(40491); + } + } + + void runFor(Milliseconds time) noexcept override { + ThreadIdGuard threadIdGuard(this); + asio::io_context::work work(_ioContext); + + try { + _ioContext.run_for(time.toSystemDuration()); + } catch (...) 
{ + severe() << "Uncaught exception in reactor: " << exceptionToStatus(); + fassertFailed(50473); + } + } + + void stop() override { + _ioContext.stop(); + } + + std::unique_ptr<ReactorTimer> makeTimer() override { + return std::make_unique<ASIOReactorTimer>(_ioContext); + } + + Date_t now() override { + return Date_t(asio::system_timer::clock_type::now()); + } + + void schedule(ScheduleMode mode, Task task) override { + if (mode == kDispatch) { + _ioContext.dispatch(std::move(task)); + } else { + _ioContext.post(std::move(task)); + } + } + + bool onReactorThread() const override { + return this == _reactorForThread; + } + + operator asio::io_context&() { + return _ioContext; + } + +private: + class ThreadIdGuard { + public: + ThreadIdGuard(TransportLayerASIO::ASIOReactor* reactor) { + _reactorForThread = reactor; + } + + ~ThreadIdGuard() { + _reactorForThread = nullptr; + } + }; + + static thread_local ASIOReactor* _reactorForThread; + + asio::io_context _ioContext; +}; + +thread_local TransportLayerASIO::ASIOReactor* TransportLayerASIO::ASIOReactor::_reactorForThread = + nullptr; + TransportLayerASIO::Options::Options(const ServerGlobalParams* params) : port(params->port), ipList(params->bind_ip), @@ -72,8 +198,9 @@ TransportLayerASIO::Options::Options(const ServerGlobalParams* params) TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts, ServiceEntryPoint* sep) - : _workerIOContext(std::make_shared<asio::io_context>()), - _acceptorIOContext(stdx::make_unique<asio::io_context>()), + : _ingressReactor(std::make_shared<ASIOReactor>()), + _egressReactor(std::make_shared<ASIOReactor>()), + _acceptorReactor(std::make_shared<ASIOReactor>()), #ifdef MONGO_CONFIG_SSL _ingressSSLContext(nullptr), _egressSSLContext(nullptr), @@ -84,11 +211,74 @@ TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts, TransportLayerASIO::~TransportLayerASIO() = default; +using Resolver = asio::ip::tcp::resolver; +class WrappedResolver { +public: + using Flags = Resolver::flags; + using Results = Resolver::results_type; + + explicit WrappedResolver(asio::io_context& ioCtx) : _resolver(ioCtx) {} + + Future<Results> resolve(const HostAndPort& peer, Flags flags, bool enableIPv6) { + Results results; + + std::error_code ec; + auto port = std::to_string(peer.port()); + if (enableIPv6) { + results = _resolver.resolve(peer.host(), port, flags, ec); + } else { + results = _resolver.resolve(asio::ip::tcp::v4(), peer.host(), port, flags, ec); + } + + if (ec) { + return _makeFuture(errorCodeToStatus(ec), peer); + } else { + return _makeFuture(results, peer); + } + } + + Future<Results> asyncResolve(const HostAndPort& peer, Flags flags, bool enableIPv6) { + auto port = std::to_string(peer.port()); + Future<Results> ret; + if (enableIPv6) { + ret = _resolver.async_resolve(peer.host(), port, flags, UseFuture{}); + } else { + ret = + _resolver.async_resolve(asio::ip::tcp::v4(), peer.host(), port, flags, UseFuture{}); + } + + return std::move(ret) + .onError([this, peer](Status status) { return _makeFuture(status, peer); }) + .then([this, peer](Results results) { return _makeFuture(results, peer); }); + } + + void cancel() { + _resolver.cancel(); + } + +private: + Future<Results> _makeFuture(StatusWith<Results> results, const HostAndPort& peer) { + if (!results.isOK()) { + return Status{ErrorCodes::HostNotFound, + str::stream() << "Could not find address for " << peer << ": " + << results.getStatus()}; + } else if (results.getValue().empty()) { + return 
Status{ErrorCodes::HostNotFound, + str::stream() << "Could not find address for " << peer}; + } else { + return std::move(results.getValue()); + } + } + + Resolver _resolver; +}; + + StatusWith<SessionHandle> TransportLayerASIO::connect(HostAndPort peer, ConnectSSLMode sslMode, Milliseconds timeout) { std::error_code ec; - GenericSocket sock(*_workerIOContext); + GenericSocket sock(*_egressReactor); #ifndef _WIN32 if (mongoutils::str::contains(peer.host(), '/')) { invariant(!peer.hasPort()); @@ -102,31 +292,7 @@ StatusWith<SessionHandle> TransportLayerASIO::connect(HostAndPort peer, } #endif - using Resolver = asio::ip::tcp::resolver; - Resolver resolver(*_workerIOContext); - std::string portNumberStr = std::to_string(peer.port()); - auto doResolve = [&](auto resolverFlags) -> StatusWith<Resolver::iterator> { - // If IPv6 is disabled, then we should specify that we only want IPv4 addresses, otherwise - // we should do a normal AF_UNSPEC resolution to get both IPv4/IPv6 - Resolver::iterator resolverIt; - if (_listenerOptions.enableIPv6) { - resolverIt = resolver.resolve(peer.host(), portNumberStr, resolverFlags, ec); - } else { - resolverIt = resolver.resolve( - asio::ip::tcp::v4(), peer.host(), portNumberStr, resolverFlags, ec); - } - - if (ec) { - return {ErrorCodes::HostNotFound, - str::stream() << "Could not find address for " << peer.host() << ": " - << ec.message()}; - } else if (resolverIt == Resolver::iterator()) { - return {ErrorCodes::HostNotFound, - str::stream() << "Could not find address for " << peer.host()}; - } - - return resolverIt; - }; + WrappedResolver resolver(*_egressReactor); // We always want to resolve the "service" (port number) as a numeric. // @@ -140,10 +306,13 @@ StatusWith<SessionHandle> TransportLayerASIO::connect(HostAndPort peer, // // Then, if the numeric (IP address) lookup failed, we fall back to DNS or return the error // from the resolver. 
- auto swResolverIt = doResolve(resolverFlags | Resolver::numeric_host); + auto swResolverIt = + resolver.resolve(peer, resolverFlags | Resolver::numeric_host, _listenerOptions.enableIPv6) + .getNoThrow(); if (!swResolverIt.isOK()) { if (swResolverIt == ErrorCodes::HostNotFound) { - swResolverIt = doResolve(resolverFlags); + swResolverIt = + resolver.resolve(peer, resolverFlags, _listenerOptions.enableIPv6).getNoThrow(); if (!swResolverIt.isOK()) { return swResolverIt.getStatus(); } @@ -183,7 +352,7 @@ StatusWith<SessionHandle> TransportLayerASIO::connect(HostAndPort peer, template <typename Endpoint> StatusWith<TransportLayerASIO::ASIOSessionHandle> TransportLayerASIO::_doSyncConnect( Endpoint endpoint, const HostAndPort& peer, const Milliseconds& timeout) { - GenericSocket sock(*_workerIOContext); + GenericSocket sock(*_egressReactor); std::error_code ec; sock.open(endpoint.protocol()); sock.non_blocking(true); @@ -210,11 +379,69 @@ StatusWith<TransportLayerASIO::ASIOSessionHandle> TransportLayerASIO::_doSyncCon return std::make_shared<ASIOSession>(this, std::move(sock)); } -void TransportLayerASIO::asyncConnect(HostAndPort peer, - ConnectSSLMode sslMode, - Milliseconds timeout, - std::function<void(StatusWith<SessionHandle>)> callback) { - MONGO_UNREACHABLE; +Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer, + ConnectSSLMode sslMode, + const ReactorHandle& reactor) { + struct AsyncConnectState { + AsyncConnectState(HostAndPort peer, asio::io_context& context) + : socket(context), resolver(context), peer(std::move(peer)) {} + + Future<SessionHandle> finish() { + return SessionHandle(std::move(session)); + } + + GenericSocket socket; + WrappedResolver resolver; + const HostAndPort peer; + TransportLayerASIO::ASIOSessionHandle session; + }; + + auto reactorImpl = checked_cast<ASIOReactor*>(reactor.get()); + auto connector = std::make_shared<AsyncConnectState>(std::move(peer), *reactorImpl); + + if (connector->peer.host().empty()) { + return Status{ErrorCodes::HostNotFound, "Hostname or IP address to connect to is empty"}; + } + + // We always want to resolve the "service" (port number) as a numeric. + // + // We intentionally don't set the Resolver::address_configured flag because it might prevent us + // from connecting to localhost on hosts with only a loopback interface (see SERVER-1579). 
+ const auto resolverFlags = Resolver::numeric_service; + return connector->resolver + .asyncResolve( + connector->peer, resolverFlags | Resolver::numeric_host, _listenerOptions.enableIPv6) + .onError([this, connector, resolverFlags](Status status) { + return connector->resolver.asyncResolve( + connector->peer, resolverFlags, _listenerOptions.enableIPv6); + }) + .then([connector](WrappedResolver::Results results) { + connector->socket.open(results->endpoint().protocol()); + connector->socket.non_blocking(true); + return connector->socket.async_connect(results->endpoint(), UseFuture{}); + }) + .then([this, connector, sslMode]() { + connector->session = std::make_shared<ASIOSession>(this, std::move(connector->socket)); + connector->session->ensureAsync(); +#ifndef MONGO_CONFIG_SSL + if (sslMode == kEnableSSL) { + uasserted(ErrorCodes::InvalidSSLConfiguration, "SSL requested but not supported"); + } +#else + auto globalSSLMode = _sslMode(); + if (sslMode == kEnableSSL || + (sslMode == kGlobalSSLMode && ((globalSSLMode == SSLParams::SSLMode_preferSSL) || + (globalSSLMode == SSLParams::SSLMode_requireSSL)))) { + return connector->session->handshakeSSLForEgress(connector->peer).then([connector] { + return connector->finish(); + }); + } +#endif + return connector->finish(); + }) + .onError([connector](Status status) -> Future<SessionHandle> { + return status.withContext(str::stream() << "Error connecting to " << connector->peer); + }); } Status TransportLayerASIO::setup() { @@ -273,7 +500,7 @@ Status TransportLayerASIO::setup() { fassertFailedNoTrace(40488); } - GenericAcceptor acceptor(*_acceptorIOContext); + GenericAcceptor acceptor(*_acceptorReactor); acceptor.open(endpoint.protocol()); acceptor.set_option(GenericAcceptor::reuse_address(true)); if (addr.getType() == AF_INET6) { @@ -365,13 +592,7 @@ Status TransportLayerASIO::start() { _listenerThread = stdx::thread([this] { setThreadName("listener"); while (_running.load()) { - asio::io_context::work work(*_acceptorIOContext); - try { - _acceptorIOContext->run(); - } catch (...) { - severe() << "Uncaught exception in the listener: " << exceptionToStatus(); - fassertFailed(40491); - } + _acceptorReactor->run(); } }); @@ -415,13 +636,22 @@ void TransportLayerASIO::shutdown() { // Otherwise the ServiceExecutor may need to continue running the io_context to drain running // connections, so we just cancel the acceptors and return. 
if (_listenerThread.joinable()) { - _acceptorIOContext->stop(); + _acceptorReactor->stop(); _listenerThread.join(); } } -const std::shared_ptr<asio::io_context>& TransportLayerASIO::getIOContext() { - return _workerIOContext; +ReactorHandle TransportLayerASIO::getReactor(WhichReactor which) { + switch (which) { + case TransportLayer::kIngress: + return _ingressReactor; + case TransportLayer::kEgress: + return _egressReactor; + case TransportLayer::kNewReactor: + return std::make_shared<ASIOReactor>(); + } + + MONGO_UNREACHABLE; } void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) { @@ -442,7 +672,7 @@ void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) { _acceptConnection(acceptor); }; - acceptor.async_accept(*_workerIOContext, std::move(acceptCb)); + acceptor.async_accept(*_ingressReactor, std::move(acceptCb)); } #ifdef MONGO_CONFIG_SSL diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h index da2bdab547e..85ead5799b3 100644 --- a/src/mongo/transport/transport_layer_asio.h +++ b/src/mongo/transport/transport_layer_asio.h @@ -74,12 +74,12 @@ class TransportLayerASIO final : public TransportLayer { public: struct Options { - explicit Options(const ServerGlobalParams* params); - Options() = default; - constexpr static auto kIngress = 0x1; constexpr static auto kEgress = 0x10; + explicit Options(const ServerGlobalParams* params); + Options() = default; + int mode = kIngress | kEgress; bool isIngress() const { @@ -108,24 +108,26 @@ public: StatusWith<SessionHandle> connect(HostAndPort peer, ConnectSSLMode sslMode, Milliseconds timeout) final; - void asyncConnect(HostAndPort peer, - ConnectSSLMode sslMode, - Milliseconds timeout, - std::function<void(StatusWith<SessionHandle>)> callback) final; + + Future<SessionHandle> asyncConnect(HostAndPort peer, + ConnectSSLMode sslMode, + const ReactorHandle& reactor) final; Status setup() final; + + ReactorHandle getReactor(WhichReactor which) final; + Status start() final; void shutdown() final; - const std::shared_ptr<asio::io_context>& getIOContext(); - int listenerPort() const { return _listenerPort; } private: class ASIOSession; + class ASIOReactor; using ASIOSessionHandle = std::shared_ptr<ASIOSession>; using ConstASIOSessionHandle = std::shared_ptr<const ASIOSession>; @@ -144,31 +146,31 @@ private: stdx::mutex _mutex; - // There are two IO contexts that are used by TransportLayerASIO. The _workerIOContext - // contains all the accepted sockets and all normal networking activity. The - // _acceptorIOContext contains all the sockets in _acceptors. + // There are three reactors that are used by TransportLayerASIO. The _ingressReactor contains + // all the accepted sockets and all ingress networking activity. The _acceptorReactor contains + // all the sockets in _acceptors. The _egressReactor contains egress connections. // - // TransportLayerASIO should never call run() on the _workerIOContext. + // TransportLayerASIO should never call run() on the _ingressReactor. // In synchronous mode, this will cause a massive performance degradation due to // unnecessary wakeups on the asio thread for sockets we don't intend to interact // with asynchronously. The additional IO context avoids registering those sockets // with the acceptors epoll set, thus avoiding those wakeups. Calling run will // undo that benefit. 
// - // TransportLayerASIO should run its own thread that calls run() on the _acceptorIOContext + // TransportLayerASIO should run its own thread that calls run() on the _acceptorReactor // to process calls to async_accept - this is the equivalent of the "listener" thread in // other TransportLayers. // // The underlying problem that caused this is here: // https://github.com/chriskohlhoff/asio/issues/240 // - // It is important that the io_context be declared before the - // vector of acceptors (or any other state that is associated with - // the io_context), so that we destroy any existing acceptors or - // other io_service associated state before we drop the refcount - // on the io_context, which may destroy it. - std::shared_ptr<asio::io_context> _workerIOContext; - std::unique_ptr<asio::io_context> _acceptorIOContext; + // It is important that the reactors be declared before the vector of acceptors (or any other + // state that is associated with the reactors), so that we destroy any existing acceptors or + // other reactor associated state before we drop the refcount on the reactor, which may destroy + // it. + std::shared_ptr<ASIOReactor> _ingressReactor; + std::shared_ptr<ASIOReactor> _egressReactor; + std::shared_ptr<ASIOReactor> _acceptorReactor; #ifdef MONGO_CONFIG_SSL std::unique_ptr<asio::ssl::context> _ingressSSLContext; diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp index f53250e91bd..f92538c773a 100644 --- a/src/mongo/transport/transport_layer_manager.cpp +++ b/src/mongo/transport/transport_layer_manager.cpp @@ -65,11 +65,14 @@ StatusWith<SessionHandle> TransportLayerManager::connect(HostAndPort peer, return _tls.front()->connect(peer, sslMode, timeout); } -void TransportLayerManager::asyncConnect(HostAndPort peer, - ConnectSSLMode sslMode, - Milliseconds timeout, - std::function<void(StatusWith<SessionHandle>)> callback) { - MONGO_UNREACHABLE; +Future<SessionHandle> TransportLayerManager::asyncConnect(HostAndPort peer, + ConnectSSLMode sslMode, + const ReactorHandle& reactor) { + return _tls.front()->asyncConnect(peer, sslMode, reactor); +} + +ReactorHandle TransportLayerManager::getReactor(WhichReactor which) { + return _tls.front()->getReactor(which); } // TODO Right now this and setup() leave TLs started if there's an error. 
In practice the server @@ -112,6 +115,16 @@ Status TransportLayerManager::addAndStartTransportLayer(std::unique_ptr<Transpor return ptr->start(); } +std::unique_ptr<TransportLayer> TransportLayerManager::makeAndStartDefaultEgressTransportLayer() { + transport::TransportLayerASIO::Options opts(&serverGlobalParams); + opts.mode = transport::TransportLayerASIO::Options::kEgress; + + auto ret = stdx::make_unique<transport::TransportLayerASIO>(opts, nullptr); + uassertStatusOK(ret->setup()); + uassertStatusOK(ret->start()); + return std::unique_ptr<TransportLayer>(std::move(ret)); +} + std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig( const ServerGlobalParams* config, ServiceContext* ctx) { std::unique_ptr<TransportLayer> transportLayer; @@ -129,8 +142,9 @@ std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig( auto transportLayerASIO = stdx::make_unique<transport::TransportLayerASIO>(opts, sep); if (config->serviceExecutor == "adaptive") { + auto reactor = transportLayerASIO->getReactor(TransportLayer::kIngress); ctx->setServiceExecutor( - stdx::make_unique<ServiceExecutorAdaptive>(ctx, transportLayerASIO->getIOContext())); + stdx::make_unique<ServiceExecutorAdaptive>(ctx, std::move(reactor))); } else if (config->serviceExecutor == "synchronous") { ctx->setServiceExecutor(stdx::make_unique<ServiceExecutorSynchronous>(ctx)); } diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h index 70f54b10652..d90386829e2 100644 --- a/src/mongo/transport/transport_layer_manager.h +++ b/src/mongo/transport/transport_layer_manager.h @@ -60,15 +60,16 @@ public: StatusWith<SessionHandle> connect(HostAndPort peer, ConnectSSLMode sslMode, Milliseconds timeout) override; - void asyncConnect(HostAndPort peer, - ConnectSSLMode sslMode, - Milliseconds timeout, - std::function<void(StatusWith<SessionHandle>)> callback) override; + Future<SessionHandle> asyncConnect(HostAndPort peer, + ConnectSSLMode sslMode, + const ReactorHandle& reactor) override; Status start() override; void shutdown() override; Status setup() override; + ReactorHandle getReactor(WhichReactor which) override; + // TODO This method is not called anymore, but may be useful to add new TransportLayers // to the manager after it's been created. 
Status addAndStartTransportLayer(std::unique_ptr<TransportLayer> tl); @@ -85,6 +86,8 @@ public: static std::unique_ptr<TransportLayer> createWithConfig(const ServerGlobalParams* config, ServiceContext* ctx); + static std::unique_ptr<TransportLayer> makeAndStartDefaultEgressTransportLayer(); + private: template <typename Callable> void _foreach(Callable&& cb) const; diff --git a/src/mongo/transport/transport_layer_mock.cpp b/src/mongo/transport/transport_layer_mock.cpp index 0fc6ce7964c..6b23127e80f 100644 --- a/src/mongo/transport/transport_layer_mock.cpp +++ b/src/mongo/transport/transport_layer_mock.cpp @@ -68,10 +68,9 @@ StatusWith<SessionHandle> TransportLayerMock::connect(HostAndPort peer, MONGO_UNREACHABLE; } -void TransportLayerMock::asyncConnect(HostAndPort peer, - ConnectSSLMode sslMode, - Milliseconds timeout, - std::function<void(StatusWith<SessionHandle>)> callback) { +Future<SessionHandle> TransportLayerMock::asyncConnect(HostAndPort peer, + ConnectSSLMode sslMode, + const ReactorHandle& reactor) { MONGO_UNREACHABLE; } @@ -89,6 +88,10 @@ void TransportLayerMock::shutdown() { } } +ReactorHandle TransportLayerMock::getReactor(WhichReactor which) { + return nullptr; +} + bool TransportLayerMock::inShutdown() const { return _shutdown; } diff --git a/src/mongo/transport/transport_layer_mock.h b/src/mongo/transport/transport_layer_mock.h index 06a9cd3f37c..001c2bd76fa 100644 --- a/src/mongo/transport/transport_layer_mock.h +++ b/src/mongo/transport/transport_layer_mock.h @@ -56,16 +56,18 @@ public: StatusWith<SessionHandle> connect(HostAndPort peer, ConnectSSLMode sslMode, Milliseconds timeout) override; - void asyncConnect(HostAndPort peer, - ConnectSSLMode sslMode, - Milliseconds timeout, - std::function<void(StatusWith<SessionHandle>)> callback) override; + Future<SessionHandle> asyncConnect(HostAndPort peer, + ConnectSSLMode sslMode, + const ReactorHandle& reactor) override; Status setup() override; Status start() override; void shutdown() override; bool inShutdown() const; + + virtual ReactorHandle getReactor(WhichReactor which) override; + // Set to a factory function to use your own session type. std::function<SessionHandle(TransportLayer*)> createSessionHook; diff --git a/src/mongo/util/future.h b/src/mongo/util/future.h index 9fa51d173b4..978dca58106 100644 --- a/src/mongo/util/future.h +++ b/src/mongo/util/future.h @@ -652,6 +652,10 @@ public: Future(const Future&) = delete; Future& operator=(const Future&) = delete; + /* implicit */ Future(T val) : Future(makeReady(std::move(val))) {} + /* implicit */ Future(Status status) : Future(makeReady(std::move(status))) {} + /* implicit */ Future(StatusWith<T> sw) : Future(makeReady(std::move(sw))) {} + /** * Make a ready Future<T> from a value for cases where you don't need to wait asynchronously. * @@ -1144,7 +1148,8 @@ class MONGO_WARN_UNUSED_RESULT_CLASS future_details::Future<void> { public: using value_type = void; - Future() = default; + /* implicit */ Future() : Future(makeReady()) {} + /* implicit */ Future(Status status) : Future(makeReady(std::move(status))) {} static Future<void> makeReady() { return Future<FakeVoid>::makeReady(FakeVoid{}); |
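For reference, a rough sketch of how a caller might drive the Future-returning asyncConnect() introduced above. Everything here is illustrative: the host name is made up, and `tl` is assumed to be a started TransportLayerASIO whose egress reactor is already being run on some thread; none of this code is part of the patch.

```cpp
// Illustrative only: connect asynchronously on the egress reactor and chain
// continuations on the returned Future<SessionHandle>.
auto reactor = tl->getReactor(transport::TransportLayer::kEgress);

tl->asyncConnect(HostAndPort("db.example.com", 27017),
                 transport::kGlobalSSLMode,
                 reactor)
    .then([](transport::SessionHandle session) {
        // The session comes back already switched into async mode.
        log() << "connected to " << session->remote();
        session->end();
    })
    .getAsync([](Status status) {
        if (!status.isOK()) {
            log() << "async egress connect failed: " << status;
        }
    });
```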
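The future.h change at the end makes Future&lt;T&gt; implicitly constructible from T, Status, and StatusWith&lt;T&gt; (and Future&lt;void&gt; from Status), which is what lets helpers such as WrappedResolver::_makeFuture above return a value or an error directly. A minimal standalone illustration with a hypothetical function, not taken from this patch:

```cpp
// With the implicit constructors, a function declared to return Future<T> can
// return a plain value or a Status and get a ready future either way. The same
// applies to values returned from .then() continuations, which no longer need
// to be wrapped in Future<T>::makeReady().
Future<int> parsePort(const std::string& s) {
    try {
        return std::stoi(s);                                   // Future(T val)
    } catch (const std::exception&) {
        return Status(ErrorCodes::BadValue, "port is not a number");  // Future(Status)
    }
}
```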