summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Saltz <matthew.saltz@mongodb.com>2020-09-03 15:40:30 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-09-24 22:56:50 +0000
commit39cb1d7307652e105e1b12317ef0e671c39d567c (patch)
tree5c2756f97e466f9fec125507dd48cdf245ac1a8a
parent53aaa253ff598578816d4add5e88f063775b4b85 (diff)
downloadmongo-39cb1d7307652e105e1b12317ef0e671c39d567c.tar.gz
SERVER-50341 Add whenAny and whenAll
-rw-r--r--src/mongo/util/future_util.h99
-rw-r--r--src/mongo/util/future_util_test.cpp358
2 files changed, 443 insertions, 14 deletions
diff --git a/src/mongo/util/future_util.h b/src/mongo/util/future_util.h
index ceaf310090f..d609a2f1e6d 100644
--- a/src/mongo/util/future_util.h
+++ b/src/mongo/util/future_util.h
@@ -380,4 +380,103 @@ SemiFuture<void> whenAllSucceed(std::vector<FutureLike>&& futures) {
return std::move(future).semi();
}
+/**
+ * Given a vector of input Futures or ExecutorFutures, returns a SemiFuture that contains the
+ * results of each input future wrapped in a StatusWith to indicate whether it resolved with success
+ * or failure and will be resolved when all of the input futures have resolved.
+ */
+template <typename FutureT,
+ typename Value = typename FutureT::value_type,
+ typename ResultVector = std::vector<StatusOrStatusWith<Value>>>
+SemiFuture<ResultVector> whenAll(std::vector<FutureT>&& futures) {
+ invariant(futures.size() > 0);
+
+ /**
+ * A structure used to share state between the input futures.
+ */
+ struct SharedBlock {
+ SharedBlock(size_t numFuturesToWaitFor, Promise<ResultVector> result)
+ : numFuturesToWaitFor(numFuturesToWaitFor),
+ intermediateResult(numFuturesToWaitFor, {ErrorCodes::InternalError, ""}),
+ resultPromise(std::move(result)) {}
+ // Total number of input futures.
+ const size_t numFuturesToWaitFor;
+ // Tracks the number of input futures which have resolved so far.
+ AtomicWord<size_t> numReady{0};
+ // A vector containing the results of each input future.
+ ResultVector intermediateResult;
+ // The promise corresponding to the resulting SemiFuture returned by this function.
+ Promise<ResultVector> resultPromise;
+ };
+
+ auto [promise, future] = makePromiseFuture<ResultVector>();
+ auto sharedBlock = std::make_shared<SharedBlock>(futures.size(), std::move(promise));
+
+ for (size_t i = 0; i < futures.size(); ++i) {
+ std::move(futures[i])
+ .getAsync([sharedBlock, myIndex = i](StatusOrStatusWith<Value> value) mutable {
+ sharedBlock->intermediateResult[myIndex] = std::move(value);
+
+ auto numReady = sharedBlock->numReady.addAndFetch(1);
+ invariant(numReady <= sharedBlock->numFuturesToWaitFor);
+
+ if (numReady == sharedBlock->numFuturesToWaitFor) {
+ // All results are ready.
+ sharedBlock->resultPromise.emplaceValue(
+ std::move(sharedBlock->intermediateResult));
+ }
+ });
+ }
+
+ return std::move(future).semi();
+}
+
+/**
+ * Result type for the whenAny function.
+ */
+template <typename T>
+struct WhenAnyResult {
+ // The result of the future that resolved first.
+ StatusOrStatusWith<T> result;
+ // The index of the future that resolved first.
+ size_t index;
+};
+
+/**
+ * Given a vector of input Futures or ExecutorFutures, returns a SemiFuture which will contain a
+ * struct containing the first of those futures to resolve along with its index in the input array.
+ */
+template <typename FutureT,
+ typename Value = typename FutureT::value_type,
+ typename Result = WhenAnyResult<Value>>
+SemiFuture<Result> whenAny(std::vector<FutureT>&& futures) {
+ invariant(futures.size() > 0);
+
+ /**
+ * A structure used to share state between the input futures.
+ */
+ struct SharedBlock {
+ SharedBlock(Promise<Result> result) : resultPromise(std::move(result)) {}
+ // Tracks whether or not the resultPromise has been set.
+ AtomicWord<bool> done{false};
+ // The promise corresponding to the resulting SemiFuture returned by this function.
+ Promise<Result> resultPromise;
+ };
+
+ auto [promise, future] = makePromiseFuture<Result>();
+ auto sharedBlock = std::make_shared<SharedBlock>(std::move(promise));
+
+ for (size_t i = 0; i < futures.size(); ++i) {
+ std::move(futures[i])
+ .getAsync([sharedBlock, myIndex = i](StatusOrStatusWith<Value> value) mutable {
+ // If this is the first input future to complete, change done to true and set the
+ // value on the promise.
+ if (!sharedBlock->done.swap(true)) {
+ sharedBlock->resultPromise.emplaceValue(Result{std::move(value), myIndex});
+ }
+ });
+ }
+
+ return std::move(future).semi();
+}
} // namespace mongo
diff --git a/src/mongo/util/future_util_test.cpp b/src/mongo/util/future_util_test.cpp
index 3f2606c3d4f..7d9dbb1655a 100644
--- a/src/mongo/util/future_util_test.cpp
+++ b/src/mongo/util/future_util_test.cpp
@@ -163,21 +163,19 @@ TEST_F(AsyncTryUntilTest, LoopBodyPropagatesErrorToConditionAndCaller) {
ASSERT_EQ(resultFut.getNoThrow(), ErrorCodes::InternalError);
}
-class WhenAllSucceedTest : public FutureUtilTest {
-public:
- template <typename T>
- static std::pair<std::vector<Promise<T>>, std::vector<Future<T>>> makePromisesAndFutures(
- size_t size) {
- std::vector<Future<T>> inputFutures;
- std::vector<Promise<T>> inputPromises;
- for (size_t i = 0; i < size; ++i) {
- auto [inputPromise, inputFuture] = makePromiseFuture<T>();
- inputFutures.emplace_back(std::move(inputFuture));
- inputPromises.emplace_back(std::move(inputPromise));
- }
- return std::make_pair(std::move(inputPromises), std::move(inputFutures));
+template <typename T>
+std::pair<std::vector<Promise<T>>, std::vector<Future<T>>> makePromisesAndFutures(size_t size) {
+ std::vector<Future<T>> inputFutures;
+ std::vector<Promise<T>> inputPromises;
+ for (size_t i = 0; i < size; ++i) {
+ auto [inputPromise, inputFuture] = makePromiseFuture<T>();
+ inputFutures.emplace_back(std::move(inputFuture));
+ inputPromises.emplace_back(std::move(inputPromise));
}
-};
+ return std::make_pair(std::move(inputPromises), std::move(inputFutures));
+}
+
+class WhenAllSucceedTest : public FutureUtilTest {};
static const Status kErrorStatus = {ErrorCodes::InternalError, ""};
@@ -462,5 +460,337 @@ TEST_F(WhenAllSucceedVoidTest, WhenAllSucceedWorksWithExecutorFutures) {
ASSERT_EQ(result.getNoThrow(), Status::OK());
}
+class WhenAllTest : public FutureUtilTest {};
+
+TEST_F(WhenAllTest, ReturnsOnceAllInputsResolveWithSuccess) {
+ const auto kNumInputs = 5;
+ auto [inputPromises, inputFutures] = makePromisesAndFutures<int>(kNumInputs);
+
+ auto result = whenAll(std::move(inputFutures));
+ ASSERT_FALSE(result.isReady());
+
+ const auto kValue = 10;
+ for (auto i = 0; i < kNumInputs; ++i) {
+ ASSERT_FALSE(result.isReady());
+ inputPromises[i].emplaceValue(kValue);
+ }
+
+ auto output = result.get();
+ ASSERT_EQ(output.size(), kNumInputs);
+ for (auto& swValue : output) {
+ ASSERT_TRUE(swValue.isOK());
+ ASSERT_EQ(swValue.getValue(), kValue);
+ }
+}
+
+TEST_F(WhenAllTest, ReturnsOnceAllInputsResolveWithError) {
+ const auto kNumInputs = 5;
+ auto [inputPromises, inputFutures] = makePromisesAndFutures<int>(kNumInputs);
+
+ auto result = whenAll(std::move(inputFutures));
+ ASSERT_FALSE(result.isReady());
+
+ for (auto i = 0; i < kNumInputs; ++i) {
+ ASSERT_FALSE(result.isReady());
+ inputPromises[i].setError(kErrorStatus);
+ }
+
+ auto output = result.get();
+ ASSERT_EQ(output.size(), kNumInputs);
+ for (auto& swValue : output) {
+ ASSERT_EQ(swValue.getStatus(), kErrorStatus);
+ }
+}
+
+TEST_F(WhenAllTest, ReturnsOnceAllInputsResolveWithSuccessWithVoidInputs) {
+ const auto kNumInputs = 5;
+ auto [inputPromises, inputFutures] = makePromisesAndFutures<void>(kNumInputs);
+
+ auto result = whenAll(std::move(inputFutures));
+ ASSERT_FALSE(result.isReady());
+
+ for (auto i = 0; i < kNumInputs; ++i) {
+ ASSERT_FALSE(result.isReady());
+ inputPromises[i].emplaceValue();
+ }
+
+ auto output = result.get();
+ ASSERT_EQ(output.size(), kNumInputs);
+ for (auto& status : output) {
+ ASSERT_OK(status);
+ }
+}
+
+TEST_F(WhenAllTest, ReturnsOnceAllInputsResolveWithErrorWithVoidInputs) {
+ const auto kNumInputs = 5;
+ auto [inputPromises, inputFutures] = makePromisesAndFutures<void>(kNumInputs);
+
+ auto result = whenAll(std::move(inputFutures));
+ ASSERT_FALSE(result.isReady());
+
+ for (auto i = 0; i < kNumInputs; ++i) {
+ ASSERT_FALSE(result.isReady());
+ inputPromises[i].setError(kErrorStatus);
+ }
+
+ auto output = result.get();
+ ASSERT_EQ(output.size(), kNumInputs);
+ for (auto& status : output) {
+ ASSERT_EQ(status, kErrorStatus);
+ }
+}
+
+
+TEST_F(WhenAllTest, ReturnsOnceAllInputsResolveWithMixOfSuccessAndError) {
+ const auto kNumInputs = 5;
+ auto [inputPromises, inputFutures] = makePromisesAndFutures<int>(kNumInputs);
+
+ auto result = whenAll(std::move(inputFutures));
+ ASSERT_FALSE(result.isReady());
+
+ const auto kValue = 10;
+ const auto kNumSuccesses = 3;
+ for (auto i = 0; i < kNumInputs; ++i) {
+ ASSERT_FALSE(result.isReady());
+ if (i < kNumSuccesses) {
+ inputPromises[i].emplaceValue(kValue);
+ } else {
+ inputPromises[i].setError(kErrorStatus);
+ }
+ }
+
+ auto output = result.get();
+ ASSERT_EQ(output.size(), kNumInputs);
+ for (auto i = 0; i < kNumInputs; ++i) {
+ auto swValue = output[i];
+ if (i < kNumSuccesses) {
+ ASSERT_TRUE(swValue.isOK());
+ ASSERT_EQ(swValue.getValue(), kValue);
+ } else {
+ ASSERT_EQ(swValue.getStatus(), kErrorStatus);
+ }
+ }
+}
+
+TEST_F(WhenAllTest, ReturnsOnceAllInputsResolveWithMixOfSuccessAndErrorInReverseOrder) {
+ const auto kNumInputs = 5;
+ auto [inputPromises, inputFutures] = makePromisesAndFutures<int>(kNumInputs);
+
+ auto result = whenAll(std::move(inputFutures));
+ ASSERT_FALSE(result.isReady());
+
+ const auto kValue = 10;
+ const auto kNumSuccesses = 3;
+ // Iterate over inputs backwards, resolving them one at a time.
+ for (auto i = kNumInputs - 1; i >= 0; --i) {
+ ASSERT_FALSE(result.isReady());
+ if (i < kNumSuccesses) {
+ inputPromises[i].emplaceValue(kValue);
+ } else {
+ inputPromises[i].setError(kErrorStatus);
+ }
+ }
+
+ auto output = result.get();
+ ASSERT_EQ(output.size(), kNumInputs);
+ for (auto i = kNumInputs - 1; i >= 0; --i) {
+ auto swValue = output[i];
+ if (i < kNumSuccesses) {
+ ASSERT_TRUE(swValue.isOK());
+ ASSERT_EQ(swValue.getValue(), kValue);
+ } else {
+ ASSERT_EQ(swValue.getStatus(), kErrorStatus);
+ }
+ }
+}
+
+TEST_F(WhenAllTest, WorksWithExecutorFutures) {
+ const auto kNumInputs = 5;
+ auto [inputPromises, rawInputFutures] = makePromisesAndFutures<int>(kNumInputs);
+
+ // Turn raw input Futures into ExecutorFutures.
+ std::vector<ExecutorFuture<int>> inputFutures;
+ for (auto i = 0; i < kNumInputs; ++i) {
+ inputFutures.emplace_back(std::move(rawInputFutures[i]).thenRunOn(executor()));
+ }
+
+ auto result = whenAll(std::move(inputFutures));
+ ASSERT_FALSE(result.isReady());
+
+ const auto kValue = 10;
+ for (auto i = 0; i < kNumInputs; ++i) {
+ ASSERT_FALSE(result.isReady());
+ inputPromises[i].emplaceValue(kValue);
+ }
+
+ auto output = result.get();
+ ASSERT_EQ(output.size(), kNumInputs);
+ for (auto& swValue : output) {
+ ASSERT_TRUE(swValue.isOK());
+ ASSERT_EQ(swValue.getValue(), kValue);
+ }
+}
+
+class WhenAnyTest : public FutureUtilTest {};
+
+TEST_F(WhenAnyTest, ReturnsTheFirstFutureToResolveWhenThatFutureContainsSuccessAndOnlyOneInput) {
+ std::vector<Future<int>> inputFutures;
+ auto [promise, future] = makePromiseFuture<int>();
+ inputFutures.emplace_back(std::move(future));
+ auto result = whenAny(std::move(inputFutures));
+ ASSERT_FALSE(result.isReady());
+
+ const auto kValue = 10;
+ promise.emplaceValue(kValue);
+ auto [swVal, idx] = result.get();
+ ASSERT_TRUE(swVal.isOK());
+ ASSERT_EQ(swVal.getValue(), kValue);
+ ASSERT_EQ(idx, 0);
+}
+
+TEST_F(WhenAnyTest, ReturnsTheFirstFutureToResolveWhenThatFutureContainsSuccessAndMultipleInputs) {
+ const auto kNumInputs = 5;
+ auto [inputPromises, inputFutures] = makePromisesAndFutures<int>(kNumInputs);
+
+ auto result = whenAny(std::move(inputFutures));
+ ASSERT_FALSE(result.isReady());
+
+ const auto kWhichIdxWillBeFirst = 3;
+ const auto kValue = 10;
+ inputPromises[kWhichIdxWillBeFirst].emplaceValue(kValue);
+ auto [swVal, idx] = result.get();
+ ASSERT_TRUE(swVal.isOK());
+ ASSERT_EQ(swVal.getValue(), kValue);
+ ASSERT_EQ(idx, kWhichIdxWillBeFirst);
+}
+
+TEST_F(WhenAnyTest,
+ ReturnsTheFirstFutureToResolveWhenThatFutureContainsSuccessAndMultipleInputsWithVoidInputs) {
+ const auto kNumInputs = 5;
+ auto [inputPromises, inputFutures] = makePromisesAndFutures<void>(kNumInputs);
+
+ auto result = whenAny(std::move(inputFutures));
+ ASSERT_FALSE(result.isReady());
+
+ const auto kWhichIdxWillBeFirst = 3;
+ inputPromises[kWhichIdxWillBeFirst].emplaceValue();
+ auto [status, idx] = result.get();
+ ASSERT_OK(status);
+ ASSERT_EQ(idx, kWhichIdxWillBeFirst);
+}
+
+TEST_F(
+ WhenAnyTest,
+ ReturnsTheFirstFutureToResolveWhenThatFutureContainsSuccessAndMultipleInputsAndOthersSucceedAfter) {
+ const auto kNumInputs = 5;
+ auto [inputPromises, inputFutures] = makePromisesAndFutures<int>(kNumInputs);
+
+ auto result = whenAny(std::move(inputFutures));
+ ASSERT_FALSE(result.isReady());
+
+ const auto kWhichIdxWillBeFirst = 3;
+ const auto kValue = 10;
+ inputPromises[kWhichIdxWillBeFirst].emplaceValue(kValue);
+ auto [swVal, idx] = result.get();
+ ASSERT_TRUE(swVal.isOK());
+ ASSERT_EQ(swVal.getValue(), kValue);
+ ASSERT_EQ(idx, kWhichIdxWillBeFirst);
+
+ // Make sure there's no problem when these resolve after whenAny has resolved due to an error.
+ for (auto i = 0; i < kNumInputs; ++i) {
+ if (i != kWhichIdxWillBeFirst) {
+ inputPromises[i].emplaceValue(5);
+ }
+ }
+}
+
+TEST_F(WhenAnyTest, ReturnsTheFirstFutureToResolveWhenThatFutureContainsAnErrorAndOnlyOneInput) {
+ std::vector<Future<int>> inputFutures;
+ auto [promise, future] = makePromiseFuture<int>();
+ inputFutures.emplace_back(std::move(future));
+ auto result = whenAny(std::move(inputFutures));
+ ASSERT_FALSE(result.isReady());
+
+ promise.setError(kErrorStatus);
+ auto [swVal, idx] = result.get();
+ ASSERT_EQ(swVal.getStatus(), kErrorStatus);
+ ASSERT_EQ(idx, 0);
+}
+
+TEST_F(WhenAnyTest, ReturnsTheFirstFutureToResolveWhenThatFutureContainsAnErrorAndMultipleInputs) {
+ const auto kNumInputs = 5;
+ auto [inputPromises, inputFutures] = makePromisesAndFutures<int>(kNumInputs);
+
+ auto result = whenAny(std::move(inputFutures));
+ ASSERT_FALSE(result.isReady());
+
+ const auto kWhichIdxWillBeFirst = 3;
+ inputPromises[kWhichIdxWillBeFirst].setError(kErrorStatus);
+ auto [swVal, idx] = result.get();
+ ASSERT_EQ(swVal.getStatus(), kErrorStatus);
+ ASSERT_EQ(idx, kWhichIdxWillBeFirst);
+}
+
+TEST_F(WhenAnyTest,
+ ReturnsTheFirstFutureToResolveWhenThatFutureContainsAnErrorAndMultipleInputsWithVoidInputs) {
+ const auto kNumInputs = 5;
+ auto [inputPromises, inputFutures] = makePromisesAndFutures<void>(kNumInputs);
+
+ auto result = whenAny(std::move(inputFutures));
+ ASSERT_FALSE(result.isReady());
+
+ const auto kWhichIdxWillBeFirst = 3;
+ inputPromises[kWhichIdxWillBeFirst].setError(kErrorStatus);
+ auto [status, idx] = result.get();
+ ASSERT_EQ(status, kErrorStatus);
+ ASSERT_EQ(idx, kWhichIdxWillBeFirst);
+}
+
+TEST_F(
+ WhenAnyTest,
+ ReturnsTheFirstFutureToResolveWhenThatFutureContainsAnErrorAndMultipleInputsAndOthersSucceedAfter) {
+ const auto kNumInputs = 5;
+ auto [inputPromises, inputFutures] = makePromisesAndFutures<int>(kNumInputs);
+
+ auto result = whenAny(std::move(inputFutures));
+ ASSERT_FALSE(result.isReady());
+
+ const auto kWhichIdxWillBeFirst = 3;
+ inputPromises[kWhichIdxWillBeFirst].setError(kErrorStatus);
+ auto [swVal, idx] = result.get();
+ ASSERT_EQ(swVal.getStatus(), kErrorStatus);
+ ASSERT_EQ(idx, kWhichIdxWillBeFirst);
+
+ // Make sure there's no problem when these resolve after whenAny has resolved due to an error.
+ for (auto i = 0; i < kNumInputs; ++i) {
+ if (i != kWhichIdxWillBeFirst) {
+ inputPromises[i].emplaceValue(5);
+ }
+ }
+}
+
+TEST_F(WhenAnyTest, WorksWithExecutorFutures) {
+ const auto kNumInputs = 5;
+ auto [inputPromises, rawInputFutures] = makePromisesAndFutures<int>(kNumInputs);
+
+ // Turn raw input Futures into ExecutorFutures.
+ std::vector<ExecutorFuture<int>> inputFutures;
+ for (auto i = 0; i < kNumInputs; ++i) {
+ inputFutures.emplace_back(std::move(rawInputFutures[i]).thenRunOn(executor()));
+ }
+
+ auto result = whenAny(std::move(inputFutures));
+ ASSERT_FALSE(result.isReady());
+
+ const auto kWhichIdxWillBeFirst = 3;
+ const auto kValue = 10;
+ inputPromises[kWhichIdxWillBeFirst].emplaceValue(kValue);
+ auto [swVal, idx] = result.get();
+ ASSERT_TRUE(swVal.isOK());
+ ASSERT_EQ(swVal.getValue(), kValue);
+ ASSERT_EQ(idx, kWhichIdxWillBeFirst);
+}
+
+
} // namespace
} // namespace mongo