author     Amirsaman Memaripour <amirsaman.memaripour@mongodb.com>  2020-06-22 16:30:35 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>         2020-06-23 22:36:50 +0000
commit     311d7ccd0b6fd24122e28b7e8f3a1e191dd4078a (patch)
tree       5e18af20c70eaf568d93778b1ee0052da3a91ace /src/mongo/transport
parent     36bf915c32d551ad557ec7a1fa41890037e9f54f (diff)
download   mongo-311d7ccd0b6fd24122e28b7e8f3a1e191dd4078a.tar.gz
SERVER-48973 Remove ServiceExecutorAdaptive
Diffstat (limited to 'src/mongo/transport')
-rw-r--r--  src/mongo/transport/SConscript                           |   3
-rw-r--r--  src/mongo/transport/service_executor_adaptive.cpp        | 704
-rw-r--r--  src/mongo/transport/service_executor_adaptive.h          | 251
-rw-r--r--  src/mongo/transport/service_executor_adaptive_test.cpp   | 397
-rw-r--r--  src/mongo/transport/service_executor_test.cpp            |  60
-rw-r--r--  src/mongo/transport/transport_layer_manager.cpp          |  26
6 files changed, 4 insertions, 1437 deletions
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 95ebc3a1fbb..ab1e34c702b 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -85,7 +85,6 @@ env.Library(
tlEnv.Library(
target='service_executor',
source=[
- 'service_executor_adaptive.cpp',
'service_executor_reserved.cpp',
'service_executor_synchronous.cpp',
env.Idlc('service_executor.idl')[0],
@@ -175,8 +174,6 @@ tlEnv.CppUnitTest(
'message_compressor_registry_test.cpp',
'transport_layer_asio_test.cpp',
'service_executor_test.cpp',
- # Disable this test until SERVER-30475 and associated build failure tickets are resolved.
- # 'service_executor_adaptive_test.cpp',
'max_conns_override_test.cpp',
'service_state_machine_test.cpp',
],
diff --git a/src/mongo/transport/service_executor_adaptive.cpp b/src/mongo/transport/service_executor_adaptive.cpp
deleted file mode 100644
index 156f1e3d74a..00000000000
--- a/src/mongo/transport/service_executor_adaptive.cpp
+++ /dev/null
@@ -1,704 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/transport/service_executor_adaptive.h"
-
-#include <array>
-#include <random>
-
-#include "mongo/logv2/log.h"
-#include "mongo/transport/service_entry_point_utils.h"
-#include "mongo/transport/service_executor_gen.h"
-#include "mongo/transport/service_executor_task_names.h"
-#include "mongo/util/concurrency/thread_name.h"
-#include "mongo/util/duration.h"
-#include "mongo/util/processinfo.h"
-#include "mongo/util/scopeguard.h"
-#include "mongo/util/str.h"
-
-#include <asio.hpp>
-
-namespace mongo {
-namespace transport {
-
-namespace {
-constexpr auto kTotalQueued = "totalQueued"_sd;
-constexpr auto kTotalExecuted = "totalExecuted"_sd;
-constexpr auto kTotalTimeExecutingUs = "totalTimeExecutingMicros"_sd;
-constexpr auto kTotalTimeRunningUs = "totalTimeRunningMicros"_sd;
-constexpr auto kTotalTimeQueuedUs = "totalTimeQueuedMicros"_sd;
-constexpr auto kThreadsInUse = "threadsInUse"_sd;
-constexpr auto kThreadsRunning = "threadsRunning"_sd;
-constexpr auto kThreadsPending = "threadsPending"_sd;
-constexpr auto kExecutorLabel = "executor"_sd;
-constexpr auto kExecutorName = "adaptive"_sd;
-constexpr auto kStuckDetection = "stuckThreadsDetected"_sd;
-constexpr auto kStarvation = "starvation"_sd;
-constexpr auto kReserveMinimum = "belowReserveMinimum"_sd;
-constexpr auto kThreadReasons = "threadCreationCauses"_sd;
-
-int64_t ticksToMicros(TickSource::Tick ticks, TickSource* tickSource) {
- invariant(tickSource->getTicksPerSecond() >= 1000000);
- return tickSource->ticksTo<Microseconds>(ticks).count();
-}
-
-struct ServerParameterOptions : public ServiceExecutorAdaptive::Options {
- int reservedThreads() const final {
- int value = adaptiveServiceExecutorReservedThreads.load();
- if (value == -1) {
- value = ProcessInfo::getNumAvailableCores() / 2;
- value = std::max(value, 2);
- adaptiveServiceExecutorReservedThreads.store(value);
- LOGV2(
- 22951,
- "No thread count configured for executor. Using number of cores / 2: {threadCount}",
- "No thread count configured for executor. Using number of cores / 2",
- "threadCount"_attr = value);
- }
- return value;
- }
-
- Milliseconds workerThreadRunTime() const final {
- return Milliseconds{adaptiveServiceExecutorRunTimeMillis.load()};
- }
-
- int runTimeJitter() const final {
- return adaptiveServiceExecutorRunTimeJitterMillis.load();
- }
-
- Milliseconds stuckThreadTimeout() const final {
- return Milliseconds{adaptiveServiceExecutorStuckThreadTimeoutMillis.load()};
- }
-
- Microseconds maxQueueLatency() const final {
- static Nanoseconds minTimerResolution = getMinimumTimerResolution();
- Microseconds value{adaptiveServiceExecutorMaxQueueLatencyMicros.load()};
- if (value < minTimerResolution) {
- LOGV2(22952,
- "Target MaxQueueLatencyMicros ({targetMaxQueLatencyMicros}) is less than minimum "
- "timer resolution of "
- "OS ({OSMinTimerResolution}). Using {OSMinTimerResolution}",
- "Target MaxQueueLatencyMicros is less than the OS minimum timer resolution. "
- "Using the OS minimum",
- "targetMaxQueLatencyMicros"_attr = value,
- "OSMinTimerResolution"_attr = minTimerResolution);
- value = duration_cast<Microseconds>(minTimerResolution) + Microseconds{1};
- adaptiveServiceExecutorMaxQueueLatencyMicros.store(value.count());
- }
- return value;
- }
-
- int idlePctThreshold() const final {
- return adaptiveServiceExecutorIdlePctThreshold.load();
- }
-
- int recursionLimit() const final {
- return adaptiveServiceExecutorRecursionLimit.load();
- }
-};
-
-} // namespace
-
-thread_local ServiceExecutorAdaptive::ThreadState* ServiceExecutorAdaptive::_localThreadState =
- nullptr;
-
-ServiceExecutorAdaptive::ServiceExecutorAdaptive(ServiceContext* ctx, ReactorHandle reactor)
- : ServiceExecutorAdaptive(ctx, std::move(reactor), std::make_unique<ServerParameterOptions>()) {
-}
-
-ServiceExecutorAdaptive::ServiceExecutorAdaptive(ServiceContext* ctx,
- ReactorHandle reactor,
- std::unique_ptr<Options> config)
- : _reactorHandle(reactor),
- _config(std::move(config)),
- _tickSource(ctx->getTickSource()),
- _lastScheduleTimer(_tickSource) {}
-
-ServiceExecutorAdaptive::~ServiceExecutorAdaptive() {
- invariant(!_isRunning.load());
-}
-
-Status ServiceExecutorAdaptive::start() {
- invariant(!_isRunning.load());
- _isRunning.store(true);
- _controllerThread = stdx::thread(&ServiceExecutorAdaptive::_controllerThreadRoutine, this);
- for (auto i = 0; i < _config->reservedThreads(); i++) {
- _startWorkerThread(ThreadCreationReason::kReserveMinimum);
- }
-
- return Status::OK();
-}
-
-Status ServiceExecutorAdaptive::shutdown(Milliseconds timeout) {
- if (!_isRunning.load())
- return Status::OK();
-
- _isRunning.store(false);
-
- _scheduleCondition.notify_one();
- _controllerThread.join();
-
- stdx::unique_lock<Latch> lk(_threadsMutex);
- _reactorHandle->stop();
- bool result =
- _deathCondition.wait_for(lk, timeout.toSystemDuration(), [&] { return _threads.empty(); });
-
- return result
- ? Status::OK()
- : Status(ErrorCodes::Error::ExceededTimeLimit,
- "adaptive executor couldn't shutdown all worker threads within time limit.");
-}
-
-Status ServiceExecutorAdaptive::schedule(ServiceExecutorAdaptive::Task task,
- ScheduleFlags flags,
- ServiceExecutorTaskName taskName) {
- auto scheduleTime = _tickSource->getTicks();
- auto pendingCounterPtr = (flags & kDeferredTask) ? &_deferredTasksQueued : &_tasksQueued;
- pendingCounterPtr->addAndFetch(1);
-
- if (!_isRunning.load()) {
- return {ErrorCodes::ShutdownInProgress, "Executor is not running"};
- }
-
- auto wrappedTask =
- [this, task = std::move(task), scheduleTime, pendingCounterPtr, taskName, flags](
- auto status) {
- pendingCounterPtr->subtractAndFetch(1);
- auto start = _tickSource->getTicks();
- _totalSpentQueued.addAndFetch(start - scheduleTime);
-
- _localThreadState->threadMetrics[static_cast<size_t>(taskName)]
- ._totalSpentQueued.addAndFetch(start - scheduleTime);
-
- if (_localThreadState->recursionDepth++ == 0) {
- _localThreadState->executing.markRunning();
- _threadsInUse.addAndFetch(1);
- }
- const auto guard = makeGuard([this, taskName] {
- if (--_localThreadState->recursionDepth == 0) {
- _localThreadState->executingCurRun +=
- _localThreadState->executing.markStopped();
- _threadsInUse.subtractAndFetch(1);
- }
- _totalExecuted.addAndFetch(1);
- _localThreadState->threadMetrics[static_cast<size_t>(taskName)]
- ._totalExecuted.addAndFetch(1);
- });
-
- TickTimer _localTimer(_tickSource);
- task();
- _localThreadState->threadMetrics[static_cast<size_t>(taskName)]
- ._totalSpentExecuting.addAndFetch(_localTimer.sinceStartTicks());
- };
-
- // Dispatching a task on the io_context will run the task immediately, and may run it
- // on the current thread (if the current thread is running the io_context right now).
- //
- // Posting a task on the io_context will run the task without recursion.
- //
- // If the task is allowed to recurse and we are not over the depth limit, dispatch it so it
- // can be called immediately and recursively.
- if ((flags & kMayRecurse) &&
- (_localThreadState->recursionDepth + 1 < _config->recursionLimit())) {
- _reactorHandle->dispatch(std::move(wrappedTask));
- } else {
- _reactorHandle->schedule(std::move(wrappedTask));
- }
-
- _lastScheduleTimer.reset();
- _totalQueued.addAndFetch(1);
-
- _accumulatedMetrics[static_cast<size_t>(taskName)]._totalQueued.addAndFetch(1);
-
- // Deferred tasks never count against the thread starvation avoidance. For other tasks, we
- // notify the controller thread that a task has been scheduled and we should monitor thread
- // starvation.
- if (_isStarved() && !(flags & kDeferredTask)) {
- _starvationCheckRequests.addAndFetch(1);
- _scheduleCondition.notify_one();
- }
-
- return Status::OK();
-}
-
-bool ServiceExecutorAdaptive::_isStarved() const {
- // If threads are still starting, then assume we won't be starved pretty soon, return false
- if (_threadsPending.load() > 0)
- return false;
-
- auto tasksQueued = _tasksQueued.load();
- // If there are no pending tasks, then we definitely aren't starved
- if (tasksQueued == 0)
- return false;
-
- // The available threads is the number that are running - the number that are currently
- // executing
- auto available = _threadsRunning.load() - _threadsInUse.load();
-
- return (tasksQueued > available);
-}
-
-/*
- * The pool of worker threads can become unhealthy in several ways, and the controller thread
- * tries to keep the pool healthy by starting new threads when it is:
- *
- * Stuck: All threads are running a long-running task that's waiting on a network event, but
- * there are no threads available to process network events. The thread pool cannot make progress
- * without intervention.
- *
- * Starved: All threads are saturated with tasks and new tasks are having to queue for longer
- * than the configured maxQueueLatency().
- *
- * Below reserve: An error has occurred and there are fewer threads than the reserved minimum.
- *
- * While the executor is running, the controller loops, waiting either to be woken up by
- * schedule() or for a timeout to occur. When it wakes up, it ensures that:
- * - The thread pool is not stuck longer than the configured stuckThreadTimeout(). If it is, then
- * start a new thread and wait to be woken up again (or time out again and redo stuck thread
- * detection).
- * - The number of threads is >= the reservedThreads() value. If it isn't, then start as many
- * threads as necessary.
- * - Starvation is checked when requested by schedule(), and new threads are started if the
- * pool is saturated and stays starved longer than maxQueueLatency() after the controller is
- * woken up by schedule().
- */
-void ServiceExecutorAdaptive::_controllerThreadRoutine() {
- auto noopLock = MONGO_MAKE_LATCH();
- setThreadName("worker-controller"_sd);
-
- // Setup the timers/timeout values for stuck thread detection.
- TickTimer sinceLastStuckThreadCheck(_tickSource);
- auto stuckThreadTimeout = _config->stuckThreadTimeout();
-
- // Get the initial values for our utilization percentage calculations
- auto getTimerTotals = [this]() {
- stdx::unique_lock<Latch> lk(_threadsMutex);
- auto first = _getThreadTimerTotal(ThreadTimer::kExecuting, lk);
- auto second = _getThreadTimerTotal(ThreadTimer::kRunning, lk);
- return std::make_pair(first, second);
- };
-
- TickSource::Tick lastSpentExecuting, lastSpentRunning;
- std::tie(lastSpentExecuting, lastSpentRunning) = getTimerTotals();
-
- while (_isRunning.load()) {
- // We want to wait for schedule() to wake us up, or for the stuck thread timeout to pass.
- // So the timeout is the current stuck thread timeout - the last time we did stuck thread
- // detection.
- auto timeout = stuckThreadTimeout - sinceLastStuckThreadCheck.sinceStart();
-
- bool maybeStarved = false;
- // If the timeout is less than a millisecond then don't bother to go to sleep to wait for
- // it, just do the stuck thread detection now.
- if (timeout > Milliseconds{0}) {
- stdx::unique_lock<decltype(noopLock)> scheduleLk(noopLock);
- int checkRequests = 0;
- maybeStarved = _scheduleCondition.wait_for(
- scheduleLk, timeout.toSystemDuration(), [this, &checkRequests] {
- if (!_isRunning.load())
- return false;
- checkRequests = _starvationCheckRequests.load();
- return (checkRequests > 0);
- });
-
- _starvationCheckRequests.subtractAndFetch(checkRequests);
- }
-
- // If the executor has stopped, then stop the controller altogether
- if (!_isRunning.load())
- break;
-
- if (sinceLastStuckThreadCheck.sinceStart() >= stuckThreadTimeout) {
- // Reset our timer so we know how long to sleep for the next time around;
- sinceLastStuckThreadCheck.reset();
-
- // Each call to schedule updates the last schedule ticks so we know the last time a
- // task was scheduled
- Milliseconds sinceLastSchedule = _lastScheduleTimer.sinceStart();
-
- // If the number of tasks executing is the number of threads running (that is all
- // threads are currently busy), and the last time a task was able to be scheduled was
- // longer than our wait timeout, then we can assume all threads are stuck and we should
- // start a new thread to unblock the pool.
- //
- if ((_threadsInUse.load() == _threadsRunning.load()) &&
- (sinceLastSchedule >= stuckThreadTimeout)) {
- // When the executor is stuck, we halve the stuck thread timeout so the next round
- // of stuck-thread detection is more aggressive about unsticking the executor, and
- // then start a new thread to unblock the executor for now.
- stuckThreadTimeout /= 2;
- stuckThreadTimeout = std::max(Milliseconds{10}, stuckThreadTimeout);
- LOGV2(22953,
- "Detected blocked worker threads. starting new thread to unblock service "
- "executor. Stuck thread timeout now: {stuckThreadTimeout}",
- "Detected blocked worker threads. starting new thread to unblock service "
- "executor",
- "stuckThreadTimeout"_attr = stuckThreadTimeout);
- _startWorkerThread(ThreadCreationReason::kStuckDetection);
-
- // Since we've just started a worker thread, then we know that the executor isn't
- // starved, so just loop back around to wait for the next control event.
- continue;
- }
-
- // If the executor wasn't stuck, then we should back off our stuck thread timeout back
- // towards the configured value.
- auto newStuckThreadTimeout = stuckThreadTimeout + (stuckThreadTimeout / 2);
- newStuckThreadTimeout = std::min(_config->stuckThreadTimeout(), newStuckThreadTimeout);
- if (newStuckThreadTimeout != stuckThreadTimeout) {
- LOGV2_DEBUG(22954,
- 1,
- "Increasing stuck thread timeout to {newStuckThreadTimeout}",
- "Increasing stuck thread timeout",
- "newStuckThreadTimeout"_attr = newStuckThreadTimeout);
- stuckThreadTimeout = newStuckThreadTimeout;
- }
- }
-
- auto threadsRunning = _threadsRunning.load();
- if (threadsRunning < _config->reservedThreads()) {
- LOGV2(22955,
- "Starting {numThreads} to replenish reserved worker",
- "Starting threads to replenish reserved worker",
- "numThreads"_attr = _config->reservedThreads() - threadsRunning);
- while (_threadsRunning.load() < _config->reservedThreads()) {
- _startWorkerThread(ThreadCreationReason::kReserveMinimum);
- }
- }
-
- // If we were notified by schedule() to do starvation checking, then we first need to
- // calculate the overall utilization of the executor.
- if (maybeStarved) {
-
- // Get the difference between the amount of time the executor has spent waiting for/
- // running tasks since the last time we measured.
- TickSource::Tick spentExecuting, spentRunning;
- std::tie(spentExecuting, spentRunning) = getTimerTotals();
- auto diffExecuting = spentExecuting - lastSpentExecuting;
- auto diffRunning = spentRunning - lastSpentRunning;
-
- double utilizationPct;
- // If we spent zero time running then the executor was fully idle and our utilization
- // is zero percent
- if (spentRunning == 0 || diffRunning == 0)
- utilizationPct = 0.0;
- else {
- lastSpentExecuting = spentExecuting;
- lastSpentRunning = spentRunning;
-
- utilizationPct = diffExecuting / static_cast<double>(diffRunning);
- utilizationPct *= 100;
- }
-
- // If the utilization percentage is less than our threshold then we don't want to
- // do anything because the threads are not actually saturated with work.
- if (utilizationPct < _config->idlePctThreshold()) {
- continue;
- }
- }
-
- // While there are threads that are still starting up, wait for the max queue latency,
- // up to the current stuck thread timeout.
- do {
- stdx::this_thread::sleep_for(_config->maxQueueLatency().toSystemDuration());
- } while ((_threadsPending.load() > 0) &&
- (sinceLastStuckThreadCheck.sinceStart() < stuckThreadTimeout));
-
- // If the number of pending tasks is greater than the number of running threads minus the
- // number of tasks executing (the number of free threads), then start a new worker to
- // avoid starvation.
- if (_isStarved()) {
- LOGV2(22956, "Starting worker thread to avoid starvation.");
- _startWorkerThread(ThreadCreationReason::kStarvation);
- }
- }
-}
-
-void ServiceExecutorAdaptive::_startWorkerThread(ThreadCreationReason reason) {
- stdx::unique_lock<Latch> lk(_threadsMutex);
- auto it = _threads.emplace(_threads.begin(), _tickSource);
- auto num = _threads.size();
-
- _threadsPending.addAndFetch(1);
- _threadsRunning.addAndFetch(1);
- _threadStartCounters[static_cast<size_t>(reason)] += 1;
-
- lk.unlock();
-
- const auto launchResult =
- launchServiceWorkerThread([this, num, it] { _workerThreadRoutine(num, it); });
-
- if (!launchResult.isOK()) {
- LOGV2_WARNING(22959,
- "Failed to launch new worker thread: {error}",
- "Failed to launch new worker thread",
- "error"_attr = launchResult);
- lk.lock();
- _threadsPending.subtractAndFetch(1);
- _threadsRunning.subtractAndFetch(1);
- _threadStartCounters[static_cast<size_t>(reason)] -= 1;
- _threads.erase(it);
- }
-}
-
-Milliseconds ServiceExecutorAdaptive::_getThreadJitter() const {
- static auto jitterMutex = MONGO_MAKE_LATCH();
- static std::default_random_engine randomEngine = [] {
- std::random_device seed;
- return std::default_random_engine(seed());
- }();
-
- auto jitterParam = _config->runTimeJitter();
- if (jitterParam == 0)
- return Milliseconds{0};
-
- std::uniform_int_distribution<> jitterDist(-jitterParam, jitterParam);
-
- stdx::lock_guard<Latch> lk(jitterMutex);
- auto jitter = jitterDist(randomEngine);
- if (jitter > _config->workerThreadRunTime().count())
- jitter = 0;
-
- return Milliseconds{jitter};
-}
-
-void ServiceExecutorAdaptive::_accumulateTaskMetrics(MetricsArray* outArray,
- const MetricsArray& inputArray) const {
- for (auto it = inputArray.begin(); it != inputArray.end(); ++it) {
- auto taskName = static_cast<ServiceExecutorTaskName>(std::distance(inputArray.begin(), it));
- auto& output = outArray->at(static_cast<size_t>(taskName));
-
- output._totalSpentExecuting.addAndFetch(it->_totalSpentExecuting.load());
- output._totalSpentQueued.addAndFetch(it->_totalSpentQueued.load());
- output._totalExecuted.addAndFetch(it->_totalExecuted.load());
- output._totalQueued.addAndFetch(it->_totalQueued.load());
- }
-}
-
-void ServiceExecutorAdaptive::_accumulateAllTaskMetrics(MetricsArray* outputMetricsArray,
- const stdx::unique_lock<Latch>& lk) const {
- _accumulateTaskMetrics(outputMetricsArray, _accumulatedMetrics);
- for (auto& thread : _threads) {
- _accumulateTaskMetrics(outputMetricsArray, thread.threadMetrics);
- }
-}
-
-TickSource::Tick ServiceExecutorAdaptive::_getThreadTimerTotal(
- ThreadTimer which, const stdx::unique_lock<Latch>& lk) const {
- TickSource::Tick accumulator;
- switch (which) {
- case ThreadTimer::kRunning:
- accumulator = _pastThreadsSpentRunning.load();
- break;
- case ThreadTimer::kExecuting:
- accumulator = _pastThreadsSpentExecuting.load();
- break;
- }
-
- for (auto& thread : _threads) {
- switch (which) {
- case ThreadTimer::kRunning:
- accumulator += thread.running.totalTime();
- break;
- case ThreadTimer::kExecuting:
- accumulator += thread.executing.totalTime();
- break;
- }
- }
-
- return accumulator;
-}
-
-void ServiceExecutorAdaptive::_workerThreadRoutine(
- int threadId, ServiceExecutorAdaptive::ThreadList::iterator state) {
- _threadsPending.subtractAndFetch(1);
- _localThreadState = &(*state);
- {
- std::string threadName = str::stream() << "worker-" << threadId;
- setThreadName(threadName);
- }
-
- LOGV2(22957,
- "Started new database worker thread {id}",
- "Started new database worker thread",
- "id"_attr = threadId);
-
- bool guardThreadsRunning = true;
- const auto guard = makeGuard([this, &guardThreadsRunning, state] {
- if (guardThreadsRunning)
- _threadsRunning.subtractAndFetch(1);
- _pastThreadsSpentRunning.addAndFetch(state->running.totalTime());
- _pastThreadsSpentExecuting.addAndFetch(state->executing.totalTime());
-
- _accumulateTaskMetrics(&_accumulatedMetrics, state->threadMetrics);
- {
- stdx::lock_guard<Latch> lk(_threadsMutex);
- _threads.erase(state);
- }
- _deathCondition.notify_one();
- });
-
- auto jitter = _getThreadJitter();
-
- while (_isRunning.load()) {
- // We don't want all the threads to start/stop running at exactly the same time, so the
- // jitter setParameter adds/removes a random small amount of time to the runtime.
- Milliseconds runTime = _config->workerThreadRunTime() + jitter;
- dassert(runTime.count() > 0);
-
- // Reset ticksSpentExecuting timer
- state->executingCurRun = 0;
-
- // If we're still "pending" only try to run one task, that way the controller will
- // know that it's okay to start adding threads to avoid starvation again.
- state->running.markRunning();
- _reactorHandle->runFor(runTime);
-
- auto spentRunning = state->running.markStopped();
-
- // If we spent less than our idle threshold actually running tasks then exit the thread.
- // This is a helper lambda to perform that calculation.
- const auto calculatePctExecuting = [&spentRunning, &state]() {
- // This time measurement doesn't include time spent running network callbacks,
- // so the threshold is lower than you'd expect.
- dassert(spentRunning < std::numeric_limits<double>::max());
-
- // First get the ratio of ticks spent executing to ticks spent running. We
- // expect this to be <= 1.0
- double executingToRunning = state->executingCurRun / static_cast<double>(spentRunning);
-
- // Multiply that by 100 to get the percentage of time spent executing tasks. We
- // expect this to be <= 100.
- executingToRunning *= 100;
- dassert(executingToRunning <= 100);
-
- return static_cast<int>(executingToRunning);
- };
-
- bool terminateThread = false;
- int pctExecuting;
- int runningThreads;
-
- // Make sure we don't terminate threads below the reserved threshold. Since several worker
- // threads can be in this termination logic concurrently, atomically reduce the thread count
- // one at a time with a lockless compare-and-swap loop that retries if there is contention
- // on the atomic.
- do {
- runningThreads = _threadsRunning.load();
-
- if (runningThreads <= _config->reservedThreads()) {
- terminateThread = false;
- break; // keep thread
- }
-
- if (!terminateThread) {
- pctExecuting = calculatePctExecuting();
- terminateThread = pctExecuting <= _config->idlePctThreshold();
- }
- } while (terminateThread &&
- !_threadsRunning.compareAndSwap(&runningThreads, runningThreads - 1));
- if (terminateThread) {
- LOGV2(22958,
- "Thread exiting because utiliaztion {utilizationPct}% was under the idle "
- "threshold in the runtime window {runtimeWindow}",
- "Thread exiting because utiliaztion was under the idle threshold in the runtime "
- "window",
- "unilizationPercent"_attr = pctExecuting,
- "runtimeWindow"_attr = runTime);
-
- // Because we've already modified _threadsRunning, make sure the thread guard also
- // doesn't do it.
- guardThreadsRunning = false;
- break;
- }
- }
-}
-
-StringData ServiceExecutorAdaptive::_threadStartedByToString(
- ServiceExecutorAdaptive::ThreadCreationReason reason) {
- switch (reason) {
- case ThreadCreationReason::kStuckDetection:
- return kStuckDetection;
- case ThreadCreationReason::kStarvation:
- return kStarvation;
- case ThreadCreationReason::kReserveMinimum:
- return kReserveMinimum;
- default:
- MONGO_UNREACHABLE;
- }
-}
-
-void ServiceExecutorAdaptive::appendStats(BSONObjBuilder* bob) const {
- stdx::unique_lock<Latch> lk(_threadsMutex);
- *bob << kExecutorLabel << kExecutorName //
- << kTotalQueued << _totalQueued.load() //
- << kTotalExecuted << _totalExecuted.load() //
- << kThreadsInUse << _threadsInUse.load() //
- << kTotalTimeRunningUs //
- << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kRunning, lk), _tickSource) //
- << kTotalTimeExecutingUs //
- << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kExecuting, lk), _tickSource) //
- << kTotalTimeQueuedUs << ticksToMicros(_totalSpentQueued.load(), _tickSource) //
- << kThreadsRunning << _threadsRunning.load() //
- << kThreadsPending << _threadsPending.load();
-
- BSONObjBuilder threadStartReasons(bob->subobjStart(kThreadReasons));
- for (size_t i = 0; i < _threadStartCounters.size(); i++) {
- threadStartReasons << _threadStartedByToString(static_cast<ThreadCreationReason>(i))
- << _threadStartCounters[i];
- }
-
- threadStartReasons.doneFast();
-
- BSONObjBuilder metricsByTask(bob->subobjStart("metricsByTask"));
- MetricsArray totalMetrics;
- _accumulateAllTaskMetrics(&totalMetrics, lk);
- lk.unlock();
- for (auto it = totalMetrics.begin(); it != totalMetrics.end(); ++it) {
- auto taskName =
- static_cast<ServiceExecutorTaskName>(std::distance(totalMetrics.begin(), it));
- auto taskNameString = taskNameToString(taskName);
- BSONObjBuilder subSection(metricsByTask.subobjStart(taskNameString));
- subSection << kTotalQueued << it->_totalQueued.load() << kTotalExecuted
- << it->_totalExecuted.load() << kTotalTimeExecutingUs
- << ticksToMicros(it->_totalSpentExecuting.load(), _tickSource)
- << kTotalTimeQueuedUs
- << ticksToMicros(it->_totalSpentQueued.load(), _tickSource);
-
- subSection.doneFast();
- }
- metricsByTask.doneFast();
-}
-
-} // namespace transport
-} // namespace mongo
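
For reference, the starvation heuristic that the controller thread in the deleted service_executor_adaptive.cpp relied on (see _isStarved() above) boils down to a few counter comparisons. The following is a minimal standalone sketch, not MongoDB code; the struct and function names are illustrative only.

#include <atomic>

// Counters mirroring the AtomicWord members of the removed executor.
struct PoolCounters {
    std::atomic<int> threadsPending{0};  // threads still starting up
    std::atomic<int> threadsRunning{0};  // threads in their run loop
    std::atomic<int> threadsInUse{0};    // threads currently executing a task
    std::atomic<int> tasksQueued{0};     // tasks waiting for a free thread
};

// Mirrors the logic of ServiceExecutorAdaptive::_isStarved(): the pool is starved
// when more tasks are queued than there are threads that are running but not
// currently executing a task.
bool isStarved(const PoolCounters& c) {
    if (c.threadsPending.load() > 0)
        return false;  // threads are still starting; assume they will absorb the queue
    const int queued = c.tasksQueued.load();
    if (queued == 0)
        return false;  // nothing queued, so no starvation
    const int available = c.threadsRunning.load() - c.threadsInUse.load();
    return queued > available;
}
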
diff --git a/src/mongo/transport/service_executor_adaptive.h b/src/mongo/transport/service_executor_adaptive.h
deleted file mode 100644
index bf3b55b2c5c..00000000000
--- a/src/mongo/transport/service_executor_adaptive.h
+++ /dev/null
@@ -1,251 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <array>
-#include <list>
-#include <vector>
-
-#include "mongo/db/service_context.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/transport/service_executor.h"
-#include "mongo/transport/service_executor_task_names.h"
-#include "mongo/transport/transport_layer.h"
-#include "mongo/util/tick_source.h"
-
-namespace mongo {
-namespace transport {
-
-/**
- * This is an ASIO-based adaptive ServiceExecutor. It guarantees that threads will not become stuck
- * or deadlocked longer than its configured timeout, and that idle threads will terminate themselves
- * if they spend more than the configured idle threshold idle.
- */
-class ServiceExecutorAdaptive : public ServiceExecutor {
-public:
- struct Options {
- virtual ~Options() = default;
- // The minimum number of threads the executor will keep running to service tasks.
- virtual int reservedThreads() const = 0;
-
- // The amount of time each worker thread runs before considering exiting because of
- // idleness.
- virtual Milliseconds workerThreadRunTime() const = 0;
-
- // workerThreadRunTime() is offset by a random value between -jitter and +jitter to prevent
- // thundering herds.
- virtual int runTimeJitter() const = 0;
-
- // The amount of time the controller thread will wait before checking for stuck threads
- // to guarantee forward progress
- virtual Milliseconds stuckThreadTimeout() const = 0;
-
- // The maximum allowed latency between when a task is scheduled and a thread is started to
- // service it.
- virtual Microseconds maxQueueLatency() const = 0;
-
- // Threads that spend less than this threshold doing work during their workerThreadRunTime
- // period will exit
- virtual int idlePctThreshold() const = 0;
-
- // The maximum allowable depth of recursion for tasks scheduled with the MayRecurse flag
- // before stack unwinding is forced.
- virtual int recursionLimit() const = 0;
- };
-
- explicit ServiceExecutorAdaptive(ServiceContext* ctx, ReactorHandle reactor);
- explicit ServiceExecutorAdaptive(ServiceContext* ctx,
- ReactorHandle reactor,
- std::unique_ptr<Options> config);
-
- ServiceExecutorAdaptive(ServiceExecutorAdaptive&&) = default;
- ServiceExecutorAdaptive& operator=(ServiceExecutorAdaptive&&) = default;
- virtual ~ServiceExecutorAdaptive();
-
- Status start() final;
- Status shutdown(Milliseconds timeout) final;
- Status schedule(Task task, ScheduleFlags flags, ServiceExecutorTaskName taskName) final;
-
- Mode transportMode() const final {
- return Mode::kAsynchronous;
- }
-
- void appendStats(BSONObjBuilder* bob) const final;
-
- int threadsRunning() {
- return _threadsRunning.load();
- }
-
-private:
- class TickTimer {
- public:
- explicit TickTimer(TickSource* tickSource)
- : _tickSource(tickSource),
- _ticksPerMillisecond(_tickSource->getTicksPerSecond() / 1000),
- _start(_tickSource->getTicks()) {
- invariant(_ticksPerMillisecond > 0);
- }
-
- TickSource::Tick sinceStartTicks() const {
- return _tickSource->getTicks() - _start.load();
- }
-
- Milliseconds sinceStart() const {
- return Milliseconds{sinceStartTicks() / _ticksPerMillisecond};
- }
-
- void reset() {
- _start.store(_tickSource->getTicks());
- }
-
- private:
- TickSource* const _tickSource;
- const TickSource::Tick _ticksPerMillisecond;
- AtomicWord<TickSource::Tick> _start;
- };
-
- class CumulativeTickTimer {
- public:
- CumulativeTickTimer(TickSource* ts) : _timer(ts) {}
-
- TickSource::Tick markStopped() {
- stdx::lock_guard<Latch> lk(_mutex);
- invariant(_running);
- _running = false;
- auto curTime = _timer.sinceStartTicks();
- _accumulator += curTime;
- return curTime;
- }
-
- void markRunning() {
- stdx::lock_guard<Latch> lk(_mutex);
- invariant(!_running);
- _timer.reset();
- _running = true;
- }
-
- TickSource::Tick totalTime() const {
- stdx::lock_guard<Latch> lk(_mutex);
- if (!_running)
- return _accumulator;
- return _timer.sinceStartTicks() + _accumulator;
- }
-
- private:
- TickTimer _timer;
- mutable Mutex _mutex = MONGO_MAKE_LATCH("::_mutex");
- TickSource::Tick _accumulator = 0;
- bool _running = false;
- };
-
- struct Metrics {
- AtomicWord<int64_t> _totalQueued{0};
- AtomicWord<int64_t> _totalExecuted{0};
- AtomicWord<TickSource::Tick> _totalSpentQueued{0};
- AtomicWord<TickSource::Tick> _totalSpentExecuting{0};
- };
-
- using MetricsArray =
- std::array<Metrics, static_cast<size_t>(ServiceExecutorTaskName::kMaxTaskName)>;
-
- enum class ThreadCreationReason { kStuckDetection, kStarvation, kReserveMinimum, kMax };
- enum class ThreadTimer { kRunning, kExecuting };
-
- struct ThreadState {
- ThreadState(TickSource* ts) : running(ts), executing(ts) {}
-
- CumulativeTickTimer running;
- TickSource::Tick executingCurRun;
- CumulativeTickTimer executing;
- MetricsArray threadMetrics;
- std::int64_t markIdleCounter = 0;
- int recursionDepth = 0;
- };
-
- using ThreadList = std::list<ThreadState>;
-
- void _startWorkerThread(ThreadCreationReason reason);
- static StringData _threadStartedByToString(ThreadCreationReason reason);
- void _workerThreadRoutine(int threadId, ThreadList::iterator it);
- void _controllerThreadRoutine();
- bool _isStarved() const;
- Milliseconds _getThreadJitter() const;
-
- void _accumulateTaskMetrics(MetricsArray* outArray, const MetricsArray& inputArray) const;
- void _accumulateAllTaskMetrics(MetricsArray* outputMetricsArray,
- const stdx::unique_lock<Latch>& lk) const;
- TickSource::Tick _getThreadTimerTotal(ThreadTimer which,
- const stdx::unique_lock<Latch>& lk) const;
-
- ReactorHandle _reactorHandle;
-
- std::unique_ptr<Options> _config;
-
- mutable Mutex _threadsMutex = MONGO_MAKE_LATCH("ServiceExecutorAdaptive::_threadsMutex");
- ThreadList _threads;
- std::array<int64_t, static_cast<size_t>(ThreadCreationReason::kMax)> _threadStartCounters;
-
- stdx::thread _controllerThread;
-
- TickSource* const _tickSource;
- AtomicWord<bool> _isRunning{false};
-
- // These counters are used to detect stuck threads and high task queuing.
- AtomicWord<int> _threadsRunning{0};
- AtomicWord<int> _threadsPending{0};
- AtomicWord<int> _threadsInUse{0};
- AtomicWord<int> _tasksQueued{0};
- AtomicWord<int> _deferredTasksQueued{0};
- TickTimer _lastScheduleTimer;
- AtomicWord<TickSource::Tick> _pastThreadsSpentExecuting{0};
- AtomicWord<TickSource::Tick> _pastThreadsSpentRunning{0};
- static thread_local ThreadState* _localThreadState;
-
- // These counters are only used for reporting in serverStatus.
- AtomicWord<int64_t> _totalQueued{0};
- AtomicWord<int64_t> _totalExecuted{0};
- AtomicWord<TickSource::Tick> _totalSpentQueued{0};
-
- // Threads signal this condition variable when they exit so we can gracefully shutdown
- // the executor.
- stdx::condition_variable _deathCondition;
-
- // Tasks should signal this condition variable if they want the thread controller to
- // track their progress and do fast stuck detection
- AtomicWord<int> _starvationCheckRequests{0};
- stdx::condition_variable _scheduleCondition;
-
- MetricsArray _accumulatedMetrics;
-};
-
-} // namespace transport
-} // namespace mongo
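
The worker retirement rule described by the header above (idlePctThreshold() over a workerThreadRunTime() window) can be summarized independently of the tick-source plumbing. A minimal standalone sketch, not MongoDB code; names and signatures are illustrative.

#include <cstdint>

// Percentage of a run window actually spent executing tasks, computed from two
// tick deltas measured over the same window (as the CumulativeTickTimer pairs in
// ThreadState do above). A zero-length window counts as fully idle.
int utilizationPct(std::int64_t spentExecuting, std::int64_t spentRunning) {
    if (spentRunning == 0)
        return 0;
    return static_cast<int>((spentExecuting * 100) / spentRunning);
}

// A worker above the reserved minimum retires when its utilization over the last
// run window is at or below the idle percentage threshold.
bool shouldRetire(int pct, int runningThreads, int reservedThreads, int idleThresholdPct) {
    return runningThreads > reservedThreads && pct <= idleThresholdPct;
}
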
diff --git a/src/mongo/transport/service_executor_adaptive_test.cpp b/src/mongo/transport/service_executor_adaptive_test.cpp
deleted file mode 100644
index b47a73089a1..00000000000
--- a/src/mongo/transport/service_executor_adaptive_test.cpp
+++ /dev/null
@@ -1,397 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
-
-#include "mongo/platform/basic.h"
-
-#include "boost/optional.hpp"
-
-#include "mongo/db/service_context.h"
-#include "mongo/logv2/log.h"
-#include "mongo/transport/service_executor_adaptive.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/scopeguard.h"
-
-#include <asio.hpp>
-
-namespace mongo {
-namespace {
-using namespace transport;
-
-struct TestOptions : public ServiceExecutorAdaptive::Options {
- int reservedThreads() const final {
- return 1;
- }
-
- Milliseconds workerThreadRunTime() const final {
- return Milliseconds{1000};
- }
-
- int runTimeJitter() const final {
- return 0;
- }
-
- Milliseconds stuckThreadTimeout() const final {
- return Milliseconds{100};
- }
-
- Microseconds maxQueueLatency() const final {
- return duration_cast<Microseconds>(Milliseconds{10});
- }
-
- int idlePctThreshold() const final {
- return 0;
- }
-
- int recursionLimit() const final {
- return 0;
- }
-};
-
-struct RecursionOptions : public ServiceExecutorAdaptive::Options {
- int reservedThreads() const final {
- return 1;
- }
-
- Milliseconds workerThreadRunTime() const final {
- return Milliseconds{1000};
- }
-
- int runTimeJitter() const final {
- return 0;
- }
-
- Milliseconds stuckThreadTimeout() const final {
- return Milliseconds{100};
- }
-
- Microseconds maxQueueLatency() const final {
- return duration_cast<Microseconds>(Milliseconds{5});
- }
-
- int idlePctThreshold() const final {
- return 0;
- }
-
- int recursionLimit() const final {
- return 10;
- }
-};
-
-class ServiceExecutorAdaptiveFixture : public unittest::Test {
-protected:
- void setUp() override {
- setGlobalServiceContext(ServiceContext::make());
- asioIoCtx = std::make_shared<asio::io_context>();
- }
-
- std::shared_ptr<asio::io_context> asioIoCtx;
-
- Mutex mutex = MONGO_MAKE_LATCH("ServiceExecutorAdaptiveFixture::mutex");
- AtomicWord<int> waitFor{-1};
- stdx::condition_variable cond;
- std::function<void()> notifyCallback = [this] {
- stdx::unique_lock<Latch> lk(mutex);
- invariant(waitFor.load() != -1);
- waitFor.fetchAndSubtract(1);
- cond.notify_one();
- LOGV2(22960, "Ran callback");
- };
-
- void waitForCallback(int expected, boost::optional<Milliseconds> timeout = boost::none) {
- stdx::unique_lock<Latch> lk(mutex);
- invariant(waitFor.load() != -1);
- if (timeout) {
- ASSERT_TRUE(cond.wait_for(
- lk, timeout->toSystemDuration(), [&] { return waitFor.load() == expected; }));
- } else {
- cond.wait(lk, [&] { return waitFor.load() == expected; });
- }
- }
-
- ServiceExecutorAdaptive::Options* config;
-
- template <class Options>
- std::unique_ptr<ServiceExecutorAdaptive> makeAndStartExecutor() {
- auto configOwned = std::make_unique<Options>();
- config = configOwned.get();
- auto exec = std::make_unique<ServiceExecutorAdaptive>(
- getGlobalServiceContext(), asioIoCtx, std::move(configOwned));
-
- ASSERT_OK(exec->start());
- LOGV2(22961, "wait for executor to finish starting");
- waitFor.store(1);
- ASSERT_OK(exec->schedule(notifyCallback,
- ServiceExecutor::kEmptyFlags,
- ServiceExecutorTaskName::kSSMProcessMessage));
- waitForCallback(0);
- ASSERT_EQ(exec->threadsRunning(), config->reservedThreads());
-
- return exec;
- }
-};
-
-/*
- * This tests that the executor will launch a new thread if the current threads are blocked, and
- * that those threads retire when they become idle.
- */
-TEST_F(ServiceExecutorAdaptiveFixture, TestStuckTask) {
- auto blockedMutex = MONGO_MAKE_LATCH();
- stdx::unique_lock<Latch> blockedLock(blockedMutex);
-
- auto exec = makeAndStartExecutor<TestOptions>();
- auto guard = makeGuard([&] {
- if (blockedLock)
- blockedLock.unlock();
- ASSERT_OK(exec->shutdown(config->workerThreadRunTime() * 2));
- });
-
- LOGV2(22962, "Scheduling blocked task");
- waitFor.store(3);
- ASSERT_OK(exec->schedule(
- [this, &blockedMutex] {
- notifyCallback();
- stdx::unique_lock<Latch> lk(blockedMutex);
- notifyCallback();
- },
- ServiceExecutor::kEmptyFlags,
- ServiceExecutorTaskName::kSSMProcessMessage));
-
- LOGV2(22963, "Scheduling task stuck on blocked task");
- ASSERT_OK(exec->schedule(
- notifyCallback, ServiceExecutor::kEmptyFlags, ServiceExecutorTaskName::kSSMProcessMessage));
-
- LOGV2(22964, "Waiting for second thread to start");
- waitForCallback(1);
- ASSERT_EQ(exec->threadsRunning(), 2);
-
- LOGV2(22965, "Waiting for unstuck task to run");
- blockedLock.unlock();
- waitForCallback(0);
- ASSERT_EQ(exec->threadsRunning(), 2);
-
- LOGV2(22966, "Waiting for second thread to idle out");
- stdx::this_thread::sleep_for(config->workerThreadRunTime().toSystemDuration() * 1.5);
- ASSERT_EQ(exec->threadsRunning(), config->reservedThreads());
-}
-
-/*
- * This tests that the executor will start a new batch of reserved threads if it detects that all
- * threads are running a task for longer than the stuckThreadTimeout.
- */
-TEST_F(ServiceExecutorAdaptiveFixture, TestStuckThreads) {
- auto blockedMutex = MONGO_MAKE_LATCH();
- stdx::unique_lock<Latch> blockedLock(blockedMutex);
-
- auto exec = makeAndStartExecutor<TestOptions>();
- auto guard = makeGuard([&] {
- if (blockedLock)
- blockedLock.unlock();
- ASSERT_OK(exec->shutdown(config->workerThreadRunTime() * 2));
- });
-
- auto blockedTask = [this, &blockedMutex] {
- LOGV2(22967, "waiting on blocked mutex");
- notifyCallback();
- stdx::unique_lock<Latch> lk(blockedMutex);
- notifyCallback();
- };
-
- waitFor.store(6);
- auto tasks = waitFor.load() / 2;
- LOGV2(22968,
- "Scheduling {tasks} blocked tasks",
- "Scheduling blocked tasks",
- "tasks"_attr = tasks);
- for (auto i = 0; i < tasks; i++) {
- ASSERT_OK(exec->schedule(blockedTask,
- ServiceExecutor::kEmptyFlags,
- ServiceExecutorTaskName::kSSMProcessMessage));
- }
-
- LOGV2(22969, "Waiting for executor to start new threads");
- waitForCallback(3);
-
- LOGV2(22970, "All threads blocked, wait for executor to detect block and start a new thread.");
-
- // The controller thread in the adaptive executor runs on a stuckThreadTimeout in normal
- // operation where no starvation is detected (which shouldn't happen in this test, since all
- // threads should be blocked). Waiting here for stuckThreadTimeout*3 leaves stuckThreadTimeout*2
- // for the controller's other waits to detect the block and boot up a new thread, which should
- // be enough.
- stdx::this_thread::sleep_for(config->stuckThreadTimeout().toSystemDuration() * 3);
-
- ASSERT_EQ(exec->threadsRunning(), waitFor.load() + config->reservedThreads());
-
- LOGV2(22971, "Waiting for unstuck task to run");
- blockedLock.unlock();
- waitForCallback(0);
-}
-
-/*
- * This tests that the executor will launch more threads when starvation is detected. We launch
- * another task from itself so there will always be a queue of a waiting task if there's just one
- * thread.
- */
-TEST_F(ServiceExecutorAdaptiveFixture, TestStarvation) {
- auto exec = makeAndStartExecutor<TestOptions>();
-
- // Mutex so that we don't attempt to call schedule and shutdown concurrently
- auto scheduleMutex = MONGO_MAKE_LATCH();
-
- auto guard = makeGuard([&] { ASSERT_OK(exec->shutdown(config->workerThreadRunTime() * 2)); });
-
- bool scheduleNew{true};
-
- std::function<void()> task;
- task = [this, &task, &exec, &scheduleMutex, &scheduleNew] {
- // This sleep needs to be larger than the sleep below to be able to limit the amount of
- // starvation.
- stdx::this_thread::sleep_for(config->maxQueueLatency().toSystemDuration() * 5);
-
- {
- stdx::unique_lock<Latch> lock(scheduleMutex);
-
- if (scheduleNew) {
- ASSERT_OK(exec->schedule(task,
- ServiceExecutor::kEmptyFlags,
- ServiceExecutorTaskName::kSSMProcessMessage));
- }
- }
-
- // This sleep needs to be larger than maxQueueLatency: when we schedule above, the controller
- // thread wakes up because starvation is detected. By the time the controller thread has slept
- // for maxQueueLatency, both worker threads should be executing work and there is no further
- // starvation. The sleep needs to be significantly larger to avoid a race with asio post. On
- // the first iteration, when there's only one worker thread, starvation should be detected and
- // the second worker will be started.
- stdx::this_thread::sleep_for(config->maxQueueLatency().toSystemDuration() * 2);
- };
-
- ASSERT_OK(exec->schedule(
- task, ServiceExecutor::kEmptyFlags, ServiceExecutorTaskName::kSSMProcessMessage));
-
- stdx::this_thread::sleep_for(config->workerThreadRunTime().toSystemDuration() * 2);
- ASSERT_EQ(exec->threadsRunning(), 2);
-
- stdx::unique_lock<Latch> lock(scheduleMutex);
- scheduleNew = false;
-}
-
-/*
- * This tests that the executor can execute tasks recursively. If it can't starvation will be
- * detected and new threads started.
- */
-TEST_F(ServiceExecutorAdaptiveFixture, TestRecursion) {
- auto exec = makeAndStartExecutor<RecursionOptions>();
-
- AtomicWord<int> remainingTasks{config->recursionLimit() - 1};
- auto mutex = MONGO_MAKE_LATCH();
- stdx::condition_variable cv;
- std::function<void()> task;
-
- auto guard = makeGuard([&] { ASSERT_OK(exec->shutdown(config->workerThreadRunTime() * 2)); });
-
- task = [this, &task, &exec, &mutex, &cv, &remainingTasks] {
- if (remainingTasks.subtractAndFetch(1) == 0) {
- LOGV2(22972, "Signaling job done");
- cv.notify_one();
- return;
- }
-
- LOGV2(22973, "Starting task recursively");
-
- ASSERT_OK(exec->schedule(
- task, ServiceExecutor::kMayRecurse, ServiceExecutorTaskName::kSSMProcessMessage));
-
- // Make sure we don't block too long because then the block detection logic would kick in.
- stdx::this_thread::sleep_for(config->stuckThreadTimeout().toSystemDuration() /
- (config->recursionLimit() * 2));
- LOGV2(22974, "Completing task recursively");
- };
-
- stdx::unique_lock<Latch> lock(mutex);
-
- ASSERT_OK(exec->schedule(
- task, ServiceExecutor::kEmptyFlags, ServiceExecutorTaskName::kSSMProcessMessage));
-
- cv.wait_for(lock, config->workerThreadRunTime().toSystemDuration(), [&remainingTasks]() {
- return remainingTasks.load() == 0;
- });
-
- ASSERT_EQ(exec->threadsRunning(), config->reservedThreads());
-}
-
-/*
- * This tests that deferred tasks don't cause a new thread to be created, and that they don't
- * interfere with new normal tasks.
- */
-TEST_F(ServiceExecutorAdaptiveFixture, TestDeferredTasks) {
- auto blockedMutex = MONGO_MAKE_LATCH();
- stdx::unique_lock<Latch> blockedLock(blockedMutex);
-
- auto exec = makeAndStartExecutor<TestOptions>();
- auto guard = makeGuard([&] {
- if (blockedLock)
- blockedLock.unlock();
- ASSERT_OK(exec->shutdown(config->workerThreadRunTime() * 2));
- });
-
- waitFor.store(3);
- LOGV2(22975, "Scheduling a blocking task");
- ASSERT_OK(exec->schedule(
- [this, &blockedMutex] {
- stdx::unique_lock<Latch> lk(blockedMutex);
- notifyCallback();
- },
- ServiceExecutor::kEmptyFlags,
- ServiceExecutorTaskName::kSSMProcessMessage));
-
- LOGV2(22976, "Scheduling deferred task");
- ASSERT_OK(exec->schedule(notifyCallback,
- ServiceExecutor::kDeferredTask,
- ServiceExecutorTaskName::kSSMProcessMessage));
-
- ASSERT_THROWS(waitForCallback(1, config->stuckThreadTimeout()),
- unittest::TestAssertionFailureException);
-
- LOGV2(22977, "Scheduling non-deferred task");
- ASSERT_OK(exec->schedule(
- notifyCallback, ServiceExecutor::kEmptyFlags, ServiceExecutorTaskName::kSSMProcessMessage));
- waitForCallback(1, config->stuckThreadTimeout());
- ASSERT_GT(exec->threadsRunning(), config->reservedThreads());
-
- blockedLock.unlock();
- waitForCallback(0);
-}
-
-} // namespace
-} // namespace mongo
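
The TestRecursion case above exercises the schedule() path that, in the deleted service_executor_adaptive.cpp, chose between dispatching a task inline and posting it to the reactor based on the kMayRecurse flag and the recursion limit. Below is a minimal standalone sketch of that decision, not MongoDB or ASIO code; the Reactor type here is a stand-in.

#include <deque>
#include <functional>
#include <utility>

// Stand-in for the real ReactorHandle (which wraps an asio::io_context).
struct Reactor {
    std::deque<std::function<void()>> queue;
    void dispatch(std::function<void()> f) { f(); }                        // may run inline, recursively
    void post(std::function<void()> f) { queue.push_back(std::move(f)); }  // always deferred
};

// Mirrors the flag check in the deleted schedule(): recursion-friendly tasks run
// inline while under the depth limit; everything else is posted so the stack
// unwinds before the task runs.
void scheduleTask(Reactor& reactor, std::function<void()> task,
                  bool mayRecurse, int currentDepth, int recursionLimit) {
    if (mayRecurse && currentDepth + 1 < recursionLimit) {
        reactor.dispatch(std::move(task));
    } else {
        reactor.post(std::move(task));
    }
}
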
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index 684efc24070..e584a921f12 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -35,9 +35,9 @@
#include "mongo/db/service_context.h"
#include "mongo/logv2/log.h"
-#include "mongo/transport/service_executor_adaptive.h"
#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/transport/service_executor_task_names.h"
+#include "mongo/transport/transport_layer.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/scopeguard.h"
@@ -53,36 +53,6 @@ constexpr Milliseconds kWorkerThreadRunTime{1000};
const Milliseconds kShutdownTime = kWorkerThreadRunTime + Milliseconds{50};
} // namespace
-struct TestOptions : public ServiceExecutorAdaptive::Options {
- int reservedThreads() const final {
- return 1;
- }
-
- Milliseconds workerThreadRunTime() const final {
- return kWorkerThreadRunTime;
- }
-
- int runTimeJitter() const final {
- return 0;
- }
-
- Milliseconds stuckThreadTimeout() const final {
- return Milliseconds{100};
- }
-
- Microseconds maxQueueLatency() const final {
- return duration_cast<Microseconds>(Milliseconds{5});
- }
-
- int idlePctThreshold() const final {
- return 0;
- }
-
- int recursionLimit() const final {
- return 0;
- }
-};
-
/* This implements the portions of the transport::Reactor based on ASIO, but leaves out
* the methods not needed by ServiceExecutors.
*
@@ -149,23 +119,6 @@ private:
asio::io_context _ioContext;
};
-class ServiceExecutorAdaptiveFixture : public unittest::Test {
-protected:
- void setUp() override {
- auto scOwned = ServiceContext::make();
- setGlobalServiceContext(std::move(scOwned));
-
- auto configOwned = std::make_unique<TestOptions>();
- executorConfig = configOwned.get();
- executor = std::make_unique<ServiceExecutorAdaptive>(
- getGlobalServiceContext(), std::make_shared<ASIOReactor>(), std::move(configOwned));
- }
-
- ServiceExecutorAdaptive::Options* executorConfig;
- std::unique_ptr<ServiceExecutorAdaptive> executor;
- std::shared_ptr<asio::io_context> asioIOCtx;
-};
-
class ServiceExecutorSynchronousFixture : public unittest::Test {
protected:
void setUp() override {
@@ -197,17 +150,6 @@ void scheduleBasicTask(ServiceExecutor* exec, bool expectSuccess) {
}
}
-TEST_F(ServiceExecutorAdaptiveFixture, BasicTaskRuns) {
- ASSERT_OK(executor->start());
- auto guard = makeGuard([this] { ASSERT_OK(executor->shutdown(kShutdownTime)); });
-
- scheduleBasicTask(executor.get(), true);
-}
-
-TEST_F(ServiceExecutorAdaptiveFixture, ScheduleFailsBeforeStartup) {
- scheduleBasicTask(executor.get(), false);
-}
-
TEST_F(ServiceExecutorSynchronousFixture, BasicTaskRuns) {
ASSERT_OK(executor->start());
auto guard = makeGuard([this] { ASSERT_OK(executor->shutdown(kShutdownTime)); });
diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp
index 487e4b6e643..2f7508e5c1a 100644
--- a/src/mongo/transport/transport_layer_manager.cpp
+++ b/src/mongo/transport/transport_layer_manager.cpp
@@ -41,7 +41,6 @@
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/logv2/log.h"
-#include "mongo/transport/service_executor_adaptive.h"
#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/transport/session.h"
#include "mongo/transport/transport_layer_asio.h"
@@ -133,34 +132,15 @@ std::unique_ptr<TransportLayer> TransportLayerManager::makeAndStartDefaultEgress
std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(
const ServerGlobalParams* config, ServiceContext* ctx) {
- std::unique_ptr<TransportLayer> transportLayer;
auto sep = ctx->getServiceEntryPoint();
transport::TransportLayerASIO::Options opts(config);
- if (config->serviceExecutor == "adaptive") {
- LOGV2_OPTIONS(4870401,
- {logv2::LogTag::kStartupWarnings},
- "The adaptive service executor implementation is deprecated, please leave "
- "--serviceExecutor unspecified");
- opts.transportMode = transport::Mode::kAsynchronous;
- } else if (config->serviceExecutor == "synchronous") {
- opts.transportMode = transport::Mode::kSynchronous;
- } else {
- MONGO_UNREACHABLE;
- }
-
- auto transportLayerASIO = std::make_unique<transport::TransportLayerASIO>(opts, sep);
+ opts.transportMode = transport::Mode::kSynchronous;
- if (config->serviceExecutor == "adaptive") {
- auto reactor = transportLayerASIO->getReactor(TransportLayer::kIngress);
- ctx->setServiceExecutor(std::make_unique<ServiceExecutorAdaptive>(ctx, std::move(reactor)));
- } else if (config->serviceExecutor == "synchronous") {
- ctx->setServiceExecutor(std::make_unique<ServiceExecutorSynchronous>(ctx));
- }
- transportLayer = std::move(transportLayerASIO);
+ ctx->setServiceExecutor(std::make_unique<ServiceExecutorSynchronous>(ctx));
std::vector<std::unique_ptr<TransportLayer>> retVector;
- retVector.emplace_back(std::move(transportLayer));
+ retVector.emplace_back(std::make_unique<transport::TransportLayerASIO>(opts, sep));
return std::make_unique<TransportLayerManager>(std::move(retVector));
}
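
Reading the interleaved hunk above can be awkward, so here is the post-change body of createWithConfig as the hunk implies it, written out as a sketch; context lines that the diff does not show are omitted, and this is not standalone-compilable code.

std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(
    const ServerGlobalParams* config, ServiceContext* ctx) {
    auto sep = ctx->getServiceEntryPoint();
    transport::TransportLayerASIO::Options opts(config);
    opts.transportMode = transport::Mode::kSynchronous;

    // With the adaptive executor gone, the synchronous service executor is
    // created unconditionally for ingress traffic.
    ctx->setServiceExecutor(std::make_unique<ServiceExecutorSynchronous>(ctx));

    std::vector<std::unique_ptr<TransportLayer>> retVector;
    retVector.emplace_back(std::make_unique<transport::TransportLayerASIO>(opts, sep));
    return std::make_unique<TransportLayerManager>(std::move(retVector));
}
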