diff options
author | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2020-06-30 20:12:29 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-07-13 18:43:43 +0000 |
commit | 5893ced59bd4f54626d92b72a6a776a2f2ec2ccc (patch) | |
tree | 92ea718a0267b0c228a2a2b1fd1a70eb7a6efa47 /src/mongo/transport/service_executor_test.cpp | |
parent | 5679ec1dc597df81cbf7e2bf153b5859489521c9 (diff) | |
download | mongo-5893ced59bd4f54626d92b72a6a776a2f2ec2ccc.tar.gz |
SERVER-49105 Create fixed thread pool ServiceExecutor
Diffstat (limited to 'src/mongo/transport/service_executor_test.cpp')
-rw-r--r-- | src/mongo/transport/service_executor_test.cpp | 187 |
1 files changed, 187 insertions, 0 deletions
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp index 4838789df47..99ee7f3ddb5 100644 --- a/src/mongo/transport/service_executor_test.cpp +++ b/src/mongo/transport/service_executor_test.cpp @@ -32,13 +32,21 @@ #include "mongo/platform/basic.h" #include "boost/optional.hpp" +#include <algorithm> +#include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/service_context.h" #include "mongo/logv2/log.h" +#include "mongo/transport/service_executor_fixed.h" +#include "mongo/transport/service_executor_gen.h" #include "mongo/transport/service_executor_synchronous.h" #include "mongo/transport/transport_layer.h" #include "mongo/unittest/barrier.h" +#include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/fail_point.h" +#include "mongo/util/future.h" #include "mongo/util/scopeguard.h" #include <asio.hpp> @@ -158,6 +166,185 @@ TEST_F(ServiceExecutorSynchronousFixture, ScheduleFailsBeforeStartup) { scheduleBasicTask(executor.get(), false); } +class ServiceExecutorFixedFixture : public unittest::Test { +public: + static constexpr auto kNumExecutorThreads = 2; + + void setUp() override { + ThreadPool::Options options; + options.minThreads = options.maxThreads = kNumExecutorThreads; + options.poolName = "Test"; + _executor = std::make_shared<ServiceExecutorFixed>(std::move(options)); + } + + void tearDown() override { + if (_skipShutdown) + return; + ASSERT_OK(_executor->shutdown(kShutdownTime)); + } + + void skipShutdown(bool skip) { + _skipShutdown = skip; + } + + auto getServiceExecutor() const { + return _executor; + } + + auto startAndGetServiceExecutor() { + ASSERT_OK(_executor->start()); + return getServiceExecutor(); + } + + auto makeRecursionGuard() { + _recursionDepth.fetchAndAdd(1); + return makeGuard([this] { _recursionDepth.fetchAndSubtract(1); }); + } + + auto getRecursionDepth() const { + return _recursionDepth.load(); + } + +private: + AtomicWord<int> _recursionDepth{0}; + bool _skipShutdown = false; + std::shared_ptr<ServiceExecutorFixed> _executor; +}; + +TEST_F(ServiceExecutorFixedFixture, ScheduleFailsBeforeStartup) { + auto executor = getServiceExecutor(); + ASSERT_NOT_OK(executor->schedule([] {}, ServiceExecutor::kEmptyFlags)); +} + +DEATH_TEST_F(ServiceExecutorFixedFixture, DestructorFailsBeforeShutdown, "invariant") { + startAndGetServiceExecutor(); + skipShutdown(true); +} + +TEST_F(ServiceExecutorFixedFixture, BasicTaskRuns) { + auto executor = startAndGetServiceExecutor(); + auto barrier = std::make_shared<unittest::Barrier>(2); + ASSERT_OK(executor->schedule([barrier]() mutable { barrier->countDownAndWait(); }, + ServiceExecutor::kEmptyFlags)); + barrier->countDownAndWait(); +} + +TEST_F(ServiceExecutorFixedFixture, RecursiveTask) { + auto executor = startAndGetServiceExecutor(); + auto barrier = std::make_shared<unittest::Barrier>(2); + + ServiceExecutor::Task recursiveTask; + recursiveTask = [this, executor, barrier, &recursiveTask] { + auto recursionGuard = makeRecursionGuard(); + if (getRecursionDepth() < fixedServiceExecutorRecursionLimit.load()) { + ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kMayRecurse)); + } else { + // This test never returns unless the service executor can satisfy the recursion depth. + barrier->countDownAndWait(); + } + }; + + // Schedule recursive task and wait for the recursion to stop + ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kMayRecurse)); + barrier->countDownAndWait(); +} + +TEST_F(ServiceExecutorFixedFixture, FlattenRecursiveScheduledTasks) { + auto executor = startAndGetServiceExecutor(); + auto barrier = std::make_shared<unittest::Barrier>(2); + AtomicWord<int> tasksToSchedule{fixedServiceExecutorRecursionLimit.load() * 3}; + + // A recursive task that expects to be executed non-recursively. The task recursively schedules + // "tasksToSchedule" tasks to the service executor, and each scheduled task verifies that the + // recursion depth remains zero during its execution. + ServiceExecutor::Task recursiveTask; + recursiveTask = [this, executor, barrier, &recursiveTask, &tasksToSchedule] { + auto recursionGuard = makeRecursionGuard(); + ASSERT_EQ(getRecursionDepth(), 1); + if (tasksToSchedule.fetchAndSubtract(1) > 0) { + ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kEmptyFlags)); + } else { + // Once there are no more tasks to schedule, notify the main thread to proceed. + barrier->countDownAndWait(); + } + }; + + // Schedule the recursive task and wait for the execution to finish. + ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kMayYieldBeforeSchedule)); + barrier->countDownAndWait(); +} + +TEST_F(ServiceExecutorFixedFixture, ShutdownTimeLimit) { + auto executor = startAndGetServiceExecutor(); + auto invoked = std::make_shared<SharedPromise<void>>(); + auto mayReturn = std::make_shared<SharedPromise<void>>(); + + ASSERT_OK(executor->schedule( + [executor, invoked, mayReturn]() mutable { + invoked->emplaceValue(); + mayReturn->getFuture().get(); + }, + ServiceExecutor::kEmptyFlags)); + + invoked->getFuture().get(); + ASSERT_NOT_OK(executor->shutdown(kShutdownTime)); + + // Ensure the service executor is stopped before leaving the test. + mayReturn->emplaceValue(); +} + +TEST_F(ServiceExecutorFixedFixture, Stats) { + auto executor = startAndGetServiceExecutor(); + auto barrier = std::make_shared<unittest::Barrier>(kNumExecutorThreads + 1); + stdx::condition_variable cond; + + auto task = [barrier, &cond]() mutable { + cond.notify_one(); + barrier->countDownAndWait(); + }; + + for (auto i = 0; i < kNumExecutorThreads; i++) { + ASSERT_OK(executor->schedule(task, ServiceExecutor::kEmptyFlags)); + } + + // The main thread waits for the executor threads to bump up "threadsRunning" while picking up a + // task to execute. Once all executor threads are running, the main thread will unblock them + // through the barrier. + auto mutex = MONGO_MAKE_LATCH(); + stdx::unique_lock<Latch> lk(mutex); + cond.wait(lk, [&executor]() { + BSONObjBuilder bob; + executor->appendStats(&bob); + auto obj = bob.obj(); + ASSERT(obj.hasField("threadsRunning")); + auto threadsRunning = obj.getIntField("threadsRunning"); + LOGV2_DEBUG( + 4910503, 1, "Checked number of executor threads", "threads"_attr = threadsRunning); + return threadsRunning == static_cast<int>(ServiceExecutorFixedFixture::kNumExecutorThreads); + }); + + barrier->countDownAndWait(); +} + +TEST_F(ServiceExecutorFixedFixture, ScheduleFailsAfterShutdown) { + auto executor = startAndGetServiceExecutor(); + std::unique_ptr<stdx::thread> schedulerThread; + + { + // Spawn a thread to schedule a task, and block it before it can schedule the task with the + // underlying thread-pool. Then shutdown the service executor and unblock the scheduler + // thread. This order of events must cause "schedule()" to return a non-okay status. + FailPointEnableBlock failpoint("hangBeforeSchedulingServiceExecutorFixedTask"); + schedulerThread = std::make_unique<stdx::thread>([executor] { + ASSERT_NOT_OK( + executor->schedule([] { MONGO_UNREACHABLE; }, ServiceExecutor::kEmptyFlags)); + }); + failpoint->waitForTimesEntered(1); + ASSERT_OK(executor->shutdown(kShutdownTime)); + } + + schedulerThread->join(); +} } // namespace } // namespace mongo |