summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2021-12-21 16:49:40 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-12-05 20:29:48 +0000
commit99f9a9588825842040c945a941d2b34554b9bea9 (patch)
treec6a9e2d1e0c9e47da4611e494fb9aab6e7d90ec0
parent14f545898e7127f8afd9f81517eecbaabf575cad (diff)
downloadmongo-99f9a9588825842040c945a941d2b34554b9bea9.tar.gz
SERVER-54900 Cancel ASIO session when SSL handshake times out
(cherry picked from commit 263c0631c8001a8cdb42aff720b8a49a621754dd)
-rw-r--r--src/mongo/transport/SConscript1
-rw-r--r--src/mongo/transport/transport_layer_asio.cpp97
-rw-r--r--src/mongo/transport/transport_layer_asio.h53
-rw-r--r--src/mongo/transport/transport_layer_asio_test.cpp137
4 files changed, 286 insertions, 2 deletions
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 51ede8d2e85..0c7fe0917c8 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -192,6 +192,7 @@ tlEnv.CppUnitTest(
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/client/clientdriver_network',
'$BUILD_DIR/mongo/db/dbmessage',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/service_context_test_fixture',
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index cda01dbfc38..a857b28d9ce 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -33,6 +33,7 @@
#include "mongo/transport/transport_layer_asio.h"
+#include <fmt/format.h>
#include <fstream>
#include <asio.hpp>
@@ -59,6 +60,7 @@
#include "mongo/util/net/ssl_manager.h"
#include "mongo/util/net/ssl_options.h"
#include "mongo/util/options_parser/startup_options.h"
+#include "mongo/util/strong_weak_finish_line.h"
#ifdef MONGO_CONFIG_SSL
#include "mongo/util/net/ssl.hpp"
@@ -294,6 +296,60 @@ TransportLayerASIO::Options::Options(const ServerGlobalParams* params,
maxConns(params->maxConns) {
}
+TransportLayerASIO::TimerService::TimerService()
+ : _reactor(std::make_shared<TransportLayerASIO::ASIOReactor>()) {}
+
+TransportLayerASIO::TimerService::~TimerService() {
+ stop();
+}
+
+void TransportLayerASIO::TimerService::start() {
+ // Skip the expensive lock acquisition and `compareAndSwap` in the common path.
+ if (MONGO_likely(_state.load() != State::kInitialized))
+ return;
+
+ // The following ensures only one thread continues to spawn a thread to run the reactor. It also
+ // ensures concurrent `start()` and `stop()` invocations are serialized. Holding the lock
+ // guarantees that the following runs either before or after running `stop()`. Note that using
+ // `compareAndSwap` while holding the lock is for simplicity and not necessary.
+ auto lk = stdx::lock_guard(_mutex);
+ auto precondition = State::kInitialized;
+ if (_state.compareAndSwap(&precondition, State::kStarted)) {
+ _thread = stdx::thread([reactor = _reactor] {
+ LOGV2_INFO(5490002, "Started a new thread for the timer service");
+ reactor->run();
+ LOGV2_INFO(5490003, "Returning from the timer service thread");
+ });
+ }
+}
+
+void TransportLayerASIO::TimerService::stop() {
+ // It's possible for `stop()` to be called without `start()` having been called (or for them to
+ // be called concurrently), so we only proceed with stopping the reactor and joining the thread
+ // if we've already transitioned to the `kStarted` state.
+ auto lk = stdx::lock_guard(_mutex);
+ if (_state.swap(State::kStopped) != State::kStarted)
+ return;
+
+ _reactor->stop();
+ _thread.join();
+}
+
+std::unique_ptr<ReactorTimer> TransportLayerASIO::TimerService::makeTimer() {
+ return _getReactor()->makeTimer();
+}
+
+Date_t TransportLayerASIO::TimerService::now() {
+ return _getReactor()->now();
+}
+
+Reactor* TransportLayerASIO::TimerService::_getReactor() {
+ // TODO SERVER-57253 We can start this service as part of starting `TransportLayerASIO`.
+ // Then, we can remove the following invocation of `start()`.
+ start();
+ return _reactor.get();
+}
+
TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts,
ServiceEntryPoint* sep,
const WireSpec& wireSpec)
@@ -302,7 +358,8 @@ TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts,
_egressReactor(std::make_shared<ASIOReactor>()),
_acceptorReactor(std::make_shared<ASIOReactor>()),
_sep(sep),
- _listenerOptions(opts) {}
+ _listenerOptions(opts),
+ _timerService(std::make_unique<TimerService>()) {}
TransportLayerASIO::~TransportLayerASIO() = default;
@@ -539,13 +596,47 @@ StatusWith<SessionHandle> TransportLayerASIO::connect(
(sslMode == kGlobalSSLMode &&
((globalSSLMode == SSLParams::SSLMode_preferSSL) ||
(globalSSLMode == SSLParams::SSLMode_requireSSL)))) {
+ // The handshake is complete once either of the following passes the finish line:
+ // - The thread running the handshake returns from `handshakeSSLForEgress`.
+ // - The thread running `TimerService` cancels the handshake due to a timeout.
+ auto finishLine = std::make_shared<StrongWeakFinishLine>(2);
+
+ // Schedules a task to cancel the synchronous handshake if it does not complete before the
+ // specified timeout.
+ auto timer = _timerService->makeTimer();
+#ifndef _WIN32
+ // TODO SERVER-62035: enable the following on Windows.
+ if (timeout > Milliseconds(0)) {
+ timer->waitUntil(_timerService->now() + timeout)
+ .getAsync([finishLine, session](Status status) {
+ if (status.isOK() && finishLine->arriveStrongly())
+ session->end();
+ });
+ }
+#endif
+
Date_t timeBefore = Date_t::now();
auto sslStatus = session->handshakeSSLForEgress(peer).getNoThrow();
Date_t timeAfter = Date_t::now();
+
if (timeAfter - timeBefore > kSlowOperationThreshold) {
networkCounter.incrementNumSlowSSLOperations();
}
+ if (finishLine->arriveStrongly()) {
+ timer->cancel();
+ } else if (!sslStatus.isOK()) {
+ // We only take this path if the handshake times out. Overwrite the socket exception
+ // with a network timeout.
+ auto errMsg = fmt::format("SSL handshake timed out after {}",
+ (timeAfter - timeBefore).toString());
+ sslStatus = Status(ErrorCodes::NetworkTimeout, errMsg);
+ LOGV2(5490001,
+ "Timed out while running handshake",
+ "peer"_attr = peer,
+ "timeout"_attr = timeout);
+ }
+
if (!sslStatus.isOK()) {
return sslStatus;
}
@@ -1217,6 +1308,10 @@ void TransportLayerASIO::shutdown() {
return;
}
+ lk.unlock();
+ _timerService->stop();
+ lk.lock();
+
if (!_listenerOptions.isIngress()) {
// Egress only reactors never start a listener
return;
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h
index adedd3eb308..65f43e44053 100644
--- a/src/mongo/transport/transport_layer_asio.h
+++ b/src/mongo/transport/transport_layer_asio.h
@@ -117,6 +117,57 @@ public:
size_t maxConns = DEFAULT_MAX_CONN; // maximum number of active connections
};
+ /**
+ * A service, internal to `TransportLayerASIO`, that allows creating timers and running `Future`
+ * continuations when a timeout occurs. This allows setting up timeouts for synchronous
+ * operations, such as a synchronous SSL handshake. A separate thread is assigned to run these
+ * timers to:
+ * - Ensure there is always a thread running the timers, regardless of using a synchronous or
+ * asynchronous listener.
+ * - Avoid any performance implications on other reactors (e.g., the `egressReactor`).
+ * The public visibility is only for testing purposes and this service is not intended to be
+ * used outside `TransportLayerASIO`.
+ */
+ class TimerService {
+ public:
+ TimerService();
+ ~TimerService();
+
+ /**
+ * Spawns a thread to run the reactor.
+ * Immediately returns if the service has already started.
+ * May be called more than once, and concurrently.
+ */
+ void start();
+
+ /**
+ * Stops the reactor and joins the thread.
+ * Immediately returns if the service is not started, or already stopped.
+ * May be called more than once, and concurrently.
+ */
+ void stop();
+
+ std::unique_ptr<ReactorTimer> makeTimer();
+
+ Date_t now();
+
+ private:
+ Reactor* _getReactor();
+
+ const std::shared_ptr<Reactor> _reactor;
+
+ // Serializes invocations of `start()` and `stop()`, and allows updating `_state` and
+ // `_thread` as a single atomic operation.
+ Mutex _mutex = MONGO_MAKE_LATCH("TransportLayerASIO::TimerService::_mutex");
+
+ // State transitions: `kInitialized` --> `kStarted` --> `kStopped`
+ // |_______________________________^
+ enum class State { kInitialized, kStarted, kStopped };
+ AtomicWord<State> _state;
+
+ stdx::thread _thread;
+ };
+
TransportLayerASIO(const Options& opts,
ServiceEntryPoint* sep,
const WireSpec& wireSpec = WireSpec::instance());
@@ -247,6 +298,8 @@ private:
int _listenerPort = 0;
bool _isShutdown = false;
+
+ const std::unique_ptr<TimerService> _timerService;
};
} // namespace transport
diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp
index 60138912f22..e5ca6fe201f 100644
--- a/src/mongo/transport/transport_layer_asio_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_test.cpp
@@ -30,6 +30,7 @@
#include "mongo/transport/transport_layer_asio.h"
+#include <fstream>
#include <queue>
#include <system_error>
#include <utility>
@@ -38,6 +39,8 @@
#include <asio.hpp>
#include <fmt/format.h>
+#include "mongo/client/dbclient_connection.h"
+#include "mongo/config.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context_test_fixture.h"
#include "mongo/logv2/log.h"
@@ -55,7 +58,9 @@
#include "mongo/util/concurrency/notification.h"
#include "mongo/util/net/sock.h"
#include "mongo/util/scopeguard.h"
+#include "mongo/util/static_immortal.h"
#include "mongo/util/synchronized_value.h"
+#include "mongo/util/thread_context.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -306,8 +311,8 @@ public:
}
private:
- std::unique_ptr<transport::TransportLayerASIO> _tla;
MockSEP _sep;
+ std::unique_ptr<transport::TransportLayerASIO> _tla;
};
TEST(TransportLayerASIO, ListenerPortZeroTreatedAsEphemeral) {
@@ -563,6 +568,47 @@ TEST(TransportLayerASIO, ConfirmSocketSetOptionOnResetConnections) {
class TransportLayerASIOWithServiceContextTest : public ServiceContextTest {
public:
+ /**
+ * `ThreadCounter` and `ThreadToken` allow tracking the number of active (running) threads.
+ * For each thread, a `ThreadToken` is created. The token notifies `ThreadCounter` about
+ * creation and destruction of its associated thread. This allows maintaining the number of
+ * active threads at any point during the execution of this unit-test.
+ */
+ class ThreadCounter {
+ public:
+ static ThreadCounter& get() {
+ static StaticImmortal<ThreadCounter> instance;
+ return *instance;
+ }
+
+ int64_t count() const {
+ const auto count = _count.load();
+ invariant(count > 0);
+ return count;
+ }
+
+ void onCreateThread() {
+ _count.fetchAndAdd(1);
+ }
+
+ void onDestroyThread() {
+ _count.fetchAndAdd(-1);
+ }
+
+ private:
+ AtomicWord<int64_t> _count;
+ };
+
+ struct ThreadToken {
+ ThreadToken() {
+ ThreadCounter::get().onCreateThread();
+ }
+
+ ~ThreadToken() {
+ ThreadCounter::get().onDestroyThread();
+ }
+ };
+
void setUp() override {
auto sep = std::make_unique<MockSEP>();
auto tl = makeTLA(sep.get());
@@ -580,10 +626,99 @@ public:
}
};
+const auto getThreadToken =
+ ThreadContext::declareDecoration<TransportLayerASIOWithServiceContextTest::ThreadToken>();
+
+TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceDoesNotSpawnThreadsBeforeStart) {
+ const auto beforeThreadCount = ThreadCounter::get().count();
+ transport::TransportLayerASIO::TimerService service;
+ // Note that the following is a best-effort and not deterministic as we don't have control over
+ // when threads may start running and advance the thread count.
+ const auto afterThreadCount = ThreadCounter::get().count();
+ ASSERT_EQ(beforeThreadCount, afterThreadCount);
+}
+
+TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceOneShotStart) {
+ const auto beforeThreadCount = ThreadCounter::get().count();
+ transport::TransportLayerASIO::TimerService service;
+ service.start();
+ LOGV2(5490004, "Waiting for the timer thread to start", "threads"_attr = beforeThreadCount);
+ while (ThreadCounter::get().count() == beforeThreadCount) {
+ sleepFor(Milliseconds(1));
+ }
+ const auto afterThreadCount = ThreadCounter::get().count();
+ LOGV2(5490005, "Returned from waiting for the timer thread", "threads"_attr = afterThreadCount);
+
+ // Start the service a few times and verify that the thread count has not changed. Note that the
+ // following is a best-effort and not deterministic as we don't have control over when threads
+ // may start running and advance the thread count.
+ service.start();
+ service.start();
+ service.start();
+ ASSERT_EQ(afterThreadCount, ThreadCounter::get().count());
+}
+
+TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceDoesNotStartAfterStop) {
+ const auto beforeThreadCount = ThreadCounter::get().count();
+ transport::TransportLayerASIO::TimerService service;
+ service.stop();
+ service.start();
+ const auto afterThreadCount = ThreadCounter::get().count();
+ // The test would fail if `start` proceeds to spawn a thread for `service`.
+ ASSERT_EQ(beforeThreadCount, afterThreadCount);
+}
+
+TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceCanStopMoreThanOnce) {
+ // Verifying that it is safe to have multiple calls to `stop()`.
+ {
+ transport::TransportLayerASIO::TimerService service;
+ service.start();
+ service.stop();
+ service.stop();
+ }
+ {
+ transport::TransportLayerASIO::TimerService service;
+ service.stop();
+ service.stop();
+ }
+}
+
TEST_F(TransportLayerASIOWithServiceContextTest, TransportStartAfterShutDown) {
tla().shutdown();
ASSERT_EQ(tla().start(), transport::TransportLayer::ShutdownStatus);
}
+#ifdef MONGO_CONFIG_SSL
+#ifndef _WIN32
+// TODO SERVER-62035: enable the following on Windows.
+TEST_F(TransportLayerASIOWithServiceContextTest, ShutdownDuringSSLHandshake) {
+ /**
+ * Creates a server and a client thread:
+ * - The server listens for incoming connections, but doesn't participate in SSL handshake.
+ * - The client connects to the server, and is configured to perform SSL handshake.
+ * The server never writes on the socket in response to the handshake request, thus the client
+ * should block until it is timed out.
+ * The goal is to simulate a server crash, and verify the behavior of the client, during the
+ * handshake process.
+ */
+ int port = tla().listenerPort();
+
+ DBClientConnection conn;
+ conn.setSoTimeout(1); // 1 second timeout
+
+ TransientSSLParams params;
+ params.sslClusterPEMPayload = [] {
+ std::ifstream input("jstests/libs/client.pem");
+ std::string str((std::istreambuf_iterator<char>(input)), std::istreambuf_iterator<char>());
+ return str;
+ }();
+ params.targetedClusterConnectionString = ConnectionString::forLocal();
+
+ auto status = conn.connectSocketOnly({"localhost", port}, std::move(params));
+ ASSERT_EQ(status, ErrorCodes::HostUnreachable);
+}
+#endif // _WIN32
+#endif // MONGO_CONFIG_SSL
+
} // namespace
} // namespace mongo