diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/s/range_deleter_service.cpp | 94 |
1 files changed, 33 insertions, 61 deletions
diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp index 1137f464b6a..7b85e07e68f 100644 --- a/src/mongo/db/s/range_deleter_service.cpp +++ b/src/mongo/db/s/range_deleter_service.cpp @@ -439,75 +439,47 @@ SharedSemiFuture<void> RangeDeleterService::registerTask( .share(); } - // Block the scheduling of the task while populating internal data structures - SharedPromise<void> blockUntilRegistered; - - (void)blockUntilRegistered.getFuture() - .semi() - .thenRunOn(_executor) - .onError([serializedTask = rdt.toBSON()](Status errStatus) { - // The above futures can only fail with those specific codes (futures notifying - // the end of ongoing queries on a range will never be set to an error): - // - 67635: the task was already previously scheduled - // - BrokenPromise: the executor is shutting down - // - Cancellation error: the node is shutting down or a stepdown happened - if (errStatus.code() != 67635 && errStatus != ErrorCodes::BrokenPromise && - !ErrorCodes::isCancellationError(errStatus)) { - LOGV2_ERROR(6784800, - "Range deletion scheduling failed with unexpected error", - "error"_attr = errStatus, - "rangeDeletion"_attr = serializedTask); - } - return errStatus; - }) - .then([waitForOngoingQueries = std::move(waitForActiveQueriesToComplete).share()]() { - // Step 1: wait for ongoing queries retaining the range to drain - return waitForOngoingQueries; - }) - .then([this, when = rdt.getWhenToClean()]() { - // Step 2: schedule wait for secondaries orphans cleanup delay - const auto delayForActiveQueriesOnSecondariesToComplete = - when == CleanWhenEnum::kDelayed ? Seconds(orphanCleanupDelaySecs.load()) - : Seconds(0); - - return sleepUntil(_executor, - _executor->now() + delayForActiveQueriesOnSecondariesToComplete) - .share(); - }) - .then([this, rdt = rdt]() { - // Step 3: schedule the actual range deletion task - auto lock = _acquireMutexUnconditionally(); - invariant(_readyRangeDeletionsProcessorPtr || _state == kDown, - "The range deletions processor must be instantiated if the state != kDown"); - if (_state != kDown) { - _readyRangeDeletionsProcessorPtr->emplaceRangeDeletion(rdt); - } - }); - - auto [taskCompletionFuture, inserted] = [&]() -> std::pair<SharedSemiFuture<void>, bool> { - auto lock = fromResubmitOnStepUp ? _acquireMutexUnconditionally() - : _acquireMutexFailIfServiceNotUp(); - auto [registeredTask, inserted] = _rangeDeletionTasks[rdt.getCollectionUuid()].insert( - std::make_shared<RangeDeletion>(rdt)); - auto retFuture = static_cast<RangeDeletion*>(registeredTask->get())->getCompletionFuture(); - return {retFuture, inserted}; - }(); + auto scheduleRangeDeletionChain = [&]() { + // Step 1: wait for ongoing queries retaining the range to drain + (void)std::move(waitForActiveQueriesToComplete) + .thenRunOn(_executor) + .then([this, when = rdt.getWhenToClean()]() { + // Step 2: schedule wait for secondaries orphans cleanup delay + const auto delayForActiveQueriesOnSecondariesToComplete = + when == CleanWhenEnum::kDelayed ? Seconds(orphanCleanupDelaySecs.load()) + : Seconds(0); + + return sleepUntil(_executor, + _executor->now() + delayForActiveQueriesOnSecondariesToComplete) + .share(); + }) + .then([this, rdt = rdt]() { + // Step 3: schedule the actual range deletion task + auto lock = _acquireMutexUnconditionally(); + invariant( + _readyRangeDeletionsProcessorPtr || _state == kDown, + "The range deletions processor must be instantiated if the state != kDown"); + if (_state != kDown) { + _readyRangeDeletionsProcessorPtr->emplaceRangeDeletion(rdt); + } + }); + }; + + auto lock = + fromResubmitOnStepUp ? _acquireMutexUnconditionally() : _acquireMutexFailIfServiceNotUp(); + auto [registeredTask, firstRegistration] = + _rangeDeletionTasks[rdt.getCollectionUuid()].insert(std::make_shared<RangeDeletion>(rdt)); - if (inserted) { - // The range deletion task has been registered, so the chain execution can be unblocked - blockUntilRegistered.setFrom(Status::OK()); + if (firstRegistration) { + scheduleRangeDeletionChain(); } else { - // Tried to register a duplicate range deletion task: invalidate the chain - auto errStatus = - Status(ErrorCodes::Error(67635), "Not scheduling duplicated range deletion"); LOGV2_WARNING(6804200, "Tried to register duplicate range deletion task. This results in a no-op.", "collectionUUID"_attr = rdt.getCollectionUuid(), "range"_attr = rdt.getRange()); - blockUntilRegistered.setFrom(errStatus); } - return taskCompletionFuture; + return static_cast<RangeDeletion*>(registeredTask->get())->getCompletionFuture(); } void RangeDeleterService::deregisterTask(const UUID& collUUID, const ChunkRange& range) { |