From 261562fe64fac903ff1da27de429cdc395bf308d Mon Sep 17 00:00:00 2001 From: George Wangensteen Date: Mon, 30 Nov 2020 16:04:23 +0000 Subject: SERVER-51298 Add cancelation support to AsyncTry/until looping utility --- .../db/repl/tenant_migration_access_blocker.cpp | 3 +- .../db/repl/tenant_migration_donor_service.cpp | 10 +- src/mongo/db/s/range_deletion_util.cpp | 3 +- src/mongo/util/future_util.h | 139 ++++++++++++--------- src/mongo/util/future_util_test.cpp | 90 +++++++++++-- 5 files changed, 173 insertions(+), 72 deletions(-) diff --git a/src/mongo/db/repl/tenant_migration_access_blocker.cpp b/src/mongo/db/repl/tenant_migration_access_blocker.cpp index 02286cd4307..b7d95ed16d8 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker.cpp +++ b/src/mongo/db/repl/tenant_migration_access_blocker.cpp @@ -37,6 +37,7 @@ #include "mongo/db/repl/tenant_migration_committed_info.h" #include "mongo/db/repl/tenant_migration_conflict_info.h" #include "mongo/logv2/log.h" +#include "mongo/util/cancelation.h" #include "mongo/util/fail_point.h" #include "mongo/util/future_util.h" @@ -292,7 +293,7 @@ ExecutorFuture TenantMigrationAccessBlocker::_waitForOpTimeToMajorityCommi return shouldStop; }) .withBackoffBetweenIterations(kExponentialBackoff) - .on(_executor); + .on(_executor, CancelationToken::uncancelable()); } void TenantMigrationAccessBlocker::appendInfoForServerStatus(BSONObjBuilder* builder) const { diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index b55f7b0f7be..71585d55a9c 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -110,7 +110,7 @@ ExecutorFuture TenantMigrationDonorService::_rebuildService( }) .until([](Status status) { return shouldStopCreatingTTLIndex(status); }) .withBackoffBetweenIterations(kExponentialBackoff) - .on(**executor); + .on(**executor, CancelationToken::uncancelable()); } TenantMigrationDonorService::Instance::Instance(ServiceContext* serviceContext, @@ -249,7 +249,7 @@ ExecutorFuture TenantMigrationDonorService::Instance::_insertState return shouldStopInsertingDonorStateDoc(swOpTime.getStatus()); }) .withBackoffBetweenIterations(kExponentialBackoff) - .on(**executor); + .on(**executor, CancelationToken::uncancelable()); } ExecutorFuture TenantMigrationDonorService::Instance::_updateStateDocument( @@ -344,7 +344,7 @@ ExecutorFuture TenantMigrationDonorService::Instance::_updateState return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus()); }) .withBackoffBetweenIterations(kExponentialBackoff) - .on(**executor); + .on(**executor, CancelationToken::uncancelable()); } ExecutorFuture @@ -382,7 +382,7 @@ TenantMigrationDonorService::Instance::_markStateDocumentAsGarbageCollectable( return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus()); }) .withBackoffBetweenIterations(kExponentialBackoff) - .on(**executor); + .on(**executor, CancelationToken::uncancelable()); } ExecutorFuture TenantMigrationDonorService::Instance::_waitForMajorityWriteConcern( @@ -446,7 +446,7 @@ ExecutorFuture TenantMigrationDonorService::Instance::_sendCommandToRecipi }) .until([](Status status) { return shouldStopSendingRecipientCommand(status); }) .withBackoffBetweenIterations(kExponentialBackoff) - .on(**executor); + .on(**executor, token); } ExecutorFuture TenantMigrationDonorService::Instance::_sendRecipientSyncDataCommand( diff --git a/src/mongo/db/s/range_deletion_util.cpp b/src/mongo/db/s/range_deletion_util.cpp index 8dd7347d481..3073570682d 100644 --- a/src/mongo/db/s/range_deletion_util.cpp +++ b/src/mongo/db/s/range_deletion_util.cpp @@ -63,6 +63,7 @@ #include "mongo/db/write_concern.h" #include "mongo/executor/task_executor.h" #include "mongo/logv2/log.h" +#include "mongo/util/cancelation.h" #include "mongo/util/future_util.h" namespace mongo { @@ -348,7 +349,7 @@ ExecutorFuture deleteRangeInBatches(const std::shared_ptr sleepFor(std::shared_ptr executor, namespace future_util_details { +/** + * Error status to use if any AsyncTry loop has been canceled. + */ +inline Status asyncTryCanceledStatus() { + static StaticImmortal s = Status{ErrorCodes::CallbackCanceled, "AsyncTry loop canceled"}; + return *s; +} + /** * Widget to get a default-constructible object that allows access to the type passed in at * compile time. Used for getReturnType below. @@ -84,15 +93,20 @@ public: /** * Launches the loop and returns an ExecutorFuture that will be resolved when the loop is - * complete. + * complete. If the executor is already shut down or the cancelToken has already been canceled + * before the loop is launched, the loop body will never run and the resulting ExecutorFuture + * will be set with either a ShutdownInProgress or CallbackCanceled error. * * The returned ExecutorFuture contains the last result returned by the loop body. If the last * iteration of the loop body threw an exception or otherwise returned an error status, the * returned ExecutorFuture will contain that error. */ - auto on(std::shared_ptr executor)&& { - auto loop = std::make_shared( - std::move(executor), std::move(_body), std::move(_condition), std::move(_delay)); + auto on(std::shared_ptr executor, CancelationToken cancelToken)&& { + auto loop = std::make_shared(std::move(executor), + std::move(_body), + std::move(_condition), + std::move(_delay), + std::move(cancelToken)); // Launch the recursive chain using the helper class. return loop->run(); } @@ -106,11 +120,13 @@ private: TryUntilLoopWithDelay(std::shared_ptr executor, BodyCallable executeLoopBody, ConditionCallable shouldStopIteration, - Delay delay) + Delay delay, + CancelationToken cancelToken) : executor(std::move(executor)), executeLoopBody(std::move(executeLoopBody)), shouldStopIteration(std::move(shouldStopIteration)), - delay(std::move(delay)) {} + delay(std::move(delay)), + cancelToken(std::move(cancelToken)) {} /** * Performs actual looping through recursion. @@ -118,15 +134,19 @@ private: ExecutorFuture> run() { using ReturnType = typename decltype(getReturnType())::type; + // If the request to executeLoopBody has already been canceled, don't attempt to run it. + if (cancelToken.isCanceled()) { + return ExecutorFuture(executor, asyncTryCanceledStatus()); + } auto future = ExecutorFuture(executor).then(executeLoopBody); return std::move(future).onCompletion( - [this, self = this->shared_from_this()](StatusOrStatusWith s) mutable { + [this, self = this->shared_from_this()](StatusOrStatusWith s) { if (shouldStopIteration(s)) return ExecutorFuture(executor, std::move(s)); // Retry after a delay. - return sleepFor(executor, delay.getNext()).then([this, self]() mutable { + return executor->sleepFor(delay.getNext(), cancelToken).then([this, self] { return run(); }); }); @@ -136,6 +156,7 @@ private: BodyCallable executeLoopBody; ConditionCallable shouldStopIteration; Delay delay; + CancelationToken cancelToken; }; BodyCallable _body; @@ -177,15 +198,17 @@ public: /** * Launches the loop and returns an ExecutorFuture that will be resolved when the loop is - * complete. + * complete. If the executor is already shut down or the cancelToken has already been canceled + * before the loop is launched, the loop body will never run and the resulting ExecutorFuture + * will be set with either a ShutdownInProgress or CallbackCanceled error. * * The returned ExecutorFuture contains the last result returned by the loop body. If the last * iteration of the loop body threw an exception or otherwise returned an error status, the * returned ExecutorFuture will contain that error. */ - auto on(std::shared_ptr executor)&& { + auto on(std::shared_ptr executor, CancelationToken cancelToken)&& { auto loop = std::make_shared( - std::move(executor), std::move(_body), std::move(_condition)); + std::move(executor), std::move(_body), std::move(_condition), std::move(cancelToken)); // Launch the recursive chain using the helper class. return loop->run(); } @@ -224,10 +247,12 @@ private: struct TryUntilLoop : public std::enable_shared_from_this { TryUntilLoop(std::shared_ptr executor, BodyCallable executeLoopBody, - ConditionCallable shouldStopIteration) + ConditionCallable shouldStopIteration, + CancelationToken cancelToken) : executor(std::move(executor)), executeLoopBody(std::move(executeLoopBody)), - shouldStopIteration(std::move(shouldStopIteration)) {} + shouldStopIteration(std::move(shouldStopIteration)), + cancelToken(std::move(cancelToken)) {} /** * Performs actual looping through recursion. @@ -235,10 +260,13 @@ private: ExecutorFuture> run() { using ReturnType = typename decltype(getReturnType())::type; + // If the request is already canceled, don't run anything. + if (cancelToken.isCanceled()) + return ExecutorFuture(executor, asyncTryCanceledStatus()); auto future = ExecutorFuture(executor).then(executeLoopBody); return std::move(future).onCompletion( - [this, self = this->shared_from_this()](StatusOrStatusWith s) mutable { + [this, self = this->shared_from_this()](StatusOrStatusWith s) { if (shouldStopIteration(s)) return ExecutorFuture(executor, std::move(s)); @@ -249,6 +277,7 @@ private: std::shared_ptr executor; BodyCallable executeLoopBody; ConditionCallable shouldStopIteration; + CancelationToken cancelToken; }; BodyCallable _body; @@ -332,32 +361,31 @@ SemiFuture whenAllSucceed(std::vector&& futures) { auto sharedBlock = std::make_shared(futures.size(), std::move(promise)); for (size_t i = 0; i < futures.size(); ++i) { - std::move(futures[i]) - .getAsync([sharedBlock, myIndex = i](StatusWith swValue) mutable { - if (swValue.isOK()) { - // Best effort check that no error has returned, not required for correctness. - if (!sharedBlock->completedWithError.loadRelaxed()) { - // Put this result in its proper slot in the output vector. - sharedBlock->intermediateResult[myIndex] = std::move(swValue.getValue()); - auto numResultsReturnedWithSuccess = - sharedBlock->numResultsReturnedWithSuccess.addAndFetch(1); - // If this is the last result to return, set the promise. Note that this - // will never be true if one of the input futures resolves with an error, - // since the future with an error will not cause the - // numResultsReturnedWithSuccess count to be incremented. - if (numResultsReturnedWithSuccess == sharedBlock->numFuturesToWaitFor) { - // All results are ready. - sharedBlock->resultPromise.emplaceValue( - std::move(sharedBlock->intermediateResult)); - } - } - } else { - // Make sure no other error has already been set before setting the promise. - if (!sharedBlock->completedWithError.swap(true)) { - sharedBlock->resultPromise.setError(std::move(swValue.getStatus())); + std::move(futures[i]).getAsync([sharedBlock, myIndex = i](StatusWith swValue) { + if (swValue.isOK()) { + // Best effort check that no error has returned, not required for correctness. + if (!sharedBlock->completedWithError.loadRelaxed()) { + // Put this result in its proper slot in the output vector. + sharedBlock->intermediateResult[myIndex] = std::move(swValue.getValue()); + auto numResultsReturnedWithSuccess = + sharedBlock->numResultsReturnedWithSuccess.addAndFetch(1); + // If this is the last result to return, set the promise. Note that this + // will never be true if one of the input futures resolves with an error, + // since the future with an error will not cause the + // numResultsReturnedWithSuccess count to be incremented. + if (numResultsReturnedWithSuccess == sharedBlock->numFuturesToWaitFor) { + // All results are ready. + sharedBlock->resultPromise.emplaceValue( + std::move(sharedBlock->intermediateResult)); } } - }); + } else { + // Make sure no other error has already been set before setting the promise. + if (!sharedBlock->completedWithError.swap(true)) { + sharedBlock->resultPromise.setError(std::move(swValue.getStatus())); + } + } + }); } return std::move(future).semi(); @@ -450,19 +478,17 @@ SemiFuture whenAll(std::vector&& futures) { auto sharedBlock = std::make_shared(futures.size(), std::move(promise)); for (size_t i = 0; i < futures.size(); ++i) { - std::move(futures[i]) - .getAsync([sharedBlock, myIndex = i](StatusOrStatusWith value) mutable { - sharedBlock->intermediateResult[myIndex] = std::move(value); + std::move(futures[i]).getAsync([sharedBlock, myIndex = i](StatusOrStatusWith value) { + sharedBlock->intermediateResult[myIndex] = std::move(value); - auto numReady = sharedBlock->numReady.addAndFetch(1); - invariant(numReady <= sharedBlock->numFuturesToWaitFor); + auto numReady = sharedBlock->numReady.addAndFetch(1); + invariant(numReady <= sharedBlock->numFuturesToWaitFor); - if (numReady == sharedBlock->numFuturesToWaitFor) { - // All results are ready. - sharedBlock->resultPromise.emplaceValue( - std::move(sharedBlock->intermediateResult)); - } - }); + if (numReady == sharedBlock->numFuturesToWaitFor) { + // All results are ready. + sharedBlock->resultPromise.emplaceValue(std::move(sharedBlock->intermediateResult)); + } + }); } return std::move(future).semi(); @@ -504,14 +530,13 @@ SemiFuture whenAny(std::vector&& futures) { auto sharedBlock = std::make_shared(std::move(promise)); for (size_t i = 0; i < futures.size(); ++i) { - std::move(futures[i]) - .getAsync([sharedBlock, myIndex = i](StatusOrStatusWith value) mutable { - // If this is the first input future to complete, change done to true and set the - // value on the promise. - if (!sharedBlock->done.swap(true)) { - sharedBlock->resultPromise.emplaceValue(Result{std::move(value), myIndex}); - } - }); + std::move(futures[i]).getAsync([sharedBlock, myIndex = i](StatusOrStatusWith value) { + // If this is the first input future to complete, change done to true and set the + // value on the promise. + if (!sharedBlock->done.swap(true)) { + sharedBlock->resultPromise.emplaceValue(Result{std::move(value), myIndex}); + } + }); } return std::move(future).semi(); diff --git a/src/mongo/util/future_util_test.cpp b/src/mongo/util/future_util_test.cpp index 93744634593..e9878d21e8c 100644 --- a/src/mongo/util/future_util_test.cpp +++ b/src/mongo/util/future_util_test.cpp @@ -87,7 +87,9 @@ using AsyncTryUntilTest = FutureUtilTest; TEST_F(AsyncTryUntilTest, LoopExecutesOnceWithAlwaysTrueCondition) { auto i = 0; - auto resultFut = AsyncTry([&] { ++i; }).until([](Status s) { return true; }).on(executor()); + auto resultFut = AsyncTry([&] { ++i; }) + .until([](Status s) { return true; }) + .on(executor(), CancelationToken::uncancelable()); resultFut.wait(); ASSERT_EQ(i, 1); @@ -101,7 +103,7 @@ TEST_F(AsyncTryUntilTest, LoopExecutesUntilConditionIsTrue) { return i; }) .until([&](StatusWith swInt) { return swInt.getValue() == numLoops; }) - .on(executor()); + .on(executor(), CancelationToken::uncancelable()); resultFut.wait(); ASSERT_EQ(i, numLoops); @@ -112,7 +114,7 @@ TEST_F(AsyncTryUntilTest, LoopDoesNotRespectConstDelayIfConditionIsAlreadyTrue) auto resultFut = AsyncTry([&] { ++i; }) .until([](Status s) { return true; }) .withDelayBetweenIterations(Seconds(10000000)) - .on(executor()); + .on(executor(), CancelationToken::uncancelable()); // This would hang for a very long time if the behavior were incorrect. resultFut.wait(); @@ -124,7 +126,7 @@ TEST_F(AsyncTryUntilTest, LoopDoesNotRespectBackoffDelayIfConditionIsAlreadyTrue auto resultFut = AsyncTry([&] { ++i; }) .until([](Status s) { return true; }) .withBackoffBetweenIterations(TestBackoff{Seconds(10000000)}) - .on(executor()); + .on(executor(), CancelationToken::uncancelable()); // This would hang for a very long time if the behavior were incorrect. resultFut.wait(); @@ -140,7 +142,7 @@ TEST_F(AsyncTryUntilTest, LoopRespectsConstDelayAfterEvaluatingCondition) { }) .until([&](StatusWith swInt) { return swInt.getValue() == numLoops; }) .withDelayBetweenIterations(Seconds(1000)) - .on(executor()); + .on(executor(), CancelationToken::uncancelable()); ASSERT_FALSE(resultFut.isReady()); // Advance the time some, but not enough to be past the delay yet. @@ -171,7 +173,7 @@ TEST_F(AsyncTryUntilTest, LoopRespectsBackoffDelayAfterEvaluatingCondition) { }) .until([&](StatusWith swInt) { return swInt.getValue() == numLoops; }) .withBackoffBetweenIterations(TestBackoff{Seconds(1000)}) - .on(executor()); + .on(executor(), CancelationToken::uncancelable()); ASSERT_FALSE(resultFut.isReady()); // Due to the backoff, the delays are going to be 1000 seconds and 2000 seconds. @@ -220,7 +222,7 @@ TEST_F(AsyncTryUntilTest, LoopBodyPropagatesValueOfLastIterationToCaller) { return i; }) .until([&](StatusWith swInt) { return i == expectedResult; }) - .on(executor()); + .on(executor(), CancelationToken::uncancelable()); ASSERT_EQ(resultFut.get(), expectedResult); } @@ -235,11 +237,83 @@ TEST_F(AsyncTryUntilTest, LoopBodyPropagatesErrorToConditionAndCaller) { ASSERT_EQ(swInt.getStatus().code(), ErrorCodes::InternalError); return true; }) - .on(executor()); + .on(executor(), CancelationToken::uncancelable()); ASSERT_EQ(resultFut.getNoThrow(), ErrorCodes::InternalError); } +static const Status kCanceledStatus = {ErrorCodes::CallbackCanceled, "AsyncTry::until canceled"}; + +TEST_F(AsyncTryUntilTest, AsyncTryUntilCanBeCanceled) { + CancelationSource cancelSource; + auto resultFut = + AsyncTry([] {}).until([](Status) { return false; }).on(executor(), cancelSource.token()); + // This should hang forever if it is not canceled. + cancelSource.cancel(); + ASSERT_EQ(resultFut.getNoThrow(), kCanceledStatus); +} + +TEST_F(AsyncTryUntilTest, AsyncTryUntilWithDelayCanBeCanceled) { + CancelationSource cancelSource; + auto resultFut = AsyncTry([] {}) + .until([](Status) { return false; }) + .withDelayBetweenIterations(Hours(1000)) + .on(executor(), cancelSource.token()); + // Since the "until" condition is false, and the delay between iterations is very long, the only + // way this test should pass without hanging is if the future produced by TaskExecutor::sleepFor + // is resolved and set with ErrorCodes::CallbackCanceled well _before_ the deadline. + cancelSource.cancel(); + ASSERT_EQ(resultFut.getNoThrow(), kCanceledStatus); +} + +TEST_F(AsyncTryUntilTest, AsyncTryUntilWithBackoffCanBeCanceled) { + CancelationSource cancelSource; + auto resultFut = AsyncTry([] {}) + .until([](Status) { return false; }) + .withBackoffBetweenIterations(TestBackoff{Seconds(10000000)}) + .on(executor(), cancelSource.token()); + cancelSource.cancel(); + ASSERT_EQ(resultFut.getNoThrow(), kCanceledStatus); +} + +TEST_F(AsyncTryUntilTest, CanceledTryUntilLoopDoesNotExecuteIfAlreadyCanceled) { + int counter{0}; + CancelationSource cancelSource; + auto canceledToken = cancelSource.token(); + cancelSource.cancel(); + auto resultFut = AsyncTry([&] { ++counter; }) + .until([](Status) { return false; }) + .on(executor(), canceledToken); + ASSERT_EQ(resultFut.getNoThrow(), kCanceledStatus); + ASSERT_EQ(counter, 0); +} + +TEST_F(AsyncTryUntilTest, CanceledTryUntilLoopWithDelayDoesNotExecuteIfAlreadyCanceled) { + CancelationSource cancelSource; + int counter{0}; + auto canceledToken = cancelSource.token(); + cancelSource.cancel(); + auto resultFut = AsyncTry([&] { ++counter; }) + .until([](Status) { return false; }) + .withDelayBetweenIterations(Hours(1000)) + .on(executor(), canceledToken); + ASSERT_EQ(resultFut.getNoThrow(), kCanceledStatus); + ASSERT_EQ(counter, 0); +} + +TEST_F(AsyncTryUntilTest, CanceledTryUntilLoopWithBackoffDoesNotExecuteIfAlreadyCanceled) { + CancelationSource cancelSource; + int counter{0}; + auto canceledToken = cancelSource.token(); + cancelSource.cancel(); + auto resultFut = AsyncTry([&] { ++counter; }) + .until([](Status) { return false; }) + .withBackoffBetweenIterations(TestBackoff{Seconds(10000000)}) + .on(executor(), canceledToken); + ASSERT_EQ(resultFut.getNoThrow(), kCanceledStatus); + ASSERT_EQ(counter, 0); +} + template std::pair>, std::vector>> makePromisesAndFutures(size_t size) { std::vector> inputFutures; -- cgit v1.2.1