author    Spencer T Brody <spencer@mongodb.com>    2016-07-18 14:45:04 -0400
committer Spencer T Brody <spencer@mongodb.com>    2016-08-08 17:51:29 -0400
commit    9597d67f9640b32fad2264c49da4ba0bd271d395 (patch)
tree      d8c1d1d219924e5d4dd4f7730600c6591350d299 /src/mongo/s
parent    1257f4f5cc2c5a09383c1ddcd3d03bc83809ca18 (diff)
download  mongo-9597d67f9640b32fad2264c49da4ba0bd271d395.tar.gz
SERVER-24213 Make addShard return success if attempting to add an already existing shard
Diffstat (limited to 'src/mongo/s')
-rw-r--r--   src/mongo/s/balancer/cluster_statistics_impl.cpp                     3
-rw-r--r--   src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp    363
-rw-r--r--   src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp        42
-rw-r--r--   src/mongo/s/catalog/replset/sharding_catalog_client_impl.h           3
-rw-r--r--   src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp      155
-rw-r--r--   src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h         16
-rw-r--r--   src/mongo/s/catalog/replset/sharding_catalog_test.cpp                6
-rw-r--r--   src/mongo/s/catalog/sharding_catalog_client.h                        2
-rw-r--r--   src/mongo/s/catalog/sharding_catalog_client_mock.cpp                 2
-rw-r--r--   src/mongo/s/catalog/sharding_catalog_client_mock.h                   2
-rw-r--r--   src/mongo/s/client/shard_registry.cpp                                3
-rw-r--r--   src/mongo/s/commands/cluster_list_shards_cmd.cpp                     3
12 files changed, 447 insertions, 153 deletions
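
The behavioral core of the change is a pre-flight equivalence check: before validating the host or writing anything, addShard now scans the existing config.shards entries for a shard that overlaps the proposed one (same replica set name or any common host). If the overlapping shard has the same name, connection string type, set name, and maxSize, the request is treated as a retry and succeeds with the existing shard's name; if any option differs, it fails with IllegalOperation. A minimal standalone sketch of those rules, using simplified stand-in types rather than the real ShardType/ConnectionString classes:

    // Standalone sketch of the idempotency rules this commit adds to addShard.
    // ShardSpec is a simplified stand-in for mongo's ShardType/ConnectionString.
    #include <optional>
    #include <stdexcept>
    #include <string>
    #include <vector>

    struct ShardSpec {
        std::string name;
        bool isReplicaSet = false;
        std::string setName;               // empty unless isReplicaSet
        std::vector<std::string> hosts;
        long long maxSizeMB = 0;
    };

    // Mirrors the three outcomes of _checkIfShardExists: returns the existing
    // shard's name for an exact re-add (no-op success), std::nullopt when no
    // existing shard overlaps (proceed normally), and throws when an
    // overlapping shard was added with different options (IllegalOperation).
    std::optional<std::string> checkIfShardExists(const std::vector<ShardSpec>& existing,
                                                  const ShardSpec& proposed,
                                                  bool nameWasSpecified) {
        for (const auto& shard : existing) {
            auto equivalent = [&] {
                return (!nameWasSpecified || proposed.name == shard.name) &&
                    proposed.isReplicaSet == shard.isReplicaSet &&
                    proposed.setName == shard.setName &&
                    proposed.maxSizeMB == shard.maxSizeMB;
            };
            bool sameSet = shard.isReplicaSet && proposed.isReplicaSet &&
                shard.setName == proposed.setName;
            bool hostOverlap = false;
            for (const auto& h : shard.hosts)
                for (const auto& p : proposed.hosts)
                    hostOverlap = hostOverlap || (h == p);
            if (sameSet || hostOverlap) {
                if (equivalent())
                    return shard.name;  // same shard, same options: report success
                throw std::runtime_error("IllegalOperation: host(s) already in shard '" +
                                         shard.name + "' with different options");
            }
            if (nameWasSpecified && proposed.name == shard.name)
                throw std::runtime_error("IllegalOperation: a shard named '" +
                                         proposed.name + "' already exists");
        }
        return std::nullopt;  // no conflict: continue with validation and insert
    }

As in the real check, the proposed name only participates in the comparison when the caller explicitly supplied one, which is what lets a retried addShard without a name match the name auto-picked on the first attempt.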
diff --git a/src/mongo/s/balancer/cluster_statistics_impl.cpp b/src/mongo/s/balancer/cluster_statistics_impl.cpp
index c5a40a8b742..01993f1a938 100644
--- a/src/mongo/s/balancer/cluster_statistics_impl.cpp
+++ b/src/mongo/s/balancer/cluster_statistics_impl.cpp
@@ -104,7 +104,8 @@ StatusWith<vector<ShardStatistics>> ClusterStatisticsImpl::getStats(OperationCon
// db.serverStatus() (mem.mapped) to all shards.
//
// TODO: skip unresponsive shards and mark information as stale.
- auto shardsStatus = Grid::get(txn)->catalogClient(txn)->getAllShards(txn);
+ auto shardsStatus = Grid::get(txn)->catalogClient(txn)->getAllShards(
+ txn, repl::ReadConcernLevel::kMajorityReadConcern);
if (!shardsStatus.isOK()) {
return shardsStatus.getStatus();
}
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
index 63934c8a58e..691d8dc76b0 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
@@ -1354,138 +1354,303 @@ TEST_F(AddShardTest, CompatibilityAddShardCancelRescheduledCallbackReAddShard) {
assertShardExists(addedShard);
}
-/*
-TODO(SERVER-24213): Add back tests around adding shard that already exists.
-// Host is already part of an existing shard.
+// Tests both that trying to add a shard with the same host as an existing shard but with different
+// options fails, and that adding a shard with the same host as an existing shard with the *same*
+// options succeeds.
TEST_F(AddShardTest, AddExistingShardStandalone) {
- std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ HostAndPort shardTarget("StandaloneHost:12345");
+ std::unique_ptr<RemoteCommandTargeterMock> standaloneTargeter(
stdx::make_unique<RemoteCommandTargeterMock>());
- HostAndPort shardTarget = HostAndPort("host1:12345");
- targeter->setConnectionStringReturnValue(ConnectionString(shardTarget));
- targeter->setFindHostReturnValue(shardTarget);
+ standaloneTargeter->setConnectionStringReturnValue(ConnectionString(shardTarget));
+ standaloneTargeter->setFindHostReturnValue(shardTarget);
+ targeterFactory()->addTargeterToReturn(ConnectionString(shardTarget),
+ std::move(standaloneTargeter));
- targeterFactory()->addTargeterToReturn(ConnectionString(shardTarget), std::move(targeter));
- std::string expectedShardName = "StandaloneShard";
+ std::unique_ptr<RemoteCommandTargeterMock> replsetTargeter(
+ stdx::make_unique<RemoteCommandTargeterMock>());
+ replsetTargeter->setConnectionStringReturnValue(
+ ConnectionString::forReplicaSet("mySet", {shardTarget}));
+ replsetTargeter->setFindHostReturnValue(shardTarget);
+ targeterFactory()->addTargeterToReturn(ConnectionString::forReplicaSet("mySet", {shardTarget}),
+ std::move(replsetTargeter));
- auto future = launchAsync([this, expectedShardName, shardTarget] {
- auto status =
- catalogManager()->addShard(operationContext(),
- &expectedShardName,
- assertGet(ConnectionString::parse(shardTarget.toString())),
- 100);
- ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
- ASSERT_STRING_CONTAINS(status.getStatus().reason(),
- "is already a member of the existing shard");
- });
+ std::string existingShardName = "myShard";
+ ShardType existingShard;
+ existingShard.setName(existingShardName);
+ existingShard.setHost(shardTarget.toString());
+ existingShard.setMaxSizeMB(100);
+ existingShard.setState(ShardType::ShardState::kShardAware);
- ShardType shard;
- shard.setName("shard0000");
- shard.setHost(shardTarget.toString());
- expectGetShards({shard});
+ // Make sure the shard already exists.
+ ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(),
+ ShardType::ConfigNS,
+ existingShard.toBSON(),
+ ShardingCatalogClient::kMajorityWriteConcern));
+ assertShardExists(existingShard);
- future.timed_get(kFutureTimeout);
-}
+ // Adding the same host with a different shard name should fail.
+ std::string differentName = "anotherShardName";
+ auto future1 = launchAsync([&] {
+ Client::initThreadIfNotAlready();
+ ASSERT_EQUALS(ErrorCodes::IllegalOperation,
+ catalogManager()->addShard(operationContext(),
+ &differentName,
+ ConnectionString(shardTarget),
+ existingShard.getMaxSizeMB()));
+ });
+ expectIsMaster(shardTarget, BSON("ok" << 1 << "ismaster" << true));
+ future1.timed_get(kFutureTimeout);
+ // Ensure that the shard document was unchanged.
+ assertShardExists(existingShard);
-// Host is already part of an existing replica set shard.
-TEST_F(AddShardTest, AddExistingShardReplicaSet) {
- std::unique_ptr<RemoteCommandTargeterMock> targeter(
- stdx::make_unique<RemoteCommandTargeterMock>());
- ConnectionString connString =
- assertGet(ConnectionString::parse("mySet/host1:12345,host2:12345"));
- HostAndPort shardTarget = connString.getServers().front();
- targeter->setConnectionStringReturnValue(connString);
- targeter->setFindHostReturnValue(shardTarget);
+ // Adding the same host with a different maxSize should fail.
+ auto future2 = launchAsync([&] {
+ Client::initThreadIfNotAlready();
+ ASSERT_EQUALS(ErrorCodes::IllegalOperation,
+ catalogManager()->addShard(operationContext(),
+ nullptr,
+ ConnectionString(shardTarget),
+ existingShard.getMaxSizeMB() + 100));
+ });
+ expectIsMaster(shardTarget, BSON("ok" << 1 << "ismaster" << true));
+ future2.timed_get(kFutureTimeout);
- targeterFactory()->addTargeterToReturn(connString, std::move(targeter));
- std::string expectedShardName = "StandaloneShard";
+ // Adding the same host but as part of a replica set should fail.
+ auto future3 = launchAsync([&] {
+ Client::initThreadIfNotAlready();
+ ASSERT_EQUALS(
+ ErrorCodes::IllegalOperation,
+ catalogManager()->addShard(operationContext(),
+ nullptr,
+ ConnectionString::forReplicaSet("mySet", {shardTarget}),
+ existingShard.getMaxSizeMB()));
+ });
+ // Make it get past the host validation check (even though if this *really* was a standalone
+ // it wouldn't report it was a replica set here and thus would fail the validation check) to
+ // ensure that even if the user changed the standalone shard to a single-node replica set, you
+ // can't change the sharded cluster's notion of the shard from standalone to replica set just
+ // by calling addShard.
+ expectIsMaster(shardTarget,
+ BSON("ok" << 1 << "ismaster" << true << "setName"
+ << "mySet"
+ << "hosts"
+ << BSON_ARRAY(shardTarget.toString())));
+ future3.timed_get(kFutureTimeout);
+
+ // Ensure that the shard document was unchanged.
+ assertShardExists(existingShard);
- auto future = launchAsync([this, expectedShardName, connString] {
- auto status =
- catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100);
- ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
- ASSERT_STRING_CONTAINS(status.getStatus().reason(),
- "is already a member of the existing shard");
+ // Adding the same host with the same options should succeed.
+ auto future4 = launchAsync([&] {
+ Client::initThreadIfNotAlready();
+ auto shardName = assertGet(catalogManager()->addShard(operationContext(),
+ &existingShardName,
+ ConnectionString(shardTarget),
+ existingShard.getMaxSizeMB()));
+ ASSERT_EQUALS(existingShardName, shardName);
});
+ expectIsMaster(shardTarget, BSON("ok" << 1 << "ismaster" << true));
+ future4.timed_get(kFutureTimeout);
- ShardType shard;
- shard.setName("shard0000");
- shard.setHost(shardTarget.toString());
- expectGetShards({shard});
+ // Ensure that the shard document was unchanged.
+ assertShardExists(existingShard);
- future.timed_get(kFutureTimeout);
+ // Adding the same host with the same options (without explicitly specifying the shard name)
+ // should succeed.
+ auto future5 = launchAsync([&] {
+ Client::initThreadIfNotAlready();
+ auto shardName =
+ assertGet(catalogManager()->addShard(operationContext(),
+ nullptr, // should auto-pick same name
+ ConnectionString(shardTarget),
+ existingShard.getMaxSizeMB()));
+ ASSERT_EQUALS(existingShardName, shardName);
+ });
+ expectIsMaster(shardTarget, BSON("ok" << 1 << "ismaster" << true));
+ future5.timed_get(kFutureTimeout);
+
+ // Ensure that the shard document was unchanged.
+ assertShardExists(existingShard);
}
-// TODO(SERVER-24213): Test adding a new shard with an existing shard name, but different
-// shard membership
-TEST_F(AddShardTest, ReAddExistingShard) {
- std::unique_ptr<RemoteCommandTargeterMock> targeter(
+// Tests both that trying to add a shard with the same replica set as an existing shard but with
+// different options fails, and that adding a shard with the same replica set as an existing shard
+// with the *same* options succeeds.
+TEST_F(AddShardTest, AddExistingShardReplicaSet) {
+ std::unique_ptr<RemoteCommandTargeterMock> replsetTargeter(
stdx::make_unique<RemoteCommandTargeterMock>());
- ConnectionString connString =
- assertGet(ConnectionString::parse("mySet/host1:12345,host2:12345"));
- targeter->setConnectionStringReturnValue(connString);
+ ConnectionString connString = assertGet(ConnectionString::parse("mySet/host1:12345"));
+ replsetTargeter->setConnectionStringReturnValue(connString);
HostAndPort shardTarget = connString.getServers().front();
- targeter->setFindHostReturnValue(shardTarget);
+ replsetTargeter->setFindHostReturnValue(shardTarget);
+ targeterFactory()->addTargeterToReturn(connString, std::move(replsetTargeter));
- targeterFactory()->addTargeterToReturn(connString, std::move(targeter));
- std::string expectedShardName = "mySet";
+ std::string existingShardName = "myShard";
+ ShardType existingShard;
+ existingShard.setName(existingShardName);
+ existingShard.setHost(connString.toString());
+ existingShard.setMaxSizeMB(100);
+ existingShard.setState(ShardType::ShardState::kShardAware);
- auto future = launchAsync([this, expectedShardName, connString] {
- auto status =
- catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100);
- ASSERT_OK(status);
- });
+ // Make sure the shard already exists.
+ ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(),
+ ShardType::ConfigNS,
+ existingShard.toBSON(),
+ ShardingCatalogClient::kMajorityWriteConcern));
+ assertShardExists(existingShard);
- BSONArrayBuilder hosts;
- hosts.append("host1:12345");
- hosts.append("host2:12345");
- BSONObj commandResponse = BSON("ok" << 1 << "ismaster" << true << "setName"
- << "mySet"
- << "hosts"
- << hosts.arr());
- expectIsMaster(shardTarget, commandResponse);
+ BSONObj isMasterResponse = BSON("ok" << 1 << "ismaster" << true << "setName"
+ << "mySet"
+ << "hosts"
+ << BSON_ARRAY("host1:12345"
+ << "host2:12345"));
- expectListDatabases(shardTarget,
- {BSON("name"
- << "shardDB")});
+ // Adding the same connection string with a different shard name should fail.
+ std::string differentName = "anotherShardName";
+ auto future1 = launchAsync([&] {
+ Client::initThreadIfNotAlready();
+ ASSERT_EQUALS(
+ ErrorCodes::IllegalOperation,
+ catalogManager()->addShard(
+ operationContext(), &differentName, connString, existingShard.getMaxSizeMB()));
+ });
+ expectIsMaster(shardTarget, isMasterResponse);
+ future1.timed_get(kFutureTimeout);
- expectGetDatabase("shardDB", boost::none);
+ // Ensure that the shard document was unchanged.
+ assertShardExists(existingShard);
- // The shardIdentity doc inserted into the admin.system.version collection on the shard.
- expectShardIdentityUpsertReturnSuccess(shardTarget, expectedShardName);
+ // Adding the same connection string with a different maxSize should fail.
+ auto future2 = launchAsync([&] {
+ Client::initThreadIfNotAlready();
+ ASSERT_EQUALS(
+ ErrorCodes::IllegalOperation,
+ catalogManager()->addShard(
+ operationContext(), nullptr, connString, existingShard.getMaxSizeMB() + 100));
+ });
+ expectIsMaster(shardTarget, isMasterResponse);
+ future2.timed_get(kFutureTimeout);
- // The shard doc inserted into the config.shards collection on the config server.
- ShardType newShard;
- newShard.setName(expectedShardName);
- newShard.setMaxSizeMB(100);
- newShard.setHost(connString.toString());
- newShard.setState(ShardType::ShardState::kShardAware);
+ // Ensure that the shard document was unchanged.
+ assertShardExists(existingShard);
- // When a shard with the same name already exists, the insert into config.shards will fail
- // with a duplicate key error on the shard name.
- onCommand([&newShard](const RemoteCommandRequest& request) {
- BatchedInsertRequest actualBatchedInsert;
- std::string errmsg;
- ASSERT_TRUE(actualBatchedInsert.parseBSON(request.dbname, request.cmdObj, &errmsg));
+ // Adding a connection string with a host of an existing shard but using a different
+ // connection string type should fail.
+ {
+ // Make sure we can target the request to the standalone server.
+ std::unique_ptr<RemoteCommandTargeterMock> standaloneTargeter(
+ stdx::make_unique<RemoteCommandTargeterMock>());
+ standaloneTargeter->setConnectionStringReturnValue(ConnectionString(shardTarget));
+ standaloneTargeter->setFindHostReturnValue(shardTarget);
+ targeterFactory()->addTargeterToReturn(ConnectionString(shardTarget),
+ std::move(standaloneTargeter));
+ }
+ auto future3 = launchAsync([&] {
+ Client::initThreadIfNotAlready();
+ ASSERT_EQUALS(ErrorCodes::IllegalOperation,
+ catalogManager()->addShard(operationContext(),
+ nullptr,
+ ConnectionString(shardTarget),
+ existingShard.getMaxSizeMB()));
+ });
+ // Make it get past the host validation check (even though if this *really* was a replica set
+ // it would report it was a replica set here and thus would fail the validation check) to
+ // ensure that even if the user changed the replica set shard to a standalone, you
+ // can't change the sharded cluster's notion of the shard from replica set to standalone just
+ // by calling addShard.
+ expectIsMaster(shardTarget, BSON("ok" << 1 << "ismaster" << true));
+ future3.timed_get(kFutureTimeout);
+
+ // Ensure that the shard document was unchanged.
+ assertShardExists(existingShard);
- ASSERT_EQUALS(ShardType::ConfigNS, actualBatchedInsert.getNS().toString());
+ // Adding a connection string with the same hosts but a different replica set name should fail.
+ std::string differentSetName = "differentSet";
+ {
+ // Add a targeter with the new replica set name so the validation check can be targeted and
+ // run properly.
+ std::unique_ptr<RemoteCommandTargeterMock> differentRSNameTargeter(
+ stdx::make_unique<RemoteCommandTargeterMock>());
+ ConnectionString differentRSConnString =
+ ConnectionString::forReplicaSet(differentSetName, connString.getServers());
+ differentRSNameTargeter->setConnectionStringReturnValue(differentRSConnString);
+ differentRSNameTargeter->setFindHostReturnValue(shardTarget);
+ targeterFactory()->addTargeterToReturn(differentRSConnString,
+ std::move(differentRSNameTargeter));
+ }
+ auto future4 = launchAsync([&] {
+ Client::initThreadIfNotAlready();
+ ASSERT_EQUALS(ErrorCodes::IllegalOperation,
+ catalogManager()->addShard(operationContext(),
+ nullptr,
+ ConnectionString::forReplicaSet(
+ differentSetName, connString.getServers()),
+ existingShard.getMaxSizeMB()));
+ });
+ BSONObj differentRSIsMasterResponse =
+ BSON("ok" << 1 << "ismaster" << true << "setName" << differentSetName << "hosts"
+ << BSON_ARRAY("host1:12345"
+ << "host2:12345"));
+ // Make it get past the validation check (even though if you really tried to add a replica set
+ // with the wrong name it would report the other name in the ismaster response here and thus
+ // would fail the validation check) to ensure that even if you manually change the shard's
+ // replica set name somehow, you can't change the replica set name the sharded cluster knows
+ // for it just by calling addShard again.
+ expectIsMaster(shardTarget, differentRSIsMasterResponse);
+ future4.timed_get(kFutureTimeout);
+
+ // Ensure that the shard document was unchanged.
+ assertShardExists(existingShard);
- auto inserted = actualBatchedInsert.getDocuments();
- ASSERT_EQUALS(1U, inserted.size());
+ // Adding the same host with the same options should succeed.
+ auto future5 = launchAsync([&] {
+ Client::initThreadIfNotAlready();
+ auto shardName = assertGet(catalogManager()->addShard(
+ operationContext(), &existingShardName, connString, existingShard.getMaxSizeMB()));
+ ASSERT_EQUALS(existingShardName, shardName);
+ });
+ expectIsMaster(shardTarget, isMasterResponse);
+ future5.timed_get(kFutureTimeout);
- ASSERT_EQ(newShard.toBSON(), inserted.front());
+ // Adding the same host with the same options (without explicitly specifying the shard name)
+ // should succeed.
+ auto future6 = launchAsync([&] {
+ Client::initThreadIfNotAlready();
+ auto shardName = assertGet(catalogManager()->addShard(
+ operationContext(), nullptr, connString, existingShard.getMaxSizeMB()));
+ ASSERT_EQUALS(existingShardName, shardName);
+ });
+ expectIsMaster(shardTarget, isMasterResponse);
+ future6.timed_get(kFutureTimeout);
- BatchedCommandResponse response;
- response.setOk(false);
- response.setErrCode(ErrorCodes::DuplicateKey);
- response.setErrMessage("E11000 duplicate key error collection: config.shards");
+ // Ensure that the shard document was unchanged.
+ assertShardExists(existingShard);
- return response.toBSON();
+ // Adding the same replica set but different host membership (but otherwise the same options)
+ // should succeed.
+ auto otherHost = connString.getServers().back();
+ ConnectionString otherHostConnString = assertGet(ConnectionString::parse("mySet/host2:12345"));
+ {
+ // Add a targeter for the different seed string this addShard request will use.
+ std::unique_ptr<RemoteCommandTargeterMock> otherHostTargeter(
+ stdx::make_unique<RemoteCommandTargeterMock>());
+ otherHostTargeter->setConnectionStringReturnValue(otherHostConnString);
+ otherHostTargeter->setFindHostReturnValue(otherHost);
+ targeterFactory()->addTargeterToReturn(otherHostConnString, std::move(otherHostTargeter));
+ }
+ auto future7 = launchAsync([&] {
+ Client::initThreadIfNotAlready();
+ auto shardName = assertGet(catalogManager()->addShard(
+ operationContext(), nullptr, otherHostConnString, existingShard.getMaxSizeMB()));
+ ASSERT_EQUALS(existingShardName, shardName);
});
+ expectIsMaster(otherHost, isMasterResponse);
+ future7.timed_get(kFutureTimeout);
- future.timed_get(kFutureTimeout);
+ // Ensure that the shard document was unchanged.
+ assertShardExists(existingShard);
}
-*/
} // namespace
} // namespace mongo
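
All of the tests above share one mechanical pattern: the addShard call blocks waiting on mocked network responses, so it is launched on a separate thread (launchAsync), the main test thread then services the expected requests (expectIsMaster), and finally the future is joined with a timeout (timed_get). Below is a stripped-down model of that rendezvous using only the standard library; MockNetwork and its methods are invented for illustration and are not mongo's NetworkTestEnv API.

    // Toy model of the launchAsync / expectIsMaster / timed_get pattern.
    #include <cassert>
    #include <chrono>
    #include <future>
    #include <string>

    struct MockNetwork {
        std::promise<std::string> request;   // fulfilled by the "client" thread
        std::promise<std::string> response;  // fulfilled by the test thread

        // Client side: send a command, then block until the test replies.
        std::string sendCommand(const std::string& cmd) {
            request.set_value(cmd);
            return response.get_future().get();
        }

        // Test side: wait for the pending request and answer it.
        void expectCommand(const std::string& expected, const std::string& reply) {
            assert(request.get_future().get() == expected);
            response.set_value(reply);
        }
    };

    int main() {
        MockNetwork net;
        // launchAsync: run the blocking call off the main thread.
        auto future = std::async(std::launch::async, [&] { return net.sendCommand("isMaster"); });
        // expectIsMaster: feed the canned reply the blocked call is waiting on.
        net.expectCommand("isMaster", "{ ok: 1, ismaster: true }");
        // timed_get: join with a deadline so a hung call fails the test.
        assert(future.wait_for(std::chrono::seconds(5)) == std::future_status::ready);
        assert(future.get() == "{ ok: 1, ismaster: true }");
    }

In both the sketch and the real tests, the expectations must be serviced before joining the future; otherwise the blocked addShard call would never complete.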
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
index daa0dfcd49f..6cc5c791eb9 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
@@ -648,6 +648,7 @@ StatusWith<repl::OpTimeWith<DatabaseType>> ShardingCatalogClientImpl::_fetchData
auto findStatus = _exhaustiveFindOnConfig(txn,
readPref,
+ repl::ReadConcernLevel::kMajorityReadConcern,
NamespaceString(DatabaseType::ConfigNS),
BSON(DatabaseType::name(dbName)),
BSONObj(),
@@ -675,6 +676,7 @@ StatusWith<repl::OpTimeWith<CollectionType>> ShardingCatalogClientImpl::getColle
OperationContext* txn, const std::string& collNs) {
auto statusFind = _exhaustiveFindOnConfig(txn,
kConfigReadSelector,
+ repl::ReadConcernLevel::kMajorityReadConcern,
NamespaceString(CollectionType::ConfigNS),
BSON(CollectionType::fullNs(collNs)),
BSONObj(),
@@ -713,6 +715,7 @@ Status ShardingCatalogClientImpl::getCollections(OperationContext* txn,
auto findStatus = _exhaustiveFindOnConfig(txn,
kConfigReadSelector,
+ repl::ReadConcernLevel::kMajorityReadConcern,
NamespaceString(CollectionType::ConfigNS),
b.obj(),
BSONObj(),
@@ -748,7 +751,7 @@ Status ShardingCatalogClientImpl::getCollections(OperationContext* txn,
Status ShardingCatalogClientImpl::dropCollection(OperationContext* txn, const NamespaceString& ns) {
logChange(txn, "dropCollection.start", ns.ns(), BSONObj());
- auto shardsStatus = getAllShards(txn);
+ auto shardsStatus = getAllShards(txn, repl::ReadConcernLevel::kMajorityReadConcern);
if (!shardsStatus.isOK()) {
return shardsStatus.getStatus();
}
@@ -908,8 +911,13 @@ Status ShardingCatalogClientImpl::dropCollection(OperationContext* txn, const Na
StatusWith<BSONObj> ShardingCatalogClientImpl::getGlobalSettings(OperationContext* txn,
StringData key) {
- auto findStatus = _exhaustiveFindOnConfig(
- txn, kConfigReadSelector, kSettingsNamespace, BSON("_id" << key), BSONObj(), 1);
+ auto findStatus = _exhaustiveFindOnConfig(txn,
+ kConfigReadSelector,
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ kSettingsNamespace,
+ BSON("_id" << key),
+ BSONObj(),
+ 1);
if (!findStatus.isOK()) {
return findStatus.getStatus();
}
@@ -976,6 +984,7 @@ Status ShardingCatalogClientImpl::getDatabasesForShard(OperationContext* txn,
vector<string>* dbs) {
auto findStatus = _exhaustiveFindOnConfig(txn,
kConfigReadSelector,
+ repl::ReadConcernLevel::kMajorityReadConcern,
NamespaceString(DatabaseType::ConfigNS),
BSON(DatabaseType::primary(shardId.toString())),
BSONObj(),
@@ -1008,8 +1017,13 @@ Status ShardingCatalogClientImpl::getChunks(OperationContext* txn,
// Convert boost::optional<int> to boost::optional<long long>.
auto longLimit = limit ? boost::optional<long long>(*limit) : boost::none;
- auto findStatus = _exhaustiveFindOnConfig(
- txn, kConfigReadSelector, NamespaceString(ChunkType::ConfigNS), query, sort, longLimit);
+ auto findStatus = _exhaustiveFindOnConfig(txn,
+ kConfigReadSelector,
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString(ChunkType::ConfigNS),
+ query,
+ sort,
+ longLimit);
if (!findStatus.isOK()) {
return findStatus.getStatus();
}
@@ -1043,6 +1057,7 @@ Status ShardingCatalogClientImpl::getTagsForCollection(OperationContext* txn,
auto findStatus = _exhaustiveFindOnConfig(txn,
kConfigReadSelector,
+ repl::ReadConcernLevel::kMajorityReadConcern,
NamespaceString(TagsType::ConfigNS),
BSON(TagsType::ns(collectionNs)),
BSON(TagsType::min() << 1),
@@ -1072,8 +1087,13 @@ StatusWith<string> ShardingCatalogClientImpl::getTagForChunk(OperationContext* t
BSON(TagsType::ns(collectionNs) << TagsType::min() << BSON("$lte" << chunk.getMin())
<< TagsType::max()
<< BSON("$gte" << chunk.getMax()));
- auto findStatus = _exhaustiveFindOnConfig(
- txn, kConfigReadSelector, NamespaceString(TagsType::ConfigNS), query, BSONObj(), 1);
+ auto findStatus = _exhaustiveFindOnConfig(txn,
+ kConfigReadSelector,
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString(TagsType::ConfigNS),
+ query,
+ BSONObj(),
+ 1);
if (!findStatus.isOK()) {
return findStatus.getStatus();
}
@@ -1097,10 +1117,11 @@ StatusWith<string> ShardingCatalogClientImpl::getTagForChunk(OperationContext* t
}
StatusWith<repl::OpTimeWith<std::vector<ShardType>>> ShardingCatalogClientImpl::getAllShards(
- OperationContext* txn) {
+ OperationContext* txn, repl::ReadConcernLevel readConcern) {
std::vector<ShardType> shards;
auto findStatus = _exhaustiveFindOnConfig(txn,
kConfigReadSelector,
+ readConcern,
NamespaceString(ShardType::ConfigNS),
BSONObj(), // no query filter
BSONObj(), // no sort
@@ -1380,6 +1401,7 @@ Status ShardingCatalogClientImpl::insertConfigDocument(OperationContext* txn,
auto fetchDuplicate =
_exhaustiveFindOnConfig(txn,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
nss,
idField.wrap(),
BSONObj(),
@@ -1486,6 +1508,7 @@ Status ShardingCatalogClientImpl::_checkDbDoesNotExist(OperationContext* txn,
auto findStatus = _exhaustiveFindOnConfig(txn,
kConfigReadSelector,
+ repl::ReadConcernLevel::kMajorityReadConcern,
NamespaceString(DatabaseType::ConfigNS),
queryBuilder.obj(),
BSONObj(),
@@ -1593,12 +1616,13 @@ StatusWith<long long> ShardingCatalogClientImpl::_runCountCommandOnConfig(Operat
StatusWith<repl::OpTimeWith<vector<BSONObj>>> ShardingCatalogClientImpl::_exhaustiveFindOnConfig(
OperationContext* txn,
const ReadPreferenceSetting& readPref,
+ repl::ReadConcernLevel readConcern,
const NamespaceString& nss,
const BSONObj& query,
const BSONObj& sort,
boost::optional<long long> limit) {
auto response = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
- txn, readPref, repl::ReadConcernLevel::kMajorityReadConcern, nss, query, sort, limit);
+ txn, readPref, readConcern, nss, query, sort, limit);
if (!response.isOK()) {
return response.getStatus();
}
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h
index 614e98c36e2..72e000db216 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h
+++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h
@@ -131,7 +131,7 @@ public:
const ChunkType& chunk) override;
StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
- OperationContext* txn) override;
+ OperationContext* txn, repl::ReadConcernLevel readConcern) override;
bool runUserManagementWriteCommand(OperationContext* txn,
const std::string& commandName,
@@ -227,6 +227,7 @@ private:
StatusWith<repl::OpTimeWith<std::vector<BSONObj>>> _exhaustiveFindOnConfig(
OperationContext* txn,
const ReadPreferenceSetting& readPref,
+ repl::ReadConcernLevel readConcern,
const NamespaceString& nss,
const BSONObj& query,
const BSONObj& sort,
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
index 80bd6c81da8..ee81e49eb6f 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
@@ -94,12 +94,13 @@ const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{
const ReadPreferenceSetting kConfigPrimarySelector(ReadPreference::PrimaryOnly);
const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0));
-void toBatchError(const Status& status, BatchedCommandResponse* response) {
- response->clear();
- response->setErrCode(status.code());
- response->setErrMessage(status.reason());
- response->setOk(false);
-}
+/**
+ * Lock that guards changes to the set of shards in the cluster (i.e. addShard and removeShard
+ * requests).
+ * TODO: Currently only taken during addShard requests, this should also be taken in X mode during
+ * removeShard, once removeShard is moved to run on the config server primary instead of on mongos.
+ */
+Lock::ResourceMutex kShardMembershipLock;
/**
* Lock for shard zoning operations. This should be acquired when doing any operations that
@@ -300,26 +301,104 @@ StatusWith<Shard::CommandResponse> ShardingCatalogManagerImpl::_runCommandForAdd
std::move(writeConcernStatus));
}
+StatusWith<boost::optional<ShardType>> ShardingCatalogManagerImpl::_checkIfShardExists(
+ OperationContext* txn,
+ const ConnectionString& proposedShardConnectionString,
+ const std::string* proposedShardName,
+ long long proposedShardMaxSize) {
+ // Check whether any host in the connection is already part of the cluster.
+ const auto existingShards =
+ _catalogClient->getAllShards(txn, repl::ReadConcernLevel::kLocalReadConcern);
+ if (!existingShards.isOK()) {
+ return Status(existingShards.getStatus().code(),
+ str::stream() << "Failed to load existing shards during addShard"
+ << causedBy(existingShards.getStatus().reason()));
+ }
+
+ // Now check if this shard already exists - if it already exists *with the same options* then
+ // the addShard request can return success early without doing anything more.
+ for (const auto& existingShard : existingShards.getValue().value) {
+ auto swExistingShardConnStr = ConnectionString::parse(existingShard.getHost());
+ if (!swExistingShardConnStr.isOK()) {
+ return swExistingShardConnStr.getStatus();
+ }
+ auto existingShardConnStr = std::move(swExistingShardConnStr.getValue());
+ // Function for determining if the options for the shard that is being added match the
+ // options of an existing shard that conflicts with it.
+ auto shardsAreEquivalent = [&]() {
+ if (proposedShardName && *proposedShardName != existingShard.getName()) {
+ return false;
+ }
+ if (proposedShardConnectionString.type() != existingShardConnStr.type()) {
+ return false;
+ }
+ if (proposedShardConnectionString.type() == ConnectionString::SET &&
+ proposedShardConnectionString.getSetName() != existingShardConnStr.getSetName()) {
+ return false;
+ }
+ if (proposedShardMaxSize != existingShard.getMaxSizeMB()) {
+ return false;
+ }
+ return true;
+ };
+
+ if (existingShardConnStr.type() == ConnectionString::SET &&
+ proposedShardConnectionString.type() == ConnectionString::SET &&
+ existingShardConnStr.getSetName() == proposedShardConnectionString.getSetName()) {
+ // An existing shard has the same replica set name as the shard being added.
+ // If the options aren't the same, then this is an error,
+ // but if the options match then the addShard operation should be immediately
+ // considered a success and terminated.
+ if (shardsAreEquivalent()) {
+ return {existingShard};
+ } else {
+ return {ErrorCodes::IllegalOperation,
+ str::stream() << "A shard already exists containing the replica set '"
+ << existingShardConnStr.getSetName()
+ << "'"};
+ }
+ }
+
+ for (const auto& existingHost : existingShardConnStr.getServers()) {
+ // Look if any of the hosts in the existing shard are present within the shard trying
+ // to be added.
+ for (const auto& addingHost : proposedShardConnectionString.getServers()) {
+ if (existingHost == addingHost) {
+ // At least one of the hosts in the shard being added already exists in an
+ // existing shard. If the options aren't the same, then this is an error,
+ // but if the options match then the addShard operation should be immediately
+ // considered a success and terminated.
+ if (shardsAreEquivalent()) {
+ return {existingShard};
+ } else {
+ return {ErrorCodes::IllegalOperation,
+ str::stream() << "'" << addingHost.toString() << "' "
+ << "is already a member of the existing shard '"
+ << existingShard.getHost()
+ << "' ("
+ << existingShard.getName()
+ << ")."};
+ }
+ }
+ }
+ }
+ if (proposedShardName && *proposedShardName == existingShard.getName()) {
+ // If we get here then we're trying to add a shard with the same name as an existing
+ // shard, but there was no overlap in the hosts between the existing shard and the
+ // proposed connection string for the new shard.
+ return Status(ErrorCodes::IllegalOperation,
+ str::stream() << "A shard named " << *proposedShardName
+ << " already exists");
+ }
+ }
+ return {boost::none};
+}
+
StatusWith<ShardType> ShardingCatalogManagerImpl::_validateHostAsShard(
OperationContext* txn,
std::shared_ptr<RemoteCommandTargeter> targeter,
const std::string* shardProposedName,
const ConnectionString& connectionString) {
- // Check whether any host in the connection is already part of the cluster.
- Grid::get(txn)->shardRegistry()->reload(txn);
- for (const auto& hostAndPort : connectionString.getServers()) {
- std::shared_ptr<Shard> shard;
- shard = Grid::get(txn)->shardRegistry()->getShardNoReload(hostAndPort.toString());
- if (shard) {
- return {ErrorCodes::OperationFailed,
- str::stream() << "'" << hostAndPort.toString() << "' "
- << "is already a member of the existing shard '"
- << shard->getConnString().toString()
- << "' ("
- << shard->getId()
- << ")."};
- }
- }
// Check for mongos and older version mongod connections, and whether the hosts
// can be found for the user specified replset.
@@ -556,6 +635,9 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard(
return {ErrorCodes::BadValue, "shard name cannot be empty"};
}
+ // Only one addShard operation can be in progress at a time.
+ Lock::ExclusiveLock lk(txn->lockState(), kShardMembershipLock);
+
// TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter instead.
const std::shared_ptr<Shard> shard{
Grid::get(txn)->shardRegistry()->createConnection(shardConnectionString)};
@@ -578,15 +660,29 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard(
if (!shardStatus.isOK()) {
return shardStatus.getStatus();
}
-
ShardType& shardType = shardStatus.getValue();
+
+ // Check if this shard has already been added (can happen in the case of a retry after a network
+ // error, for example) and thus this addShard request should be considered a no-op.
+ auto existingShard =
+ _checkIfShardExists(txn, shardConnectionString, shardProposedName, maxSize);
+ if (!existingShard.isOK()) {
+ return existingShard.getStatus();
+ }
+ if (existingShard.getValue()) {
+ // These hosts already belong to an existing shard, so report success and terminate the
+ // addShard request.
+ return existingShard.getValue()->getName();
+ }
+
+
+ // Check that none of the existing shard candidate's dbs exist already
auto dbNamesStatus = _getDBNamesListFromShard(txn, targeter);
if (!dbNamesStatus.isOK()) {
return dbNamesStatus.getStatus();
}
- // Check that none of the existing shard candidate's dbs exist already
for (const string& dbName : dbNamesStatus.getValue()) {
auto dbt = _catalogClient->getDatabase(txn, dbName);
if (dbt.isOK()) {
@@ -642,19 +738,6 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard(
txn, ShardType::ConfigNS, shardType.toBSON(), ShardingCatalogClient::kMajorityWriteConcern);
if (!result.isOK()) {
log() << "error adding shard: " << shardType.toBSON() << " err: " << result.reason();
- if (result == ErrorCodes::DuplicateKey) {
- // TODO(SERVER-24213): adding a shard that already exists should be considered success,
- // however this approach does no validation that we are adding the shard with the same
- // options. It also does not protect against adding the same shard with a different
- // shard name and slightly different connection string. This is a temporary hack to
- // get the continuous stepdown suite passing.
- warning() << "Received duplicate key error when inserting new shard with name "
- << shardType.getName() << " and connection string "
- << shardConnectionString.toString()
- << " to config.shards collection. This most likely means that there was an "
- "attempt to add a shard that already exists in the cluster";
- return shardType.getName();
- }
return result;
}
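
The check-then-insert sequence above is only safe because addShard requests are serialized: without kShardMembershipLock, two concurrent addShard calls could both pass _checkIfShardExists and then insert conflicting documents into config.shards. A toy sketch of that invariant, with std::mutex standing in for mongo's Lock::ResourceMutex and an in-memory map standing in for config.shards:

    // Why addShard holds kShardMembershipLock across the whole operation:
    // the existence check and the insert must not interleave between callers.
    #include <map>
    #include <mutex>
    #include <string>

    std::mutex shardMembershipLock;                   // stand-in for Lock::ResourceMutex
    std::map<std::string, std::string> configShards;  // host -> shard name

    std::string addShard(const std::string& host, const std::string& name) {
        std::scoped_lock lk(shardMembershipLock);  // one membership change at a time
        if (auto it = configShards.find(host); it != configShards.end())
            return it->second;  // already added: idempotent success
        // Without the lock, another thread could insert between the check above
        // and the emplace below, producing two shards claiming the same host.
        configShards.emplace(host, name);
        return name;
    }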
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
index ec1cf3841c9..9b261906ecf 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
@@ -106,6 +106,22 @@ private:
StatusWith<std::string> _generateNewShardName(OperationContext* txn);
/**
+ * Used during addShard to determine if there is already an existing shard that matches the
+ * shard that is currently being added. An OK return with boost::none indicates that there
+ * is no conflicting shard, and we can proceed trying to add the new shard. An OK return
+ * with a ShardType indicates that there is an existing shard that matches the shard being added
+ * but since the options match, this addShard request can do nothing and return success. A
+ * non-OK return either indicates a problem reading the existing shards from disk or more likely
+ * indicates that an existing shard conflicts with the shard being added and they have different
+ * options, so the addShard attempt must be aborted.
+ */
+ StatusWith<boost::optional<ShardType>> _checkIfShardExists(
+ OperationContext* txn,
+ const ConnectionString& proposedShardConnectionString,
+ const std::string* shardProposedName,
+ long long maxSize);
+
+ /**
* Validates that the specified endpoint can serve as a shard server. In particular, this
* function checks that the shard can be contacted and that it is not already a member of
* another sharded cluster.
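
The tri-state contract documented above is encoded in the return type StatusWith<boost::optional<ShardType>>: a non-OK Status aborts, an engaged optional short-circuits to success, and boost::none falls through to the normal add path. A self-contained sketch of consuming that shape, with a toy Status and std::variant/std::optional in place of mongo's StatusWith and boost::optional:

    // Consuming a StatusWith<optional<T>> three-way return, as addShard does.
    #include <iostream>
    #include <optional>
    #include <string>
    #include <variant>

    struct Status { std::string reason; };

    template <typename T>
    using StatusWith = std::variant<Status, T>;  // toy stand-in for mongo's StatusWith

    StatusWith<std::optional<std::string>> checkIfShardExists(bool conflict, bool exists) {
        if (conflict) return Status{"IllegalOperation: existing shard has different options"};
        if (exists) return std::optional<std::string>{"shard0000"};
        return std::optional<std::string>{};  // no overlap: proceed with the add
    }

    int main() {
        auto result = checkIfShardExists(/*conflict=*/false, /*exists=*/true);
        if (auto* err = std::get_if<Status>(&result)) {
            std::cout << "addShard failed: " << err->reason << '\n';      // non-OK status
        } else if (auto& found = std::get<std::optional<std::string>>(result)) {
            std::cout << "already added as " << *found << ", success\n";  // no-op success
        } else {
            std::cout << "no existing shard, validating and inserting\n"; // proceed
        }
    }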
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_test.cpp
index e4d06494d1b..bff745ffb78 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_test.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_test.cpp
@@ -389,7 +389,8 @@ TEST_F(ShardingCatalogClientTest, GetAllShardsValid) {
const vector<ShardType> expectedShardsList = {s1, s2, s3};
auto future = launchAsync([this] {
- auto shards = assertGet(catalogClient()->getAllShards(operationContext()));
+ auto shards = assertGet(catalogClient()->getAllShards(
+ operationContext(), repl::ReadConcernLevel::kMajorityReadConcern));
return shards.value;
});
@@ -423,7 +424,8 @@ TEST_F(ShardingCatalogClientTest, GetAllShardsWithInvalidShard) {
configTargeter()->setFindHostReturnValue(HostAndPort("TestHost1"));
auto future = launchAsync([this] {
- auto status = catalogClient()->getAllShards(operationContext());
+ auto status = catalogClient()->getAllShards(operationContext(),
+ repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_EQ(ErrorCodes::FailedToParse, status.getStatus());
});
diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h
index fe62753b249..02de97a4f34 100644
--- a/src/mongo/s/catalog/sharding_catalog_client.h
+++ b/src/mongo/s/catalog/sharding_catalog_client.h
@@ -274,7 +274,7 @@ public:
* Returns a !OK status if an error occurs.
*/
virtual StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
- OperationContext* txn) = 0;
+ OperationContext* txn, repl::ReadConcernLevel readConcern) = 0;
/**
* Runs a user management command on the config servers, potentially synchronizing through
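
The remaining files are the mechanical side of one interface change: getAllShards no longer hard-codes majority read concern inside _exhaustiveFindOnConfig but takes it as a parameter. Existing callers (balancer, shard registry, listShards, dropCollection) pass kMajorityReadConcern and keep their old behavior, while the new _checkIfShardExists reads with kLocalReadConcern, presumably acceptable because it runs on the config server primary while holding the membership lock. A sketch of the parameter-threading pattern, with simplified stand-in types rather than the real mongo classes:

    // Threading a read-concern choice through an interface instead of fixing
    // it at the lowest layer. Types are simplified stand-ins, not mongo's.
    #include <string>
    #include <vector>

    enum class ReadConcernLevel { kLocal, kMajority };

    struct ShardDoc { std::string name, host; };

    class CatalogClient {
    public:
        virtual ~CatalogClient() = default;
        // Each caller now states the consistency it needs for this read.
        virtual std::vector<ShardDoc> getAllShards(ReadConcernLevel level) = 0;
    };

    void refreshShardRegistry(CatalogClient& client) {
        // General-purpose readers keep the old majority semantics.
        auto shards = client.getAllShards(ReadConcernLevel::kMajority);
        (void)shards;
    }

    void checkIfShardExists(CatalogClient& client) {
        // The addShard pre-check on the config primary can read locally.
        auto shards = client.getAllShards(ReadConcernLevel::kLocal);
        (void)shards;
    }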
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
index 308bf4aeb7d..9206eb1d0e1 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
@@ -135,7 +135,7 @@ StatusWith<string> ShardingCatalogClientMock::getTagForChunk(OperationContext* t
}
StatusWith<repl::OpTimeWith<std::vector<ShardType>>> ShardingCatalogClientMock::getAllShards(
- OperationContext* txn) {
+ OperationContext* txn, repl::ReadConcernLevel readConcern) {
return {ErrorCodes::InternalError, "Method not implemented"};
}
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h
index 14fb1e87d1d..8c37896fbbc 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.h
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h
@@ -99,7 +99,7 @@ public:
const ChunkType& chunk) override;
StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards(
- OperationContext* txn) override;
+ OperationContext* txn, repl::ReadConcernLevel readConcern) override;
bool runUserManagementWriteCommand(OperationContext* txn,
const std::string& commandName,
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 5f983493e05..9a951bae5f9 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -302,7 +302,8 @@ ShardRegistryData::ShardRegistryData(OperationContext* txn, ShardFactory* shardF
}
void ShardRegistryData::_init(OperationContext* txn, ShardFactory* shardFactory) {
- auto shardsStatus = grid.catalogClient(txn)->getAllShards(txn);
+ auto shardsStatus =
+ grid.catalogClient(txn)->getAllShards(txn, repl::ReadConcernLevel::kMajorityReadConcern);
if (!shardsStatus.isOK()) {
uasserted(shardsStatus.getStatus().code(),
diff --git a/src/mongo/s/commands/cluster_list_shards_cmd.cpp b/src/mongo/s/commands/cluster_list_shards_cmd.cpp
index 9deee999e12..8d3b19ad8c8 100644
--- a/src/mongo/s/commands/cluster_list_shards_cmd.cpp
+++ b/src/mongo/s/commands/cluster_list_shards_cmd.cpp
@@ -74,7 +74,8 @@ public:
int options,
std::string& errmsg,
BSONObjBuilder& result) {
- auto shardsStatus = grid.catalogClient(txn)->getAllShards(txn);
+ auto shardsStatus = grid.catalogClient(txn)->getAllShards(
+ txn, repl::ReadConcernLevel::kMajorityReadConcern);
if (!shardsStatus.isOK()) {
return appendCommandStatus(result, shardsStatus.getStatus());
}