From 6f7a69e8d5aef3d924a639c4f8625bc259334ffb Mon Sep 17 00:00:00 2001 From: Kshitij Gupta Date: Tue, 21 Sep 2021 21:45:35 +0000 Subject: SERVER-59329: Make sure that withTemporaryOperationContext throw an error if the node is no longer a primary --- src/mongo/db/s/range_deletion_util.cpp | 164 ++++++++++++++++++--------------- 1 file changed, 92 insertions(+), 72 deletions(-) diff --git a/src/mongo/db/s/range_deletion_util.cpp b/src/mongo/db/s/range_deletion_util.cpp index 5ad24b06cef..4a0e7dc968e 100644 --- a/src/mongo/db/s/range_deletion_util.cpp +++ b/src/mongo/db/s/range_deletion_util.cpp @@ -242,7 +242,7 @@ StatusWith deleteNextBatch(OperationContext* opCtx, template -auto withTemporaryOperationContext(Callable&& callable) { +auto withTemporaryOperationContext(Callable&& callable, const NamespaceString& nss) { ThreadClient tc(migrationutil::kRangeDeletionThreadName, getGlobalServiceContext()); { stdx::lock_guard lk(*tc.get()); @@ -255,6 +255,16 @@ auto withTemporaryOperationContext(Callable&& callable) { opCtx->setAlwaysInterruptAtStepDownOrUp(); invariant(opCtx->shouldAlwaysInterruptAtStepDownOrUp()); + { + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + Lock::GlobalLock lock(opCtx, MODE_IX); + uassert(ErrorCodes::PrimarySteppedDown, + str::stream() << "Not primary while running range deletion task for collection" + << nss, + replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet && + replCoord->canAcceptWritesFor(opCtx, nss)); + } + return callable(opCtx); } @@ -297,48 +307,51 @@ ExecutorFuture deleteRangeInBatches(const std::shared_ptr swNumDeleted) { // Continue iterating until there are no more documents to delete, retrying on @@ -365,23 +378,28 @@ ExecutorFuture deleteRangeInBatches(const std::shared_ptr store(NamespaceString::kRangeDeletionNamespace); + withTemporaryOperationContext( + [&](OperationContext* opCtx) { + PersistentTaskStore store(NamespaceString::kRangeDeletionNamespace); - store.remove(opCtx, BSON(RangeDeletionTask::kIdFieldName << migrationId)); - }); + store.remove(opCtx, BSON(RangeDeletionTask::kIdFieldName << migrationId)); + }, + nss); } ExecutorFuture waitForDeletionsToMajorityReplicate( @@ -389,23 +407,25 @@ ExecutorFuture waitForDeletionsToMajorityReplicate( const NamespaceString& nss, const UUID& collectionUuid, const ChunkRange& range) { - return withTemporaryOperationContext([=](OperationContext* opCtx) { - repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); - auto clientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + return withTemporaryOperationContext( + [=](OperationContext* opCtx) { + repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); + auto clientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - LOGV2_DEBUG(5346202, - 1, - "Waiting for majority replication of local deletions", - "namespace"_attr = nss.ns(), - "collectionUUID"_attr = collectionUuid, - "range"_attr = redact(range.toString()), - "clientOpTime"_attr = clientOpTime); - - // Asynchronously wait for majority write concern. - return WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(clientOpTime, CancellationToken::uncancelable()) - .thenRunOn(executor); - }); + LOGV2_DEBUG(5346202, + 1, + "Waiting for majority replication of local deletions", + "namespace"_attr = nss.ns(), + "collectionUUID"_attr = collectionUuid, + "range"_attr = redact(range.toString()), + "clientOpTime"_attr = clientOpTime); + + // Asynchronously wait for majority write concern. + return WaitForMajorityService::get(opCtx->getServiceContext()) + .waitUntilMajority(clientOpTime, CancellationToken::uncancelable()) + .thenRunOn(executor); + }, + nss); } std::vector getPersistentRangeDeletionTasks(OperationContext* opCtx, -- cgit v1.2.1