From 99f9a9588825842040c945a941d2b34554b9bea9 Mon Sep 17 00:00:00 2001
From: Amirsaman Memaripour
Date: Tue, 21 Dec 2021 16:49:40 +0000
Subject: SERVER-54900 Cancel ASIO session when SSL handshake times out

(cherry picked from commit 263c0631c8001a8cdb42aff720b8a49a621754dd)
---
 src/mongo/transport/SConscript                    |   1 +
 src/mongo/transport/transport_layer_asio.cpp      |  97 ++++++++++++++-
 src/mongo/transport/transport_layer_asio.h        |  53 +++++++++
 src/mongo/transport/transport_layer_asio_test.cpp | 137 +++++++++++++++++++++-
 4 files changed, 286 insertions(+), 2 deletions(-)

diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 51ede8d2e85..0c7fe0917c8 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -192,6 +192,7 @@ tlEnv.CppUnitTest(
     ],
     LIBDEPS=[
         '$BUILD_DIR/mongo/base',
+        '$BUILD_DIR/mongo/client/clientdriver_network',
         '$BUILD_DIR/mongo/db/dbmessage',
         '$BUILD_DIR/mongo/db/service_context',
         '$BUILD_DIR/mongo/db/service_context_test_fixture',
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index cda01dbfc38..a857b28d9ce 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -33,6 +33,7 @@
 
 #include "mongo/transport/transport_layer_asio.h"
 
+#include <fmt/format.h>
 #include <fstream>
 
 #include <asio.hpp>
@@ -59,6 +60,7 @@
 #include "mongo/util/net/ssl_manager.h"
 #include "mongo/util/net/ssl_options.h"
 #include "mongo/util/options_parser/startup_options.h"
+#include "mongo/util/strong_weak_finish_line.h"
 
 #ifdef MONGO_CONFIG_SSL
 #include "mongo/util/net/ssl.hpp"
@@ -294,6 +296,60 @@ TransportLayerASIO::Options::Options(const ServerGlobalParams* params,
       maxConns(params->maxConns) {
 }
 
+TransportLayerASIO::TimerService::TimerService()
+    : _reactor(std::make_shared<ASIOReactor>()) {}
+
+TransportLayerASIO::TimerService::~TimerService() {
+    stop();
+}
+
+void TransportLayerASIO::TimerService::start() {
+    // Skip the expensive lock acquisition and `compareAndSwap` in the common path.
+    if (MONGO_likely(_state.load() != State::kInitialized))
+        return;
+
+    // The following ensures only one thread continues to spawn a thread to run the reactor. It also
+    // ensures concurrent `start()` and `stop()` invocations are serialized. Holding the lock
+    // guarantees that the following runs either before or after running `stop()`. Note that using
+    // `compareAndSwap` while holding the lock is for simplicity and not necessary.
+    auto lk = stdx::lock_guard(_mutex);
+    auto precondition = State::kInitialized;
+    if (_state.compareAndSwap(&precondition, State::kStarted)) {
+        _thread = stdx::thread([reactor = _reactor] {
+            LOGV2_INFO(5490002, "Started a new thread for the timer service");
+            reactor->run();
+            LOGV2_INFO(5490003, "Returning from the timer service thread");
+        });
+    }
+}
+
+void TransportLayerASIO::TimerService::stop() {
+    // It's possible for `stop()` to be called without `start()` having been called (or for them to
+    // be called concurrently), so we only proceed with stopping the reactor and joining the thread
+    // if we've already transitioned to the `kStarted` state.
+    auto lk = stdx::lock_guard(_mutex);
+    if (_state.swap(State::kStopped) != State::kStarted)
+        return;
+
+    _reactor->stop();
+    _thread.join();
+}
+
+std::unique_ptr<ReactorTimer> TransportLayerASIO::TimerService::makeTimer() {
+    return _getReactor()->makeTimer();
+}
+
+Date_t TransportLayerASIO::TimerService::now() {
+    return _getReactor()->now();
+}
+
+Reactor* TransportLayerASIO::TimerService::_getReactor() {
+    // TODO SERVER-57253 We can start this service as part of starting `TransportLayerASIO`.
+    // Then, we can remove the following invocation of `start()`.
+    start();
+    return _reactor.get();
+}
+
 TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts,
                                        ServiceEntryPoint* sep,
                                        const WireSpec& wireSpec)
@@ -302,7 +358,8 @@ TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts,
       _egressReactor(std::make_shared<ASIOReactor>()),
       _acceptorReactor(std::make_shared<ASIOReactor>()),
       _sep(sep),
-      _listenerOptions(opts) {}
+      _listenerOptions(opts),
+      _timerService(std::make_unique<TimerService>()) {}
 
 TransportLayerASIO::~TransportLayerASIO() = default;
 
@@ -539,13 +596,47 @@ StatusWith<SessionHandle> TransportLayerASIO::connect(
         (sslMode == kGlobalSSLMode &&
          ((globalSSLMode == SSLParams::SSLMode_preferSSL) ||
           (globalSSLMode == SSLParams::SSLMode_requireSSL)))) {
+        // The handshake is complete once either of the following passes the finish line:
+        // - The thread running the handshake returns from `handshakeSSLForEgress`.
+        // - The thread running `TimerService` cancels the handshake due to a timeout.
+        auto finishLine = std::make_shared<StrongWeakFinishLine>(2);
+
+        // Schedules a task to cancel the synchronous handshake if it does not complete before the
+        // specified timeout.
+        auto timer = _timerService->makeTimer();
+#ifndef _WIN32
+        // TODO SERVER-62035: enable the following on Windows.
+        if (timeout > Milliseconds(0)) {
+            timer->waitUntil(_timerService->now() + timeout)
+                .getAsync([finishLine, session](Status status) {
+                    if (status.isOK() && finishLine->arriveStrongly())
+                        session->end();
+                });
+        }
+#endif
+
         Date_t timeBefore = Date_t::now();
         auto sslStatus = session->handshakeSSLForEgress(peer).getNoThrow();
         Date_t timeAfter = Date_t::now();
+
         if (timeAfter - timeBefore > kSlowOperationThreshold) {
             networkCounter.incrementNumSlowSSLOperations();
         }
 
+        if (finishLine->arriveStrongly()) {
+            timer->cancel();
+        } else if (!sslStatus.isOK()) {
+            // We only take this path if the handshake times out. Overwrite the socket exception
+            // with a network timeout.
+            auto errMsg = fmt::format("SSL handshake timed out after {}",
+                                      (timeAfter - timeBefore).toString());
+            sslStatus = Status(ErrorCodes::NetworkTimeout, errMsg);
+            LOGV2(5490001,
+                  "Timed out while running handshake",
+                  "peer"_attr = peer,
+                  "timeout"_attr = timeout);
+        }
+
         if (!sslStatus.isOK()) {
             return sslStatus;
         }
@@ -1217,6 +1308,10 @@ void TransportLayerASIO::shutdown() {
         return;
     }
 
+    lk.unlock();
+    _timerService->stop();
+    lk.lock();
+
     if (!_listenerOptions.isIngress()) {
         // Egress only reactors never start a listener
         return;
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h
index adedd3eb308..65f43e44053 100644
--- a/src/mongo/transport/transport_layer_asio.h
+++ b/src/mongo/transport/transport_layer_asio.h
@@ -117,6 +117,57 @@ public:
         size_t maxConns = DEFAULT_MAX_CONN;  // maximum number of active connections
     };
 
+    /**
+     * A service, internal to `TransportLayerASIO`, that allows creating timers and running `Future`
+     * continuations when a timeout occurs. This allows setting up timeouts for synchronous
+     * operations, such as a synchronous SSL handshake. A separate thread is assigned to run these
+     * timers to:
+     * - Ensure there is always a thread running the timers, regardless of using a synchronous or
+     * asynchronous listener.
+     * - Avoid any performance implications on other reactors (e.g., the `egressReactor`).
+     * The public visibility is only for testing purposes and this service is not intended to be
+     * used outside `TransportLayerASIO`.
+     */
+    class TimerService {
+    public:
+        TimerService();
+        ~TimerService();
+
+        /**
+         * Spawns a thread to run the reactor.
+         * Immediately returns if the service has already started.
+         * May be called more than once, and concurrently.
+         */
+        void start();
+
+        /**
+         * Stops the reactor and joins the thread.
+         * Immediately returns if the service is not started, or already stopped.
+         * May be called more than once, and concurrently.
+         */
+        void stop();
+
+        std::unique_ptr<ReactorTimer> makeTimer();
+
+        Date_t now();
+
+    private:
+        Reactor* _getReactor();
+
+        const std::shared_ptr<Reactor> _reactor;
+
+        // Serializes invocations of `start()` and `stop()`, and allows updating `_state` and
+        // `_thread` as a single atomic operation.
+        Mutex _mutex = MONGO_MAKE_LATCH("TransportLayerASIO::TimerService::_mutex");
+
+        // State transitions: `kInitialized` --> `kStarted` --> `kStopped`
+        //                          |_______________________________^
+        enum class State { kInitialized, kStarted, kStopped };
+        AtomicWord<State> _state;
+
+        stdx::thread _thread;
+    };
+
     TransportLayerASIO(const Options& opts,
                        ServiceEntryPoint* sep,
                        const WireSpec& wireSpec = WireSpec::instance());
@@ -247,6 +298,8 @@ private:
     int _listenerPort = 0;
 
     bool _isShutdown = false;
+
+    const std::unique_ptr<TimerService> _timerService;
 };
 
 }  // namespace transport
diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp
index 60138912f22..e5ca6fe201f 100644
--- a/src/mongo/transport/transport_layer_asio_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_test.cpp
@@ -30,6 +30,7 @@
 
 #include "mongo/transport/transport_layer_asio.h"
 
+#include <fstream>
 #include <queue>
 #include <system_error>
 #include <utility>
@@ -38,6 +39,8 @@
 #include <asio.hpp>
 #include <fmt/format.h>
 
+#include "mongo/client/dbclient_connection.h"
+#include "mongo/config.h"
 #include "mongo/db/server_options.h"
 #include "mongo/db/service_context_test_fixture.h"
 #include "mongo/logv2/log.h"
@@ -55,7 +58,9 @@
 #include "mongo/util/concurrency/notification.h"
 #include "mongo/util/net/sock.h"
 #include "mongo/util/scopeguard.h"
+#include "mongo/util/static_immortal.h"
 #include "mongo/util/synchronized_value.h"
+#include "mongo/util/thread_context.h"
 #include "mongo/util/time_support.h"
 
 namespace mongo {
@@ -306,8 +311,8 @@ public:
     }
 
 private:
-    std::unique_ptr<transport::TransportLayerASIO> _tla;
     MockSEP _sep;
+    std::unique_ptr<transport::TransportLayerASIO> _tla;
 };
 
 TEST(TransportLayerASIO, ListenerPortZeroTreatedAsEphemeral) {
@@ -563,6 +568,47 @@ TEST(TransportLayerASIO, ConfirmSocketSetOptionOnResetConnections) {
 
 class TransportLayerASIOWithServiceContextTest : public ServiceContextTest {
 public:
+    /**
+     * `ThreadCounter` and `ThreadToken` allow tracking the number of active (running) threads.
+     * For each thread, a `ThreadToken` is created. The token notifies `ThreadCounter` about
+     * creation and destruction of its associated thread. This allows maintaining the number of
+     * active threads at any point during the execution of this unit-test.
+     */
+    class ThreadCounter {
+    public:
+        static ThreadCounter& get() {
+            static StaticImmortal<ThreadCounter> instance;
+            return *instance;
+        }
+
+        int64_t count() const {
+            const auto count = _count.load();
+            invariant(count > 0);
+            return count;
+        }
+
+        void onCreateThread() {
+            _count.fetchAndAdd(1);
+        }
+
+        void onDestroyThread() {
+            _count.fetchAndAdd(-1);
+        }
+
+    private:
+        AtomicWord<int64_t> _count;
+    };
+
+    struct ThreadToken {
+        ThreadToken() {
+            ThreadCounter::get().onCreateThread();
+        }
+
+        ~ThreadToken() {
+            ThreadCounter::get().onDestroyThread();
+        }
+    };
+
     void setUp() override {
         auto sep = std::make_unique<MockSEP>();
         auto tl = makeTLA(sep.get());
@@ -580,10 +626,99 @@ public:
     }
 };
 
+const auto getThreadToken =
+    ThreadContext::declareDecoration<TransportLayerASIOWithServiceContextTest::ThreadToken>();
+
+TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceDoesNotSpawnThreadsBeforeStart) {
+    const auto beforeThreadCount = ThreadCounter::get().count();
+    transport::TransportLayerASIO::TimerService service;
+    // Note that the following is a best-effort and not deterministic as we don't have control over
+    // when threads may start running and advance the thread count.
+    const auto afterThreadCount = ThreadCounter::get().count();
+    ASSERT_EQ(beforeThreadCount, afterThreadCount);
+}
+
+TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceOneShotStart) {
+    const auto beforeThreadCount = ThreadCounter::get().count();
+    transport::TransportLayerASIO::TimerService service;
+    service.start();
+    LOGV2(5490004, "Waiting for the timer thread to start", "threads"_attr = beforeThreadCount);
+    while (ThreadCounter::get().count() == beforeThreadCount) {
+        sleepFor(Milliseconds(1));
+    }
+    const auto afterThreadCount = ThreadCounter::get().count();
+    LOGV2(5490005, "Returned from waiting for the timer thread", "threads"_attr = afterThreadCount);
+
+    // Start the service a few times and verify that the thread count has not changed. Note that the
+    // following is a best-effort and not deterministic as we don't have control over when threads
+    // may start running and advance the thread count.
+    service.start();
+    service.start();
+    service.start();
+    ASSERT_EQ(afterThreadCount, ThreadCounter::get().count());
+}
+
+TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceDoesNotStartAfterStop) {
+    const auto beforeThreadCount = ThreadCounter::get().count();
+    transport::TransportLayerASIO::TimerService service;
+    service.stop();
+    service.start();
+    const auto afterThreadCount = ThreadCounter::get().count();
+    // The test would fail if `start` proceeds to spawn a thread for `service`.
+    ASSERT_EQ(beforeThreadCount, afterThreadCount);
+}
+
+TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceCanStopMoreThanOnce) {
+    // Verifying that it is safe to have multiple calls to `stop()`.
+    {
+        transport::TransportLayerASIO::TimerService service;
+        service.start();
+        service.stop();
+        service.stop();
+    }
+    {
+        transport::TransportLayerASIO::TimerService service;
+        service.stop();
+        service.stop();
+    }
+}
+
 TEST_F(TransportLayerASIOWithServiceContextTest, TransportStartAfterShutDown) {
     tla().shutdown();
     ASSERT_EQ(tla().start(), transport::TransportLayer::ShutdownStatus);
 }
 
+#ifdef MONGO_CONFIG_SSL
+#ifndef _WIN32
+// TODO SERVER-62035: enable the following on Windows.
+TEST_F(TransportLayerASIOWithServiceContextTest, ShutdownDuringSSLHandshake) {
+    /**
+     * Creates a server and a client thread:
+     * - The server listens for incoming connections, but doesn't participate in SSL handshake.
+     * - The client connects to the server, and is configured to perform SSL handshake.
+     * The server never writes on the socket in response to the handshake request, thus the client
+     * should block until it is timed out.
+     * The goal is to simulate a server crash, and verify the behavior of the client, during the
+     * handshake process.
+     */
+    int port = tla().listenerPort();
+
+    DBClientConnection conn;
+    conn.setSoTimeout(1);  // 1 second timeout
+
+    TransientSSLParams params;
+    params.sslClusterPEMPayload = [] {
+        std::ifstream input("jstests/libs/client.pem");
+        std::string str((std::istreambuf_iterator<char>(input)), std::istreambuf_iterator<char>());
+        return str;
+    }();
+    params.targetedClusterConnectionString = ConnectionString::forLocal();
+
+    auto status = conn.connectSocketOnly({"localhost", port}, std::move(params));
+    ASSERT_EQ(status, ErrorCodes::HostUnreachable);
+}
+#endif  // _WIN32
+#endif  // MONGO_CONFIG_SSL
+
 }  // namespace
 }  // namespace mongo
-- 
cgit v1.2.1