From 46d15e1715071f615fe6ceb4c54cf2803f69b7fc Mon Sep 17 00:00:00 2001 From: Pierlauro Sciarelli Date: Mon, 17 Oct 2022 16:36:21 +0000 Subject: SERVER-65558 Enable Feature flag for PM-2849 --- .../resmokeconfig/fully_disabled_feature_flags.yml | 2 - .../sharding/disable_resumable_range_deleter.js | 1 - ...replication_coordinator_external_state_impl.cpp | 5 +- src/mongo/db/s/collection_sharding_runtime.cpp | 53 ++++++++++------ .../db/s/collection_sharding_runtime_test.cpp | 51 ++++++++-------- src/mongo/db/s/collection_sharding_state.cpp | 27 +++++---- .../s/collection_sharding_state_factory_shard.cpp | 5 ++ src/mongo/db/s/metadata_manager.cpp | 5 +- src/mongo/db/s/metadata_manager.h | 3 +- src/mongo/db/s/metadata_manager_test.cpp | 17 +++--- src/mongo/db/s/migration_coordinator.cpp | 70 ++++++++++++++++------ src/mongo/db/s/migration_coordinator.h | 4 +- src/mongo/db/s/migration_source_manager.h | 2 +- src/mongo/db/s/migration_util_test.cpp | 2 + src/mongo/db/s/range_deleter_service.cpp | 32 ++++++---- src/mongo/db/s/range_deleter_service.h | 43 +++++++++---- src/mongo/db/s/range_deleter_service_test.cpp | 28 +++++++++ src/mongo/db/s/range_deleter_service_test_util.cpp | 6 +- src/mongo/db/s/sharding_server_status.cpp | 6 ++ src/mongo/s/sharding_feature_flags.idl | 3 +- 20 files changed, 248 insertions(+), 117 deletions(-) diff --git a/buildscripts/resmokeconfig/fully_disabled_feature_flags.yml b/buildscripts/resmokeconfig/fully_disabled_feature_flags.yml index 678b3c522c3..1188d8163cc 100644 --- a/buildscripts/resmokeconfig/fully_disabled_feature_flags.yml +++ b/buildscripts/resmokeconfig/fully_disabled_feature_flags.yml @@ -12,5 +12,3 @@ # released create the transactions collection index and is only meant to be enabled adhoc, so only # its targeted tests should enable it. 
- featureFlagAlwaysCreateConfigTransactionsPartialIndexOnStepUp -# Disable 'featureFlagRangeDeleterService' as long as the service is not performing actual range deletions -- featureFlagRangeDeleterService diff --git a/jstests/sharding/disable_resumable_range_deleter.js b/jstests/sharding/disable_resumable_range_deleter.js index ddf7c8a698f..760a937fbab 100644 --- a/jstests/sharding/disable_resumable_range_deleter.js +++ b/jstests/sharding/disable_resumable_range_deleter.js @@ -51,7 +51,6 @@ st.rs0.restart(0, { }); jsTest.log("Shard0 should fail to submit the range deletion task on stepup."); -checkLog.contains(st.rs0.getPrimary(), "Failed to submit range deletion task"); jsTest.log("Shard0 should fail to receive a range that overlaps the range deletion task."); // The error from moveChunk gets wrapped as an OperationFailed error, so we have to check the error diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 677504159fc..bdd03bc69fe 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -112,6 +112,7 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/cluster_identity_loader.h" #include "mongo/s/grid.h" +#include "mongo/s/sharding_feature_flags_gen.h" #include "mongo/stdx/thread.h" #include "mongo/transport/service_entry_point.h" #include "mongo/util/assert_util.h" @@ -951,7 +952,9 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook // Note, these must be done after the configOpTime is recovered via // ShardingStateRecovery::recover above, because they may trigger filtering metadata // refreshes which should use the recovered configOpTime. 
- migrationutil::resubmitRangeDeletionsOnStepUp(_service); + if (!mongo::feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) { + migrationutil::resubmitRangeDeletionsOnStepUp(_service); + } migrationutil::resumeMigrationCoordinationsOnStepUp(opCtx); migrationutil::resumeMigrationRecipientsOnStepUp(opCtx); diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp index f112f54ada8..e2b02e7a452 100644 --- a/src/mongo/db/s/collection_sharding_runtime.cpp +++ b/src/mongo/db/s/collection_sharding_runtime.cpp @@ -33,9 +33,11 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/global_settings.h" #include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/range_deleter_service.h" #include "mongo/db/s/sharding_runtime_d_params_gen.h" #include "mongo/db/s/sharding_state.h" #include "mongo/logv2/log.h" +#include "mongo/s/sharding_feature_flags_gen.h" #include "mongo/util/duration.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding @@ -260,9 +262,14 @@ void CollectionShardingRuntime::clearFilteringMetadataForDroppedCollection( SharedSemiFuture CollectionShardingRuntime::cleanUpRange(ChunkRange const& range, CleanWhen when) { - stdx::lock_guard lk(_metadataManagerLock); - invariant(_metadataType == MetadataType::kSharded); - return _metadataManager->cleanUpRange(range, when == kDelayed); + if (!feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) { + stdx::lock_guard lk(_metadataManagerLock); + invariant(_metadataType == MetadataType::kSharded); + return _metadataManager->cleanUpRange(range, when == kDelayed); + } + + // This method must never be called if the range deleter service feature flag is enabled + MONGO_UNREACHABLE; } Status CollectionShardingRuntime::waitForClean(OperationContext* opCtx, @@ -271,9 +278,8 @@ Status CollectionShardingRuntime::waitForClean(OperationContext* opCtx, ChunkRange orphanRange, Date_t deadline) { while (true) { - boost::optional> stillScheduled; - - { + const StatusWith> swOrphanCleanupFuture = + [&]() -> StatusWith> { AutoGetCollection autoColl(opCtx, nss, MODE_IX); auto* const self = CollectionShardingRuntime::get(opCtx, nss); stdx::lock_guard lk(self->_metadataManagerLock); @@ -287,16 +293,27 @@ Status CollectionShardingRuntime::waitForClean(OperationContext* opCtx, "metadata reset"}; } - stillScheduled = self->_metadataManager->trackOrphanedDataCleanup(orphanRange); - if (!stillScheduled) { - LOGV2_OPTIONS(21918, - {logv2::LogComponent::kShardingMigration}, - "Finished waiting for deletion of {namespace} range {orphanRange}", - "Finished waiting for deletion of orphans", - "namespace"_attr = nss.ns(), - "orphanRange"_attr = redact(orphanRange.toString())); - return Status::OK(); + if (feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) { + return RangeDeleterService::get(opCtx)->getOverlappingRangeDeletionsFuture( + self->_metadataManager->getCollectionUuid(), orphanRange); + } else { + return self->_metadataManager->trackOrphanedDataCleanup(orphanRange); } + }(); + + if (!swOrphanCleanupFuture.isOK()) { + return swOrphanCleanupFuture.getStatus(); + } + + auto orphanCleanupFuture = std::move(swOrphanCleanupFuture.getValue()); + if (orphanCleanupFuture.isReady()) { + LOGV2_OPTIONS(21918, + {logv2::LogComponent::kShardingMigration}, + "Finished waiting for deletion of {namespace} range {orphanRange}", + "Finished waiting for deletion of orphans", + "namespace"_attr = nss.ns(), + "orphanRange"_attr = redact(orphanRange.toString())); + return 
Status::OK(); } LOGV2_OPTIONS(21919, @@ -307,12 +324,12 @@ Status CollectionShardingRuntime::waitForClean(OperationContext* opCtx, "orphanRange"_attr = orphanRange); try { opCtx->runWithDeadline( - deadline, ErrorCodes::ExceededTimeLimit, [&] { stillScheduled->get(opCtx); }); + deadline, ErrorCodes::ExceededTimeLimit, [&] { orphanCleanupFuture.get(opCtx); }); } catch (const DBException& ex) { auto result = ex.toStatus(); // Swallow RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist error since the - // collection could either never exist or get dropped directly from the shard after - // the range deletion task got scheduled. + // collection could either never exist or get dropped directly from the shard after the + // range deletion task got scheduled. if (result != ErrorCodes::RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist) { return result.withContext(str::stream() << "Failed to delete orphaned " << nss.ns() << " range " << orphanRange.toString()); diff --git a/src/mongo/db/s/collection_sharding_runtime_test.cpp b/src/mongo/db/s/collection_sharding_runtime_test.cpp index 1379772ba14..763e835d7f1 100644 --- a/src/mongo/db/s/collection_sharding_runtime_test.cpp +++ b/src/mongo/db/s/collection_sharding_runtime_test.cpp @@ -38,6 +38,7 @@ #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/global_index_ddl_util.h" #include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/range_deleter_service_test.h" #include "mongo/db/s/range_deletion_task_gen.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/shard_server_test_fixture.h" @@ -378,12 +379,17 @@ public: AutoGetCollection autoColl(operationContext(), kTestNss, MODE_IX); _uuid = autoColl.getCollection()->uuid(); + + RangeDeleterService::get(operationContext())->onStepUpComplete(operationContext(), 0L); + RangeDeleterService::get(operationContext())->_waitForRangeDeleterServiceUp_FOR_TESTING(); } void tearDown() override { DBDirectClient client(operationContext()); client.dropCollection(kTestNss.ns()); + RangeDeleterService::get(operationContext())->onStepDown(); + RangeDeleterService::get(operationContext())->onShutdown(); WaitForMajorityService::get(getServiceContext()).shutDown(); CollectionShardingRuntimeTest::tearDown(); } @@ -411,25 +417,16 @@ private: // updates on the range deletions namespace though, so we can get around the issue by inserting // the task with the 'pending' field set, and then remove the field using a replacement update // after. 
-RangeDeletionTask insertRangeDeletionTask(OperationContext* opCtx, +RangeDeletionTask createRangeDeletionTask(OperationContext* opCtx, const NamespaceString& nss, const UUID& uuid, const ChunkRange& range, int64_t numOrphans) { - PersistentTaskStore store(NamespaceString::kRangeDeletionNamespace); auto migrationId = UUID::gen(); - RangeDeletionTask t(migrationId, nss, uuid, ShardId("donor"), range, CleanWhenEnum::kDelayed); - t.setPending(true); + RangeDeletionTask t(migrationId, nss, uuid, ShardId("donor"), range, CleanWhenEnum::kNow); t.setNumOrphanDocs(numOrphans); const auto currentTime = VectorClock::get(opCtx)->getTime(); t.setTimestamp(currentTime.clusterTime().asTimestamp()); - store.add(opCtx, t); - - auto query = BSON(RangeDeletionTask::kIdFieldName << migrationId); - t.setPending(boost::none); - auto update = t.toBSON(); - store.update(opCtx, query, update); - return t; } @@ -480,14 +477,18 @@ TEST_F(CollectionShardingRuntimeWithRangeDeleterTest, WaitForCleanBlocksBehindOneScheduledDeletion) { // Enable fail point to suspendRangeDeletion. globalFailPointRegistry().find("suspendRangeDeletion")->setMode(FailPoint::alwaysOn); + ScopeGuard resetFailPoint( + [=] { globalFailPointRegistry().find("suspendRangeDeletion")->setMode(FailPoint::off); }); + OperationContext* opCtx = operationContext(); auto metadata = makeShardedMetadata(opCtx, uuid()); csr().setFilteringMetadata(opCtx, metadata); const ChunkRange range = ChunkRange(BSON(kShardKey << MINKEY), BSON(kShardKey << MAXKEY)); - const auto task = insertRangeDeletionTask(opCtx, kTestNss, uuid(), range, 0); - auto cleanupComplete = csr().cleanUpRange(range, CollectionShardingRuntime::CleanWhen::kNow); + const auto task = createRangeDeletionTask(opCtx, kTestNss, uuid(), range, 0); + auto taskCompletionFuture = registerAndCreatePersistentTask( + opCtx, task, SemiFuture::makeReady() /* waitForActiveQueries */); opCtx->setDeadlineAfterNowBy(Milliseconds(100), ErrorCodes::MaxTimeMSExpired); auto status = @@ -496,7 +497,7 @@ TEST_F(CollectionShardingRuntimeWithRangeDeleterTest, ASSERT_EQ(status.code(), ErrorCodes::MaxTimeMSExpired); globalFailPointRegistry().find("suspendRangeDeletion")->setMode(FailPoint::off); - cleanupComplete.get(); + taskCompletionFuture.get(); } TEST_F(CollectionShardingRuntimeWithRangeDeleterTest, @@ -507,15 +508,15 @@ TEST_F(CollectionShardingRuntimeWithRangeDeleterTest, const auto middleKey = 5; const ChunkRange range1 = ChunkRange(BSON(kShardKey << MINKEY), BSON(kShardKey << middleKey)); - const auto task1 = insertRangeDeletionTask(opCtx, kTestNss, uuid(), range1, 0); + const auto task1 = createRangeDeletionTask(opCtx, kTestNss, uuid(), range1, 0); const ChunkRange range2 = ChunkRange(BSON(kShardKey << middleKey), BSON(kShardKey << MAXKEY)); - const auto task2 = insertRangeDeletionTask(opCtx, kTestNss, uuid(), range2, 0); + const auto task2 = createRangeDeletionTask(opCtx, kTestNss, uuid(), range2, 0); - auto cleanupCompleteFirst = - csr().cleanUpRange(range1, CollectionShardingRuntime::CleanWhen::kNow); + auto cleanupCompleteFirst = registerAndCreatePersistentTask( + opCtx, task1, SemiFuture::makeReady() /* waitForActiveQueries */); - auto cleanupCompleteSecond = - csr().cleanUpRange(range2, CollectionShardingRuntime::CleanWhen::kNow); + auto cleanupCompleteSecond = registerAndCreatePersistentTask( + opCtx, task2, SemiFuture::makeReady() /* waitForActiveQueries */); auto status = CollectionShardingRuntime::waitForClean( opCtx, @@ -539,9 +540,10 @@ TEST_F(CollectionShardingRuntimeWithRangeDeleterTest, auto 
metadata = makeShardedMetadata(opCtx, uuid()); csr().setFilteringMetadata(opCtx, metadata); const ChunkRange range = ChunkRange(BSON(kShardKey << MINKEY), BSON(kShardKey << MAXKEY)); - const auto task = insertRangeDeletionTask(opCtx, kTestNss, uuid(), range, 0); + const auto task = createRangeDeletionTask(opCtx, kTestNss, uuid(), range, 0); - auto cleanupComplete = csr().cleanUpRange(range, CollectionShardingRuntime::CleanWhen::kNow); + auto cleanupComplete = registerAndCreatePersistentTask( + opCtx, task, SemiFuture::makeReady() /* waitForActiveQueries */); auto status = CollectionShardingRuntime::waitForClean(opCtx, kTestNss, uuid(), range, Date_t::max()); @@ -560,10 +562,11 @@ TEST_F(CollectionShardingRuntimeWithRangeDeleterTest, auto metadata = makeShardedMetadata(opCtx, uuid()); csr().setFilteringMetadata(opCtx, metadata); const ChunkRange range = ChunkRange(BSON(kShardKey << MINKEY), BSON(kShardKey << MAXKEY)); - const auto task = insertRangeDeletionTask(opCtx, kTestNss, uuid(), range, 0); + const auto task = createRangeDeletionTask(opCtx, kTestNss, uuid(), range, 0); // Schedule range deletion that will hang due to `suspendRangeDeletion` failpoint - auto cleanupComplete = csr().cleanUpRange(range, CollectionShardingRuntime::CleanWhen::kNow); + auto cleanupComplete = registerAndCreatePersistentTask( + opCtx, task, SemiFuture::makeReady() /* waitForActiveQueries */); // Clear and set again filtering metadata csr().clearFilteringMetadata(opCtx); diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 17a9127e70f..cf8fed2816e 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -30,6 +30,7 @@ #include "mongo/db/s/collection_sharding_state.h" #include "mongo/logv2/log.h" +#include "mongo/s/sharding_feature_flags_gen.h" #include "mongo/util/string_map.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding @@ -82,18 +83,20 @@ public: } void appendInfoForServerStatus(BSONObjBuilder* builder) { - auto totalNumberOfRangesScheduledForDeletion = ([this] { - stdx::lock_guard lg(_mutex); - return std::accumulate(_collections.begin(), - _collections.end(), - 0LL, - [](long long total, const auto& coll) { - return total + - coll.second->numberOfRangesScheduledForDeletion(); - }); - })(); - - builder->appendNumber("rangeDeleterTasks", totalNumberOfRangesScheduledForDeletion); + if (!mongo::feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) { + auto totalNumberOfRangesScheduledForDeletion = ([this] { + stdx::lock_guard lg(_mutex); + return std::accumulate(_collections.begin(), + _collections.end(), + 0LL, + [](long long total, const auto& coll) { + return total + + coll.second->numberOfRangesScheduledForDeletion(); + }); + })(); + + builder->appendNumber("rangeDeleterTasks", totalNumberOfRangesScheduledForDeletion); + } } std::vector getCollectionNames() { diff --git a/src/mongo/db/s/collection_sharding_state_factory_shard.cpp b/src/mongo/db/s/collection_sharding_state_factory_shard.cpp index 8a1b65fa270..74939c03ffe 100644 --- a/src/mongo/db/s/collection_sharding_state_factory_shard.cpp +++ b/src/mongo/db/s/collection_sharding_state_factory_shard.cpp @@ -36,6 +36,7 @@ #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/network_interface_thread_pool.h" #include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/s/sharding_feature_flags_gen.h" #define MONGO_LOGV2_DEFAULT_COMPONENT 
::mongo::logv2::LogComponent::kSharding @@ -64,6 +65,10 @@ std::unique_ptr CollectionShardingStateFactoryShard::ma std::shared_ptr CollectionShardingStateFactoryShard::_getRangeDeletionExecutor() { + if (feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) { + return nullptr; + } + stdx::lock_guard lg(_mutex); if (!_rangeDeletionExecutor) { const std::string kExecName("CollectionRangeDeleter-TaskExecutor"); diff --git a/src/mongo/db/s/metadata_manager.cpp b/src/mongo/db/s/metadata_manager.cpp index 96542f14ef2..1d9c63f77e5 100644 --- a/src/mongo/db/s/metadata_manager.cpp +++ b/src/mongo/db/s/metadata_manager.cpp @@ -320,8 +320,7 @@ size_t MetadataManager::numberOfRangesScheduledForDeletion() const { return _rangesScheduledForDeletion.size(); } -boost::optional> MetadataManager::trackOrphanedDataCleanup( - ChunkRange const& range) const { +SharedSemiFuture MetadataManager::trackOrphanedDataCleanup(ChunkRange const& range) const { stdx::lock_guard lg(_managerLock); for (const auto& [orphanRange, deletionComplete] : _rangesScheduledForDeletion) { if (orphanRange.overlapWith(range)) { @@ -329,7 +328,7 @@ boost::optional> MetadataManager::trackOrphanedDataCleanu } } - return boost::none; + return SemiFuture::makeReady().share(); } SharedSemiFuture MetadataManager::getOngoingQueriesCompletionFuture(ChunkRange const& range) { diff --git a/src/mongo/db/s/metadata_manager.h b/src/mongo/db/s/metadata_manager.h index 5343e7cc123..153dbb5f55f 100644 --- a/src/mongo/db/s/metadata_manager.h +++ b/src/mongo/db/s/metadata_manager.h @@ -142,8 +142,7 @@ public: * returns a future that will be resolved when the newest overlapping range's deletion (possibly * the one of interest) completes or fails. */ - boost::optional> trackOrphanedDataCleanup( - ChunkRange const& orphans) const; + SharedSemiFuture trackOrphanedDataCleanup(ChunkRange const& orphans) const; /** * Returns a future marked as ready when all the ongoing queries retaining the range complete diff --git a/src/mongo/db/s/metadata_manager_test.cpp b/src/mongo/db/s/metadata_manager_test.cpp index fca1894abf4..1d5244e080e 100644 --- a/src/mongo/db/s/metadata_manager_test.cpp +++ b/src/mongo/db/s/metadata_manager_test.cpp @@ -44,6 +44,7 @@ #include "mongo/db/service_context.h" #include "mongo/db/vector_clock.h" #include "mongo/executor/task_executor.h" +#include "mongo/idl/server_parameter_test_util.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/client/shard_registry.h" #include "mongo/stdx/condition_variable.h" @@ -214,6 +215,7 @@ RangeDeletionTask insertRangeDeletionTask(OperationContext* opCtx, } TEST_F(MetadataManagerTest, TrackOrphanedDataCleanupBlocksOnScheduledRangeDeletions) { + RAIIServerParameterControllerForTest enableFeatureFlag{"featureFlagRangeDeleterService", false}; ChunkRange cr1(BSON("key" << 0), BSON("key" << 10)); const auto task = insertRangeDeletionTask(operationContext(), kNss, _manager->getCollectionUuid(), cr1, 0); @@ -225,14 +227,15 @@ TEST_F(MetadataManagerTest, TrackOrphanedDataCleanupBlocksOnScheduledRangeDeleti ASSERT_FALSE(notifn1.isReady()); ASSERT_EQ(_manager->numberOfRangesToClean(), 1UL); - auto optNotifn = _manager->trackOrphanedDataCleanup(cr1); + auto future = _manager->trackOrphanedDataCleanup(cr1); ASSERT_FALSE(notifn1.isReady()); - ASSERT_FALSE(optNotifn->isReady()); + ASSERT_FALSE(future.isReady()); globalFailPointRegistry().find("suspendRangeDeletion")->setMode(FailPoint::off); } TEST_F(MetadataManagerTest, CleanupNotificationsAreSignaledWhenMetadataManagerIsDestroyed) { + 
RAIIServerParameterControllerForTest enableFeatureFlag{"featureFlagRangeDeleterService", false}; const ChunkRange rangeToClean(BSON("key" << 20), BSON("key" << 30)); const auto task = insertRangeDeletionTask( operationContext(), kNss, _manager->getCollectionUuid(), rangeToClean, 0); @@ -254,15 +257,14 @@ TEST_F(MetadataManagerTest, CleanupNotificationsAreSignaledWhenMetadataManagerIs auto notif = _manager->cleanUpRange(rangeToClean, false /*delayBeforeDeleting*/); ASSERT(!notif.isReady()); - auto optNotif = _manager->trackOrphanedDataCleanup(rangeToClean); - ASSERT(optNotif); - ASSERT(!optNotif->isReady()); + auto future = _manager->trackOrphanedDataCleanup(rangeToClean); + ASSERT(!future.isReady()); // Reset the original shared_ptr. The cursorOnMovedMetadata will still contain its own copy of // the shared_ptr though, so the destructor of ~MetadataManager won't yet be called. _manager.reset(); ASSERT(!notif.isReady()); - ASSERT(!optNotif->isReady()); + ASSERT(!future.isReady()); // Destroys the ScopedCollectionDescription object and causes the destructor of MetadataManager // to run, which should trigger all deletion notifications. @@ -275,7 +277,7 @@ TEST_F(MetadataManagerTest, CleanupNotificationsAreSignaledWhenMetadataManagerIs } notif.wait(); - optNotif->wait(); + future.wait(); } TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationSinglePending) { @@ -327,6 +329,7 @@ TEST_F(MetadataManagerTest, BeginReceiveWithOverlappingRange) { // Tests membership functions for _rangesToClean TEST_F(MetadataManagerTest, RangesToCleanMembership) { + RAIIServerParameterControllerForTest enableFeatureFlag{"featureFlagRangeDeleterService", false}; ChunkRange cr(BSON("key" << 0), BSON("key" << 10)); const auto task = insertRangeDeletionTask(operationContext(), kNss, _manager->getCollectionUuid(), cr, 0); diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp index feaf7252cc2..febea6e0fc8 100644 --- a/src/mongo/db/s/migration_coordinator.cpp +++ b/src/mongo/db/s/migration_coordinator.cpp @@ -29,7 +29,10 @@ #include "mongo/db/s/migration_coordinator.h" +#include "mongo/db/catalog_raii.h" +#include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/migration_util.h" +#include "mongo/db/s/range_deleter_service.h" #include "mongo/db/s/range_deletion_task_gen.h" #include "mongo/db/s/range_deletion_util.h" #include "mongo/db/session/logical_session_id_helpers.h" @@ -142,7 +145,8 @@ void MigrationCoordinator::setMigrationDecision(DecisionEnum decision) { } -boost::optional> MigrationCoordinator::completeMigration(OperationContext* opCtx) { +boost::optional> MigrationCoordinator::completeMigration( + OperationContext* opCtx) { auto decision = _migrationInfo.getDecision(); if (!decision) { LOGV2( @@ -165,7 +169,7 @@ boost::optional> MigrationCoordinator::completeMigration(Operat launchReleaseRecipientCriticalSection(opCtx); } - boost::optional> cleanupCompleteFuture = boost::none; + boost::optional> cleanupCompleteFuture = boost::none; switch (*decision) { case DecisionEnum::kAborted: @@ -183,7 +187,7 @@ boost::optional> MigrationCoordinator::completeMigration(Operat return cleanupCompleteFuture; } -SemiFuture MigrationCoordinator::_commitMigrationOnDonorAndRecipient( +SharedSemiFuture MigrationCoordinator::_commitMigrationOnDonorAndRecipient( OperationContext* opCtx) { hangBeforeMakingCommitDecisionDurable.pauseWhileSet(); @@ -233,19 +237,6 @@ SemiFuture MigrationCoordinator::_commitMigrationOnDonorAndRecipient( _migrationInfo.getRange(), 
_migrationInfo.getId()); - LOGV2_DEBUG(23897, - 2, - "Marking range deletion task on donor as ready for processing", - "migrationId"_attr = _migrationInfo.getId()); - migrationutil::markAsReadyRangeDeletionTaskLocally( - opCtx, _migrationInfo.getCollectionUuid(), _migrationInfo.getRange()); - - // At this point the decision cannot be changed and will be recovered in the event of a - // failover, so it is safe to schedule the deletion task after updating the persisted state. - LOGV2_DEBUG(23898, - 2, - "Scheduling range deletion task on donor", - "migrationId"_attr = _migrationInfo.getId()); RangeDeletionTask deletionTask(_migrationInfo.getId(), _migrationInfo.getNss(), _migrationInfo.getCollectionUuid(), @@ -254,7 +245,52 @@ SemiFuture MigrationCoordinator::_commitMigrationOnDonorAndRecipient( _waitForDelete ? CleanWhenEnum::kNow : CleanWhenEnum::kDelayed); const auto currentTime = VectorClock::get(opCtx)->getTime(); deletionTask.setTimestamp(currentTime.clusterTime().asTimestamp()); - return migrationutil::submitRangeDeletionTask(opCtx, deletionTask).semi(); + + if (!feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) { + LOGV2_DEBUG(23897, + 2, + "Marking range deletion task on donor as ready for processing", + "migrationId"_attr = _migrationInfo.getId()); + migrationutil::markAsReadyRangeDeletionTaskLocally( + opCtx, _migrationInfo.getCollectionUuid(), _migrationInfo.getRange()); + + // At this point the decision cannot be changed and will be recovered in the event of a + // failover, so it is safe to schedule the deletion task after updating the persisted state. + LOGV2_DEBUG(23898, + 2, + "Scheduling range deletion task on donor", + "migrationId"_attr = _migrationInfo.getId()); + + return migrationutil::submitRangeDeletionTask(opCtx, deletionTask).share(); + } + + + auto waitForActiveQueriesToComplete = [&]() { + AutoGetCollection autoColl(opCtx, deletionTask.getNss(), MODE_IS); + return CollectionShardingRuntime::get(opCtx, deletionTask.getNss()) + ->getOngoingQueriesCompletionFuture(deletionTask.getCollectionUuid(), + deletionTask.getRange()) + .semi(); + }(); + + // Register the range deletion task as pending in order to get the completion future + auto rangeDeletionCompletionFuture = + RangeDeleterService::get(opCtx)->registerTask(deletionTask, + std::move(waitForActiveQueriesToComplete), + false /* fromStepUp*/, + true /* pending */); + + LOGV2_DEBUG(6555800, + 2, + "Marking range deletion task on donor as ready for processing", + "rangeDeletion"_attr = deletionTask); + + // Mark the range deletion task document as non-pending in order to unblock the previously + // registered range deletion + migrationutil::markAsReadyRangeDeletionTaskLocally( + opCtx, deletionTask.getCollectionUuid(), deletionTask.getRange()); + + return rangeDeletionCompletionFuture; } void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext* opCtx) { diff --git a/src/mongo/db/s/migration_coordinator.h b/src/mongo/db/s/migration_coordinator.h index 5c24400ef7c..da0afc0a309 100644 --- a/src/mongo/db/s/migration_coordinator.h +++ b/src/mongo/db/s/migration_coordinator.h @@ -89,7 +89,7 @@ public: * If the decision was to commit, returns a future that is set when range deletion for * the donated range completes. */ - boost::optional> completeMigration(OperationContext* opCtx); + boost::optional> completeMigration(OperationContext* opCtx); /** * Deletes the persistent state for this migration from config.migrationCoordinators. 
@@ -109,7 +109,7 @@ private: * the donor as ready to be processed. Returns a future that is set when range deletion for * the donated range completes. */ - SemiFuture _commitMigrationOnDonorAndRecipient(OperationContext* opCtx); + SharedSemiFuture _commitMigrationOnDonorAndRecipient(OperationContext* opCtx); /** * Deletes the range deletion task from the donor node and marks the range deletion task on the diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index 756d1828014..b7819432c49 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -290,7 +290,7 @@ private: // Optional future that is populated if the migration succeeds and range deletion is scheduled // on this node. The future is set when the range deletion completes. Used if the moveChunk was // sent with waitForDelete. - boost::optional> _cleanupCompleteFuture; + boost::optional> _cleanupCompleteFuture; }; } // namespace mongo diff --git a/src/mongo/db/s/migration_util_test.cpp b/src/mongo/db/s/migration_util_test.cpp index ee6e189178d..d023db60ab1 100644 --- a/src/mongo/db/s/migration_util_test.cpp +++ b/src/mongo/db/s/migration_util_test.cpp @@ -490,6 +490,8 @@ public: CatalogCacheLoaderMock* _mockCatalogCacheLoader; StaticCatalogClient* _mockCatalogClient; + + RAIIServerParameterControllerForTest enableFeatureFlag{"featureFlagRangeDeleterService", false}; }; TEST_F(SubmitRangeDeletionTaskTest, diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp index dd94e9be392..267905c0632 100644 --- a/src/mongo/db/s/range_deleter_service.cpp +++ b/src/mongo/db/s/range_deleter_service.cpp @@ -427,7 +427,8 @@ long long RangeDeleterService::totalNumOfRegisteredTasks() { SharedSemiFuture RangeDeleterService::registerTask( const RangeDeletionTask& rdt, SemiFuture&& waitForActiveQueriesToComplete, - bool fromResubmitOnStepUp) { + bool fromResubmitOnStepUp, + bool pending) { if (disableResumableRangeDeleter.load()) { LOGV2_INFO(6872509, @@ -439,10 +440,13 @@ SharedSemiFuture RangeDeleterService::registerTask( .share(); } - auto scheduleRangeDeletionChain = [&]() { - // Step 1: wait for ongoing queries retaining the range to drain - (void)std::move(waitForActiveQueriesToComplete) - .thenRunOn(_executor) + auto scheduleRangeDeletionChain = [&](SharedSemiFuture pendingFuture) { + (void)pendingFuture.thenRunOn(_executor) + .then([this, + waitForOngoingQueries = std::move(waitForActiveQueriesToComplete).share()]() { + // Step 1: wait for ongoing queries retaining the range to drain + return waitForOngoingQueries; + }) .then([this, when = rdt.getWhenToClean()]() { // Step 2: schedule wait for secondaries orphans cleanup delay const auto delayForActiveQueriesOnSecondariesToComplete = @@ -467,19 +471,23 @@ SharedSemiFuture RangeDeleterService::registerTask( auto lock = fromResubmitOnStepUp ? _acquireMutexUnconditionally() : _acquireMutexFailIfServiceNotUp(); + auto [registeredTask, firstRegistration] = _rangeDeletionTasks[rdt.getCollectionUuid()].insert(std::make_shared(rdt)); + auto task = static_cast(registeredTask->get()); + + // Register the task on the service only once, duplicate registrations will join if (firstRegistration) { - scheduleRangeDeletionChain(); - } else { - LOGV2_WARNING(6804200, - "Tried to register duplicate range deletion task. 
This results in a no-op.", - "collectionUUID"_attr = rdt.getCollectionUuid(), - "range"_attr = rdt.getRange()); + scheduleRangeDeletionChain(task->getPendingFuture()); + } + + // Allow future chain to progress in case the task is flagged as non-pending + if (!pending) { + task->clearPending(); } - return static_cast(registeredTask->get())->getCompletionFuture(); + return task->getCompletionFuture(); } void RangeDeleterService::deregisterTask(const UUID& collUUID, const ChunkRange& range) { diff --git a/src/mongo/db/s/range_deleter_service.h b/src/mongo/db/s/range_deleter_service.h index 2b8293805e8..2c772053dc6 100644 --- a/src/mongo/db/s/range_deleter_service.h +++ b/src/mongo/db/s/range_deleter_service.h @@ -64,6 +64,16 @@ private: } } + SharedSemiFuture getPendingFuture() { + return _pendingPromise.getFuture(); + } + + void clearPending() { + if (!_pendingPromise.getFuture().isReady()) { + _pendingPromise.emplaceValue(); + } + } + SharedSemiFuture getCompletionFuture() const { return _completionPromise.getFuture().semi().share(); } @@ -75,6 +85,8 @@ private: private: // Marked ready once the range deletion has been fully processed SharedPromise _completionPromise; + + SharedPromise _pendingPromise; }; /* @@ -98,8 +110,8 @@ private: */ class ReadyRangeDeletionsProcessor { public: - ReadyRangeDeletionsProcessor(OperationContext* opCtx) - : _thread(stdx::thread([this] { _runRangeDeletions(); })) { + ReadyRangeDeletionsProcessor(OperationContext* opCtx) { + _thread = stdx::thread([this] { _runRangeDeletions(); }); stdx::unique_lock lock(_mutex); opCtx->waitForConditionOrInterrupt( _condVar, lock, [&] { return _threadOpCtxHolder.is_initialized(); }); @@ -149,12 +161,8 @@ private: */ void _runRangeDeletions(); - /* Queue containing scheduled range deletions */ - std::queue _queue; - /* Thread consuming the range deletions queue */ - stdx::thread _thread; - /* Pointer to the (one and only) operation context used by the thread */ - boost::optional _threadOpCtxHolder; + Mutex _mutex = MONGO_MAKE_LATCH("ReadyRangeDeletionsProcessor"); + /* * Condition variable notified when: * - The component has been initialized (the operation context has been instantiated) @@ -163,7 +171,14 @@ private: */ stdx::condition_variable _condVar; - Mutex _mutex = MONGO_MAKE_LATCH("ReadyRangeDeletionsProcessor"); + /* Queue containing scheduled range deletions */ + std::queue _queue; + + /* Pointer to the (one and only) operation context used by the thread */ + boost::optional _threadOpCtxHolder; + + /* Thread consuming the range deletions queue */ + stdx::thread _thread; }; // Keeping track of per-collection registered range deletion tasks @@ -203,12 +218,18 @@ public: * Register a task on the range deleter service. * Returns a future that will be marked ready once the range deletion will be completed. * - * In case of trying to register an already existing task, the future will contain an error. + * In case of trying to register an already existing task, the original future will be returned. + * + * A task can be registered only if the service is up (except for tasks resubmitted on step-up). + * + * When a task is registered as `pending`, it can be unblocked by calling again the same method + * with `pending=false`. */ SharedSemiFuture registerTask( const RangeDeletionTask& rdt, SemiFuture&& waitForActiveQueriesToComplete = SemiFuture::makeReady(), - bool fromResubmitOnStepUp = false); + bool fromResubmitOnStepUp = false, + bool pending = false); /* * Deregister a task from the range deleter service. 
diff --git a/src/mongo/db/s/range_deleter_service_test.cpp b/src/mongo/db/s/range_deleter_service_test.cpp index 5d6b49b7587..69807d52684 100644 --- a/src/mongo/db/s/range_deleter_service_test.cpp +++ b/src/mongo/db/s/range_deleter_service_test.cpp @@ -851,4 +851,32 @@ TEST_F(RangeDeleterServiceTest, GetOverlappingRangeDeletionsWithNonContiguousTas ASSERT_OK(futureReadyWhenTask30Ready.getNoThrow(opCtx)); } +TEST_F(RangeDeleterServiceTest, RegisterPendingTaskAndMarkItNonPending) { + // Set delay for waiting secondary queries to 0 + int defaultOrphanCleanupDelaySecs = orphanCleanupDelaySecs.load(); + ScopeGuard reset([=] { orphanCleanupDelaySecs.store(defaultOrphanCleanupDelaySecs); }); + orphanCleanupDelaySecs.store(0); + + auto rds = RangeDeleterService::get(opCtx); + auto taskWithOngoingQueries = rangeDeletionTask0ForCollA; + + // Drain queries since the beginning + taskWithOngoingQueries->drainOngoingQueries(); + + // Register task as pending (will not be processed until someone registers it again as !pending) + auto completionFuture = rds->registerTask(taskWithOngoingQueries->getTask(), + taskWithOngoingQueries->getOngoingQueriesFuture(), + false /* from step up*/, + true /* pending */); + + ASSERT(!completionFuture.isReady()); + + // Re-registering the task as non-pending must unblock the range deletion + registerAndCreatePersistentTask( + opCtx, taskWithOngoingQueries->getTask(), taskWithOngoingQueries->getOngoingQueriesFuture()) + .get(opCtx); + + ASSERT(completionFuture.isReady()); +} + } // namespace mongo diff --git a/src/mongo/db/s/range_deleter_service_test_util.cpp b/src/mongo/db/s/range_deleter_service_test_util.cpp index 607f51abbf8..fde50854c74 100644 --- a/src/mongo/db/s/range_deleter_service_test_util.cpp +++ b/src/mongo/db/s/range_deleter_service_test_util.cpp @@ -85,15 +85,15 @@ std::shared_ptr createRangeDeletionTaskWithOngo createRangeDeletionTask(collectionUUID, min, max, whenToClean, pending, keyPattern)); } -// TODO review this method: the task may be registered, finish and be recreated by inserting the -// document SharedSemiFuture registerAndCreatePersistentTask( OperationContext* opCtx, const RangeDeletionTask& rdt, SemiFuture&& waitForActiveQueriesToComplete) { auto rds = RangeDeleterService::get(opCtx); - auto completionFuture = rds->registerTask(rdt, std::move(waitForActiveQueriesToComplete)); + // Register task as `pending` in order to block it until the persistent document is non-pending + auto completionFuture = rds->registerTask( + rdt, std::move(waitForActiveQueriesToComplete), false /* fromStepUp */, true /* pending*/); // Range deletion task will only proceed if persistent doc exists and its `pending` field // doesn't exist diff --git a/src/mongo/db/s/sharding_server_status.cpp b/src/mongo/db/s/sharding_server_status.cpp index a765d9fd7e4..c47bfd6388e 100644 --- a/src/mongo/db/s/sharding_server_status.cpp +++ b/src/mongo/db/s/sharding_server_status.cpp @@ -33,6 +33,7 @@ #include "mongo/db/commands/server_status.h" #include "mongo/db/s/active_migrations_registry.h" #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/range_deleter_service.h" #include "mongo/db/s/sharding_data_transform_cumulative_metrics.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/sharding_statistics.h" @@ -125,6 +126,11 @@ public: ShardingStatistics::get(opCtx).report(&result); catalogCache->report(&result); + if (mongo::feature_flags::gRangeDeleterService.isEnabledAndIgnoreFCV()) { + auto nRangeDeletions = 
RangeDeleterService::get(opCtx)->totalNumOfRegisteredTasks();
+            result.appendNumber("rangeDeleterTasks", nRangeDeletions);
+        }
+
         CollectionShardingState::appendInfoForServerStatus(opCtx, &result);
     }
 
diff --git a/src/mongo/s/sharding_feature_flags.idl b/src/mongo/s/sharding_feature_flags.idl
index 917172718d0..218702b0496 100644
--- a/src/mongo/s/sharding_feature_flags.idl
+++ b/src/mongo/s/sharding_feature_flags.idl
@@ -54,7 +54,8 @@ feature_flags:
     featureFlagRangeDeleterService:
         description: "Feature flag protecting instantiation and usages of the range deleter service"
         cpp_varname: feature_flags::gRangeDeleterService
-        default: false
+        default: true
+        version: 6.2
     featureFlagCollModCoordinatorV3:
         description: "Feature for enabling new coll mod coordinator v3"
         cpp_varname: feature_flags::gCollModCoordinatorV3
-- 
cgit v1.2.1
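
Reviewer note: the core mechanism this patch adds to RangeDeleterService::registerTask() is a "pending" gate — MigrationCoordinator registers the range deletion task with pending=true to obtain the completion future, makes the decision durable via markAsReadyRangeDeletionTaskLocally(), and only then is the task allowed to proceed (clearPending()). The following is a toy, self-contained model of that gating pattern, not the MongoDB implementation: std::promise/std::shared_future and a plain thread stand in for the SharedPromise/SharedSemiFuture and executor chain used by the real service, and the class name, sleep, and main() are purely illustrative.

#include <chrono>
#include <future>
#include <iostream>
#include <thread>

// Toy model of a range deletion task whose processing chain is registered up front
// but only starts once the task is flagged as non-pending.
class ToyRangeDeletionTask {
public:
    // Registers the chain and returns the completion future handed back to callers
    // (the analogue of the future returned by registerTask()).
    std::shared_future<void> schedule() {
        _worker = std::thread([this] {
            _pendingFuture.wait();  // gate: blocked while the task is still "pending"
            // The real chain would now wait for ongoing queries to drain, wait for the
            // orphan cleanup delay on secondaries, and delete the orphaned documents.
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            _completion.set_value();
        });
        return _completionFuture;
    }

    // Analogue of clearPending(): idempotently lets the chain proceed.
    void clearPending() {
        if (!_pendingCleared) {
            _pendingCleared = true;
            _pending.set_value();
        }
    }

    ~ToyRangeDeletionTask() {
        if (_worker.joinable())
            _worker.join();
    }

private:
    std::promise<void> _pending;
    std::future<void> _pendingFuture{_pending.get_future()};
    bool _pendingCleared = false;
    std::promise<void> _completion;
    std::shared_future<void> _completionFuture{_completion.get_future().share()};
    std::thread _worker;
};

int main() {
    ToyRangeDeletionTask task;
    auto completionFuture = task.schedule();  // registered as pending: nothing runs yet
    // ... make the migration decision durable / persist the range deletion document ...
    task.clearPending();                      // mark non-pending: the deletion chain starts
    completionFuture.wait();
    std::cout << "range deletion completed\n";
    return 0;
}

The design point mirrored here is that the completion future exists before the deletion is allowed to run, so the donor can hang on to it (and survive a failover via the persisted document) without racing against the deletion itself; in the real service it is the update of the persisted task to non-pending, not a direct method call from the coordinator, that ultimately unblocks the chain.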
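
Because featureFlagRangeDeleterService now defaults to true, unit tests that still target the legacy MetadataManager-based range deleter force the flag off with RAIIServerParameterControllerForTest, exactly as this patch does in metadata_manager_test.cpp and migration_util_test.cpp. Sketch below; the fixture and test names are hypothetical and the snippet only builds inside the server unit-test tree, but the headers and the controller usage are the ones the patch itself adds.

#include "mongo/db/s/shard_server_test_fixture.h"
#include "mongo/idl/server_parameter_test_util.h"
#include "mongo/unittest/unittest.h"

namespace mongo {
namespace {

class LegacyRangeDeleterPathTest : public ShardServerTestFixture {
protected:
    // Keeps the flag off for the whole lifetime of each test so the legacy
    // MetadataManager-driven range deleter is the code path under test.
    RAIIServerParameterControllerForTest _disableRangeDeleterService{
        "featureFlagRangeDeleterService", false};
};

TEST_F(LegacyRangeDeleterPathTest, StillExercisesMetadataManagerRangeDeleter) {
    // ... schedule a deletion through CollectionShardingRuntime::cleanUpRange() and
    // assert on MetadataManager::numberOfRangesScheduledForDeletion() ...
}

}  // namespace
}  // namespace mongo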