diff options
author | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2020-07-13 23:09:50 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-07-22 19:16:58 +0000 |
commit | dca9e5d997042e11368c7a22e465eb07188fcc84 (patch) | |
tree | a683b32ec07041da9b757b2114278a80dd74fa57 | |
parent | db78b73d0b1cda09831163e9283e7ff5adf90fa7 (diff) | |
download | mongo-dca9e5d997042e11368c7a22e465eb07188fcc84.tar.gz |
SERVER-49105 Make ServiceExecutor extend OutOfLineExecutor
-rw-r--r-- | src/mongo/transport/service_executor.h | 15 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_fixed.cpp | 2 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_fixed.h | 2 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_reserved.cpp | 2 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_reserved.h | 2 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_synchronous.cpp | 4 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_synchronous.h | 2 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_test.cpp | 22 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine.cpp | 2 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine_test.cpp | 2 |
10 files changed, 33 insertions, 22 deletions
diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h index d44bb704c66..859969efa66 100644 --- a/src/mongo/transport/service_executor.h +++ b/src/mongo/transport/service_executor.h @@ -37,6 +37,7 @@ #include "mongo/transport/transport_mode.h" #include "mongo/util/duration.h" #include "mongo/util/functional.h" +#include "mongo/util/out_of_line_executor.h" namespace mongo { // This needs to be forward declared here because the service_context.h is a circular dependency. @@ -47,7 +48,7 @@ namespace transport { /* * This is the interface for all ServiceExecutors. */ -class ServiceExecutor { +class ServiceExecutor : public OutOfLineExecutor { public: virtual ~ServiceExecutor() = default; using Task = unique_function<void()>; @@ -81,7 +82,17 @@ public: * If defer is true, then the executor may defer execution of this Task until an available * thread is available. */ - virtual Status schedule(Task task, ScheduleFlags flags) = 0; + virtual Status scheduleTask(Task task, ScheduleFlags flags) = 0; + + /* + * Provides an executor-friendly wrapper for "scheduleTask". Internally, it wraps instance of + * "OutOfLineExecutor::Task" inside "ServiceExecutor::Task" objects, which are then scheduled + * for execution on the service executor. May throw if "scheduleTask" returns a non-okay status. + */ + void schedule(OutOfLineExecutor::Task func) override { + internalAssert(scheduleTask([task = std::move(func)]() mutable { task(Status::OK()); }, + ScheduleFlags::kEmptyFlags)); + } /* * Stops and joins the ServiceExecutor. Any outstanding tasks will not be executed, and any diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp index ba8eca2bfaf..ac3653126e0 100644 --- a/src/mongo/transport/service_executor_fixed.cpp +++ b/src/mongo/transport/service_executor_fixed.cpp @@ -109,7 +109,7 @@ Status ServiceExecutorFixed::shutdown(Milliseconds timeout) { return waitForShutdown(); } -Status ServiceExecutorFixed::schedule(Task task, ScheduleFlags flags) { +Status ServiceExecutorFixed::scheduleTask(Task task, ScheduleFlags flags) { if (!_canScheduleWork.load()) { return Status(ErrorCodes::ShutdownInProgress, "Executor is not running"); } diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h index c418296db8b..127c3e5d24d 100644 --- a/src/mongo/transport/service_executor_fixed.h +++ b/src/mongo/transport/service_executor_fixed.h @@ -56,7 +56,7 @@ public: Status start() override; Status shutdown(Milliseconds timeout) override; - Status schedule(Task task, ScheduleFlags flags) override; + Status scheduleTask(Task task, ScheduleFlags flags) override; Mode transportMode() const override { return Mode::kSynchronous; diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp index f262f030583..c03070a43e9 100644 --- a/src/mongo/transport/service_executor_reserved.cpp +++ b/src/mongo/transport/service_executor_reserved.cpp @@ -165,7 +165,7 @@ Status ServiceExecutorReserved::shutdown(Milliseconds timeout) { "reserved executor couldn't shutdown all worker threads within time limit."); } -Status ServiceExecutorReserved::schedule(Task task, ScheduleFlags flags) { +Status ServiceExecutorReserved::scheduleTask(Task task, ScheduleFlags flags) { if (!_stillRunning.load()) { return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"}; } diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h index b6b0f17f803..4d2aabbce36 100644 --- a/src/mongo/transport/service_executor_reserved.h +++ b/src/mongo/transport/service_executor_reserved.h @@ -56,7 +56,7 @@ public: Status start() override; Status shutdown(Milliseconds timeout) override; - Status schedule(Task task, ScheduleFlags flags) override; + Status scheduleTask(Task task, ScheduleFlags flags) override; Mode transportMode() const override { return Mode::kSynchronous; diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp index 10cc0ad3cce..cab4c158433 100644 --- a/src/mongo/transport/service_executor_synchronous.cpp +++ b/src/mongo/transport/service_executor_synchronous.cpp @@ -78,7 +78,7 @@ Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) { "passthrough executor couldn't shutdown all worker threads within time limit."); } -Status ServiceExecutorSynchronous::schedule(Task task, ScheduleFlags flags) { +Status ServiceExecutorSynchronous::scheduleTask(Task task, ScheduleFlags flags) { if (!_stillRunning.load()) { return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"}; } @@ -109,7 +109,7 @@ Status ServiceExecutorSynchronous::schedule(Task task, ScheduleFlags flags) { return Status::OK(); } - // First call to schedule() for this connection, spawn a worker thread that will push jobs + // First call to scheduleTask() for this connection, spawn a worker thread that will push jobs // into the thread local job queue. LOGV2_DEBUG(22983, 3, "Starting new executor thread in passthrough mode"); diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h index 5fee75bbf2d..19a651f9f09 100644 --- a/src/mongo/transport/service_executor_synchronous.h +++ b/src/mongo/transport/service_executor_synchronous.h @@ -51,7 +51,7 @@ public: Status start() override; Status shutdown(Milliseconds timeout) override; - Status schedule(Task task, ScheduleFlags flags) override; + Status scheduleTask(Task task, ScheduleFlags flags) override; Mode transportMode() const override { return Mode::kSynchronous; diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp index f79597a5521..94c8d092b6d 100644 --- a/src/mongo/transport/service_executor_test.cpp +++ b/src/mongo/transport/service_executor_test.cpp @@ -146,7 +146,7 @@ void scheduleBasicTask(ServiceExecutor* exec, bool expectSuccess) { auto barrier = std::make_shared<unittest::Barrier>(2); auto task = [barrier] { barrier->countDownAndWait(); }; - auto status = exec->schedule(std::move(task), ServiceExecutor::kEmptyFlags); + auto status = exec->scheduleTask(std::move(task), ServiceExecutor::kEmptyFlags); if (expectSuccess) { ASSERT_OK(status); barrier->countDownAndWait(); @@ -213,7 +213,7 @@ private: TEST_F(ServiceExecutorFixedFixture, ScheduleFailsBeforeStartup) { auto executor = getServiceExecutor(); - ASSERT_NOT_OK(executor->schedule([] {}, ServiceExecutor::kEmptyFlags)); + ASSERT_NOT_OK(executor->scheduleTask([] {}, ServiceExecutor::kEmptyFlags)); } DEATH_TEST_F(ServiceExecutorFixedFixture, DestructorFailsBeforeShutdown, "invariant") { @@ -224,8 +224,8 @@ DEATH_TEST_F(ServiceExecutorFixedFixture, DestructorFailsBeforeShutdown, "invari TEST_F(ServiceExecutorFixedFixture, BasicTaskRuns) { auto executor = startAndGetServiceExecutor(); auto barrier = std::make_shared<unittest::Barrier>(2); - ASSERT_OK(executor->schedule([barrier]() mutable { barrier->countDownAndWait(); }, - ServiceExecutor::kEmptyFlags)); + ASSERT_OK(executor->scheduleTask([barrier]() mutable { barrier->countDownAndWait(); }, + ServiceExecutor::kEmptyFlags)); barrier->countDownAndWait(); } @@ -237,7 +237,7 @@ TEST_F(ServiceExecutorFixedFixture, RecursiveTask) { recursiveTask = [&, barrier] { auto recursionGuard = makeRecursionGuard(); if (getRecursionDepth() < fixedServiceExecutorRecursionLimit.load()) { - ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kMayRecurse)); + ASSERT_OK(executor->scheduleTask(recursiveTask, ServiceExecutor::kMayRecurse)); } else { // This test never returns unless the service executor can satisfy the recursion depth. barrier->countDownAndWait(); @@ -245,7 +245,7 @@ TEST_F(ServiceExecutorFixedFixture, RecursiveTask) { }; // Schedule recursive task and wait for the recursion to stop - ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kMayRecurse)); + ASSERT_OK(executor->scheduleTask(recursiveTask, ServiceExecutor::kMayRecurse)); barrier->countDownAndWait(); } @@ -262,7 +262,7 @@ TEST_F(ServiceExecutorFixedFixture, FlattenRecursiveScheduledTasks) { auto recursionGuard = makeRecursionGuard(); ASSERT_EQ(getRecursionDepth(), 1); if (tasksToSchedule.fetchAndSubtract(1) > 0) { - ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kEmptyFlags)); + ASSERT_OK(executor->scheduleTask(recursiveTask, ServiceExecutor::kEmptyFlags)); } else { // Once there are no more tasks to schedule, notify the main thread to proceed. barrier->countDownAndWait(); @@ -270,7 +270,7 @@ TEST_F(ServiceExecutorFixedFixture, FlattenRecursiveScheduledTasks) { }; // Schedule the recursive task and wait for the execution to finish. - ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kMayYieldBeforeSchedule)); + ASSERT_OK(executor->scheduleTask(recursiveTask, ServiceExecutor::kMayYieldBeforeSchedule)); barrier->countDownAndWait(); } @@ -279,7 +279,7 @@ TEST_F(ServiceExecutorFixedFixture, ShutdownTimeLimit) { auto invoked = std::make_shared<SharedPromise<void>>(); auto mayReturn = std::make_shared<SharedPromise<void>>(); - ASSERT_OK(executor->schedule( + ASSERT_OK(executor->scheduleTask( [executor, invoked, mayReturn]() mutable { invoked->emplaceValue(); mayReturn->getFuture().get(); @@ -304,7 +304,7 @@ TEST_F(ServiceExecutorFixedFixture, Stats) { }; for (auto i = 0; i < kNumExecutorThreads; i++) { - ASSERT_OK(executor->schedule(task, ServiceExecutor::kEmptyFlags)); + ASSERT_OK(executor->scheduleTask(task, ServiceExecutor::kEmptyFlags)); } // The main thread waits for the executor threads to bump up "threadsRunning" while picking up a @@ -337,7 +337,7 @@ TEST_F(ServiceExecutorFixedFixture, ScheduleFailsAfterShutdown) { FailPointEnableBlock failpoint("hangBeforeSchedulingServiceExecutorFixedTask"); schedulerThread = std::make_unique<stdx::thread>([executor] { ASSERT_NOT_OK( - executor->schedule([] { MONGO_UNREACHABLE; }, ServiceExecutor::kEmptyFlags)); + executor->scheduleTask([] { MONGO_UNREACHABLE; }, ServiceExecutor::kEmptyFlags)); }); failpoint->waitForTimesEntered(1); ASSERT_OK(executor->shutdown(kShutdownTime)); diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index fb09fa9ad85..5fbff04a151 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -597,7 +597,7 @@ void ServiceStateMachine::_scheduleNextWithGuard(ThreadGuard guard, ssm->_runNextInGuard(std::move(guard)); }; guard.release(); - Status status = _serviceExecutor->schedule(std::move(func), flags); + Status status = _serviceExecutor->scheduleTask(std::move(func), flags); if (status.isOK()) { return; } diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp index e10268c8c78..0189093a34e 100644 --- a/src/mongo/transport/service_state_machine_test.cpp +++ b/src/mongo/transport/service_state_machine_test.cpp @@ -252,7 +252,7 @@ public: Status shutdown(Milliseconds timeout) override { return Status::OK(); } - Status schedule(Task task, ScheduleFlags flags) override { + Status scheduleTask(Task task, ScheduleFlags flags) override { if (!_scheduleHook) { return Status::OK(); } else { |