From fefd2787643720bdf23f47558cbdcafd02e7452f Mon Sep 17 00:00:00 2001 From: Matthew Saltz Date: Tue, 16 Feb 2021 18:07:34 +0000 Subject: SERVER-54408 Implement AsyncTry-until without future recursion --- src/mongo/util/future.h | 7 +- src/mongo/util/future_util.h | 137 ++++++++++++++++++-------- src/mongo/util/future_util_test.cpp | 188 ++++++++++++++++++++++++++++++++++-- 3 files changed, 284 insertions(+), 48 deletions(-) diff --git a/src/mongo/util/future.h b/src/mongo/util/future.h index ad7bf140e4a..f325b572678 100644 --- a/src/mongo/util/future.h +++ b/src/mongo/util/future.h @@ -1200,7 +1200,12 @@ auto coerceToFuture(T&& value) { TEMPLATE(typename Func) REQUIRES(future_details::isCallable) auto makeReadyFutureWith(Func&& func) -> Future> try { - return std::forward(func)(); + if constexpr (std::is_void_v>) { + std::forward(func)(); + return Future::makeReady(); + } else { + return std::forward(func)(); + } } catch (const DBException& ex) { return ex.toStatus(); } diff --git a/src/mongo/util/future_util.h b/src/mongo/util/future_util.h index af70290746c..25865caae89 100644 --- a/src/mongo/util/future_util.h +++ b/src/mongo/util/future_util.h @@ -56,25 +56,20 @@ inline Status asyncTryCanceledStatus() { } /** - * Widget to get a default-constructible object that allows access to the type passed in at - * compile time. Used for getReturnType below. + * Creates an ExecutorFuture from the result of the input callable. */ -template -struct DefaultConstructibleWrapper { - using type = T; -}; +template +auto makeExecutorFutureWith(ExecutorPtr executor, Callable&& callable) { + using CallableResult = std::invoke_result_t; -/** - * Helper to get the return type of the loop body in TryUntilLoop/TryUntilLoopWithDelay. This is - * required because the loop body may return a future-like type, which wraps another result type - * (specified in Future::value_type), or some other kind of raw type which can be used directly. - */ -template -auto getReturnType() { - if constexpr (future_details::isFutureLike>) { - return DefaultConstructibleWrapper(); + if constexpr (future_details::isFutureLike) { + try { + return callable().thenRunOn(executor); + } catch (const DBException& e) { + return ExecutorFuture>(executor, e.toStatus()); + } } else { - return DefaultConstructibleWrapper(); + return makeReadyFutureWith(callable).thenRunOn(executor); } } @@ -132,24 +127,60 @@ private: * Performs actual looping through recursion. */ ExecutorFuture> run() { - using ReturnType = - typename decltype(getReturnType())::type; - // If the request to executeLoopBody has already been canceled, don't attempt to run it. - if (cancelToken.isCanceled()) { + using ReturnType = FutureContinuationResult; + + // If the request is already canceled, don't run anything. + if (cancelToken.isCanceled()) return ExecutorFuture(executor, asyncTryCanceledStatus()); - } - auto future = ExecutorFuture(executor).then(executeLoopBody); - return std::move(future).onCompletion( - [this, self = this->shared_from_this()](StatusOrStatusWith s) { - if (shouldStopIteration(s)) - return ExecutorFuture(executor, std::move(s)); + auto [promise, future] = makePromiseFuture(); + + // Kick off the asynchronous loop. + runImpl(std::move(promise)); + + return std::move(future).thenRunOn(executor); + } + + + /** + * Helper function that schedules an asynchronous task. This task executes the loop body and + * either terminates the loop by emplacing the resultPromise, or makes a recursive call to + * reschedule another iteration of the loop. + */ + template + void runImpl(Promise resultPromise) { + executor->schedule([this, + self = this->shared_from_this(), + resultPromise = + std::move(resultPromise)](Status scheduleStatus) mutable { + if (!scheduleStatus.isOK()) { + resultPromise.setError(std::move(scheduleStatus)); + return; + } - // Retry after a delay. - return executor->sleepFor(delay.getNext(), cancelToken).then([this, self] { - return run(); + using BodyCallableResult = std::invoke_result_t; + // Convert the result of the loop body into an ExecutorFuture, even if the + // loop body is not future-returning. This isn't strictly necessary but it + // makes implementation easier. + makeExecutorFutureWith(executor, executeLoopBody) + .getAsync([this, self, resultPromise = std::move(resultPromise)]( + StatusOrStatusWith&& swResult) mutable { + if (cancelToken.isCanceled()) { + resultPromise.setError(asyncTryCanceledStatus()); + } else if (shouldStopIteration(swResult)) { + resultPromise.setFrom(std::move(swResult)); + } else { + // Retry after a delay. + executor->sleepFor(delay.getNext(), cancelToken) + .getAsync([this, self, resultPromise = std::move(resultPromise)]( + Status s) mutable { + if (s.isOK()) { + runImpl(std::move(resultPromise)); + } + }); + } }); - }); + }); } std::shared_ptr executor; @@ -258,19 +289,49 @@ private: * Performs actual looping through recursion. */ ExecutorFuture> run() { - using ReturnType = - typename decltype(getReturnType())::type; + using ReturnType = FutureContinuationResult; + // If the request is already canceled, don't run anything. if (cancelToken.isCanceled()) return ExecutorFuture(executor, asyncTryCanceledStatus()); - auto future = ExecutorFuture(executor).then(executeLoopBody); - return std::move(future).onCompletion( - [this, self = this->shared_from_this()](StatusOrStatusWith s) { - if (shouldStopIteration(s)) - return ExecutorFuture(executor, std::move(s)); + auto [promise, future] = makePromiseFuture(); + + // Kick off the asynchronous loop. + runImpl(std::move(promise)); + + return std::move(future).thenRunOn(executor); + } + + /** + * Helper function that schedules an asynchronous task. This task executes the loop body and + * either terminates the loop by emplacing the resultPromise, or makes a recursive call to + * reschedule another iteration of the loop. + */ + template + void runImpl(Promise resultPromise) { + executor->schedule( + [this, self = this->shared_from_this(), resultPromise = std::move(resultPromise)]( + Status scheduleStatus) mutable { + if (!scheduleStatus.isOK()) { + resultPromise.setError(std::move(scheduleStatus)); + return; + } - return run(); + // Convert the result of the loop body into an ExecutorFuture, even if the + // loop body is not Future-returning. This isn't strictly necessary but it + // makes implementation easier. + makeExecutorFutureWith(executor, executeLoopBody) + .getAsync([this, self, resultPromise = std::move(resultPromise)]( + StatusOrStatusWith&& swResult) mutable { + if (cancelToken.isCanceled()) { + resultPromise.setError(asyncTryCanceledStatus()); + } else if (shouldStopIteration(swResult)) { + resultPromise.setFrom(std::move(swResult)); + } else { + runImpl(std::move(resultPromise)); + } + }); }); } diff --git a/src/mongo/util/future_util_test.cpp b/src/mongo/util/future_util_test.cpp index 05325e6c7c8..824591dccbc 100644 --- a/src/mongo/util/future_util_test.cpp +++ b/src/mongo/util/future_util_test.cpp @@ -35,6 +35,7 @@ #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/unittest/barrier.h" #include "mongo/unittest/death_test.h" +#include "mongo/unittest/thread_assertion_monitor.h" #include "mongo/util/future_util.h" namespace mongo { @@ -95,6 +96,33 @@ TEST_F(AsyncTryUntilTest, LoopExecutesOnceWithAlwaysTrueCondition) { ASSERT_EQ(i, 1); } +TEST_F(AsyncTryUntilTest, LoopDoesNotExecuteIfExecutorAlreadyShutdown) { + executor()->shutdown(); + + auto i = 0; + auto resultFut = AsyncTry([&] { ++i; }) + .until([](Status s) { return true; }) + .on(executor(), CancelationToken::uncancelable()); + + ASSERT_THROWS_CODE(resultFut.get(), DBException, ErrorCodes::ShutdownInProgress); + + ASSERT_EQ(i, 0); +} + +TEST_F(AsyncTryUntilTest, LoopWithDelayDoesNotExecuteIfExecutorAlreadyShutdown) { + executor()->shutdown(); + + auto i = 0; + auto resultFut = AsyncTry([&] { ++i; }) + .until([](Status s) { return true; }) + .withDelayBetweenIterations(Milliseconds(0)) + .on(executor(), CancelationToken::uncancelable()); + + ASSERT_THROWS_CODE(resultFut.get(), DBException, ErrorCodes::ShutdownInProgress); + + ASSERT_EQ(i, 0); +} + TEST_F(AsyncTryUntilTest, LoopExecutesUntilConditionIsTrue) { const int numLoops = 3; auto i = 0; @@ -109,6 +137,48 @@ TEST_F(AsyncTryUntilTest, LoopExecutesUntilConditionIsTrue) { ASSERT_EQ(i, numLoops); } +TEST_F(AsyncTryUntilTest, LoopExecutesUntilConditionIsTrueWithFutureReturnType) { + const int numLoops = 3; + auto i = 0; + auto resultFut = AsyncTry([&] { + ++i; + return Future::makeReady(i); + }) + .until([&](StatusWith swInt) { return swInt.getValue() == numLoops; }) + .on(executor(), CancelationToken::uncancelable()); + resultFut.wait(); + + ASSERT_EQ(i, numLoops); +} + +TEST_F(AsyncTryUntilTest, LoopExecutesUntilConditionIsTrueWithSemiFutureReturnType) { + const int numLoops = 3; + auto i = 0; + auto resultFut = AsyncTry([&] { + ++i; + return SemiFuture::makeReady(i); + }) + .until([&](StatusWith swInt) { return swInt.getValue() == numLoops; }) + .on(executor(), CancelationToken::uncancelable()); + resultFut.wait(); + + ASSERT_EQ(i, numLoops); +} + +TEST_F(AsyncTryUntilTest, LoopExecutesUntilConditionIsTrueWithExecutorFutureReturnType) { + const int numLoops = 3; + auto i = 0; + auto resultFut = AsyncTry([&] { + ++i; + return ExecutorFuture(executor(), i); + }) + .until([&](StatusWith swInt) { return swInt.getValue() == numLoops; }) + .on(executor(), CancelationToken::uncancelable()); + resultFut.wait(); + + ASSERT_EQ(i, numLoops); +} + TEST_F(AsyncTryUntilTest, LoopDoesNotRespectConstDelayIfConditionIsAlreadyTrue) { auto i = 0; auto resultFut = AsyncTry([&] { ++i; }) @@ -227,19 +297,119 @@ TEST_F(AsyncTryUntilTest, LoopBodyPropagatesValueOfLastIterationToCaller) { ASSERT_EQ(resultFut.get(), expectedResult); } -TEST_F(AsyncTryUntilTest, LoopBodyPropagatesErrorToConditionAndCaller) { +TEST_F(AsyncTryUntilTest, FutureReturningLoopBodyPropagatesValueOfLastIterationToCaller) { + auto i = 0; + auto expectedResult = 3; auto resultFut = AsyncTry([&] { - uasserted(ErrorCodes::InternalError, "test error"); - return 3; + ++i; + return Future::makeReady(i); }) - .until([&](StatusWith swInt) { - ASSERT_NOT_OK(swInt); - ASSERT_EQ(swInt.getStatus().code(), ErrorCodes::InternalError); - return true; - }) + .until([&](StatusWith swInt) { return i == expectedResult; }) + .on(executor(), CancelationToken::uncancelable()); + + ASSERT_EQ(resultFut.get(), expectedResult); +} + +TEST_F(AsyncTryUntilTest, SemiFutureReturningLoopBodyPropagatesValueOfLastIterationToCaller) { + auto i = 0; + auto expectedResult = 3; + auto resultFut = AsyncTry([&] { + ++i; + return SemiFuture::makeReady(i); + }) + .until([&](StatusWith swInt) { return i == expectedResult; }) + .on(executor(), CancelationToken::uncancelable()); + + ASSERT_EQ(resultFut.get(), expectedResult); +} + +TEST_F(AsyncTryUntilTest, ExecutorFutureReturningLoopBodyPropagatesValueOfLastIterationToCaller) { + auto i = 0; + auto expectedResult = 3; + auto resultFut = AsyncTry([&] { + ++i; + return ExecutorFuture(executor(), i); + }) + .until([&](StatusWith swInt) { return i == expectedResult; }) .on(executor(), CancelationToken::uncancelable()); - ASSERT_EQ(resultFut.getNoThrow(), ErrorCodes::InternalError); + ASSERT_EQ(resultFut.get(), expectedResult); +} + +TEST_F(AsyncTryUntilTest, LoopBodyPropagatesErrorToConditionAndCaller) { + unittest::threadAssertionMonitoredTest([&](auto& assertionMonitor) { + auto resultFut = AsyncTry>([&] { + uasserted(ErrorCodes::InternalError, "test error"); + return 3; + }) + .until([&](StatusWith swInt) { + assertionMonitor.exec([&] { + ASSERT_NOT_OK(swInt); + ASSERT_EQ(swInt.getStatus().code(), ErrorCodes::InternalError); + }); + return true; + }) + .on(executor(), CancelationToken::uncancelable()); + + ASSERT_EQ(resultFut.getNoThrow(), ErrorCodes::InternalError); + }); +} + +TEST_F(AsyncTryUntilTest, FutureReturningLoopBodyPropagatesErrorToConditionAndCaller) { + unittest::threadAssertionMonitoredTest([&](auto& assertionMonitor) { + auto resultFut = AsyncTry>([&] { + uasserted(ErrorCodes::InternalError, "test error"); + return Future::makeReady(3); + }) + .until([&](StatusWith swInt) { + assertionMonitor.exec([&] { + ASSERT_NOT_OK(swInt); + ASSERT_EQ(swInt.getStatus().code(), ErrorCodes::InternalError); + }); + return true; + }) + .on(executor(), CancelationToken::uncancelable()); + + ASSERT_EQ(resultFut.getNoThrow(), ErrorCodes::InternalError); + }); +} + +TEST_F(AsyncTryUntilTest, SemiFutureReturningLoopBodyPropagatesErrorToConditionAndCaller) { + unittest::threadAssertionMonitoredTest([&](auto& assertionMonitor) { + auto resultFut = AsyncTry>([&] { + uasserted(ErrorCodes::InternalError, "test error"); + return SemiFuture::makeReady(3); + }) + .until([&](StatusWith swInt) { + assertionMonitor.exec([&] { + ASSERT_NOT_OK(swInt); + ASSERT_EQ(swInt.getStatus().code(), ErrorCodes::InternalError); + }); + return true; + }) + .on(executor(), CancelationToken::uncancelable()); + + ASSERT_EQ(resultFut.getNoThrow(), ErrorCodes::InternalError); + }); +} + +TEST_F(AsyncTryUntilTest, ExecutorFutureReturningLoopBodyPropagatesErrorToConditionAndCaller) { + unittest::threadAssertionMonitoredTest([&](auto& assertionMonitor) { + auto resultFut = AsyncTry>([&] { + uasserted(ErrorCodes::InternalError, "test error"); + return ExecutorFuture(executor(), 3); + }) + .until([&](StatusWith swInt) { + assertionMonitor.exec([&] { + ASSERT_NOT_OK(swInt); + ASSERT_EQ(swInt.getStatus().code(), ErrorCodes::InternalError); + }); + return true; + }) + .on(executor(), CancelationToken::uncancelable()); + + ASSERT_EQ(resultFut.getNoThrow(), ErrorCodes::InternalError); + }); } static const Status kCanceledStatus = {ErrorCodes::CallbackCanceled, "AsyncTry::until canceled"}; -- cgit v1.2.1