author     Amirsaman Memaripour <amirsaman.memaripour@mongodb.com>    2020-07-13 23:09:50 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>           2020-07-22 19:16:58 +0000
commit     dca9e5d997042e11368c7a22e465eb07188fcc84
tree       a683b32ec07041da9b757b2114278a80dd74fa57
parent     db78b73d0b1cda09831163e9283e7ff5adf90fa7
download   mongo-dca9e5d997042e11368c7a22e465eb07188fcc84.tar.gz
SERVER-49105 Make ServiceExecutor extend OutOfLineExecutor
-rw-r--r--  src/mongo/transport/service_executor.h               | 15
-rw-r--r--  src/mongo/transport/service_executor_fixed.cpp       |  2
-rw-r--r--  src/mongo/transport/service_executor_fixed.h         |  2
-rw-r--r--  src/mongo/transport/service_executor_reserved.cpp    |  2
-rw-r--r--  src/mongo/transport/service_executor_reserved.h      |  2
-rw-r--r--  src/mongo/transport/service_executor_synchronous.cpp |  4
-rw-r--r--  src/mongo/transport/service_executor_synchronous.h   |  2
-rw-r--r--  src/mongo/transport/service_executor_test.cpp        | 22
-rw-r--r--  src/mongo/transport/service_state_machine.cpp        |  2
-rw-r--r--  src/mongo/transport/service_state_machine_test.cpp   |  2
10 files changed, 33 insertions(+), 22 deletions(-)
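
In short, the commit renames the flag-taking virtual ServiceExecutor::schedule(Task, ScheduleFlags) to scheduleTask() and adds a schedule(OutOfLineExecutor::Task) override, so any ServiceExecutor can be handed to code that only knows about OutOfLineExecutor. The sketch below is not part of the commit; runOnExecutor, example, and the lambda bodies are illustrative only.

// Hypothetical caller, for illustration: it depends solely on the
// OutOfLineExecutor interface and therefore accepts any ServiceExecutor
// (fixed, reserved, or synchronous) after this change.
#include "mongo/transport/service_executor.h"
#include "mongo/util/out_of_line_executor.h"

namespace mongo {

void runOnExecutor(OutOfLineExecutor* executor) {
    executor->schedule([](Status status) {
        // The new wrapper invokes the callback with Status::OK() when the task
        // actually runs; checking the status keeps this callback valid for any
        // OutOfLineExecutor implementation.
        if (!status.isOK())
            return;
        // ... actual work ...
    });
}

void example(transport::ServiceExecutor* svcExec) {
    runOnExecutor(svcExec);  // ServiceExecutor now is-a OutOfLineExecutor.
}

}  // namespace mongo
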
diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h
index d44bb704c66..859969efa66 100644
--- a/src/mongo/transport/service_executor.h
+++ b/src/mongo/transport/service_executor.h
@@ -37,6 +37,7 @@
#include "mongo/transport/transport_mode.h"
#include "mongo/util/duration.h"
#include "mongo/util/functional.h"
+#include "mongo/util/out_of_line_executor.h"
namespace mongo {
// This needs to be forward declared here because including service_context.h would create a circular dependency.
@@ -47,7 +48,7 @@ namespace transport {
/*
* This is the interface for all ServiceExecutors.
*/
-class ServiceExecutor {
+class ServiceExecutor : public OutOfLineExecutor {
public:
virtual ~ServiceExecutor() = default;
using Task = unique_function<void()>;
@@ -81,7 +82,17 @@ public:
* If defer is true, then the executor may defer execution of this Task until a thread is
* available.
*/
- virtual Status schedule(Task task, ScheduleFlags flags) = 0;
+ virtual Status scheduleTask(Task task, ScheduleFlags flags) = 0;
+
+ /*
+ * Provides an executor-friendly wrapper for "scheduleTask". Internally, it wraps instances of
+ * "OutOfLineExecutor::Task" inside "ServiceExecutor::Task" objects, which are then scheduled
+ * for execution on the service executor. May throw if "scheduleTask" returns a non-okay status.
+ */
+ void schedule(OutOfLineExecutor::Task func) override {
+ internalAssert(scheduleTask([task = std::move(func)]() mutable { task(Status::OK()); },
+ ScheduleFlags::kEmptyFlags));
+ }
/*
* Stops and joins the ServiceExecutor. Any outstanding tasks will not be executed, and any
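
For context, the hunk above leaves two entry points with different error-handling contracts. A rough sketch of how call sites might use each one follows; contrastCallStyles and the lambda bodies are illustrative, not code from this commit.

#include "mongo/transport/service_executor.h"

namespace mongo {

void contrastCallStyles(transport::ServiceExecutor* exec) {
    // scheduleTask() keeps the original Status-returning contract; the caller
    // handles scheduling failures, e.g. ShutdownInProgress from an executor
    // that is no longer running.
    Status status = exec->scheduleTask([] { /* work */ },
                                       transport::ServiceExecutor::kEmptyFlags);
    if (!status.isOK()) {
        // decide whether to retry, log, or give up
    }

    // schedule() satisfies the OutOfLineExecutor contract: no flags and no
    // returned Status. Per the wrapper above, it asserts the scheduleTask()
    // result, so a scheduling failure surfaces as an exception instead.
    exec->schedule([](Status s) { /* work; s is Status::OK() when the task runs */ });
}

}  // namespace mongo
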
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp
index ba8eca2bfaf..ac3653126e0 100644
--- a/src/mongo/transport/service_executor_fixed.cpp
+++ b/src/mongo/transport/service_executor_fixed.cpp
@@ -109,7 +109,7 @@ Status ServiceExecutorFixed::shutdown(Milliseconds timeout) {
return waitForShutdown();
}
-Status ServiceExecutorFixed::schedule(Task task, ScheduleFlags flags) {
+Status ServiceExecutorFixed::scheduleTask(Task task, ScheduleFlags flags) {
if (!_canScheduleWork.load()) {
return Status(ErrorCodes::ShutdownInProgress, "Executor is not running");
}
diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h
index c418296db8b..127c3e5d24d 100644
--- a/src/mongo/transport/service_executor_fixed.h
+++ b/src/mongo/transport/service_executor_fixed.h
@@ -56,7 +56,7 @@ public:
Status start() override;
Status shutdown(Milliseconds timeout) override;
- Status schedule(Task task, ScheduleFlags flags) override;
+ Status scheduleTask(Task task, ScheduleFlags flags) override;
Mode transportMode() const override {
return Mode::kSynchronous;
diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp
index f262f030583..c03070a43e9 100644
--- a/src/mongo/transport/service_executor_reserved.cpp
+++ b/src/mongo/transport/service_executor_reserved.cpp
@@ -165,7 +165,7 @@ Status ServiceExecutorReserved::shutdown(Milliseconds timeout) {
"reserved executor couldn't shutdown all worker threads within time limit.");
}
-Status ServiceExecutorReserved::schedule(Task task, ScheduleFlags flags) {
+Status ServiceExecutorReserved::scheduleTask(Task task, ScheduleFlags flags) {
if (!_stillRunning.load()) {
return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"};
}
diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h
index b6b0f17f803..4d2aabbce36 100644
--- a/src/mongo/transport/service_executor_reserved.h
+++ b/src/mongo/transport/service_executor_reserved.h
@@ -56,7 +56,7 @@ public:
Status start() override;
Status shutdown(Milliseconds timeout) override;
- Status schedule(Task task, ScheduleFlags flags) override;
+ Status scheduleTask(Task task, ScheduleFlags flags) override;
Mode transportMode() const override {
return Mode::kSynchronous;
diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp
index 10cc0ad3cce..cab4c158433 100644
--- a/src/mongo/transport/service_executor_synchronous.cpp
+++ b/src/mongo/transport/service_executor_synchronous.cpp
@@ -78,7 +78,7 @@ Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) {
"passthrough executor couldn't shutdown all worker threads within time limit.");
}
-Status ServiceExecutorSynchronous::schedule(Task task, ScheduleFlags flags) {
+Status ServiceExecutorSynchronous::scheduleTask(Task task, ScheduleFlags flags) {
if (!_stillRunning.load()) {
return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"};
}
@@ -109,7 +109,7 @@ Status ServiceExecutorSynchronous::schedule(Task task, ScheduleFlags flags) {
return Status::OK();
}
- // First call to schedule() for this connection, spawn a worker thread that will push jobs
+ // First call to scheduleTask() for this connection, spawn a worker thread that will push jobs
// into the thread local job queue.
LOGV2_DEBUG(22983, 3, "Starting new executor thread in passthrough mode");
diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h
index 5fee75bbf2d..19a651f9f09 100644
--- a/src/mongo/transport/service_executor_synchronous.h
+++ b/src/mongo/transport/service_executor_synchronous.h
@@ -51,7 +51,7 @@ public:
Status start() override;
Status shutdown(Milliseconds timeout) override;
- Status schedule(Task task, ScheduleFlags flags) override;
+ Status scheduleTask(Task task, ScheduleFlags flags) override;
Mode transportMode() const override {
return Mode::kSynchronous;
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index f79597a5521..94c8d092b6d 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -146,7 +146,7 @@ void scheduleBasicTask(ServiceExecutor* exec, bool expectSuccess) {
auto barrier = std::make_shared<unittest::Barrier>(2);
auto task = [barrier] { barrier->countDownAndWait(); };
- auto status = exec->schedule(std::move(task), ServiceExecutor::kEmptyFlags);
+ auto status = exec->scheduleTask(std::move(task), ServiceExecutor::kEmptyFlags);
if (expectSuccess) {
ASSERT_OK(status);
barrier->countDownAndWait();
@@ -213,7 +213,7 @@ private:
TEST_F(ServiceExecutorFixedFixture, ScheduleFailsBeforeStartup) {
auto executor = getServiceExecutor();
- ASSERT_NOT_OK(executor->schedule([] {}, ServiceExecutor::kEmptyFlags));
+ ASSERT_NOT_OK(executor->scheduleTask([] {}, ServiceExecutor::kEmptyFlags));
}
DEATH_TEST_F(ServiceExecutorFixedFixture, DestructorFailsBeforeShutdown, "invariant") {
@@ -224,8 +224,8 @@ DEATH_TEST_F(ServiceExecutorFixedFixture, DestructorFailsBeforeShutdown, "invari
TEST_F(ServiceExecutorFixedFixture, BasicTaskRuns) {
auto executor = startAndGetServiceExecutor();
auto barrier = std::make_shared<unittest::Barrier>(2);
- ASSERT_OK(executor->schedule([barrier]() mutable { barrier->countDownAndWait(); },
- ServiceExecutor::kEmptyFlags));
+ ASSERT_OK(executor->scheduleTask([barrier]() mutable { barrier->countDownAndWait(); },
+ ServiceExecutor::kEmptyFlags));
barrier->countDownAndWait();
}
@@ -237,7 +237,7 @@ TEST_F(ServiceExecutorFixedFixture, RecursiveTask) {
recursiveTask = [&, barrier] {
auto recursionGuard = makeRecursionGuard();
if (getRecursionDepth() < fixedServiceExecutorRecursionLimit.load()) {
- ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kMayRecurse));
+ ASSERT_OK(executor->scheduleTask(recursiveTask, ServiceExecutor::kMayRecurse));
} else {
// This test never returns unless the service executor can satisfy the recursion depth.
barrier->countDownAndWait();
@@ -245,7 +245,7 @@ TEST_F(ServiceExecutorFixedFixture, RecursiveTask) {
};
// Schedule recursive task and wait for the recursion to stop
- ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kMayRecurse));
+ ASSERT_OK(executor->scheduleTask(recursiveTask, ServiceExecutor::kMayRecurse));
barrier->countDownAndWait();
}
@@ -262,7 +262,7 @@ TEST_F(ServiceExecutorFixedFixture, FlattenRecursiveScheduledTasks) {
auto recursionGuard = makeRecursionGuard();
ASSERT_EQ(getRecursionDepth(), 1);
if (tasksToSchedule.fetchAndSubtract(1) > 0) {
- ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kEmptyFlags));
+ ASSERT_OK(executor->scheduleTask(recursiveTask, ServiceExecutor::kEmptyFlags));
} else {
// Once there are no more tasks to schedule, notify the main thread to proceed.
barrier->countDownAndWait();
@@ -270,7 +270,7 @@ TEST_F(ServiceExecutorFixedFixture, FlattenRecursiveScheduledTasks) {
};
// Schedule the recursive task and wait for the execution to finish.
- ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kMayYieldBeforeSchedule));
+ ASSERT_OK(executor->scheduleTask(recursiveTask, ServiceExecutor::kMayYieldBeforeSchedule));
barrier->countDownAndWait();
}
@@ -279,7 +279,7 @@ TEST_F(ServiceExecutorFixedFixture, ShutdownTimeLimit) {
auto invoked = std::make_shared<SharedPromise<void>>();
auto mayReturn = std::make_shared<SharedPromise<void>>();
- ASSERT_OK(executor->schedule(
+ ASSERT_OK(executor->scheduleTask(
[executor, invoked, mayReturn]() mutable {
invoked->emplaceValue();
mayReturn->getFuture().get();
@@ -304,7 +304,7 @@ TEST_F(ServiceExecutorFixedFixture, Stats) {
};
for (auto i = 0; i < kNumExecutorThreads; i++) {
- ASSERT_OK(executor->schedule(task, ServiceExecutor::kEmptyFlags));
+ ASSERT_OK(executor->scheduleTask(task, ServiceExecutor::kEmptyFlags));
}
// The main thread waits for the executor threads to bump up "threadsRunning" while picking up a
@@ -337,7 +337,7 @@ TEST_F(ServiceExecutorFixedFixture, ScheduleFailsAfterShutdown) {
FailPointEnableBlock failpoint("hangBeforeSchedulingServiceExecutorFixedTask");
schedulerThread = std::make_unique<stdx::thread>([executor] {
ASSERT_NOT_OK(
- executor->schedule([] { MONGO_UNREACHABLE; }, ServiceExecutor::kEmptyFlags));
+ executor->scheduleTask([] { MONGO_UNREACHABLE; }, ServiceExecutor::kEmptyFlags));
});
failpoint->waitForTimesEntered(1);
ASSERT_OK(executor->shutdown(kShutdownTime));
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index fb09fa9ad85..5fbff04a151 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -597,7 +597,7 @@ void ServiceStateMachine::_scheduleNextWithGuard(ThreadGuard guard,
ssm->_runNextInGuard(std::move(guard));
};
guard.release();
- Status status = _serviceExecutor->schedule(std::move(func), flags);
+ Status status = _serviceExecutor->scheduleTask(std::move(func), flags);
if (status.isOK()) {
return;
}
diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp
index e10268c8c78..0189093a34e 100644
--- a/src/mongo/transport/service_state_machine_test.cpp
+++ b/src/mongo/transport/service_state_machine_test.cpp
@@ -252,7 +252,7 @@ public:
Status shutdown(Milliseconds timeout) override {
return Status::OK();
}
- Status schedule(Task task, ScheduleFlags flags) override {
+ Status scheduleTask(Task task, ScheduleFlags flags) override {
if (!_scheduleHook) {
return Status::OK();
} else {