diff options
author | Andrew Shuvalov <andrew.shuvalov@mongodb.com> | 2021-07-26 14:29:51 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-07-26 14:51:52 +0000 |
commit | 2f5709e68f0acc28448d90612536c0c7d22b0c0c (patch) | |
tree | 6d5cc98b2ef1c328fc8fe8f78ccbad33cc4a8644 /src/mongo/executor | |
parent | 0588dd589451ce5762035f68b81d12b82482885c (diff) | |
download | mongo-2f5709e68f0acc28448d90612536c0c7d22b0c0c.tar.gz |
SERVER-58221: stress test for ThreadPoolExecutor
Diffstat (limited to 'src/mongo/executor')
-rw-r--r-- | src/mongo/executor/SConscript | 38 | ||||
-rw-r--r-- | src/mongo/executor/executor_stress_test_fixture.cpp | 208 | ||||
-rw-r--r-- | src/mongo/executor/executor_stress_test_fixture.h | 130 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor_with_mock_net_stress_test.cpp | 64 |
4 files changed, 439 insertions, 1 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index a382c094de8..8bb67ffb50d 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -251,6 +251,28 @@ env.Library( ) env.Library( + target='network_interface_mock_test_fixture', + source=[ + 'network_interface_mock_test_fixture.cpp', + ], + LIBDEPS=[ + 'task_executor_test_fixture', + 'thread_pool_task_executor', + ] +) + +env.Library( + target='executor_stress_test_fixture', + source=[ + 'executor_stress_test_fixture.cpp', + ], + LIBDEPS=[ + 'task_executor_test_fixture', + 'thread_pool_task_executor', + ] +) + +env.Library( target='task_executor_pool', source=[ 'task_executor_pool.cpp', @@ -295,7 +317,6 @@ env.CppUnitTest( 'connection_pool_test_fixture.cpp', 'mock_network_fixture_test.cpp', 'network_interface_mock_test.cpp', - 'network_interface_mock_test_fixture.cpp', 'scoped_task_executor_test.cpp', 'task_executor_cursor_test.cpp', 'thread_pool_task_executor_test.cpp', @@ -304,6 +325,7 @@ env.CppUnitTest( 'connection_pool_executor', 'egress_tag_closer_manager', 'network_interface_mock', + 'network_interface_mock_test_fixture', 'scoped_task_executor', 'task_executor_cursor', 'thread_pool_task_executor', @@ -311,6 +333,20 @@ env.CppUnitTest( ], ) +env.CppUnitTest( + target='executor_with_mock_net_stress_test', + source=[ + 'thread_pool_task_executor_with_mock_net_stress_test.cpp', + ], + LIBDEPS=[ + 'executor_stress_test_fixture', + 'network_interface_mock', + 'network_interface_mock_test_fixture', + 'thread_pool_task_executor', + 'thread_pool_task_executor_test_fixture', + ], +) + env.CppIntegrationTest( target='executor_integration_test', source=[ diff --git a/src/mongo/executor/executor_stress_test_fixture.cpp b/src/mongo/executor/executor_stress_test_fixture.cpp new file mode 100644 index 00000000000..46e6ca3ed81 --- /dev/null +++ b/src/mongo/executor/executor_stress_test_fixture.cpp @@ -0,0 +1,208 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#include "mongo/executor/executor_stress_test_fixture.h" +#include "mongo/logv2/log.h" +#include "mongo/rpc/topology_version_gen.h" +#include "mongo/unittest/integration_test.h" +#include "mongo/util/net/hostandport.h" + +namespace mongo { +namespace executor { + +ThreadPoolExecutorStressTestEngine::ThreadPoolExecutorStressTestEngine( + std::shared_ptr<TaskExecutor> executor, + boost::optional<NetworkInterfaceMock*> netMock, + Milliseconds waitBeforeTermination) + : _executor(std::move(executor)), + _netMock(netMock), + _random(SecureRandom().nextInt64()), + _waitBeforeTermination(waitBeforeTermination) { + _timer.reset(); + _terminate.store(false); + _threadAssertionMonitor.emplace(); + if (_netMock) { + // If we use mocked Network, start a thread that answers pending requests. + _addMockNetworkResponseThread(); + } +} + +void ThreadPoolExecutorStressTestEngine::addSimpleSchedulingThreads(int count) { + auto schedulerTask = [this] { + while (!_terminate.load()) { + CopyableCallback work = [this](const TaskExecutor::CallbackArgs&) { + _completedWorks.increment(); + }; + const auto swCb = _executor->scheduleWork(work); + if (!swCb.isOK()) { + ASSERT_TRUE(_terminate.load()) + << "Scheduling failed before termination flag was set"; + ASSERT_TRUE(_executor->isShuttingDown()) + << "Scheduling failed before executor was shut down"; + } else { + auto lk = stdx::lock_guard(_mutex); + _callbacks.push_back(swCb.getValue()); + } + sleepFor(kDurationBetweenSimpleSchedulings); + } + }; + + auto lk = stdx::lock_guard(_mutex); + for (int i = 0; i < count; ++i) { + // _monitor is an instance of `ThreadAssertionMonitor` + _threads.emplace_back(_threadAssertionMonitor->spawn([schedulerTask] { schedulerTask(); })); + } +} + +void ThreadPoolExecutorStressTestEngine::addRandomCancelationThreads(int count) { + auto cancelationTask = [this] { + while (!_terminate.load()) { + TaskExecutor::CallbackHandle cb; + { + auto lk = stdx::lock_guard(_mutex); + while (_callbacks.size() > 100 && !cb) { + cb = std::move(_callbacks.front()); + _callbacks.pop_front(); + } + } + + if (auto shouldCancel = _random.nextInt32(2) == 0; shouldCancel && cb) { + _executor->cancel(cb); + } else if (cb) { + _executor->wait(cb); + } + } + }; + + auto lk = stdx::lock_guard(_mutex); + for (int i = 0; i < count; ++i) { + _threads.emplace_back(cancelationTask); + } +} + +void ThreadPoolExecutorStressTestEngine::addScheduleRemoteCommandThreads(int count) { + auto remoteSchedulerTask = [this] { + RemoteCommandRequest rcr(HostAndPort("localhost"), "test", BSONObj(), nullptr); + while (!_terminate.load()) { + const auto swCb = _executor->scheduleRemoteCommand( + rcr, [this](const TaskExecutor::RemoteCommandCallbackArgs& ca) { + if (ca.response.status.isOK()) { + _commandsSucceeded.increment(); + } else { + _commandsFailed.increment(); + } + }); + if (!swCb.isOK()) { + // This race can happen only at termination. + ASSERT_TRUE(_terminate.load()) + << "Scheduling failed before termination flag was set"; + ASSERT_TRUE(_executor->isShuttingDown()) + << "Scheduling failed before executor was shut down"; + } + + { + stdx::lock_guard<Latch> lk(_mutex); + _callbacks.push_back(swCb.getValue()); + } + sleepFor(kDurationBetweenRemoteCommands); + } + }; + + auto lk = stdx::lock_guard(_mutex); + for (auto i = 0; i < count; ++i) { + _threads.emplace_back( + _threadAssertionMonitor->spawn([remoteSchedulerTask] { remoteSchedulerTask(); })); + } +} + +void ThreadPoolExecutorStressTestEngine::_addMockNetworkResponseThread() { + if (!_netMock) { + return; // Nothing to do if not mock. + } + stdx::lock_guard<Latch> lk(_mutex); + _threads.emplace_back([this] { + while (!_terminate.load()) { + { + NetworkInterfaceMock::InNetworkGuard ing(*_netMock); + while ((*_netMock)->hasReadyRequests()) { + (*_netMock)->scheduleSuccessfulResponse(BSONObj()); + (*_netMock)->runReadyNetworkOperations(); + } + } + sleepFor(kDurationBetweenMockedResponses); + } + }); +} + +void ThreadPoolExecutorStressTestEngine::waitAndCleanup() { + while (true) { + sleepFor(Milliseconds(500)); + int64_t threadsSize; + int64_t pendingRequestsSize; + { + stdx::lock_guard<Latch> lk(_mutex); + threadsSize = _threads.size(); + pendingRequestsSize = _callbacks.size(); + } + LOGV2(5822101, + "Waiting for test termination", + "completed"_attr = _completedWorks.get(), + "commandsSucceeded"_attr = _commandsSucceeded.get(), + "commandsFailed"_attr = _commandsFailed.get(), + "threadsSize"_attr = threadsSize, + "pendingRequestsSize"_attr = pendingRequestsSize); + if (_timer.elapsed() > _waitBeforeTermination) { + break; + } + } + + _terminate.store(true); + + std::list<stdx::thread> threads; + { + auto lk = stdx::lock_guard(_mutex); + std::swap(threads, _threads); + } + + while (!threads.empty()) { + threads.front().join(); + threads.pop_front(); + } + + _executor->shutdown(); + _executor->join(); + + _threadAssertionMonitor->notifyDone(); + _threadAssertionMonitor.reset(); +} + + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/executor_stress_test_fixture.h b/src/mongo/executor/executor_stress_test_fixture.h new file mode 100644 index 00000000000..6a31d725a12 --- /dev/null +++ b/src/mongo/executor/executor_stress_test_fixture.h @@ -0,0 +1,130 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/counter.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/platform/random.h" +#include "mongo/unittest/thread_assertion_monitor.h" +#include "mongo/util/duration.h" + +namespace mongo { +namespace executor { + +/** + * Stress test common implementation methods. + */ +class ThreadPoolExecutorStressTestEngine { +public: + using CopyableCallback = std::function<void(const TaskExecutor::CallbackArgs&)>; + + // Adjust periods based on statistical reports if needed. + static constexpr auto kDurationBetweenSimpleSchedulings{Microseconds(40)}; + static constexpr auto kDurationBetweenRemoteCommands{Microseconds(10)}; + static constexpr auto kDurationBetweenMockedResponses{Microseconds(10)}; + + ThreadPoolExecutorStressTestEngine(std::shared_ptr<TaskExecutor> executor, + boost::optional<NetworkInterfaceMock*> netMock, + Milliseconds waitBeforeTermination); + + // Add various stress test threads. In each case 'count' is the count of new threads to add. + + void addSimpleSchedulingThreads(int count); + + void addRandomCancelationThreads(int count); + + void addScheduleRemoteCommandThreads(int count); + + // Waits for test termination, then sets termination flag and blocks until all threads + // are terminated. + void waitAndCleanup(); + +private: + void _addMockNetworkResponseThread(); + + const std::shared_ptr<TaskExecutor> _executor; + const boost::optional<NetworkInterfaceMock*> _netMock; + PseudoRandom _random; + + // Statistics. + Counter64 _completedWorks; + Counter64 _commandsSucceeded; + Counter64 _commandsFailed; + + AtomicWord<bool> _terminate; // Termination flag. + Timer _timer; + Milliseconds _waitBeforeTermination; + boost::optional<unittest::ThreadAssertionMonitor> _threadAssertionMonitor; + + Mutex _mutex = MONGO_MAKE_LATCH("ThreadPoolExecutorMockNetStressTest::_mutex"); + std::list<stdx::thread> _threads; + std::deque<TaskExecutor::CallbackHandle> _callbacks; +}; + +/** + * Stress test suite using mock thread pool and mock Net interface. + */ +class ThreadPoolExecutorMockNetStressTest : public ThreadPoolExecutorTest { +public: + void setUp() override { + ThreadPoolExecutorTest::setUp(); + getExecutor().startup(); + _engine = std::make_unique<ThreadPoolExecutorStressTestEngine>( + getExecutor().shared_from_this(), getNet(), Seconds(2)); + } + + void tearDown() override { + waitAndCleanup(); + ThreadPoolExecutorTest::tearDown(); + } + + void addSimpleSchedulingThreads(int count) { + _engine->addSimpleSchedulingThreads(count); + } + + void addRandomCancelationThreads(int count) { + _engine->addRandomCancelationThreads(count); + } + + void addScheduleRemoteCommandThreads(int count) { + _engine->addScheduleRemoteCommandThreads(count); + } + + void waitAndCleanup() { + _engine->waitAndCleanup(); + } + +private: + std::unique_ptr<ThreadPoolExecutorStressTestEngine> _engine; +}; + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/thread_pool_task_executor_with_mock_net_stress_test.cpp b/src/mongo/executor/thread_pool_task_executor_with_mock_net_stress_test.cpp new file mode 100644 index 00000000000..ad8c983cbce --- /dev/null +++ b/src/mongo/executor/thread_pool_task_executor_with_mock_net_stress_test.cpp @@ -0,0 +1,64 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <memory> + +#include "mongo/base/init.h" +#include "mongo/base/status.h" +#include "mongo/executor/executor_stress_test_fixture.h" +#include "mongo/executor/task_executor_test_common.h" +#include "mongo/executor/task_executor_test_fixture.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" + +namespace mongo { +namespace executor { +namespace { + +MONGO_INITIALIZER(ThreadPoolExecutorCommonTests)(InitializerContext*) { + addTestsForExecutor("ThreadPoolExecutorCommon", [](std::unique_ptr<NetworkInterfaceMock> net) { + return makeSharedThreadPoolTestExecutor(std::move(net)); + }); +} + +TEST_F(ThreadPoolExecutorMockNetStressTest, StressTest) { + // The idea is to have as much concurrency in the 'executor' as possible. However adding + // too many threads increases contention in the ThreadPoolExecutorStressTestEngine + // _mutex. Check the stats printed by the waitAndCleanup() for results. Apparently the + // executor saturates very fast to some extend the thread count has very limited influence. + addSimpleSchedulingThreads(100); + addRandomCancelationThreads(20); + addScheduleRemoteCommandThreads(100); +} + +} // namespace +} // namespace executor +} // namespace mongo |