diff options
Diffstat (limited to 'src/mongo/util/future.h')
-rw-r--r-- | src/mongo/util/future.h | 258 |
1 files changed, 249 insertions, 9 deletions
diff --git a/src/mongo/util/future.h b/src/mongo/util/future.h index b7299afd89a..04ab91fc605 100644 --- a/src/mongo/util/future.h +++ b/src/mongo/util/future.h @@ -61,6 +61,14 @@ class Future; template <> class Future<void>; +template <typename T> +class SharedPromise; + +template <typename T> +class SharedSemiFuture; +template <> +class SharedSemiFuture<void>; + // Using extern constexpr to prevent the compiler from allocating storage as a poor man's c++17 // inline constexpr variable. // TODO delete extern in c++17 because inline is the default for constexper variables. @@ -68,6 +76,8 @@ template <typename T> extern constexpr bool isFuture = false; template <typename T> extern constexpr bool isFuture<Future<T>> = true; +template <typename T> +extern constexpr bool isFuture<SharedSemiFuture<T>> = true; // This is used to "normalize" void since it can't be used as an argument and it becomes Status // rather than StatusWith<void>. @@ -504,17 +514,16 @@ struct SharedStateImpl final : SharedStateBase { // public API. using future_details::Promise; using future_details::Future; +using future_details::SharedPromise; +using future_details::SharedSemiFuture; /** * This class represents the producer side of a Future. * - * This is a single-shot class. You may only extract the Future once, and you may either set a value - * or error at most once. Extracting the future and setting the value/error can be done in either - * order. - * - * If the Future has been extracted, but no value or error has been set at the time this Promise is - * destroyed, a error will be set with ErrorCode::BrokenPromise. This should generally be considered - * a programmer error, and should not be relied upon. We may make it debug-fatal in the future. + * This is a single-shot class: you may either set a value or error at most once. If no value or + * error has been set at the time this Promise is destroyed, a error will be set with + * ErrorCode::BrokenPromise. This should generally be considered a programmer error, and should not + * be relied upon. We may make it debug-fatal in the future. * * Only one thread can use a given Promise at a time, but another thread may be using the associated * Future object. @@ -658,7 +667,7 @@ public: static_assert(!std::is_same<T, Status>::value, "Future<Status> is banned. Use Future<void> instead."); static_assert(!isStatusWith<T>, "Future<StatusWith<T>> is banned. Just use Future<T> instead."); - static_assert(!isFuture<T>, "Future<Future<T>> is banned. Just use Future<T> instead."); + static_assert(!isFuture<T>, "Future of Future types is banned. Just use Future<T> instead."); static_assert(!std::is_reference<T>::value, "Future<T&> is banned."); static_assert(!std::is_const<T>::value, "Future<const T> is banned."); static_assert(!std::is_array<T>::value, "Future<T[]> is banned."); @@ -711,6 +720,11 @@ public: } /** + * Convert this Future to a SharedSemiFuture. + */ + SharedSemiFuture<T> share() && noexcept; + + /** * If this returns true, get() is guaranteed not to block and callbacks will be immediately * invoked. You can't assume anything if this returns false since it may be completed * immediately after checking (unless you have independent knowledge that this Future can't @@ -1267,6 +1281,8 @@ public: return Future<FakeVoid>::makeReady(std::move(status)); } + SharedSemiFuture<void> share() && noexcept; + bool isReady() const { return _inner.isReady(); } @@ -1351,6 +1367,216 @@ private: }; /** + * SharedSemiFuture<T> is logically a possibly-deferred StatusWith<T> (or Status when T is void). + * + * All methods that are present do the same as on a Future<T> so see it for documentation. + * + * Unlike Future<T> it only supports blocking operation, not chained continuations. This is intended + * to protect the promise-completer's execution context from needing to perform arbitrary + * operations requested by other subsystem's continuations. + * TODO Support continuation chaining when supplied with an executor to run them on. + * + * A SharedSemiFuture may be passed between threads, but only one thread may use it at a time. + */ +template <typename T> +class MONGO_WARN_UNUSED_RESULT_CLASS future_details::SharedSemiFuture { +public: + static_assert(!std::is_same<T, Status>::value, + "SharedSemiFuture<Status> is banned. Use SharedSemiFuture<void> instead."); + static_assert( + !isStatusWith<T>, + "SharedSemiFuture<StatusWith<T>> is banned. Just use SharedSemiFuture<T> instead."); + static_assert( + !isFuture<T>, + "SharedSemiFuture of Future types is banned. Just use SharedSemiFuture<T> instead."); + static_assert(!std::is_reference<T>::value, "SharedSemiFuture<T&> is banned."); + static_assert(!std::is_const<T>::value, "SharedSemiFuture<const T> is banned."); + static_assert(!std::is_array<T>::value, "SharedSemiFuture<T[]> is banned."); + + using value_type = T; + + SharedSemiFuture() = default; + + bool isReady() const { + return _shared->state.load(std::memory_order_acquire) == SSBState::kFinished; + } + + void wait(Interruptible* interruptible = Interruptible::notInterruptible()) const { + _shared->wait(interruptible); + } + + Status waitNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) const + noexcept { + try { + _shared->wait(interruptible); + } catch (const DBException& ex) { + return ex.toStatus(); + } + + return Status::OK(); + } + + const T& get(Interruptible* interruptible = Interruptible::notInterruptible()) const& { + _shared->wait(interruptible); + uassertStatusOK(_shared->status); + return *(_shared->data); + } + + StatusWith<T> getNoThrow( + Interruptible* interruptible = Interruptible::notInterruptible()) const& noexcept { + try { + _shared->wait(interruptible); + } catch (const DBException& ex) { + return ex.toStatus(); + } + + if (!_shared->status.isOK()) + return _shared->status; + return *_shared->data; + } + +private: + template <typename T2> + friend class SharedPromise; + template <typename T2> + friend class Future; + friend class SharedSemiFuture<void>; + + explicit SharedSemiFuture(boost::intrusive_ptr<SharedState<T>> ptr) : _shared(std::move(ptr)) {} + + boost::intrusive_ptr<SharedState<T>> _shared; +}; + +template <> +class MONGO_WARN_UNUSED_RESULT_CLASS future_details::SharedSemiFuture<void> { +public: + using value_type = void; + + bool isReady() const { + return _inner.isReady(); + } + + void wait(Interruptible* interruptible = Interruptible::notInterruptible()) const { + _inner.wait(interruptible); + } + + Status waitNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) const + noexcept { + return _inner.waitNoThrow(interruptible); + } + + void get(Interruptible* interruptible = Interruptible::notInterruptible()) const { + _inner.get(interruptible); + } + + Status getNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) const + noexcept { + return _inner.getNoThrow(interruptible).getStatus(); + } + +private: + friend class SharedPromise<void>; + friend class Future<void>; + + explicit SharedSemiFuture(boost::intrusive_ptr<SharedState<FakeVoid>> ptr) + : _inner(std::move(ptr)) {} + + /*implicit*/ SharedSemiFuture(SharedSemiFuture<FakeVoid>&& inner) : _inner(std::move(inner)) {} + /*implicit*/ operator SharedSemiFuture<FakeVoid>() && { + return std::move(_inner); + } + + SharedSemiFuture<FakeVoid> _inner; +}; + +/** + * This class represents the producer of SharedSemiFutures. + * + * This is a single-shot class: you may either set a value or error at most once. However you may + * extract as many futures as you want and they will all be completed at the same time. Any number + * of threads can extract a future at the same time. It is also safe to extract a future + * concurrently with completing the promise. If you extract a future after the promise has been + * completed, a ready future will be returned. You must still ensure that all calls to getFuture() + * complete prior to destroying the Promise. + * + * If no value or error has been set at the time this Promise is destroyed, an error will be set + * with ErrorCode::BrokenPromise. This should generally be considered a programmer error, and should + * not be relied upon. We may make it debug-fatal in the future. + * + * Unless otherwise specified, all methods behave the same as on Promise<T>. + */ +template <typename T> +class future_details::SharedPromise { +public: + using value_type = T; + + /** + * Creates a `SharedPromise` ready for use. + */ + SharedPromise() = default; + + ~SharedPromise() { + if (MONGO_unlikely(!haveCompleted())) { + _sharedState->setError({ErrorCodes::BrokenPromise, "broken promise"}); + } + } + + SharedPromise(const SharedPromise&) = delete; + SharedPromise(SharedPromise&&) = delete; + SharedPromise& operator=(const SharedPromise&) = delete; + SharedPromise& operator=(SharedPromise&& p) noexcept = delete; + + /** + * Returns a future associated with this promise. All returned futures will be completed when + * the promise is completed. + */ + SharedSemiFuture<T> getFuture() const { + return SharedSemiFuture<T>(_sharedState); + } + + template <typename Func> + void setWith(Func&& func) noexcept { + invariant(!haveCompleted()); + setFrom(Future<void>::makeReady().then(std::forward<Func>(func))); + } + + void setFrom(Future<T>&& future) noexcept { + invariant(!haveCompleted()); + std::move(future).propagateResultTo(_sharedState.get()); + } + + template <typename... Args> + void emplaceValue(Args&&... args) noexcept { + invariant(!haveCompleted()); + _sharedState->emplaceValue(std::forward<Args>(args)...); + } + + void setError(Status status) noexcept { + invariant(!status.isOK()); + invariant(!haveCompleted()); + _sharedState->setError(std::move(status)); + } + + // TODO rename to not XXXWith and handle void + void setFromStatusWith(StatusWith<T> sw) noexcept { + invariant(!haveCompleted()); + _sharedState->setFromStatusWith(std::move(sw)); + } + +private: + friend class Future<void>; + + bool haveCompleted() const noexcept { + // This can be relaxed because it is only called from the Promise thread which is also the + // only thread that will transition this from returning false to true. Additionally it isn't + // used to establish synchronization with any other thread. + return _sharedState->state.load(std::memory_order_relaxed) == SSBState::kFinished; + } + + const boost::intrusive_ptr<SharedState<T>> _sharedState = make_intrusive<SharedState<T>>(); +}; + +/** * Makes a ready Future with the return value of a nullary function. This has the same semantics as * Promise::setWith, and has the same reasons to prefer it over Future<T>::makeReady(). Also, it * deduces the T, so it is easier to use. @@ -1419,8 +1645,22 @@ inline void Promise<T>::setWith(Func&& func) noexcept { } template <typename T> - Future<void> Future<T>::ignoreValue() && noexcept { + inline Future<void> Future<T>::ignoreValue() && noexcept { return std::move(*this).then([](auto&&) {}); } +template <typename T> + inline SharedSemiFuture<T> Future<T>::share() && noexcept { + if (!_immediate) + return SharedSemiFuture<T>(std::move(_shared)); + + auto shared = make_intrusive<SharedState<T>>(); + shared->emplaceValue(std::move(*_immediate)); + return SharedSemiFuture<T>(std::move(shared)); +} + +inline SharedSemiFuture<void> Future<void>::share() && noexcept { + return std::move(_inner).share(); +} + } // namespace mongo |