diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/range_deleter_service.cpp | 90 | ||||
-rw-r--r-- | src/mongo/db/s/range_deleter_service.h | 5 | ||||
-rw-r--r-- | src/mongo/db/s/range_deleter_service_test.cpp | 56 |
3 files changed, 142 insertions, 9 deletions
diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp index 84de8538cc4..95e3f0477a3 100644 --- a/src/mongo/db/s/range_deleter_service.cpp +++ b/src/mongo/db/s/range_deleter_service.cpp @@ -28,10 +28,13 @@ */ #include "mongo/db/s/range_deleter_service.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/op_observer/op_observer_registry.h" +#include "mongo/db/s/balancer_stats_registry.h" #include "mongo/db/s/range_deleter_service_op_observer.h" #include "mongo/logv2/log.h" #include "mongo/s/sharding_feature_flags_gen.h" +#include "mongo/util/future_util.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingRangeDeleter @@ -83,18 +86,88 @@ void RangeDeleterService::onStepUpComplete(OperationContext* opCtx, long long te _executor = std::move(taskExecutor); _executor->startup(); - _recoverRangeDeletionsOnStepUp(); + _recoverRangeDeletionsOnStepUp(opCtx); } -void RangeDeleterService::_recoverRangeDeletionsOnStepUp() { - +void RangeDeleterService::_recoverRangeDeletionsOnStepUp(OperationContext* opCtx) { if (disableResumableRangeDeleter.load()) { _state.store(kDown); return; } - // TODO SERVER-68348 Asynchronously register tasks on the range deleter service on step-up - _state.store(kUp); + LOGV2(6834800, "Resubmitting range deletion tasks"); + + ServiceContext* serviceContext = opCtx->getServiceContext(); + + ExecutorFuture<void>(_executor) + .then([serviceContext, this] { + ThreadClient tc("ResubmitRangeDeletionsOnStepUp", serviceContext); + { + stdx::lock_guard<Client> lk(*tc.get()); + tc->setSystemOperationKillableByStepdown(lk); + } + auto opCtx = tc->makeOperationContext(); + opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); + + ScopedRangeDeleterLock rangeDeleterLock(opCtx.get()); + DBDirectClient client(opCtx.get()); + + int nRescheduledTasks = 0; + + // (1) register range deletion tasks marked as "processing" + auto processingTasksCompletionFuture = [&] { + std::vector<ExecutorFuture<void>> processingTasksCompletionFutures; + FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace); + findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName << true)); + auto cursor = client.find(std::move(findCommand)); + + while (cursor->more()) { + auto completionFuture = this->registerTask( + RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"), + cursor->next()), + SemiFuture<void>::makeReady(), + true /* fromResubmitOnStepUp */); + nRescheduledTasks++; + processingTasksCompletionFutures.push_back( + completionFuture.thenRunOn(_executor)); + } + + if (nRescheduledTasks > 1) { + LOGV2_WARNING(6834801, + "Rescheduling several range deletions marked as processing. " + "Orphans count may be off while they are not drained", + "numRangeDeletionsMarkedAsProcessing"_attr = nRescheduledTasks); + } + + return processingTasksCompletionFutures.size() > 0 + ? whenAllSucceed(std::move(processingTasksCompletionFutures)).share() + : SemiFuture<void>::makeReady().share(); + }(); + + // (2) register all other "non-pending" tasks + { + FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace); + findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName + << BSON("$ne" << true) + << RangeDeletionTask::kPendingFieldName + << BSON("$ne" << true))); + auto cursor = client.find(std::move(findCommand)); + while (cursor->more()) { + (void)this->registerTask( + RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"), + cursor->next()), + processingTasksCompletionFuture.thenRunOn(_executor).semi(), + true /* fromResubmitOnStepUp */); + } + } + + LOGV2_INFO(6834802, + "Finished resubmitting range deletion tasks", + "nRescheduledTasks"_attr = nRescheduledTasks); + + this->_state.store(kUp); + }) + .getAsync([](auto) {}); } void RangeDeleterService::onStepDown() { @@ -124,7 +197,9 @@ BSONObj RangeDeleterService::dumpState() { } SharedSemiFuture<void> RangeDeleterService::registerTask( - const RangeDeletionTask& rdt, SemiFuture<void>&& waitForActiveQueriesToComplete) { + const RangeDeletionTask& rdt, + SemiFuture<void>&& waitForActiveQueriesToComplete, + bool fromResubmitOnStepUp) { if (disableResumableRangeDeleter.load()) { return SemiFuture<void>::makeReady( @@ -182,7 +257,8 @@ SharedSemiFuture<void> RangeDeleterService::registerTask( .share(); auto [taskCompletionFuture, inserted] = [&]() -> std::pair<SharedSemiFuture<void>, bool> { - auto lock = _acquireMutexFailIfServiceNotUp(); + auto lock = fromResubmitOnStepUp ? _acquireMutexUnconditionally() + : _acquireMutexFailIfServiceNotUp(); auto [registeredTask, inserted] = _rangeDeletionTasks[rdt.getCollectionUuid()].insert( std::make_shared<RangeDeletion>(RangeDeletion(rdt, chainCompletionFuture))); auto retFuture = static_cast<RangeDeletion*>(registeredTask->get())->getCompletionFuture(); diff --git a/src/mongo/db/s/range_deleter_service.h b/src/mongo/db/s/range_deleter_service.h index 75e2c4dd655..33144c3f190 100644 --- a/src/mongo/db/s/range_deleter_service.h +++ b/src/mongo/db/s/range_deleter_service.h @@ -120,7 +120,8 @@ public: */ SharedSemiFuture<void> registerTask( const RangeDeletionTask& rdt, - SemiFuture<void>&& waitForActiveQueriesToComplete = SemiFuture<void>::makeReady()); + SemiFuture<void>&& waitForActiveQueriesToComplete = SemiFuture<void>::makeReady(), + bool fromResubmitOnStepUp = false); /* * Deregister a task from the range deleter service. @@ -153,7 +154,7 @@ public: private: /* Asynchronously register range deletions on the service. To be called on on step-up */ - void _recoverRangeDeletionsOnStepUp(); + void _recoverRangeDeletionsOnStepUp(OperationContext* opCtx); /* ReplicaSetAwareServiceShardSvr "empty implemented" methods */ void onStartup(OperationContext* opCtx) override final{}; diff --git a/src/mongo/db/s/range_deleter_service_test.cpp b/src/mongo/db/s/range_deleter_service_test.cpp index d12da47c2f4..414f8b284d3 100644 --- a/src/mongo/db/s/range_deleter_service_test.cpp +++ b/src/mongo/db/s/range_deleter_service_test.cpp @@ -31,6 +31,7 @@ #include "mongo/bson/unordered_fields_bsonobj_comparator.h" #include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/persistent_task_store.h" #include "mongo/db/s/operation_sharding_state.h" namespace mongo { @@ -567,4 +568,59 @@ TEST_F(RangeDeleterServiceTest, ASSERT(overlappingRangeFutureWhenDisabled.isReady()); } +TEST_F(RangeDeleterServiceTest, RescheduleRangeDeletionTasksOnStepUp) { + PseudoRandom random(SecureRandom().nextInt64()); + auto rds = RangeDeleterService::get(opCtx); + + // Trigger step-down + rds->onStepDown(); + + // Random number of range deletion documents to generate (minimum 1, maximum 20). + int nRangeDeletionTasks = random.nextInt32(20) + 1; + + PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); + + // Generate and persist range deleter tasks (some pending, some non-pending, some non-pending && + // processing) + int nPending = 0, nNonPending = 0, nNonPendingAndProcessing = 0; + int minBound = 0; + for (int i = 0; i < nRangeDeletionTasks; i++) { + auto rangeDeletionTask = createRangeDeletionTask( + uuidCollA, BSON("a" << minBound), BSON("a" << minBound + 10), CleanWhenEnum::kDelayed); + minBound += 10; + + auto rand = random.nextInt32() % 3; + if (rand == 0) { + // Pending range deletion task + rangeDeletionTask.setPending(true); + nPending++; + } else if (rand == 1) { + // Non-pending range deletion task + rangeDeletionTask.setPending(false); + nNonPending++; + } else if (rand == 2) { + // Non-pending and processing range deletion task + rangeDeletionTask.setPending(false); + rangeDeletionTask.setProcessing(true); + nNonPendingAndProcessing++; + } + + store.add(opCtx, rangeDeletionTask); + } + + // Trigger step-up + rds->onStepUpComplete(opCtx, 0L); + + // Check that all non-pending tasks are being rescheduled + while (true) { + try { + ASSERT_EQ(nNonPending + nNonPendingAndProcessing, + rds->getNumRangeDeletionTasksForCollection(uuidCollA)); + break; + } catch (const ExceptionFor<ErrorCodes::NotYetInitialized>&) { + // Retry as long as range deletion tasks are being resubmitted + } + } +} + } // namespace mongo |