diff options
| field     | value | date |
|-----------|-------|------|
| author    | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-07-30 15:22:26 -0400 |
| committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-08-13 17:49:02 -0400 |
| commit    | 200c3dc58410d8b3287a2075cc9b2ad085100e83 (patch) | |
| tree      | 6025b0042d610f3040b65c6c58cba45825122ca1 /src/mongo/db | |
| parent    | 55ca9666a9f40ab480f399f4ed9d915bf55925ef (diff) | |
| download  | mongo-200c3dc58410d8b3287a2075cc9b2ad085100e83.tar.gz | |
SERVER-29908 Move all runtime logic out of ShardingState
... and move it into a ShardingInitializationMongoD class, which is
responsible for driving the sharding-awareness of the node and setting
it onto ShardingState.
Also gets rid of the 'sharding' library, so there is no more library
dependency cycle.
Diffstat (limited to 'src/mongo/db')
| mode | path | changes |
|------|------|---------|
| -rw-r--r-- | src/mongo/db/SConscript | 13 |
| -rw-r--r-- | src/mongo/db/catalog/SConscript | 10 |
| -rw-r--r-- | src/mongo/db/db.cpp | 16 |
| -rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 6 |
| -rw-r--r-- | src/mongo/db/s/SConscript | 42 |
| -rw-r--r-- | src/mongo/db/s/shard_server_op_observer.cpp | 10 |
| -rw-r--r-- | src/mongo/db/s/sharding_initialization_mongod.cpp | 284 |
| -rw-r--r-- | src/mongo/db/s/sharding_initialization_mongod.h | 84 |
| -rw-r--r-- | src/mongo/db/s/sharding_initialization_mongod_test.cpp (renamed from src/mongo/db/s/sharding_state_test.cpp) | 431 |
| -rw-r--r-- | src/mongo/db/s/sharding_initialization_op_observer_test.cpp | 149 |
| -rw-r--r-- | src/mongo/db/s/sharding_state.cpp | 331 |
| -rw-r--r-- | src/mongo/db/s/sharding_state.h | 117 |
| -rw-r--r-- | src/mongo/db/s/split_chunk_test.cpp | 54 |
13 files changed, 780 insertions, 767 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 1fbc546cac3..b35c4dc1fed 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -740,7 +740,6 @@ env.Library( '$BUILD_DIR/mongo/db/auth/auth', '$BUILD_DIR/mongo/db/concurrency/lock_manager', '$BUILD_DIR/mongo/db/ops/write_ops_parsers', - '$BUILD_DIR/mongo/db/s/sharding', '$BUILD_DIR/mongo/db/storage/storage_engine_common', '$BUILD_DIR/mongo/db/storage/storage_options', '$BUILD_DIR/mongo/db/storage/storage_engine_lock_file', @@ -891,10 +890,12 @@ env.Library( "repl/repl_coordinator_interface", "stats/timer_stats", "storage/storage_options", - "s/sharding", ], LIBDEPS_PRIVATE=[ + "$BUILD_DIR/mongo/s/grid", + "catalog_raii", "commands/server_status_core", + "s/sharding_api_d", ], ) @@ -1012,16 +1013,16 @@ env.Library( 'audit', 'background', 'bson/dotted_path_support', - 'catalog/collection', 'catalog/collection_info_cache', + 'catalog/collection', 'catalog/database', 'catalog/document_validation', - 'catalog/index_catalog', 'catalog/index_catalog_entry', + 'catalog/index_catalog', 'commands', 'concurrency/write_conflict_exception', - 'curop', 'curop_failpoint_helpers', + 'curop', 'cursor_server_params', 'db_raii', 'dbdirectclient', @@ -1036,7 +1037,7 @@ env.Library( 'query/query_common', 'query/query_planner', 'repl/repl_coordinator_interface', - 's/sharding', + 's/sharding_api_d', 'stats/serveronly_stats', 'storage/oplog_hack', 'storage/storage_options', diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript index 1122f2656c8..826f6302a2c 100644 --- a/src/mongo/db/catalog/SConscript +++ b/src/mongo/db/catalog/SConscript @@ -305,11 +305,6 @@ env.Library( 'rename_collection.cpp', ], LIBDEPS=[ - 'collection', - 'collection_options', - 'database', - 'index_catalog', - 'index_create', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/background', '$BUILD_DIR/mongo/db/db_raii', @@ -317,6 +312,11 @@ env.Library( '$BUILD_DIR/mongo/db/server_options_core', 
'$BUILD_DIR/mongo/db/views/views', '$BUILD_DIR/mongo/db/write_ops', + 'collection_options', + 'collection', + 'database', + 'index_catalog', + 'index_create', ], ) diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 19762719102..3446fb50f62 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -113,9 +113,7 @@ #include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/s/config_server_op_observer.h" #include "mongo/db/s/shard_server_op_observer.h" -#include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_initialization_mongod.h" -#include "mongo/db/s/sharding_state.h" #include "mongo/db/s/sharding_state_recovery.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" @@ -524,9 +522,8 @@ ExitCode _initAndListen(int listenPort) { serviceContext->setPeriodicRunner(std::move(runner)); // This function may take the global lock. - auto shardingInitialized = - uassertStatusOK(ShardingState::get(startupOpCtx.get()) - ->initializeShardingAwarenessIfNeeded(startupOpCtx.get())); + auto shardingInitialized = ShardingInitializationMongoD::get(startupOpCtx.get()) + ->initializeShardingAwarenessIfNeeded(startupOpCtx.get()); if (shardingInitialized) { waitForShardRegistryReload(startupOpCtx.get()).transitional_ignore(); } @@ -553,10 +550,9 @@ ExitCode _initAndListen(int listenPort) { uassertStatusOK(ShardingStateRecovery::recover(startupOpCtx.get())); } } else if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - uassertStatusOK( - initializeGlobalShardingStateForMongod(startupOpCtx.get(), - ConnectionString::forLocal(), - kDistLockProcessIdForConfigServer)); + initializeGlobalShardingStateForMongoD(startupOpCtx.get(), + ConnectionString::forLocal(), + kDistLockProcessIdForConfigServer); Balancer::create(startupOpCtx->getServiceContext()); @@ -883,7 +879,7 @@ void shutdownTask() { // is building an index. 
repl::ReplicationCoordinator::get(serviceContext)->shutdown(opCtx); - ShardingState::get(serviceContext)->shutDown(opCtx); + ShardingInitializationMongoD::get(serviceContext)->shutDown(opCtx); // Destroy all stashed transaction resources, in order to release locks. SessionKiller::Matcher matcherAllSessions( diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 6a73d426a90..322f445a9a8 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -73,7 +73,7 @@ #include "mongo/db/s/chunk_splitter.h" #include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/s/periodic_balancer_config_refresher.h" -#include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/sharding_initialization_mongod.h" #include "mongo/db/s/sharding_state_recovery.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" @@ -784,8 +784,8 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook const auto configsvrConnStr = Grid::get(opCtx)->shardRegistry()->getConfigShard()->getConnString(); - auto status = ShardingState::get(opCtx)->updateShardIdentityConfigString( - opCtx, configsvrConnStr.toString()); + auto status = ShardingInitializationMongoD::get(opCtx)->updateShardIdentityConfigString( + opCtx, configsvrConnStr); if (!status.isOK()) { warning() << "error encountered while trying to update config connection string to " << configsvrConnStr << causedBy(status); diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 529511351ee..cd31bc02796 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -17,6 +17,7 @@ env.Library( 'operation_sharding_state.cpp', 'sharded_connection_info.cpp', 'sharding_migration_critical_section.cpp', + 'sharding_state.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base', @@ -65,6 +66,7 @@ 
env.Library( 'split_vector.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/catalog/index_create', '$BUILD_DIR/mongo/db/commands/mongod_fcv', '$BUILD_DIR/mongo/db/db_raii', '$BUILD_DIR/mongo/db/dbhelpers', @@ -72,10 +74,13 @@ env.Library( '$BUILD_DIR/mongo/db/repl/oplog', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', '$BUILD_DIR/mongo/db/rw_concern_d', + '$BUILD_DIR/mongo/s/client/shard_local', + '$BUILD_DIR/mongo/s/client/sharding_connection_hook', + '$BUILD_DIR/mongo/s/sharding_initialization', 'chunk_splitter', + 'migration_types', 'sharding_api_d', 'sharding_catalog_manager', - 'sharding', ], ) @@ -150,36 +155,6 @@ env.Library( ) env.Library( - target='sharding', - source=[ - 'sharding_state.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/base', - '$BUILD_DIR/mongo/bson/util/bson_extract', - '$BUILD_DIR/mongo/db/bson/dotted_path_support', - '$BUILD_DIR/mongo/db/catalog/index_catalog', - '$BUILD_DIR/mongo/db/catalog/index_create', - '$BUILD_DIR/mongo/db/catalog_raii', - '$BUILD_DIR/mongo/db/common', - '$BUILD_DIR/mongo/s/client/shard_local', - '$BUILD_DIR/mongo/s/coreshard', - '$BUILD_DIR/mongo/s/is_mongos', - '$BUILD_DIR/mongo/s/sharding_initialization', - '$BUILD_DIR/mongo/s/sharding_task_executor', - '$BUILD_DIR/mongo/util/elapsed_tracker', - 'balancer', - 'migration_types', - 'sharding_api_d', - 'type_shard_identity', - ], - LIBDEPS_TAGS=[ - # TODO(ADAM, 2017-01-06): See `CYCLE` tags above - 'illegal_cyclic_or_unresolved_dependencies_whitelisted', - ], -) - -env.Library( target='balancer', source=[ 'balancer/balancer_chunk_selection_policy_impl.cpp', @@ -309,6 +284,7 @@ env.Library( '$BUILD_DIR/mongo/db/repl/replica_set_messages', '$BUILD_DIR/mongo/s/commands/cluster_commands_helpers', '$BUILD_DIR/mongo/s/commands/shared_cluster_commands', + '$BUILD_DIR/mongo/s/sharding_initialization', 'balancer', 'sharding_runtime_d', ], @@ -347,7 +323,8 @@ env.CppUnitTest( 'namespace_metadata_change_notifications_test.cpp', 'shard_metadata_util_test.cpp', 
'shard_server_catalog_cache_loader_test.cpp', - 'sharding_state_test.cpp', + 'sharding_initialization_mongod_test.cpp', + 'sharding_initialization_op_observer_test.cpp', 'split_vector_test.cpp', ], LIBDEPS=[ @@ -390,7 +367,6 @@ env.CppUnitTest( LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authmocks', '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture', - 'sharding', ] ) diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp index d057a650f90..ad326711439 100644 --- a/src/mongo/db/s/shard_server_op_observer.cpp +++ b/src/mongo/db/s/shard_server_op_observer.cpp @@ -39,7 +39,7 @@ #include "mongo/db/s/database_sharding_state.h" #include "mongo/db/s/migration_source_manager.h" #include "mongo/db/s/shard_identity_rollback_notifier.h" -#include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/sharding_initialization_mongod.h" #include "mongo/db/s/type_shard_identity.h" #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog/type_shard_collection.h" @@ -97,8 +97,12 @@ public: : _opCtx(opCtx), _shardIdentity(std::move(shardIdentity)) {} void commit(boost::optional<Timestamp>) override { - fassertNoTrace( - 40071, ShardingState::get(_opCtx)->initializeFromShardIdentity(_opCtx, _shardIdentity)); + try { + ShardingInitializationMongoD::get(_opCtx)->initializeFromShardIdentity(_opCtx, + _shardIdentity); + } catch (const AssertionException& ex) { + fassertFailedWithStatus(40071, ex.toStatus()); + } } void rollback() override {} diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp index 41c45a0b8a8..ab235e1db55 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.cpp +++ b/src/mongo/db/s/sharding_initialization_mongod.cpp @@ -28,26 +28,33 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding -#include "mongo/db/s/sharding_initialization_mongod.h" - #include "mongo/platform/basic.h" +#include 
"mongo/db/s/sharding_initialization_mongod.h" + #include "mongo/base/status.h" #include "mongo/client/connection_string.h" #include "mongo/client/global_conn_pool.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/client/remote_command_targeter_factory_impl.h" +#include "mongo/client/replica_set_monitor.h" +#include "mongo/db/catalog_raii.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/dbhelpers.h" #include "mongo/db/logical_time_metadata_hook.h" #include "mongo/db/logical_time_validator.h" #include "mongo/db/operation_context.h" +#include "mongo/db/ops/update.h" +#include "mongo/db/ops/update_lifecycle_impl.h" +#include "mongo/db/s/chunk_splitter.h" +#include "mongo/db/s/periodic_balancer_config_refresher.h" #include "mongo/db/s/read_only_catalog_cache_loader.h" #include "mongo/db/s/shard_server_catalog_cache_loader.h" #include "mongo/db/s/sharding_config_optime_gossip.h" #include "mongo/db/server_options.h" -#include "mongo/executor/task_executor.h" +#include "mongo/executor/task_executor_pool.h" #include "mongo/rpc/metadata/egress_metadata_hook_list.h" #include "mongo/s/catalog_cache.h" -#include "mongo/s/catalog_cache_loader.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_factory.h" #include "mongo/s/client/shard_local.h" @@ -56,12 +63,13 @@ #include "mongo/s/config_server_catalog_cache_loader.h" #include "mongo/s/grid.h" #include "mongo/s/sharding_initialization.h" -#include "mongo/stdx/memory.h" #include "mongo/util/log.h" namespace mongo { namespace { +const auto getInstance = ServiceContext::declareDecoration<ShardingInitializationMongoD>(); + auto makeEgressHooksList(ServiceContext* service) { auto unshardedHookList = stdx::make_unique<rpc::EgressMetadataHookList>(); unshardedHookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(service)); @@ -69,13 +77,261 @@ auto makeEgressHooksList(ServiceContext* service) { 
stdx::make_unique<rpc::ShardingEgressMetadataHookForMongod>(service)); return unshardedHookList; -}; +} + +/** + * Updates the config server field of the shardIdentity document with the given connection string if + * setName is equal to the config server replica set name. + * + * NOTE: This is intended to be used on a new thread that hasn't called Client::initThread. + * + * One example use case is for the ReplicaSetMonitor asynchronous callback when it detects changes + * to replica set membership. + */ +void updateShardIdentityConfigStringCB(const std::string& setName, + const std::string& newConnectionString) { + auto configsvrConnStr = + Grid::get(getGlobalServiceContext())->shardRegistry()->getConfigServerConnectionString(); + if (configsvrConnStr.getSetName() != setName) { + // Ignore all change notification for other sets that are not the config server. + return; + } + + Client::initThread("updateShardIdentityConfigConnString"); + auto uniqOpCtx = Client::getCurrent()->makeOperationContext(); + + auto status = ShardingInitializationMongoD::updateShardIdentityConfigString( + uniqOpCtx.get(), uassertStatusOK(ConnectionString::parse(newConnectionString))); + if (!status.isOK() && !ErrorCodes::isNotMasterError(status.code())) { + warning() << "Error encountered while trying to update config connection string to " + << newConnectionString << causedBy(redact(status)); + } +} + +void initializeShardingEnvironmentOnShardServer(OperationContext* opCtx, + const ShardIdentity& shardIdentity, + StringData distLockProcessId) { + initializeGlobalShardingStateForMongoD( + opCtx, shardIdentity.getConfigsvrConnectionString(), distLockProcessId); + + ReplicaSetMonitor::setSynchronousConfigChangeHook( + &ShardRegistry::replicaSetChangeShardRegistryUpdateHook); + ReplicaSetMonitor::setAsynchronousConfigChangeHook(&updateShardIdentityConfigStringCB); + + // Determine primary/secondary/standalone state in order to properly initialize sharding + // components. 
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx); + bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; + bool isStandaloneOrPrimary = + !isReplSet || (repl::ReplicationCoordinator::get(opCtx)->getMemberState() == + repl::MemberState::RS_PRIMARY); + + CatalogCacheLoader::get(opCtx).initializeReplicaSetRole(isStandaloneOrPrimary); + ChunkSplitter::get(opCtx).onShardingInitialization(isStandaloneOrPrimary); + PeriodicBalancerConfigRefresher::get(opCtx).onShardingInitialization(opCtx->getServiceContext(), + isStandaloneOrPrimary); + + LOG(0) << "Finished initializing sharding components for " + << (isStandaloneOrPrimary ? "primary" : "secondary") << " node."; +} } // namespace -Status initializeGlobalShardingStateForMongod(OperationContext* opCtx, - const ConnectionString& configCS, - StringData distLockProcessId) { +ShardingInitializationMongoD::ShardingInitializationMongoD() + : _initFunc(initializeShardingEnvironmentOnShardServer) {} + +ShardingInitializationMongoD::~ShardingInitializationMongoD() = default; + +ShardingInitializationMongoD* ShardingInitializationMongoD::get(OperationContext* opCtx) { + return get(opCtx->getServiceContext()); +} + +ShardingInitializationMongoD* ShardingInitializationMongoD::get(ServiceContext* service) { + return &getInstance(service); +} + +void ShardingInitializationMongoD::shutDown(OperationContext* opCtx) { + auto const shardingState = ShardingState::get(opCtx); + auto const grid = Grid::get(opCtx); + + if (!shardingState->enabled()) + return; + + grid->getExecutorPool()->shutdownAndJoin(); + grid->catalogClient()->shutDown(opCtx); +} + +bool ShardingInitializationMongoD::initializeShardingAwarenessIfNeeded(OperationContext* opCtx) { + invariant(!opCtx->lockState()->isLocked()); + + // In sharded readOnly mode, we ignore the shardIdentity document on disk and instead *require* + // a shardIdentity document to be passed through --overrideShardIdentity + if 
(storageGlobalParams.readOnly) { + if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { + uassert(ErrorCodes::InvalidOptions, + "If started with --shardsvr in queryableBackupMode, a shardIdentity document " + "must be provided through --overrideShardIdentity", + !serverGlobalParams.overrideShardIdentity.isEmpty()); + + auto overrideShardIdentity = + uassertStatusOK(ShardIdentityType::fromShardIdentityDocument( + serverGlobalParams.overrideShardIdentity)); + + { + // Global lock is required to call initializeFromShardIdentity + Lock::GlobalWrite lk(opCtx); + initializeFromShardIdentity(opCtx, overrideShardIdentity); + } + + return true; + } else { + // Error if --overrideShardIdentity is used but *not* started with --shardsvr + uassert(ErrorCodes::InvalidOptions, + str::stream() + << "Not started with --shardsvr, but a shardIdentity document was provided " + "through --overrideShardIdentity: " + << serverGlobalParams.overrideShardIdentity, + serverGlobalParams.overrideShardIdentity.isEmpty()); + return false; + } + + MONGO_UNREACHABLE; + } + + // In sharded *non*-readOnly mode, error if --overrideShardIdentity is provided + uassert(ErrorCodes::InvalidOptions, + str::stream() << "--overrideShardIdentity is only allowed in sharded " + "queryableBackupMode. 
If not in queryableBackupMode, you can edit " + "the shardIdentity document by starting the server *without* " + "--shardsvr, manually updating the shardIdentity document in the " + << NamespaceString::kServerConfigurationNamespace.toString() + << " collection, and restarting the server with --shardsvr.", + serverGlobalParams.overrideShardIdentity.isEmpty()); + + // Use the shardIdentity document on disk if one exists, but it is okay if no shardIdentity + // document is available at all (sharding awareness will be initialized when a shardIdentity + // document is inserted) + BSONObj shardIdentityBSON; + const bool foundShardIdentity = [&] { + AutoGetCollection autoColl(opCtx, NamespaceString::kServerConfigurationNamespace, MODE_IS); + return Helpers::findOne(opCtx, + autoColl.getCollection(), + BSON("_id" << ShardIdentityType::IdName), + shardIdentityBSON); + }(); + + if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { + if (!foundShardIdentity) { + warning() << "Started with --shardsvr, but no shardIdentity document was found on " + "disk in " + << NamespaceString::kServerConfigurationNamespace + << ". This most likely means this server has not yet been added to a " + "sharded cluster."; + return false; + } + + invariant(!shardIdentityBSON.isEmpty()); + + auto shardIdentity = + uassertStatusOK(ShardIdentityType::fromShardIdentityDocument(shardIdentityBSON)); + + { + // Global lock is required to call initializeFromShardIdentity + Lock::GlobalWrite lk(opCtx); + initializeFromShardIdentity(opCtx, shardIdentity); + } + + return true; + } else { + // Warn if a shardIdentity document is found on disk but *not* started with --shardsvr. 
+ if (!shardIdentityBSON.isEmpty()) { + warning() << "Not started with --shardsvr, but a shardIdentity document was found " + "on disk in " + << NamespaceString::kServerConfigurationNamespace << ": " + << shardIdentityBSON; + } + return false; + } +} + +void ShardingInitializationMongoD::initializeFromShardIdentity( + OperationContext* opCtx, const ShardIdentityType& shardIdentity) { + invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); + invariant(opCtx->lockState()->isLocked()); + + uassertStatusOKWithContext( + shardIdentity.validate(), + "Invalid shard identity document found when initializing sharding state"); + + log() << "initializing sharding state with: " << shardIdentity; + + const auto& configSvrConnStr = shardIdentity.getConfigsvrConnectionString(); + + auto const shardingState = ShardingState::get(opCtx); + auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); + + stdx::unique_lock<stdx::mutex> ul(_initSynchronizationMutex); + + if (shardingState->enabled()) { + uassert(40371, "", shardingState->shardId() == shardIdentity.getShardName()); + uassert(40372, "", shardingState->clusterId() == shardIdentity.getClusterId()); + + auto prevConfigsvrConnStr = shardRegistry->getConfigServerConnectionString(); + uassert(40373, "", prevConfigsvrConnStr.type() == ConnectionString::SET); + uassert(40374, "", prevConfigsvrConnStr.getSetName() == configSvrConnStr.getSetName()); + + return; + } + + auto initializationStatus = shardingState->initializationStatus(); + uassert(ErrorCodes::ManualInterventionRequired, + str::stream() << "Server's sharding metadata manager failed to initialize and will " + "remain in this state until the instance is manually reset" + << causedBy(*initializationStatus), + !initializationStatus); + + try { + _initFunc(opCtx, shardIdentity, generateDistLockProcessId(opCtx)); + shardingState->setInitialized(shardIdentity.getShardName().toString(), + shardIdentity.getClusterId()); + } catch (const DBException& ex) { + 
shardingState->setInitialized(ex.toStatus()); + } +} + +Status ShardingInitializationMongoD::updateShardIdentityConfigString( + OperationContext* opCtx, const ConnectionString& newConnectionString) { + BSONObj updateObj( + ShardIdentityType::createConfigServerUpdateObject(newConnectionString.toString())); + + UpdateRequest updateReq(NamespaceString::kServerConfigurationNamespace); + updateReq.setQuery(BSON("_id" << ShardIdentityType::IdName)); + updateReq.setUpdates(updateObj); + UpdateLifecycleImpl updateLifecycle(NamespaceString::kServerConfigurationNamespace); + updateReq.setLifecycle(&updateLifecycle); + + try { + AutoGetOrCreateDb autoDb( + opCtx, NamespaceString::kServerConfigurationNamespace.db(), MODE_X); + + auto result = update(opCtx, autoDb.getDb(), updateReq); + if (result.numMatched == 0) { + warning() << "failed to update config string of shard identity document because " + << "it does not exist. This shard could have been removed from the cluster"; + } else { + LOG(2) << "Updated config server connection string in shardIdentity document to" + << newConnectionString; + } + } catch (const DBException& exception) { + return exception.toStatus(); + } + + return Status::OK(); +} + +void initializeGlobalShardingStateForMongoD(OperationContext* opCtx, + const ConnectionString& configCS, + StringData distLockProcessId) { auto targeterFactory = stdx::make_unique<RemoteCommandTargeterFactoryImpl>(); auto targeterFactoryPtr = targeterFactory.get(); @@ -126,7 +382,7 @@ Status initializeGlobalShardingStateForMongod(OperationContext* opCtx, globalConnPool.addHook(new ShardingConnectionHook(false, makeEgressHooksList(service))); shardConnectionPool.addHook(new ShardingConnectionHook(true, makeEgressHooksList(service))); - Status initStatus = initializeGlobalShardingState( + uassertStatusOK(initializeGlobalShardingState( opCtx, configCS, distLockProcessId, @@ -135,13 +391,9 @@ Status initializeGlobalShardingStateForMongod(OperationContext* opCtx, [service] { return 
makeEgressHooksList(service); }, // We only need one task executor here because sharding task executors aren't used for user // queries in mongod. - 1); - - if (initStatus.isOK()) { - Grid::get(opCtx)->setShardingInitialized(); - } + 1)); - return initStatus; + Grid::get(opCtx)->setShardingInitialized(); } } // namespace mongo diff --git a/src/mongo/db/s/sharding_initialization_mongod.h b/src/mongo/db/s/sharding_initialization_mongod.h index 9db0bf4357d..0cb0dbc6307 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.h +++ b/src/mongo/db/s/sharding_initialization_mongod.h @@ -28,14 +28,90 @@ #pragma once +#include "mongo/base/disallow_copying.h" #include "mongo/base/string_data.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/type_shard_identity.h" +#include "mongo/stdx/functional.h" namespace mongo { class ConnectionString; class OperationContext; class ServiceContext; -class Status; + +/** + * This class serves as a bootstrap and shutdown for the sharding subsystem and also controls the + * persisted cluster identity. The default ShardingEnvironmentInitFunc instantiates all the sharding + * services, attaches them to the same service context to which it itself is attached and puts the + * ShardingState in the initialized state. + */ +class ShardingInitializationMongoD { + MONGO_DISALLOW_COPYING(ShardingInitializationMongoD); + +public: + using ShardingEnvironmentInitFunc = stdx::function<void( + OperationContext* opCtx, const ShardIdentity& shardIdentity, StringData distLockProcessId)>; + + ShardingInitializationMongoD(); + ~ShardingInitializationMongoD(); + + static ShardingInitializationMongoD* get(OperationContext* opCtx); + static ShardingInitializationMongoD* get(ServiceContext* service); + + /** + * If started with --shardsvr, initializes sharding awareness from the shardIdentity document on + * disk, if there is one. 
+ * + * If started with --shardsvr in queryableBackupMode, initializes sharding awareness from the + * shardIdentity document passed through the --overrideShardIdentity startup parameter. + * + * If it returns true, the '_initFunc' was called, meaning all the core classes for sharding + * were initialized, but no networking calls were made yet (with the exception of the duplicate + * ShardRegistry reload in ShardRegistry::startup() (see SERVER-26123). Outgoing networking + * calls to cluster members can now be made. + * + * If it returns false, this means the node is not yet sharding aware. + * + * NOTE: this function briefly takes the global lock to determine primary/secondary state. + */ + bool initializeShardingAwarenessIfNeeded(OperationContext* opCtx); + + /** + * Initializes the sharding state of this server from the shard identity document argument and + * sets secondary or primary state information on the catalog cache loader. + * + * NOTE: This must be called under at least Global IX lock in order for the replica set member + * state to be stable (primary/secondary). + */ + void initializeFromShardIdentity(OperationContext* opCtx, + const ShardIdentityType& shardIdentity); + + void shutDown(OperationContext* service); + + /** + * Updates the config server field of the shardIdentity document with the given connection + * string. + */ + static Status updateShardIdentityConfigString(OperationContext* opCtx, + const ConnectionString& newConnectionString); + + /** + * For testing only. Mock the initialization method used by initializeFromConfigConnString and + * initializeFromShardIdentity after all checks are performed. 
+ */ + void setGlobalInitMethodForTest(ShardingEnvironmentInitFunc func) { + _initFunc = std::move(func); + } + +private: + // This mutex ensures that only one thread at a time executes the sharding + // initialization/teardown sequence + stdx::mutex _initSynchronizationMutex; + + // Function for initializing the sharding environment components (i.e. everything on the Grid) + ShardingEnvironmentInitFunc _initFunc; +}; /** * Initialize the sharding components of this server. This can be used on both shard and config @@ -43,8 +119,8 @@ class Status; * * NOTE: This does not initialize ShardingState, which should only be done for shard servers. */ -Status initializeGlobalShardingStateForMongod(OperationContext* opCtx, - const ConnectionString& configCS, - StringData distLockProcessId); +void initializeGlobalShardingStateForMongoD(OperationContext* opCtx, + const ConnectionString& configCS, + StringData distLockProcessId); } // namespace mongo diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_initialization_mongod_test.cpp index 36b6024cc21..7748e7492ac 100644 --- a/src/mongo/db/s/sharding_state_test.cpp +++ b/src/mongo/db/s/sharding_initialization_mongod_test.cpp @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016 MongoDB, Inc. + * Copyright (C) 2018 MongoDB, Inc. 
* * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, @@ -38,7 +38,7 @@ #include "mongo/db/s/config_server_op_observer.h" #include "mongo/db/s/shard_server_catalog_cache_loader.h" #include "mongo/db/s/shard_server_op_observer.h" -#include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/sharding_initialization_mongod.h" #include "mongo/db/s/type_shard_identity.h" #include "mongo/db/server_options.h" #include "mongo/s/catalog/dist_lock_manager_mock.h" @@ -46,96 +46,17 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/config_server_catalog_cache_loader.h" #include "mongo/s/shard_server_test_fixture.h" -#include "mongo/s/sharding_mongod_test_fixture.h" namespace mongo { namespace { const std::string kShardName("TestShard"); -class ShardingInitializationOpObserverTest : public ShardServerTestFixture { -public: - void setUp() override { - ShardServerTestFixture::setUp(); - - // NOTE: this assumes that globalInit will always be called on the same thread as the main - // test thread - ShardingState::get(operationContext()) - ->setGlobalInitMethodForTest( - [this](OperationContext*, const ConnectionString&, StringData) { - _initCallCount++; - return Status::OK(); - }); - } - - int getInitCallCount() const { - return _initCallCount; - } - -private: - int _initCallCount = 0; -}; - -TEST_F(ShardingInitializationOpObserverTest, GlobalInitGetsCalledAfterWriteCommits) { - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnectionString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(kShardName); - shardIdentity.setClusterId(OID::gen()); - - DBDirectClient client(operationContext()); - client.insert("admin.system.version", shardIdentity.toShardIdentityDocument()); - ASSERT_EQ(1, getInitCallCount()); -} - -TEST_F(ShardingInitializationOpObserverTest, GlobalInitDoesntGetCalledIfWriteAborts) { - ShardIdentityType 
shardIdentity; - shardIdentity.setConfigsvrConnectionString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(kShardName); - shardIdentity.setClusterId(OID::gen()); - - // This part of the test ensures that the collection exists for the AutoGetCollection below to - // find and also validates that the initializer does not get called for non-sharding documents - DBDirectClient client(operationContext()); - client.insert("admin.system.version", BSON("_id" << 1)); - ASSERT_EQ(0, getInitCallCount()); - - { - AutoGetCollection autoColl( - operationContext(), NamespaceString("admin.system.version"), MODE_IX); - - WriteUnitOfWork wuow(operationContext()); - ASSERT_OK(autoColl.getCollection()->insertDocument( - operationContext(), shardIdentity.toShardIdentityDocument(), {})); - ASSERT_EQ(0, getInitCallCount()); - } - - ASSERT_EQ(0, getInitCallCount()); -} - -TEST_F(ShardingInitializationOpObserverTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentity) { - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnectionString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(kShardName); - shardIdentity.setClusterId(OID::gen()); - - DBDirectClient client(operationContext()); - client.insert("admin.user", shardIdentity.toShardIdentityDocument()); - ASSERT_EQ(0, getInitCallCount()); -} - -TEST_F(ShardingInitializationOpObserverTest, OnInsertOpThrowWithIncompleteShardIdentityDocument) { - DBDirectClient client(operationContext()); - client.insert("admin.system.version", - BSON("_id" << ShardIdentityType::IdName << ShardIdentity::kShardNameFieldName - << kShardName)); - ASSERT(!client.getLastError().empty()); -} - - -class ShardingStateTest : public ShardingMongodTestFixture { +/** + * This test suite directly invokes the sharding initialization code and validates its behaviour and + * proper state transitions. 
+ */ +class ShardingInitializationMongoDTest : public ShardingMongodTestFixture { protected: // Used to write to set up local collections before exercising server logic. std::unique_ptr<DBDirectClient> _dbDirectClient; @@ -151,23 +72,24 @@ protected: stdx::make_unique<ShardServerCatalogCacheLoader>( stdx::make_unique<ConfigServerCatalogCacheLoader>())); - _shardingState.setGlobalInitMethodForTest([&](OperationContext* opCtx, - const ConnectionString& configConnStr, - StringData distLockProcessId) { - auto status = initializeGlobalShardingStateForMongodForTest(configConnStr); - if (!status.isOK()) { - return status; - } + ShardingInitializationMongoD::get(getServiceContext()) + ->setGlobalInitMethodForTest([&](OperationContext* opCtx, + const ShardIdentity& shardIdentity, + StringData distLockProcessId) { + const auto& configConnStr = shardIdentity.getConfigsvrConnectionString(); - // Set the ConnectionString return value on the mock targeter so that later calls to the - // targeter's getConnString() return the appropriate value - auto configTargeter = - RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()); - configTargeter->setConnectionStringReturnValue(configConnStr); - configTargeter->setFindHostReturnValue(configConnStr.getServers()[0]); + uassertStatusOK(initializeGlobalShardingStateForMongodForTest(configConnStr)); - return Status::OK(); - }); + // Set the ConnectionString return value on the mock targeter so that later calls to + // the + // targeter's getConnString() return the appropriate value + auto configTargeter = RemoteCommandTargeterMock::get( + shardRegistry()->getConfigShard()->getTargeter()); + configTargeter->setConnectionStringReturnValue(configConnStr); + configTargeter->setFindHostReturnValue(configConnStr.getServers()[0]); + + return Status::OK(); + }); _dbDirectClient = stdx::make_unique<DBDirectClient>(operationContext()); } @@ -180,6 +102,7 @@ protected: serverGlobalParams.overrideShardIdentity = BSONObj(); 
CatalogCacheLoader::clearForTests(getServiceContext()); + ShardingState::get(getServiceContext())->clearForTests(); ShardingMongodTestFixture::tearDown(); } @@ -195,12 +118,13 @@ protected: return stdx::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager)); } - ShardingState* shardingState() { - return &_shardingState; + auto* shardingInitialization() { + return ShardingInitializationMongoD::get(getServiceContext()); } -private: - ShardingState _shardingState; + auto* shardingState() { + return ShardingState::get(getServiceContext()); + } }; /** @@ -231,7 +155,7 @@ private: ServiceContext* const _serviceContext; }; -TEST_F(ShardingStateTest, ValidShardIdentitySucceeds) { +TEST_F(ShardingInitializationMongoDTest, ValidShardIdentitySucceeds) { // Must hold a lock to call initializeFromShardIdentity. Lock::GlobalWrite lk(operationContext()); @@ -241,13 +165,14 @@ TEST_F(ShardingStateTest, ValidShardIdentitySucceeds) { shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(OID::gen()); - ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); - ASSERT_TRUE(shardingState()->enabled()); + shardingInitialization()->initializeFromShardIdentity(operationContext(), shardIdentity); + ASSERT_OK(shardingState()->canAcceptShardedCommands()); + ASSERT(shardingState()->enabled()); ASSERT_EQ(kShardName, shardingState()->shardId()); ASSERT_EQ("config/a:1,b:2", shardRegistry()->getConfigServerConnectionString().toString()); } -TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) { +TEST_F(ShardingInitializationMongoDTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) { // Must hold a lock to call initializeFromShardIdentity. 
Lock::GlobalWrite lk(operationContext()); @@ -257,34 +182,28 @@ TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) { shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(OID::gen()); - shardingState()->setGlobalInitMethodForTest( - [](OperationContext* opCtx, const ConnectionString& connStr, StringData distLockProcessId) { - return Status{ErrorCodes::ShutdownInProgress, "shutting down"}; - }); + shardingInitialization()->setGlobalInitMethodForTest([]( + OperationContext* opCtx, const ShardIdentity& shardIdentity, StringData distLockProcessId) { + uasserted(ErrorCodes::ShutdownInProgress, "Not an actual shutdown"); + }); - { - auto status = - shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity); - ASSERT_EQ(ErrorCodes::ShutdownInProgress, status); - } + shardingInitialization()->initializeFromShardIdentity(operationContext(), shardIdentity); // ShardingState is now in error state, attempting to call it again will still result in error. 
- - shardingState()->setGlobalInitMethodForTest( - [](OperationContext* opCtx, const ConnectionString& connStr, StringData distLockProcessId) { - return Status::OK(); - }); - - { - auto status = - shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity); - ASSERT_EQ(ErrorCodes::ManualInterventionRequired, status); - } - - ASSERT_FALSE(shardingState()->enabled()); + shardingInitialization()->setGlobalInitMethodForTest([]( + OperationContext* opCtx, const ShardIdentity& shardIdentity, StringData distLockProcessId) { + FAIL("Should not be invoked!"); + }); + + ASSERT_THROWS_CODE( + shardingInitialization()->initializeFromShardIdentity(operationContext(), shardIdentity), + AssertionException, + ErrorCodes::ManualInterventionRequired); + ASSERT_NOT_OK(shardingState()->canAcceptShardedCommands()); + ASSERT(!shardingState()->enabled()); } -TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) { +TEST_F(ShardingInitializationMongoDTest, InitializeAgainWithMatchingShardIdentitySucceeds) { // Must hold a lock to call initializeFromShardIdentity. 
Lock::GlobalWrite lk(operationContext()); @@ -295,7 +214,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) { shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(clusterID); - ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); + shardingInitialization()->initializeFromShardIdentity(operationContext(), shardIdentity); ShardIdentityType shardIdentity2; shardIdentity2.setConfigsvrConnectionString( @@ -303,19 +222,21 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) { shardIdentity2.setShardName(kShardName); shardIdentity2.setClusterId(clusterID); - shardingState()->setGlobalInitMethodForTest( - [](OperationContext* opCtx, const ConnectionString& connStr, StringData distLockProcessId) { - return Status{ErrorCodes::InternalError, "should not reach here"}; - }); + shardingInitialization()->setGlobalInitMethodForTest([]( + OperationContext* opCtx, const ShardIdentity& shardIdentity, StringData distLockProcessId) { + FAIL("Should not be invoked!"); + }); - ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity2)); + shardingInitialization()->initializeFromShardIdentity(operationContext(), shardIdentity2); + ASSERT_OK(shardingState()->canAcceptShardedCommands()); ASSERT_TRUE(shardingState()->enabled()); + ASSERT_EQ(kShardName, shardingState()->shardId()); ASSERT_EQ("config/a:1,b:2", shardRegistry()->getConfigServerConnectionString().toString()); } -TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) { +TEST_F(ShardingInitializationMongoDTest, InitializeAgainWithMatchingReplSetNameSucceeds) { // Must hold a lock to call initializeFromShardIdentity. 
Lock::GlobalWrite lk(operationContext()); @@ -326,7 +247,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) { shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(clusterID); - ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); + shardingInitialization()->initializeFromShardIdentity(operationContext(), shardIdentity); ShardIdentityType shardIdentity2; shardIdentity2.setConfigsvrConnectionString( @@ -334,14 +255,16 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) { shardIdentity2.setShardName(kShardName); shardIdentity2.setClusterId(clusterID); - shardingState()->setGlobalInitMethodForTest( - [](OperationContext* opCtx, const ConnectionString& connStr, StringData distLockProcessId) { - return Status{ErrorCodes::InternalError, "should not reach here"}; - }); + shardingInitialization()->setGlobalInitMethodForTest([]( + OperationContext* opCtx, const ShardIdentity& shardIdentity, StringData distLockProcessId) { + FAIL("Should not be invoked!"); + }); - ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity2)); + shardingInitialization()->initializeFromShardIdentity(operationContext(), shardIdentity2); + ASSERT_OK(shardingState()->canAcceptShardedCommands()); ASSERT_TRUE(shardingState()->enabled()); + ASSERT_EQ(kShardName, shardingState()->shardId()); ASSERT_EQ("config/a:1,b:2", shardRegistry()->getConfigServerConnectionString().toString()); } @@ -349,17 +272,20 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) { // The tests below check for different combinations of the compatible startup parameters for // --shardsvr, --overrideShardIdentity, and queryableBackup (readOnly) mode -// readOnly and --shardsvr - -TEST_F(ShardingStateTest, +/** + * readOnly and --shardsvr + */ +TEST_F(ShardingInitializationMongoDTest, InitializeShardingAwarenessIfNeededReadOnlyAndShardServerAndNoOverrideShardIdentity) { 
storageGlobalParams.readOnly = true; - auto swShardingInitialized = - shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); - ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code()); + + ASSERT_THROWS_CODE( + shardingInitialization()->initializeShardingAwarenessIfNeeded(operationContext()), + AssertionException, + ErrorCodes::InvalidOptions); } -TEST_F(ShardingStateTest, +TEST_F(ShardingInitializationMongoDTest, InitializeShardingAwarenessIfNeededReadOnlyAndShardServerAndInvalidOverrideShardIdentity) { storageGlobalParams.readOnly = true; serverGlobalParams.overrideShardIdentity = @@ -371,132 +297,136 @@ TEST_F(ShardingStateTest, << OID::gen() << ShardIdentity::kConfigsvrConnectionStringFieldName << "invalid"); - auto swShardingInitialized = - shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); - ASSERT_EQUALS(ErrorCodes::UnsupportedFormat, swShardingInitialized.getStatus().code()); + + ASSERT_THROWS_CODE( + shardingInitialization()->initializeShardingAwarenessIfNeeded(operationContext()), + AssertionException, + ErrorCodes::UnsupportedFormat); } -TEST_F(ShardingStateTest, +TEST_F(ShardingInitializationMongoDTest, InitializeShardingAwarenessIfNeededReadOnlyAndShardServerAndValidOverrideShardIdentity) { storageGlobalParams.readOnly = true; serverGlobalParams.clusterRole = ClusterRole::ShardServer; + serverGlobalParams.overrideShardIdentity = [] { + ShardIdentityType shardIdentity; + shardIdentity.setConfigsvrConnectionString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); + shardIdentity.setShardName(kShardName); + shardIdentity.setClusterId(OID::gen()); + ASSERT_OK(shardIdentity.validate()); + return shardIdentity.toShardIdentityDocument(); + }(); - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnectionString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(kShardName); - shardIdentity.setClusterId(OID::gen()); - 
ASSERT_OK(shardIdentity.validate()); - serverGlobalParams.overrideShardIdentity = shardIdentity.toShardIdentityDocument(); - - auto swShardingInitialized = - shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); - ASSERT_OK(swShardingInitialized); - ASSERT_TRUE(swShardingInitialized.getValue()); + ASSERT(shardingInitialization()->initializeShardingAwarenessIfNeeded(operationContext())); } -// readOnly and not --shardsvr - -TEST_F(ShardingStateTest, +/** + * readOnly and not --shardsvr + */ +TEST_F(ShardingInitializationMongoDTest, InitializeShardingAwarenessIfNeededReadOnlyAndNotShardServerAndNoOverrideShardIdentity) { storageGlobalParams.readOnly = true; serverGlobalParams.clusterRole = ClusterRole::None; - auto swShardingInitialized = - shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); - ASSERT_OK(swShardingInitialized); - ASSERT_FALSE(swShardingInitialized.getValue()); + ASSERT(!shardingInitialization()->initializeShardingAwarenessIfNeeded(operationContext())); } TEST_F( - ShardingStateTest, + ShardingInitializationMongoDTest, InitializeShardingAwarenessIfNeededReadOnlyAndNotShardServerAndInvalidOverrideShardIdentity) { storageGlobalParams.readOnly = true; serverGlobalParams.clusterRole = ClusterRole::None; - serverGlobalParams.overrideShardIdentity = BSON("_id" << "shardIdentity" << "configsvrConnectionString" << "invalid"); - auto swShardingInitialized = - shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); - ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code()); + + ASSERT_THROWS_CODE( + shardingInitialization()->initializeShardingAwarenessIfNeeded(operationContext()), + AssertionException, + ErrorCodes::InvalidOptions); } -TEST_F(ShardingStateTest, +TEST_F(ShardingInitializationMongoDTest, InitializeShardingAwarenessIfNeededReadOnlyAndNotShardServerAndValidOverrideShardIdentity) { storageGlobalParams.readOnly = true; serverGlobalParams.clusterRole = 
ClusterRole::None; + serverGlobalParams.overrideShardIdentity = [] { + ShardIdentityType shardIdentity; + shardIdentity.setConfigsvrConnectionString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); + shardIdentity.setShardName(kShardName); + shardIdentity.setClusterId(OID::gen()); + ASSERT_OK(shardIdentity.validate()); + return shardIdentity.toShardIdentityDocument(); + }(); - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnectionString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(kShardName); - shardIdentity.setClusterId(OID::gen()); - ASSERT_OK(shardIdentity.validate()); - serverGlobalParams.overrideShardIdentity = shardIdentity.toShardIdentityDocument(); - - auto swShardingInitialized = - shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); - ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code()); + ASSERT_THROWS_CODE( + shardingInitialization()->initializeShardingAwarenessIfNeeded(operationContext()), + AssertionException, + ErrorCodes::InvalidOptions); } -// not readOnly and --overrideShardIdentity - -TEST_F(ShardingStateTest, +/** + * not readOnly and --overrideShardIdentity + */ +TEST_F(ShardingInitializationMongoDTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndInvalidOverrideShardIdentity) { + serverGlobalParams.clusterRole = ClusterRole::ShardServer; serverGlobalParams.overrideShardIdentity = BSON("_id" << "shardIdentity" << "configsvrConnectionString" << "invalid"); - // Should error regardless of cluster role. 
- - serverGlobalParams.clusterRole = ClusterRole::ShardServer; - auto swShardingInitialized = - shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); - ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code()); + ASSERT_THROWS_CODE( + shardingInitialization()->initializeShardingAwarenessIfNeeded(operationContext()), + AssertionException, + ErrorCodes::InvalidOptions); + // Should error regardless of cluster role serverGlobalParams.clusterRole = ClusterRole::None; - swShardingInitialized = - shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); - ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code()); + ASSERT_THROWS_CODE( + shardingInitialization()->initializeShardingAwarenessIfNeeded(operationContext()), + AssertionException, + ErrorCodes::InvalidOptions); } -TEST_F(ShardingStateTest, +TEST_F(ShardingInitializationMongoDTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndValidOverrideShardIdentity) { - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnectionString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(kShardName); - shardIdentity.setClusterId(OID::gen()); - ASSERT_OK(shardIdentity.validate()); - serverGlobalParams.overrideShardIdentity = shardIdentity.toShardIdentityDocument(); - - // Should error regardless of cluster role. 
- serverGlobalParams.clusterRole = ClusterRole::ShardServer; - auto swShardingInitialized = - shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); - ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code()); + serverGlobalParams.overrideShardIdentity = [] { + ShardIdentityType shardIdentity; + shardIdentity.setConfigsvrConnectionString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); + shardIdentity.setShardName(kShardName); + shardIdentity.setClusterId(OID::gen()); + ASSERT_OK(shardIdentity.validate()); + return shardIdentity.toShardIdentityDocument(); + }(); + ASSERT_THROWS_CODE( + shardingInitialization()->initializeShardingAwarenessIfNeeded(operationContext()), + AssertionException, + ErrorCodes::InvalidOptions); + + // Should error regardless of cluster role serverGlobalParams.clusterRole = ClusterRole::None; - swShardingInitialized = - shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); - ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code()); + ASSERT_THROWS_CODE( + shardingInitialization()->initializeShardingAwarenessIfNeeded(operationContext()), + AssertionException, + ErrorCodes::InvalidOptions); } -// not readOnly and --shardsvr - -TEST_F(ShardingStateTest, +/** + * not readOnly and --shardsvr + */ +TEST_F(ShardingInitializationMongoDTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndShardServerAndNoShardIdentity) { - auto swShardingInitialized = - shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); - ASSERT_OK(swShardingInitialized); - ASSERT_FALSE(swShardingInitialized.getValue()); + ASSERT(!shardingInitialization()->initializeShardingAwarenessIfNeeded(operationContext())); } -TEST_F(ShardingStateTest, +TEST_F(ShardingInitializationMongoDTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndShardServerAndInvalidShardIdentity) { // Insert the shardIdentity doc to disk while pretending that we are in "standalone" 
mode, // otherwise OpObserver for inserts will prevent the insert from occurring because the @@ -517,12 +447,13 @@ TEST_F(ShardingStateTest, invalidShardIdentity); } - auto swShardingInitialized = - shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); - ASSERT_EQUALS(ErrorCodes::UnsupportedFormat, swShardingInitialized.getStatus().code()); + ASSERT_THROWS_CODE( + shardingInitialization()->initializeShardingAwarenessIfNeeded(operationContext()), + AssertionException, + ErrorCodes::UnsupportedFormat); } -TEST_F(ShardingStateTest, +TEST_F(ShardingInitializationMongoDTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndShardServerAndValidShardIdentity) { // Insert the shardIdentity doc to disk while pretending that we are in "standalone" mode, // otherwise OpObserver for inserts will prevent the insert from occurring because the @@ -544,25 +475,20 @@ TEST_F(ShardingStateTest, validShardIdentity); } - auto swShardingInitialized = - shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); - ASSERT_OK(swShardingInitialized); - ASSERT_TRUE(swShardingInitialized.getValue()); + ASSERT(shardingInitialization()->initializeShardingAwarenessIfNeeded(operationContext())); } -// not readOnly and not --shardsvr - -TEST_F(ShardingStateTest, +/** + * not readOnly and not --shardsvr + */ +TEST_F(ShardingInitializationMongoDTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndNotShardServerAndNoShardIdentity) { ScopedSetStandaloneMode standalone(getServiceContext()); - auto swShardingInitialized = - shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); - ASSERT_OK(swShardingInitialized); - ASSERT_FALSE(swShardingInitialized.getValue()); + ASSERT(!shardingInitialization()->initializeShardingAwarenessIfNeeded(operationContext())); } -TEST_F(ShardingStateTest, +TEST_F(ShardingInitializationMongoDTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndNotShardServerAndInvalidShardIdentity) { ScopedSetStandaloneMode 
standalone(getServiceContext()); @@ -574,13 +500,10 @@ TEST_F(ShardingStateTest, // The shardIdentity doc on disk, even if invalid, is ignored if the ClusterRole is None. This // is to allow fixing the shardIdentity doc by starting without --shardsvr. - auto swShardingInitialized = - shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); - ASSERT_OK(swShardingInitialized); - ASSERT_FALSE(swShardingInitialized.getValue()); + ASSERT(!shardingInitialization()->initializeShardingAwarenessIfNeeded(operationContext())); } -TEST_F(ShardingStateTest, +TEST_F(ShardingInitializationMongoDTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndNotShardServerAndValidShardIdentity) { ScopedSetStandaloneMode standalone(getServiceContext()); @@ -597,11 +520,9 @@ TEST_F(ShardingStateTest, _dbDirectClient->insert(NamespaceString::kServerConfigurationNamespace.toString(), validShardIdentity); - // The shardIdentity doc on disk is ignored if ClusterRole is None. - auto swShardingInitialized = - shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); - ASSERT_OK(swShardingInitialized); - ASSERT_FALSE(swShardingInitialized.getValue()); + // The shardIdentity doc on disk, even if invalid, is ignored if the ClusterRole is None. This + // is to allow fixing the shardIdentity doc by starting without --shardsvr. + ASSERT(!shardingInitialization()->initializeShardingAwarenessIfNeeded(operationContext())); } } // namespace diff --git a/src/mongo/db/s/sharding_initialization_op_observer_test.cpp b/src/mongo/db/s/sharding_initialization_op_observer_test.cpp new file mode 100644 index 00000000000..3937cb06f0a --- /dev/null +++ b/src/mongo/db/s/sharding_initialization_op_observer_test.cpp @@ -0,0 +1,149 @@ +/** + * Copyright (C) 2018 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/client/remote_command_targeter_mock.h" +#include "mongo/db/catalog_raii.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/op_observer_impl.h" +#include "mongo/db/op_observer_registry.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/s/config_server_op_observer.h" +#include "mongo/db/s/shard_server_catalog_cache_loader.h" +#include "mongo/db/s/shard_server_op_observer.h" +#include "mongo/db/s/sharding_initialization_mongod.h" +#include "mongo/db/s/type_shard_identity.h" +#include "mongo/db/server_options.h" +#include "mongo/s/catalog/dist_lock_manager_mock.h" +#include "mongo/s/catalog/sharding_catalog_client_impl.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/config_server_catalog_cache_loader.h" +#include "mongo/s/shard_server_test_fixture.h" + +namespace mongo { +namespace { + +const std::string kShardName("TestShard"); + +/** + * This test suite validates that when the default OpObserver chain is set up (which happens to + * include the ShardServerOpObserver), writes to the 'admin.system.version' collection (and the + * shardIdentity document specifically) will invoke the sharding initialization code. 
+ */ +class ShardingInitializationOpObserverTest : public ShardServerTestFixture { +public: + void setUp() override { + ShardServerTestFixture::setUp(); + + // NOTE: this assumes that globalInit will always be called on the same thread as the main + // test thread + ShardingInitializationMongoD::get(operationContext()) + ->setGlobalInitMethodForTest([this](OperationContext* opCtx, + const ShardIdentity& shardIdentity, + StringData distLockProcessId) { + _initCallCount++; + return Status::OK(); + }); + } + + void tearDown() override { + ShardingState::get(getServiceContext())->clearForTests(); + + ShardServerTestFixture::tearDown(); + } + + int getInitCallCount() const { + return _initCallCount; + } + +private: + int _initCallCount = 0; +}; + +TEST_F(ShardingInitializationOpObserverTest, GlobalInitGetsCalledAfterWriteCommits) { + ShardIdentityType shardIdentity; + shardIdentity.setConfigsvrConnectionString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); + shardIdentity.setShardName(kShardName); + shardIdentity.setClusterId(OID::gen()); + + DBDirectClient client(operationContext()); + client.insert("admin.system.version", shardIdentity.toShardIdentityDocument()); + ASSERT_EQ(1, getInitCallCount()); +} + +TEST_F(ShardingInitializationOpObserverTest, GlobalInitDoesntGetCalledIfWriteAborts) { + ShardIdentityType shardIdentity; + shardIdentity.setConfigsvrConnectionString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); + shardIdentity.setShardName(kShardName); + shardIdentity.setClusterId(OID::gen()); + + // This part of the test ensures that the collection exists for the AutoGetCollection below to + // find and also validates that the initializer does not get called for non-sharding documents + DBDirectClient client(operationContext()); + client.insert("admin.system.version", BSON("_id" << 1)); + ASSERT_EQ(0, getInitCallCount()); + + { + AutoGetCollection autoColl( + operationContext(), NamespaceString("admin.system.version"), 
MODE_IX); + + WriteUnitOfWork wuow(operationContext()); + ASSERT_OK(autoColl.getCollection()->insertDocument( + operationContext(), shardIdentity.toShardIdentityDocument(), {})); + ASSERT_EQ(0, getInitCallCount()); + } + + ASSERT_EQ(0, getInitCallCount()); +} + +TEST_F(ShardingInitializationOpObserverTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentity) { + ShardIdentityType shardIdentity; + shardIdentity.setConfigsvrConnectionString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); + shardIdentity.setShardName(kShardName); + shardIdentity.setClusterId(OID::gen()); + + DBDirectClient client(operationContext()); + client.insert("admin.user", shardIdentity.toShardIdentityDocument()); + ASSERT_EQ(0, getInitCallCount()); +} + +TEST_F(ShardingInitializationOpObserverTest, OnInsertOpThrowWithIncompleteShardIdentityDocument) { + DBDirectClient client(operationContext()); + client.insert("admin.system.version", + BSON("_id" << ShardIdentityType::IdName << ShardIdentity::kShardNameFieldName + << kShardName)); + ASSERT(!client.getLastError().empty()); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index 8535def6517..8541a3b45c6 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -32,31 +32,9 @@ #include "mongo/db/s/sharding_state.h" -#include "mongo/bson/util/bson_extract.h" -#include "mongo/client/connection_string.h" -#include "mongo/client/replica_set_monitor.h" -#include "mongo/db/catalog_raii.h" -#include "mongo/db/client.h" -#include "mongo/db/dbhelpers.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/ops/update.h" -#include "mongo/db/ops/update_lifecycle_impl.h" -#include "mongo/db/repl/optime.h" -#include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/s/chunk_splitter.h" #include "mongo/db/s/operation_sharding_state.h" -#include "mongo/db/s/periodic_balancer_config_refresher.h" #include 
"mongo/db/s/sharded_connection_info.h" -#include "mongo/db/s/sharding_initialization_mongod.h" -#include "mongo/db/s/type_shard_identity.h" -#include "mongo/executor/task_executor_pool.h" -#include "mongo/rpc/metadata/metadata_hook.h" -#include "mongo/s/catalog/sharding_catalog_client.h" -#include "mongo/s/catalog_cache_loader.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/client/sharding_network_connection_hook.h" -#include "mongo/s/grid.h" -#include "mongo/s/sharding_initialization.h" +#include "mongo/db/server_options.h" #include "mongo/util/log.h" namespace mongo { @@ -64,40 +42,9 @@ namespace { const auto getShardingState = ServiceContext::declareDecoration<ShardingState>(); -/** - * Updates the config server field of the shardIdentity document with the given connection string - * if setName is equal to the config server replica set name. - * - * Note: This is intended to be used on a new thread that hasn't called Client::initThread. - * One example use case is for the ReplicaSetMonitor asynchronous callback when it detects changes - * to replica set membership. - */ -void updateShardIdentityConfigStringCB(const std::string& setName, - const std::string& newConnectionString) { - auto configsvrConnStr = - Grid::get(getGlobalServiceContext())->shardRegistry()->getConfigServerConnectionString(); - if (configsvrConnStr.getSetName() != setName) { - // Ignore all change notification for other sets that are not the config server. 
- return; - } - - Client::initThread("updateShardIdentityConfigConnString"); - auto uniqOpCtx = Client::getCurrent()->makeOperationContext(); - - auto status = ShardingState::get(uniqOpCtx.get()) - ->updateShardIdentityConfigString(uniqOpCtx.get(), newConnectionString); - if (!status.isOK() && !ErrorCodes::isNotMasterError(status.code())) { - warning() << "error encountered while trying to update config connection string to " - << newConnectionString << causedBy(redact(status)); - } -} - } // namespace -ShardingState::ShardingState() - : _globalInit(&initializeGlobalShardingStateForMongod), - _initializationState(static_cast<uint32_t>(InitializationState::kNew)), - _initializationStatus(Status(ErrorCodes::InternalError, "Uninitialized value")) {} +ShardingState::ShardingState() = default; ShardingState::~ShardingState() = default; @@ -109,13 +56,38 @@ ShardingState* ShardingState::get(OperationContext* operationContext) { return ShardingState::get(operationContext->getServiceContext()); } -bool ShardingState::enabled() const { - return _getInitializationState() == InitializationState::kInitialized; +void ShardingState::setInitialized(ShardId shardId, OID clusterId) { + stdx::unique_lock<stdx::mutex> ul(_mutex); + invariant(_getInitializationState() == InitializationState::kNew); + + _shardId = std::move(shardId); + _clusterId = std::move(clusterId); + _initializationStatus = Status::OK(); + + _initializationState.store(static_cast<uint32_t>(InitializationState::kInitialized)); +} + +void ShardingState::setInitialized(Status failedStatus) { + invariant(!failedStatus.isOK()); + log() << "Failed to initialize sharding components" << causedBy(failedStatus); + + stdx::unique_lock<stdx::mutex> ul(_mutex); + invariant(_getInitializationState() == InitializationState::kNew); + + _initializationStatus = std::move(failedStatus); + _initializationState.store(static_cast<uint32_t>(InitializationState::kError)); } -void ShardingState::setEnabledForTest(const std::string& 
shardName) { - _shardId = shardName; - _setInitializationState(InitializationState::kInitialized); +boost::optional<Status> ShardingState::initializationStatus() { + stdx::unique_lock<stdx::mutex> ul(_mutex); + if (_getInitializationState() == InitializationState::kNew) + return boost::none; + + return _initializationStatus; +} + +bool ShardingState::enabled() const { + return _getInitializationState() == InitializationState::kInitialized; } Status ShardingState::canAcceptShardedCommands() const { @@ -155,243 +127,8 @@ bool ShardingState::needCollectionMetadata(OperationContext* opCtx, const std::s OperationShardingState::get(opCtx).hasShardVersion(); } -void ShardingState::shutDown(OperationContext* opCtx) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - if (enabled()) { - Grid::get(opCtx)->getExecutorPool()->shutdownAndJoin(); - Grid::get(opCtx)->catalogClient()->shutDown(opCtx); - } -} - -void ShardingState::setGlobalInitMethodForTest(GlobalInitFunc func) { - _globalInit = func; -} - -Status ShardingState::initializeFromShardIdentity(OperationContext* opCtx, - const ShardIdentityType& shardIdentity) { - invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); - invariant(opCtx->lockState()->isLocked()); - - Status validationStatus = shardIdentity.validate(); - if (!validationStatus.isOK()) { - return validationStatus.withContext( - "Invalid shard identity document found when initializing sharding state"); - } - - log() << "initializing sharding state with: " << shardIdentity; - - stdx::unique_lock<stdx::mutex> lk(_mutex); - - const auto& configSvrConnStr = shardIdentity.getConfigsvrConnectionString(); - - if (enabled()) { - invariant(_shardId.isValid()); - fassert(40372, _shardId == shardIdentity.getShardName()); - - auto prevConfigsvrConnStr = - Grid::get(opCtx)->shardRegistry()->getConfigServerConnectionString(); - invariant(prevConfigsvrConnStr.type() == ConnectionString::SET); - fassert(40373, prevConfigsvrConnStr.getSetName() == 
configSvrConnStr.getSetName()); - - invariant(_clusterId.isSet()); - fassert(40374, _clusterId == shardIdentity.getClusterId()); - - return Status::OK(); - } - - if (_getInitializationState() == InitializationState::kError) { - return {ErrorCodes::ManualInterventionRequired, - str::stream() << "Server's sharding metadata manager failed to initialize and will " - "remain in this state until the instance is manually reset" - << causedBy(_initializationStatus)}; - } - - try { - Status status = _globalInit(opCtx, configSvrConnStr, generateDistLockProcessId(opCtx)); - if (status.isOK()) { - ReplicaSetMonitor::setSynchronousConfigChangeHook( - &ShardRegistry::replicaSetChangeShardRegistryUpdateHook); - ReplicaSetMonitor::setAsynchronousConfigChangeHook(&updateShardIdentityConfigStringCB); - - // Determine primary/secondary/standalone state in order to properly initialize sharding - // components. - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - bool isReplSet = - replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; - bool isStandaloneOrPrimary = - !isReplSet || (repl::ReplicationCoordinator::get(opCtx)->getMemberState() == - repl::MemberState::RS_PRIMARY); - - CatalogCacheLoader::get(opCtx).initializeReplicaSetRole(isStandaloneOrPrimary); - ChunkSplitter::get(opCtx).onShardingInitialization(isStandaloneOrPrimary); - PeriodicBalancerConfigRefresher::get(opCtx).onShardingInitialization( - opCtx->getServiceContext(), isStandaloneOrPrimary); - - log() << "initialized sharding components for " - << (isStandaloneOrPrimary ? 
"primary" : "secondary") << " node."; - _setInitializationState(InitializationState::kInitialized); - } else { - log() << "failed to initialize sharding components" << causedBy(status); - _initializationStatus = status; - _setInitializationState(InitializationState::kError); - } - _shardId = shardIdentity.getShardName().toString(); - _clusterId = shardIdentity.getClusterId(); - - return status; - } catch (const DBException& ex) { - auto errorStatus = ex.toStatus(); - _initializationStatus = errorStatus; - _setInitializationState(InitializationState::kError); - return errorStatus; - } - - MONGO_UNREACHABLE; -} - -ShardingState::InitializationState ShardingState::_getInitializationState() const { - return static_cast<InitializationState>(_initializationState.load()); -} - -void ShardingState::_setInitializationState(InitializationState newState) { - _initializationState.store(static_cast<uint32_t>(newState)); -} - -StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationContext* opCtx) { - invariant(!opCtx->lockState()->isLocked()); - - // In sharded readOnly mode, we ignore the shardIdentity document on disk and instead *require* - // a shardIdentity document to be passed through --overrideShardIdentity. - if (storageGlobalParams.readOnly) { - if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { - if (serverGlobalParams.overrideShardIdentity.isEmpty()) { - return {ErrorCodes::InvalidOptions, - "If started with --shardsvr in queryableBackupMode, a shardIdentity " - "document must be provided through --overrideShardIdentity"}; - } - auto swOverrideShardIdentity = ShardIdentityType::fromShardIdentityDocument( - serverGlobalParams.overrideShardIdentity); - if (!swOverrideShardIdentity.isOK()) { - return swOverrideShardIdentity.getStatus(); - } - { - // Global lock is required to call initializeFromShardIdenetity(). 
- Lock::GlobalWrite lk(opCtx); - auto status = - initializeFromShardIdentity(opCtx, swOverrideShardIdentity.getValue()); - if (!status.isOK()) { - return status; - } - } - return true; - } else { - // Error if --overrideShardIdentity is used but *not* started with --shardsvr. - if (!serverGlobalParams.overrideShardIdentity.isEmpty()) { - return { - ErrorCodes::InvalidOptions, - str::stream() - << "Not started with --shardsvr, but a shardIdentity document was provided " - "through --overrideShardIdentity: " - << serverGlobalParams.overrideShardIdentity}; - } - return false; - } - } - // In sharded *non*-readOnly mode, error if --overrideShardIdentity is provided. Use the - // shardIdentity document on disk if one exists, but it is okay if no shardIdentity document is - // provided at all (sharding awareness will be initialized when a shardIdentity document is - // inserted). - else { - if (!serverGlobalParams.overrideShardIdentity.isEmpty()) { - return { - ErrorCodes::InvalidOptions, - str::stream() << "--overrideShardIdentity is only allowed in sharded " - "queryableBackupMode. If not in queryableBackupMode, you can edit " - "the shardIdentity document by starting the server *without* " - "--shardsvr, manually updating the shardIdentity document in the " - << NamespaceString::kServerConfigurationNamespace.toString() - << " collection, and restarting the server with --shardsvr."}; - } - - // Load the shardIdentity document from disk. 
- BSONObj shardIdentityBSON; - bool foundShardIdentity = false; - try { - AutoGetCollection autoColl( - opCtx, NamespaceString::kServerConfigurationNamespace, MODE_IS); - foundShardIdentity = Helpers::findOne(opCtx, - autoColl.getCollection(), - BSON("_id" << ShardIdentityType::IdName), - shardIdentityBSON); - } catch (const DBException& ex) { - return ex.toStatus(); - } - - if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { - if (!foundShardIdentity) { - warning() << "Started with --shardsvr, but no shardIdentity document was found on " - "disk in " - << NamespaceString::kServerConfigurationNamespace - << ". This most likely means this server has not yet been added to a " - "sharded cluster."; - return false; - } - - invariant(!shardIdentityBSON.isEmpty()); - - auto swShardIdentity = ShardIdentityType::fromShardIdentityDocument(shardIdentityBSON); - if (!swShardIdentity.isOK()) { - return swShardIdentity.getStatus(); - } - { - // Global lock is required to call initializeFromShardIdenetity(). - Lock::GlobalWrite lk(opCtx); - auto status = initializeFromShardIdentity(opCtx, swShardIdentity.getValue()); - if (!status.isOK()) { - return status; - } - } - return true; - } else { - // Warn if a shardIdentity document is found on disk but *not* started with --shardsvr. 
- if (!shardIdentityBSON.isEmpty()) { - warning() << "Not started with --shardsvr, but a shardIdentity document was found " - "on disk in " - << NamespaceString::kServerConfigurationNamespace << ": " - << shardIdentityBSON; - } - return false; - } - } -} - -Status ShardingState::updateShardIdentityConfigString(OperationContext* opCtx, - const std::string& newConnectionString) { - BSONObj updateObj(ShardIdentityType::createConfigServerUpdateObject(newConnectionString)); - - UpdateRequest updateReq(NamespaceString::kServerConfigurationNamespace); - updateReq.setQuery(BSON("_id" << ShardIdentityType::IdName)); - updateReq.setUpdates(updateObj); - UpdateLifecycleImpl updateLifecycle(NamespaceString::kServerConfigurationNamespace); - updateReq.setLifecycle(&updateLifecycle); - - try { - AutoGetOrCreateDb autoDb( - opCtx, NamespaceString::kServerConfigurationNamespace.db(), MODE_X); - - auto result = update(opCtx, autoDb.getDb(), updateReq); - if (result.numMatched == 0) { - warning() << "failed to update config string of shard identity document because " - << "it does not exist. 
This shard could have been removed from the cluster"; - } else { - LOG(2) << "Updated config server connection string in shardIdentity document to" - << newConnectionString; - } - } catch (const DBException& exception) { - return exception.toStatus(); - } - - return Status::OK(); +void ShardingState::clearForTests() { + _initializationState.store(static_cast<uint32_t>(InitializationState::kNew)); } } // namespace mongo diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h index 5a4145675b2..efdb45a30ec 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -33,50 +33,48 @@ #include "mongo/base/disallow_copying.h" #include "mongo/bson/oid.h" #include "mongo/s/shard_id.h" -#include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" namespace mongo { -class BSONObj; -class BSONObjBuilder; -class ConnectionString; class OperationContext; class ServiceContext; -class ShardIdentityType; -class Status; - -namespace repl { -class OpTime; -} // namespace repl /** - * Contains the global sharding state for a running mongod. There is one instance of this object per - * service context and it is never destroyed for the lifetime of the context. + * Contains information about the shardingness of a running mongod. This is a passive class and its + * state and concurrency control is driven from outside (specifically ShardingInitializationMongoD, + * which should be its only caller). + * + * There is one instance of this object per service context and once 'setInitialized' is called, it + * never gets destroyed or uninitialized. */ class ShardingState { MONGO_DISALLOW_COPYING(ShardingState); public: - using GlobalInitFunc = - stdx::function<Status(OperationContext*, const ConnectionString&, StringData)>; - ShardingState(); ~ShardingState(); - /** - * Retrieves the sharding state object associated with the specified service context. 
This - * method must only be called if ShardingState decoration has been created on the service - * context, otherwise it will fassert. In other words, it may only be called on MongoD and - * tests, which specifically require and instantiate ShardingState. - * - * Returns the instance's ShardingState. - */ static ShardingState* get(ServiceContext* serviceContext); static ShardingState* get(OperationContext* operationContext); /** - * Returns true if ShardingState has been successfully initialized. + * Puts the sharding state singleton in the "initialization completed" state with either + * successful initialization or an error. This method may only be called once for the lifetime + * of the object. + */ + void setInitialized(ShardId shardId, OID clusterId); + void setInitialized(Status failedStatus); + + /** + * If 'setInitialized' has not been called, returns boost::none. Otherwise, returns the status + * with which 'setInitialized' was called. This is used by the initialization sequence to decide + * whether to set up the sharding services. + */ + boost::optional<Status> initializationStatus(); + + /** + * Returns true if 'setInitialized' has been called with shardId and clusterId. * * Code that needs to perform extra actions if sharding is initialized, but does not need to * error if not, should use this. Alternatively, see ShardingState::canAcceptShardedCommands(). @@ -112,54 +110,11 @@ public: bool needCollectionMetadata(OperationContext* opCtx, const std::string& ns); /** - * Shuts down sharding machinery on the shard. - */ - void shutDown(OperationContext* opCtx); - - /** - * Updates the config server field of the shardIdentity document with the given connection - * string. - * - * Note: this can return NotMaster error. + * For testing only. This is a workaround for the fact that it is not possible to get a clean + * ServiceContext in between test executions. 
Because of this, tests which require that they get + * started with a clean (uninitialized) ShardingState must invoke this in their tearDown method. */ - Status updateShardIdentityConfigString(OperationContext* opCtx, - const std::string& newConnectionString); - - /** - * If started with --shardsvr, initializes sharding awareness from the shardIdentity document - * on disk, if there is one. - * If started with --shardsvr in queryableBackupMode, initializes sharding awareness from the - * shardIdentity document passed through the --overrideShardIdentity startup parameter. - * - * If returns true, the ShardingState::_globalInit method was called, meaning all the core - * classes for sharding were initialized, but no networking calls were made yet (with the - * exception of the duplicate ShardRegistry reload in ShardRegistry::startup() (see - * SERVER-26123). Outgoing networking calls to cluster members can now be made. - * - * Note: this function briefly takes the global lock to determine primary/secondary state. - */ - StatusWith<bool> initializeShardingAwarenessIfNeeded(OperationContext* opCtx); - - /** - * Initializes the sharding state of this server from the shard identity document argument - * and sets secondary or primary state information on the catalog cache loader. - * - * NOTE: This must be called under at least Global IX lock in order for the replica set member - * state to be stable (primary/secondary). - */ - Status initializeFromShardIdentity(OperationContext* opCtx, - const ShardIdentityType& shardIdentity); - - /** - * For testing only. Mock the initialization method used by initializeFromConfigConnString and - * initializeFromShardIdentity after all checks are performed. - */ - void setGlobalInitMethodForTest(GlobalInitFunc func); - - /** - * For testing only. Force-sets the initialization state to InitializationState::kInitialized. 
- */ - void setEnabledForTest(const std::string& shardName); + void clearForTests(); private: // Progress of the sharding state initialization @@ -181,30 +136,24 @@ private: /** * Returns the initialization state. */ - InitializationState _getInitializationState() const; - - /** - * Updates the initialization state. - */ - void _setInitializationState(InitializationState newState); - - // Function for initializing the external sharding state components not owned here. - GlobalInitFunc _globalInit; + InitializationState _getInitializationState() const { + return static_cast<InitializationState>(_initializationState.load()); + } // Protects state below stdx::mutex _mutex; // State of the initialization of the sharding state along with any potential errors - AtomicUInt32 _initializationState; - - // Only valid if _initializationState is kError. Contains the reason for initialization failure. - Status _initializationStatus; + AtomicUInt32 _initializationState{static_cast<uint32_t>(InitializationState::kNew)}; // Sets the shard name for this host (comes through setShardVersion) ShardId _shardId; // The id for the cluster this shard belongs to. OID _clusterId; + + // Only valid if _initializationState is kError. Contains the reason for initialization failure. 
+ Status _initializationStatus{ErrorCodes::InternalError, "Uninitialized value"}; }; } // namespace mongo diff --git a/src/mongo/db/s/split_chunk_test.cpp b/src/mongo/db/s/split_chunk_test.cpp index 45430b5272e..8258e3472fc 100644 --- a/src/mongo/db/s/split_chunk_test.cpp +++ b/src/mongo/db/s/split_chunk_test.cpp @@ -31,11 +31,10 @@ #include <boost/optional.hpp> -#include "mongo/db/s/split_chunk.h" - #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/json.h" -#include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/sharding_initialization_mongod.h" +#include "mongo/db/s/split_chunk.h" #include "mongo/db/server_options.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/remote_command_request.h" @@ -66,6 +65,7 @@ public: void setUp() override { ShardServerTestFixture::setUp(); + ShardingState::get(operationContext())->setInitialized(_shardId, OID::gen()); CatalogCacheLoader::get(getServiceContext()).initializeReplicaSetRole(true); // Instantiate names. @@ -210,7 +210,6 @@ void SplitChunkTest::emptyResponse() { } TEST_F(SplitChunkTest, HashedKeyPatternNumberLongSplitKeys) { - BSONObj keyPatternObj = BSON("foo" << "hashed"); _coll.setKeyPattern(BSON("_id" @@ -222,9 +221,6 @@ TEST_F(SplitChunkTest, HashedKeyPatternNumberLongSplitKeys) { validSplitKeys.push_back(BSON("foo" << i)); } - // Force-set the sharding state to enabled with the _shardId, for testing purposes. 
- ShardingState::get(operationContext())->setEnabledForTest(_shardId.toString()); - expectLock(); // Call the splitChunk function asynchronously on a different thread, so that we do not block, @@ -260,7 +256,6 @@ TEST_F(SplitChunkTest, HashedKeyPatternNumberLongSplitKeys) { } TEST_F(SplitChunkTest, HashedKeyPatternIntegerSplitKeys) { - BSONObj keyPatternObj = BSON("foo" << "hashed"); _coll.setKeyPattern(BSON("_id" @@ -271,9 +266,6 @@ TEST_F(SplitChunkTest, HashedKeyPatternIntegerSplitKeys) { std::vector<BSONObj> invalidSplitKeys{ BSON("foo" << -1), BSON("foo" << 0), BSON("foo" << 1), BSON("foo" << 42)}; - // Force-set the sharding state to enabled with the _shardId, for testing purposes. - ShardingState::get(operationContext())->setEnabledForTest(_shardId.toString()); - expectLock(); // Call the splitChunk function asynchronously on a different thread, so that we do not block, @@ -302,7 +294,6 @@ TEST_F(SplitChunkTest, HashedKeyPatternIntegerSplitKeys) { } TEST_F(SplitChunkTest, HashedKeyPatternDoubleSplitKeys) { - BSONObj keyPatternObj = BSON("foo" << "hashed"); _coll.setKeyPattern(BSON("_id" @@ -313,9 +304,6 @@ TEST_F(SplitChunkTest, HashedKeyPatternDoubleSplitKeys) { std::vector<BSONObj> invalidSplitKeys{ BSON("foo" << 47.21230129), BSON("foo" << 1.0), BSON("foo" << 0.0), BSON("foo" << -0.001)}; - // Force-set the sharding state to enabled with the _shardId, for testing purposes. 
- ShardingState::get(operationContext())->setEnabledForTest(_shardId.toString()); - expectLock(); // Call the splitChunk function asynchronously on a different thread, so that we do not block, @@ -344,7 +332,6 @@ TEST_F(SplitChunkTest, HashedKeyPatternDoubleSplitKeys) { } TEST_F(SplitChunkTest, HashedKeyPatternStringSplitKeys) { - BSONObj keyPatternObj = BSON("foo" << "hashed"); _coll.setKeyPattern(BSON("_id" @@ -361,9 +348,6 @@ TEST_F(SplitChunkTest, HashedKeyPatternStringSplitKeys) { BSON("foo" << "")}; - // Force-set the sharding state to enabled with the _shardId, for testing purposes. - ShardingState::get(operationContext())->setEnabledForTest(_shardId.toString()); - expectLock(); // Call the splitChunk function asynchronously on a different thread, so that we do not block, @@ -392,7 +376,6 @@ TEST_F(SplitChunkTest, HashedKeyPatternStringSplitKeys) { } TEST_F(SplitChunkTest, ValidRangeKeyPatternSplitKeys) { - BSONObj keyPatternObj = BSON("foo" << 1); // Build a vector of valid split keys, which contains values that may not necessarily be able @@ -406,9 +389,6 @@ TEST_F(SplitChunkTest, ValidRangeKeyPatternSplitKeys) { << ""), BSON("foo" << 3.1415926535)}; - // Force-set the sharding state to enabled with the _shardId, for testing purposes. - ShardingState::get(operationContext())->setEnabledForTest(_shardId.toString()); - expectLock(); // Call the splitChunk function asynchronously on a different thread, so that we do not block, @@ -444,7 +424,6 @@ TEST_F(SplitChunkTest, ValidRangeKeyPatternSplitKeys) { } TEST_F(SplitChunkTest, SplitChunkWithNoErrors) { - BSONObj keyPatternObj = BSON("foo" << 1); // Build a vector of split keys. Note that we start at {"foo" : 256} and end at {"foo" : 768}, @@ -454,9 +433,6 @@ TEST_F(SplitChunkTest, SplitChunkWithNoErrors) { splitKeys.push_back(BSON("foo" << i)); } - // Force-set the sharding state to enabled with the _shardId, for testing purposes. 
- ShardingState::get(operationContext())->setEnabledForTest(_shardId.toString()); - expectLock(); // Call the splitChunk function asynchronously on a different thread, so that we do not block, @@ -509,7 +485,6 @@ TEST_F(SplitChunkTest, SplitChunkWithNoErrors) { } TEST_F(SplitChunkTest, AttemptSplitWithConfigsvrError) { - BSONObj keyPatternObj = BSON("foo" << 1); // Build a vector of split keys. Note that we start at {"foo" : 0} and end at {"foo" : 1024}, @@ -519,9 +494,6 @@ TEST_F(SplitChunkTest, AttemptSplitWithConfigsvrError) { splitKeys.push_back(BSON("foo" << i)); } - // Force-set the sharding state to enabled with the _shardId, for testing purposes. - ShardingState::get(operationContext())->setEnabledForTest(_shardId.toString()); - expectLock(); // Call the splitChunk function asynchronously on a different thread, so that we do not block, @@ -557,7 +529,6 @@ TEST_F(SplitChunkTest, AttemptSplitWithConfigsvrError) { } TEST_F(SplitChunkTest, AttemptSplitOnNoDatabases) { - BSONObj keyPatternObj = BSON("foo" << 1); // Build a vector of split keys. Note that we start at {"foo" : 256} and end at {"foo" : 768}, @@ -567,9 +538,6 @@ TEST_F(SplitChunkTest, AttemptSplitOnNoDatabases) { splitKeys.push_back(BSON("foo" << i)); } - // Force-set the sharding state to enabled with the _shardId, for testing purposes. - ShardingState::get(operationContext())->setEnabledForTest(_shardId.toString()); - expectLock(); // Call the splitChunk function asynchronously on a different thread, so that we do not block, @@ -595,7 +563,6 @@ TEST_F(SplitChunkTest, AttemptSplitOnNoDatabases) { } TEST_F(SplitChunkTest, AttemptSplitOnNoCollections) { - BSONObj keyPatternObj = BSON("foo" << 1); // Build a vector of split keys. Note that we start at {"foo" : 256} and end at {"foo" : 768}, @@ -605,9 +572,6 @@ TEST_F(SplitChunkTest, AttemptSplitOnNoCollections) { splitKeys.push_back(BSON("foo" << i)); } - // Force-set the sharding state to enabled with the _shardId, for testing purposes. 
- ShardingState::get(operationContext())->setEnabledForTest(_shardId.toString()); - expectLock(); // Call the splitChunk function asynchronously on a different thread, so that we do not block, @@ -636,7 +600,6 @@ TEST_F(SplitChunkTest, AttemptSplitOnNoCollections) { } TEST_F(SplitChunkTest, AttemptSplitOnNoChunks) { - BSONObj keyPatternObj = BSON("foo" << 1); // Build a vector of split keys. Note that we start at {"foo" : 256} and end at {"foo" : 768}, @@ -646,9 +609,6 @@ TEST_F(SplitChunkTest, AttemptSplitOnNoChunks) { splitKeys.push_back(BSON("foo" << i)); } - // Force-set the sharding state to enabled with the _shardId, for testing purposes. - ShardingState::get(operationContext())->setEnabledForTest(_shardId.toString()); - expectLock(); // Call the splitChunk function asynchronously on a different thread, so that we do not block, @@ -683,7 +643,6 @@ TEST_F(SplitChunkTest, AttemptSplitOnNoChunks) { } TEST_F(SplitChunkTest, NoCollectionAfterSplit) { - BSONObj keyPatternObj = BSON("foo" << 1); // Build a vector of split keys. Note that we start at {"foo" : 256} and end at {"foo" : 768}, @@ -693,9 +652,6 @@ TEST_F(SplitChunkTest, NoCollectionAfterSplit) { splitKeys.push_back(BSON("foo" << i)); } - // Force-set the sharding state to enabled with the _shardId, for testing purposes. - ShardingState::get(operationContext())->setEnabledForTest(_shardId.toString()); - expectLock(); // Call the splitChunk function asynchronously on a different thread, so that we do not block, @@ -731,7 +687,6 @@ TEST_F(SplitChunkTest, NoCollectionAfterSplit) { } TEST_F(SplitChunkTest, NoChunksAfterSplit) { - BSONObj keyPatternObj = BSON("foo" << 1); // Build a vector of split keys. Note that we start at {"foo" : 256} and end at {"foo" : 768}, @@ -741,9 +696,6 @@ TEST_F(SplitChunkTest, NoChunksAfterSplit) { splitKeys.push_back(BSON("foo" << i)); } - // Force-set the sharding state to enabled with the _shardId, for testing purposes. 
- ShardingState::get(operationContext())->setEnabledForTest(_shardId.toString()); - expectLock(); // Call the splitChunk function asynchronously on a different thread, so that we do not block, |