diff options
author | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2020-06-22 16:30:35 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-06-23 22:36:50 +0000 |
commit | 311d7ccd0b6fd24122e28b7e8f3a1e191dd4078a (patch) | |
tree | 5e18af20c70eaf568d93778b1ee0052da3a91ace /src/mongo/transport/service_executor_adaptive_test.cpp | |
parent | 36bf915c32d551ad557ec7a1fa41890037e9f54f (diff) | |
download | mongo-311d7ccd0b6fd24122e28b7e8f3a1e191dd4078a.tar.gz |
SERVER-48973 Remove ServiceExecutorAdaptive
Diffstat (limited to 'src/mongo/transport/service_executor_adaptive_test.cpp')
-rw-r--r-- | src/mongo/transport/service_executor_adaptive_test.cpp | 397 |
1 files changed, 0 insertions, 397 deletions
diff --git a/src/mongo/transport/service_executor_adaptive_test.cpp b/src/mongo/transport/service_executor_adaptive_test.cpp deleted file mode 100644 index b47a73089a1..00000000000 --- a/src/mongo/transport/service_executor_adaptive_test.cpp +++ /dev/null @@ -1,397 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest - -#include "mongo/platform/basic.h" - -#include "boost/optional.hpp" - -#include "mongo/db/service_context.h" -#include "mongo/logv2/log.h" -#include "mongo/transport/service_executor_adaptive.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/scopeguard.h" - -#include <asio.hpp> - -namespace mongo { -namespace { -using namespace transport; - -struct TestOptions : public ServiceExecutorAdaptive::Options { - int reservedThreads() const final { - return 1; - } - - Milliseconds workerThreadRunTime() const final { - return Milliseconds{1000}; - } - - int runTimeJitter() const final { - return 0; - } - - Milliseconds stuckThreadTimeout() const final { - return Milliseconds{100}; - } - - Microseconds maxQueueLatency() const final { - return duration_cast<Microseconds>(Milliseconds{10}); - } - - int idlePctThreshold() const final { - return 0; - } - - int recursionLimit() const final { - return 0; - } -}; - -struct RecursionOptions : public ServiceExecutorAdaptive::Options { - int reservedThreads() const final { - return 1; - } - - Milliseconds workerThreadRunTime() const final { - return Milliseconds{1000}; - } - - int runTimeJitter() const final { - return 0; - } - - Milliseconds stuckThreadTimeout() const final { - return Milliseconds{100}; - } - - Microseconds maxQueueLatency() const final { - return duration_cast<Microseconds>(Milliseconds{5}); - } - - int idlePctThreshold() const final { - return 0; - } - - int recursionLimit() const final { - return 10; - } -}; - -class ServiceExecutorAdaptiveFixture : public unittest::Test { -protected: - void setUp() override { - setGlobalServiceContext(ServiceContext::make()); - asioIoCtx = std::make_shared<asio::io_context>(); - } - - std::shared_ptr<asio::io_context> asioIoCtx; - - mutex = MONGO_MAKE_LATCH("ServiceExecutorAdaptiveFixture::mutex"); - AtomicWord<int> waitFor{-1}; - stdx::condition_variable cond; - std::function<void()> notifyCallback = [this] { - stdx::unique_lock<Latch> lk(mutex); - invariant(waitFor.load() != -1); - waitFor.fetchAndSubtract(1); - cond.notify_one(); - LOGV2(22960, "Ran callback"); - }; - - void waitForCallback(int expected, boost::optional<Milliseconds> timeout = boost::none) { - stdx::unique_lock<Latch> lk(mutex); - invariant(waitFor.load() != -1); - if (timeout) { - ASSERT_TRUE(cond.wait_for( - lk, timeout->toSystemDuration(), [&] { return waitFor.load() == expected; })); - } else { - cond.wait(lk, [&] { return waitFor.load() == expected; }); - } - } - - ServiceExecutorAdaptive::Options* config; - - template <class Options> - std::unique_ptr<ServiceExecutorAdaptive> makeAndStartExecutor() { - auto configOwned = std::make_unique<Options>(); - config = configOwned.get(); - auto exec = std::make_unique<ServiceExecutorAdaptive>( - getGlobalServiceContext(), asioIoCtx, std::move(configOwned)); - - ASSERT_OK(exec->start()); - LOGV2(22961, "wait for executor to finish starting"); - waitFor.store(1); - ASSERT_OK(exec->schedule(notifyCallback, - ServiceExecutor::kEmptyFlags, - ServiceExecutorTaskName::kSSMProcessMessage)); - waitForCallback(0); - ASSERT_EQ(exec->threadsRunning(), config->reservedThreads()); - - return exec; - } -}; - -/* - * This tests that the executor will launch a new thread if the current threads are blocked, and - * that those threads retire when they become idle. - */ -TEST_F(ServiceExecutorAdaptiveFixture, TestStuckTask) { - auto blockedMutex = MONGO_MAKE_LATCH(); - stdx::unique_lock<Latch> blockedLock(blockedMutex); - - auto exec = makeAndStartExecutor<TestOptions>(); - auto guard = makeGuard([&] { - if (blockedLock) - blockedLock.unlock(); - ASSERT_OK(exec->shutdown(config->workerThreadRunTime() * 2)); - }); - - LOGV2(22962, "Scheduling blocked task"); - waitFor.store(3); - ASSERT_OK(exec->schedule( - [this, &blockedMutex] { - notifyCallback(); - stdx::unique_lock<Latch> lk(blockedMutex); - notifyCallback(); - }, - ServiceExecutor::kEmptyFlags, - ServiceExecutorTaskName::kSSMProcessMessage)); - - LOGV2(22963, "Scheduling task stuck on blocked task"); - ASSERT_OK(exec->schedule( - notifyCallback, ServiceExecutor::kEmptyFlags, ServiceExecutorTaskName::kSSMProcessMessage)); - - LOGV2(22964, "Waiting for second thread to start"); - waitForCallback(1); - ASSERT_EQ(exec->threadsRunning(), 2); - - LOGV2(22965, "Waiting for unstuck task to run"); - blockedLock.unlock(); - waitForCallback(0); - ASSERT_EQ(exec->threadsRunning(), 2); - - LOGV2(22966, "Waiting for second thread to idle out"); - stdx::this_thread::sleep_for(config->workerThreadRunTime().toSystemDuration() * 1.5); - ASSERT_EQ(exec->threadsRunning(), config->reservedThreads()); -} - -/* - * This tests that the executor will start a new batch of reserved threads if it detects that - * all - * threads are running a task for longer than the stuckThreadTimeout. - */ -TEST_F(ServiceExecutorAdaptiveFixture, TestStuckThreads) { - auto blockedMutex = MONGO_MAKE_LATCH(); - stdx::unique_lock<Latch> blockedLock(blockedMutex); - - auto exec = makeAndStartExecutor<TestOptions>(); - auto guard = makeGuard([&] { - if (blockedLock) - blockedLock.unlock(); - ASSERT_OK(exec->shutdown(config->workerThreadRunTime() * 2)); - }); - - auto blockedTask = [this, &blockedMutex] { - LOGV2(22967, "waiting on blocked mutex"); - notifyCallback(); - stdx::unique_lock<Latch> lk(blockedMutex); - notifyCallback(); - }; - - waitFor.store(6); - auto tasks = waitFor.load() / 2; - LOGV2(22968, - "Scheduling {tasks} blocked tasks", - "Scheduling blocked tasks", - "tasks"_attr = tasks); - for (auto i = 0; i < tasks; i++) { - ASSERT_OK(exec->schedule(blockedTask, - ServiceExecutor::kEmptyFlags, - ServiceExecutorTaskName::kSSMProcessMessage)); - } - - LOGV2(22969, "Waiting for executor to start new threads"); - waitForCallback(3); - - LOGV2(22970, "All threads blocked, wait for executor to detect block and start a new thread."); - - // The controller thread in the adaptive executor runs on a stuckThreadTimeout in normal - // operation where no starvation is detected (shouldn't be in this test as all threads should be - // blocked). By waiting here for stuckThreadTimeout*3 it means that we have stuckThreadTimeout*2 - // for other waits in the controller and boot up a new thread which should be enough. - stdx::this_thread::sleep_for(config->stuckThreadTimeout().toSystemDuration() * 3); - - ASSERT_EQ(exec->threadsRunning(), waitFor.load() + config->reservedThreads()); - - LOGV2(22971, "Waiting for unstuck task to run"); - blockedLock.unlock(); - waitForCallback(0); -} - -/* - * This tests that the executor will launch more threads when starvation is detected. We launch - * another task from itself so there will always be a queue of a waiting task if there's just one - * thread. - */ -TEST_F(ServiceExecutorAdaptiveFixture, TestStarvation) { - auto exec = makeAndStartExecutor<TestOptions>(); - - // auto so = MONGO_MAKE_LATCH() we don't attempt to call schedule and shutdown concurrently - auto scheduleMutex = MONGO_MAKE_LATCH(); - - auto guard = makeGuard([&] { ASSERT_OK(exec->shutdown(config->workerThreadRunTime() * 2)); }); - - bool scheduleNew{true}; - - std::function<void()> task; - task = [this, &task, &exec, &scheduleMutex, &scheduleNew] { - // This sleep needs to be larger than the sleep below to be able to limit the amount of - // starvation. - stdx::this_thread::sleep_for(config->maxQueueLatency().toSystemDuration() * 5); - - { - stdx::unique_lock<Latch> lock(scheduleMutex); - - if (scheduleNew) { - ASSERT_OK(exec->schedule(task, - ServiceExecutor::kEmptyFlags, - ServiceExecutorTaskName::kSSMProcessMessage)); - } - } - - // This sleep needs to be larger than maxQueueLatency, when we schedule above the controller - // thread will wake up because starvation is detected. By the time the controller thread - // have slept for maxQueueLatency both worker threads should be executing work and there's - // no further starvation. It needs to be significantly larger to avoid a race with asio - // post. In the case of the first time when there's only one worker thread, starvation - // should be detected and the second worker will be started. - stdx::this_thread::sleep_for(config->maxQueueLatency().toSystemDuration() * 2); - }; - - ASSERT_OK(exec->schedule( - task, ServiceExecutor::kEmptyFlags, ServiceExecutorTaskName::kSSMProcessMessage)); - - stdx::this_thread::sleep_for(config->workerThreadRunTime().toSystemDuration() * 2); - ASSERT_EQ(exec->threadsRunning(), 2); - - stdx::unique_lock<Latch> lock(scheduleMutex); - scheduleNew = false; -} - -/* - * This tests that the executor can execute tasks recursively. If it can't starvation will be - * detected and new threads started. - */ -TEST_F(ServiceExecutorAdaptiveFixture, TestRecursion) { - auto exec = makeAndStartExecutor<RecursionOptions>(); - - AtomicWord<int> remainingTasks{config->recursionLimit() - 1}; - auto mutex = MONGO_MAKE_LATCH(); - stdx::condition_variable cv; - std::function<void()> task; - - auto guard = makeGuard([&] { ASSERT_OK(exec->shutdown(config->workerThreadRunTime() * 2)); }); - - task = [this, &task, &exec, &mutex, &cv, &remainingTasks] { - if (remainingTasks.subtractAndFetch(1) == 0) { - LOGV2(22972, "Signaling job done"); - cv.notify_one(); - return; - } - - LOGV2(22973, "Starting task recursively"); - - ASSERT_OK(exec->schedule( - task, ServiceExecutor::kMayRecurse, ServiceExecutorTaskName::kSSMProcessMessage)); - - // Make sure we don't block too long because then the block detection logic would kick in. - stdx::this_thread::sleep_for(config->stuckThreadTimeout().toSystemDuration() / - (config->recursionLimit() * 2)); - LOGV2(22974, "Completing task recursively"); - }; - - stdx::unique_lock<Latch> lock(mutex); - - ASSERT_OK(exec->schedule( - task, ServiceExecutor::kEmptyFlags, ServiceExecutorTaskName::kSSMProcessMessage)); - - cv.wait_for(lock, config->workerThreadRunTime().toSystemDuration(), [&remainingTasks]() { - return remainingTasks.load() == 0; - }); - - ASSERT_EQ(exec->threadsRunning(), config->reservedThreads()); -} - -/* - * This tests that deferred tasks don't cause a new thread to be created, and they don't - * interfere - * with new normal tasks - */ -TEST_F(ServiceExecutorAdaptiveFixture, TestDeferredTasks) { - auto blockedMutex = MONGO_MAKE_LATCH(); - stdx::unique_lock<Latch> blockedLock(blockedMutex); - - auto exec = makeAndStartExecutor<TestOptions>(); - auto guard = makeGuard([&] { - if (blockedLock) - blockedLock.unlock(); - ASSERT_OK(exec->shutdown(config->workerThreadRunTime() * 2)); - }); - - waitFor.store(3); - LOGV2(22975, "Scheduling a blocking task"); - ASSERT_OK(exec->schedule( - [this, &blockedMutex] { - stdx::unique_lock<Latch> lk(blockedMutex); - notifyCallback(); - }, - ServiceExecutor::kEmptyFlags, - ServiceExecutorTaskName::kSSMProcessMessage)); - - LOGV2(22976, "Scheduling deferred task"); - ASSERT_OK(exec->schedule(notifyCallback, - ServiceExecutor::kDeferredTask, - ServiceExecutorTaskName::kSSMProcessMessage)); - - ASSERT_THROWS(waitForCallback(1, config->stuckThreadTimeout()), - unittest::TestAssertionFailureException); - - LOGV2(22977, "Scheduling non-deferred task"); - ASSERT_OK(exec->schedule( - notifyCallback, ServiceExecutor::kEmptyFlags, ServiceExecutorTaskName::kSSMProcessMessage)); - waitForCallback(1, config->stuckThreadTimeout()); - ASSERT_GT(exec->threadsRunning(), config->reservedThreads()); - - blockedLock.unlock(); - waitForCallback(0); -} - -} // namespace -} // namespace mongo |