/**
 *    Copyright (C) 2019-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#pragma once

#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include "mongo/executor/task_executor.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/compiler.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/cancellation.h"
#include "mongo/util/future.h"
#include "mongo/util/out_of_line_executor.h"
#include "mongo/util/static_immortal.h"
#include "mongo/util/time_support.h"

namespace mongo {

/**
 * Returns a future which will be fulfilled at the given date.
 */
ExecutorFuture<void> sleepUntil(std::shared_ptr<executor::TaskExecutor> executor,
                                const Date_t& date);

/**
 * Returns a future which will be fulfilled after the given duration.
 */
ExecutorFuture<void> sleepFor(std::shared_ptr<executor::TaskExecutor> executor,
                              Milliseconds duration);

namespace future_util_details {

/**
 * Error status to use if any AsyncTry loop has been canceled.
 */
inline Status asyncTryCanceledStatus() {
    static StaticImmortal s = Status{ErrorCodes::CallbackCanceled, "AsyncTry loop canceled"};
    return *s;
}

/**
 * Creates an ExecutorFuture from the result of the input callable.
 */
template <typename Callable>
auto makeExecutorFutureWith(ExecutorPtr executor, Callable&& callable) {
    using CallableResult = std::invoke_result_t<Callable>;

    if constexpr (future_details::isFutureLike<CallableResult>) {
        try {
            return callable().thenRunOn(executor);
        } catch (const DBException& e) {
            return ExecutorFuture<FutureContinuationResult<Callable>>(executor, e.toStatus());
        }
    } else {
        return makeReadyFutureWith(callable).thenRunOn(executor);
    }
}

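// Example (illustrative sketch): `executor` is assumed to be an ExecutorPtr and `someAsyncOp` a
// hypothetical future-returning function. Both plain-value and future-returning callables are
// normalized to an ExecutorFuture, and a callable that throws a DBException yields a future that
// is already set with that error:
//
//     auto f1 = makeExecutorFutureWith(executor, [] { return 42; });  // ExecutorFuture<int>
//     auto f2 = makeExecutorFutureWith(executor, [] { return someAsyncOp(); });
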
/**
 * Wraps a `Promise` and allows replacing the default `BrokenPromise` error with a custom status.
 *
 * Consider the following before using or making changes to this class:
 * * This type is marked non-movable for simplicity, but it can be made movable by wrapping it in
 *   a `std::unique_ptr`.
 * * This is wrapping and not extending `Promise` to avoid any (performance) impact on `Promise`.
 * * There is no requirement for `_broken` to be thread-safe when this type is used appropriately.
 */
template <typename T>
class PromiseWithCustomBrokenStatus {
public:
    PromiseWithCustomBrokenStatus() = delete;
    PromiseWithCustomBrokenStatus(PromiseWithCustomBrokenStatus&&) = delete;
    PromiseWithCustomBrokenStatus(const PromiseWithCustomBrokenStatus&) = delete;

    PromiseWithCustomBrokenStatus(Promise<T> promise, Status status)
        : _promise(std::move(promise)), _status(std::move(status)) {
        invariant(!_status.isOK());
    }

    ~PromiseWithCustomBrokenStatus() {
        if (_broken) {
            _promise.setError(_status);
        }
    }

    template <typename ResultType>
    void setFrom(ResultType value) {
        _broken = false;
        _promise.setFrom(std::move(value));
    }

    void setError(Status status) {
        _broken = false;
        _promise.setError(std::move(status));
    }

private:
    bool _broken = true;
    Promise<T> _promise;
    const Status _status;
};

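// Example (illustrative sketch): wrapping a Promise so that abandoning it reports a custom error
// instead of the default BrokenPromise:
//
//     Promise<int> promise{NonNullPromiseTag{}};
//     auto future = promise.getFuture();
//     PromiseWithCustomBrokenStatus<int> wrapped(
//         std::move(promise), Status(ErrorCodes::ShutdownInProgress, "executor shut down"));
//     // If `wrapped` is destroyed without setFrom() or setError() being called, `future`
//     // resolves with the ShutdownInProgress status above rather than BrokenPromise.
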
/**
 * Represents an intermediate state which holds the body, condition, and delay between iterations
 * of a try-until loop. See comments for AsyncTry for usage.
 *
 * Note: This is a helper class and is not intended for standalone usage.
 */
template <typename BodyCallable, typename ConditionCallable, typename Delay>
class [[nodiscard]] AsyncTryUntilWithDelay {
public:
    explicit AsyncTryUntilWithDelay(BodyCallable&& body, ConditionCallable&& condition, Delay delay)
        : _body(std::move(body)), _condition(std::move(condition)), _delay(delay) {}

    /**
     * Launches the loop and returns an ExecutorFuture that will be resolved when the loop is
     * complete. If the executor is already shut down or the cancelToken has already been canceled
     * before the loop is launched, the loop body will never run and the resulting ExecutorFuture
     * will be set with either a ShutdownInProgress or CallbackCanceled error.
     *
     * The returned ExecutorFuture contains the last result returned by the loop body. If the last
     * iteration of the loop body threw an exception or otherwise returned an error status, the
     * returned ExecutorFuture will contain that error.
     */
    auto on(std::shared_ptr<executor::TaskExecutor> executor, CancellationToken cancelToken) && {
        auto loop = std::make_shared<TryUntilLoopWithDelay>(std::move(executor),
                                                            std::move(_body),
                                                            std::move(_condition),
                                                            std::move(_delay),
                                                            std::move(cancelToken));
        // Launch the recursive chain using the helper class.
        return loop->run();
    }

private:
    /**
     * Helper class to perform the actual looping logic with a recursive member function run().
     * Mostly needed to clean up lambda captures and make the looping logic more readable.
     */
    struct TryUntilLoopWithDelay : public std::enable_shared_from_this<TryUntilLoopWithDelay> {
        TryUntilLoopWithDelay(std::shared_ptr<executor::TaskExecutor> executor,
                              BodyCallable executeLoopBody,
                              ConditionCallable shouldStopIteration,
                              Delay delay,
                              CancellationToken cancelToken)
            : executor(std::move(executor)),
              executeLoopBody(std::move(executeLoopBody)),
              shouldStopIteration(std::move(shouldStopIteration)),
              delay(std::move(delay)),
              cancelToken(std::move(cancelToken)) {}

        /**
         * Performs actual looping through recursion.
         */
        ExecutorFuture<FutureContinuationResult<BodyCallable>> run() {
            using ReturnType = FutureContinuationResult<BodyCallable>;

            // If the request is already canceled, don't run anything.
            if (cancelToken.isCanceled())
                return ExecutorFuture<ReturnType>(executor, asyncTryCanceledStatus());

            Promise<ReturnType> promise{NonNullPromiseTag{}};
            auto future = promise.getFuture();

            auto wrappedPromise = std::make_unique<PromiseWithCustomBrokenStatus<ReturnType>>(
                std::move(promise),
                Status(ErrorCodes::ShutdownInProgress,
                       "Terminated loop due to executor shutdown"));

            // Kick off the asynchronous loop.
            runImpl(std::move(wrappedPromise));

            return std::move(future).thenRunOn(executor);
        }

        /**
         * Helper function that schedules an asynchronous task. This task executes the loop body
         * and either terminates the loop by emplacing the resultPromise, or makes a recursive
         * call to reschedule another iteration of the loop.
         */
        template <typename ReturnType>
        void runImpl(std::unique_ptr<PromiseWithCustomBrokenStatus<ReturnType>> resultPromise) {
            executor->schedule([this,
                                self = this->shared_from_this(),
                                resultPromise = std::move(resultPromise)](
                                   Status scheduleStatus) mutable {
                if (!scheduleStatus.isOK()) {
                    resultPromise->setError(std::move(scheduleStatus));
                    return;
                }

                using BodyCallableResult = std::invoke_result_t<BodyCallable>;
                // Convert the result of the loop body into an ExecutorFuture, even if the
                // loop body is not future-returning. This isn't strictly necessary but it
                // makes implementation easier.
                makeExecutorFutureWith(executor, executeLoopBody)
                    .getAsync([this, self, resultPromise = std::move(resultPromise)](
                                  StatusOrStatusWith<ReturnType>&& swResult) mutable {
                        if (cancelToken.isCanceled()) {
                            resultPromise->setError(asyncTryCanceledStatus());
                            return;
                        }

                        const auto swShouldStop = [&]() -> StatusWith<bool> {
                            try {
                                return shouldStopIteration(swResult);
                            } catch (...) {
                                return exceptionToStatus();
                            }
                        }();
                        if (MONGO_unlikely(!swShouldStop.isOK())) {
                            resultPromise->setError(swShouldStop.getStatus());
                        } else if (swShouldStop.getValue()) {
                            resultPromise->setFrom(std::move(swResult));
                        } else {
                            // Retry after a delay.
                            executor->sleepFor(delay.getNext(), cancelToken)
                                .getAsync([this, self, resultPromise = std::move(resultPromise)](
                                              Status s) mutable {
                                    // Prevent another loop iteration when cancellation happens
                                    // after the loop body has run.
                                    if (s.isOK()) {
                                        runImpl(std::move(resultPromise));
                                    } else {
                                        resultPromise->setError(std::move(s));
                                    }
                                });
                        }
                    });
            });
        }

        std::shared_ptr<executor::TaskExecutor> executor;
        BodyCallable executeLoopBody;
        ConditionCallable shouldStopIteration;
        Delay delay;
        CancellationToken cancelToken;
    };

    BodyCallable _body;
    ConditionCallable _condition;
    Delay _delay;
};

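// Example (illustrative sketch; `doWork`, `Result`, `executor`, and `cancelToken` are
// hypothetical): because the condition receives the loop body's result even when the body
// failed, it can distinguish retryable from non-retryable errors:
//
//     AsyncTry([&] { return doWork(); })
//         .until([](const StatusWith<Result>& sw) {
//             // Stop on success or on any error other than a transient one.
//             return sw.isOK() || sw.getStatus().code() != ErrorCodes::HostUnreachable;
//         })
//         .withDelayBetweenIterations(Milliseconds(50))
//         .on(executor, cancelToken);
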
/**
 * Represents an intermediate state which holds the body and condition of a try-until loop. See
 * comments for AsyncTry for usage.
 *
 * Note: This is a helper class and is not intended for standalone usage.
 */
template <typename BodyCallable, typename ConditionCallable>
class [[nodiscard]] AsyncTryUntil {
public:
    explicit AsyncTryUntil(BodyCallable&& body, ConditionCallable&& condition)
        : _body(std::move(body)), _condition(std::move(condition)) {}

    /**
     * Creates a delay which takes place after evaluating the condition and before executing the
     * loop body.
     */
    template <typename DurationType>
    auto withDelayBetweenIterations(DurationType delay) && {
        return AsyncTryUntilWithDelay(
            std::move(_body), std::move(_condition), ConstDelay<DurationType>(std::move(delay)));
    }

    /**
     * Creates an exponential delay which takes place after evaluating the condition and before
     * executing the loop body.
     */
    template <typename BackoffType>
    auto withBackoffBetweenIterations(BackoffType backoff) && {
        return AsyncTryUntilWithDelay(
            std::move(_body), std::move(_condition), BackoffDelay<BackoffType>(std::move(backoff)));
    }

    /**
     * Launches the loop and returns an ExecutorFuture that will be resolved when the loop is
     * complete. If the executor is already shut down or the cancelToken has already been canceled
     * before the loop is launched, the loop body will never run and the resulting ExecutorFuture
     * will be set with either a ShutdownInProgress or CallbackCanceled error.
     *
     * The returned ExecutorFuture contains the last result returned by the loop body. If the last
     * iteration of the loop body threw an exception or otherwise returned an error status, the
     * returned ExecutorFuture will contain that error.
     */
    auto on(ExecutorPtr executor, CancellationToken cancelToken) && {
        auto loop = std::make_shared<TryUntilLoop>(
            std::move(executor), std::move(_body), std::move(_condition), std::move(cancelToken));
        // Launch the recursive chain using the helper class.
        return loop->run();
    }

private:
    template <typename DurationType>
    class ConstDelay {
    public:
        explicit ConstDelay(DurationType delay) : _delay(delay) {}

        Milliseconds getNext() {
            return Milliseconds(_delay);
        }

    private:
        DurationType _delay;
    };

    template <typename BackoffType>
    class BackoffDelay {
    public:
        explicit BackoffDelay(BackoffType backoff) : _backoff(backoff) {}

        Milliseconds getNext() {
            return _backoff.nextSleep();
        }

    private:
        BackoffType _backoff;
    };

    /**
     * Helper class to perform the actual looping logic with a recursive member function run().
     * Mostly needed to clean up lambda captures and make the looping logic more readable.
     */
    struct TryUntilLoop : public std::enable_shared_from_this<TryUntilLoop> {
        TryUntilLoop(ExecutorPtr executor,
                     BodyCallable executeLoopBody,
                     ConditionCallable shouldStopIteration,
                     CancellationToken cancelToken)
            : executor(std::move(executor)),
              executeLoopBody(std::move(executeLoopBody)),
              shouldStopIteration(std::move(shouldStopIteration)),
              cancelToken(std::move(cancelToken)) {}

        /**
         * Performs actual looping through recursion.
         */
        ExecutorFuture<FutureContinuationResult<BodyCallable>> run() {
            using ReturnType = FutureContinuationResult<BodyCallable>;

            // If the request is already canceled, don't run anything.
            if (cancelToken.isCanceled())
                return ExecutorFuture<ReturnType>(executor, asyncTryCanceledStatus());

            Promise<ReturnType> promise{NonNullPromiseTag{}};
            auto future = promise.getFuture();

            auto wrappedPromise = std::make_unique<PromiseWithCustomBrokenStatus<ReturnType>>(
                std::move(promise),
                Status(ErrorCodes::ShutdownInProgress,
                       "Terminated loop due to executor shutdown"));

            // Kick off the asynchronous loop.
            runImpl(std::move(wrappedPromise));

            return std::move(future).thenRunOn(executor);
        }

        /**
         * Helper function that schedules an asynchronous task. This task executes the loop body
         * and either terminates the loop by emplacing the resultPromise, or makes a recursive
         * call to reschedule another iteration of the loop.
         */
        template <typename ReturnType>
        void runImpl(std::unique_ptr<PromiseWithCustomBrokenStatus<ReturnType>> resultPromise) {
            executor->schedule(
                [this, self = this->shared_from_this(), resultPromise = std::move(resultPromise)](
                    Status scheduleStatus) mutable {
                    if (!scheduleStatus.isOK()) {
                        resultPromise->setError(std::move(scheduleStatus));
                        return;
                    }
                    // Convert the result of the loop body into an ExecutorFuture, even if the
                    // loop body is not Future-returning. This isn't strictly necessary but it
                    // makes implementation easier.
                    makeExecutorFutureWith(executor, executeLoopBody)
                        .getAsync([this, self, resultPromise = std::move(resultPromise)](
                                      StatusOrStatusWith<ReturnType>&& swResult) mutable {
                            if (cancelToken.isCanceled()) {
                                resultPromise->setError(asyncTryCanceledStatus());
                                return;
                            }

                            const auto swShouldStop = [&]() -> StatusWith<bool> {
                                try {
                                    return shouldStopIteration(swResult);
                                } catch (...) {
                                    return exceptionToStatus();
                                }
                            }();
                            if (MONGO_unlikely(!swShouldStop.isOK())) {
                                resultPromise->setError(swShouldStop.getStatus());
                            } else if (swShouldStop.getValue()) {
                                resultPromise->setFrom(std::move(swResult));
                            } else {
                                runImpl(std::move(resultPromise));
                            }
                        });
                });
        }

        ExecutorPtr executor;
        BodyCallable executeLoopBody;
        ConditionCallable shouldStopIteration;
        CancellationToken cancelToken;
    };

    BodyCallable _body;
    ConditionCallable _condition;
};

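// Example (illustrative sketch; `pingHost`, `executor`, and `cancelToken` are hypothetical): the
// same kind of loop with an exponential backoff policy between attempts. Any object exposing
// nextSleep() works; mongo's Backoff utility is assumed here:
//
//     AsyncTry([&] { return pingHost(); })
//         .until([](Status s) { return s.isOK(); })
//         .withBackoffBetweenIterations(Backoff(Seconds(1), Milliseconds::max()))
//         .on(executor, cancelToken);
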
// Helpers for functions which only take Future or ExecutorFutures, but not SemiFutures or
// SharedSemiFutures.
template <typename T>
inline constexpr bool isFutureOrExecutorFuture = false;
template <typename T>
inline constexpr bool isFutureOrExecutorFuture<Future<T>> = true;
template <typename T>
inline constexpr bool isFutureOrExecutorFuture<ExecutorFuture<T>> = true;

static inline const std::string kWhenAllSucceedEmptyInputInvariantMsg =
    "Must pass at least one future to whenAllSucceed";

/**
 * Turns a variadic parameter pack into a vector without invoking copies if possible via static
 * casts.
 */
template <typename... U, typename T = std::common_type_t<U...>>
std::vector<T> variadicArgsToVector(U&&... elems) {
    std::vector<T> vector;
    vector.reserve(sizeof...(elems));
    (vector.push_back(std::forward<U>(elems)), ...);
    return vector;
}

}  // namespace future_util_details

/**
 * A fluent-style API for executing asynchronous, future-returning try-until loops.
 *
 * Example usage to send a request until a successful status is returned:
 *    ExecutorFuture<Response> response =
 *        AsyncTry([] { return sendRequest(); })
 *            .until([](StatusWith<Response> swResponse) { return swResponse.isOK(); })
 *            .withDelayBetweenIterations(Milliseconds(100)) // This call is optional.
 *            .on(executor, cancelToken);
 *
 * Note that the AsyncTry() call passes on the return value of its input lambda (the *body*) to
 * the condition lambda of Until, even if the body returns an error or throws - in which case the
 * StatusWith will contain an error status. The delay inserted by WithDelayBetweenIterations
 * takes place after evaluating the condition and before executing the loop body an extra time.
 */
template <typename Callable>
class [[nodiscard]] AsyncTry {
public:
    explicit AsyncTry(Callable&& callable) : _body(std::move(callable)) {}

    template <typename Condition>
    auto until(Condition&& condition) && {
        return future_util_details::AsyncTryUntil(std::move(_body), std::move(condition));
    }

    Callable _body;
};

/**
 * For an input vector of Future<T> or ExecutorFuture<T> elements, returns a
 * SemiFuture<std::vector<T>> that will be resolved when all input futures succeed or set with an
 * error as soon as any input future is set with an error. The resulting vector contains the
 * results of all of the input futures in the same order in which they were provided.
 */
TEMPLATE(typename FutureLike,
         typename Value = typename FutureLike::value_type,
         typename ResultVector = std::vector<Value>)
REQUIRES(!std::is_void_v<Value> && future_util_details::isFutureOrExecutorFuture<FutureLike>)
SemiFuture<ResultVector> whenAllSucceed(std::vector<FutureLike>&& futures) {
    invariant(futures.size() > 0, future_util_details::kWhenAllSucceedEmptyInputInvariantMsg);

    // A structure used to share state between the input futures.
    struct SharedBlock {
        SharedBlock(size_t numFuturesToWaitFor, Promise<ResultVector> result)
            : numFuturesToWaitFor(numFuturesToWaitFor),
              resultPromise(std::move(result)),
              intermediateResult(numFuturesToWaitFor) {}
        // Total number of input futures.
        const size_t numFuturesToWaitFor;
        // Tracks the number of input futures which have resolved with success so far.
        AtomicWord<size_t> numResultsReturnedWithSuccess{0};
        // Tracks whether or not the resultPromise has been set. Only used for the error case.
        AtomicWord<bool> completedWithError{false};
        // The promise corresponding to the resulting SemiFuture returned by this function.
        Promise<ResultVector> resultPromise;
        // A vector containing the results of each input future.
        ResultVector intermediateResult;
    };

    Promise<ResultVector> promise{NonNullPromiseTag{}};
    auto future = promise.getFuture();
    auto sharedBlock = std::make_shared<SharedBlock>(futures.size(), std::move(promise));

    for (size_t i = 0; i < futures.size(); ++i) {
        std::move(futures[i]).getAsync([sharedBlock, myIndex = i](StatusWith<Value> swValue) {
            if (swValue.isOK()) {
                // Best effort check that no error has returned, not required for correctness.
                if (!sharedBlock->completedWithError.loadRelaxed()) {
                    // Put this result in its proper slot in the output vector.
                    sharedBlock->intermediateResult[myIndex] = std::move(swValue.getValue());
                    auto numResultsReturnedWithSuccess =
                        sharedBlock->numResultsReturnedWithSuccess.addAndFetch(1);
                    // If this is the last result to return, set the promise. Note that this
                    // will never be true if one of the input futures resolves with an error,
                    // since the future with an error will not cause the
                    // numResultsReturnedWithSuccess count to be incremented.
                    if (numResultsReturnedWithSuccess == sharedBlock->numFuturesToWaitFor) {
                        // All results are ready.
                        sharedBlock->resultPromise.emplaceValue(
                            std::move(sharedBlock->intermediateResult));
                    }
                }
            } else {
                // Make sure no other error has already been set before setting the promise.
                if (!sharedBlock->completedWithError.swap(true)) {
                    sharedBlock->resultPromise.setError(std::move(swValue.getStatus()));
                }
            }
        });
    }

    return std::move(future).semi();
}

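// Example (illustrative sketch; `fetchDocument`, `ids`, and `executor` are hypothetical):
// collecting the results of several value-returning futures in input order:
//
//     std::vector<ExecutorFuture<BSONObj>> futures;
//     for (auto&& id : ids)
//         futures.push_back(fetchDocument(id).thenRunOn(executor));
//
//     whenAllSucceed(std::move(futures))
//         .thenRunOn(executor)
//         .getAsync([](StatusWith<std::vector<BSONObj>> swDocs) {
//             // swDocs holds every result in input order, or the first error encountered.
//         });
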
/**
 * Variant of whenAllSucceed for void input futures. The only behavior difference is that it
 * returns SemiFuture<void> instead of SemiFuture<std::vector<T>>.
 */
TEMPLATE(typename FutureLike, typename Value = typename FutureLike::value_type)
REQUIRES(std::is_void_v<Value> && future_util_details::isFutureOrExecutorFuture<FutureLike>)
SemiFuture<void> whenAllSucceed(std::vector<FutureLike>&& futures) {
    invariant(futures.size() > 0, future_util_details::kWhenAllSucceedEmptyInputInvariantMsg);

    // A structure used to share state between the input futures.
    struct SharedBlock {
        SharedBlock(size_t numFuturesToWaitFor, Promise<void> result)
            : numFuturesToWaitFor(numFuturesToWaitFor), resultPromise(std::move(result)) {}
        // Total number of input futures.
        const size_t numFuturesToWaitFor;
        // Tracks the number of input futures which have resolved with success so far.
        AtomicWord<size_t> numResultsReturnedWithSuccess{0};
        // Tracks whether or not the resultPromise has been set. Only used for the error case.
        AtomicWord<bool> completedWithError{false};
        // The promise corresponding to the resulting SemiFuture returned by this function.
        Promise<void> resultPromise;
    };

    Promise<void> promise{NonNullPromiseTag{}};
    auto future = promise.getFuture();
    auto sharedBlock = std::make_shared<SharedBlock>(futures.size(), std::move(promise));

    for (size_t i = 0; i < futures.size(); ++i) {
        std::move(futures[i]).getAsync([sharedBlock](Status status) {
            if (status.isOK()) {
                // Best effort check that no error has returned, not required for correctness.
                if (!sharedBlock->completedWithError.loadRelaxed()) {
                    auto numResultsReturnedWithSuccess =
                        sharedBlock->numResultsReturnedWithSuccess.addAndFetch(1);
                    // If this is the last result to return, set the promise. Note that this will
                    // never be true if one of the input futures resolves with an error, since the
                    // future with an error will not cause the numResultsReturnedWithSuccess count
                    // to be incremented.
                    if (numResultsReturnedWithSuccess == sharedBlock->numFuturesToWaitFor) {
                        // All results are ready.
                        sharedBlock->resultPromise.emplaceValue();
                    }
                }
            } else {
                // Make sure no other error has already been set before setting the promise.
                if (!sharedBlock->completedWithError.swap(true)) {
                    sharedBlock->resultPromise.setError(std::move(status));
                }
            }
        });
    }

    return std::move(future).semi();
}

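// Example (illustrative sketch; `executor` is assumed to be a
// std::shared_ptr<executor::TaskExecutor>): joining several void futures, e.g. a set of timers:
//
//     std::vector<ExecutorFuture<void>> sleeps;
//     for (int i = 1; i <= 3; ++i)
//         sleeps.push_back(sleepFor(executor, Seconds(i)));
//
//     SemiFuture<void> allDone = whenAllSucceed(std::move(sleeps));
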
/**
 * Given a vector of input Futures or ExecutorFutures, returns a SemiFuture that contains the
 * results of each input future wrapped in a StatusWith to indicate whether it resolved with
 * success or failure and will be resolved when all of the input futures have resolved.
 */
template <typename FutureT,
          typename Value = typename FutureT::value_type,
          typename ResultVector = std::vector<StatusOrStatusWith<Value>>>
SemiFuture<ResultVector> whenAll(std::vector<FutureT>&& futures) {
    invariant(futures.size() > 0);

    /**
     * A structure used to share state between the input futures.
     */
    struct SharedBlock {
        SharedBlock(size_t numFuturesToWaitFor, Promise<ResultVector> result)
            : numFuturesToWaitFor(numFuturesToWaitFor),
              intermediateResult(numFuturesToWaitFor, {ErrorCodes::InternalError, ""}),
              resultPromise(std::move(result)) {}
        // Total number of input futures.
        const size_t numFuturesToWaitFor;
        // Tracks the number of input futures which have resolved so far.
        AtomicWord<size_t> numReady{0};
        // A vector containing the results of each input future.
        ResultVector intermediateResult;
        // The promise corresponding to the resulting SemiFuture returned by this function.
        Promise<ResultVector> resultPromise;
    };

    Promise<ResultVector> promise{NonNullPromiseTag{}};
    auto future = promise.getFuture();
    auto sharedBlock = std::make_shared<SharedBlock>(futures.size(), std::move(promise));

    for (size_t i = 0; i < futures.size(); ++i) {
        std::move(futures[i]).getAsync(
            [sharedBlock, myIndex = i](StatusOrStatusWith<Value> value) {
                sharedBlock->intermediateResult[myIndex] = std::move(value);

                auto numReady = sharedBlock->numReady.addAndFetch(1);
                invariant(numReady <= sharedBlock->numFuturesToWaitFor);

                if (numReady == sharedBlock->numFuturesToWaitFor) {
                    // All results are ready.
                    sharedBlock->resultPromise.emplaceValue(
                        std::move(sharedBlock->intermediateResult));
                }
            });
    }

    return std::move(future).semi();
}

/**
 * Result type for the whenAny function.
 */
template <typename T>
struct WhenAnyResult {
    // The result of the future that resolved first.
    StatusOrStatusWith<T> result;
    // The index of the future that resolved first.
    size_t index;
};

/**
 * Given a vector of input Futures or ExecutorFutures, returns a SemiFuture which will contain a
 * struct containing the first of those futures to resolve along with its index in the input
 * array.
 */
template <typename FutureT,
          typename Value = typename FutureT::value_type,
          typename Result = WhenAnyResult<Value>>
SemiFuture<Result> whenAny(std::vector<FutureT>&& futures) {
    invariant(futures.size() > 0);

    /**
     * A structure used to share state between the input futures.
     */
    struct SharedBlock {
        SharedBlock(Promise<Result> result) : resultPromise(std::move(result)) {}
        // Tracks whether or not the resultPromise has been set.
        AtomicWord<bool> done{false};
        // The promise corresponding to the resulting SemiFuture returned by this function.
        Promise<Result> resultPromise;
    };

    Promise<Result> promise{NonNullPromiseTag{}};
    auto future = promise.getFuture();
    auto sharedBlock = std::make_shared<SharedBlock>(std::move(promise));

    for (size_t i = 0; i < futures.size(); ++i) {
        std::move(futures[i]).getAsync(
            [sharedBlock, myIndex = i](StatusOrStatusWith<Value> value) {
                // If this is the first input future to complete, change done to true and set the
                // value on the promise.
                if (!sharedBlock->done.swap(true)) {
                    sharedBlock->resultPromise.emplaceValue(Result{std::move(value), myIndex});
                }
            });
    }

    return std::move(future).semi();
}

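// Example (illustrative sketch; `sendRequest` is a hypothetical function returning Future<void>
// and `executor` a task executor): racing a request against a timeout:
//
//     std::vector<ExecutorFuture<void>> futures;
//     futures.push_back(sendRequest().thenRunOn(executor));
//     futures.push_back(sleepFor(executor, Seconds(30)));
//
//     whenAny(std::move(futures))
//         .thenRunOn(executor)
//         .getAsync([](StatusWith<WhenAnyResult<void>> swResult) {
//             // swResult.getValue().index is 0 if the request won the race, 1 if the timer did.
//         });
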
/**
 * Variadic template overloads for the above helper functions. Though not strictly necessary,
 * we peel off the first element of each input list in order to assist the compiler in type
 * inference and to prevent 0 length lists from compiling.
 */
TEMPLATE(typename... FuturePack,
         typename FutureLike = std::common_type_t<FuturePack...>,
         typename Value = typename FutureLike::value_type,
         typename ResultVector = std::vector<Value>)
REQUIRES(future_util_details::isFutureOrExecutorFuture<FutureLike>)
auto whenAllSucceed(FuturePack&&... futures) {
    return whenAllSucceed(
        future_util_details::variadicArgsToVector(std::forward<FuturePack>(futures)...));
}

template <typename... FuturePack,
          typename FutureT = std::common_type_t<FuturePack...>,
          typename Value = typename FutureT::value_type,
          typename ResultVector = std::vector<StatusOrStatusWith<Value>>>
SemiFuture<ResultVector> whenAll(FuturePack&&... futures) {
    return whenAll(future_util_details::variadicArgsToVector(std::forward<FuturePack>(futures)...));
}

template <typename... FuturePack,
          typename FutureT = std::common_type_t<FuturePack...>,
          typename Result = WhenAnyResult<typename FutureT::value_type>>
SemiFuture<Result> whenAny(FuturePack&&... futures) {
    return whenAny(future_util_details::variadicArgsToVector(std::forward<FuturePack>(futures)...));
}

namespace future_util {

/**
 * Takes an input Future, ExecutorFuture, SemiFuture, or SharedSemiFuture and a CancellationToken,
 * and returns a new SemiFuture that will be resolved when either the input future is resolved or
 * when the input CancellationToken is canceled. If the token is canceled before the input future
 * is resolved, the resulting SemiFuture will be resolved with a CallbackCanceled error. Otherwise,
 * the resulting SemiFuture will be resolved with the same result as the input future.
 */
template <typename FutureT, typename Value = typename FutureT::value_type>
SemiFuture<Value> withCancellation(FutureT&& inputFuture, const CancellationToken& token) {
    /**
     * A structure used to share state between the continuation we attach to the input future and
     * the continuation we attach to the token's onCancel() future.
     */
    struct SharedBlock {
        SharedBlock(Promise<Value> result) : resultPromise(std::move(result)) {}
        // Tracks whether or not the resultPromise has been set.
        AtomicWord<bool> done{false};
        // The promise corresponding to the resulting SemiFuture returned by this function.
        Promise<Value> resultPromise;
    };

    Promise<Value> promise{NonNullPromiseTag{}};
    auto future = promise.getFuture();
    auto sharedBlock = std::make_shared<SharedBlock>(std::move(promise));

    std::move(inputFuture)
        .unsafeToInlineFuture()
        .getAsync([sharedBlock](StatusOrStatusWith<Value> result) {
            // If the input future completes first, change done to true and set the
            // value on the promise.
            if (!sharedBlock->done.swap(true)) {
                sharedBlock->resultPromise.setFrom(std::move(result));
            }
        });

    token.onCancel().unsafeToInlineFuture().getAsync([sharedBlock](Status s) {
        if (s.isOK()) {
            // If the cancellation token is canceled first, change done to true and set the value
            // on the promise.
            if (!sharedBlock->done.swap(true)) {
                sharedBlock->resultPromise.setError(
                    {ErrorCodes::CallbackCanceled,
                     "CancellationToken canceled while waiting for input future"});
            }
        }
    });

    return std::move(future).semi();
}

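// Example (illustrative sketch; `doWork` and `token` are hypothetical): making an arbitrary
// future abort early when a CancellationToken is canceled:
//
//     SemiFuture<int> cancelable = future_util::withCancellation(doWork(), token);
//     // If `token` is canceled before doWork()'s future resolves, `cancelable` is set with a
//     // CallbackCanceled error; otherwise it carries doWork()'s result.
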
/**
 * This class is a helper for ensuring RAII-like behavior in async code.
 *
 * This class constructs a heap allocated State object in make(), and then uses that State object
 * with a Launcher functor to create a future. The State object is persisted until the future
 * created by the Launcher completes. If the Launcher emits an exception, the state is destructed.
 * If the State ctor throws, then the error Status is returned as a ready future instead of
 * invoking the Launcher. The simplest example would be a guard that is destroyed once an async
 * function finishes:
 * ```
 * auto myFuture = future_util::AsyncState<MyGuard>::make({})
 *                     .thenWithState([](MyGuard*) { return myAsyncFunc(); });
 * ```
 *
 * Note that this class is not usable for ExecutorFutures because there is no tapAll() function to
 * invoke. If you want similar behavior, simply bind your state to the final callback in the async
 * chain, but do be mindful of TODO(SERVER-66126).
 */
template <typename State>
class [[nodiscard]] AsyncState {
public:
    explicit AsyncState(std::unique_ptr<State> state)
        : _status(Status::OK()), _state(std::move(state)) {}

    /**
     * Consume the AsyncState and bind the underlying state to the tail of the future returned
     * from the launcher functor.
     *
     * If the AsyncState was constructed with a Status, then return the captured status instead of
     * running the launcher.
     */
    template <typename Launcher>
    auto thenWithState(Launcher&& launcher) && noexcept {
        using namespace future_details;
        using ReturnType = FutureFor<NormalizedCallResult<Launcher, State*>>;

        if (!_status.isOK()) {
            // The factory threw, we have no state to run the launcher with.
            return ReturnType::makeReady(std::move(_status));
        }

        auto ptr = _state.get();
        return makeReadyFutureWith([&] {
            auto future = coerceToFuture(std::forward<Launcher>(launcher)(ptr));
            return std::move(future).tapAll([state = std::move(_state)](auto&&) mutable {
                // Finally, release our state.
                auto localState = std::move(state);
            });
        });
    }

    /**
     * This function is a helper for making an AsyncState<State> given State constructor
     * arguments, ala make_unique.
     *
     * If an exception would be emitted, it is instead stored in the AsyncState.
     */
    template <typename... Args>
    static auto make(Args&&... args) noexcept {
        try {
            auto ptr = std::make_unique<State>(std::forward<Args>(args)...);
            return AsyncState(std::move(ptr));
        } catch (const DBException& ex) {
            return AsyncState(ex.toStatus());
        }
    }

private:
    /**
     * This ctor allows make to capture exceptions and defer them until someone invokes
     * thenWithState(). It is private simply because it is difficult to justify a good use case
     * for it by an external user.
     */
    explicit AsyncState(Status status) : _status(std::move(status)) {}

    Status _status;
    std::unique_ptr<State> _state;
};

// This function is syntactic sugar for AsyncState<State>::make().
template <typename State, typename... Args>
auto makeState(Args&&... args) {
    return AsyncState<State>::make(std::forward<Args>(args)...);
}

}  // namespace future_util
}  // namespace mongo