diff options
-rw-r--r-- | src/mongo/db/s/migration_util.cpp | 99 | ||||
-rw-r--r-- | src/mongo/db/s/migration_util_test.cpp | 58 | ||||
-rw-r--r-- | src/mongo/db/s/rename_collection_participant_service.cpp | 25 |
3 files changed, 86 insertions, 96 deletions
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index 1916bb0abff..ced6c1bc2a4 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -347,7 +347,6 @@ bool deletionTaskUuidMatchesFilteringMetadataUuid( ExecutorFuture<void> cleanUpRange(ServiceContext* serviceContext, const std::shared_ptr<executor::ThreadPoolTaskExecutor>& executor, const RangeDeletionTask& deletionTask) { - return AsyncTry([=]() mutable { ThreadClient tc(kRangeDeletionThreadName, serviceContext); { @@ -358,33 +357,45 @@ ExecutorFuture<void> cleanUpRange(ServiceContext* serviceContext, auto opCtx = uniqueOpCtx.get(); opCtx->setAlwaysInterruptAtStepDownOrUp(); - AutoGetCollection autoColl(opCtx, deletionTask.getNss(), MODE_IS); - auto csr = CollectionShardingRuntime::get(opCtx, deletionTask.getNss()); - // Keep the collection metadata from changing for the rest of this scope. - auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); - auto optCollDescr = csr->getCurrentMetadataIfKnown(); - uassert( - ErrorCodes::RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist, - str::stream() - << "Even after forced refresh, filtering metadata for namespace in " - "deletion task " - << (optCollDescr - ? (optCollDescr->isSharded() - ? " has UUID that does not match UUID of the deletion task" - : " is unsharded") - : " is not known"), - deletionTaskUuidMatchesFilteringMetadataUuid(opCtx, optCollDescr, deletionTask)); - - LOGV2(22026, - "Submitting range deletion task", - "deletionTask"_attr = redact(deletionTask.toBSON()), - "migrationId"_attr = deletionTask.getId()); - - const auto whenToClean = deletionTask.getWhenToClean() == CleanWhenEnum::kNow - ? 
CollectionShardingRuntime::kNow - : CollectionShardingRuntime::kDelayed; - - return csr->cleanUpRange(deletionTask.getRange(), deletionTask.getId(), whenToClean); + const NamespaceString& nss = deletionTask.getNss(); + + while (true) { + { + // Holding the locks while enqueueing the task protects against possible + // concurrent cleanups of the filtering metadata, which will be serialized + AutoGetCollection autoColl(opCtx, nss, MODE_IS); + auto csr = CollectionShardingRuntime::get(opCtx, nss); + auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr); + auto optCollDescr = csr->getCurrentMetadataIfKnown(); + + if (optCollDescr) { + uassert(ErrorCodes:: + RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist, + str::stream() << "Filtering metadata for " << nss + << (optCollDescr->isSharded() + ? " has UUID that does not match UUID of " + "the deletion task" + : " is unsharded"), + deletionTaskUuidMatchesFilteringMetadataUuid( + opCtx, optCollDescr, deletionTask)); + + LOGV2(22026, + "Submitting range deletion task", + "deletionTask"_attr = redact(deletionTask.toBSON()), + "migrationId"_attr = deletionTask.getId()); + + const auto whenToClean = + deletionTask.getWhenToClean() == CleanWhenEnum::kNow + ? CollectionShardingRuntime::kNow + : CollectionShardingRuntime::kDelayed; + + return csr->cleanUpRange( + deletionTask.getRange(), deletionTask.getId(), whenToClean); + } + } + + refreshFilteringMetadataUntilSuccess(opCtx, nss); + } }) .until([](Status status) mutable { // Resubmit the range for deletion on a RangeOverlapConflict error. 
@@ -405,8 +416,6 @@ ExecutorFuture<void> submitRangeDeletionTask(OperationContext* opCtx, stdx::lock_guard<Client> lk(*tc.get()); tc->setSystemOperationKillableByStepdown(lk); } - auto uniqueOpCtx = tc->makeOperationContext(); - auto opCtx = uniqueOpCtx.get(); uassert( ErrorCodes::ResumableRangeDeleterDisabled, @@ -415,36 +424,6 @@ ExecutorFuture<void> submitRangeDeletionTask(OperationContext* opCtx, << " because the disableResumableRangeDeleter server parameter is set to true", !disableResumableRangeDeleter.load()); - // Make sure the collection metadata is up-to-date before submitting. - boost::optional<CollectionMetadata> optCollDescr; - { - AutoGetCollection autoColl(opCtx, deletionTask.getNss(), MODE_IS); - auto csr = CollectionShardingRuntime::get(opCtx, deletionTask.getNss()); - optCollDescr = csr->getCurrentMetadataIfKnown(); - } - - if (!deletionTaskUuidMatchesFilteringMetadataUuid(opCtx, optCollDescr, deletionTask)) { - - // If the collection's filtering metadata is not known, is unsharded, or its - // UUID does not match the UUID of the deletion task, force a filtering metadata - // refresh, because this node may have just stepped up and therefore may have a - // stale cache. - LOGV2(22024, - "Filtering metadata for this range deletion task may be outdated; " - "forcing refresh", - "deletionTask"_attr = redact(deletionTask.toBSON()), - "error"_attr = - (optCollDescr ? (optCollDescr->isSharded() - ? 
"Collection has UUID that does not match " - "UUID of the deletion task" - : "Collection is unsharded") - : "Collection's sharding state is not known"), - "namespace"_attr = deletionTask.getNss(), - "migrationId"_attr = deletionTask.getId()); - - refreshFilteringMetadataUntilSuccess(opCtx, deletionTask.getNss()); - } - return AsyncTry([=]() { return cleanUpRange(serviceContext, executor, deletionTask) .onError<ErrorCodes::KeyPatternShorterThanBound>([=](Status status) { diff --git a/src/mongo/db/s/migration_util_test.cpp b/src/mongo/db/s/migration_util_test.cpp index fe1f1c0a4d6..d832540173d 100644 --- a/src/mongo/db/s/migration_util_test.cpp +++ b/src/mongo/db/s/migration_util_test.cpp @@ -31,11 +31,11 @@ #include "mongo/client/remote_command_targeter_factory_mock.h" #include "mongo/client/remote_command_targeter_mock.h" -#include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/persistent_task_store.h" #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/s/collection_sharding_runtime_test.cpp" #include "mongo/db/s/migration_util.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" @@ -321,15 +321,14 @@ TEST_F(MigrationUtilsTest, TestInvalidUUID) { * Fixture that uses a mocked CatalogCacheLoader and CatalogClient to allow metadata refreshes * without using the mock network. 
*/ -class SubmitRangeDeletionTaskTest : public ShardServerTestFixture { +class SubmitRangeDeletionTaskTest : public CollectionShardingRuntimeWithRangeDeleterTest { public: const HostAndPort kConfigHostAndPort{"dummy", 123}; - const NamespaceString kNss{"test.foo"}; const ShardKeyPattern kShardKeyPattern = ShardKeyPattern(BSON("_id" << 1)); const UUID kDefaultUUID = UUID::gen(); const OID kEpoch = OID::gen(); const DatabaseType kDefaultDatabaseType = - DatabaseType(kNss.db().toString(), ShardId("0"), true, DatabaseVersion(kDefaultUUID)); + DatabaseType(kTestNss.db().toString(), ShardId("0"), true, DatabaseVersion(kDefaultUUID)); const std::vector<ShardType> kShardList = {ShardType("0", "Host0:12345"), ShardType("1", "Host1:12345")}; @@ -418,29 +417,29 @@ public: } CollectionType makeCollectionType(UUID uuid, OID epoch) { - CollectionType coll(kNss, epoch, Date_t::now(), uuid); + CollectionType coll(kTestNss, epoch, Date_t::now(), uuid); coll.setKeyPattern(kShardKeyPattern.getKeyPattern()); coll.setUnique(true); return coll; } std::vector<ChunkType> makeChangedChunks(ChunkVersion startingVersion) { - ChunkType chunk1(kNss, + ChunkType chunk1(kTestNss, {kShardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << -100)}, startingVersion, {"0"}); chunk1.setName(OID::gen()); startingVersion.incMinor(); - ChunkType chunk2(kNss, {BSON("_id" << -100), BSON("_id" << 0)}, startingVersion, {"1"}); + ChunkType chunk2(kTestNss, {BSON("_id" << -100), BSON("_id" << 0)}, startingVersion, {"1"}); chunk2.setName(OID::gen()); startingVersion.incMinor(); - ChunkType chunk3(kNss, {BSON("_id" << 0), BSON("_id" << 100)}, startingVersion, {"0"}); + ChunkType chunk3(kTestNss, {BSON("_id" << 0), BSON("_id" << 100)}, startingVersion, {"0"}); chunk3.setName(OID::gen()); startingVersion.incMinor(); - ChunkType chunk4(kNss, + ChunkType chunk4(kTestNss, {BSON("_id" << 100), kShardKeyPattern.getKeyPattern().globalMax()}, startingVersion, {"1"}); @@ -457,7 +456,7 @@ public: 
TEST_F(SubmitRangeDeletionTaskTest, FailsAndDeletesTaskIfFilteringMetadataIsUnknownEvenAfterRefresh) { auto opCtx = operationContext(); - auto deletionTask = createDeletionTask(kNss, kDefaultUUID, 0, 10, _myShardName); + auto deletionTask = createDeletionTask(kTestNss, kDefaultUUID, 0, 10, _myShardName); PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); store.add(opCtx, deletionTask); @@ -482,7 +481,7 @@ TEST_F(SubmitRangeDeletionTaskTest, TEST_F(SubmitRangeDeletionTaskTest, FailsAndDeletesTaskIfNamespaceIsUnshardedEvenAfterRefresh) { auto opCtx = operationContext(); - auto deletionTask = createDeletionTask(kNss, kDefaultUUID, 0, 10, _myShardName); + auto deletionTask = createDeletionTask(kTestNss, kDefaultUUID, 0, 10, _myShardName); PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); @@ -510,7 +509,7 @@ TEST_F(SubmitRangeDeletionTaskTest, FailsAndDeletesTaskIfNamespaceIsUnshardedBeforeAndAfterRefresh) { auto opCtx = operationContext(); - auto deletionTask = createDeletionTask(kNss, kDefaultUUID, 0, 10, _myShardName); + auto deletionTask = createDeletionTask(kTestNss, kDefaultUUID, 0, 10, _myShardName); PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); @@ -523,7 +522,7 @@ TEST_F(SubmitRangeDeletionTaskTest, _mockCatalogCacheLoader->setDatabaseRefreshReturnValue(kDefaultDatabaseType); _mockCatalogCacheLoader->setCollectionRefreshReturnValue( Status(ErrorCodes::NamespaceNotFound, "dummy errmsg")); - forceShardFilteringMetadataRefresh(opCtx, kNss); + forceShardFilteringMetadataRefresh(opCtx, kTestNss); auto cleanupCompleteFuture = migrationutil::submitRangeDeletionTask(opCtx, deletionTask); @@ -538,8 +537,8 @@ TEST_F(SubmitRangeDeletionTaskTest, TEST_F(SubmitRangeDeletionTaskTest, SucceedsIfFilteringMetadataUUIDMatchesTaskUUID) { auto opCtx = operationContext(); - auto collectionUUID = createCollectionAndGetUUID(kNss); - auto deletionTask = 
createDeletionTask(kNss, collectionUUID, 0, 10, _myShardName); + auto collectionUUID = createCollectionAndGetUUID(kTestNss); + auto deletionTask = createDeletionTask(kTestNss, collectionUUID, 0, 10, _myShardName); PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); @@ -554,7 +553,7 @@ TEST_F(SubmitRangeDeletionTaskTest, SucceedsIfFilteringMetadataUUIDMatchesTaskUU _mockCatalogCacheLoader->setChunkRefreshReturnValue( makeChangedChunks(ChunkVersion(1, 0, kEpoch, boost::none /* timestamp */))); _mockCatalogClient->setCollections({coll}); - forceShardFilteringMetadataRefresh(opCtx, kNss); + forceShardFilteringMetadataRefresh(opCtx, kTestNss); // The task should have been submitted successfully. auto cleanupCompleteFuture = migrationutil::submitRangeDeletionTask(opCtx, deletionTask); @@ -566,8 +565,8 @@ TEST_F( SucceedsIfFilteringMetadataInitiallyUnknownButFilteringMetadataUUIDMatchesTaskUUIDAfterRefresh) { auto opCtx = operationContext(); - auto collectionUUID = createCollectionAndGetUUID(kNss); - auto deletionTask = createDeletionTask(kNss, collectionUUID, 0, 10, _myShardName); + auto collectionUUID = createCollectionAndGetUUID(kTestNss); + auto deletionTask = createDeletionTask(kTestNss, collectionUUID, 0, 10, _myShardName); PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); @@ -583,6 +582,9 @@ TEST_F( makeChangedChunks(ChunkVersion(1, 0, kEpoch, boost::none /* timestamp */))); _mockCatalogClient->setCollections({coll}); + auto metadata = makeShardedMetadata(opCtx, collectionUUID); + csr().setFilteringMetadata(opCtx, metadata); + // The task should have been submitted successfully. 
auto cleanupCompleteFuture = migrationutil::submitRangeDeletionTask(opCtx, deletionTask); cleanupCompleteFuture.get(opCtx); @@ -597,10 +599,10 @@ TEST_F(SubmitRangeDeletionTaskTest, _mockCatalogCacheLoader->setDatabaseRefreshReturnValue(kDefaultDatabaseType); _mockCatalogCacheLoader->setCollectionRefreshReturnValue( Status(ErrorCodes::NamespaceNotFound, "dummy errmsg")); - forceShardFilteringMetadataRefresh(opCtx, kNss); + forceShardFilteringMetadataRefresh(opCtx, kTestNss); - auto collectionUUID = createCollectionAndGetUUID(kNss); - auto deletionTask = createDeletionTask(kNss, collectionUUID, 0, 10, _myShardName); + auto collectionUUID = createCollectionAndGetUUID(kTestNss); + auto deletionTask = createDeletionTask(kTestNss, collectionUUID, 0, 10, _myShardName); PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); @@ -615,6 +617,9 @@ TEST_F(SubmitRangeDeletionTaskTest, makeChangedChunks(ChunkVersion(10, 0, kEpoch, boost::none /* timestamp */))); _mockCatalogClient->setCollections({matchingColl}); + auto metadata = makeShardedMetadata(opCtx, collectionUUID); + csr().setFilteringMetadata(opCtx, metadata); + // The task should have been submitted successfully. 
auto cleanupCompleteFuture = migrationutil::submitRangeDeletionTask(opCtx, deletionTask); cleanupCompleteFuture.get(opCtx); @@ -634,10 +639,10 @@ TEST_F(SubmitRangeDeletionTaskTest, _mockCatalogCacheLoader->setChunkRefreshReturnValue( makeChangedChunks(ChunkVersion(1, 0, staleEpoch, boost::none /* timestamp */))); _mockCatalogClient->setCollections({staleColl}); - forceShardFilteringMetadataRefresh(opCtx, kNss); + forceShardFilteringMetadataRefresh(opCtx, kTestNss); - auto collectionUUID = createCollectionAndGetUUID(kNss); - auto deletionTask = createDeletionTask(kNss, collectionUUID, 0, 10, _myShardName); + auto collectionUUID = createCollectionAndGetUUID(kTestNss); + auto deletionTask = createDeletionTask(kTestNss, collectionUUID, 0, 10, _myShardName); PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); @@ -652,6 +657,9 @@ TEST_F(SubmitRangeDeletionTaskTest, makeChangedChunks(ChunkVersion(10, 0, kEpoch, boost::none /* timestamp */))); _mockCatalogClient->setCollections({matchingColl}); + auto metadata = makeShardedMetadata(opCtx, collectionUUID); + csr().setFilteringMetadata(opCtx, metadata); + // The task should have been submitted successfully. 
auto cleanupCompleteFuture = migrationutil::submitRangeDeletionTask(opCtx, deletionTask); cleanupCompleteFuture.get(opCtx); @@ -661,7 +669,7 @@ TEST_F(SubmitRangeDeletionTaskTest, FailsAndDeletesTaskIfFilteringMetadataUUIDDifferentFromTaskUUIDEvenAfterRefresh) { auto opCtx = operationContext(); - auto deletionTask = createDeletionTask(kNss, kDefaultUUID, 0, 10, _myShardName); + auto deletionTask = createDeletionTask(kTestNss, kDefaultUUID, 0, 10, _myShardName); PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); diff --git a/src/mongo/db/s/rename_collection_participant_service.cpp b/src/mongo/db/s/rename_collection_participant_service.cpp index f93b7732ec9..568534c83cd 100644 --- a/src/mongo/db/s/rename_collection_participant_service.cpp +++ b/src/mongo/db/s/rename_collection_participant_service.cpp @@ -71,6 +71,15 @@ void dropCollectionLocally(OperationContext* opCtx, const NamespaceString& nss) "collectionExisted"_attr = knownNss); } +/* Clear the CollectionShardingRuntime entry for the specified namespace */ +void clearFilteringMetadata(OperationContext* opCtx, const NamespaceString& nss) { + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + Lock::DBLock dbLock(opCtx, nss.db(), MODE_IX); + Lock::CollectionLock collLock(opCtx, nss, MODE_IX); + auto* csr = CollectionShardingRuntime::get(opCtx, nss); + csr->clearFilteringMetadata(opCtx); +} + /* * Rename the collection if exists locally, otherwise simply drop the target collection. */ @@ -256,6 +265,11 @@ SemiFuture<void> RenameParticipantInstance::run( service->promoteRecoverableCriticalSectionToBlockAlsoReads( opCtx, toNss(), reason, ShardingCatalogClient::kLocalWriteConcern); + // Clear the filtering metadata to safely create new range deletion tasks: the + // submission will serialize on the renamed collection's metadata refresh. 
+ clearFilteringMetadata(opCtx, fromNss()); + clearFilteringMetadata(opCtx, toNss()); + snapshotRangeDeletionsForRename(opCtx, fromNss(), toNss()); })) .then(_executePhase( @@ -303,17 +317,6 @@ SemiFuture<void> RenameParticipantInstance::run( auto opCtxHolder = cc().makeOperationContext(); auto* opCtx = opCtxHolder.get(); - // Clear the CollectionShardingRuntime entry - auto clearFilteringMetadata = [&](const NamespaceString& nss) { - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - Lock::DBLock dbLock(opCtx, nss.db(), MODE_IX); - Lock::CollectionLock collLock(opCtx, nss, MODE_IX); - auto* csr = CollectionShardingRuntime::get(opCtx, nss); - csr->clearFilteringMetadata(opCtx); - }; - clearFilteringMetadata(fromNss()); - clearFilteringMetadata(toNss()); - // Force the refresh of the catalog cache for both source and destination // collections to purge outdated information. // |