From 1f16829e38ab24e820547e78c048d31b9946f09a Mon Sep 17 00:00:00 2001 From: Billy Donahue Date: Mon, 16 May 2022 16:43:36 +0000 Subject: SERVER-66466 TransportLayerASIOTest remove ThreadContext (cherry picked from commit df9f792d75db8efd8cd697c954d7fce291b3def6) --- src/mongo/transport/transport_layer_asio.cpp | 9 +- src/mongo/transport/transport_layer_asio.h | 8 +- src/mongo/transport/transport_layer_asio_test.cpp | 100 ++++++++++------------ 3 files changed, 58 insertions(+), 59 deletions(-) diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index a857b28d9ce..10bc80f3a22 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -296,8 +296,11 @@ TransportLayerASIO::Options::Options(const ServerGlobalParams* params, maxConns(params->maxConns) { } -TransportLayerASIO::TimerService::TimerService() - : _reactor(std::make_shared()) {} +TransportLayerASIO::TimerService::TimerService(Options opt) + : _reactor(std::make_shared()) { + if (opt.spawn) + _spawn = std::move(opt.spawn); +} TransportLayerASIO::TimerService::~TimerService() { stop(); @@ -315,7 +318,7 @@ void TransportLayerASIO::TimerService::start() { auto lk = stdx::lock_guard(_mutex); auto precondition = State::kInitialized; if (_state.compareAndSwap(&precondition, State::kStarted)) { - _thread = stdx::thread([reactor = _reactor] { + _thread = _spawn([reactor = _reactor] { LOGV2_INFO(5490002, "Started a new thread for the timer service"); reactor->run(); LOGV2_INFO(5490003, "Returning from the timer service thread"); diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h index 65f43e44053..90c63058c43 100644 --- a/src/mongo/transport/transport_layer_asio.h +++ b/src/mongo/transport/transport_layer_asio.h @@ -130,7 +130,12 @@ public: */ class TimerService { public: - TimerService(); + using Spawn = std::function)>; + struct Options { + Spawn spawn; + }; + explicit TimerService(Options opt); + TimerService() : TimerService(Options{}) {} ~TimerService(); /** @@ -165,6 +170,7 @@ public: enum class State { kInitialized, kStarted, kStopped }; AtomicWord _state; + Spawn _spawn = [](std::function f) { return stdx::thread{std::move(f)}; }; stdx::thread _thread; }; diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp index e5ca6fe201f..fd14eec25ef 100644 --- a/src/mongo/transport/transport_layer_asio_test.cpp +++ b/src/mongo/transport/transport_layer_asio_test.cpp @@ -60,7 +60,6 @@ #include "mongo/util/scopeguard.h" #include "mongo/util/static_immortal.h" #include "mongo/util/synchronized_value.h" -#include "mongo/util/thread_context.h" #include "mongo/util/time_support.h" namespace mongo { @@ -568,45 +567,50 @@ TEST(TransportLayerASIO, ConfirmSocketSetOptionOnResetConnections) { class TransportLayerASIOWithServiceContextTest : public ServiceContextTest { public: - /** - * `ThreadCounter` and `ThreadToken` allow tracking the number of active (running) threads. - * For each thread, a `ThreadToken` is created. The token notifies `ThreadCounter` about - * creation and destruction of its associated thread. This allows maintaining the number of - * active threads at any point during the execution of this unit-test. - */ class ThreadCounter { public: - static ThreadCounter& get() { - static StaticImmortal instance; - return *instance; + std::function)> makeSpawnFunc() { + return [core = _core](std::function cb) { + { + stdx::lock_guard lk(core->mutex); + ++core->created; + core->cv.notify_all(); + } + return stdx::thread{[core, cb = std::move(cb)]() mutable { + { + stdx::lock_guard lk(core->mutex); + ++core->started; + core->cv.notify_all(); + } + cb(); + }}; + }; } - int64_t count() const { - const auto count = _count.load(); - invariant(count > 0); - return count; + int64_t created() const { + stdx::lock_guard lk(_core->mutex); + return _core->created; } - void onCreateThread() { - _count.fetchAndAdd(1); + int64_t started() const { + stdx::lock_guard lk(_core->mutex); + return _core->started; } - void onDestroyThread() { - _count.fetchAndAdd(-1); + template + void waitForStarted(const Pred& pred) const { + stdx::unique_lock lk(_core->mutex); + _core->cv.wait(lk, [&] { return pred(_core->started); }); } private: - AtomicWord _count; - }; - - struct ThreadToken { - ThreadToken() { - ThreadCounter::get().onCreateThread(); - } - - ~ThreadToken() { - ThreadCounter::get().onDestroyThread(); - } + struct Core { + mutable stdx::mutex mutex; // NOLINT + mutable stdx::condition_variable cv; + int64_t created = 0; + int64_t started = 0; + }; + std::shared_ptr _core = std::make_shared(); }; void setUp() override { @@ -626,46 +630,32 @@ public: } }; -const auto getThreadToken = - ThreadContext::declareDecoration(); - TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceDoesNotSpawnThreadsBeforeStart) { - const auto beforeThreadCount = ThreadCounter::get().count(); - transport::TransportLayerASIO::TimerService service; - // Note that the following is a best-effort and not deterministic as we don't have control over - // when threads may start running and advance the thread count. - const auto afterThreadCount = ThreadCounter::get().count(); - ASSERT_EQ(beforeThreadCount, afterThreadCount); + ThreadCounter counter; + { transport::TransportLayerASIO::TimerService service{{counter.makeSpawnFunc()}}; } + ASSERT_EQ(counter.created(), 0); } TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceOneShotStart) { - const auto beforeThreadCount = ThreadCounter::get().count(); - transport::TransportLayerASIO::TimerService service; + ThreadCounter counter; + transport::TransportLayerASIO::TimerService service{{counter.makeSpawnFunc()}}; service.start(); - LOGV2(5490004, "Waiting for the timer thread to start", "threads"_attr = beforeThreadCount); - while (ThreadCounter::get().count() == beforeThreadCount) { - sleepFor(Milliseconds(1)); - } - const auto afterThreadCount = ThreadCounter::get().count(); - LOGV2(5490005, "Returned from waiting for the timer thread", "threads"_attr = afterThreadCount); + LOGV2(5490004, "Awaiting timer thread start", "threads"_attr = counter.started()); + counter.waitForStarted([](auto n) { return n > 0; }); + LOGV2(5490005, "Awaited timer thread start", "threads"_attr = counter.started()); - // Start the service a few times and verify that the thread count has not changed. Note that the - // following is a best-effort and not deterministic as we don't have control over when threads - // may start running and advance the thread count. service.start(); service.start(); service.start(); - ASSERT_EQ(afterThreadCount, ThreadCounter::get().count()); + ASSERT_EQ(counter.created(), 1) << "Redundant start should spawn only once"; } TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceDoesNotStartAfterStop) { - const auto beforeThreadCount = ThreadCounter::get().count(); - transport::TransportLayerASIO::TimerService service; + ThreadCounter counter; + transport::TransportLayerASIO::TimerService service{{counter.makeSpawnFunc()}}; service.stop(); service.start(); - const auto afterThreadCount = ThreadCounter::get().count(); - // The test would fail if `start` proceeds to spawn a thread for `service`. - ASSERT_EQ(beforeThreadCount, afterThreadCount); + ASSERT_EQ(counter.created(), 0) << "Stop then start should not spawn"; } TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceCanStopMoreThanOnce) { -- cgit v1.2.1