diff options
author | Spencer T Brody <spencer@mongodb.com> | 2016-07-18 14:45:04 -0400 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2016-08-08 17:51:29 -0400 |
commit | 9597d67f9640b32fad2264c49da4ba0bd271d395 (patch) | |
tree | d8c1d1d219924e5d4dd4f7730600c6591350d299 /src/mongo/s | |
parent | 1257f4f5cc2c5a09383c1ddcd3d03bc83809ca18 (diff) | |
download | mongo-9597d67f9640b32fad2264c49da4ba0bd271d395.tar.gz |
SERVER-24213 Make addShard return success if attempting to add an already existing shard
Diffstat (limited to 'src/mongo/s')
12 files changed, 447 insertions, 153 deletions
diff --git a/src/mongo/s/balancer/cluster_statistics_impl.cpp b/src/mongo/s/balancer/cluster_statistics_impl.cpp index c5a40a8b742..01993f1a938 100644 --- a/src/mongo/s/balancer/cluster_statistics_impl.cpp +++ b/src/mongo/s/balancer/cluster_statistics_impl.cpp @@ -104,7 +104,8 @@ StatusWith<vector<ShardStatistics>> ClusterStatisticsImpl::getStats(OperationCon // db.serverStatus() (mem.mapped) to all shards. // // TODO: skip unresponsive shards and mark information as stale. - auto shardsStatus = Grid::get(txn)->catalogClient(txn)->getAllShards(txn); + auto shardsStatus = Grid::get(txn)->catalogClient(txn)->getAllShards( + txn, repl::ReadConcernLevel::kMajorityReadConcern); if (!shardsStatus.isOK()) { return shardsStatus.getStatus(); } diff --git a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp index 63934c8a58e..691d8dc76b0 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp @@ -1354,138 +1354,303 @@ TEST_F(AddShardTest, CompatibilityAddShardCancelRescheduledCallbackReAddShard) { assertShardExists(addedShard); } -/* -TODO(SERVER-24213): Add back tests around adding shard that already exists. -// Host is already part of an existing shard. +// Tests both that trying to add a shard with the same host as an existing shard but with different +// options fails, and that adding a shard with the same host as an existing shard with the *same* +// options succeeds. 
TEST_F(AddShardTest, AddExistingShardStandalone) { - std::unique_ptr<RemoteCommandTargeterMock> targeter( + HostAndPort shardTarget("StandaloneHost:12345"); + std::unique_ptr<RemoteCommandTargeterMock> standaloneTargeter( stdx::make_unique<RemoteCommandTargeterMock>()); - HostAndPort shardTarget = HostAndPort("host1:12345"); - targeter->setConnectionStringReturnValue(ConnectionString(shardTarget)); - targeter->setFindHostReturnValue(shardTarget); + standaloneTargeter->setConnectionStringReturnValue(ConnectionString(shardTarget)); + standaloneTargeter->setFindHostReturnValue(shardTarget); + targeterFactory()->addTargeterToReturn(ConnectionString(shardTarget), + std::move(standaloneTargeter)); - targeterFactory()->addTargeterToReturn(ConnectionString(shardTarget), std::move(targeter)); - std::string expectedShardName = "StandaloneShard"; + std::unique_ptr<RemoteCommandTargeterMock> replsetTargeter( + stdx::make_unique<RemoteCommandTargeterMock>()); + replsetTargeter->setConnectionStringReturnValue( + ConnectionString::forReplicaSet("mySet", {shardTarget})); + replsetTargeter->setFindHostReturnValue(shardTarget); + targeterFactory()->addTargeterToReturn(ConnectionString::forReplicaSet("mySet", {shardTarget}), + std::move(replsetTargeter)); - auto future = launchAsync([this, expectedShardName, shardTarget] { - auto status = - catalogManager()->addShard(operationContext(), - &expectedShardName, - assertGet(ConnectionString::parse(shardTarget.toString())), - 100); - ASSERT_EQUALS(ErrorCodes::OperationFailed, status); - ASSERT_STRING_CONTAINS(status.getStatus().reason(), - "is already a member of the existing shard"); - }); + std::string existingShardName = "myShard"; + ShardType existingShard; + existingShard.setName(existingShardName); + existingShard.setHost(shardTarget.toString()); + existingShard.setMaxSizeMB(100); + existingShard.setState(ShardType::ShardState::kShardAware); - ShardType shard; - shard.setName("shard0000"); - shard.setHost(shardTarget.toString()); - 
expectGetShards({shard}); + // Make sure the shard already exists. + ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(), + ShardType::ConfigNS, + existingShard.toBSON(), + ShardingCatalogClient::kMajorityWriteConcern)); + assertShardExists(existingShard); - future.timed_get(kFutureTimeout); -} + // Adding the same host with a different shard name should fail. + std::string differentName = "anotherShardName"; + auto future1 = launchAsync([&] { + Client::initThreadIfNotAlready(); + ASSERT_EQUALS(ErrorCodes::IllegalOperation, + catalogManager()->addShard(operationContext(), + &differentName, + ConnectionString(shardTarget), + existingShard.getMaxSizeMB())); + }); + expectIsMaster(shardTarget, BSON("ok" << 1 << "ismaster" << true)); + future1.timed_get(kFutureTimeout); + // Ensure that the shard document was unchanged. + assertShardExists(existingShard); -// Host is already part of an existing replica set shard. -TEST_F(AddShardTest, AddExistingShardReplicaSet) { - std::unique_ptr<RemoteCommandTargeterMock> targeter( - stdx::make_unique<RemoteCommandTargeterMock>()); - ConnectionString connString = - assertGet(ConnectionString::parse("mySet/host1:12345,host2:12345")); - HostAndPort shardTarget = connString.getServers().front(); - targeter->setConnectionStringReturnValue(connString); - targeter->setFindHostReturnValue(shardTarget); + // Adding the same host with a different maxSize should fail. + auto future2 = launchAsync([&] { + Client::initThreadIfNotAlready(); + ASSERT_EQUALS(ErrorCodes::IllegalOperation, + catalogManager()->addShard(operationContext(), + nullptr, + ConnectionString(shardTarget), + existingShard.getMaxSizeMB() + 100)); + }); + expectIsMaster(shardTarget, BSON("ok" << 1 << "ismaster" << true)); + future2.timed_get(kFutureTimeout); - targeterFactory()->addTargeterToReturn(connString, std::move(targeter)); - std::string expectedShardName = "StandaloneShard"; + // Adding the same host but as part of a replica set should fail. 
+ auto future3 = launchAsync([&] { + Client::initThreadIfNotAlready(); + ASSERT_EQUALS( + ErrorCodes::IllegalOperation, + catalogManager()->addShard(operationContext(), + nullptr, + ConnectionString::forReplicaSet("mySet", {shardTarget}), + existingShard.getMaxSizeMB())); + }); + // Make it get past the host validation check (even though if this *really* was a standalone + // it wouldn't report it was a replica set here and thus would fail the validation check) to + // ensure that even if the user changed the standalone shard to a single-node replica set, you + // can't change the sharded cluster's notion of the shard from standalone to replica set just + // by calling addShard. + expectIsMaster(shardTarget, + BSON("ok" << 1 << "ismaster" << true << "setName" + << "mySet" + << "hosts" + << BSON_ARRAY(shardTarget.toString()))); + future3.timed_get(kFutureTimeout); + + // Ensure that the shard document was unchanged. + assertShardExists(existingShard); - auto future = launchAsync([this, expectedShardName, connString] { - auto status = - catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100); - ASSERT_EQUALS(ErrorCodes::OperationFailed, status); - ASSERT_STRING_CONTAINS(status.getStatus().reason(), - "is already a member of the existing shard"); + // Adding the same host with the same options should succeed. + auto future4 = launchAsync([&] { + Client::initThreadIfNotAlready(); + auto shardName = assertGet(catalogManager()->addShard(operationContext(), + &existingShardName, + ConnectionString(shardTarget), + existingShard.getMaxSizeMB())); + ASSERT_EQUALS(existingShardName, shardName); }); + expectIsMaster(shardTarget, BSON("ok" << 1 << "ismaster" << true)); + future4.timed_get(kFutureTimeout); - ShardType shard; - shard.setName("shard0000"); - shard.setHost(shardTarget.toString()); - expectGetShards({shard}); + // Ensure that the shard document was unchanged. 
+ assertShardExists(existingShard); - future.timed_get(kFutureTimeout); + // Adding the same host with the same options (without explicitly specifying the shard name) + // should succeed. + auto future5 = launchAsync([&] { + Client::initThreadIfNotAlready(); + auto shardName = + assertGet(catalogManager()->addShard(operationContext(), + nullptr, // should auto-pick same name + ConnectionString(shardTarget), + existingShard.getMaxSizeMB())); + ASSERT_EQUALS(existingShardName, shardName); + }); + expectIsMaster(shardTarget, BSON("ok" << 1 << "ismaster" << true)); + future5.timed_get(kFutureTimeout); + + // Ensure that the shard document was unchanged. + assertShardExists(existingShard); } -// TODO(SERVER-24213): Test adding a new shard with an existing shard name, but different -// shard membership -TEST_F(AddShardTest, ReAddExistingShard) { - std::unique_ptr<RemoteCommandTargeterMock> targeter( +// Tests both that trying to add a shard with the same replica set as an existing shard but with +// different options fails, and that adding a shard with the same replica set as an existing shard +// with the *same* options succeeds. 
+TEST_F(AddShardTest, AddExistingShardReplicaSet) { + std::unique_ptr<RemoteCommandTargeterMock> replsetTargeter( stdx::make_unique<RemoteCommandTargeterMock>()); - ConnectionString connString = - assertGet(ConnectionString::parse("mySet/host1:12345,host2:12345")); - targeter->setConnectionStringReturnValue(connString); + ConnectionString connString = assertGet(ConnectionString::parse("mySet/host1:12345")); + replsetTargeter->setConnectionStringReturnValue(connString); HostAndPort shardTarget = connString.getServers().front(); - targeter->setFindHostReturnValue(shardTarget); + replsetTargeter->setFindHostReturnValue(shardTarget); + targeterFactory()->addTargeterToReturn(connString, std::move(replsetTargeter)); - targeterFactory()->addTargeterToReturn(connString, std::move(targeter)); - std::string expectedShardName = "mySet"; + std::string existingShardName = "myShard"; + ShardType existingShard; + existingShard.setName(existingShardName); + existingShard.setHost(connString.toString()); + existingShard.setMaxSizeMB(100); + existingShard.setState(ShardType::ShardState::kShardAware); - auto future = launchAsync([this, expectedShardName, connString] { - auto status = - catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100); - ASSERT_OK(status); - }); + // Make sure the shard already exists. 
+ ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(), + ShardType::ConfigNS, + existingShard.toBSON(), + ShardingCatalogClient::kMajorityWriteConcern)); + assertShardExists(existingShard); - BSONArrayBuilder hosts; - hosts.append("host1:12345"); - hosts.append("host2:12345"); - BSONObj commandResponse = BSON("ok" << 1 << "ismaster" << true << "setName" - << "mySet" - << "hosts" - << hosts.arr()); - expectIsMaster(shardTarget, commandResponse); + BSONObj isMasterResponse = BSON("ok" << 1 << "ismaster" << true << "setName" + << "mySet" + << "hosts" + << BSON_ARRAY("host1:12345" + << "host2:12345")); - expectListDatabases(shardTarget, - {BSON("name" - << "shardDB")}); + // Adding the same connection string with a different shard name should fail. + std::string differentName = "anotherShardName"; + auto future1 = launchAsync([&] { + Client::initThreadIfNotAlready(); + ASSERT_EQUALS( + ErrorCodes::IllegalOperation, + catalogManager()->addShard( + operationContext(), &differentName, connString, existingShard.getMaxSizeMB())); + }); + expectIsMaster(shardTarget, isMasterResponse); + future1.timed_get(kFutureTimeout); - expectGetDatabase("shardDB", boost::none); + // Ensure that the shard document was unchanged. + assertShardExists(existingShard); - // The shardIdentity doc inserted into the admin.system.version collection on the shard. - expectShardIdentityUpsertReturnSuccess(shardTarget, expectedShardName); + // Adding the same connection string with a different maxSize should fail. + auto future2 = launchAsync([&] { + Client::initThreadIfNotAlready(); + ASSERT_EQUALS( + ErrorCodes::IllegalOperation, + catalogManager()->addShard( + operationContext(), nullptr, connString, existingShard.getMaxSizeMB() + 100)); + }); + expectIsMaster(shardTarget, isMasterResponse); + future2.timed_get(kFutureTimeout); - // The shard doc inserted into the config.shards collection on the config server. 
- ShardType newShard; - newShard.setName(expectedShardName); - newShard.setMaxSizeMB(100); - newShard.setHost(connString.toString()); - newShard.setState(ShardType::ShardState::kShardAware); + // Ensure that the shard document was unchanged. + assertShardExists(existingShard); - // When a shard with the same name already exists, the insert into config.shards will fail - // with a duplicate key error on the shard name. - onCommand([&newShard](const RemoteCommandRequest& request) { - BatchedInsertRequest actualBatchedInsert; - std::string errmsg; - ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg)); + // Adding a connection string with a host of an existing shard but using a different connection + // string type should fail + { + // Make sure we can target the request to the standalone server. + std::unique_ptr<RemoteCommandTargeterMock> standaloneTargeter( + stdx::make_unique<RemoteCommandTargeterMock>()); + standaloneTargeter->setConnectionStringReturnValue(ConnectionString(shardTarget)); + standaloneTargeter->setFindHostReturnValue(shardTarget); + targeterFactory()->addTargeterToReturn(ConnectionString(shardTarget), + std::move(standaloneTargeter)); + } + auto future3 = launchAsync([&] { + Client::initThreadIfNotAlready(); + ASSERT_EQUALS(ErrorCodes::IllegalOperation, + catalogManager()->addShard(operationContext(), + nullptr, + ConnectionString(shardTarget), + existingShard.getMaxSizeMB())); + }); + // Make it get past the host validation check (even though if this *really* was a replica set + // it would report it was a replica set here and thus would fail the validation check) to + // ensure that even if the user changed the replica set shard to a standalone, you + // can't change the sharded cluster's notion of the shard from replica set to standalone just + // by calling addShard. 
+ expectIsMaster(shardTarget, BSON("ok" << 1 << "ismaster" << true)); + future3.timed_get(kFutureTimeout); + + // Ensure that the shard document was unchanged. + assertShardExists(existingShard); - ASSERT_EQUALS(ShardType::ConfigNS, actualBatchedInsert.getNS().toString()); + // Adding a connection string with the same hosts but a different replica set name should fail. + std::string differentSetName = "differentSet"; + { + // Add a targeter with the new replica set name so the validation check can be targeted and + // run properly. + std::unique_ptr<RemoteCommandTargeterMock> differentRSNameTargeter( + stdx::make_unique<RemoteCommandTargeterMock>()); + ConnectionString differentRSConnString = + ConnectionString::forReplicaSet(differentSetName, connString.getServers()); + differentRSNameTargeter->setConnectionStringReturnValue(differentRSConnString); + differentRSNameTargeter->setFindHostReturnValue(shardTarget); + targeterFactory()->addTargeterToReturn(differentRSConnString, + std::move(differentRSNameTargeter)); + } + auto future4 = launchAsync([&] { + Client::initThreadIfNotAlready(); + ASSERT_EQUALS(ErrorCodes::IllegalOperation, + catalogManager()->addShard(operationContext(), + nullptr, + ConnectionString::forReplicaSet( + differentSetName, connString.getServers()), + existingShard.getMaxSizeMB())); + }); + BSONObj differentRSIsMasterResponse = + BSON("ok" << 1 << "ismaster" << true << "setName" << differentSetName << "hosts" + << BSON_ARRAY("host1:12345" + << "host2:12345")); + // Make it get past the validation check (even though if you really tried to add a replica set + // with the wrong name it would report the other name in the ismaster response here and thus + // would fail the validation check) to ensure that even if you manually change the shard's + // replica set name somehow, you can't change the replica set name the sharded cluster knows + // for it just by calling addShard again. 
+ expectIsMaster(shardTarget, differentRSIsMasterResponse); + future4.timed_get(kFutureTimeout); + + // Ensure that the shard document was unchanged. + assertShardExists(existingShard); - auto inserted = actualBatchedInsert.getDocuments(); - ASSERT_EQUALS(1U, inserted.size()); + // Adding the same host with the same options should succeed. + auto future5 = launchAsync([&] { + Client::initThreadIfNotAlready(); + auto shardName = assertGet(catalogManager()->addShard( + operationContext(), &existingShardName, connString, existingShard.getMaxSizeMB())); + ASSERT_EQUALS(existingShardName, shardName); + }); + expectIsMaster(shardTarget, isMasterResponse); + future5.timed_get(kFutureTimeout); - ASSERT_EQ(newShard.toBSON(), inserted.front()); + // Adding the same host with the same options (without explicitly specifying the shard name) + // should succeed. + auto future6 = launchAsync([&] { + Client::initThreadIfNotAlready(); + auto shardName = assertGet(catalogManager()->addShard( + operationContext(), nullptr, connString, existingShard.getMaxSizeMB())); + ASSERT_EQUALS(existingShardName, shardName); + }); + expectIsMaster(shardTarget, isMasterResponse); + future6.timed_get(kFutureTimeout); - BatchedCommandResponse response; - response.setOk(false); - response.setErrCode(ErrorCodes::DuplicateKey); - response.setErrMessage("E11000 duplicate key error collection: config.shards"); + // Ensure that the shard document was unchanged. + assertShardExists(existingShard); - return response.toBSON(); + // Adding the same replica set but different host membership (but otherwise the same options) + // should succeed + auto otherHost = connString.getServers().back(); + ConnectionString otherHostConnString = assertGet(ConnectionString::parse("mySet/host2:12345")); + { + // Add a targeter for the different seed string this addShard request will use. 
+ std::unique_ptr<RemoteCommandTargeterMock> otherHostTargeter( + stdx::make_unique<RemoteCommandTargeterMock>()); + otherHostTargeter->setConnectionStringReturnValue(otherHostConnString); + otherHostTargeter->setFindHostReturnValue(otherHost); + targeterFactory()->addTargeterToReturn(otherHostConnString, std::move(otherHostTargeter)); + } + auto future7 = launchAsync([&] { + Client::initThreadIfNotAlready(); + auto shardName = assertGet(catalogManager()->addShard( + operationContext(), nullptr, otherHostConnString, existingShard.getMaxSizeMB())); + ASSERT_EQUALS(existingShardName, shardName); }); + expectIsMaster(otherHost, isMasterResponse); + future7.timed_get(kFutureTimeout); - future.timed_get(kFutureTimeout); + // Ensure that the shard document was unchanged. + assertShardExists(existingShard); } -*/ } // namespace } // namespace mongo diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp index daa0dfcd49f..6cc5c791eb9 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp @@ -648,6 +648,7 @@ StatusWith<repl::OpTimeWith<DatabaseType>> ShardingCatalogClientImpl::_fetchData auto findStatus = _exhaustiveFindOnConfig(txn, readPref, + repl::ReadConcernLevel::kMajorityReadConcern, NamespaceString(DatabaseType::ConfigNS), BSON(DatabaseType::name(dbName)), BSONObj(), @@ -675,6 +676,7 @@ StatusWith<repl::OpTimeWith<CollectionType>> ShardingCatalogClientImpl::getColle OperationContext* txn, const std::string& collNs) { auto statusFind = _exhaustiveFindOnConfig(txn, kConfigReadSelector, + repl::ReadConcernLevel::kMajorityReadConcern, NamespaceString(CollectionType::ConfigNS), BSON(CollectionType::fullNs(collNs)), BSONObj(), @@ -713,6 +715,7 @@ Status ShardingCatalogClientImpl::getCollections(OperationContext* txn, auto findStatus = _exhaustiveFindOnConfig(txn, kConfigReadSelector, + 
repl::ReadConcernLevel::kMajorityReadConcern, NamespaceString(CollectionType::ConfigNS), b.obj(), BSONObj(), @@ -748,7 +751,7 @@ Status ShardingCatalogClientImpl::getCollections(OperationContext* txn, Status ShardingCatalogClientImpl::dropCollection(OperationContext* txn, const NamespaceString& ns) { logChange(txn, "dropCollection.start", ns.ns(), BSONObj()); - auto shardsStatus = getAllShards(txn); + auto shardsStatus = getAllShards(txn, repl::ReadConcernLevel::kMajorityReadConcern); if (!shardsStatus.isOK()) { return shardsStatus.getStatus(); } @@ -908,8 +911,13 @@ Status ShardingCatalogClientImpl::dropCollection(OperationContext* txn, const Na StatusWith<BSONObj> ShardingCatalogClientImpl::getGlobalSettings(OperationContext* txn, StringData key) { - auto findStatus = _exhaustiveFindOnConfig( - txn, kConfigReadSelector, kSettingsNamespace, BSON("_id" << key), BSONObj(), 1); + auto findStatus = _exhaustiveFindOnConfig(txn, + kConfigReadSelector, + repl::ReadConcernLevel::kMajorityReadConcern, + kSettingsNamespace, + BSON("_id" << key), + BSONObj(), + 1); if (!findStatus.isOK()) { return findStatus.getStatus(); } @@ -976,6 +984,7 @@ Status ShardingCatalogClientImpl::getDatabasesForShard(OperationContext* txn, vector<string>* dbs) { auto findStatus = _exhaustiveFindOnConfig(txn, kConfigReadSelector, + repl::ReadConcernLevel::kMajorityReadConcern, NamespaceString(DatabaseType::ConfigNS), BSON(DatabaseType::primary(shardId.toString())), BSONObj(), @@ -1008,8 +1017,13 @@ Status ShardingCatalogClientImpl::getChunks(OperationContext* txn, // Convert boost::optional<int> to boost::optional<long long>. auto longLimit = limit ? 
boost::optional<long long>(*limit) : boost::none; - auto findStatus = _exhaustiveFindOnConfig( - txn, kConfigReadSelector, NamespaceString(ChunkType::ConfigNS), query, sort, longLimit); + auto findStatus = _exhaustiveFindOnConfig(txn, + kConfigReadSelector, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString(ChunkType::ConfigNS), + query, + sort, + longLimit); if (!findStatus.isOK()) { return findStatus.getStatus(); } @@ -1043,6 +1057,7 @@ Status ShardingCatalogClientImpl::getTagsForCollection(OperationContext* txn, auto findStatus = _exhaustiveFindOnConfig(txn, kConfigReadSelector, + repl::ReadConcernLevel::kMajorityReadConcern, NamespaceString(TagsType::ConfigNS), BSON(TagsType::ns(collectionNs)), BSON(TagsType::min() << 1), @@ -1072,8 +1087,13 @@ StatusWith<string> ShardingCatalogClientImpl::getTagForChunk(OperationContext* t BSON(TagsType::ns(collectionNs) << TagsType::min() << BSON("$lte" << chunk.getMin()) << TagsType::max() << BSON("$gte" << chunk.getMax())); - auto findStatus = _exhaustiveFindOnConfig( - txn, kConfigReadSelector, NamespaceString(TagsType::ConfigNS), query, BSONObj(), 1); + auto findStatus = _exhaustiveFindOnConfig(txn, + kConfigReadSelector, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString(TagsType::ConfigNS), + query, + BSONObj(), + 1); if (!findStatus.isOK()) { return findStatus.getStatus(); } @@ -1097,10 +1117,11 @@ StatusWith<string> ShardingCatalogClientImpl::getTagForChunk(OperationContext* t } StatusWith<repl::OpTimeWith<std::vector<ShardType>>> ShardingCatalogClientImpl::getAllShards( - OperationContext* txn) { + OperationContext* txn, repl::ReadConcernLevel readConcern) { std::vector<ShardType> shards; auto findStatus = _exhaustiveFindOnConfig(txn, kConfigReadSelector, + readConcern, NamespaceString(ShardType::ConfigNS), BSONObj(), // no query filter BSONObj(), // no sort @@ -1380,6 +1401,7 @@ Status ShardingCatalogClientImpl::insertConfigDocument(OperationContext* txn, auto fetchDuplicate = 
_exhaustiveFindOnConfig(txn, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kMajorityReadConcern, nss, idField.wrap(), BSONObj(), @@ -1486,6 +1508,7 @@ Status ShardingCatalogClientImpl::_checkDbDoesNotExist(OperationContext* txn, auto findStatus = _exhaustiveFindOnConfig(txn, kConfigReadSelector, + repl::ReadConcernLevel::kMajorityReadConcern, NamespaceString(DatabaseType::ConfigNS), queryBuilder.obj(), BSONObj(), @@ -1593,12 +1616,13 @@ StatusWith<long long> ShardingCatalogClientImpl::_runCountCommandOnConfig(Operat StatusWith<repl::OpTimeWith<vector<BSONObj>>> ShardingCatalogClientImpl::_exhaustiveFindOnConfig( OperationContext* txn, const ReadPreferenceSetting& readPref, + repl::ReadConcernLevel readConcern, const NamespaceString& nss, const BSONObj& query, const BSONObj& sort, boost::optional<long long> limit) { auto response = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - txn, readPref, repl::ReadConcernLevel::kMajorityReadConcern, nss, query, sort, limit); + txn, readPref, readConcern, nss, query, sort, limit); if (!response.isOK()) { return response.getStatus(); } diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h index 614e98c36e2..72e000db216 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h +++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h @@ -131,7 +131,7 @@ public: const ChunkType& chunk) override; StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards( - OperationContext* txn) override; + OperationContext* txn, repl::ReadConcernLevel readConcern) override; bool runUserManagementWriteCommand(OperationContext* txn, const std::string& commandName, @@ -227,6 +227,7 @@ private: StatusWith<repl::OpTimeWith<std::vector<BSONObj>>> _exhaustiveFindOnConfig( OperationContext* txn, const ReadPreferenceSetting& readPref, + repl::ReadConcernLevel readConcern, const 
NamespaceString& nss, const BSONObj& query, const BSONObj& sort, diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp index 80bd6c81da8..ee81e49eb6f 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp @@ -94,12 +94,13 @@ const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{ const ReadPreferenceSetting kConfigPrimarySelector(ReadPreference::PrimaryOnly); const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)); -void toBatchError(const Status& status, BatchedCommandResponse* response) { - response->clear(); - response->setErrCode(status.code()); - response->setErrMessage(status.reason()); - response->setOk(false); -} +/** + * Lock that guards changes to the set of shards in the cluster (ie addShard and removeShard + * requests). + * TODO: Currently only taken during addShard requests, this should also be taken in X mode during + * removeShard, once removeShard is moved to run on the config server primary instead of on mongos. + */ +Lock::ResourceMutex kShardMembershipLock; /** * Lock for shard zoning operations. This should be acquired when doing any operations that @@ -300,26 +301,104 @@ StatusWith<Shard::CommandResponse> ShardingCatalogManagerImpl::_runCommandForAdd std::move(writeConcernStatus)); } +StatusWith<boost::optional<ShardType>> ShardingCatalogManagerImpl::_checkIfShardExists( + OperationContext* txn, + const ConnectionString& proposedShardConnectionString, + const std::string* proposedShardName, + long long proposedShardMaxSize) { + // Check whether any host in the connection is already part of the cluster. 
+ const auto existingShards = + _catalogClient->getAllShards(txn, repl::ReadConcernLevel::kLocalReadConcern); + if (!existingShards.isOK()) { + return Status(existingShards.getStatus().code(), + str::stream() << "Failed to load existing shards during addShard" + << causedBy(existingShards.getStatus().reason())); + } + + // Now check if this shard already exists - if it already exists *with the same options* then + // the addShard request can return success early without doing anything more. + for (const auto& existingShard : existingShards.getValue().value) { + auto swExistingShardConnStr = ConnectionString::parse(existingShard.getHost()); + if (!swExistingShardConnStr.isOK()) { + return swExistingShardConnStr.getStatus(); + } + auto existingShardConnStr = std::move(swExistingShardConnStr.getValue()); + // Function for determining if the options for the shard that is being added match the + // options of an existing shard that conflicts with it. + auto shardsAreEquivalent = [&]() { + if (proposedShardName && *proposedShardName != existingShard.getName()) { + return false; + } + if (proposedShardConnectionString.type() != existingShardConnStr.type()) { + return false; + } + if (proposedShardConnectionString.type() == ConnectionString::SET && + proposedShardConnectionString.getSetName() != existingShardConnStr.getSetName()) { + return false; + } + if (proposedShardMaxSize != existingShard.getMaxSizeMB()) { + return false; + } + return true; + }; + + if (existingShardConnStr.type() == ConnectionString::SET && + proposedShardConnectionString.type() == ConnectionString::SET && + existingShardConnStr.getSetName() == proposedShardConnectionString.getSetName()) { + // An existing shard has the same replica set name as the shard being added. + // If the options aren't the same, then this is an error, + // but if the options match then the addShard operation should be immediately + // considered a success and terminated. 
+ if (shardsAreEquivalent()) { + return {existingShard}; + } else { + return {ErrorCodes::IllegalOperation, + str::stream() << "A shard already exists containing the replica set '" + << existingShardConnStr.getSetName() + << "'"}; + } + } + + for (const auto& existingHost : existingShardConnStr.getServers()) { + // Look if any of the hosts in the existing shard are present within the shard trying + // to be added. + for (const auto& addingHost : proposedShardConnectionString.getServers()) { + if (existingHost == addingHost) { + // At least one of the hosts in the shard being added already exists in an + // existing shard. If the options aren't the same, then this is an error, + // but if the options match then the addShard operation should be immediately + // considered a success and terminated. + if (shardsAreEquivalent()) { + return {existingShard}; + } else { + return {ErrorCodes::IllegalOperation, + str::stream() << "'" << addingHost.toString() << "' " + << "is already a member of the existing shard '" + << existingShard.getHost() + << "' (" + << existingShard.getName() + << ")."}; + } + } + } + } + if (proposedShardName && *proposedShardName == existingShard.getName()) { + // If we get here then we're trying to add a shard with the same name as an existing + // shard, but there was no overlap in the hosts between the existing shard and the + // proposed connection string for the new shard. + return Status(ErrorCodes::IllegalOperation, + str::stream() << "A shard named " << *proposedShardName + << " already exists"); + } + } + return {boost::none}; +} + StatusWith<ShardType> ShardingCatalogManagerImpl::_validateHostAsShard( OperationContext* txn, std::shared_ptr<RemoteCommandTargeter> targeter, const std::string* shardProposedName, const ConnectionString& connectionString) { - // Check whether any host in the connection is already part of the cluster. 
- Grid::get(txn)->shardRegistry()->reload(txn); - for (const auto& hostAndPort : connectionString.getServers()) { - std::shared_ptr<Shard> shard; - shard = Grid::get(txn)->shardRegistry()->getShardNoReload(hostAndPort.toString()); - if (shard) { - return {ErrorCodes::OperationFailed, - str::stream() << "'" << hostAndPort.toString() << "' " - << "is already a member of the existing shard '" - << shard->getConnString().toString() - << "' (" - << shard->getId() - << ")."}; - } - } // Check for mongos and older version mongod connections, and whether the hosts // can be found for the user specified replset. @@ -556,6 +635,9 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard( return {ErrorCodes::BadValue, "shard name cannot be empty"}; } + // Only one addShard operation can be in progress at a time. + Lock::ExclusiveLock lk(txn->lockState(), kShardMembershipLock); + // TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter instead. const std::shared_ptr<Shard> shard{ Grid::get(txn)->shardRegistry()->createConnection(shardConnectionString)}; @@ -578,15 +660,29 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard( if (!shardStatus.isOK()) { return shardStatus.getStatus(); } - ShardType& shardType = shardStatus.getValue(); + + // Check if this shard has already been added (can happen in the case of a retry after a network + // error, for example) and thus this addShard request should be considered a no-op. + auto existingShard = + _checkIfShardExists(txn, shardConnectionString, shardProposedName, maxSize); + if (!existingShard.isOK()) { + return existingShard.getStatus(); + } + if (existingShard.getValue()) { + // These hosts already belong to an existing shard, so report success and terminate the + // addShard request. 
+ return existingShard.getValue()->getName(); + } + + + // Check that none of the existing shard candidate's dbs exist already auto dbNamesStatus = _getDBNamesListFromShard(txn, targeter); if (!dbNamesStatus.isOK()) { return dbNamesStatus.getStatus(); } - // Check that none of the existing shard candidate's dbs exist already for (const string& dbName : dbNamesStatus.getValue()) { auto dbt = _catalogClient->getDatabase(txn, dbName); if (dbt.isOK()) { @@ -642,19 +738,6 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard( txn, ShardType::ConfigNS, shardType.toBSON(), ShardingCatalogClient::kMajorityWriteConcern); if (!result.isOK()) { log() << "error adding shard: " << shardType.toBSON() << " err: " << result.reason(); - if (result == ErrorCodes::DuplicateKey) { - // TODO(SERVER-24213): adding a shard that already exists should be considered success, - // however this approach does no validation that we are adding the shard with the same - // options. It also does not protect against adding the same shard with a different - // shard name and slightly different connection string. This is a temporary hack to - // get the continuous stepdown suite passing. - warning() << "Received duplicate key error when inserting new shard with name " - << shardType.getName() << " and connection string " - << shardConnectionString.toString() - << " to config.shards collection. 
This most likely means that there was an " - "attempt to add a shard that already exists in the cluster"; - return shardType.getName(); - } return result; } diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h index ec1cf3841c9..9b261906ecf 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h +++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h @@ -106,6 +106,22 @@ private: StatusWith<std::string> _generateNewShardName(OperationContext* txn); /** + * Used during addShard to determine if there is already an existing shard that matches the + * shard that is currently being added. An OK return with boost::none indicates that there + * is no conflicting shard, and we can proceed trying to add the new shard. An OK return + * with a ShardType indicates that there is an existing shard that matches the shard being added + * but since the options match, this addShard request can do nothing and return success. A + * non-OK return either indicates a problem reading the existing shards from disk or more likely + * indicates that an existing shard conflicts with the shard being added and they have different + * options, so the addShard attempt must be aborted. + */ + StatusWith<boost::optional<ShardType>> _checkIfShardExists( + OperationContext* txn, + const ConnectionString& propsedShardConnectionString, + const std::string* shardProposedName, + long long maxSize); + + /** * Validates that the specified endpoint can serve as a shard server. In particular, this * this function checks that the shard can be contacted and that it is not already member of * another sharded cluster. 
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_test.cpp index e4d06494d1b..bff745ffb78 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_test.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_test.cpp @@ -389,7 +389,8 @@ TEST_F(ShardingCatalogClientTest, GetAllShardsValid) { const vector<ShardType> expectedShardsList = {s1, s2, s3}; auto future = launchAsync([this] { - auto shards = assertGet(catalogClient()->getAllShards(operationContext())); + auto shards = assertGet(catalogClient()->getAllShards( + operationContext(), repl::ReadConcernLevel::kMajorityReadConcern)); return shards.value; }); @@ -423,7 +424,8 @@ TEST_F(ShardingCatalogClientTest, GetAllShardsWithInvalidShard) { configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1")); auto future = launchAsync([this] { - auto status = catalogClient()->getAllShards(operationContext()); + auto status = catalogClient()->getAllShards(operationContext(), + repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_EQ(ErrorCodes::FailedToParse, status.getStatus()); }); diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h index fe62753b249..02de97a4f34 100644 --- a/src/mongo/s/catalog/sharding_catalog_client.h +++ b/src/mongo/s/catalog/sharding_catalog_client.h @@ -274,7 +274,7 @@ public: * Returns a !OK status if an error occurs. 
*/ virtual StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards( - OperationContext* txn) = 0; + OperationContext* txn, repl::ReadConcernLevel readConcern) = 0; /** * Runs a user management command on the config servers, potentially synchronizing through diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp index 308bf4aeb7d..9206eb1d0e1 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp @@ -135,7 +135,7 @@ StatusWith<string> ShardingCatalogClientMock::getTagForChunk(OperationContext* t } StatusWith<repl::OpTimeWith<std::vector<ShardType>>> ShardingCatalogClientMock::getAllShards( - OperationContext* txn) { + OperationContext* txn, repl::ReadConcernLevel readConcern) { return {ErrorCodes::InternalError, "Method not implemented"}; } diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h index 14fb1e87d1d..8c37896fbbc 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.h +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h @@ -99,7 +99,7 @@ public: const ChunkType& chunk) override; StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards( - OperationContext* txn) override; + OperationContext* txn, repl::ReadConcernLevel readConcern) override; bool runUserManagementWriteCommand(OperationContext* txn, const std::string& commandName, diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 5f983493e05..9a951bae5f9 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -302,7 +302,8 @@ ShardRegistryData::ShardRegistryData(OperationContext* txn, ShardFactory* shardF } void ShardRegistryData::_init(OperationContext* txn, ShardFactory* shardFactory) { - auto shardsStatus = grid.catalogClient(txn)->getAllShards(txn); + auto shardsStatus = + 
grid.catalogClient(txn)->getAllShards(txn, repl::ReadConcernLevel::kMajorityReadConcern); if (!shardsStatus.isOK()) { uasserted(shardsStatus.getStatus().code(), diff --git a/src/mongo/s/commands/cluster_list_shards_cmd.cpp b/src/mongo/s/commands/cluster_list_shards_cmd.cpp index 9deee999e12..8d3b19ad8c8 100644 --- a/src/mongo/s/commands/cluster_list_shards_cmd.cpp +++ b/src/mongo/s/commands/cluster_list_shards_cmd.cpp @@ -74,7 +74,8 @@ public: int options, std::string& errmsg, BSONObjBuilder& result) { - auto shardsStatus = grid.catalogClient(txn)->getAllShards(txn); + auto shardsStatus = grid.catalogClient(txn)->getAllShards( + txn, repl::ReadConcernLevel::kMajorityReadConcern); if (!shardsStatus.isOK()) { return appendCommandStatus(result, shardsStatus.getStatus()); } |