author    | Esha Maharishi <esha.maharishi@mongodb.com> | 2016-08-01 14:25:08 -0400
committer | Esha Maharishi <esha.maharishi@mongodb.com> | 2016-08-01 14:25:08 -0400
commit    | c169349f8fd5769747d47ca3a984bfd64fc77802 (patch)
tree      | 2c4fec43a70c719e2c838ad475293444cc3a5652
parent    | bbf959a5d661d061d7564a4049633626f558da08 (diff)
download  | mongo-c169349f8fd5769747d47ca3a984bfd64fc77802.tar.gz
SERVER-25154 unit tests for addShard compatibility path
5 files changed, 276 insertions, 31 deletions
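
The new tests below exercise the "compatibility" addShard path: when a shard document is written to config.shards, the config server upserts a shardIdentity document into admin.system.version on that shard, and reschedules the upsert after a fixed retry interval whenever an attempt fails. The following is a condensed sketch of the pattern those tests use, assembled only from helpers introduced in this diff (expectShardIdentityUpsertReturnFailure, expectShardIdentityUpsertReturnSuccess, forwardAddShardNetwork, ShardingCatalogManager::getAddShardTaskRetryInterval); the test name is made up for illustration and the targeter/config.shards setup shown in the full tests is omitted, so it is not an additional test from the commit.

    // Sketch only, not part of the commit: fail one upsert attempt, advance the mocked
    // add-shard network clock past the retry interval so the rescheduled task runs, then
    // let the retried attempt succeed.
    TEST_F(AddShardTest, RetrySketch) {
        HostAndPort shardTarget("StandaloneHost:12345");
        std::string shardName = "StandaloneShard";

        // First attempt fails with a retriable error; the upsert task should be rescheduled.
        expectShardIdentityUpsertReturnFailure(
            shardTarget, shardName, {ErrorCodes::HostUnreachable, "host unreachable"});
        forwardAddShardNetwork(networkForAddShard()->now() +
                               ShardingCatalogManager::getAddShardTaskRetryInterval() +
                               Milliseconds(10));

        // The rescheduled attempt succeeds, after which config.shards should mark the
        // shard as shard aware.
        expectShardIdentityUpsertReturnSuccess(shardTarget, shardName);
    }
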
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
index 2da1fcb718a..4bf6396012f 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
@@ -37,6 +37,8 @@
 #include "mongo/client/remote_command_targeter_mock.h"
 #include "mongo/db/commands.h"
 #include "mongo/db/query/query_request.h"
+#include "mongo/db/repl/member_state.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
 #include "mongo/db/s/type_shard_identity.h"
 #include "mongo/rpc/metadata/repl_set_metadata.h"
 #include "mongo/rpc/metadata/server_selection_metadata.h"
@@ -131,8 +133,8 @@ protected:
      * Waits for a request for the shardIdentity document to be upserted into a shard from the
      * config server on addShard.
      */
-    void expectShardIdentityUpsert(const HostAndPort& expectedHost,
-                                   const std::string& expectedShardName) {
+    void expectShardIdentityUpsertReturnSuccess(const HostAndPort& expectedHost,
+                                                const std::string& expectedShardName) {
         // Create the expected upsert shardIdentity command for this shardType.
         auto upsertCmdObj = catalogManager()->createShardIdentityUpsertForAddShard(
             operationContext(), expectedShardName);
@@ -142,18 +144,36 @@ protected:
         std::string errMsg;
         invariant(request.parseBSON("admin", upsertCmdObj, &errMsg) || !request.isValid(&errMsg));

-        expectUpdates(expectedHost,
-                      NamespaceString(NamespaceString::kConfigCollectionNamespace),
-                      request.getUpdateRequest());
+        expectUpdatesReturnSuccess(expectedHost,
+                                   NamespaceString(NamespaceString::kConfigCollectionNamespace),
+                                   request.getUpdateRequest());
+    }
+
+    void expectShardIdentityUpsertReturnFailure(const HostAndPort& expectedHost,
+                                                const std::string& expectedShardName,
+                                                const Status& statusToReturn) {
+        // Create the expected upsert shardIdentity command for this shardType.
+        auto upsertCmdObj = catalogManager()->createShardIdentityUpsertForAddShard(
+            operationContext(), expectedShardName);
+
+        // Get the BatchedUpdateRequest from the upsert command.
+        BatchedCommandRequest request(BatchedCommandRequest::BatchType::BatchType_Update);
+        std::string errMsg;
+        invariant(request.parseBSON("admin", upsertCmdObj, &errMsg) || !request.isValid(&errMsg));
+
+        expectUpdatesReturnFailure(expectedHost,
+                                   NamespaceString(NamespaceString::kConfigCollectionNamespace),
+                                   request.getUpdateRequest(),
+                                   statusToReturn);
     }

     /**
      * Waits for a set of batched updates and ensures that the host, namespace, and updates exactly
      * match what's expected. Responds with a success status.
      */
-    void expectUpdates(const HostAndPort& expectedHost,
-                       const NamespaceString& expectedNss,
-                       BatchedUpdateRequest* expectedBatchedUpdates) {
+    void expectUpdatesReturnSuccess(const HostAndPort& expectedHost,
+                                    const NamespaceString& expectedNss,
+                                    BatchedUpdateRequest* expectedBatchedUpdates) {
         onCommandForAddShard([&](const RemoteCommandRequest& request) {

             ASSERT_EQUALS(expectedHost, request.target);
@@ -193,6 +213,49 @@
     }

     /**
+     * Waits for a set of batched updates and ensures that the host, namespace, and updates exactly
+     * match what's expected. Responds with a failure status.
+     */
+    void expectUpdatesReturnFailure(const HostAndPort& expectedHost,
+                                    const NamespaceString& expectedNss,
+                                    BatchedUpdateRequest* expectedBatchedUpdates,
+                                    const Status& statusToReturn) {
+        onCommandForAddShard([&](const RemoteCommandRequest& request) {
+
+            ASSERT_EQUALS(expectedHost, request.target);
+
+            // Check that the db name in the request matches the expected db name.
+            ASSERT_EQUALS(expectedNss.db(), request.dbname);
+
+            BatchedUpdateRequest actualBatchedUpdates;
+            std::string errmsg;
+            ASSERT_TRUE(actualBatchedUpdates.parseBSON(request.dbname, request.cmdObj, &errmsg));
+
+            // Check that the db and collection names in the BatchedUpdateRequest match the
+            // expected.
+            ASSERT_EQUALS(expectedNss, actualBatchedUpdates.getNS());
+
+            auto expectedUpdates = expectedBatchedUpdates->getUpdates();
+            auto actualUpdates = actualBatchedUpdates.getUpdates();
+
+            ASSERT_EQUALS(expectedUpdates.size(), actualUpdates.size());
+
+            auto itExpected = expectedUpdates.begin();
+            auto itActual = actualUpdates.begin();
+
+            for (; itActual != actualUpdates.end(); itActual++, itExpected++) {
+                ASSERT_EQ((*itExpected)->getUpsert(), (*itActual)->getUpsert());
+                ASSERT_EQ((*itExpected)->getMulti(), (*itActual)->getMulti());
+                ASSERT_EQ((*itExpected)->getQuery(), (*itActual)->getQuery());
+                ASSERT_EQ((*itExpected)->getUpdateExpr(), (*itActual)->getUpdateExpr());
+            }
+
+            return statusToReturn;
+        });
+    }
+
+
+    /**
      * Asserts that a document exists in the config server's config.shards collection corresponding
      * to 'expectedShard'.
      */
@@ -245,6 +308,12 @@
         ASSERT_EQUALS(addedShard.getHost(), logEntry.getDetails()["host"].String());
     }

+    void forwardAddShardNetwork(Date_t when) {
+        networkForAddShard()->enterNetwork();
+        networkForAddShard()->runUntil(when);
+        networkForAddShard()->exitNetwork();
+    }
+
     OID _clusterId;
 };

@@ -330,13 +399,13 @@ TEST_F(AddShardTest, StandaloneBasicSuccess) {
                             BSON("name" << discoveredDB1.getName() << "sizeOnDisk" << 2000),
                             BSON("name" << discoveredDB2.getName() << "sizeOnDisk" << 5000)});

-    // The shardIdentity doc inserted into the config.version collection on the shard.
-    expectShardIdentityUpsert(shardTarget, expectedShardName);
+    // The shardIdentity doc inserted into the admin.system.version collection on the shard.
+    expectShardIdentityUpsertReturnSuccess(shardTarget, expectedShardName);

     // Wait for the addShard to complete before checking the config database
     future.timed_get(kFutureTimeout);

-    // Ensure that the shard document was properly added to config.shards
+    // Ensure that the shard document was properly added to config.shards.
     assertShardExists(expectedShard);

     // Ensure that the databases detected from the shard were properly added to config.database.
@@ -408,13 +477,13 @@ TEST_F(AddShardTest, StandaloneGenerateName) {
                             BSON("name" << discoveredDB1.getName() << "sizeOnDisk" << 2000),
                             BSON("name" << discoveredDB2.getName() << "sizeOnDisk" << 5000)});

-    // The shardIdentity doc inserted into the config.version collection on the shard.
-    expectShardIdentityUpsert(shardTarget, expectedShardName);
+    // The shardIdentity doc inserted into the admin.system.version collection on the shard.
+    expectShardIdentityUpsertReturnSuccess(shardTarget, expectedShardName);

     // Wait for the addShard to complete before checking the config database
     future.timed_get(kFutureTimeout);

-    // Ensure that the shard document was properly added to config.shards
+    // Ensure that the shard document was properly added to config.shards.
     assertShardExists(expectedShard);

     // Ensure that the databases detected from the shard were properly added to config.database.
@@ -780,13 +849,13 @@ TEST_F(AddShardTest, SuccessfullyAddReplicaSet) {
     // Get databases list from new shard
     expectListDatabases(shardTarget, std::vector<BSONObj>{BSON("name" << discoveredDB.getName())});

-    // The shardIdentity doc inserted into the config.version collection on the shard.
-    expectShardIdentityUpsert(shardTarget, expectedShardName);
+    // The shardIdentity doc inserted into the admin.system.version collection on the shard.
+    expectShardIdentityUpsertReturnSuccess(shardTarget, expectedShardName);

     // Wait for the addShard to complete before checking the config database
     future.timed_get(kFutureTimeout);

-    // Ensure that the shard document was properly added to config.shards
+    // Ensure that the shard document was properly added to config.shards.
     assertShardExists(expectedShard);

     // Ensure that the databases detected from the shard were properly added to config.database.
@@ -840,13 +909,13 @@ TEST_F(AddShardTest, ReplicaSetExtraHostsDiscovered) {
     // Get databases list from new shard
     expectListDatabases(shardTarget, std::vector<BSONObj>{BSON("name" << discoveredDB.getName())});

-    // The shardIdentity doc inserted into the config.version collection on the shard.
-    expectShardIdentityUpsert(shardTarget, expectedShardName);
+    // The shardIdentity doc inserted into the admin.system.version collection on the shard.
+    expectShardIdentityUpsertReturnSuccess(shardTarget, expectedShardName);

     // Wait for the addShard to complete before checking the config database
     future.timed_get(kFutureTimeout);

-    // Ensure that the shard document was properly added to config.shards
+    // Ensure that the shard document was properly added to config.shards.
     assertShardExists(expectedShard);

     // Ensure that the databases detected from the shard were properly added to config.database.
@@ -915,13 +984,13 @@ TEST_F(AddShardTest, AddShardSucceedsEvenIfAddingDBsFromNewShardFails) {
                             BSON("name" << discoveredDB1.getName() << "sizeOnDisk" << 2000),
                             BSON("name" << discoveredDB2.getName() << "sizeOnDisk" << 5000)});

-    // The shardIdentity doc inserted into the config.version collection on the shard.
-    expectShardIdentityUpsert(shardTarget, expectedShardName);
+    // The shardIdentity doc inserted into the admin.system.version collection on the shard.
+    expectShardIdentityUpsertReturnSuccess(shardTarget, expectedShardName);

     // Wait for the addShard to complete before checking the config database
     future.timed_get(kFutureTimeout);

-    // Ensure that the shard document was properly added to config.shards
+    // Ensure that the shard document was properly added to config.shards.
     assertShardExists(expectedShard);

     // Ensure that the databases detected from the shard were *not* added.
@@ -935,6 +1004,162 @@ TEST_F(AddShardTest, AddShardSucceedsEvenIfAddingDBsFromNewShardFails) {
     assertChangeWasLogged(expectedShard);
 }

+TEST_F(AddShardTest, CompatibilityAddShardSuccess) {
+    // This is a hack to set the ReplicationCoordinator's MemberState to primary, since this test
+    // relies on behavior guarded by a check that we are a primary.
+    repl::ReplicationCoordinator::get(getGlobalServiceContext())
+        ->setFollowerMode(repl::MemberState::RS_PRIMARY);
+
+    std::unique_ptr<RemoteCommandTargeterMock> targeter(
+        stdx::make_unique<RemoteCommandTargeterMock>());
+    HostAndPort shardTarget("StandaloneHost:12345");
+    targeter->setConnectionStringReturnValue(ConnectionString(shardTarget));
+    targeter->setFindHostReturnValue(shardTarget);
+    targeterFactory()->addTargeterToReturn(ConnectionString(shardTarget), std::move(targeter));
+
+    std::string shardName = "StandaloneShard";
+
+    // The shard doc inserted into the config.shards collection on the config server.
+    ShardType addedShard;
+    addedShard.setName(shardName);
+    addedShard.setHost(shardTarget.toString());
+    addedShard.setMaxSizeMB(100);
+
+    // Add the shard to config.shards to trigger the OpObserver that performs shard aware
+    // initialization.
+    ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(),
+                                                    ShardType::ConfigNS,
+                                                    addedShard.toBSON(),
+                                                    ShardingCatalogClient::kMajorityWriteConcern));
+
+    // The shardIdentity doc inserted into the admin.system.version collection on the shard.
+    expectShardIdentityUpsertReturnSuccess(shardTarget, shardName);
+
+    // Since the shardIdentity upsert succeeded, the entry in config.shards should have been
+    // updated to reflect that the shard is now shard aware.
+    addedShard.setState(ShardType::ShardState::kShardAware);
+
+    // Ensure that the shard document was properly added to config.shards.
+    assertShardExists(addedShard);
+}
+
+TEST_F(AddShardTest, CompatibilityAddShardRetryOnGenericFailures) {
+    // This is a hack to set the ReplicationCoordinator's MemberState to primary, since this test
+    // relies on behavior guarded by a check that we are a primary.
+    repl::ReplicationCoordinator::get(getGlobalServiceContext())
+        ->setFollowerMode(repl::MemberState::RS_PRIMARY);
+
+    std::unique_ptr<RemoteCommandTargeterMock> targeter(
+        stdx::make_unique<RemoteCommandTargeterMock>());
+    HostAndPort shardTarget("StandaloneHost:12345");
+    targeter->setConnectionStringReturnValue(ConnectionString(shardTarget));
+    targeter->setFindHostReturnValue(shardTarget);
+    targeterFactory()->addTargeterToReturn(ConnectionString(shardTarget), std::move(targeter));
+
+    std::string shardName = "StandaloneShard";
+
+    // The shard doc inserted into the config.shards collection on the config server.
+    ShardType addedShard;
+    addedShard.setName(shardName);
+    addedShard.setHost(shardTarget.toString());
+    addedShard.setMaxSizeMB(100);
+
+    // Add the shard to config.shards to trigger the OpObserver that performs shard aware
+    // initialization.
+    ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(),
+                                                    ShardType::ConfigNS,
+                                                    addedShard.toBSON(),
+                                                    ShardingCatalogClient::kMajorityWriteConcern));
+
+    // Simulate several failures upserting the shardIdentity doc on the shard. The upsert should
+    // be rescheduled and retried until it succeeds.
+
+    expectShardIdentityUpsertReturnFailure(
+        shardTarget, shardName, {ErrorCodes::HostUnreachable, "host unreachable"});
+    forwardAddShardNetwork(networkForAddShard()->now() +
+                           ShardingCatalogManager::getAddShardTaskRetryInterval() +
+                           Milliseconds(10));
+
+    expectShardIdentityUpsertReturnFailure(
+        shardTarget, shardName, {ErrorCodes::WriteConcernFailed, "write concern failed"});
+    forwardAddShardNetwork(networkForAddShard()->now() +
+                           ShardingCatalogManager::getAddShardTaskRetryInterval() +
+                           Milliseconds(10));
+
+    expectShardIdentityUpsertReturnFailure(
+        shardTarget, shardName, {ErrorCodes::RemoteChangeDetected, "remote change detected"});
+    forwardAddShardNetwork(networkForAddShard()->now() +
+                           ShardingCatalogManager::getAddShardTaskRetryInterval() +
+                           Milliseconds(10));
+
+    // Finally, respond with success.
+    expectShardIdentityUpsertReturnSuccess(shardTarget, shardName);
+    forwardAddShardNetwork(networkForAddShard()->now() +
+                           ShardingCatalogManager::getAddShardTaskRetryInterval() +
+                           Milliseconds(10));
+
+    // Since the shardIdentity upsert succeeded, the entry in config.shards should have been
+    // updated to reflect that the shard is now shard aware.
+    addedShard.setState(ShardType::ShardState::kShardAware);
+
+    // Ensure that the shard document was properly added to config.shards.
+    assertShardExists(addedShard);
+}
+
+// Note: This test is separated from the generic failures one because there is a special code path
+// to handle DuplicateKey errors, even though the server's actual behavior is the same.
+TEST_F(AddShardTest, CompatibilityAddShardRetryOnDuplicateKeyFailure) {
+    // This is a hack to set the ReplicationCoordinator's MemberState to primary, since this test
+    // relies on behavior guarded by a check that we are a primary.
+    repl::ReplicationCoordinator::get(getGlobalServiceContext())
+        ->setFollowerMode(repl::MemberState::RS_PRIMARY);
+
+    std::unique_ptr<RemoteCommandTargeterMock> targeter(
+        stdx::make_unique<RemoteCommandTargeterMock>());
+    HostAndPort shardTarget("StandaloneHost:12345");
+    targeter->setConnectionStringReturnValue(ConnectionString(shardTarget));
+    targeter->setFindHostReturnValue(shardTarget);
+    targeterFactory()->addTargeterToReturn(ConnectionString(shardTarget), std::move(targeter));
+
+    std::string shardName = "StandaloneShard";
+
+    // The shard doc inserted into the config.shards collection on the config server.
+    ShardType addedShard;
+    addedShard.setName(shardName);
+    addedShard.setHost(shardTarget.toString());
+    addedShard.setMaxSizeMB(100);
+
+    // Add the shard to config.shards to trigger the OpObserver that performs shard aware
+    // initialization.
+    ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(),
+                                                    ShardType::ConfigNS,
+                                                    addedShard.toBSON(),
+                                                    ShardingCatalogClient::kMajorityWriteConcern));
+
+    // Simulate several DuplicateKeyError failures while the shardIdentity document on the shard
+    // has not yet been manually deleted.
+    for (int i = 0; i < 3; i++) {
+        expectShardIdentityUpsertReturnFailure(
+            shardTarget, shardName, {ErrorCodes::DuplicateKey, "duplicate key"});
+        forwardAddShardNetwork(networkForAddShard()->now() +
+                               ShardingCatalogManager::getAddShardTaskRetryInterval() +
+                               Milliseconds(10));
+    }
+
+    // Finally, respond with success (simulating that the shardIdentity document has been deleted).
+    expectShardIdentityUpsertReturnSuccess(shardTarget, shardName);
+    forwardAddShardNetwork(networkForAddShard()->now() +
+                           ShardingCatalogManager::getAddShardTaskRetryInterval() +
+                           Milliseconds(10));
+
+    // Since the shardIdentity upsert succeeded, the entry in config.shards should have been
+    // updated to reflect that the shard is now shard aware.
+    addedShard.setState(ShardType::ShardState::kShardAware);
+
+    // Ensure that the shard document was properly added to config.shards.
+    assertShardExists(addedShard);
+}
+
 /* TODO(SERVER-24213): Add back tests around adding shard that already exists.

 // Host is already part of an existing shard.
@@ -1032,8 +1257,8 @@ TEST_F(AddShardTest, ReAddExistingShard) {

     expectGetDatabase("shardDB", boost::none);

-    // The shardIdentity doc inserted into the config.version collection on the shard.
-    expectShardIdentityUpsert(shardTarget, expectedShardName);
+    // The shardIdentity doc inserted into the admin.system.version collection on the shard.
+    expectShardIdentityUpsertReturnSuccess(shardTarget, expectedShardName);

     // The shard doc inserted into the config.shards collection on the config server.
     ShardType newShard;
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
index 4a200b34f41..8bdf449de3f 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
@@ -88,7 +88,6 @@ using CallbackArgs = executor::TaskExecutor::CallbackArgs;
 using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
 using RemoteCommandCallbackFn = executor::TaskExecutor::RemoteCommandCallbackFn;

-const Seconds kAddShardTaskRetryInterval(30);
 const Seconds kDefaultFindHostMaxWaitTime(20);

 const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{});
@@ -229,7 +228,6 @@ StatusWith<ChunkRange> includeFullShardKey(OperationContext* txn,

 }  // namespace

-
 ShardingCatalogManagerImpl::ShardingCatalogManagerImpl(
     ShardingCatalogClient* catalogClient, std::unique_ptr<executor::TaskExecutor> addShardExecutor)
     : _catalogClient(catalogClient), _executorForAddShard(std::move(addShardExecutor)) {}
@@ -1216,7 +1214,7 @@ void ShardingCatalogManagerImpl::_scheduleAddShardTask(
                   << " when trying to upsert a shardIdentity document, "
                   << causedBy(swHost.getStatus());
         const Date_t now = _executorForAddShard->now();
-        const Date_t when = now + kAddShardTaskRetryInterval;
+        const Date_t when = now + getAddShardTaskRetryInterval();
         _trackAddShardHandle_inlock(
             shardType.getName(),
             _executorForAddShard->scheduleWorkAt(
@@ -1310,7 +1308,7 @@ void ShardingCatalogManagerImpl::_handleAddShardTaskResponse(
         // If the command did not succeed, schedule the upsert shardIdentity task again with a
         // delay.
         const Date_t now = _executorForAddShard->now();
-        const Date_t when = now + kAddShardTaskRetryInterval;
+        const Date_t when = now + getAddShardTaskRetryInterval();

         // Track the handle from scheduleWorkAt.
         _trackAddShardHandle_inlock(
diff --git a/src/mongo/s/catalog/sharding_catalog_manager.h b/src/mongo/s/catalog/sharding_catalog_manager.h
index dbb38c16b48..c3fd3d5117e 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager.h
@@ -66,6 +66,10 @@ class ShardingCatalogManager {
     MONGO_DISALLOW_COPYING(ShardingCatalogManager);

 public:
+    static Seconds getAddShardTaskRetryInterval() {
+        return Seconds{30};
+    }
+
     virtual ~ShardingCatalogManager() = default;

     /**
diff --git a/src/mongo/s/config_server_test_fixture.cpp b/src/mongo/s/config_server_test_fixture.cpp
index 49573d43603..ff8e9f434b0 100644
--- a/src/mongo/s/config_server_test_fixture.cpp
+++ b/src/mongo/s/config_server_test_fixture.cpp
@@ -153,9 +153,10 @@ void ConfigServerTestFixture::setUp() {

     // Set up executor used for a few special operations during addShard.
     auto specialNet(stdx::make_unique<executor::NetworkInterfaceMock>());
-    auto specialMockNet = specialNet.get();
+    _mockNetworkForAddShard = specialNet.get();
     auto specialExec = makeThreadPoolTestExecutor(std::move(specialNet));
-    _addShardNetworkTestEnv = stdx::make_unique<NetworkTestEnv>(specialExec.get(), specialMockNet);
+    _addShardNetworkTestEnv =
+        stdx::make_unique<NetworkTestEnv>(specialExec.get(), _mockNetworkForAddShard);
     _executorForAddShard = specialExec.get();

     auto targeterFactory(stdx::make_unique<RemoteCommandTargeterFactoryMock>());
@@ -285,12 +286,24 @@ executor::NetworkInterfaceMock* ConfigServerTestFixture::network() const {
     return _mockNetwork;
 }

+executor::NetworkInterfaceMock* ConfigServerTestFixture::networkForAddShard() const {
+    invariant(_mockNetworkForAddShard);
+
+    return _mockNetworkForAddShard;
+}
+
 executor::TaskExecutor* ConfigServerTestFixture::executor() const {
     invariant(_executor);

     return _executor;
 }

+executor::TaskExecutor* ConfigServerTestFixture::executorForAddShard() const {
+    invariant(_executorForAddShard);
+
+    return _executorForAddShard;
+}
+
 MessagingPortMock* ConfigServerTestFixture::getMessagingPort() const {
     return _messagePort.get();
 }
diff --git a/src/mongo/s/config_server_test_fixture.h b/src/mongo/s/config_server_test_fixture.h
index 38d348a78f0..776b637a3af 100644
--- a/src/mongo/s/config_server_test_fixture.h
+++ b/src/mongo/s/config_server_test_fixture.h
@@ -96,8 +96,12 @@ public:

     executor::NetworkInterfaceMock* network() const;

+    executor::NetworkInterfaceMock* networkForAddShard() const;
+
     executor::TaskExecutor* executor() const;

+    executor::TaskExecutor* executorForAddShard() const;
+
     MessagingPortMock* getMessagingPort() const;

     ReplSetDistLockManager* distLock() const;
@@ -168,6 +172,7 @@ private:
     RemoteCommandTargeterFactoryMock* _targeterFactory;

     executor::NetworkInterfaceMock* _mockNetwork;
+    executor::NetworkInterfaceMock* _mockNetworkForAddShard;
     executor::TaskExecutor* _executor;
     executor::TaskExecutor* _executorForAddShard;
     std::unique_ptr<executor::NetworkTestEnv> _networkTestEnv;
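
A note on the retry-interval change above: the interval moves from a file-local constant (kAddShardTaskRetryInterval) in sharding_catalog_manager_impl.cpp to the static ShardingCatalogManager::getAddShardTaskRetryInterval() getter, so tests can compute exactly how far to advance the mocked add-shard network (exposed through the new networkForAddShard() fixture accessor) before a rescheduled upsert task actually runs. A hypothetical helper, not part of this commit, that a test could use for that calculation:

    // nextAddShardRetryDeadline is an illustrative name, not a function from this commit.
    // It returns a point safely past the next scheduled retry of the shardIdentity upsert.
    Date_t nextAddShardRetryDeadline(executor::NetworkInterfaceMock* net) {
        // The extra 10ms ensures runUntil() moves past the scheduled retry time.
        return net->now() + ShardingCatalogManager::getAddShardTaskRetryInterval() +
            Milliseconds(10);
    }

    // Possible usage in a test, with the fixture accessor added in config_server_test_fixture.h:
    //     forwardAddShardNetwork(nextAddShardRetryDeadline(networkForAddShard()));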