summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Saltz <matthew.saltz@mongodb.com>2021-02-16 18:07:34 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-02-16 21:22:04 +0000
commitfefd2787643720bdf23f47558cbdcafd02e7452f (patch)
tree56d8f2b378a53568cfbea7d39a7fe752f13f97cc
parenta66d01165fecfd0e1cdca9808ae43d82d895d25e (diff)
downloadmongo-fefd2787643720bdf23f47558cbdcafd02e7452f.tar.gz
SERVER-54408 Implement AsyncTry-until without future recursion
-rw-r--r--src/mongo/util/future.h7
-rw-r--r--src/mongo/util/future_util.h137
-rw-r--r--src/mongo/util/future_util_test.cpp188
3 files changed, 284 insertions, 48 deletions
diff --git a/src/mongo/util/future.h b/src/mongo/util/future.h
index ad7bf140e4a..f325b572678 100644
--- a/src/mongo/util/future.h
+++ b/src/mongo/util/future.h
@@ -1200,7 +1200,12 @@ auto coerceToFuture(T&& value) {
TEMPLATE(typename Func)
REQUIRES(future_details::isCallable<Func, void>)
auto makeReadyFutureWith(Func&& func) -> Future<FutureContinuationResult<Func&&>> try {
- return std::forward<Func>(func)();
+ if constexpr (std::is_void_v<std::invoke_result_t<Func>>) {
+ std::forward<Func>(func)();
+ return Future<void>::makeReady();
+ } else {
+ return std::forward<Func>(func)();
+ }
} catch (const DBException& ex) {
return ex.toStatus();
}
diff --git a/src/mongo/util/future_util.h b/src/mongo/util/future_util.h
index af70290746c..25865caae89 100644
--- a/src/mongo/util/future_util.h
+++ b/src/mongo/util/future_util.h
@@ -56,25 +56,20 @@ inline Status asyncTryCanceledStatus() {
}
/**
- * Widget to get a default-constructible object that allows access to the type passed in at
- * compile time. Used for getReturnType below.
+ * Creates an ExecutorFuture from the result of the input callable.
*/
-template <typename T>
-struct DefaultConstructibleWrapper {
- using type = T;
-};
+template <typename Callable>
+auto makeExecutorFutureWith(ExecutorPtr executor, Callable&& callable) {
+ using CallableResult = std::invoke_result_t<Callable>;
-/**
- * Helper to get the return type of the loop body in TryUntilLoop/TryUntilLoopWithDelay. This is
- * required because the loop body may return a future-like type, which wraps another result type
- * (specified in Future<T>::value_type), or some other kind of raw type which can be used directly.
- */
-template <typename T>
-auto getReturnType() {
- if constexpr (future_details::isFutureLike<std::decay_t<T>>) {
- return DefaultConstructibleWrapper<typename T::value_type>();
+ if constexpr (future_details::isFutureLike<CallableResult>) {
+ try {
+ return callable().thenRunOn(executor);
+ } catch (const DBException& e) {
+ return ExecutorFuture<FutureContinuationResult<Callable>>(executor, e.toStatus());
+ }
} else {
- return DefaultConstructibleWrapper<T>();
+ return makeReadyFutureWith(callable).thenRunOn(executor);
}
}
@@ -132,24 +127,60 @@ private:
* Performs actual looping through recursion.
*/
ExecutorFuture<FutureContinuationResult<BodyCallable>> run() {
- using ReturnType =
- typename decltype(getReturnType<decltype(executeLoopBody())>())::type;
- // If the request to executeLoopBody has already been canceled, don't attempt to run it.
- if (cancelToken.isCanceled()) {
+ using ReturnType = FutureContinuationResult<BodyCallable>;
+
+ // If the request is already canceled, don't run anything.
+ if (cancelToken.isCanceled())
return ExecutorFuture<ReturnType>(executor, asyncTryCanceledStatus());
- }
- auto future = ExecutorFuture<void>(executor).then(executeLoopBody);
- return std::move(future).onCompletion(
- [this, self = this->shared_from_this()](StatusOrStatusWith<ReturnType> s) {
- if (shouldStopIteration(s))
- return ExecutorFuture<ReturnType>(executor, std::move(s));
+ auto [promise, future] = makePromiseFuture<ReturnType>();
+
+ // Kick off the asynchronous loop.
+ runImpl(std::move(promise));
+
+ return std::move(future).thenRunOn(executor);
+ }
+
+
+ /**
+ * Helper function that schedules an asynchronous task. This task executes the loop body and
+ * either terminates the loop by emplacing the resultPromise, or makes a recursive call to
+ * reschedule another iteration of the loop.
+ */
+ template <typename ReturnType>
+ void runImpl(Promise<ReturnType> resultPromise) {
+ executor->schedule([this,
+ self = this->shared_from_this(),
+ resultPromise =
+ std::move(resultPromise)](Status scheduleStatus) mutable {
+ if (!scheduleStatus.isOK()) {
+ resultPromise.setError(std::move(scheduleStatus));
+ return;
+ }
- // Retry after a delay.
- return executor->sleepFor(delay.getNext(), cancelToken).then([this, self] {
- return run();
+ using BodyCallableResult = std::invoke_result_t<BodyCallable>;
+ // Convert the result of the loop body into an ExecutorFuture, even if the
+ // loop body is not future-returning. This isn't strictly necessary but it
+ // makes implementation easier.
+ makeExecutorFutureWith(executor, executeLoopBody)
+ .getAsync([this, self, resultPromise = std::move(resultPromise)](
+ StatusOrStatusWith<ReturnType>&& swResult) mutable {
+ if (cancelToken.isCanceled()) {
+ resultPromise.setError(asyncTryCanceledStatus());
+ } else if (shouldStopIteration(swResult)) {
+ resultPromise.setFrom(std::move(swResult));
+ } else {
+ // Retry after a delay.
+ executor->sleepFor(delay.getNext(), cancelToken)
+ .getAsync([this, self, resultPromise = std::move(resultPromise)](
+ Status s) mutable {
+ if (s.isOK()) {
+ runImpl(std::move(resultPromise));
+ }
+ });
+ }
});
- });
+ });
}
std::shared_ptr<executor::TaskExecutor> executor;
@@ -258,19 +289,49 @@ private:
* Performs actual looping through recursion.
*/
ExecutorFuture<FutureContinuationResult<BodyCallable>> run() {
- using ReturnType =
- typename decltype(getReturnType<decltype(executeLoopBody())>())::type;
+ using ReturnType = FutureContinuationResult<BodyCallable>;
+
// If the request is already canceled, don't run anything.
if (cancelToken.isCanceled())
return ExecutorFuture<ReturnType>(executor, asyncTryCanceledStatus());
- auto future = ExecutorFuture<void>(executor).then(executeLoopBody);
- return std::move(future).onCompletion(
- [this, self = this->shared_from_this()](StatusOrStatusWith<ReturnType> s) {
- if (shouldStopIteration(s))
- return ExecutorFuture<ReturnType>(executor, std::move(s));
+ auto [promise, future] = makePromiseFuture<ReturnType>();
+
+ // Kick off the asynchronous loop.
+ runImpl(std::move(promise));
+
+ return std::move(future).thenRunOn(executor);
+ }
+
+ /**
+ * Helper function that schedules an asynchronous task. This task executes the loop body and
+ * either terminates the loop by emplacing the resultPromise, or makes a recursive call to
+ * reschedule another iteration of the loop.
+ */
+ template <typename ReturnType>
+ void runImpl(Promise<ReturnType> resultPromise) {
+ executor->schedule(
+ [this, self = this->shared_from_this(), resultPromise = std::move(resultPromise)](
+ Status scheduleStatus) mutable {
+ if (!scheduleStatus.isOK()) {
+ resultPromise.setError(std::move(scheduleStatus));
+ return;
+ }
- return run();
+ // Convert the result of the loop body into an ExecutorFuture, even if the
+ // loop body is not Future-returning. This isn't strictly necessary but it
+ // makes implementation easier.
+ makeExecutorFutureWith(executor, executeLoopBody)
+ .getAsync([this, self, resultPromise = std::move(resultPromise)](
+ StatusOrStatusWith<ReturnType>&& swResult) mutable {
+ if (cancelToken.isCanceled()) {
+ resultPromise.setError(asyncTryCanceledStatus());
+ } else if (shouldStopIteration(swResult)) {
+ resultPromise.setFrom(std::move(swResult));
+ } else {
+ runImpl(std::move(resultPromise));
+ }
+ });
});
}
diff --git a/src/mongo/util/future_util_test.cpp b/src/mongo/util/future_util_test.cpp
index 05325e6c7c8..824591dccbc 100644
--- a/src/mongo/util/future_util_test.cpp
+++ b/src/mongo/util/future_util_test.cpp
@@ -35,6 +35,7 @@
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/unittest/barrier.h"
#include "mongo/unittest/death_test.h"
+#include "mongo/unittest/thread_assertion_monitor.h"
#include "mongo/util/future_util.h"
namespace mongo {
@@ -95,6 +96,33 @@ TEST_F(AsyncTryUntilTest, LoopExecutesOnceWithAlwaysTrueCondition) {
ASSERT_EQ(i, 1);
}
+TEST_F(AsyncTryUntilTest, LoopDoesNotExecuteIfExecutorAlreadyShutdown) {
+ executor()->shutdown();
+
+ auto i = 0;
+ auto resultFut = AsyncTry([&] { ++i; })
+ .until([](Status s) { return true; })
+ .on(executor(), CancelationToken::uncancelable());
+
+ ASSERT_THROWS_CODE(resultFut.get(), DBException, ErrorCodes::ShutdownInProgress);
+
+ ASSERT_EQ(i, 0);
+}
+
+TEST_F(AsyncTryUntilTest, LoopWithDelayDoesNotExecuteIfExecutorAlreadyShutdown) {
+ executor()->shutdown();
+
+ auto i = 0;
+ auto resultFut = AsyncTry([&] { ++i; })
+ .until([](Status s) { return true; })
+ .withDelayBetweenIterations(Milliseconds(0))
+ .on(executor(), CancelationToken::uncancelable());
+
+ ASSERT_THROWS_CODE(resultFut.get(), DBException, ErrorCodes::ShutdownInProgress);
+
+ ASSERT_EQ(i, 0);
+}
+
TEST_F(AsyncTryUntilTest, LoopExecutesUntilConditionIsTrue) {
const int numLoops = 3;
auto i = 0;
@@ -109,6 +137,48 @@ TEST_F(AsyncTryUntilTest, LoopExecutesUntilConditionIsTrue) {
ASSERT_EQ(i, numLoops);
}
+TEST_F(AsyncTryUntilTest, LoopExecutesUntilConditionIsTrueWithFutureReturnType) {
+ const int numLoops = 3;
+ auto i = 0;
+ auto resultFut = AsyncTry([&] {
+ ++i;
+ return Future<int>::makeReady(i);
+ })
+ .until([&](StatusWith<int> swInt) { return swInt.getValue() == numLoops; })
+ .on(executor(), CancelationToken::uncancelable());
+ resultFut.wait();
+
+ ASSERT_EQ(i, numLoops);
+}
+
+TEST_F(AsyncTryUntilTest, LoopExecutesUntilConditionIsTrueWithSemiFutureReturnType) {
+ const int numLoops = 3;
+ auto i = 0;
+ auto resultFut = AsyncTry([&] {
+ ++i;
+ return SemiFuture<int>::makeReady(i);
+ })
+ .until([&](StatusWith<int> swInt) { return swInt.getValue() == numLoops; })
+ .on(executor(), CancelationToken::uncancelable());
+ resultFut.wait();
+
+ ASSERT_EQ(i, numLoops);
+}
+
+TEST_F(AsyncTryUntilTest, LoopExecutesUntilConditionIsTrueWithExecutorFutureReturnType) {
+ const int numLoops = 3;
+ auto i = 0;
+ auto resultFut = AsyncTry([&] {
+ ++i;
+ return ExecutorFuture<int>(executor(), i);
+ })
+ .until([&](StatusWith<int> swInt) { return swInt.getValue() == numLoops; })
+ .on(executor(), CancelationToken::uncancelable());
+ resultFut.wait();
+
+ ASSERT_EQ(i, numLoops);
+}
+
TEST_F(AsyncTryUntilTest, LoopDoesNotRespectConstDelayIfConditionIsAlreadyTrue) {
auto i = 0;
auto resultFut = AsyncTry([&] { ++i; })
@@ -227,19 +297,119 @@ TEST_F(AsyncTryUntilTest, LoopBodyPropagatesValueOfLastIterationToCaller) {
ASSERT_EQ(resultFut.get(), expectedResult);
}
-TEST_F(AsyncTryUntilTest, LoopBodyPropagatesErrorToConditionAndCaller) {
+TEST_F(AsyncTryUntilTest, FutureReturningLoopBodyPropagatesValueOfLastIterationToCaller) {
+ auto i = 0;
+ auto expectedResult = 3;
auto resultFut = AsyncTry([&] {
- uasserted(ErrorCodes::InternalError, "test error");
- return 3;
+ ++i;
+ return Future<int>::makeReady(i);
})
- .until([&](StatusWith<int> swInt) {
- ASSERT_NOT_OK(swInt);
- ASSERT_EQ(swInt.getStatus().code(), ErrorCodes::InternalError);
- return true;
- })
+ .until([&](StatusWith<int> swInt) { return i == expectedResult; })
+ .on(executor(), CancelationToken::uncancelable());
+
+ ASSERT_EQ(resultFut.get(), expectedResult);
+}
+
+TEST_F(AsyncTryUntilTest, SemiFutureReturningLoopBodyPropagatesValueOfLastIterationToCaller) {
+ auto i = 0;
+ auto expectedResult = 3;
+ auto resultFut = AsyncTry([&] {
+ ++i;
+ return SemiFuture<int>::makeReady(i);
+ })
+ .until([&](StatusWith<int> swInt) { return i == expectedResult; })
+ .on(executor(), CancelationToken::uncancelable());
+
+ ASSERT_EQ(resultFut.get(), expectedResult);
+}
+
+TEST_F(AsyncTryUntilTest, ExecutorFutureReturningLoopBodyPropagatesValueOfLastIterationToCaller) {
+ auto i = 0;
+ auto expectedResult = 3;
+ auto resultFut = AsyncTry([&] {
+ ++i;
+ return ExecutorFuture<int>(executor(), i);
+ })
+ .until([&](StatusWith<int> swInt) { return i == expectedResult; })
.on(executor(), CancelationToken::uncancelable());
- ASSERT_EQ(resultFut.getNoThrow(), ErrorCodes::InternalError);
+ ASSERT_EQ(resultFut.get(), expectedResult);
+}
+
+TEST_F(AsyncTryUntilTest, LoopBodyPropagatesErrorToConditionAndCaller) {
+ unittest::threadAssertionMonitoredTest([&](auto& assertionMonitor) {
+ auto resultFut = AsyncTry<std::function<void()>>([&] {
+ uasserted(ErrorCodes::InternalError, "test error");
+ return 3;
+ })
+ .until([&](StatusWith<int> swInt) {
+ assertionMonitor.exec([&] {
+ ASSERT_NOT_OK(swInt);
+ ASSERT_EQ(swInt.getStatus().code(), ErrorCodes::InternalError);
+ });
+ return true;
+ })
+ .on(executor(), CancelationToken::uncancelable());
+
+ ASSERT_EQ(resultFut.getNoThrow(), ErrorCodes::InternalError);
+ });
+}
+
+TEST_F(AsyncTryUntilTest, FutureReturningLoopBodyPropagatesErrorToConditionAndCaller) {
+ unittest::threadAssertionMonitoredTest([&](auto& assertionMonitor) {
+ auto resultFut = AsyncTry<std::function<void()>>([&] {
+ uasserted(ErrorCodes::InternalError, "test error");
+ return Future<int>::makeReady(3);
+ })
+ .until([&](StatusWith<int> swInt) {
+ assertionMonitor.exec([&] {
+ ASSERT_NOT_OK(swInt);
+ ASSERT_EQ(swInt.getStatus().code(), ErrorCodes::InternalError);
+ });
+ return true;
+ })
+ .on(executor(), CancelationToken::uncancelable());
+
+ ASSERT_EQ(resultFut.getNoThrow(), ErrorCodes::InternalError);
+ });
+}
+
+TEST_F(AsyncTryUntilTest, SemiFutureReturningLoopBodyPropagatesErrorToConditionAndCaller) {
+ unittest::threadAssertionMonitoredTest([&](auto& assertionMonitor) {
+ auto resultFut = AsyncTry<std::function<void()>>([&] {
+ uasserted(ErrorCodes::InternalError, "test error");
+ return SemiFuture<int>::makeReady(3);
+ })
+ .until([&](StatusWith<int> swInt) {
+ assertionMonitor.exec([&] {
+ ASSERT_NOT_OK(swInt);
+ ASSERT_EQ(swInt.getStatus().code(), ErrorCodes::InternalError);
+ });
+ return true;
+ })
+ .on(executor(), CancelationToken::uncancelable());
+
+ ASSERT_EQ(resultFut.getNoThrow(), ErrorCodes::InternalError);
+ });
+}
+
+TEST_F(AsyncTryUntilTest, ExecutorFutureReturningLoopBodyPropagatesErrorToConditionAndCaller) {
+ unittest::threadAssertionMonitoredTest([&](auto& assertionMonitor) {
+ auto resultFut = AsyncTry<std::function<void()>>([&] {
+ uasserted(ErrorCodes::InternalError, "test error");
+ return ExecutorFuture<int>(executor(), 3);
+ })
+ .until([&](StatusWith<int> swInt) {
+ assertionMonitor.exec([&] {
+ ASSERT_NOT_OK(swInt);
+ ASSERT_EQ(swInt.getStatus().code(), ErrorCodes::InternalError);
+ });
+ return true;
+ })
+ .on(executor(), CancelationToken::uncancelable());
+
+ ASSERT_EQ(resultFut.getNoThrow(), ErrorCodes::InternalError);
+ });
}
static const Status kCanceledStatus = {ErrorCodes::CallbackCanceled, "AsyncTry::until canceled"};