diff options
-rw-r--r-- | src/mongo/db/s/catalog_cache_loader_mock.cpp | 25 | ||||
-rw-r--r-- | src/mongo/db/s/catalog_cache_loader_mock.h | 9 | ||||
-rw-r--r-- | src/mongo/db/s/migration_util.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/s/migration_util_test.cpp | 199 |
4 files changed, 210 insertions, 31 deletions
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.cpp b/src/mongo/db/s/catalog_cache_loader_mock.cpp index 406a16c26f6..7d3c7b1f80b 100644 --- a/src/mongo/db/s/catalog_cache_loader_mock.cpp +++ b/src/mongo/db/s/catalog_cache_loader_mock.cpp @@ -133,7 +133,26 @@ std::shared_ptr<Notification<void>> CatalogCacheLoaderMock::getChunksSince( void CatalogCacheLoaderMock::getDatabase( StringData dbName, std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) { - // Not implemented + _threadPool.schedule([ this, callbackFn ](auto status) noexcept { + invariant(status); + + auto opCtx = Client::getCurrent()->makeOperationContext(); + + auto swDatabase = [&]() -> StatusWith<DatabaseType> { + try { + uassertStatusOK(_swDatabaseReturnValue); + + return DatabaseType(_swDatabaseReturnValue.getValue().getName(), + _swDatabaseReturnValue.getValue().getPrimary(), + _swDatabaseReturnValue.getValue().getSharded(), + _swDatabaseReturnValue.getValue().getVersion()); + } catch (const DBException& ex) { + return ex.toStatus(); + } + }(); + + callbackFn(opCtx.get(), std::move(swDatabase)); + }); } void CatalogCacheLoaderMock::setCollectionRefreshReturnValue( @@ -146,4 +165,8 @@ void CatalogCacheLoaderMock::setChunkRefreshReturnValue( _swChunksReturnValue = std::move(statusWithChunks); } +void CatalogCacheLoaderMock::setDatabaseRefreshReturnValue(StatusWith<DatabaseType> swDatabase) { + _swDatabaseReturnValue = std::move(swDatabase); +} + } // namespace mongo diff --git a/src/mongo/db/s/catalog_cache_loader_mock.h b/src/mongo/db/s/catalog_cache_loader_mock.h index b5a2f64a6b7..ce566a02fd7 100644 --- a/src/mongo/db/s/catalog_cache_loader_mock.h +++ b/src/mongo/db/s/catalog_cache_loader_mock.h @@ -77,7 +77,16 @@ public: */ void setChunkRefreshReturnValue(StatusWith<std::vector<ChunkType>> statusWithChunks); + /** + * Sets the mocked database entry result that getDatabase will use to construct its return + * value. + */ + void setDatabaseRefreshReturnValue(StatusWith<DatabaseType> swDatabase); + private: + StatusWith<DatabaseType> _swDatabaseReturnValue{ + Status(ErrorCodes::InternalError, "config loader database response is uninitialized")}; + // These variables hold the mocked chunks and collection entry results used to construct the // return value of getChunksSince above. StatusWith<CollectionType> _swCollectionReturnValue{Status( diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index 18c06472062..ff3d10e6537 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -130,11 +130,7 @@ bool checkForConflictingDeletions(OperationContext* opCtx, ExecutorFuture<bool> submitRangeDeletionTask(OperationContext* opCtx, const RangeDeletionTask& deletionTask) { const auto serviceContext = opCtx->getServiceContext(); - // TODO (SERVER-45577): Use the Grid's fixed executor once the refresh is done asynchronously. - // An arbitrary executor is being used temporarily because unit tests have only one thread in - // the fixed executor, and that thread is needed to respond to the refresh. - return ExecutorFuture<void>( - Grid::get(serviceContext)->getExecutorPool()->getArbitraryExecutor()) + return ExecutorFuture<void>(Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor()) .then([=] { ThreadClient tc(kRangeDeletionThreadName, serviceContext); { @@ -160,7 +156,7 @@ ExecutorFuture<bool> submitRangeDeletionTask(OperationContext* opCtx, : " is not known") << ", forcing a refresh of " << deletionTask.getNss(); - // TODO (SERVER-45577): Add an asynchronous version of + // TODO (SERVER-46075): Add an asynchronous version of // forceShardFilteringMetadataRefresh to avoid blocking on the network in the // thread pool. autoColl.reset(); diff --git a/src/mongo/db/s/migration_util_test.cpp b/src/mongo/db/s/migration_util_test.cpp index 2aafa4e6c5f..14547873f36 100644 --- a/src/mongo/db/s/migration_util_test.cpp +++ b/src/mongo/db/s/migration_util_test.cpp @@ -30,12 +30,16 @@ #include "mongo/client/remote_command_targeter_factory_mock.h" #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/s/catalog_cache_loader_mock.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/migration_util.h" #include "mongo/db/s/persistent_task_store.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" +#include "mongo/db/s/shard_server_catalog_cache_loader.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/db/s/wait_for_majority_service.h" +#include "mongo/s/catalog/sharding_catalog_client_mock.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/database_version_helpers.h" #include "mongo/s/shard_server_test_fixture.h" @@ -376,22 +380,151 @@ TEST_F(MigrationUtilsTest, TestInvalidUUID) { ASSERT_FALSE(migrationutil::checkForConflictingDeletions(opCtx, range, wrongUuid)); } -using SubmitRangeDeletionTaskTest = MigrationUtilsTest; +// Fixture that uses a mocked CatalogCacheLoader and CatalogClient to allow metadata refreshes +// without using the mock network. +class SubmitRangeDeletionTaskTest : public ShardServerTestFixture { +public: + const HostAndPort kConfigHostAndPort{"dummy", 123}; + const NamespaceString kNss{"test.foo"}; + const ShardKeyPattern kShardKeyPattern = ShardKeyPattern(BSON("_id" << 1)); + const UUID kDefaultUUID = UUID::gen(); + const OID kEpoch = OID::gen(); + const DatabaseType kDefaultDatabaseType = + DatabaseType(kNss.db().toString(), ShardId("0"), true, DatabaseVersion(kDefaultUUID, 1)); + const std::vector<ShardType> kShardList = {ShardType("0", "Host0:12345"), + ShardType("1", "Host1:12345")}; + + void setUp() override { + // Don't call ShardServerTestFixture::setUp so we can install a mock catalog cache loader. + ShardingMongodTestFixture::setUp(); + + replicationCoordinator()->alwaysAllowWrites(true); + serverGlobalParams.clusterRole = ClusterRole::ShardServer; + + _clusterId = OID::gen(); + ShardingState::get(getServiceContext())->setInitialized(_myShardName, _clusterId); + + std::unique_ptr<CatalogCacheLoaderMock> mockLoader = + std::make_unique<CatalogCacheLoaderMock>(); + _mockCatalogCacheLoader = mockLoader.get(); + CatalogCacheLoader::set(getServiceContext(), std::move(mockLoader)); + + uassertStatusOK( + initializeGlobalShardingStateForMongodForTest(ConnectionString(kConfigHostAndPort))); + + configTargeterMock()->setFindHostReturnValue(kConfigHostAndPort); + + WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext()); + + // Set up 2 default shards. + for (const auto& shard : kShardList) { + std::unique_ptr<RemoteCommandTargeterMock> targeter( + std::make_unique<RemoteCommandTargeterMock>()); + HostAndPort host(shard.getHost()); + targeter->setConnectionStringReturnValue(ConnectionString(host)); + targeter->setFindHostReturnValue(host); + targeterFactory()->addTargeterToReturn(ConnectionString(host), std::move(targeter)); + } + } + + void tearDown() override { + WaitForMajorityService::get(getServiceContext()).shutDown(); + CatalogCacheLoader::clearForTests(getServiceContext()); + ShardingMongodTestFixture::tearDown(); + CollectionShardingStateFactory::clear(getServiceContext()); + } + + // Mock for the ShardingCatalogClient used to satisfy loading all shards for the ShardRegistry + // and loading all collections when a database is loaded for the first time by the CatalogCache. + class StaticCatalogClient final : public ShardingCatalogClientMock { + public: + StaticCatalogClient(std::vector<ShardType> shards) + : ShardingCatalogClientMock(nullptr), _shards(std::move(shards)) {} + + StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards( + OperationContext* opCtx, repl::ReadConcernLevel readConcern) override { + return repl::OpTimeWith<std::vector<ShardType>>(_shards); + } + + StatusWith<std::vector<CollectionType>> getCollections( + OperationContext* opCtx, + const std::string* dbName, + repl::OpTime* optime, + repl::ReadConcernLevel readConcernLevel) override { + return _colls; + } + + void setCollections(std::vector<CollectionType> colls) { + _colls = std::move(colls); + } + + private: + const std::vector<ShardType> _shards; + std::vector<CollectionType> _colls; + }; + + std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient( + std::unique_ptr<DistLockManager> distLockManager) override { + auto mockCatalogClient = std::make_unique<StaticCatalogClient>(kShardList); + // Stash a pointer to the mock so its return values can be set. + _mockCatalogClient = mockCatalogClient.get(); + return mockCatalogClient; + } + + CollectionType makeCollectionType(UUID uuid, OID epoch) { + CollectionType coll; + coll.setNs(kNss); + coll.setEpoch(epoch); + coll.setKeyPattern(kShardKeyPattern.getKeyPattern()); + coll.setUnique(true); + coll.setUUID(uuid); + return coll; + } + + std::vector<ChunkType> makeChangedChunks(ChunkVersion startingVersion) { + ChunkType chunk1(kNss, + {kShardKeyPattern.getKeyPattern().globalMin(), BSON("_id" << -100)}, + startingVersion, + {"0"}); + chunk1.setName(OID::gen()); + startingVersion.incMinor(); + + ChunkType chunk2(kNss, {BSON("_id" << -100), BSON("_id" << 0)}, startingVersion, {"1"}); + chunk2.setName(OID::gen()); + startingVersion.incMinor(); + + ChunkType chunk3(kNss, {BSON("_id" << 0), BSON("_id" << 100)}, startingVersion, {"0"}); + chunk3.setName(OID::gen()); + startingVersion.incMinor(); + + ChunkType chunk4(kNss, + {BSON("_id" << 100), kShardKeyPattern.getKeyPattern().globalMax()}, + startingVersion, + {"1"}); + chunk4.setName(OID::gen()); + startingVersion.incMinor(); + + return std::vector<ChunkType>{chunk1, chunk2, chunk3, chunk4}; + } + + CatalogCacheLoaderMock* _mockCatalogCacheLoader; + StaticCatalogClient* _mockCatalogClient; +}; TEST_F(SubmitRangeDeletionTaskTest, FailsAndDeletesTaskIfFilteringMetadataIsUnknownEvenAfterRefresh) { auto opCtx = operationContext(); - const auto uuid = UUID::gen(); - auto deletionTask = createDeletionTask(kNss, uuid, 0, 10); + auto deletionTask = createDeletionTask(kNss, kDefaultUUID, 0, 10); PersistentTaskStore<RangeDeletionTask> store(opCtx, NamespaceString::kRangeDeletionNamespace); store.add(opCtx, deletionTask); ASSERT_EQ(store.count(opCtx), 1); - // Make the refresh triggered by submitting the task return an empty result. - auto result = stdx::async(stdx::launch::async, - [this, uuid] { respondToMetadataRefreshRequestsWithError(); }); + // Make the refresh triggered by submitting the task return an empty result when loading the + // database. + _mockCatalogCacheLoader->setDatabaseRefreshReturnValue( + Status(ErrorCodes::NamespaceNotFound, "dummy errmsg")); auto submitTaskFuture = migrationutil::submitRangeDeletionTask(opCtx, deletionTask); @@ -404,12 +537,15 @@ TEST_F(SubmitRangeDeletionTaskTest, TEST_F(SubmitRangeDeletionTaskTest, SucceedsIfFilteringMetadataUUIDMatchesTaskUUID) { auto opCtx = operationContext(); - const auto uuid = UUID::gen(); - auto deletionTask = createDeletionTask(kNss, uuid, 0, 10); + auto deletionTask = createDeletionTask(kNss, kDefaultUUID, 0, 10); // Force a metadata refresh with the task's UUID before the task is submitted. - auto result = - stdx::async(stdx::launch::async, [this, uuid] { respondToMetadataRefreshRequests(uuid); }); + auto coll = makeCollectionType(kDefaultUUID, kEpoch); + _mockCatalogCacheLoader->setDatabaseRefreshReturnValue(kDefaultDatabaseType); + _mockCatalogCacheLoader->setCollectionRefreshReturnValue(coll); + _mockCatalogCacheLoader->setChunkRefreshReturnValue( + makeChangedChunks(ChunkVersion(1, 0, kEpoch))); + _mockCatalogClient->setCollections({coll}); forceShardFilteringMetadataRefresh(opCtx, kNss, true); // The task should have been submitted successfully. @@ -422,12 +558,15 @@ TEST_F( SucceedsIfFilteringMetadataInitiallyUnknownButFilteringMetadataUUIDMatchesTaskUUIDAfterRefresh) { auto opCtx = operationContext(); - const auto uuid = UUID::gen(); - auto deletionTask = createDeletionTask(kNss, uuid, 0, 10); + auto deletionTask = createDeletionTask(kNss, kDefaultUUID, 0, 10); // Make the refresh triggered by submitting the task return a UUID that matches the task's UUID. - auto result = - stdx::async(stdx::launch::async, [this, uuid] { respondToMetadataRefreshRequests(uuid); }); + auto coll = makeCollectionType(kDefaultUUID, kEpoch); + _mockCatalogCacheLoader->setDatabaseRefreshReturnValue(kDefaultDatabaseType); + _mockCatalogCacheLoader->setCollectionRefreshReturnValue(coll); + _mockCatalogCacheLoader->setChunkRefreshReturnValue( + makeChangedChunks(ChunkVersion(1, 0, kEpoch))); + _mockCatalogClient->setCollections({coll}); // The task should have been submitted successfully. auto submitTaskFuture = migrationutil::submitRangeDeletionTask(opCtx, deletionTask); @@ -440,16 +579,24 @@ TEST_F(SubmitRangeDeletionTaskTest, // Force a metadata refresh with an arbitrary UUID so that the node's filtering metadata is // stale when the task is submitted. - auto result1 = stdx::async(stdx::launch::async, [this] { respondToMetadataRefreshRequests(); }); + const auto staleUUID = UUID::gen(); + const auto staleEpoch = OID::gen(); + auto staleColl = makeCollectionType(staleUUID, staleEpoch); + _mockCatalogCacheLoader->setDatabaseRefreshReturnValue(kDefaultDatabaseType); + _mockCatalogCacheLoader->setCollectionRefreshReturnValue(staleColl); + _mockCatalogCacheLoader->setChunkRefreshReturnValue( + makeChangedChunks(ChunkVersion(1, 0, staleEpoch))); + _mockCatalogClient->setCollections({staleColl}); forceShardFilteringMetadataRefresh(opCtx, kNss, true); - const auto uuid = UUID::gen(); - auto deletionTask = createDeletionTask(kNss, uuid, 0, 10); + auto deletionTask = createDeletionTask(kNss, kDefaultUUID, 0, 10); // Make the refresh triggered by submitting the task return a UUID that matches the task's UUID. - auto result2 = stdx::async(stdx::launch::async, [this, uuid] { - respondToMetadataRefreshRequests(uuid, true /* incrementalRefresh */); - }); + auto matchingColl = makeCollectionType(kDefaultUUID, kEpoch); + _mockCatalogCacheLoader->setCollectionRefreshReturnValue(matchingColl); + _mockCatalogCacheLoader->setChunkRefreshReturnValue( + makeChangedChunks(ChunkVersion(10, 0, kEpoch))); + _mockCatalogClient->setCollections({matchingColl}); // The task should have been submitted successfully. auto submitTaskFuture = migrationutil::submitRangeDeletionTask(opCtx, deletionTask); @@ -460,16 +607,20 @@ TEST_F(SubmitRangeDeletionTaskTest, FailsAndDeletesTaskIfFilteringMetadataUUIDDifferentFromTaskUUIDEvenAfterRefresh) { auto opCtx = operationContext(); - const auto uuid = UUID::gen(); - auto deletionTask = createDeletionTask(kNss, uuid, 0, 10); + auto deletionTask = createDeletionTask(kNss, kDefaultUUID, 0, 10); PersistentTaskStore<RangeDeletionTask> store(opCtx, NamespaceString::kRangeDeletionNamespace); store.add(opCtx, deletionTask); ASSERT_EQ(store.count(opCtx), 1); // Make the refresh triggered by submitting the task return an arbitrary UUID. - auto result2 = - stdx::async(stdx::launch::async, [this, uuid] { respondToMetadataRefreshRequests(); }); + const auto otherEpoch = OID::gen(); + auto otherColl = makeCollectionType(UUID::gen(), otherEpoch); + _mockCatalogCacheLoader->setDatabaseRefreshReturnValue(kDefaultDatabaseType); + _mockCatalogCacheLoader->setCollectionRefreshReturnValue(otherColl); + _mockCatalogCacheLoader->setChunkRefreshReturnValue( + makeChangedChunks(ChunkVersion(1, 0, otherEpoch))); + _mockCatalogClient->setCollections({otherColl}); // The task should not have been submitted, and the task's entry should have been removed from // the persistent store. |