summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/transport/transport_layer_asio.cpp9
-rw-r--r--src/mongo/transport/transport_layer_asio.h8
-rw-r--r--src/mongo/transport/transport_layer_asio_test.cpp101
3 files changed, 58 insertions, 60 deletions
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index 3593cc7673a..387a0943d10 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -288,8 +288,11 @@ TransportLayerASIO::Options::Options(const ServerGlobalParams* params)
maxConns(params->maxConns) {
}
-TransportLayerASIO::TimerService::TimerService()
- : _reactor(std::make_shared<TransportLayerASIO::ASIOReactor>()) {}
+TransportLayerASIO::TimerService::TimerService(Options opt)
+ : _reactor(std::make_shared<TransportLayerASIO::ASIOReactor>()) {
+ if (opt.spawn)
+ _spawn = std::move(opt.spawn);
+}
TransportLayerASIO::TimerService::~TimerService() {
stop();
@@ -307,7 +310,7 @@ void TransportLayerASIO::TimerService::start() {
auto lk = stdx::lock_guard(_mutex);
auto precondition = State::kInitialized;
if (_state.compareAndSwap(&precondition, State::kStarted)) {
- _thread = stdx::thread([reactor = _reactor] {
+ _thread = _spawn([reactor = _reactor] {
LOGV2_INFO(5490002, "Started a new thread for the timer service");
reactor->run();
LOGV2_INFO(5490003, "Returning from the timer service thread");
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h
index e10d050319c..a3ded3b164a 100644
--- a/src/mongo/transport/transport_layer_asio.h
+++ b/src/mongo/transport/transport_layer_asio.h
@@ -130,7 +130,12 @@ public:
*/
class TimerService {
public:
- TimerService();
+ using Spawn = std::function<stdx::thread(std::function<void()>)>;
+ struct Options {
+ Spawn spawn;
+ };
+ explicit TimerService(Options opt);
+ TimerService() : TimerService(Options{}) {}
~TimerService();
/**
@@ -165,6 +170,7 @@ public:
enum class State { kInitialized, kStarted, kStopped };
AtomicWord<State> _state;
+ Spawn _spawn = [](std::function<void()> f) { return stdx::thread{std::move(f)}; };
stdx::thread _thread;
};
diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp
index 664e9e4df11..c73358f11f7 100644
--- a/src/mongo/transport/transport_layer_asio_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_test.cpp
@@ -380,45 +380,50 @@ TEST(TransportLayerASIO, SwitchTimeoutModes) {
class TransportLayerASIOWithServiceContextTest : public ServiceContextTest {
public:
- /**
- * `ThreadCounter` and `ThreadToken` allow tracking the number of active (running) threads.
- * For each thread, a `ThreadToken` is created. The token notifies `ThreadCounter` about
- * creation and destruction of its associated thread. This allows maintaining the number of
- * active threads at any point during the execution of this unit-test.
- */
class ThreadCounter {
public:
- static ThreadCounter& get() {
- static StaticImmortal<ThreadCounter> instance;
- return *instance;
+ std::function<stdx::thread(std::function<void()>)> makeSpawnFunc() {
+ return [core = _core](std::function<void()> cb) {
+ {
+ stdx::lock_guard lk(core->mutex);
+ ++core->created;
+ core->cv.notify_all();
+ }
+ return stdx::thread{[core, cb = std::move(cb)]() mutable {
+ {
+ stdx::lock_guard lk(core->mutex);
+ ++core->started;
+ core->cv.notify_all();
+ }
+ cb();
+ }};
+ };
}
- int64_t count() const {
- const auto count = _count.load();
- invariant(count > 0);
- return count;
+ int64_t created() const {
+ stdx::lock_guard lk(_core->mutex);
+ return _core->created;
}
- void onCreateThread() {
- _count.fetchAndAdd(1);
+ int64_t started() const {
+ stdx::lock_guard lk(_core->mutex);
+ return _core->started;
}
- void onDestroyThread() {
- _count.fetchAndAdd(-1);
+ template <typename Pred>
+ void waitForStarted(const Pred& pred) const {
+ stdx::unique_lock lk(_core->mutex);
+ _core->cv.wait(lk, [&] { return pred(_core->started); });
}
private:
- AtomicWord<int64_t> _count;
- };
-
- struct ThreadToken {
- ThreadToken() {
- ThreadCounter::get().onCreateThread();
- }
-
- ~ThreadToken() {
- ThreadCounter::get().onDestroyThread();
- }
+ struct Core {
+ mutable stdx::mutex mutex; // NOLINT
+ mutable stdx::condition_variable cv;
+ int64_t created = 0;
+ int64_t started = 0;
+ };
+ std::shared_ptr<Core> _core = std::make_shared<Core>();
};
void setUp() override {
@@ -438,47 +443,32 @@ public:
}
};
-#if 0
-const auto getThreadToken =
- ThreadContext::declareDecoration<TransportLayerASIOWithServiceContextTest::ThreadToken>();
-
TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceDoesNotSpawnThreadsBeforeStart) {
- const auto beforeThreadCount = ThreadCounter::get().count();
- transport::TransportLayerASIO::TimerService service;
- // Note that the following is a best-effort and not deterministic as we don't have control over
- // when threads may start running and advance the thread count.
- const auto afterThreadCount = ThreadCounter::get().count();
- ASSERT_EQ(beforeThreadCount, afterThreadCount);
+ ThreadCounter counter;
+ { transport::TransportLayerASIO::TimerService service{{counter.makeSpawnFunc()}}; }
+ ASSERT_EQ(counter.created(), 0);
}
TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceOneShotStart) {
- const auto beforeThreadCount = ThreadCounter::get().count();
- transport::TransportLayerASIO::TimerService service;
+ ThreadCounter counter;
+ transport::TransportLayerASIO::TimerService service{{counter.makeSpawnFunc()}};
service.start();
- LOGV2(5490004, "Waiting for the timer thread to start", "threads"_attr = beforeThreadCount);
- while (ThreadCounter::get().count() == beforeThreadCount) {
- sleepFor(Milliseconds(1));
- }
- const auto afterThreadCount = ThreadCounter::get().count();
- LOGV2(5490005, "Returned from waiting for the timer thread", "threads"_attr = afterThreadCount);
+ LOGV2(5490004, "Awaiting timer thread start", "threads"_attr = counter.started());
+ counter.waitForStarted([](auto n) { return n > 0; });
+ LOGV2(5490005, "Awaited timer thread start", "threads"_attr = counter.started());
- // Start the service a few times and verify that the thread count has not changed. Note that the
- // following is a best-effort and not deterministic as we don't have control over when threads
- // may start running and advance the thread count.
service.start();
service.start();
service.start();
- ASSERT_EQ(afterThreadCount, ThreadCounter::get().count());
+ ASSERT_EQ(counter.created(), 1) << "Redundant start should spawn only once";
}
TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceDoesNotStartAfterStop) {
- const auto beforeThreadCount = ThreadCounter::get().count();
- transport::TransportLayerASIO::TimerService service;
+ ThreadCounter counter;
+ transport::TransportLayerASIO::TimerService service{{counter.makeSpawnFunc()}};
service.stop();
service.start();
- const auto afterThreadCount = ThreadCounter::get().count();
- // The test would fail if `start` proceeds to spawn a thread for `service`.
- ASSERT_EQ(beforeThreadCount, afterThreadCount);
+ ASSERT_EQ(counter.created(), 0) << "Stop then start should not spawn";
}
TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceCanStopMoreThanOnce) {
@@ -495,7 +485,6 @@ TEST_F(TransportLayerASIOWithServiceContextTest, TimerServiceCanStopMoreThanOnce
service.stop();
}
}
-#endif
#ifdef MONGO_CONFIG_SSL
#ifndef _WIN32