author     Haley Connelly <haley.connelly@mongodb.com>	2021-03-03 23:18:30 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>	2021-03-15 17:49:04 +0000
commit     67ff8452c4172dbbfd2199df2dc349eb739b7bf1 (patch)
tree       6ad799027d4c0cf81ff8366685a48da779f2fd97 /src/mongo/db/s/resharding
parent     8f7f06898e9a9d4da95302f5e1ebe12e1a359586 (diff)
download   mongo-67ff8452c4172dbbfd2199df2dc349eb739b7bf1.tar.gz
SERVER-53931 Use cancelationTokens for resharding recipient replication components
Diffstat (limited to 'src/mongo/db/s/resharding')
13 files changed, 400 insertions, 169 deletions
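At a high level, this commit replaces the recipient's ad-hoc interrupt() plumbing with CancelationToken propagation: RecipientStateMachine::run() derives an abortToken from the stepdownToken via _initAbortSource(), and that child token is what the collection cloner, txn cloners, oplog fetchers, and oplog appliers observe. The sketch below is a minimal, hypothetical model of that parent/child source-and-token relationship, not MongoDB's actual implementation (the real CancelationSource propagates cancelation eagerly and supports callbacks; this one checks the parent chain lazily). It only illustrates why canceling either the stepdown source or the abort source cancels the token handed to the components.

    #include <atomic>
    #include <memory>

    class CancelationToken;

    class CancelationSource {
    public:
        CancelationSource() : _state(std::make_shared<State>()) {}

        // A source constructed from a parent token is considered canceled
        // whenever the parent is canceled (checked lazily in this sketch).
        explicit CancelationSource(const CancelationToken& parent);

        void cancel() { _state->canceled.store(true); }

        CancelationToken token() const;

    private:
        friend class CancelationToken;
        struct State {
            std::atomic<bool> canceled{false};
            std::shared_ptr<State> parent;  // nullptr for a root source
        };
        std::shared_ptr<State> _state;
    };

    class CancelationToken {
    public:
        // Canceled if this source or any ancestor source was canceled.
        bool isCanceled() const {
            for (const State* s = _state.get(); s; s = s->parent.get()) {
                if (s->canceled.load())
                    return true;
            }
            return false;
        }

    private:
        friend class CancelationSource;
        using State = CancelationSource::State;
        explicit CancelationToken(std::shared_ptr<State> state) : _state(std::move(state)) {}
        std::shared_ptr<State> _state;
    };

    CancelationSource::CancelationSource(const CancelationToken& parent)
        : _state(std::make_shared<State>()) {
        _state->parent = parent._state;
    }

    CancelationToken CancelationSource::token() const {
        return CancelationToken{_state};
    }

    int main() {
        CancelationSource stepdownSource;                       // canceled on stepdown
        CancelationSource abortSource{stepdownSource.token()};  // child: also canceled on abort
        CancelationToken abortToken = abortSource.token();      // what the components receive

        stepdownSource.cancel();                 // a stepdown...
        return abortToken.isCanceled() ? 0 : 1;  // ...cancels the abort token too
    }

In the diff below, _initAbortSource() plays the role of constructing abortSource from stepdownToken, and onReshardingFieldsChanged() calls _abortSource->cancel() when an abort reason arrives.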
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp index 24592a1a3ca..c1f0cba5d72 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp +++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp @@ -265,6 +265,8 @@ std::vector<InsertStatement> ReshardingCollectionCloner::_fillBatch(Pipeline& pi void ReshardingCollectionCloner::_insertBatch(OperationContext* opCtx, std::vector<InsertStatement>& batch) { + // TODO SERVER-55102: Use CancelableOperationContext to prevent retrying once the operation has + // been canceled. writeConflictRetry(opCtx, "ReshardingCollectionCloner::_insertBatch", _outputNss.ns(), [&] { AutoGetCollection outputColl(opCtx, _outputNss, MODE_IX); uassert(ErrorCodes::NamespaceNotFound, @@ -404,7 +406,7 @@ ExecutorFuture<void> ReshardingCollectionCloner::run( return true; }); }) - .until([this, chainCtx](Status status) { + .until([this, chainCtx, cancelToken](Status status) { if (status.isOK() && chainCtx->moreToCome) { return false; } @@ -419,9 +421,12 @@ ExecutorFuture<void> ReshardingCollectionCloner::run( if (status.isA<ErrorCategory::CancelationError>() || status.isA<ErrorCategory::NotPrimaryError>()) { // Cancellation and NotPrimary errors indicate the primary-only service Instance - // will be shut down or is shutting down now. Don't retry and leave resuming to when - // the RecipientStateMachine is restarted on the new primary. - return true; + // will be shut down or is shutting down now - provided the cancelToken is also + // canceled. Otherwise, the errors may have originated from a remote response rather + // than the shard itself. + // + // Don't retry when primary-only service Instance is shutting down. + return cancelToken.isCanceled(); } if (status.isA<ErrorCategory::RetriableError>() || @@ -449,7 +454,19 @@ ExecutorFuture<void> ReshardingCollectionCloner::run( return true; }) - .on(std::move(executor), std::move(cancelToken)); + .on(executor, cancelToken) + .onCompletion([this, chainCtx](Status status) { + if (chainCtx->pipeline) { + // Guarantee the pipeline is always cleaned up - even upon cancelation. + _withTemporaryOperationContext([&](auto* opCtx) { + chainCtx->pipeline->dispose(opCtx); + chainCtx->pipeline.reset(); + }); + } + + // Propagate the result of the AsyncTry. 
+ return status; + }); } } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp index dac42b6ba39..bfbd5d4270a 100644 --- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp @@ -49,6 +49,7 @@ #include "mongo/db/s/resharding_util.h" #include "mongo/logv2/log.h" #include "mongo/util/assert_util.h" +#include "mongo/util/future_util.h" #include "mongo/util/str.h" namespace mongo { @@ -207,7 +208,7 @@ std::vector<repl::OplogEntry> ReshardingDonorOplogIterator::_fillBatch(Pipeline& } ExecutorFuture<std::vector<repl::OplogEntry>> ReshardingDonorOplogIterator::getNextBatch( - std::shared_ptr<executor::TaskExecutor> executor) { + std::shared_ptr<executor::TaskExecutor> executor, CancelationToken cancelToken) { if (_hasSeenFinalOplogEntry) { invariant(!_pipeline); return ExecutorFuture(std::move(executor), std::vector<repl::OplogEntry>{}); @@ -246,8 +247,13 @@ ExecutorFuture<std::vector<repl::OplogEntry>> ReshardingDonorOplogIterator::getN if (batch.empty() && !_hasSeenFinalOplogEntry) { return ExecutorFuture(executor) - .then([this] { return _insertNotifier->awaitInsert(_resumeToken); }) - .then([this, executor] { return getNextBatch(std::move(executor)); }); + .then([this, cancelToken] { + return future_util::withCancelation(_insertNotifier->awaitInsert(_resumeToken), + cancelToken); + }) + .then([this, cancelToken, executor] { + return getNextBatch(std::move(executor), cancelToken); + }); } return ExecutorFuture(std::move(executor), std::move(batch)); diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h index 9ce38f04eb5..c799bb9d834 100644 --- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h +++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h @@ -69,7 +69,7 @@ public: * final oplog entry hasn't been returned yet. */ virtual ExecutorFuture<std::vector<repl::OplogEntry>> getNextBatch( - std::shared_ptr<executor::TaskExecutor> executor) = 0; + std::shared_ptr<executor::TaskExecutor> executor, CancelationToken cancelToken) = 0; }; /** @@ -94,7 +94,7 @@ public: OperationContext* opCtx, std::shared_ptr<MongoProcessInterface> mongoProcessInterface); ExecutorFuture<std::vector<repl::OplogEntry>> getNextBatch( - std::shared_ptr<executor::TaskExecutor> executor) override; + std::shared_ptr<executor::TaskExecutor> executor, CancelationToken cancelToken) override; static constexpr auto kActualOpFieldName = "actualOp"_sd; static constexpr auto kPreImageOpFieldName = "preImageOp"_sd; diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp index a0fe04087ed..4338bb7399b 100644 --- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp @@ -176,7 +176,9 @@ public: // destructor has run. Otherwise `executor` could end up outliving the ServiceContext and // triggering an invariant due to the task executor's thread having a Client still. 
return ExecutorFuture(executor) - .then([iter, executor] { return iter->getNextBatch(std::move(executor)); }) + .then([iter, executor] { + return iter->getNextBatch(std::move(executor), CancelationToken::uncancelable()); + }) .then([](auto x) { return x; }) .get(); } diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp index af8e92a0754..2b069fe72b2 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp @@ -271,35 +271,36 @@ ReshardingOplogApplier::ReshardingOplogApplier( _writerPool(writerPool), _oplogIter(std::move(oplogIterator)) {} -ExecutorFuture<void> ReshardingOplogApplier::applyUntilCloneFinishedTs() { +ExecutorFuture<void> ReshardingOplogApplier::applyUntilCloneFinishedTs( + CancelationToken cancelToken) { invariant(_stage == ReshardingOplogApplier::Stage::kStarted); // It is safe to capture `this` because PrimaryOnlyService and RecipientStateMachine // collectively guarantee that the ReshardingOplogApplier instances will outlive `_executor` and // `_writerPool`. return ExecutorFuture(_executor) - .then([this] { return _scheduleNextBatch(); }) + .then([this, cancelToken] { return _scheduleNextBatch(cancelToken); }) .onError([this](Status status) { return _onError(status); }); } -ExecutorFuture<void> ReshardingOplogApplier::applyUntilDone() { +ExecutorFuture<void> ReshardingOplogApplier::applyUntilDone(CancelationToken cancelToken) { invariant(_stage == ReshardingOplogApplier::Stage::kReachedCloningTS); // It is safe to capture `this` because PrimaryOnlyService and RecipientStateMachine // collectively guarantee that the ReshardingOplogApplier instances will outlive `_executor` and // `_writerPool`. return ExecutorFuture(_executor) - .then([this] { return _scheduleNextBatch(); }) + .then([this, cancelToken] { return _scheduleNextBatch(cancelToken); }) .onError([this](Status status) { return _onError(status); }); } -ExecutorFuture<void> ReshardingOplogApplier::_scheduleNextBatch() { +ExecutorFuture<void> ReshardingOplogApplier::_scheduleNextBatch(CancelationToken cancelToken) { return ExecutorFuture(_executor) - .then([this] { + .then([this, cancelToken] { auto batchClient = makeKillableClient(_service(), kClientName); AlternativeClientRegion acr(batchClient); - return _oplogIter->getNextBatch(_executor); + return _oplogIter->getNextBatch(_executor, cancelToken); }) .then([this](OplogBatch batch) { LOGV2_DEBUG(5391002, 3, "Starting batch", "batchSize"_attr = batch.size()); @@ -349,11 +350,18 @@ ExecutorFuture<void> ReshardingOplogApplier::_scheduleNextBatch() { return true; }) - .then([this](bool moreToApply) { + .then([this, cancelToken](bool moreToApply) { if (!moreToApply) { return ExecutorFuture(_executor); } - return _scheduleNextBatch(); + + if (cancelToken.isCanceled()) { + return ExecutorFuture<void>( + _executor, + Status{ErrorCodes::CallbackCanceled, + "Resharding oplog applier aborting due to abort or stepdown"}); + } + return _scheduleNextBatch(cancelToken); }); } diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.h b/src/mongo/db/s/resharding/resharding_oplog_applier.h index 3930001fcff..061a7b17d50 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.h +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.h @@ -102,7 +102,7 @@ public: * greater than or equal to reshardingCloneFinishedTs. * It is undefined to call applyUntilCloneFinishedTs more than once. 
*/ - ExecutorFuture<void> applyUntilCloneFinishedTs(); + ExecutorFuture<void> applyUntilCloneFinishedTs(CancelationToken cancelToken); /** * Applies oplog from the iterator until it is exhausted or hits an error. It is an error to @@ -111,7 +111,7 @@ public: * It is an error to call this when applyUntilCloneFinishedTs future returns an error. * It is undefined to call applyUntilDone more than once. */ - ExecutorFuture<void> applyUntilDone(); + ExecutorFuture<void> applyUntilDone(CancelationToken cancelToken); static boost::optional<ReshardingOplogApplierProgress> checkStoredProgress( OperationContext* opCtx, const ReshardingSourceId& id); @@ -130,7 +130,7 @@ private: * Returns a future that becomes ready when the next batch of oplog entries have been collected * and applied. */ - ExecutorFuture<void> _scheduleNextBatch(); + ExecutorFuture<void> _scheduleNextBatch(CancelationToken cancelToken); /** * Setup the worker threads to apply the ops in the current buffer in parallel. Waits for all diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp index 6a137dcc4ed..fb0f3ae2b94 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp @@ -72,7 +72,7 @@ public: } ExecutorFuture<std::vector<repl::OplogEntry>> getNextBatch( - std::shared_ptr<executor::TaskExecutor> executor) override { + std::shared_ptr<executor::TaskExecutor> executor, CancelationToken cancelToken) override { // This operation context is unused by the function but confirms that the Client calling // getNextBatch() doesn't already have an operation context. auto opCtx = cc().makeOperationContext(); @@ -332,10 +332,11 @@ TEST_F(ReshardingOplogApplierTest, NothingToIterate) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); } @@ -375,7 +376,8 @@ TEST_F(ReshardingOplogApplierTest, ApplyBasicCrud) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); DBDirectClient client(operationContext()); @@ -385,7 +387,7 @@ TEST_F(ReshardingOplogApplierTest, ApplyBasicCrud) { doc = client.findOne(appliedToNs().ns(), BSON("_id" << 2)); ASSERT_BSONOBJ_EQ(BSON("_id" << 2), doc); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1)); @@ -400,6 +402,94 @@ TEST_F(ReshardingOplogApplierTest, ApplyBasicCrud) { ASSERT_EQ(Timestamp(8, 3), progressDoc->getProgress().getTs()); } +TEST_F(ReshardingOplogApplierTest, CanceledCloningBatch) { + std::deque<repl::OplogEntry> crudOps; + crudOps.push_back(makeOplog(repl::OpTime(Timestamp(5, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 1), + boost::none)); + crudOps.push_back(makeOplog(repl::OpTime(Timestamp(6, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 2), + boost::none)); + crudOps.push_back(makeOplog(repl::OpTime(Timestamp(7, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 2), + boost::none)); + + auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */); 
+ boost::optional<ReshardingOplogApplier> applier; + auto executor = makeTaskExecutorForApplier(); + auto writerPool = repl::makeReplWriterPool(kWriterPoolSize); + + applier.emplace(makeApplierEnv(), + sourceId(), + oplogNs(), + crudNs(), + crudUUID(), + stashCollections(), + 0U, /* myStashIdx */ + Timestamp(7, 3), + std::move(iterator), + chunkManager(), + executor, + writerPool.get()); + + // Cancel the rescheduling of the next batch. + auto abortSource = CancelationSource(); + abortSource.cancel(); + + auto future = applier->applyUntilCloneFinishedTs(abortSource.token()); + ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::CallbackCanceled); +} + +TEST_F(ReshardingOplogApplierTest, CanceledApplyingBatch) { + std::deque<repl::OplogEntry> crudOps; + crudOps.push_back(makeOplog(repl::OpTime(Timestamp(5, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 1), + boost::none)); + crudOps.push_back(makeOplog(repl::OpTime(Timestamp(6, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 2), + boost::none)); + crudOps.push_back(makeOplog(repl::OpTime(Timestamp(7, 3), 1), + repl::OpTypeEnum::kUpdate, + BSON("$set" << BSON("x" << 1)), + BSON("_id" << 2))); + crudOps.push_back(makeOplog(repl::OpTime(Timestamp(8, 3), 1), + repl::OpTypeEnum::kDelete, + BSON("_id" << 1), + boost::none)); + + auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */); + boost::optional<ReshardingOplogApplier> applier; + auto executor = makeTaskExecutorForApplier(); + auto writerPool = repl::makeReplWriterPool(kWriterPoolSize); + + applier.emplace(makeApplierEnv(), + sourceId(), + oplogNs(), + crudNs(), + crudUUID(), + stashCollections(), + 0U, /* myStashIdx */ + Timestamp(6, 3), + std::move(iterator), + chunkManager(), + executor, + writerPool.get()); + + auto abortSource = CancelationSource(); + auto future = applier->applyUntilCloneFinishedTs(abortSource.token()); + future.get(); + + abortSource.cancel(); + + future = applier->applyUntilDone(abortSource.token()); + ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::CallbackCanceled); +} + TEST_F(ReshardingOplogApplierTest, InsertTypeOplogAppliedInMultipleBatches) { std::deque<repl::OplogEntry> crudOps; @@ -427,7 +517,8 @@ TEST_F(ReshardingOplogApplierTest, InsertTypeOplogAppliedInMultipleBatches) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); DBDirectClient client(operationContext()); @@ -445,7 +536,7 @@ TEST_F(ReshardingOplogApplierTest, InsertTypeOplogAppliedInMultipleBatches) { ASSERT_EQ(Timestamp(8, 3), progressDoc->getProgress().getClusterTime()); ASSERT_EQ(Timestamp(8, 3), progressDoc->getProgress().getTs()); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); for (int x = 0; x < 19; x++) { @@ -487,7 +578,8 @@ TEST_F(ReshardingOplogApplierTest, ErrorDuringBatchApplyCloningPhase) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::FailedToParse); @@ -535,10 +627,11 @@ TEST_F(ReshardingOplogApplierTest, ErrorDuringBatchApplyCatchUpPhase) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = 
operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::FailedToParse); @@ -584,7 +677,8 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstOplogCloningPhase) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::InternalError); @@ -630,10 +724,11 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstOplogCatchUpPhase) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::InternalError); DBDirectClient client(operationContext()); @@ -676,7 +771,8 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstBatchCloningPhase) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::InternalError); @@ -726,10 +822,11 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstBatchCatchUpPhase) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::InternalError); @@ -777,7 +874,8 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingSecondBatchCloningPhase) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::InternalError); @@ -839,10 +937,11 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingSecondBatchCatchUpPhase) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::InternalError); @@ -894,7 +993,8 @@ TEST_F(ReshardingOplogApplierTest, ExecutorIsShutDownCloningPhase) { executor->shutdown(); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::ShutdownInProgress); DBDirectClient client(operationContext()); @@ -937,11 +1037,12 @@ TEST_F(ReshardingOplogApplierTest, ExecutorIsShutDownCatchUpPhase) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto 
cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); executor->shutdown(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::ShutdownInProgress); @@ -981,7 +1082,8 @@ TEST_F(ReshardingOplogApplierTest, WriterPoolIsShutDownCloningPhase) { writerPool->shutdown(); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::ShutdownInProgress); DBDirectClient client(operationContext()); @@ -1024,11 +1126,12 @@ TEST_F(ReshardingOplogApplierTest, WriterPoolIsShutDownCatchUpPhase) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); writerPool->shutdown(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::ShutdownInProgress); @@ -1080,7 +1183,8 @@ TEST_F(ReshardingOplogApplierTest, InsertOpIntoOuputCollectionUseReshardingAppli executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); DBDirectClient client(operationContext()); @@ -1090,7 +1194,7 @@ TEST_F(ReshardingOplogApplierTest, InsertOpIntoOuputCollectionUseReshardingAppli doc = client.findOne(appliedToNs().ns(), BSON("_id" << 2)); ASSERT_BSONOBJ_EQ(BSON("_id" << 2), doc); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); doc = client.findOne(appliedToNs().ns(), BSON("_id" << 3)); @@ -1138,14 +1242,15 @@ TEST_F(ReshardingOplogApplierTest, DBDirectClient client(operationContext()); client.insert(appliedToNs().toString(), BSON("_id" << 1 << "sk" << 1)); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); // We should have replaced the existing doc in the output collection. auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1)); ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << 2), doc); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); auto progressDoc = ReshardingOplogApplier::checkStoredProgress(operationContext(), sourceId()); @@ -1191,7 +1296,8 @@ TEST_F(ReshardingOplogApplierTest, DBDirectClient client(operationContext()); client.insert(appliedToNs().toString(), BSON("_id" << 1 << "sk" << -1)); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); // The output collection should still hold the doc {_id: 1, sk: -1}, and the doc with {_id: 1, @@ -1202,7 +1308,7 @@ TEST_F(ReshardingOplogApplierTest, doc = client.findOne(stashNs().ns(), BSON("_id" << 1)); ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << 2), doc); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); // The output collection should still hold the doc {_id: 1, x: 1}. 
We should have applied rule @@ -1257,7 +1363,8 @@ TEST_F(ReshardingOplogApplierTest, DBDirectClient client(operationContext()); client.insert(appliedToNs().toString(), BSON("_id" << 1 << "sk" << -1)); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); // The output collection should still hold the doc {_id: 1, sk: -1}, and the doc with {_id: 1, @@ -1268,7 +1375,7 @@ TEST_F(ReshardingOplogApplierTest, doc = client.findOne(stashNs().ns(), BSON("_id" << 1)); ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << 2), doc); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); // We should have applied rule #1 and deleted the doc with {_id : 1} from the stash collection @@ -1322,7 +1429,8 @@ TEST_F(ReshardingOplogApplierTest, DBDirectClient client(operationContext()); client.insert(appliedToNs().ns(), BSON("_id" << 1 << "sk" << -1)); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); // The doc {_id: 1, sk: -1} that exists in the output collection does not belong to this donor @@ -1331,7 +1439,7 @@ TEST_F(ReshardingOplogApplierTest, auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1)); ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << -1), doc); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); // There does not exist a doc with {_id : 2} in the output collection, so we should have applied @@ -1389,7 +1497,8 @@ TEST_F(ReshardingOplogApplierTest, writerPool.get()); // Apply the inserts first so there exists docs in the output collection - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); DBDirectClient client(operationContext()); @@ -1399,7 +1508,7 @@ TEST_F(ReshardingOplogApplierTest, doc = client.findOne(appliedToNs().ns(), BSON("_id" << 2)); ASSERT_BSONOBJ_EQ(BSON("_id" << 2 << "sk" << 2), doc); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); // None of the stash collections have docs with _id == [op _id], so we should not have found any @@ -1460,7 +1569,8 @@ TEST_F(ReshardingOplogApplierTest, DBDirectClient client(operationContext()); client.insert(stashCollections()[1].toString(), BSON("_id" << 1 << "sk" << -3)); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); // The output collection should now hold the doc {_id: 1, sk: 1}. 
@@ -1475,7 +1585,7 @@ TEST_F(ReshardingOplogApplierTest, doc = client.findOne(stashCollections()[1].toString(), BSON("_id" << 1)); ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << -3), doc); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); // We should have applied rule #4 and deleted the doc that was in the output collection {_id: 1, @@ -1548,7 +1658,8 @@ TEST_F(ReshardingOplogApplierTest, UpdateShouldModifyStashCollectionUseReshardin DBDirectClient client(operationContext()); client.insert(appliedToNs().toString(), BSON("_id" << 1 << "sk" << -1)); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); // The output collection should still hold the doc {_id: 1, sk: -1}, and the doc with {_id: 1, @@ -1559,7 +1670,7 @@ TEST_F(ReshardingOplogApplierTest, UpdateShouldModifyStashCollectionUseReshardin doc = client.findOne(stashNs().ns(), BSON("_id" << 1)); ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << 2), doc); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); // We should have applied rule #1 and updated the doc with {_id : 1} in the stash collection @@ -1612,7 +1723,8 @@ TEST_F(ReshardingOplogApplierTest, UpdateShouldDoNothingUseReshardingApplication DBDirectClient client(operationContext()); client.insert(appliedToNs().ns(), BSON("_id" << 1 << "sk" << -1)); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); // The doc {_id: 1, sk: -1} that exists in the output collection does not belong to this donor @@ -1621,7 +1733,7 @@ TEST_F(ReshardingOplogApplierTest, UpdateShouldDoNothingUseReshardingApplication auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1)); ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << -1), doc); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); // There does not exist a doc with {_id : 2} in the output collection, so we should have applied @@ -1678,7 +1790,8 @@ TEST_F(ReshardingOplogApplierTest, UpdateOutputCollUseReshardingApplicationRules writerPool.get()); // Apply the inserts first so there exists docs in the output collection. - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); DBDirectClient client(operationContext()); @@ -1688,7 +1801,7 @@ TEST_F(ReshardingOplogApplierTest, UpdateOutputCollUseReshardingApplicationRules doc = client.findOne(appliedToNs().ns(), BSON("_id" << 2)); ASSERT_BSONOBJ_EQ(BSON("_id" << 2 << "sk" << 2), doc); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); // We should have updated both docs in the output collection to include the new field "x". 
@@ -1737,14 +1850,15 @@ TEST_F(ReshardingOplogApplierTest, UnsupportedCommandOpsShouldErrorUseResharding executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); DBDirectClient client(operationContext()); auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1)); ASSERT_BSONOBJ_EQ(BSON("_id" << 1), doc); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::OplogOperationUnsupported); @@ -1782,7 +1896,8 @@ TEST_F(ReshardingOplogApplierTest, executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::OplogOperationUnsupported); @@ -2246,10 +2361,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, GroupInserts) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); DBDirectClient client(operationContext()); @@ -2333,10 +2449,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, CrudWithEmptyConfigTransactions) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); DBDirectClient client(operationContext()); @@ -2419,10 +2536,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, MultipleTxnSameLsidInOneBatch) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); DBDirectClient client(operationContext()); @@ -2479,10 +2597,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithLowerExistingTxn) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); DBDirectClient client(operationContext()); @@ -2532,10 +2651,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithHigherExistingTxnNum) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); // Op should always be applied, even if session info was not compatible. 
@@ -2596,10 +2716,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithEqualExistingTxnNum) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); DBDirectClient client(operationContext()); @@ -2650,10 +2771,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithStmtIdAlreadyExecuted) executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); DBDirectClient client(operationContext()); @@ -2706,10 +2828,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithActiveUnpreparedTxnSame executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); // Op should always be applied, even if session info was not compatible. @@ -2763,10 +2886,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithActiveUnpreparedTxnWith executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); // Op should always be applied, even if session info was not compatible. @@ -2819,7 +2943,8 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithPreparedTxnThatWillComm executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); // Sleep a little bit to make the applier block on the prepared transaction. sleepmillis(200); @@ -2830,7 +2955,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithPreparedTxnThatWillComm future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); // Op should always be applied, even if session info was not compatible. @@ -2883,7 +3008,8 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithPreparedTxnThatWillAbor executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); // Sleep a little bit to make the applier block on the prepared transaction. sleepmillis(200); @@ -2894,7 +3020,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithPreparedTxnThatWillAbor future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); // Op should always be applied, even if session info was not compatible. 
@@ -2954,10 +3080,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWriteWithPreImage) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); DBDirectClient client(operationContext()); @@ -3020,10 +3147,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWriteWithPostImage) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); DBDirectClient client(operationContext()); @@ -3069,10 +3197,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnWithLowerExistingTxn) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); DBDirectClient client(operationContext()); @@ -3119,10 +3248,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnWithHigherExistingTxnNum) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); // Op should always be applied, even if txn info was not compatible. @@ -3178,10 +3308,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnWithEqualExistingTxnNum) { executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); DBDirectClient client(operationContext()); @@ -3231,10 +3362,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnWithActiveUnpreparedTxnSameT executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); // Ops should always be applied regardless of conflict with existing txn. @@ -3288,10 +3420,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnActiveUnpreparedTxnWithLower executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); // Op should always be applied, even if txn info was not compatible. 
@@ -3344,7 +3477,8 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnWithPreparedTxnThatWillCommi executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); // Sleep a little bit to make the applier block on the prepared transaction. sleepmillis(200); @@ -3355,7 +3489,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnWithPreparedTxnThatWillCommi future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); // Op should always be applied, even if txn info was not compatible. @@ -3404,7 +3538,8 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnWithPreparedTxnThatWillAbort executor, writerPool.get()); - auto future = applier->applyUntilCloneFinishedTs(); + auto cancelToken = operationContext()->getCancelationToken(); + auto future = applier->applyUntilCloneFinishedTs(cancelToken); // Sleep a little bit to make the applier block on the prepared transaction. sleepmillis(200); @@ -3415,7 +3550,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnWithPreparedTxnThatWillAbort future.get(); - future = applier->applyUntilDone(); + future = applier->applyUntilDone(cancelToken); future.get(); // Op should always be applied, even if txn info was not compatible. @@ -3459,10 +3594,13 @@ TEST_F(ReshardingOplogApplierTest, MetricsAreReported) { writerPool.get()); ASSERT_EQ(metricsAppliedCount(), 0); - applier.applyUntilCloneFinishedTs().get(); // Stop at clone timestamp 7 + + auto cancelToken = operationContext()->getCancelationToken(); + + applier.applyUntilCloneFinishedTs(cancelToken).get(); // Stop at clone timestamp 7 ASSERT_EQ(metricsAppliedCount(), 4); // Applied timestamps {5,6,7}, and {8} drafts in on the batch. - applier.applyUntilDone().get(); + applier.applyUntilDone(cancelToken).get(); ASSERT_EQ(metricsAppliedCount(), 5); // Now includes timestamp {9} } diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp index 6e9f823b779..8e69accb0bc 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp @@ -112,10 +112,6 @@ Future<void> ReshardingOplogFetcher::awaitInsert(const ReshardingDonorOplogId& l // being issued. stdx::lock_guard lk(_mutex); - if (_interruptStatus) { - return Future<void>::makeReady(*_interruptStatus); - } - if (lastSeen < _startAt) { // `lastSeen < _startAt` means there's at least one document which has been inserted by // ReshardingOplogFetcher and hasn't been returned by @@ -135,20 +131,6 @@ Future<void> ReshardingOplogFetcher::awaitInsert(const ReshardingDonorOplogId& l return std::move(_onInsertFuture); } -void ReshardingOplogFetcher::interrupt(Status status) { - invariant(!status.isOK()); - - // We replace the promise/future pair with a fresh one because consume() won't know an error has - // already been set and would otherwise attempt to fulfill the promise again. Later calls to - // awaitInsert() won't ever look at `_onInsertFuture` though. 
- auto [p, f] = makePromiseFuture<void>(); - stdx::lock_guard lk(_mutex); - _interruptStatus = status; - _onInsertPromise.setError(*_interruptStatus); - _onInsertPromise = std::move(p); - _onInsertFuture = std::move(f); -} - ExecutorFuture<void> ReshardingOplogFetcher::schedule( std::shared_ptr<executor::TaskExecutor> executor, const CancelationToken& cancelToken) { return ExecutorFuture(executor) @@ -182,6 +164,13 @@ ExecutorFuture<void> ReshardingOplogFetcher::_reschedule( if (!moreToCome) { return ExecutorFuture(std::move(executor)); } + + if (cancelToken.isCanceled()) { + return ExecutorFuture<void>( + executor, + Status{ErrorCodes::CallbackCanceled, + "Resharding oplog fetcher canceled due to abort or stepdown"}); + } return _reschedule(std::move(executor), cancelToken); }); } diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h index f01eafe09fd..43353369486 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h @@ -81,8 +81,6 @@ public: Future<void> awaitInsert(const ReshardingDonorOplogId& lastSeen) override; - void interrupt(Status status); - /** * Schedules a task that will do the following: * @@ -160,7 +158,6 @@ private: Mutex _mutex = MONGO_MAKE_LATCH("ReshardingOplogFetcher::_mutex"); Promise<void> _onInsertPromise; Future<void> _onInsertFuture; - boost::optional<Status> _interruptStatus; // For testing to control behavior. diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index c95bfe6dd91..f46c264c865 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -238,19 +238,22 @@ ReshardingRecipientService::RecipientStateMachine::~RecipientStateMachine() { SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run( std::shared_ptr<executor::ScopedTaskExecutor> executor, - const CancelationToken& cancelToken) noexcept { + const CancelationToken& stepdownToken) noexcept { + auto abortToken = _initAbortSource(stepdownToken); + return ExecutorFuture<void>(**executor) .then([this, executor] { _metrics()->onStart(); return _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(executor); }) .then([this] { _createTemporaryReshardingCollectionThenTransitionToCloning(); }) - .then([this, executor, cancelToken] { - return _cloneThenTransitionToApplying(executor, cancelToken); + .then([this, executor, abortToken] { + return _cloneThenTransitionToApplying(executor, abortToken); }) .then([this, executor] { return _applyThenTransitionToSteadyState(executor); }) - .then([this, executor] { - return _awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency(executor); + .then([this, executor, abortToken] { + return _awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency(executor, + abortToken); }) .then([this, executor] { return _awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming(executor); @@ -349,6 +352,12 @@ void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChange stdx::lock_guard<Latch> lk(_mutex); if (reshardingFields.getAbortReason()) { auto status = getStatusFromAbortReason(reshardingFields); + invariant(!status.isOK()); + + if (_abortSource) { + _abortSource->cancel(); + } + _onAbortOrStepdown(lk, status); return; } @@ -438,7 +447,7 @@ void ReshardingRecipientService::RecipientStateMachine::_initTxnCloner( 
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplying( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancelationToken& cancelToken) { + const CancelationToken& abortToken) { if (_recipientCtx.getState() > RecipientStateEnum::kCloning) { return ExecutorFuture(**executor); } @@ -494,15 +503,15 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin _oplogFetcherFutures.emplace_back( _oplogFetchers.back() - ->schedule(_oplogFetcherExecutor, cancelToken) + ->schedule(_oplogFetcherExecutor, abortToken) .onError([](Status status) { LOGV2(5259300, "Error fetching oplog entries", "error"_attr = redact(status)); return status; })); } - return _collectionCloner->run(**executor, cancelToken) - .then([this, executor, cancelToken] { + return _collectionCloner->run(**executor, abortToken) + .then([this, executor, abortToken] { if (_txnCloners.empty()) { return SemiFuture<void>::makeReady(); } @@ -511,7 +520,7 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin std::vector<ExecutorFuture<void>> txnClonerFutures; for (auto&& txnCloner : _txnCloners) { - txnClonerFutures.push_back(txnCloner->run(serviceContext, **executor, cancelToken)); + txnClonerFutures.push_back(txnCloner->run(serviceContext, **executor, abortToken)); } return whenAllSucceed(std::move(txnClonerFutures)); @@ -548,13 +557,14 @@ ReshardingRecipientService::RecipientStateMachine::_applyThenTransitionToSteadyS ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: _awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancelationToken& abortToken) { if (_recipientCtx.getState() > RecipientStateEnum::kSteadyState) { return ExecutorFuture<void>(**executor, Status::OK()); } auto opCtx = cc().makeOperationContext(); - return _updateCoordinator(opCtx.get(), executor).then([this, executor] { + return _updateCoordinator(opCtx.get(), executor).then([this, executor, abortToken] { auto numDonors = _donorShardIds.size(); _oplogAppliers.reserve(numDonors); _oplogApplierWorkers.reserve(numDonors); @@ -619,8 +629,10 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: // ReshardingCollectionCloner has finished. This is why applyUntilCloneFinishedTs() and // applyUntilDone() are both called here in sequence. 
auto* applier = _oplogAppliers.back().get(); - futuresToWaitOn.emplace_back(applier->applyUntilCloneFinishedTs().then( - [applier] { return applier->applyUntilDone(); })); + futuresToWaitOn.emplace_back( + applier->applyUntilCloneFinishedTs(abortToken).then([applier, abortToken] { + return applier->applyUntilDone(abortToken); + })); } return whenAllSucceed(std::move(futuresToWaitOn)) @@ -930,10 +942,6 @@ void ReshardingRecipientService::RecipientStateMachine::_onAbortOrStepdown(WithL _oplogFetcherExecutor->shutdown(); } - for (auto&& fetcher : _oplogFetchers) { - fetcher->interrupt(status); - } - for (auto&& threadPool : _oplogApplierWorkers) { threadPool->shutdown(); } @@ -947,4 +955,11 @@ void ReshardingRecipientService::RecipientStateMachine::_onAbortOrStepdown(WithL } } +CancelationToken ReshardingRecipientService::RecipientStateMachine::_initAbortSource( + const CancelationToken& stepdownToken) { + stdx::lock_guard<Latch> lk(_mutex); + _abortSource = CancelationSource(stepdownToken); + return _abortSource->token(); +} + } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index 12411127e6c..1e667da7195 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -143,13 +143,14 @@ private: ExecutorFuture<void> _cloneThenTransitionToApplying( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancelationToken& cancelToken); + const CancelationToken& abortToken); ExecutorFuture<void> _applyThenTransitionToSteadyState( const std::shared_ptr<executor::ScopedTaskExecutor>& executor); ExecutorFuture<void> _awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancelationToken& abortToken); ExecutorFuture<void> _awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming( const std::shared_ptr<executor::ScopedTaskExecutor>& executor); @@ -195,6 +196,11 @@ private: // (abort resharding). void _onAbortOrStepdown(WithLock, Status status); + // Initializes the _abortSource and generates a token from it to return back the caller. + // + // Should only be called once per lifetime. + CancelationToken _initAbortSource(const CancelationToken& stepdownToken); + // The in-memory representation of the immutable portion of the document in // config.localReshardingOperations.recipient. const CommonReshardingMetadata _metadata; @@ -220,6 +226,9 @@ private: // Protects the promises below Mutex _mutex = MONGO_MAKE_LATCH("RecipientStateMachine::_mutex"); + // Canceled when there is an unrecoverable error or stepdown. + boost::optional<CancelationSource> _abortSource; + boost::optional<ReshardingCriticalSection> _critSec; // Each promise below corresponds to a state on the recipient state machine. 
They are listed in diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp index 85056c4f861..052dc8c1396 100644 --- a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp +++ b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp @@ -347,7 +347,7 @@ ExecutorFuture<void> ReshardingTxnCloner::run( chainCtx->donorRecord = boost::none; return makeReadyFutureWith([] {}).share(); }) - .until([this, chainCtx](Status status) { + .until([this, cancelToken, chainCtx](Status status) { if (status.isOK() && chainCtx->moreToCome) { return false; } @@ -362,9 +362,12 @@ ExecutorFuture<void> ReshardingTxnCloner::run( if (status.isA<ErrorCategory::CancelationError>() || status.isA<ErrorCategory::NotPrimaryError>()) { // Cancellation and NotPrimary errors indicate the primary-only service Instance - // will be shut down or is shutting down now. Don't retry and leave resuming to when - // the RecipientStateMachine is restarted on the new primary. - return true; + // will be shut down or is shutting down now - provided the cancelToken is also + // canceled. Otherwise, the errors may have originated from a remote response rather + // than the shard itself. + // + // Don't retry when primary-only service Instance is shutting down. + return cancelToken.isCanceled(); } if (status.isA<ErrorCategory::RetriableError>() || @@ -391,7 +394,19 @@ ExecutorFuture<void> ReshardingTxnCloner::run( return true; }) - .on(std::move(executor), std::move(cancelToken)); + .on(executor, cancelToken) + .onCompletion([this, chainCtx](Status status) { + if (chainCtx->pipeline) { + // Guarantee the pipeline is always cleaned up - even upon cancelation. + _withTemporaryOperationContext([&](auto* opCtx) { + chainCtx->pipeline->dispose(opCtx); + chainCtx->pipeline.reset(); + }); + } + + // Propagate the result of the AsyncTry. + return status; + }); } std::unique_ptr<Pipeline, PipelineDeleter> createConfigTxnCloningPipelineForResharding( diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp index 5a7ae102be4..e1a598be9b9 100644 --- a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp +++ b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp @@ -343,17 +343,21 @@ protected: Grid::get(getServiceContext())->getExecutorPool()->getFixedExecutor()); } - ExecutorFuture<void> runCloner(ReshardingTxnCloner& cloner, - std::shared_ptr<executor::ThreadPoolTaskExecutor> executor) { + ExecutorFuture<void> runCloner( + ReshardingTxnCloner& cloner, + std::shared_ptr<executor::ThreadPoolTaskExecutor> executor, + boost::optional<CancelationToken> customCancelToken = boost::none) { + // Allows callers to control the cancelation of the cloner's run() function when specified. + auto cancelToken = customCancelToken.is_initialized() + ? customCancelToken.get() + : operationContext()->getCancelationToken(); + // There isn't a guarantee that the reference count to `executor` has been decremented after // .run() returns. We schedule a trivial task on the task executor to ensure the callback's // destructor has run. Otherwise `executor` could end up outliving the ServiceContext and // triggering an invariant due to the task executor's thread having a Client still.
return cloner - .run(getServiceContext(), - std::move(executor), - operationContext()->getCancelationToken(), - makeMongoProcessInterface()) + .run(getServiceContext(), std::move(executor), cancelToken, makeMongoProcessInterface()) .onCompletion([](auto x) { return x; }); } @@ -540,6 +544,27 @@ TEST_F(ReshardingTxnClonerTest, MergeMultiDocTransactionAndRetryableWrite) { checkTxnHasBeenUpdated(sessionIdMultiDocTxn, txnNum); } +/** + * Test that the ReshardingTxnCloner stops processing batches when canceled via cancelToken. + */ +TEST_F(ReshardingTxnClonerTest, ClonerOneBatchThenCanceled) { + const auto txns = makeSortedTxns(4); + auto executor = makeTaskExecutorForCloner(); + ReshardingTxnCloner cloner(kTwoSourceIdList[1], Timestamp::max()); + auto opCtxToken = operationContext()->getCancelationToken(); + auto cancelSource = CancelationSource(opCtxToken); + auto future = runCloner(cloner, executor, cancelSource.token()); + + onCommandReturnTxnBatch(std::vector<BSONObj>(txns.begin(), txns.begin() + 2), + CursorId{123}, + true /* isFirstBatch */); + + cancelSource.cancel(); + + auto status = future.getNoThrow(); + ASSERT_EQ(status.code(), ErrorCodes::CallbackCanceled); +} + TEST_F(ReshardingTxnClonerTest, ClonerStoresProgressSingleBatch) { const auto txns = makeSortedTxns(2); const auto lastLsid = getTxnRecordLsid(txns.back()); @@ -595,7 +620,8 @@ TEST_F(ReshardingTxnClonerTest, ClonerStoresProgressMultipleBatches) { auto executor = makeTaskExecutorForCloner(); ReshardingTxnCloner cloner(kTwoSourceIdList[1], Timestamp::max()); - auto future = runCloner(cloner, executor); + auto cancelSource = CancelationSource(operationContext()->getCancelationToken()); + auto future = runCloner(cloner, executor, cancelSource.token()); // The progress document is updated asynchronously after the session record is updated. We fake // the cloning operation being canceled to inspect the progress document after the first batch // onCommandReturnTxnBatch(std::vector<BSONObj>(txns.begin(), txns.begin() + 2), CursorId{123}, true /* isFirstBatch */); - onCommand([&](const executor::RemoteCommandRequest& request) { + // Simulate a stepdown. + cancelSource.cancel(); + + // With a non-mock network, disposing of the pipeline upon cancelation would also cancel the + // original request. + onCommand([&](const executor::RemoteCommandRequest& request) { return Status{ErrorCodes::CallbackCanceled, "Simulate cancellation"}; }); - auto status = future.getNoThrow(); ASSERT_EQ(status, ErrorCodes::CallbackCanceled); @@ -641,7 +670,8 @@ TEST_F(ReshardingTxnClonerTest, ClonerStoresProgressResume) { auto executor = makeTaskExecutorForCloner(); ReshardingTxnCloner cloner(kTwoSourceIdList[1], Timestamp::max()); - auto future = runCloner(cloner, executor); + auto cancelSource = CancelationSource(operationContext()->getCancelationToken()); + auto future = runCloner(cloner, executor, cancelSource.token()); onCommandReturnTxnBatch({txns.front()}, CursorId{123}, true /* isFirstBatch */); @@ -652,6 +682,11 @@ ASSERT_FALSE(getTxnCloningProgress(kTwoSourceIdList[0])); ASSERT_EQ(*getProgressLsid(kTwoSourceIdList[1]), firstLsid); + // Simulate a stepdown. + cancelSource.cancel(); + + // With a non-mock network, disposing of the pipeline upon cancelation would also cancel the + // original request. onCommand([&](const executor::RemoteCommandRequest& request) { return Status{ErrorCodes::CallbackCanceled, "Simulate cancellation"}; });
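A closing note on the retry loops above: the behavioral core of this change is the .until() predicate shared by ReshardingCollectionCloner::run() and ReshardingTxnCloner::run(). The following is a hedged, self-contained sketch of that decision logic; Status and ErrorCategory here are simplified stand-ins for MongoDB's types, and shouldStopRetrying is an illustrative name, not a real function in the codebase.

    #include <cassert>

    // Stand-ins for mongo::Status / ErrorCategory, just to make the sketch compile.
    enum class ErrorCategory { kNone, kCancelation, kNotPrimary, kRetriable, kOther };

    struct Status {
        bool ok;
        ErrorCategory category;
    };

    // Mirrors the .until() predicate: returns true to stop the AsyncTry loop.
    // The key change in this commit: a CancelationError/NotPrimaryError is only
    // terminal when the locally owned cancelToken is canceled, because otherwise
    // the error may have originated from a remote response rather than this shard.
    bool shouldStopRetrying(const Status& status, bool moreToCome, bool tokenCanceled) {
        if (status.ok)
            return !moreToCome;  // keep looping while there are more batches

        if (status.category == ErrorCategory::kCancelation ||
            status.category == ErrorCategory::kNotPrimary)
            return tokenCanceled;  // only stop if the local token was canceled

        if (status.category == ErrorCategory::kRetriable)
            return false;  // transient error: retry the chain

        return true;  // unrecoverable error: stop and surface it
    }

    int main() {
        // A remote NotPrimary error with a live local token keeps the loop running...
        assert(!shouldStopRetrying({false, ErrorCategory::kNotPrimary}, true, false));
        // ...but a cancellation error after local cancelation ends the loop.
        assert(shouldStopRetrying({false, ErrorCategory::kCancelation}, true, true));
        return 0;
    }

The design point is that an error in a cancellation category is no longer trusted on its own; only the token owned by the primary-only service Instance decides whether it is actually shutting down, with the onCompletion() hooks guaranteeing the aggregation pipelines are disposed of either way.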