diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/util/future.h | 258 | ||||
-rw-r--r-- | src/mongo/util/future_test_edge_cases.cpp | 70 | ||||
-rw-r--r-- | src/mongo/util/future_test_future_int.cpp | 38 | ||||
-rw-r--r-- | src/mongo/util/future_test_future_move_only.cpp | 22 | ||||
-rw-r--r-- | src/mongo/util/future_test_future_void.cpp | 34 | ||||
-rw-r--r-- | src/mongo/util/future_test_utils.h | 12 |
6 files changed, 419 insertions, 15 deletions
diff --git a/src/mongo/util/future.h b/src/mongo/util/future.h index b7299afd89a..04ab91fc605 100644 --- a/src/mongo/util/future.h +++ b/src/mongo/util/future.h @@ -61,6 +61,14 @@ class Future; template <> class Future<void>; +template <typename T> +class SharedPromise; + +template <typename T> +class SharedSemiFuture; +template <> +class SharedSemiFuture<void>; + // Using extern constexpr to prevent the compiler from allocating storage as a poor man's c++17 // inline constexpr variable. // TODO delete extern in c++17 because inline is the default for constexper variables. @@ -68,6 +76,8 @@ template <typename T> extern constexpr bool isFuture = false; template <typename T> extern constexpr bool isFuture<Future<T>> = true; +template <typename T> +extern constexpr bool isFuture<SharedSemiFuture<T>> = true; // This is used to "normalize" void since it can't be used as an argument and it becomes Status // rather than StatusWith<void>. @@ -504,17 +514,16 @@ struct SharedStateImpl final : SharedStateBase { // public API. using future_details::Promise; using future_details::Future; +using future_details::SharedPromise; +using future_details::SharedSemiFuture; /** * This class represents the producer side of a Future. * - * This is a single-shot class. You may only extract the Future once, and you may either set a value - * or error at most once. Extracting the future and setting the value/error can be done in either - * order. - * - * If the Future has been extracted, but no value or error has been set at the time this Promise is - * destroyed, a error will be set with ErrorCode::BrokenPromise. This should generally be considered - * a programmer error, and should not be relied upon. We may make it debug-fatal in the future. + * This is a single-shot class: you may either set a value or error at most once. If no value or + * error has been set at the time this Promise is destroyed, a error will be set with + * ErrorCode::BrokenPromise. This should generally be considered a programmer error, and should not + * be relied upon. We may make it debug-fatal in the future. * * Only one thread can use a given Promise at a time, but another thread may be using the associated * Future object. @@ -658,7 +667,7 @@ public: static_assert(!std::is_same<T, Status>::value, "Future<Status> is banned. Use Future<void> instead."); static_assert(!isStatusWith<T>, "Future<StatusWith<T>> is banned. Just use Future<T> instead."); - static_assert(!isFuture<T>, "Future<Future<T>> is banned. Just use Future<T> instead."); + static_assert(!isFuture<T>, "Future of Future types is banned. Just use Future<T> instead."); static_assert(!std::is_reference<T>::value, "Future<T&> is banned."); static_assert(!std::is_const<T>::value, "Future<const T> is banned."); static_assert(!std::is_array<T>::value, "Future<T[]> is banned."); @@ -711,6 +720,11 @@ public: } /** + * Convert this Future to a SharedSemiFuture. + */ + SharedSemiFuture<T> share() && noexcept; + + /** * If this returns true, get() is guaranteed not to block and callbacks will be immediately * invoked. You can't assume anything if this returns false since it may be completed * immediately after checking (unless you have independent knowledge that this Future can't @@ -1267,6 +1281,8 @@ public: return Future<FakeVoid>::makeReady(std::move(status)); } + SharedSemiFuture<void> share() && noexcept; + bool isReady() const { return _inner.isReady(); } @@ -1351,6 +1367,216 @@ private: }; /** + * SharedSemiFuture<T> is logically a possibly-deferred StatusWith<T> (or Status when T is void). + * + * All methods that are present do the same as on a Future<T> so see it for documentation. + * + * Unlike Future<T> it only supports blocking operation, not chained continuations. This is intended + * to protect the promise-completer's execution context from needing to perform arbitrary + * operations requested by other subsystem's continuations. + * TODO Support continuation chaining when supplied with an executor to run them on. + * + * A SharedSemiFuture may be passed between threads, but only one thread may use it at a time. + */ +template <typename T> +class MONGO_WARN_UNUSED_RESULT_CLASS future_details::SharedSemiFuture { +public: + static_assert(!std::is_same<T, Status>::value, + "SharedSemiFuture<Status> is banned. Use SharedSemiFuture<void> instead."); + static_assert( + !isStatusWith<T>, + "SharedSemiFuture<StatusWith<T>> is banned. Just use SharedSemiFuture<T> instead."); + static_assert( + !isFuture<T>, + "SharedSemiFuture of Future types is banned. Just use SharedSemiFuture<T> instead."); + static_assert(!std::is_reference<T>::value, "SharedSemiFuture<T&> is banned."); + static_assert(!std::is_const<T>::value, "SharedSemiFuture<const T> is banned."); + static_assert(!std::is_array<T>::value, "SharedSemiFuture<T[]> is banned."); + + using value_type = T; + + SharedSemiFuture() = default; + + bool isReady() const { + return _shared->state.load(std::memory_order_acquire) == SSBState::kFinished; + } + + void wait(Interruptible* interruptible = Interruptible::notInterruptible()) const { + _shared->wait(interruptible); + } + + Status waitNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) const + noexcept { + try { + _shared->wait(interruptible); + } catch (const DBException& ex) { + return ex.toStatus(); + } + + return Status::OK(); + } + + const T& get(Interruptible* interruptible = Interruptible::notInterruptible()) const& { + _shared->wait(interruptible); + uassertStatusOK(_shared->status); + return *(_shared->data); + } + + StatusWith<T> getNoThrow( + Interruptible* interruptible = Interruptible::notInterruptible()) const& noexcept { + try { + _shared->wait(interruptible); + } catch (const DBException& ex) { + return ex.toStatus(); + } + + if (!_shared->status.isOK()) + return _shared->status; + return *_shared->data; + } + +private: + template <typename T2> + friend class SharedPromise; + template <typename T2> + friend class Future; + friend class SharedSemiFuture<void>; + + explicit SharedSemiFuture(boost::intrusive_ptr<SharedState<T>> ptr) : _shared(std::move(ptr)) {} + + boost::intrusive_ptr<SharedState<T>> _shared; +}; + +template <> +class MONGO_WARN_UNUSED_RESULT_CLASS future_details::SharedSemiFuture<void> { +public: + using value_type = void; + + bool isReady() const { + return _inner.isReady(); + } + + void wait(Interruptible* interruptible = Interruptible::notInterruptible()) const { + _inner.wait(interruptible); + } + + Status waitNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) const + noexcept { + return _inner.waitNoThrow(interruptible); + } + + void get(Interruptible* interruptible = Interruptible::notInterruptible()) const { + _inner.get(interruptible); + } + + Status getNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) const + noexcept { + return _inner.getNoThrow(interruptible).getStatus(); + } + +private: + friend class SharedPromise<void>; + friend class Future<void>; + + explicit SharedSemiFuture(boost::intrusive_ptr<SharedState<FakeVoid>> ptr) + : _inner(std::move(ptr)) {} + + /*implicit*/ SharedSemiFuture(SharedSemiFuture<FakeVoid>&& inner) : _inner(std::move(inner)) {} + /*implicit*/ operator SharedSemiFuture<FakeVoid>() && { + return std::move(_inner); + } + + SharedSemiFuture<FakeVoid> _inner; +}; + +/** + * This class represents the producer of SharedSemiFutures. + * + * This is a single-shot class: you may either set a value or error at most once. However you may + * extract as many futures as you want and they will all be completed at the same time. Any number + * of threads can extract a future at the same time. It is also safe to extract a future + * concurrently with completing the promise. If you extract a future after the promise has been + * completed, a ready future will be returned. You must still ensure that all calls to getFuture() + * complete prior to destroying the Promise. + * + * If no value or error has been set at the time this Promise is destroyed, an error will be set + * with ErrorCode::BrokenPromise. This should generally be considered a programmer error, and should + * not be relied upon. We may make it debug-fatal in the future. + * + * Unless otherwise specified, all methods behave the same as on Promise<T>. + */ +template <typename T> +class future_details::SharedPromise { +public: + using value_type = T; + + /** + * Creates a `SharedPromise` ready for use. + */ + SharedPromise() = default; + + ~SharedPromise() { + if (MONGO_unlikely(!haveCompleted())) { + _sharedState->setError({ErrorCodes::BrokenPromise, "broken promise"}); + } + } + + SharedPromise(const SharedPromise&) = delete; + SharedPromise(SharedPromise&&) = delete; + SharedPromise& operator=(const SharedPromise&) = delete; + SharedPromise& operator=(SharedPromise&& p) noexcept = delete; + + /** + * Returns a future associated with this promise. All returned futures will be completed when + * the promise is completed. + */ + SharedSemiFuture<T> getFuture() const { + return SharedSemiFuture<T>(_sharedState); + } + + template <typename Func> + void setWith(Func&& func) noexcept { + invariant(!haveCompleted()); + setFrom(Future<void>::makeReady().then(std::forward<Func>(func))); + } + + void setFrom(Future<T>&& future) noexcept { + invariant(!haveCompleted()); + std::move(future).propagateResultTo(_sharedState.get()); + } + + template <typename... Args> + void emplaceValue(Args&&... args) noexcept { + invariant(!haveCompleted()); + _sharedState->emplaceValue(std::forward<Args>(args)...); + } + + void setError(Status status) noexcept { + invariant(!status.isOK()); + invariant(!haveCompleted()); + _sharedState->setError(std::move(status)); + } + + // TODO rename to not XXXWith and handle void + void setFromStatusWith(StatusWith<T> sw) noexcept { + invariant(!haveCompleted()); + _sharedState->setFromStatusWith(std::move(sw)); + } + +private: + friend class Future<void>; + + bool haveCompleted() const noexcept { + // This can be relaxed because it is only called from the Promise thread which is also the + // only thread that will transition this from returning false to true. Additionally it isn't + // used to establish synchronization with any other thread. + return _sharedState->state.load(std::memory_order_relaxed) == SSBState::kFinished; + } + + const boost::intrusive_ptr<SharedState<T>> _sharedState = make_intrusive<SharedState<T>>(); +}; + +/** * Makes a ready Future with the return value of a nullary function. This has the same semantics as * Promise::setWith, and has the same reasons to prefer it over Future<T>::makeReady(). Also, it * deduces the T, so it is easier to use. @@ -1419,8 +1645,22 @@ inline void Promise<T>::setWith(Func&& func) noexcept { } template <typename T> - Future<void> Future<T>::ignoreValue() && noexcept { + inline Future<void> Future<T>::ignoreValue() && noexcept { return std::move(*this).then([](auto&&) {}); } +template <typename T> + inline SharedSemiFuture<T> Future<T>::share() && noexcept { + if (!_immediate) + return SharedSemiFuture<T>(std::move(_shared)); + + auto shared = make_intrusive<SharedState<T>>(); + shared->emplaceValue(std::move(*_immediate)); + return SharedSemiFuture<T>(std::move(shared)); +} + +inline SharedSemiFuture<void> Future<void>::share() && noexcept { + return std::move(_inner).share(); +} + } // namespace mongo diff --git a/src/mongo/util/future_test_edge_cases.cpp b/src/mongo/util/future_test_edge_cases.cpp index c3ce5db03e2..f2b05484ce1 100644 --- a/src/mongo/util/future_test_edge_cases.cpp +++ b/src/mongo/util/future_test_edge_cases.cpp @@ -163,6 +163,76 @@ TEST(Future_EdgeCases, interrupted_wait_then_then_with_bgthread) { std::move(future).then([] {}).get(); } +TEST(Future_EdgeCases, Racing_SharePromise_getFuture_and_emplaceValue) { + SharedPromise<void> sp; + std::vector<Future<void>> futs; + futs.reserve(30); + + // Note, this is intentionally somewhat racy. async() is defined to sleep 100ms before running + // the function so the first batch of futures will generally block before getting the value is + // emplaced, and the second batch will happen around the same time. In all cases the final batch + // happen after the emplaceValue(), but roughly at the same time. Under TSAN the sleep is + // removed to allow it to find more interesting interleavings, and give it a better chance at + // detecting data races. + + for (int i = 0; i < 10; i++) { + futs.push_back(async([&] { sp.getFuture().get(); })); + } + + sleepUnlessInTsan(); + + for (int i = 0; i < 10; i++) { + futs.push_back(async([&] { sp.getFuture().get(); })); + } + + sleepUnlessInTsan(); + + sp.emplaceValue(); + + for (int i = 0; i < 10; i++) { + futs.push_back(async([&] { sp.getFuture().get(); })); + } + + for (auto& fut : futs) { + fut.get(); + } +} + +TEST(Future_EdgeCases, Racing_SharePromise_getFuture_and_setError) { + SharedPromise<void> sp; + std::vector<Future<void>> futs; + futs.reserve(30); + + // Note, this is intentionally somewhat racy. async() is defined to sleep 100ms before running + // the function so the first batch of futures will generally block before getting the value is + // emplaced, and the second batch will happen around the same time. In all cases the final batch + // happen after the emplaceValue(), but roughly at the same time. Under TSAN the sleep is + // removed to allow it to find more interesting interleavings, and give it a better chance at + // detecting data races. + + for (int i = 0; i < 10; i++) { + futs.push_back(async([&] { sp.getFuture().get(); })); + } + + sleepUnlessInTsan(); + + for (int i = 0; i < 10; i++) { + futs.push_back(async([&] { sp.getFuture().get(); })); + } + + sleepUnlessInTsan(); + + sp.setError(failStatus()); + + for (int i = 0; i < 10; i++) { + futs.push_back(async([&] { sp.getFuture().get(); })); + } + + for (auto& fut : futs) { + ASSERT_EQ(fut.getNoThrow(), failStatus()); + } +} + // Make sure we actually die if someone throws from the getAsync callback. // // With gcc 5.8 we terminate, but print "terminate() called. No exception is active". This works in diff --git a/src/mongo/util/future_test_future_int.cpp b/src/mongo/util/future_test_future_int.cpp index 92955d367fb..899310cf69c 100644 --- a/src/mongo/util/future_test_future_int.cpp +++ b/src/mongo/util/future_test_future_int.cpp @@ -89,6 +89,17 @@ TEST(Future, Success_getNothrowRvalue) { [](Future<int>&& fut) { ASSERT_EQ(std::move(fut).getNoThrow(), 1); }); } +TEST(Future, Success_shared_get) { + FUTURE_SUCCESS_TEST([] { return 1; }, + [](Future<int>&& fut) { ASSERT_EQ(std::move(fut).share().get(), 1); }); +} + +TEST(Future, Success_shared_getNothrow) { + FUTURE_SUCCESS_TEST( + [] { return 1; }, + [](Future<int>&& fut) { ASSERT_EQ(std::move(fut).share().getNoThrow(), 1); }); +} + TEST(Future, Success_getAsync) { FUTURE_SUCCESS_TEST( [] { return 1; }, @@ -129,6 +140,16 @@ TEST(Future, Fail_getNothrowRvalue) { [](Future<int>&& fut) { ASSERT_EQ(std::move(fut).getNoThrow(), failStatus()); }); } +TEST(Future, Fail_shared_get) { + FUTURE_FAIL_TEST<int>( + [](Future<int>&& fut) { ASSERT_THROWS_failStatus(std::move(fut).share().get()); }); +} + +TEST(Future, Fail_shared_getNothrow) { + FUTURE_FAIL_TEST<int>( + [](Future<int>&& fut) { ASSERT_EQ(std::move(fut).share().getNoThrow(), failStatus()); }); +} + TEST(Future, Fail_getAsync) { FUTURE_FAIL_TEST<int>([](Future<int>&& fut) { auto pf = makePromiseFuture<int>(); @@ -203,9 +224,24 @@ TEST(Future, isReady_TSAN_OK) { done = true; return 1; }); + //(void)*const_cast<volatile bool*>(&done); // Data Race! Uncomment to make sure TSAN works. while (!fut.isReady()) { } - // ASSERT(done); // Data Race! Uncomment to make sure TSAN is working. + ASSERT(done); + (void)fut.get(); + ASSERT(done); +} + +TEST(Future, isReady_shared_TSAN_OK) { + bool done = false; + auto fut = async([&] { + done = true; + return 1; + }).share(); + //(void)*const_cast<volatile bool*>(&done); // Data Race! Uncomment to make sure TSAN works. + while (!fut.isReady()) { + } + ASSERT(done); (void)fut.get(); ASSERT(done); } diff --git a/src/mongo/util/future_test_future_move_only.cpp b/src/mongo/util/future_test_future_move_only.cpp index b3d6dcc9730..f7b706ecee3 100644 --- a/src/mongo/util/future_test_future_move_only.cpp +++ b/src/mongo/util/future_test_future_move_only.cpp @@ -90,6 +90,11 @@ TEST(Future_MoveOnly, Success_getRvalue) { [](Future<Widget>&& fut) { ASSERT_EQ(std::move(fut).get(), 1); }); } +TEST(Future_MoveOnly, Success_shared_get) { + FUTURE_SUCCESS_TEST([] { return Widget(1); }, + [](Future<Widget>&& fut) { ASSERT_EQ(std::move(fut).share().get(), 1); }); +} + #if 0 // Needs copy TEST(Future_MoveOnly, Success_getNothrowLvalue) { FUTURE_SUCCESS_TEST([] { return Widget(1); }, @@ -100,6 +105,12 @@ TEST(Future_MoveOnly, Success_getNothrowConstLvalue) { FUTURE_SUCCESS_TEST([] { return Widget(1); }, [](const Future<Widget>& fut) { ASSERT_EQ(fut.getNoThrow(), 1); }); } + +TEST(Future_MoveOnly, Success_shared_getNothrow) { + FUTURE_SUCCESS_TEST( + [] { return Widget(1); }, + [](Future<Widget>&& fut) { ASSERT_EQ(std::move(fut).share().getNoThrow(), 1); }); +} #endif TEST(Future_MoveOnly, Success_getNothrowRvalue) { @@ -136,6 +147,11 @@ TEST(Future_MoveOnly, Fail_getRvalue) { [](Future<Widget>&& fut) { ASSERT_THROWS_failStatus(std::move(fut).get()); }); } +TEST(Future_MoveOnly, Fail_shared_get) { + FUTURE_FAIL_TEST<Widget>( + [](Future<Widget>&& fut) { ASSERT_THROWS_failStatus(std::move(fut).share().get()); }); +} + #if 0 // Needs copy TEST(Future_MoveOnly, Fail_getNothrowLvalue) { FUTURE_FAIL_TEST<Widget>([](Future<Widget>&& fut) { ASSERT_EQ(fut.getNoThrow(), failStatus); }); @@ -145,6 +161,12 @@ TEST(Future_MoveOnly, Fail_getNothrowConstLvalue) { FUTURE_FAIL_TEST<Widget>( [](const Future<Widget>& fut) { ASSERT_EQ(fut.getNoThrow(), failStatus); }); } + +TEST(Future_MoveOnly, Fail_shared_getNothrow) { + FUTURE_FAIL_TEST<Widget>([](Future<Widget>&& fut) { + ASSERT_EQ(std::move(fut).share().getNoThrow().getStatus(), failStatus()); + }); +} #endif TEST(Future_MoveOnly, Fail_getNothrowRvalue) { diff --git a/src/mongo/util/future_test_future_void.cpp b/src/mongo/util/future_test_future_void.cpp index 4095706739f..17585dc4257 100644 --- a/src/mongo/util/future_test_future_void.cpp +++ b/src/mongo/util/future_test_future_void.cpp @@ -68,6 +68,16 @@ TEST(Future_Void, Success_getNothrowRvalue) { [] {}, [](Future<void>&& fut) { ASSERT_EQ(std::move(fut).getNoThrow(), Status::OK()); }); } +TEST(Future_Void, Success_shared_get) { + FUTURE_SUCCESS_TEST([] {}, [](Future<void>&& fut) { std::move(fut).share().get(); }); +} + +TEST(Future_Void, Success_shared_getNothrow) { + FUTURE_SUCCESS_TEST( + [] {}, + [](Future<void>&& fut) { ASSERT_EQ(std::move(fut).share().getNoThrow(), Status::OK()); }); +} + TEST(Future_Void, Success_getAsync) { FUTURE_SUCCESS_TEST( [] {}, @@ -108,6 +118,16 @@ TEST(Future_Void, Fail_getNothrowRvalue) { [](Future<void>&& fut) { ASSERT_EQ(std::move(fut).getNoThrow(), failStatus()); }); } +TEST(Future_Void, Fail_share_getRvalue) { + FUTURE_FAIL_TEST<void>( + [](Future<void>&& fut) { ASSERT_THROWS_failStatus(std::move(fut).share().get()); }); +} + +TEST(Future_Void, Fail_share_getNothrow) { + FUTURE_FAIL_TEST<void>( + [](Future<void>&& fut) { ASSERT_EQ(std::move(fut).share().getNoThrow(), failStatus()); }); +} + TEST(Future_Void, Fail_getAsync) { FUTURE_FAIL_TEST<void>([](Future<void>&& fut) { auto pf = makePromiseFuture<void>(); @@ -149,9 +169,21 @@ TEST(Future_Void, Fail_isReady) { TEST(Future_Void, isReady_TSAN_OK) { bool done = false; auto fut = async([&] { done = true; }); + //(void)*const_cast<volatile bool*>(&done); // Data Race! Uncomment to make sure TSAN works. + while (!fut.isReady()) { + } + ASSERT(done); + fut.get(); + ASSERT(done); +} + +TEST(Future_Void, isReady_share_TSAN_OK) { + bool done = false; + auto fut = async([&] { done = true; }).share(); + //(void)*const_cast<volatile bool*>(&done); // Data Race! Uncomment to make sure TSAN works. while (!fut.isReady()) { } - // ASSERT(done); // Data Race! Uncomment to make sure TSAN is working. + ASSERT(done); fut.get(); ASSERT(done); } diff --git a/src/mongo/util/future_test_utils.h b/src/mongo/util/future_test_utils.h index 2cce88ce66b..4cd20daadac 100644 --- a/src/mongo/util/future_test_utils.h +++ b/src/mongo/util/future_test_utils.h @@ -53,15 +53,19 @@ void completePromise(Promise<void>* promise, Func&& func) { promise->emplaceValue(); } +inline void sleepUnlessInTsan() { +#if !__has_feature(thread_sanitizer) + // TSAN works better without this sleep, but it is useful for testing correctness. + sleepmillis(100); // Try to wait until after the Future has been handled. +#endif +} + template <typename Func, typename Result = std::result_of_t<Func && ()>> Future<Result> async(Func&& func) { auto pf = makePromiseFuture<Result>(); stdx::thread([ promise = std::move(pf.promise), func = std::forward<Func>(func) ]() mutable { -#if !__has_feature(thread_sanitizer) - // TSAN works better without this sleep, but it is useful for testing correctness. - sleepmillis(100); // Try to wait until after the Future has been handled. -#endif + sleepUnlessInTsan(); try { completePromise(&promise, func); } catch (const DBException& ex) { |