summary refs log tree commit diff
diff options
context:
space:
mode:
author Billy Donahue <billy.donahue@mongodb.com> 2022-09-22 06:30:32 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-09-22 07:02:51 +0000
commit 25749cbec3d1e8244276a592e3a3dfdb333c061a (patch)
tree 9eacdbf920193f45b057df433238faaf01df1e25
parent c99b24eef4cd1e7e86548f311c06c358ade021c4 (diff)
download mongo-25749cbec3d1e8244276a592e3a3dfdb333c061a.tar.gz
SERVER-69570 ServiceExecutorSynchronous optimizations
-rw-r--r-- src/mongo/transport/SConscript 13
-rw-r--r-- src/mongo/transport/service_executor_bm.cpp 154
-rw-r--r-- src/mongo/transport/service_executor_synchronous.cpp 205
-rw-r--r-- src/mongo/transport/service_executor_synchronous.h 44
-rw-r--r-- src/mongo/transport/service_executor_utils.cpp 17
-rw-r--r-- src/mongo/transport/service_executor_utils.h 20
6 files changed, 333 insertions, 120 deletions
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 29de1d74759..bba85a8a268 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -247,6 +247,19 @@ tlEnvTest.CppIntegrationTest(
)
env.Benchmark(
+ target='service_executor_bm',
+ source=[
+ 'service_executor_bm.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/service_context_test_fixture',
+ 'service_executor',
+ 'transport_layer_mock',
+ ],
+)
+
+env.Benchmark(
target='session_workflow_bm',
source=[
'session_workflow_bm.cpp',
diff --git a/src/mongo/transport/service_executor_bm.cpp b/src/mongo/transport/service_executor_bm.cpp
new file mode 100644
index 00000000000..529e3e36200
--- /dev/null
+++ b/src/mongo/transport/service_executor_bm.cpp
@@ -0,0 +1,154 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include <benchmark/benchmark.h>
+
+#include "mongo/db/concurrency/locker_noop_client_observer.h"
+#include "mongo/logv2/log.h"
+#include "mongo/transport/service_executor_synchronous.h"
+#include "mongo/unittest/barrier.h"
+#include "mongo/util/processinfo.h"
+#include "mongo/util/scopeguard.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT mongo::logv2::LogComponent::kTest
+
+namespace mongo::transport {
+namespace {
+
+struct Notification {
+ void set() {
+ stdx::unique_lock lk{mu};
+ notified = true;
+ cv.notify_all();
+ }
+
+ void get() {
+ stdx::unique_lock lk{mu};
+ cv.wait(lk, [&] { return notified; });
+ }
+
+ stdx::mutex mu; // NOLINT
+ stdx::condition_variable cv;
+ bool notified = false;
+};
+
+class ServiceExecutorSynchronousBm : public benchmark::Fixture {
+public:
+ void firstSetup() {
+ auto usc = ServiceContext::make();
+ sc = usc.get();
+ usc->registerClientObserver(std::make_unique<LockerNoopClientObserver>());
+ setGlobalServiceContext(std::move(usc));
+ (void)executor()->start();
+ }
+
+ ServiceExecutorSynchronous* executor() {
+ return ServiceExecutorSynchronous::get(sc);
+ }
+
+ void lastTearDown() {
+ (void)executor()->shutdown(Hours{1});
+ setGlobalServiceContext({});
+ }
+
+ void SetUp(benchmark::State& state) override {
+ stdx::lock_guard lk{mu};
+ if (nThreads++)
+ return;
+ firstSetup();
+ }
+
+ void TearDown(benchmark::State& state) override {
+ stdx::lock_guard lk{mu};
+ if (--nThreads)
+ return;
+ lastTearDown();
+ }
+
+ void runOnExec(ServiceExecutor::Task task) {
+ executor()->schedule(std::move(task));
+ }
+
+ stdx::mutex mu; // NOLINT
+ int nThreads = 0;
+ ServiceContext* sc;
+};
+
+/** 2x to benchmark the case of more threads than cores for curiosity's sake. */
+auto maxThreads = 2 * ProcessInfo::getNumCores();
+
+BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ScheduleTask)(benchmark::State& state) {
+ for (auto _ : state)
+ runOnExec([](Status) {});
+}
+BENCHMARK_REGISTER_F(ServiceExecutorSynchronousBm, ScheduleTask)->ThreadRange(1, maxThreads);
+
+/** A simplified ChainedSchedule with only one task. */
+BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ScheduleAndWait)(benchmark::State& state) {
+ for (auto _ : state) {
+ Notification done;
+ runOnExec([&](Status) { done.set(); });
+ done.get();
+ }
+}
+BENCHMARK_REGISTER_F(ServiceExecutorSynchronousBm, ScheduleAndWait)->ThreadRange(1, maxThreads);
+
+BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ChainedSchedule)(benchmark::State& state) {
+ int chainDepth = state.range(0);
+ Notification* donePtr = nullptr;
+ std::function<void(Status)> chainedTask = [&](Status) { donePtr->set(); };
+ for (int step = 0; step != chainDepth; ++step)
+ chainedTask = [this, chainedTask](Status) { runOnExec(chainedTask); };
+
+ // The first scheduled task starts the worker thread. This test is
+ // specifically measuring the per-task schedule and run overhead. So startup
+ // costs are moved outside the loop. But it's tricky because that started
+ // thread will die if its task returns without scheduling a successor task.
+ // So we start the worker thread with a task that will pause until the
+ // benchmark loop resumes it.
+ for (auto _ : state) {
+ state.PauseTiming();
+ unittest::Barrier startingLine(2);
+ Notification done;
+ donePtr = &done;
+ runOnExec([&](Status) {
+ startingLine.countDownAndWait();
+ runOnExec(chainedTask);
+ });
+ state.ResumeTiming();
+ startingLine.countDownAndWait();
+ done.get();
+ }
+}
+BENCHMARK_REGISTER_F(ServiceExecutorSynchronousBm, ChainedSchedule)
+ ->Range(1, 2 << 10)
+ ->ThreadRange(1, maxThreads);
+
+} // namespace
+} // namespace mongo::transport
diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp
index a13053dfb08..002d44cb213 100644
--- a/src/mongo/transport/service_executor_synchronous.cpp
+++ b/src/mongo/transport/service_executor_synchronous.cpp
@@ -27,129 +27,174 @@
* it in the license file.
*/
-
-#include "mongo/platform/basic.h"
-
#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/logv2/log.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/service_executor_utils.h"
-#include "mongo/util/thread_safety_context.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor
-
-namespace mongo {
-namespace transport {
+namespace mongo::transport {
namespace {
-constexpr auto kExecutorName = "passthrough"_sd;
-
-constexpr auto kThreadsRunning = "threadsRunning"_sd;
-constexpr auto kClientsInTotal = "clientsInTotal"_sd;
-constexpr auto kClientsRunning = "clientsRunning"_sd;
-constexpr auto kClientsWaiting = "clientsWaitingForData"_sd;
-
const auto getServiceExecutorSynchronous =
ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorSynchronous>>();
-const auto serviceExecutorSynchronousRegisterer = ServiceContext::ConstructorActionRegisterer{
+const ServiceContext::ConstructorActionRegisterer serviceExecutorSynchronousRegisterer{
"ServiceExecutorSynchronous", [](ServiceContext* ctx) {
getServiceExecutorSynchronous(ctx) = std::make_unique<ServiceExecutorSynchronous>(ctx);
}};
} // namespace
-thread_local std::deque<ServiceExecutor::Task> ServiceExecutorSynchronous::_localWorkQueue = {};
-thread_local int64_t ServiceExecutorSynchronous::_localThreadIdleCounter = 0;
+class ServiceExecutorSynchronous::SharedState : public std::enable_shared_from_this<SharedState> {
+private:
+ class LockRef {
+ public:
+ explicit LockRef(SharedState* p) : _p{p} {}
-ServiceExecutorSynchronous::ServiceExecutorSynchronous(ServiceContext* ctx)
- : _shutdownCondition(std::make_shared<stdx::condition_variable>()) {}
+ size_t threads() const {
+ return _p->_threads;
+ }
-Status ServiceExecutorSynchronous::start() {
- _stillRunning.store(true);
+ bool waitForDrain(Milliseconds dur) {
+ return _p->_cv.wait_for(_lk, dur.toSystemDuration(), [&] { return !_p->_threads; });
+ }
- return Status::OK();
-}
+ void onStartThread() {
+ ++_p->_threads;
+ }
-Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) {
- LOGV2_DEBUG(22982, 3, "Shutting down passthrough executor");
+ void onEndThread() {
+ if (!--_p->_threads)
+ _p->_cv.notify_all();
+ }
- _stillRunning.store(false);
+ private:
+ SharedState* _p;
+ stdx::unique_lock<stdx::mutex> _lk{_p->_mutex}; // NOLINT
+ };
- stdx::unique_lock<Latch> lock(_shutdownMutex);
- bool result = _shutdownCondition->wait_for(lock, timeout.toSystemDuration(), [this]() {
- return _numRunningWorkerThreads.load() == 0;
- });
+public:
+ void schedule(Task task);
- return result
- ? Status::OK()
- : Status(ErrorCodes::Error::ExceededTimeLimit,
- "passthrough executor couldn't shutdown all worker threads within time limit.");
-}
+ bool isRunning() const {
+ return _isRunning.loadRelaxed();
+ }
-ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) {
- auto& ref = getServiceExecutorSynchronous(ctx);
- invariant(ref);
- return ref.get();
-}
+ void setIsRunning(bool b) {
+ _isRunning.store(b);
+ }
-void ServiceExecutorSynchronous::schedule(Task task) {
- if (!_stillRunning.load()) {
+ LockRef lock() {
+ return LockRef{this};
+ }
+
+private:
+ class WorkerThreadInfo;
+
+ mutable stdx::mutex _mutex; // NOLINT
+ stdx::condition_variable _cv;
+ AtomicWord<bool> _isRunning;
+ size_t _threads = 0;
+};
+
+class ServiceExecutorSynchronous::SharedState::WorkerThreadInfo {
+public:
+ explicit WorkerThreadInfo(std::shared_ptr<SharedState> sharedState)
+ : sharedState{std::move(sharedState)} {}
+
+ void run() {
+ while (!queue.empty() && sharedState->isRunning()) {
+ queue.front()(Status::OK());
+ queue.pop_front();
+ }
+ }
+
+ std::shared_ptr<SharedState> sharedState;
+ std::deque<Task> queue;
+};
+
+void ServiceExecutorSynchronous::SharedState::schedule(Task task) {
+ if (!isRunning()) {
task(Status(ErrorCodes::ShutdownInProgress, "Executor is not running"));
return;
}
- if (!_localWorkQueue.empty()) {
- _localWorkQueue.emplace_back(std::move(task));
+ thread_local WorkerThreadInfo* workerThreadInfoTls = nullptr;
+
+ if (workerThreadInfoTls) {
+ workerThreadInfoTls->queue.push_back(std::move(task));
return;
}
- // First call to scheduleTask() for this connection, spawn a worker thread that will push jobs
- // into the thread local job queue.
- LOGV2_DEBUG(22983, 3, "Starting new executor thread in passthrough mode");
-
- Status status = launchServiceWorkerThread(
- [this, condVarAnchor = _shutdownCondition, task = std::move(task)]() mutable {
- _numRunningWorkerThreads.addAndFetch(1);
-
- _localWorkQueue.emplace_back(std::move(task));
- while (!_localWorkQueue.empty() && _stillRunning.loadRelaxed()) {
- _localWorkQueue.front()(Status::OK());
- _localWorkQueue.pop_front();
- }
-
- // We maintain an anchor to "_shutdownCondition" to ensure it remains alive even if the
- // service executor is freed. Any access to the service executor (through "this") is
- // prohibited (and unsafe) after the following line. For more context, see SERVER-49432.
- auto numWorkerThreadsStillRunning = _numRunningWorkerThreads.subtractAndFetch(1);
- if (numWorkerThreadsStillRunning == 0) {
- condVarAnchor->notify_all();
- }
- });
+ LOGV2_DEBUG(22983, 3, "Starting ServiceExecutorSynchronous worker thread");
+ auto workerInfo = std::make_unique<WorkerThreadInfo>(shared_from_this());
+ workerInfo->queue.push_back(std::move(task));
+
+ Status status = launchServiceWorkerThread([w = std::move(workerInfo)] {
+ w->sharedState->lock().onStartThread();
+ ScopeGuard onEndThreadGuard = [&] { w->sharedState->lock().onEndThread(); };
+
+ workerThreadInfoTls = &*w;
+ ScopeGuard resetTlsGuard = [&] { workerThreadInfoTls = nullptr; };
+
+ w->run();
+ });
// The usual way to fail to schedule is to invoke the task, but in this case
// we don't have the task anymore. We gave it away to the callback that the
// failed thread was supposed to run.
iassert(status);
}
+ServiceExecutorSynchronous::ServiceExecutorSynchronous(ServiceContext*)
+ : _sharedState{std::make_shared<SharedState>()} {}
+
+ServiceExecutorSynchronous::~ServiceExecutorSynchronous() = default;
+
+Status ServiceExecutorSynchronous::start() {
+ _sharedState->setIsRunning(true);
+ return Status::OK();
+}
+
+Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) {
+ LOGV2_DEBUG(22982, 3, "Shutting down passthrough executor");
+ auto stopLock = _sharedState->lock();
+ _sharedState->setIsRunning(false);
+ if (!stopLock.waitForDrain(timeout))
+ return Status(ErrorCodes::Error::ExceededTimeLimit,
+ "passthrough executor couldn't shutdown "
+ "all worker threads within time limit.");
+ return Status::OK();
+}
+
+ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) {
+ auto& ref = getServiceExecutorSynchronous(ctx);
+ invariant(ref);
+ return ref.get();
+}
+
+void ServiceExecutorSynchronous::schedule(Task task) {
+ _sharedState->schedule(std::move(task));
+}
+
+size_t ServiceExecutorSynchronous::getRunningThreads() const {
+ return _sharedState->lock().threads();
+}
+
void ServiceExecutorSynchronous::appendStats(BSONObjBuilder* bob) const {
- // The ServiceExecutorSynchronous has one client per thread and waits synchronously on thread.
- auto threads = static_cast<int>(_numRunningWorkerThreads.loadRelaxed());
-
- BSONObjBuilder subbob = bob->subobjStart(kExecutorName);
- subbob.append(kThreadsRunning, threads);
- subbob.append(kClientsInTotal, threads);
- subbob.append(kClientsRunning, threads);
- subbob.append(kClientsWaiting, 0);
+ // Has one client per thread and waits synchronously on that thread.
+ int threads = getRunningThreads();
+ BSONObjBuilder{bob->subobjStart("passthrough")}
+ .append("threadsRunning", threads)
+ .append("clientsInTotal", threads)
+ .append("clientsRunning", threads)
+ .append("clientsWaitingForData", 0);
}
-void ServiceExecutorSynchronous::runOnDataAvailable(const SessionHandle& session, Task callback) {
+void ServiceExecutorSynchronous::runOnDataAvailable(const SessionHandle& session, Task task) {
invariant(session);
yieldIfAppropriate();
-
- schedule([callback = std::move(callback)](Status status) { callback(std::move(status)); });
+ schedule(std::move(task));
}
-
-} // namespace transport
-} // namespace mongo
+} // namespace mongo::transport
diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h
index 4b29f1299c1..28a9b258883 100644
--- a/src/mongo/transport/service_executor_synchronous.h
+++ b/src/mongo/transport/service_executor_synchronous.h
@@ -43,40 +43,48 @@ namespace mongo {
namespace transport {
/**
- * The passthrough service executor emulates a thread per connection.
- * Each connection has its own worker thread where jobs get scheduled.
+ * Creates a fresh worker thread for each top-level scheduled task. Any tasks
+ * scheduled during the execution of that top-level task as it runs on such a
+ * worker thread are pushed to the queue of that worker thread.
+ *
+ * Thus, the top-level task is expected to represent a chain of operations, each
+ * of which schedules its successor before returning. The entire chain of
+ * operations, and nothing else, executes on the same worker thread.
*/
class ServiceExecutorSynchronous final : public ServiceExecutor {
public:
- explicit ServiceExecutorSynchronous(ServiceContext* ctx);
-
+ /** Returns the ServiceExecutorSynchronous decoration on `ctx`. */
static ServiceExecutorSynchronous* get(ServiceContext* ctx);
+ explicit ServiceExecutorSynchronous(ServiceContext*);
+
+ ~ServiceExecutorSynchronous();
+
Status start() override;
Status shutdown(Milliseconds timeout) override;
+
+ /**
+ * The behavior of `schedule` depends on whether the calling thread is a
+ * worker thread spawned by a previous `schedule` call.
+ *
+ * If a nonworker thread schedules a task, a worker thread is spawned, and
+ * the task is transferred to the new worker thread's queue.
+ *
+ * If a worker thread schedules a task, the task is pushed to the back of its
+ * queue. The worker thread exits when the queue becomes empty.
+ */
void schedule(Task task) override;
- size_t getRunningThreads() const override {
- return _numRunningWorkerThreads.loadRelaxed();
- }
+ size_t getRunningThreads() const override;
void runOnDataAvailable(const SessionHandle& session, Task onCompletionCallback) override;
void appendStats(BSONObjBuilder* bob) const override;
private:
- static thread_local std::deque<Task> _localWorkQueue;
- static thread_local int _localRecursionDepth;
- static thread_local int64_t _localThreadIdleCounter;
-
- AtomicWord<bool> _stillRunning{false};
-
- mutable Mutex _shutdownMutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0),
- "ServiceExecutorSynchronous::_shutdownMutex");
- std::shared_ptr<stdx::condition_variable> _shutdownCondition;
+ class SharedState;
- AtomicWord<size_t> _numRunningWorkerThreads{0};
- size_t _numHardwareCores{0};
+ std::shared_ptr<SharedState> _sharedState;
};
} // namespace transport
diff --git a/src/mongo/transport/service_executor_utils.cpp b/src/mongo/transport/service_executor_utils.cpp
index 6b37efcd383..96b03bdf989 100644
--- a/src/mongo/transport/service_executor_utils.cpp
+++ b/src/mongo/transport/service_executor_utils.cpp
@@ -27,9 +27,6 @@
* it in the license file.
*/
-
-#include "mongo/platform/basic.h"
-
#include "mongo/transport/service_executor_utils.h"
#include <fmt/format.h>
@@ -56,11 +53,11 @@
#define __has_feature(x) 0
#endif
-using namespace fmt::literals;
-
-namespace mongo {
+namespace mongo::transport {
namespace {
+using namespace fmt::literals;
+
void* runFunc(void* ctx) {
auto taskPtr =
std::unique_ptr<unique_function<void()>>(static_cast<unique_function<void()>*>(ctx));
@@ -70,7 +67,7 @@ void* runFunc(void* ctx) {
}
} // namespace
-Status launchServiceWorkerThread(unique_function<void()> task) noexcept {
+Status launchServiceWorkerThread(unique_function<void()> task) {
try {
#if defined(_WIN32)
@@ -160,9 +157,9 @@ Status launchServiceWorkerThread(unique_function<void()> task) noexcept {
return Status::OK();
}
-void scheduleCallbackOnDataAvailable(const transport::SessionHandle& session,
+void scheduleCallbackOnDataAvailable(const SessionHandle& session,
unique_function<void(Status)> callback,
- transport::ServiceExecutor* executor) noexcept {
+ ServiceExecutor* executor) {
invariant(session);
executor->schedule([session, callback = std::move(callback), executor](Status status) {
executor->yieldIfAppropriate();
@@ -176,4 +173,4 @@ void scheduleCallbackOnDataAvailable(const transport::SessionHandle& session,
});
}
-} // namespace mongo
+} // namespace mongo::transport
diff --git a/src/mongo/transport/service_executor_utils.h b/src/mongo/transport/service_executor_utils.h
index 10ab4880dea..057e0a1686d 100644
--- a/src/mongo/transport/service_executor_utils.h
+++ b/src/mongo/transport/service_executor_utils.h
@@ -29,25 +29,21 @@
#pragma once
-#include <functional>
-
+#include "mongo/transport/service_executor.h"
#include "mongo/transport/session.h"
#include "mongo/util/functional.h"
-namespace mongo {
-
-namespace transport {
-class ServiceExecutor;
-}
+namespace mongo::transport {
-Status launchServiceWorkerThread(unique_function<void()> task) noexcept;
+Status launchServiceWorkerThread(unique_function<void()> task);
-/* The default implementation for "ServiceExecutor::runOnDataAvailable()", which blocks the caller
+/**
+ * The default implementation for "ServiceExecutor::runOnDataAvailable()", which blocks the caller
* thread until data is available for reading. On success, it schedules "callback" on "executor".
* Other implementations (e.g., "ServiceExecutorFixed") may provide asynchronous variants.
*/
-void scheduleCallbackOnDataAvailable(const transport::SessionHandle& session,
+void scheduleCallbackOnDataAvailable(const SessionHandle& session,
unique_function<void(Status)> callback,
- transport::ServiceExecutor* executor) noexcept;
+ ServiceExecutor* executor);
-} // namespace mongo
+} // namespace mongo::transport