diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2022-09-10 15:55:51 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-10 16:34:21 +0000 |
commit | fba24ea907dd3220fd2e3149c6ac81fd67b16254 (patch) | |
tree | 5c9a44373b9add8a6693dafd73424656a3569853 | |
parent | 27c853347660310304d6fe2a3870a9a083aba953 (diff) | |
download | mongo-fba24ea907dd3220fd2e3149c6ac81fd67b16254.tar.gz |
Revert "SERVER-68348 Asynchronously register tasks on the range deleter service on step-up"
This reverts commit 30583a2c505c03e4d54bbb14cab170b225c071f2.
-rw-r--r-- | src/mongo/db/s/range_deleter_service.cpp | 90 | ||||
-rw-r--r-- | src/mongo/db/s/range_deleter_service.h | 5 | ||||
-rw-r--r-- | src/mongo/db/s/range_deleter_service_test.cpp | 56 |
3 files changed, 9 insertions, 142 deletions
diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp index acc97caeea9..63a2fb33cf8 100644 --- a/src/mongo/db/s/range_deleter_service.cpp +++ b/src/mongo/db/s/range_deleter_service.cpp @@ -28,13 +28,10 @@ */ #include "mongo/db/s/range_deleter_service.h" -#include "mongo/db/dbdirectclient.h" #include "mongo/db/op_observer/op_observer_registry.h" -#include "mongo/db/s/balancer_stats_registry.h" #include "mongo/db/s/range_deleter_service_op_observer.h" #include "mongo/logv2/log.h" #include "mongo/s/sharding_feature_flags_gen.h" -#include "mongo/util/future_util.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingRangeDeleter @@ -86,88 +83,18 @@ void RangeDeleterService::onStepUpComplete(OperationContext* opCtx, long long te _executor = std::move(taskExecutor); _executor->startup(); - _recoverRangeDeletionsOnStepUp(opCtx); + _recoverRangeDeletionsOnStepUp(); } -void RangeDeleterService::_recoverRangeDeletionsOnStepUp(OperationContext* opCtx) { +void RangeDeleterService::_recoverRangeDeletionsOnStepUp() { + if (disableResumableRangeDeleter.load()) { _state.store(kDown); return; } - LOGV2(6834800, "Resubmitting range deletion tasks"); - - ServiceContext* serviceContext = opCtx->getServiceContext(); - - ExecutorFuture<void>(_executor) - .then([serviceContext, this] { - ThreadClient tc("ResubmitRangeDeletionsOnStepUp", serviceContext); - { - stdx::lock_guard<Client> lk(*tc.get()); - tc->setSystemOperationKillableByStepdown(lk); - } - auto opCtx = tc->makeOperationContext(); - opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); - - ScopedRangeDeleterLock rangeDeleterLock(opCtx.get()); - DBDirectClient client(opCtx.get()); - - int nRescheduledTasks = 0; - - // (1) register range deletion tasks marked as "processing" - auto processingTasksCompletionFuture = [&] { - std::vector<ExecutorFuture<void>> processingTasksCompletionFutures; - FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace); - findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName << true)); - auto cursor = client.find(std::move(findCommand)); - - while (cursor->more()) { - auto completionFuture = this->registerTask( - RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"), - cursor->next()), - SemiFuture<void>::makeReady(), - true /* fromResubmitOnStepUp */); - nRescheduledTasks++; - processingTasksCompletionFutures.push_back( - completionFuture.thenRunOn(_executor)); - } - - if (nRescheduledTasks > 1) { - LOGV2_WARNING(6834801, - "Rescheduling several range deletions marked as processing. " - "Orphans count may be off while they are not drained", - "numRangeDeletionsMarkedAsProcessing"_attr = nRescheduledTasks); - } - - return processingTasksCompletionFutures.size() > 0 - ? whenAllSucceed(std::move(processingTasksCompletionFutures)).share() - : SemiFuture<void>::makeReady().share(); - }(); - - // (2) register all other "non-pending" tasks - { - FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace); - findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName - << BSON("$ne" << true) - << RangeDeletionTask::kPendingFieldName - << BSON("$ne" << true))); - auto cursor = client.find(std::move(findCommand)); - while (cursor->more()) { - (void)this->registerTask( - RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"), - cursor->next()), - processingTasksCompletionFuture.thenRunOn(_executor).semi(), - true /* fromResubmitOnStepUp */); - } - } - - LOGV2_INFO(6834802, - "Finished resubmitting range deletion tasks", - "nRescheduledTasks"_attr = nRescheduledTasks); - - this->_state.store(kUp); - }) - .getAsync([](auto) {}); + // TODO SERVER-68348 Asynchronously register tasks on the range deleter service on step-up + _state.store(kUp); } void RangeDeleterService::onStepDown() { @@ -207,9 +134,7 @@ long long RangeDeleterService::totalNumOfRegisteredTasks() { } SharedSemiFuture<void> RangeDeleterService::registerTask( - const RangeDeletionTask& rdt, - SemiFuture<void>&& waitForActiveQueriesToComplete, - bool fromResubmitOnStepUp) { + const RangeDeletionTask& rdt, SemiFuture<void>&& waitForActiveQueriesToComplete) { if (disableResumableRangeDeleter.load()) { return SemiFuture<void>::makeReady( @@ -267,8 +192,7 @@ SharedSemiFuture<void> RangeDeleterService::registerTask( .share(); auto [taskCompletionFuture, inserted] = [&]() -> std::pair<SharedSemiFuture<void>, bool> { - auto lock = fromResubmitOnStepUp ? _acquireMutexUnconditionally() - : _acquireMutexFailIfServiceNotUp(); + auto lock = _acquireMutexFailIfServiceNotUp(); auto [registeredTask, inserted] = _rangeDeletionTasks[rdt.getCollectionUuid()].insert( std::make_shared<RangeDeletion>(RangeDeletion(rdt, chainCompletionFuture))); auto retFuture = static_cast<RangeDeletion*>(registeredTask->get())->getCompletionFuture(); diff --git a/src/mongo/db/s/range_deleter_service.h b/src/mongo/db/s/range_deleter_service.h index a946865739c..4b1e09ce919 100644 --- a/src/mongo/db/s/range_deleter_service.h +++ b/src/mongo/db/s/range_deleter_service.h @@ -120,8 +120,7 @@ public: */ SharedSemiFuture<void> registerTask( const RangeDeletionTask& rdt, - SemiFuture<void>&& waitForActiveQueriesToComplete = SemiFuture<void>::makeReady(), - bool fromResubmitOnStepUp = false); + SemiFuture<void>&& waitForActiveQueriesToComplete = SemiFuture<void>::makeReady()); /* * Deregister a task from the range deleter service. @@ -159,7 +158,7 @@ public: private: /* Asynchronously register range deletions on the service. To be called on on step-up */ - void _recoverRangeDeletionsOnStepUp(OperationContext* opCtx); + void _recoverRangeDeletionsOnStepUp(); /* ReplicaSetAwareServiceShardSvr "empty implemented" methods */ void onStartup(OperationContext* opCtx) override final{}; diff --git a/src/mongo/db/s/range_deleter_service_test.cpp b/src/mongo/db/s/range_deleter_service_test.cpp index a6ba43ac540..c359d31725d 100644 --- a/src/mongo/db/s/range_deleter_service_test.cpp +++ b/src/mongo/db/s/range_deleter_service_test.cpp @@ -31,7 +31,6 @@ #include "mongo/bson/unordered_fields_bsonobj_comparator.h" #include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog_raii.h" -#include "mongo/db/persistent_task_store.h" #include "mongo/db/s/operation_sharding_state.h" namespace mongo { @@ -588,59 +587,4 @@ TEST_F(RangeDeleterServiceTest, ASSERT(overlappingRangeFutureWhenDisabled.isReady()); } -TEST_F(RangeDeleterServiceTest, RescheduleRangeDeletionTasksOnStepUp) { - PseudoRandom random(SecureRandom().nextInt64()); - auto rds = RangeDeleterService::get(opCtx); - - // Trigger step-down - rds->onStepDown(); - - // Random number of range deletion documents to generate (minimum 1, maximum 20). - int nRangeDeletionTasks = random.nextInt32(20) + 1; - - PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); - - // Generate and persist range deleter tasks (some pending, some non-pending, some non-pending && - // processing) - int nPending = 0, nNonPending = 0, nNonPendingAndProcessing = 0; - int minBound = 0; - for (int i = 0; i < nRangeDeletionTasks; i++) { - auto rangeDeletionTask = createRangeDeletionTask( - uuidCollA, BSON("a" << minBound), BSON("a" << minBound + 10), CleanWhenEnum::kDelayed); - minBound += 10; - - auto rand = random.nextInt32() % 3; - if (rand == 0) { - // Pending range deletion task - rangeDeletionTask.setPending(true); - nPending++; - } else if (rand == 1) { - // Non-pending range deletion task - rangeDeletionTask.setPending(false); - nNonPending++; - } else if (rand == 2) { - // Non-pending and processing range deletion task - rangeDeletionTask.setPending(false); - rangeDeletionTask.setProcessing(true); - nNonPendingAndProcessing++; - } - - store.add(opCtx, rangeDeletionTask); - } - - // Trigger step-up - rds->onStepUpComplete(opCtx, 0L); - - // Check that all non-pending tasks are being rescheduled - while (true) { - try { - ASSERT_EQ(nNonPending + nNonPendingAndProcessing, - rds->getNumRangeDeletionTasksForCollection(uuidCollA)); - break; - } catch (const ExceptionFor<ErrorCodes::NotYetInitialized>&) { - // Retry as long as range deletion tasks are being resubmitted - } - } -} - } // namespace mongo |