diff options
author | Pierlauro Sciarelli <pierlauro.sciarelli@mongodb.com> | 2022-09-29 13:02:26 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-29 14:06:07 +0000 |
commit | 71a59429935cc8cb11f627ee137e111124ef21f9 (patch) | |
tree | 174b4644abd3df6e0130fc6cf59fe494e729a4c3 /src/mongo/db/s/range_deleter_service.cpp | |
parent | 0b24ce9b359574b8f695fce8b4d9b80fdcaa4a10 (diff) | |
download | mongo-71a59429935cc8cb11f627ee137e111124ef21f9.tar.gz |
SERVER-70094 Synchronize shutdown with resuming of range deletions
Diffstat (limited to 'src/mongo/db/s/range_deleter_service.cpp')
-rw-r--r-- | src/mongo/db/s/range_deleter_service.cpp | 142 |
1 files changed, 73 insertions, 69 deletions
diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp index 09c35f1fffb..170e32ec944 100644 --- a/src/mongo/db/s/range_deleter_service.cpp +++ b/src/mongo/db/s/range_deleter_service.cpp @@ -287,81 +287,82 @@ void RangeDeleterService::_recoverRangeDeletionsOnStepUp(OperationContext* opCtx ServiceContext* serviceContext = opCtx->getServiceContext(); - ExecutorFuture<void>(_executor) - .then([serviceContext, this] { - ThreadClient tc("ResubmitRangeDeletionsOnStepUp", serviceContext); - { - stdx::lock_guard<Client> lk(*tc.get()); - tc->setSystemOperationKillableByStepdown(lk); - } - auto opCtx = tc->makeOperationContext(); - opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); - - ScopedRangeDeleterLock rangeDeleterLock(opCtx.get()); - DBDirectClient client(opCtx.get()); - - int nRescheduledTasks = 0; - - // (1) register range deletion tasks marked as "processing" - auto processingTasksCompletionFuture = [&] { - std::vector<ExecutorFuture<void>> processingTasksCompletionFutures; - FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace); - findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName << true)); - auto cursor = client.find(std::move(findCommand)); - - while (cursor->more()) { - auto completionFuture = this->registerTask( - RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"), - cursor->next()), - SemiFuture<void>::makeReady(), - true /* fromResubmitOnStepUp */); - nRescheduledTasks++; - processingTasksCompletionFutures.push_back( - completionFuture.thenRunOn(_executor)); + _stepUpCompletedFuture = + ExecutorFuture<void>(_executor) + .then([serviceContext, this] { + ThreadClient tc("ResubmitRangeDeletionsOnStepUp", serviceContext); + { + stdx::lock_guard<Client> lk(*tc.get()); + tc->setSystemOperationKillableByStepdown(lk); } + auto opCtx = tc->makeOperationContext(); + opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); + + ScopedRangeDeleterLock rangeDeleterLock(opCtx.get()); + DBDirectClient client(opCtx.get()); + + int nRescheduledTasks = 0; + + // (1) register range deletion tasks marked as "processing" + auto processingTasksCompletionFuture = [&] { + std::vector<ExecutorFuture<void>> processingTasksCompletionFutures; + FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace); + findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName << true)); + auto cursor = client.find(std::move(findCommand)); + + while (cursor->more()) { + auto completionFuture = this->registerTask( + RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"), + cursor->next()), + SemiFuture<void>::makeReady(), + true /* fromResubmitOnStepUp */); + nRescheduledTasks++; + processingTasksCompletionFutures.push_back( + completionFuture.thenRunOn(_executor)); + } - if (nRescheduledTasks > 1) { - LOGV2_WARNING(6834801, - "Rescheduling several range deletions marked as processing. " - "Orphans count may be off while they are not drained", - "numRangeDeletionsMarkedAsProcessing"_attr = nRescheduledTasks); - } + if (nRescheduledTasks > 1) { + LOGV2_WARNING(6834801, + "Rescheduling several range deletions marked as processing. " + "Orphans count may be off while they are not drained", + "numRangeDeletionsMarkedAsProcessing"_attr = + nRescheduledTasks); + } + + return processingTasksCompletionFutures.size() > 0 + ? whenAllSucceed(std::move(processingTasksCompletionFutures)).share() + : SemiFuture<void>::makeReady().share(); + }(); - return processingTasksCompletionFutures.size() > 0 - ? whenAllSucceed(std::move(processingTasksCompletionFutures)).share() - : SemiFuture<void>::makeReady().share(); - }(); - - // (2) register all other "non-pending" tasks - { - FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace); - findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName - << BSON("$ne" << true) - << RangeDeletionTask::kPendingFieldName - << BSON("$ne" << true))); - auto cursor = client.find(std::move(findCommand)); - while (cursor->more()) { - (void)this->registerTask( - RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"), - cursor->next()), - processingTasksCompletionFuture.thenRunOn(_executor).semi(), - true /* fromResubmitOnStepUp */); + // (2) register all other "non-pending" tasks + { + FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace); + findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName + << BSON("$ne" << true) + << RangeDeletionTask::kPendingFieldName + << BSON("$ne" << true))); + auto cursor = client.find(std::move(findCommand)); + while (cursor->more()) { + (void)this->registerTask( + RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"), + cursor->next()), + processingTasksCompletionFuture.thenRunOn(_executor).semi(), + true /* fromResubmitOnStepUp */); + } } - } - LOGV2_INFO(6834802, - "Finished resubmitting range deletion tasks", - "nRescheduledTasks"_attr = nRescheduledTasks); + LOGV2_INFO(6834802, + "Finished resubmitting range deletion tasks", + "nRescheduledTasks"_attr = nRescheduledTasks); - auto lock = _acquireMutexUnconditionally(); - // Since the recovery is only spawned on step-up but may complete later, it's not - // assumable that the node is still primary when the all resubmissions finish - if (_state.load() != kDown) { - this->_rangeDeleterServiceUpCondVar_FOR_TESTING.notify_all(); - this->_state.store(kUp); - } - }) - .getAsync([](auto) {}); + auto lock = _acquireMutexUnconditionally(); + // Since the recovery is only spawned on step-up but may complete later, it's not + // assumable that the node is still primary when the all resubmissions finish + if (_state.load() != kDown) { + this->_state.store(kUp); + } + }) + .semi(); } void RangeDeleterService::_stopService(bool joinExecutor) { @@ -369,6 +370,9 @@ void RangeDeleterService::_stopService(bool joinExecutor) { return; } + // Join the thread spawned on step-up to resume range deletions + _stepUpCompletedFuture.getNoThrow().ignore(); + auto lock = _acquireMutexUnconditionally(); // It may happen for the `onStepDown` hook to be invoked on a SECONDARY node transitioning |