diff options
author | Billy Donahue <billy.donahue@mongodb.com> | 2022-10-27 18:46:06 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-10-27 19:47:47 +0000 |
commit | 7b9c8c6a22e38fd34ca313bb0637615f3075f720 (patch) | |
tree | e22c17190689b8bb22b54741aae30196273394de | |
parent | 3fd2cc553a27067f4662a0708cd2e98cd122dffb (diff) | |
download | mongo-7b9c8c6a22e38fd34ca313bb0637615f3075f720.tar.gz |
SERVER-70863 ServiceExecutor::TaskRunner
-rw-r--r-- | src/mongo/transport/mock_service_executor.h | 21 | ||||
-rw-r--r-- | src/mongo/transport/service_executor.h | 38 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_bm.cpp | 44 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_fixed.cpp | 50 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_fixed.h | 10 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_reserved.cpp | 42 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_reserved.h | 9 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_synchronous.cpp | 29 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_synchronous.h | 19 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_test.cpp | 47 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_utils.cpp | 16 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_utils.h | 9 | ||||
-rw-r--r-- | src/mongo/transport/session_workflow.cpp | 19 |
13 files changed, 229 insertions, 124 deletions
diff --git a/src/mongo/transport/mock_service_executor.h b/src/mongo/transport/mock_service_executor.h index 16928d8bfef..4b4eaf752a1 100644 --- a/src/mongo/transport/mock_service_executor.h +++ b/src/mongo/transport/mock_service_executor.h @@ -42,12 +42,23 @@ public: return startCb(); } - void schedule(Task task) override { - return scheduleTaskCb(std::move(task)); - } + std::unique_ptr<TaskRunner> makeTaskRunner() override { + class Runner : public TaskRunner { + public: + explicit Runner(MockServiceExecutor* p) : _p{p} {} + + void schedule(Task task) override { + return _p->scheduleTaskCb(std::move(task)); + } + + void runOnDataAvailable(SessionHandle session, Task onCompletionCallback) override { + _p->runOnDataAvailableCb(session, std::move(onCompletionCallback)); + } - void runOnDataAvailable(const SessionHandle& session, Task onCompletionCallback) override { - runOnDataAvailableCb(session, std::move(onCompletionCallback)); + private: + MockServiceExecutor* _p; + }; + return std::make_unique<Runner>(this); } Status shutdown(Milliseconds timeout) override { diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h index a1bac34e93c..d6b53453ba6 100644 --- a/src/mongo/transport/service_executor.h +++ b/src/mongo/transport/service_executor.h @@ -42,33 +42,39 @@ #include "mongo/util/functional.h" #include "mongo/util/out_of_line_executor.h" -namespace mongo { -namespace transport { +namespace mongo::transport { extern bool gInitialUseDedicatedThread; /* * This is the interface for all ServiceExecutors. */ -class ServiceExecutor : public OutOfLineExecutor { +class ServiceExecutor { public: + using Task = OutOfLineExecutor::Task; + + class TaskRunner : public OutOfLineExecutor { + public: + /** + * Awaits the availability of incoming data for the specified session. On success, it will + * schedule the callback on current executor. Otherwise, it will invoke the callback with a + * non-okay status on the caller thread. + */ + virtual void runOnDataAvailable(std::shared_ptr<Session> session, Task task) = 0; + }; + static void shutdownAll(ServiceContext* serviceContext, Date_t deadline); virtual ~ServiceExecutor() = default; + virtual std::unique_ptr<TaskRunner> makeTaskRunner() = 0; + /* * Starts the ServiceExecutor. This may create threads even if no tasks are scheduled. */ virtual Status start() = 0; /* - * Awaits the availability of incoming data for the specified session. On success, it will - * schedule the callback on current executor. Otherwise, it will invoke the callback with a - * non-okay status on the caller thread. - */ - virtual void runOnDataAvailable(const SessionHandle& session, Task onCompletionCallback) = 0; - - /* * Stops and joins the ServiceExecutor. Any outstanding tasks will not be executed, and any * associated callbacks waiting on I/O may get called with an error code. * @@ -78,14 +84,10 @@ public: virtual size_t getRunningThreads() const = 0; - /* - * Appends statistics about task scheduling to a BSONObjBuilder for serverStatus output. - */ + /** Appends statistics about task scheduling to a BSONObjBuilder for serverStatus output. */ virtual void appendStats(BSONObjBuilder* bob) const = 0; - /** - * Yield if we have more threads than cores. - */ + /** Yield if this executor controls more threads than we have cores. */ void yieldIfAppropriate() const; }; @@ -191,6 +193,4 @@ public: size_t limitExempt = 0; }; -} // namespace transport - -} // namespace mongo +} // namespace mongo::transport diff --git a/src/mongo/transport/service_executor_bm.cpp b/src/mongo/transport/service_executor_bm.cpp index 529e3e36200..b9e5b7e30dd 100644 --- a/src/mongo/transport/service_executor_bm.cpp +++ b/src/mongo/transport/service_executor_bm.cpp @@ -91,8 +91,8 @@ public: lastTearDown(); } - void runOnExec(ServiceExecutor::Task task) { - executor()->schedule(std::move(task)); + void runOnExec(ServiceExecutor::TaskRunner* taskRunner, ServiceExecutor::Task task) { + taskRunner->schedule(std::move(task)); } stdx::mutex mu; // NOLINT @@ -104,16 +104,19 @@ public: auto maxThreads = 2 * ProcessInfo::getNumCores(); BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ScheduleTask)(benchmark::State& state) { - for (auto _ : state) - runOnExec([](Status) {}); + for (auto _ : state) { + auto runner = executor()->makeTaskRunner(); + runOnExec(&*runner, [](Status) {}); + } } BENCHMARK_REGISTER_F(ServiceExecutorSynchronousBm, ScheduleTask)->ThreadRange(1, maxThreads); /** A simplified ChainedSchedule with only one task. */ BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ScheduleAndWait)(benchmark::State& state) { for (auto _ : state) { + auto runner = executor()->makeTaskRunner(); Notification done; - runOnExec([&](Status) { done.set(); }); + runOnExec(&*runner, [&](Status) { done.set(); }); done.get(); } } @@ -121,10 +124,17 @@ BENCHMARK_REGISTER_F(ServiceExecutorSynchronousBm, ScheduleAndWait)->ThreadRange BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ChainedSchedule)(benchmark::State& state) { int chainDepth = state.range(0); - Notification* donePtr = nullptr; - std::function<void(Status)> chainedTask = [&](Status) { donePtr->set(); }; + struct LoopState { + std::shared_ptr<ServiceExecutor::TaskRunner> runner; + Notification done; + unittest::Barrier startingLine{2}; + }; + LoopState* loopStatePtr = nullptr; + std::function<void(Status)> chainedTask = [&](Status) { loopStatePtr->done.set(); }; for (int step = 0; step != chainDepth; ++step) - chainedTask = [this, chainedTask](Status) { runOnExec(chainedTask); }; + chainedTask = [this, chainedTask, &loopStatePtr](Status) { + runOnExec(&*loopStatePtr->runner, chainedTask); + }; // The first scheduled task starts the worker thread. This test is // specifically measuring the per-task schedule and run overhead. So startup @@ -134,16 +144,18 @@ BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ChainedSchedule)(benchmark::Sta // benchmark loop resumes it. for (auto _ : state) { state.PauseTiming(); - unittest::Barrier startingLine(2); - Notification done; - donePtr = &done; - runOnExec([&](Status) { - startingLine.countDownAndWait(); - runOnExec(chainedTask); + LoopState loopState{ + executor()->makeTaskRunner(), + {}, + }; + loopStatePtr = &loopState; + runOnExec(&*loopStatePtr->runner, [&](Status s) { + loopState.startingLine.countDownAndWait(); + runOnExec(&*loopStatePtr->runner, chainedTask); }); state.ResumeTiming(); - startingLine.countDownAndWait(); - done.get(); + loopState.startingLine.countDownAndWait(); + loopState.done.get(); } } BENCHMARK_REGISTER_F(ServiceExecutorSynchronousBm, ChainedSchedule) diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp index 9d3e9ddd857..c0b3a5cad18 100644 --- a/src/mongo/transport/service_executor_fixed.cpp +++ b/src/mongo/transport/service_executor_fixed.cpp @@ -381,7 +381,7 @@ void ServiceExecutorFixed::_checkForShutdown() { reactor->stop(); } -void ServiceExecutorFixed::schedule(Task task) { +void ServiceExecutorFixed::_schedule(Task task) { { auto lk = stdx::unique_lock(_mutex); if (_state != State::kRunning) { @@ -403,8 +403,8 @@ size_t ServiceExecutorFixed::getRunningThreads() const { return _stats->threadsRunning(); } -void ServiceExecutorFixed::runOnDataAvailable(const SessionHandle& session, - Task onCompletionCallback) { +void ServiceExecutorFixed::_runOnDataAvailable(const SessionHandle& session, + Task onCompletionCallback) { invariant(session); yieldIfAppropriate(); @@ -422,17 +422,19 @@ void ServiceExecutorFixed::runOnDataAvailable(const SessionHandle& session, lk.unlock(); auto anchor = shared_from_this(); - session->asyncWaitForData().thenRunOn(anchor).getAsync([this, anchor, it](Status status) { - // Remove our waiter from the list. - auto lk = stdx::unique_lock(_mutex); - auto waiter = std::exchange(*it, {}); - _waiters.erase(it); - _stats->waitersEnded.fetchAndAdd(1); - lk.unlock(); + session->asyncWaitForData() + .thenRunOn(makeTaskRunner()) + .getAsync([this, anchor, it](Status status) { + // Remove our waiter from the list. + auto lk = stdx::unique_lock(_mutex); + auto waiter = std::exchange(*it, {}); + _waiters.erase(it); + _stats->waitersEnded.fetchAndAdd(1); + lk.unlock(); - waiter.session = nullptr; - waiter.onCompletionCallback(std::move(status)); - }); + waiter.session = nullptr; + waiter.onCompletionCallback(std::move(status)); + }); } void ServiceExecutorFixed::appendStats(BSONObjBuilder* bob) const { @@ -450,4 +452,26 @@ int ServiceExecutorFixed::getRecursionDepthForExecutorThread() const { return _executorContext->getRecursionDepth(); } +auto ServiceExecutorFixed::makeTaskRunner() -> std::unique_ptr<TaskRunner> { + iassert(ErrorCodes::ShutdownInProgress, "Executor is not running", _state == State::kRunning); + + /** Schedules on this. */ + class ForwardingTaskRunner : public TaskRunner { + public: + explicit ForwardingTaskRunner(ServiceExecutorFixed* e) : _e{e} {} + + void schedule(Task task) override { + _e->_schedule(std::move(task)); + } + + void runOnDataAvailable(std::shared_ptr<Session> session, Task task) override { + _e->_runOnDataAvailable(std::move(session), std::move(task)); + } + + private: + ServiceExecutorFixed* _e; + }; + return std::make_unique<ForwardingTaskRunner>(this); +} + } // namespace mongo::transport diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h index 8270e84176d..a74adc6f3b7 100644 --- a/src/mongo/transport/service_executor_fixed.h +++ b/src/mongo/transport/service_executor_fixed.h @@ -68,10 +68,6 @@ public: Status start() override; Status shutdown(Milliseconds timeout) override; - void schedule(Task task) override; - - void runOnDataAvailable(const SessionHandle& session, Task onCompletionCallback) override; - size_t getRunningThreads() const override; void appendStats(BSONObjBuilder* bob) const override; @@ -82,6 +78,8 @@ public: */ int getRecursionDepthForExecutorThread() const; + std::unique_ptr<TaskRunner> makeTaskRunner() override; + private: enum class State { kNotStarted, kRunning, kStopping, kStopped }; @@ -95,6 +93,10 @@ private: Task onCompletionCallback; }; + void _schedule(Task task); + + void _runOnDataAvailable(const SessionHandle& session, Task onCompletionCallback); + const std::string& _name() const; /** Requires `_mutex` locked. */ diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp index 5d0aed19d8d..1ec4e7af75e 100644 --- a/src/mongo/transport/service_executor_reserved.cpp +++ b/src/mongo/transport/service_executor_reserved.cpp @@ -186,7 +186,7 @@ Status ServiceExecutorReserved::shutdown(Milliseconds timeout) { "reserved executor couldn't shutdown all worker threads within time limit."); } -void ServiceExecutorReserved::schedule(Task task) { +void ServiceExecutorReserved::_schedule(Task task) { if (!_stillRunning.load()) { task(Status(ErrorCodes::ShutdownInProgress, "Executor is not running")); return; @@ -228,9 +228,43 @@ void ServiceExecutorReserved::appendStats(BSONObjBuilder* bob) const { subbob.append(kClientsWaiting, statlet.waiting); } -void ServiceExecutorReserved::runOnDataAvailable(const SessionHandle& session, - Task onCompletionCallback) { - scheduleCallbackOnDataAvailable(session, std::move(onCompletionCallback), this); +/** + * Schedules task immediately, on the assumption that The task will block to + * receive the next message and we don't mind blocking on this dedicated + * worker thread. + */ +void ServiceExecutorReserved::_runOnDataAvailable(const SessionHandle& session, Task task) { + invariant(session); + _schedule([this, session, callback = std::move(task)](Status status) { + yieldIfAppropriate(); + if (!status.isOK()) { + callback(std::move(status)); + return; + } + callback(session->waitForData()); + }); +} + +auto ServiceExecutorReserved::makeTaskRunner() -> std::unique_ptr<TaskRunner> { + iassert(ErrorCodes::ShutdownInProgress, "Executor is not running", _stillRunning.load()); + + /** Schedules on this. */ + class ForwardingTaskRunner : public TaskRunner { + public: + explicit ForwardingTaskRunner(ServiceExecutorReserved* e) : _e{e} {} + + void schedule(Task task) override { + _e->_schedule(std::move(task)); + } + + void runOnDataAvailable(std::shared_ptr<Session> session, Task task) override { + _e->_runOnDataAvailable(std::move(session), std::move(task)); + } + + private: + ServiceExecutorReserved* _e; + }; + return std::make_unique<ForwardingTaskRunner>(this); } } // namespace transport diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h index 76be1d45732..b9211b2f58b 100644 --- a/src/mongo/transport/service_executor_reserved.h +++ b/src/mongo/transport/service_executor_reserved.h @@ -59,19 +59,22 @@ public: Status start() override; Status shutdown(Milliseconds timeout) override; - void schedule(Task task) override; size_t getRunningThreads() const override { return _numRunningWorkerThreads.loadRelaxed(); } - void runOnDataAvailable(const SessionHandle& session, Task onCompletionCallback) override; - void appendStats(BSONObjBuilder* bob) const override; + std::unique_ptr<TaskRunner> makeTaskRunner() override; + private: Status _startWorker(); + void _schedule(Task task); + + void _runOnDataAvailable(const SessionHandle& session, Task task); + static thread_local std::deque<Task> _localWorkQueue; static thread_local int64_t _localThreadIdleCounter; diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp index 002d44cb213..31b88b76eb6 100644 --- a/src/mongo/transport/service_executor_synchronous.cpp +++ b/src/mongo/transport/service_executor_synchronous.cpp @@ -173,7 +173,7 @@ ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) return ref.get(); } -void ServiceExecutorSynchronous::schedule(Task task) { +void ServiceExecutorSynchronous::_schedule(Task task) { _sharedState->schedule(std::move(task)); } @@ -191,10 +191,33 @@ void ServiceExecutorSynchronous::appendStats(BSONObjBuilder* bob) const { .append("clientsWaitingForData", 0); } -void ServiceExecutorSynchronous::runOnDataAvailable(const SessionHandle& session, Task task) { +void ServiceExecutorSynchronous::_runOnDataAvailable(const SessionHandle& session, Task task) { invariant(session); yieldIfAppropriate(); - schedule(std::move(task)); + _schedule(std::move(task)); +} + +auto ServiceExecutorSynchronous::makeTaskRunner() -> std::unique_ptr<TaskRunner> { + if (!_sharedState->isRunning()) + iassert(Status(ErrorCodes::ShutdownInProgress, "Executor is not running")); + + /** Schedules on this. */ + class ForwardingTaskRunner : public TaskRunner { + public: + explicit ForwardingTaskRunner(ServiceExecutorSynchronous* e) : _e{e} {} + + void schedule(Task task) override { + _e->_schedule(std::move(task)); + } + + void runOnDataAvailable(std::shared_ptr<Session> session, Task task) override { + _e->_runOnDataAvailable(std::move(session), std::move(task)); + } + + private: + ServiceExecutorSynchronous* _e; + }; + return std::make_unique<ForwardingTaskRunner>(this); } } // namespace mongo::transport diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h index a8ed815dbad..89a31492be2 100644 --- a/src/mongo/transport/service_executor_synchronous.h +++ b/src/mongo/transport/service_executor_synchronous.h @@ -66,6 +66,15 @@ public: Status start() override; Status shutdown(Milliseconds timeout) override; + std::unique_ptr<TaskRunner> makeTaskRunner() override; + + size_t getRunningThreads() const override; + + void appendStats(BSONObjBuilder* bob) const override; + +private: + class SharedState; + /** * The behavior of `schedule` depends on whether the calling thread is a * worker thread spawned by a previous `schedule` call. @@ -76,16 +85,10 @@ public: * If a worker thread schedules a task, the task is pushed to the back of its * queue. The worker thread exits when the queue becomes empty. */ - void schedule(Task task) override; - - size_t getRunningThreads() const override; + void _schedule(Task task); - void runOnDataAvailable(const SessionHandle& session, Task onCompletionCallback) override; + void _runOnDataAvailable(const SessionHandle& session, Task onCompletionCallback); - void appendStats(BSONObjBuilder* bob) const override; - -private: - class SharedState; std::shared_ptr<SharedState> _sharedState; }; diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp index 83c58661169..7bf605cc28a 100644 --- a/src/mongo/transport/service_executor_test.cpp +++ b/src/mongo/transport/service_executor_test.cpp @@ -32,6 +32,7 @@ #include "boost/optional.hpp" #include <algorithm> +#include <asio.hpp> #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/service_context.h" @@ -43,8 +44,10 @@ #include "mongo/transport/service_executor_synchronous.h" #include "mongo/transport/transport_layer.h" #include "mongo/transport/transport_layer_mock.h" +#include "mongo/unittest/assert_that.h" #include "mongo/unittest/barrier.h" #include "mongo/unittest/death_test.h" +#include "mongo/unittest/matcher.h" #include "mongo/unittest/thread_assertion_monitor.h" #include "mongo/unittest/unittest.h" #include "mongo/util/concurrency/thread_pool.h" @@ -52,14 +55,14 @@ #include "mongo/util/future.h" #include "mongo/util/scopeguard.h" -#include <asio.hpp> - #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest namespace mongo::transport { namespace { +namespace m = unittest::match; + constexpr auto kWorkerThreadRunTime = Milliseconds{1000}; // Run time + generous scheduling time slice constexpr auto kShutdownTime = Milliseconds{kWorkerThreadRunTime.count() + 50}; @@ -154,16 +157,15 @@ TEST_F(ServiceExecutorSynchronousTest, BasicTaskRuns) { ServiceExecutorSynchronous executor(getGlobalServiceContext()); ASSERT_OK(executor.start()); PromiseAndFuture<void> pf; - executor.schedule([&](Status st) { pf.promise.setFrom(st); }); + auto runner = executor.makeTaskRunner(); + runner->schedule([&](Status st) { pf.promise.setFrom(st); }); ASSERT_DOES_NOT_THROW(pf.future.get()); ASSERT_OK(executor.shutdown(kShutdownTime)); } -TEST_F(ServiceExecutorSynchronousTest, ScheduleFailsBeforeStartup) { - ServiceExecutorSynchronous executor(getGlobalServiceContext()); - PromiseAndFuture<void> pf; - executor.schedule([&](Status s) { pf.promise.setFrom(s); }); - ASSERT_THROWS(pf.future.get(), DBException); +TEST_F(ServiceExecutorSynchronousTest, MakeTaskRunnerFailsBeforeStartup) { + ServiceExecutorSynchronous executor{getGlobalServiceContext()}; + ASSERT_THROWS(executor.makeTaskRunner(), DBException); } class ServiceExecutorFixedTest : public unittest::Test { @@ -202,18 +204,17 @@ public: }; }; -TEST_F(ServiceExecutorFixedTest, ScheduleFailsBeforeStartup) { +TEST_F(ServiceExecutorFixedTest, MakeTaskRunnerFailsBeforeStartup) { Handle handle; - PromiseAndFuture<void> pf; - handle->schedule([&](Status s) { pf.promise.setFrom(s); }); - ASSERT_THROWS(pf.future.get(), DBException); + ASSERT_THROWS(handle->makeTaskRunner(), DBException); } TEST_F(ServiceExecutorFixedTest, BasicTaskRuns) { Handle handle; handle.start(); + auto runner = handle->makeTaskRunner(); PromiseAndFuture<void> pf; - handle->schedule([&](Status s) { pf.promise.setFrom(s); }); + runner->schedule([&](Status s) { pf.promise.setFrom(s); }); ASSERT_DOES_NOT_THROW(pf.future.get()); } @@ -221,9 +222,9 @@ TEST_F(ServiceExecutorFixedTest, ShutdownTimeLimit) { unittest::Barrier mayReturn(2); Handle handle; handle.start(); - + auto runner = handle->makeTaskRunner(); PromiseAndFuture<void> pf; - handle->schedule([&](Status st) { + runner->schedule([&](Status st) { pf.promise.setFrom(st); mayReturn.countDownAndWait(); }); @@ -239,10 +240,11 @@ TEST_F(ServiceExecutorFixedTest, ScheduleSucceedsBeforeShutdown) { PromiseAndFuture<void> pf; Handle handle; handle.start(); + auto runner = handle->makeTaskRunner(); // The executor accepts the work, but hasn't used the underlying pool yet. - JoinThread scheduleClient{[&] { handle->schedule([&](Status s) { pf.promise.setFrom(s); }); }}; - (*failpoint)->waitForTimesEntered(1); + JoinThread scheduleClient{[&] { runner->schedule([&](Status s) { pf.promise.setFrom(s); }); }}; + (*failpoint)->waitForTimesEntered(failpoint->initialTimesEntered() + 1); // Trigger an immediate shutdown which will not affect the task we have accepted. ASSERT_NOT_OK(handle->shutdown(Milliseconds{0})); @@ -258,10 +260,10 @@ TEST_F(ServiceExecutorFixedTest, ScheduleSucceedsBeforeShutdown) { TEST_F(ServiceExecutorFixedTest, ScheduleFailsAfterShutdown) { Handle handle; handle.start(); - + auto runner = handle->makeTaskRunner(); ASSERT_OK(handle->shutdown(kShutdownTime)); PromiseAndFuture<void> pf; - handle->schedule([&](Status s) { pf.promise.setFrom(s); }); + runner->schedule([&](Status s) { pf.promise.setFrom(s); }); ASSERT_THROWS(pf.future.get(), ExceptionFor<ErrorCodes::ServiceExecutorInShutdown>); } @@ -274,12 +276,13 @@ TEST_F(ServiceExecutorFixedTest, RunTaskAfterWaitingForData) { Handle handle; handle.start(); + auto runner = handle->makeTaskRunner(); const auto signallingThreadId = stdx::this_thread::get_id(); AtomicWord<bool> ranOnDataAvailable{false}; - handle->runOnDataAvailable(session, [&](Status) { + runner->runOnDataAvailable(session, [&](Status) { ranOnDataAvailable.store(true); ASSERT(stdx::this_thread::get_id() != signallingThreadId); barrier.countDownAndWait(); @@ -303,7 +306,7 @@ TEST_F(ServiceExecutorFixedTest, StartAndShutdownAreDeterministic) { { FailPointEnableBlock failpoint("hangAfterServiceExecutorFixedExecutorThreadsStart"); handle.start(); - failpoint->waitForTimesEntered(kExecutorThreads); + failpoint->waitForTimesEntered(failpoint.initialTimesEntered() + kExecutorThreads); } // Since destroying ServiceExecutorFixed is blocking, spawn a thread to issue the @@ -315,7 +318,7 @@ TEST_F(ServiceExecutorFixedTest, StartAndShutdownAreDeterministic) { FailPointEnableBlock failpoint( "hangBeforeServiceExecutorFixedLastExecutorThreadReturns"); shutdownThread = monitor.spawn([&] { handle.join(); }); - failpoint->waitForTimesEntered(1); + failpoint->waitForTimesEntered(failpoint.initialTimesEntered() + 1); } shutdownThread.join(); }); diff --git a/src/mongo/transport/service_executor_utils.cpp b/src/mongo/transport/service_executor_utils.cpp index 96b03bdf989..317e4261598 100644 --- a/src/mongo/transport/service_executor_utils.cpp +++ b/src/mongo/transport/service_executor_utils.cpp @@ -157,20 +157,4 @@ Status launchServiceWorkerThread(unique_function<void()> task) { return Status::OK(); } -void scheduleCallbackOnDataAvailable(const SessionHandle& session, - unique_function<void(Status)> callback, - ServiceExecutor* executor) { - invariant(session); - executor->schedule([session, callback = std::move(callback), executor](Status status) { - executor->yieldIfAppropriate(); - - if (!status.isOK()) { - callback(std::move(status)); - return; - } - - callback(session->waitForData()); - }); -} - } // namespace mongo::transport diff --git a/src/mongo/transport/service_executor_utils.h b/src/mongo/transport/service_executor_utils.h index 057e0a1686d..3874a6ce653 100644 --- a/src/mongo/transport/service_executor_utils.h +++ b/src/mongo/transport/service_executor_utils.h @@ -37,13 +37,4 @@ namespace mongo::transport { Status launchServiceWorkerThread(unique_function<void()> task); -/** - * The default implementation for "ServiceExecutor::runOnDataAvailable()", which blocks the caller - * thread until data is available for reading. On success, it schedules "callback" on "executor". - * Other implementations (e.g., "ServiceExecutorFixed") may provide asynchronous variants. - */ -void scheduleCallbackOnDataAvailable(const SessionHandle& session, - unique_function<void(Status)> callback, - ServiceExecutor* executor); - } // namespace mongo::transport diff --git a/src/mongo/transport/session_workflow.cpp b/src/mongo/transport/session_workflow.cpp index b96d5a430d0..d38759fcc06 100644 --- a/src/mongo/transport/session_workflow.cpp +++ b/src/mongo/transport/session_workflow.cpp @@ -387,6 +387,14 @@ public: return ServiceExecutorContext::get(client())->getServiceExecutor(); } + std::shared_ptr<ServiceExecutor::TaskRunner> taskRunner() { + auto exec = executor(); + // Allows switching the executor between iterations of the workflow. + if (MONGO_unlikely(!_taskRunner.source || _taskRunner.source != exec)) + _taskRunner = {exec->makeTaskRunner(), exec}; + return _taskRunner.runner; + } + bool isTLS() const { #ifdef MONGO_CONFIG_SSL return SSLPeerInfo::forSession(session()).isTLS; @@ -398,6 +406,11 @@ public: private: class WorkItem; + struct RunnerAndSource { + std::shared_ptr<ServiceExecutor::TaskRunner> runner; + ServiceExecutor* source = nullptr; + }; + /** Alias: refers to this Impl, but holds a ref to the enclosing workflow. */ std::shared_ptr<Impl> shared_from_this() { return {_workflow->shared_from_this(), this}; @@ -406,6 +419,7 @@ private: SessionWorkflow* const _workflow; ServiceContext* const _serviceContext; ServiceEntryPoint* const _sep; + RunnerAndSource _taskRunner; AtomicWord<bool> _isTerminated{false}; ClientStrandPtr _clientStrand; @@ -640,9 +654,9 @@ void SessionWorkflow::Impl::scheduleNewLoop(Status status) try { // Start our loop again with a new stack. if (_nextWork) { // If we're in exhaust, we're not expecting more data. - executor()->schedule(std::move(cb)); + taskRunner()->schedule(std::move(cb)); } else { - executor()->runOnDataAvailable(session(), std::move(cb)); + taskRunner()->runOnDataAvailable(session(), std::move(cb)); } } catch (const DBException& ex) { LOGV2_WARNING_OPTIONS(22993, @@ -720,6 +734,7 @@ void SessionWorkflow::Impl::cleanupExhaustResources() { void SessionWorkflow::Impl::cleanupSession(const Status& status) { LOGV2_DEBUG(5127900, 2, "Ending session", "error"_attr = status); + _taskRunner = {}; cleanupExhaustResources(); _sep->onClientDisconnect(client()); } |