summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Saltz <matthew.saltz@mongodb.com>2020-11-17 21:53:52 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-11-18 22:02:17 +0000
commit875cccb1375d07991dae89793a096d0eab123b11 (patch)
tree1346976f7e3c84b1d08ded76b8e3d98b8dba4569
parenta01be7ed8e475775ebec57dd6291c3cf5cd33ccf (diff)
downloadmongo-875cccb1375d07991dae89793a096d0eab123b11.tar.gz
SERVER-52916 Add futurized TaskExecutor::sleepUntil
-rw-r--r--src/mongo/executor/task_executor.cpp50
-rw-r--r--src/mongo/executor/task_executor.h25
-rw-r--r--src/mongo/executor/task_executor_test_common.cpp129
3 files changed, 204 insertions, 0 deletions
diff --git a/src/mongo/executor/task_executor.cpp b/src/mongo/executor/task_executor.cpp
index 8a4b30f08e6..83ee516dfc4 100644
--- a/src/mongo/executor/task_executor.cpp
+++ b/src/mongo/executor/task_executor.cpp
@@ -114,6 +114,56 @@ void TaskExecutor::schedule(OutOfLineExecutor::Task func) {
}
}
+ExecutorFuture<void> TaskExecutor::sleepUntil(Date_t when, const CancelationToken& token) {
+ if (token.isCanceled()) {
+ return ExecutorFuture<void>(shared_from_this(), TaskExecutor::kCallbackCanceledErrorStatus);
+ }
+
+ if (when <= now()) {
+ return ExecutorFuture<void>(shared_from_this());
+ }
+
+ /**
+ * Encapsulates the promise associated with the result future.
+ */
+ struct AlarmState {
+ void signal(const Status& status) {
+ if (status.isOK()) {
+ promise.emplaceValue();
+ } else {
+ promise.setError(status);
+ }
+ }
+
+ Promise<void> promise;
+ };
+
+ auto [promise, future] = makePromiseFuture<void>();
+ // This has to be shared because Promises (and therefore AlarmState) are move-only and we need
+ // to maintain two copies: One to capture in the scheduleWorkAt callback, and one locally in
+ // case scheduling the request fails.
+ auto alarmState = std::make_shared<AlarmState>(AlarmState{std::move(promise)});
+
+ // Schedule a task to signal the alarm when the deadline is reached.
+ auto cbHandle = scheduleWorkAt(
+ when, [alarmState](const auto& args) mutable { alarmState->signal(args.status); });
+
+ // Handle cancelation via the input CancelationToken.
+ auto scheduleStatus =
+ wrapCallbackHandleWithCancelToken(shared_from_this(), std::move(cbHandle), token);
+
+ if (!scheduleStatus.isOK()) {
+ // If scheduleStatus is not okay, then the callback passed to scheduleWorkAt should never
+ // run, meaning that it will be okay to set the promise here.
+ alarmState->signal(scheduleStatus);
+ }
+
+ // TODO (SERVER-51285): Optimize to avoid an additional call to schedule to run the callback
+ // chained by the caller of sleepUntil.
+ return std::move(future).thenRunOn(shared_from_this());
+}
+
+
TaskExecutor::CallbackState::CallbackState() = default;
TaskExecutor::CallbackState::~CallbackState() = default;
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index 68f50a04f7e..5c656570462 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -273,6 +273,31 @@ public:
virtual StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn&& work) = 0;
/**
+ * Returns an ExecutorFuture that will be resolved with success when the given date is reached.
+ *
+ * If the executor is already shut down when this is called, the resulting future will be set
+ * with ErrorCodes::ShutdownInProgress.
+ *
+ * Otherwise, if the executor shuts down or the token is canceled prior to the deadline being
+ * reached, the resulting ExecutorFuture will be set with ErrorCodes::CallbackCanceled.
+ */
+ ExecutorFuture<void> sleepUntil(Date_t when, const CancelationToken& token);
+
+ /**
+ * Returns an ExecutorFuture that will be resolved with success after the given duration has
+ * passed.
+ *
+ * If the executor is already shut down when this is called, the resulting future will be set
+ * with ErrorCodes::ShutdownInProgress.
+ *
+ * Otherwise, if the executor shuts down or the token is canceled prior to the deadline being
+ * reached, the resulting ExecutorFuture will be set with ErrorCodes::CallbackCanceled.
+ */
+ ExecutorFuture<void> sleepFor(Milliseconds duration, const CancelationToken& token) {
+ return sleepUntil(now() + duration, token);
+ }
+
+ /**
* Schedules "cb" to be run by the executor with the result of executing the remote command
* described by "request".
*
diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp
index 22841eaaf89..49187aa658c 100644
--- a/src/mongo/executor/task_executor_test_common.cpp
+++ b/src/mongo/executor/task_executor_test_common.cpp
@@ -382,6 +382,135 @@ COMMON_EXECUTOR_TEST(EventSignalWithTimeoutTest) {
joinExecutorThread();
}
+COMMON_EXECUTOR_TEST(SleepUntilReturnsReadyFutureWithSuccessWhenDeadlineAlreadyPassed) {
+ NetworkInterfaceMock* net = getNet();
+ TaskExecutor& executor = getExecutor();
+ launchExecutorThread();
+
+ const Date_t now = net->now();
+
+ auto alarm = executor.sleepUntil(now, CancelationToken::uncancelable());
+ ASSERT(alarm.isReady());
+ ASSERT_OK(alarm.getNoThrow());
+ shutdownExecutorThread();
+ joinExecutorThread();
+}
+
+COMMON_EXECUTOR_TEST(
+ SleepUntilReturnsReadyFutureWithShutdownInProgressWhenExecutorAlreadyShutdown) {
+ NetworkInterfaceMock* net = getNet();
+ TaskExecutor& executor = getExecutor();
+ launchExecutorThread();
+ shutdownExecutorThread();
+ joinExecutorThread();
+
+ const Date_t now = net->now();
+ const Milliseconds sleepDuration{1000};
+ const auto deadline = now + sleepDuration;
+ auto alarm = executor.sleepUntil(deadline, CancelationToken::uncancelable());
+
+ ASSERT(alarm.isReady());
+ ASSERT_EQ(alarm.getNoThrow().code(), ErrorCodes::ShutdownInProgress);
+}
+
+COMMON_EXECUTOR_TEST(SleepUntilReturnsReadyFutureWithCallbackCanceledWhenTokenAlreadyCanceled) {
+ NetworkInterfaceMock* net = getNet();
+ TaskExecutor& executor = getExecutor();
+ const Date_t now = net->now();
+ const Milliseconds sleepDuration{1000};
+ const auto deadline = now + sleepDuration;
+ CancelationSource cancelSource;
+ cancelSource.cancel();
+ auto alarm = executor.sleepUntil(deadline, cancelSource.token());
+
+ ASSERT(alarm.isReady());
+ ASSERT_EQ(alarm.getNoThrow().code(), ErrorCodes::CallbackCanceled);
+}
+
+COMMON_EXECUTOR_TEST(
+ SleepUntilResolvesOutputFutureWithCallbackCanceledWhenTokenCanceledBeforeDeadline) {
+ NetworkInterfaceMock* net = getNet();
+ TaskExecutor& executor = getExecutor();
+ launchExecutorThread();
+
+ const Date_t now = net->now();
+ const Milliseconds sleepDuration{1000};
+ const auto deadline = now + sleepDuration;
+ CancelationSource cancelSource;
+ auto alarm = executor.sleepUntil(deadline, cancelSource.token());
+
+ ASSERT_FALSE(alarm.isReady());
+
+ net->enterNetwork();
+ // Run almost until the deadline. This isn't really necessary for the test since we could just
+ // skip running the clock at all, so just doing this for some "realism".
+ net->runUntil(deadline - Milliseconds(1));
+ net->exitNetwork();
+
+ // Cancel before deadline.
+ cancelSource.cancel();
+ // Required to process the cancelation.
+ net->enterNetwork();
+ net->exitNetwork();
+
+ ASSERT(alarm.isReady());
+ ASSERT_EQ(alarm.getNoThrow().code(), ErrorCodes::CallbackCanceled);
+
+ shutdownExecutorThread();
+ joinExecutorThread();
+}
+
+COMMON_EXECUTOR_TEST(
+ SleepUntilResolvesOutputFutureWithCallbackCanceledWhenExecutorShutsDownBeforeDeadline) {
+ NetworkInterfaceMock* net = getNet();
+ TaskExecutor& executor = getExecutor();
+ launchExecutorThread();
+
+ const Date_t now = net->now();
+ const Milliseconds sleepDuration{1000};
+ const auto deadline = now + sleepDuration;
+ CancelationSource cancelSource;
+ auto alarm = executor.sleepUntil(deadline, cancelSource.token());
+
+ ASSERT_FALSE(alarm.isReady());
+
+ net->enterNetwork();
+ // Run almost until the deadline. This isn't really necessary for the test since we could just
+ // skip running the clock at all, so just doing this for some "realism".
+ net->runUntil(deadline - Milliseconds(1));
+ net->exitNetwork();
+
+ // Shut down before deadline.
+ shutdownExecutorThread();
+ joinExecutorThread();
+
+ ASSERT(alarm.isReady());
+ ASSERT_EQ(alarm.getNoThrow().code(), ErrorCodes::CallbackCanceled);
+}
+
+COMMON_EXECUTOR_TEST(SleepUntilResolvesOutputFutureWithSuccessWhenDeadlinePasses) {
+ NetworkInterfaceMock* net = getNet();
+ TaskExecutor& executor = getExecutor();
+ launchExecutorThread();
+
+ const Date_t now = net->now();
+ const Milliseconds sleepDuration{1000};
+ const auto deadline = now + sleepDuration;
+
+ auto alarm = executor.sleepUntil(deadline, CancelationToken::uncancelable());
+ ASSERT_FALSE(alarm.isReady());
+
+ net->enterNetwork();
+ net->runUntil(deadline);
+ net->exitNetwork();
+
+ ASSERT(alarm.isReady());
+ ASSERT_OK(alarm.getNoThrow());
+
+ executor.shutdown();
+ joinExecutorThread();
+}
+
COMMON_EXECUTOR_TEST(ScheduleWorkAt) {
NetworkInterfaceMock* net = getNet();
TaskExecutor& executor = getExecutor();