diff options
author | Pierlauro Sciarelli <pierlauro.sciarelli@mongodb.com> | 2022-09-12 15:39:28 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-12 16:47:44 +0000 |
commit | 727d3de09c9e38206cbde56c8a431f6e7b480da6 (patch) | |
tree | cdd2dbd0544449f99cc59984593121e6dc3351f4 /src/mongo/db | |
parent | c7f9bb9676a8859a21aaea5fdd81e23a4e9d79e8 (diff) | |
download | mongo-727d3de09c9e38206cbde56c8a431f6e7b480da6.tar.gz |
SERVER-68348 Asynchronously register tasks on the range deleter service on step-up
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/replica_set_aware_service.h | 3 | ||||
-rw-r--r-- | src/mongo/db/s/range_deleter_service.cpp | 103 | ||||
-rw-r--r-- | src/mongo/db/s/range_deleter_service.h | 17 | ||||
-rw-r--r-- | src/mongo/db/s/range_deleter_service_test.cpp | 59 |
4 files changed, 169 insertions, 13 deletions
diff --git a/src/mongo/db/repl/replica_set_aware_service.h b/src/mongo/db/repl/replica_set_aware_service.h index 3f099610a5c..a6b723163c3 100644 --- a/src/mongo/db/repl/replica_set_aware_service.h +++ b/src/mongo/db/repl/replica_set_aware_service.h @@ -148,6 +148,9 @@ public: /** * Called after the node has transitioned out of PRIMARY. Usually this is into SECONDARY, but it * could also be into ROLLBACK or REMOVED. + * + * NB: also called when SECONDARY nodes transition to ROLLBACK, hence it should never be assumed + * that `onStepUp` hooks have been invoked at least once before this method is invoked. */ virtual void onStepDown() = 0; diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp index f90618421d5..a24e14082f9 100644 --- a/src/mongo/db/s/range_deleter_service.cpp +++ b/src/mongo/db/s/range_deleter_service.cpp @@ -28,10 +28,13 @@ */ #include "mongo/db/s/range_deleter_service.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/op_observer/op_observer_registry.h" +#include "mongo/db/s/balancer_stats_registry.h" #include "mongo/db/s/range_deleter_service_op_observer.h" #include "mongo/logv2/log.h" #include "mongo/s/sharding_feature_flags_gen.h" +#include "mongo/util/future_util.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingRangeDeleter @@ -83,18 +86,94 @@ void RangeDeleterService::onStepUpComplete(OperationContext* opCtx, long long te _executor = std::move(taskExecutor); _executor->startup(); - _recoverRangeDeletionsOnStepUp(); + _recoverRangeDeletionsOnStepUp(opCtx); } -void RangeDeleterService::_recoverRangeDeletionsOnStepUp() { - +void RangeDeleterService::_recoverRangeDeletionsOnStepUp(OperationContext* opCtx) { if (disableResumableRangeDeleter.load()) { _state.store(kDown); return; } - // TODO SERVER-68348 Asynchronously register tasks on the range deleter service on step-up - _state.store(kUp); + LOGV2(6834800, "Resubmitting range deletion tasks"); + + ServiceContext* 
serviceContext = opCtx->getServiceContext(); + + ExecutorFuture<void>(_executor) + .then([serviceContext, this] { + ThreadClient tc("ResubmitRangeDeletionsOnStepUp", serviceContext); + { + stdx::lock_guard<Client> lk(*tc.get()); + tc->setSystemOperationKillableByStepdown(lk); + } + auto opCtx = tc->makeOperationContext(); + opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); + + ScopedRangeDeleterLock rangeDeleterLock(opCtx.get()); + DBDirectClient client(opCtx.get()); + + int nRescheduledTasks = 0; + + // (1) register range deletion tasks marked as "processing" + auto processingTasksCompletionFuture = [&] { + std::vector<ExecutorFuture<void>> processingTasksCompletionFutures; + FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace); + findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName << true)); + auto cursor = client.find(std::move(findCommand)); + + while (cursor->more()) { + auto completionFuture = this->registerTask( + RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"), + cursor->next()), + SemiFuture<void>::makeReady(), + true /* fromResubmitOnStepUp */); + nRescheduledTasks++; + processingTasksCompletionFutures.push_back( + completionFuture.thenRunOn(_executor)); + } + + if (nRescheduledTasks > 1) { + LOGV2_WARNING(6834801, + "Rescheduling several range deletions marked as processing. " + "Orphans count may be off while they are not drained", + "numRangeDeletionsMarkedAsProcessing"_attr = nRescheduledTasks); + } + + return processingTasksCompletionFutures.size() > 0 + ? 
whenAllSucceed(std::move(processingTasksCompletionFutures)).share() + : SemiFuture<void>::makeReady().share(); + }(); + + // (2) register all other "non-pending" tasks + { + FindCommandRequest findCommand(NamespaceString::kRangeDeletionNamespace); + findCommand.setFilter(BSON(RangeDeletionTask::kProcessingFieldName + << BSON("$ne" << true) + << RangeDeletionTask::kPendingFieldName + << BSON("$ne" << true))); + auto cursor = client.find(std::move(findCommand)); + while (cursor->more()) { + (void)this->registerTask( + RangeDeletionTask::parse(IDLParserContext("rangeDeletionRecovery"), + cursor->next()), + processingTasksCompletionFuture.thenRunOn(_executor).semi(), + true /* fromResubmitOnStepUp */); + } + } + + LOGV2_INFO(6834802, + "Finished resubmitting range deletion tasks", + "nRescheduledTasks"_attr = nRescheduledTasks); + + auto lock = _acquireMutexUnconditionally(); + // Since the recovery is only spawned on step-up but may complete later, it's not + // assumable that the node is still primary when the all resubmissions finish + if (_state.load() != kDown) { + this->_rangeDeleterServiceUpCondVar_FOR_TESTING.notify_all(); + this->_state.store(kUp); + } + }) + .getAsync([](auto) {}); } void RangeDeleterService::onStepDown() { @@ -103,9 +182,12 @@ void RangeDeleterService::onStepDown() { } auto lock = _acquireMutexUnconditionally(); - dassert(_state.load() != kDown, "Service expected to be initializing/up before stepping down"); - _executor->shutdown(); + // It may happen for the `onStepDown` hook to be invoked on a SECONDARY node transitioning + // to ROLLBACK, hence the executor may have never been initialized + if (_executor) { + _executor->shutdown(); + } _state.store(kDown); } @@ -149,7 +231,9 @@ long long RangeDeleterService::totalNumOfRegisteredTasks() { } SharedSemiFuture<void> RangeDeleterService::registerTask( - const RangeDeletionTask& rdt, SemiFuture<void>&& waitForActiveQueriesToComplete) { + const RangeDeletionTask& rdt, + SemiFuture<void>&& 
waitForActiveQueriesToComplete, + bool fromResubmitOnStepUp) { if (disableResumableRangeDeleter.load()) { return SemiFuture<void>::makeReady( @@ -207,7 +291,8 @@ SharedSemiFuture<void> RangeDeleterService::registerTask( .share(); auto [taskCompletionFuture, inserted] = [&]() -> std::pair<SharedSemiFuture<void>, bool> { - auto lock = _acquireMutexFailIfServiceNotUp(); + auto lock = fromResubmitOnStepUp ? _acquireMutexUnconditionally() + : _acquireMutexFailIfServiceNotUp(); auto [registeredTask, inserted] = _rangeDeletionTasks[rdt.getCollectionUuid()].insert( std::make_shared<RangeDeletion>(RangeDeletion(rdt, chainCompletionFuture))); auto retFuture = static_cast<RangeDeletion*>(registeredTask->get())->getCompletionFuture(); diff --git a/src/mongo/db/s/range_deleter_service.h b/src/mongo/db/s/range_deleter_service.h index 7de54c0be60..70f41bc1ccd 100644 --- a/src/mongo/db/s/range_deleter_service.h +++ b/src/mongo/db/s/range_deleter_service.h @@ -92,6 +92,9 @@ private: AtomicWord<State> _state{kDown}; + // ONLY FOR TESTING: variable notified when the state changes to "up" + stdx::condition_variable _rangeDeleterServiceUpCondVar_FOR_TESTING; + /* Acquire mutex only if service is up (for "user" operation) */ [[nodiscard]] stdx::unique_lock<Latch> _acquireMutexFailIfServiceNotUp() { stdx::unique_lock<Latch> lg(_mutex_DO_NOT_USE_DIRECTLY); @@ -120,7 +123,8 @@ public: */ SharedSemiFuture<void> registerTask( const RangeDeletionTask& rdt, - SemiFuture<void>&& waitForActiveQueriesToComplete = SemiFuture<void>::makeReady()); + SemiFuture<void>&& waitForActiveQueriesToComplete = SemiFuture<void>::makeReady(), + bool fromResubmitOnStepUp = false); /* * Deregister a task from the range deleter service. 
@@ -157,9 +161,18 @@ public: */ long long totalNumOfRegisteredTasks(); + /* ONLY FOR TESTING: wait for the state to become "up" */ + void _waitForRangeDeleterServiceUp_FOR_TESTING() { + stdx::unique_lock<Latch> lg(_mutex_DO_NOT_USE_DIRECTLY); + if (_state.load() != kUp) { + _rangeDeleterServiceUpCondVar_FOR_TESTING.wait(lg, + [&]() { return _state.load() == kUp; }); + } + } + private: /* Asynchronously register range deletions on the service. To be called on step-up */ - void _recoverRangeDeletionsOnStepUp(); + void _recoverRangeDeletionsOnStepUp(OperationContext* opCtx); /* ReplicaSetAwareServiceShardSvr "empty implemented" methods */ void onStartup(OperationContext* opCtx) override final{}; diff --git a/src/mongo/db/s/range_deleter_service_test.cpp b/src/mongo/db/s/range_deleter_service_test.cpp index 26e1b0aca49..6d3d49eced0 100644 --- a/src/mongo/db/s/range_deleter_service_test.cpp +++ b/src/mongo/db/s/range_deleter_service_test.cpp @@ -31,6 +31,7 @@ #include "mongo/bson/unordered_fields_bsonobj_comparator.h" #include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/persistent_task_store.h" #include "mongo/db/s/operation_sharding_state.h" namespace mongo { @@ -42,6 +43,7 @@ void RangeDeleterServiceTest::setUp() { ShardServerTestFixture::setUp(); opCtx = operationContext(); RangeDeleterService::get(opCtx)->onStepUpComplete(opCtx, 0L); + RangeDeleterService::get(opCtx)->_waitForRangeDeleterServiceUp_FOR_TESTING(); { OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection( @@ -261,7 +263,9 @@ TEST_F(RangeDeleterServiceTest, ScheduledTaskInvalidatedOnStepDown) { // Manually trigger disabling of the service rds->onStepDown(); ON_BLOCK_EXIT([&] { - rds->onStepUpComplete(opCtx, 0L); // Re-enable the service for clean teardown + // Re-enable the service for clean teardown + rds->onStepUpComplete(opCtx, 0L); + rds->_waitForRangeDeleterServiceUp_FOR_TESTING(); }); try { @@ -277,7 +281,9 @@ 
TEST_F(RangeDeleterServiceTest, NoActionPossibleIfServiceIsDown) { // Manually trigger disabling of the service rds->onStepDown(); ON_BLOCK_EXIT([&] { - rds->onStepUpComplete(opCtx, 0L); // Re-enable the service for clean teardown + // Re-enable the service for clean teardown + rds->onStepUpComplete(opCtx, 0L); + rds->_waitForRangeDeleterServiceUp_FOR_TESTING(); }); auto taskWithOngoingQueries = createRangeDeletionTaskWithOngoingQueries( @@ -588,4 +594,53 @@ TEST_F(RangeDeleterServiceTest, ASSERT(overlappingRangeFutureWhenDisabled.isReady()); } +TEST_F(RangeDeleterServiceTest, RescheduleRangeDeletionTasksOnStepUp) { + PseudoRandom random(SecureRandom().nextInt64()); + auto rds = RangeDeleterService::get(opCtx); + + // Trigger step-down + rds->onStepDown(); + + // Random number of range deletion documents to generate (minimum 1, maximum 20). + int nRangeDeletionTasks = random.nextInt32(20) + 1; + + PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); + + // Generate and persist range deleter tasks (some pending, some non-pending, some non-pending && + // processing) + int nPending = 0, nNonPending = 0, nNonPendingAndProcessing = 0; + int minBound = 0; + for (int i = 0; i < nRangeDeletionTasks; i++) { + auto rangeDeletionTask = createRangeDeletionTask( + uuidCollA, BSON("a" << minBound), BSON("a" << minBound + 10), CleanWhenEnum::kDelayed); + minBound += 10; + + auto rand = random.nextInt32() % 3; + if (rand == 0) { + // Pending range deletion task + rangeDeletionTask.setPending(true); + nPending++; + } else if (rand == 1) { + // Non-pending range deletion task + rangeDeletionTask.setPending(false); + nNonPending++; + } else if (rand == 2) { + // Non-pending and processing range deletion task + rangeDeletionTask.setPending(false); + rangeDeletionTask.setProcessing(true); + nNonPendingAndProcessing++; + } + + store.add(opCtx, rangeDeletionTask); + } + + // Trigger step-up + rds->onStepUpComplete(opCtx, 0L); + 
rds->_waitForRangeDeleterServiceUp_FOR_TESTING(); + + // Check that all non-pending tasks are being rescheduled + ASSERT_EQ(nNonPending + nNonPendingAndProcessing, + rds->getNumRangeDeletionTasksForCollection(uuidCollA)); +} + } // namespace mongo |