author    Billy Donahue <billy.donahue@mongodb.com>  2022-10-27 18:46:06 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-10-27 19:47:47 +0000
commit    7b9c8c6a22e38fd34ca313bb0637615f3075f720 (patch)
tree      e22c17190689b8bb22b54741aae30196273394de
parent    3fd2cc553a27067f4662a0708cd2e98cd122dffb (diff)
download  mongo-7b9c8c6a22e38fd34ca313bb0637615f3075f720.tar.gz
SERVER-70863 ServiceExecutor::TaskRunner
-rw-r--r--  src/mongo/transport/mock_service_executor.h          | 21
-rw-r--r--  src/mongo/transport/service_executor.h               | 38
-rw-r--r--  src/mongo/transport/service_executor_bm.cpp          | 44
-rw-r--r--  src/mongo/transport/service_executor_fixed.cpp       | 50
-rw-r--r--  src/mongo/transport/service_executor_fixed.h         | 10
-rw-r--r--  src/mongo/transport/service_executor_reserved.cpp    | 42
-rw-r--r--  src/mongo/transport/service_executor_reserved.h      |  9
-rw-r--r--  src/mongo/transport/service_executor_synchronous.cpp | 29
-rw-r--r--  src/mongo/transport/service_executor_synchronous.h   | 19
-rw-r--r--  src/mongo/transport/service_executor_test.cpp        | 47
-rw-r--r--  src/mongo/transport/service_executor_utils.cpp       | 16
-rw-r--r--  src/mongo/transport/service_executor_utils.h         |  9
-rw-r--r--  src/mongo/transport/session_workflow.cpp             | 19
13 files changed, 229 insertions(+), 124 deletions(-)
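
SERVER-70863 stops ServiceExecutor from deriving OutOfLineExecutor directly and moves per-session scheduling behind a new ServiceExecutor::TaskRunner handle obtained from makeTaskRunner(). A rough caller-side sketch of the new shape, pieced together from the diffs below (demo() and its parameters are illustrative stand-ins, not part of the change):

    // Sketch only: assumes the mongo::transport API introduced by this commit.
    #include <memory>

    #include "mongo/transport/service_executor.h"
    #include "mongo/transport/session.h"

    namespace mongo::transport {

    void demo(ServiceExecutor* exec, std::shared_ptr<Session> session) {
        // Previously callers used exec->schedule(...) and exec->runOnDataAvailable(...).
        // Now they first make a TaskRunner; makeTaskRunner() throws a DBException
        // (ShutdownInProgress) if the executor is not running.
        std::unique_ptr<ServiceExecutor::TaskRunner> runner = exec->makeTaskRunner();

        // Plain scheduling goes through the runner.
        runner->schedule([](Status s) {
            if (!s.isOK())
                return;  // executor is shutting down
            // ... run the work ...
        });

        // Waiting for incoming session data is a runner operation as well.
        runner->runOnDataAvailable(session, [](Status s) {
            // ... source and process the next message ...
        });
    }

    }  // namespace mongo::transport
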
diff --git a/src/mongo/transport/mock_service_executor.h b/src/mongo/transport/mock_service_executor.h
index 16928d8bfef..4b4eaf752a1 100644
--- a/src/mongo/transport/mock_service_executor.h
+++ b/src/mongo/transport/mock_service_executor.h
@@ -42,12 +42,23 @@ public:
return startCb();
}
- void schedule(Task task) override {
- return scheduleTaskCb(std::move(task));
- }
+ std::unique_ptr<TaskRunner> makeTaskRunner() override {
+ class Runner : public TaskRunner {
+ public:
+ explicit Runner(MockServiceExecutor* p) : _p{p} {}
+
+ void schedule(Task task) override {
+ return _p->scheduleTaskCb(std::move(task));
+ }
+
+ void runOnDataAvailable(SessionHandle session, Task onCompletionCallback) override {
+ _p->runOnDataAvailableCb(session, std::move(onCompletionCallback));
+ }
- void runOnDataAvailable(const SessionHandle& session, Task onCompletionCallback) override {
- runOnDataAvailableCb(session, std::move(onCompletionCallback));
+ private:
+ MockServiceExecutor* _p;
+ };
+ return std::make_unique<Runner>(this);
}
Status shutdown(Milliseconds timeout) override {
diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h
index a1bac34e93c..d6b53453ba6 100644
--- a/src/mongo/transport/service_executor.h
+++ b/src/mongo/transport/service_executor.h
@@ -42,33 +42,39 @@
#include "mongo/util/functional.h"
#include "mongo/util/out_of_line_executor.h"
-namespace mongo {
-namespace transport {
+namespace mongo::transport {
extern bool gInitialUseDedicatedThread;
/*
* This is the interface for all ServiceExecutors.
*/
-class ServiceExecutor : public OutOfLineExecutor {
+class ServiceExecutor {
public:
+ using Task = OutOfLineExecutor::Task;
+
+ class TaskRunner : public OutOfLineExecutor {
+ public:
+ /**
+ * Awaits the availability of incoming data for the specified session. On success, it will
+     * schedule the callback on the current executor. Otherwise, it will invoke the callback with a
+ * non-okay status on the caller thread.
+ */
+ virtual void runOnDataAvailable(std::shared_ptr<Session> session, Task task) = 0;
+ };
+
static void shutdownAll(ServiceContext* serviceContext, Date_t deadline);
virtual ~ServiceExecutor() = default;
+ virtual std::unique_ptr<TaskRunner> makeTaskRunner() = 0;
+
/*
* Starts the ServiceExecutor. This may create threads even if no tasks are scheduled.
*/
virtual Status start() = 0;
/*
- * Awaits the availability of incoming data for the specified session. On success, it will
- * schedule the callback on current executor. Otherwise, it will invoke the callback with a
- * non-okay status on the caller thread.
- */
- virtual void runOnDataAvailable(const SessionHandle& session, Task onCompletionCallback) = 0;
-
- /*
* Stops and joins the ServiceExecutor. Any outstanding tasks will not be executed, and any
* associated callbacks waiting on I/O may get called with an error code.
*
@@ -78,14 +84,10 @@ public:
virtual size_t getRunningThreads() const = 0;
- /*
- * Appends statistics about task scheduling to a BSONObjBuilder for serverStatus output.
- */
+ /** Appends statistics about task scheduling to a BSONObjBuilder for serverStatus output. */
virtual void appendStats(BSONObjBuilder* bob) const = 0;
- /**
- * Yield if we have more threads than cores.
- */
+ /** Yield if this executor controls more threads than we have cores. */
void yieldIfAppropriate() const;
};
@@ -191,6 +193,4 @@ public:
size_t limitExempt = 0;
};
-} // namespace transport
-
-} // namespace mongo
+} // namespace mongo::transport
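
The header change above is the heart of the commit: the nested TaskRunner now derives OutOfLineExecutor (supplying schedule()) and adds runOnDataAvailable(), while ServiceExecutor itself only hands out runners. Each concrete executor below implements makeTaskRunner() with the same small forwarding object; a minimal sketch of that pattern (MyExecutor and its private _schedule/_runOnDataAvailable stand in for the real classes):

    // Sketch of the forwarding pattern repeated in the .cpp diffs that follow.
    #include <memory>

    #include "mongo/transport/service_executor.h"

    namespace mongo::transport {

    class MyExecutor : public ServiceExecutor {
    public:
        std::unique_ptr<TaskRunner> makeTaskRunner() override {
            /** Schedules on this. */
            class ForwardingTaskRunner : public TaskRunner {
            public:
                explicit ForwardingTaskRunner(MyExecutor* e) : _e{e} {}

                void schedule(Task task) override {
                    _e->_schedule(std::move(task));
                }

                void runOnDataAvailable(std::shared_ptr<Session> session, Task task) override {
                    _e->_runOnDataAvailable(std::move(session), std::move(task));
                }

            private:
                MyExecutor* _e;
            };
            return std::make_unique<ForwardingTaskRunner>(this);
        }

        Status start() override;
        Status shutdown(Milliseconds timeout) override;
        size_t getRunningThreads() const override;
        void appendStats(BSONObjBuilder* bob) const override;

    private:
        void _schedule(Task task);
        void _runOnDataAvailable(std::shared_ptr<Session> session, Task task);
    };

    }  // namespace mongo::transport
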
diff --git a/src/mongo/transport/service_executor_bm.cpp b/src/mongo/transport/service_executor_bm.cpp
index 529e3e36200..b9e5b7e30dd 100644
--- a/src/mongo/transport/service_executor_bm.cpp
+++ b/src/mongo/transport/service_executor_bm.cpp
@@ -91,8 +91,8 @@ public:
lastTearDown();
}
- void runOnExec(ServiceExecutor::Task task) {
- executor()->schedule(std::move(task));
+ void runOnExec(ServiceExecutor::TaskRunner* taskRunner, ServiceExecutor::Task task) {
+ taskRunner->schedule(std::move(task));
}
stdx::mutex mu; // NOLINT
@@ -104,16 +104,19 @@ public:
auto maxThreads = 2 * ProcessInfo::getNumCores();
BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ScheduleTask)(benchmark::State& state) {
- for (auto _ : state)
- runOnExec([](Status) {});
+ for (auto _ : state) {
+ auto runner = executor()->makeTaskRunner();
+ runOnExec(&*runner, [](Status) {});
+ }
}
BENCHMARK_REGISTER_F(ServiceExecutorSynchronousBm, ScheduleTask)->ThreadRange(1, maxThreads);
/** A simplified ChainedSchedule with only one task. */
BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ScheduleAndWait)(benchmark::State& state) {
for (auto _ : state) {
+ auto runner = executor()->makeTaskRunner();
Notification done;
- runOnExec([&](Status) { done.set(); });
+ runOnExec(&*runner, [&](Status) { done.set(); });
done.get();
}
}
@@ -121,10 +124,17 @@ BENCHMARK_REGISTER_F(ServiceExecutorSynchronousBm, ScheduleAndWait)->ThreadRange
BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ChainedSchedule)(benchmark::State& state) {
int chainDepth = state.range(0);
- Notification* donePtr = nullptr;
- std::function<void(Status)> chainedTask = [&](Status) { donePtr->set(); };
+ struct LoopState {
+ std::shared_ptr<ServiceExecutor::TaskRunner> runner;
+ Notification done;
+ unittest::Barrier startingLine{2};
+ };
+ LoopState* loopStatePtr = nullptr;
+ std::function<void(Status)> chainedTask = [&](Status) { loopStatePtr->done.set(); };
for (int step = 0; step != chainDepth; ++step)
- chainedTask = [this, chainedTask](Status) { runOnExec(chainedTask); };
+ chainedTask = [this, chainedTask, &loopStatePtr](Status) {
+ runOnExec(&*loopStatePtr->runner, chainedTask);
+ };
// The first scheduled task starts the worker thread. This test is
// specifically measuring the per-task schedule and run overhead. So startup
@@ -134,16 +144,18 @@ BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ChainedSchedule)(benchmark::Sta
// benchmark loop resumes it.
for (auto _ : state) {
state.PauseTiming();
- unittest::Barrier startingLine(2);
- Notification done;
- donePtr = &done;
- runOnExec([&](Status) {
- startingLine.countDownAndWait();
- runOnExec(chainedTask);
+ LoopState loopState{
+ executor()->makeTaskRunner(),
+ {},
+ };
+ loopStatePtr = &loopState;
+ runOnExec(&*loopStatePtr->runner, [&](Status s) {
+ loopState.startingLine.countDownAndWait();
+ runOnExec(&*loopStatePtr->runner, chainedTask);
});
state.ResumeTiming();
- startingLine.countDownAndWait();
- done.get();
+ loopState.startingLine.countDownAndWait();
+ loopState.done.get();
}
}
BENCHMARK_REGISTER_F(ServiceExecutorSynchronousBm, ChainedSchedule)
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp
index 9d3e9ddd857..c0b3a5cad18 100644
--- a/src/mongo/transport/service_executor_fixed.cpp
+++ b/src/mongo/transport/service_executor_fixed.cpp
@@ -381,7 +381,7 @@ void ServiceExecutorFixed::_checkForShutdown() {
reactor->stop();
}
-void ServiceExecutorFixed::schedule(Task task) {
+void ServiceExecutorFixed::_schedule(Task task) {
{
auto lk = stdx::unique_lock(_mutex);
if (_state != State::kRunning) {
@@ -403,8 +403,8 @@ size_t ServiceExecutorFixed::getRunningThreads() const {
return _stats->threadsRunning();
}
-void ServiceExecutorFixed::runOnDataAvailable(const SessionHandle& session,
- Task onCompletionCallback) {
+void ServiceExecutorFixed::_runOnDataAvailable(const SessionHandle& session,
+ Task onCompletionCallback) {
invariant(session);
yieldIfAppropriate();
@@ -422,17 +422,19 @@ void ServiceExecutorFixed::runOnDataAvailable(const SessionHandle& session,
lk.unlock();
auto anchor = shared_from_this();
- session->asyncWaitForData().thenRunOn(anchor).getAsync([this, anchor, it](Status status) {
- // Remove our waiter from the list.
- auto lk = stdx::unique_lock(_mutex);
- auto waiter = std::exchange(*it, {});
- _waiters.erase(it);
- _stats->waitersEnded.fetchAndAdd(1);
- lk.unlock();
+ session->asyncWaitForData()
+ .thenRunOn(makeTaskRunner())
+ .getAsync([this, anchor, it](Status status) {
+ // Remove our waiter from the list.
+ auto lk = stdx::unique_lock(_mutex);
+ auto waiter = std::exchange(*it, {});
+ _waiters.erase(it);
+ _stats->waitersEnded.fetchAndAdd(1);
+ lk.unlock();
- waiter.session = nullptr;
- waiter.onCompletionCallback(std::move(status));
- });
+ waiter.session = nullptr;
+ waiter.onCompletionCallback(std::move(status));
+ });
}
void ServiceExecutorFixed::appendStats(BSONObjBuilder* bob) const {
@@ -450,4 +452,26 @@ int ServiceExecutorFixed::getRecursionDepthForExecutorThread() const {
return _executorContext->getRecursionDepth();
}
+auto ServiceExecutorFixed::makeTaskRunner() -> std::unique_ptr<TaskRunner> {
+ iassert(ErrorCodes::ShutdownInProgress, "Executor is not running", _state == State::kRunning);
+
+ /** Schedules on this. */
+ class ForwardingTaskRunner : public TaskRunner {
+ public:
+ explicit ForwardingTaskRunner(ServiceExecutorFixed* e) : _e{e} {}
+
+ void schedule(Task task) override {
+ _e->_schedule(std::move(task));
+ }
+
+ void runOnDataAvailable(std::shared_ptr<Session> session, Task task) override {
+ _e->_runOnDataAvailable(std::move(session), std::move(task));
+ }
+
+ private:
+ ServiceExecutorFixed* _e;
+ };
+ return std::make_unique<ForwardingTaskRunner>(this);
+}
+
} // namespace mongo::transport
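
Note how the fixed executor's data wait now resumes through .thenRunOn(makeTaskRunner()): because TaskRunner is an OutOfLineExecutor, a freshly made runner can serve as the continuation executor of a future chain. A minimal sketch of that usage outside ServiceExecutorFixed (someAsyncWork() is a hypothetical Future<void>-returning call; ExecutorPtr is mongo's shared_ptr<OutOfLineExecutor> alias):

    // Sketch only; not part of this commit.
    void resumeOnWorker(ServiceExecutor* exec) {
        ExecutorPtr runner = exec->makeTaskRunner();  // unique_ptr converts to shared_ptr
        someAsyncWork()
            .thenRunOn(runner)
            .getAsync([](Status status) {
                // Runs on the executor once someAsyncWork() resolves; `status`
                // carries the result of the async work.
            });
    }
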
diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h
index 8270e84176d..a74adc6f3b7 100644
--- a/src/mongo/transport/service_executor_fixed.h
+++ b/src/mongo/transport/service_executor_fixed.h
@@ -68,10 +68,6 @@ public:
Status start() override;
Status shutdown(Milliseconds timeout) override;
- void schedule(Task task) override;
-
- void runOnDataAvailable(const SessionHandle& session, Task onCompletionCallback) override;
-
size_t getRunningThreads() const override;
void appendStats(BSONObjBuilder* bob) const override;
@@ -82,6 +78,8 @@ public:
*/
int getRecursionDepthForExecutorThread() const;
+ std::unique_ptr<TaskRunner> makeTaskRunner() override;
+
private:
enum class State { kNotStarted, kRunning, kStopping, kStopped };
@@ -95,6 +93,10 @@ private:
Task onCompletionCallback;
};
+ void _schedule(Task task);
+
+ void _runOnDataAvailable(const SessionHandle& session, Task onCompletionCallback);
+
const std::string& _name() const;
/** Requires `_mutex` locked. */
diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp
index 5d0aed19d8d..1ec4e7af75e 100644
--- a/src/mongo/transport/service_executor_reserved.cpp
+++ b/src/mongo/transport/service_executor_reserved.cpp
@@ -186,7 +186,7 @@ Status ServiceExecutorReserved::shutdown(Milliseconds timeout) {
"reserved executor couldn't shutdown all worker threads within time limit.");
}
-void ServiceExecutorReserved::schedule(Task task) {
+void ServiceExecutorReserved::_schedule(Task task) {
if (!_stillRunning.load()) {
task(Status(ErrorCodes::ShutdownInProgress, "Executor is not running"));
return;
@@ -228,9 +228,43 @@ void ServiceExecutorReserved::appendStats(BSONObjBuilder* bob) const {
subbob.append(kClientsWaiting, statlet.waiting);
}
-void ServiceExecutorReserved::runOnDataAvailable(const SessionHandle& session,
- Task onCompletionCallback) {
- scheduleCallbackOnDataAvailable(session, std::move(onCompletionCallback), this);
+/**
+ * Schedules the task immediately, on the assumption that the task will block to
+ * receive the next message and we don't mind blocking on this dedicated
+ * worker thread.
+ */
+void ServiceExecutorReserved::_runOnDataAvailable(const SessionHandle& session, Task task) {
+ invariant(session);
+ _schedule([this, session, callback = std::move(task)](Status status) {
+ yieldIfAppropriate();
+ if (!status.isOK()) {
+ callback(std::move(status));
+ return;
+ }
+ callback(session->waitForData());
+ });
+}
+
+auto ServiceExecutorReserved::makeTaskRunner() -> std::unique_ptr<TaskRunner> {
+ iassert(ErrorCodes::ShutdownInProgress, "Executor is not running", _stillRunning.load());
+
+ /** Schedules on this. */
+ class ForwardingTaskRunner : public TaskRunner {
+ public:
+ explicit ForwardingTaskRunner(ServiceExecutorReserved* e) : _e{e} {}
+
+ void schedule(Task task) override {
+ _e->_schedule(std::move(task));
+ }
+
+ void runOnDataAvailable(std::shared_ptr<Session> session, Task task) override {
+ _e->_runOnDataAvailable(std::move(session), std::move(task));
+ }
+
+ private:
+ ServiceExecutorReserved* _e;
+ };
+ return std::make_unique<ForwardingTaskRunner>(this);
}
} // namespace transport
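
With scheduleCallbackOnDataAvailable() deleted from service_executor_utils (further below), the reserved and synchronous executors carry this blocking wait themselves: the task is scheduled immediately and the dedicated worker thread blocks in Session::waitForData(). Spelled out at a call site, runner->runOnDataAvailable(session, task) on these executors behaves roughly like this sketch (runner, session, and task are stand-ins; yieldIfAppropriate() elided):

    runner->schedule([session, task = std::move(task)](Status status) mutable {
        if (!status.isOK()) {
            task(std::move(status));   // executor is shutting down
            return;
        }
        task(session->waitForData());  // block this worker until data arrives
    });
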
diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h
index 76be1d45732..b9211b2f58b 100644
--- a/src/mongo/transport/service_executor_reserved.h
+++ b/src/mongo/transport/service_executor_reserved.h
@@ -59,19 +59,22 @@ public:
Status start() override;
Status shutdown(Milliseconds timeout) override;
- void schedule(Task task) override;
size_t getRunningThreads() const override {
return _numRunningWorkerThreads.loadRelaxed();
}
- void runOnDataAvailable(const SessionHandle& session, Task onCompletionCallback) override;
-
void appendStats(BSONObjBuilder* bob) const override;
+ std::unique_ptr<TaskRunner> makeTaskRunner() override;
+
private:
Status _startWorker();
+ void _schedule(Task task);
+
+ void _runOnDataAvailable(const SessionHandle& session, Task task);
+
static thread_local std::deque<Task> _localWorkQueue;
static thread_local int64_t _localThreadIdleCounter;
diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp
index 002d44cb213..31b88b76eb6 100644
--- a/src/mongo/transport/service_executor_synchronous.cpp
+++ b/src/mongo/transport/service_executor_synchronous.cpp
@@ -173,7 +173,7 @@ ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx)
return ref.get();
}
-void ServiceExecutorSynchronous::schedule(Task task) {
+void ServiceExecutorSynchronous::_schedule(Task task) {
_sharedState->schedule(std::move(task));
}
@@ -191,10 +191,33 @@ void ServiceExecutorSynchronous::appendStats(BSONObjBuilder* bob) const {
.append("clientsWaitingForData", 0);
}
-void ServiceExecutorSynchronous::runOnDataAvailable(const SessionHandle& session, Task task) {
+void ServiceExecutorSynchronous::_runOnDataAvailable(const SessionHandle& session, Task task) {
invariant(session);
yieldIfAppropriate();
- schedule(std::move(task));
+ _schedule(std::move(task));
+}
+
+auto ServiceExecutorSynchronous::makeTaskRunner() -> std::unique_ptr<TaskRunner> {
+ if (!_sharedState->isRunning())
+ iassert(Status(ErrorCodes::ShutdownInProgress, "Executor is not running"));
+
+ /** Schedules on this. */
+ class ForwardingTaskRunner : public TaskRunner {
+ public:
+ explicit ForwardingTaskRunner(ServiceExecutorSynchronous* e) : _e{e} {}
+
+ void schedule(Task task) override {
+ _e->_schedule(std::move(task));
+ }
+
+ void runOnDataAvailable(std::shared_ptr<Session> session, Task task) override {
+ _e->_runOnDataAvailable(std::move(session), std::move(task));
+ }
+
+ private:
+ ServiceExecutorSynchronous* _e;
+ };
+ return std::make_unique<ForwardingTaskRunner>(this);
}
} // namespace mongo::transport
diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h
index a8ed815dbad..89a31492be2 100644
--- a/src/mongo/transport/service_executor_synchronous.h
+++ b/src/mongo/transport/service_executor_synchronous.h
@@ -66,6 +66,15 @@ public:
Status start() override;
Status shutdown(Milliseconds timeout) override;
+ std::unique_ptr<TaskRunner> makeTaskRunner() override;
+
+ size_t getRunningThreads() const override;
+
+ void appendStats(BSONObjBuilder* bob) const override;
+
+private:
+ class SharedState;
+
/**
* The behavior of `schedule` depends on whether the calling thread is a
* worker thread spawned by a previous `schedule` call.
@@ -76,16 +85,10 @@ public:
* If a worker thread schedules a task, the task is pushed to the back of its
* queue. The worker thread exits when the queue becomes empty.
*/
- void schedule(Task task) override;
-
- size_t getRunningThreads() const override;
+ void _schedule(Task task);
- void runOnDataAvailable(const SessionHandle& session, Task onCompletionCallback) override;
+ void _runOnDataAvailable(const SessionHandle& session, Task onCompletionCallback);
- void appendStats(BSONObjBuilder* bob) const override;
-
-private:
- class SharedState;
std::shared_ptr<SharedState> _sharedState;
};
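
The schedule() comment retained above still governs the synchronous executor, now reached through its runner: a task scheduled from a non-worker thread spawns a worker, tasks scheduled from inside that worker are queued behind the current one, and the worker exits when its queue drains. A sketch of that ordering (assumes the runner outlives the scheduled tasks, as in the benchmark's LoopState):

    auto runner = executor.makeTaskRunner();
    runner->schedule([&](Status) {
        // Now on a freshly spawned worker thread.
        runner->schedule([](Status) {
            // Pushed to the back of the same worker's queue; runs after the
            // outer task returns, and then the worker exits.
        });
    });
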
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index 83c58661169..7bf605cc28a 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -32,6 +32,7 @@
#include "boost/optional.hpp"
#include <algorithm>
+#include <asio.hpp>
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/service_context.h"
@@ -43,8 +44,10 @@
#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/transport/transport_layer_mock.h"
+#include "mongo/unittest/assert_that.h"
#include "mongo/unittest/barrier.h"
#include "mongo/unittest/death_test.h"
+#include "mongo/unittest/matcher.h"
#include "mongo/unittest/thread_assertion_monitor.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -52,14 +55,14 @@
#include "mongo/util/future.h"
#include "mongo/util/scopeguard.h"
-#include <asio.hpp>
-
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
namespace mongo::transport {
namespace {
+namespace m = unittest::match;
+
constexpr auto kWorkerThreadRunTime = Milliseconds{1000};
// Run time + generous scheduling time slice
constexpr auto kShutdownTime = Milliseconds{kWorkerThreadRunTime.count() + 50};
@@ -154,16 +157,15 @@ TEST_F(ServiceExecutorSynchronousTest, BasicTaskRuns) {
ServiceExecutorSynchronous executor(getGlobalServiceContext());
ASSERT_OK(executor.start());
PromiseAndFuture<void> pf;
- executor.schedule([&](Status st) { pf.promise.setFrom(st); });
+ auto runner = executor.makeTaskRunner();
+ runner->schedule([&](Status st) { pf.promise.setFrom(st); });
ASSERT_DOES_NOT_THROW(pf.future.get());
ASSERT_OK(executor.shutdown(kShutdownTime));
}
-TEST_F(ServiceExecutorSynchronousTest, ScheduleFailsBeforeStartup) {
- ServiceExecutorSynchronous executor(getGlobalServiceContext());
- PromiseAndFuture<void> pf;
- executor.schedule([&](Status s) { pf.promise.setFrom(s); });
- ASSERT_THROWS(pf.future.get(), DBException);
+TEST_F(ServiceExecutorSynchronousTest, MakeTaskRunnerFailsBeforeStartup) {
+ ServiceExecutorSynchronous executor{getGlobalServiceContext()};
+ ASSERT_THROWS(executor.makeTaskRunner(), DBException);
}
class ServiceExecutorFixedTest : public unittest::Test {
@@ -202,18 +204,17 @@ public:
};
};
-TEST_F(ServiceExecutorFixedTest, ScheduleFailsBeforeStartup) {
+TEST_F(ServiceExecutorFixedTest, MakeTaskRunnerFailsBeforeStartup) {
Handle handle;
- PromiseAndFuture<void> pf;
- handle->schedule([&](Status s) { pf.promise.setFrom(s); });
- ASSERT_THROWS(pf.future.get(), DBException);
+ ASSERT_THROWS(handle->makeTaskRunner(), DBException);
}
TEST_F(ServiceExecutorFixedTest, BasicTaskRuns) {
Handle handle;
handle.start();
+ auto runner = handle->makeTaskRunner();
PromiseAndFuture<void> pf;
- handle->schedule([&](Status s) { pf.promise.setFrom(s); });
+ runner->schedule([&](Status s) { pf.promise.setFrom(s); });
ASSERT_DOES_NOT_THROW(pf.future.get());
}
@@ -221,9 +222,9 @@ TEST_F(ServiceExecutorFixedTest, ShutdownTimeLimit) {
unittest::Barrier mayReturn(2);
Handle handle;
handle.start();
-
+ auto runner = handle->makeTaskRunner();
PromiseAndFuture<void> pf;
- handle->schedule([&](Status st) {
+ runner->schedule([&](Status st) {
pf.promise.setFrom(st);
mayReturn.countDownAndWait();
});
@@ -239,10 +240,11 @@ TEST_F(ServiceExecutorFixedTest, ScheduleSucceedsBeforeShutdown) {
PromiseAndFuture<void> pf;
Handle handle;
handle.start();
+ auto runner = handle->makeTaskRunner();
// The executor accepts the work, but hasn't used the underlying pool yet.
- JoinThread scheduleClient{[&] { handle->schedule([&](Status s) { pf.promise.setFrom(s); }); }};
- (*failpoint)->waitForTimesEntered(1);
+ JoinThread scheduleClient{[&] { runner->schedule([&](Status s) { pf.promise.setFrom(s); }); }};
+ (*failpoint)->waitForTimesEntered(failpoint->initialTimesEntered() + 1);
// Trigger an immediate shutdown which will not affect the task we have accepted.
ASSERT_NOT_OK(handle->shutdown(Milliseconds{0}));
@@ -258,10 +260,10 @@ TEST_F(ServiceExecutorFixedTest, ScheduleSucceedsBeforeShutdown) {
TEST_F(ServiceExecutorFixedTest, ScheduleFailsAfterShutdown) {
Handle handle;
handle.start();
-
+ auto runner = handle->makeTaskRunner();
ASSERT_OK(handle->shutdown(kShutdownTime));
PromiseAndFuture<void> pf;
- handle->schedule([&](Status s) { pf.promise.setFrom(s); });
+ runner->schedule([&](Status s) { pf.promise.setFrom(s); });
ASSERT_THROWS(pf.future.get(), ExceptionFor<ErrorCodes::ServiceExecutorInShutdown>);
}
@@ -274,12 +276,13 @@ TEST_F(ServiceExecutorFixedTest, RunTaskAfterWaitingForData) {
Handle handle;
handle.start();
+ auto runner = handle->makeTaskRunner();
const auto signallingThreadId = stdx::this_thread::get_id();
AtomicWord<bool> ranOnDataAvailable{false};
- handle->runOnDataAvailable(session, [&](Status) {
+ runner->runOnDataAvailable(session, [&](Status) {
ranOnDataAvailable.store(true);
ASSERT(stdx::this_thread::get_id() != signallingThreadId);
barrier.countDownAndWait();
@@ -303,7 +306,7 @@ TEST_F(ServiceExecutorFixedTest, StartAndShutdownAreDeterministic) {
{
FailPointEnableBlock failpoint("hangAfterServiceExecutorFixedExecutorThreadsStart");
handle.start();
- failpoint->waitForTimesEntered(kExecutorThreads);
+ failpoint->waitForTimesEntered(failpoint.initialTimesEntered() + kExecutorThreads);
}
// Since destroying ServiceExecutorFixed is blocking, spawn a thread to issue the
@@ -315,7 +318,7 @@ TEST_F(ServiceExecutorFixedTest, StartAndShutdownAreDeterministic) {
FailPointEnableBlock failpoint(
"hangBeforeServiceExecutorFixedLastExecutorThreadReturns");
shutdownThread = monitor.spawn([&] { handle.join(); });
- failpoint->waitForTimesEntered(1);
+ failpoint->waitForTimesEntered(failpoint.initialTimesEntered() + 1);
}
shutdownThread.join();
});
diff --git a/src/mongo/transport/service_executor_utils.cpp b/src/mongo/transport/service_executor_utils.cpp
index 96b03bdf989..317e4261598 100644
--- a/src/mongo/transport/service_executor_utils.cpp
+++ b/src/mongo/transport/service_executor_utils.cpp
@@ -157,20 +157,4 @@ Status launchServiceWorkerThread(unique_function<void()> task) {
return Status::OK();
}
-void scheduleCallbackOnDataAvailable(const SessionHandle& session,
- unique_function<void(Status)> callback,
- ServiceExecutor* executor) {
- invariant(session);
- executor->schedule([session, callback = std::move(callback), executor](Status status) {
- executor->yieldIfAppropriate();
-
- if (!status.isOK()) {
- callback(std::move(status));
- return;
- }
-
- callback(session->waitForData());
- });
-}
-
} // namespace mongo::transport
diff --git a/src/mongo/transport/service_executor_utils.h b/src/mongo/transport/service_executor_utils.h
index 057e0a1686d..3874a6ce653 100644
--- a/src/mongo/transport/service_executor_utils.h
+++ b/src/mongo/transport/service_executor_utils.h
@@ -37,13 +37,4 @@ namespace mongo::transport {
Status launchServiceWorkerThread(unique_function<void()> task);
-/**
- * The default implementation for "ServiceExecutor::runOnDataAvailable()", which blocks the caller
- * thread until data is available for reading. On success, it schedules "callback" on "executor".
- * Other implementations (e.g., "ServiceExecutorFixed") may provide asynchronous variants.
- */
-void scheduleCallbackOnDataAvailable(const SessionHandle& session,
- unique_function<void(Status)> callback,
- ServiceExecutor* executor);
-
} // namespace mongo::transport
diff --git a/src/mongo/transport/session_workflow.cpp b/src/mongo/transport/session_workflow.cpp
index b96d5a430d0..d38759fcc06 100644
--- a/src/mongo/transport/session_workflow.cpp
+++ b/src/mongo/transport/session_workflow.cpp
@@ -387,6 +387,14 @@ public:
return ServiceExecutorContext::get(client())->getServiceExecutor();
}
+ std::shared_ptr<ServiceExecutor::TaskRunner> taskRunner() {
+ auto exec = executor();
+ // Allows switching the executor between iterations of the workflow.
+ if (MONGO_unlikely(!_taskRunner.source || _taskRunner.source != exec))
+ _taskRunner = {exec->makeTaskRunner(), exec};
+ return _taskRunner.runner;
+ }
+
bool isTLS() const {
#ifdef MONGO_CONFIG_SSL
return SSLPeerInfo::forSession(session()).isTLS;
@@ -398,6 +406,11 @@ public:
private:
class WorkItem;
+ struct RunnerAndSource {
+ std::shared_ptr<ServiceExecutor::TaskRunner> runner;
+ ServiceExecutor* source = nullptr;
+ };
+
/** Alias: refers to this Impl, but holds a ref to the enclosing workflow. */
std::shared_ptr<Impl> shared_from_this() {
return {_workflow->shared_from_this(), this};
@@ -406,6 +419,7 @@ private:
SessionWorkflow* const _workflow;
ServiceContext* const _serviceContext;
ServiceEntryPoint* const _sep;
+ RunnerAndSource _taskRunner;
AtomicWord<bool> _isTerminated{false};
ClientStrandPtr _clientStrand;
@@ -640,9 +654,9 @@ void SessionWorkflow::Impl::scheduleNewLoop(Status status) try {
// Start our loop again with a new stack.
if (_nextWork) {
// If we're in exhaust, we're not expecting more data.
- executor()->schedule(std::move(cb));
+ taskRunner()->schedule(std::move(cb));
} else {
- executor()->runOnDataAvailable(session(), std::move(cb));
+ taskRunner()->runOnDataAvailable(session(), std::move(cb));
}
} catch (const DBException& ex) {
LOGV2_WARNING_OPTIONS(22993,
@@ -720,6 +734,7 @@ void SessionWorkflow::Impl::cleanupExhaustResources() {
void SessionWorkflow::Impl::cleanupSession(const Status& status) {
LOGV2_DEBUG(5127900, 2, "Ending session", "error"_attr = status);
+ _taskRunner = {};
cleanupExhaustResources();
_sep->onClientDisconnect(client());
}
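
Finally, SessionWorkflow caches the runner it makes together with the executor that produced it, so switching service executors between iterations of the workflow transparently yields a fresh runner, and cleanupSession() drops the cached one. The pieces from the hunks above, assembled into one sketch (surrounding SessionWorkflow::Impl members elided):

    struct RunnerAndSource {
        std::shared_ptr<ServiceExecutor::TaskRunner> runner;
        ServiceExecutor* source = nullptr;
    };
    RunnerAndSource _taskRunner;

    std::shared_ptr<ServiceExecutor::TaskRunner> taskRunner() {
        auto exec = executor();
        if (!_taskRunner.source || _taskRunner.source != exec)  // first loop or executor switched
            _taskRunner = {exec->makeTaskRunner(), exec};       // remake and re-tag
        return _taskRunner.runner;
    }

    // scheduleNewLoop(): taskRunner()->schedule(cb) when exhaust work is pending,
    //                    otherwise taskRunner()->runOnDataAvailable(session(), cb).
    // cleanupSession():  _taskRunner = {};  // drop the cached runner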