summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPierlauro Sciarelli <pierlauro.sciarelli@mongodb.com>2022-09-29 17:18:33 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-29 18:04:46 +0000
commit646eed48d0da896588759030f2ec546ac6fbbd48 (patch)
treedefcf73ef27c36ea3231f2c182f455050715d576
parent2b311888e92e070a2fdb48867695ae4c7cc63873 (diff)
downloadmongo-646eed48d0da896588759030f2ec546ac6fbbd48.tar.gz
SERVER-68774 Simplify `RangeDeleterService::registerTask` by removing `blockUntilRegistered`
-rw-r--r--src/mongo/db/s/range_deleter_service.cpp94
1 files changed, 33 insertions, 61 deletions
diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp
index 1137f464b6a..7b85e07e68f 100644
--- a/src/mongo/db/s/range_deleter_service.cpp
+++ b/src/mongo/db/s/range_deleter_service.cpp
@@ -439,75 +439,47 @@ SharedSemiFuture<void> RangeDeleterService::registerTask(
.share();
}
- // Block the scheduling of the task while populating internal data structures
- SharedPromise<void> blockUntilRegistered;
-
- (void)blockUntilRegistered.getFuture()
- .semi()
- .thenRunOn(_executor)
- .onError([serializedTask = rdt.toBSON()](Status errStatus) {
- // The above futures can only fail with those specific codes (futures notifying
- // the end of ongoing queries on a range will never be set to an error):
- // - 67635: the task was already previously scheduled
- // - BrokenPromise: the executor is shutting down
- // - Cancellation error: the node is shutting down or a stepdown happened
- if (errStatus.code() != 67635 && errStatus != ErrorCodes::BrokenPromise &&
- !ErrorCodes::isCancellationError(errStatus)) {
- LOGV2_ERROR(6784800,
- "Range deletion scheduling failed with unexpected error",
- "error"_attr = errStatus,
- "rangeDeletion"_attr = serializedTask);
- }
- return errStatus;
- })
- .then([waitForOngoingQueries = std::move(waitForActiveQueriesToComplete).share()]() {
- // Step 1: wait for ongoing queries retaining the range to drain
- return waitForOngoingQueries;
- })
- .then([this, when = rdt.getWhenToClean()]() {
- // Step 2: schedule wait for secondaries orphans cleanup delay
- const auto delayForActiveQueriesOnSecondariesToComplete =
- when == CleanWhenEnum::kDelayed ? Seconds(orphanCleanupDelaySecs.load())
- : Seconds(0);
-
- return sleepUntil(_executor,
- _executor->now() + delayForActiveQueriesOnSecondariesToComplete)
- .share();
- })
- .then([this, rdt = rdt]() {
- // Step 3: schedule the actual range deletion task
- auto lock = _acquireMutexUnconditionally();
- invariant(_readyRangeDeletionsProcessorPtr || _state == kDown,
- "The range deletions processor must be instantiated if the state != kDown");
- if (_state != kDown) {
- _readyRangeDeletionsProcessorPtr->emplaceRangeDeletion(rdt);
- }
- });
-
- auto [taskCompletionFuture, inserted] = [&]() -> std::pair<SharedSemiFuture<void>, bool> {
- auto lock = fromResubmitOnStepUp ? _acquireMutexUnconditionally()
- : _acquireMutexFailIfServiceNotUp();
- auto [registeredTask, inserted] = _rangeDeletionTasks[rdt.getCollectionUuid()].insert(
- std::make_shared<RangeDeletion>(rdt));
- auto retFuture = static_cast<RangeDeletion*>(registeredTask->get())->getCompletionFuture();
- return {retFuture, inserted};
- }();
+ auto scheduleRangeDeletionChain = [&]() {
+ // Step 1: wait for ongoing queries retaining the range to drain
+ (void)std::move(waitForActiveQueriesToComplete)
+ .thenRunOn(_executor)
+ .then([this, when = rdt.getWhenToClean()]() {
+ // Step 2: schedule wait for secondaries orphans cleanup delay
+ const auto delayForActiveQueriesOnSecondariesToComplete =
+ when == CleanWhenEnum::kDelayed ? Seconds(orphanCleanupDelaySecs.load())
+ : Seconds(0);
+
+ return sleepUntil(_executor,
+ _executor->now() + delayForActiveQueriesOnSecondariesToComplete)
+ .share();
+ })
+ .then([this, rdt = rdt]() {
+ // Step 3: schedule the actual range deletion task
+ auto lock = _acquireMutexUnconditionally();
+ invariant(
+ _readyRangeDeletionsProcessorPtr || _state == kDown,
+ "The range deletions processor must be instantiated if the state != kDown");
+ if (_state != kDown) {
+ _readyRangeDeletionsProcessorPtr->emplaceRangeDeletion(rdt);
+ }
+ });
+ };
+
+ auto lock =
+ fromResubmitOnStepUp ? _acquireMutexUnconditionally() : _acquireMutexFailIfServiceNotUp();
+ auto [registeredTask, firstRegistration] =
+ _rangeDeletionTasks[rdt.getCollectionUuid()].insert(std::make_shared<RangeDeletion>(rdt));
- if (inserted) {
- // The range deletion task has been registered, so the chain execution can be unblocked
- blockUntilRegistered.setFrom(Status::OK());
+ if (firstRegistration) {
+ scheduleRangeDeletionChain();
} else {
- // Tried to register a duplicate range deletion task: invalidate the chain
- auto errStatus =
- Status(ErrorCodes::Error(67635), "Not scheduling duplicated range deletion");
LOGV2_WARNING(6804200,
"Tried to register duplicate range deletion task. This results in a no-op.",
"collectionUUID"_attr = rdt.getCollectionUuid(),
"range"_attr = rdt.getRange());
- blockUntilRegistered.setFrom(errStatus);
}
- return taskCompletionFuture;
+ return static_cast<RangeDeletion*>(registeredTask->get())->getCompletionFuture();
}
void RangeDeleterService::deregisterTask(const UUID& collUUID, const ChunkRange& range) {