diff options
author | Jennifer Peshansky <jennifer.peshansky@mongodb.com> | 2022-11-03 16:13:20 +0000 |
---|---|---|
committer | Jennifer Peshansky <jennifer.peshansky@mongodb.com> | 2022-11-03 16:13:20 +0000 |
commit | e74d2910bbe76790ad131d53fee277829cd95982 (patch) | |
tree | cabe148764529c9623652374fbc36323a550cd44 /src/mongo/db/s/range_deleter_service.cpp | |
parent | 280145e9940729480bb8a35453d4056afac87641 (diff) | |
parent | ba467f46cc1bc49965e1d72b541eff0cf1d7b22e (diff) | |
download | mongo-jenniferpeshansky/SERVER-70854.tar.gz |
Merge branch 'master' into jenniferpeshansky/SERVER-70854jenniferpeshansky/SERVER-70854
Diffstat (limited to 'src/mongo/db/s/range_deleter_service.cpp')
-rw-r--r-- | src/mongo/db/s/range_deleter_service.cpp | 154 |
1 files changed, 107 insertions, 47 deletions
diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp index acabcc40bea..ca80b24aa3e 100644 --- a/src/mongo/db/s/range_deleter_service.cpp +++ b/src/mongo/db/s/range_deleter_service.cpp @@ -86,6 +86,50 @@ RangeDeleterService* RangeDeleterService::get(OperationContext* opCtx) { return get(opCtx->getServiceContext()); } +RangeDeleterService::ReadyRangeDeletionsProcessor::ReadyRangeDeletionsProcessor( + OperationContext* opCtx) + : _thread([this] { _runRangeDeletions(); }) {} + +RangeDeleterService::ReadyRangeDeletionsProcessor::~ReadyRangeDeletionsProcessor() { + shutdown(); + invariant(_thread.joinable()); + _thread.join(); + invariant(!_threadOpCtxHolder, + "Thread operation context is still alive after joining main thread"); +} + +void RangeDeleterService::ReadyRangeDeletionsProcessor::shutdown() { + stdx::lock_guard<Latch> lock(_mutex); + if (_state == kStopped) + return; + + _state = kStopped; + + if (_threadOpCtxHolder) { + stdx::lock_guard<Client> scopedClientLock(*_threadOpCtxHolder->getClient()); + _threadOpCtxHolder->markKilled(ErrorCodes::Interrupted); + } +} + +bool RangeDeleterService::ReadyRangeDeletionsProcessor::_stopRequested() const { + stdx::unique_lock<Latch> lock(_mutex); + return _state == kStopped; +} + +void RangeDeleterService::ReadyRangeDeletionsProcessor::emplaceRangeDeletion( + const RangeDeletionTask& rdt) { + stdx::unique_lock<Latch> lock(_mutex); + invariant(_state == kRunning); + _queue.push(rdt); + _condVar.notify_all(); +} + +void RangeDeleterService::ReadyRangeDeletionsProcessor::_completedRangeDeletion() { + stdx::unique_lock<Latch> lock(_mutex); + dassert(!_queue.empty()); + _queue.pop(); +} + void RangeDeleterService::ReadyRangeDeletionsProcessor::_runRangeDeletions() { Client::initThread(kRangeDeletionThreadName); { @@ -93,21 +137,22 @@ void RangeDeleterService::ReadyRangeDeletionsProcessor::_runRangeDeletions() { cc().setSystemOperationKillableByStepdown(lk); } - auto opCtx = [&] { - stdx::unique_lock<Latch> lock(_mutex); + { + stdx::lock_guard<Latch> lock(_mutex); + if (_state != kRunning) { + return; + } _threadOpCtxHolder = cc().makeOperationContext(); - _condVar.notify_all(); - return (*_threadOpCtxHolder).get(); - }(); + } - opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); + auto opCtx = _threadOpCtxHolder.get(); ON_BLOCK_EXIT([this]() { - stdx::unique_lock<Latch> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _threadOpCtxHolder.reset(); }); - while (opCtx->checkForInterruptNoAssert().isOK()) { + while (!_stopRequested()) { { stdx::unique_lock<Latch> lock(_mutex); try { @@ -223,7 +268,7 @@ void RangeDeleterService::ReadyRangeDeletionsProcessor::_runRangeDeletions() { // Release the thread only in case the operation context has been interrupted, as // interruption only happens on shutdown/stepdown (this is fine because range // deletions will be resumed on the next step up) - if (!opCtx->checkForInterruptNoAssert().isOK()) { + if (_stopRequested()) { break; } @@ -237,6 +282,17 @@ void RangeDeleterService::ReadyRangeDeletionsProcessor::_runRangeDeletions() { } } +void RangeDeleterService::onStartup(OperationContext* opCtx) { + if (disableResumableRangeDeleter.load() || + !feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) { + return; + } + + auto opObserverRegistry = + checked_cast<OpObserverRegistry*>(opCtx->getServiceContext()->getOpObserver()); + opObserverRegistry->addObserver(std::make_unique<RangeDeleterServiceOpObserver>()); +} + void RangeDeleterService::onStepUpComplete(OperationContext* opCtx, long long term) { if (!feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) { return; @@ -249,22 +305,14 @@ void RangeDeleterService::onStepUpComplete(OperationContext* opCtx, long long te return; } + // Wait until all tasks and thread from previous term drain + _joinAndResetState(); + auto lock = _acquireMutexUnconditionally(); dassert(_state == kDown, "Service expected to be down before stepping up"); _state = kInitializing; - if (_executor) { - // Join previously shutted down executor before reinstantiating it - _executor->join(); - _executor.reset(); - } else { - // Initializing the op observer, only executed once at the first step-up - auto opObserverRegistry = - checked_cast<OpObserverRegistry*>(opCtx->getServiceContext()->getOpObserver()); - opObserverRegistry->addObserver(std::make_unique<RangeDeleterServiceOpObserver>()); - } - const std::string kExecName("RangeDeleterServiceExecutor"); auto net = executor::makeNetworkInterface(kExecName); auto pool = std::make_unique<executor::NetworkInterfaceThreadPool>(net.get()); @@ -303,9 +351,14 @@ void RangeDeleterService::_recoverRangeDeletionsOnStepUp(OperationContext* opCtx LOGV2(6834800, "Resubmitting range deletion tasks"); - ScopedRangeDeleterLock rangeDeleterLock(opCtx); - DBDirectClient client(opCtx); + // The Scoped lock is needed to serialize with concurrent range deletions + ScopedRangeDeleterLock rangeDeleterLock(opCtx, MODE_S); + // The collection lock is needed to serialize with donors trying to + // schedule local range deletions by updating the 'pending' field + AutoGetCollection rangeDeletionLock( + opCtx, NamespaceString::kRangeDeletionNamespace, MODE_S); + DBDirectClient client(opCtx); int nRescheduledTasks = 0; // (1) register range deletion tasks marked as "processing" @@ -370,47 +423,55 @@ void RangeDeleterService::_recoverRangeDeletionsOnStepUp(OperationContext* opCtx .semi(); } -void RangeDeleterService::_stopService(bool joinExecutor) { - if (!feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) { - return; - } +void RangeDeleterService::_joinAndResetState() { + invariant(_state == kDown); + // Join the thread spawned on step-up to resume range deletions + _stepUpCompletedFuture.getNoThrow().ignore(); - { - auto lock = _acquireMutexUnconditionally(); - _state = kDown; - if (_initOpCtxHolder) { - stdx::lock_guard<Client> lk(*_initOpCtxHolder->getClient()); - _initOpCtxHolder->markKilled(ErrorCodes::Interrupted); - } + // Join and destruct the executor + if (_executor) { + _executor->join(); + _executor.reset(); } - // Join the thread spawned on step-up to resume range deletions - _stepUpCompletedFuture.getNoThrow().ignore(); + // Join and destruct the processor + _readyRangeDeletionsProcessorPtr.reset(); + + // Clear range deletions potentially created during recovery + _rangeDeletionTasks.clear(); +} +void RangeDeleterService::_stopService() { auto lock = _acquireMutexUnconditionally(); + if (_state == kDown) + return; + + _state = kDown; + if (_initOpCtxHolder) { + stdx::lock_guard<Client> lk(*_initOpCtxHolder->getClient()); + _initOpCtxHolder->markKilled(ErrorCodes::Interrupted); + } - // It may happen for the `onStepDown` hook to be invoked on a SECONDARY node transitioning - // to ROLLBACK, hence the executor may have never been initialized if (_executor) { _executor->shutdown(); - if (joinExecutor) { - _executor->join(); - } } - // Destroy the range deletion processor in order to stop range deletions - _readyRangeDeletionsProcessorPtr.reset(); + // Shutdown the range deletion processor to interrupt range deletions + if (_readyRangeDeletionsProcessorPtr) { + _readyRangeDeletionsProcessorPtr->shutdown(); + } // Clear range deletion tasks map in order to notify potential waiters on completion futures _rangeDeletionTasks.clear(); } void RangeDeleterService::onStepDown() { - _stopService(false /* joinExecutor */); + _stopService(); } void RangeDeleterService::onShutdown() { - _stopService(true /* joinExecutor */); + _stopService(); + _joinAndResetState(); } BSONObj RangeDeleterService::dumpState() { @@ -472,10 +533,9 @@ SharedSemiFuture<void> RangeDeleterService::registerTask( .then([this, rdt = rdt]() { // Step 3: schedule the actual range deletion task auto lock = _acquireMutexUnconditionally(); - invariant( - _readyRangeDeletionsProcessorPtr || _state == kDown, - "The range deletions processor must be instantiated if the state != kDown"); if (_state != kDown) { + invariant(_readyRangeDeletionsProcessorPtr, + "The range deletions processor is not initialized"); _readyRangeDeletionsProcessorPtr->emplaceRangeDeletion(rdt); } }); |