diff options
-rw-r--r-- | src/mongo/db/s/range_deletion_util.cpp | 164 |
1 files changed, 92 insertions, 72 deletions
diff --git a/src/mongo/db/s/range_deletion_util.cpp b/src/mongo/db/s/range_deletion_util.cpp index 5ad24b06cef..4a0e7dc968e 100644 --- a/src/mongo/db/s/range_deletion_util.cpp +++ b/src/mongo/db/s/range_deletion_util.cpp @@ -242,7 +242,7 @@ StatusWith<int> deleteNextBatch(OperationContext* opCtx, template <typename Callable> -auto withTemporaryOperationContext(Callable&& callable) { +auto withTemporaryOperationContext(Callable&& callable, const NamespaceString& nss) { ThreadClient tc(migrationutil::kRangeDeletionThreadName, getGlobalServiceContext()); { stdx::lock_guard<Client> lk(*tc.get()); @@ -255,6 +255,16 @@ auto withTemporaryOperationContext(Callable&& callable) { opCtx->setAlwaysInterruptAtStepDownOrUp(); invariant(opCtx->shouldAlwaysInterruptAtStepDownOrUp()); + { + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + Lock::GlobalLock lock(opCtx, MODE_IX); + uassert(ErrorCodes::PrimarySteppedDown, + str::stream() << "Not primary while running range deletion task for collection" + << nss, + replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet && + replCoord->canAcceptWritesFor(opCtx, nss)); + } + return callable(opCtx); } @@ -297,48 +307,51 @@ ExecutorFuture<void> deleteRangeInBatches(const std::shared_ptr<executor::TaskEx int numDocsToRemovePerBatch, Milliseconds delayBetweenBatches) { return AsyncTry([=] { - return withTemporaryOperationContext([=](OperationContext* opCtx) { - LOGV2_DEBUG(5346200, - 1, - "Starting batch deletion", - "namespace"_attr = nss, - "range"_attr = redact(range.toString()), - "numDocsToRemovePerBatch"_attr = numDocsToRemovePerBatch, - "delayBetweenBatches"_attr = delayBetweenBatches); - - if (migrationId) { - ensureRangeDeletionTaskStillExists(opCtx, *migrationId); - } - - AutoGetCollection collection(opCtx, nss, MODE_IX); - - // Ensure the collection exists and has not been dropped or dropped and - // recreated. - uassert( - ErrorCodes::RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist, - "Collection has been dropped since enqueuing this range " - "deletion task. No need to delete documents.", - !collectionUuidHasChanged(nss, collection.getCollection(), collectionUuid)); - - auto numDeleted = uassertStatusOK(deleteNextBatch(opCtx, - collection.getCollection(), - keyPattern, - range, - numDocsToRemovePerBatch)); - - LOGV2_DEBUG( - 23769, - 1, - "Deleted {numDeleted} documents in pass in namespace {namespace} with " - "UUID {collectionUUID} for range {range}", - "Deleted documents in pass", - "numDeleted"_attr = numDeleted, - "namespace"_attr = nss.ns(), - "collectionUUID"_attr = collectionUuid, - "range"_attr = range.toString()); - - return numDeleted; - }); + return withTemporaryOperationContext( + [=](OperationContext* opCtx) { + LOGV2_DEBUG(5346200, + 1, + "Starting batch deletion", + "namespace"_attr = nss, + "range"_attr = redact(range.toString()), + "numDocsToRemovePerBatch"_attr = numDocsToRemovePerBatch, + "delayBetweenBatches"_attr = delayBetweenBatches); + + if (migrationId) { + ensureRangeDeletionTaskStillExists(opCtx, *migrationId); + } + + AutoGetCollection collection(opCtx, nss, MODE_IX); + + // Ensure the collection exists and has not been dropped or dropped and + // recreated. + uassert( + ErrorCodes::RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist, + "Collection has been dropped since enqueuing this range " + "deletion task. No need to delete documents.", + !collectionUuidHasChanged( + nss, collection.getCollection(), collectionUuid)); + + auto numDeleted = uassertStatusOK(deleteNextBatch(opCtx, + collection.getCollection(), + keyPattern, + range, + numDocsToRemovePerBatch)); + + LOGV2_DEBUG( + 23769, + 1, + "Deleted {numDeleted} documents in pass in namespace {namespace} with " + "UUID {collectionUUID} for range {range}", + "Deleted documents in pass", + "numDeleted"_attr = numDeleted, + "namespace"_attr = nss.ns(), + "collectionUUID"_attr = collectionUuid, + "range"_attr = range.toString()); + + return numDeleted; + }, + nss); }) .until([](StatusWith<int> swNumDeleted) { // Continue iterating until there are no more documents to delete, retrying on @@ -365,23 +378,28 @@ ExecutorFuture<void> deleteRangeInBatches(const std::shared_ptr<executor::TaskEx void notifySecondariesThatDeletionIsOccurring(const NamespaceString& nss, const UUID& collectionUuid, const ChunkRange& range) { - withTemporaryOperationContext([&](OperationContext* opCtx) { - AutoGetCollection autoAdmin(opCtx, NamespaceString::kServerConfigurationNamespace, MODE_IX); - Helpers::upsert(opCtx, - NamespaceString::kServerConfigurationNamespace.ns(), - BSON("_id" - << "startRangeDeletion" - << "ns" << nss.ns() << "uuid" << collectionUuid << "min" - << range.getMin() << "max" << range.getMax())); - }); + withTemporaryOperationContext( + [&](OperationContext* opCtx) { + AutoGetCollection autoAdmin( + opCtx, NamespaceString::kServerConfigurationNamespace, MODE_IX); + Helpers::upsert(opCtx, + NamespaceString::kServerConfigurationNamespace.ns(), + BSON("_id" + << "startRangeDeletion" + << "ns" << nss.ns() << "uuid" << collectionUuid << "min" + << range.getMin() << "max" << range.getMax())); + }, + nss); } void removePersistentRangeDeletionTask(const NamespaceString& nss, UUID migrationId) { - withTemporaryOperationContext([&](OperationContext* opCtx) { - PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); + withTemporaryOperationContext( + [&](OperationContext* opCtx) { + PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); - store.remove(opCtx, BSON(RangeDeletionTask::kIdFieldName << migrationId)); - }); + store.remove(opCtx, BSON(RangeDeletionTask::kIdFieldName << migrationId)); + }, + nss); } ExecutorFuture<void> waitForDeletionsToMajorityReplicate( @@ -389,23 +407,25 @@ ExecutorFuture<void> waitForDeletionsToMajorityReplicate( const NamespaceString& nss, const UUID& collectionUuid, const ChunkRange& range) { - return withTemporaryOperationContext([=](OperationContext* opCtx) { - repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); - auto clientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + return withTemporaryOperationContext( + [=](OperationContext* opCtx) { + repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); + auto clientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - LOGV2_DEBUG(5346202, - 1, - "Waiting for majority replication of local deletions", - "namespace"_attr = nss.ns(), - "collectionUUID"_attr = collectionUuid, - "range"_attr = redact(range.toString()), - "clientOpTime"_attr = clientOpTime); - - // Asynchronously wait for majority write concern. - return WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(clientOpTime, CancellationToken::uncancelable()) - .thenRunOn(executor); - }); + LOGV2_DEBUG(5346202, + 1, + "Waiting for majority replication of local deletions", + "namespace"_attr = nss.ns(), + "collectionUUID"_attr = collectionUuid, + "range"_attr = redact(range.toString()), + "clientOpTime"_attr = clientOpTime); + + // Asynchronously wait for majority write concern. + return WaitForMajorityService::get(opCtx->getServiceContext()) + .waitUntilMajority(clientOpTime, CancellationToken::uncancelable()) + .thenRunOn(executor); + }, + nss); } std::vector<RangeDeletionTask> getPersistentRangeDeletionTasks(OperationContext* opCtx, |