summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
authorLuis Osta <luis.osta@mongodb.com>2021-11-22 18:42:41 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-11-22 19:31:45 +0000
commit6bf01cf8aa7f0a83ce7173de4ec7a310d7781f6d (patch)
tree4262584d0456e876fbfefd5845fe3905e62542d5 /src/mongo/executor
parent78ab98a46b53582a5e69424bbb92f25c483fec0a (diff)
downloadmongo-6bf01cf8aa7f0a83ce7173de4ec7a310d7781f6d.tar.gz
SERVER-61489 Revert the executor_with_mock_net_stress_test test
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/SConscript38
-rw-r--r--src/mongo/executor/executor_stress_test_fixture.cpp232
-rw-r--r--src/mongo/executor/executor_stress_test_fixture.h131
-rw-r--r--src/mongo/executor/thread_pool_task_executor_with_mock_net_stress_test.cpp68
4 files changed, 1 insertions, 468 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript
index 8bb67ffb50d..a382c094de8 100644
--- a/src/mongo/executor/SConscript
+++ b/src/mongo/executor/SConscript
@@ -251,28 +251,6 @@ env.Library(
)
env.Library(
- target='network_interface_mock_test_fixture',
- source=[
- 'network_interface_mock_test_fixture.cpp',
- ],
- LIBDEPS=[
- 'task_executor_test_fixture',
- 'thread_pool_task_executor',
- ]
-)
-
-env.Library(
- target='executor_stress_test_fixture',
- source=[
- 'executor_stress_test_fixture.cpp',
- ],
- LIBDEPS=[
- 'task_executor_test_fixture',
- 'thread_pool_task_executor',
- ]
-)
-
-env.Library(
target='task_executor_pool',
source=[
'task_executor_pool.cpp',
@@ -317,6 +295,7 @@ env.CppUnitTest(
'connection_pool_test_fixture.cpp',
'mock_network_fixture_test.cpp',
'network_interface_mock_test.cpp',
+ 'network_interface_mock_test_fixture.cpp',
'scoped_task_executor_test.cpp',
'task_executor_cursor_test.cpp',
'thread_pool_task_executor_test.cpp',
@@ -325,7 +304,6 @@ env.CppUnitTest(
'connection_pool_executor',
'egress_tag_closer_manager',
'network_interface_mock',
- 'network_interface_mock_test_fixture',
'scoped_task_executor',
'task_executor_cursor',
'thread_pool_task_executor',
@@ -333,20 +311,6 @@ env.CppUnitTest(
],
)
-env.CppUnitTest(
- target='executor_with_mock_net_stress_test',
- source=[
- 'thread_pool_task_executor_with_mock_net_stress_test.cpp',
- ],
- LIBDEPS=[
- 'executor_stress_test_fixture',
- 'network_interface_mock',
- 'network_interface_mock_test_fixture',
- 'thread_pool_task_executor',
- 'thread_pool_task_executor_test_fixture',
- ],
-)
-
env.CppIntegrationTest(
target='executor_integration_test',
source=[
diff --git a/src/mongo/executor/executor_stress_test_fixture.cpp b/src/mongo/executor/executor_stress_test_fixture.cpp
deleted file mode 100644
index 27a3d3aaff3..00000000000
--- a/src/mongo/executor/executor_stress_test_fixture.cpp
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
-
-#include "mongo/executor/executor_stress_test_fixture.h"
-#include "mongo/logv2/log.h"
-#include "mongo/platform/random.h"
-#include "mongo/rpc/topology_version_gen.h"
-#include "mongo/unittest/integration_test.h"
-#include "mongo/util/net/hostandport.h"
-
-namespace mongo {
-namespace executor {
-
-ThreadPoolExecutorStressTestEngine::ThreadPoolExecutorStressTestEngine(
- std::shared_ptr<TaskExecutor> executor,
- boost::optional<NetworkInterfaceMock*> netMock,
- Milliseconds waitBeforeTermination)
- : _executor(std::move(executor)),
- _netMock(netMock),
- _waitBeforeTermination(waitBeforeTermination) {
- _timer.reset();
- _terminate.store(false);
- _threadAssertionMonitor.emplace();
- if (_netMock) {
- // If we use mocked Network, start a thread that answers pending requests.
- _addMockNetworkResponseThread();
- }
-}
-
-void ThreadPoolExecutorStressTestEngine::addSimpleSchedulingThreads(int count) {
- auto schedulerTask = [this] {
- while (!_terminate.load()) {
- CopyableCallback work = [this](const TaskExecutor::CallbackArgs&) {
- _completedWorks.increment();
- };
- const auto swCb = _executor->scheduleWork(work);
- if (!swCb.isOK()) {
- ASSERT_TRUE(_terminate.load())
- << "Scheduling failed before termination flag was set";
- ASSERT_TRUE(_executor->isShuttingDown())
- << "Scheduling failed before executor was shut down";
- } else {
- auto lk = stdx::lock_guard(_mutex);
- _callbacks.push_back(swCb.getValue());
- }
- while (!_terminate.load()) {
- sleepFor(kDurationBetweenSimpleSchedulings);
- auto lk = stdx::lock_guard(_mutex);
- if (_callbacks.size() < kMaxCallbacks) {
- break;
- }
- }
- }
- };
-
- auto lk = stdx::lock_guard(_mutex);
- for (int i = 0; i < count; ++i) {
- // _monitor is an instance of `ThreadAssertionMonitor`
- _threads.emplace_back(_threadAssertionMonitor->spawn([schedulerTask] { schedulerTask(); }));
- }
-}
-
-void ThreadPoolExecutorStressTestEngine::addRandomCancelationThreads(int count) {
- auto cancelationTask = [this] {
- while (true) {
- TaskExecutor::CallbackHandle cb;
- {
- auto lk = stdx::lock_guard(_mutex);
- while ((_callbacks.size() > 100 || _terminate.load()) && !cb) {
- cb = std::move(_callbacks.front());
- _callbacks.pop_front();
- }
- }
-
- if (auto shouldCancel = nextRandomInt32(2) == 0; shouldCancel && cb) {
- _executor->cancel(cb);
- } else if (cb) {
- _executor->wait(cb);
- }
- if (_terminate.load()) {
- stdx::lock_guard<Latch> lk(_mutex);
- if (_callbacks.empty()) {
- break;
- }
- }
- }
- };
-
- auto lk = stdx::lock_guard(_mutex);
- for (int i = 0; i < count; ++i) {
- _threads.emplace_back(cancelationTask);
- }
-}
-
-void ThreadPoolExecutorStressTestEngine::addScheduleRemoteCommandThreads(int count) {
- auto remoteSchedulerTask = [this] {
- RemoteCommandRequest rcr(HostAndPort("localhost"), "test", BSONObj(), nullptr);
- while (!_terminate.load()) {
- const auto swCb = _executor->scheduleRemoteCommand(
- rcr, [this](const TaskExecutor::RemoteCommandCallbackArgs& ca) {
- if (ca.response.status.isOK()) {
- _commandsSucceeded.increment();
- } else {
- _commandsFailed.increment();
- }
- });
- if (!swCb.isOK()) {
- // This race can happen only at termination.
- ASSERT_TRUE(_terminate.load())
- << "Scheduling failed before termination flag was set";
- ASSERT_TRUE(_executor->isShuttingDown())
- << "Scheduling failed before executor was shut down";
- }
-
- {
- stdx::lock_guard<Latch> lk(_mutex);
- _callbacks.push_back(swCb.getValue());
- }
- sleepFor(kDurationBetweenRemoteCommands);
- }
- };
-
- auto lk = stdx::lock_guard(_mutex);
- for (auto i = 0; i < count; ++i) {
- _threads.emplace_back(
- _threadAssertionMonitor->spawn([remoteSchedulerTask] { remoteSchedulerTask(); }));
- }
-}
-
-void ThreadPoolExecutorStressTestEngine::_addMockNetworkResponseThread() {
- if (!_netMock) {
- return; // Nothing to do if not mock.
- }
- stdx::lock_guard<Latch> lk(_mutex);
- _threads.emplace_back([this] {
- while (true) {
- {
- NetworkInterfaceMock::InNetworkGuard ing(*_netMock);
- while ((*_netMock)->hasReadyRequests()) {
- (*_netMock)->scheduleSuccessfulResponse(BSONObj());
- (*_netMock)->runReadyNetworkOperations();
- }
- }
- sleepFor(kDurationBetweenMockedResponses);
- // Network response thread must wait until all callbacks are cleared.
- if (_terminate.load()) {
- stdx::lock_guard<Latch> lk(_mutex);
- if (_callbacks.empty()) {
- break;
- }
- }
- }
- });
-}
-
-void ThreadPoolExecutorStressTestEngine::waitAndCleanup() {
- while (true) {
- sleepFor(Milliseconds(500));
- int64_t threadsSize;
- int64_t pendingRequestsSize;
- {
- stdx::lock_guard<Latch> lk(_mutex);
- threadsSize = _threads.size();
- pendingRequestsSize = _callbacks.size();
- }
- LOGV2(5822101,
- "Waiting for test termination",
- "completed"_attr = _completedWorks.get(),
- "commandsSucceeded"_attr = _commandsSucceeded.get(),
- "commandsFailed"_attr = _commandsFailed.get(),
- "threadsSize"_attr = threadsSize,
- "pendingRequestsSize"_attr = pendingRequestsSize);
- if (_timer.elapsed() > _waitBeforeTermination) {
- break;
- }
- }
-
- _terminate.store(true);
-
- std::list<stdx::thread> threads;
- {
- auto lk = stdx::lock_guard(_mutex);
- std::swap(threads, _threads);
- }
-
- while (!threads.empty()) {
- threads.front().join();
- threads.pop_front();
- }
-
- _executor->shutdown();
- _executor->join();
-
- _threadAssertionMonitor->notifyDone();
- _threadAssertionMonitor.reset();
-}
-
-int32_t ThreadPoolExecutorStressTestEngine::nextRandomInt32(int32_t max) {
- static thread_local PseudoRandom random(SecureRandom().nextInt64());
- return random.nextInt32(max);
-}
-
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/executor_stress_test_fixture.h b/src/mongo/executor/executor_stress_test_fixture.h
deleted file mode 100644
index d54f6e25658..00000000000
--- a/src/mongo/executor/executor_stress_test_fixture.h
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/base/counter.h"
-#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/unittest/thread_assertion_monitor.h"
-#include "mongo/util/duration.h"
-
-namespace mongo {
-namespace executor {
-
-/**
- * Stress test common implementation methods.
- */
-class ThreadPoolExecutorStressTestEngine {
-public:
- using CopyableCallback = std::function<void(const TaskExecutor::CallbackArgs&)>;
-
- // Adjust periods based on statistical reports if needed.
- static constexpr auto kDurationBetweenSimpleSchedulings{Microseconds(40)};
- static constexpr auto kDurationBetweenRemoteCommands{Microseconds(10)};
- static constexpr auto kDurationBetweenMockedResponses{Microseconds(10)};
- static constexpr auto kMaxCallbacks = 100 * 1000;
-
- ThreadPoolExecutorStressTestEngine(std::shared_ptr<TaskExecutor> executor,
- boost::optional<NetworkInterfaceMock*> netMock,
- Milliseconds waitBeforeTermination);
-
- // Add various stress test threads. In each case 'count' is the count of new threads to add.
-
- void addSimpleSchedulingThreads(int count);
-
- void addRandomCancelationThreads(int count);
-
- void addScheduleRemoteCommandThreads(int count);
-
- // Waits for test termination, then sets termination flag and blocks until all threads
- // are terminated.
- void waitAndCleanup();
-
-private:
- void _addMockNetworkResponseThread();
-
- static int32_t nextRandomInt32(int32_t max);
-
- const std::shared_ptr<TaskExecutor> _executor;
- const boost::optional<NetworkInterfaceMock*> _netMock;
-
- // Statistics.
- Counter64 _completedWorks;
- Counter64 _commandsSucceeded;
- Counter64 _commandsFailed;
-
- AtomicWord<bool> _terminate; // Termination flag.
- Timer _timer;
- Milliseconds _waitBeforeTermination;
- boost::optional<unittest::ThreadAssertionMonitor> _threadAssertionMonitor;
-
- Mutex _mutex = MONGO_MAKE_LATCH("ThreadPoolExecutorMockNetStressTest::_mutex");
- std::list<stdx::thread> _threads;
- std::deque<TaskExecutor::CallbackHandle> _callbacks;
-};
-
-/**
- * Stress test suite using mock thread pool and mock Net interface.
- */
-class ThreadPoolExecutorMockNetStressTest : public ThreadPoolExecutorTest {
-public:
- void setUp() override {
- ThreadPoolExecutorTest::setUp();
- getExecutor().startup();
- _engine = std::make_unique<ThreadPoolExecutorStressTestEngine>(
- getExecutor().shared_from_this(), getNet(), Seconds(2));
- }
-
- void tearDown() override {
- waitAndCleanup();
- ThreadPoolExecutorTest::tearDown();
- }
-
- void addSimpleSchedulingThreads(int count) {
- _engine->addSimpleSchedulingThreads(count);
- }
-
- void addRandomCancelationThreads(int count) {
- _engine->addRandomCancelationThreads(count);
- }
-
- void addScheduleRemoteCommandThreads(int count) {
- _engine->addScheduleRemoteCommandThreads(count);
- }
-
- void waitAndCleanup() {
- _engine->waitAndCleanup();
- }
-
-private:
- std::unique_ptr<ThreadPoolExecutorStressTestEngine> _engine;
-};
-
-} // namespace executor
-} // namespace mongo
diff --git a/src/mongo/executor/thread_pool_task_executor_with_mock_net_stress_test.cpp b/src/mongo/executor/thread_pool_task_executor_with_mock_net_stress_test.cpp
deleted file mode 100644
index 51544792dca..00000000000
--- a/src/mongo/executor/thread_pool_task_executor_with_mock_net_stress_test.cpp
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Copyright (C) 2021-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include <memory>
-
-#include "mongo/base/init.h"
-#include "mongo/base/status.h"
-#include "mongo/executor/executor_stress_test_fixture.h"
-#include "mongo/executor/task_executor_test_common.h"
-#include "mongo/executor/task_executor_test_fixture.h"
-#include "mongo/executor/thread_pool_task_executor.h"
-#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
-
-namespace mongo {
-namespace executor {
-namespace {
-
-MONGO_INITIALIZER(ThreadPoolExecutorCommonTests)(InitializerContext*) {
- addTestsForExecutor("ThreadPoolExecutorCommon", [](std::unique_ptr<NetworkInterfaceMock> net) {
- return makeSharedThreadPoolTestExecutor(std::move(net));
- });
-}
-
-TEST_F(ThreadPoolExecutorMockNetStressTest, StressTest) {
-#if defined(__APPLE__) && defined(__aarch64__)
- // TODO: Fix this test under mac os arm64.
-#else
- // The idea is to have as much concurrency in the 'executor' as possible. However adding
- // too many threads increases contention in the ThreadPoolExecutorStressTestEngine
- // _mutex. Check the stats printed by the waitAndCleanup() for results. Apparently the
- // executor saturates very fast to some extend the thread count has very limited influence.
- addSimpleSchedulingThreads(100);
- addRandomCancelationThreads(20);
- addScheduleRemoteCommandThreads(100);
-#endif // defined(__APPLE__) && defined(__aarch64__)
-}
-
-} // namespace
-} // namespace executor
-} // namespace mongo