author    | Pierlauro Sciarelli <pierlauro.sciarelli@mongodb.com> | 2022-09-12 15:39:28 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com>      | 2022-09-12 16:47:44 +0000
commit    | 727d3de09c9e38206cbde56c8a431f6e7b480da6
tree      | cdd2dbd0544449f99cc59984593121e6dc3351f4 /src/mongo/db/s/range_deleter_service.cpp
parent    | c7f9bb9676a8859a21aaea5fdd81e23a4e9d79e8
download  | mongo-727d3de09c9e38206cbde56c8a431f6e7b480da6.tar.gz
SERVER-68348 Asynchronously register tasks on the range deleter service on step-up
Diffstat (limited to 'src/mongo/db/s/range_deleter_service.cpp')
-rw-r--r-- | src/mongo/db/s/range_deleter_service.cpp | 103
1 file changed, 94 insertions, 9 deletions
diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp
index f90618421d5..a24e14082f9 100644
--- a/src/mongo/db/s/range_deleter_service.cpp
+++ b/src/mongo/db/s/range_deleter_service.cpp
@@ -28,10 +28,13 @@
  */
 
 #include "mongo/db/s/range_deleter_service.h"
+#include "mongo/db/dbdirectclient.h"
 #include "mongo/db/op_observer/op_observer_registry.h"
+#include "mongo/db/s/balancer_stats_registry.h"
 #include "mongo/db/s/range_deleter_service_op_observer.h"
 #include "mongo/logv2/log.h"
 #include "mongo/s/sharding_feature_flags_gen.h"
+#include "mongo/util/future_util.h"
 
 #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingRangeDeleter
 
@@ -83,18 +86,94 @@ void RangeDeleterService::onStepUpComplete(OperationContext* opCtx, long long te
     _executor = std::move(taskExecutor);
     _executor->startup();
 
-    _recoverRangeDeletionsOnStepUp();
+    _recoverRangeDeletionsOnStepUp(opCtx);
 }
 
-void RangeDeleterService::_recoverRangeDeletionsOnStepUp() {
-
+void RangeDeleterService::_recoverRangeDeletionsOnStepUp(OperationContext* opCtx) {
     if (disableResumableRangeDeleter.load()) {
         _state.store(kDown);
         return;
     }
 
-    // TODO SERVER-68348 Asynchronously register tasks on the range deleter service on step-up
-    _state.store(kUp);
+    LOGV2(6834800, "Resubmitting range deletion tasks");
+
+    ServiceContext* serviceContext = opCtx->getServiceContext();
+
+    ExecutorFuture<void>(_executor)
+        .then([serviceContext, this] {
+            ThreadClient tc("ResubmitRangeDeletionsOnStepUp", serviceContext);
+            {
+                stdx::lock_guard<Client> lk(*tc.get());
+                tc->setSystemOperationKillableByStepdown(lk);
+            }
+            auto opCtx = tc->makeOperationContext();
+            opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
+
+            ScopedRangeDeleterLock rangeDeleterLock(opCtx.get());
+            DBDirectClient client(opCtx.get());
+
+            int nRescheduledTasks = 0;
+
+            // (1) register range deletion tasks marked as "processing"
+            auto processingTasksCompletionFuture = [&] {
+                std::vector<ExecutorFuture<void>> processingTasksCompletionFutures;
+                FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace);
+                findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName << true));
+                auto cursor = client.find(std::move(findCommand));
+
+                while (cursor->more()) {
+                    auto completionFuture = this->registerTask(
+                        RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"),
+                                                 cursor->next()),
+                        SemiFuture<void>::makeReady(),
+                        true /* fromResubmitOnStepUp */);
+                    nRescheduledTasks++;
+                    processingTasksCompletionFutures.push_back(
+                        completionFuture.thenRunOn(_executor));
+                }
+
+                if (nRescheduledTasks > 1) {
+                    LOGV2_WARNING(6834801,
+                                  "Rescheduling several range deletions marked as processing. "
+                                  "Orphans count may be off while they are not drained",
+                                  "numRangeDeletionsMarkedAsProcessing"_attr = nRescheduledTasks);
+                }
+
+                return processingTasksCompletionFutures.size() > 0
+                    ? whenAllSucceed(std::move(processingTasksCompletionFutures)).share()
+                    : SemiFuture<void>::makeReady().share();
+            }();
+
+            // (2) register all other "non-pending" tasks
+            {
+                FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace);
+                findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName
+                                           << BSON("$ne" << true)
+                                           << RangeDeletionTask::kPendingFieldName
+                                           << BSON("$ne" << true)));
+                auto cursor = client.find(std::move(findCommand));
+                while (cursor->more()) {
+                    (void)this->registerTask(
+                        RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"),
+                                                 cursor->next()),
+                        processingTasksCompletionFuture.thenRunOn(_executor).semi(),
+                        true /* fromResubmitOnStepUp */);
+                }
+            }
+
+            LOGV2_INFO(6834802,
+                       "Finished resubmitting range deletion tasks",
+                       "nRescheduledTasks"_attr = nRescheduledTasks);
+
+            auto lock = _acquireMutexUnconditionally();
+            // Since the recovery is only spawned on step-up but may complete later, it's not
+            // assumable that the node is still primary when all the resubmissions finish
+            if (_state.load() != kDown) {
+                this->_rangeDeleterServiceUpCondVar_FOR_TESTING.notify_all();
+                this->_state.store(kUp);
+            }
+        })
+        .getAsync([](auto) {});
 }
 
 void RangeDeleterService::onStepDown() {
@@ -103,9 +182,12 @@ void RangeDeleterService::onStepDown() {
     }
 
     auto lock = _acquireMutexUnconditionally();
-    dassert(_state.load() != kDown, "Service expected to be initializing/up before stepping down");
-    _executor->shutdown();
+    // It may happen for the `onStepDown` hook to be invoked on a SECONDARY node transitioning
+    // to ROLLBACK, hence the executor may have never been initialized
+    if (_executor) {
+        _executor->shutdown();
+    }
     _state.store(kDown);
 }
 
@@ -149,7 +231,9 @@ long long RangeDeleterService::totalNumOfRegisteredTasks() {
 }
 
 SharedSemiFuture<void> RangeDeleterService::registerTask(
-    const RangeDeletionTask& rdt, SemiFuture<void>&& waitForActiveQueriesToComplete) {
+    const RangeDeletionTask& rdt,
+    SemiFuture<void>&& waitForActiveQueriesToComplete,
+    bool fromResubmitOnStepUp) {
 
     if (disableResumableRangeDeleter.load()) {
         return SemiFuture<void>::makeReady(
@@ -207,7 +291,8 @@ SharedSemiFuture<void> RangeDeleterService::registerTask(
         .share();
 
     auto [taskCompletionFuture, inserted] = [&]() -> std::pair<SharedSemiFuture<void>, bool> {
-        auto lock = _acquireMutexFailIfServiceNotUp();
+        auto lock = fromResubmitOnStepUp ? _acquireMutexUnconditionally()
+                                         : _acquireMutexFailIfServiceNotUp();
        auto [registeredTask, inserted] = _rangeDeletionTasks[rdt.getCollectionUuid()].insert(
             std::make_shared<RangeDeletion>(RangeDeletion(rdt, chainCompletionFuture)));
         auto retFuture = static_cast<RangeDeletion*>(registeredTask->get())->getCompletionFuture();
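The recovery scheme above can be distilled into a small standalone program. The following is a minimal sketch, not MongoDB code: `std::async`/`std::shared_future` stand in for the service's `ExecutorFuture`, `SemiFuture`, and `whenAllSucceed` machinery, and `PersistedTask`/`registerTask` are hypothetical simplifications of `RangeDeletionTask` and `RangeDeleterService::registerTask`. It shows the two-phase chaining: tasks persisted as "processing" are resubmitted immediately, and every other non-pending task is gated behind the completion of all processing ones.

```cpp
#include <future>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical stand-in for a document in config.rangeDeletions.
struct PersistedTask {
    std::string range;
    bool processing;  // was being drained when the previous primary stepped down
    bool pending;     // ownership not yet settled; must not be scheduled at all
};

// Stand-in for registerTask(): waits on a prerequisite future
// (analogous to waitForActiveQueriesToComplete), then "deletes" the range.
std::shared_future<void> registerTask(const PersistedTask& t,
                                      std::shared_future<void> prerequisite) {
    return std::async(std::launch::async, [t, prerequisite] {
               prerequisite.wait();
               std::cout << "deleting range " << t.range << '\n';
           }).share();
}

int main() {
    std::vector<PersistedTask> recovered = {
        {"[0, 10)", /*processing=*/true, /*pending=*/false},
        {"[10, 20)", /*processing=*/false, /*pending=*/false},
        {"[20, 30)", /*processing=*/false, /*pending=*/true},  // skipped, like the $ne filter
    };

    // Immediately-ready prerequisite, like SemiFuture<void>::makeReady().
    std::promise<void> ready;
    auto readyFuture = ready.get_future().share();
    ready.set_value();

    // (1) resubmit tasks marked "processing" with no prerequisite
    std::vector<std::shared_future<void>> processingFutures;
    for (const auto& t : recovered)
        if (t.processing)
            processingFutures.push_back(registerTask(t, readyFuture));

    // Poor man's whenAllSucceed(): one future that resolves once every
    // processing task has drained (immediately if there were none).
    auto allProcessingDone =
        std::async(std::launch::async, [processingFutures] {
            for (const auto& f : processingFutures) f.wait();
        }).share();

    // (2) register the remaining non-pending tasks behind that barrier
    std::vector<std::shared_future<void>> others;
    for (const auto& t : recovered)
        if (!t.processing && !t.pending)
            others.push_back(registerTask(t, allProcessingDone));

    for (const auto& f : others) f.wait();
}
```

Two design points in the diff follow from this ordering. First, `_state` is only flipped to `kUp` at the tail of the recovery chain (and only if a step-down has not already forced `kDown`), so the service does not advertise itself as up while resubmission is still in flight. Second, because the service is not yet up at that point, resubmitted tasks pass `fromResubmitOnStepUp = true` so that `registerTask` takes `_acquireMutexUnconditionally()` instead of `_acquireMutexFailIfServiceNotUp()`, which would otherwise reject them.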