diff options
author | Luis Osta <luis.osta@mongodb.com> | 2021-11-22 18:42:41 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-11-22 19:31:45 +0000 |
commit | 6bf01cf8aa7f0a83ce7173de4ec7a310d7781f6d (patch) | |
tree | 4262584d0456e876fbfefd5845fe3905e62542d5 /src/mongo/executor | |
parent | 78ab98a46b53582a5e69424bbb92f25c483fec0a (diff) | |
download | mongo-6bf01cf8aa7f0a83ce7173de4ec7a310d7781f6d.tar.gz |
SERVER-61489 Revert the executor_with_mock_net_stress_test test
Diffstat (limited to 'src/mongo/executor')
-rw-r--r-- | src/mongo/executor/SConscript | 38 | ||||
-rw-r--r-- | src/mongo/executor/executor_stress_test_fixture.cpp | 232 | ||||
-rw-r--r-- | src/mongo/executor/executor_stress_test_fixture.h | 131 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor_with_mock_net_stress_test.cpp | 68 |
4 files changed, 1 insertions, 468 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index 8bb67ffb50d..a382c094de8 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -251,28 +251,6 @@ env.Library( ) env.Library( - target='network_interface_mock_test_fixture', - source=[ - 'network_interface_mock_test_fixture.cpp', - ], - LIBDEPS=[ - 'task_executor_test_fixture', - 'thread_pool_task_executor', - ] -) - -env.Library( - target='executor_stress_test_fixture', - source=[ - 'executor_stress_test_fixture.cpp', - ], - LIBDEPS=[ - 'task_executor_test_fixture', - 'thread_pool_task_executor', - ] -) - -env.Library( target='task_executor_pool', source=[ 'task_executor_pool.cpp', @@ -317,6 +295,7 @@ env.CppUnitTest( 'connection_pool_test_fixture.cpp', 'mock_network_fixture_test.cpp', 'network_interface_mock_test.cpp', + 'network_interface_mock_test_fixture.cpp', 'scoped_task_executor_test.cpp', 'task_executor_cursor_test.cpp', 'thread_pool_task_executor_test.cpp', @@ -325,7 +304,6 @@ env.CppUnitTest( 'connection_pool_executor', 'egress_tag_closer_manager', 'network_interface_mock', - 'network_interface_mock_test_fixture', 'scoped_task_executor', 'task_executor_cursor', 'thread_pool_task_executor', @@ -333,20 +311,6 @@ env.CppUnitTest( ], ) -env.CppUnitTest( - target='executor_with_mock_net_stress_test', - source=[ - 'thread_pool_task_executor_with_mock_net_stress_test.cpp', - ], - LIBDEPS=[ - 'executor_stress_test_fixture', - 'network_interface_mock', - 'network_interface_mock_test_fixture', - 'thread_pool_task_executor', - 'thread_pool_task_executor_test_fixture', - ], -) - env.CppIntegrationTest( target='executor_integration_test', source=[ diff --git a/src/mongo/executor/executor_stress_test_fixture.cpp b/src/mongo/executor/executor_stress_test_fixture.cpp deleted file mode 100644 index 27a3d3aaff3..00000000000 --- a/src/mongo/executor/executor_stress_test_fixture.cpp +++ /dev/null @@ -1,232 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest - -#include "mongo/executor/executor_stress_test_fixture.h" -#include "mongo/logv2/log.h" -#include "mongo/platform/random.h" -#include "mongo/rpc/topology_version_gen.h" -#include "mongo/unittest/integration_test.h" -#include "mongo/util/net/hostandport.h" - -namespace mongo { -namespace executor { - -ThreadPoolExecutorStressTestEngine::ThreadPoolExecutorStressTestEngine( - std::shared_ptr<TaskExecutor> executor, - boost::optional<NetworkInterfaceMock*> netMock, - Milliseconds waitBeforeTermination) - : _executor(std::move(executor)), - _netMock(netMock), - _waitBeforeTermination(waitBeforeTermination) { - _timer.reset(); - _terminate.store(false); - _threadAssertionMonitor.emplace(); - if (_netMock) { - // If we use mocked Network, start a thread that answers pending requests. - _addMockNetworkResponseThread(); - } -} - -void ThreadPoolExecutorStressTestEngine::addSimpleSchedulingThreads(int count) { - auto schedulerTask = [this] { - while (!_terminate.load()) { - CopyableCallback work = [this](const TaskExecutor::CallbackArgs&) { - _completedWorks.increment(); - }; - const auto swCb = _executor->scheduleWork(work); - if (!swCb.isOK()) { - ASSERT_TRUE(_terminate.load()) - << "Scheduling failed before termination flag was set"; - ASSERT_TRUE(_executor->isShuttingDown()) - << "Scheduling failed before executor was shut down"; - } else { - auto lk = stdx::lock_guard(_mutex); - _callbacks.push_back(swCb.getValue()); - } - while (!_terminate.load()) { - sleepFor(kDurationBetweenSimpleSchedulings); - auto lk = stdx::lock_guard(_mutex); - if (_callbacks.size() < kMaxCallbacks) { - break; - } - } - } - }; - - auto lk = stdx::lock_guard(_mutex); - for (int i = 0; i < count; ++i) { - // _monitor is an instance of `ThreadAssertionMonitor` - _threads.emplace_back(_threadAssertionMonitor->spawn([schedulerTask] { schedulerTask(); })); - } -} - -void ThreadPoolExecutorStressTestEngine::addRandomCancelationThreads(int count) { - auto cancelationTask = [this] { - while (true) { - TaskExecutor::CallbackHandle cb; - { - auto lk = stdx::lock_guard(_mutex); - while ((_callbacks.size() > 100 || _terminate.load()) && !cb) { - cb = std::move(_callbacks.front()); - _callbacks.pop_front(); - } - } - - if (auto shouldCancel = nextRandomInt32(2) == 0; shouldCancel && cb) { - _executor->cancel(cb); - } else if (cb) { - _executor->wait(cb); - } - if (_terminate.load()) { - stdx::lock_guard<Latch> lk(_mutex); - if (_callbacks.empty()) { - break; - } - } - } - }; - - auto lk = stdx::lock_guard(_mutex); - for (int i = 0; i < count; ++i) { - _threads.emplace_back(cancelationTask); - } -} - -void ThreadPoolExecutorStressTestEngine::addScheduleRemoteCommandThreads(int count) { - auto remoteSchedulerTask = [this] { - RemoteCommandRequest rcr(HostAndPort("localhost"), "test", BSONObj(), nullptr); - while (!_terminate.load()) { - const auto swCb = _executor->scheduleRemoteCommand( - rcr, [this](const TaskExecutor::RemoteCommandCallbackArgs& ca) { - if (ca.response.status.isOK()) { - _commandsSucceeded.increment(); - } else { - _commandsFailed.increment(); - } - }); - if (!swCb.isOK()) { - // This race can happen only at termination. - ASSERT_TRUE(_terminate.load()) - << "Scheduling failed before termination flag was set"; - ASSERT_TRUE(_executor->isShuttingDown()) - << "Scheduling failed before executor was shut down"; - } - - { - stdx::lock_guard<Latch> lk(_mutex); - _callbacks.push_back(swCb.getValue()); - } - sleepFor(kDurationBetweenRemoteCommands); - } - }; - - auto lk = stdx::lock_guard(_mutex); - for (auto i = 0; i < count; ++i) { - _threads.emplace_back( - _threadAssertionMonitor->spawn([remoteSchedulerTask] { remoteSchedulerTask(); })); - } -} - -void ThreadPoolExecutorStressTestEngine::_addMockNetworkResponseThread() { - if (!_netMock) { - return; // Nothing to do if not mock. - } - stdx::lock_guard<Latch> lk(_mutex); - _threads.emplace_back([this] { - while (true) { - { - NetworkInterfaceMock::InNetworkGuard ing(*_netMock); - while ((*_netMock)->hasReadyRequests()) { - (*_netMock)->scheduleSuccessfulResponse(BSONObj()); - (*_netMock)->runReadyNetworkOperations(); - } - } - sleepFor(kDurationBetweenMockedResponses); - // Network response thread must wait until all callbacks are cleared. - if (_terminate.load()) { - stdx::lock_guard<Latch> lk(_mutex); - if (_callbacks.empty()) { - break; - } - } - } - }); -} - -void ThreadPoolExecutorStressTestEngine::waitAndCleanup() { - while (true) { - sleepFor(Milliseconds(500)); - int64_t threadsSize; - int64_t pendingRequestsSize; - { - stdx::lock_guard<Latch> lk(_mutex); - threadsSize = _threads.size(); - pendingRequestsSize = _callbacks.size(); - } - LOGV2(5822101, - "Waiting for test termination", - "completed"_attr = _completedWorks.get(), - "commandsSucceeded"_attr = _commandsSucceeded.get(), - "commandsFailed"_attr = _commandsFailed.get(), - "threadsSize"_attr = threadsSize, - "pendingRequestsSize"_attr = pendingRequestsSize); - if (_timer.elapsed() > _waitBeforeTermination) { - break; - } - } - - _terminate.store(true); - - std::list<stdx::thread> threads; - { - auto lk = stdx::lock_guard(_mutex); - std::swap(threads, _threads); - } - - while (!threads.empty()) { - threads.front().join(); - threads.pop_front(); - } - - _executor->shutdown(); - _executor->join(); - - _threadAssertionMonitor->notifyDone(); - _threadAssertionMonitor.reset(); -} - -int32_t ThreadPoolExecutorStressTestEngine::nextRandomInt32(int32_t max) { - static thread_local PseudoRandom random(SecureRandom().nextInt64()); - return random.nextInt32(max); -} - - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/executor_stress_test_fixture.h b/src/mongo/executor/executor_stress_test_fixture.h deleted file mode 100644 index d54f6e25658..00000000000 --- a/src/mongo/executor/executor_stress_test_fixture.h +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/base/counter.h" -#include "mongo/executor/thread_pool_task_executor_test_fixture.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/unittest/thread_assertion_monitor.h" -#include "mongo/util/duration.h" - -namespace mongo { -namespace executor { - -/** - * Stress test common implementation methods. - */ -class ThreadPoolExecutorStressTestEngine { -public: - using CopyableCallback = std::function<void(const TaskExecutor::CallbackArgs&)>; - - // Adjust periods based on statistical reports if needed. - static constexpr auto kDurationBetweenSimpleSchedulings{Microseconds(40)}; - static constexpr auto kDurationBetweenRemoteCommands{Microseconds(10)}; - static constexpr auto kDurationBetweenMockedResponses{Microseconds(10)}; - static constexpr auto kMaxCallbacks = 100 * 1000; - - ThreadPoolExecutorStressTestEngine(std::shared_ptr<TaskExecutor> executor, - boost::optional<NetworkInterfaceMock*> netMock, - Milliseconds waitBeforeTermination); - - // Add various stress test threads. In each case 'count' is the count of new threads to add. - - void addSimpleSchedulingThreads(int count); - - void addRandomCancelationThreads(int count); - - void addScheduleRemoteCommandThreads(int count); - - // Waits for test termination, then sets termination flag and blocks until all threads - // are terminated. - void waitAndCleanup(); - -private: - void _addMockNetworkResponseThread(); - - static int32_t nextRandomInt32(int32_t max); - - const std::shared_ptr<TaskExecutor> _executor; - const boost::optional<NetworkInterfaceMock*> _netMock; - - // Statistics. - Counter64 _completedWorks; - Counter64 _commandsSucceeded; - Counter64 _commandsFailed; - - AtomicWord<bool> _terminate; // Termination flag. - Timer _timer; - Milliseconds _waitBeforeTermination; - boost::optional<unittest::ThreadAssertionMonitor> _threadAssertionMonitor; - - Mutex _mutex = MONGO_MAKE_LATCH("ThreadPoolExecutorMockNetStressTest::_mutex"); - std::list<stdx::thread> _threads; - std::deque<TaskExecutor::CallbackHandle> _callbacks; -}; - -/** - * Stress test suite using mock thread pool and mock Net interface. - */ -class ThreadPoolExecutorMockNetStressTest : public ThreadPoolExecutorTest { -public: - void setUp() override { - ThreadPoolExecutorTest::setUp(); - getExecutor().startup(); - _engine = std::make_unique<ThreadPoolExecutorStressTestEngine>( - getExecutor().shared_from_this(), getNet(), Seconds(2)); - } - - void tearDown() override { - waitAndCleanup(); - ThreadPoolExecutorTest::tearDown(); - } - - void addSimpleSchedulingThreads(int count) { - _engine->addSimpleSchedulingThreads(count); - } - - void addRandomCancelationThreads(int count) { - _engine->addRandomCancelationThreads(count); - } - - void addScheduleRemoteCommandThreads(int count) { - _engine->addScheduleRemoteCommandThreads(count); - } - - void waitAndCleanup() { - _engine->waitAndCleanup(); - } - -private: - std::unique_ptr<ThreadPoolExecutorStressTestEngine> _engine; -}; - -} // namespace executor -} // namespace mongo diff --git a/src/mongo/executor/thread_pool_task_executor_with_mock_net_stress_test.cpp b/src/mongo/executor/thread_pool_task_executor_with_mock_net_stress_test.cpp deleted file mode 100644 index 51544792dca..00000000000 --- a/src/mongo/executor/thread_pool_task_executor_with_mock_net_stress_test.cpp +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Copyright (C) 2021-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include <memory> - -#include "mongo/base/init.h" -#include "mongo/base/status.h" -#include "mongo/executor/executor_stress_test_fixture.h" -#include "mongo/executor/task_executor_test_common.h" -#include "mongo/executor/task_executor_test_fixture.h" -#include "mongo/executor/thread_pool_task_executor.h" -#include "mongo/executor/thread_pool_task_executor_test_fixture.h" - -namespace mongo { -namespace executor { -namespace { - -MONGO_INITIALIZER(ThreadPoolExecutorCommonTests)(InitializerContext*) { - addTestsForExecutor("ThreadPoolExecutorCommon", [](std::unique_ptr<NetworkInterfaceMock> net) { - return makeSharedThreadPoolTestExecutor(std::move(net)); - }); -} - -TEST_F(ThreadPoolExecutorMockNetStressTest, StressTest) { -#if defined(__APPLE__) && defined(__aarch64__) - // TODO: Fix this test under mac os arm64. -#else - // The idea is to have as much concurrency in the 'executor' as possible. However adding - // too many threads increases contention in the ThreadPoolExecutorStressTestEngine - // _mutex. Check the stats printed by the waitAndCleanup() for results. Apparently the - // executor saturates very fast to some extend the thread count has very limited influence. - addSimpleSchedulingThreads(100); - addRandomCancelationThreads(20); - addScheduleRemoteCommandThreads(100); -#endif // defined(__APPLE__) && defined(__aarch64__) -} - -} // namespace -} // namespace executor -} // namespace mongo |