diff options
author | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2020-06-22 16:30:35 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-06-23 22:36:50 +0000 |
commit | 311d7ccd0b6fd24122e28b7e8f3a1e191dd4078a (patch) | |
tree | 5e18af20c70eaf568d93778b1ee0052da3a91ace | |
parent | 36bf915c32d551ad557ec7a1fa41890037e9f54f (diff) | |
download | mongo-311d7ccd0b6fd24122e28b7e8f3a1e191dd4078a.tar.gz |
SERVER-48973 Remove ServiceExecutorAdaptive
-rw-r--r-- | buildscripts/resmokelib/core/programs.py | 2 | ||||
-rw-r--r-- | buildscripts/resmokelib/run/__init__.py | 3 | ||||
-rw-r--r-- | debian/mongod.1 | 41 | ||||
-rw-r--r-- | debian/mongos.1 | 41 | ||||
-rw-r--r-- | jstests/noPassthrough/command_line_parsing.js | 2 | ||||
-rw-r--r-- | jstests/noPassthrough/max_conns_override.js | 2 | ||||
-rw-r--r-- | src/mongo/db/server_options.h | 3 | ||||
-rw-r--r-- | src/mongo/db/server_options_general.idl | 6 | ||||
-rw-r--r-- | src/mongo/db/server_options_server_helpers.cpp | 11 | ||||
-rw-r--r-- | src/mongo/shell/servers.js | 6 | ||||
-rw-r--r-- | src/mongo/shell/utils.js | 1 | ||||
-rw-r--r-- | src/mongo/transport/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_adaptive.cpp | 704 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_adaptive.h | 251 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_adaptive_test.cpp | 397 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_test.cpp | 60 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_manager.cpp | 26 |
17 files changed, 4 insertions, 1555 deletions
diff --git a/buildscripts/resmokelib/core/programs.py b/buildscripts/resmokelib/core/programs.py index a9931c2e5ad..d18f5d9ac70 100644 --- a/buildscripts/resmokelib/core/programs.py +++ b/buildscripts/resmokelib/core/programs.py @@ -214,7 +214,6 @@ def mongod_program( # pylint: disable=too-many-branches,too-many-statements shortcut_opts = { "enableMajorityReadConcern": config.MAJORITY_READ_CONCERN, "nojournal": config.NO_JOURNAL, - "serviceExecutor": config.SERVICE_EXECUTOR, "storageEngine": config.STORAGE_ENGINE, "transportLayer": config.TRANSPORT_LAYER, "wiredTigerCollectionConfigString": config.WT_COLL_CONFIG, @@ -327,7 +326,6 @@ def mongo_shell_program( # pylint: disable=too-many-branches,too-many-locals,to "enableMajorityReadConcern": (config.MAJORITY_READ_CONCERN, True), "mixedBinVersions": (config.MIXED_BIN_VERSIONS, ""), "noJournal": (config.NO_JOURNAL, False), - "serviceExecutor": (config.SERVICE_EXECUTOR, ""), "storageEngine": (config.STORAGE_ENGINE, ""), "storageEngineCacheSizeGB": (config.STORAGE_ENGINE_CACHE_SIZE, ""), "testName": (test_name, ""), diff --git a/buildscripts/resmokelib/run/__init__.py b/buildscripts/resmokelib/run/__init__.py index 62405acb14b..6b906597e14 100644 --- a/buildscripts/resmokelib/run/__init__.py +++ b/buildscripts/resmokelib/run/__init__.py @@ -728,9 +728,6 @@ class RunPlugin(PluginInterface): help=("Seed for the random number generator. Useful in combination with the" " --shuffle option for producing a consistent test execution order.")) - parser.add_argument("--serviceExecutor", dest="service_executor", metavar="EXECUTOR", - help="The service executor used by jstests") - parser.add_argument("--transportLayer", dest="transport_layer", metavar="TRANSPORT", help="The transport layer used by jstests") diff --git a/debian/mongod.1 b/debian/mongod.1 index 5892f4e2196..700a0774222 100644 --- a/debian/mongod.1 +++ b/debian/mongod.1 @@ -972,47 +972,6 @@ mongod \-\-timeZoneInfo timezonedb\-2017b/ .UNINDENT .INDENT 0.0 .TP -.B \-\-serviceExecutor <string> -\fIDefault\fP: synchronous -.sp -New in version 3.6. - -.sp -Determines the threading and execution model \fBmongod\fP uses to -execute client requests. The \fB\-\-serviceExecutor\fP option accepts one -of the following values: -.TS -center; -|l|l|. -_ -T{ -Value -T} T{ -Description -T} -_ -T{ -\fBsynchronous\fP -T} T{ -The \fBmongod\fP uses synchronous networking and manages its -networking thread pool on a per connection basis. Previous -versions of MongoDB managed threads in this way. -T} -_ -T{ -\fBadaptive\fP -T} T{ -The \fBmongod\fP uses the new experimental asynchronous -networking mode with an adaptive thread pool which manages -threads on a per request basis. This mode should have more -consistent performance and use less resources when there are -more inactive connections than database requests. -T} -_ -.TE -.UNINDENT -.INDENT 0.0 -.TP .B \-\-outputConfig New in version 4.2. diff --git a/debian/mongos.1 b/debian/mongos.1 index db9e626e3df..e9e6f6d2464 100644 --- a/debian/mongos.1 +++ b/debian/mongos.1 @@ -718,47 +718,6 @@ between \fBmongo\fP shell and \fBmongod\fP are not compressed. .UNINDENT .INDENT 0.0 .TP -.B \-\-serviceExecutor <string> -\fIDefault\fP: synchronous -.sp -New in version 3.6. - -.sp -Determines the threading and execution model \fBmongos\fP uses to -execute client requests. The \fB\-\-serviceExecutor\fP option accepts one -of the following values: -.TS -center; -|l|l|. -_ -T{ -Value -T} T{ -Description -T} -_ -T{ -\fBsynchronous\fP -T} T{ -The \fBmongos\fP uses synchronous networking and manages its -networking thread pool on a per connection basis. Previous -versions of MongoDB managed threads in this way. -T} -_ -T{ -\fBadaptive\fP -T} T{ -The \fBmongos\fP uses the new experimental asynchronous -networking mode with an adaptive thread pool which manages -threads on a per request basis. This mode should have more -consistent performance and use less resources when there are -more inactive connections than database requests. -T} -_ -.TE -.UNINDENT -.INDENT 0.0 -.TP .B \-\-timeZoneInfo <path> The full path from which to load the time zone database. If this option is not provided, then MongoDB will use its built\-in time zone database. diff --git a/jstests/noPassthrough/command_line_parsing.js b/jstests/noPassthrough/command_line_parsing.js index 9e96dcab01a..511aabf90dd 100644 --- a/jstests/noPassthrough/command_line_parsing.js +++ b/jstests/noPassthrough/command_line_parsing.js @@ -27,7 +27,6 @@ var m2result = m2.getDB("admin").runCommand("getCmdLineOpts"); MongoRunner.stopMongod(m2); // remove variables that depend on the way the test is started. -delete m2result.parsed.net.serviceExecutor; delete m2result.parsed.net.transportLayer; delete m2result.parsed.storage.mmapv1; delete m2result.parsed.setParameter; @@ -56,7 +55,6 @@ var m3result = m3.getDB("admin").runCommand("getCmdLineOpts"); MongoRunner.stopMongod(m3); // remove variables that depend on the way the test is started. -delete m3result.parsed.net.serviceExecutor; delete m3result.parsed.net.transportLayer; delete m3result.parsed.storage.mmapv1; delete m3result.parsed.setParameter; diff --git a/jstests/noPassthrough/max_conns_override.js b/jstests/noPassthrough/max_conns_override.js index 07d012d663f..a0005f981e5 100644 --- a/jstests/noPassthrough/max_conns_override.js +++ b/jstests/noPassthrough/max_conns_override.js @@ -4,8 +4,6 @@ const configuredMaxConns = 5; const configuredReadyAdminThreads = 3; let conn = MongoRunner.runMongod({ config: "jstests/noPassthrough/libs/max_conns_override_config.yaml", - // We check a specific field in this executor's serverStatus section - serviceExecutor: "synchronous", }); // Use up all the maxConns with junk connections, all of these should succeed diff --git a/src/mongo/db/server_options.h b/src/mongo/db/server_options.h index 8487167c490..19ba188dbea 100644 --- a/src/mongo/db/server_options.h +++ b/src/mongo/db/server_options.h @@ -82,9 +82,6 @@ struct ServerGlobalParams { std::string socket = "/tmp"; // UNIX domain socket directory std::string transportLayer; // --transportLayer (must be either "asio" or "legacy") - // --serviceExecutor ("adaptive", "synchronous") - std::string serviceExecutor; - size_t maxConns = DEFAULT_MAX_CONN; // Maximum number of simultaneous open connections. std::vector<stdx::variant<CIDR, std::string>> maxConnsOverride; int reservedAdminThreads = 0; diff --git a/src/mongo/db/server_options_general.idl b/src/mongo/db/server_options_general.idl index 67093ee254f..1d595700996 100644 --- a/src/mongo/db/server_options_general.idl +++ b/src/mongo/db/server_options_general.idl @@ -105,12 +105,6 @@ configs: arg_vartype: String default: asio hidden: true - 'net.serviceExecutor': - description: 'Sets the service executor implementation' - short_name: serviceExecutor - arg_vartype: String - default: synchronous - hidden: true 'processManagement.pidFilePath': description: 'Full path to pidfile (if not set, no pidfile is created)' short_name: pidfilepath diff --git a/src/mongo/db/server_options_server_helpers.cpp b/src/mongo/db/server_options_server_helpers.cpp index 44d01d99f79..c895931704d 100644 --- a/src/mongo/db/server_options_server_helpers.cpp +++ b/src/mongo/db/server_options_server_helpers.cpp @@ -304,17 +304,6 @@ Status storeServerOptions(const moe::Environment& params) { } } - if (params.count("net.serviceExecutor")) { - auto value = params["net.serviceExecutor"].as<std::string>(); - const auto valid = {"synchronous"_sd, "adaptive"_sd}; - if (std::find(valid.begin(), valid.end(), value) == valid.end()) { - return {ErrorCodes::BadValue, "Unsupported value for serviceExecutor"}; - } - serverGlobalParams.serviceExecutor = value; - } else { - serverGlobalParams.serviceExecutor = "synchronous"; - } - if (params.count("security.transitionToAuth")) { serverGlobalParams.transitionToAuth = params["security.transitionToAuth"].as<bool>(); } diff --git a/src/mongo/shell/servers.js b/src/mongo/shell/servers.js index caf7bbbe737..5e02938a861 100644 --- a/src/mongo/shell/servers.js +++ b/src/mongo/shell/servers.js @@ -1161,12 +1161,6 @@ function appendSetParameterArgs(argArray) { // New options in 3.5.x if (!programMajorMinorVersion || programMajorMinorVersion >= 305) { - if (jsTest.options().serviceExecutor) { - if (!argArrayContains("--serviceExecutor")) { - argArray.push(...["--serviceExecutor", jsTest.options().serviceExecutor]); - } - } - if (jsTest.options().transportLayer) { if (!argArrayContains("--transportLayer")) { argArray.push(...["--transportLayer", jsTest.options().transportLayer]); diff --git a/src/mongo/shell/utils.js b/src/mongo/shell/utils.js index 2ad53b963e3..48c3388112b 100644 --- a/src/mongo/shell/utils.js +++ b/src/mongo/shell/utils.js @@ -273,7 +273,6 @@ jsTestOptions = function() { testingDiagnosticsEnabled: TestData.hasOwnProperty('testingDiagnosticsEnabled') ? TestData.testingDiagnosticsEnabled : true, - serviceExecutor: TestData.serviceExecutor, setParameters: TestData.setParameters, setParametersMongos: TestData.setParametersMongos, storageEngine: TestData.storageEngine, diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 95ebc3a1fbb..ab1e34c702b 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -85,7 +85,6 @@ env.Library( tlEnv.Library( target='service_executor', source=[ - 'service_executor_adaptive.cpp', 'service_executor_reserved.cpp', 'service_executor_synchronous.cpp', env.Idlc('service_executor.idl')[0], @@ -175,8 +174,6 @@ tlEnv.CppUnitTest( 'message_compressor_registry_test.cpp', 'transport_layer_asio_test.cpp', 'service_executor_test.cpp', - # Disable this test until SERVER-30475 and associated build failure tickets are resolved. - # 'service_executor_adaptive_test.cpp', 'max_conns_override_test.cpp', 'service_state_machine_test.cpp', ], diff --git a/src/mongo/transport/service_executor_adaptive.cpp b/src/mongo/transport/service_executor_adaptive.cpp deleted file mode 100644 index 156f1e3d74a..00000000000 --- a/src/mongo/transport/service_executor_adaptive.cpp +++ /dev/null @@ -1,704 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor - -#include "mongo/platform/basic.h" - -#include "mongo/transport/service_executor_adaptive.h" - -#include <array> -#include <random> - -#include "mongo/logv2/log.h" -#include "mongo/transport/service_entry_point_utils.h" -#include "mongo/transport/service_executor_gen.h" -#include "mongo/transport/service_executor_task_names.h" -#include "mongo/util/concurrency/thread_name.h" -#include "mongo/util/duration.h" -#include "mongo/util/processinfo.h" -#include "mongo/util/scopeguard.h" -#include "mongo/util/str.h" - -#include <asio.hpp> - -namespace mongo { -namespace transport { - -namespace { -constexpr auto kTotalQueued = "totalQueued"_sd; -constexpr auto kTotalExecuted = "totalExecuted"_sd; -constexpr auto kTotalTimeExecutingUs = "totalTimeExecutingMicros"_sd; -constexpr auto kTotalTimeRunningUs = "totalTimeRunningMicros"_sd; -constexpr auto kTotalTimeQueuedUs = "totalTimeQueuedMicros"_sd; -constexpr auto kThreadsInUse = "threadsInUse"_sd; -constexpr auto kThreadsRunning = "threadsRunning"_sd; -constexpr auto kThreadsPending = "threadsPending"_sd; -constexpr auto kExecutorLabel = "executor"_sd; -constexpr auto kExecutorName = "adaptive"_sd; -constexpr auto kStuckDetection = "stuckThreadsDetected"_sd; -constexpr auto kStarvation = "starvation"_sd; -constexpr auto kReserveMinimum = "belowReserveMinimum"_sd; -constexpr auto kThreadReasons = "threadCreationCauses"_sd; - -int64_t ticksToMicros(TickSource::Tick ticks, TickSource* tickSource) { - invariant(tickSource->getTicksPerSecond() >= 1000000); - return tickSource->ticksTo<Microseconds>(ticks).count(); -} - -struct ServerParameterOptions : public ServiceExecutorAdaptive::Options { - int reservedThreads() const final { - int value = adaptiveServiceExecutorReservedThreads.load(); - if (value == -1) { - value = ProcessInfo::getNumAvailableCores() / 2; - value = std::max(value, 2); - adaptiveServiceExecutorReservedThreads.store(value); - LOGV2( - 22951, - "No thread count configured for executor. Using number of cores / 2: {threadCount}", - "No thread count configured for executor. Using number of cores / 2", - "threadCount"_attr = value); - } - return value; - } - - Milliseconds workerThreadRunTime() const final { - return Milliseconds{adaptiveServiceExecutorRunTimeMillis.load()}; - } - - int runTimeJitter() const final { - return adaptiveServiceExecutorRunTimeJitterMillis.load(); - } - - Milliseconds stuckThreadTimeout() const final { - return Milliseconds{adaptiveServiceExecutorStuckThreadTimeoutMillis.load()}; - } - - Microseconds maxQueueLatency() const final { - static Nanoseconds minTimerResolution = getMinimumTimerResolution(); - Microseconds value{adaptiveServiceExecutorMaxQueueLatencyMicros.load()}; - if (value < minTimerResolution) { - LOGV2(22952, - "Target MaxQueueLatencyMicros ({targetMaxQueLatencyMicros}) is less than minimum " - "timer resolution of " - "OS ({OSMinTimerResolution}). Using {OSMinTimerResolution}", - "Target MaxQueueLatencyMicros is less than the OS minimum timer resolution. " - "Using the OS minimum", - "targetMaxQueLatencyMicros"_attr = value, - "OSMinTimerResolution"_attr = minTimerResolution); - value = duration_cast<Microseconds>(minTimerResolution) + Microseconds{1}; - adaptiveServiceExecutorMaxQueueLatencyMicros.store(value.count()); - } - return value; - } - - int idlePctThreshold() const final { - return adaptiveServiceExecutorIdlePctThreshold.load(); - } - - int recursionLimit() const final { - return adaptiveServiceExecutorRecursionLimit.load(); - } -}; - -} // namespace - -thread_local ServiceExecutorAdaptive::ThreadState* ServiceExecutorAdaptive::_localThreadState = - nullptr; - -ServiceExecutorAdaptive::ServiceExecutorAdaptive(ServiceContext* ctx, ReactorHandle reactor) - : ServiceExecutorAdaptive(ctx, std::move(reactor), std::make_unique<ServerParameterOptions>()) { -} - -ServiceExecutorAdaptive::ServiceExecutorAdaptive(ServiceContext* ctx, - ReactorHandle reactor, - std::unique_ptr<Options> config) - : _reactorHandle(reactor), - _config(std::move(config)), - _tickSource(ctx->getTickSource()), - _lastScheduleTimer(_tickSource) {} - -ServiceExecutorAdaptive::~ServiceExecutorAdaptive() { - invariant(!_isRunning.load()); -} - -Status ServiceExecutorAdaptive::start() { - invariant(!_isRunning.load()); - _isRunning.store(true); - _controllerThread = stdx::thread(&ServiceExecutorAdaptive::_controllerThreadRoutine, this); - for (auto i = 0; i < _config->reservedThreads(); i++) { - _startWorkerThread(ThreadCreationReason::kReserveMinimum); - } - - return Status::OK(); -} - -Status ServiceExecutorAdaptive::shutdown(Milliseconds timeout) { - if (!_isRunning.load()) - return Status::OK(); - - _isRunning.store(false); - - _scheduleCondition.notify_one(); - _controllerThread.join(); - - stdx::unique_lock<Latch> lk(_threadsMutex); - _reactorHandle->stop(); - bool result = - _deathCondition.wait_for(lk, timeout.toSystemDuration(), [&] { return _threads.empty(); }); - - return result - ? Status::OK() - : Status(ErrorCodes::Error::ExceededTimeLimit, - "adaptive executor couldn't shutdown all worker threads within time limit."); -} - -Status ServiceExecutorAdaptive::schedule(ServiceExecutorAdaptive::Task task, - ScheduleFlags flags, - ServiceExecutorTaskName taskName) { - auto scheduleTime = _tickSource->getTicks(); - auto pendingCounterPtr = (flags & kDeferredTask) ? &_deferredTasksQueued : &_tasksQueued; - pendingCounterPtr->addAndFetch(1); - - if (!_isRunning.load()) { - return {ErrorCodes::ShutdownInProgress, "Executor is not running"}; - } - - auto wrappedTask = - [this, task = std::move(task), scheduleTime, pendingCounterPtr, taskName, flags]( - auto status) { - pendingCounterPtr->subtractAndFetch(1); - auto start = _tickSource->getTicks(); - _totalSpentQueued.addAndFetch(start - scheduleTime); - - _localThreadState->threadMetrics[static_cast<size_t>(taskName)] - ._totalSpentQueued.addAndFetch(start - scheduleTime); - - if (_localThreadState->recursionDepth++ == 0) { - _localThreadState->executing.markRunning(); - _threadsInUse.addAndFetch(1); - } - const auto guard = makeGuard([this, taskName] { - if (--_localThreadState->recursionDepth == 0) { - _localThreadState->executingCurRun += - _localThreadState->executing.markStopped(); - _threadsInUse.subtractAndFetch(1); - } - _totalExecuted.addAndFetch(1); - _localThreadState->threadMetrics[static_cast<size_t>(taskName)] - ._totalExecuted.addAndFetch(1); - }); - - TickTimer _localTimer(_tickSource); - task(); - _localThreadState->threadMetrics[static_cast<size_t>(taskName)] - ._totalSpentExecuting.addAndFetch(_localTimer.sinceStartTicks()); - }; - - // Dispatching a task on the io_context will run the task immediately, and may run it - // on the current thread (if the current thread is running the io_context right now). - // - // Posting a task on the io_context will run the task without recursion. - // - // If the task is allowed to recurse and we are not over the depth limit, dispatch it so it - // can be called immediately and recursively. - if ((flags & kMayRecurse) && - (_localThreadState->recursionDepth + 1 < _config->recursionLimit())) { - _reactorHandle->dispatch(std::move(wrappedTask)); - } else { - _reactorHandle->schedule(std::move(wrappedTask)); - } - - _lastScheduleTimer.reset(); - _totalQueued.addAndFetch(1); - - _accumulatedMetrics[static_cast<size_t>(taskName)]._totalQueued.addAndFetch(1); - - // Deferred tasks never count against the thread starvation avoidance. For other tasks, we - // notify the controller thread that a task has been scheduled and we should monitor thread - // starvation. - if (_isStarved() && !(flags & kDeferredTask)) { - _starvationCheckRequests.addAndFetch(1); - _scheduleCondition.notify_one(); - } - - return Status::OK(); -} - -bool ServiceExecutorAdaptive::_isStarved() const { - // If threads are still starting, then assume we won't be starved pretty soon, return false - if (_threadsPending.load() > 0) - return false; - - auto tasksQueued = _tasksQueued.load(); - // If there are no pending tasks, then we definitely aren't starved - if (tasksQueued == 0) - return false; - - // The available threads is the number that are running - the number that are currently - // executing - auto available = _threadsRunning.load() - _threadsInUse.load(); - - return (tasksQueued > available); -} - -/* - * The pool of worker threads can become unhealthy in several ways, and the controller thread - * tries to keep the pool healthy by starting new threads when it is: - * - * Stuck: All threads are running a long-running task that's waiting on a network event, but - * there are no threads available to process network events. The thread pool cannot make progress - * without intervention. - * - * Starved: All threads are saturated with tasks and new tasks are having to queue for longer - * than the configured maxQueueLatency(). - * - * Below reserve: An error has occurred and there are fewer threads than the reserved minimum. - * - * While the executor is running, it runs in a loop waiting to be woken up by schedule() or a - * timeout to occur. When it wakes up, it ensures that: - * - The thread pool is not stuck longer than the configured stuckThreadTimeout(). If it is, then - * start a new thread and wait to be woken up again (or time out again and redo stuck thread - * detection). - * - The number of threads is >= the reservedThreads() value. If it isn't, then start as many - * threads as necessary. - * - Checking for starvation when requested by schedule(), and starting new threads if the - * pool is saturated and is starved longer than the maxQueueLatency() after being woken up - * by schedule(). - */ -void ServiceExecutorAdaptive::_controllerThreadRoutine() { - auto noopLock = MONGO_MAKE_LATCH(); - setThreadName("worker-controller"_sd); - - // Setup the timers/timeout values for stuck thread detection. - TickTimer sinceLastStuckThreadCheck(_tickSource); - auto stuckThreadTimeout = _config->stuckThreadTimeout(); - - // Get the initial values for our utilization percentage calculations - auto getTimerTotals = [this]() { - stdx::unique_lock<Latch> lk(_threadsMutex); - auto first = _getThreadTimerTotal(ThreadTimer::kExecuting, lk); - auto second = _getThreadTimerTotal(ThreadTimer::kRunning, lk); - return std::make_pair(first, second); - }; - - TickSource::Tick lastSpentExecuting, lastSpentRunning; - std::tie(lastSpentExecuting, lastSpentRunning) = getTimerTotals(); - - while (_isRunning.load()) { - // We want to wait for schedule() to wake us up, or for the stuck thread timeout to pass. - // So the timeout is the current stuck thread timeout - the last time we did stuck thread - // detection. - auto timeout = stuckThreadTimeout - sinceLastStuckThreadCheck.sinceStart(); - - bool maybeStarved = false; - // If the timeout is less than a millisecond then don't bother to go to sleep to wait for - // it, just do the stuck thread detection now. - if (timeout > Milliseconds{0}) { - stdx::unique_lock<decltype(noopLock)> scheduleLk(noopLock); - int checkRequests = 0; - maybeStarved = _scheduleCondition.wait_for( - scheduleLk, timeout.toSystemDuration(), [this, &checkRequests] { - if (!_isRunning.load()) - return false; - checkRequests = _starvationCheckRequests.load(); - return (checkRequests > 0); - }); - - _starvationCheckRequests.subtractAndFetch(checkRequests); - } - - // If the executor has stopped, then stop the controller altogether - if (!_isRunning.load()) - break; - - if (sinceLastStuckThreadCheck.sinceStart() >= stuckThreadTimeout) { - // Reset our timer so we know how long to sleep for the next time around; - sinceLastStuckThreadCheck.reset(); - - // Each call to schedule updates the last schedule ticks so we know the last time a - // task was scheduled - Milliseconds sinceLastSchedule = _lastScheduleTimer.sinceStart(); - - // If the number of tasks executing is the number of threads running (that is all - // threads are currently busy), and the last time a task was able to be scheduled was - // longer than our wait timeout, then we can assume all threads are stuck and we should - // start a new thread to unblock the pool. - // - if ((_threadsInUse.load() == _threadsRunning.load()) && - (sinceLastSchedule >= stuckThreadTimeout)) { - // When the executor is stuck, we halve the stuck thread timeout to be more - // aggressive the next time out unsticking the executor, and then start a new - // thread to unblock the executor for now. - stuckThreadTimeout /= 2; - stuckThreadTimeout = std::max(Milliseconds{10}, stuckThreadTimeout); - LOGV2(22953, - "Detected blocked worker threads. starting new thread to unblock service " - "executor. Stuck thread timeout now: {stuckThreadTimeout}", - "Detected blocked worker threads. starting new thread to unblock service " - "executor", - "stuckThreadTimeout"_attr = stuckThreadTimeout); - _startWorkerThread(ThreadCreationReason::kStuckDetection); - - // Since we've just started a worker thread, then we know that the executor isn't - // starved, so just loop back around to wait for the next control event. - continue; - } - - // If the executor wasn't stuck, then we should back off our stuck thread timeout back - // towards the configured value. - auto newStuckThreadTimeout = stuckThreadTimeout + (stuckThreadTimeout / 2); - newStuckThreadTimeout = std::min(_config->stuckThreadTimeout(), newStuckThreadTimeout); - if (newStuckThreadTimeout != stuckThreadTimeout) { - LOGV2_DEBUG(22954, - 1, - "Increasing stuck thread timeout to {newStuckThreadTimeout}", - "Increasing stuck thread timeout", - "newStuckThreadTimeout"_attr = newStuckThreadTimeout); - stuckThreadTimeout = newStuckThreadTimeout; - } - } - - auto threadsRunning = _threadsRunning.load(); - if (threadsRunning < _config->reservedThreads()) { - LOGV2(22955, - "Starting {numThreads} to replenish reserved worker", - "Starting threads to replenish reserved worker", - "numThreads"_attr = _config->reservedThreads() - threadsRunning); - while (_threadsRunning.load() < _config->reservedThreads()) { - _startWorkerThread(ThreadCreationReason::kReserveMinimum); - } - } - - // If we were notified by schedule() to do starvation checking, then we first need to - // calculate the overall utilization of the executor. - if (maybeStarved) { - - // Get the difference between the amount of time the executor has spent waiting for/ - // running tasks since the last time we measured. - TickSource::Tick spentExecuting, spentRunning; - std::tie(spentExecuting, spentRunning) = getTimerTotals(); - auto diffExecuting = spentExecuting - lastSpentExecuting; - auto diffRunning = spentRunning - lastSpentRunning; - - double utilizationPct; - // If we spent zero time running then the executor was fully idle and our utilization - // is zero percent - if (spentRunning == 0 || diffRunning == 0) - utilizationPct = 0.0; - else { - lastSpentExecuting = spentExecuting; - lastSpentRunning = spentRunning; - - utilizationPct = diffExecuting / static_cast<double>(diffRunning); - utilizationPct *= 100; - } - - // If the utilization percentage is less than our threshold then we don't want to - // do anything because the threads are not actually saturated with work. - if (utilizationPct < _config->idlePctThreshold()) { - continue; - } - } - - // While there are threads that are still starting up, wait for the max queue latency, - // up to the current stuck thread timeout. - do { - stdx::this_thread::sleep_for(_config->maxQueueLatency().toSystemDuration()); - } while ((_threadsPending.load() > 0) && - (sinceLastStuckThreadCheck.sinceStart() < stuckThreadTimeout)); - - // If the number of pending tasks is greater than the number of running threads minus the - // number of tasks executing (the number of free threads), then start a new worker to - // avoid starvation. - if (_isStarved()) { - LOGV2(22956, "Starting worker thread to avoid starvation."); - _startWorkerThread(ThreadCreationReason::kStarvation); - } - } -} - -void ServiceExecutorAdaptive::_startWorkerThread(ThreadCreationReason reason) { - stdx::unique_lock<Latch> lk(_threadsMutex); - auto it = _threads.emplace(_threads.begin(), _tickSource); - auto num = _threads.size(); - - _threadsPending.addAndFetch(1); - _threadsRunning.addAndFetch(1); - _threadStartCounters[static_cast<size_t>(reason)] += 1; - - lk.unlock(); - - const auto launchResult = - launchServiceWorkerThread([this, num, it] { _workerThreadRoutine(num, it); }); - - if (!launchResult.isOK()) { - LOGV2_WARNING(22959, - "Failed to launch new worker thread: {error}", - "Failed to launch new worker thread", - "error"_attr = launchResult); - lk.lock(); - _threadsPending.subtractAndFetch(1); - _threadsRunning.subtractAndFetch(1); - _threadStartCounters[static_cast<size_t>(reason)] -= 1; - _threads.erase(it); - } -} - -Milliseconds ServiceExecutorAdaptive::_getThreadJitter() const { - static auto jitterMutex = MONGO_MAKE_LATCH(); - static std::default_random_engine randomEngine = [] { - std::random_device seed; - return std::default_random_engine(seed()); - }(); - - auto jitterParam = _config->runTimeJitter(); - if (jitterParam == 0) - return Milliseconds{0}; - - std::uniform_int_distribution<> jitterDist(-jitterParam, jitterParam); - - stdx::lock_guard<Latch> lk(jitterMutex); - auto jitter = jitterDist(randomEngine); - if (jitter > _config->workerThreadRunTime().count()) - jitter = 0; - - return Milliseconds{jitter}; -} - -void ServiceExecutorAdaptive::_accumulateTaskMetrics(MetricsArray* outArray, - const MetricsArray& inputArray) const { - for (auto it = inputArray.begin(); it != inputArray.end(); ++it) { - auto taskName = static_cast<ServiceExecutorTaskName>(std::distance(inputArray.begin(), it)); - auto& output = outArray->at(static_cast<size_t>(taskName)); - - output._totalSpentExecuting.addAndFetch(it->_totalSpentExecuting.load()); - output._totalSpentQueued.addAndFetch(it->_totalSpentQueued.load()); - output._totalExecuted.addAndFetch(it->_totalExecuted.load()); - output._totalQueued.addAndFetch(it->_totalQueued.load()); - } -} - -void ServiceExecutorAdaptive::_accumulateAllTaskMetrics(MetricsArray* outputMetricsArray, - const stdx::unique_lock<Latch>& lk) const { - _accumulateTaskMetrics(outputMetricsArray, _accumulatedMetrics); - for (auto& thread : _threads) { - _accumulateTaskMetrics(outputMetricsArray, thread.threadMetrics); - } -} - -TickSource::Tick ServiceExecutorAdaptive::_getThreadTimerTotal( - ThreadTimer which, const stdx::unique_lock<Latch>& lk) const { - TickSource::Tick accumulator; - switch (which) { - case ThreadTimer::kRunning: - accumulator = _pastThreadsSpentRunning.load(); - break; - case ThreadTimer::kExecuting: - accumulator = _pastThreadsSpentExecuting.load(); - break; - } - - for (auto& thread : _threads) { - switch (which) { - case ThreadTimer::kRunning: - accumulator += thread.running.totalTime(); - break; - case ThreadTimer::kExecuting: - accumulator += thread.executing.totalTime(); - break; - } - } - - return accumulator; -} - -void ServiceExecutorAdaptive::_workerThreadRoutine( - int threadId, ServiceExecutorAdaptive::ThreadList::iterator state) { - _threadsPending.subtractAndFetch(1); - _localThreadState = &(*state); - { - std::string threadName = str::stream() << "worker-" << threadId; - setThreadName(threadName); - } - - LOGV2(22957, - "Started new database worker thread {id}", - "Started new database worker thread", - "id"_attr = threadId); - - bool guardThreadsRunning = true; - const auto guard = makeGuard([this, &guardThreadsRunning, state] { - if (guardThreadsRunning) - _threadsRunning.subtractAndFetch(1); - _pastThreadsSpentRunning.addAndFetch(state->running.totalTime()); - _pastThreadsSpentExecuting.addAndFetch(state->executing.totalTime()); - - _accumulateTaskMetrics(&_accumulatedMetrics, state->threadMetrics); - { - stdx::lock_guard<Latch> lk(_threadsMutex); - _threads.erase(state); - } - _deathCondition.notify_one(); - }); - - auto jitter = _getThreadJitter(); - - while (_isRunning.load()) { - // We don't want all the threads to start/stop running at exactly the same time, so the - // jitter setParameter adds/removes a random small amount of time to the runtime. - Milliseconds runTime = _config->workerThreadRunTime() + jitter; - dassert(runTime.count() > 0); - - // Reset ticksSpentExecuting timer - state->executingCurRun = 0; - - // If we're still "pending" only try to run one task, that way the controller will - // know that it's okay to start adding threads to avoid starvation again. - state->running.markRunning(); - _reactorHandle->runFor(runTime); - - auto spentRunning = state->running.markStopped(); - - // If we spent less than our idle threshold actually running tasks then exit the thread. - // This is a helper lambda to perform that calculation. - const auto calculatePctExecuting = [&spentRunning, &state]() { - // This time measurement doesn't include time spent running network callbacks, - // so the threshold is lower than you'd expect. - dassert(spentRunning < std::numeric_limits<double>::max()); - - // First get the ratio of ticks spent executing to ticks spent running. We - // expect this to be <= 1.0 - double executingToRunning = state->executingCurRun / static_cast<double>(spentRunning); - - // Multiply that by 100 to get the percentage of time spent executing tasks. We - // expect this to be <= 100. - executingToRunning *= 100; - dassert(executingToRunning <= 100); - - return static_cast<int>(executingToRunning); - }; - - bool terminateThread = false; - int pctExecuting; - int runningThreads; - - // Make sure we don't terminate threads below the reserved threshold. As there can be - // several worker threads concurrently in this terminate logic atomically reduce the threads - // one by one to avoid racing using a lockless compare-and-swap loop where we retry if there - // is contention on the atomic. - do { - runningThreads = _threadsRunning.load(); - - if (runningThreads <= _config->reservedThreads()) { - terminateThread = false; - break; // keep thread - } - - if (!terminateThread) { - pctExecuting = calculatePctExecuting(); - terminateThread = pctExecuting <= _config->idlePctThreshold(); - } - } while (terminateThread && - !_threadsRunning.compareAndSwap(&runningThreads, runningThreads - 1)); - if (terminateThread) { - LOGV2(22958, - "Thread exiting because utiliaztion {utilizationPct}% was under the idle " - "threshold in the runtime window {runtimeWindow}", - "Thread exiting because utiliaztion was under the idle threshold in the runtime " - "window", - "unilizationPercent"_attr = pctExecuting, - "runtimeWindow"_attr = runTime); - - // Because we've already modified _threadsRunning, make sure the thread guard also - // doesn't do it. - guardThreadsRunning = false; - break; - } - } -} - -StringData ServiceExecutorAdaptive::_threadStartedByToString( - ServiceExecutorAdaptive::ThreadCreationReason reason) { - switch (reason) { - case ThreadCreationReason::kStuckDetection: - return kStuckDetection; - case ThreadCreationReason::kStarvation: - return kStarvation; - case ThreadCreationReason::kReserveMinimum: - return kReserveMinimum; - default: - MONGO_UNREACHABLE; - } -} - -void ServiceExecutorAdaptive::appendStats(BSONObjBuilder* bob) const { - stdx::unique_lock<Latch> lk(_threadsMutex); - *bob << kExecutorLabel << kExecutorName // - << kTotalQueued << _totalQueued.load() // - << kTotalExecuted << _totalExecuted.load() // - << kThreadsInUse << _threadsInUse.load() // - << kTotalTimeRunningUs // - << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kRunning, lk), _tickSource) // - << kTotalTimeExecutingUs // - << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kExecuting, lk), _tickSource) // - << kTotalTimeQueuedUs << ticksToMicros(_totalSpentQueued.load(), _tickSource) // - << kThreadsRunning << _threadsRunning.load() // - << kThreadsPending << _threadsPending.load(); - - BSONObjBuilder threadStartReasons(bob->subobjStart(kThreadReasons)); - for (size_t i = 0; i < _threadStartCounters.size(); i++) { - threadStartReasons << _threadStartedByToString(static_cast<ThreadCreationReason>(i)) - << _threadStartCounters[i]; - } - - threadStartReasons.doneFast(); - - BSONObjBuilder metricsByTask(bob->subobjStart("metricsByTask")); - MetricsArray totalMetrics; - _accumulateAllTaskMetrics(&totalMetrics, lk); - lk.unlock(); - for (auto it = totalMetrics.begin(); it != totalMetrics.end(); ++it) { - auto taskName = - static_cast<ServiceExecutorTaskName>(std::distance(totalMetrics.begin(), it)); - auto taskNameString = taskNameToString(taskName); - BSONObjBuilder subSection(metricsByTask.subobjStart(taskNameString)); - subSection << kTotalQueued << it->_totalQueued.load() << kTotalExecuted - << it->_totalExecuted.load() << kTotalTimeExecutingUs - << ticksToMicros(it->_totalSpentExecuting.load(), _tickSource) - << kTotalTimeQueuedUs - << ticksToMicros(it->_totalSpentQueued.load(), _tickSource); - - subSection.doneFast(); - } - metricsByTask.doneFast(); -} - -} // namespace transport -} // namespace mongo diff --git a/src/mongo/transport/service_executor_adaptive.h b/src/mongo/transport/service_executor_adaptive.h deleted file mode 100644 index bf3b55b2c5c..00000000000 --- a/src/mongo/transport/service_executor_adaptive.h +++ /dev/null @@ -1,251 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <array> -#include <list> -#include <vector> - -#include "mongo/db/service_context.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/thread.h" -#include "mongo/transport/service_executor.h" -#include "mongo/transport/service_executor_task_names.h" -#include "mongo/transport/transport_layer.h" -#include "mongo/util/tick_source.h" - -namespace mongo { -namespace transport { - -/** - * This is an ASIO-based adaptive ServiceExecutor. It guarantees that threads will not become stuck - * or deadlocked longer that its configured timeout and that idle threads will terminate themselves - * if they spend more than its configure idle threshold idle. - */ -class ServiceExecutorAdaptive : public ServiceExecutor { -public: - struct Options { - virtual ~Options() = default; - // The minimum number of threads the executor will keep running to service tasks. - virtual int reservedThreads() const = 0; - - // The amount of time each worker thread runs before considering exiting because of - // idleness. - virtual Milliseconds workerThreadRunTime() const = 0; - - // workerThreadRuntime() is offset by a random value between -jitter and +jitter to prevent - // thundering herds - virtual int runTimeJitter() const = 0; - - // The amount of time the controller thread will wait before checking for stuck threads - // to guarantee forward progress - virtual Milliseconds stuckThreadTimeout() const = 0; - - // The maximum allowed latency between when a task is scheduled and a thread is started to - // service it. - virtual Microseconds maxQueueLatency() const = 0; - - // Threads that spend less than this threshold doing work during their workerThreadRunTime - // period will exit - virtual int idlePctThreshold() const = 0; - - // The maximum allowable depth of recursion for tasks scheduled with the MayRecurse flag - // before stack unwinding is forced. - virtual int recursionLimit() const = 0; - }; - - explicit ServiceExecutorAdaptive(ServiceContext* ctx, ReactorHandle reactor); - explicit ServiceExecutorAdaptive(ServiceContext* ctx, - ReactorHandle reactor, - std::unique_ptr<Options> config); - - ServiceExecutorAdaptive(ServiceExecutorAdaptive&&) = default; - ServiceExecutorAdaptive& operator=(ServiceExecutorAdaptive&&) = default; - virtual ~ServiceExecutorAdaptive(); - - Status start() final; - Status shutdown(Milliseconds timeout) final; - Status schedule(Task task, ScheduleFlags flags, ServiceExecutorTaskName taskName) final; - - Mode transportMode() const final { - return Mode::kAsynchronous; - } - - void appendStats(BSONObjBuilder* bob) const final; - - int threadsRunning() { - return _threadsRunning.load(); - } - -private: - class TickTimer { - public: - explicit TickTimer(TickSource* tickSource) - : _tickSource(tickSource), - _ticksPerMillisecond(_tickSource->getTicksPerSecond() / 1000), - _start(_tickSource->getTicks()) { - invariant(_ticksPerMillisecond > 0); - } - - TickSource::Tick sinceStartTicks() const { - return _tickSource->getTicks() - _start.load(); - } - - Milliseconds sinceStart() const { - return Milliseconds{sinceStartTicks() / _ticksPerMillisecond}; - } - - void reset() { - _start.store(_tickSource->getTicks()); - } - - private: - TickSource* const _tickSource; - const TickSource::Tick _ticksPerMillisecond; - AtomicWord<TickSource::Tick> _start; - }; - - class CumulativeTickTimer { - public: - CumulativeTickTimer(TickSource* ts) : _timer(ts) {} - - TickSource::Tick markStopped() { - stdx::lock_guard<Latch> lk(_mutex); - invariant(_running); - _running = false; - auto curTime = _timer.sinceStartTicks(); - _accumulator += curTime; - return curTime; - } - - void markRunning() { - stdx::lock_guard<Latch> lk(_mutex); - invariant(!_running); - _timer.reset(); - _running = true; - } - - TickSource::Tick totalTime() const { - stdx::lock_guard<Latch> lk(_mutex); - if (!_running) - return _accumulator; - return _timer.sinceStartTicks() + _accumulator; - } - - private: - TickTimer _timer; - mutable Mutex _mutex = MONGO_MAKE_LATCH("::_mutex"); - TickSource::Tick _accumulator = 0; - bool _running = false; - }; - - struct Metrics { - AtomicWord<int64_t> _totalQueued{0}; - AtomicWord<int64_t> _totalExecuted{0}; - AtomicWord<TickSource::Tick> _totalSpentQueued{0}; - AtomicWord<TickSource::Tick> _totalSpentExecuting{0}; - }; - - using MetricsArray = - std::array<Metrics, static_cast<size_t>(ServiceExecutorTaskName::kMaxTaskName)>; - - enum class ThreadCreationReason { kStuckDetection, kStarvation, kReserveMinimum, kMax }; - enum class ThreadTimer { kRunning, kExecuting }; - - struct ThreadState { - ThreadState(TickSource* ts) : running(ts), executing(ts) {} - - CumulativeTickTimer running; - TickSource::Tick executingCurRun; - CumulativeTickTimer executing; - MetricsArray threadMetrics; - std::int64_t markIdleCounter = 0; - int recursionDepth = 0; - }; - - using ThreadList = std::list<ThreadState>; - - void _startWorkerThread(ThreadCreationReason reason); - static StringData _threadStartedByToString(ThreadCreationReason reason); - void _workerThreadRoutine(int threadId, ThreadList::iterator it); - void _controllerThreadRoutine(); - bool _isStarved() const; - Milliseconds _getThreadJitter() const; - - void _accumulateTaskMetrics(MetricsArray* outArray, const MetricsArray& inputArray) const; - void _accumulateAllTaskMetrics(MetricsArray* outputMetricsArray, - const stdx::unique_lock<Latch>& lk) const; - TickSource::Tick _getThreadTimerTotal(ThreadTimer which, - const stdx::unique_lock<Latch>& lk) const; - - ReactorHandle _reactorHandle; - - std::unique_ptr<Options> _config; - - mutable Mutex _threadsMutex = MONGO_MAKE_LATCH("ServiceExecutorAdaptive::_threadsMutex"); - ThreadList _threads; - std::array<int64_t, static_cast<size_t>(ThreadCreationReason::kMax)> _threadStartCounters; - - stdx::thread _controllerThread; - - TickSource* const _tickSource; - AtomicWord<bool> _isRunning{false}; - - // These counters are used to detect stuck threads and high task queuing. - AtomicWord<int> _threadsRunning{0}; - AtomicWord<int> _threadsPending{0}; - AtomicWord<int> _threadsInUse{0}; - AtomicWord<int> _tasksQueued{0}; - AtomicWord<int> _deferredTasksQueued{0}; - TickTimer _lastScheduleTimer; - AtomicWord<TickSource::Tick> _pastThreadsSpentExecuting{0}; - AtomicWord<TickSource::Tick> _pastThreadsSpentRunning{0}; - static thread_local ThreadState* _localThreadState; - - // These counters are only used for reporting in serverStatus. - AtomicWord<int64_t> _totalQueued{0}; - AtomicWord<int64_t> _totalExecuted{0}; - AtomicWord<TickSource::Tick> _totalSpentQueued{0}; - - // Threads signal this condition variable when they exit so we can gracefully shutdown - // the executor. - stdx::condition_variable _deathCondition; - - // Tasks should signal this condition variable if they want the thread controller to - // track their progress and do fast stuck detection - AtomicWord<int> _starvationCheckRequests{0}; - stdx::condition_variable _scheduleCondition; - - MetricsArray _accumulatedMetrics; -}; - -} // namespace transport -} // namespace mongo diff --git a/src/mongo/transport/service_executor_adaptive_test.cpp b/src/mongo/transport/service_executor_adaptive_test.cpp deleted file mode 100644 index b47a73089a1..00000000000 --- a/src/mongo/transport/service_executor_adaptive_test.cpp +++ /dev/null @@ -1,397 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest - -#include "mongo/platform/basic.h" - -#include "boost/optional.hpp" - -#include "mongo/db/service_context.h" -#include "mongo/logv2/log.h" -#include "mongo/transport/service_executor_adaptive.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/scopeguard.h" - -#include <asio.hpp> - -namespace mongo { -namespace { -using namespace transport; - -struct TestOptions : public ServiceExecutorAdaptive::Options { - int reservedThreads() const final { - return 1; - } - - Milliseconds workerThreadRunTime() const final { - return Milliseconds{1000}; - } - - int runTimeJitter() const final { - return 0; - } - - Milliseconds stuckThreadTimeout() const final { - return Milliseconds{100}; - } - - Microseconds maxQueueLatency() const final { - return duration_cast<Microseconds>(Milliseconds{10}); - } - - int idlePctThreshold() const final { - return 0; - } - - int recursionLimit() const final { - return 0; - } -}; - -struct RecursionOptions : public ServiceExecutorAdaptive::Options { - int reservedThreads() const final { - return 1; - } - - Milliseconds workerThreadRunTime() const final { - return Milliseconds{1000}; - } - - int runTimeJitter() const final { - return 0; - } - - Milliseconds stuckThreadTimeout() const final { - return Milliseconds{100}; - } - - Microseconds maxQueueLatency() const final { - return duration_cast<Microseconds>(Milliseconds{5}); - } - - int idlePctThreshold() const final { - return 0; - } - - int recursionLimit() const final { - return 10; - } -}; - -class ServiceExecutorAdaptiveFixture : public unittest::Test { -protected: - void setUp() override { - setGlobalServiceContext(ServiceContext::make()); - asioIoCtx = std::make_shared<asio::io_context>(); - } - - std::shared_ptr<asio::io_context> asioIoCtx; - - mutex = MONGO_MAKE_LATCH("ServiceExecutorAdaptiveFixture::mutex"); - AtomicWord<int> waitFor{-1}; - stdx::condition_variable cond; - std::function<void()> notifyCallback = [this] { - stdx::unique_lock<Latch> lk(mutex); - invariant(waitFor.load() != -1); - waitFor.fetchAndSubtract(1); - cond.notify_one(); - LOGV2(22960, "Ran callback"); - }; - - void waitForCallback(int expected, boost::optional<Milliseconds> timeout = boost::none) { - stdx::unique_lock<Latch> lk(mutex); - invariant(waitFor.load() != -1); - if (timeout) { - ASSERT_TRUE(cond.wait_for( - lk, timeout->toSystemDuration(), [&] { return waitFor.load() == expected; })); - } else { - cond.wait(lk, [&] { return waitFor.load() == expected; }); - } - } - - ServiceExecutorAdaptive::Options* config; - - template <class Options> - std::unique_ptr<ServiceExecutorAdaptive> makeAndStartExecutor() { - auto configOwned = std::make_unique<Options>(); - config = configOwned.get(); - auto exec = std::make_unique<ServiceExecutorAdaptive>( - getGlobalServiceContext(), asioIoCtx, std::move(configOwned)); - - ASSERT_OK(exec->start()); - LOGV2(22961, "wait for executor to finish starting"); - waitFor.store(1); - ASSERT_OK(exec->schedule(notifyCallback, - ServiceExecutor::kEmptyFlags, - ServiceExecutorTaskName::kSSMProcessMessage)); - waitForCallback(0); - ASSERT_EQ(exec->threadsRunning(), config->reservedThreads()); - - return exec; - } -}; - -/* - * This tests that the executor will launch a new thread if the current threads are blocked, and - * that those threads retire when they become idle. - */ -TEST_F(ServiceExecutorAdaptiveFixture, TestStuckTask) { - auto blockedMutex = MONGO_MAKE_LATCH(); - stdx::unique_lock<Latch> blockedLock(blockedMutex); - - auto exec = makeAndStartExecutor<TestOptions>(); - auto guard = makeGuard([&] { - if (blockedLock) - blockedLock.unlock(); - ASSERT_OK(exec->shutdown(config->workerThreadRunTime() * 2)); - }); - - LOGV2(22962, "Scheduling blocked task"); - waitFor.store(3); - ASSERT_OK(exec->schedule( - [this, &blockedMutex] { - notifyCallback(); - stdx::unique_lock<Latch> lk(blockedMutex); - notifyCallback(); - }, - ServiceExecutor::kEmptyFlags, - ServiceExecutorTaskName::kSSMProcessMessage)); - - LOGV2(22963, "Scheduling task stuck on blocked task"); - ASSERT_OK(exec->schedule( - notifyCallback, ServiceExecutor::kEmptyFlags, ServiceExecutorTaskName::kSSMProcessMessage)); - - LOGV2(22964, "Waiting for second thread to start"); - waitForCallback(1); - ASSERT_EQ(exec->threadsRunning(), 2); - - LOGV2(22965, "Waiting for unstuck task to run"); - blockedLock.unlock(); - waitForCallback(0); - ASSERT_EQ(exec->threadsRunning(), 2); - - LOGV2(22966, "Waiting for second thread to idle out"); - stdx::this_thread::sleep_for(config->workerThreadRunTime().toSystemDuration() * 1.5); - ASSERT_EQ(exec->threadsRunning(), config->reservedThreads()); -} - -/* - * This tests that the executor will start a new batch of reserved threads if it detects that - * all - * threads are running a task for longer than the stuckThreadTimeout. - */ -TEST_F(ServiceExecutorAdaptiveFixture, TestStuckThreads) { - auto blockedMutex = MONGO_MAKE_LATCH(); - stdx::unique_lock<Latch> blockedLock(blockedMutex); - - auto exec = makeAndStartExecutor<TestOptions>(); - auto guard = makeGuard([&] { - if (blockedLock) - blockedLock.unlock(); - ASSERT_OK(exec->shutdown(config->workerThreadRunTime() * 2)); - }); - - auto blockedTask = [this, &blockedMutex] { - LOGV2(22967, "waiting on blocked mutex"); - notifyCallback(); - stdx::unique_lock<Latch> lk(blockedMutex); - notifyCallback(); - }; - - waitFor.store(6); - auto tasks = waitFor.load() / 2; - LOGV2(22968, - "Scheduling {tasks} blocked tasks", - "Scheduling blocked tasks", - "tasks"_attr = tasks); - for (auto i = 0; i < tasks; i++) { - ASSERT_OK(exec->schedule(blockedTask, - ServiceExecutor::kEmptyFlags, - ServiceExecutorTaskName::kSSMProcessMessage)); - } - - LOGV2(22969, "Waiting for executor to start new threads"); - waitForCallback(3); - - LOGV2(22970, "All threads blocked, wait for executor to detect block and start a new thread."); - - // The controller thread in the adaptive executor runs on a stuckThreadTimeout in normal - // operation where no starvation is detected (shouldn't be in this test as all threads should be - // blocked). By waiting here for stuckThreadTimeout*3 it means that we have stuckThreadTimeout*2 - // for other waits in the controller and boot up a new thread which should be enough. - stdx::this_thread::sleep_for(config->stuckThreadTimeout().toSystemDuration() * 3); - - ASSERT_EQ(exec->threadsRunning(), waitFor.load() + config->reservedThreads()); - - LOGV2(22971, "Waiting for unstuck task to run"); - blockedLock.unlock(); - waitForCallback(0); -} - -/* - * This tests that the executor will launch more threads when starvation is detected. We launch - * another task from itself so there will always be a queue of a waiting task if there's just one - * thread. - */ -TEST_F(ServiceExecutorAdaptiveFixture, TestStarvation) { - auto exec = makeAndStartExecutor<TestOptions>(); - - // auto so = MONGO_MAKE_LATCH() we don't attempt to call schedule and shutdown concurrently - auto scheduleMutex = MONGO_MAKE_LATCH(); - - auto guard = makeGuard([&] { ASSERT_OK(exec->shutdown(config->workerThreadRunTime() * 2)); }); - - bool scheduleNew{true}; - - std::function<void()> task; - task = [this, &task, &exec, &scheduleMutex, &scheduleNew] { - // This sleep needs to be larger than the sleep below to be able to limit the amount of - // starvation. - stdx::this_thread::sleep_for(config->maxQueueLatency().toSystemDuration() * 5); - - { - stdx::unique_lock<Latch> lock(scheduleMutex); - - if (scheduleNew) { - ASSERT_OK(exec->schedule(task, - ServiceExecutor::kEmptyFlags, - ServiceExecutorTaskName::kSSMProcessMessage)); - } - } - - // This sleep needs to be larger than maxQueueLatency, when we schedule above the controller - // thread will wake up because starvation is detected. By the time the controller thread - // have slept for maxQueueLatency both worker threads should be executing work and there's - // no further starvation. It needs to be significantly larger to avoid a race with asio - // post. In the case of the first time when there's only one worker thread, starvation - // should be detected and the second worker will be started. - stdx::this_thread::sleep_for(config->maxQueueLatency().toSystemDuration() * 2); - }; - - ASSERT_OK(exec->schedule( - task, ServiceExecutor::kEmptyFlags, ServiceExecutorTaskName::kSSMProcessMessage)); - - stdx::this_thread::sleep_for(config->workerThreadRunTime().toSystemDuration() * 2); - ASSERT_EQ(exec->threadsRunning(), 2); - - stdx::unique_lock<Latch> lock(scheduleMutex); - scheduleNew = false; -} - -/* - * This tests that the executor can execute tasks recursively. If it can't starvation will be - * detected and new threads started. - */ -TEST_F(ServiceExecutorAdaptiveFixture, TestRecursion) { - auto exec = makeAndStartExecutor<RecursionOptions>(); - - AtomicWord<int> remainingTasks{config->recursionLimit() - 1}; - auto mutex = MONGO_MAKE_LATCH(); - stdx::condition_variable cv; - std::function<void()> task; - - auto guard = makeGuard([&] { ASSERT_OK(exec->shutdown(config->workerThreadRunTime() * 2)); }); - - task = [this, &task, &exec, &mutex, &cv, &remainingTasks] { - if (remainingTasks.subtractAndFetch(1) == 0) { - LOGV2(22972, "Signaling job done"); - cv.notify_one(); - return; - } - - LOGV2(22973, "Starting task recursively"); - - ASSERT_OK(exec->schedule( - task, ServiceExecutor::kMayRecurse, ServiceExecutorTaskName::kSSMProcessMessage)); - - // Make sure we don't block too long because then the block detection logic would kick in. - stdx::this_thread::sleep_for(config->stuckThreadTimeout().toSystemDuration() / - (config->recursionLimit() * 2)); - LOGV2(22974, "Completing task recursively"); - }; - - stdx::unique_lock<Latch> lock(mutex); - - ASSERT_OK(exec->schedule( - task, ServiceExecutor::kEmptyFlags, ServiceExecutorTaskName::kSSMProcessMessage)); - - cv.wait_for(lock, config->workerThreadRunTime().toSystemDuration(), [&remainingTasks]() { - return remainingTasks.load() == 0; - }); - - ASSERT_EQ(exec->threadsRunning(), config->reservedThreads()); -} - -/* - * This tests that deferred tasks don't cause a new thread to be created, and they don't - * interfere - * with new normal tasks - */ -TEST_F(ServiceExecutorAdaptiveFixture, TestDeferredTasks) { - auto blockedMutex = MONGO_MAKE_LATCH(); - stdx::unique_lock<Latch> blockedLock(blockedMutex); - - auto exec = makeAndStartExecutor<TestOptions>(); - auto guard = makeGuard([&] { - if (blockedLock) - blockedLock.unlock(); - ASSERT_OK(exec->shutdown(config->workerThreadRunTime() * 2)); - }); - - waitFor.store(3); - LOGV2(22975, "Scheduling a blocking task"); - ASSERT_OK(exec->schedule( - [this, &blockedMutex] { - stdx::unique_lock<Latch> lk(blockedMutex); - notifyCallback(); - }, - ServiceExecutor::kEmptyFlags, - ServiceExecutorTaskName::kSSMProcessMessage)); - - LOGV2(22976, "Scheduling deferred task"); - ASSERT_OK(exec->schedule(notifyCallback, - ServiceExecutor::kDeferredTask, - ServiceExecutorTaskName::kSSMProcessMessage)); - - ASSERT_THROWS(waitForCallback(1, config->stuckThreadTimeout()), - unittest::TestAssertionFailureException); - - LOGV2(22977, "Scheduling non-deferred task"); - ASSERT_OK(exec->schedule( - notifyCallback, ServiceExecutor::kEmptyFlags, ServiceExecutorTaskName::kSSMProcessMessage)); - waitForCallback(1, config->stuckThreadTimeout()); - ASSERT_GT(exec->threadsRunning(), config->reservedThreads()); - - blockedLock.unlock(); - waitForCallback(0); -} - -} // namespace -} // namespace mongo diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp index 684efc24070..e584a921f12 100644 --- a/src/mongo/transport/service_executor_test.cpp +++ b/src/mongo/transport/service_executor_test.cpp @@ -35,9 +35,9 @@ #include "mongo/db/service_context.h" #include "mongo/logv2/log.h" -#include "mongo/transport/service_executor_adaptive.h" #include "mongo/transport/service_executor_synchronous.h" #include "mongo/transport/service_executor_task_names.h" +#include "mongo/transport/transport_layer.h" #include "mongo/unittest/unittest.h" #include "mongo/util/scopeguard.h" @@ -53,36 +53,6 @@ constexpr Milliseconds kWorkerThreadRunTime{1000}; const Milliseconds kShutdownTime = kWorkerThreadRunTime + Milliseconds{50}; } // namespace -struct TestOptions : public ServiceExecutorAdaptive::Options { - int reservedThreads() const final { - return 1; - } - - Milliseconds workerThreadRunTime() const final { - return kWorkerThreadRunTime; - } - - int runTimeJitter() const final { - return 0; - } - - Milliseconds stuckThreadTimeout() const final { - return Milliseconds{100}; - } - - Microseconds maxQueueLatency() const final { - return duration_cast<Microseconds>(Milliseconds{5}); - } - - int idlePctThreshold() const final { - return 0; - } - - int recursionLimit() const final { - return 0; - } -}; - /* This implements the portions of the transport::Reactor based on ASIO, but leaves out * the methods not needed by ServiceExecutors. * @@ -149,23 +119,6 @@ private: asio::io_context _ioContext; }; -class ServiceExecutorAdaptiveFixture : public unittest::Test { -protected: - void setUp() override { - auto scOwned = ServiceContext::make(); - setGlobalServiceContext(std::move(scOwned)); - - auto configOwned = std::make_unique<TestOptions>(); - executorConfig = configOwned.get(); - executor = std::make_unique<ServiceExecutorAdaptive>( - getGlobalServiceContext(), std::make_shared<ASIOReactor>(), std::move(configOwned)); - } - - ServiceExecutorAdaptive::Options* executorConfig; - std::unique_ptr<ServiceExecutorAdaptive> executor; - std::shared_ptr<asio::io_context> asioIOCtx; -}; - class ServiceExecutorSynchronousFixture : public unittest::Test { protected: void setUp() override { @@ -197,17 +150,6 @@ void scheduleBasicTask(ServiceExecutor* exec, bool expectSuccess) { } } -TEST_F(ServiceExecutorAdaptiveFixture, BasicTaskRuns) { - ASSERT_OK(executor->start()); - auto guard = makeGuard([this] { ASSERT_OK(executor->shutdown(kShutdownTime)); }); - - scheduleBasicTask(executor.get(), true); -} - -TEST_F(ServiceExecutorAdaptiveFixture, ScheduleFailsBeforeStartup) { - scheduleBasicTask(executor.get(), false); -} - TEST_F(ServiceExecutorSynchronousFixture, BasicTaskRuns) { ASSERT_OK(executor->start()); auto guard = makeGuard([this] { ASSERT_OK(executor->shutdown(kShutdownTime)); }); diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp index 487e4b6e643..2f7508e5c1a 100644 --- a/src/mongo/transport/transport_layer_manager.cpp +++ b/src/mongo/transport/transport_layer_manager.cpp @@ -41,7 +41,6 @@ #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" #include "mongo/logv2/log.h" -#include "mongo/transport/service_executor_adaptive.h" #include "mongo/transport/service_executor_synchronous.h" #include "mongo/transport/session.h" #include "mongo/transport/transport_layer_asio.h" @@ -133,34 +132,15 @@ std::unique_ptr<TransportLayer> TransportLayerManager::makeAndStartDefaultEgress std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig( const ServerGlobalParams* config, ServiceContext* ctx) { - std::unique_ptr<TransportLayer> transportLayer; auto sep = ctx->getServiceEntryPoint(); transport::TransportLayerASIO::Options opts(config); - if (config->serviceExecutor == "adaptive") { - LOGV2_OPTIONS(4870401, - {logv2::LogTag::kStartupWarnings}, - "The adaptive service executor implementation is deprecated, please leave " - "--serviceExecutor unspecified"); - opts.transportMode = transport::Mode::kAsynchronous; - } else if (config->serviceExecutor == "synchronous") { - opts.transportMode = transport::Mode::kSynchronous; - } else { - MONGO_UNREACHABLE; - } - - auto transportLayerASIO = std::make_unique<transport::TransportLayerASIO>(opts, sep); + opts.transportMode = transport::Mode::kSynchronous; - if (config->serviceExecutor == "adaptive") { - auto reactor = transportLayerASIO->getReactor(TransportLayer::kIngress); - ctx->setServiceExecutor(std::make_unique<ServiceExecutorAdaptive>(ctx, std::move(reactor))); - } else if (config->serviceExecutor == "synchronous") { - ctx->setServiceExecutor(std::make_unique<ServiceExecutorSynchronous>(ctx)); - } - transportLayer = std::move(transportLayerASIO); + ctx->setServiceExecutor(std::make_unique<ServiceExecutorSynchronous>(ctx)); std::vector<std::unique_ptr<TransportLayer>> retVector; - retVector.emplace_back(std::move(transportLayer)); + retVector.emplace_back(std::make_unique<transport::TransportLayerASIO>(opts, sep)); return std::make_unique<TransportLayerManager>(std::move(retVector)); } |