diff options
author | Spencer T Brody <spencer@mongodb.com> | 2016-06-01 16:55:05 -0400 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2016-06-13 19:02:47 -0400 |
commit | e324289ed29a41f0f6f610dc63ab5d2ce1f9c351 (patch) | |
tree | 07b7159767295a97c310816f56d9dea0f534d6fd /src/mongo | |
parent | 2d487da181d970df760a40ee253398255ca240d0 (diff) | |
download | mongo-e324289ed29a41f0f6f610dc63ab5d2ce1f9c351.tar.gz |
SERVER-24323 Add ShardingCatalogManager and move addShard implementation into it
Diffstat (limited to 'src/mongo')
29 files changed, 1132 insertions, 621 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 8f4c088c26b..8b00a9e41d0 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -697,6 +697,7 @@ serveronlyLibdeps = [ "$BUILD_DIR/mongo/db/bson/dotted_path_support", "$BUILD_DIR/mongo/executor/network_interface_factory", "$BUILD_DIR/mongo/s/catalog/replset/sharding_catalog_client_impl", + "$BUILD_DIR/mongo/s/catalog/replset/sharding_catalog_manager_impl", "$BUILD_DIR/mongo/s/client/sharding_connection_hook", "$BUILD_DIR/mongo/s/coreshard", "$BUILD_DIR/mongo/s/serveronly", diff --git a/src/mongo/db/commands/conn_pool_stats.cpp b/src/mongo/db/commands/conn_pool_stats.cpp index 15bbe103ce1..2cd000e7d30 100644 --- a/src/mongo/db/commands/conn_pool_stats.cpp +++ b/src/mongo/db/commands/conn_pool_stats.cpp @@ -38,9 +38,11 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/server_options.h" #include "mongo/executor/connection_pool_stats.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/task_executor_pool.h" +#include "mongo/s/catalog/sharding_catalog_manager.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" @@ -90,7 +92,9 @@ public: auto grid = Grid::get(txn); if (grid->shardRegistry()) { grid->getExecutorPool()->appendConnectionStats(&stats); - grid->catalogClient(txn)->appendConnectionStats(&stats); + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { + grid->catalogManager()->appendConnectionStats(&stats); + } } // Output to a BSON object. 
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 9aa55fb8058..474179ac240 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -158,7 +158,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/serveronly', '$BUILD_DIR/mongo/executor/network_test_env', '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', - '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock', + '$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock', '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/util/clock_source_mock', '$BUILD_DIR/mongo/util/net/message_port_mock', @@ -177,7 +177,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/serveronly', '$BUILD_DIR/mongo/executor/network_test_env', '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', - '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock', + '$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock', '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/util/clock_source_mock', '$BUILD_DIR/mongo/util/net/message_port_mock', diff --git a/src/mongo/db/s/config/configsvr_add_shard_command.cpp b/src/mongo/db/s/config/configsvr_add_shard_command.cpp index 4378ae22c20..3bc7e2eeeb0 100644 --- a/src/mongo/db/s/config/configsvr_add_shard_command.cpp +++ b/src/mongo/db/s/config/configsvr_add_shard_command.cpp @@ -38,7 +38,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_coordinator.h" -#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/catalog/sharding_catalog_manager.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/add_shard_request_type.h" @@ -119,7 +119,7 @@ public: parsedRequest.hasMaxSize() ? parsedRequest.getMaxSize() : kMaxSizeMBDefault); - StatusWith<string> addShardResult = grid.catalogClient(txn)->addShard( + StatusWith<string> addShardResult = Grid::get(txn)->catalogManager()->addShard( txn, parsedRequest.hasName() ? 
&parsedRequest.getName() : nullptr, parsedRequest.getConnString(), diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp index 4993e118b80..a65c83fc1ae 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.cpp +++ b/src/mongo/db/s/sharding_initialization_mongod.cpp @@ -35,6 +35,9 @@ #include "mongo/client/connection_string.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/client/remote_command_targeter_factory_impl.h" +#include "mongo/db/server_options.h" +#include "mongo/executor/task_executor.h" +#include "mongo/s/catalog/replset/sharding_catalog_manager_impl.h" #include "mongo/s/client/shard_factory.h" #include "mongo/s/client/shard_local.h" #include "mongo/s/client/shard_remote.h" @@ -74,9 +77,19 @@ Status initializeGlobalShardingStateForMongod(const ConnectionString& configCS) auto shardFactory = stdx::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory)); - return initializeGlobalShardingState(configCS, std::move(shardFactory), []() { - return stdx::make_unique<rpc::ShardingEgressMetadataHookForMongod>(); - }); + return initializeGlobalShardingState( + configCS, + std::move(shardFactory), + []() { return stdx::make_unique<rpc::ShardingEgressMetadataHookForMongod>(); }, + [](ShardingCatalogClient* catalogClient, std::unique_ptr<executor::TaskExecutor> executor) + -> std::unique_ptr<ShardingCatalogManager> { + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { + return stdx::make_unique<ShardingCatalogManagerImpl>(catalogClient, + std::move(executor)); + } else { + return nullptr; // Only config servers get a real ShardingCatalogManager + } + }); } } // namespace mongo diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_state_test.cpp index 83b885163cb..76c6772994c 100644 --- a/src/mongo/db/s/sharding_state_test.cpp +++ b/src/mongo/db/s/sharding_state_test.cpp @@ -47,6 +47,7 @@ #include 
"mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/sharding_catalog_client_mock.h" +#include "mongo/s/catalog/sharding_catalog_manager_mock.h" #include "mongo/s/client/shard_factory.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/client/shard_remote.h" @@ -111,6 +112,7 @@ void initGrid(OperationContext* txn, const ConnectionString& configConnString) { grid.init( stdx::make_unique<ShardingCatalogClientMock>(), + stdx::make_unique<ShardingCatalogManagerMock>(), stdx::make_unique<CatalogCache>(), std::move(shardRegistry), stdx::make_unique<ClusterCursorManager>(txn->getServiceContext()->getPreciseClockSource()), diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 4801a9000f1..e89179b2ef3 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -84,6 +84,7 @@ env.Library( '$BUILD_DIR/mongo/rpc/metadata', '$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock', '$BUILD_DIR/mongo/s/catalog/replset/sharding_catalog_client_impl', + '$BUILD_DIR/mongo/s/catalog/replset/sharding_catalog_manager_impl', '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/util/clock_source_mock', '$BUILD_DIR/mongo/util/net/message_port_mock', diff --git a/src/mongo/s/catalog/SConscript b/src/mongo/s/catalog/SConscript index d9470241010..5363683be08 100644 --- a/src/mongo/s/catalog/SConscript +++ b/src/mongo/s/catalog/SConscript @@ -9,9 +9,10 @@ env.SConscript( ) env.Library( - target='sharding_catalog_client_mock', + target='sharding_catalog_mock', source=[ 'sharding_catalog_client_mock.cpp', + 'sharding_catalog_manager_mock.cpp', ], LIBDEPS=[ 'dist_lock_manager_mock', diff --git a/src/mongo/s/catalog/replset/SConscript b/src/mongo/s/catalog/replset/SConscript index 4f9272fcbee..e8303bc5f14 100644 --- a/src/mongo/s/catalog/replset/SConscript +++ b/src/mongo/s/catalog/replset/SConscript @@ -51,7 +51,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/executor/network_test_env', 
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', '$BUILD_DIR/mongo/s/catalog/dist_lock_catalog_mock', - '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock', + '$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock', '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/s/mongoscore', '$BUILD_DIR/mongo/s/sharding_test_fixture', @@ -78,6 +78,22 @@ env.Library( ) env.Library( + target='sharding_catalog_manager_impl', + source=[ + 'sharding_catalog_manager_impl.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/repl/read_concern_args', + '$BUILD_DIR/mongo/executor/network_interface', + '$BUILD_DIR/mongo/s/client/sharding_client', + ], + LIBDEPS_TAGS=[ + # Depends on coreshard, but that would be circular + 'incomplete', + ], +) + +env.Library( target='sharding_catalog_test_fixture', source=[ 'sharding_catalog_test_fixture.cpp', diff --git a/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp b/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp index 3b8331495e5..ff05b83d175 100644 --- a/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp +++ b/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp @@ -48,6 +48,7 @@ #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/replset/dist_lock_catalog_impl.h" #include "mongo/s/catalog/sharding_catalog_client_mock.h" +#include "mongo/s/catalog/sharding_catalog_manager_mock.h" #include "mongo/s/catalog/type_lockpings.h" #include "mongo/s/catalog/type_locks.h" #include "mongo/s/client/shard_factory.h" @@ -161,6 +162,7 @@ private: _distLockCatalog = stdx::make_unique<DistLockCatalogImpl>(shardRegistry.get()); grid.init(stdx::make_unique<ShardingCatalogClientMock>(), + stdx::make_unique<ShardingCatalogManagerMock>(), stdx::make_unique<CatalogCache>(), std::move(shardRegistry), std::unique_ptr<ClusterCursorManager>{nullptr}, @@ -180,8 +182,6 @@ private: std::unique_ptr<executor::NetworkTestEnv> _networkTestEnv; - ShardingCatalogClientMock _catalogMgr; - 
std::unique_ptr<DistLockCatalogImpl> _distLockCatalog; OperationContextNoop _txn; }; diff --git a/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp b/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp index dac97dc4369..c353ee05e82 100644 --- a/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp +++ b/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp @@ -52,6 +52,7 @@ #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/dist_lock_catalog_mock.h" #include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/catalog/sharding_catalog_manager.h" #include "mongo/s/catalog/type_lockpings.h" #include "mongo/s/catalog/type_locks.h" #include "mongo/s/client/shard_factory.h" @@ -166,6 +167,7 @@ protected: auto shardRegistry = stdx::make_unique<ShardRegistry>(std::move(shardFactory), configCS); grid.init(nullptr, nullptr, + nullptr, std::move(shardRegistry), nullptr, stdx::make_unique<BalancerConfiguration>(), diff --git a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp index d1159c64dcf..62a79a0e37b 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp @@ -40,8 +40,8 @@ #include "mongo/db/s/type_shard_identity.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" -#include "mongo/s/catalog/replset/sharding_catalog_client_impl.h" #include "mongo/s/catalog/replset/sharding_catalog_test_fixture.h" +#include "mongo/s/catalog/sharding_catalog_manager.h" #include "mongo/s/catalog/type_changelog.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/catalog/type_shard.h" @@ -300,10 +300,10 @@ TEST_F(AddShardTest, Standalone) { auto future = launchAsync([this, expectedShardName] { auto shardName = assertGet( - catalogClient()->addShard(operationContext(), - 
&expectedShardName, - assertGet(ConnectionString::parse("StandaloneHost:12345")), - 100)); + catalogManager()->addShard(operationContext(), + &expectedShardName, + assertGet(ConnectionString::parse("StandaloneHost:12345")), + 100)); ASSERT_EQUALS(expectedShardName, shardName); }); @@ -381,10 +381,10 @@ TEST_F(AddShardTest, StandaloneGenerateName) { auto future = launchAsync([this, expectedShardName, shardTarget] { auto shardName = assertGet( - catalogClient()->addShard(operationContext(), - nullptr, - assertGet(ConnectionString::parse(shardTarget.toString())), - 100)); + catalogManager()->addShard(operationContext(), + nullptr, + assertGet(ConnectionString::parse(shardTarget.toString())), + 100)); ASSERT_EQUALS(expectedShardName, shardName); }); @@ -478,7 +478,7 @@ TEST_F(AddShardTest, AddSCCCConnectionStringAsShard) { auto future = launchAsync([this, invalidConn] { const std::string shardName("StandaloneShard"); - auto status = catalogClient()->addShard(operationContext(), &shardName, invalidConn, 100); + auto status = catalogManager()->addShard(operationContext(), &shardName, invalidConn, 100); ASSERT_EQUALS(ErrorCodes::BadValue, status); ASSERT_STRING_CONTAINS(status.getStatus().reason(), "Invalid connection string"); }); @@ -493,10 +493,10 @@ TEST_F(AddShardTest, EmptyShardName) { auto future = launchAsync([this, expectedShardName] { auto status = - catalogClient()->addShard(operationContext(), - &expectedShardName, - assertGet(ConnectionString::parse("StandaloneHost:12345")), - 100); + catalogManager()->addShard(operationContext(), + &expectedShardName, + assertGet(ConnectionString::parse("StandaloneHost:12345")), + 100); ASSERT_EQUALS(ErrorCodes::BadValue, status); ASSERT_EQUALS("shard name cannot be empty", status.getStatus().reason()); }); @@ -517,10 +517,10 @@ TEST_F(AddShardTest, UnreachableHost) { auto future = launchAsync([this, expectedShardName] { auto status = - catalogClient()->addShard(operationContext(), - &expectedShardName, - 
assertGet(ConnectionString::parse("StandaloneHost:12345")), - 100); + catalogManager()->addShard(operationContext(), + &expectedShardName, + assertGet(ConnectionString::parse("StandaloneHost:12345")), + 100); ASSERT_EQUALS(ErrorCodes::HostUnreachable, status); ASSERT_EQUALS("host unreachable", status.getStatus().reason()); }); @@ -544,10 +544,10 @@ TEST_F(AddShardTest, AddMongosAsShard) { auto future = launchAsync([this, expectedShardName] { auto status = - catalogClient()->addShard(operationContext(), - &expectedShardName, - assertGet(ConnectionString::parse("StandaloneHost:12345")), - 100); + catalogManager()->addShard(operationContext(), + &expectedShardName, + assertGet(ConnectionString::parse("StandaloneHost:12345")), + 100); ASSERT_EQUALS(ErrorCodes::RPCProtocolNegotiationFailed, status); }); @@ -571,10 +571,10 @@ TEST_F(AddShardTest, AddExistingShardStandalone) { auto future = launchAsync([this, expectedShardName, shardTarget] { auto status = - catalogClient()->addShard(operationContext(), - &expectedShardName, - assertGet(ConnectionString::parse(shardTarget.toString())), - 100); + catalogManager()->addShard(operationContext(), + &expectedShardName, + assertGet(ConnectionString::parse(shardTarget.toString())), + 100); ASSERT_EQUALS(ErrorCodes::OperationFailed, status); ASSERT_STRING_CONTAINS(status.getStatus().reason(), "is already a member of the existing shard"); @@ -603,7 +603,7 @@ TEST_F(AddShardTest, AddExistingShardReplicaSet) { auto future = launchAsync([this, expectedShardName, connString] { auto status = - catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100); + catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100); ASSERT_EQUALS(ErrorCodes::OperationFailed, status); ASSERT_STRING_CONTAINS(status.getStatus().reason(), "is already a member of the existing shard"); @@ -630,10 +630,10 @@ TEST_F(AddShardTest, AddReplicaSetShardAsStandalone) { auto future = launchAsync([this, expectedShardName, 
shardTarget] { auto status = - catalogClient()->addShard(operationContext(), - &expectedShardName, - assertGet(ConnectionString::parse(shardTarget.toString())), - 100); + catalogManager()->addShard(operationContext(), + &expectedShardName, + assertGet(ConnectionString::parse(shardTarget.toString())), + 100); ASSERT_EQUALS(ErrorCodes::OperationFailed, status); ASSERT_STRING_CONTAINS(status.getStatus().reason(), "use replica set url format"); }); @@ -660,7 +660,7 @@ TEST_F(AddShardTest, AddStandaloneHostShardAsReplicaSet) { auto future = launchAsync([this, expectedShardName, connString] { auto status = - catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100); + catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100); ASSERT_EQUALS(ErrorCodes::OperationFailed, status); ASSERT_STRING_CONTAINS(status.getStatus().reason(), "host did not return a set name"); }); @@ -686,7 +686,7 @@ TEST_F(AddShardTest, ReplicaSetMistmatchedReplicaSetName) { auto future = launchAsync([this, expectedShardName, connString] { auto status = - catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100); + catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100); ASSERT_EQUALS(ErrorCodes::OperationFailed, status); ASSERT_STRING_CONTAINS(status.getStatus().reason(), "does not match the actual set name"); }); @@ -713,7 +713,7 @@ TEST_F(AddShardTest, ShardIsCSRSConfigServer) { auto future = launchAsync([this, expectedShardName, connString] { auto status = - catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100); + catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100); ASSERT_EQUALS(ErrorCodes::OperationFailed, status); ASSERT_STRING_CONTAINS(status.getStatus().reason(), "as a shard since it is a config server"); @@ -743,7 +743,7 @@ TEST_F(AddShardTest, ReplicaSetMissingHostsProvidedInSeedList) { auto future = launchAsync([this, 
expectedShardName, connString] { auto status = - catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100); + catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100); ASSERT_EQUALS(ErrorCodes::OperationFailed, status); ASSERT_STRING_CONTAINS(status.getStatus().reason(), "host2:12345 does not belong to replica set"); @@ -775,7 +775,7 @@ TEST_F(AddShardTest, ShardNameIsConfig) { auto future = launchAsync([this, expectedShardName, connString] { auto status = - catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100); + catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100); ASSERT_EQUALS(ErrorCodes::BadValue, status); ASSERT_EQUALS(status.getStatus().reason(), "use of shard replica set with name 'config' is not allowed"); @@ -807,7 +807,7 @@ TEST_F(AddShardTest, ShardContainsExistingDatabase) { auto future = launchAsync([this, expectedShardName, connString] { auto status = - catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100); + catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100); ASSERT_EQUALS(ErrorCodes::OperationFailed, status); ASSERT_STRING_CONTAINS( status.getStatus().reason(), @@ -851,7 +851,7 @@ TEST_F(AddShardTest, ReAddExistingShard) { auto future = launchAsync([this, expectedShardName, connString] { auto status = - catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100); + catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100); ASSERT_OK(status); }); @@ -918,7 +918,7 @@ TEST_F(AddShardTest, SuccessfullyAddReplicaSet) { auto future = launchAsync([this, expectedShardName, connString] { auto status = - catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100); + catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100); ASSERT_OK(status); auto shardName = status.getValue(); 
ASSERT_EQUALS(expectedShardName, shardName); @@ -979,7 +979,7 @@ TEST_F(AddShardTest, AddShardSucceedsEvenIfAddingDBsFromNewShardFails) { auto future = launchAsync([this, expectedShardName, connString] { auto status = - catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100); + catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100); ASSERT_OK(status); auto shardName = status.getValue(); ASSERT_EQUALS(expectedShardName, shardName); @@ -1064,7 +1064,7 @@ TEST_F(AddShardTest, ReplicaSetExtraHostsDiscovered) { auto future = launchAsync([this, expectedShardName, seedString] { auto status = - catalogClient()->addShard(operationContext(), &expectedShardName, seedString, 100); + catalogManager()->addShard(operationContext(), &expectedShardName, seedString, 100); ASSERT_OK(status); auto shardName = status.getValue(); ASSERT_EQUALS(expectedShardName, shardName); diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp index f78d6a7ba62..71c060df46f 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp @@ -162,228 +162,9 @@ Status _processBatchWriteResponse(StatusWith<Shard::CommandResponse> response, } // namespace -StatusWith<ShardType> ShardingCatalogClientImpl::_validateHostAsShard( - OperationContext* txn, - ShardRegistry* shardRegistry, - const ConnectionString& connectionString, - const std::string* shardProposedName) { - if (connectionString.type() == ConnectionString::INVALID) { - return {ErrorCodes::BadValue, "Invalid connection string"}; - } - - if (shardProposedName && shardProposedName->empty()) { - return {ErrorCodes::BadValue, "shard name cannot be empty"}; - } - - // TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter instead. 
- const std::shared_ptr<Shard> shardConn{shardRegistry->createConnection(connectionString)}; - invariant(shardConn); - auto targeter = shardConn->getTargeter(); - - // Check whether any host in the connection is already part of the cluster. - shardRegistry->reload(txn); - for (const auto& hostAndPort : connectionString.getServers()) { - std::shared_ptr<Shard> shard; - shard = shardRegistry->getShardNoReload(hostAndPort.toString()); - if (shard) { - return {ErrorCodes::OperationFailed, - str::stream() << "'" << hostAndPort.toString() << "' " - << "is already a member of the existing shard '" - << shard->getConnString().toString() - << "' (" - << shard->getId() - << ")."}; - } - } - - // Check for mongos and older version mongod connections, and whether the hosts - // can be found for the user specified replset. - auto swCommandResponse = - _runCommandForAddShard(txn, targeter.get(), "admin", BSON("isMaster" << 1)); - if (!swCommandResponse.isOK()) { - if (swCommandResponse.getStatus() == ErrorCodes::RPCProtocolNegotiationFailed) { - // Mongos to mongos commands are no longer supported in the wire protocol - // (because mongos does not support OP_COMMAND), similarly for a new mongos - // and an old mongod. So the call will fail in such cases. - // TODO: If/When mongos ever supports opCommands, this logic will break because - // cmdStatus will be OK. - return {ErrorCodes::RPCProtocolNegotiationFailed, - str::stream() << shardConn->toString() - << " does not recognize the RPC protocol being used. 
This is" - << " likely because it contains a node that is a mongos or an old" - << " version of mongod."}; - } else { - return swCommandResponse.getStatus(); - } - } - - // Check for a command response error - auto resIsMasterStatus = std::move(swCommandResponse.getValue().commandStatus); - if (!resIsMasterStatus.isOK()) { - return {resIsMasterStatus.code(), - str::stream() << "Error running isMaster against " << shardConn->toString() << ": " - << causedBy(resIsMasterStatus)}; - } - - auto resIsMaster = std::move(swCommandResponse.getValue().response); - - // Check whether there is a master. If there isn't, the replica set may not have been - // initiated. If the connection is a standalone, it will return true for isMaster. - bool isMaster; - Status status = bsonExtractBooleanField(resIsMaster, "ismaster", &isMaster); - if (!status.isOK()) { - return Status(status.code(), - str::stream() << "isMaster returned invalid 'ismaster' " - << "field when attempting to add " - << connectionString.toString() - << " as a shard: " - << status.reason()); - } - if (!isMaster) { - return {ErrorCodes::NotMaster, - str::stream() - << connectionString.toString() - << " does not have a master. If this is a replica set, ensure that it has a" - << " healthy primary and that the set has been properly initiated."}; - } - - const string providedSetName = connectionString.getSetName(); - const string foundSetName = resIsMaster["setName"].str(); - - // Make sure the specified replica set name (if any) matches the actual shard's replica set - if (providedSetName.empty() && !foundSetName.empty()) { - return {ErrorCodes::OperationFailed, - str::stream() << "host is part of set " << foundSetName << "; " - << "use replica set url format " - << "<setname>/<server1>,<server2>, ..."}; - } - - if (!providedSetName.empty() && foundSetName.empty()) { - return {ErrorCodes::OperationFailed, - str::stream() << "host did not return a set name; " - << "is the replica set still initializing? 
" - << resIsMaster}; - } - - // Make sure the set name specified in the connection string matches the one where its hosts - // belong into - if (!providedSetName.empty() && (providedSetName != foundSetName)) { - return {ErrorCodes::OperationFailed, - str::stream() << "the provided connection string (" << connectionString.toString() - << ") does not match the actual set name " - << foundSetName}; - } - - // Is it a config server? - if (resIsMaster.hasField("configsvr")) { - return {ErrorCodes::OperationFailed, - str::stream() << "Cannot add " << connectionString.toString() - << " as a shard since it is a config server"}; - } - - // If the shard is part of a replica set, make sure all the hosts mentioned in the connection - // string are part of the set. It is fine if not all members of the set are mentioned in the - // connection string, though. - if (!providedSetName.empty()) { - std::set<string> hostSet; - - BSONObjIterator iter(resIsMaster["hosts"].Obj()); - while (iter.more()) { - hostSet.insert(iter.next().String()); // host:port - } - - if (resIsMaster["passives"].isABSONObj()) { - BSONObjIterator piter(resIsMaster["passives"].Obj()); - while (piter.more()) { - hostSet.insert(piter.next().String()); // host:port - } - } - - if (resIsMaster["arbiters"].isABSONObj()) { - BSONObjIterator piter(resIsMaster["arbiters"].Obj()); - while (piter.more()) { - hostSet.insert(piter.next().String()); // host:port - } - } - - vector<HostAndPort> hosts = connectionString.getServers(); - for (size_t i = 0; i < hosts.size(); i++) { - const string host = hosts[i].toString(); // host:port - if (hostSet.find(host) == hostSet.end()) { - return {ErrorCodes::OperationFailed, - str::stream() << "in seed list " << connectionString.toString() << ", host " - << host - << " does not belong to replica set " - << foundSetName - << "; found " - << resIsMaster.toString()}; - } - } - } - - string actualShardName; - - if (shardProposedName) { - actualShardName = *shardProposedName; - } else if 
(!foundSetName.empty()) { - // Default it to the name of the replica set - actualShardName = foundSetName; - } - - // Disallow adding shard replica set with name 'config' - if (actualShardName == "config") { - return {ErrorCodes::BadValue, "use of shard replica set with name 'config' is not allowed"}; - } - - // Retrieve the most up to date connection string that we know from the replica set monitor (if - // this is a replica set shard, otherwise it will be the same value as connectionString). - ConnectionString actualShardConnStr = shardConn->getTargeter()->connectionString(); - - ShardType shard; - shard.setName(actualShardName); - shard.setHost(actualShardConnStr.toString()); - - return shard; -} - -StatusWith<std::vector<std::string>> ShardingCatalogClientImpl::_getDBNamesListFromShard( - OperationContext* txn, ShardRegistry* shardRegistry, const ConnectionString& connectionString) { - // TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter instead. - const std::shared_ptr<Shard> shardConn{ - shardRegistry->createConnection(connectionString).release()}; - invariant(shardConn); - - auto swCommandResponse = _runCommandForAddShard( - txn, shardConn->getTargeter().get(), "admin", BSON("listDatabases" << 1)); - if (!swCommandResponse.isOK()) { - return swCommandResponse.getStatus(); - } - - auto cmdStatus = std::move(swCommandResponse.getValue().commandStatus); - if (!cmdStatus.isOK()) { - return cmdStatus; - } - - auto cmdResult = std::move(swCommandResponse.getValue().response); - - vector<string> dbNames; - - for (const auto& dbEntry : cmdResult["databases"].Obj()) { - const string& dbName = dbEntry["name"].String(); - - if (!(dbName == "local" || dbName == "admin")) { - dbNames.push_back(dbName); - } - } - - return dbNames; -} - ShardingCatalogClientImpl::ShardingCatalogClientImpl( - std::unique_ptr<DistLockManager> distLockManager, - std::unique_ptr<executor::TaskExecutor> addShardExecutor) - : 
_distLockManager(std::move(distLockManager)), - _executorForAddShard(std::move(addShardExecutor)) {} + std::unique_ptr<DistLockManager> distLockManager) + : _distLockManager(std::move(distLockManager)) {} ShardingCatalogClientImpl::~ShardingCatalogClientImpl() = default; @@ -394,7 +175,6 @@ Status ShardingCatalogClientImpl::startup() { } _started = true; _distLockManager->startUp(); - _executorForAddShard->startup(); return Status::OK(); } @@ -407,199 +187,6 @@ void ShardingCatalogClientImpl::shutDown(OperationContext* txn) { invariant(_distLockManager); _distLockManager->shutDown(txn); - _executorForAddShard->shutdown(); - _executorForAddShard->join(); -} - -StatusWith<Shard::CommandResponse> ShardingCatalogClientImpl::_runCommandForAddShard( - OperationContext* txn, - RemoteCommandTargeter* targeter, - const std::string& dbName, - const BSONObj& cmdObj) { - auto host = targeter->findHost(ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - RemoteCommandTargeter::selectFindHostMaxWaitTime(txn)); - if (!host.isOK()) { - return host.getStatus(); - } - - executor::RemoteCommandRequest request( - host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), Seconds(30)); - StatusWith<executor::RemoteCommandResponse> swResponse = - Status(ErrorCodes::InternalError, "Internal error running command"); - - auto callStatus = _executorForAddShard->scheduleRemoteCommand( - request, [&swResponse](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { - swResponse = args.response; - }); - if (!callStatus.isOK()) { - return callStatus.getStatus(); - } - - // Block until the command is carried out - _executorForAddShard->wait(callStatus.getValue()); - - if (!swResponse.isOK()) { - if (swResponse.getStatus().compareCode(ErrorCodes::ExceededTimeLimit)) { - LOG(0) << "Operation for addShard timed out with status " << swResponse.getStatus(); - } - return swResponse.getStatus(); - } - - BSONObj responseObj = swResponse.getValue().data.getOwned(); - BSONObj responseMetadata = 
swResponse.getValue().metadata.getOwned(); - Status commandStatus = getStatusFromCommandResult(responseObj); - Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj); - - return Shard::CommandResponse(std::move(responseObj), - std::move(responseMetadata), - std::move(commandStatus), - std::move(writeConcernStatus)); -} - -StatusWith<string> ShardingCatalogClientImpl::addShard( - OperationContext* txn, - const std::string* shardProposedName, - const ConnectionString& shardConnectionString, - const long long maxSize) { - // Validate the specified connection string may serve as shard at all - auto shardStatus = - _validateHostAsShard(txn, grid.shardRegistry(), shardConnectionString, shardProposedName); - if (!shardStatus.isOK()) { - // TODO: This is a workaround for the case were we could have some bad shard being - // requested to be added and we put that bad connection string on the global replica set - // monitor registry. It needs to be cleaned up so that when a correct replica set is added, - // it will be recreated. 
- ReplicaSetMonitor::remove(shardConnectionString.getSetName()); - return shardStatus.getStatus(); - } - - ShardType& shardType = shardStatus.getValue(); - - auto dbNamesStatus = _getDBNamesListFromShard(txn, grid.shardRegistry(), shardConnectionString); - if (!dbNamesStatus.isOK()) { - return dbNamesStatus.getStatus(); - } - - // Check that none of the existing shard candidate's dbs exist already - for (const string& dbName : dbNamesStatus.getValue()) { - auto dbt = getDatabase(txn, dbName); - if (dbt.isOK()) { - const auto& dbDoc = dbt.getValue().value; - return Status(ErrorCodes::OperationFailed, - str::stream() << "can't add shard " - << "'" - << shardConnectionString.toString() - << "'" - << " because a local database '" - << dbName - << "' exists in another " - << dbDoc.getPrimary()); - } else if (dbt != ErrorCodes::NamespaceNotFound) { - return dbt.getStatus(); - } - } - - // If a name for a shard wasn't provided, generate one - if (shardType.getName().empty()) { - StatusWith<string> result = _generateNewShardName(txn); - if (!result.isOK()) { - return result.getStatus(); - } - shardType.setName(result.getValue()); - } - - if (maxSize > 0) { - shardType.setMaxSizeMB(maxSize); - } - - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString( - Grid::get(txn)->shardRegistry()->getConfigServerConnectionString()); - shardIdentity.setShardName(shardType.getName()); - shardIdentity.setClusterId(Grid::get(txn)->shardRegistry()->getClusterId()); - auto validateStatus = shardIdentity.validate(); - if (!validateStatus.isOK()) { - return validateStatus; - } - - log() << "going to insert shardIdentity document into shard: " << shardIdentity.toString(); - - auto updateRequest = shardIdentity.createUpsertForAddShard(); - BatchedCommandRequest commandRequest(updateRequest.release()); - commandRequest.setNS(NamespaceString::kConfigCollectionNamespace); - commandRequest.setWriteConcern(kMajorityWriteConcern.toBSON()); - - const std::shared_ptr<Shard> 
shardConn{ - Grid::get(txn)->shardRegistry()->createConnection(shardConnectionString)}; - invariant(shardConn); - auto targeter = shardConn->getTargeter(); - - auto swCommandResponse = - _runCommandForAddShard(txn, targeter.get(), "admin", commandRequest.toBSON()); - - if (!swCommandResponse.isOK()) { - return swCommandResponse.getStatus(); - } - - auto commandResponse = std::move(swCommandResponse.getValue()); - - BatchedCommandResponse batchResponse; - auto batchResponseStatus = _processBatchWriteResponse(commandResponse, &batchResponse); - if (!batchResponseStatus.isOK()) { - return batchResponseStatus; - } - - log() << "going to insert new entry for shard into config.shards: " << shardType.toString(); - - Status result = insertConfigDocument(txn, ShardType::ConfigNS, shardType.toBSON()); - if (!result.isOK()) { - log() << "error adding shard: " << shardType.toBSON() << " err: " << result.reason(); - if (result == ErrorCodes::DuplicateKey) { - // TODO(SERVER-24213): adding a shard that already exists should be considered success, - // however this approach does no validation that we are adding the shard with the same - // options. It also does not protect against adding the same shard with a different - // shard name and slightly different connection string. This is a temporary hack to - // get the continuous stepdown suite passing. - warning() << "Received duplicate key error when inserting new shard with name " - << shardType.getName() << " and connection string " - << shardConnectionString.toString() - << " to config.shards collection. 
This most likely means that there was an " - "attempt to add a shard that already exists in the cluster"; - return shardType.getName(); - } - return result; - } - - // Add all databases which were discovered on the new shard - for (const string& dbName : dbNamesStatus.getValue()) { - DatabaseType dbt; - dbt.setName(dbName); - dbt.setPrimary(shardType.getName()); - dbt.setSharded(false); - - Status status = updateDatabase(txn, dbName, dbt); - if (!status.isOK()) { - log() << "adding shard " << shardConnectionString.toString() - << " even though could not add database " << dbName; - } - } - - // Record in changelog - BSONObjBuilder shardDetails; - shardDetails.append("name", shardType.getName()); - shardDetails.append("host", shardConnectionString.toString()); - - logChange(txn, "addShard", "", shardDetails.obj()); - - // Ensure the added shard is visible to this process. - auto shardRegistry = Grid::get(txn)->shardRegistry(); - if (!shardRegistry->getShard(txn, shardType.getName())) { - return {ErrorCodes::OperationFailed, - "Could not find shard metadata for shard after adding it. 
This most likely " - "indicates that the shard was removed immediately after it was added."}; - } - - return shardType.getName(); } Status ShardingCatalogClientImpl::updateCollection(OperationContext* txn, @@ -1924,44 +1511,6 @@ Status ShardingCatalogClientImpl::_checkDbDoesNotExist(OperationContext* txn, << dbName); } -StatusWith<std::string> ShardingCatalogClientImpl::_generateNewShardName(OperationContext* txn) { - BSONObjBuilder shardNameRegex; - shardNameRegex.appendRegex(ShardType::name(), "^shard"); - - auto findStatus = _exhaustiveFindOnConfig(txn, - kConfigReadSelector, - NamespaceString(ShardType::ConfigNS), - shardNameRegex.obj(), - BSON(ShardType::name() << -1), - 1); - if (!findStatus.isOK()) { - return findStatus.getStatus(); - } - - const auto& docs = findStatus.getValue().value; - - int count = 0; - if (!docs.empty()) { - const auto shardStatus = ShardType::fromBSON(docs.front()); - if (!shardStatus.isOK()) { - return shardStatus.getStatus(); - } - - std::istringstream is(shardStatus.getValue().getName().substr(5)); - is >> count; - count++; - } - - // TODO fix so that we can have more than 10000 automatically generated shard names - if (count < 9999) { - std::stringstream ss; - ss << "shard" << std::setfill('0') << std::setw(4) << count; - return ss.str(); - } - - return Status(ErrorCodes::OperationFailed, "unable to generate new shard name"); -} - Status ShardingCatalogClientImpl::_createCappedConfigCollection(OperationContext* txn, StringData collName, int cappedSize) { @@ -2187,8 +1736,4 @@ Status ShardingCatalogClientImpl::appendInfoForConfigServerDatabases(OperationCo return Status::OK(); } -void ShardingCatalogClientImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) { - _executorForAddShard->appendConnectionStats(stats); -} - } // namespace mongo diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h index 9f8201f8d23..48df7545263 100644 --- 
a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h +++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h @@ -49,8 +49,7 @@ class TaskExecutor; */ class ShardingCatalogClientImpl final : public ShardingCatalogClient { public: - ShardingCatalogClientImpl(std::unique_ptr<DistLockManager> distLockManager, - std::unique_ptr<executor::TaskExecutor> addShardExecutor); + explicit ShardingCatalogClientImpl(std::unique_ptr<DistLockManager> distLockManager); virtual ~ShardingCatalogClientImpl(); /** @@ -63,11 +62,6 @@ public: Status enableSharding(OperationContext* txn, const std::string& dbName) override; - StatusWith<std::string> addShard(OperationContext* txn, - const std::string* shardProposedName, - const ConnectionString& shardConnectionString, - const long long maxSize) override; - Status updateDatabase(OperationContext* txn, const std::string& dbName, const DatabaseType& db) override; @@ -182,8 +176,6 @@ public: Status appendInfoForConfigServerDatabases(OperationContext* txn, BSONArrayBuilder* builder) override; - void appendConnectionStats(executor::ConnectionPoolStats* stats) override; - /** * Runs a read command against the config server with majority read concern. */ @@ -213,42 +205,6 @@ private: Status _checkDbDoesNotExist(OperationContext* txn, const std::string& dbName, DatabaseType* db); /** - * Generates a unique name to be given to a newly added shard. - */ - StatusWith<std::string> _generateNewShardName(OperationContext* txn); - - /** - * Validates that the specified connection string can serve as a shard server. In particular, - * this function checks that the shard can be contacted, that it is not already member of - * another sharded cluster and etc. - * - * @param shardRegistry Shard registry to use for getting a targeter to the shard-to-be. - * @param connectionString Connection string to be attempted as a shard host. - * @param shardProposedName Optional proposed name for the shard. 
Can be omitted in which case - * a unique name for the shard will be generated from the shard's connection string. If it - * is not omitted, the value cannot be the empty string. - * - * On success returns a partially initialized ShardType object corresponding to the requested - * shard. It will have the hostName field set and optionally the name, if the name could be - * generated from either the proposed name or the connection string set name. The returned - * shard's name should be checked and if empty, one should be generated using some uniform - * algorithm. - */ - StatusWith<ShardType> _validateHostAsShard(OperationContext* txn, - ShardRegistry* shardRegistry, - const ConnectionString& connectionString, - const std::string* shardProposedName); - - /** - * Runs the listDatabases command on the specified host and returns the names of all databases - * it returns excluding those named local and admin, since they serve administrative purpose. - */ - StatusWith<std::vector<std::string>> _getDBNamesListFromShard( - OperationContext* txn, - ShardRegistry* shardRegistry, - const ConnectionString& connectionString); - - /** * Creates the specified collection name in the config database. */ Status _createCappedConfigCollection(OperationContext* txn, @@ -272,15 +228,6 @@ private: const NamespaceString& ns, BSONObj query); - /** - * Runs a command against a "shard" that is not yet in the cluster and thus not present in the - * ShardRegistry. - */ - StatusWith<Shard::CommandResponse> _runCommandForAddShard(OperationContext* txn, - RemoteCommandTargeter* targeter, - const std::string& dbName, - const BSONObj& cmdObj); - StatusWith<repl::OpTimeWith<std::vector<BSONObj>>> _exhaustiveFindOnConfig( OperationContext* txn, const ReadPreferenceSetting& readPref, @@ -337,11 +284,6 @@ private: // Distributed lock manager singleton. 
std::unique_ptr<DistLockManager> _distLockManager; // (R) - // Executor specifically used for sending commands to servers that are in the process of being - // added as shards. Does not have any connection hook set on it, thus it can be used to talk - // to servers that are not yet in the ShardRegistry. - std::unique_ptr<executor::TaskExecutor> _executorForAddShard; // (R) - // True if shutDown() has been called. False, otherwise. bool _inShutdown = false; // (M) diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp new file mode 100644 index 00000000000..239d5e0c28d --- /dev/null +++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp @@ -0,0 +1,602 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/s/catalog/replset/sharding_catalog_manager_impl.h" + +#include <iomanip> + +#include "mongo/base/status.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/client/connection_string.h" +#include "mongo/client/read_preference.h" +#include "mongo/client/remote_command_targeter.h" +#include "mongo/client/replica_set_monitor.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/s/type_shard_identity.h" +#include "mongo/executor/network_interface.h" +#include "mongo/executor/task_executor.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/catalog/type_database.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/client/shard.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" +#include "mongo/s/set_shard_version_request.h" +#include "mongo/s/write_ops/batched_command_request.h" +#include "mongo/s/write_ops/batched_command_response.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" +#include "mongo/util/net/hostandport.h" + +namespace mongo { + +using std::string; +using std::vector; +using str::stream; + +namespace { + +const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{}); + +const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, + // Note: Even though we're 
setting UNSET here, + // kMajority implies JOURNAL if journaling is + // supported by mongod and + // writeConcernMajorityJournalDefault is set to true + // in the ReplicaSetConfig. + WriteConcernOptions::SyncMode::UNSET, + Seconds(15)); + +void toBatchError(const Status& status, BatchedCommandResponse* response) { + response->clear(); + response->setErrCode(status.code()); + response->setErrMessage(status.reason()); + response->setOk(false); +} + +/** + * Takes the response from running a batch write command and writes the appropriate response into + * *batchResponse, while also returning the Status of the operation. + */ +Status _processBatchWriteResponse(StatusWith<Shard::CommandResponse> response, + BatchedCommandResponse* batchResponse) { + Status status(ErrorCodes::InternalError, "status not set"); + + if (!response.isOK()) { + status = response.getStatus(); + } else if (!response.getValue().commandStatus.isOK()) { + status = response.getValue().commandStatus; + } else if (!response.getValue().writeConcernStatus.isOK()) { + status = response.getValue().writeConcernStatus; + } else { + string errmsg; + if (!batchResponse->parseBSON(response.getValue().response, &errmsg)) { + status = Status(ErrorCodes::FailedToParse, + str::stream() << "Failed to parse config server response: " << errmsg); + } else { + status = batchResponse->toStatus(); + } + } + + if (!status.isOK()) { + toBatchError(status, batchResponse); + } + + return status; +} +} // namespace + + +ShardingCatalogManagerImpl::ShardingCatalogManagerImpl( + ShardingCatalogClient* catalogClient, std::unique_ptr<executor::TaskExecutor> addShardExecutor) + : _catalogClient(catalogClient), _executorForAddShard(std::move(addShardExecutor)) {} + +ShardingCatalogManagerImpl::~ShardingCatalogManagerImpl() = default; + +Status ShardingCatalogManagerImpl::startup() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_started) { + return Status::OK(); + } + _started = true; + _executorForAddShard->startup(); + 
return Status::OK(); +} + +void ShardingCatalogManagerImpl::shutDown(OperationContext* txn) { + LOG(1) << "ShardingCatalogManagerImpl::shutDown() called."; + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _inShutdown = true; + } + + _executorForAddShard->shutdown(); + _executorForAddShard->join(); +} + +StatusWith<Shard::CommandResponse> ShardingCatalogManagerImpl::_runCommandForAddShard( + OperationContext* txn, + RemoteCommandTargeter* targeter, + const std::string& dbName, + const BSONObj& cmdObj) { + auto host = targeter->findHost(ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + RemoteCommandTargeter::selectFindHostMaxWaitTime(txn)); + if (!host.isOK()) { + return host.getStatus(); + } + + executor::RemoteCommandRequest request( + host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), Seconds(30)); + StatusWith<executor::RemoteCommandResponse> swResponse = + Status(ErrorCodes::InternalError, "Internal error running command"); + + auto callStatus = _executorForAddShard->scheduleRemoteCommand( + request, [&swResponse](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { + swResponse = args.response; + }); + if (!callStatus.isOK()) { + return callStatus.getStatus(); + } + + // Block until the command is carried out + _executorForAddShard->wait(callStatus.getValue()); + + if (!swResponse.isOK()) { + if (swResponse.getStatus().compareCode(ErrorCodes::ExceededTimeLimit)) { + LOG(0) << "Operation for addShard timed out with status " << swResponse.getStatus(); + } + return swResponse.getStatus(); + } + + BSONObj responseObj = swResponse.getValue().data.getOwned(); + BSONObj responseMetadata = swResponse.getValue().metadata.getOwned(); + Status commandStatus = getStatusFromCommandResult(responseObj); + Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj); + + return Shard::CommandResponse(std::move(responseObj), + std::move(responseMetadata), + std::move(commandStatus), + std::move(writeConcernStatus)); +} + 
+StatusWith<ShardType> ShardingCatalogManagerImpl::_validateHostAsShard( + OperationContext* txn, + ShardRegistry* shardRegistry, + const ConnectionString& connectionString, + const std::string* shardProposedName) { + if (connectionString.type() == ConnectionString::INVALID) { + return {ErrorCodes::BadValue, "Invalid connection string"}; + } + + if (shardProposedName && shardProposedName->empty()) { + return {ErrorCodes::BadValue, "shard name cannot be empty"}; + } + + // TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter instead. + const std::shared_ptr<Shard> shardConn{shardRegistry->createConnection(connectionString)}; + invariant(shardConn); + auto targeter = shardConn->getTargeter(); + + // Check whether any host in the connection is already part of the cluster. + shardRegistry->reload(txn); + for (const auto& hostAndPort : connectionString.getServers()) { + std::shared_ptr<Shard> shard; + shard = shardRegistry->getShardNoReload(hostAndPort.toString()); + if (shard) { + return {ErrorCodes::OperationFailed, + str::stream() << "'" << hostAndPort.toString() << "' " + << "is already a member of the existing shard '" + << shard->getConnString().toString() + << "' (" + << shard->getId() + << ")."}; + } + } + + // Check for mongos and older version mongod connections, and whether the hosts + // can be found for the user specified replset. + auto swCommandResponse = + _runCommandForAddShard(txn, targeter.get(), "admin", BSON("isMaster" << 1)); + if (!swCommandResponse.isOK()) { + if (swCommandResponse.getStatus() == ErrorCodes::RPCProtocolNegotiationFailed) { + // Mongos to mongos commands are no longer supported in the wire protocol + // (because mongos does not support OP_COMMAND), similarly for a new mongos + // and an old mongod. So the call will fail in such cases. + // TODO: If/When mongos ever supports opCommands, this logic will break because + // cmdStatus will be OK. 
+ return {ErrorCodes::RPCProtocolNegotiationFailed, + str::stream() << shardConn->toString() + << " does not recognize the RPC protocol being used. This is" + << " likely because it contains a node that is a mongos or an old" + << " version of mongod."}; + } else { + return swCommandResponse.getStatus(); + } + } + + // Check for a command response error + auto resIsMasterStatus = std::move(swCommandResponse.getValue().commandStatus); + if (!resIsMasterStatus.isOK()) { + return {resIsMasterStatus.code(), + str::stream() << "Error running isMaster against " << shardConn->toString() << ": " + << causedBy(resIsMasterStatus)}; + } + + auto resIsMaster = std::move(swCommandResponse.getValue().response); + + // Check whether there is a master. If there isn't, the replica set may not have been + // initiated. If the connection is a standalone, it will return true for isMaster. + bool isMaster; + Status status = bsonExtractBooleanField(resIsMaster, "ismaster", &isMaster); + if (!status.isOK()) { + return Status(status.code(), + str::stream() << "isMaster returned invalid 'ismaster' " + << "field when attempting to add " + << connectionString.toString() + << " as a shard: " + << status.reason()); + } + if (!isMaster) { + return {ErrorCodes::NotMaster, + str::stream() + << connectionString.toString() + << " does not have a master. 
If this is a replica set, ensure that it has a" + << " healthy primary and that the set has been properly initiated."}; + } + + const string providedSetName = connectionString.getSetName(); + const string foundSetName = resIsMaster["setName"].str(); + + // Make sure the specified replica set name (if any) matches the actual shard's replica set + if (providedSetName.empty() && !foundSetName.empty()) { + return {ErrorCodes::OperationFailed, + str::stream() << "host is part of set " << foundSetName << "; " + << "use replica set url format " + << "<setname>/<server1>,<server2>, ..."}; + } + + if (!providedSetName.empty() && foundSetName.empty()) { + return {ErrorCodes::OperationFailed, + str::stream() << "host did not return a set name; " + << "is the replica set still initializing? " + << resIsMaster}; + } + + // Make sure the set name specified in the connection string matches the one where its hosts + // belong into + if (!providedSetName.empty() && (providedSetName != foundSetName)) { + return {ErrorCodes::OperationFailed, + str::stream() << "the provided connection string (" << connectionString.toString() + << ") does not match the actual set name " + << foundSetName}; + } + + // Is it a config server? + if (resIsMaster.hasField("configsvr")) { + return {ErrorCodes::OperationFailed, + str::stream() << "Cannot add " << connectionString.toString() + << " as a shard since it is a config server"}; + } + + // If the shard is part of a replica set, make sure all the hosts mentioned in the connection + // string are part of the set. It is fine if not all members of the set are mentioned in the + // connection string, though. 
+ if (!providedSetName.empty()) { + std::set<string> hostSet; + + BSONObjIterator iter(resIsMaster["hosts"].Obj()); + while (iter.more()) { + hostSet.insert(iter.next().String()); // host:port + } + + if (resIsMaster["passives"].isABSONObj()) { + BSONObjIterator piter(resIsMaster["passives"].Obj()); + while (piter.more()) { + hostSet.insert(piter.next().String()); // host:port + } + } + + if (resIsMaster["arbiters"].isABSONObj()) { + BSONObjIterator piter(resIsMaster["arbiters"].Obj()); + while (piter.more()) { + hostSet.insert(piter.next().String()); // host:port + } + } + + vector<HostAndPort> hosts = connectionString.getServers(); + for (size_t i = 0; i < hosts.size(); i++) { + const string host = hosts[i].toString(); // host:port + if (hostSet.find(host) == hostSet.end()) { + return {ErrorCodes::OperationFailed, + str::stream() << "in seed list " << connectionString.toString() << ", host " + << host + << " does not belong to replica set " + << foundSetName + << "; found " + << resIsMaster.toString()}; + } + } + } + + string actualShardName; + + if (shardProposedName) { + actualShardName = *shardProposedName; + } else if (!foundSetName.empty()) { + // Default it to the name of the replica set + actualShardName = foundSetName; + } + + // Disallow adding shard replica set with name 'config' + if (actualShardName == "config") { + return {ErrorCodes::BadValue, "use of shard replica set with name 'config' is not allowed"}; + } + + // Retrieve the most up to date connection string that we know from the replica set monitor (if + // this is a replica set shard, otherwise it will be the same value as connectionString). 
+ ConnectionString actualShardConnStr = shardConn->getTargeter()->connectionString(); + + ShardType shard; + shard.setName(actualShardName); + shard.setHost(actualShardConnStr.toString()); + + return shard; +} + +StatusWith<std::vector<std::string>> ShardingCatalogManagerImpl::_getDBNamesListFromShard( + OperationContext* txn, ShardRegistry* shardRegistry, const ConnectionString& connectionString) { + // TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter instead. + const std::shared_ptr<Shard> shardConn{ + shardRegistry->createConnection(connectionString).release()}; + invariant(shardConn); + + auto swCommandResponse = _runCommandForAddShard( + txn, shardConn->getTargeter().get(), "admin", BSON("listDatabases" << 1)); + if (!swCommandResponse.isOK()) { + return swCommandResponse.getStatus(); + } + + auto cmdStatus = std::move(swCommandResponse.getValue().commandStatus); + if (!cmdStatus.isOK()) { + return cmdStatus; + } + + auto cmdResult = std::move(swCommandResponse.getValue().response); + + vector<string> dbNames; + + for (const auto& dbEntry : cmdResult["databases"].Obj()) { + const string& dbName = dbEntry["name"].String(); + + if (!(dbName == "local" || dbName == "admin")) { + dbNames.push_back(dbName); + } + } + + return dbNames; +} + +StatusWith<std::string> ShardingCatalogManagerImpl::_generateNewShardName(OperationContext* txn) { + BSONObjBuilder shardNameRegex; + shardNameRegex.appendRegex(ShardType::name(), "^shard"); + + auto findStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + txn, + kConfigReadSelector, + NamespaceString(ShardType::ConfigNS), + shardNameRegex.obj(), + BSON(ShardType::name() << -1), + 1); + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + + const auto& docs = findStatus.getValue().docs; + + int count = 0; + if (!docs.empty()) { + const auto shardStatus = ShardType::fromBSON(docs.front()); + if (!shardStatus.isOK()) { + return shardStatus.getStatus(); 
+ } + + std::istringstream is(shardStatus.getValue().getName().substr(5)); + is >> count; + count++; + } + + // TODO fix so that we can have more than 10000 automatically generated shard names + if (count < 9999) { + std::stringstream ss; + ss << "shard" << std::setfill('0') << std::setw(4) << count; + return ss.str(); + } + + return Status(ErrorCodes::OperationFailed, "unable to generate new shard name"); +} + +StatusWith<string> ShardingCatalogManagerImpl::addShard( + OperationContext* txn, + const std::string* shardProposedName, + const ConnectionString& shardConnectionString, + const long long maxSize) { + // Validate the specified connection string may serve as shard at all + auto shardStatus = + _validateHostAsShard(txn, grid.shardRegistry(), shardConnectionString, shardProposedName); + if (!shardStatus.isOK()) { + // TODO: This is a workaround for the case were we could have some bad shard being + // requested to be added and we put that bad connection string on the global replica set + // monitor registry. It needs to be cleaned up so that when a correct replica set is added, + // it will be recreated. 
+ ReplicaSetMonitor::remove(shardConnectionString.getSetName()); + return shardStatus.getStatus(); + } + + ShardType& shardType = shardStatus.getValue(); + + auto dbNamesStatus = _getDBNamesListFromShard(txn, grid.shardRegistry(), shardConnectionString); + if (!dbNamesStatus.isOK()) { + return dbNamesStatus.getStatus(); + } + + // Check that none of the existing shard candidate's dbs exist already + for (const string& dbName : dbNamesStatus.getValue()) { + auto dbt = _catalogClient->getDatabase(txn, dbName); + if (dbt.isOK()) { + const auto& dbDoc = dbt.getValue().value; + return Status(ErrorCodes::OperationFailed, + str::stream() << "can't add shard " + << "'" + << shardConnectionString.toString() + << "'" + << " because a local database '" + << dbName + << "' exists in another " + << dbDoc.getPrimary()); + } else if (dbt != ErrorCodes::NamespaceNotFound) { + return dbt.getStatus(); + } + } + + // If a name for a shard wasn't provided, generate one + if (shardType.getName().empty()) { + StatusWith<string> result = _generateNewShardName(txn); + if (!result.isOK()) { + return result.getStatus(); + } + shardType.setName(result.getValue()); + } + + if (maxSize > 0) { + shardType.setMaxSizeMB(maxSize); + } + + ShardIdentityType shardIdentity; + shardIdentity.setConfigsvrConnString( + Grid::get(txn)->shardRegistry()->getConfigServerConnectionString()); + shardIdentity.setShardName(shardType.getName()); + shardIdentity.setClusterId(Grid::get(txn)->shardRegistry()->getClusterId()); + auto validateStatus = shardIdentity.validate(); + if (!validateStatus.isOK()) { + return validateStatus; + } + + log() << "going to insert shardIdentity document into shard: " << shardIdentity.toString(); + + auto updateRequest = shardIdentity.createUpsertForAddShard(); + BatchedCommandRequest commandRequest(updateRequest.release()); + commandRequest.setNS(NamespaceString::kConfigCollectionNamespace); + commandRequest.setWriteConcern(kMajorityWriteConcern.toBSON()); + + const 
std::shared_ptr<Shard> shardConn{ + Grid::get(txn)->shardRegistry()->createConnection(shardConnectionString)}; + invariant(shardConn); + auto targeter = shardConn->getTargeter(); + + auto swCommandResponse = + _runCommandForAddShard(txn, targeter.get(), "admin", commandRequest.toBSON()); + + if (!swCommandResponse.isOK()) { + return swCommandResponse.getStatus(); + } + + auto commandResponse = std::move(swCommandResponse.getValue()); + + BatchedCommandResponse batchResponse; + auto batchResponseStatus = _processBatchWriteResponse(commandResponse, &batchResponse); + if (!batchResponseStatus.isOK()) { + return batchResponseStatus; + } + + log() << "going to insert new entry for shard into config.shards: " << shardType.toString(); + + Status result = + _catalogClient->insertConfigDocument(txn, ShardType::ConfigNS, shardType.toBSON()); + if (!result.isOK()) { + log() << "error adding shard: " << shardType.toBSON() << " err: " << result.reason(); + if (result == ErrorCodes::DuplicateKey) { + // TODO(SERVER-24213): adding a shard that already exists should be considered success, + // however this approach does no validation that we are adding the shard with the same + // options. It also does not protect against adding the same shard with a different + // shard name and slightly different connection string. This is a temporary hack to + // get the continuous stepdown suite passing. + warning() << "Received duplicate key error when inserting new shard with name " + << shardType.getName() << " and connection string " + << shardConnectionString.toString() + << " to config.shards collection. 
This most likely means that there was an " + "attempt to add a shard that already exists in the cluster"; + return shardType.getName(); + } + return result; + } + + // Add all databases which were discovered on the new shard + for (const string& dbName : dbNamesStatus.getValue()) { + DatabaseType dbt; + dbt.setName(dbName); + dbt.setPrimary(shardType.getName()); + dbt.setSharded(false); + + Status status = _catalogClient->updateDatabase(txn, dbName, dbt); + if (!status.isOK()) { + log() << "adding shard " << shardConnectionString.toString() + << " even though could not add database " << dbName; + } + } + + // Record in changelog + BSONObjBuilder shardDetails; + shardDetails.append("name", shardType.getName()); + shardDetails.append("host", shardConnectionString.toString()); + + _catalogClient->logChange(txn, "addShard", "", shardDetails.obj()); + + // Ensure the added shard is visible to this process. + auto shardRegistry = Grid::get(txn)->shardRegistry(); + if (!shardRegistry->getShard(txn, shardType.getName())) { + return {ErrorCodes::OperationFailed, + "Could not find shard metadata for shard after adding it. This most likely " + "indicates that the shard was removed immediately after it was added."}; + } + + return shardType.getName(); +} + +void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) { + _executorForAddShard->appendConnectionStats(stats); +} + +} // namespace mongo diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h new file mode 100644 index 00000000000..7c2bbd05479 --- /dev/null +++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h @@ -0,0 +1,145 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/s/catalog/sharding_catalog_manager.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/stdx/mutex.h" + +namespace mongo { + +class DatabaseType; +class ShardingCatalogClient; + +namespace executor { +class TaskExecutor; +} // namespace executor + +/** + * Implements the catalog manager for writing to replica set config servers. + */ +class ShardingCatalogManagerImpl final : public ShardingCatalogManager { +public: + ShardingCatalogManagerImpl(ShardingCatalogClient* catalogClient, + std::unique_ptr<executor::TaskExecutor> addShardExecutor); + virtual ~ShardingCatalogManagerImpl(); + + /** + * Safe to call multiple times as long as the calls are externally synchronized to be + * non-overlapping. 
+ */ + Status startup() override; + + void shutDown(OperationContext* txn) override; + + StatusWith<std::string> addShard(OperationContext* txn, + const std::string* shardProposedName, + const ConnectionString& shardConnectionString, + const long long maxSize) override; + + void appendConnectionStats(executor::ConnectionPoolStats* stats) override; + +private: + /** + * Generates a unique name to be given to a newly added shard. + */ + StatusWith<std::string> _generateNewShardName(OperationContext* txn); + + /** + * Validates that the specified connection string can serve as a shard server. In particular, + * this function checks that the shard can be contacted, that it is not already a member of + * another sharded cluster, etc. + * + * @param shardRegistry Shard registry to use for getting a targeter to the shard-to-be. + * @param connectionString Connection string to be attempted as a shard host. + * @param shardProposedName Optional proposed name for the shard. Can be omitted in which case + * a unique name for the shard will be generated from the shard's connection string. If it + * is not omitted, the value cannot be the empty string. + * + * On success returns a partially initialized ShardType object corresponding to the requested + * shard. It will have the hostName field set and optionally the name, if the name could be + * generated from either the proposed name or the connection string set name. The returned + * shard's name should be checked and if empty, one should be generated using some uniform + * algorithm. + */ + StatusWith<ShardType> _validateHostAsShard(OperationContext* txn, + ShardRegistry* shardRegistry, + const ConnectionString& connectionString, + const std::string* shardProposedName); + + /** + * Runs the listDatabases command on the specified host and returns the names of all databases + * it returns excluding those named local and admin, since they serve administrative purposes. 
+ */ + StatusWith<std::vector<std::string>> _getDBNamesListFromShard( + OperationContext* txn, + ShardRegistry* shardRegistry, + const ConnectionString& connectionString); + + /** + * Runs a command against a "shard" that is not yet in the cluster and thus not present in the + * ShardRegistry. + */ + StatusWith<Shard::CommandResponse> _runCommandForAddShard(OperationContext* txn, + RemoteCommandTargeter* targeter, + const std::string& dbName, + const BSONObj& cmdObj); + + + // + // All member variables are labeled with one of the following codes indicating the + // synchronization rules for accessing them. + // + // (M) Must hold _mutex for access. + // (R) Read only, can only be written during initialization. + // (S) Self-synchronizing; access in any way from any context. + // + + stdx::mutex _mutex; + + // Pointer to the ShardingCatalogClient that can be used to read config server data. + // This pointer is not owned, so it is important that the object it points to continues to be + // valid for the lifetime of this ShardingCatalogManager. + ShardingCatalogClient* _catalogClient; // (R) + + // Executor specifically used for sending commands to servers that are in the process of being + // added as shards. Does not have any connection hook set on it, thus it can be used to talk + // to servers that are not yet in the ShardRegistry. + std::unique_ptr<executor::TaskExecutor> _executorForAddShard; // (R) + + // True if shutDown() has been called. False, otherwise. + bool _inShutdown = false; // (M) + + // True if startup() has been called. + bool _started = false; // (M) +}; + +} // namespace mongo diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h index e7d6716b1b0..8317debf6db 100644 --- a/src/mongo/s/catalog/sharding_catalog_client.h +++ b/src/mongo/s/catalog/sharding_catalog_client.h @@ -95,7 +95,7 @@ public: /** * Performs implementation-specific startup tasks. 
Must be run after the catalog client - * has been installed into the global 'grid' object. Implementation do not need to guarantee + * has been installed into the global 'grid' object. Implementations do not need to guarantee * thread safety so callers should employ proper synchronization when calling this method. */ virtual Status startup() = 0; @@ -140,24 +140,6 @@ public: const std::set<ShardId>& initShardIds) = 0; /** - * - * Adds a new shard. It expects a standalone mongod process or replica set to be running - * on the provided address. - * - * @param shardProposedName is an optional string with the proposed name of the shard. - * If it is nullptr, a name will be automatically generated; if not nullptr, it cannot - * contain the empty string. - * @param shardConnectionString is the connection string of the shard being added. - * @param maxSize is the optional space quota in bytes. Zeros means there's - * no limitation to space usage. - * @return either an !OK status or the name of the newly added shard. - */ - virtual StatusWith<std::string> addShard(OperationContext* txn, - const std::string* shardProposedName, - const ConnectionString& shardConnectionString, - const long long maxSize) = 0; - - /** * Tries to remove a shard. To completely remove a shard from a sharded cluster, * the data residing in that shard must be moved to the remaining shards in the * cluster by "draining" chunks from that shard. @@ -440,11 +422,6 @@ public: virtual Status appendInfoForConfigServerDatabases(OperationContext* txn, BSONArrayBuilder* builder) = 0; - /** - * Append information about the connection pools owned by the CatalogClient. 
- */ - virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) = 0; - virtual StatusWith<DistLockManager::ScopedDistLock> distLock( OperationContext* txn, diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp index 72836e6b4d7..e13711c040c 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp @@ -67,14 +67,6 @@ Status ShardingCatalogClientMock::shardCollection(OperationContext* txn, return {ErrorCodes::InternalError, "Method not implemented"}; } -StatusWith<string> ShardingCatalogClientMock::addShard( - OperationContext* txn, - const std::string* shardProposedName, - const ConnectionString& shardConnectionString, - const long long maxSize) { - return {ErrorCodes::InternalError, "Method not implemented"}; -} - StatusWith<ShardDrainingStatus> ShardingCatalogClientMock::removeShard(OperationContext* txn, const string& name) { return ShardDrainingStatus::COMPLETED; @@ -233,6 +225,4 @@ Status ShardingCatalogClientMock::appendInfoForConfigServerDatabases(OperationCo return Status::OK(); } -void ShardingCatalogClientMock::appendConnectionStats(executor::ConnectionPoolStats* stats) {} - } // namespace mongo diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h index 177530a8bdc..1bd8c5841e3 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.h +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h @@ -54,11 +54,6 @@ public: const std::vector<BSONObj>& initPoints, const std::set<ShardId>& initShardIds) override; - StatusWith<std::string> addShard(OperationContext* txn, - const std::string* shardProposedName, - const ConnectionString& shardConnectionString, - const long long maxSize) override; - StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn, const std::string& name) override; @@ -166,8 +161,6 @@ public: Status 
appendInfoForConfigServerDatabases(OperationContext* txn, BSONArrayBuilder* builder) override; - void appendConnectionStats(executor::ConnectionPoolStats* stats) override; - private: std::unique_ptr<DistLockManagerMock> _mockDistLockMgr; }; diff --git a/src/mongo/s/catalog/sharding_catalog_manager.h b/src/mongo/s/catalog/sharding_catalog_manager.h new file mode 100644 index 00000000000..2ac9413128f --- /dev/null +++ b/src/mongo/s/catalog/sharding_catalog_manager.h @@ -0,0 +1,103 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include <string> + +#include "mongo/base/disallow_copying.h" + +namespace mongo { + +class ConnectionString; +class OperationContext; +class Status; +template <typename T> +class StatusWith; + +namespace executor { +struct ConnectionPoolStats; +} + +/** + * Abstracts writes of the sharding catalog metadata. + * + * All implementations of this interface should go directly to the persistent backing store + * and should avoid doing any caching of their own. The caching is delegated to a parallel + * read-only view of the catalog, which is maintained by higher-level code. + * + * TODO: Currently the code responsible for writing the sharding catalog metadata is split between + * this class and ShardingCatalogClient. Eventually all methods that write catalog data should be + * moved out of ShardingCatalogClient and into ShardingCatalogManager, here. + */ +class ShardingCatalogManager { + MONGO_DISALLOW_COPYING(ShardingCatalogManager); + +public: + virtual ~ShardingCatalogManager() = default; + + /** + * Performs implementation-specific startup tasks. Must be run after the catalog manager + * has been installed into the global 'grid' object. Implementations do not need to guarantee + * thread safety so callers should employ proper synchronization when calling this method. + */ + virtual Status startup() = 0; + + /** + * Performs necessary cleanup when shutting down cleanly. + */ + virtual void shutDown(OperationContext* txn) = 0; + + /** + * + * Adds a new shard. It expects a standalone mongod process or replica set to be running + * on the provided address. + * + * @param shardProposedName is an optional string with the proposed name of the shard. + * If it is nullptr, a name will be automatically generated; if not nullptr, it cannot + * contain the empty string. + * @param shardConnectionString is the connection string of the shard being added. + * @param maxSize is the optional space quota in bytes. 
Zero means there is + * no limit on space usage. + * @return either an !OK status or the name of the newly added shard. + */ + virtual StatusWith<std::string> addShard(OperationContext* txn, + const std::string* shardProposedName, + const ConnectionString& shardConnectionString, + const long long maxSize) = 0; + + /** + * Appends information about the connection pools owned by the CatalogManager. + */ + virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) = 0; + +protected: + ShardingCatalogManager() = default; +}; + +} // namespace mongo diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp new file mode 100644 index 00000000000..1768a1a9925 --- /dev/null +++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp @@ -0,0 +1,60 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/catalog/sharding_catalog_manager_mock.h" + +#include "mongo/base/status.h" +#include "mongo/base/status_with.h" + +namespace mongo { + +using std::string; + +ShardingCatalogManagerMock::ShardingCatalogManagerMock() = default; + +ShardingCatalogManagerMock::~ShardingCatalogManagerMock() = default; + +Status ShardingCatalogManagerMock::startup() { + return {ErrorCodes::InternalError, "Method not implemented"}; +} + +void ShardingCatalogManagerMock::shutDown(OperationContext* txn) {} + +StatusWith<string> ShardingCatalogManagerMock::addShard( + OperationContext* txn, + const std::string* shardProposedName, + const ConnectionString& shardConnectionString, + const long long maxSize) { + return {ErrorCodes::InternalError, "Method not implemented"}; +} + +void ShardingCatalogManagerMock::appendConnectionStats(executor::ConnectionPoolStats* stats) {} + +} // namespace mongo diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.h b/src/mongo/s/catalog/sharding_catalog_manager_mock.h new file mode 100644 index 00000000000..a56681e37fa --- /dev/null +++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.h @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/s/catalog/sharding_catalog_manager.h" + +namespace mongo { + +/** + * A dummy implementation of ShardingCatalogManager for testing purposes. 
+ */ +class ShardingCatalogManagerMock : public ShardingCatalogManager { +public: + ShardingCatalogManagerMock(); + ~ShardingCatalogManagerMock(); + + Status startup() override; + + void shutDown(OperationContext* txn) override; + + StatusWith<std::string> addShard(OperationContext* txn, + const std::string* shardProposedName, + const ConnectionString& shardConnectionString, + const long long maxSize) override; + + void appendConnectionStats(executor::ConnectionPoolStats* stats) override; +}; + +} // namespace mongo diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp index d4a6a827af5..c6a03319b1c 100644 --- a/src/mongo/s/grid.cpp +++ b/src/mongo/s/grid.cpp @@ -38,6 +38,7 @@ #include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/catalog/sharding_catalog_manager.h" #include "mongo/s/client/shard_factory.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/query/cluster_cursor_manager.h" @@ -58,6 +59,7 @@ Grid* Grid::get(OperationContext* operationContext) { } void Grid::init(std::unique_ptr<ShardingCatalogClient> catalogClient, + std::unique_ptr<ShardingCatalogManager> catalogManager, std::unique_ptr<CatalogCache> catalogCache, std::unique_ptr<ShardRegistry> shardRegistry, std::unique_ptr<ClusterCursorManager> cursorManager, @@ -65,6 +67,7 @@ void Grid::init(std::unique_ptr<ShardingCatalogClient> catalogClient, std::unique_ptr<executor::TaskExecutorPool> executorPool, executor::NetworkInterface* network) { invariant(!_catalogClient); + invariant(!_catalogManager); invariant(!_catalogCache); invariant(!_shardRegistry); invariant(!_cursorManager); @@ -73,6 +76,7 @@ void Grid::init(std::unique_ptr<ShardingCatalogClient> catalogClient, invariant(!_network); _catalogClient = std::move(catalogClient); + _catalogManager = std::move(catalogManager); _catalogCache = std::move(catalogCache); _shardRegistry = std::move(shardRegistry); _cursorManager 
= std::move(cursorManager); @@ -108,6 +112,7 @@ void Grid::advanceConfigOpTime(repl::OpTime opTime) { } void Grid::clearForUnitTests() { + _catalogManager.reset(); _catalogClient.reset(); _catalogCache.reset(); _shardRegistry.reset(); diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h index 4cc1cf71970..54ff81f1a5b 100644 --- a/src/mongo/s/grid.h +++ b/src/mongo/s/grid.h @@ -38,6 +38,7 @@ namespace mongo { class BalancerConfiguration; class CatalogCache; class ShardingCatalogClient; +class ShardingCatalogManager; class ClusterCursorManager; class OperationContext; class ShardRegistry; @@ -69,6 +70,7 @@ public: * state using clearForUnitTests. */ void init(std::unique_ptr<ShardingCatalogClient> catalogClient, + std::unique_ptr<ShardingCatalogManager> catalogManager, std::unique_ptr<CatalogCache> catalogCache, std::unique_ptr<ShardRegistry> shardRegistry, std::unique_ptr<ClusterCursorManager> cursorManager, @@ -98,6 +100,14 @@ public: return _catalogClient.get(); } + /** + * Returns a pointer to a ShardingCatalogManager to use for manipulating catalog data stored on + * the config servers. 
+ */ + ShardingCatalogManager* catalogManager() { + return _catalogManager.get(); + } + CatalogCache* catalogCache() const { return _catalogCache.get(); } @@ -149,6 +159,7 @@ public: private: std::unique_ptr<ShardingCatalogClient> _catalogClient; + std::unique_ptr<ShardingCatalogManager> _catalogManager; std::unique_ptr<CatalogCache> _catalogCache; std::unique_ptr<ShardRegistry> _shardRegistry; std::unique_ptr<ClusterCursorManager> _cursorManager; diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 4fc1248f778..09b29dff110 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -63,6 +63,7 @@ #include "mongo/s/balancer/balancer.h" #include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/catalog/sharding_catalog_manager.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_lockpings.h" #include "mongo/s/catalog/type_locks.h" @@ -300,9 +301,12 @@ static Status initializeSharding(OperationContext* txn) { auto shardFactory = stdx::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory)); - Status status = - initializeGlobalShardingState(mongosGlobalParams.configdbs, std::move(shardFactory), []() { - return stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>(); + Status status = initializeGlobalShardingState( + mongosGlobalParams.configdbs, + std::move(shardFactory), + []() { return stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>(); }, + [](ShardingCatalogClient* catalogClient, std::unique_ptr<executor::TaskExecutor> executor) { + return nullptr; // Only config servers get a real ShardingCatalogManager. 
}); if (!status.isOK()) { diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 6138842505c..9c3b2b60876 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -51,6 +51,7 @@ #include "mongo/s/catalog/replset/dist_lock_catalog_impl.h" #include "mongo/s/catalog/replset/replset_dist_lock_manager.h" #include "mongo/s/catalog/replset/sharding_catalog_client_impl.h" +#include "mongo/s/catalog/replset/sharding_catalog_manager_impl.h" #include "mongo/s/client/shard_factory.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/client/sharding_network_connection_hook.h" @@ -94,10 +95,7 @@ std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service ReplSetDistLockManager::kDistLockPingInterval, ReplSetDistLockManager::kDistLockExpirationTime); - return stdx::make_unique<ShardingCatalogClientImpl>( - std::move(distLockManager), - makeTaskExecutor( - executor::makeNetworkInterface("NetworkInterfaceASIO-AddShard-TaskExecutor"))); + return stdx::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager)); } std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool( @@ -130,7 +128,8 @@ std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool( Status initializeGlobalShardingState(const ConnectionString& configCS, std::unique_ptr<ShardFactory> shardFactory, - rpc::ShardingEgressMetadataHookBuilder hookBuilder) { + rpc::ShardingEgressMetadataHookBuilder hookBuilder, + ShardingCatalogManagerBuilder catalogManagerBuilder) { if (configCS.type() == ConnectionString::INVALID) { return {ErrorCodes::BadValue, "Unrecognized connection string."}; } @@ -150,8 +149,15 @@ Status initializeGlobalShardingState(const ConnectionString& configCS, HostAndPort(getHostName(), serverGlobalParams.port)); auto rawCatalogClient = catalogClient.get(); + + std::unique_ptr<ShardingCatalogManager> catalogManager = catalogManagerBuilder( + rawCatalogClient, + 
makeTaskExecutor(executor::makeNetworkInterface("AddShard-TaskExecutor"))); + auto rawCatalogManager = catalogManager.get(); + grid.init( std::move(catalogClient), + std::move(catalogManager), stdx::make_unique<CatalogCache>(), std::move(shardRegistry), stdx::make_unique<ClusterCursorManager>(getGlobalServiceContext()->getPreciseClockSource()), @@ -164,6 +170,14 @@ Status initializeGlobalShardingState(const ConnectionString& configCS, return status; } + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { + // Only config servers get a ShardingCatalogManager. + status = rawCatalogManager->startup(); + if (!status.isOK()) { + return status; + } + } + return Status::OK(); } diff --git a/src/mongo/s/sharding_initialization.h b/src/mongo/s/sharding_initialization.h index b7cb2e174f0..a782dc6474a 100644 --- a/src/mongo/s/sharding_initialization.h +++ b/src/mongo/s/sharding_initialization.h @@ -35,10 +35,18 @@ namespace mongo { +namespace executor { +class TaskExecutor; +} // namespace executor + class ConnectionString; class OperationContext; class ShardFactory; class Status; +class ShardingCatalogClient; +class ShardingCatalogManager; +using ShardingCatalogManagerBuilder = stdx::function<std::unique_ptr<ShardingCatalogManager>( + ShardingCatalogClient*, std::unique_ptr<executor::TaskExecutor>)>; namespace rpc { class ShardingEgressMetadataHook; @@ -52,7 +60,8 @@ using ShardingEgressMetadataHookBuilder = */ Status initializeGlobalShardingState(const ConnectionString& configCS, std::unique_ptr<ShardFactory> shardFactory, - rpc::ShardingEgressMetadataHookBuilder hookBuilder); + rpc::ShardingEgressMetadataHookBuilder hookBuilder, + ShardingCatalogManagerBuilder catalogManagerBuilder); /** * Tries to contact the config server and reload the shard registry until it succeeds or diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp index d50b2e75115..0d66216950b 100644 --- a/src/mongo/s/sharding_test_fixture.cpp +++ 
b/src/mongo/s/sharding_test_fixture.cpp @@ -51,6 +51,7 @@ #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/dist_lock_manager_mock.h" #include "mongo/s/catalog/replset/sharding_catalog_client_impl.h" +#include "mongo/s/catalog/replset/sharding_catalog_manager_impl.h" #include "mongo/s/catalog/type_changelog.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_shard.h" @@ -122,11 +123,15 @@ void ShardingTestFixture::setUp() { auto uniqueDistLockManager = stdx::make_unique<DistLockManagerMock>(); _distLockManager = uniqueDistLockManager.get(); std::unique_ptr<ShardingCatalogClientImpl> catalogClient( - stdx::make_unique<ShardingCatalogClientImpl>(std::move(uniqueDistLockManager), - std::move(specialExec))); + stdx::make_unique<ShardingCatalogClientImpl>(std::move(uniqueDistLockManager))); _catalogClient = catalogClient.get(); catalogClient->startup(); + std::unique_ptr<ShardingCatalogManagerImpl> catalogManager( + stdx::make_unique<ShardingCatalogManagerImpl>(_catalogClient, std::move(specialExec))); + _catalogManager = catalogManager.get(); + catalogManager->startup(); + ConnectionString configCS = ConnectionString::forReplicaSet( "configRS", {HostAndPort{"TestHost1"}, HostAndPort{"TestHost2"}}); @@ -171,6 +176,7 @@ void ShardingTestFixture::setUp() { // For now initialize the global grid object. All sharding objects will be accessible from there // until we get rid of it. 
grid.init(std::move(catalogClient), + std::move(catalogManager), stdx::make_unique<CatalogCache>(), std::move(shardRegistry), stdx::make_unique<ClusterCursorManager>(_service->getPreciseClockSource()), @@ -181,6 +187,7 @@ void ShardingTestFixture::setUp() { void ShardingTestFixture::tearDown() { grid.getExecutorPool()->shutdownAndJoin(); + grid.catalogManager()->shutDown(_opCtx.get()); grid.catalogClient(_opCtx.get())->shutDown(_opCtx.get()); grid.clearForUnitTests(); @@ -200,6 +207,10 @@ ShardingCatalogClient* ShardingTestFixture::catalogClient() const { return grid.catalogClient(_opCtx.get()); } +ShardingCatalogManager* ShardingTestFixture::catalogManager() const { + return grid.catalogManager(); +} + ShardingCatalogClientImpl* ShardingTestFixture::getCatalogClient() const { return _catalogClient; } diff --git a/src/mongo/s/sharding_test_fixture.h b/src/mongo/s/sharding_test_fixture.h index f1129c06845..2a930bbefc4 100644 --- a/src/mongo/s/sharding_test_fixture.h +++ b/src/mongo/s/sharding_test_fixture.h @@ -41,6 +41,8 @@ class BSONObj; class CatalogCache; class ShardingCatalogClient; class ShardingCatalogClientImpl; +class ShardingCatalogManager; +class ShardingCatalogManagerImpl; struct ChunkVersion; class CollectionType; class DistLockManagerMock; @@ -78,6 +80,8 @@ protected: ShardingCatalogClient* catalogClient() const; + ShardingCatalogManager* catalogManager() const; + /** * Prefer catalogClient() method over this as much as possible. */ @@ -215,6 +219,7 @@ private: std::unique_ptr<executor::NetworkTestEnv> _addShardNetworkTestEnv; DistLockManagerMock* _distLockManager = nullptr; ShardingCatalogClientImpl* _catalogClient = nullptr; + ShardingCatalogManagerImpl* _catalogManager = nullptr; }; } // namespace mongo |