diff options
author | Matthew Saltz <matthew.saltz@mongodb.com> | 2020-09-03 15:40:30 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-24 22:56:50 +0000 |
commit | 39cb1d7307652e105e1b12317ef0e671c39d567c (patch) | |
tree | 5c2756f97e466f9fec125507dd48cdf245ac1a8a | |
parent | 53aaa253ff598578816d4add5e88f063775b4b85 (diff) | |
download | mongo-39cb1d7307652e105e1b12317ef0e671c39d567c.tar.gz |
SERVER-50341 Add whenAny and whenAll
-rw-r--r-- | src/mongo/util/future_util.h | 99 | ||||
-rw-r--r-- | src/mongo/util/future_util_test.cpp | 358 |
2 files changed, 443 insertions, 14 deletions
diff --git a/src/mongo/util/future_util.h b/src/mongo/util/future_util.h index ceaf310090f..d609a2f1e6d 100644 --- a/src/mongo/util/future_util.h +++ b/src/mongo/util/future_util.h @@ -380,4 +380,103 @@ SemiFuture<void> whenAllSucceed(std::vector<FutureLike>&& futures) { return std::move(future).semi(); } +/** + * Given a vector of input Futures or ExecutorFutures, returns a SemiFuture that contains the + * results of each input future wrapped in a StatusWith to indicate whether it resolved with success + * or failure and will be resolved when all of the input futures have resolved. + */ +template <typename FutureT, + typename Value = typename FutureT::value_type, + typename ResultVector = std::vector<StatusOrStatusWith<Value>>> +SemiFuture<ResultVector> whenAll(std::vector<FutureT>&& futures) { + invariant(futures.size() > 0); + + /** + * A structure used to share state between the input futures. + */ + struct SharedBlock { + SharedBlock(size_t numFuturesToWaitFor, Promise<ResultVector> result) + : numFuturesToWaitFor(numFuturesToWaitFor), + intermediateResult(numFuturesToWaitFor, {ErrorCodes::InternalError, ""}), + resultPromise(std::move(result)) {} + // Total number of input futures. + const size_t numFuturesToWaitFor; + // Tracks the number of input futures which have resolved so far. + AtomicWord<size_t> numReady{0}; + // A vector containing the results of each input future. + ResultVector intermediateResult; + // The promise corresponding to the resulting SemiFuture returned by this function. + Promise<ResultVector> resultPromise; + }; + + auto [promise, future] = makePromiseFuture<ResultVector>(); + auto sharedBlock = std::make_shared<SharedBlock>(futures.size(), std::move(promise)); + + for (size_t i = 0; i < futures.size(); ++i) { + std::move(futures[i]) + .getAsync([sharedBlock, myIndex = i](StatusOrStatusWith<Value> value) mutable { + sharedBlock->intermediateResult[myIndex] = std::move(value); + + auto numReady = sharedBlock->numReady.addAndFetch(1); + invariant(numReady <= sharedBlock->numFuturesToWaitFor); + + if (numReady == sharedBlock->numFuturesToWaitFor) { + // All results are ready. + sharedBlock->resultPromise.emplaceValue( + std::move(sharedBlock->intermediateResult)); + } + }); + } + + return std::move(future).semi(); +} + +/** + * Result type for the whenAny function. + */ +template <typename T> +struct WhenAnyResult { + // The result of the future that resolved first. + StatusOrStatusWith<T> result; + // The index of the future that resolved first. + size_t index; +}; + +/** + * Given a vector of input Futures or ExecutorFutures, returns a SemiFuture which will contain a + * struct containing the first of those futures to resolve along with its index in the input array. + */ +template <typename FutureT, + typename Value = typename FutureT::value_type, + typename Result = WhenAnyResult<Value>> +SemiFuture<Result> whenAny(std::vector<FutureT>&& futures) { + invariant(futures.size() > 0); + + /** + * A structure used to share state between the input futures. + */ + struct SharedBlock { + SharedBlock(Promise<Result> result) : resultPromise(std::move(result)) {} + // Tracks whether or not the resultPromise has been set. + AtomicWord<bool> done{false}; + // The promise corresponding to the resulting SemiFuture returned by this function. + Promise<Result> resultPromise; + }; + + auto [promise, future] = makePromiseFuture<Result>(); + auto sharedBlock = std::make_shared<SharedBlock>(std::move(promise)); + + for (size_t i = 0; i < futures.size(); ++i) { + std::move(futures[i]) + .getAsync([sharedBlock, myIndex = i](StatusOrStatusWith<Value> value) mutable { + // If this is the first input future to complete, change done to true and set the + // value on the promise. + if (!sharedBlock->done.swap(true)) { + sharedBlock->resultPromise.emplaceValue(Result{std::move(value), myIndex}); + } + }); + } + + return std::move(future).semi(); +} } // namespace mongo diff --git a/src/mongo/util/future_util_test.cpp b/src/mongo/util/future_util_test.cpp index 3f2606c3d4f..7d9dbb1655a 100644 --- a/src/mongo/util/future_util_test.cpp +++ b/src/mongo/util/future_util_test.cpp @@ -163,21 +163,19 @@ TEST_F(AsyncTryUntilTest, LoopBodyPropagatesErrorToConditionAndCaller) { ASSERT_EQ(resultFut.getNoThrow(), ErrorCodes::InternalError); } -class WhenAllSucceedTest : public FutureUtilTest { -public: - template <typename T> - static std::pair<std::vector<Promise<T>>, std::vector<Future<T>>> makePromisesAndFutures( - size_t size) { - std::vector<Future<T>> inputFutures; - std::vector<Promise<T>> inputPromises; - for (size_t i = 0; i < size; ++i) { - auto [inputPromise, inputFuture] = makePromiseFuture<T>(); - inputFutures.emplace_back(std::move(inputFuture)); - inputPromises.emplace_back(std::move(inputPromise)); - } - return std::make_pair(std::move(inputPromises), std::move(inputFutures)); +template <typename T> +std::pair<std::vector<Promise<T>>, std::vector<Future<T>>> makePromisesAndFutures(size_t size) { + std::vector<Future<T>> inputFutures; + std::vector<Promise<T>> inputPromises; + for (size_t i = 0; i < size; ++i) { + auto [inputPromise, inputFuture] = makePromiseFuture<T>(); + inputFutures.emplace_back(std::move(inputFuture)); + inputPromises.emplace_back(std::move(inputPromise)); } -}; + return std::make_pair(std::move(inputPromises), std::move(inputFutures)); +} + +class WhenAllSucceedTest : public FutureUtilTest {}; static const Status kErrorStatus = {ErrorCodes::InternalError, ""}; @@ -462,5 +460,337 @@ TEST_F(WhenAllSucceedVoidTest, WhenAllSucceedWorksWithExecutorFutures) { ASSERT_EQ(result.getNoThrow(), Status::OK()); } +class WhenAllTest : public FutureUtilTest {}; + +TEST_F(WhenAllTest, ReturnsOnceAllInputsResolveWithSuccess) { + const auto kNumInputs = 5; + auto [inputPromises, inputFutures] = makePromisesAndFutures<int>(kNumInputs); + + auto result = whenAll(std::move(inputFutures)); + ASSERT_FALSE(result.isReady()); + + const auto kValue = 10; + for (auto i = 0; i < kNumInputs; ++i) { + ASSERT_FALSE(result.isReady()); + inputPromises[i].emplaceValue(kValue); + } + + auto output = result.get(); + ASSERT_EQ(output.size(), kNumInputs); + for (auto& swValue : output) { + ASSERT_TRUE(swValue.isOK()); + ASSERT_EQ(swValue.getValue(), kValue); + } +} + +TEST_F(WhenAllTest, ReturnsOnceAllInputsResolveWithError) { + const auto kNumInputs = 5; + auto [inputPromises, inputFutures] = makePromisesAndFutures<int>(kNumInputs); + + auto result = whenAll(std::move(inputFutures)); + ASSERT_FALSE(result.isReady()); + + for (auto i = 0; i < kNumInputs; ++i) { + ASSERT_FALSE(result.isReady()); + inputPromises[i].setError(kErrorStatus); + } + + auto output = result.get(); + ASSERT_EQ(output.size(), kNumInputs); + for (auto& swValue : output) { + ASSERT_EQ(swValue.getStatus(), kErrorStatus); + } +} + +TEST_F(WhenAllTest, ReturnsOnceAllInputsResolveWithSuccessWithVoidInputs) { + const auto kNumInputs = 5; + auto [inputPromises, inputFutures] = makePromisesAndFutures<void>(kNumInputs); + + auto result = whenAll(std::move(inputFutures)); + ASSERT_FALSE(result.isReady()); + + for (auto i = 0; i < kNumInputs; ++i) { + ASSERT_FALSE(result.isReady()); + inputPromises[i].emplaceValue(); + } + + auto output = result.get(); + ASSERT_EQ(output.size(), kNumInputs); + for (auto& status : output) { + ASSERT_OK(status); + } +} + +TEST_F(WhenAllTest, ReturnsOnceAllInputsResolveWithErrorWithVoidInputs) { + const auto kNumInputs = 5; + auto [inputPromises, inputFutures] = makePromisesAndFutures<void>(kNumInputs); + + auto result = whenAll(std::move(inputFutures)); + ASSERT_FALSE(result.isReady()); + + for (auto i = 0; i < kNumInputs; ++i) { + ASSERT_FALSE(result.isReady()); + inputPromises[i].setError(kErrorStatus); + } + + auto output = result.get(); + ASSERT_EQ(output.size(), kNumInputs); + for (auto& status : output) { + ASSERT_EQ(status, kErrorStatus); + } +} + + +TEST_F(WhenAllTest, ReturnsOnceAllInputsResolveWithMixOfSuccessAndError) { + const auto kNumInputs = 5; + auto [inputPromises, inputFutures] = makePromisesAndFutures<int>(kNumInputs); + + auto result = whenAll(std::move(inputFutures)); + ASSERT_FALSE(result.isReady()); + + const auto kValue = 10; + const auto kNumSuccesses = 3; + for (auto i = 0; i < kNumInputs; ++i) { + ASSERT_FALSE(result.isReady()); + if (i < kNumSuccesses) { + inputPromises[i].emplaceValue(kValue); + } else { + inputPromises[i].setError(kErrorStatus); + } + } + + auto output = result.get(); + ASSERT_EQ(output.size(), kNumInputs); + for (auto i = 0; i < kNumInputs; ++i) { + auto swValue = output[i]; + if (i < kNumSuccesses) { + ASSERT_TRUE(swValue.isOK()); + ASSERT_EQ(swValue.getValue(), kValue); + } else { + ASSERT_EQ(swValue.getStatus(), kErrorStatus); + } + } +} + +TEST_F(WhenAllTest, ReturnsOnceAllInputsResolveWithMixOfSuccessAndErrorInReverseOrder) { + const auto kNumInputs = 5; + auto [inputPromises, inputFutures] = makePromisesAndFutures<int>(kNumInputs); + + auto result = whenAll(std::move(inputFutures)); + ASSERT_FALSE(result.isReady()); + + const auto kValue = 10; + const auto kNumSuccesses = 3; + // Iterate over inputs backwards, resolving them one at a time. + for (auto i = kNumInputs - 1; i >= 0; --i) { + ASSERT_FALSE(result.isReady()); + if (i < kNumSuccesses) { + inputPromises[i].emplaceValue(kValue); + } else { + inputPromises[i].setError(kErrorStatus); + } + } + + auto output = result.get(); + ASSERT_EQ(output.size(), kNumInputs); + for (auto i = kNumInputs - 1; i >= 0; --i) { + auto swValue = output[i]; + if (i < kNumSuccesses) { + ASSERT_TRUE(swValue.isOK()); + ASSERT_EQ(swValue.getValue(), kValue); + } else { + ASSERT_EQ(swValue.getStatus(), kErrorStatus); + } + } +} + +TEST_F(WhenAllTest, WorksWithExecutorFutures) { + const auto kNumInputs = 5; + auto [inputPromises, rawInputFutures] = makePromisesAndFutures<int>(kNumInputs); + + // Turn raw input Futures into ExecutorFutures. + std::vector<ExecutorFuture<int>> inputFutures; + for (auto i = 0; i < kNumInputs; ++i) { + inputFutures.emplace_back(std::move(rawInputFutures[i]).thenRunOn(executor())); + } + + auto result = whenAll(std::move(inputFutures)); + ASSERT_FALSE(result.isReady()); + + const auto kValue = 10; + for (auto i = 0; i < kNumInputs; ++i) { + ASSERT_FALSE(result.isReady()); + inputPromises[i].emplaceValue(kValue); + } + + auto output = result.get(); + ASSERT_EQ(output.size(), kNumInputs); + for (auto& swValue : output) { + ASSERT_TRUE(swValue.isOK()); + ASSERT_EQ(swValue.getValue(), kValue); + } +} + +class WhenAnyTest : public FutureUtilTest {}; + +TEST_F(WhenAnyTest, ReturnsTheFirstFutureToResolveWhenThatFutureContainsSuccessAndOnlyOneInput) { + std::vector<Future<int>> inputFutures; + auto [promise, future] = makePromiseFuture<int>(); + inputFutures.emplace_back(std::move(future)); + auto result = whenAny(std::move(inputFutures)); + ASSERT_FALSE(result.isReady()); + + const auto kValue = 10; + promise.emplaceValue(kValue); + auto [swVal, idx] = result.get(); + ASSERT_TRUE(swVal.isOK()); + ASSERT_EQ(swVal.getValue(), kValue); + ASSERT_EQ(idx, 0); +} + +TEST_F(WhenAnyTest, ReturnsTheFirstFutureToResolveWhenThatFutureContainsSuccessAndMultipleInputs) { + const auto kNumInputs = 5; + auto [inputPromises, inputFutures] = makePromisesAndFutures<int>(kNumInputs); + + auto result = whenAny(std::move(inputFutures)); + ASSERT_FALSE(result.isReady()); + + const auto kWhichIdxWillBeFirst = 3; + const auto kValue = 10; + inputPromises[kWhichIdxWillBeFirst].emplaceValue(kValue); + auto [swVal, idx] = result.get(); + ASSERT_TRUE(swVal.isOK()); + ASSERT_EQ(swVal.getValue(), kValue); + ASSERT_EQ(idx, kWhichIdxWillBeFirst); +} + +TEST_F(WhenAnyTest, + ReturnsTheFirstFutureToResolveWhenThatFutureContainsSuccessAndMultipleInputsWithVoidInputs) { + const auto kNumInputs = 5; + auto [inputPromises, inputFutures] = makePromisesAndFutures<void>(kNumInputs); + + auto result = whenAny(std::move(inputFutures)); + ASSERT_FALSE(result.isReady()); + + const auto kWhichIdxWillBeFirst = 3; + inputPromises[kWhichIdxWillBeFirst].emplaceValue(); + auto [status, idx] = result.get(); + ASSERT_OK(status); + ASSERT_EQ(idx, kWhichIdxWillBeFirst); +} + +TEST_F( + WhenAnyTest, + ReturnsTheFirstFutureToResolveWhenThatFutureContainsSuccessAndMultipleInputsAndOthersSucceedAfter) { + const auto kNumInputs = 5; + auto [inputPromises, inputFutures] = makePromisesAndFutures<int>(kNumInputs); + + auto result = whenAny(std::move(inputFutures)); + ASSERT_FALSE(result.isReady()); + + const auto kWhichIdxWillBeFirst = 3; + const auto kValue = 10; + inputPromises[kWhichIdxWillBeFirst].emplaceValue(kValue); + auto [swVal, idx] = result.get(); + ASSERT_TRUE(swVal.isOK()); + ASSERT_EQ(swVal.getValue(), kValue); + ASSERT_EQ(idx, kWhichIdxWillBeFirst); + + // Make sure there's no problem when these resolve after whenAny has resolved due to an error. + for (auto i = 0; i < kNumInputs; ++i) { + if (i != kWhichIdxWillBeFirst) { + inputPromises[i].emplaceValue(5); + } + } +} + +TEST_F(WhenAnyTest, ReturnsTheFirstFutureToResolveWhenThatFutureContainsAnErrorAndOnlyOneInput) { + std::vector<Future<int>> inputFutures; + auto [promise, future] = makePromiseFuture<int>(); + inputFutures.emplace_back(std::move(future)); + auto result = whenAny(std::move(inputFutures)); + ASSERT_FALSE(result.isReady()); + + promise.setError(kErrorStatus); + auto [swVal, idx] = result.get(); + ASSERT_EQ(swVal.getStatus(), kErrorStatus); + ASSERT_EQ(idx, 0); +} + +TEST_F(WhenAnyTest, ReturnsTheFirstFutureToResolveWhenThatFutureContainsAnErrorAndMultipleInputs) { + const auto kNumInputs = 5; + auto [inputPromises, inputFutures] = makePromisesAndFutures<int>(kNumInputs); + + auto result = whenAny(std::move(inputFutures)); + ASSERT_FALSE(result.isReady()); + + const auto kWhichIdxWillBeFirst = 3; + inputPromises[kWhichIdxWillBeFirst].setError(kErrorStatus); + auto [swVal, idx] = result.get(); + ASSERT_EQ(swVal.getStatus(), kErrorStatus); + ASSERT_EQ(idx, kWhichIdxWillBeFirst); +} + +TEST_F(WhenAnyTest, + ReturnsTheFirstFutureToResolveWhenThatFutureContainsAnErrorAndMultipleInputsWithVoidInputs) { + const auto kNumInputs = 5; + auto [inputPromises, inputFutures] = makePromisesAndFutures<void>(kNumInputs); + + auto result = whenAny(std::move(inputFutures)); + ASSERT_FALSE(result.isReady()); + + const auto kWhichIdxWillBeFirst = 3; + inputPromises[kWhichIdxWillBeFirst].setError(kErrorStatus); + auto [status, idx] = result.get(); + ASSERT_EQ(status, kErrorStatus); + ASSERT_EQ(idx, kWhichIdxWillBeFirst); +} + +TEST_F( + WhenAnyTest, + ReturnsTheFirstFutureToResolveWhenThatFutureContainsAnErrorAndMultipleInputsAndOthersSucceedAfter) { + const auto kNumInputs = 5; + auto [inputPromises, inputFutures] = makePromisesAndFutures<int>(kNumInputs); + + auto result = whenAny(std::move(inputFutures)); + ASSERT_FALSE(result.isReady()); + + const auto kWhichIdxWillBeFirst = 3; + inputPromises[kWhichIdxWillBeFirst].setError(kErrorStatus); + auto [swVal, idx] = result.get(); + ASSERT_EQ(swVal.getStatus(), kErrorStatus); + ASSERT_EQ(idx, kWhichIdxWillBeFirst); + + // Make sure there's no problem when these resolve after whenAny has resolved due to an error. + for (auto i = 0; i < kNumInputs; ++i) { + if (i != kWhichIdxWillBeFirst) { + inputPromises[i].emplaceValue(5); + } + } +} + +TEST_F(WhenAnyTest, WorksWithExecutorFutures) { + const auto kNumInputs = 5; + auto [inputPromises, rawInputFutures] = makePromisesAndFutures<int>(kNumInputs); + + // Turn raw input Futures into ExecutorFutures. + std::vector<ExecutorFuture<int>> inputFutures; + for (auto i = 0; i < kNumInputs; ++i) { + inputFutures.emplace_back(std::move(rawInputFutures[i]).thenRunOn(executor())); + } + + auto result = whenAny(std::move(inputFutures)); + ASSERT_FALSE(result.isReady()); + + const auto kWhichIdxWillBeFirst = 3; + const auto kValue = 10; + inputPromises[kWhichIdxWillBeFirst].emplaceValue(kValue); + auto [swVal, idx] = result.get(); + ASSERT_TRUE(swVal.isOK()); + ASSERT_EQ(swVal.getValue(), kValue); + ASSERT_EQ(idx, kWhichIdxWillBeFirst); +} + + } // namespace } // namespace mongo |