summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGeorge Wangensteen <george.wangensteen@mongodb.com>2020-11-30 16:04:23 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-12-10 19:45:20 +0000
commit261562fe64fac903ff1da27de429cdc395bf308d (patch)
tree9e638efddd747e4fc7270b79acde51a0fdac9e6f
parent1779d6fcbfa29f1263ba3e77b4aab4e2f41f18cf (diff)
downloadmongo-261562fe64fac903ff1da27de429cdc395bf308d.tar.gz
SERVER-51298 Add cancelation support to AsyncTry/until looping utility
-rw-r--r--src/mongo/db/repl/tenant_migration_access_blocker.cpp3
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.cpp10
-rw-r--r--src/mongo/db/s/range_deletion_util.cpp3
-rw-r--r--src/mongo/util/future_util.h139
-rw-r--r--src/mongo/util/future_util_test.cpp90
5 files changed, 173 insertions, 72 deletions
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker.cpp b/src/mongo/db/repl/tenant_migration_access_blocker.cpp
index 02286cd4307..b7d95ed16d8 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker.cpp
+++ b/src/mongo/db/repl/tenant_migration_access_blocker.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/repl/tenant_migration_committed_info.h"
#include "mongo/db/repl/tenant_migration_conflict_info.h"
#include "mongo/logv2/log.h"
+#include "mongo/util/cancelation.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/future_util.h"
@@ -292,7 +293,7 @@ ExecutorFuture<void> TenantMigrationAccessBlocker::_waitForOpTimeToMajorityCommi
return shouldStop;
})
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(_executor);
+ .on(_executor, CancelationToken::uncancelable());
}
void TenantMigrationAccessBlocker::appendInfoForServerStatus(BSONObjBuilder* builder) const {
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index b55f7b0f7be..71585d55a9c 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -110,7 +110,7 @@ ExecutorFuture<void> TenantMigrationDonorService::_rebuildService(
})
.until([](Status status) { return shouldStopCreatingTTLIndex(status); })
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor);
+ .on(**executor, CancelationToken::uncancelable());
}
TenantMigrationDonorService::Instance::Instance(ServiceContext* serviceContext,
@@ -249,7 +249,7 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState
return shouldStopInsertingDonorStateDoc(swOpTime.getStatus());
})
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor);
+ .on(**executor, CancelationToken::uncancelable());
}
ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateStateDocument(
@@ -344,7 +344,7 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateState
return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus());
})
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor);
+ .on(**executor, CancelationToken::uncancelable());
}
ExecutorFuture<repl::OpTime>
@@ -382,7 +382,7 @@ TenantMigrationDonorService::Instance::_markStateDocumentAsGarbageCollectable(
return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus());
})
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor);
+ .on(**executor, CancelationToken::uncancelable());
}
ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForMajorityWriteConcern(
@@ -446,7 +446,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipi
})
.until([](Status status) { return shouldStopSendingRecipientCommand(status); })
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor);
+ .on(**executor, token);
}
ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDataCommand(
diff --git a/src/mongo/db/s/range_deletion_util.cpp b/src/mongo/db/s/range_deletion_util.cpp
index 8dd7347d481..3073570682d 100644
--- a/src/mongo/db/s/range_deletion_util.cpp
+++ b/src/mongo/db/s/range_deletion_util.cpp
@@ -63,6 +63,7 @@
#include "mongo/db/write_concern.h"
#include "mongo/executor/task_executor.h"
#include "mongo/logv2/log.h"
+#include "mongo/util/cancelation.h"
#include "mongo/util/future_util.h"
namespace mongo {
@@ -348,7 +349,7 @@ ExecutorFuture<void> deleteRangeInBatches(const std::shared_ptr<executor::TaskEx
ErrorCodes::isNotPrimaryError(swNumDeleted.getStatus());
})
.withDelayBetweenIterations(delayBetweenBatches)
- .on(executor)
+ .on(executor, CancelationToken::uncancelable())
.ignoreValue();
}
diff --git a/src/mongo/util/future_util.h b/src/mongo/util/future_util.h
index 6050bcb2e58..ff846fabc3e 100644
--- a/src/mongo/util/future_util.h
+++ b/src/mongo/util/future_util.h
@@ -30,6 +30,7 @@
#include "mongo/executor/task_executor.h"
#include "mongo/util/future.h"
+#include "mongo/util/static_immortal.h"
namespace mongo {
@@ -47,6 +48,14 @@ ExecutorFuture<void> sleepFor(std::shared_ptr<executor::TaskExecutor> executor,
namespace future_util_details {
/**
+ * Error status to use if any AsyncTry loop has been canceled.
+ */
+inline Status asyncTryCanceledStatus() {
+ static StaticImmortal s = Status{ErrorCodes::CallbackCanceled, "AsyncTry loop canceled"};
+ return *s;
+}
+
+/**
* Widget to get a default-constructible object that allows access to the type passed in at
* compile time. Used for getReturnType below.
*/
@@ -84,15 +93,20 @@ public:
/**
* Launches the loop and returns an ExecutorFuture that will be resolved when the loop is
- * complete.
+ * complete. If the executor is already shut down or the cancelToken has already been canceled
+ * before the loop is launched, the loop body will never run and the resulting ExecutorFuture
+ * will be set with either a ShutdownInProgress or CallbackCanceled error.
*
* The returned ExecutorFuture contains the last result returned by the loop body. If the last
* iteration of the loop body threw an exception or otherwise returned an error status, the
* returned ExecutorFuture will contain that error.
*/
- auto on(std::shared_ptr<executor::TaskExecutor> executor)&& {
- auto loop = std::make_shared<TryUntilLoopWithDelay>(
- std::move(executor), std::move(_body), std::move(_condition), std::move(_delay));
+ auto on(std::shared_ptr<executor::TaskExecutor> executor, CancelationToken cancelToken)&& {
+ auto loop = std::make_shared<TryUntilLoopWithDelay>(std::move(executor),
+ std::move(_body),
+ std::move(_condition),
+ std::move(_delay),
+ std::move(cancelToken));
// Launch the recursive chain using the helper class.
return loop->run();
}
@@ -106,11 +120,13 @@ private:
TryUntilLoopWithDelay(std::shared_ptr<executor::TaskExecutor> executor,
BodyCallable executeLoopBody,
ConditionCallable shouldStopIteration,
- Delay delay)
+ Delay delay,
+ CancelationToken cancelToken)
: executor(std::move(executor)),
executeLoopBody(std::move(executeLoopBody)),
shouldStopIteration(std::move(shouldStopIteration)),
- delay(std::move(delay)) {}
+ delay(std::move(delay)),
+ cancelToken(std::move(cancelToken)) {}
/**
* Performs actual looping through recursion.
@@ -118,15 +134,19 @@ private:
ExecutorFuture<FutureContinuationResult<BodyCallable>> run() {
using ReturnType =
typename decltype(getReturnType<decltype(executeLoopBody())>())::type;
+ // If the request to executeLoopBody has already been canceled, don't attempt to run it.
+ if (cancelToken.isCanceled()) {
+ return ExecutorFuture<ReturnType>(executor, asyncTryCanceledStatus());
+ }
auto future = ExecutorFuture<void>(executor).then(executeLoopBody);
return std::move(future).onCompletion(
- [this, self = this->shared_from_this()](StatusOrStatusWith<ReturnType> s) mutable {
+ [this, self = this->shared_from_this()](StatusOrStatusWith<ReturnType> s) {
if (shouldStopIteration(s))
return ExecutorFuture<ReturnType>(executor, std::move(s));
// Retry after a delay.
- return sleepFor(executor, delay.getNext()).then([this, self]() mutable {
+ return executor->sleepFor(delay.getNext(), cancelToken).then([this, self] {
return run();
});
});
@@ -136,6 +156,7 @@ private:
BodyCallable executeLoopBody;
ConditionCallable shouldStopIteration;
Delay delay;
+ CancelationToken cancelToken;
};
BodyCallable _body;
@@ -177,15 +198,17 @@ public:
/**
* Launches the loop and returns an ExecutorFuture that will be resolved when the loop is
- * complete.
+ * complete. If the executor is already shut down or the cancelToken has already been canceled
+ * before the loop is launched, the loop body will never run and the resulting ExecutorFuture
+ * will be set with either a ShutdownInProgress or CallbackCanceled error.
*
* The returned ExecutorFuture contains the last result returned by the loop body. If the last
* iteration of the loop body threw an exception or otherwise returned an error status, the
* returned ExecutorFuture will contain that error.
*/
- auto on(std::shared_ptr<executor::TaskExecutor> executor)&& {
+ auto on(std::shared_ptr<executor::TaskExecutor> executor, CancelationToken cancelToken)&& {
auto loop = std::make_shared<TryUntilLoop>(
- std::move(executor), std::move(_body), std::move(_condition));
+ std::move(executor), std::move(_body), std::move(_condition), std::move(cancelToken));
// Launch the recursive chain using the helper class.
return loop->run();
}
@@ -224,10 +247,12 @@ private:
struct TryUntilLoop : public std::enable_shared_from_this<TryUntilLoop> {
TryUntilLoop(std::shared_ptr<executor::TaskExecutor> executor,
BodyCallable executeLoopBody,
- ConditionCallable shouldStopIteration)
+ ConditionCallable shouldStopIteration,
+ CancelationToken cancelToken)
: executor(std::move(executor)),
executeLoopBody(std::move(executeLoopBody)),
- shouldStopIteration(std::move(shouldStopIteration)) {}
+ shouldStopIteration(std::move(shouldStopIteration)),
+ cancelToken(std::move(cancelToken)) {}
/**
* Performs actual looping through recursion.
@@ -235,10 +260,13 @@ private:
ExecutorFuture<FutureContinuationResult<BodyCallable>> run() {
using ReturnType =
typename decltype(getReturnType<decltype(executeLoopBody())>())::type;
+ // If the request is already canceled, don't run anything.
+ if (cancelToken.isCanceled())
+ return ExecutorFuture<ReturnType>(executor, asyncTryCanceledStatus());
auto future = ExecutorFuture<void>(executor).then(executeLoopBody);
return std::move(future).onCompletion(
- [this, self = this->shared_from_this()](StatusOrStatusWith<ReturnType> s) mutable {
+ [this, self = this->shared_from_this()](StatusOrStatusWith<ReturnType> s) {
if (shouldStopIteration(s))
return ExecutorFuture<ReturnType>(executor, std::move(s));
@@ -249,6 +277,7 @@ private:
std::shared_ptr<executor::TaskExecutor> executor;
BodyCallable executeLoopBody;
ConditionCallable shouldStopIteration;
+ CancelationToken cancelToken;
};
BodyCallable _body;
@@ -332,32 +361,31 @@ SemiFuture<ResultVector> whenAllSucceed(std::vector<FutureLike>&& futures) {
auto sharedBlock = std::make_shared<SharedBlock>(futures.size(), std::move(promise));
for (size_t i = 0; i < futures.size(); ++i) {
- std::move(futures[i])
- .getAsync([sharedBlock, myIndex = i](StatusWith<Value> swValue) mutable {
- if (swValue.isOK()) {
- // Best effort check that no error has returned, not required for correctness.
- if (!sharedBlock->completedWithError.loadRelaxed()) {
- // Put this result in its proper slot in the output vector.
- sharedBlock->intermediateResult[myIndex] = std::move(swValue.getValue());
- auto numResultsReturnedWithSuccess =
- sharedBlock->numResultsReturnedWithSuccess.addAndFetch(1);
- // If this is the last result to return, set the promise. Note that this
- // will never be true if one of the input futures resolves with an error,
- // since the future with an error will not cause the
- // numResultsReturnedWithSuccess count to be incremented.
- if (numResultsReturnedWithSuccess == sharedBlock->numFuturesToWaitFor) {
- // All results are ready.
- sharedBlock->resultPromise.emplaceValue(
- std::move(sharedBlock->intermediateResult));
- }
- }
- } else {
- // Make sure no other error has already been set before setting the promise.
- if (!sharedBlock->completedWithError.swap(true)) {
- sharedBlock->resultPromise.setError(std::move(swValue.getStatus()));
+ std::move(futures[i]).getAsync([sharedBlock, myIndex = i](StatusWith<Value> swValue) {
+ if (swValue.isOK()) {
+ // Best effort check that no error has returned, not required for correctness.
+ if (!sharedBlock->completedWithError.loadRelaxed()) {
+ // Put this result in its proper slot in the output vector.
+ sharedBlock->intermediateResult[myIndex] = std::move(swValue.getValue());
+ auto numResultsReturnedWithSuccess =
+ sharedBlock->numResultsReturnedWithSuccess.addAndFetch(1);
+ // If this is the last result to return, set the promise. Note that this
+ // will never be true if one of the input futures resolves with an error,
+ // since the future with an error will not cause the
+ // numResultsReturnedWithSuccess count to be incremented.
+ if (numResultsReturnedWithSuccess == sharedBlock->numFuturesToWaitFor) {
+ // All results are ready.
+ sharedBlock->resultPromise.emplaceValue(
+ std::move(sharedBlock->intermediateResult));
}
}
- });
+ } else {
+ // Make sure no other error has already been set before setting the promise.
+ if (!sharedBlock->completedWithError.swap(true)) {
+ sharedBlock->resultPromise.setError(std::move(swValue.getStatus()));
+ }
+ }
+ });
}
return std::move(future).semi();
@@ -450,19 +478,17 @@ SemiFuture<ResultVector> whenAll(std::vector<FutureT>&& futures) {
auto sharedBlock = std::make_shared<SharedBlock>(futures.size(), std::move(promise));
for (size_t i = 0; i < futures.size(); ++i) {
- std::move(futures[i])
- .getAsync([sharedBlock, myIndex = i](StatusOrStatusWith<Value> value) mutable {
- sharedBlock->intermediateResult[myIndex] = std::move(value);
+ std::move(futures[i]).getAsync([sharedBlock, myIndex = i](StatusOrStatusWith<Value> value) {
+ sharedBlock->intermediateResult[myIndex] = std::move(value);
- auto numReady = sharedBlock->numReady.addAndFetch(1);
- invariant(numReady <= sharedBlock->numFuturesToWaitFor);
+ auto numReady = sharedBlock->numReady.addAndFetch(1);
+ invariant(numReady <= sharedBlock->numFuturesToWaitFor);
- if (numReady == sharedBlock->numFuturesToWaitFor) {
- // All results are ready.
- sharedBlock->resultPromise.emplaceValue(
- std::move(sharedBlock->intermediateResult));
- }
- });
+ if (numReady == sharedBlock->numFuturesToWaitFor) {
+ // All results are ready.
+ sharedBlock->resultPromise.emplaceValue(std::move(sharedBlock->intermediateResult));
+ }
+ });
}
return std::move(future).semi();
@@ -504,14 +530,13 @@ SemiFuture<Result> whenAny(std::vector<FutureT>&& futures) {
auto sharedBlock = std::make_shared<SharedBlock>(std::move(promise));
for (size_t i = 0; i < futures.size(); ++i) {
- std::move(futures[i])
- .getAsync([sharedBlock, myIndex = i](StatusOrStatusWith<Value> value) mutable {
- // If this is the first input future to complete, change done to true and set the
- // value on the promise.
- if (!sharedBlock->done.swap(true)) {
- sharedBlock->resultPromise.emplaceValue(Result{std::move(value), myIndex});
- }
- });
+ std::move(futures[i]).getAsync([sharedBlock, myIndex = i](StatusOrStatusWith<Value> value) {
+ // If this is the first input future to complete, change done to true and set the
+ // value on the promise.
+ if (!sharedBlock->done.swap(true)) {
+ sharedBlock->resultPromise.emplaceValue(Result{std::move(value), myIndex});
+ }
+ });
}
return std::move(future).semi();
diff --git a/src/mongo/util/future_util_test.cpp b/src/mongo/util/future_util_test.cpp
index 93744634593..e9878d21e8c 100644
--- a/src/mongo/util/future_util_test.cpp
+++ b/src/mongo/util/future_util_test.cpp
@@ -87,7 +87,9 @@ using AsyncTryUntilTest = FutureUtilTest;
TEST_F(AsyncTryUntilTest, LoopExecutesOnceWithAlwaysTrueCondition) {
auto i = 0;
- auto resultFut = AsyncTry([&] { ++i; }).until([](Status s) { return true; }).on(executor());
+ auto resultFut = AsyncTry([&] { ++i; })
+ .until([](Status s) { return true; })
+ .on(executor(), CancelationToken::uncancelable());
resultFut.wait();
ASSERT_EQ(i, 1);
@@ -101,7 +103,7 @@ TEST_F(AsyncTryUntilTest, LoopExecutesUntilConditionIsTrue) {
return i;
})
.until([&](StatusWith<int> swInt) { return swInt.getValue() == numLoops; })
- .on(executor());
+ .on(executor(), CancelationToken::uncancelable());
resultFut.wait();
ASSERT_EQ(i, numLoops);
@@ -112,7 +114,7 @@ TEST_F(AsyncTryUntilTest, LoopDoesNotRespectConstDelayIfConditionIsAlreadyTrue)
auto resultFut = AsyncTry([&] { ++i; })
.until([](Status s) { return true; })
.withDelayBetweenIterations(Seconds(10000000))
- .on(executor());
+ .on(executor(), CancelationToken::uncancelable());
// This would hang for a very long time if the behavior were incorrect.
resultFut.wait();
@@ -124,7 +126,7 @@ TEST_F(AsyncTryUntilTest, LoopDoesNotRespectBackoffDelayIfConditionIsAlreadyTrue
auto resultFut = AsyncTry([&] { ++i; })
.until([](Status s) { return true; })
.withBackoffBetweenIterations(TestBackoff{Seconds(10000000)})
- .on(executor());
+ .on(executor(), CancelationToken::uncancelable());
// This would hang for a very long time if the behavior were incorrect.
resultFut.wait();
@@ -140,7 +142,7 @@ TEST_F(AsyncTryUntilTest, LoopRespectsConstDelayAfterEvaluatingCondition) {
})
.until([&](StatusWith<int> swInt) { return swInt.getValue() == numLoops; })
.withDelayBetweenIterations(Seconds(1000))
- .on(executor());
+ .on(executor(), CancelationToken::uncancelable());
ASSERT_FALSE(resultFut.isReady());
// Advance the time some, but not enough to be past the delay yet.
@@ -171,7 +173,7 @@ TEST_F(AsyncTryUntilTest, LoopRespectsBackoffDelayAfterEvaluatingCondition) {
})
.until([&](StatusWith<int> swInt) { return swInt.getValue() == numLoops; })
.withBackoffBetweenIterations(TestBackoff{Seconds(1000)})
- .on(executor());
+ .on(executor(), CancelationToken::uncancelable());
ASSERT_FALSE(resultFut.isReady());
// Due to the backoff, the delays are going to be 1000 seconds and 2000 seconds.
@@ -220,7 +222,7 @@ TEST_F(AsyncTryUntilTest, LoopBodyPropagatesValueOfLastIterationToCaller) {
return i;
})
.until([&](StatusWith<int> swInt) { return i == expectedResult; })
- .on(executor());
+ .on(executor(), CancelationToken::uncancelable());
ASSERT_EQ(resultFut.get(), expectedResult);
}
@@ -235,11 +237,83 @@ TEST_F(AsyncTryUntilTest, LoopBodyPropagatesErrorToConditionAndCaller) {
ASSERT_EQ(swInt.getStatus().code(), ErrorCodes::InternalError);
return true;
})
- .on(executor());
+ .on(executor(), CancelationToken::uncancelable());
ASSERT_EQ(resultFut.getNoThrow(), ErrorCodes::InternalError);
}
+static const Status kCanceledStatus = {ErrorCodes::CallbackCanceled, "AsyncTry::until canceled"};
+
+TEST_F(AsyncTryUntilTest, AsyncTryUntilCanBeCanceled) {
+ CancelationSource cancelSource;
+ auto resultFut =
+ AsyncTry([] {}).until([](Status) { return false; }).on(executor(), cancelSource.token());
+ // This should hang forever if it is not canceled.
+ cancelSource.cancel();
+ ASSERT_EQ(resultFut.getNoThrow(), kCanceledStatus);
+}
+
+TEST_F(AsyncTryUntilTest, AsyncTryUntilWithDelayCanBeCanceled) {
+ CancelationSource cancelSource;
+ auto resultFut = AsyncTry([] {})
+ .until([](Status) { return false; })
+ .withDelayBetweenIterations(Hours(1000))
+ .on(executor(), cancelSource.token());
+ // Since the "until" condition is false, and the delay between iterations is very long, the only
+ // way this test should pass without hanging is if the future produced by TaskExecutor::sleepFor
+ // is resolved and set with ErrorCodes::CallbackCanceled well _before_ the deadline.
+ cancelSource.cancel();
+ ASSERT_EQ(resultFut.getNoThrow(), kCanceledStatus);
+}
+
+TEST_F(AsyncTryUntilTest, AsyncTryUntilWithBackoffCanBeCanceled) {
+ CancelationSource cancelSource;
+ auto resultFut = AsyncTry([] {})
+ .until([](Status) { return false; })
+ .withBackoffBetweenIterations(TestBackoff{Seconds(10000000)})
+ .on(executor(), cancelSource.token());
+ cancelSource.cancel();
+ ASSERT_EQ(resultFut.getNoThrow(), kCanceledStatus);
+}
+
+TEST_F(AsyncTryUntilTest, CanceledTryUntilLoopDoesNotExecuteIfAlreadyCanceled) {
+ int counter{0};
+ CancelationSource cancelSource;
+ auto canceledToken = cancelSource.token();
+ cancelSource.cancel();
+ auto resultFut = AsyncTry([&] { ++counter; })
+ .until([](Status) { return false; })
+ .on(executor(), canceledToken);
+ ASSERT_EQ(resultFut.getNoThrow(), kCanceledStatus);
+ ASSERT_EQ(counter, 0);
+}
+
+TEST_F(AsyncTryUntilTest, CanceledTryUntilLoopWithDelayDoesNotExecuteIfAlreadyCanceled) {
+ CancelationSource cancelSource;
+ int counter{0};
+ auto canceledToken = cancelSource.token();
+ cancelSource.cancel();
+ auto resultFut = AsyncTry([&] { ++counter; })
+ .until([](Status) { return false; })
+ .withDelayBetweenIterations(Hours(1000))
+ .on(executor(), canceledToken);
+ ASSERT_EQ(resultFut.getNoThrow(), kCanceledStatus);
+ ASSERT_EQ(counter, 0);
+}
+
+TEST_F(AsyncTryUntilTest, CanceledTryUntilLoopWithBackoffDoesNotExecuteIfAlreadyCanceled) {
+ CancelationSource cancelSource;
+ int counter{0};
+ auto canceledToken = cancelSource.token();
+ cancelSource.cancel();
+ auto resultFut = AsyncTry([&] { ++counter; })
+ .until([](Status) { return false; })
+ .withBackoffBetweenIterations(TestBackoff{Seconds(10000000)})
+ .on(executor(), canceledToken);
+ ASSERT_EQ(resultFut.getNoThrow(), kCanceledStatus);
+ ASSERT_EQ(counter, 0);
+}
+
template <typename T>
std::pair<std::vector<Promise<T>>, std::vector<Future<T>>> makePromisesAndFutures(size_t size) {
std::vector<Future<T>> inputFutures;