author     Amirsaman Memaripour <amirsaman.memaripour@mongodb.com>   2021-12-21 16:49:40 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>          2022-06-08 16:02:03 +0000
commit     65134ea4e05c6c6d2647186782775ae0cdeb712e (patch)
tree       abcd01d1cdcce47714c9f0183c4eb7b5c2780d78
parent     1e4c6f23678343729a953b849da530946ca6e100 (diff)
download   mongo-65134ea4e05c6c6d2647186782775ae0cdeb712e.tar.gz
SERVER-54900 Cancel ASIO session when SSL handshake times out
(cherry picked from commit 263c0631c8001a8cdb42aff720b8a49a621754dd)
-rw-r--r--  src/mongo/transport/SConscript                        3
-rw-r--r--  src/mongo/transport/transport_layer_asio.cpp         97
-rw-r--r--  src/mongo/transport/transport_layer_asio.h           53
-rw-r--r--  src/mongo/transport/transport_layer_asio_test.cpp   183
4 files changed, 317 insertions(+), 19 deletions(-)
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 8349fe45be4..8e18bfb106a 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -183,8 +183,9 @@ tlEnv.CppUnitTest(
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/client/clientdriver_network',
'$BUILD_DIR/mongo/db/dbmessage',
- '$BUILD_DIR/mongo/db/service_context',
+ '$BUILD_DIR/mongo/db/service_context_test_fixture',
'$BUILD_DIR/mongo/rpc/protocol',
'$BUILD_DIR/mongo/rpc/rpc',
'$BUILD_DIR/mongo/unittest/unittest',
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index 11282830b30..3593cc7673a 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -33,6 +33,7 @@
#include "mongo/transport/transport_layer_asio.h"
+#include <fmt/format.h>
#include <fstream>
#include <asio.hpp>
@@ -58,6 +59,7 @@
#include "mongo/util/net/ssl_manager.h"
#include "mongo/util/net/ssl_options.h"
#include "mongo/util/options_parser/startup_options.h"
+#include "mongo/util/strong_weak_finish_line.h"
#ifdef MONGO_CONFIG_SSL
#include "mongo/util/net/ssl.hpp"
@@ -286,6 +288,60 @@ TransportLayerASIO::Options::Options(const ServerGlobalParams* params)
maxConns(params->maxConns) {
}
+TransportLayerASIO::TimerService::TimerService()
+ : _reactor(std::make_shared<TransportLayerASIO::ASIOReactor>()) {}
+
+TransportLayerASIO::TimerService::~TimerService() {
+ stop();
+}
+
+void TransportLayerASIO::TimerService::start() {
+ // Skip the expensive lock acquisition and `compareAndSwap` in the common path.
+ if (MONGO_likely(_state.load() != State::kInitialized))
+ return;
+
+ // The following ensures that only one thread proceeds to spawn the reactor thread, and that
+ // concurrent `start()` and `stop()` invocations are serialized. Holding the lock guarantees
+ // that this block runs either entirely before or entirely after `stop()`. Using `compareAndSwap`
+ // while holding the lock is not strictly necessary; it just keeps the code simple.
+ auto lk = stdx::lock_guard(_mutex);
+ auto precondition = State::kInitialized;
+ if (_state.compareAndSwap(&precondition, State::kStarted)) {
+ _thread = stdx::thread([reactor = _reactor] {
+ LOGV2_INFO(5490002, "Started a new thread for the timer service");
+ reactor->run();
+ LOGV2_INFO(5490003, "Returning from the timer service thread");
+ });
+ }
+}
+
+void TransportLayerASIO::TimerService::stop() {
+ // It's possible for `stop()` to be called without `start()` having been called (or for them to
+ // be called concurrently), so we only proceed with stopping the reactor and joining the thread
+ // if we've already transitioned to the `kStarted` state.
+ auto lk = stdx::lock_guard(_mutex);
+ if (_state.swap(State::kStopped) != State::kStarted)
+ return;
+
+ _reactor->stop();
+ _thread.join();
+}
+
+std::unique_ptr<ReactorTimer> TransportLayerASIO::TimerService::makeTimer() {
+ return _getReactor()->makeTimer();
+}
+
+Date_t TransportLayerASIO::TimerService::now() {
+ return _getReactor()->now();
+}
+
+Reactor* TransportLayerASIO::TimerService::_getReactor() {
+ // TODO SERVER-57253 We can start this service as part of starting `TransportLayerASIO`.
+ // Then, we can remove the following invocation of `start()`.
+ start();
+ return _reactor.get();
+}
+
TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts,
ServiceEntryPoint* sep)
: _ingressReactor(std::make_shared<ASIOReactor>()),
@@ -296,7 +352,8 @@ TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts,
_egressSSLContext(nullptr),
#endif
_sep(sep),
- _listenerOptions(opts) {
+ _listenerOptions(opts),
+ _timerService(std::make_unique<TimerService>()) {
}
TransportLayerASIO::~TransportLayerASIO() = default;
@@ -524,13 +581,47 @@ StatusWith<SessionHandle> TransportLayerASIO::connect(HostAndPort peer,
(sslMode == kGlobalSSLMode &&
((globalSSLMode == SSLParams::SSLMode_preferSSL) ||
(globalSSLMode == SSLParams::SSLMode_requireSSL)))) {
+ // The handshake is complete once either of the following passes the finish line:
+ // - The thread running the handshake returns from `handshakeSSLForEgress`.
+ // - The thread running `TimerService` cancels the handshake due to a timeout.
+ auto finishLine = std::make_shared<StrongWeakFinishLine>(2);
+
+ // Schedules a task to cancel the synchronous handshake if it does not complete before the
+ // specified timeout.
+ auto timer = _timerService->makeTimer();
+#ifndef _WIN32
+ // TODO SERVER-62035: enable the following on Windows.
+ if (timeout > Milliseconds(0)) {
+ timer->waitUntil(_timerService->now() + timeout)
+ .getAsync([finishLine, session](Status status) {
+ if (status.isOK() && finishLine->arriveStrongly())
+ session->end();
+ });
+ }
+#endif
+
Date_t timeBefore = Date_t::now();
auto sslStatus = session->handshakeSSLForEgress(peer).getNoThrow();
Date_t timeAfter = Date_t::now();
+
if (timeAfter - timeBefore > kSlowOperationThreshold) {
networkCounter.incrementNumSlowSSLOperations();
}
+ if (finishLine->arriveStrongly()) {
+ timer->cancel();
+ } else if (!sslStatus.isOK()) {
+ // We only take this path if the handshake times out. Overwrite the socket exception
+ // with a network timeout.
+ auto errMsg = fmt::format("SSL handshake timed out after {}",
+ (timeAfter - timeBefore).toString());
+ sslStatus = Status(ErrorCodes::NetworkTimeout, errMsg);
+ LOGV2(5490001,
+ "Timed out while running handshake",
+ "peer"_attr = peer,
+ "timeout"_attr = timeout);
+ }
+
if (!sslStatus.isOK()) {
return sslStatus;
}
@@ -1141,6 +1232,10 @@ void TransportLayerASIO::shutdown() {
return;
}
+ lk.unlock();
+ _timerService->stop();
+ lk.lock();
+
if (!_listenerOptions.isIngress()) {
// Egress only reactors never start a listener
return;
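
The `connect()` change above races the blocking `handshakeSSLForEgress()` call against a timer
running on the `TimerService` thread: whichever side passes the `StrongWeakFinishLine` first gets
to act, so either the timer ends the session or the handshake thread disarms the timer. What
follows is a minimal standalone sketch of that first-arrival pattern, not MongoDB code:
`std::atomic<bool>` stands in for `StrongWeakFinishLine`, a plain thread with a `sleep_for` stands
in for the `TimerService` reactor, and the sleeps are placeholders for the handshake and its
timeout.

#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>

using namespace std::chrono_literals;

int main() {
    // Whichever participant flips the flag first "wins": the timer cancels the blocked operation,
    // or the operation finishes first and the timer has nothing left to do.
    auto finishLine = std::make_shared<std::atomic<bool>>(false);
    std::atomic<bool> cancelled{false};

    // Stand-in for the TimerService callback: after the timeout, try to win the race and cancel.
    std::thread timer([finishLine, &cancelled] {
        std::this_thread::sleep_for(100ms);  // the handshake timeout
        bool expected = false;
        if (finishLine->compare_exchange_strong(expected, true)) {
            cancelled.store(true);  // corresponds to session->end()
        }
    });

    // Stand-in for the synchronous SSL handshake: here it takes longer than the timeout.
    std::this_thread::sleep_for(200ms);

    bool expected = false;
    if (finishLine->compare_exchange_strong(expected, true)) {
        // The handshake won the race; corresponds to timer->cancel().
        std::cout << "handshake completed before the timeout\n";
    } else {
        // The timer won the race; report a timeout instead of the raw socket error.
        std::cout << "handshake cancelled by the timer; reporting a timeout error\n";
    }

    timer.join();
    return 0;
}

With the 100ms/200ms values above the timer wins; swapping them shows the other branch, where the
handshake completes first and the timer's callback loses the race and does nothing.
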
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h
index f11b0dbc274..e10d050319c 100644
--- a/src/mongo/transport/transport_layer_asio.h
+++ b/src/mongo/transport/transport_layer_asio.h
@@ -117,6 +117,57 @@ public:
TransportLayerASIO(const Options& opts, ServiceEntryPoint* sep);
+ /**
+ * A service, internal to `TransportLayerASIO`, that allows creating timers and running `Future`
+ * continuations when a timeout occurs. This allows setting up timeouts for synchronous
+ * operations, such as a synchronous SSL handshake. A separate thread is assigned to run these
+ * timers to:
+ * - Ensure there is always a thread running the timers, regardless of using a synchronous or
+ * asynchronous listener.
+ * - Avoid any performance implications on other reactors (e.g., the `egressReactor`).
+ * The public visibility is only for testing purposes and this service is not intended to be
+ * used outside `TransportLayerASIO`.
+ */
+ class TimerService {
+ public:
+ TimerService();
+ ~TimerService();
+
+ /**
+ * Spawns a thread to run the reactor.
+ * Immediately returns if the service has already started.
+ * May be called more than once, and concurrently.
+ */
+ void start();
+
+ /**
+ * Stops the reactor and joins the thread.
+ * Immediately returns if the service is not started, or already stopped.
+ * May be called more than once, and concurrently.
+ */
+ void stop();
+
+ std::unique_ptr<ReactorTimer> makeTimer();
+
+ Date_t now();
+
+ private:
+ Reactor* _getReactor();
+
+ const std::shared_ptr<Reactor> _reactor;
+
+ // Serializes invocations of `start()` and `stop()`, and allows updating `_state` and
+ // `_thread` as a single atomic operation.
+ Mutex _mutex = MONGO_MAKE_LATCH("TransportLayerASIO::TimerService::_mutex");
+
+ // State transitions: `kInitialized` --> `kStarted` --> `kStopped`
+ // |_______________________________^
+ enum class State { kInitialized, kStarted, kStopped };
+ AtomicWord<State> _state;
+
+ stdx::thread _thread;
+ };
+
~TransportLayerASIO() override;
StatusWith<SessionHandle> connect(HostAndPort peer,
@@ -216,6 +267,8 @@ private:
int _listenerPort = 0;
bool _isShutdown = false;
+
+ const std::unique_ptr<TimerService> _timerService;
};
} // namespace transport
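
Lifecycle-wise, the `TimerService` declared above is a small start-once state machine: `start()`
may be called repeatedly and concurrently but spawns the reactor thread at most once, and `stop()`
is a no-op unless the service actually started. The following is a standalone sketch of just that
lifecycle using only the standard library; it is not the MongoDB implementation, which uses
`AtomicWord`, `MONGO_MAKE_LATCH`, and an ASIO reactor where this sketch uses `std::atomic`,
`std::mutex`, and a placeholder run loop.

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

class TimerServiceSketch {
public:
    ~TimerServiceSketch() { stop(); }

    // Spawns the worker thread on the first call; later (and concurrent) calls return immediately.
    void start() {
        if (_state.load() != State::kInitialized)
            return;  // fast path: already started or stopped

        std::lock_guard<std::mutex> lk(_mutex);
        auto expected = State::kInitialized;
        if (_state.compare_exchange_strong(expected, State::kStarted)) {
            _thread = std::thread([this] { _run(); });
        }
    }

    // Stops the worker and joins it; a no-op if the service never started or was already stopped.
    void stop() {
        std::lock_guard<std::mutex> lk(_mutex);
        if (_state.exchange(State::kStopped) != State::kStarted)
            return;

        {
            std::lock_guard<std::mutex> runLk(_runMutex);
            _stopRequested = true;
        }
        _cv.notify_one();
        _thread.join();
    }

private:
    // Placeholder for reactor->run(): park until stop() is requested.
    void _run() {
        std::unique_lock<std::mutex> lk(_runMutex);
        _cv.wait(lk, [this] { return _stopRequested; });
    }

    enum class State { kInitialized, kStarted, kStopped };
    std::atomic<State> _state{State::kInitialized};

    std::mutex _mutex;  // serializes start()/stop() and guards _thread
    std::thread _thread;

    std::mutex _runMutex;
    std::condition_variable _cv;
    bool _stopRequested = false;
};

As in the real class, flipping the state and spawning (or joining) the thread happen under the same
mutex, so `stop()` can never observe `kStarted` without having a joinable thread to join.
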
diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp
index eafc0fc777b..664e9e4df11 100644
--- a/src/mongo/transport/transport_layer_asio_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_test.cpp
@@ -30,6 +30,7 @@
#include "mongo/transport/transport_layer_asio.h"
+#include <fstream>
#include <queue>
#include <system_error>
#include <utility>
@@ -37,7 +38,10 @@
#include <asio.hpp>
+#include "mongo/client/dbclient_connection.h"
+#include "mongo/config.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/service_context_test_fixture.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/basic.h"
@@ -47,6 +51,7 @@
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/notification.h"
#include "mongo/util/net/sock.h"
+#include "mongo/util/static_immortal.h"
#include "mongo/util/synchronized_value.h"
#include "mongo/util/time_support.h"
@@ -253,13 +258,29 @@ private:
synchronized_value<std::vector<std::unique_ptr<SessionThread>>> _sessions;
};
+std::unique_ptr<transport::TransportLayerASIO> makeTLA(ServiceEntryPoint* sep) {
+ auto options = [] {
+ ServerGlobalParams params;
+ params.noUnixSocket = true;
+ transport::TransportLayerASIO::Options opts(&params);
+ // TODO SERVER-30212 should clean this up and assign a port from the supplied port range
+ // provided by resmoke.
+ opts.port = 0;
+ return opts;
+ }();
+ auto tla = std::make_unique<transport::TransportLayerASIO>(options, sep);
+ ASSERT_OK(tla->setup());
+ ASSERT_OK(tla->start());
+ return tla;
+}
+
/**
* Properly setting up and tearing down the MockSEP and TransportLayerASIO is
* tricky. Most tests can delegate the details to this TestFixture.
*/
class TestFixture {
public:
- TestFixture() : _tla{_makeTLA()} {}
+ TestFixture() : _tla{makeTLA(&_sep)} {}
~TestFixture() {
_sep.endAllSessions({});
@@ -275,22 +296,6 @@ public:
}
private:
- std::unique_ptr<transport::TransportLayerASIO> _makeTLA() {
- auto options = [] {
- ServerGlobalParams params;
- params.noUnixSocket = true;
- transport::TransportLayerASIO::Options opts(&params);
- // TODO SERVER-30212 should clean this up and assign a port from the supplied port range
- // provided by resmoke.
- opts.port = 0;
- return opts;
- }();
- auto tla = std::make_unique<transport::TransportLayerASIO>(options, &_sep);
- ASSERT_OK(tla->setup());
- ASSERT_OK(tla->start());
- return tla;
- }
-
MockSEP _sep;
std::unique_ptr<transport::TransportLayerASIO> _tla;
};
@@ -373,5 +378,149 @@ TEST(TransportLayerASIO, SwitchTimeoutModes) {
}
}
+class TransportLayerASIOWithServiceContextTest : public ServiceContextTest {
+public:
+ /**
+ * `ThreadCounter` and `ThreadToken` allow tracking the number of active (running) threads.
+ * For each thread, a `ThreadToken` is created. The token notifies `ThreadCounter` about
+ * creation and destruction of its associated thread. This allows maintaining the number of
+ * active threads at any point during the execution of this unit-test.
+ */
+ class ThreadCounter {
+ public:
+ static ThreadCounter& get() {
+ static StaticImmortal<ThreadCounter> instance;
+ return *instance;
+ }
+
+ int64_t count() const {
+ const auto count = _count.load();
+ invariant(count > 0);
+ return count;
+ }
+
+ void onCreateThread() {
+ _count.fetchAndAdd(1);
+ }
+
+ void onDestroyThread() {
+ _count.fetchAndAdd(-1);
+ }
+
+ private:
+ AtomicWord<int64_t> _count;
+ };
+
+ struct ThreadToken {
+ ThreadToken() {
+ ThreadCounter::get().onCreateThread();
+ }
+
+ ~ThreadToken() {
+ ThreadCounter::get().onDestroyThread();
+ }
+ };
+
+ void setUp() override {
+ auto sep = std::make_unique<MockSEP>();
+ auto tl = makeTLA(sep.get());
+ getServiceContext()->setServiceEntryPoint(std::move(sep));
+ getServiceContext()->setTransportLayer(std::move(tl));
+ }
+
+ void tearDown() override {
+ getServiceContext()->getTransportLayer()->shutdown();
+ }
+
+ transport::TransportLayerASIO& tla() {
+ auto tl = getServiceContext()->getTransportLayer();
+ return *dynamic_cast<transport::TransportLayerASIO*>(tl);
+ }
+};
+
+#if 0
+const auto getThreadToken =
+ ThreadContext::declareDecoration<TransportLayerASIOWithServiceContextTest::ThreadToken>();
+
+TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceDoesNotSpawnThreadsBeforeStart) {
+ const auto beforeThreadCount = ThreadCounter::get().count();
+ transport::TransportLayerASIO::TimerService service;
+ // Note that the following is a best-effort and not deterministic as we don't have control over
+ // when threads may start running and advance the thread count.
+ const auto afterThreadCount = ThreadCounter::get().count();
+ ASSERT_EQ(beforeThreadCount, afterThreadCount);
+}
+
+TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceOneShotStart) {
+ const auto beforeThreadCount = ThreadCounter::get().count();
+ transport::TransportLayerASIO::TimerService service;
+ service.start();
+ LOGV2(5490004, "Waiting for the timer thread to start", "threads"_attr = beforeThreadCount);
+ while (ThreadCounter::get().count() == beforeThreadCount) {
+ sleepFor(Milliseconds(1));
+ }
+ const auto afterThreadCount = ThreadCounter::get().count();
+ LOGV2(5490005, "Returned from waiting for the timer thread", "threads"_attr = afterThreadCount);
+
+ // Start the service a few times and verify that the thread count has not changed. Note that the
+ // following is a best-effort and not deterministic as we don't have control over when threads
+ // may start running and advance the thread count.
+ service.start();
+ service.start();
+ service.start();
+ ASSERT_EQ(afterThreadCount, ThreadCounter::get().count());
+}
+
+TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceDoesNotStartAfterStop) {
+ const auto beforeThreadCount = ThreadCounter::get().count();
+ transport::TransportLayerASIO::TimerService service;
+ service.stop();
+ service.start();
+ const auto afterThreadCount = ThreadCounter::get().count();
+ // The test would fail if `start` proceeds to spawn a thread for `service`.
+ ASSERT_EQ(beforeThreadCount, afterThreadCount);
+}
+
+TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceCanStopMoreThanOnce) {
+ // Verifying that it is safe to have multiple calls to `stop()`.
+ {
+ transport::TransportLayerASIO::TimerService service;
+ service.start();
+ service.stop();
+ service.stop();
+ }
+ {
+ transport::TransportLayerASIO::TimerService service;
+ service.stop();
+ service.stop();
+ }
+}
+#endif
+
+#ifdef MONGO_CONFIG_SSL
+#ifndef _WIN32
+// TODO SERVER-62035: enable the following on Windows.
+TEST_F(TransportLayerASIOWithServiceContextTest, ShutdownDuringSSLHandshake) {
+ /**
+ * Creates a server and a client thread:
+ * - The server listens for incoming connections but does not participate in the SSL handshake.
+ * - The client connects to the server and is configured to perform the SSL handshake.
+ * The server never writes on the socket in response to the handshake request, so the client
+ * blocks until it times out.
+ * The goal is to simulate an unresponsive (e.g., crashed) server and to verify the client's
+ * behavior during the handshake.
+ */
+ int port = tla().listenerPort();
+
+ auto uri = uassertStatusOK(MongoURI::parse("mongodb://localhost/?ssl=true"));
+ DBClientConnection conn(false, 1 /* this is ignored */, std::move(uri));
+ conn.setSoTimeout(1); // 1 second timeout
+
+ auto status = conn.connectSocketOnly({"localhost", port});
+ ASSERT_EQ(status, ErrorCodes::HostUnreachable);
+}
+#endif // _WIN32
+#endif // MONGO_CONFIG_SSL
+
} // namespace
} // namespace mongo
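
The `ShutdownDuringSSLHandshake` test above depends on a peer that accepts the TCP connection but
never answers the TLS ClientHello, so the client-side handshake stalls until the client's own
timeout cancels the session. The sketch below reproduces that kind of silent peer outside the test
harness; it is illustrative only, uses plain POSIX sockets (hence the same non-Windows scope as the
test), and is not code from the MongoDB tree.

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <chrono>
#include <cstdio>
#include <thread>

int main() {
    int listener = ::socket(AF_INET, SOCK_STREAM, 0);
    if (listener < 0)
        return 1;

    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = 0;  // let the kernel pick a free port, like `opts.port = 0` in makeTLA()

    if (::bind(listener, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0 ||
        ::listen(listener, 1) < 0)
        return 1;

    socklen_t len = sizeof(addr);
    ::getsockname(listener, reinterpret_cast<sockaddr*>(&addr), &len);
    std::printf("silent listener on 127.0.0.1:%d\n", ntohs(addr.sin_port));

    // Accept the connection but never read or write; a TLS client connecting here can only get
    // out of its handshake when its own timeout fires.
    int conn = ::accept(listener, nullptr, nullptr);
    std::this_thread::sleep_for(std::chrono::seconds(30));

    ::close(conn);
    ::close(listener);
    return 0;
}

Pointing a TLS client at the printed port (for example `openssl s_client -connect 127.0.0.1:<port>`)
hangs in the handshake, which is exactly the stall this patch converts into an error instead of an
indefinite block.
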