/** * Copyright (C) 2018-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, * as published by MongoDB, Inc. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * Server Side Public License for more details. * * You should have received a copy of the Server Side Public License * along with this program. If not, see * . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the Server Side Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #pragma once // Keeping this first to ensure it compiles by itself #include "mongo/util/future_impl.h" #include #include #include "mongo/base/status.h" #include "mongo/base/status_with.h" #include "mongo/stdx/type_traits.h" #include "mongo/util/assert_util.h" #include "mongo/util/debug_util.h" #include "mongo/util/interruptible.h" #include "mongo/util/intrusive_counter.h" #include "mongo/util/out_of_line_executor.h" namespace mongo { /** * SemiFuture is logically a possibly-deferred StatusWith (or Status when T is void). * * Unlike Future it only supports blocking operations, not directly chained continuations. You * are only allowed to chain continuations by passing an executor to thenRunOn(). This is intended * to protect the promise-completer's execution context from needing to perform arbitrary operations * requested by other subsystem's continuations. * * SemiFutures can't convert to or be assigned to Futures since that would allow adding * continuations which would defeat the purpose of SemiFuture. * * A future may be passed between threads, but only one thread may use it at a time. * * TODO decide if destroying a Future before extracting the result should cancel work or should * cancellation be explicit. For now avoid unnecessarily throwing away active Futures since the * behavior may change. End all Future chains with either a blocking call to get()/getNoThrow() or a * non-blocking call to getAsync(). * * SemiFuture is the same as the generic SemiFuture with the following exceptions: * - Anything mentioning StatusWith will use Status instead. * - Anything returning references to T will just return void since there are no void references. * - Anything taking a T argument will receive no arguments. */ template class MONGO_WARN_UNUSED_RESULT_CLASS SemiFuture { using Impl = future_details::FutureImpl; using T_unless_void = std::conditional_t, future_details::FakeVoid, T>; public: static_assert(!std::is_same::value, "Future is banned. Use Future instead."); static_assert(!isStatusWith, "Future> is banned. Just use Future instead."); static_assert(!future_details::isFutureLike, "Future of Future types is banned. Just use Future instead."); static_assert(!std::is_reference::value, "Future is banned."); static_assert(!std::is_const::value, "Future is banned."); static_assert(!std::is_array::value, "Future is banned."); using value_type = T; /** * For non-void T: Constructs a SemiFuture in a moved-from state that can only be assigned to * or destroyed. * * For void T: Constructs a ready Semifuture for parity with SemiFuture(T) */ SemiFuture() = default; SemiFuture& operator=(SemiFuture&&) = default; SemiFuture(SemiFuture&&) = default; SemiFuture(const SemiFuture&) = delete; SemiFuture& operator=(const SemiFuture&) = delete; /** * For non-void T: This must be passed a not-OK Status. * * For void T: This behaves like the StatusWith constructor and accepts any Status. */ /* implicit */ SemiFuture(Status status) : SemiFuture(Impl::makeReady(std::move(status))) {} // These should not be used with T=void. /* implicit */ SemiFuture(T_unless_void val) : SemiFuture(Impl::makeReady(std::move(val))) { static_assert(!std::is_void_v); } /* implicit */ SemiFuture(StatusWith sw) : SemiFuture(Impl::makeReady(std::move(sw))) { static_assert(!std::is_void_v); } /** * Make a ready SemiFuture from a value for cases where you don't need to wait * asynchronously. * * Calling this is faster than getting a SemiFuture out of a Promise, and is effectively free. * It is * fast enough that you never need to avoid returning a SemiFuture from an API, even if the * result * is ready 99.99% of the time. * * As an example, if you are handing out results from a batch, you can use this when for each * result while you have a batch, then use a Promise to return a not-ready SemiFuture when you * need * to get another batch. */ static SemiFuture makeReady(T_unless_void val) { return SemiFuture(Impl::makeReady(std::move(val))); } static SemiFuture makeReady(Status status) { return SemiFuture(Impl::makeReady(std::move(status))); } static SemiFuture makeReady(StatusWith val) { return SemiFuture(Impl::makeReady(std::move(val))); } REQUIRES_FOR_NON_TEMPLATE(std::is_void_v) static SemiFuture makeReady() { return SemiFuture(Impl::makeReady()); } /** * A no-op so that you can always do `return makesFutureOrSemiFuture().semi()` when you want to * protect your execution context. */ SemiFuture semi() && noexcept { return std::move(*this); } /** * Convert this SemiFuture to a SharedSemiFuture. */ SharedSemiFuture share() && noexcept { return std::move(_impl).share(); } /** * If this returns true, get() is guaranteed not to block and callbacks will be immediately * invoked. You can't assume anything if this returns false since it may be completed * immediately after checking (unless you have independent knowledge that this SemiFuture can't * complete in the background). * * Callers must still call get() or similar, even on SemiFuture, to ensure that they are * correctly sequenced with the completing task, and to be informed about whether the Promise * completed successfully. * * This is generally only useful as an optimization to avoid prep work, such as setting up * timeouts, that is unnecessary if the SemiFuture is ready already. */ bool isReady() const { return _impl.isReady(); } /** * Returns whether this SemiFuture can or will be able to access a deferred status or value. * * NOTE: valid() will still return true if the value inside of the future is moved from. This * should not be used as a way to determine usage validity until SERVER-66036. */ bool valid() const { return _impl.valid(); } /** * Returns when the Semifuture isReady(). * * Throws if the interruptible passed is interrupted (explicitly or via deadline). */ void wait(Interruptible* interruptible = Interruptible::notInterruptible()) const { return _impl.wait(interruptible); } /** * Returns Status::OK() when the Semifuture isReady(). * * Returns a non-okay status if the interruptible is interrupted. */ Status waitNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) const noexcept { return _impl.waitNoThrow(interruptible); } /** * Gets the value out of this SemiFuture, blocking until it is ready. * * get() methods throw on error, while getNoThrow() returns a !OK status. * * These methods can be called multiple times, except for the rvalue overloads. * * Note: It is impossible to differentiate interruptible interruption from an error propagating * down the Semifuture chain with these methods. If you need to distinguish the two cases, call * wait() first. */ T get(Interruptible* interruptible = Interruptible::notInterruptible()) && { return std::move(_impl).get(interruptible); } future_details::AddRefUnlessVoid get( Interruptible* interruptible = Interruptible::notInterruptible()) & { return _impl.get(interruptible); } future_details::AddRefUnlessVoid get( Interruptible* interruptible = Interruptible::notInterruptible()) const& { return _impl.get(interruptible); } StatusOrStatusWith getNoThrow( Interruptible* interruptible = Interruptible::notInterruptible()) && noexcept { return std::move(_impl).getNoThrow(interruptible); } StatusOrStatusWith getNoThrow( Interruptible* interruptible = Interruptible::notInterruptible()) const& noexcept { return _impl.getNoThrow(interruptible); } /** * Ignores the return value of a future, transforming it down into a SemiFuture. * * This only ignores values, not errors. Those remain propagated to an onError handler. */ SemiFuture ignoreValue() && noexcept { return SemiFuture(std::move(this->_impl).ignoreValue()); } /** * Returns a future that allows you to add continuations that are guaranteed to run on the * provided executor. * * Be sure to read the ExecutorFuture class comment. */ ExecutorFuture thenRunOn(ExecutorPtr exec) && noexcept; /** * Returns an inline Future type from this SemiFuture. * * WARNING: Do not use this unless you're extremely sure of what you're doing, as callbacks * chained to the resulting Future may run in unexpected places. */ Future unsafeToInlineFuture() && noexcept; private: friend class Promise; friend class SharedPromise; template friend class Future; template friend class ExecutorFuture; template friend class future_details::FutureImpl; template friend class SharedSemiFuture; template friend class SemiFuture; explicit SemiFuture(future_details::SharedStateHolder&& impl) : _impl(std::move(impl)) {} explicit SemiFuture(Impl&& impl) : _impl(std::move(impl)) {} operator Impl &&() && { return std::move(_impl); } template void propagateResultTo(U&& arg) && { std::move(_impl).propagateResultTo(std::forward(arg)); } Impl _impl; }; // Deduction Guides TEMPLATE(typename T) REQUIRES(!isStatusOrStatusWith && !future_details::isFutureLike) SemiFuture(T)->SemiFuture; template SemiFuture(StatusWith)->SemiFuture; /** * Future is a SemiFuture (which is logically a possibly deferred StatusOrStatusWith), * extended with the ability to chain additional continuations that will be invoked when the result * is ready. * * All comments on SemiFuture apply to Future as well. */ template class MONGO_WARN_UNUSED_RESULT_CLASS Future : private SemiFuture { using Impl = typename SemiFuture::Impl; using T_unless_void = typename SemiFuture::T_unless_void; public: /** * Re-export the API of SemiFuture. The API of Future is a superset, except you can't convert * from a SemiFuture to a Future. */ using value_type = T; using SemiFuture::SemiFuture; // Constructors. using SemiFuture::share; using SemiFuture::isReady; using SemiFuture::valid; using SemiFuture::wait; using SemiFuture::waitNoThrow; using SemiFuture::get; using SemiFuture::getNoThrow; using SemiFuture::semi; using SemiFuture::thenRunOn; /** * Re-export makeReady, but return a Future */ static Future makeReady(T_unless_void val) { return Future(Impl::makeReady(std::move(val))); } static Future makeReady(Status status) { return Future(Impl::makeReady(std::move(status))); } static Future makeReady(StatusWith val) { return Future(Impl::makeReady(std::move(val))); } REQUIRES_FOR_NON_TEMPLATE(std::is_void_v) static Future makeReady() { return Future(Impl::makeReady()); } Future ignoreValue() && noexcept { return Future(std::move(this->_impl).ignoreValue()); } /** * Attaches a `func` callback that will consume the result of this `Future` when it completes. A * `getAsync` call can be the tail end of a continuation chain, and that is only position at * which it can appear. * * The result argument passed to `func` is `Status` if `T` is `void`, otherwise `StatusWith`. * The `func(result)` return type must be `void`, and not a discarded return value. * * `func` must not throw an exception. It is invoked as if by a `noexcept` function, and it will * `std::terminate` the process. This is because there is no appropriate context in which to * handle such an asynchronous exception. * * TODO decide how to handle func throwing. */ TEMPLATE(typename Policy, typename Func) REQUIRES(future_details::isCallableExactR>&& isFuturePolicy) void getAsync(Policy policy, Func&& func) && noexcept { std::move(this->_impl).getAsync(policy, std::forward(func)); } // // The remaining methods are all continuation-based and take a callback and return a Future-like // type based on the return type of the callback, except for the "tap" methods that always // return Future. When passed a callback that returns a FutureLike type, the return type // of the method will be either Future if FutureLike is Future, otherwise SemiFuture. The // result of the callback will be automatically unwrapped and connected to the returned // FutureLike rather than producing a Future>. When the callback returns a // non-FutureLike type U, the return type of the method will be Future, with the adjustment // for Status/StatusWith described below. // // Each method has a comment indicating the supported signatures for that callback, and a // description of when the callback is invoked and how the impacts the returned Future. It may // be helpful to think of Future continuation chains as a pipeline of stages that take input // from earlier stages and produce output for later stages. // // Be aware that the callback may be invoked inline at the call-site or at the producer when // setting the value. Therefore, you should avoid doing blocking work inside of a callback. // Additionally, avoid acquiring any locks or mutexes that the caller already holds, otherwise // you risk a deadlock. If either of these concerns apply to your callback, it should schedule // itself on an executor, rather than doing work in the callback. // TODO make this easier to do by having executor APIs return Futures. // // Error handling in callbacks: all exceptions thrown propagate to the returned Future // automatically. Callbacks that return Status or StatusWith behave as-if they were wrapped // in something that called uassertStatusOK() on the return value. There is no way to // distinguish between a function throwing or returning a !OK status. // /** * Callbacks passed to then() are only called if the input Future completes successfully. * Otherwise the error propagates automatically, bypassing the callback. * * The callback takes a T and can return anything (see above for how Statusy and Futurey returns * are handled.) */ TEMPLATE(typename Policy, typename Func) REQUIRES(future_details::isCallable&& isFuturePolicy) /*see above*/ auto then(Policy policy, Func&& func) && noexcept { return wrap(std::move(this->_impl).then(policy, std::forward(func))); } /** * Callbacks passed to onCompletion() are called if the input Future completes with or without * an error. * * The callback takes a StatusOrStatusWith and can return anything (see above for how Statusy * and Futurey returns are handled.) */ TEMPLATE(typename Policy, typename Func) REQUIRES(future_details::isCallable>&& isFuturePolicy) /*see above*/ auto onCompletion(Policy policy, Func&& func) && noexcept { return wrap( std::move(this->_impl).onCompletion(policy, std::forward(func))); } /** * Callbacks passed to onError() are only called if the input Future completes with an error. * Otherwise, the successful result propagates automatically, bypassing the callback. * * The callback can either produce a replacement value (which must be a T), return a replacement * Future (such as by retrying), or return/throw a replacement error. * * Note that this will only catch errors produced by earlier stages; it is not registering a * general error handler for the entire chain. * * The callback takes a non-OK Status and returns a possibly-wrapped T (see above for how * Statusy and Futurey returns are handled.) */ TEMPLATE(typename Policy, typename Func) REQUIRES(future_details::isCallableR&& isFuturePolicy) /*see above*/ auto onError(Policy policy, Func&& func) && noexcept { return wrap(std::move(this->_impl).onError(policy, std::forward(func))); } /** * Same as the other two onErrors but only calls the callback if the code matches the template * parameter. Otherwise lets the error propagate unchanged. * * The callback takes a non-OK Status and returns a possibly-wrapped T (see above for how * Statusy and Futurey returns are handled.) */ TEMPLATE(ErrorCodes::Error code, typename Policy, typename Func) REQUIRES(future_details::isCallableR&& isFuturePolicy) /*see above*/ auto onError(Policy policy, Func&& func) && noexcept { return wrap( std::move(this->_impl).template onError(policy, std::forward(func))); } /** * Similar to the first two onErrors, but only calls the callback if the category matches * the template parameter. Otherwise lets the error propagate unchanged. * * The callback takes a non-OK Status and returns a possibly-wrapped T (see above for how * Statusy and Futurey returns are handled.) */ TEMPLATE(ErrorCategory category, typename Policy, typename Func) REQUIRES(future_details::isCallableR&& isFuturePolicy) /*see above*/ auto onErrorCategory(Policy policy, Func&& func) && noexcept { return wrap( std::move(this->_impl) .template onErrorCategory(policy, std::forward(func))); } // // The tap/tapError/tapAll family of functions take callbacks to observe the flow through a // future chain without affecting the propagating result, except possibly if they throw. If the // naming seems odd, you can think of it like a "wire tap" in that it allows you to observe a // conversation between two parties (the promise-producer and future-consumer) without adding // messages of your own. This is why all callbacks are required to return void. // // TODO decide what to do if callback throws: // - transition the future chain to failure // - ignore // - fatal (current impl) // /** * Callback is called if the input completes successfully. * * This can be used to inform some outside system of the result. * * The callback takes a const T& and must return void. */ TEMPLATE(typename Policy, typename Func) REQUIRES(future_details::isCallableExactR&& isFuturePolicy) Future tap(Policy policy, Func&& func) && noexcept { return Future(std::move(this->_impl).tap(policy, std::forward(func))); } /** * Callback is called if the input completes with an error. * * This can be used to log. * * The callback takes a non-OK Status and must return void. */ TEMPLATE(typename Policy, typename Func) REQUIRES(future_details::isCallableExactR&& isFuturePolicy) Future tapError(Policy policy, Func&& func) && noexcept { return Future(std::move(this->_impl).tapError(policy, std::forward(func))); } /** * Callback is called when the input completes, regardless of success or failure. * * This can be used for cleanup. Some other libraries name the equivalent method finally to * match the common semantic from other languages. * * The callback takes a StatusOrStatusWith and must return void. */ TEMPLATE(typename Policy, typename Func) REQUIRES(future_details::isCallableExactR>&& isFuturePolicy) Future tapAll(Policy policy, Func&& func) && noexcept { return Future(std::move(this->_impl).tapAll(policy, std::forward(func))); } // Calling this on a Future is never strictly necessary, since this is already an inline // Future, but making this public helps for writing generic utilities which need to use // unsafeToInlineFuture for some future types but not others. using SemiFuture::unsafeToInlineFuture; /** * Providing a Policy to continuation functions is optional. All callers should eventually * use the default policy by removing the tag altogether. * * TODO(SERVER-66126): Remove all tag-taking methods and associated default implementations. */ TEMPLATE(typename Func) REQUIRES(future_details::isCallableExactR>) void getAsync(Func&& func) && noexcept { std::move(*this).getAsync(destroyDefault, std::forward(func)); } TEMPLATE(typename Func) REQUIRES(future_details::isCallable) auto then(Func&& func) && noexcept { return std::move(*this).then(destroyDefault, std::forward(func)); } TEMPLATE(typename Func) REQUIRES(future_details::isCallable>) auto onCompletion(Func&& func) && noexcept { return std::move(*this).onCompletion(destroyDefault, std::forward(func)); } TEMPLATE(typename Func) REQUIRES(future_details::isCallableR) auto onError(Func&& func) && noexcept { return std::move(*this).onError(destroyDefault, std::forward(func)); } TEMPLATE(ErrorCodes::Error code, typename Func) REQUIRES(future_details::isCallableR) auto onError(Func&& func) && noexcept { return std::move(*this).template onError(destroyDefault, std::forward(func)); } TEMPLATE(ErrorCategory category, typename Func) REQUIRES(future_details::isCallableR) auto onErrorCategory(Func&& func) && noexcept { return std::move(*this).template onErrorCategory(destroyDefault, std::forward(func)); } TEMPLATE(typename Func) REQUIRES(future_details::isCallableExactR) Future tap(Func&& func) && noexcept { return std::move(*this).tap(destroyDefault, std::forward(func)); } TEMPLATE(typename Func) REQUIRES(future_details::isCallableExactR) Future tapError(Func&& func) && noexcept { return std::move(*this).tapError(destroyDefault, std::forward(func)); } TEMPLATE(typename Func) REQUIRES(future_details::isCallableExactR>) Future tapAll(Func&& func) && noexcept { return std::move(*this).tapAll(destroyDefault, std::forward(func)); } private: template friend class ExecutorFuture; template friend class Future; template friend class future_details::FutureImpl; friend class Promise; friend class SharedPromise; template static auto wrap(future_details::FutureImpl&& impl) { using namespace future_details; return FutureContinuationKind>(std::move(impl)); } }; // Deduction Guides TEMPLATE(typename T) REQUIRES(!isStatusOrStatusWith && !future_details::isFutureLike) Future(T)->Future; template Future(StatusWith)->Future; /** * An ExecutorFuture is like a Future that ensures that all callbacks are run on a supplied * executor. * * IMPORTANT: Executors are allowed to refuse work by invoking their task callbacks with a non-OK * Status. In that event, callbacks passed to continuation functions WILL NOT RUN. Instead, the * error status will propagate down the future chain until it would run a callback on an executor * that doesn't refuse the work, or it is extracted by calling a blocking get() method. Destructors * for these callbacks can run in any context, so be suspicious of callbacks that capture Promises * because they will propagate out BrokenPromise if the executor refuses work. */ template class MONGO_WARN_UNUSED_RESULT_CLASS ExecutorFuture : private SemiFuture { using Impl = typename SemiFuture::Impl; using T_unless_void = typename SemiFuture::T_unless_void; public: /** * Default construction is disallowed to ensure that every ExecutorFuture has an associated * Executor (unless it has been moved-from). */ ExecutorFuture() = delete; ExecutorFuture(ExecutorPtr exec, Status status) : SemiFuture(std::move(status)), _exec(std::move(exec)) {} // These should not be used with T=void. ExecutorFuture(ExecutorPtr exec, T_unless_void val) : SemiFuture(std::move(val)), _exec(std::move(exec)) { static_assert(!std::is_void_v); } ExecutorFuture(ExecutorPtr exec, StatusWith sw) : SemiFuture(std::move(sw)), _exec(std::move(exec)) { static_assert(!std::is_void_v); } REQUIRES_FOR_NON_TEMPLATE(std::is_void_v) explicit ExecutorFuture(ExecutorPtr exec) : SemiFuture(), _exec(std::move(exec)) {} /** * Re-export the accessor API of SemiFuture. The access API of ExecutorFuture is a superset, but * you can't create an ExecutorFuture without supplying an executor. */ using value_type = T; using SemiFuture::share; using SemiFuture::isReady; using SemiFuture::valid; using SemiFuture::wait; using SemiFuture::waitNoThrow; using SemiFuture::get; using SemiFuture::getNoThrow; using SemiFuture::semi; using SemiFuture::thenRunOn; ExecutorFuture ignoreValue() && noexcept { return ExecutorFuture(std::move(_exec), std::move(this->_impl).ignoreValue()); } // // Provide the callback-taking API from Future (except for the taps). All callbacks will be run // on the executor associated with this ExecutorFuture. See class comment for how we handle // executors that refuse work. // // All methods that return non-void will return an ExecutorFuture bound to the same executor as // this. // // There is no tap support because we can't easily be both non-intrusive in the value flow and // schedule on an executor that is allowed to fail. In particular, the inability to copy // move-only values means that we would need to refer directly into the internal SharedState // objects and keep them alive longer that we otherwise would. If there is a real need for this, // it should be doable, but will be fairly complicated. // /** * Attach a completion callback to asynchronously consume this `ExecutorFuture`'s result. * \see `Future::getAsync()`. */ TEMPLATE(typename Policy, typename Func) REQUIRES(future_details::isCallableExactR>&& isFuturePolicy) void getAsync(Policy policy, Func&& func) && noexcept { static_assert(std::is_void_v>()))>, "func passed to getAsync must return void"); // Can't use wrapCB since we don't want to return a future, just schedule a non-chainable // callback. std::move(this->_impl).getAsync(policy, [ exec = std::move(_exec), // Unlike wrapCB this can move because we won't need it later. func = std::forward(func) ](StatusOrStatusWith arg) mutable noexcept { exec->schedule([ func = std::move(func), arg = std::move(arg) ](Status execStatus) mutable noexcept { if (execStatus.isOK()) func(std::move(arg)); }); }); } TEMPLATE(typename Policy, typename Func) REQUIRES(future_details::isCallable&& isFuturePolicy) auto then(Policy policy, Func&& func) && noexcept { return mongo::ExecutorFuture( std::move(_exec), std::move(this->_impl).then(policy, wrapCB(policy, std::forward(func)))); } TEMPLATE(typename Policy, typename Func) REQUIRES(future_details::isCallable>&& isFuturePolicy) auto onCompletion(Policy policy, Func&& func) && noexcept { return mongo::ExecutorFuture( std::move(_exec), std::move(this->_impl) .onCompletion(policy, wrapCB>(policy, std::forward(func)))); } TEMPLATE(typename Policy, typename Func) REQUIRES(future_details::isCallableR&& isFuturePolicy) ExecutorFuture onError(Policy policy, Func&& func) && noexcept { return mongo::ExecutorFuture( std::move(_exec), std::move(this->_impl) .onError(policy, wrapCB(policy, std::forward(func)))); } TEMPLATE(ErrorCodes::Error code, typename Policy, typename Func) REQUIRES(future_details::isCallableR&& isFuturePolicy) ExecutorFuture onError(Policy policy, Func&& func) && noexcept { return mongo::ExecutorFuture( std::move(_exec), std::move(this->_impl) .template onError(policy, wrapCB(policy, std::forward(func)))); } TEMPLATE(ErrorCategory category, typename Policy, typename Func) REQUIRES(future_details::isCallableR&& isFuturePolicy) ExecutorFuture onErrorCategory(Policy policy, Func&& func) && noexcept { return mongo::ExecutorFuture( std::move(_exec), std::move(this->_impl) .template onErrorCategory( policy, wrapCB(policy, std::forward(func)))); } /** * Returns an inline Future type from this ExecutorFuture. * * WARNING: Do not use this unless you're extremely sure of what you're doing, as callbacks * chained to the resulting Future may run in unexpected places. */ using SemiFuture::unsafeToInlineFuture; /** * Providing a Policy to continuation functions is optional. All callers should eventually * use the default policy by removing the tag altogether. * * TODO(SERVER-66126): Remove all tag-taking methods and associated default implementations. */ TEMPLATE(typename Func) REQUIRES(future_details::isCallableExactR>) void getAsync(Func&& func) && noexcept { std::move(*this).getAsync(destroyDefault, std::forward(func)); } TEMPLATE(typename Func) REQUIRES(future_details::isCallable) auto then(Func&& func) && noexcept { return std::move(*this).then(destroyDefault, std::forward(func)); } TEMPLATE(typename Func) REQUIRES(future_details::isCallable>) auto onCompletion(Func&& func) && noexcept { return std::move(*this).onCompletion(destroyDefault, std::forward(func)); } TEMPLATE(typename Func) REQUIRES(future_details::isCallableR) auto onError(Func&& func) && noexcept { return std::move(*this).onError(destroyDefault, std::forward(func)); } TEMPLATE(ErrorCodes::Error code, typename Func) REQUIRES(future_details::isCallableR) auto onError(Func&& func) && noexcept { return std::move(*this).template onError(destroyDefault, std::forward(func)); } TEMPLATE(ErrorCategory category, typename Func) REQUIRES(future_details::isCallableR) auto onErrorCategory(Func&& func) && noexcept { return std::move(*this).template onErrorCategory(destroyDefault, std::forward(func)); } private: // This *must* take exec by ref to ensure it isn't moved from while evaluating wrapCB above. ExecutorFuture(ExecutorPtr&& exec, Impl&& impl) : SemiFuture(std::move(impl)), _exec(exec) { dassert(_exec); } /** * Wraps func in a callback that takes the argument it would and returns an appropriately typed * Future, then schedules a task on _exec to complete the associated promise with the result * of calling func with that argument. */ TEMPLATE(typename RawArg, typename Policy, typename Func) REQUIRES(isFuturePolicy) auto wrapCB(Policy policy, Func&& func) { // Have to take care to never put void in argument position, since that is a hard error. using Result = typename std::conditional_t, std::invoke_result, std::invoke_result>::type; using DummyArg = std::conditional_t, // future_details::FakeVoid, RawArg>; using Sig = std::conditional_t, // Result(), Result(DummyArg)>; return _wrapCBHelper(_exec, unique_function(std::forward(func))); } template MONGO_COMPILER_NOINLINE static auto _wrapCBHelper(ExecutorPtr exec, UniqueFunc&& func); template friend class ExecutorFuture; template friend class SemiFuture; template friend class SharedSemiFuture; template friend class future_details::FutureImpl; ExecutorPtr _exec; }; // Deduction Guides TEMPLATE(typename T) REQUIRES(!isStatusOrStatusWith && !future_details::isFutureLike) ExecutorFuture(ExecutorPtr, T)->ExecutorFuture; template ExecutorFuture(ExecutorPtr, future_details::FutureImpl)->ExecutorFuture; template ExecutorFuture(ExecutorPtr, StatusWith)->ExecutorFuture; ExecutorFuture(ExecutorPtr)->ExecutorFuture; /** Constructor tag (see the corresponding `Promise` constructor). */ struct NonNullPromiseTag {}; /** * This class represents the producer side of a Future. * * This is a single-shot class: you may either set a value or error at most once. If no value or * error has been set at the time this Promise is destroyed, a error will be set with * ErrorCode::BrokenPromise. This should generally be considered a programmer error, and should not * be relied upon. We may make it debug-fatal in the future. * * Only one thread can use a given Promise at a time, but another thread may be using the associated * Future object. * * If the result is ready when producing the Future, it is more efficient to use * makeReadyFutureWith() or Future::makeReady() than to use a Promise. * * A default constructed `Promise` is in a null state. Null `Promises` can only be assigned over * and destroyed. It is a programmer error to call any methods on a null `Promise`. Any methods * that complete a `Promise` leave it in the null state. */ template class Promise { using SharedStateT = future_details::SharedState; using T_unless_void = typename SemiFuture::T_unless_void; public: using value_type = T; /** * Creates a null `Promise`. */ Promise() = default; /** Makes a `Promise` that has a `Future`. */ explicit Promise(NonNullPromiseTag) : Promise{make_intrusive()} {} ~Promise() { breakPromiseIfNeeded(); } Promise(const Promise&) = delete; Promise& operator=(const Promise&) = delete; /** * Breaks this `Promise`, if not fulfilled and not in a null state. */ Promise& operator=(Promise&& p) noexcept { breakPromiseIfNeeded(); _sharedState = std::move(p._sharedState); return *this; } Promise(Promise&&) = default; /** * Sets a value or error into this Promise by calling func, which must take no arguments and * return one of T, StatusWith (or Status when T is void), or Future. All errors, whether * returned or thrown, will be correctly propagated. * * If the function returns a Future, this Promise's Future will complete when the returned * Future completes, as-if it was passed to Promise::setFrom(). * * If any work is needed to produce the result, prefer doing something like: * promise.setWith([&]{ return makeResult(); }); * over code like: * promise.emplaceValue(makeResult()); * because this method will correctly propagate errors thrown from makeResult(), rather than * ErrorCodes::BrokenPromise. */ TEMPLATE(typename Policy, typename Func) REQUIRES(future_details::isCallableR&& isFuturePolicy) void setWith(Policy policy, Func&& func) noexcept { setFrom(Future::makeReady().then(policy, std::forward(func))); } TEMPLATE(typename Func) REQUIRES(future_details::isCallableR) void setWith(Func&& func) noexcept { setWith(destroyDefault, std::forward(func)); } /** * Sets the value into this Promise when the passed-in Future completes, which may have already * happened. If it hasn't, it is still safe to destroy this Promise since it is no longer * involved. */ void setFrom(Future&& future) noexcept { setImpl([&](boost::intrusive_ptr&& sharedState) { std::move(future).propagateResultTo(sharedState.get()); }); } /** * Sets the value into this Promise immediately. * * This accepts a Status for Promises or a StatusWith for Promise. */ void setFrom(StatusOrStatusWith sosw) noexcept { setImpl([&](boost::intrusive_ptr&& sharedState) { sharedState->setFrom(std::move(sosw)); }); } // Use emplaceValue(Args&&...) instead. REQUIRES_FOR_NON_TEMPLATE(!std::is_void_v) void setFrom(T_unless_void val) noexcept = delete; // Use setError(Status) instead. REQUIRES_FOR_NON_TEMPLATE(!std::is_void_v) void setFrom(Status) noexcept = delete; TEMPLATE(typename... Args) REQUIRES(std::is_constructible_v || (std::is_void_v && sizeof...(Args) == 0)) void emplaceValue(Args&&... args) noexcept { setImpl([&](boost::intrusive_ptr&& sharedState) { sharedState->emplaceValue(std::forward(args)...); }); } void setError(Status status) noexcept { invariant(!status.isOK()); setImpl([&](boost::intrusive_ptr&& sharedState) { sharedState->setError(std::move(status)); }); } /** * Create and return the Future corresponding to this Promise. Cannot be * called more than once (but see `SharedPromise` which supports multiple * `getFuture` calls). * * Caution: calling `getFuture` is only valid before the promise is * completed, because a Promise is nullified upon completion. This Promise * behavior is different from the semantics of other future libraries. The * PromiseAndFuture helper avoids a potential data race by eagerly calling * `getFuture`, returning the Promise together with its Future. */ Future getFuture() noexcept { // Copy `_sharedState` into a SharedStateHolder to make a Future, // exploiting the knowledge that this Promise is its sole owner. _sharedState->threadUnsafeIncRefCountTo(2); return Future(future_details::SharedStateHolder>( boost::intrusive_ptr(_sharedState.get(), false))); } private: explicit Promise(boost::intrusive_ptr&& sharedState) : _sharedState(std::move(sharedState)) {} friend class Future; template void setImpl(Func&& doSet) noexcept { invariant(_sharedState); // We keep `sharedState` as a stack local, to preserve ownership of the resource, // in case the code in `doSet` unblocks a thread which winds up causing // `~Promise` to be invoked. auto sharedState = std::move(_sharedState); doSet(std::move(sharedState)); // Note: `this` is potentially dead, at this point. } // The current promise will be broken, if not already fulfilled. void breakPromiseIfNeeded() { if (MONGO_unlikely(_sharedState)) { _sharedState->setError({ErrorCodes::BrokenPromise, "broken promise"}); } } boost::intrusive_ptr _sharedState; }; /** * SharedSemiFuture is logically a possibly-deferred StatusWith (or Status when T is void). * * All methods that are present do the same as on a Future so see it for documentation. * * Unlike Future it only supports blocking operation, not chained continuations. This is intended * to protect the promise-completer's execution context from needing to perform arbitrary operations * requested by other subsystem's continuations. * TODO Support continuation chaining when supplied with an executor to run them on. * * A SharedSemiFuture may be passed between threads, but only one thread may use it at a time. */ template class MONGO_WARN_UNUSED_RESULT_CLASS SharedSemiFuture { using Impl = future_details::SharedStateHolder; using T_unless_void = std::conditional_t, future_details::FakeVoid, T>; public: static_assert(!std::is_same::value, "SharedSemiFuture is banned. Use SharedSemiFuture instead."); static_assert( !isStatusWith, "SharedSemiFuture> is banned. Just use SharedSemiFuture instead."); static_assert( !future_details::isFutureLike, "SharedSemiFuture of Future types is banned. Just use SharedSemiFuture instead."); static_assert(!std::is_reference::value, "SharedSemiFuture is banned."); static_assert(!std::is_const::value, "SharedSemiFuture is banned."); static_assert(!std::is_array::value, "SharedSemiFuture is banned."); static_assert(std::is_void_v || std::is_copy_constructible_v, "SharedSemiFuture currently requires copyable types. Let us know if this is a " "problem. Supporting this for blocking use cases is easy, but it will require " "more work for async usage."); using value_type = T; SharedSemiFuture() = default; /*implicit*/ SharedSemiFuture(const Future& fut) = delete; /*implicit*/ SharedSemiFuture(Future&& fut) : SharedSemiFuture(std::move(fut).share()) {} /*implicit*/ SharedSemiFuture(Status error) : _shared(Impl::makeReady(std::move(error))) {} // These should not be used with T=void. /*implicit*/ SharedSemiFuture(T_unless_void val) : _shared(Impl::makeReady(std::move(val))) { static_assert(!std::is_void_v); } /*implicit*/ SharedSemiFuture(StatusWith sw) : _shared(Impl::makeReady(std::move(sw))) { static_assert(!std::is_void_v); } bool isReady() const { return _shared.isReady(); } bool valid() const { return _shared.valid(); } void wait(Interruptible* interruptible = Interruptible::notInterruptible()) const { _shared.wait(interruptible); } Status waitNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) const noexcept { return _shared.waitNoThrow(interruptible); } future_details::AddRefUnlessVoid get( Interruptible* interruptible = Interruptible::notInterruptible()) const& { return _shared.get(interruptible); } StatusOrStatusWith getNoThrow( Interruptible* interruptible = Interruptible::notInterruptible()) const& noexcept { return _shared.getNoThrow(interruptible); } ExecutorFuture thenRunOn(ExecutorPtr exec) const noexcept { return ExecutorFuture(std::move(exec), toFutureImpl()); } /** * Makes a copy of this SharedSemiFuture that resolves at the same time as the original. */ SharedSemiFuture split() const noexcept { return toFutureImpl().share(); } SemiFuture semi() && noexcept { return SemiFuture(toFutureImpl()); } /** * Returns an inline Future type from this SharedSemiFuture. * * WARNING: Do not use this unless you're extremely sure of what you're doing, as callbacks * chained to the resulting Future may run in unexpected places. */ Future unsafeToInlineFuture() const noexcept { return Future(toFutureImpl()); } private: template friend class SharedPromise; template friend class future_details::FutureImpl; friend class SharedSemiFuture; template friend class ExecutorFuture; future_details::FutureImpl toFutureImpl() const noexcept { static_assert(std::is_void_v || std::is_copy_constructible_v); return future_details::FutureImpl(_shared.addChild()); } // These are needed to support chaining where a SharedSemiFuture is returned from a // continuation. explicit operator future_details::FutureImpl() const noexcept { return toFutureImpl(); } template void propagateResultTo(U&& arg) const noexcept { toFutureImpl().propagateResultTo(std::forward(arg)); } explicit SharedSemiFuture(boost::intrusive_ptr> ptr) : _shared(std::move(ptr)) {} explicit SharedSemiFuture(future_details::SharedStateHolder&& holder) : _shared(std::move(holder)) {} future_details::SharedStateHolder _shared; }; // Deduction Guides TEMPLATE(typename T) REQUIRES(!isStatusOrStatusWith && !future_details::isFutureLike) SharedSemiFuture(T)->SharedSemiFuture; template SharedSemiFuture(StatusWith)->SharedSemiFuture; /** * This class represents the producer of SharedSemiFutures. * * This is a single-shot class: you may either set a value or error at most once. However you may * extract as many futures as you want and they will all be completed at the same time. Any number * of threads can extract a future at the same time. It is also safe to extract a future * concurrently with completing the promise. If you extract a future after the promise has been * completed, a ready future will be returned. You must still ensure that all calls to getFuture() * complete prior to destroying the Promise. * * If no value or error has been set at the time this Promise is destroyed, an error will be set * with ErrorCode::BrokenPromise. This should generally be considered a programmer error, and should * not be relied upon. We may make it debug-fatal in the future. * * Unless otherwise specified, all methods behave the same as on Promise. */ template class SharedPromise { using SharedStateT = future_details::SharedState; using T_unless_void = std::conditional_t, future_details::FakeVoid, T>; public: using value_type = T; /** * Creates a `SharedPromise` ready for use. */ SharedPromise() = default; ~SharedPromise() { if (MONGO_unlikely(!_haveCompleted)) { _sharedState->setError({ErrorCodes::BrokenPromise, "broken promise"}); } } SharedPromise(const SharedPromise&) = delete; SharedPromise(SharedPromise&&) = delete; SharedPromise& operator=(const SharedPromise&) = delete; SharedPromise& operator=(SharedPromise&& p) noexcept = delete; /** * Returns a future associated with this promise. All returned futures will be completed when * the promise is completed. */ SharedSemiFuture getFuture() const { return SharedSemiFuture(_sharedState); } TEMPLATE(typename Policy, typename Func) REQUIRES(future_details::isCallableR&& isFuturePolicy) void setWith(Policy policy, Func&& func) noexcept { setFrom(Future::makeReady().then(policy, std::forward(func))); } TEMPLATE(typename Func) REQUIRES(future_details::isCallableR) void setWith(Func&& func) noexcept { setWith(destroyDefault, std::forward(func)); } /** * Sets the value into this SharedPromise when the passed-in Future completes, which may have * already happened. */ void setFrom(Future&& future) noexcept { invariant(!std::exchange(_haveCompleted, true)); std::move(future).propagateResultTo(_sharedState.get()); } /** * Sets the value into this SharedPromise immediately. * * This accepts a Status for SharedPromises or a StatusWith for SharedPromise. */ void setFrom(StatusOrStatusWith sosw) noexcept { invariant(!std::exchange(_haveCompleted, true)); _sharedState->setFrom(std::move(sosw)); } // Use emplaceValue(Args&&...) instead. REQUIRES_FOR_NON_TEMPLATE(!std::is_void_v) void setFrom(T_unless_void val) noexcept = delete; // Use setError(Status) instead. REQUIRES_FOR_NON_TEMPLATE(!std::is_void_v) void setFrom(Status) noexcept = delete; TEMPLATE(typename... Args) REQUIRES(std::is_constructible_v || (std::is_void_v && sizeof...(Args) == 0)) void emplaceValue(Args&&... args) noexcept { invariant(!std::exchange(_haveCompleted, true)); _sharedState->emplaceValue(std::forward(args)...); } void setError(Status status) noexcept { invariant(!status.isOK()); invariant(!std::exchange(_haveCompleted, true)); _sharedState->setError(std::move(status)); } private: friend class Future; // This is slightly different from whether the SharedState is in kFinished, because this // SharedPromise may have been completed with a Future that isn't ready yet. bool _haveCompleted = false; const boost::intrusive_ptr _sharedState = make_intrusive(); }; /** * Holds a Promise and its corresponding Future. * Upon construction, contains a new Promise and its Future. */ template struct PromiseAndFuture { Promise promise{NonNullPromiseTag{}}; Future future{promise.getFuture()}; }; /** Another way to write `PromiseAndFuture{}`. */ template PromiseAndFuture makePromiseFuture() { return {}; } /** * This metafunction allows APIs that take callbacks and return Future to avoid doing their own type * calculus. This results in the base value_type that would result from passing Func to a * Future::then(), with the same normalizing of T/StatusWith/Future returns. This is * primarily useful for implementations of executors rather than their users. * * This returns the unwrapped T rather than Future so it will be easy to create a Promise. * * Examples: * * FutureContinuationResult> == void * FutureContinuationResult> == void * FutureContinuationResult()>> == void * * FutureContinuationResult> == int * FutureContinuationResult()>> == int * FutureContinuationResult()>> == int * * FutureContinuationResult, bool> == int * * FutureContinuationResult, NotBool> SFINAE-safe substitution failure. */ template using FutureContinuationResult = future_details::UnwrappedType>; /** * This type transform is useful for coercing (T,StatusWith)->Future and Status->Future. */ template using FutureFor = Future>; template auto coerceToFuture(T&& value) { return FutureFor(std::forward(value)); } /** * Makes a Future with the return value of a nullary function. This has the same semantics as * Promise::setWith, and has the same reasons to prefer it over Future::makeReady(). Also, it * deduces the T, so it is easier to use. * * Note that if func returns an unready Future, this function will not block until it is ready. */ TEMPLATE(typename Func) REQUIRES(future_details::isCallable) auto makeReadyFutureWith(Func&& func) -> Future> try { if constexpr (std::is_void_v>) { std::forward(func)(); return Future::makeReady(); } else { return std::forward(func)(); } } catch (const DBException& ex) { return ex.toStatus(); } // // Implementations of methods that couldn't be defined in the class due to ordering requirements. // template template auto ExecutorFuture::_wrapCBHelper(ExecutorPtr exec, UniqueFunc&& func) { return [ exec = std::move(exec), func = std::move(func) ](auto&&... args) mutable noexcept { using FuncR = typename UniqueFunc::result_type; using BoundArgs = std::tuple...>; Promise> promise{NonNullPromiseTag{}}; auto future = promise.getFuture(); exec->schedule([ promise = std::move(promise), func = std::move(func), boundArgs = BoundArgs{std::forward(args)...} ](Status execStatus) mutable noexcept { if (!execStatus.isOK()) { promise.setError(std::move(execStatus)); return; } promise.setWith([&] { auto closure = [&] { return std::apply(func, std::move(boundArgs)); }; if constexpr (future_details::isFutureLike) { // Cheat and convert to an inline Future since we know we will schedule // further user callbacks onto an executor. return closure().unsafeToInlineFuture(); } else { return closure(); } }); }); return future; }; } template inline ExecutorFuture SemiFuture::thenRunOn(ExecutorPtr exec) && noexcept { return ExecutorFuture(std::move(exec), std::move(_impl)); } template Future SemiFuture::unsafeToInlineFuture() && noexcept { return Future(std::move(_impl)); } template inline SharedSemiFuture> future_details::FutureImpl::share() && noexcept { using Out = SharedSemiFuture>; if (_immediate) return Out(SharedStateHolder>::makeReady(std::move(*_immediate))); return Out(SharedStateHolder>(std::move(_shared))); } } // namespace mongo