diff options
-rw-r--r-- | src/mongo/transport/SConscript | 13 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_bm.cpp | 154 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_synchronous.cpp | 205 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_synchronous.h | 44 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_utils.cpp | 17 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_utils.h | 20 |
6 files changed, 333 insertions, 120 deletions
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 29de1d74759..bba85a8a268 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -247,6 +247,19 @@ tlEnvTest.CppIntegrationTest( ) env.Benchmark( + target='service_executor_bm', + source=[ + 'service_executor_bm.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/service_context_test_fixture', + 'service_executor', + 'transport_layer_mock', + ], +) + +env.Benchmark( target='session_workflow_bm', source=[ 'session_workflow_bm.cpp', diff --git a/src/mongo/transport/service_executor_bm.cpp b/src/mongo/transport/service_executor_bm.cpp new file mode 100644 index 00000000000..529e3e36200 --- /dev/null +++ b/src/mongo/transport/service_executor_bm.cpp @@ -0,0 +1,154 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include <benchmark/benchmark.h> + +#include "mongo/db/concurrency/locker_noop_client_observer.h" +#include "mongo/logv2/log.h" +#include "mongo/transport/service_executor_synchronous.h" +#include "mongo/unittest/barrier.h" +#include "mongo/util/processinfo.h" +#include "mongo/util/scopeguard.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT mongo::logv2::LogComponent::kTest + +namespace mongo::transport { +namespace { + +struct Notification { + void set() { + stdx::unique_lock lk{mu}; + notified = true; + cv.notify_all(); + } + + void get() { + stdx::unique_lock lk{mu}; + cv.wait(lk, [&] { return notified; }); + } + + stdx::mutex mu; // NOLINT + stdx::condition_variable cv; + bool notified = false; +}; + +class ServiceExecutorSynchronousBm : public benchmark::Fixture { +public: + void firstSetup() { + auto usc = ServiceContext::make(); + sc = usc.get(); + usc->registerClientObserver(std::make_unique<LockerNoopClientObserver>()); + setGlobalServiceContext(std::move(usc)); + (void)executor()->start(); + } + + ServiceExecutorSynchronous* executor() { + return ServiceExecutorSynchronous::get(sc); + } + + void lastTearDown() { + (void)executor()->shutdown(Hours{1}); + setGlobalServiceContext({}); + } + + void SetUp(benchmark::State& state) override { + stdx::lock_guard lk{mu}; + if (nThreads++) + return; + firstSetup(); + } + + void TearDown(benchmark::State& state) override { + stdx::lock_guard lk{mu}; + if (--nThreads) + return; + lastTearDown(); + } + + void runOnExec(ServiceExecutor::Task task) { + executor()->schedule(std::move(task)); + } + + stdx::mutex mu; // NOLINT + int nThreads = 0; + ServiceContext* sc; +}; + +/** 2x to benchmark the case of more threads than cores for curiosity's sake. */ +auto maxThreads = 2 * ProcessInfo::getNumCores(); + +BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ScheduleTask)(benchmark::State& state) { + for (auto _ : state) + runOnExec([](Status) {}); +} +BENCHMARK_REGISTER_F(ServiceExecutorSynchronousBm, ScheduleTask)->ThreadRange(1, maxThreads); + +/** A simplified ChainedSchedule with only one task. */ +BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ScheduleAndWait)(benchmark::State& state) { + for (auto _ : state) { + Notification done; + runOnExec([&](Status) { done.set(); }); + done.get(); + } +} +BENCHMARK_REGISTER_F(ServiceExecutorSynchronousBm, ScheduleAndWait)->ThreadRange(1, maxThreads); + +BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ChainedSchedule)(benchmark::State& state) { + int chainDepth = state.range(0); + Notification* donePtr = nullptr; + std::function<void(Status)> chainedTask = [&](Status) { donePtr->set(); }; + for (int step = 0; step != chainDepth; ++step) + chainedTask = [this, chainedTask](Status) { runOnExec(chainedTask); }; + + // The first scheduled task starts the worker thread. This test is + // specifically measuring the per-task schedule and run overhead. So startup + // costs are moved outside the loop. But it's tricky because that started + // thread will die if its task returns without scheduling a successor task. + // So we start the worker thread with a task that will pause until the + // benchmark loop resumes it. + for (auto _ : state) { + state.PauseTiming(); + unittest::Barrier startingLine(2); + Notification done; + donePtr = &done; + runOnExec([&](Status) { + startingLine.countDownAndWait(); + runOnExec(chainedTask); + }); + state.ResumeTiming(); + startingLine.countDownAndWait(); + done.get(); + } +} +BENCHMARK_REGISTER_F(ServiceExecutorSynchronousBm, ChainedSchedule) + ->Range(1, 2 << 10) + ->ThreadRange(1, maxThreads); + +} // namespace +} // namespace mongo::transport diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp index a13053dfb08..002d44cb213 100644 --- a/src/mongo/transport/service_executor_synchronous.cpp +++ b/src/mongo/transport/service_executor_synchronous.cpp @@ -27,129 +27,174 @@ * it in the license file. */ - -#include "mongo/platform/basic.h" - #include "mongo/transport/service_executor_synchronous.h" #include "mongo/logv2/log.h" #include "mongo/stdx/thread.h" #include "mongo/transport/service_executor_utils.h" -#include "mongo/util/thread_safety_context.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor - -namespace mongo { -namespace transport { +namespace mongo::transport { namespace { -constexpr auto kExecutorName = "passthrough"_sd; - -constexpr auto kThreadsRunning = "threadsRunning"_sd; -constexpr auto kClientsInTotal = "clientsInTotal"_sd; -constexpr auto kClientsRunning = "clientsRunning"_sd; -constexpr auto kClientsWaiting = "clientsWaitingForData"_sd; - const auto getServiceExecutorSynchronous = ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorSynchronous>>(); -const auto serviceExecutorSynchronousRegisterer = ServiceContext::ConstructorActionRegisterer{ +const ServiceContext::ConstructorActionRegisterer serviceExecutorSynchronousRegisterer{ "ServiceExecutorSynchronous", [](ServiceContext* ctx) { getServiceExecutorSynchronous(ctx) = std::make_unique<ServiceExecutorSynchronous>(ctx); }}; } // namespace -thread_local std::deque<ServiceExecutor::Task> ServiceExecutorSynchronous::_localWorkQueue = {}; -thread_local int64_t ServiceExecutorSynchronous::_localThreadIdleCounter = 0; +class ServiceExecutorSynchronous::SharedState : public std::enable_shared_from_this<SharedState> { +private: + class LockRef { + public: + explicit LockRef(SharedState* p) : _p{p} {} -ServiceExecutorSynchronous::ServiceExecutorSynchronous(ServiceContext* ctx) - : _shutdownCondition(std::make_shared<stdx::condition_variable>()) {} + size_t threads() const { + return _p->_threads; + } -Status ServiceExecutorSynchronous::start() { - _stillRunning.store(true); + bool waitForDrain(Milliseconds dur) { + return _p->_cv.wait_for(_lk, dur.toSystemDuration(), [&] { return !_p->_threads; }); + } - return Status::OK(); -} + void onStartThread() { + ++_p->_threads; + } -Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) { - LOGV2_DEBUG(22982, 3, "Shutting down passthrough executor"); + void onEndThread() { + if (!--_p->_threads) + _p->_cv.notify_all(); + } - _stillRunning.store(false); + private: + SharedState* _p; + stdx::unique_lock<stdx::mutex> _lk{_p->_mutex}; // NOLINT + }; - stdx::unique_lock<Latch> lock(_shutdownMutex); - bool result = _shutdownCondition->wait_for(lock, timeout.toSystemDuration(), [this]() { - return _numRunningWorkerThreads.load() == 0; - }); +public: + void schedule(Task task); - return result - ? Status::OK() - : Status(ErrorCodes::Error::ExceededTimeLimit, - "passthrough executor couldn't shutdown all worker threads within time limit."); -} + bool isRunning() const { + return _isRunning.loadRelaxed(); + } -ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) { - auto& ref = getServiceExecutorSynchronous(ctx); - invariant(ref); - return ref.get(); -} + void setIsRunning(bool b) { + _isRunning.store(b); + } -void ServiceExecutorSynchronous::schedule(Task task) { - if (!_stillRunning.load()) { + LockRef lock() { + return LockRef{this}; + } + +private: + class WorkerThreadInfo; + + mutable stdx::mutex _mutex; // NOLINT + stdx::condition_variable _cv; + AtomicWord<bool> _isRunning; + size_t _threads = 0; +}; + +class ServiceExecutorSynchronous::SharedState::WorkerThreadInfo { +public: + explicit WorkerThreadInfo(std::shared_ptr<SharedState> sharedState) + : sharedState{std::move(sharedState)} {} + + void run() { + while (!queue.empty() && sharedState->isRunning()) { + queue.front()(Status::OK()); + queue.pop_front(); + } + } + + std::shared_ptr<SharedState> sharedState; + std::deque<Task> queue; +}; + +void ServiceExecutorSynchronous::SharedState::schedule(Task task) { + if (!isRunning()) { task(Status(ErrorCodes::ShutdownInProgress, "Executor is not running")); return; } - if (!_localWorkQueue.empty()) { - _localWorkQueue.emplace_back(std::move(task)); + thread_local WorkerThreadInfo* workerThreadInfoTls = nullptr; + + if (workerThreadInfoTls) { + workerThreadInfoTls->queue.push_back(std::move(task)); return; } - // First call to scheduleTask() for this connection, spawn a worker thread that will push jobs - // into the thread local job queue. - LOGV2_DEBUG(22983, 3, "Starting new executor thread in passthrough mode"); - - Status status = launchServiceWorkerThread( - [this, condVarAnchor = _shutdownCondition, task = std::move(task)]() mutable { - _numRunningWorkerThreads.addAndFetch(1); - - _localWorkQueue.emplace_back(std::move(task)); - while (!_localWorkQueue.empty() && _stillRunning.loadRelaxed()) { - _localWorkQueue.front()(Status::OK()); - _localWorkQueue.pop_front(); - } - - // We maintain an anchor to "_shutdownCondition" to ensure it remains alive even if the - // service executor is freed. Any access to the service executor (through "this") is - // prohibited (and unsafe) after the following line. For more context, see SERVER-49432. - auto numWorkerThreadsStillRunning = _numRunningWorkerThreads.subtractAndFetch(1); - if (numWorkerThreadsStillRunning == 0) { - condVarAnchor->notify_all(); - } - }); + LOGV2_DEBUG(22983, 3, "Starting ServiceExecutorSynchronous worker thread"); + auto workerInfo = std::make_unique<WorkerThreadInfo>(shared_from_this()); + workerInfo->queue.push_back(std::move(task)); + + Status status = launchServiceWorkerThread([w = std::move(workerInfo)] { + w->sharedState->lock().onStartThread(); + ScopeGuard onEndThreadGuard = [&] { w->sharedState->lock().onEndThread(); }; + + workerThreadInfoTls = &*w; + ScopeGuard resetTlsGuard = [&] { workerThreadInfoTls = nullptr; }; + + w->run(); + }); // The usual way to fail to schedule is to invoke the task, but in this case // we don't have the task anymore. We gave it away to the callback that the // failed thread was supposed to run. iassert(status); } +ServiceExecutorSynchronous::ServiceExecutorSynchronous(ServiceContext*) + : _sharedState{std::make_shared<SharedState>()} {} + +ServiceExecutorSynchronous::~ServiceExecutorSynchronous() = default; + +Status ServiceExecutorSynchronous::start() { + _sharedState->setIsRunning(true); + return Status::OK(); +} + +Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) { + LOGV2_DEBUG(22982, 3, "Shutting down passthrough executor"); + auto stopLock = _sharedState->lock(); + _sharedState->setIsRunning(false); + if (!stopLock.waitForDrain(timeout)) + return Status(ErrorCodes::Error::ExceededTimeLimit, + "passthrough executor couldn't shutdown " + "all worker threads within time limit."); + return Status::OK(); +} + +ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) { + auto& ref = getServiceExecutorSynchronous(ctx); + invariant(ref); + return ref.get(); +} + +void ServiceExecutorSynchronous::schedule(Task task) { + _sharedState->schedule(std::move(task)); +} + +size_t ServiceExecutorSynchronous::getRunningThreads() const { + return _sharedState->lock().threads(); +} + void ServiceExecutorSynchronous::appendStats(BSONObjBuilder* bob) const { - // The ServiceExecutorSynchronous has one client per thread and waits synchronously on thread. - auto threads = static_cast<int>(_numRunningWorkerThreads.loadRelaxed()); - - BSONObjBuilder subbob = bob->subobjStart(kExecutorName); - subbob.append(kThreadsRunning, threads); - subbob.append(kClientsInTotal, threads); - subbob.append(kClientsRunning, threads); - subbob.append(kClientsWaiting, 0); + // Has one client per thread and waits synchronously on that thread. + int threads = getRunningThreads(); + BSONObjBuilder{bob->subobjStart("passthrough")} + .append("threadsRunning", threads) + .append("clientsInTotal", threads) + .append("clientsRunning", threads) + .append("clientsWaitingForData", 0); } -void ServiceExecutorSynchronous::runOnDataAvailable(const SessionHandle& session, Task callback) { +void ServiceExecutorSynchronous::runOnDataAvailable(const SessionHandle& session, Task task) { invariant(session); yieldIfAppropriate(); - - schedule([callback = std::move(callback)](Status status) { callback(std::move(status)); }); + schedule(std::move(task)); } - -} // namespace transport -} // namespace mongo +} // namespace mongo::transport diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h index 4b29f1299c1..28a9b258883 100644 --- a/src/mongo/transport/service_executor_synchronous.h +++ b/src/mongo/transport/service_executor_synchronous.h @@ -43,40 +43,48 @@ namespace mongo { namespace transport { /** - * The passthrough service executor emulates a thread per connection. - * Each connection has its own worker thread where jobs get scheduled. + * Creates a fresh worker thread for each top-level scheduled task. Any tasks + * scheduled during the execution of that top-level task as it runs on such a + * worker thread are pushed to the queue of that worker thread. + * + * Thus, the top-level task is expected to represent a chain of operations, each + * of which schedules its successor before returning. The entire chain of + * operations, and nothing else, executes on the same worker thread. */ class ServiceExecutorSynchronous final : public ServiceExecutor { public: - explicit ServiceExecutorSynchronous(ServiceContext* ctx); - + /** Returns the ServiceExecutorSynchronous decoration on `ctx`. */ static ServiceExecutorSynchronous* get(ServiceContext* ctx); + explicit ServiceExecutorSynchronous(ServiceContext*); + + ~ServiceExecutorSynchronous(); + Status start() override; Status shutdown(Milliseconds timeout) override; + + /** + * The behavior of `schedule` depends on whether the calling thread is a + * worker thread spawned by a previous `schedule` call. + * + * If a nonworker thread schedules a task, a worker thread is spawned, and + * the task is transferred to the new worker thread's queue. + * + * If a worker thread schedules a task, the task is pushed to the back of its + * queue. The worker thread exits when the queue becomes empty. + */ void schedule(Task task) override; - size_t getRunningThreads() const override { - return _numRunningWorkerThreads.loadRelaxed(); - } + size_t getRunningThreads() const override; void runOnDataAvailable(const SessionHandle& session, Task onCompletionCallback) override; void appendStats(BSONObjBuilder* bob) const override; private: - static thread_local std::deque<Task> _localWorkQueue; - static thread_local int _localRecursionDepth; - static thread_local int64_t _localThreadIdleCounter; - - AtomicWord<bool> _stillRunning{false}; - - mutable Mutex _shutdownMutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), - "ServiceExecutorSynchronous::_shutdownMutex"); - std::shared_ptr<stdx::condition_variable> _shutdownCondition; + class SharedState; - AtomicWord<size_t> _numRunningWorkerThreads{0}; - size_t _numHardwareCores{0}; + std::shared_ptr<SharedState> _sharedState; }; } // namespace transport diff --git a/src/mongo/transport/service_executor_utils.cpp b/src/mongo/transport/service_executor_utils.cpp index 6b37efcd383..96b03bdf989 100644 --- a/src/mongo/transport/service_executor_utils.cpp +++ b/src/mongo/transport/service_executor_utils.cpp @@ -27,9 +27,6 @@ * it in the license file. */ - -#include "mongo/platform/basic.h" - #include "mongo/transport/service_executor_utils.h" #include <fmt/format.h> @@ -56,11 +53,11 @@ #define __has_feature(x) 0 #endif -using namespace fmt::literals; - -namespace mongo { +namespace mongo::transport { namespace { +using namespace fmt::literals; + void* runFunc(void* ctx) { auto taskPtr = std::unique_ptr<unique_function<void()>>(static_cast<unique_function<void()>*>(ctx)); @@ -70,7 +67,7 @@ void* runFunc(void* ctx) { } } // namespace -Status launchServiceWorkerThread(unique_function<void()> task) noexcept { +Status launchServiceWorkerThread(unique_function<void()> task) { try { #if defined(_WIN32) @@ -160,9 +157,9 @@ Status launchServiceWorkerThread(unique_function<void()> task) noexcept { return Status::OK(); } -void scheduleCallbackOnDataAvailable(const transport::SessionHandle& session, +void scheduleCallbackOnDataAvailable(const SessionHandle& session, unique_function<void(Status)> callback, - transport::ServiceExecutor* executor) noexcept { + ServiceExecutor* executor) { invariant(session); executor->schedule([session, callback = std::move(callback), executor](Status status) { executor->yieldIfAppropriate(); @@ -176,4 +173,4 @@ void scheduleCallbackOnDataAvailable(const transport::SessionHandle& session, }); } -} // namespace mongo +} // namespace mongo::transport diff --git a/src/mongo/transport/service_executor_utils.h b/src/mongo/transport/service_executor_utils.h index 10ab4880dea..057e0a1686d 100644 --- a/src/mongo/transport/service_executor_utils.h +++ b/src/mongo/transport/service_executor_utils.h @@ -29,25 +29,21 @@ #pragma once -#include <functional> - +#include "mongo/transport/service_executor.h" #include "mongo/transport/session.h" #include "mongo/util/functional.h" -namespace mongo { - -namespace transport { -class ServiceExecutor; -} +namespace mongo::transport { -Status launchServiceWorkerThread(unique_function<void()> task) noexcept; +Status launchServiceWorkerThread(unique_function<void()> task); -/* The default implementation for "ServiceExecutor::runOnDataAvailable()", which blocks the caller +/** + * The default implementation for "ServiceExecutor::runOnDataAvailable()", which blocks the caller * thread until data is available for reading. On success, it schedules "callback" on "executor". * Other implementations (e.g., "ServiceExecutorFixed") may provide asynchronous variants. */ -void scheduleCallbackOnDataAvailable(const transport::SessionHandle& session, +void scheduleCallbackOnDataAvailable(const SessionHandle& session, unique_function<void(Status)> callback, - transport::ServiceExecutor* executor) noexcept; + ServiceExecutor* executor); -} // namespace mongo +} // namespace mongo::transport |