author     Pierlauro Sciarelli <pierlauro.sciarelli@mongodb.com>  2022-09-12 15:39:28 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>       2022-09-12 16:47:44 +0000
commit     727d3de09c9e38206cbde56c8a431f6e7b480da6 (patch)
tree       cdd2dbd0544449f99cc59984593121e6dc3351f4 /src/mongo/db/s/range_deleter_service.cpp
parent     c7f9bb9676a8859a21aaea5fdd81e23a4e9d79e8 (diff)
download   mongo-727d3de09c9e38206cbde56c8a431f6e7b480da6.tar.gz
SERVER-68348 Asynchronously register tasks on the range deleter service on step-up
Diffstat (limited to 'src/mongo/db/s/range_deleter_service.cpp')
-rw-r--r--  src/mongo/db/s/range_deleter_service.cpp | 103
1 file changed, 94 insertions(+), 9 deletions(-)
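Before this commit, _recoverRangeDeletionsOnStepUp() was a stub that immediately marked the service up (note the removed TODO in the diff below); the change moves recovery onto the service's task executor so that step-up itself never blocks on it. A minimal standalone sketch of that pattern, with std::thread standing in for MongoDB's task executor (the kDown/kUp names mirror the diff; kInitializing is illustrative):

    #include <atomic>
    #include <iostream>
    #include <thread>

    enum State { kDown, kInitializing, kUp };
    std::atomic<State> state{kDown};

    // Step-up returns immediately; recovery runs on a background thread.
    void onStepUpComplete() {
        state.store(kInitializing);
        std::thread([] {
            // ... re-read persisted range deletion tasks and resubmit them ...
            std::cout << "resubmitting recovered range deletion tasks\n";
            state.store(kUp);  // declared up only once recovery has finished
        }).detach();
    }

    int main() {
        onStepUpComplete();  // does not block on recovery
        while (state.load() != kUp)
            std::this_thread::yield();
        std::cout << "service is up\n";
    }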
diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp
index f90618421d5..a24e14082f9 100644
--- a/src/mongo/db/s/range_deleter_service.cpp
+++ b/src/mongo/db/s/range_deleter_service.cpp
@@ -28,10 +28,13 @@
*/
#include "mongo/db/s/range_deleter_service.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/op_observer/op_observer_registry.h"
+#include "mongo/db/s/balancer_stats_registry.h"
#include "mongo/db/s/range_deleter_service_op_observer.h"
#include "mongo/logv2/log.h"
#include "mongo/s/sharding_feature_flags_gen.h"
+#include "mongo/util/future_util.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingRangeDeleter
@@ -83,18 +86,94 @@ void RangeDeleterService::onStepUpComplete(OperationContext* opCtx, long long te
_executor = std::move(taskExecutor);
_executor->startup();
- _recoverRangeDeletionsOnStepUp();
+ _recoverRangeDeletionsOnStepUp(opCtx);
}
-void RangeDeleterService::_recoverRangeDeletionsOnStepUp() {
-
+void RangeDeleterService::_recoverRangeDeletionsOnStepUp(OperationContext* opCtx) {
if (disableResumableRangeDeleter.load()) {
_state.store(kDown);
return;
}
- // TODO SERVER-68348 Asynchronously register tasks on the range deleter service on step-up
- _state.store(kUp);
+ LOGV2(6834800, "Resubmitting range deletion tasks");
+
+ ServiceContext* serviceContext = opCtx->getServiceContext();
+
+ ExecutorFuture<void>(_executor)
+ .then([serviceContext, this] {
+ ThreadClient tc("ResubmitRangeDeletionsOnStepUp", serviceContext);
+ {
+ stdx::lock_guard<Client> lk(*tc.get());
+ tc->setSystemOperationKillableByStepdown(lk);
+ }
+ auto opCtx = tc->makeOperationContext();
+ opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
+
+ ScopedRangeDeleterLock rangeDeleterLock(opCtx.get());
+ DBDirectClient client(opCtx.get());
+
+ int nRescheduledTasks = 0;
+
+ // (1) register range deletion tasks marked as "processing"
+ auto processingTasksCompletionFuture = [&] {
+ std::vector<ExecutorFuture<void>> processingTasksCompletionFutures;
+ FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace);
+ findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName << true));
+ auto cursor = client.find(std::move(findCommand));
+
+ while (cursor->more()) {
+ auto completionFuture = this->registerTask(
+ RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"),
+ cursor->next()),
+ SemiFuture<void>::makeReady(),
+ true /* fromResubmitOnStepUp */);
+ nRescheduledTasks++;
+ processingTasksCompletionFutures.push_back(
+ completionFuture.thenRunOn(_executor));
+ }
+
+ if (nRescheduledTasks > 1) {
+ LOGV2_WARNING(6834801,
+ "Rescheduling several range deletions marked as processing. "
+ "Orphans count may be off while they are not drained",
+ "numRangeDeletionsMarkedAsProcessing"_attr = nRescheduledTasks);
+ }
+
+ return processingTasksCompletionFutures.size() > 0
+ ? whenAllSucceed(std::move(processingTasksCompletionFutures)).share()
+ : SemiFuture<void>::makeReady().share();
+ }();
+
+ // (2) register all other "non-pending" tasks
+ {
+ FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace);
+ findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName
+ << BSON("$ne" << true)
+ << RangeDeletionTask::kPendingFieldName
+ << BSON("$ne" << true)));
+ auto cursor = client.find(std::move(findCommand));
+ while (cursor->more()) {
+ (void)this->registerTask(
+ RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"),
+ cursor->next()),
+ processingTasksCompletionFuture.thenRunOn(_executor).semi(),
+ true /* fromResubmitOnStepUp */);
+ }
+ }
+
+ LOGV2_INFO(6834802,
+ "Finished resubmitting range deletion tasks",
+ "nRescheduledTasks"_attr = nRescheduledTasks);
+
+ auto lock = _acquireMutexUnconditionally();
+            // Since the recovery is only spawned on step-up but may complete later, it
+            // cannot be assumed that the node is still primary when all resubmissions finish
+ if (_state.load() != kDown) {
+ this->_rangeDeleterServiceUpCondVar_FOR_TESTING.notify_all();
+ this->_state.store(kUp);
+ }
+ })
+ .getAsync([](auto) {});
}
void RangeDeleterService::onStepDown() {
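The recovery body above proceeds in two phases: tasks already marked as "processing" are resubmitted first with an already-ready input future, and every other non-pending task is then gated on the completion of that first batch via whenAllSucceed. A distilled, compilable sketch of that gating, with plain std::future standing in for ExecutorFuture/SemiFuture:

    #include <future>
    #include <iostream>
    #include <vector>

    int main() {
        // Phase 1: resubmit tasks found with {processing: true}; keep their futures.
        std::vector<std::future<void>> processing;
        for (int i = 0; i < 2; ++i)
            processing.push_back(std::async(std::launch::async, [i] {
                std::cout << "draining processing task " << i << '\n';
            }));

        // Stand-in for whenAllSucceed(): the gate opens once every phase-1 task drains.
        for (auto& f : processing)
            f.get();

        // Phase 2: the remaining non-pending tasks are registered only after the gate opens.
        std::cout << "registering remaining non-pending tasks\n";
    }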
@@ -103,9 +182,12 @@ void RangeDeleterService::onStepDown() {
}
auto lock = _acquireMutexUnconditionally();
- dassert(_state.load() != kDown, "Service expected to be initializing/up before stepping down");
- _executor->shutdown();
+    // The `onStepDown` hook may be invoked on a SECONDARY node transitioning to
+    // ROLLBACK, in which case the executor may have never been initialized
+ if (_executor) {
+ _executor->shutdown();
+ }
_state.store(kDown);
}
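The hunk above also explains why the old dassert was dropped: a SECONDARY rolling back can reach onStepDown without any prior step-up, so the executor may not exist yet. The same guard in isolation (types here are illustrative only):

    #include <memory>

    struct TaskExecutor {
        void shutdown() {}
    };

    struct ServiceSketch {
        std::shared_ptr<TaskExecutor> executor;  // created only on step-up

        // A node transitioning to ROLLBACK arrives here without a prior
        // step-up, so the executor must be checked before being shut down.
        void onStepDown() {
            if (executor)
                executor->shutdown();
        }
    };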
@@ -149,7 +231,9 @@ long long RangeDeleterService::totalNumOfRegisteredTasks() {
}
SharedSemiFuture<void> RangeDeleterService::registerTask(
- const RangeDeletionTask& rdt, SemiFuture<void>&& waitForActiveQueriesToComplete) {
+ const RangeDeletionTask& rdt,
+ SemiFuture<void>&& waitForActiveQueriesToComplete,
+ bool fromResubmitOnStepUp) {
if (disableResumableRangeDeleter.load()) {
return SemiFuture<void>::makeReady(
@@ -207,7 +291,8 @@ SharedSemiFuture<void> RangeDeleterService::registerTask(
.share();
auto [taskCompletionFuture, inserted] = [&]() -> std::pair<SharedSemiFuture<void>, bool> {
- auto lock = _acquireMutexFailIfServiceNotUp();
+ auto lock = fromResubmitOnStepUp ? _acquireMutexUnconditionally()
+ : _acquireMutexFailIfServiceNotUp();
auto [registeredTask, inserted] = _rangeDeletionTasks[rdt.getCollectionUuid()].insert(
std::make_shared<RangeDeletion>(RangeDeletion(rdt, chainCompletionFuture)));
auto retFuture = static_cast<RangeDeletion*>(registeredTask->get())->getCompletionFuture();
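The remaining hunks thread the new fromResubmitOnStepUp flag through registerTask: resubmission runs while the service is still initializing, so it must bypass the "service must be up" check that ordinary callers go through. A compilable sketch of that two-policy lock acquisition (all helper names here are illustrative, not the service's real ones):

    #include <mutex>
    #include <stdexcept>

    class ServiceSketch {
        std::mutex _mutex;
        bool _up = false;

        std::unique_lock<std::mutex> _acquireUnconditionally() {
            return std::unique_lock<std::mutex>(_mutex);
        }

        std::unique_lock<std::mutex> _acquireFailIfNotUp() {
            std::unique_lock<std::mutex> lk(_mutex);
            if (!_up)
                throw std::runtime_error("range deleter service is not up");
            return lk;
        }

    public:
        void setUp() {
            std::lock_guard<std::mutex> lk(_mutex);
            _up = true;
        }

        // Resubmission on step-up runs before the service is marked up, so it
        // takes the unconditional path; regular callers fail if the service is down.
        void registerTask(bool fromResubmitOnStepUp) {
            auto lock = fromResubmitOnStepUp ? _acquireUnconditionally()
                                             : _acquireFailIfNotUp();
            // ... insert the task into the registry and schedule the range deletion ...
        }
    };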