summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
authorAndrew Shuvalov <andrew.shuvalov@mongodb.com>2021-07-26 14:29:51 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-07-26 14:51:52 +0000
commit2f5709e68f0acc28448d90612536c0c7d22b0c0c (patch)
tree6d5cc98b2ef1c328fc8fe8f78ccbad33cc4a8644 /src/mongo/executor
parent0588dd589451ce5762035f68b81d12b82482885c (diff)
downloadmongo-2f5709e68f0acc28448d90612536c0c7d22b0c0c.tar.gz
SERVER-58221: stress test for ThreadPoolExecutor
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/SConscript38
-rw-r--r--src/mongo/executor/executor_stress_test_fixture.cpp208
-rw-r--r--src/mongo/executor/executor_stress_test_fixture.h130
-rw-r--r--src/mongo/executor/thread_pool_task_executor_with_mock_net_stress_test.cpp64
4 files changed, 439 insertions, 1 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript
index a382c094de8..8bb67ffb50d 100644
--- a/src/mongo/executor/SConscript
+++ b/src/mongo/executor/SConscript
@@ -251,6 +251,28 @@ env.Library(
)
env.Library(
+ target='network_interface_mock_test_fixture',
+ source=[
+ 'network_interface_mock_test_fixture.cpp',
+ ],
+ LIBDEPS=[
+ 'task_executor_test_fixture',
+ 'thread_pool_task_executor',
+ ]
+)
+
+env.Library(
+ target='executor_stress_test_fixture',
+ source=[
+ 'executor_stress_test_fixture.cpp',
+ ],
+ LIBDEPS=[
+ 'task_executor_test_fixture',
+ 'thread_pool_task_executor',
+ ]
+)
+
+env.Library(
target='task_executor_pool',
source=[
'task_executor_pool.cpp',
@@ -295,7 +317,6 @@ env.CppUnitTest(
'connection_pool_test_fixture.cpp',
'mock_network_fixture_test.cpp',
'network_interface_mock_test.cpp',
- 'network_interface_mock_test_fixture.cpp',
'scoped_task_executor_test.cpp',
'task_executor_cursor_test.cpp',
'thread_pool_task_executor_test.cpp',
@@ -304,6 +325,7 @@ env.CppUnitTest(
'connection_pool_executor',
'egress_tag_closer_manager',
'network_interface_mock',
+ 'network_interface_mock_test_fixture',
'scoped_task_executor',
'task_executor_cursor',
'thread_pool_task_executor',
@@ -311,6 +333,20 @@ env.CppUnitTest(
],
)
+env.CppUnitTest(
+ target='executor_with_mock_net_stress_test',
+ source=[
+ 'thread_pool_task_executor_with_mock_net_stress_test.cpp',
+ ],
+ LIBDEPS=[
+ 'executor_stress_test_fixture',
+ 'network_interface_mock',
+ 'network_interface_mock_test_fixture',
+ 'thread_pool_task_executor',
+ 'thread_pool_task_executor_test_fixture',
+ ],
+)
+
env.CppIntegrationTest(
target='executor_integration_test',
source=[
diff --git a/src/mongo/executor/executor_stress_test_fixture.cpp b/src/mongo/executor/executor_stress_test_fixture.cpp
new file mode 100644
index 00000000000..46e6ca3ed81
--- /dev/null
+++ b/src/mongo/executor/executor_stress_test_fixture.cpp
@@ -0,0 +1,208 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/executor/executor_stress_test_fixture.h"
+#include "mongo/logv2/log.h"
+#include "mongo/rpc/topology_version_gen.h"
+#include "mongo/unittest/integration_test.h"
+#include "mongo/util/net/hostandport.h"
+
+namespace mongo {
+namespace executor {
+
+ThreadPoolExecutorStressTestEngine::ThreadPoolExecutorStressTestEngine(
+ std::shared_ptr<TaskExecutor> executor,
+ boost::optional<NetworkInterfaceMock*> netMock,
+ Milliseconds waitBeforeTermination)
+ : _executor(std::move(executor)),
+ _netMock(netMock),
+ _random(SecureRandom().nextInt64()),
+ _waitBeforeTermination(waitBeforeTermination) {
+ _timer.reset();
+ _terminate.store(false);
+ _threadAssertionMonitor.emplace();
+ if (_netMock) {
+ // If we use mocked Network, start a thread that answers pending requests.
+ _addMockNetworkResponseThread();
+ }
+}
+
+void ThreadPoolExecutorStressTestEngine::addSimpleSchedulingThreads(int count) {
+ auto schedulerTask = [this] {
+ while (!_terminate.load()) {
+ CopyableCallback work = [this](const TaskExecutor::CallbackArgs&) {
+ _completedWorks.increment();
+ };
+ const auto swCb = _executor->scheduleWork(work);
+ if (!swCb.isOK()) {
+ ASSERT_TRUE(_terminate.load())
+ << "Scheduling failed before termination flag was set";
+ ASSERT_TRUE(_executor->isShuttingDown())
+ << "Scheduling failed before executor was shut down";
+ } else {
+ auto lk = stdx::lock_guard(_mutex);
+ _callbacks.push_back(swCb.getValue());
+ }
+ sleepFor(kDurationBetweenSimpleSchedulings);
+ }
+ };
+
+ auto lk = stdx::lock_guard(_mutex);
+ for (int i = 0; i < count; ++i) {
+ // _monitor is an instance of `ThreadAssertionMonitor`
+ _threads.emplace_back(_threadAssertionMonitor->spawn([schedulerTask] { schedulerTask(); }));
+ }
+}
+
+void ThreadPoolExecutorStressTestEngine::addRandomCancelationThreads(int count) {
+ auto cancelationTask = [this] {
+ while (!_terminate.load()) {
+ TaskExecutor::CallbackHandle cb;
+ {
+ auto lk = stdx::lock_guard(_mutex);
+ while (_callbacks.size() > 100 && !cb) {
+ cb = std::move(_callbacks.front());
+ _callbacks.pop_front();
+ }
+ }
+
+ if (auto shouldCancel = _random.nextInt32(2) == 0; shouldCancel && cb) {
+ _executor->cancel(cb);
+ } else if (cb) {
+ _executor->wait(cb);
+ }
+ }
+ };
+
+ auto lk = stdx::lock_guard(_mutex);
+ for (int i = 0; i < count; ++i) {
+ _threads.emplace_back(cancelationTask);
+ }
+}
+
+void ThreadPoolExecutorStressTestEngine::addScheduleRemoteCommandThreads(int count) {
+ auto remoteSchedulerTask = [this] {
+ RemoteCommandRequest rcr(HostAndPort("localhost"), "test", BSONObj(), nullptr);
+ while (!_terminate.load()) {
+ const auto swCb = _executor->scheduleRemoteCommand(
+ rcr, [this](const TaskExecutor::RemoteCommandCallbackArgs& ca) {
+ if (ca.response.status.isOK()) {
+ _commandsSucceeded.increment();
+ } else {
+ _commandsFailed.increment();
+ }
+ });
+ if (!swCb.isOK()) {
+ // This race can happen only at termination.
+ ASSERT_TRUE(_terminate.load())
+ << "Scheduling failed before termination flag was set";
+ ASSERT_TRUE(_executor->isShuttingDown())
+ << "Scheduling failed before executor was shut down";
+ }
+
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _callbacks.push_back(swCb.getValue());
+ }
+ sleepFor(kDurationBetweenRemoteCommands);
+ }
+ };
+
+ auto lk = stdx::lock_guard(_mutex);
+ for (auto i = 0; i < count; ++i) {
+ _threads.emplace_back(
+ _threadAssertionMonitor->spawn([remoteSchedulerTask] { remoteSchedulerTask(); }));
+ }
+}
+
+void ThreadPoolExecutorStressTestEngine::_addMockNetworkResponseThread() {
+ if (!_netMock) {
+ return; // Nothing to do if not mock.
+ }
+ stdx::lock_guard<Latch> lk(_mutex);
+ _threads.emplace_back([this] {
+ while (!_terminate.load()) {
+ {
+ NetworkInterfaceMock::InNetworkGuard ing(*_netMock);
+ while ((*_netMock)->hasReadyRequests()) {
+ (*_netMock)->scheduleSuccessfulResponse(BSONObj());
+ (*_netMock)->runReadyNetworkOperations();
+ }
+ }
+ sleepFor(kDurationBetweenMockedResponses);
+ }
+ });
+}
+
+void ThreadPoolExecutorStressTestEngine::waitAndCleanup() {
+ while (true) {
+ sleepFor(Milliseconds(500));
+ int64_t threadsSize;
+ int64_t pendingRequestsSize;
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ threadsSize = _threads.size();
+ pendingRequestsSize = _callbacks.size();
+ }
+ LOGV2(5822101,
+ "Waiting for test termination",
+ "completed"_attr = _completedWorks.get(),
+ "commandsSucceeded"_attr = _commandsSucceeded.get(),
+ "commandsFailed"_attr = _commandsFailed.get(),
+ "threadsSize"_attr = threadsSize,
+ "pendingRequestsSize"_attr = pendingRequestsSize);
+ if (_timer.elapsed() > _waitBeforeTermination) {
+ break;
+ }
+ }
+
+ _terminate.store(true);
+
+ std::list<stdx::thread> threads;
+ {
+ auto lk = stdx::lock_guard(_mutex);
+ std::swap(threads, _threads);
+ }
+
+ while (!threads.empty()) {
+ threads.front().join();
+ threads.pop_front();
+ }
+
+ _executor->shutdown();
+ _executor->join();
+
+ _threadAssertionMonitor->notifyDone();
+ _threadAssertionMonitor.reset();
+}
+
+
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/executor_stress_test_fixture.h b/src/mongo/executor/executor_stress_test_fixture.h
new file mode 100644
index 00000000000..6a31d725a12
--- /dev/null
+++ b/src/mongo/executor/executor_stress_test_fixture.h
@@ -0,0 +1,130 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/counter.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/platform/random.h"
+#include "mongo/unittest/thread_assertion_monitor.h"
+#include "mongo/util/duration.h"
+
+namespace mongo {
+namespace executor {
+
+/**
+ * Stress test common implementation methods.
+ */
+class ThreadPoolExecutorStressTestEngine {
+public:
+ using CopyableCallback = std::function<void(const TaskExecutor::CallbackArgs&)>;
+
+ // Adjust periods based on statistical reports if needed.
+ static constexpr auto kDurationBetweenSimpleSchedulings{Microseconds(40)};
+ static constexpr auto kDurationBetweenRemoteCommands{Microseconds(10)};
+ static constexpr auto kDurationBetweenMockedResponses{Microseconds(10)};
+
+ ThreadPoolExecutorStressTestEngine(std::shared_ptr<TaskExecutor> executor,
+ boost::optional<NetworkInterfaceMock*> netMock,
+ Milliseconds waitBeforeTermination);
+
+ // Add various stress test threads. In each case 'count' is the count of new threads to add.
+
+ void addSimpleSchedulingThreads(int count);
+
+ void addRandomCancelationThreads(int count);
+
+ void addScheduleRemoteCommandThreads(int count);
+
+ // Waits for test termination, then sets termination flag and blocks until all threads
+ // are terminated.
+ void waitAndCleanup();
+
+private:
+ void _addMockNetworkResponseThread();
+
+ const std::shared_ptr<TaskExecutor> _executor;
+ const boost::optional<NetworkInterfaceMock*> _netMock;
+ PseudoRandom _random;
+
+ // Statistics.
+ Counter64 _completedWorks;
+ Counter64 _commandsSucceeded;
+ Counter64 _commandsFailed;
+
+ AtomicWord<bool> _terminate; // Termination flag.
+ Timer _timer;
+ Milliseconds _waitBeforeTermination;
+ boost::optional<unittest::ThreadAssertionMonitor> _threadAssertionMonitor;
+
+ Mutex _mutex = MONGO_MAKE_LATCH("ThreadPoolExecutorMockNetStressTest::_mutex");
+ std::list<stdx::thread> _threads;
+ std::deque<TaskExecutor::CallbackHandle> _callbacks;
+};
+
+/**
+ * Stress test suite using mock thread pool and mock Net interface.
+ */
+class ThreadPoolExecutorMockNetStressTest : public ThreadPoolExecutorTest {
+public:
+ void setUp() override {
+ ThreadPoolExecutorTest::setUp();
+ getExecutor().startup();
+ _engine = std::make_unique<ThreadPoolExecutorStressTestEngine>(
+ getExecutor().shared_from_this(), getNet(), Seconds(2));
+ }
+
+ void tearDown() override {
+ waitAndCleanup();
+ ThreadPoolExecutorTest::tearDown();
+ }
+
+ void addSimpleSchedulingThreads(int count) {
+ _engine->addSimpleSchedulingThreads(count);
+ }
+
+ void addRandomCancelationThreads(int count) {
+ _engine->addRandomCancelationThreads(count);
+ }
+
+ void addScheduleRemoteCommandThreads(int count) {
+ _engine->addScheduleRemoteCommandThreads(count);
+ }
+
+ void waitAndCleanup() {
+ _engine->waitAndCleanup();
+ }
+
+private:
+ std::unique_ptr<ThreadPoolExecutorStressTestEngine> _engine;
+};
+
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/thread_pool_task_executor_with_mock_net_stress_test.cpp b/src/mongo/executor/thread_pool_task_executor_with_mock_net_stress_test.cpp
new file mode 100644
index 00000000000..ad8c983cbce
--- /dev/null
+++ b/src/mongo/executor/thread_pool_task_executor_with_mock_net_stress_test.cpp
@@ -0,0 +1,64 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <memory>
+
+#include "mongo/base/init.h"
+#include "mongo/base/status.h"
+#include "mongo/executor/executor_stress_test_fixture.h"
+#include "mongo/executor/task_executor_test_common.h"
+#include "mongo/executor/task_executor_test_fixture.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+
+namespace mongo {
+namespace executor {
+namespace {
+
+MONGO_INITIALIZER(ThreadPoolExecutorCommonTests)(InitializerContext*) {
+ addTestsForExecutor("ThreadPoolExecutorCommon", [](std::unique_ptr<NetworkInterfaceMock> net) {
+ return makeSharedThreadPoolTestExecutor(std::move(net));
+ });
+}
+
+TEST_F(ThreadPoolExecutorMockNetStressTest, StressTest) {
+ // The idea is to have as much concurrency in the 'executor' as possible. However adding
+ // too many threads increases contention in the ThreadPoolExecutorStressTestEngine
+ // _mutex. Check the stats printed by the waitAndCleanup() for results. Apparently the
+ // executor saturates very fast to some extend the thread count has very limited influence.
+ addSimpleSchedulingThreads(100);
+ addRandomCancelationThreads(20);
+ addScheduleRemoteCommandThreads(100);
+}
+
+} // namespace
+} // namespace executor
+} // namespace mongo