author     Haley Connelly <haley.connelly@mongodb.com>       2021-03-03 23:18:30 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-03-15 17:49:04 +0000
commit     67ff8452c4172dbbfd2199df2dc349eb739b7bf1 (patch)
tree       6ad799027d4c0cf81ff8366685a48da779f2fd97 /src/mongo/db/s/resharding
parent     8f7f06898e9a9d4da95302f5e1ebe12e1a359586 (diff)
SERVER-53931 Use cancelationTokens for resharding recipient replication components
Diffstat (limited to 'src/mongo/db/s/resharding')
-rw-r--r--  src/mongo/db/s/resharding/resharding_collection_cloner.cpp          27
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp       12
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h          4
-rw-r--r--  src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp    4
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_applier.cpp              26
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_applier.h                 6
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp         318
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp              25
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_fetcher.h                 3
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service.cpp          51
-rw-r--r--  src/mongo/db/s/resharding/resharding_recipient_service.h            13
-rw-r--r--  src/mongo/db/s/resharding/resharding_txn_cloner.cpp                 25
-rw-r--r--  src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp            55
13 files changed, 400 insertions, 169 deletions
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
index 24592a1a3ca..c1f0cba5d72 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
@@ -265,6 +265,8 @@ std::vector<InsertStatement> ReshardingCollectionCloner::_fillBatch(Pipeline& pi
void ReshardingCollectionCloner::_insertBatch(OperationContext* opCtx,
std::vector<InsertStatement>& batch) {
+ // TODO SERVER-55102: Use CancelableOperationContext to prevent retrying once the operation has
+ // been canceled.
writeConflictRetry(opCtx, "ReshardingCollectionCloner::_insertBatch", _outputNss.ns(), [&] {
AutoGetCollection outputColl(opCtx, _outputNss, MODE_IX);
uassert(ErrorCodes::NamespaceNotFound,
@@ -404,7 +406,7 @@ ExecutorFuture<void> ReshardingCollectionCloner::run(
return true;
});
})
- .until([this, chainCtx](Status status) {
+ .until([this, chainCtx, cancelToken](Status status) {
if (status.isOK() && chainCtx->moreToCome) {
return false;
}
@@ -419,9 +421,12 @@ ExecutorFuture<void> ReshardingCollectionCloner::run(
if (status.isA<ErrorCategory::CancelationError>() ||
status.isA<ErrorCategory::NotPrimaryError>()) {
// Cancellation and NotPrimary errors indicate the primary-only service Instance
- // will be shut down or is shutting down now. Don't retry and leave resuming to when
- // the RecipientStateMachine is restarted on the new primary.
- return true;
+ // will be shut down or is shutting down now - provided the cancelToken is also
+ // canceled. Otherwise, the errors may have originated from a remote response rather
+ // than the shard itself.
+ //
+ // Don't retry when primary-only service Instance is shutting down.
+ return cancelToken.isCanceled();
}
if (status.isA<ErrorCategory::RetriableError>() ||
@@ -449,7 +454,19 @@ ExecutorFuture<void> ReshardingCollectionCloner::run(
return true;
})
- .on(std::move(executor), std::move(cancelToken));
+ .on(executor, cancelToken)
+ .onCompletion([this, chainCtx](Status status) {
+ if (chainCtx->pipeline) {
+ // Guarantee the pipeline is always cleaned up - even upon cancelation.
+ _withTemporaryOperationContext([&](auto* opCtx) {
+ chainCtx->pipeline->dispose(opCtx);
+ chainCtx->pipeline.reset();
+ });
+ }
+
+ // Propagate the result of the AsyncTry.
+ return status;
+ });
}
} // namespace mongo
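The substance of this hunk is the new retry predicate: a CancelationError or NotPrimaryError only ends the loop when the cloner's own token has fired, because the same error categories can also arrive in a response from a remote shard. A minimal sketch of that decision, factored into a free helper purely for illustration (the helper name is not part of the patch, and it elides the chainCtx->moreToCome check):

    // Sketch only - the patch keeps this logic inline in the .until() lambda.
    // Returning true tells AsyncTry to stop; returning false retries the body.
    bool shouldStopRetrying(const Status& status, const CancelationToken& cancelToken) {
        if (status.isOK()) {
            return true;  // (The real lambda keeps looping while chainCtx->moreToCome.)
        }
        if (status.isA<ErrorCategory::CancelationError>() ||
            status.isA<ErrorCategory::NotPrimaryError>()) {
            // Fatal only if our own token is canceled, i.e. the primary-only service
            // Instance really is shutting down; otherwise the error came back from a
            // remote response and the clone may be retried on this primary.
            return cancelToken.isCanceled();
        }
        if (status.isA<ErrorCategory::RetriableError>()) {
            return false;  // Transient error; retry.
        }
        return true;  // Anything else is surfaced to the caller.
    }

The cleanup moves into .onCompletion() for a related reason: once cancelToken fires, .on(executor, cancelToken) stops scheduling further iterations, so the loop body can no longer be relied on to dispose the pipeline.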
diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp
index dac42b6ba39..bfbd5d4270a 100644
--- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/s/resharding_util.h"
#include "mongo/logv2/log.h"
#include "mongo/util/assert_util.h"
+#include "mongo/util/future_util.h"
#include "mongo/util/str.h"
namespace mongo {
@@ -207,7 +208,7 @@ std::vector<repl::OplogEntry> ReshardingDonorOplogIterator::_fillBatch(Pipeline&
}
ExecutorFuture<std::vector<repl::OplogEntry>> ReshardingDonorOplogIterator::getNextBatch(
- std::shared_ptr<executor::TaskExecutor> executor) {
+ std::shared_ptr<executor::TaskExecutor> executor, CancelationToken cancelToken) {
if (_hasSeenFinalOplogEntry) {
invariant(!_pipeline);
return ExecutorFuture(std::move(executor), std::vector<repl::OplogEntry>{});
@@ -246,8 +247,13 @@ ExecutorFuture<std::vector<repl::OplogEntry>> ReshardingDonorOplogIterator::getN
if (batch.empty() && !_hasSeenFinalOplogEntry) {
return ExecutorFuture(executor)
- .then([this] { return _insertNotifier->awaitInsert(_resumeToken); })
- .then([this, executor] { return getNextBatch(std::move(executor)); });
+ .then([this, cancelToken] {
+ return future_util::withCancelation(_insertNotifier->awaitInsert(_resumeToken),
+ cancelToken);
+ })
+ .then([this, cancelToken, executor] {
+ return getNextBatch(std::move(executor), cancelToken);
+ });
}
return ExecutorFuture(std::move(executor), std::move(batch));
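With the fetcher-side interrupt() hook removed later in this patch, future_util::withCancelation is what lets a blocked getNextBatch() give up: it races the awaitInsert() future against the token. A hedged usage sketch of the same primitive in isolation (variable names are illustrative):

    // If `cancelToken` fires before the notifier reports a new insert, the wrapped
    // future resolves with a cancellation error instead of blocking until the
    // ReshardingOplogFetcher inserts another document.
    auto cancelableAwait =
        future_util::withCancelation(insertNotifier->awaitInsert(resumeToken), cancelToken);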
diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h
index 9ce38f04eb5..c799bb9d834 100644
--- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h
+++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h
@@ -69,7 +69,7 @@ public:
* final oplog entry hasn't been returned yet.
*/
virtual ExecutorFuture<std::vector<repl::OplogEntry>> getNextBatch(
- std::shared_ptr<executor::TaskExecutor> executor) = 0;
+ std::shared_ptr<executor::TaskExecutor> executor, CancelationToken cancelToken) = 0;
};
/**
@@ -94,7 +94,7 @@ public:
OperationContext* opCtx, std::shared_ptr<MongoProcessInterface> mongoProcessInterface);
ExecutorFuture<std::vector<repl::OplogEntry>> getNextBatch(
- std::shared_ptr<executor::TaskExecutor> executor) override;
+ std::shared_ptr<executor::TaskExecutor> executor, CancelationToken cancelToken) override;
static constexpr auto kActualOpFieldName = "actualOp"_sd;
static constexpr auto kPreImageOpFieldName = "preImageOp"_sd;
diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
index a0fe04087ed..4338bb7399b 100644
--- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
@@ -176,7 +176,9 @@ public:
// destructor has run. Otherwise `executor` could end up outliving the ServiceContext and
// triggering an invariant due to the task executor's thread having a Client still.
return ExecutorFuture(executor)
- .then([iter, executor] { return iter->getNextBatch(std::move(executor)); })
+ .then([iter, executor] {
+ return iter->getNextBatch(std::move(executor), CancelationToken::uncancelable());
+ })
.then([](auto x) { return x; })
.get();
}
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
index af8e92a0754..2b069fe72b2 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
@@ -271,35 +271,36 @@ ReshardingOplogApplier::ReshardingOplogApplier(
_writerPool(writerPool),
_oplogIter(std::move(oplogIterator)) {}
-ExecutorFuture<void> ReshardingOplogApplier::applyUntilCloneFinishedTs() {
+ExecutorFuture<void> ReshardingOplogApplier::applyUntilCloneFinishedTs(
+ CancelationToken cancelToken) {
invariant(_stage == ReshardingOplogApplier::Stage::kStarted);
// It is safe to capture `this` because PrimaryOnlyService and RecipientStateMachine
// collectively guarantee that the ReshardingOplogApplier instances will outlive `_executor` and
// `_writerPool`.
return ExecutorFuture(_executor)
- .then([this] { return _scheduleNextBatch(); })
+ .then([this, cancelToken] { return _scheduleNextBatch(cancelToken); })
.onError([this](Status status) { return _onError(status); });
}
-ExecutorFuture<void> ReshardingOplogApplier::applyUntilDone() {
+ExecutorFuture<void> ReshardingOplogApplier::applyUntilDone(CancelationToken cancelToken) {
invariant(_stage == ReshardingOplogApplier::Stage::kReachedCloningTS);
// It is safe to capture `this` because PrimaryOnlyService and RecipientStateMachine
// collectively guarantee that the ReshardingOplogApplier instances will outlive `_executor` and
// `_writerPool`.
return ExecutorFuture(_executor)
- .then([this] { return _scheduleNextBatch(); })
+ .then([this, cancelToken] { return _scheduleNextBatch(cancelToken); })
.onError([this](Status status) { return _onError(status); });
}
-ExecutorFuture<void> ReshardingOplogApplier::_scheduleNextBatch() {
+ExecutorFuture<void> ReshardingOplogApplier::_scheduleNextBatch(CancelationToken cancelToken) {
return ExecutorFuture(_executor)
- .then([this] {
+ .then([this, cancelToken] {
auto batchClient = makeKillableClient(_service(), kClientName);
AlternativeClientRegion acr(batchClient);
- return _oplogIter->getNextBatch(_executor);
+ return _oplogIter->getNextBatch(_executor, cancelToken);
})
.then([this](OplogBatch batch) {
LOGV2_DEBUG(5391002, 3, "Starting batch", "batchSize"_attr = batch.size());
@@ -349,11 +350,18 @@ ExecutorFuture<void> ReshardingOplogApplier::_scheduleNextBatch() {
return true;
})
- .then([this](bool moreToApply) {
+ .then([this, cancelToken](bool moreToApply) {
if (!moreToApply) {
return ExecutorFuture(_executor);
}
- return _scheduleNextBatch();
+
+ if (cancelToken.isCanceled()) {
+ return ExecutorFuture<void>(
+ _executor,
+ Status{ErrorCodes::CallbackCanceled,
+ "Resharding oplog applier aborting due to abort or stepdown"});
+ }
+ return _scheduleNextBatch(cancelToken);
});
}
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.h b/src/mongo/db/s/resharding/resharding_oplog_applier.h
index 3930001fcff..061a7b17d50 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.h
@@ -102,7 +102,7 @@ public:
* greater than or equal to reshardingCloneFinishedTs.
* It is undefined to call applyUntilCloneFinishedTs more than once.
*/
- ExecutorFuture<void> applyUntilCloneFinishedTs();
+ ExecutorFuture<void> applyUntilCloneFinishedTs(CancelationToken cancelToken);
/**
* Applies oplog from the iterator until it is exhausted or hits an error. It is an error to
@@ -111,7 +111,7 @@ public:
* It is an error to call this when applyUntilCloneFinishedTs future returns an error.
* It is undefined to call applyUntilDone more than once.
*/
- ExecutorFuture<void> applyUntilDone();
+ ExecutorFuture<void> applyUntilDone(CancelationToken cancelToken);
static boost::optional<ReshardingOplogApplierProgress> checkStoredProgress(
OperationContext* opCtx, const ReshardingSourceId& id);
@@ -130,7 +130,7 @@ private:
* Returns a future that becomes ready when the next batch of oplog entries have been collected
* and applied.
*/
- ExecutorFuture<void> _scheduleNextBatch();
+ ExecutorFuture<void> _scheduleNextBatch(CancelationToken cancelToken);
/**
* Setup the worker threads to apply the ops in the current buffer in parallel. Waits for all
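Both phases now take the same token, while the header's ordering contract is unchanged: applyUntilDone() may only follow a successful applyUntilCloneFinishedTs(). A hedged sketch of the caller-side chaining, mirroring what the recipient service hunk further below does (the surrounding applier, abortToken, and executor are assumed to exist):

    // Thread one token through both phases; if it is canceled between batches the
    // chain resolves with ErrorCodes::CallbackCanceled.
    auto applyFuture =
        applier->applyUntilCloneFinishedTs(abortToken).then([applier, abortToken] {
            return applier->applyUntilDone(abortToken);
        });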
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
index 6a137dcc4ed..fb0f3ae2b94 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
@@ -72,7 +72,7 @@ public:
}
ExecutorFuture<std::vector<repl::OplogEntry>> getNextBatch(
- std::shared_ptr<executor::TaskExecutor> executor) override {
+ std::shared_ptr<executor::TaskExecutor> executor, CancelationToken cancelToken) override {
// This operation context is unused by the function but confirms that the Client calling
// getNextBatch() doesn't already have an operation context.
auto opCtx = cc().makeOperationContext();
@@ -332,10 +332,11 @@ TEST_F(ReshardingOplogApplierTest, NothingToIterate) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
}
@@ -375,7 +376,8 @@ TEST_F(ReshardingOplogApplierTest, ApplyBasicCrud) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
DBDirectClient client(operationContext());
@@ -385,7 +387,7 @@ TEST_F(ReshardingOplogApplierTest, ApplyBasicCrud) {
doc = client.findOne(appliedToNs().ns(), BSON("_id" << 2));
ASSERT_BSONOBJ_EQ(BSON("_id" << 2), doc);
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1));
@@ -400,6 +402,94 @@ TEST_F(ReshardingOplogApplierTest, ApplyBasicCrud) {
ASSERT_EQ(Timestamp(8, 3), progressDoc->getProgress().getTs());
}
+TEST_F(ReshardingOplogApplierTest, CanceledCloningBatch) {
+ std::deque<repl::OplogEntry> crudOps;
+ crudOps.push_back(makeOplog(repl::OpTime(Timestamp(5, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none));
+ crudOps.push_back(makeOplog(repl::OpTime(Timestamp(6, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 2),
+ boost::none));
+ crudOps.push_back(makeOplog(repl::OpTime(Timestamp(7, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 2),
+ boost::none));
+
+ auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */);
+ boost::optional<ReshardingOplogApplier> applier;
+ auto executor = makeTaskExecutorForApplier();
+ auto writerPool = repl::makeReplWriterPool(kWriterPoolSize);
+
+ applier.emplace(makeApplierEnv(),
+ sourceId(),
+ oplogNs(),
+ crudNs(),
+ crudUUID(),
+ stashCollections(),
+ 0U, /* myStashIdx */
+ Timestamp(7, 3),
+ std::move(iterator),
+ chunkManager(),
+ executor,
+ writerPool.get());
+
+ // Cancel the rescheduling of the next batch.
+ auto abortSource = CancelationSource();
+ abortSource.cancel();
+
+ auto future = applier->applyUntilCloneFinishedTs(abortSource.token());
+ ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::CallbackCanceled);
+}
+
+TEST_F(ReshardingOplogApplierTest, CanceledApplyingBatch) {
+ std::deque<repl::OplogEntry> crudOps;
+ crudOps.push_back(makeOplog(repl::OpTime(Timestamp(5, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 1),
+ boost::none));
+ crudOps.push_back(makeOplog(repl::OpTime(Timestamp(6, 3), 1),
+ repl::OpTypeEnum::kInsert,
+ BSON("_id" << 2),
+ boost::none));
+ crudOps.push_back(makeOplog(repl::OpTime(Timestamp(7, 3), 1),
+ repl::OpTypeEnum::kUpdate,
+ BSON("$set" << BSON("x" << 1)),
+ BSON("_id" << 2)));
+ crudOps.push_back(makeOplog(repl::OpTime(Timestamp(8, 3), 1),
+ repl::OpTypeEnum::kDelete,
+ BSON("_id" << 1),
+ boost::none));
+
+ auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */);
+ boost::optional<ReshardingOplogApplier> applier;
+ auto executor = makeTaskExecutorForApplier();
+ auto writerPool = repl::makeReplWriterPool(kWriterPoolSize);
+
+ applier.emplace(makeApplierEnv(),
+ sourceId(),
+ oplogNs(),
+ crudNs(),
+ crudUUID(),
+ stashCollections(),
+ 0U, /* myStashIdx */
+ Timestamp(6, 3),
+ std::move(iterator),
+ chunkManager(),
+ executor,
+ writerPool.get());
+
+ auto abortSource = CancelationSource();
+ auto future = applier->applyUntilCloneFinishedTs(abortSource.token());
+ future.get();
+
+ abortSource.cancel();
+
+ future = applier->applyUntilDone(abortSource.token());
+ ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::CallbackCanceled);
+}
+
TEST_F(ReshardingOplogApplierTest, InsertTypeOplogAppliedInMultipleBatches) {
std::deque<repl::OplogEntry> crudOps;
@@ -427,7 +517,8 @@ TEST_F(ReshardingOplogApplierTest, InsertTypeOplogAppliedInMultipleBatches) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
DBDirectClient client(operationContext());
@@ -445,7 +536,7 @@ TEST_F(ReshardingOplogApplierTest, InsertTypeOplogAppliedInMultipleBatches) {
ASSERT_EQ(Timestamp(8, 3), progressDoc->getProgress().getClusterTime());
ASSERT_EQ(Timestamp(8, 3), progressDoc->getProgress().getTs());
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
for (int x = 0; x < 19; x++) {
@@ -487,7 +578,8 @@ TEST_F(ReshardingOplogApplierTest, ErrorDuringBatchApplyCloningPhase) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::FailedToParse);
@@ -535,10 +627,11 @@ TEST_F(ReshardingOplogApplierTest, ErrorDuringBatchApplyCatchUpPhase) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::FailedToParse);
@@ -584,7 +677,8 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstOplogCloningPhase) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::InternalError);
@@ -630,10 +724,11 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstOplogCatchUpPhase) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::InternalError);
DBDirectClient client(operationContext());
@@ -676,7 +771,8 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstBatchCloningPhase) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::InternalError);
@@ -726,10 +822,11 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingFirstBatchCatchUpPhase) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::InternalError);
@@ -777,7 +874,8 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingSecondBatchCloningPhase) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::InternalError);
@@ -839,10 +937,11 @@ TEST_F(ReshardingOplogApplierTest, ErrorWhileIteratingSecondBatchCatchUpPhase) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::InternalError);
@@ -894,7 +993,8 @@ TEST_F(ReshardingOplogApplierTest, ExecutorIsShutDownCloningPhase) {
executor->shutdown();
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::ShutdownInProgress);
DBDirectClient client(operationContext());
@@ -937,11 +1037,12 @@ TEST_F(ReshardingOplogApplierTest, ExecutorIsShutDownCatchUpPhase) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
executor->shutdown();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::ShutdownInProgress);
@@ -981,7 +1082,8 @@ TEST_F(ReshardingOplogApplierTest, WriterPoolIsShutDownCloningPhase) {
writerPool->shutdown();
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::ShutdownInProgress);
DBDirectClient client(operationContext());
@@ -1024,11 +1126,12 @@ TEST_F(ReshardingOplogApplierTest, WriterPoolIsShutDownCatchUpPhase) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
writerPool->shutdown();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::ShutdownInProgress);
@@ -1080,7 +1183,8 @@ TEST_F(ReshardingOplogApplierTest, InsertOpIntoOuputCollectionUseReshardingAppli
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
DBDirectClient client(operationContext());
@@ -1090,7 +1194,7 @@ TEST_F(ReshardingOplogApplierTest, InsertOpIntoOuputCollectionUseReshardingAppli
doc = client.findOne(appliedToNs().ns(), BSON("_id" << 2));
ASSERT_BSONOBJ_EQ(BSON("_id" << 2), doc);
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
doc = client.findOne(appliedToNs().ns(), BSON("_id" << 3));
@@ -1138,14 +1242,15 @@ TEST_F(ReshardingOplogApplierTest,
DBDirectClient client(operationContext());
client.insert(appliedToNs().toString(), BSON("_id" << 1 << "sk" << 1));
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
// We should have replaced the existing doc in the output collection.
auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1));
ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << 2), doc);
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
auto progressDoc = ReshardingOplogApplier::checkStoredProgress(operationContext(), sourceId());
@@ -1191,7 +1296,8 @@ TEST_F(ReshardingOplogApplierTest,
DBDirectClient client(operationContext());
client.insert(appliedToNs().toString(), BSON("_id" << 1 << "sk" << -1));
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
// The output collection should still hold the doc {_id: 1, sk: -1}, and the doc with {_id: 1,
@@ -1202,7 +1308,7 @@ TEST_F(ReshardingOplogApplierTest,
doc = client.findOne(stashNs().ns(), BSON("_id" << 1));
ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << 2), doc);
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
// The output collection should still hold the doc {_id: 1, x: 1}. We should have applied rule
@@ -1257,7 +1363,8 @@ TEST_F(ReshardingOplogApplierTest,
DBDirectClient client(operationContext());
client.insert(appliedToNs().toString(), BSON("_id" << 1 << "sk" << -1));
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
// The output collection should still hold the doc {_id: 1, sk: -1}, and the doc with {_id: 1,
@@ -1268,7 +1375,7 @@ TEST_F(ReshardingOplogApplierTest,
doc = client.findOne(stashNs().ns(), BSON("_id" << 1));
ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << 2), doc);
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
// We should have applied rule #1 and deleted the doc with {_id : 1} from the stash collection
@@ -1322,7 +1429,8 @@ TEST_F(ReshardingOplogApplierTest,
DBDirectClient client(operationContext());
client.insert(appliedToNs().ns(), BSON("_id" << 1 << "sk" << -1));
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
// The doc {_id: 1, sk: -1} that exists in the output collection does not belong to this donor
@@ -1331,7 +1439,7 @@ TEST_F(ReshardingOplogApplierTest,
auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1));
ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << -1), doc);
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
// There does not exist a doc with {_id : 2} in the output collection, so we should have applied
@@ -1389,7 +1497,8 @@ TEST_F(ReshardingOplogApplierTest,
writerPool.get());
// Apply the inserts first so there exists docs in the output collection
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
DBDirectClient client(operationContext());
@@ -1399,7 +1508,7 @@ TEST_F(ReshardingOplogApplierTest,
doc = client.findOne(appliedToNs().ns(), BSON("_id" << 2));
ASSERT_BSONOBJ_EQ(BSON("_id" << 2 << "sk" << 2), doc);
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
// None of the stash collections have docs with _id == [op _id], so we should not have found any
@@ -1460,7 +1569,8 @@ TEST_F(ReshardingOplogApplierTest,
DBDirectClient client(operationContext());
client.insert(stashCollections()[1].toString(), BSON("_id" << 1 << "sk" << -3));
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
// The output collection should now hold the doc {_id: 1, sk: 1}.
@@ -1475,7 +1585,7 @@ TEST_F(ReshardingOplogApplierTest,
doc = client.findOne(stashCollections()[1].toString(), BSON("_id" << 1));
ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << -3), doc);
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
// We should have applied rule #4 and deleted the doc that was in the output collection {_id: 1,
@@ -1548,7 +1658,8 @@ TEST_F(ReshardingOplogApplierTest, UpdateShouldModifyStashCollectionUseReshardin
DBDirectClient client(operationContext());
client.insert(appliedToNs().toString(), BSON("_id" << 1 << "sk" << -1));
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
// The output collection should still hold the doc {_id: 1, sk: -1}, and the doc with {_id: 1,
@@ -1559,7 +1670,7 @@ TEST_F(ReshardingOplogApplierTest, UpdateShouldModifyStashCollectionUseReshardin
doc = client.findOne(stashNs().ns(), BSON("_id" << 1));
ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << 2), doc);
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
// We should have applied rule #1 and updated the doc with {_id : 1} in the stash collection
@@ -1612,7 +1723,8 @@ TEST_F(ReshardingOplogApplierTest, UpdateShouldDoNothingUseReshardingApplication
DBDirectClient client(operationContext());
client.insert(appliedToNs().ns(), BSON("_id" << 1 << "sk" << -1));
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
// The doc {_id: 1, sk: -1} that exists in the output collection does not belong to this donor
@@ -1621,7 +1733,7 @@ TEST_F(ReshardingOplogApplierTest, UpdateShouldDoNothingUseReshardingApplication
auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1));
ASSERT_BSONOBJ_EQ(BSON("_id" << 1 << "sk" << -1), doc);
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
// There does not exist a doc with {_id : 2} in the output collection, so we should have applied
@@ -1678,7 +1790,8 @@ TEST_F(ReshardingOplogApplierTest, UpdateOutputCollUseReshardingApplicationRules
writerPool.get());
// Apply the inserts first so there exists docs in the output collection.
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
DBDirectClient client(operationContext());
@@ -1688,7 +1801,7 @@ TEST_F(ReshardingOplogApplierTest, UpdateOutputCollUseReshardingApplicationRules
doc = client.findOne(appliedToNs().ns(), BSON("_id" << 2));
ASSERT_BSONOBJ_EQ(BSON("_id" << 2 << "sk" << 2), doc);
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
// We should have updated both docs in the output collection to include the new field "x".
@@ -1737,14 +1850,15 @@ TEST_F(ReshardingOplogApplierTest, UnsupportedCommandOpsShouldErrorUseResharding
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
DBDirectClient client(operationContext());
auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1));
ASSERT_BSONOBJ_EQ(BSON("_id" << 1), doc);
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::OplogOperationUnsupported);
@@ -1782,7 +1896,8 @@ TEST_F(ReshardingOplogApplierTest,
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::OplogOperationUnsupported);
@@ -2246,10 +2361,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, GroupInserts) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
DBDirectClient client(operationContext());
@@ -2333,10 +2449,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, CrudWithEmptyConfigTransactions) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
DBDirectClient client(operationContext());
@@ -2419,10 +2536,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, MultipleTxnSameLsidInOneBatch) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
DBDirectClient client(operationContext());
@@ -2479,10 +2597,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithLowerExistingTxn) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
DBDirectClient client(operationContext());
@@ -2532,10 +2651,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithHigherExistingTxnNum) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
// Op should always be applied, even if session info was not compatible.
@@ -2596,10 +2716,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithEqualExistingTxnNum) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
DBDirectClient client(operationContext());
@@ -2650,10 +2771,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithStmtIdAlreadyExecuted)
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
DBDirectClient client(operationContext());
@@ -2706,10 +2828,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithActiveUnpreparedTxnSame
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
// Op should always be applied, even if session info was not compatible.
@@ -2763,10 +2886,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithActiveUnpreparedTxnWith
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
// Op should always be applied, even if session info was not compatible.
@@ -2819,7 +2943,8 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithPreparedTxnThatWillComm
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
// Sleep a little bit to make the applier block on the prepared transaction.
sleepmillis(200);
@@ -2830,7 +2955,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithPreparedTxnThatWillComm
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
// Op should always be applied, even if session info was not compatible.
@@ -2883,7 +3008,8 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithPreparedTxnThatWillAbor
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
// Sleep a little bit to make the applier block on the prepared transaction.
sleepmillis(200);
@@ -2894,7 +3020,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithPreparedTxnThatWillAbor
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
// Op should always be applied, even if session info was not compatible.
@@ -2954,10 +3080,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWriteWithPreImage) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
DBDirectClient client(operationContext());
@@ -3020,10 +3147,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWriteWithPostImage) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
DBDirectClient client(operationContext());
@@ -3069,10 +3197,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnWithLowerExistingTxn) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
DBDirectClient client(operationContext());
@@ -3119,10 +3248,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnWithHigherExistingTxnNum) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
// Op should always be applied, even if txn info was not compatible.
@@ -3178,10 +3308,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnWithEqualExistingTxnNum) {
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
DBDirectClient client(operationContext());
@@ -3231,10 +3362,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnWithActiveUnpreparedTxnSameT
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
// Ops should always be applied regardless of conflict with existing txn.
@@ -3288,10 +3420,11 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnActiveUnpreparedTxnWithLower
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
// Op should always be applied, even if txn info was not compatible.
@@ -3344,7 +3477,8 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnWithPreparedTxnThatWillCommi
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
// Sleep a little bit to make the applier block on the prepared transaction.
sleepmillis(200);
@@ -3355,7 +3489,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnWithPreparedTxnThatWillCommi
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
// Op should always be applied, even if txn info was not compatible.
@@ -3404,7 +3538,8 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnWithPreparedTxnThatWillAbort
executor,
writerPool.get());
- auto future = applier->applyUntilCloneFinishedTs();
+ auto cancelToken = operationContext()->getCancelationToken();
+ auto future = applier->applyUntilCloneFinishedTs(cancelToken);
// Sleep a little bit to make the applier block on the prepared transaction.
sleepmillis(200);
@@ -3415,7 +3550,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnWithPreparedTxnThatWillAbort
future.get();
- future = applier->applyUntilDone();
+ future = applier->applyUntilDone(cancelToken);
future.get();
// Op should always be applied, even if txn info was not compatible.
@@ -3459,10 +3594,13 @@ TEST_F(ReshardingOplogApplierTest, MetricsAreReported) {
writerPool.get());
ASSERT_EQ(metricsAppliedCount(), 0);
- applier.applyUntilCloneFinishedTs().get(); // Stop at clone timestamp 7
+
+ auto cancelToken = operationContext()->getCancelationToken();
+
+ applier.applyUntilCloneFinishedTs(cancelToken).get(); // Stop at clone timestamp 7
ASSERT_EQ(metricsAppliedCount(),
4); // Applied timestamps {5,6,7}, and {8} drafts in on the batch.
- applier.applyUntilDone().get();
+ applier.applyUntilDone(cancelToken).get();
ASSERT_EQ(metricsAppliedCount(), 5); // Now includes timestamp {9}
}
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
index 6e9f823b779..8e69accb0bc 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
@@ -112,10 +112,6 @@ Future<void> ReshardingOplogFetcher::awaitInsert(const ReshardingDonorOplogId& l
// being issued.
stdx::lock_guard lk(_mutex);
- if (_interruptStatus) {
- return Future<void>::makeReady(*_interruptStatus);
- }
-
if (lastSeen < _startAt) {
// `lastSeen < _startAt` means there's at least one document which has been inserted by
// ReshardingOplogFetcher and hasn't been returned by
@@ -135,20 +131,6 @@ Future<void> ReshardingOplogFetcher::awaitInsert(const ReshardingDonorOplogId& l
return std::move(_onInsertFuture);
}
-void ReshardingOplogFetcher::interrupt(Status status) {
- invariant(!status.isOK());
-
- // We replace the promise/future pair with a fresh one because consume() won't know an error has
- // already been set and would otherwise attempt to fulfill the promise again. Later calls to
- // awaitInsert() won't ever look at `_onInsertFuture` though.
- auto [p, f] = makePromiseFuture<void>();
- stdx::lock_guard lk(_mutex);
- _interruptStatus = status;
- _onInsertPromise.setError(*_interruptStatus);
- _onInsertPromise = std::move(p);
- _onInsertFuture = std::move(f);
-}
-
ExecutorFuture<void> ReshardingOplogFetcher::schedule(
std::shared_ptr<executor::TaskExecutor> executor, const CancelationToken& cancelToken) {
return ExecutorFuture(executor)
@@ -182,6 +164,13 @@ ExecutorFuture<void> ReshardingOplogFetcher::_reschedule(
if (!moreToCome) {
return ExecutorFuture(std::move(executor));
}
+
+ if (cancelToken.isCanceled()) {
+ return ExecutorFuture<void>(
+ executor,
+ Status{ErrorCodes::CallbackCanceled,
+ "Resharding oplog fetcher canceled due to abort or stepdown"});
+ }
return _reschedule(std::move(executor), cancelToken);
});
}
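With interrupt() gone, stopping the fetcher is purely token-driven: consumers blocked in awaitInsert() are released by withCancelation on the iterator side, and the fetch loop above refuses to reschedule once the token handed to schedule() is canceled. A hedged caller-side sketch (the names mirror the recipient service changes below):

    // Start the fetch loop under the abort token...
    auto fetcherFuture = oplogFetcher->schedule(oplogFetcherExecutor, abortToken);

    // ...and on abort or stepdown, cancel the source that minted the token. The next
    // _reschedule() pass then completes fetcherFuture with ErrorCodes::CallbackCanceled
    // instead of scheduling more work.
    abortSource.cancel();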
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
index f01eafe09fd..43353369486 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
@@ -81,8 +81,6 @@ public:
Future<void> awaitInsert(const ReshardingDonorOplogId& lastSeen) override;
- void interrupt(Status status);
-
/**
* Schedules a task that will do the following:
*
@@ -160,7 +158,6 @@ private:
Mutex _mutex = MONGO_MAKE_LATCH("ReshardingOplogFetcher::_mutex");
Promise<void> _onInsertPromise;
Future<void> _onInsertFuture;
- boost::optional<Status> _interruptStatus;
// For testing to control behavior.
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index c95bfe6dd91..f46c264c865 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -238,19 +238,22 @@ ReshardingRecipientService::RecipientStateMachine::~RecipientStateMachine() {
SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
- const CancelationToken& cancelToken) noexcept {
+ const CancelationToken& stepdownToken) noexcept {
+ auto abortToken = _initAbortSource(stepdownToken);
+
return ExecutorFuture<void>(**executor)
.then([this, executor] {
_metrics()->onStart();
return _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(executor);
})
.then([this] { _createTemporaryReshardingCollectionThenTransitionToCloning(); })
- .then([this, executor, cancelToken] {
- return _cloneThenTransitionToApplying(executor, cancelToken);
+ .then([this, executor, abortToken] {
+ return _cloneThenTransitionToApplying(executor, abortToken);
})
.then([this, executor] { return _applyThenTransitionToSteadyState(executor); })
- .then([this, executor] {
- return _awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency(executor);
+ .then([this, executor, abortToken] {
+ return _awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency(executor,
+ abortToken);
})
.then([this, executor] {
return _awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming(executor);
@@ -349,6 +352,12 @@ void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChange
stdx::lock_guard<Latch> lk(_mutex);
if (reshardingFields.getAbortReason()) {
auto status = getStatusFromAbortReason(reshardingFields);
+ invariant(!status.isOK());
+
+ if (_abortSource) {
+ _abortSource->cancel();
+ }
+
_onAbortOrStepdown(lk, status);
return;
}
@@ -438,7 +447,7 @@ void ReshardingRecipientService::RecipientStateMachine::_initTxnCloner(
ExecutorFuture<void>
ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplying(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancelationToken& cancelToken) {
+ const CancelationToken& abortToken) {
if (_recipientCtx.getState() > RecipientStateEnum::kCloning) {
return ExecutorFuture(**executor);
}
@@ -494,15 +503,15 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
_oplogFetcherFutures.emplace_back(
_oplogFetchers.back()
- ->schedule(_oplogFetcherExecutor, cancelToken)
+ ->schedule(_oplogFetcherExecutor, abortToken)
.onError([](Status status) {
LOGV2(5259300, "Error fetching oplog entries", "error"_attr = redact(status));
return status;
}));
}
- return _collectionCloner->run(**executor, cancelToken)
- .then([this, executor, cancelToken] {
+ return _collectionCloner->run(**executor, abortToken)
+ .then([this, executor, abortToken] {
if (_txnCloners.empty()) {
return SemiFuture<void>::makeReady();
}
@@ -511,7 +520,7 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
std::vector<ExecutorFuture<void>> txnClonerFutures;
for (auto&& txnCloner : _txnCloners) {
- txnClonerFutures.push_back(txnCloner->run(serviceContext, **executor, cancelToken));
+ txnClonerFutures.push_back(txnCloner->run(serviceContext, **executor, abortToken));
}
return whenAllSucceed(std::move(txnClonerFutures));
@@ -548,13 +557,14 @@ ReshardingRecipientService::RecipientStateMachine::_applyThenTransitionToSteadyS
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
_awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancelationToken& abortToken) {
if (_recipientCtx.getState() > RecipientStateEnum::kSteadyState) {
return ExecutorFuture<void>(**executor, Status::OK());
}
auto opCtx = cc().makeOperationContext();
- return _updateCoordinator(opCtx.get(), executor).then([this, executor] {
+ return _updateCoordinator(opCtx.get(), executor).then([this, executor, abortToken] {
auto numDonors = _donorShardIds.size();
_oplogAppliers.reserve(numDonors);
_oplogApplierWorkers.reserve(numDonors);
@@ -619,8 +629,10 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
// ReshardingCollectionCloner has finished. This is why applyUntilCloneFinishedTs() and
// applyUntilDone() are both called here in sequence.
auto* applier = _oplogAppliers.back().get();
- futuresToWaitOn.emplace_back(applier->applyUntilCloneFinishedTs().then(
- [applier] { return applier->applyUntilDone(); }));
+ futuresToWaitOn.emplace_back(
+ applier->applyUntilCloneFinishedTs(abortToken).then([applier, abortToken] {
+ return applier->applyUntilDone(abortToken);
+ }));
}
return whenAllSucceed(std::move(futuresToWaitOn))
@@ -930,10 +942,6 @@ void ReshardingRecipientService::RecipientStateMachine::_onAbortOrStepdown(WithL
_oplogFetcherExecutor->shutdown();
}
- for (auto&& fetcher : _oplogFetchers) {
- fetcher->interrupt(status);
- }
-
for (auto&& threadPool : _oplogApplierWorkers) {
threadPool->shutdown();
}
@@ -947,4 +955,11 @@ void ReshardingRecipientService::RecipientStateMachine::_onAbortOrStepdown(WithL
}
}
+CancelationToken ReshardingRecipientService::RecipientStateMachine::_initAbortSource(
+ const CancelationToken& stepdownToken) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _abortSource = CancelationSource(stepdownToken);
+ return _abortSource->token();
+}
+
} // namespace mongo
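_initAbortSource() establishes a two-level cancelation hierarchy: the abort source is a child of the stepdown token, so a stepdown cancels everything wired to abortToken, while a coordinator-driven abort (the getAbortReason() branch above) cancels only the abort path. A hedged sketch of that parent/child behavior using the same primitives (standalone names for illustration):

    // Parent: canceled by the PrimaryOnlyService on stepdown or shutdown.
    auto stepdownSource = CancelationSource();
    auto stepdownToken = stepdownSource.token();

    // Child: what _initAbortSource() creates from the stepdown token.
    auto abortSource = CancelationSource(stepdownToken);
    auto abortToken = abortSource.token();

    abortSource.cancel();                    // abort: the child token fires...
    invariant(abortToken.isCanceled());
    invariant(!stepdownToken.isCanceled());  // ...but the stepdown token is untouched.

    // Canceling stepdownSource instead would have canceled abortToken as well, which
    // is how a stepdown reaches every component that only holds abortToken.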
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index 12411127e6c..1e667da7195 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -143,13 +143,14 @@ private:
ExecutorFuture<void> _cloneThenTransitionToApplying(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancelationToken& cancelToken);
+ const CancelationToken& abortToken);
ExecutorFuture<void> _applyThenTransitionToSteadyState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
ExecutorFuture<void> _awaitAllDonorsBlockingWritesThenTransitionToStrictConsistency(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancelationToken& abortToken);
ExecutorFuture<void> _awaitCoordinatorHasDecisionPersistedThenTransitionToRenaming(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
@@ -195,6 +196,11 @@ private:
// (abort resharding).
void _onAbortOrStepdown(WithLock, Status status);
+ // Initializes the _abortSource and generates a token from it to return back to the caller.
+ //
+ // Should only be called once per lifetime.
+ CancelationToken _initAbortSource(const CancelationToken& stepdownToken);
+
// The in-memory representation of the immutable portion of the document in
// config.localReshardingOperations.recipient.
const CommonReshardingMetadata _metadata;
@@ -220,6 +226,9 @@ private:
// Protects the promises below
Mutex _mutex = MONGO_MAKE_LATCH("RecipientStateMachine::_mutex");
+ // Canceled when there is an unrecoverable error or stepdown.
+ boost::optional<CancelationSource> _abortSource;
+
boost::optional<ReshardingCriticalSection> _critSec;
// Each promise below corresponds to a state on the recipient state machine. They are listed in
diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp
index 85056c4f861..052dc8c1396 100644
--- a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp
+++ b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp
@@ -347,7 +347,7 @@ ExecutorFuture<void> ReshardingTxnCloner::run(
chainCtx->donorRecord = boost::none;
return makeReadyFutureWith([] {}).share();
})
- .until([this, chainCtx](Status status) {
+ .until([this, cancelToken, chainCtx](Status status) {
if (status.isOK() && chainCtx->moreToCome) {
return false;
}
@@ -362,9 +362,12 @@ ExecutorFuture<void> ReshardingTxnCloner::run(
if (status.isA<ErrorCategory::CancelationError>() ||
status.isA<ErrorCategory::NotPrimaryError>()) {
// Cancellation and NotPrimary errors indicate the primary-only service Instance
- // will be shut down or is shutting down now. Don't retry and leave resuming to when
- // the RecipientStateMachine is restarted on the new primary.
- return true;
+ // will be shut down or is shutting down now - provided the cancelToken is also
+ // canceled. Otherwise, the errors may have originated from a remote response rather
+ // than the shard itself.
+ //
+ // Don't retry when primary-only service Instance is shutting down.
+ return cancelToken.isCanceled();
}
if (status.isA<ErrorCategory::RetriableError>() ||
@@ -391,7 +394,19 @@ ExecutorFuture<void> ReshardingTxnCloner::run(
return true;
})
- .on(std::move(executor), std::move(cancelToken));
+ .on(executor, cancelToken)
+ .onCompletion([this, chainCtx](Status status) {
+ if (chainCtx->pipeline) {
+ // Guarantee the pipeline is always cleaned up - even upon cancelation.
+ _withTemporaryOperationContext([&](auto* opCtx) {
+ chainCtx->pipeline->dispose(opCtx);
+ chainCtx->pipeline.reset();
+ });
+ }
+
+ // Propagate the result of the AsyncTry.
+ return status;
+ });
}
std::unique_ptr<Pipeline, PipelineDeleter> createConfigTxnCloningPipelineForResharding(
diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp
index 5a7ae102be4..e1a598be9b9 100644
--- a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp
@@ -343,17 +343,21 @@ protected:
Grid::get(getServiceContext())->getExecutorPool()->getFixedExecutor());
}
- ExecutorFuture<void> runCloner(ReshardingTxnCloner& cloner,
- std::shared_ptr<executor::ThreadPoolTaskExecutor> executor) {
+ ExecutorFuture<void> runCloner(
+ ReshardingTxnCloner& cloner,
+ std::shared_ptr<executor::ThreadPoolTaskExecutor> executor,
+ boost::optional<CancelationToken> customCancelToken = boost::none) {
+ // Allows callers to control the cancelation of the cloner's run() function when specified.
+ auto cancelToken = customCancelToken.is_initialized()
+ ? customCancelToken.get()
+ : operationContext()->getCancelationToken();
+
// There isn't a guarantee that the reference count to `executor` has been decremented after
// .run() returns. We schedule a trivial task on the task executor to ensure the callback's
// destructor has run. Otherwise `executor` could end up outliving the ServiceContext and
// triggering an invariant due to the task executor's thread having a Client still.
return cloner
- .run(getServiceContext(),
- std::move(executor),
- operationContext()->getCancelationToken(),
- makeMongoProcessInterface())
+ .run(getServiceContext(), std::move(executor), cancelToken, makeMongoProcessInterface())
.onCompletion([](auto x) { return x; });
}
@@ -540,6 +544,27 @@ TEST_F(ReshardingTxnClonerTest, MergeMultiDocTransactionAndRetryableWrite) {
checkTxnHasBeenUpdated(sessionIdMultiDocTxn, txnNum);
}
+/**
+ * Test that the ReshardingTxnCloner stops processing batches when canceled via cancelToken.
+ */
+TEST_F(ReshardingTxnClonerTest, ClonerOneBatchThenCanceled) {
+ const auto txns = makeSortedTxns(4);
+ auto executor = makeTaskExecutorForCloner();
+ ReshardingTxnCloner cloner(kTwoSourceIdList[1], Timestamp::max());
+ auto opCtxToken = operationContext()->getCancelationToken();
+ auto cancelSource = CancelationSource(opCtxToken);
+ auto future = runCloner(cloner, executor, cancelSource.token());
+
+ onCommandReturnTxnBatch(std::vector<BSONObj>(txns.begin(), txns.begin() + 2),
+ CursorId{123},
+ true /* isFirstBatch */);
+
+ cancelSource.cancel();
+
+ auto status = future.getNoThrow();
+ ASSERT_EQ(status.code(), ErrorCodes::CallbackCanceled);
+}
+
TEST_F(ReshardingTxnClonerTest, ClonerStoresProgressSingleBatch) {
const auto txns = makeSortedTxns(2);
const auto lastLsid = getTxnRecordLsid(txns.back());
@@ -595,7 +620,8 @@ TEST_F(ReshardingTxnClonerTest, ClonerStoresProgressMultipleBatches) {
auto executor = makeTaskExecutorForCloner();
ReshardingTxnCloner cloner(kTwoSourceIdList[1], Timestamp::max());
- auto future = runCloner(cloner, executor);
+ auto cancelSource = CancelationSource(operationContext()->getCancelationToken());
+ auto future = runCloner(cloner, executor, cancelSource.token());
// The progress document is updated asynchronously after the session record is updated. We fake
// the cloning operation being canceled to inspect the progress document after the first batch
@@ -603,11 +629,14 @@ TEST_F(ReshardingTxnClonerTest, ClonerStoresProgressMultipleBatches) {
onCommandReturnTxnBatch(std::vector<BSONObj>(txns.begin(), txns.begin() + 2),
CursorId{123},
true /* isFirstBatch */);
-
onCommand([&](const executor::RemoteCommandRequest& request) {
+ // Simulate a stepdown.
+ cancelSource.cancel();
+
+ // With a non-mock network, disposing of the pipeline upon cancelation would also cancel the
+ // original request.
return Status{ErrorCodes::CallbackCanceled, "Simulate cancellation"};
});
-
auto status = future.getNoThrow();
ASSERT_EQ(status, ErrorCodes::CallbackCanceled);
@@ -641,7 +670,8 @@ TEST_F(ReshardingTxnClonerTest, ClonerStoresProgressResume) {
auto executor = makeTaskExecutorForCloner();
ReshardingTxnCloner cloner(kTwoSourceIdList[1], Timestamp::max());
- auto future = runCloner(cloner, executor);
+ auto cancelSource = CancelationSource(operationContext()->getCancelationToken());
+ auto future = runCloner(cloner, executor, cancelSource.token());
onCommandReturnTxnBatch({txns.front()}, CursorId{123}, true /* isFirstBatch */);
@@ -652,6 +682,11 @@ TEST_F(ReshardingTxnClonerTest, ClonerStoresProgressResume) {
ASSERT_FALSE(getTxnCloningProgress(kTwoSourceIdList[0]));
ASSERT_EQ(*getProgressLsid(kTwoSourceIdList[1]), firstLsid);
+ // Simulate a stepdown.
+ cancelSource.cancel();
+
+ // With a non-mock network, disposing of the pipeline upon cancelation would also cancel the
+ // original request.
return Status{ErrorCodes::CallbackCanceled, "Simulate cancellation"};
});