author     Spencer T Brody <spencer@mongodb.com>  2016-06-01 16:55:05 -0400
committer  Spencer T Brody <spencer@mongodb.com>  2016-06-13 19:02:47 -0400
commit     e324289ed29a41f0f6f610dc63ab5d2ce1f9c351
tree       07b7159767295a97c310816f56d9dea0f534d6fd /src/mongo
parent     2d487da181d970df760a40ee253398255ca240d0
SERVER-24323 Add ShardingCatalogManager and move addShard implementation into it
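
This commit splits the sharding catalog layer in two: ShardingCatalogClient keeps the general read/metadata operations, while a new ShardingCatalogManager (interface, Impl, and Mock) owns the config-server-only write operations, starting with addShard and its dedicated task executor. A minimal editor's sketch of the resulting interface, using only names visible in this diff (any methods and signature details beyond these are assumed):

// Editor's sketch, not the committed header; compare
// src/mongo/s/catalog/sharding_catalog_manager.h in the diffstat below.
class ShardingCatalogManager {
public:
    virtual ~ShardingCatalogManager() = default;

    // Lifecycle, mirrored from ShardingCatalogManagerImpl later in this diff.
    virtual Status startup() = 0;
    virtual void shutDown(OperationContext* txn) = 0;

    // Moved off ShardingCatalogClient by this commit.
    virtual StatusWith<std::string> addShard(OperationContext* txn,
                                             const std::string* shardProposedName,
                                             const ConnectionString& shardConnectionString,
                                             long long maxSize) = 0;

    // Stats for the add-shard executor, consumed by the connPoolStats command.
    virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) = 0;
};
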
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/SConscript | 1
-rw-r--r--  src/mongo/db/commands/conn_pool_stats.cpp | 6
-rw-r--r--  src/mongo/db/s/SConscript | 4
-rw-r--r--  src/mongo/db/s/config/configsvr_add_shard_command.cpp | 4
-rw-r--r--  src/mongo/db/s/sharding_initialization_mongod.cpp | 19
-rw-r--r--  src/mongo/db/s/sharding_state_test.cpp | 2
-rw-r--r--  src/mongo/s/SConscript | 1
-rw-r--r--  src/mongo/s/catalog/SConscript | 3
-rw-r--r--  src/mongo/s/catalog/replset/SConscript | 18
-rw-r--r--  src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp | 4
-rw-r--r--  src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp | 2
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp | 82
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp | 459
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_client_impl.h | 60
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp | 602
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h | 145
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client.h | 25
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client_mock.cpp | 10
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_client_mock.h | 7
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager.h | 103
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager_mock.cpp | 60
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager_mock.h | 55
-rw-r--r--  src/mongo/s/grid.cpp | 5
-rw-r--r--  src/mongo/s/grid.h | 11
-rw-r--r--  src/mongo/s/server.cpp | 10
-rw-r--r--  src/mongo/s/sharding_initialization.cpp | 24
-rw-r--r--  src/mongo/s/sharding_initialization.h | 11
-rw-r--r--  src/mongo/s/sharding_test_fixture.cpp | 15
-rw-r--r--  src/mongo/s/sharding_test_fixture.h | 5
29 files changed, 1132 insertions(+), 621 deletions(-)
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 8f4c088c26b..8b00a9e41d0 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -697,6 +697,7 @@ serveronlyLibdeps = [
"$BUILD_DIR/mongo/db/bson/dotted_path_support",
"$BUILD_DIR/mongo/executor/network_interface_factory",
"$BUILD_DIR/mongo/s/catalog/replset/sharding_catalog_client_impl",
+ "$BUILD_DIR/mongo/s/catalog/replset/sharding_catalog_manager_impl",
"$BUILD_DIR/mongo/s/client/sharding_connection_hook",
"$BUILD_DIR/mongo/s/coreshard",
"$BUILD_DIR/mongo/s/serveronly",
diff --git a/src/mongo/db/commands/conn_pool_stats.cpp b/src/mongo/db/commands/conn_pool_stats.cpp
index 15bbe103ce1..2cd000e7d30 100644
--- a/src/mongo/db/commands/conn_pool_stats.cpp
+++ b/src/mongo/db/commands/conn_pool_stats.cpp
@@ -38,9 +38,11 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/server_options.h"
#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/task_executor_pool.h"
+#include "mongo/s/catalog/sharding_catalog_manager.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
@@ -90,7 +92,9 @@ public:
auto grid = Grid::get(txn);
if (grid->shardRegistry()) {
grid->getExecutorPool()->appendConnectionStats(&stats);
- grid->catalogClient(txn)->appendConnectionStats(&stats);
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ grid->catalogManager()->appendConnectionStats(&stats);
+ }
}
// Output to a BSON object.
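
The new role check above is needed because, as the mongod initialization change later on this page shows, only processes started as config servers construct a real ShardingCatalogManager; the builder returns nullptr for every other role. A caller-side sketch of that contract (illustrative only, not part of the commit):

// Illustrative only: catalogManager() is meaningful solely on config servers,
// so gate on the cluster role before using it.
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
    auto* manager = Grid::get(txn)->catalogManager();
    invariant(manager);  // a config server always wires up a real manager
    manager->appendConnectionStats(&stats);
}
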
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 9aa55fb8058..474179ac240 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -158,7 +158,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/serveronly',
'$BUILD_DIR/mongo/executor/network_test_env',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
- '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock',
+ '$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock',
'$BUILD_DIR/mongo/s/coreshard',
'$BUILD_DIR/mongo/util/clock_source_mock',
'$BUILD_DIR/mongo/util/net/message_port_mock',
@@ -177,7 +177,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/serveronly',
'$BUILD_DIR/mongo/executor/network_test_env',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
- '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock',
+ '$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock',
'$BUILD_DIR/mongo/s/coreshard',
'$BUILD_DIR/mongo/util/clock_source_mock',
'$BUILD_DIR/mongo/util/net/message_port_mock',
diff --git a/src/mongo/db/s/config/configsvr_add_shard_command.cpp b/src/mongo/db/s/config/configsvr_add_shard_command.cpp
index 4378ae22c20..3bc7e2eeeb0 100644
--- a/src/mongo/db/s/config/configsvr_add_shard_command.cpp
+++ b/src/mongo/db/s/config/configsvr_add_shard_command.cpp
@@ -38,7 +38,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/sharding_catalog_manager.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/add_shard_request_type.h"
@@ -119,7 +119,7 @@ public:
parsedRequest.hasMaxSize() ? parsedRequest.getMaxSize()
: kMaxSizeMBDefault);
- StatusWith<string> addShardResult = grid.catalogClient(txn)->addShard(
+ StatusWith<string> addShardResult = Grid::get(txn)->catalogManager()->addShard(
txn,
parsedRequest.hasName() ? &parsedRequest.getName() : nullptr,
parsedRequest.getConnString(),
diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp
index 4993e118b80..a65c83fc1ae 100644
--- a/src/mongo/db/s/sharding_initialization_mongod.cpp
+++ b/src/mongo/db/s/sharding_initialization_mongod.cpp
@@ -35,6 +35,9 @@
#include "mongo/client/connection_string.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/client/remote_command_targeter_factory_impl.h"
+#include "mongo/db/server_options.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/s/catalog/replset/sharding_catalog_manager_impl.h"
#include "mongo/s/client/shard_factory.h"
#include "mongo/s/client/shard_local.h"
#include "mongo/s/client/shard_remote.h"
@@ -74,9 +77,19 @@ Status initializeGlobalShardingStateForMongod(const ConnectionString& configCS)
auto shardFactory =
stdx::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory));
- return initializeGlobalShardingState(configCS, std::move(shardFactory), []() {
- return stdx::make_unique<rpc::ShardingEgressMetadataHookForMongod>();
- });
+ return initializeGlobalShardingState(
+ configCS,
+ std::move(shardFactory),
+ []() { return stdx::make_unique<rpc::ShardingEgressMetadataHookForMongod>(); },
+ [](ShardingCatalogClient* catalogClient, std::unique_ptr<executor::TaskExecutor> executor)
+ -> std::unique_ptr<ShardingCatalogManager> {
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ return stdx::make_unique<ShardingCatalogManagerImpl>(catalogClient,
+ std::move(executor));
+ } else {
+ return nullptr; // Only config servers get a real ShardingCatalogManager
+ }
+ });
}
} // namespace mongo
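
initializeGlobalShardingState now takes a fourth argument: a builder that lets each process type decide whether a catalog manager exists at all, with mongod passing the config-server-only lambda above. A sketch of the builder type this implies (the actual typedef would live in sharding_initialization.h, which the diffstat lists but this page does not show):

// Assumed shape, inferred from the lambda's parameter list above.
using ShardingCatalogManagerBuilder = stdx::function<std::unique_ptr<ShardingCatalogManager>(
    ShardingCatalogClient*, std::unique_ptr<executor::TaskExecutor>)>;
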
diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_state_test.cpp
index 83b885163cb..76c6772994c 100644
--- a/src/mongo/db/s/sharding_state_test.cpp
+++ b/src/mongo/db/s/sharding_state_test.cpp
@@ -47,6 +47,7 @@
#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/sharding_catalog_client_mock.h"
+#include "mongo/s/catalog/sharding_catalog_manager_mock.h"
#include "mongo/s/client/shard_factory.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/client/shard_remote.h"
@@ -111,6 +112,7 @@ void initGrid(OperationContext* txn, const ConnectionString& configConnString) {
grid.init(
stdx::make_unique<ShardingCatalogClientMock>(),
+ stdx::make_unique<ShardingCatalogManagerMock>(),
stdx::make_unique<CatalogCache>(),
std::move(shardRegistry),
stdx::make_unique<ClusterCursorManager>(txn->getServiceContext()->getPreciseClockSource()),
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 4801a9000f1..e89179b2ef3 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -84,6 +84,7 @@ env.Library(
'$BUILD_DIR/mongo/rpc/metadata',
'$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock',
'$BUILD_DIR/mongo/s/catalog/replset/sharding_catalog_client_impl',
+ '$BUILD_DIR/mongo/s/catalog/replset/sharding_catalog_manager_impl',
'$BUILD_DIR/mongo/s/coreshard',
'$BUILD_DIR/mongo/util/clock_source_mock',
'$BUILD_DIR/mongo/util/net/message_port_mock',
diff --git a/src/mongo/s/catalog/SConscript b/src/mongo/s/catalog/SConscript
index d9470241010..5363683be08 100644
--- a/src/mongo/s/catalog/SConscript
+++ b/src/mongo/s/catalog/SConscript
@@ -9,9 +9,10 @@ env.SConscript(
)
env.Library(
- target='sharding_catalog_client_mock',
+ target='sharding_catalog_mock',
source=[
'sharding_catalog_client_mock.cpp',
+ 'sharding_catalog_manager_mock.cpp',
],
LIBDEPS=[
'dist_lock_manager_mock',
diff --git a/src/mongo/s/catalog/replset/SConscript b/src/mongo/s/catalog/replset/SConscript
index 4f9272fcbee..e8303bc5f14 100644
--- a/src/mongo/s/catalog/replset/SConscript
+++ b/src/mongo/s/catalog/replset/SConscript
@@ -51,7 +51,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/executor/network_test_env',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
'$BUILD_DIR/mongo/s/catalog/dist_lock_catalog_mock',
- '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock',
+ '$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock',
'$BUILD_DIR/mongo/s/coreshard',
'$BUILD_DIR/mongo/s/mongoscore',
'$BUILD_DIR/mongo/s/sharding_test_fixture',
@@ -78,6 +78,22 @@ env.Library(
)
env.Library(
+ target='sharding_catalog_manager_impl',
+ source=[
+ 'sharding_catalog_manager_impl.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/repl/read_concern_args',
+ '$BUILD_DIR/mongo/executor/network_interface',
+ '$BUILD_DIR/mongo/s/client/sharding_client',
+ ],
+ LIBDEPS_TAGS=[
+ # Depends on coreshard, but that would be circular
+ 'incomplete',
+ ],
+)
+
+env.Library(
target='sharding_catalog_test_fixture',
source=[
'sharding_catalog_test_fixture.cpp',
diff --git a/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp b/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp
index 3b8331495e5..ff05b83d175 100644
--- a/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp
+++ b/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp
@@ -48,6 +48,7 @@
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/replset/dist_lock_catalog_impl.h"
#include "mongo/s/catalog/sharding_catalog_client_mock.h"
+#include "mongo/s/catalog/sharding_catalog_manager_mock.h"
#include "mongo/s/catalog/type_lockpings.h"
#include "mongo/s/catalog/type_locks.h"
#include "mongo/s/client/shard_factory.h"
@@ -161,6 +162,7 @@ private:
_distLockCatalog = stdx::make_unique<DistLockCatalogImpl>(shardRegistry.get());
grid.init(stdx::make_unique<ShardingCatalogClientMock>(),
+ stdx::make_unique<ShardingCatalogManagerMock>(),
stdx::make_unique<CatalogCache>(),
std::move(shardRegistry),
std::unique_ptr<ClusterCursorManager>{nullptr},
@@ -180,8 +182,6 @@ private:
std::unique_ptr<executor::NetworkTestEnv> _networkTestEnv;
- ShardingCatalogClientMock _catalogMgr;
-
std::unique_ptr<DistLockCatalogImpl> _distLockCatalog;
OperationContextNoop _txn;
};
diff --git a/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp b/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp
index dac97dc4369..c353ee05e82 100644
--- a/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp
+++ b/src/mongo/s/catalog/replset/replset_dist_lock_manager_test.cpp
@@ -52,6 +52,7 @@
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/dist_lock_catalog_mock.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/sharding_catalog_manager.h"
#include "mongo/s/catalog/type_lockpings.h"
#include "mongo/s/catalog/type_locks.h"
#include "mongo/s/client/shard_factory.h"
@@ -166,6 +167,7 @@ protected:
auto shardRegistry = stdx::make_unique<ShardRegistry>(std::move(shardFactory), configCS);
grid.init(nullptr,
nullptr,
+ nullptr,
std::move(shardRegistry),
nullptr,
stdx::make_unique<BalancerConfiguration>(),
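
Both dist-lock tests pass one extra argument because Grid::init now takes the catalog manager immediately after the catalog client. Inferred from the three updated call sites on this page, the assumed post-commit signature is:

// Assumption: parameter order reconstructed from the test call sites above,
// not copied from grid.h (whose diff is not shown on this page).
void Grid::init(std::unique_ptr<ShardingCatalogClient> catalogClient,
                std::unique_ptr<ShardingCatalogManager> catalogManager,  // new
                std::unique_ptr<CatalogCache> catalogCache,
                std::unique_ptr<ShardRegistry> shardRegistry,
                std::unique_ptr<ClusterCursorManager> cursorManager,
                std::unique_ptr<BalancerConfiguration> balancerConfig,
                std::unique_ptr<executor::TaskExecutorPool> executorPool,
                executor::NetworkInterface* network);
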
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
index d1159c64dcf..62a79a0e37b 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
@@ -40,8 +40,8 @@
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
-#include "mongo/s/catalog/replset/sharding_catalog_client_impl.h"
#include "mongo/s/catalog/replset/sharding_catalog_test_fixture.h"
+#include "mongo/s/catalog/sharding_catalog_manager.h"
#include "mongo/s/catalog/type_changelog.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/catalog/type_shard.h"
@@ -300,10 +300,10 @@ TEST_F(AddShardTest, Standalone) {
auto future = launchAsync([this, expectedShardName] {
auto shardName = assertGet(
- catalogClient()->addShard(operationContext(),
- &expectedShardName,
- assertGet(ConnectionString::parse("StandaloneHost:12345")),
- 100));
+ catalogManager()->addShard(operationContext(),
+ &expectedShardName,
+ assertGet(ConnectionString::parse("StandaloneHost:12345")),
+ 100));
ASSERT_EQUALS(expectedShardName, shardName);
});
@@ -381,10 +381,10 @@ TEST_F(AddShardTest, StandaloneGenerateName) {
auto future = launchAsync([this, expectedShardName, shardTarget] {
auto shardName = assertGet(
- catalogClient()->addShard(operationContext(),
- nullptr,
- assertGet(ConnectionString::parse(shardTarget.toString())),
- 100));
+ catalogManager()->addShard(operationContext(),
+ nullptr,
+ assertGet(ConnectionString::parse(shardTarget.toString())),
+ 100));
ASSERT_EQUALS(expectedShardName, shardName);
});
@@ -478,7 +478,7 @@ TEST_F(AddShardTest, AddSCCCConnectionStringAsShard) {
auto future = launchAsync([this, invalidConn] {
const std::string shardName("StandaloneShard");
- auto status = catalogClient()->addShard(operationContext(), &shardName, invalidConn, 100);
+ auto status = catalogManager()->addShard(operationContext(), &shardName, invalidConn, 100);
ASSERT_EQUALS(ErrorCodes::BadValue, status);
ASSERT_STRING_CONTAINS(status.getStatus().reason(), "Invalid connection string");
});
@@ -493,10 +493,10 @@ TEST_F(AddShardTest, EmptyShardName) {
auto future = launchAsync([this, expectedShardName] {
auto status =
- catalogClient()->addShard(operationContext(),
- &expectedShardName,
- assertGet(ConnectionString::parse("StandaloneHost:12345")),
- 100);
+ catalogManager()->addShard(operationContext(),
+ &expectedShardName,
+ assertGet(ConnectionString::parse("StandaloneHost:12345")),
+ 100);
ASSERT_EQUALS(ErrorCodes::BadValue, status);
ASSERT_EQUALS("shard name cannot be empty", status.getStatus().reason());
});
@@ -517,10 +517,10 @@ TEST_F(AddShardTest, UnreachableHost) {
auto future = launchAsync([this, expectedShardName] {
auto status =
- catalogClient()->addShard(operationContext(),
- &expectedShardName,
- assertGet(ConnectionString::parse("StandaloneHost:12345")),
- 100);
+ catalogManager()->addShard(operationContext(),
+ &expectedShardName,
+ assertGet(ConnectionString::parse("StandaloneHost:12345")),
+ 100);
ASSERT_EQUALS(ErrorCodes::HostUnreachable, status);
ASSERT_EQUALS("host unreachable", status.getStatus().reason());
});
@@ -544,10 +544,10 @@ TEST_F(AddShardTest, AddMongosAsShard) {
auto future = launchAsync([this, expectedShardName] {
auto status =
- catalogClient()->addShard(operationContext(),
- &expectedShardName,
- assertGet(ConnectionString::parse("StandaloneHost:12345")),
- 100);
+ catalogManager()->addShard(operationContext(),
+ &expectedShardName,
+ assertGet(ConnectionString::parse("StandaloneHost:12345")),
+ 100);
ASSERT_EQUALS(ErrorCodes::RPCProtocolNegotiationFailed, status);
});
@@ -571,10 +571,10 @@ TEST_F(AddShardTest, AddExistingShardStandalone) {
auto future = launchAsync([this, expectedShardName, shardTarget] {
auto status =
- catalogClient()->addShard(operationContext(),
- &expectedShardName,
- assertGet(ConnectionString::parse(shardTarget.toString())),
- 100);
+ catalogManager()->addShard(operationContext(),
+ &expectedShardName,
+ assertGet(ConnectionString::parse(shardTarget.toString())),
+ 100);
ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
ASSERT_STRING_CONTAINS(status.getStatus().reason(),
"is already a member of the existing shard");
@@ -603,7 +603,7 @@ TEST_F(AddShardTest, AddExistingShardReplicaSet) {
auto future = launchAsync([this, expectedShardName, connString] {
auto status =
- catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100);
+ catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100);
ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
ASSERT_STRING_CONTAINS(status.getStatus().reason(),
"is already a member of the existing shard");
@@ -630,10 +630,10 @@ TEST_F(AddShardTest, AddReplicaSetShardAsStandalone) {
auto future = launchAsync([this, expectedShardName, shardTarget] {
auto status =
- catalogClient()->addShard(operationContext(),
- &expectedShardName,
- assertGet(ConnectionString::parse(shardTarget.toString())),
- 100);
+ catalogManager()->addShard(operationContext(),
+ &expectedShardName,
+ assertGet(ConnectionString::parse(shardTarget.toString())),
+ 100);
ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
ASSERT_STRING_CONTAINS(status.getStatus().reason(), "use replica set url format");
});
@@ -660,7 +660,7 @@ TEST_F(AddShardTest, AddStandaloneHostShardAsReplicaSet) {
auto future = launchAsync([this, expectedShardName, connString] {
auto status =
- catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100);
+ catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100);
ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
ASSERT_STRING_CONTAINS(status.getStatus().reason(), "host did not return a set name");
});
@@ -686,7 +686,7 @@ TEST_F(AddShardTest, ReplicaSetMistmatchedReplicaSetName) {
auto future = launchAsync([this, expectedShardName, connString] {
auto status =
- catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100);
+ catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100);
ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
ASSERT_STRING_CONTAINS(status.getStatus().reason(), "does not match the actual set name");
});
@@ -713,7 +713,7 @@ TEST_F(AddShardTest, ShardIsCSRSConfigServer) {
auto future = launchAsync([this, expectedShardName, connString] {
auto status =
- catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100);
+ catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100);
ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
ASSERT_STRING_CONTAINS(status.getStatus().reason(),
"as a shard since it is a config server");
@@ -743,7 +743,7 @@ TEST_F(AddShardTest, ReplicaSetMissingHostsProvidedInSeedList) {
auto future = launchAsync([this, expectedShardName, connString] {
auto status =
- catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100);
+ catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100);
ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
ASSERT_STRING_CONTAINS(status.getStatus().reason(),
"host2:12345 does not belong to replica set");
@@ -775,7 +775,7 @@ TEST_F(AddShardTest, ShardNameIsConfig) {
auto future = launchAsync([this, expectedShardName, connString] {
auto status =
- catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100);
+ catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100);
ASSERT_EQUALS(ErrorCodes::BadValue, status);
ASSERT_EQUALS(status.getStatus().reason(),
"use of shard replica set with name 'config' is not allowed");
@@ -807,7 +807,7 @@ TEST_F(AddShardTest, ShardContainsExistingDatabase) {
auto future = launchAsync([this, expectedShardName, connString] {
auto status =
- catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100);
+ catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100);
ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
ASSERT_STRING_CONTAINS(
status.getStatus().reason(),
@@ -851,7 +851,7 @@ TEST_F(AddShardTest, ReAddExistingShard) {
auto future = launchAsync([this, expectedShardName, connString] {
auto status =
- catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100);
+ catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100);
ASSERT_OK(status);
});
@@ -918,7 +918,7 @@ TEST_F(AddShardTest, SuccessfullyAddReplicaSet) {
auto future = launchAsync([this, expectedShardName, connString] {
auto status =
- catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100);
+ catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100);
ASSERT_OK(status);
auto shardName = status.getValue();
ASSERT_EQUALS(expectedShardName, shardName);
@@ -979,7 +979,7 @@ TEST_F(AddShardTest, AddShardSucceedsEvenIfAddingDBsFromNewShardFails) {
auto future = launchAsync([this, expectedShardName, connString] {
auto status =
- catalogClient()->addShard(operationContext(), &expectedShardName, connString, 100);
+ catalogManager()->addShard(operationContext(), &expectedShardName, connString, 100);
ASSERT_OK(status);
auto shardName = status.getValue();
ASSERT_EQUALS(expectedShardName, shardName);
@@ -1064,7 +1064,7 @@ TEST_F(AddShardTest, ReplicaSetExtraHostsDiscovered) {
auto future = launchAsync([this, expectedShardName, seedString] {
auto status =
- catalogClient()->addShard(operationContext(), &expectedShardName, seedString, 100);
+ catalogManager()->addShard(operationContext(), &expectedShardName, seedString, 100);
ASSERT_OK(status);
auto shardName = status.getValue();
ASSERT_EQUALS(expectedShardName, shardName);
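
The add-shard unit test changes are mechanical: every catalogClient()->addShard(...) call becomes catalogManager()->addShard(...). Per the diffstat, sharding_test_fixture.{h,cpp} gain the matching accessor; one plausible implementation, assuming the fixture simply forwards to the global grid:

// Hypothetical fixture accessor; the real fixture diff is not shown here.
ShardingCatalogManager* ShardingTestFixture::catalogManager() const {
    return grid.catalogManager();
}
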
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
index f78d6a7ba62..71c060df46f 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
@@ -162,228 +162,9 @@ Status _processBatchWriteResponse(StatusWith<Shard::CommandResponse> response,
} // namespace
-StatusWith<ShardType> ShardingCatalogClientImpl::_validateHostAsShard(
- OperationContext* txn,
- ShardRegistry* shardRegistry,
- const ConnectionString& connectionString,
- const std::string* shardProposedName) {
- if (connectionString.type() == ConnectionString::INVALID) {
- return {ErrorCodes::BadValue, "Invalid connection string"};
- }
-
- if (shardProposedName && shardProposedName->empty()) {
- return {ErrorCodes::BadValue, "shard name cannot be empty"};
- }
-
- // TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter instead.
- const std::shared_ptr<Shard> shardConn{shardRegistry->createConnection(connectionString)};
- invariant(shardConn);
- auto targeter = shardConn->getTargeter();
-
- // Check whether any host in the connection is already part of the cluster.
- shardRegistry->reload(txn);
- for (const auto& hostAndPort : connectionString.getServers()) {
- std::shared_ptr<Shard> shard;
- shard = shardRegistry->getShardNoReload(hostAndPort.toString());
- if (shard) {
- return {ErrorCodes::OperationFailed,
- str::stream() << "'" << hostAndPort.toString() << "' "
- << "is already a member of the existing shard '"
- << shard->getConnString().toString()
- << "' ("
- << shard->getId()
- << ")."};
- }
- }
-
- // Check for mongos and older version mongod connections, and whether the hosts
- // can be found for the user specified replset.
- auto swCommandResponse =
- _runCommandForAddShard(txn, targeter.get(), "admin", BSON("isMaster" << 1));
- if (!swCommandResponse.isOK()) {
- if (swCommandResponse.getStatus() == ErrorCodes::RPCProtocolNegotiationFailed) {
- // Mongos to mongos commands are no longer supported in the wire protocol
- // (because mongos does not support OP_COMMAND), similarly for a new mongos
- // and an old mongod. So the call will fail in such cases.
- // TODO: If/When mongos ever supports opCommands, this logic will break because
- // cmdStatus will be OK.
- return {ErrorCodes::RPCProtocolNegotiationFailed,
- str::stream() << shardConn->toString()
- << " does not recognize the RPC protocol being used. This is"
- << " likely because it contains a node that is a mongos or an old"
- << " version of mongod."};
- } else {
- return swCommandResponse.getStatus();
- }
- }
-
- // Check for a command response error
- auto resIsMasterStatus = std::move(swCommandResponse.getValue().commandStatus);
- if (!resIsMasterStatus.isOK()) {
- return {resIsMasterStatus.code(),
- str::stream() << "Error running isMaster against " << shardConn->toString() << ": "
- << causedBy(resIsMasterStatus)};
- }
-
- auto resIsMaster = std::move(swCommandResponse.getValue().response);
-
- // Check whether there is a master. If there isn't, the replica set may not have been
- // initiated. If the connection is a standalone, it will return true for isMaster.
- bool isMaster;
- Status status = bsonExtractBooleanField(resIsMaster, "ismaster", &isMaster);
- if (!status.isOK()) {
- return Status(status.code(),
- str::stream() << "isMaster returned invalid 'ismaster' "
- << "field when attempting to add "
- << connectionString.toString()
- << " as a shard: "
- << status.reason());
- }
- if (!isMaster) {
- return {ErrorCodes::NotMaster,
- str::stream()
- << connectionString.toString()
- << " does not have a master. If this is a replica set, ensure that it has a"
- << " healthy primary and that the set has been properly initiated."};
- }
-
- const string providedSetName = connectionString.getSetName();
- const string foundSetName = resIsMaster["setName"].str();
-
- // Make sure the specified replica set name (if any) matches the actual shard's replica set
- if (providedSetName.empty() && !foundSetName.empty()) {
- return {ErrorCodes::OperationFailed,
- str::stream() << "host is part of set " << foundSetName << "; "
- << "use replica set url format "
- << "<setname>/<server1>,<server2>, ..."};
- }
-
- if (!providedSetName.empty() && foundSetName.empty()) {
- return {ErrorCodes::OperationFailed,
- str::stream() << "host did not return a set name; "
- << "is the replica set still initializing? "
- << resIsMaster};
- }
-
- // Make sure the set name specified in the connection string matches the one where its hosts
- // belong into
- if (!providedSetName.empty() && (providedSetName != foundSetName)) {
- return {ErrorCodes::OperationFailed,
- str::stream() << "the provided connection string (" << connectionString.toString()
- << ") does not match the actual set name "
- << foundSetName};
- }
-
- // Is it a config server?
- if (resIsMaster.hasField("configsvr")) {
- return {ErrorCodes::OperationFailed,
- str::stream() << "Cannot add " << connectionString.toString()
- << " as a shard since it is a config server"};
- }
-
- // If the shard is part of a replica set, make sure all the hosts mentioned in the connection
- // string are part of the set. It is fine if not all members of the set are mentioned in the
- // connection string, though.
- if (!providedSetName.empty()) {
- std::set<string> hostSet;
-
- BSONObjIterator iter(resIsMaster["hosts"].Obj());
- while (iter.more()) {
- hostSet.insert(iter.next().String()); // host:port
- }
-
- if (resIsMaster["passives"].isABSONObj()) {
- BSONObjIterator piter(resIsMaster["passives"].Obj());
- while (piter.more()) {
- hostSet.insert(piter.next().String()); // host:port
- }
- }
-
- if (resIsMaster["arbiters"].isABSONObj()) {
- BSONObjIterator piter(resIsMaster["arbiters"].Obj());
- while (piter.more()) {
- hostSet.insert(piter.next().String()); // host:port
- }
- }
-
- vector<HostAndPort> hosts = connectionString.getServers();
- for (size_t i = 0; i < hosts.size(); i++) {
- const string host = hosts[i].toString(); // host:port
- if (hostSet.find(host) == hostSet.end()) {
- return {ErrorCodes::OperationFailed,
- str::stream() << "in seed list " << connectionString.toString() << ", host "
- << host
- << " does not belong to replica set "
- << foundSetName
- << "; found "
- << resIsMaster.toString()};
- }
- }
- }
-
- string actualShardName;
-
- if (shardProposedName) {
- actualShardName = *shardProposedName;
- } else if (!foundSetName.empty()) {
- // Default it to the name of the replica set
- actualShardName = foundSetName;
- }
-
- // Disallow adding shard replica set with name 'config'
- if (actualShardName == "config") {
- return {ErrorCodes::BadValue, "use of shard replica set with name 'config' is not allowed"};
- }
-
- // Retrieve the most up to date connection string that we know from the replica set monitor (if
- // this is a replica set shard, otherwise it will be the same value as connectionString).
- ConnectionString actualShardConnStr = shardConn->getTargeter()->connectionString();
-
- ShardType shard;
- shard.setName(actualShardName);
- shard.setHost(actualShardConnStr.toString());
-
- return shard;
-}
-
-StatusWith<std::vector<std::string>> ShardingCatalogClientImpl::_getDBNamesListFromShard(
- OperationContext* txn, ShardRegistry* shardRegistry, const ConnectionString& connectionString) {
- // TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter instead.
- const std::shared_ptr<Shard> shardConn{
- shardRegistry->createConnection(connectionString).release()};
- invariant(shardConn);
-
- auto swCommandResponse = _runCommandForAddShard(
- txn, shardConn->getTargeter().get(), "admin", BSON("listDatabases" << 1));
- if (!swCommandResponse.isOK()) {
- return swCommandResponse.getStatus();
- }
-
- auto cmdStatus = std::move(swCommandResponse.getValue().commandStatus);
- if (!cmdStatus.isOK()) {
- return cmdStatus;
- }
-
- auto cmdResult = std::move(swCommandResponse.getValue().response);
-
- vector<string> dbNames;
-
- for (const auto& dbEntry : cmdResult["databases"].Obj()) {
- const string& dbName = dbEntry["name"].String();
-
- if (!(dbName == "local" || dbName == "admin")) {
- dbNames.push_back(dbName);
- }
- }
-
- return dbNames;
-}
-
ShardingCatalogClientImpl::ShardingCatalogClientImpl(
- std::unique_ptr<DistLockManager> distLockManager,
- std::unique_ptr<executor::TaskExecutor> addShardExecutor)
- : _distLockManager(std::move(distLockManager)),
- _executorForAddShard(std::move(addShardExecutor)) {}
+ std::unique_ptr<DistLockManager> distLockManager)
+ : _distLockManager(std::move(distLockManager)) {}
ShardingCatalogClientImpl::~ShardingCatalogClientImpl() = default;
@@ -394,7 +175,6 @@ Status ShardingCatalogClientImpl::startup() {
}
_started = true;
_distLockManager->startUp();
- _executorForAddShard->startup();
return Status::OK();
}
@@ -407,199 +187,6 @@ void ShardingCatalogClientImpl::shutDown(OperationContext* txn) {
invariant(_distLockManager);
_distLockManager->shutDown(txn);
- _executorForAddShard->shutdown();
- _executorForAddShard->join();
-}
-
-StatusWith<Shard::CommandResponse> ShardingCatalogClientImpl::_runCommandForAddShard(
- OperationContext* txn,
- RemoteCommandTargeter* targeter,
- const std::string& dbName,
- const BSONObj& cmdObj) {
- auto host = targeter->findHost(ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- RemoteCommandTargeter::selectFindHostMaxWaitTime(txn));
- if (!host.isOK()) {
- return host.getStatus();
- }
-
- executor::RemoteCommandRequest request(
- host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), Seconds(30));
- StatusWith<executor::RemoteCommandResponse> swResponse =
- Status(ErrorCodes::InternalError, "Internal error running command");
-
- auto callStatus = _executorForAddShard->scheduleRemoteCommand(
- request, [&swResponse](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
- swResponse = args.response;
- });
- if (!callStatus.isOK()) {
- return callStatus.getStatus();
- }
-
- // Block until the command is carried out
- _executorForAddShard->wait(callStatus.getValue());
-
- if (!swResponse.isOK()) {
- if (swResponse.getStatus().compareCode(ErrorCodes::ExceededTimeLimit)) {
- LOG(0) << "Operation for addShard timed out with status " << swResponse.getStatus();
- }
- return swResponse.getStatus();
- }
-
- BSONObj responseObj = swResponse.getValue().data.getOwned();
- BSONObj responseMetadata = swResponse.getValue().metadata.getOwned();
- Status commandStatus = getStatusFromCommandResult(responseObj);
- Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj);
-
- return Shard::CommandResponse(std::move(responseObj),
- std::move(responseMetadata),
- std::move(commandStatus),
- std::move(writeConcernStatus));
-}
-
-StatusWith<string> ShardingCatalogClientImpl::addShard(
- OperationContext* txn,
- const std::string* shardProposedName,
- const ConnectionString& shardConnectionString,
- const long long maxSize) {
- // Validate the specified connection string may serve as shard at all
- auto shardStatus =
- _validateHostAsShard(txn, grid.shardRegistry(), shardConnectionString, shardProposedName);
- if (!shardStatus.isOK()) {
- // TODO: This is a workaround for the case were we could have some bad shard being
- // requested to be added and we put that bad connection string on the global replica set
- // monitor registry. It needs to be cleaned up so that when a correct replica set is added,
- // it will be recreated.
- ReplicaSetMonitor::remove(shardConnectionString.getSetName());
- return shardStatus.getStatus();
- }
-
- ShardType& shardType = shardStatus.getValue();
-
- auto dbNamesStatus = _getDBNamesListFromShard(txn, grid.shardRegistry(), shardConnectionString);
- if (!dbNamesStatus.isOK()) {
- return dbNamesStatus.getStatus();
- }
-
- // Check that none of the existing shard candidate's dbs exist already
- for (const string& dbName : dbNamesStatus.getValue()) {
- auto dbt = getDatabase(txn, dbName);
- if (dbt.isOK()) {
- const auto& dbDoc = dbt.getValue().value;
- return Status(ErrorCodes::OperationFailed,
- str::stream() << "can't add shard "
- << "'"
- << shardConnectionString.toString()
- << "'"
- << " because a local database '"
- << dbName
- << "' exists in another "
- << dbDoc.getPrimary());
- } else if (dbt != ErrorCodes::NamespaceNotFound) {
- return dbt.getStatus();
- }
- }
-
- // If a name for a shard wasn't provided, generate one
- if (shardType.getName().empty()) {
- StatusWith<string> result = _generateNewShardName(txn);
- if (!result.isOK()) {
- return result.getStatus();
- }
- shardType.setName(result.getValue());
- }
-
- if (maxSize > 0) {
- shardType.setMaxSizeMB(maxSize);
- }
-
- ShardIdentityType shardIdentity;
- shardIdentity.setConfigsvrConnString(
- Grid::get(txn)->shardRegistry()->getConfigServerConnectionString());
- shardIdentity.setShardName(shardType.getName());
- shardIdentity.setClusterId(Grid::get(txn)->shardRegistry()->getClusterId());
- auto validateStatus = shardIdentity.validate();
- if (!validateStatus.isOK()) {
- return validateStatus;
- }
-
- log() << "going to insert shardIdentity document into shard: " << shardIdentity.toString();
-
- auto updateRequest = shardIdentity.createUpsertForAddShard();
- BatchedCommandRequest commandRequest(updateRequest.release());
- commandRequest.setNS(NamespaceString::kConfigCollectionNamespace);
- commandRequest.setWriteConcern(kMajorityWriteConcern.toBSON());
-
- const std::shared_ptr<Shard> shardConn{
- Grid::get(txn)->shardRegistry()->createConnection(shardConnectionString)};
- invariant(shardConn);
- auto targeter = shardConn->getTargeter();
-
- auto swCommandResponse =
- _runCommandForAddShard(txn, targeter.get(), "admin", commandRequest.toBSON());
-
- if (!swCommandResponse.isOK()) {
- return swCommandResponse.getStatus();
- }
-
- auto commandResponse = std::move(swCommandResponse.getValue());
-
- BatchedCommandResponse batchResponse;
- auto batchResponseStatus = _processBatchWriteResponse(commandResponse, &batchResponse);
- if (!batchResponseStatus.isOK()) {
- return batchResponseStatus;
- }
-
- log() << "going to insert new entry for shard into config.shards: " << shardType.toString();
-
- Status result = insertConfigDocument(txn, ShardType::ConfigNS, shardType.toBSON());
- if (!result.isOK()) {
- log() << "error adding shard: " << shardType.toBSON() << " err: " << result.reason();
- if (result == ErrorCodes::DuplicateKey) {
- // TODO(SERVER-24213): adding a shard that already exists should be considered success,
- // however this approach does no validation that we are adding the shard with the same
- // options. It also does not protect against adding the same shard with a different
- // shard name and slightly different connection string. This is a temporary hack to
- // get the continuous stepdown suite passing.
- warning() << "Received duplicate key error when inserting new shard with name "
- << shardType.getName() << " and connection string "
- << shardConnectionString.toString()
- << " to config.shards collection. This most likely means that there was an "
- "attempt to add a shard that already exists in the cluster";
- return shardType.getName();
- }
- return result;
- }
-
- // Add all databases which were discovered on the new shard
- for (const string& dbName : dbNamesStatus.getValue()) {
- DatabaseType dbt;
- dbt.setName(dbName);
- dbt.setPrimary(shardType.getName());
- dbt.setSharded(false);
-
- Status status = updateDatabase(txn, dbName, dbt);
- if (!status.isOK()) {
- log() << "adding shard " << shardConnectionString.toString()
- << " even though could not add database " << dbName;
- }
- }
-
- // Record in changelog
- BSONObjBuilder shardDetails;
- shardDetails.append("name", shardType.getName());
- shardDetails.append("host", shardConnectionString.toString());
-
- logChange(txn, "addShard", "", shardDetails.obj());
-
- // Ensure the added shard is visible to this process.
- auto shardRegistry = Grid::get(txn)->shardRegistry();
- if (!shardRegistry->getShard(txn, shardType.getName())) {
- return {ErrorCodes::OperationFailed,
- "Could not find shard metadata for shard after adding it. This most likely "
- "indicates that the shard was removed immediately after it was added."};
- }
-
- return shardType.getName();
}
Status ShardingCatalogClientImpl::updateCollection(OperationContext* txn,
@@ -1924,44 +1511,6 @@ Status ShardingCatalogClientImpl::_checkDbDoesNotExist(OperationContext* txn,
<< dbName);
}
-StatusWith<std::string> ShardingCatalogClientImpl::_generateNewShardName(OperationContext* txn) {
- BSONObjBuilder shardNameRegex;
- shardNameRegex.appendRegex(ShardType::name(), "^shard");
-
- auto findStatus = _exhaustiveFindOnConfig(txn,
- kConfigReadSelector,
- NamespaceString(ShardType::ConfigNS),
- shardNameRegex.obj(),
- BSON(ShardType::name() << -1),
- 1);
- if (!findStatus.isOK()) {
- return findStatus.getStatus();
- }
-
- const auto& docs = findStatus.getValue().value;
-
- int count = 0;
- if (!docs.empty()) {
- const auto shardStatus = ShardType::fromBSON(docs.front());
- if (!shardStatus.isOK()) {
- return shardStatus.getStatus();
- }
-
- std::istringstream is(shardStatus.getValue().getName().substr(5));
- is >> count;
- count++;
- }
-
- // TODO fix so that we can have more than 10000 automatically generated shard names
- if (count < 9999) {
- std::stringstream ss;
- ss << "shard" << std::setfill('0') << std::setw(4) << count;
- return ss.str();
- }
-
- return Status(ErrorCodes::OperationFailed, "unable to generate new shard name");
-}
-
Status ShardingCatalogClientImpl::_createCappedConfigCollection(OperationContext* txn,
StringData collName,
int cappedSize) {
@@ -2187,8 +1736,4 @@ Status ShardingCatalogClientImpl::appendInfoForConfigServerDatabases(OperationCo
return Status::OK();
}
-void ShardingCatalogClientImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) {
- _executorForAddShard->appendConnectionStats(stats);
-}
-
} // namespace mongo
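
With addShard and its helpers removed, ShardingCatalogClientImpl's startup/shutDown manage only the dist-lock manager, and the hook-free add-shard executor is handed to ShardingCatalogManagerImpl instead. The wiring in sharding_initialization.cpp (listed in the diffstat, not shown on this page) presumably becomes something like:

// Assumed construction order: build the client first, then let the
// role-specific builder decide whether a manager exists. The names
// catalogManagerBuilder and addShardExecutor are placeholders.
auto catalogClient = stdx::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager));
auto catalogManager = catalogManagerBuilder(catalogClient.get(), std::move(addShardExecutor));
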
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h
index 9f8201f8d23..48df7545263 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h
+++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h
@@ -49,8 +49,7 @@ class TaskExecutor;
*/
class ShardingCatalogClientImpl final : public ShardingCatalogClient {
public:
- ShardingCatalogClientImpl(std::unique_ptr<DistLockManager> distLockManager,
- std::unique_ptr<executor::TaskExecutor> addShardExecutor);
+ explicit ShardingCatalogClientImpl(std::unique_ptr<DistLockManager> distLockManager);
virtual ~ShardingCatalogClientImpl();
/**
@@ -63,11 +62,6 @@ public:
Status enableSharding(OperationContext* txn, const std::string& dbName) override;
- StatusWith<std::string> addShard(OperationContext* txn,
- const std::string* shardProposedName,
- const ConnectionString& shardConnectionString,
- const long long maxSize) override;
-
Status updateDatabase(OperationContext* txn,
const std::string& dbName,
const DatabaseType& db) override;
@@ -182,8 +176,6 @@ public:
Status appendInfoForConfigServerDatabases(OperationContext* txn,
BSONArrayBuilder* builder) override;
- void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
-
/**
* Runs a read command against the config server with majority read concern.
*/
@@ -213,42 +205,6 @@ private:
Status _checkDbDoesNotExist(OperationContext* txn, const std::string& dbName, DatabaseType* db);
/**
- * Generates a unique name to be given to a newly added shard.
- */
- StatusWith<std::string> _generateNewShardName(OperationContext* txn);
-
- /**
- * Validates that the specified connection string can serve as a shard server. In particular,
- * this function checks that the shard can be contacted, that it is not already member of
- * another sharded cluster and etc.
- *
- * @param shardRegistry Shard registry to use for getting a targeter to the shard-to-be.
- * @param connectionString Connection string to be attempted as a shard host.
- * @param shardProposedName Optional proposed name for the shard. Can be omitted in which case
- * a unique name for the shard will be generated from the shard's connection string. If it
- * is not omitted, the value cannot be the empty string.
- *
- * On success returns a partially initialized ShardType object corresponding to the requested
- * shard. It will have the hostName field set and optionally the name, if the name could be
- * generated from either the proposed name or the connection string set name. The returned
- * shard's name should be checked and if empty, one should be generated using some uniform
- * algorithm.
- */
- StatusWith<ShardType> _validateHostAsShard(OperationContext* txn,
- ShardRegistry* shardRegistry,
- const ConnectionString& connectionString,
- const std::string* shardProposedName);
-
- /**
- * Runs the listDatabases command on the specified host and returns the names of all databases
- * it returns excluding those named local and admin, since they serve administrative purpose.
- */
- StatusWith<std::vector<std::string>> _getDBNamesListFromShard(
- OperationContext* txn,
- ShardRegistry* shardRegistry,
- const ConnectionString& connectionString);
-
- /**
* Creates the specified collection name in the config database.
*/
Status _createCappedConfigCollection(OperationContext* txn,
@@ -272,15 +228,6 @@ private:
const NamespaceString& ns,
BSONObj query);
- /**
- * Runs a command against a "shard" that is not yet in the cluster and thus not present in the
- * ShardRegistry.
- */
- StatusWith<Shard::CommandResponse> _runCommandForAddShard(OperationContext* txn,
- RemoteCommandTargeter* targeter,
- const std::string& dbName,
- const BSONObj& cmdObj);
-
StatusWith<repl::OpTimeWith<std::vector<BSONObj>>> _exhaustiveFindOnConfig(
OperationContext* txn,
const ReadPreferenceSetting& readPref,
@@ -337,11 +284,6 @@ private:
// Distributed lock manager singleton.
std::unique_ptr<DistLockManager> _distLockManager; // (R)
- // Executor specifically used for sending commands to servers that are in the process of being
- // added as shards. Does not have any connection hook set on it, thus it can be used to talk
- // to servers that are not yet in the ShardRegistry.
- std::unique_ptr<executor::TaskExecutor> _executorForAddShard; // (R)
-
// True if shutDown() has been called. False, otherwise.
bool _inShutdown = false; // (M)
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
new file mode 100644
index 00000000000..239d5e0c28d
--- /dev/null
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
@@ -0,0 +1,602 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/catalog/replset/sharding_catalog_manager_impl.h"
+
+#include <iomanip>
+
+#include "mongo/base/status.h"
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/client/connection_string.h"
+#include "mongo/client/read_preference.h"
+#include "mongo/client/remote_command_targeter.h"
+#include "mongo/client/replica_set_monitor.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/s/type_shard_identity.h"
+#include "mongo/executor/network_interface.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/type_database.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/client/shard.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/set_shard_version_request.h"
+#include "mongo/s/write_ops/batched_command_request.h"
+#include "mongo/s/write_ops/batched_command_response.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/net/hostandport.h"
+
+namespace mongo {
+
+using std::string;
+using std::vector;
+using str::stream;
+
+namespace {
+
+const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{});
+
+const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
+ // Note: Even though we're setting UNSET here,
+ // kMajority implies JOURNAL if journaling is
+ // supported by mongod and
+ // writeConcernMajorityJournalDefault is set to true
+ // in the ReplicaSetConfig.
+ WriteConcernOptions::SyncMode::UNSET,
+ Seconds(15));
+
+void toBatchError(const Status& status, BatchedCommandResponse* response) {
+ response->clear();
+ response->setErrCode(status.code());
+ response->setErrMessage(status.reason());
+ response->setOk(false);
+}
+
+/**
+ * Takes the response from running a batch write command and writes the appropriate response into
+ * *batchResponse, while also returning the Status of the operation.
+ */
+Status _processBatchWriteResponse(StatusWith<Shard::CommandResponse> response,
+ BatchedCommandResponse* batchResponse) {
+ Status status(ErrorCodes::InternalError, "status not set");
+
+ if (!response.isOK()) {
+ status = response.getStatus();
+ } else if (!response.getValue().commandStatus.isOK()) {
+ status = response.getValue().commandStatus;
+ } else if (!response.getValue().writeConcernStatus.isOK()) {
+ status = response.getValue().writeConcernStatus;
+ } else {
+ string errmsg;
+ if (!batchResponse->parseBSON(response.getValue().response, &errmsg)) {
+ status = Status(ErrorCodes::FailedToParse,
+ str::stream() << "Failed to parse config server response: " << errmsg);
+ } else {
+ status = batchResponse->toStatus();
+ }
+ }
+
+ if (!status.isOK()) {
+ toBatchError(status, batchResponse);
+ }
+
+ return status;
+}
+} // namespace
+
+
+ShardingCatalogManagerImpl::ShardingCatalogManagerImpl(
+ ShardingCatalogClient* catalogClient, std::unique_ptr<executor::TaskExecutor> addShardExecutor)
+ : _catalogClient(catalogClient), _executorForAddShard(std::move(addShardExecutor)) {}
+
+ShardingCatalogManagerImpl::~ShardingCatalogManagerImpl() = default;
+
+Status ShardingCatalogManagerImpl::startup() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_started) {
+ return Status::OK();
+ }
+ _started = true;
+ _executorForAddShard->startup();
+ return Status::OK();
+}
+
+void ShardingCatalogManagerImpl::shutDown(OperationContext* txn) {
+ LOG(1) << "ShardingCatalogManagerImpl::shutDown() called.";
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _inShutdown = true;
+ }
+
+ _executorForAddShard->shutdown();
+ _executorForAddShard->join();
+}
+
+StatusWith<Shard::CommandResponse> ShardingCatalogManagerImpl::_runCommandForAddShard(
+ OperationContext* txn,
+ RemoteCommandTargeter* targeter,
+ const std::string& dbName,
+ const BSONObj& cmdObj) {
+ auto host = targeter->findHost(ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ RemoteCommandTargeter::selectFindHostMaxWaitTime(txn));
+ if (!host.isOK()) {
+ return host.getStatus();
+ }
+
+ executor::RemoteCommandRequest request(
+ host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), Seconds(30));
+ StatusWith<executor::RemoteCommandResponse> swResponse =
+ Status(ErrorCodes::InternalError, "Internal error running command");
+
+ auto callStatus = _executorForAddShard->scheduleRemoteCommand(
+ request, [&swResponse](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
+ swResponse = args.response;
+ });
+ if (!callStatus.isOK()) {
+ return callStatus.getStatus();
+ }
+
+ // Block until the command is carried out
+ _executorForAddShard->wait(callStatus.getValue());
+
+ if (!swResponse.isOK()) {
+ if (swResponse.getStatus().compareCode(ErrorCodes::ExceededTimeLimit)) {
+ LOG(0) << "Operation for addShard timed out with status " << swResponse.getStatus();
+ }
+ return swResponse.getStatus();
+ }
+
+ BSONObj responseObj = swResponse.getValue().data.getOwned();
+ BSONObj responseMetadata = swResponse.getValue().metadata.getOwned();
+ Status commandStatus = getStatusFromCommandResult(responseObj);
+ Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj);
+
+ return Shard::CommandResponse(std::move(responseObj),
+ std::move(responseMetadata),
+ std::move(commandStatus),
+ std::move(writeConcernStatus));
+}
+
+StatusWith<ShardType> ShardingCatalogManagerImpl::_validateHostAsShard(
+ OperationContext* txn,
+ ShardRegistry* shardRegistry,
+ const ConnectionString& connectionString,
+ const std::string* shardProposedName) {
+ if (connectionString.type() == ConnectionString::INVALID) {
+ return {ErrorCodes::BadValue, "Invalid connection string"};
+ }
+
+ if (shardProposedName && shardProposedName->empty()) {
+ return {ErrorCodes::BadValue, "shard name cannot be empty"};
+ }
+
+ // TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter instead.
+ const std::shared_ptr<Shard> shardConn{shardRegistry->createConnection(connectionString)};
+ invariant(shardConn);
+ auto targeter = shardConn->getTargeter();
+
+ // Check whether any host in the connection is already part of the cluster.
+ shardRegistry->reload(txn);
+ for (const auto& hostAndPort : connectionString.getServers()) {
+ std::shared_ptr<Shard> shard;
+ shard = shardRegistry->getShardNoReload(hostAndPort.toString());
+ if (shard) {
+ return {ErrorCodes::OperationFailed,
+ str::stream() << "'" << hostAndPort.toString() << "' "
+ << "is already a member of the existing shard '"
+ << shard->getConnString().toString()
+ << "' ("
+ << shard->getId()
+ << ")."};
+ }
+ }
+
+ // Check for mongos and older version mongod connections, and whether the hosts
+ // can be found for the user specified replset.
+ auto swCommandResponse =
+ _runCommandForAddShard(txn, targeter.get(), "admin", BSON("isMaster" << 1));
+ if (!swCommandResponse.isOK()) {
+ if (swCommandResponse.getStatus() == ErrorCodes::RPCProtocolNegotiationFailed) {
+ // Mongos to mongos commands are no longer supported in the wire protocol
+ // (because mongos does not support OP_COMMAND), similarly for a new mongos
+ // and an old mongod. So the call will fail in such cases.
+ // TODO: If/When mongos ever supports opCommands, this logic will break because
+ // cmdStatus will be OK.
+ return {ErrorCodes::RPCProtocolNegotiationFailed,
+ str::stream() << shardConn->toString()
+ << " does not recognize the RPC protocol being used. This is"
+ << " likely because it contains a node that is a mongos or an old"
+ << " version of mongod."};
+ } else {
+ return swCommandResponse.getStatus();
+ }
+ }
+
+ // Check for a command response error
+ auto resIsMasterStatus = std::move(swCommandResponse.getValue().commandStatus);
+ if (!resIsMasterStatus.isOK()) {
+ return {resIsMasterStatus.code(),
+ str::stream() << "Error running isMaster against " << shardConn->toString() << ": "
+ << causedBy(resIsMasterStatus)};
+ }
+
+ auto resIsMaster = std::move(swCommandResponse.getValue().response);
+
+ // Check whether there is a master. If there isn't, the replica set may not have been
+ // initiated. If the connection is a standalone, it will return true for isMaster.
+ bool isMaster;
+ Status status = bsonExtractBooleanField(resIsMaster, "ismaster", &isMaster);
+ if (!status.isOK()) {
+ return Status(status.code(),
+ str::stream() << "isMaster returned invalid 'ismaster' "
+ << "field when attempting to add "
+ << connectionString.toString()
+ << " as a shard: "
+ << status.reason());
+ }
+ if (!isMaster) {
+ return {ErrorCodes::NotMaster,
+ str::stream()
+ << connectionString.toString()
+ << " does not have a master. If this is a replica set, ensure that it has a"
+ << " healthy primary and that the set has been properly initiated."};
+ }
+
+ const string providedSetName = connectionString.getSetName();
+ const string foundSetName = resIsMaster["setName"].str();
+
+ // Make sure the specified replica set name (if any) matches the actual shard's replica set
+ if (providedSetName.empty() && !foundSetName.empty()) {
+ return {ErrorCodes::OperationFailed,
+ str::stream() << "host is part of set " << foundSetName << "; "
+ << "use replica set url format "
+ << "<setname>/<server1>,<server2>, ..."};
+ }
+
+ if (!providedSetName.empty() && foundSetName.empty()) {
+ return {ErrorCodes::OperationFailed,
+ str::stream() << "host did not return a set name; "
+ << "is the replica set still initializing? "
+ << resIsMaster};
+ }
+
+ // Make sure the set name specified in the connection string matches the one where its hosts
+ // belong into
+ if (!providedSetName.empty() && (providedSetName != foundSetName)) {
+ return {ErrorCodes::OperationFailed,
+ str::stream() << "the provided connection string (" << connectionString.toString()
+ << ") does not match the actual set name "
+ << foundSetName};
+ }
+
+ // Is it a config server?
+ if (resIsMaster.hasField("configsvr")) {
+ return {ErrorCodes::OperationFailed,
+ str::stream() << "Cannot add " << connectionString.toString()
+ << " as a shard since it is a config server"};
+ }
+
+ // If the shard is part of a replica set, make sure all the hosts mentioned in the connection
+ // string are part of the set. It is fine if not all members of the set are mentioned in the
+ // connection string, though.
+ if (!providedSetName.empty()) {
+ std::set<string> hostSet;
+
+ BSONObjIterator iter(resIsMaster["hosts"].Obj());
+ while (iter.more()) {
+ hostSet.insert(iter.next().String()); // host:port
+ }
+
+ if (resIsMaster["passives"].isABSONObj()) {
+ BSONObjIterator piter(resIsMaster["passives"].Obj());
+ while (piter.more()) {
+ hostSet.insert(piter.next().String()); // host:port
+ }
+ }
+
+ if (resIsMaster["arbiters"].isABSONObj()) {
+ BSONObjIterator piter(resIsMaster["arbiters"].Obj());
+ while (piter.more()) {
+ hostSet.insert(piter.next().String()); // host:port
+ }
+ }
+
+ vector<HostAndPort> hosts = connectionString.getServers();
+ for (size_t i = 0; i < hosts.size(); i++) {
+ const string host = hosts[i].toString(); // host:port
+ if (hostSet.find(host) == hostSet.end()) {
+ return {ErrorCodes::OperationFailed,
+ str::stream() << "in seed list " << connectionString.toString() << ", host "
+ << host
+ << " does not belong to replica set "
+ << foundSetName
+ << "; found "
+ << resIsMaster.toString()};
+ }
+ }
+ }
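+
+    // Illustrative example of the check above: a seed list of "rs0/a:27017,d:27017" against a
+    // set that reports hosts [ "a:27017", "b:27017" ] fails, because "d:27017" is not a member.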
+
+ string actualShardName;
+
+ if (shardProposedName) {
+ actualShardName = *shardProposedName;
+ } else if (!foundSetName.empty()) {
+ // Default it to the name of the replica set
+ actualShardName = foundSetName;
+ }
+
+ // Disallow adding shard replica set with name 'config'
+ if (actualShardName == "config") {
+ return {ErrorCodes::BadValue, "use of shard replica set with name 'config' is not allowed"};
+ }
+
+    // Retrieve the most up-to-date connection string known to the replica set monitor (for a
+    // replica set shard; otherwise this is the same value as connectionString).
+ ConnectionString actualShardConnStr = shardConn->getTargeter()->connectionString();
+
+ ShardType shard;
+ shard.setName(actualShardName);
+ shard.setHost(actualShardConnStr.toString());
+
+ return shard;
+}
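+
+// Illustrative example of the above: validating "rs0/a.example.net:27017,b.example.net:27017"
+// with no proposed name yields, assuming the set reports itself as "rs0", a ShardType with
+// name "rs0" and host "rs0/a.example.net:27017,b.example.net:27017".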
+
+StatusWith<std::vector<std::string>> ShardingCatalogManagerImpl::_getDBNamesListFromShard(
+ OperationContext* txn, ShardRegistry* shardRegistry, const ConnectionString& connectionString) {
+ // TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter instead.
+ const std::shared_ptr<Shard> shardConn{
+ shardRegistry->createConnection(connectionString).release()};
+ invariant(shardConn);
+
+ auto swCommandResponse = _runCommandForAddShard(
+ txn, shardConn->getTargeter().get(), "admin", BSON("listDatabases" << 1));
+ if (!swCommandResponse.isOK()) {
+ return swCommandResponse.getStatus();
+ }
+
+ auto cmdStatus = std::move(swCommandResponse.getValue().commandStatus);
+ if (!cmdStatus.isOK()) {
+ return cmdStatus;
+ }
+
+ auto cmdResult = std::move(swCommandResponse.getValue().response);
+
+ vector<string> dbNames;
+
+ for (const auto& dbEntry : cmdResult["databases"].Obj()) {
+ const string& dbName = dbEntry["name"].String();
+
+ if (!(dbName == "local" || dbName == "admin")) {
+ dbNames.push_back(dbName);
+ }
+ }
+
+ return dbNames;
+}
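+
+// Illustrative example of the above: a listDatabases reply of
+//   { "databases" : [ { "name" : "admin" }, { "name" : "local" }, { "name" : "foo" } ], "ok" : 1 }
+// produces the vector [ "foo" ], since "admin" and "local" are filtered out.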
+
+StatusWith<std::string> ShardingCatalogManagerImpl::_generateNewShardName(OperationContext* txn) {
+ BSONObjBuilder shardNameRegex;
+ shardNameRegex.appendRegex(ShardType::name(), "^shard");
+
+ auto findStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ txn,
+ kConfigReadSelector,
+ NamespaceString(ShardType::ConfigNS),
+ shardNameRegex.obj(),
+ BSON(ShardType::name() << -1),
+ 1);
+ if (!findStatus.isOK()) {
+ return findStatus.getStatus();
+ }
+
+ const auto& docs = findStatus.getValue().docs;
+
+ int count = 0;
+ if (!docs.empty()) {
+ const auto shardStatus = ShardType::fromBSON(docs.front());
+ if (!shardStatus.isOK()) {
+ return shardStatus.getStatus();
+ }
+
+ std::istringstream is(shardStatus.getValue().getName().substr(5));
+ is >> count;
+ count++;
+ }
+
+ // TODO fix so that we can have more than 10000 automatically generated shard names
+ if (count < 9999) {
+ std::stringstream ss;
+ ss << "shard" << std::setfill('0') << std::setw(4) << count;
+ return ss.str();
+ }
+
+ return Status(ErrorCodes::OperationFailed, "unable to generate new shard name");
+}
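+
+// Illustrative example of the above: if the greatest existing generated name is "shard0007",
+// substr(5) extracts "0007", count becomes 8, and the returned name is "shard0008". With no
+// existing "shard"-prefixed names, the first generated name is "shard0000".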
+
+StatusWith<string> ShardingCatalogManagerImpl::addShard(
+ OperationContext* txn,
+ const std::string* shardProposedName,
+ const ConnectionString& shardConnectionString,
+ const long long maxSize) {
+ // Validate the specified connection string may serve as shard at all
+ auto shardStatus =
+ _validateHostAsShard(txn, grid.shardRegistry(), shardConnectionString, shardProposedName);
+ if (!shardStatus.isOK()) {
+        // TODO: This is a workaround for the case where a bad shard is requested to be added
+        // and its bad connection string ends up in the global replica set monitor registry.
+        // The entry needs to be cleaned up so that, when a correct replica set is added later,
+        // its monitor will be recreated.
+ ReplicaSetMonitor::remove(shardConnectionString.getSetName());
+ return shardStatus.getStatus();
+ }
+
+ ShardType& shardType = shardStatus.getValue();
+
+ auto dbNamesStatus = _getDBNamesListFromShard(txn, grid.shardRegistry(), shardConnectionString);
+ if (!dbNamesStatus.isOK()) {
+ return dbNamesStatus.getStatus();
+ }
+
+    // Check that none of the candidate shard's databases already exist in the cluster
+ for (const string& dbName : dbNamesStatus.getValue()) {
+ auto dbt = _catalogClient->getDatabase(txn, dbName);
+ if (dbt.isOK()) {
+ const auto& dbDoc = dbt.getValue().value;
+            return Status(ErrorCodes::OperationFailed,
+                          str::stream() << "can't add shard "
+                                        << "'"
+                                        << shardConnectionString.toString()
+                                        << "'"
+                                        << " because a local database '"
+                                        << dbName
+                                        << "' already exists on shard "
+                                        << dbDoc.getPrimary());
+ } else if (dbt != ErrorCodes::NamespaceNotFound) {
+ return dbt.getStatus();
+ }
+ }
+
+ // If a name for a shard wasn't provided, generate one
+ if (shardType.getName().empty()) {
+ StatusWith<string> result = _generateNewShardName(txn);
+ if (!result.isOK()) {
+ return result.getStatus();
+ }
+ shardType.setName(result.getValue());
+ }
+
+ if (maxSize > 0) {
+ shardType.setMaxSizeMB(maxSize);
+ }
+
+ ShardIdentityType shardIdentity;
+ shardIdentity.setConfigsvrConnString(
+ Grid::get(txn)->shardRegistry()->getConfigServerConnectionString());
+ shardIdentity.setShardName(shardType.getName());
+ shardIdentity.setClusterId(Grid::get(txn)->shardRegistry()->getClusterId());
+ auto validateStatus = shardIdentity.validate();
+ if (!validateStatus.isOK()) {
+ return validateStatus;
+ }
+
+ log() << "going to insert shardIdentity document into shard: " << shardIdentity.toString();
+
+ auto updateRequest = shardIdentity.createUpsertForAddShard();
+ BatchedCommandRequest commandRequest(updateRequest.release());
+ commandRequest.setNS(NamespaceString::kConfigCollectionNamespace);
+ commandRequest.setWriteConcern(kMajorityWriteConcern.toBSON());
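+
+    // The upsert above targets the shard's admin.system.version collection with a document
+    // shaped roughly like this (values are illustrative):
+    //   { "_id" : "shardIdentity", "shardName" : "shard0000",
+    //     "clusterId" : ObjectId("..."),
+    //     "configsvrConnectionString" : "configRS/cfg1:27019,cfg2:27019,cfg3:27019" }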
+
+ const std::shared_ptr<Shard> shardConn{
+ Grid::get(txn)->shardRegistry()->createConnection(shardConnectionString)};
+ invariant(shardConn);
+ auto targeter = shardConn->getTargeter();
+
+ auto swCommandResponse =
+ _runCommandForAddShard(txn, targeter.get(), "admin", commandRequest.toBSON());
+
+ if (!swCommandResponse.isOK()) {
+ return swCommandResponse.getStatus();
+ }
+
+ auto commandResponse = std::move(swCommandResponse.getValue());
+
+ BatchedCommandResponse batchResponse;
+ auto batchResponseStatus = _processBatchWriteResponse(commandResponse, &batchResponse);
+ if (!batchResponseStatus.isOK()) {
+ return batchResponseStatus;
+ }
+
+ log() << "going to insert new entry for shard into config.shards: " << shardType.toString();
+
+ Status result =
+ _catalogClient->insertConfigDocument(txn, ShardType::ConfigNS, shardType.toBSON());
+ if (!result.isOK()) {
+ log() << "error adding shard: " << shardType.toBSON() << " err: " << result.reason();
+ if (result == ErrorCodes::DuplicateKey) {
+ // TODO(SERVER-24213): adding a shard that already exists should be considered success,
+ // however this approach does no validation that we are adding the shard with the same
+ // options. It also does not protect against adding the same shard with a different
+ // shard name and slightly different connection string. This is a temporary hack to
+ // get the continuous stepdown suite passing.
+ warning() << "Received duplicate key error when inserting new shard with name "
+ << shardType.getName() << " and connection string "
+ << shardConnectionString.toString()
+ << " to config.shards collection. This most likely means that there was an "
+ "attempt to add a shard that already exists in the cluster";
+ return shardType.getName();
+ }
+ return result;
+ }
+
+ // Add all databases which were discovered on the new shard
+ for (const string& dbName : dbNamesStatus.getValue()) {
+ DatabaseType dbt;
+ dbt.setName(dbName);
+ dbt.setPrimary(shardType.getName());
+ dbt.setSharded(false);
+
+ Status status = _catalogClient->updateDatabase(txn, dbName, dbt);
+ if (!status.isOK()) {
+ log() << "adding shard " << shardConnectionString.toString()
+ << " even though could not add database " << dbName;
+ }
+ }
+
+ // Record in changelog
+ BSONObjBuilder shardDetails;
+ shardDetails.append("name", shardType.getName());
+ shardDetails.append("host", shardConnectionString.toString());
+
+ _catalogClient->logChange(txn, "addShard", "", shardDetails.obj());
+
+ // Ensure the added shard is visible to this process.
+ auto shardRegistry = Grid::get(txn)->shardRegistry();
+ if (!shardRegistry->getShard(txn, shardType.getName())) {
+ return {ErrorCodes::OperationFailed,
+ "Could not find shard metadata for shard after adding it. This most likely "
+ "indicates that the shard was removed immediately after it was added."};
+ }
+
+ return shardType.getName();
+}
+
+void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) {
+ _executorForAddShard->appendConnectionStats(stats);
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
new file mode 100644
index 00000000000..7c2bbd05479
--- /dev/null
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
@@ -0,0 +1,145 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/s/catalog/sharding_catalog_manager.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/stdx/mutex.h"
+
+namespace mongo {
+
+class DatabaseType;
+class ShardingCatalogClient;
+
+namespace executor {
+class TaskExecutor;
+} // namespace executor
+
+/**
+ * Implements the catalog manager for writing to replica set config servers.
+ */
+class ShardingCatalogManagerImpl final : public ShardingCatalogManager {
+public:
+ ShardingCatalogManagerImpl(ShardingCatalogClient* catalogClient,
+ std::unique_ptr<executor::TaskExecutor> addShardExecutor);
+ virtual ~ShardingCatalogManagerImpl();
+
+ /**
+ * Safe to call multiple times as long as the calls are externally synchronized to be
+ * non-overlapping.
+ */
+ Status startup() override;
+
+ void shutDown(OperationContext* txn) override;
+
+ StatusWith<std::string> addShard(OperationContext* txn,
+ const std::string* shardProposedName,
+ const ConnectionString& shardConnectionString,
+ const long long maxSize) override;
+
+ void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
+
+private:
+ /**
+ * Generates a unique name to be given to a newly added shard.
+ */
+ StatusWith<std::string> _generateNewShardName(OperationContext* txn);
+
+ /**
+ * Validates that the specified connection string can serve as a shard server. In particular,
+     * this function checks that the shard can be contacted and that it is not already a member
+     * of another sharded cluster.
+ *
+ * @param shardRegistry Shard registry to use for getting a targeter to the shard-to-be.
+ * @param connectionString Connection string to be attempted as a shard host.
+ * @param shardProposedName Optional proposed name for the shard. Can be omitted in which case
+ * a unique name for the shard will be generated from the shard's connection string. If it
+ * is not omitted, the value cannot be the empty string.
+ *
+     * On success returns a partially initialized ShardType object corresponding to the requested
+     * shard. It will have the host field set, and optionally the name if one could be derived
+     * from either the proposed name or the connection string's set name. The returned shard's
+     * name should be checked and, if empty, one should be generated using some uniform algorithm.
+ */
+ StatusWith<ShardType> _validateHostAsShard(OperationContext* txn,
+ ShardRegistry* shardRegistry,
+ const ConnectionString& connectionString,
+ const std::string* shardProposedName);
+
+ /**
+     * Runs the listDatabases command on the specified host and returns the names of all databases
+     * found, excluding 'local' and 'admin', since those serve administrative purposes.
+ */
+ StatusWith<std::vector<std::string>> _getDBNamesListFromShard(
+ OperationContext* txn,
+ ShardRegistry* shardRegistry,
+ const ConnectionString& connectionString);
+
+ /**
+ * Runs a command against a "shard" that is not yet in the cluster and thus not present in the
+ * ShardRegistry.
+ */
+ StatusWith<Shard::CommandResponse> _runCommandForAddShard(OperationContext* txn,
+ RemoteCommandTargeter* targeter,
+ const std::string& dbName,
+ const BSONObj& cmdObj);
+
+
+ //
+ // All member variables are labeled with one of the following codes indicating the
+ // synchronization rules for accessing them.
+ //
+ // (M) Must hold _mutex for access.
+ // (R) Read only, can only be written during initialization.
+ // (S) Self-synchronizing; access in any way from any context.
+ //
+
+ stdx::mutex _mutex;
+
+    // Pointer to the ShardingCatalogClient used to read and write config server data.
+ // This pointer is not owned, so it is important that the object it points to continues to be
+ // valid for the lifetime of this ShardingCatalogManager.
+ ShardingCatalogClient* _catalogClient; // (R)
+
+ // Executor specifically used for sending commands to servers that are in the process of being
+ // added as shards. Does not have any connection hook set on it, thus it can be used to talk
+ // to servers that are not yet in the ShardRegistry.
+ std::unique_ptr<executor::TaskExecutor> _executorForAddShard; // (R)
+
+    // True if shutDown() has been called, false otherwise.
+ bool _inShutdown = false; // (M)
+
+ // True if startup() has been called.
+ bool _started = false; // (M)
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h
index e7d6716b1b0..8317debf6db 100644
--- a/src/mongo/s/catalog/sharding_catalog_client.h
+++ b/src/mongo/s/catalog/sharding_catalog_client.h
@@ -95,7 +95,7 @@ public:
/**
* Performs implementation-specific startup tasks. Must be run after the catalog client
- * has been installed into the global 'grid' object. Implementation do not need to guarantee
+ * has been installed into the global 'grid' object. Implementations do not need to guarantee
* thread safety so callers should employ proper synchronization when calling this method.
*/
virtual Status startup() = 0;
@@ -140,24 +140,6 @@ public:
const std::set<ShardId>& initShardIds) = 0;
/**
- *
- * Adds a new shard. It expects a standalone mongod process or replica set to be running
- * on the provided address.
- *
- * @param shardProposedName is an optional string with the proposed name of the shard.
- * If it is nullptr, a name will be automatically generated; if not nullptr, it cannot
- * contain the empty string.
- * @param shardConnectionString is the connection string of the shard being added.
- * @param maxSize is the optional space quota in bytes. Zeros means there's
- * no limitation to space usage.
- * @return either an !OK status or the name of the newly added shard.
- */
- virtual StatusWith<std::string> addShard(OperationContext* txn,
- const std::string* shardProposedName,
- const ConnectionString& shardConnectionString,
- const long long maxSize) = 0;
-
- /**
* Tries to remove a shard. To completely remove a shard from a sharded cluster,
* the data residing in that shard must be moved to the remaining shards in the
* cluster by "draining" chunks from that shard.
@@ -440,11 +422,6 @@ public:
virtual Status appendInfoForConfigServerDatabases(OperationContext* txn,
BSONArrayBuilder* builder) = 0;
- /**
- * Append information about the connection pools owned by the CatalogClient.
- */
- virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) = 0;
-
virtual StatusWith<DistLockManager::ScopedDistLock> distLock(
OperationContext* txn,
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
index 72836e6b4d7..e13711c040c 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
@@ -67,14 +67,6 @@ Status ShardingCatalogClientMock::shardCollection(OperationContext* txn,
return {ErrorCodes::InternalError, "Method not implemented"};
}
-StatusWith<string> ShardingCatalogClientMock::addShard(
- OperationContext* txn,
- const std::string* shardProposedName,
- const ConnectionString& shardConnectionString,
- const long long maxSize) {
- return {ErrorCodes::InternalError, "Method not implemented"};
-}
-
StatusWith<ShardDrainingStatus> ShardingCatalogClientMock::removeShard(OperationContext* txn,
const string& name) {
return ShardDrainingStatus::COMPLETED;
@@ -233,6 +225,4 @@ Status ShardingCatalogClientMock::appendInfoForConfigServerDatabases(OperationCo
return Status::OK();
}
-void ShardingCatalogClientMock::appendConnectionStats(executor::ConnectionPoolStats* stats) {}
-
} // namespace mongo
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h
index 177530a8bdc..1bd8c5841e3 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.h
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h
@@ -54,11 +54,6 @@ public:
const std::vector<BSONObj>& initPoints,
const std::set<ShardId>& initShardIds) override;
- StatusWith<std::string> addShard(OperationContext* txn,
- const std::string* shardProposedName,
- const ConnectionString& shardConnectionString,
- const long long maxSize) override;
-
StatusWith<ShardDrainingStatus> removeShard(OperationContext* txn,
const std::string& name) override;
@@ -166,8 +161,6 @@ public:
Status appendInfoForConfigServerDatabases(OperationContext* txn,
BSONArrayBuilder* builder) override;
- void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
-
private:
std::unique_ptr<DistLockManagerMock> _mockDistLockMgr;
};
diff --git a/src/mongo/s/catalog/sharding_catalog_manager.h b/src/mongo/s/catalog/sharding_catalog_manager.h
new file mode 100644
index 00000000000..2ac9413128f
--- /dev/null
+++ b/src/mongo/s/catalog/sharding_catalog_manager.h
@@ -0,0 +1,103 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "mongo/base/disallow_copying.h"
+
+namespace mongo {
+
+class ConnectionString;
+class OperationContext;
+class Status;
+template <typename T>
+class StatusWith;
+
+namespace executor {
+struct ConnectionPoolStats;
+}
+
+/**
+ * Abstracts writes of the sharding catalog metadata.
+ *
+ * All implementations of this interface should go directly to the persistent backing store
+ * and should avoid doing any caching of their own. The caching is delegated to a parallel
+ * read-only view of the catalog, which is maintained by higher-level code.
+ *
+ * TODO: Currently the code responsible for writing the sharding catalog metadata is split between
+ * this class and ShardingCatalogClient. Eventually all methods that write catalog data should be
+ * moved out of ShardingCatalogClient and into ShardingCatalogManager.
+ */
+class ShardingCatalogManager {
+ MONGO_DISALLOW_COPYING(ShardingCatalogManager);
+
+public:
+ virtual ~ShardingCatalogManager() = default;
+
+ /**
+ * Performs implementation-specific startup tasks. Must be run after the catalog manager
+ * has been installed into the global 'grid' object. Implementations do not need to guarantee
+ * thread safety so callers should employ proper synchronization when calling this method.
+ */
+ virtual Status startup() = 0;
+
+ /**
+ * Performs necessary cleanup when shutting down cleanly.
+ */
+ virtual void shutDown(OperationContext* txn) = 0;
+
+ /**
+     * Adds a new shard. It expects a standalone mongod process or replica set to be running
+ * on the provided address.
+ *
+ * @param shardProposedName is an optional string with the proposed name of the shard.
+ * If it is nullptr, a name will be automatically generated; if not nullptr, it cannot
+ * contain the empty string.
+ * @param shardConnectionString is the connection string of the shard being added.
+     * @param maxSize is the optional space quota in megabytes. Zero means there is
+     * no limit on space usage.
+ * @return either an !OK status or the name of the newly added shard.
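+     *
+     * Illustrative usage (hypothetical caller and values):
+     *   std::string proposedName = "shard_rs0";
+     *   auto swConnStr = ConnectionString::parse("rs0/a.example.net:27017,b.example.net:27017");
+     *   auto swShardName = Grid::get(txn)->catalogManager()->addShard(
+     *       txn, &proposedName, swConnStr.getValue(), 0 /* no space quota */);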
+ */
+ virtual StatusWith<std::string> addShard(OperationContext* txn,
+ const std::string* shardProposedName,
+ const ConnectionString& shardConnectionString,
+ const long long maxSize) = 0;
+
+ /**
+ * Append information about the connection pools owned by the CatalogManager.
+ */
+ virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) = 0;
+
+protected:
+ ShardingCatalogManager() = default;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
new file mode 100644
index 00000000000..1768a1a9925
--- /dev/null
+++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
@@ -0,0 +1,60 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/catalog/sharding_catalog_manager_mock.h"
+
+#include "mongo/base/status.h"
+#include "mongo/base/status_with.h"
+
+namespace mongo {
+
+using std::string;
+
+ShardingCatalogManagerMock::ShardingCatalogManagerMock() = default;
+
+ShardingCatalogManagerMock::~ShardingCatalogManagerMock() = default;
+
+Status ShardingCatalogManagerMock::startup() {
+ return {ErrorCodes::InternalError, "Method not implemented"};
+}
+
+void ShardingCatalogManagerMock::shutDown(OperationContext* txn) {}
+
+StatusWith<string> ShardingCatalogManagerMock::addShard(
+ OperationContext* txn,
+ const std::string* shardProposedName,
+ const ConnectionString& shardConnectionString,
+ const long long maxSize) {
+ return {ErrorCodes::InternalError, "Method not implemented"};
+}
+
+void ShardingCatalogManagerMock::appendConnectionStats(executor::ConnectionPoolStats* stats) {}
+
+} // namespace mongo
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.h b/src/mongo/s/catalog/sharding_catalog_manager_mock.h
new file mode 100644
index 00000000000..a56681e37fa
--- /dev/null
+++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.h
@@ -0,0 +1,55 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/s/catalog/sharding_catalog_manager.h"
+
+namespace mongo {
+
+/**
+ * A dummy implementation of ShardingCatalogManager for testing purposes.
+ */
+class ShardingCatalogManagerMock : public ShardingCatalogManager {
+public:
+ ShardingCatalogManagerMock();
+ ~ShardingCatalogManagerMock();
+
+ Status startup() override;
+
+ void shutDown(OperationContext* txn) override;
+
+ StatusWith<std::string> addShard(OperationContext* txn,
+ const std::string* shardProposedName,
+ const ConnectionString& shardConnectionString,
+ const long long maxSize) override;
+
+ void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp
index d4a6a827af5..c6a03319b1c 100644
--- a/src/mongo/s/grid.cpp
+++ b/src/mongo/s/grid.cpp
@@ -38,6 +38,7 @@
#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/sharding_catalog_manager.h"
#include "mongo/s/client/shard_factory.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/query/cluster_cursor_manager.h"
@@ -58,6 +59,7 @@ Grid* Grid::get(OperationContext* operationContext) {
}
void Grid::init(std::unique_ptr<ShardingCatalogClient> catalogClient,
+ std::unique_ptr<ShardingCatalogManager> catalogManager,
std::unique_ptr<CatalogCache> catalogCache,
std::unique_ptr<ShardRegistry> shardRegistry,
std::unique_ptr<ClusterCursorManager> cursorManager,
@@ -65,6 +67,7 @@ void Grid::init(std::unique_ptr<ShardingCatalogClient> catalogClient,
std::unique_ptr<executor::TaskExecutorPool> executorPool,
executor::NetworkInterface* network) {
invariant(!_catalogClient);
+ invariant(!_catalogManager);
invariant(!_catalogCache);
invariant(!_shardRegistry);
invariant(!_cursorManager);
@@ -73,6 +76,7 @@ void Grid::init(std::unique_ptr<ShardingCatalogClient> catalogClient,
invariant(!_network);
_catalogClient = std::move(catalogClient);
+ _catalogManager = std::move(catalogManager);
_catalogCache = std::move(catalogCache);
_shardRegistry = std::move(shardRegistry);
_cursorManager = std::move(cursorManager);
@@ -108,6 +112,7 @@ void Grid::advanceConfigOpTime(repl::OpTime opTime) {
}
void Grid::clearForUnitTests() {
+ _catalogManager.reset();
_catalogClient.reset();
_catalogCache.reset();
_shardRegistry.reset();
diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h
index 4cc1cf71970..54ff81f1a5b 100644
--- a/src/mongo/s/grid.h
+++ b/src/mongo/s/grid.h
@@ -38,6 +38,7 @@ namespace mongo {
class BalancerConfiguration;
class CatalogCache;
class ShardingCatalogClient;
+class ShardingCatalogManager;
class ClusterCursorManager;
class OperationContext;
class ShardRegistry;
@@ -69,6 +70,7 @@ public:
* state using clearForUnitTests.
*/
void init(std::unique_ptr<ShardingCatalogClient> catalogClient,
+ std::unique_ptr<ShardingCatalogManager> catalogManager,
std::unique_ptr<CatalogCache> catalogCache,
std::unique_ptr<ShardRegistry> shardRegistry,
std::unique_ptr<ClusterCursorManager> cursorManager,
@@ -98,6 +100,14 @@ public:
return _catalogClient.get();
}
+ /**
+ * Returns a pointer to a ShardingCatalogManager to use for manipulating catalog data stored on
+ * the config servers.
+ */
+ ShardingCatalogManager* catalogManager() {
+ return _catalogManager.get();
+ }
+
CatalogCache* catalogCache() const {
return _catalogCache.get();
}
@@ -149,6 +159,7 @@ public:
private:
std::unique_ptr<ShardingCatalogClient> _catalogClient;
+ std::unique_ptr<ShardingCatalogManager> _catalogManager;
std::unique_ptr<CatalogCache> _catalogCache;
std::unique_ptr<ShardRegistry> _shardRegistry;
std::unique_ptr<ClusterCursorManager> _cursorManager;
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 4fc1248f778..09b29dff110 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -63,6 +63,7 @@
#include "mongo/s/balancer/balancer.h"
#include "mongo/s/balancer/balancer_configuration.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/sharding_catalog_manager.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_lockpings.h"
#include "mongo/s/catalog/type_locks.h"
@@ -300,9 +301,12 @@ static Status initializeSharding(OperationContext* txn) {
auto shardFactory =
stdx::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory));
- Status status =
- initializeGlobalShardingState(mongosGlobalParams.configdbs, std::move(shardFactory), []() {
- return stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>();
+ Status status = initializeGlobalShardingState(
+ mongosGlobalParams.configdbs,
+ std::move(shardFactory),
+ []() { return stdx::make_unique<rpc::ShardingEgressMetadataHookForMongos>(); },
+ [](ShardingCatalogClient* catalogClient, std::unique_ptr<executor::TaskExecutor> executor) {
+ return nullptr; // Only config servers get a real ShardingCatalogManager.
});
if (!status.isOK()) {
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 6138842505c..9c3b2b60876 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -51,6 +51,7 @@
#include "mongo/s/catalog/replset/dist_lock_catalog_impl.h"
#include "mongo/s/catalog/replset/replset_dist_lock_manager.h"
#include "mongo/s/catalog/replset/sharding_catalog_client_impl.h"
+#include "mongo/s/catalog/replset/sharding_catalog_manager_impl.h"
#include "mongo/s/client/shard_factory.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/client/sharding_network_connection_hook.h"
@@ -94,10 +95,7 @@ std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service
ReplSetDistLockManager::kDistLockPingInterval,
ReplSetDistLockManager::kDistLockExpirationTime);
- return stdx::make_unique<ShardingCatalogClientImpl>(
- std::move(distLockManager),
- makeTaskExecutor(
- executor::makeNetworkInterface("NetworkInterfaceASIO-AddShard-TaskExecutor")));
+ return stdx::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager));
}
std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool(
@@ -130,7 +128,8 @@ std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool(
Status initializeGlobalShardingState(const ConnectionString& configCS,
std::unique_ptr<ShardFactory> shardFactory,
- rpc::ShardingEgressMetadataHookBuilder hookBuilder) {
+ rpc::ShardingEgressMetadataHookBuilder hookBuilder,
+ ShardingCatalogManagerBuilder catalogManagerBuilder) {
if (configCS.type() == ConnectionString::INVALID) {
return {ErrorCodes::BadValue, "Unrecognized connection string."};
}
@@ -150,8 +149,15 @@ Status initializeGlobalShardingState(const ConnectionString& configCS,
HostAndPort(getHostName(), serverGlobalParams.port));
auto rawCatalogClient = catalogClient.get();
+
+ std::unique_ptr<ShardingCatalogManager> catalogManager = catalogManagerBuilder(
+ rawCatalogClient,
+ makeTaskExecutor(executor::makeNetworkInterface("AddShard-TaskExecutor")));
+ auto rawCatalogManager = catalogManager.get();
+
grid.init(
std::move(catalogClient),
+ std::move(catalogManager),
stdx::make_unique<CatalogCache>(),
std::move(shardRegistry),
stdx::make_unique<ClusterCursorManager>(getGlobalServiceContext()->getPreciseClockSource()),
@@ -164,6 +170,14 @@ Status initializeGlobalShardingState(const ConnectionString& configCS,
return status;
}
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ // Only config servers get a ShardingCatalogManager.
+ status = rawCatalogManager->startup();
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+
return Status::OK();
}
diff --git a/src/mongo/s/sharding_initialization.h b/src/mongo/s/sharding_initialization.h
index b7cb2e174f0..a782dc6474a 100644
--- a/src/mongo/s/sharding_initialization.h
+++ b/src/mongo/s/sharding_initialization.h
@@ -35,10 +35,18 @@
namespace mongo {
+namespace executor {
+class TaskExecutor;
+} // namespace executor
+
class ConnectionString;
class OperationContext;
class ShardFactory;
class Status;
+class ShardingCatalogClient;
+class ShardingCatalogManager;
+using ShardingCatalogManagerBuilder = stdx::function<std::unique_ptr<ShardingCatalogManager>(
+ ShardingCatalogClient*, std::unique_ptr<executor::TaskExecutor>)>;
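+
+// An illustrative builder for a config server (a mongos instead returns nullptr, since only
+// config servers get a real ShardingCatalogManager; see server.cpp):
+//   [](ShardingCatalogClient* client, std::unique_ptr<executor::TaskExecutor> executor) {
+//       return stdx::make_unique<ShardingCatalogManagerImpl>(client, std::move(executor));
+//   }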
namespace rpc {
class ShardingEgressMetadataHook;
@@ -52,7 +60,8 @@ using ShardingEgressMetadataHookBuilder =
*/
Status initializeGlobalShardingState(const ConnectionString& configCS,
std::unique_ptr<ShardFactory> shardFactory,
- rpc::ShardingEgressMetadataHookBuilder hookBuilder);
+ rpc::ShardingEgressMetadataHookBuilder hookBuilder,
+ ShardingCatalogManagerBuilder catalogManagerBuilder);
/**
* Tries to contact the config server and reload the shard registry until it succeeds or
diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp
index d50b2e75115..0d66216950b 100644
--- a/src/mongo/s/sharding_test_fixture.cpp
+++ b/src/mongo/s/sharding_test_fixture.cpp
@@ -51,6 +51,7 @@
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/dist_lock_manager_mock.h"
#include "mongo/s/catalog/replset/sharding_catalog_client_impl.h"
+#include "mongo/s/catalog/replset/sharding_catalog_manager_impl.h"
#include "mongo/s/catalog/type_changelog.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_shard.h"
@@ -122,11 +123,15 @@ void ShardingTestFixture::setUp() {
auto uniqueDistLockManager = stdx::make_unique<DistLockManagerMock>();
_distLockManager = uniqueDistLockManager.get();
std::unique_ptr<ShardingCatalogClientImpl> catalogClient(
- stdx::make_unique<ShardingCatalogClientImpl>(std::move(uniqueDistLockManager),
- std::move(specialExec)));
+ stdx::make_unique<ShardingCatalogClientImpl>(std::move(uniqueDistLockManager)));
_catalogClient = catalogClient.get();
catalogClient->startup();
+ std::unique_ptr<ShardingCatalogManagerImpl> catalogManager(
+ stdx::make_unique<ShardingCatalogManagerImpl>(_catalogClient, std::move(specialExec)));
+ _catalogManager = catalogManager.get();
+ catalogManager->startup();
+
ConnectionString configCS = ConnectionString::forReplicaSet(
"configRS", {HostAndPort{"TestHost1"}, HostAndPort{"TestHost2"}});
@@ -171,6 +176,7 @@ void ShardingTestFixture::setUp() {
// For now initialize the global grid object. All sharding objects will be accessible from there
// until we get rid of it.
grid.init(std::move(catalogClient),
+ std::move(catalogManager),
stdx::make_unique<CatalogCache>(),
std::move(shardRegistry),
stdx::make_unique<ClusterCursorManager>(_service->getPreciseClockSource()),
@@ -181,6 +187,7 @@ void ShardingTestFixture::setUp() {
void ShardingTestFixture::tearDown() {
grid.getExecutorPool()->shutdownAndJoin();
+ grid.catalogManager()->shutDown(_opCtx.get());
grid.catalogClient(_opCtx.get())->shutDown(_opCtx.get());
grid.clearForUnitTests();
@@ -200,6 +207,10 @@ ShardingCatalogClient* ShardingTestFixture::catalogClient() const {
return grid.catalogClient(_opCtx.get());
}
+ShardingCatalogManager* ShardingTestFixture::catalogManager() const {
+ return grid.catalogManager();
+}
+
ShardingCatalogClientImpl* ShardingTestFixture::getCatalogClient() const {
return _catalogClient;
}
diff --git a/src/mongo/s/sharding_test_fixture.h b/src/mongo/s/sharding_test_fixture.h
index f1129c06845..2a930bbefc4 100644
--- a/src/mongo/s/sharding_test_fixture.h
+++ b/src/mongo/s/sharding_test_fixture.h
@@ -41,6 +41,8 @@ class BSONObj;
class CatalogCache;
class ShardingCatalogClient;
class ShardingCatalogClientImpl;
+class ShardingCatalogManager;
+class ShardingCatalogManagerImpl;
struct ChunkVersion;
class CollectionType;
class DistLockManagerMock;
@@ -78,6 +80,8 @@ protected:
ShardingCatalogClient* catalogClient() const;
+ ShardingCatalogManager* catalogManager() const;
+
/**
* Prefer catalogClient() method over this as much as possible.
*/
@@ -215,6 +219,7 @@ private:
std::unique_ptr<executor::NetworkTestEnv> _addShardNetworkTestEnv;
DistLockManagerMock* _distLockManager = nullptr;
ShardingCatalogClientImpl* _catalogClient = nullptr;
+ ShardingCatalogManagerImpl* _catalogManager = nullptr;
};
} // namespace mongo