author    Pierlauro Sciarelli <pierlauro.sciarelli@mongodb.com>    2022-09-29 13:02:26 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2022-09-29 14:06:07 +0000
commit    71a59429935cc8cb11f627ee137e111124ef21f9 (patch)
tree      174b4644abd3df6e0130fc6cf59fe494e729a4c3 /src/mongo/db/s/range_deleter_service.cpp
parent    0b24ce9b359574b8f695fce8b4d9b80fdcaa4a10 (diff)
download  mongo-71a59429935cc8cb11f627ee137e111124ef21f9.tar.gz
SERVER-70094 Synchronize shutdown with resuming of range deletions
Diffstat (limited to 'src/mongo/db/s/range_deleter_service.cpp')
-rw-r--r-- src/mongo/db/s/range_deleter_service.cpp | 142
1 file changed, 73 insertions(+), 69 deletions(-)
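This patch turns the previously fire-and-forget step-up recovery chain into a future stored in the new _stepUpCompletedFuture member, which _stopService then joins so shutdown cannot race with the thread resuming range deletions. Below is a minimal sketch of that pattern using only the standard library; the Service class is hypothetical and std::future stands in for MongoDB's ExecutorFuture/SemiFuture machinery:

// Minimal sketch of the synchronization pattern from this commit, using
// std::future in place of MongoDB's ExecutorFuture/SemiFuture. The Service
// class and its members are illustrative, not the real RangeDeleterService.
#include <chrono>
#include <future>
#include <iostream>
#include <thread>

class Service {
public:
    void onStepUp() {
        // Keep a handle to the asynchronous recovery work instead of
        // detaching it ("fire and forget"), so shutdown can wait for it.
        _stepUpCompletedFuture = std::async(std::launch::async, [] {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            std::cout << "recovery finished\n";
        });
    }

    void stop() {
        // Join the step-up recovery before tearing anything down, swallowing
        // any error, analogous to getNoThrow().ignore() in the patch.
        if (_stepUpCompletedFuture.valid()) {
            try {
                _stepUpCompletedFuture.get();
            } catch (...) {
            }
        }
        std::cout << "service stopped\n";
    }

private:
    std::future<void> _stepUpCompletedFuture;
};

int main() {
    Service s;
    s.onStepUp();
    s.stop();  // blocks until the recovery work has completed
}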
diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp
index 09c35f1fffb..170e32ec944 100644
--- a/src/mongo/db/s/range_deleter_service.cpp
+++ b/src/mongo/db/s/range_deleter_service.cpp
@@ -287,81 +287,82 @@ void RangeDeleterService::_recoverRangeDeletionsOnStepUp(OperationContext* opCtx
ServiceContext* serviceContext = opCtx->getServiceContext();
- ExecutorFuture<void>(_executor)
- .then([serviceContext, this] {
- ThreadClient tc("ResubmitRangeDeletionsOnStepUp", serviceContext);
- {
- stdx::lock_guard<Client> lk(*tc.get());
- tc->setSystemOperationKillableByStepdown(lk);
- }
- auto opCtx = tc->makeOperationContext();
- opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
-
- ScopedRangeDeleterLock rangeDeleterLock(opCtx.get());
- DBDirectClient client(opCtx.get());
-
- int nRescheduledTasks = 0;
-
- // (1) register range deletion tasks marked as "processing"
- auto processingTasksCompletionFuture = [&] {
- std::vector<ExecutorFuture<void>> processingTasksCompletionFutures;
- FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace);
- findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName << true));
- auto cursor = client.find(std::move(findCommand));
-
- while (cursor->more()) {
- auto completionFuture = this->registerTask(
- RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"),
- cursor->next()),
- SemiFuture<void>::makeReady(),
- true /* fromResubmitOnStepUp */);
- nRescheduledTasks++;
- processingTasksCompletionFutures.push_back(
- completionFuture.thenRunOn(_executor));
+ _stepUpCompletedFuture =
+ ExecutorFuture<void>(_executor)
+ .then([serviceContext, this] {
+ ThreadClient tc("ResubmitRangeDeletionsOnStepUp", serviceContext);
+ {
+ stdx::lock_guard<Client> lk(*tc.get());
+ tc->setSystemOperationKillableByStepdown(lk);
}
+ auto opCtx = tc->makeOperationContext();
+ opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
+
+ ScopedRangeDeleterLock rangeDeleterLock(opCtx.get());
+ DBDirectClient client(opCtx.get());
+
+ int nRescheduledTasks = 0;
+
+ // (1) register range deletion tasks marked as "processing"
+ auto processingTasksCompletionFuture = [&] {
+ std::vector<ExecutorFuture<void>> processingTasksCompletionFutures;
+ FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace);
+ findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName << true));
+ auto cursor = client.find(std::move(findCommand));
+
+ while (cursor->more()) {
+ auto completionFuture = this->registerTask(
+ RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"),
+ cursor->next()),
+ SemiFuture<void>::makeReady(),
+ true /* fromResubmitOnStepUp */);
+ nRescheduledTasks++;
+ processingTasksCompletionFutures.push_back(
+ completionFuture.thenRunOn(_executor));
+ }
- if (nRescheduledTasks > 1) {
- LOGV2_WARNING(6834801,
- "Rescheduling several range deletions marked as processing. "
- "Orphans count may be off while they are not drained",
- "numRangeDeletionsMarkedAsProcessing"_attr = nRescheduledTasks);
- }
+ if (nRescheduledTasks > 1) {
+ LOGV2_WARNING(6834801,
+ "Rescheduling several range deletions marked as processing. "
+ "Orphans count may be off while they are not drained",
+ "numRangeDeletionsMarkedAsProcessing"_attr =
+ nRescheduledTasks);
+ }
+
+ return processingTasksCompletionFutures.size() > 0
+ ? whenAllSucceed(std::move(processingTasksCompletionFutures)).share()
+ : SemiFuture<void>::makeReady().share();
+ }();
- return processingTasksCompletionFutures.size() > 0
- ? whenAllSucceed(std::move(processingTasksCompletionFutures)).share()
- : SemiFuture<void>::makeReady().share();
- }();
-
- // (2) register all other "non-pending" tasks
- {
- FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace);
- findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName
- << BSON("$ne" << true)
- << RangeDeletionTask::kPendingFieldName
- << BSON("$ne" << true)));
- auto cursor = client.find(std::move(findCommand));
- while (cursor->more()) {
- (void)this->registerTask(
- RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"),
- cursor->next()),
- processingTasksCompletionFuture.thenRunOn(_executor).semi(),
- true /* fromResubmitOnStepUp */);
+ // (2) register all other "non-pending" tasks
+ {
+ FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace);
+ findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName
+ << BSON("$ne" << true)
+ << RangeDeletionTask::kPendingFieldName
+ << BSON("$ne" << true)));
+ auto cursor = client.find(std::move(findCommand));
+ while (cursor->more()) {
+ (void)this->registerTask(
+ RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"),
+ cursor->next()),
+ processingTasksCompletionFuture.thenRunOn(_executor).semi(),
+ true /* fromResubmitOnStepUp */);
+ }
}
- }
- LOGV2_INFO(6834802,
- "Finished resubmitting range deletion tasks",
- "nRescheduledTasks"_attr = nRescheduledTasks);
+ LOGV2_INFO(6834802,
+ "Finished resubmitting range deletion tasks",
+ "nRescheduledTasks"_attr = nRescheduledTasks);
- auto lock = _acquireMutexUnconditionally();
- // Since the recovery is only spawned on step-up but may complete later, it cannot
- // be assumed that the node is still primary when all resubmissions finish
- if (_state.load() != kDown) {
- this->_rangeDeleterServiceUpCondVar_FOR_TESTING.notify_all();
- this->_state.store(kUp);
- }
- })
- .getAsync([](auto) {});
+ auto lock = _acquireMutexUnconditionally();
+ // Since the recovery is only spawned on step-up but may complete later, it cannot
+ // be assumed that the node is still primary when all resubmissions finish
+ if (_state.load() != kDown) {
+ this->_state.store(kUp);
+ }
+ })
+ .semi();
}
void RangeDeleterService::_stopService(bool joinExecutor) {
@@ -369,6 +370,9 @@ void RangeDeleterService::_stopService(bool joinExecutor) {
return;
}
+ // Join the thread spawned on step-up to resume range deletions
+ _stepUpCompletedFuture.getNoThrow().ignore();
+
auto lock = _acquireMutexUnconditionally();
// It may happen for the `onStepDown` hook to be invoked on a SECONDARY node transitioning
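For reference, the recovery logic in the first hunk resubmits tasks in two phases: deletions already marked "processing" are registered immediately with a ready SemiFuture, and every other non-pending task is chained on whenAllSucceed(...) over the phase-one completion futures, so it only starts once the interrupted deletions have drained. A rough sketch of that ordering with standard futures (the task names and loop structure are illustrative, not the real registerTask API):

// Sketch of the two-phase resubmission ordering, again with std::future in
// place of whenAllSucceed()/ExecutorFuture. Task names are hypothetical.
#include <future>
#include <iostream>
#include <string>
#include <vector>

int main() {
    // (1) Tasks found marked as "processing" restart immediately.
    std::vector<std::future<void>> processing;
    for (const std::string name : {"processingA", "processingB"}) {
        processing.push_back(std::async(std::launch::async, [name] {
            std::cout << "resubmitted " << name << '\n';
        }));
    }

    // Equivalent of whenAllSucceed(...).share(): one shared future that
    // becomes ready only after every "processing" task has completed.
    std::shared_future<void> allProcessingDone =
        std::async(std::launch::async, [&] {
            for (auto& f : processing) f.get();
        }).share();

    // (2) Every other non-pending task waits on that barrier before running,
    // so the recovered "processing" deletions drain first.
    std::vector<std::future<void>> others;
    for (const std::string name : {"otherC", "otherD"}) {
        others.push_back(std::async(std::launch::async, [name, allProcessingDone] {
            allProcessingDone.wait();
            std::cout << "started " << name << '\n';
        }));
    }
    for (auto& f : others) f.get();
}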