diff options
author | Matthew Saltz <matthew.saltz@mongodb.com> | 2020-11-17 21:53:52 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-11-18 22:02:17 +0000 |
commit | 875cccb1375d07991dae89793a096d0eab123b11 (patch) | |
tree | 1346976f7e3c84b1d08ded76b8e3d98b8dba4569 | |
parent | a01be7ed8e475775ebec57dd6291c3cf5cd33ccf (diff) | |
download | mongo-875cccb1375d07991dae89793a096d0eab123b11.tar.gz |
SERVER-52916 Add futurized TaskExecutor::sleepUntil
-rw-r--r-- | src/mongo/executor/task_executor.cpp | 50 | ||||
-rw-r--r-- | src/mongo/executor/task_executor.h | 25 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_test_common.cpp | 129 |
3 files changed, 204 insertions, 0 deletions
diff --git a/src/mongo/executor/task_executor.cpp b/src/mongo/executor/task_executor.cpp index 8a4b30f08e6..83ee516dfc4 100644 --- a/src/mongo/executor/task_executor.cpp +++ b/src/mongo/executor/task_executor.cpp @@ -114,6 +114,56 @@ void TaskExecutor::schedule(OutOfLineExecutor::Task func) { } } +ExecutorFuture<void> TaskExecutor::sleepUntil(Date_t when, const CancelationToken& token) { + if (token.isCanceled()) { + return ExecutorFuture<void>(shared_from_this(), TaskExecutor::kCallbackCanceledErrorStatus); + } + + if (when <= now()) { + return ExecutorFuture<void>(shared_from_this()); + } + + /** + * Encapsulates the promise associated with the result future. + */ + struct AlarmState { + void signal(const Status& status) { + if (status.isOK()) { + promise.emplaceValue(); + } else { + promise.setError(status); + } + } + + Promise<void> promise; + }; + + auto [promise, future] = makePromiseFuture<void>(); + // This has to be shared because Promises (and therefore AlarmState) are move-only and we need + // to maintain two copies: One to capture in the scheduleWorkAt callback, and one locally in + // case scheduling the request fails. + auto alarmState = std::make_shared<AlarmState>(AlarmState{std::move(promise)}); + + // Schedule a task to signal the alarm when the deadline is reached. + auto cbHandle = scheduleWorkAt( + when, [alarmState](const auto& args) mutable { alarmState->signal(args.status); }); + + // Handle cancelation via the input CancelationToken. + auto scheduleStatus = + wrapCallbackHandleWithCancelToken(shared_from_this(), std::move(cbHandle), token); + + if (!scheduleStatus.isOK()) { + // If scheduleStatus is not okay, then the callback passed to scheduleWorkAt should never + // run, meaning that it will be okay to set the promise here. + alarmState->signal(scheduleStatus); + } + + // TODO (SERVER-51285): Optimize to avoid an additional call to schedule to run the callback + // chained by the caller of sleepUntil. + return std::move(future).thenRunOn(shared_from_this()); +} + + TaskExecutor::CallbackState::CallbackState() = default; TaskExecutor::CallbackState::~CallbackState() = default; diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index 68f50a04f7e..5c656570462 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -273,6 +273,31 @@ public: virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn&& work) = 0; /** + * Returns an ExecutorFuture that will be resolved with success when the given date is reached. + * + * If the executor is already shut down when this is called, the resulting future will be set + * with ErrorCodes::ShutdownInProgress. + * + * Otherwise, if the executor shuts down or the token is canceled prior to the deadline being + * reached, the resulting ExecutorFuture will be set with ErrorCodes::CallbackCanceled. + */ + ExecutorFuture<void> sleepUntil(Date_t when, const CancelationToken& token); + + /** + * Returns an ExecutorFuture that will be resolved with success after the given duration has + * passed. + * + * If the executor is already shut down when this is called, the resulting future will be set + * with ErrorCodes::ShutdownInProgress. + * + * Otherwise, if the executor shuts down or the token is canceled prior to the deadline being + * reached, the resulting ExecutorFuture will be set with ErrorCodes::CallbackCanceled. + */ + ExecutorFuture<void> sleepFor(Milliseconds duration, const CancelationToken& token) { + return sleepUntil(now() + duration, token); + } + + /** * Schedules "cb" to be run by the executor with the result of executing the remote command * described by "request". * diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp index 22841eaaf89..49187aa658c 100644 --- a/src/mongo/executor/task_executor_test_common.cpp +++ b/src/mongo/executor/task_executor_test_common.cpp @@ -382,6 +382,135 @@ COMMON_EXECUTOR_TEST(EventSignalWithTimeoutTest) { joinExecutorThread(); } +COMMON_EXECUTOR_TEST(SleepUntilReturnsReadyFutureWithSuccessWhenDeadlineAlreadyPassed) { + NetworkInterfaceMock* net = getNet(); + TaskExecutor& executor = getExecutor(); + launchExecutorThread(); + + const Date_t now = net->now(); + + auto alarm = executor.sleepUntil(now, CancelationToken::uncancelable()); + ASSERT(alarm.isReady()); + ASSERT_OK(alarm.getNoThrow()); + shutdownExecutorThread(); + joinExecutorThread(); +} + +COMMON_EXECUTOR_TEST( + SleepUntilReturnsReadyFutureWithShutdownInProgressWhenExecutorAlreadyShutdown) { + NetworkInterfaceMock* net = getNet(); + TaskExecutor& executor = getExecutor(); + launchExecutorThread(); + shutdownExecutorThread(); + joinExecutorThread(); + + const Date_t now = net->now(); + const Milliseconds sleepDuration{1000}; + const auto deadline = now + sleepDuration; + auto alarm = executor.sleepUntil(deadline, CancelationToken::uncancelable()); + + ASSERT(alarm.isReady()); + ASSERT_EQ(alarm.getNoThrow().code(), ErrorCodes::ShutdownInProgress); +} + +COMMON_EXECUTOR_TEST(SleepUntilReturnsReadyFutureWithCallbackCanceledWhenTokenAlreadyCanceled) { + NetworkInterfaceMock* net = getNet(); + TaskExecutor& executor = getExecutor(); + const Date_t now = net->now(); + const Milliseconds sleepDuration{1000}; + const auto deadline = now + sleepDuration; + CancelationSource cancelSource; + cancelSource.cancel(); + auto alarm = executor.sleepUntil(deadline, cancelSource.token()); + + ASSERT(alarm.isReady()); + ASSERT_EQ(alarm.getNoThrow().code(), ErrorCodes::CallbackCanceled); +} + +COMMON_EXECUTOR_TEST( + SleepUntilResolvesOutputFutureWithCallbackCanceledWhenTokenCanceledBeforeDeadline) { + NetworkInterfaceMock* net = getNet(); + TaskExecutor& executor = getExecutor(); + launchExecutorThread(); + + const Date_t now = net->now(); + const Milliseconds sleepDuration{1000}; + const auto deadline = now + sleepDuration; + CancelationSource cancelSource; + auto alarm = executor.sleepUntil(deadline, cancelSource.token()); + + ASSERT_FALSE(alarm.isReady()); + + net->enterNetwork(); + // Run almost until the deadline. This isn't really necessary for the test since we could just + // skip running the clock at all, so just doing this for some "realism". + net->runUntil(deadline - Milliseconds(1)); + net->exitNetwork(); + + // Cancel before deadline. + cancelSource.cancel(); + // Required to process the cancelation. + net->enterNetwork(); + net->exitNetwork(); + + ASSERT(alarm.isReady()); + ASSERT_EQ(alarm.getNoThrow().code(), ErrorCodes::CallbackCanceled); + + shutdownExecutorThread(); + joinExecutorThread(); +} + +COMMON_EXECUTOR_TEST( + SleepUntilResolvesOutputFutureWithCallbackCanceledWhenExecutorShutsDownBeforeDeadline) { + NetworkInterfaceMock* net = getNet(); + TaskExecutor& executor = getExecutor(); + launchExecutorThread(); + + const Date_t now = net->now(); + const Milliseconds sleepDuration{1000}; + const auto deadline = now + sleepDuration; + CancelationSource cancelSource; + auto alarm = executor.sleepUntil(deadline, cancelSource.token()); + + ASSERT_FALSE(alarm.isReady()); + + net->enterNetwork(); + // Run almost until the deadline. This isn't really necessary for the test since we could just + // skip running the clock at all, so just doing this for some "realism". + net->runUntil(deadline - Milliseconds(1)); + net->exitNetwork(); + + // Shut down before deadline. + shutdownExecutorThread(); + joinExecutorThread(); + + ASSERT(alarm.isReady()); + ASSERT_EQ(alarm.getNoThrow().code(), ErrorCodes::CallbackCanceled); +} + +COMMON_EXECUTOR_TEST(SleepUntilResolvesOutputFutureWithSuccessWhenDeadlinePasses) { + NetworkInterfaceMock* net = getNet(); + TaskExecutor& executor = getExecutor(); + launchExecutorThread(); + + const Date_t now = net->now(); + const Milliseconds sleepDuration{1000}; + const auto deadline = now + sleepDuration; + + auto alarm = executor.sleepUntil(deadline, CancelationToken::uncancelable()); + ASSERT_FALSE(alarm.isReady()); + + net->enterNetwork(); + net->runUntil(deadline); + net->exitNetwork(); + + ASSERT(alarm.isReady()); + ASSERT_OK(alarm.getNoThrow()); + + executor.shutdown(); + joinExecutorThread(); +} + COMMON_EXECUTOR_TEST(ScheduleWorkAt) { NetworkInterfaceMock* net = getNet(); TaskExecutor& executor = getExecutor(); |