From bdeb7d80139abd91d3db4eaa29df796375096ec4 Mon Sep 17 00:00:00 2001 From: Pierlauro Sciarelli Date: Tue, 20 Sep 2022 10:13:29 +0000 Subject: SERVER-68725 Plug actual deletion in the range deleter service Co-authored-by: Pierlauro Sciarelli Co-authored-by: Silvia Surroca --- src/mongo/db/s/SConscript | 1 + src/mongo/db/s/migration_coordinator.cpp | 3 +- src/mongo/db/s/migration_destination_manager.cpp | 6 +- src/mongo/db/s/migration_util.cpp | 24 -- src/mongo/db/s/migration_util.h | 11 - src/mongo/db/s/migration_util_test.cpp | 5 +- src/mongo/db/s/range_deleter_service.cpp | 173 ++++++++- .../s/range_deleter_service_op_observer_test.cpp | 60 ---- src/mongo/db/s/range_deleter_service_test.cpp | 400 +++++++++++++-------- src/mongo/db/s/range_deleter_service_test.h | 66 +++- src/mongo/db/s/range_deleter_service_test_util.cpp | 177 +++++++++ src/mongo/db/s/range_deletion_util.cpp | 110 +++--- src/mongo/db/s/range_deletion_util.h | 59 +++ src/mongo/db/s/range_deletion_util_test.cpp | 3 - 14 files changed, 770 insertions(+), 328 deletions(-) create mode 100644 src/mongo/db/s/range_deleter_service_test_util.cpp diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index c1d894535fb..b67524f1834 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -647,6 +647,7 @@ env.CppUnitTest( 'operation_sharding_state_test.cpp', 'persistent_task_queue_test.cpp', 'range_deleter_service_test.cpp', + 'range_deleter_service_test_util.cpp', 'range_deleter_service_op_observer_test.cpp', 'range_deletion_util_test.cpp', 'resharding/resharding_agg_test.cpp', diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp index 57ad09a9ded..5844d30c3a5 100644 --- a/src/mongo/db/s/migration_coordinator.cpp +++ b/src/mongo/db/s/migration_coordinator.cpp @@ -31,6 +31,7 @@ #include "mongo/db/s/migration_util.h" #include "mongo/db/s/range_deletion_task_gen.h" +#include "mongo/db/s/range_deletion_util.h" #include 
"mongo/db/session/logical_session_id_helpers.h" #include "mongo/db/vector_clock_mutable.h" #include "mongo/logv2/log.h" @@ -215,7 +216,7 @@ SemiFuture MigrationCoordinator::_commitMigrationOnDonorAndRecipient( const auto numOrphans = migrationutil::retrieveNumOrphansFromRecipient(opCtx, _migrationInfo); if (numOrphans > 0) { - migrationutil::persistUpdatedNumOrphans( + persistUpdatedNumOrphans( opCtx, _migrationInfo.getCollectionUuid(), _migrationInfo.getRange(), numOrphans); } diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index fee3a2f796b..72ee678b386 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -58,6 +58,7 @@ #include "mongo/db/s/move_timing_helper.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/range_deletion_task_gen.h" +#include "mongo/db/s/range_deletion_util.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/sharding_recovery_service.h" #include "mongo/db/s/sharding_runtime_d_params_gen.h" @@ -1379,7 +1380,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, // Revert to the original DocumentValidationSettings for opCtx } - migrationutil::persistUpdatedNumOrphans( + persistUpdatedNumOrphans( opCtx, *_collectionUuid, ChunkRange(_min, _max), batchNumCloned); { @@ -1827,8 +1828,7 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, const } if (changeInOrphans != 0) { - migrationutil::persistUpdatedNumOrphans( - opCtx, *_collectionUuid, ChunkRange(_min, _max), changeInOrphans); + persistUpdatedNumOrphans(opCtx, *_collectionUuid, ChunkRange(_min, _max), changeInOrphans); } return didAnything; } diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index 55083911f44..d3110cb8f13 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -643,30 +643,6 @@ 
void persistRangeDeletionTaskLocally(OperationContext* opCtx, } } -void persistUpdatedNumOrphans(OperationContext* opCtx, - const UUID& collectionUuid, - const ChunkRange& range, - long long changeInOrphans) { - const auto query = getQueryFilterForRangeDeletionTask(collectionUuid, range); - try { - PersistentTaskStore store(NamespaceString::kRangeDeletionNamespace); - ScopedRangeDeleterLock rangeDeleterLock(opCtx, collectionUuid); - // The DBDirectClient will not retry WriteConflictExceptions internally while holding an X - // mode lock, so we need to retry at this level. - writeConflictRetry( - opCtx, "updateOrphanCount", NamespaceString::kRangeDeletionNamespace.ns(), [&] { - store.update(opCtx, - query, - BSON("$inc" << BSON(RangeDeletionTask::kNumOrphanDocsFieldName - << changeInOrphans)), - WriteConcerns::kLocalWriteConcern); - }); - BalancerStatsRegistry::get(opCtx)->updateOrphansCount(collectionUuid, changeInOrphans); - } catch (const ExceptionFor&) { - // When upgrading or downgrading, there may be no documents with the orphan count field. - } -} - long long retrieveNumOrphansFromRecipient(OperationContext* opCtx, const MigrationCoordinatorDocument& migrationInfo) { const auto recipientShard = uassertStatusOK( diff --git a/src/mongo/db/s/migration_util.h b/src/mongo/db/s/migration_util.h index 7ac9da2a9b2..df5ba9a15b6 100644 --- a/src/mongo/db/s/migration_util.h +++ b/src/mongo/db/s/migration_util.h @@ -47,8 +47,6 @@ class ShardId; namespace migrationutil { -constexpr auto kRangeDeletionThreadName = "range-deleter"_sd; - /** * Creates a report document with the provided parameters: * @@ -137,15 +135,6 @@ void persistRangeDeletionTaskLocally(OperationContext* opCtx, const RangeDeletionTask& deletionTask, const WriteConcernOptions& writeConcern); -/** - * Updates the range deletion task document to increase or decrease numOrphanedDocs and waits for - * write concern. 
- */ -void persistUpdatedNumOrphans(OperationContext* opCtx, - const UUID& collectionUuid, - const ChunkRange& range, - long long changeInOrphans); - /** * Retrieves the value of 'numOrphanedDocs' from the recipient shard's range deletion task document. */ diff --git a/src/mongo/db/s/migration_util_test.cpp b/src/mongo/db/s/migration_util_test.cpp index 2807b26ba88..ee6e189178d 100644 --- a/src/mongo/db/s/migration_util_test.cpp +++ b/src/mongo/db/s/migration_util_test.cpp @@ -36,6 +36,7 @@ #include "mongo/db/s/collection_sharding_runtime_test.cpp" #include "mongo/db/s/migration_util.h" #include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/range_deletion_util.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/shard_server_catalog_cache_loader.h" #include "mongo/db/s/shard_server_test_fixture.h" @@ -344,11 +345,11 @@ TEST_F(MigrationUtilsTest, TestUpdateNumberOfOrphans) { auto rangeDeletionDoc = createDeletionTask(opCtx, kTestNss, collectionUuid, 0, 10); store.add(opCtx, rangeDeletionDoc); - migrationutil::persistUpdatedNumOrphans(opCtx, collectionUuid, rangeDeletionDoc.getRange(), 5); + persistUpdatedNumOrphans(opCtx, collectionUuid, rangeDeletionDoc.getRange(), 5); rangeDeletionDoc.setNumOrphanDocs(5); ASSERT_EQ(store.count(opCtx, rangeDeletionDoc.toBSON().removeField("timestamp")), 1); - migrationutil::persistUpdatedNumOrphans(opCtx, collectionUuid, rangeDeletionDoc.getRange(), -5); + persistUpdatedNumOrphans(opCtx, collectionUuid, rangeDeletionDoc.getRange(), -5); rangeDeletionDoc.setNumOrphanDocs(0); ASSERT_EQ(store.count(opCtx, rangeDeletionDoc.toBSON().removeField("timestamp")), 1); } diff --git a/src/mongo/db/s/range_deleter_service.cpp b/src/mongo/db/s/range_deleter_service.cpp index a24e14082f9..ee5ff61f608 100644 --- a/src/mongo/db/s/range_deleter_service.cpp +++ b/src/mongo/db/s/range_deleter_service.cpp @@ -28,10 +28,16 @@ */ #include "mongo/db/s/range_deleter_service.h" +#include "mongo/db/catalog_raii.h" 
#include "mongo/db/dbdirectclient.h" #include "mongo/db/op_observer/op_observer_registry.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/balancer_stats_registry.h" +#include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/range_deleter_service_op_observer.h" +#include "mongo/db/s/range_deletion_util.h" +#include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/logv2/log.h" #include "mongo/s/sharding_feature_flags_gen.h" #include "mongo/util/future_util.h" @@ -41,8 +47,32 @@ namespace mongo { namespace { const auto rangeDeleterServiceDecorator = ServiceContext::declareDecoration(); + +const BSONObj getShardKeyPattern(OperationContext* opCtx, + const DatabaseName& dbName, + const UUID& collectionUuid) { + while (true) { + opCtx->checkForInterrupt(); + boost::optional optNss; + { + AutoGetCollection collection( + opCtx, NamespaceStringOrUUID{dbName.toString(), collectionUuid}, MODE_IS); + + auto optMetadata = CollectionShardingRuntime::get(opCtx, collection.getNss()) + ->getCurrentMetadataIfKnown(); + if (optMetadata && optMetadata->isSharded()) { + return optMetadata->getShardKeyPattern().toBSON(); + } + optNss = collection.getNss(); + } + + onShardVersionMismatchNoExcept(opCtx, *optNss, boost::none).ignore(); + continue; + } } +} // namespace + const ReplicaSetAwareServiceRegistry::Registerer rangeDeleterServiceRegistryRegisterer("RangeDeleterService"); @@ -59,6 +89,13 @@ void RangeDeleterService::onStepUpComplete(OperationContext* opCtx, long long te return; } + if (disableResumableRangeDeleter.load()) { + LOGV2_INFO( + 6872508, + "Not resuming range deletions on step-up because `disableResumableRangeDeleter=true`"); + return; + } + auto lock = _acquireMutexUnconditionally(); dassert(_state.load() == kDown, "Service expected to be down before stepping up"); @@ -236,6 +273,8 @@ SharedSemiFuture RangeDeleterService::registerTask( bool fromResubmitOnStepUp) { if 
(disableResumableRangeDeleter.load()) { + LOGV2_INFO(6872509, + "Not scheduling range deletion because `disableResumableRangeDeleter=true`"); return SemiFuture::makeReady( Status(ErrorCodes::ResumableRangeDeleterDisabled, "Not submitting any range deletion task because the " @@ -278,12 +317,134 @@ SharedSemiFuture RangeDeleterService::registerTask( _executor->now() + delayForActiveQueriesOnSecondariesToComplete) .share(); }) - .then([this, collUuid = rdt.getCollectionUuid(), range = rdt.getRange()]() { - // Step 3: perform the actual range deletion - // TODO - - // Deregister the task - deregisterTask(collUuid, range); + .then([this, + dbName = rdt.getNss().dbName(), + collectionUuid = rdt.getCollectionUuid(), + range = rdt.getRange()]() { + return withTemporaryOperationContext( + [&](OperationContext* opCtx) { + // A task is considered completed when all the following conditions are met: + // - All orphans have been deleted + // - The deletions have been majority committed + // - The range deletion task document has been deleted + bool taskCompleted = false; + + while (!taskCompleted) { + try { + // Perform the actual range deletion + bool orphansRemovalCompleted = false; + while (!orphansRemovalCompleted) { + try { + LOGV2_DEBUG( + 6872501, + 2, + "Beginning deletion of documents in orphan range", + "dbName"_attr = dbName, + "collectionUUID"_attr = collectionUuid.toString(), + "range"_attr = redact(range.toString())); + + auto shardKeyPattern = + getShardKeyPattern(opCtx, dbName, collectionUuid); + + uassertStatusOK(deleteRangeInBatches( + opCtx, dbName, collectionUuid, shardKeyPattern, range)); + orphansRemovalCompleted = true; + } catch (ExceptionFor&) { + // No orphaned documents to remove from a dropped collection + orphansRemovalCompleted = true; + } catch ( + ExceptionFor< + ErrorCodes:: + RangeDeletionAbandonedBecauseTaskDocumentDoesNotExist>&) { + // Nothing left to do: the range deletion task document doesn't exist + orphansRemovalCompleted = true; + } catch ( +
ExceptionFor< + ErrorCodes:: + RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist>&) { + // No orphaned documents to remove: the collection with the + // given UUID doesn't exist + orphansRemovalCompleted = true; + } catch (const DBException& e) { + LOGV2_ERROR(6872502, + "Failed to delete documents in orphan range", + "dbName"_attr = dbName, + "collectionUUID"_attr = + collectionUuid.toString(), + "range"_attr = redact(range.toString()), + "error"_attr = e); + throw; + } + } + + { + repl::ReplClientInfo::forClient(opCtx->getClient()) + .setLastOpToSystemLastOpTime(opCtx); + auto clientOpTime = + repl::ReplClientInfo::forClient(opCtx->getClient()) + .getLastOp(); + + LOGV2_DEBUG( + 6872503, + 2, + "Waiting for majority replication of local deletions", + "dbName"_attr = dbName, + "collectionUUID"_attr = collectionUuid, + "range"_attr = redact(range.toString()), + "clientOpTime"_attr = clientOpTime); + + // Synchronously wait for majority before removing the range + // deletion task document: oplog gets applied in parallel for + // different collections, so it's important not to apply + // out of order the deletions of orphans and the removal of the + // entry persisted in `config.rangeDeletions` + WaitForMajorityService::get(opCtx->getServiceContext()) + .waitUntilMajority(clientOpTime, + CancellationToken::uncancelable()) + .get(opCtx); + } + + // Remove persistent range deletion task + try { + removePersistentRangeDeletionTask(opCtx, collectionUuid, range); + + LOGV2_DEBUG( + 6872504, + 2, + "Completed removal of persistent range deletion task", + "dbName"_attr = dbName, + "collectionUUID"_attr = collectionUuid.toString(), + "range"_attr = redact(range.toString())); + + } catch (const DBException& e) { + LOGV2_ERROR(6872505, + "Failed to remove persistent range deletion task", + "dbName"_attr = dbName, + "collectionUUID"_attr = collectionUuid.toString(), + "range"_attr = redact(range.toString()), + "error"_attr = e); + throw; + } + } catch (const
DBException& e) { + // Fail in case of shutdown/stepdown errors as the range + // deletion will be resumed on the next step up + if (ErrorCodes::isShutdownError(e.code()) || + ErrorCodes::isNotPrimaryError(e.code())) { + return e.toStatus(); + } + + // Iterate again in case of any other error + continue; + } + + taskCompleted = true; + } + + return Status::OK(); + }, + dbName, + collectionUuid, + true); }) // IMPORTANT: no continuation should be added to this chain after this point // in order to make sure range deletions order is preserved. diff --git a/src/mongo/db/s/range_deleter_service_op_observer_test.cpp b/src/mongo/db/s/range_deleter_service_op_observer_test.cpp index 09fa8e3eb91..3d7d4767bf9 100644 --- a/src/mongo/db/s/range_deleter_service_op_observer_test.cpp +++ b/src/mongo/db/s/range_deleter_service_op_observer_test.cpp @@ -32,66 +32,6 @@ namespace mongo { -void insertRangeDeletionTaskDocument(OperationContext* opCtx, const RangeDeletionTask& rdt) { - PersistentTaskStore store(NamespaceString::kRangeDeletionNamespace); - store.add(opCtx, rdt); -} - -void updatePendingField(OperationContext* opCtx, UUID migrationId, bool pending) { - PersistentTaskStore store(NamespaceString::kRangeDeletionNamespace); - store.update(opCtx, - BSON(RangeDeletionTask::kIdFieldName << migrationId), - BSON("$set" << BSON(RangeDeletionTask::kPendingFieldName << pending))); -} - -void removePendingField(OperationContext* opCtx, UUID migrationId) { - PersistentTaskStore store(NamespaceString::kRangeDeletionNamespace); - store.update(opCtx, - BSON(RangeDeletionTask::kIdFieldName << migrationId), - BSON("$unset" << BSON(RangeDeletionTask::kPendingFieldName << ""))); -} - -void deleteRangeDeletionTaskDocument(OperationContext* opCtx, UUID migrationId) { - PersistentTaskStore store(NamespaceString::kRangeDeletionNamespace); - store.remove(opCtx, BSON(RangeDeletionTask::kIdFieldName << migrationId)); -} - -// Ensure that `expectedChunkRanges` range deletion tasks are scheduled for 
collection with UUID -// `uuidColl` -void verifyRangeDeletionTasks(OperationContext* opCtx, - UUID uuidColl, - std::vector expectedChunkRanges) { - auto rds = RangeDeleterService::get(opCtx); - - // Get chunk ranges inserted to be deleted by RangeDeleterService - BSONObj dumpState = rds->dumpState(); - BSONElement chunkRangesElem = dumpState.getField(uuidColl.toString()); - if (!chunkRangesElem.ok() && expectedChunkRanges.size() == 0) { - return; - } - ASSERT(chunkRangesElem.ok()) << "Expected to find range deletion tasks from collection " - << uuidColl.toString(); - - const auto chunkRanges = chunkRangesElem.Array(); - ASSERT_EQ(chunkRanges.size(), expectedChunkRanges.size()); - - // Sort expectedChunkRanges vector to replicate RangeDeleterService dumpState order - struct { - bool operator()(const ChunkRange& a, const ChunkRange& b) { - return a.getMin().woCompare(b.getMin()) < 0; - } - } RANGES_COMPARATOR; - - std::sort(expectedChunkRanges.begin(), expectedChunkRanges.end(), RANGES_COMPARATOR); - - // Check expectedChunkRanges are exactly the same as the returned ones - for (size_t i = 0; i < expectedChunkRanges.size(); ++i) { - ASSERT(ChunkRange::fromBSONThrowing(chunkRanges[i].Obj()) == expectedChunkRanges[i]) - << "Expected " << ChunkRange::fromBSONThrowing(chunkRanges[i].Obj()).toBSON() - << " == " << expectedChunkRanges[i].toBSON(); - } -} - /** ** TESTS */ diff --git a/src/mongo/db/s/range_deleter_service_test.cpp b/src/mongo/db/s/range_deleter_service_test.cpp index 6d3d49eced0..8d465cfc34d 100644 --- a/src/mongo/db/s/range_deleter_service_test.cpp +++ b/src/mongo/db/s/range_deleter_service_test.cpp @@ -32,6 +32,8 @@ #include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/persistent_task_store.h" +#include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/operation_sharding_state.h" namespace mongo { @@ -41,6 +43,7 @@ namespace mongo { */ void 
RangeDeleterServiceTest::setUp() { ShardServerTestFixture::setUp(); + WaitForMajorityService::get(getServiceContext()).startup(getServiceContext()); opCtx = operationContext(); RangeDeleterService::get(opCtx)->onStepUpComplete(opCtx, 0L); RangeDeleterService::get(opCtx)->_waitForRangeDeleterServiceUp_FOR_TESTING(); @@ -57,68 +60,67 @@ void RangeDeleterServiceTest::setUp() { { AutoGetCollection autoColl(opCtx, nsCollA, MODE_IX); uuidCollA = autoColl.getCollection()->uuid(); + nssWithUuid[uuidCollA] = nsCollA; + _setFilteringMetadataWithUUID(opCtx, uuidCollA); } { AutoGetCollection autoColl(opCtx, nsCollB, MODE_IX); uuidCollB = autoColl.getCollection()->uuid(); + nssWithUuid[uuidCollB] = nsCollB; + _setFilteringMetadataWithUUID(opCtx, uuidCollB); } - rangeDeletionTask0ForCollA = - createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 0), BSON("a" << 10)); - rangeDeletionTask1ForCollA = - createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 10), BSON("a" << 20)); - rangeDeletionTask0ForCollB = - createRangeDeletionTaskWithOngoingQueries(uuidCollB, BSON("a" << 0), BSON("a" << 10)); + rangeDeletionTask0ForCollA = createRangeDeletionTaskWithOngoingQueries( + uuidCollA, BSON(kShardKey << 0), BSON(kShardKey << 10)); + rangeDeletionTask1ForCollA = createRangeDeletionTaskWithOngoingQueries( + uuidCollA, BSON(kShardKey << 10), BSON(kShardKey << 20)); + rangeDeletionTask0ForCollB = createRangeDeletionTaskWithOngoingQueries( + uuidCollB, BSON(kShardKey << 0), BSON(kShardKey << 10)); } void RangeDeleterServiceTest::tearDown() { RangeDeleterService::get(opCtx)->onStepDown(); RangeDeleterService::get(opCtx)->onShutdown(); + WaitForMajorityService::get(opCtx->getServiceContext()).shutDown(); ShardServerTestFixture::tearDown(); } -RangeDeletionTask RangeDeleterServiceTest::createRangeDeletionTask(const UUID& collectionUUID, - const BSONObj& min, - const BSONObj& max, - CleanWhenEnum whenToClean, - bool pending) { - RangeDeletionTask rdt; - 
rdt.setId(UUID::gen()); - rdt.setNss(NamespaceString("test.mock")); - rdt.setDonorShardId(ShardId("shard0")); - rdt.setCollectionUuid(collectionUUID); - rdt.setRange(ChunkRange(min, max)); - rdt.setWhenToClean(whenToClean); - rdt.setPending(pending); - return rdt; -} - -std::shared_ptr -RangeDeleterServiceTest::createRangeDeletionTaskWithOngoingQueries(const UUID& collectionUUID, - const BSONObj& min, - const BSONObj& max, - CleanWhenEnum whenToClean, - bool pending) { - return std::make_shared( - createRangeDeletionTask(collectionUUID, min, max, whenToClean, pending)); -} - -/** - * RangeDeletionWithOngoingQueries implementation - */ -RangeDeletionWithOngoingQueries::RangeDeletionWithOngoingQueries(const RangeDeletionTask& t) - : _task(t) {} - -RangeDeletionTask RangeDeletionWithOngoingQueries::getTask() { - return _task; -} - -void RangeDeletionWithOngoingQueries::drainOngoingQueries() { - _ongoingQueries.setFrom(Status::OK()); -} - -auto RangeDeletionWithOngoingQueries::getOngoingQueriesFuture() { - return _ongoingQueries.getFuture().semi(); +void RangeDeleterServiceTest::_setFilteringMetadataWithUUID(OperationContext* opCtx, + const UUID& uuid) { + const OID epoch = OID::gen(); + NamespaceString nss = nssWithUuid[uuid]; + + const CollectionMetadata metadata = [&]() { + auto chunk = ChunkType(uuid, + ChunkRange{BSON(kShardKey << MINKEY), BSON(kShardKey << MAXKEY)}, + ChunkVersion({epoch, Timestamp(1, 1)}, {1, 0}), + ShardId("this")); + ChunkManager cm(ShardId("this"), + DatabaseVersion(UUID::gen(), Timestamp(1, 1)), + makeStandaloneRoutingTableHistory( + RoutingTableHistory::makeNew(nss, + uuid, + kShardKeyPattern, + nullptr, + false, + epoch, + Timestamp(1, 1), + boost::none /* timeseriesFields */, + boost::none, + boost::none /* chunkSizeBytes */, + true, + {std::move(chunk)})), + boost::none); + + return CollectionMetadata(std::move(cm), ShardId("this")); + }(); + + AutoGetCollection autoColl(opCtx, nss, LockMode::MODE_X); + + 
CollectionShardingRuntime::get(opCtx, nss)->setFilteringMetadata(opCtx, metadata); + auto* css = CollectionShardingState::get(opCtx, nss); + auto& csr = *checked_cast(css); + csr.setFilteringMetadata(opCtx, metadata); } /** @@ -129,8 +131,10 @@ TEST_F(RangeDeleterServiceTest, RegisterAndProcessSingleTask) { auto rds = RangeDeleterService::get(opCtx); auto taskWithOngoingQueries = rangeDeletionTask0ForCollA; - auto completionFuture = rds->registerTask(taskWithOngoingQueries->getTask(), - taskWithOngoingQueries->getOngoingQueriesFuture()); + auto completionFuture = + registerAndCreatePersistentTask(opCtx, + taskWithOngoingQueries->getTask(), + taskWithOngoingQueries->getOngoingQueriesFuture()); // The task can't be processed (hence completed) before ongoing queries drain ASSERT(!completionFuture.isReady()); ASSERT_EQ(1, rds->getNumRangeDeletionTasksForCollection(uuidCollA)); @@ -145,8 +149,10 @@ TEST_F(RangeDeleterServiceTest, RegisterDuplicateTaskForSameRangeReturnsOriginal auto rds = RangeDeleterService::get(opCtx); auto taskWithOngoingQueries = rangeDeletionTask0ForCollA; - auto originalTaskCompletionFuture = rds->registerTask( - taskWithOngoingQueries->getTask(), taskWithOngoingQueries->getOngoingQueriesFuture()); + auto originalTaskCompletionFuture = + registerAndCreatePersistentTask(opCtx, + taskWithOngoingQueries->getTask(), + taskWithOngoingQueries->getOngoingQueriesFuture()); // Trying registering a duplicate task must return a future without throwing errors auto duplicateTaskCompletionFuture = @@ -171,10 +177,14 @@ TEST_F(RangeDeleterServiceTest, RegisterAndProcessMoreTasksForSameCollection) { auto task1WithOngoingQueries = rangeDeletionTask1ForCollA; // Register 2 tasks for the same collection - auto completionFuture0 = rds->registerTask(task0WithOngoingQueries->getTask(), - task0WithOngoingQueries->getOngoingQueriesFuture()); - auto completionFuture1 = rds->registerTask(task1WithOngoingQueries->getTask(), - 
task1WithOngoingQueries->getOngoingQueriesFuture()); + auto completionFuture0 = + registerAndCreatePersistentTask(opCtx, + task0WithOngoingQueries->getTask(), + task0WithOngoingQueries->getOngoingQueriesFuture()); + auto completionFuture1 = + registerAndCreatePersistentTask(opCtx, + task1WithOngoingQueries->getTask(), + task1WithOngoingQueries->getOngoingQueriesFuture()); // The tasks can't be processed (hence completed) before ongoing queries drain ASSERT(!completionFuture0.isReady()); @@ -199,11 +209,13 @@ TEST_F(RangeDeleterServiceTest, RegisterAndProcessTasksForDifferentCollections) // Register 1 tasks for `collA` and 1 task for `collB` auto completionFutureCollA = - rds->registerTask(taskWithOngoingQueriesCollA->getTask(), - taskWithOngoingQueriesCollA->getOngoingQueriesFuture()); + registerAndCreatePersistentTask(opCtx, + taskWithOngoingQueriesCollA->getTask(), + taskWithOngoingQueriesCollA->getOngoingQueriesFuture()); auto completionFutureCollB = - rds->registerTask(taskWithOngoingQueriesCollB->getTask(), - taskWithOngoingQueriesCollB->getOngoingQueriesFuture()); + registerAndCreatePersistentTask(opCtx, + taskWithOngoingQueriesCollB->getTask(), + taskWithOngoingQueriesCollB->getOngoingQueriesFuture()); // The tasks can't be processed (hence completed) before ongoing queries drain ASSERT(!completionFutureCollA.isReady()); @@ -232,12 +244,13 @@ TEST_F(RangeDeleterServiceTest, DelayForSecondaryQueriesIsHonored) { // Set delay for waiting secondary queries to 2 seconds orphanCleanupDelaySecs.store(2); - auto rds = RangeDeleterService::get(opCtx); auto taskWithOngoingQueries = createRangeDeletionTaskWithOngoingQueries( - uuidCollA, BSON("a" << 0), BSON("a" << 10), CleanWhenEnum::kDelayed); + uuidCollA, BSON(kShardKey << 0), BSON(kShardKey << 10), CleanWhenEnum::kDelayed); - auto completionFuture = rds->registerTask(taskWithOngoingQueries->getTask(), - taskWithOngoingQueries->getOngoingQueriesFuture()); + auto completionFuture = + 
registerAndCreatePersistentTask(opCtx, + taskWithOngoingQueries->getTask(), + taskWithOngoingQueries->getOngoingQueriesFuture()); // Check that the task lasts at least 2 seconds from the moment ongoing queries drain auto start = Date_t::now(); @@ -253,12 +266,14 @@ TEST_F(RangeDeleterServiceTest, ScheduledTaskInvalidatedOnStepDown) { auto rds = RangeDeleterService::get(opCtx); auto taskWithOngoingQueries = createRangeDeletionTaskWithOngoingQueries( - uuidCollA, BSON("a" << 0), BSON("a" << 10), CleanWhenEnum::kDelayed); + uuidCollA, BSON(kShardKey << 0), BSON(kShardKey << 10), CleanWhenEnum::kDelayed); // Mark ongoing queries as completed taskWithOngoingQueries->drainOngoingQueries(); - auto completionFuture = rds->registerTask(taskWithOngoingQueries->getTask(), - taskWithOngoingQueries->getOngoingQueriesFuture()); + auto completionFuture = + registerAndCreatePersistentTask(opCtx, + taskWithOngoingQueries->getTask(), + taskWithOngoingQueries->getOngoingQueriesFuture()); // Manually trigger disabling of the service rds->onStepDown(); @@ -287,12 +302,14 @@ TEST_F(RangeDeleterServiceTest, NoActionPossibleIfServiceIsDown) { }); auto taskWithOngoingQueries = createRangeDeletionTaskWithOngoingQueries( - uuidCollA, BSON("a" << 0), BSON("a" << 10), CleanWhenEnum::kDelayed); + uuidCollA, BSON(kShardKey << 0), BSON(kShardKey << 10), CleanWhenEnum::kDelayed); - ASSERT_THROWS_CODE(rds->registerTask(taskWithOngoingQueries->getTask(), - taskWithOngoingQueries->getOngoingQueriesFuture()), - DBException, - ErrorCodes::NotYetInitialized); + ASSERT_THROWS_CODE( + registerAndCreatePersistentTask(opCtx, + taskWithOngoingQueries->getTask(), + taskWithOngoingQueries->getOngoingQueriesFuture()), + DBException, + ErrorCodes::NotYetInitialized); ASSERT_THROWS_CODE(rds->deregisterTask(taskWithOngoingQueries->getTask().getCollectionUuid(), taskWithOngoingQueries->getTask().getRange()), @@ -309,80 +326,84 @@ TEST_F(RangeDeleterServiceTest, NoOverlappingRangeDeletionsFuture) { auto rds = 
RangeDeleterService::get(opCtx); // No range deletion task registered - ChunkRange inputRange(BSON("a" << 0), BSON("a" << 10)); + ChunkRange inputRange(BSON(kShardKey << 0), BSON(kShardKey << 10)); auto fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange); ASSERT(fut.isReady()); // Register a range deletion task - auto taskWithOngoingQueries = - createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 0), BSON("a" << 10)); - auto completionFuture = rds->registerTask(taskWithOngoingQueries->getTask(), - taskWithOngoingQueries->getOngoingQueriesFuture()); + auto taskWithOngoingQueries = createRangeDeletionTaskWithOngoingQueries( + uuidCollA, BSON(kShardKey << 0), BSON(kShardKey << 10)); + auto completionFuture = + registerAndCreatePersistentTask(opCtx, + taskWithOngoingQueries->getTask(), + taskWithOngoingQueries->getOngoingQueriesFuture()); // Totally unrelated range - inputRange = ChunkRange(BSON("a" << -10), BSON("a" << -3)); + inputRange = ChunkRange(BSON(kShardKey << -10), BSON(kShardKey << -3)); fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange); ASSERT(fut.isReady()); // Range "touching" lower bound - inputRange = ChunkRange(BSON("a" << -10), BSON("a" << 0)); + inputRange = ChunkRange(BSON(kShardKey << -10), BSON(kShardKey << 0)); fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange); ASSERT(fut.isReady()); // Range "touching" upper bound - inputRange = ChunkRange(BSON("a" << 10), BSON("a" << 20)); + inputRange = ChunkRange(BSON(kShardKey << 10), BSON(kShardKey << 20)); fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange); ASSERT(fut.isReady()); } TEST_F(RangeDeleterServiceTest, OneOverlappingRangeDeletionFuture) { auto rds = RangeDeleterService::get(opCtx); - auto taskWithOngoingQueries = - createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 0), BSON("a" << 10)); + auto taskWithOngoingQueries = createRangeDeletionTaskWithOngoingQueries( + uuidCollA, BSON(kShardKey << 0), 
BSON(kShardKey << 10)); - auto completionFuture = rds->registerTask(taskWithOngoingQueries->getTask(), - taskWithOngoingQueries->getOngoingQueriesFuture()); + auto completionFuture = + registerAndCreatePersistentTask(opCtx, + taskWithOngoingQueries->getTask(), + taskWithOngoingQueries->getOngoingQueriesFuture()); std::vector> waitForRangeToBeDeletedFutures; // Exact match - ChunkRange inputRange(BSON("a" << 0), BSON("a" << 10)); + ChunkRange inputRange(BSON(kShardKey << 0), BSON(kShardKey << 10)); auto fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange); ASSERT(!fut.isReady()); waitForRangeToBeDeletedFutures.push_back(fut); // Super-range - inputRange = ChunkRange(BSON("a" << -10), BSON("a" << 20)); + inputRange = ChunkRange(BSON(kShardKey << -10), BSON(kShardKey << 20)); fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange); ASSERT(!fut.isReady()); waitForRangeToBeDeletedFutures.push_back(fut); // Super range touching upper bound - inputRange = ChunkRange(BSON("a" << -10), BSON("a" << 10)); + inputRange = ChunkRange(BSON(kShardKey << -10), BSON(kShardKey << 10)); fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange); ASSERT(!fut.isReady()); waitForRangeToBeDeletedFutures.push_back(fut); // Super range touching lower bound - inputRange = ChunkRange(BSON("a" << 0), BSON("a" << 20)); + inputRange = ChunkRange(BSON(kShardKey << 0), BSON(kShardKey << 20)); fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange); ASSERT(!fut.isReady()); waitForRangeToBeDeletedFutures.push_back(fut); // Sub-range - inputRange = ChunkRange(BSON("a" << 3), BSON("a" << 6)); + inputRange = ChunkRange(BSON(kShardKey << 3), BSON(kShardKey << 6)); fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange); ASSERT(!fut.isReady()); waitForRangeToBeDeletedFutures.push_back(fut); // Sub-range touching upper bound - inputRange = ChunkRange(BSON("a" << 3), BSON("a" << 10)); + inputRange = ChunkRange(BSON(kShardKey << 3), 
BSON(kShardKey << 10)); fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange); ASSERT(!fut.isReady()); waitForRangeToBeDeletedFutures.push_back(fut); // Sub-range touching lower bound - inputRange = ChunkRange(BSON("a" << 0), BSON("a" << 6)); + inputRange = ChunkRange(BSON(kShardKey << 0), BSON(kShardKey << 6)); fut = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange); ASSERT(!fut.isReady()); waitForRangeToBeDeletedFutures.push_back(fut); @@ -398,32 +419,38 @@ TEST_F(RangeDeleterServiceTest, MultipleOverlappingRangeDeletionsFuture) { auto rds = RangeDeleterService::get(opCtx); // Register range deletion tasks [0, 10) - [10, 20) - [20, 30) - auto taskWithOngoingQueries0 = - createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 0), BSON("a" << 10)); - auto completionFuture0 = rds->registerTask(taskWithOngoingQueries0->getTask(), - taskWithOngoingQueries0->getOngoingQueriesFuture()); - auto taskWithOngoingQueries10 = - createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 10), BSON("a" << 20)); - auto completionFuture10 = rds->registerTask( - taskWithOngoingQueries10->getTask(), taskWithOngoingQueries10->getOngoingQueriesFuture()); - auto taskWithOngoingQueries30 = - createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 30), BSON("a" << 40)); - auto completionFuture30 = rds->registerTask( - taskWithOngoingQueries30->getTask(), taskWithOngoingQueries30->getOngoingQueriesFuture()); + auto taskWithOngoingQueries0 = createRangeDeletionTaskWithOngoingQueries( + uuidCollA, BSON(kShardKey << 0), BSON(kShardKey << 10)); + auto completionFuture0 = + registerAndCreatePersistentTask(opCtx, + taskWithOngoingQueries0->getTask(), + taskWithOngoingQueries0->getOngoingQueriesFuture()); + auto taskWithOngoingQueries10 = createRangeDeletionTaskWithOngoingQueries( + uuidCollA, BSON(kShardKey << 10), BSON(kShardKey << 20)); + auto completionFuture10 = + registerAndCreatePersistentTask(opCtx, + 
taskWithOngoingQueries10->getTask(), + taskWithOngoingQueries10->getOngoingQueriesFuture()); + auto taskWithOngoingQueries30 = createRangeDeletionTaskWithOngoingQueries( + uuidCollA, BSON(kShardKey << 30), BSON(kShardKey << 40)); + auto completionFuture30 = + registerAndCreatePersistentTask(opCtx, + taskWithOngoingQueries30->getTask(), + taskWithOngoingQueries30->getOngoingQueriesFuture()); // Exact match with [0, 10) - ChunkRange inputRange(BSON("a" << 0), BSON("a" << 10)); + ChunkRange inputRange(BSON(kShardKey << 0), BSON(kShardKey << 10)); auto futureReadyWhenTask0Ready = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange); ASSERT(!futureReadyWhenTask0Ready.isReady()); // Super-range spanning across [0, 10) and [10, 20) - inputRange = ChunkRange(BSON("a" << -10), BSON("a" << 20)); + inputRange = ChunkRange(BSON(kShardKey << -10), BSON(kShardKey << 20)); auto futureReadyWhenTasks0And10Ready = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange); ASSERT(!futureReadyWhenTasks0And10Ready.isReady()); // Super-range spanning across [0, 10), [10, 20) and [30, 40) - inputRange = ChunkRange(BSON("a" << -10), BSON("a" << 50)); + inputRange = ChunkRange(BSON(kShardKey << -10), BSON(kShardKey << 50)); auto futureReadyWhenTasks0And10And30Ready = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange); ASSERT(!futureReadyWhenTasks0And10And30Ready.isReady()); @@ -446,41 +473,47 @@ TEST_F(RangeDeleterServiceTest, GetOverlappingRangeDeletionsResilientToRefineSha auto rds = RangeDeleterService::get(opCtx); // Register range deletion tasks [0, 10) - [10, 20) - [20, 30) - auto taskWithOngoingQueries0 = - createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 0), BSON("a" << 10)); - auto completionFuture0 = rds->registerTask(taskWithOngoingQueries0->getTask(), - taskWithOngoingQueries0->getOngoingQueriesFuture()); - auto taskWithOngoingQueries10 = - createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 10), BSON("a" << 20)); - auto 
completionFuture10 = rds->registerTask( - taskWithOngoingQueries10->getTask(), taskWithOngoingQueries10->getOngoingQueriesFuture()); - auto taskWithOngoingQueries30 = - createRangeDeletionTaskWithOngoingQueries(uuidCollA, BSON("a" << 30), BSON("a" << 40)); - auto completionFuture30 = rds->registerTask( - taskWithOngoingQueries30->getTask(), taskWithOngoingQueries30->getOngoingQueriesFuture()); + auto taskWithOngoingQueries0 = createRangeDeletionTaskWithOngoingQueries( + uuidCollA, BSON(kShardKey << 0), BSON(kShardKey << 10)); + auto completionFuture0 = + registerAndCreatePersistentTask(opCtx, + taskWithOngoingQueries0->getTask(), + taskWithOngoingQueries0->getOngoingQueriesFuture()); + auto taskWithOngoingQueries10 = createRangeDeletionTaskWithOngoingQueries( + uuidCollA, BSON(kShardKey << 10), BSON(kShardKey << 20)); + auto completionFuture10 = + registerAndCreatePersistentTask(opCtx, + taskWithOngoingQueries10->getTask(), + taskWithOngoingQueries10->getOngoingQueriesFuture()); + auto taskWithOngoingQueries30 = createRangeDeletionTaskWithOngoingQueries( + uuidCollA, BSON(kShardKey << 30), BSON(kShardKey << 40)); + auto completionFuture30 = + registerAndCreatePersistentTask(opCtx, + taskWithOngoingQueries30->getTask(), + taskWithOngoingQueries30->getOngoingQueriesFuture()); // Exact match with [0, 10) - ChunkRange inputRange(BSON("a" << 0 << "b" - << "lol"), - BSON("a" << 9 << "b" - << "lol")); + ChunkRange inputRange(BSON(kShardKey << 0 << "b" + << "lol"), + BSON(kShardKey << 9 << "b" + << "lol")); auto futureReadyWhenTask0Ready = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange); ASSERT(!futureReadyWhenTask0Ready.isReady()); // Super-range spanning across [0, 10) and [10, 20) - inputRange = ChunkRange(BSON("a" << -10 << "b" - << "lol"), - BSON("a" << 15 << "b" - << "lol")); + inputRange = ChunkRange(BSON(kShardKey << -10 << "b" + << "lol"), + BSON(kShardKey << 15 << "b" + << "lol")); auto futureReadyWhenTasks0And10Ready = 
rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange); ASSERT(!futureReadyWhenTasks0And10Ready.isReady()); // Super-range spanning across [0, 10), [10, 20) and [30, 40) - inputRange = ChunkRange(BSON("a" << -10 << "b" - << "lol"), - BSON("a" << 50 << "b" - << "lol")); + inputRange = ChunkRange(BSON(kShardKey << -10 << "b" + << "lol"), + BSON(kShardKey << 50 << "b" + << "lol")); auto futureReadyWhenTasks0And10And30Ready = rds->getOverlappingRangeDeletionsFuture(uuidCollA, inputRange); ASSERT(!futureReadyWhenTasks0And10And30Ready.isReady()); @@ -507,14 +540,17 @@ TEST_F(RangeDeleterServiceTest, DumpState) { // Register 2 tasks for `collA` and 1 task for `collB` auto completionFuture0CollA = - rds->registerTask(task0WithOngoingQueriesCollA->getTask(), - task0WithOngoingQueriesCollA->getOngoingQueriesFuture()); + registerAndCreatePersistentTask(opCtx, + task0WithOngoingQueriesCollA->getTask(), + task0WithOngoingQueriesCollA->getOngoingQueriesFuture()); auto completionFuture1CollA = - rds->registerTask(task1WithOngoingQueriesCollA->getTask(), - task1WithOngoingQueriesCollA->getOngoingQueriesFuture()); + registerAndCreatePersistentTask(opCtx, + task1WithOngoingQueriesCollA->getTask(), + task1WithOngoingQueriesCollA->getOngoingQueriesFuture()); auto completionFutureCollB = - rds->registerTask(taskWithOngoingQueriesCollB->getTask(), - taskWithOngoingQueriesCollB->getOngoingQueriesFuture()); + registerAndCreatePersistentTask(opCtx, + taskWithOngoingQueriesCollB->getTask(), + taskWithOngoingQueriesCollB->getOngoingQueriesFuture()); // The tasks can't be processed (hence completed) before ongoing queries drain ASSERT(!completionFuture0CollA.isReady()); @@ -550,14 +586,17 @@ TEST_F(RangeDeleterServiceTest, TotalNumOfRegisteredTasks) { // Register 2 tasks for `collA` and 1 task for `collB` auto completionFuture0CollA = - rds->registerTask(rangeDeletionTask0ForCollA->getTask(), - rangeDeletionTask0ForCollA->getOngoingQueriesFuture()); + 
registerAndCreatePersistentTask(opCtx, + rangeDeletionTask0ForCollA->getTask(), + rangeDeletionTask0ForCollA->getOngoingQueriesFuture()); auto completionFuture1CollA = - rds->registerTask(rangeDeletionTask1ForCollA->getTask(), - rangeDeletionTask1ForCollA->getOngoingQueriesFuture()); + registerAndCreatePersistentTask(opCtx, + rangeDeletionTask1ForCollA->getTask(), + rangeDeletionTask1ForCollA->getOngoingQueriesFuture()); auto completionFutureCollB = - rds->registerTask(rangeDeletionTask0ForCollB->getTask(), - rangeDeletionTask0ForCollB->getOngoingQueriesFuture()); + registerAndCreatePersistentTask(opCtx, + rangeDeletionTask0ForCollB->getTask(), + rangeDeletionTask0ForCollB->getOngoingQueriesFuture()); ASSERT_EQ(3, rds->totalNumOfRegisteredTasks()); } @@ -568,8 +607,10 @@ TEST_F(RangeDeleterServiceTest, RegisterTaskWithDisableResumableRangeDeleterFlag auto rds = RangeDeleterService::get(opCtx); auto taskWithOngoingQueries = rangeDeletionTask0ForCollA; - auto completionFuture = rds->registerTask(taskWithOngoingQueries->getTask(), - taskWithOngoingQueries->getOngoingQueriesFuture()); + auto completionFuture = + registerAndCreatePersistentTask(opCtx, + taskWithOngoingQueries->getTask(), + taskWithOngoingQueries->getOngoingQueriesFuture()); ASSERT(completionFuture.isReady()); ASSERT_EQ(0, rds->getNumRangeDeletionTasksForCollection(uuidCollA)); } @@ -579,8 +620,10 @@ TEST_F(RangeDeleterServiceTest, auto rds = RangeDeleterService::get(opCtx); auto taskWithOngoingQueries = rangeDeletionTask0ForCollA; - auto completionFuture = rds->registerTask(taskWithOngoingQueries->getTask(), - taskWithOngoingQueries->getOngoingQueriesFuture()); + auto completionFuture = + registerAndCreatePersistentTask(opCtx, + taskWithOngoingQueries->getTask(), + taskWithOngoingQueries->getOngoingQueriesFuture()); ASSERT(!completionFuture.isReady()); ASSERT_EQ(1, rds->getNumRangeDeletionTasksForCollection(uuidCollA)); @@ -611,8 +654,10 @@ TEST_F(RangeDeleterServiceTest, 
RescheduleRangeDeletionTasksOnStepUp) { int nPending = 0, nNonPending = 0, nNonPendingAndProcessing = 0; int minBound = 0; for (int i = 0; i < nRangeDeletionTasks; i++) { - auto rangeDeletionTask = createRangeDeletionTask( - uuidCollA, BSON("a" << minBound), BSON("a" << minBound + 10), CleanWhenEnum::kDelayed); + auto rangeDeletionTask = createRangeDeletionTask(uuidCollA, + BSON(kShardKey << minBound), + BSON(kShardKey << minBound + 10), + CleanWhenEnum::kDelayed); minBound += 10; auto rand = random.nextInt32() % 3; @@ -643,4 +688,63 @@ TEST_F(RangeDeleterServiceTest, RescheduleRangeDeletionTasksOnStepUp) { rds->getNumRangeDeletionTasksForCollection(uuidCollA)); } +TEST_F(RangeDeleterServiceTest, PerformActualRangeDeletion) { + auto rds = RangeDeleterService::get(opCtx); + auto taskWithOngoingQueries = rangeDeletionTask0ForCollA; + auto nss = nssWithUuid[uuidCollA]; + DBDirectClient dbclient(opCtx); + + insertDocsWithinRange(opCtx, nss, 0, 10, 10); + ASSERT_EQUALS(dbclient.count(nss, BSONObj()), 10); + + auto completionFuture = + registerAndCreatePersistentTask(opCtx, + taskWithOngoingQueries->getTask(), + taskWithOngoingQueries->getOngoingQueriesFuture()); + + // The task can't be processed (hence completed) before ongoing queries drain + ASSERT(!completionFuture.isReady()); + ASSERT_EQ(1, rds->getNumRangeDeletionTasksForCollection(uuidCollA)); + ASSERT_EQUALS(dbclient.count(NamespaceString::kRangeDeletionNamespace), 1); + verifyRangeDeletionTasks(opCtx, uuidCollA, {taskWithOngoingQueries->getTask().getRange()}); + + // Pretend ongoing queries have drained and make sure the task completes + taskWithOngoingQueries->drainOngoingQueries(); + completionFuture.get(opCtx); + ASSERT_EQUALS(dbclient.count(nss), 0); +} + +TEST_F(RangeDeleterServiceTest, OnlyRemoveDocumentsInRangeToDelete) { + auto rds = RangeDeleterService::get(opCtx); + auto taskWithOngoingQueries = rangeDeletionTask0ForCollA; + auto nss = nssWithUuid[uuidCollA]; + DBDirectClient dbclient(opCtx); + + // 
Insert docs within the range targeted by the deletion task + int numDocsToDelete = insertDocsWithinRange(opCtx, nss, 0, 10, 10); + ASSERT_EQUALS(dbclient.count(nss), numDocsToDelete); + + // Insert docs in a different range + int numDocsToKeep = insertDocsWithinRange(opCtx, nss, 20, 25, 5); + numDocsToKeep += insertDocsWithinRange(opCtx, nss, 100, 105, 5); + ASSERT_EQUALS(dbclient.count(nss), numDocsToKeep + numDocsToDelete); + + auto completionFuture = + registerAndCreatePersistentTask(opCtx, + taskWithOngoingQueries->getTask(), + taskWithOngoingQueries->getOngoingQueriesFuture()); + + // The task can't be processed (hence completed) before ongoing queries drain + ASSERT(!completionFuture.isReady()); + ASSERT_EQ(1, rds->getNumRangeDeletionTasksForCollection(uuidCollA)); + ASSERT_EQUALS(dbclient.count(NamespaceString::kRangeDeletionNamespace), 1); + verifyRangeDeletionTasks(opCtx, uuidCollA, {taskWithOngoingQueries->getTask().getRange()}); + + // Pretend ongoing queries have drained and make sure only orphans were cleared up + taskWithOngoingQueries->drainOngoingQueries(); + completionFuture.get(opCtx); + ASSERT_EQUALS(dbclient.count(nss), numDocsToKeep); + ASSERT_EQUALS(dbclient.count(NamespaceString::kRangeDeletionNamespace), 0); +} + } // namespace mongo diff --git a/src/mongo/db/s/range_deleter_service_test.h b/src/mongo/db/s/range_deleter_service_test.h index 44435954534..7c689178ab1 100644 --- a/src/mongo/db/s/range_deleter_service_test.h +++ b/src/mongo/db/s/range_deleter_service_test.h @@ -31,6 +31,7 @@ #include "mongo/db/s/range_deleter_service.h" #include "mongo/db/s/shard_server_test_fixture.h" #include "mongo/idl/server_parameter_test_util.h" +#include "mongo/unittest/log_test.h" namespace mongo { @@ -44,7 +45,7 @@ public: RangeDeletionTask getTask(); void drainOngoingQueries(); - auto getOngoingQueriesFuture(); + SemiFuture getOngoingQueriesFuture(); private: RangeDeletionTask _task; @@ -58,32 +59,59 @@ public: OperationContext* opCtx; - // Util 
methods - RangeDeletionTask createRangeDeletionTask(const UUID& collectionUUID, - const BSONObj& min, - const BSONObj& max, - CleanWhenEnum whenToClean = CleanWhenEnum::kNow, - bool pending = true); - - std::shared_ptr createRangeDeletionTaskWithOngoingQueries( - const UUID& collectionUUID, - const BSONObj& min, - const BSONObj& max, - CleanWhenEnum whenToClean = CleanWhenEnum::kNow, - bool pending = true); - // Instantiate some collection UUIDs and tasks to be used for testing UUID uuidCollA = UUID::gen(); - inline static const NamespaceString nsCollA = NamespaceString("test", "collA"); + inline static const NamespaceString nsCollA{"test", "collA"}; + UUID uuidCollB = UUID::gen(); + inline static const NamespaceString nsCollB{"test", "collB"}; + + inline static std::map nssWithUuid{}; + std::shared_ptr rangeDeletionTask0ForCollA; std::shared_ptr rangeDeletionTask1ForCollA; - - UUID uuidCollB = UUID::gen(); - inline static const NamespaceString nsCollB = NamespaceString("test", "collB"); std::shared_ptr rangeDeletionTask0ForCollB; + inline static const std::string kShardKey = "_id"; + inline static const BSONObj kShardKeyPattern = BSON(kShardKey << 1); + private: + void _setFilteringMetadataWithUUID(OperationContext* opCtx, const UUID& uuid); + + // Scoped objects RAIIServerParameterControllerForTest enableFeatureFlag{"featureFlagRangeDeleterService", true}; + unittest::MinimumLoggedSeverityGuard _severityGuard{logv2::LogComponent::kShardingRangeDeleter, + logv2::LogSeverity::Debug(2)}; }; +RangeDeletionTask createRangeDeletionTask(const UUID& collectionUUID, + const BSONObj& min, + const BSONObj& max, + CleanWhenEnum whenToClean = CleanWhenEnum::kNow, + bool pending = true); + +std::shared_ptr createRangeDeletionTaskWithOngoingQueries( + const UUID& collectionUUID, + const BSONObj& min, + const BSONObj& max, + CleanWhenEnum whenToClean = CleanWhenEnum::kNow, + bool pending = true); + +SharedSemiFuture registerAndCreatePersistentTask( + OperationContext* opCtx, + 
const RangeDeletionTask& rdt, + SemiFuture&& waitForActiveQueriesToComplete); + +int insertDocsWithinRange( + OperationContext* opCtx, const NamespaceString& nss, int min, int max, int maxCount); + +void verifyRangeDeletionTasks(OperationContext* opCtx, + UUID uuidColl, + std::vector expectedChunkRanges); + +// CRUD operation over `config.rangeDeletions` +void insertRangeDeletionTaskDocument(OperationContext* opCtx, const RangeDeletionTask& rdt); +void updatePendingField(OperationContext* opCtx, UUID rdtId, bool pending); +void removePendingField(OperationContext* opCtx, UUID rdtId); +void deleteRangeDeletionTaskDocument(OperationContext* opCtx, UUID rdtId); + } // namespace mongo diff --git a/src/mongo/db/s/range_deleter_service_test_util.cpp b/src/mongo/db/s/range_deleter_service_test_util.cpp new file mode 100644 index 00000000000..6bfc2a76d5e --- /dev/null +++ b/src/mongo/db/s/range_deleter_service_test_util.cpp @@ -0,0 +1,177 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/persistent_task_store.h" +#include "mongo/db/s/range_deleter_service_test.h" + +namespace mongo { + +/** + * RangeDeletionWithOngoingQueries implementation + */ +RangeDeletionWithOngoingQueries::RangeDeletionWithOngoingQueries(const RangeDeletionTask& t) + : _task(t) {} + +RangeDeletionTask RangeDeletionWithOngoingQueries::getTask() { + return _task; +} + +void RangeDeletionWithOngoingQueries::drainOngoingQueries() { + _ongoingQueries.setFrom(Status::OK()); +} + +SemiFuture RangeDeletionWithOngoingQueries::getOngoingQueriesFuture() { + return _ongoingQueries.getFuture().semi(); +} + +/** + * Utils + */ +RangeDeletionTask createRangeDeletionTask(const UUID& collectionUUID, + const BSONObj& min, + const BSONObj& max, + CleanWhenEnum whenToClean, + bool pending) { + RangeDeletionTask rdt; + rdt.setId(UUID::gen()); + rdt.setNss(RangeDeleterServiceTest::nssWithUuid[collectionUUID]); + rdt.setDonorShardId(ShardId("shard0")); + rdt.setCollectionUuid(collectionUUID); + rdt.setRange(ChunkRange(min, max)); + rdt.setWhenToClean(whenToClean); + rdt.setPending(pending); + return rdt; +} + +std::shared_ptr createRangeDeletionTaskWithOngoingQueries( + const UUID& collectionUUID, + const BSONObj& min, + const BSONObj& max, + CleanWhenEnum whenToClean, + bool pending) { + return std::make_shared( + createRangeDeletionTask(collectionUUID, min, max, whenToClean, pending)); +} + +// TODO review this method: the task may be registered, finish and be recreated by inserting the +// document +SharedSemiFuture registerAndCreatePersistentTask( + OperationContext* opCtx, + const RangeDeletionTask& rdt, + 
SemiFuture&& waitForActiveQueriesToComplete) { + auto rds = RangeDeleterService::get(opCtx); + + auto completionFuture = rds->registerTask(rdt, std::move(waitForActiveQueriesToComplete)); + + // Range deletion task will only proceed if persistent doc exists and its `pending` field + // doesn't exist + insertRangeDeletionTaskDocument(opCtx, rdt); + removePendingField(opCtx, rdt.getId()); + + return completionFuture; +} + +int insertDocsWithinRange( + OperationContext* opCtx, const NamespaceString& nss, int min, int max, int maxCount) { + + DBDirectClient dbclient(opCtx); + for (auto i = 0; i < maxCount; ++i) { + const int nextI = min + i; + if (nextI == max) { + return i; + } + dbclient.insert(nss.toString(), BSON(RangeDeleterServiceTest::kShardKey << nextI)); + } + return maxCount; +} + + +void insertRangeDeletionTaskDocument(OperationContext* opCtx, const RangeDeletionTask& rdt) { + PersistentTaskStore store(NamespaceString::kRangeDeletionNamespace); + store.add(opCtx, rdt); +} + +void updatePendingField(OperationContext* opCtx, UUID rdtId, bool pending) { + PersistentTaskStore store(NamespaceString::kRangeDeletionNamespace); + store.update(opCtx, + BSON(RangeDeletionTask::kIdFieldName << rdtId), + BSON("$set" << BSON(RangeDeletionTask::kPendingFieldName << pending))); +} + +void removePendingField(OperationContext* opCtx, UUID rdtId) { + PersistentTaskStore store(NamespaceString::kRangeDeletionNamespace); + store.update(opCtx, + BSON(RangeDeletionTask::kIdFieldName << rdtId), + BSON("$unset" << BSON(RangeDeletionTask::kPendingFieldName << ""))); +} + +void deleteRangeDeletionTaskDocument(OperationContext* opCtx, UUID rdtId) { + PersistentTaskStore store(NamespaceString::kRangeDeletionNamespace); + store.remove(opCtx, BSON(RangeDeletionTask::kIdFieldName << rdtId)); +} + +/** + * Ensure that `expectedChunkRanges` range deletion tasks are scheduled for collection with UUID + * `uuidColl` + */ +void verifyRangeDeletionTasks(OperationContext* opCtx, + UUID uuidColl, 
+ std::vector expectedChunkRanges) { + auto rds = RangeDeleterService::get(opCtx); + + // Get chunk ranges inserted to be deleted by RangeDeleterService + BSONObj dumpState = rds->dumpState(); + BSONElement chunkRangesElem = dumpState.getField(uuidColl.toString()); + if (!chunkRangesElem.ok() && expectedChunkRanges.size() == 0) { + return; + } + ASSERT(chunkRangesElem.ok()) << "Expected to find range deletion tasks from collection " + << uuidColl.toString(); + + const auto chunkRanges = chunkRangesElem.Array(); + ASSERT_EQ(chunkRanges.size(), expectedChunkRanges.size()); + + // Sort expectedChunkRanges vector to replicate RangeDeleterService dumpState order + struct { + bool operator()(const ChunkRange& a, const ChunkRange& b) { + return a.getMin().woCompare(b.getMin()) < 0; + } + } RANGES_COMPARATOR; + + std::sort(expectedChunkRanges.begin(), expectedChunkRanges.end(), RANGES_COMPARATOR); + + // Check expectedChunkRanges are exactly the same as the returned ones + for (size_t i = 0; i < expectedChunkRanges.size(); ++i) { + ASSERT(ChunkRange::fromBSONThrowing(chunkRanges[i].Obj()) == expectedChunkRanges[i]) + << "Expected " << ChunkRange::fromBSONThrowing(chunkRanges[i].Obj()).toBSON() + << " == " << expectedChunkRanges[i].toBSON(); + } +} + +} // namespace mongo diff --git a/src/mongo/db/s/range_deletion_util.cpp b/src/mongo/db/s/range_deletion_util.cpp index a7a75a3257b..331f6f41a51 100644 --- a/src/mongo/db/s/range_deletion_util.cpp +++ b/src/mongo/db/s/range_deletion_util.cpp @@ -49,7 +49,8 @@ #include "mongo/db/query/query_planner.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/wait_for_majority_service.h" -#include "mongo/db/s/migration_util.h" +#include "mongo/db/s/balancer_stats_registry.h" +#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/shard_key_index_util.h" #include "mongo/db/s/sharding_runtime_d_params_gen.h" #include "mongo/db/s/sharding_statistics.h" @@ -190,33 +191,6 @@ StatusWith 
deleteNextBatch(OperationContext* opCtx, return numDeleted; } -template -auto withTemporaryOperationContext(Callable&& callable, const NamespaceString& nss) { - ThreadClient tc(migrationutil::kRangeDeletionThreadName, getGlobalServiceContext()); - { - stdx::lock_guard lk(*tc.get()); - tc->setSystemOperationKillableByStepdown(lk); - } - auto uniqueOpCtx = Client::getCurrent()->makeOperationContext(); - auto opCtx = uniqueOpCtx.get(); - - // Ensure that this operation will be killed by the RstlKillOpThread during step-up or stepdown. - opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); - invariant(opCtx->shouldAlwaysInterruptAtStepDownOrUp()); - - { - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - Lock::GlobalLock lock(opCtx, MODE_IX); - uassert(ErrorCodes::PrimarySteppedDown, - str::stream() << "Not primary while running range deletion task for collection" - << nss, - replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet && - replCoord->canAcceptWritesFor(opCtx, nss)); - } - - return callable(opCtx); -} - void ensureRangeDeletionTaskStillExists(OperationContext* opCtx, const UUID& collectionUuid, const ChunkRange& range) { @@ -284,28 +258,11 @@ ExecutorFuture deleteRangeInBatchesWithExecutor( [=](OperationContext* opCtx) { return deleteRangeInBatches(opCtx, nss.db(), collectionUuid, keyPattern, range); }, - nss); + nss.db(), + collectionUuid); }); } -void removePersistentRangeDeletionTask(const NamespaceString& nss, - const UUID& collectionUuid, - const ChunkRange& range) { - withTemporaryOperationContext( - [&](OperationContext* opCtx) { - PersistentTaskStore store(NamespaceString::kRangeDeletionNamespace); - - auto overlappingRangeQuery = BSON( - RangeDeletionTask::kCollectionUuidFieldName - << collectionUuid << RangeDeletionTask::kRangeFieldName + "." + ChunkRange::kMinKey - << GTE << range.getMin() - << RangeDeletionTask::kRangeFieldName + "." 
+ ChunkRange::kMaxKey << LTE - << range.getMax()); - store.remove(opCtx, overlappingRangeQuery); - }, - nss); -} - ExecutorFuture waitForDeletionsToMajorityReplicate( const std::shared_ptr& executor, const NamespaceString& nss, @@ -329,7 +286,8 @@ ExecutorFuture waitForDeletionsToMajorityReplicate( .waitUntilMajority(clientOpTime, CancellationToken::uncancelable()) .thenRunOn(executor); }, - nss); + nss.db(), + collectionUuid); } std::vector getPersistentRangeDeletionTasks(OperationContext* opCtx, @@ -347,6 +305,13 @@ std::vector getPersistentRangeDeletionTasks(OperationContext* return tasks; } +BSONObj getQueryFilterForRangeDeletionTask(const UUID& collectionUuid, const ChunkRange& range) { + return BSON(RangeDeletionTask::kCollectionUuidFieldName + << collectionUuid << RangeDeletionTask::kRangeFieldName + "." + ChunkRange::kMinKey + << range.getMin() << RangeDeletionTask::kRangeFieldName + "." + ChunkRange::kMaxKey + << range.getMax()); +} + } // namespace Status deleteRangeInBatches(OperationContext* opCtx, @@ -354,6 +319,8 @@ Status deleteRangeInBatches(OperationContext* opCtx, const UUID& collectionUuid, const BSONObj& keyPattern, const ChunkRange& range) { + suspendRangeDeletion.pauseWhileSet(); + bool allDocsRemoved = false; // Delete all batches in this range unless a stepdown error occurs. Do not yield the // executor to ensure that this range is fully deleted before another range is @@ -402,7 +369,7 @@ Status deleteRangeInBatches(OperationContext* opCtx, } }(); - migrationutil::persistUpdatedNumOrphans(opCtx, collectionUuid, range, -numDeleted); + persistUpdatedNumOrphans(opCtx, collectionUuid, range, -numDeleted); if (MONGO_unlikely(hangAfterDoingDeletion.shouldFail())) { hangAfterDoingDeletion.pauseWhileSet(opCtx); @@ -508,7 +475,6 @@ SharedSemiFuture removeDocumentsInRange( invariant(s.isOK()); }) .then([=]() mutable { - suspendRangeDeletion.pauseWhileSet(); // Wait for possibly ongoing queries on secondaries to complete. 
return sleepUntil(executor, executor->now() + delayForActiveQueriesOnSecondariesToComplete); @@ -574,7 +540,12 @@ SharedSemiFuture removeDocumentsInRange( } try { - removePersistentRangeDeletionTask(nss, collectionUuid, range); + withTemporaryOperationContext( + [&](OperationContext* opCtx) { + removePersistentRangeDeletionTask(opCtx, collectionUuid, range); + }, + nss.db(), + collectionUuid); } catch (const DBException& e) { LOGV2_ERROR(23770, "Failed to delete range deletion task for range {range} in collection " @@ -602,4 +573,41 @@ SharedSemiFuture removeDocumentsInRange( .share(); } +void persistUpdatedNumOrphans(OperationContext* opCtx, + const UUID& collectionUuid, + const ChunkRange& range, + long long changeInOrphans) { + const auto query = getQueryFilterForRangeDeletionTask(collectionUuid, range); + try { + PersistentTaskStore store(NamespaceString::kRangeDeletionNamespace); + ScopedRangeDeleterLock rangeDeleterLock(opCtx, collectionUuid); + // The DBDirectClient will not retry WriteConflictExceptions internally while holding an X + // mode lock, so we need to retry at this level. + writeConflictRetry( + opCtx, "updateOrphanCount", NamespaceString::kRangeDeletionNamespace.ns(), [&] { + store.update(opCtx, + query, + BSON("$inc" << BSON(RangeDeletionTask::kNumOrphanDocsFieldName + << changeInOrphans)), + WriteConcerns::kLocalWriteConcern); + }); + BalancerStatsRegistry::get(opCtx)->updateOrphansCount(collectionUuid, changeInOrphans); + } catch (const ExceptionFor&) { + // When upgrading or downgrading, there may be no documents with the orphan count field. + } +} + +void removePersistentRangeDeletionTask(OperationContext* opCtx, + const UUID& collectionUuid, + const ChunkRange& range) { + PersistentTaskStore store(NamespaceString::kRangeDeletionNamespace); + + auto overlappingRangeQuery = BSON( + RangeDeletionTask::kCollectionUuidFieldName + << collectionUuid << RangeDeletionTask::kRangeFieldName + "." 
+ ChunkRange::kMinKey << GTE + << range.getMin() << RangeDeletionTask::kRangeFieldName + "." + ChunkRange::kMaxKey << LTE + << range.getMax()); + store.remove(opCtx, overlappingRangeQuery); +} + } // namespace mongo diff --git a/src/mongo/db/s/range_deletion_util.h b/src/mongo/db/s/range_deletion_util.h index dd78e97db6f..6efaf6946ce 100644 --- a/src/mongo/db/s/range_deletion_util.h +++ b/src/mongo/db/s/range_deletion_util.h @@ -31,13 +31,17 @@ #include #include +#include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/range_deletion_task_gen.h" #include "mongo/executor/task_executor.h" #include "mongo/s/catalog/type_chunk.h" namespace mongo { +constexpr auto kRangeDeletionThreadName = "range-deleter"_sd; + /** * DO NOT USE - only necessary for the legacy range deleter * @@ -94,4 +98,59 @@ void restoreRangeDeletionTasksForRename(OperationContext* opCtx, const Namespace void deleteRangeDeletionTasksForRename(OperationContext* opCtx, const NamespaceString& fromNss, const NamespaceString& toNss); + +/** + * Updates the range deletion task document to increase or decrease numOrphanedDocs + */ +void persistUpdatedNumOrphans(OperationContext* opCtx, + const UUID& collectionUuid, + const ChunkRange& range, + long long changeInOrphans); + +/** + * Removes range deletion task documents from `config.rangeDeletions` for the specified range and + * collection + */ +void removePersistentRangeDeletionTask(OperationContext* opCtx, + const UUID& collectionUuid, + const ChunkRange& range); + +/** + * Wrapper to run a safer step up/step down killable task within an operation context + */ +template +auto withTemporaryOperationContext(Callable&& callable, + const DatabaseName dbName, + const UUID& collectionUUID, + bool writeToRangeDeletionNamespace = false) { + ThreadClient tc(kRangeDeletionThreadName, getGlobalServiceContext()); + { + stdx::lock_guard lk(*tc.get()); + 
tc->setSystemOperationKillableByStepdown(lk); + } + auto uniqueOpCtx = Client::getCurrent()->makeOperationContext(); + auto opCtx = uniqueOpCtx.get(); + + // Ensure that this operation will be killed by the RstlKillOpThread during step-up or stepdown. + opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); + invariant(opCtx->shouldAlwaysInterruptAtStepDownOrUp()); + + { + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + Lock::GlobalLock lock(opCtx, MODE_IX); + uassert( + ErrorCodes::PrimarySteppedDown, + str::stream() + << "Not primary while running range deletion task for collection with UUID " + << collectionUUID, + replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet && + replCoord->canAcceptWritesFor(opCtx, + NamespaceStringOrUUID(dbName, collectionUUID)) && + (!writeToRangeDeletionNamespace || + replCoord->canAcceptWritesFor(opCtx, NamespaceString::kRangeDeletionNamespace))); + } + + return callable(opCtx); +} + } // namespace mongo diff --git a/src/mongo/db/s/range_deletion_util_test.cpp b/src/mongo/db/s/range_deletion_util_test.cpp index e0299a30208..5f8cafc91e0 100644 --- a/src/mongo/db/s/range_deletion_util_test.cpp +++ b/src/mongo/db/s/range_deletion_util_test.cpp @@ -120,9 +120,6 @@ public: Lock::CollectionLock collLock(_opCtx, kNss, MODE_IX); CollectionMetadata collMetadata(std::move(cm), ShardId("dummyShardId")); CollectionShardingRuntime::get(_opCtx, kNss)->setFilteringMetadata(_opCtx, collMetadata); - auto* css = CollectionShardingState::get(_opCtx, kNss); - auto& csr = *checked_cast(css); - csr.setFilteringMetadata(_opCtx, collMetadata); } UUID uuid() const { -- cgit v1.2.1