diff options
-rw-r--r-- | src/mongo/transport/transport_layer_asio.cpp | 9 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_asio.h | 8 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_asio_test.cpp | 101 |
3 files changed, 58 insertions, 60 deletions
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index 3593cc7673a..387a0943d10 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -288,8 +288,11 @@ TransportLayerASIO::Options::Options(const ServerGlobalParams* params) maxConns(params->maxConns) { } -TransportLayerASIO::TimerService::TimerService() - : _reactor(std::make_shared<TransportLayerASIO::ASIOReactor>()) {} +TransportLayerASIO::TimerService::TimerService(Options opt) + : _reactor(std::make_shared<TransportLayerASIO::ASIOReactor>()) { + if (opt.spawn) + _spawn = std::move(opt.spawn); +} TransportLayerASIO::TimerService::~TimerService() { stop(); @@ -307,7 +310,7 @@ void TransportLayerASIO::TimerService::start() { auto lk = stdx::lock_guard(_mutex); auto precondition = State::kInitialized; if (_state.compareAndSwap(&precondition, State::kStarted)) { - _thread = stdx::thread([reactor = _reactor] { + _thread = _spawn([reactor = _reactor] { LOGV2_INFO(5490002, "Started a new thread for the timer service"); reactor->run(); LOGV2_INFO(5490003, "Returning from the timer service thread"); diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h index e10d050319c..a3ded3b164a 100644 --- a/src/mongo/transport/transport_layer_asio.h +++ b/src/mongo/transport/transport_layer_asio.h @@ -130,7 +130,12 @@ public: */ class TimerService { public: - TimerService(); + using Spawn = std::function<stdx::thread(std::function<void()>)>; + struct Options { + Spawn spawn; + }; + explicit TimerService(Options opt); + TimerService() : TimerService(Options{}) {} ~TimerService(); /** @@ -165,6 +170,7 @@ public: enum class State { kInitialized, kStarted, kStopped }; AtomicWord<State> _state; + Spawn _spawn = [](std::function<void()> f) { return stdx::thread{std::move(f)}; }; stdx::thread _thread; }; diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp index 664e9e4df11..c73358f11f7 100644 --- a/src/mongo/transport/transport_layer_asio_test.cpp +++ b/src/mongo/transport/transport_layer_asio_test.cpp @@ -380,45 +380,50 @@ TEST(TransportLayerASIO, SwitchTimeoutModes) { class TransportLayerASIOWithServiceContextTest : public ServiceContextTest { public: - /** - * `ThreadCounter` and `ThreadToken` allow tracking the number of active (running) threads. - * For each thread, a `ThreadToken` is created. The token notifies `ThreadCounter` about - * creation and destruction of its associated thread. This allows maintaining the number of - * active threads at any point during the execution of this unit-test. - */ class ThreadCounter { public: - static ThreadCounter& get() { - static StaticImmortal<ThreadCounter> instance; - return *instance; + std::function<stdx::thread(std::function<void()>)> makeSpawnFunc() { + return [core = _core](std::function<void()> cb) { + { + stdx::lock_guard lk(core->mutex); + ++core->created; + core->cv.notify_all(); + } + return stdx::thread{[core, cb = std::move(cb)]() mutable { + { + stdx::lock_guard lk(core->mutex); + ++core->started; + core->cv.notify_all(); + } + cb(); + }}; + }; } - int64_t count() const { - const auto count = _count.load(); - invariant(count > 0); - return count; + int64_t created() const { + stdx::lock_guard lk(_core->mutex); + return _core->created; } - void onCreateThread() { - _count.fetchAndAdd(1); + int64_t started() const { + stdx::lock_guard lk(_core->mutex); + return _core->started; } - void onDestroyThread() { - _count.fetchAndAdd(-1); + template <typename Pred> + void waitForStarted(const Pred& pred) const { + stdx::unique_lock lk(_core->mutex); + _core->cv.wait(lk, [&] { return pred(_core->started); }); } private: - AtomicWord<int64_t> _count; - }; - - struct ThreadToken { - ThreadToken() { - ThreadCounter::get().onCreateThread(); - } - - ~ThreadToken() { - ThreadCounter::get().onDestroyThread(); - } + struct Core { + mutable stdx::mutex mutex; // NOLINT + mutable stdx::condition_variable cv; + int64_t created = 0; + int64_t started = 0; + }; + std::shared_ptr<Core> _core = std::make_shared<Core>(); }; void setUp() override { @@ -438,47 +443,32 @@ public: } }; -#if 0 -const auto getThreadToken = - ThreadContext::declareDecoration<TransportLayerASIOWithServiceContextTest::ThreadToken>(); - TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceDoesNotSpawnThreadsBeforeStart) { - const auto beforeThreadCount = ThreadCounter::get().count(); - transport::TransportLayerASIO::TimerService service; - // Note that the following is a best-effort and not deterministic as we don't have control over - // when threads may start running and advance the thread count. - const auto afterThreadCount = ThreadCounter::get().count(); - ASSERT_EQ(beforeThreadCount, afterThreadCount); + ThreadCounter counter; + { transport::TransportLayerASIO::TimerService service{{counter.makeSpawnFunc()}}; } + ASSERT_EQ(counter.created(), 0); } TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceOneShotStart) { - const auto beforeThreadCount = ThreadCounter::get().count(); - transport::TransportLayerASIO::TimerService service; + ThreadCounter counter; + transport::TransportLayerASIO::TimerService service{{counter.makeSpawnFunc()}}; service.start(); - LOGV2(5490004, "Waiting for the timer thread to start", "threads"_attr = beforeThreadCount); - while (ThreadCounter::get().count() == beforeThreadCount) { - sleepFor(Milliseconds(1)); - } - const auto afterThreadCount = ThreadCounter::get().count(); - LOGV2(5490005, "Returned from waiting for the timer thread", "threads"_attr = afterThreadCount); + LOGV2(5490004, "Awaiting timer thread start", "threads"_attr = counter.started()); + counter.waitForStarted([](auto n) { return n > 0; }); + LOGV2(5490005, "Awaited timer thread start", "threads"_attr = counter.started()); - // Start the service a few times and verify that the thread count has not changed. Note that the - // following is a best-effort and not deterministic as we don't have control over when threads - // may start running and advance the thread count. service.start(); service.start(); service.start(); - ASSERT_EQ(afterThreadCount, ThreadCounter::get().count()); + ASSERT_EQ(counter.created(), 1) << "Redundant start should spawn only once"; } TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceDoesNotStartAfterStop) { - const auto beforeThreadCount = ThreadCounter::get().count(); - transport::TransportLayerASIO::TimerService service; + ThreadCounter counter; + transport::TransportLayerASIO::TimerService service{{counter.makeSpawnFunc()}}; service.stop(); service.start(); - const auto afterThreadCount = ThreadCounter::get().count(); - // The test would fail if `start` proceeds to spawn a thread for `service`. - ASSERT_EQ(beforeThreadCount, afterThreadCount); + ASSERT_EQ(counter.created(), 0) << "Stop then start should not spawn"; } TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceCanStopMoreThanOnce) { @@ -495,7 +485,6 @@ TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceCanStopMoreThanOnce service.stop(); } } -#endif #ifdef MONGO_CONFIG_SSL #ifndef _WIN32 |