diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-07-30 15:22:26 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-08-13 17:49:02 -0400 |
commit | 200c3dc58410d8b3287a2075cc9b2ad085100e83 (patch) | |
tree | 6025b0042d610f3040b65c6c58cba45825122ca1 /src/mongo/db/s/sharding_state.cpp | |
parent | 55ca9666a9f40ab480f399f4ed9d915bf55925ef (diff) | |
download | mongo-200c3dc58410d8b3287a2075cc9b2ad085100e83.tar.gz |
SERVER-29908 Move all runtime logic out of ShardingState
... and move it into a ShardingInitializationMongoD class, which is
responsible for driving the sharding-awareness of the node and setting
it onto ShardingState.
Also gets rid of the 'sharding' library, so there is no more library
dependency cycle.
Diffstat (limited to 'src/mongo/db/s/sharding_state.cpp')
-rw-r--r-- | src/mongo/db/s/sharding_state.cpp | 331 |
1 files changed, 34 insertions, 297 deletions
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index 8535def6517..8541a3b45c6 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -32,31 +32,9 @@ #include "mongo/db/s/sharding_state.h" -#include "mongo/bson/util/bson_extract.h" -#include "mongo/client/connection_string.h" -#include "mongo/client/replica_set_monitor.h" -#include "mongo/db/catalog_raii.h" -#include "mongo/db/client.h" -#include "mongo/db/dbhelpers.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/ops/update.h" -#include "mongo/db/ops/update_lifecycle_impl.h" -#include "mongo/db/repl/optime.h" -#include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/s/chunk_splitter.h" #include "mongo/db/s/operation_sharding_state.h" -#include "mongo/db/s/periodic_balancer_config_refresher.h" #include "mongo/db/s/sharded_connection_info.h" -#include "mongo/db/s/sharding_initialization_mongod.h" -#include "mongo/db/s/type_shard_identity.h" -#include "mongo/executor/task_executor_pool.h" -#include "mongo/rpc/metadata/metadata_hook.h" -#include "mongo/s/catalog/sharding_catalog_client.h" -#include "mongo/s/catalog_cache_loader.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/client/sharding_network_connection_hook.h" -#include "mongo/s/grid.h" -#include "mongo/s/sharding_initialization.h" +#include "mongo/db/server_options.h" #include "mongo/util/log.h" namespace mongo { @@ -64,40 +42,9 @@ namespace { const auto getShardingState = ServiceContext::declareDecoration<ShardingState>(); -/** - * Updates the config server field of the shardIdentity document with the given connection string - * if setName is equal to the config server replica set name. - * - * Note: This is intended to be used on a new thread that hasn't called Client::initThread. - * One example use case is for the ReplicaSetMonitor asynchronous callback when it detects changes - * to replica set membership. - */ -void updateShardIdentityConfigStringCB(const std::string& setName, - const std::string& newConnectionString) { - auto configsvrConnStr = - Grid::get(getGlobalServiceContext())->shardRegistry()->getConfigServerConnectionString(); - if (configsvrConnStr.getSetName() != setName) { - // Ignore all change notification for other sets that are not the config server. - return; - } - - Client::initThread("updateShardIdentityConfigConnString"); - auto uniqOpCtx = Client::getCurrent()->makeOperationContext(); - - auto status = ShardingState::get(uniqOpCtx.get()) - ->updateShardIdentityConfigString(uniqOpCtx.get(), newConnectionString); - if (!status.isOK() && !ErrorCodes::isNotMasterError(status.code())) { - warning() << "error encountered while trying to update config connection string to " - << newConnectionString << causedBy(redact(status)); - } -} - } // namespace -ShardingState::ShardingState() - : _globalInit(&initializeGlobalShardingStateForMongod), - _initializationState(static_cast<uint32_t>(InitializationState::kNew)), - _initializationStatus(Status(ErrorCodes::InternalError, "Uninitialized value")) {} +ShardingState::ShardingState() = default; ShardingState::~ShardingState() = default; @@ -109,13 +56,38 @@ ShardingState* ShardingState::get(OperationContext* operationContext) { return ShardingState::get(operationContext->getServiceContext()); } -bool ShardingState::enabled() const { - return _getInitializationState() == InitializationState::kInitialized; +void ShardingState::setInitialized(ShardId shardId, OID clusterId) { + stdx::unique_lock<stdx::mutex> ul(_mutex); + invariant(_getInitializationState() == InitializationState::kNew); + + _shardId = std::move(shardId); + _clusterId = std::move(clusterId); + _initializationStatus = Status::OK(); + + _initializationState.store(static_cast<uint32_t>(InitializationState::kInitialized)); +} + +void ShardingState::setInitialized(Status failedStatus) { + invariant(!failedStatus.isOK()); + log() << "Failed to initialize sharding components" << causedBy(failedStatus); + + stdx::unique_lock<stdx::mutex> ul(_mutex); + invariant(_getInitializationState() == InitializationState::kNew); + + _initializationStatus = std::move(failedStatus); + _initializationState.store(static_cast<uint32_t>(InitializationState::kError)); } -void ShardingState::setEnabledForTest(const std::string& shardName) { - _shardId = shardName; - _setInitializationState(InitializationState::kInitialized); +boost::optional<Status> ShardingState::initializationStatus() { + stdx::unique_lock<stdx::mutex> ul(_mutex); + if (_getInitializationState() == InitializationState::kNew) + return boost::none; + + return _initializationStatus; +} + +bool ShardingState::enabled() const { + return _getInitializationState() == InitializationState::kInitialized; } Status ShardingState::canAcceptShardedCommands() const { @@ -155,243 +127,8 @@ bool ShardingState::needCollectionMetadata(OperationContext* opCtx, const std::s OperationShardingState::get(opCtx).hasShardVersion(); } -void ShardingState::shutDown(OperationContext* opCtx) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - if (enabled()) { - Grid::get(opCtx)->getExecutorPool()->shutdownAndJoin(); - Grid::get(opCtx)->catalogClient()->shutDown(opCtx); - } -} - -void ShardingState::setGlobalInitMethodForTest(GlobalInitFunc func) { - _globalInit = func; -} - -Status ShardingState::initializeFromShardIdentity(OperationContext* opCtx, - const ShardIdentityType& shardIdentity) { - invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); - invariant(opCtx->lockState()->isLocked()); - - Status validationStatus = shardIdentity.validate(); - if (!validationStatus.isOK()) { - return validationStatus.withContext( - "Invalid shard identity document found when initializing sharding state"); - } - - log() << "initializing sharding state with: " << shardIdentity; - - stdx::unique_lock<stdx::mutex> lk(_mutex); - - const auto& configSvrConnStr = shardIdentity.getConfigsvrConnectionString(); - - if (enabled()) { - invariant(_shardId.isValid()); - fassert(40372, _shardId == shardIdentity.getShardName()); - - auto prevConfigsvrConnStr = - Grid::get(opCtx)->shardRegistry()->getConfigServerConnectionString(); - invariant(prevConfigsvrConnStr.type() == ConnectionString::SET); - fassert(40373, prevConfigsvrConnStr.getSetName() == configSvrConnStr.getSetName()); - - invariant(_clusterId.isSet()); - fassert(40374, _clusterId == shardIdentity.getClusterId()); - - return Status::OK(); - } - - if (_getInitializationState() == InitializationState::kError) { - return {ErrorCodes::ManualInterventionRequired, - str::stream() << "Server's sharding metadata manager failed to initialize and will " - "remain in this state until the instance is manually reset" - << causedBy(_initializationStatus)}; - } - - try { - Status status = _globalInit(opCtx, configSvrConnStr, generateDistLockProcessId(opCtx)); - if (status.isOK()) { - ReplicaSetMonitor::setSynchronousConfigChangeHook( - &ShardRegistry::replicaSetChangeShardRegistryUpdateHook); - ReplicaSetMonitor::setAsynchronousConfigChangeHook(&updateShardIdentityConfigStringCB); - - // Determine primary/secondary/standalone state in order to properly initialize sharding - // components. - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - bool isReplSet = - replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; - bool isStandaloneOrPrimary = - !isReplSet || (repl::ReplicationCoordinator::get(opCtx)->getMemberState() == - repl::MemberState::RS_PRIMARY); - - CatalogCacheLoader::get(opCtx).initializeReplicaSetRole(isStandaloneOrPrimary); - ChunkSplitter::get(opCtx).onShardingInitialization(isStandaloneOrPrimary); - PeriodicBalancerConfigRefresher::get(opCtx).onShardingInitialization( - opCtx->getServiceContext(), isStandaloneOrPrimary); - - log() << "initialized sharding components for " - << (isStandaloneOrPrimary ? "primary" : "secondary") << " node."; - _setInitializationState(InitializationState::kInitialized); - } else { - log() << "failed to initialize sharding components" << causedBy(status); - _initializationStatus = status; - _setInitializationState(InitializationState::kError); - } - _shardId = shardIdentity.getShardName().toString(); - _clusterId = shardIdentity.getClusterId(); - - return status; - } catch (const DBException& ex) { - auto errorStatus = ex.toStatus(); - _initializationStatus = errorStatus; - _setInitializationState(InitializationState::kError); - return errorStatus; - } - - MONGO_UNREACHABLE; -} - -ShardingState::InitializationState ShardingState::_getInitializationState() const { - return static_cast<InitializationState>(_initializationState.load()); -} - -void ShardingState::_setInitializationState(InitializationState newState) { - _initializationState.store(static_cast<uint32_t>(newState)); -} - -StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationContext* opCtx) { - invariant(!opCtx->lockState()->isLocked()); - - // In sharded readOnly mode, we ignore the shardIdentity document on disk and instead *require* - // a shardIdentity document to be passed through --overrideShardIdentity. - if (storageGlobalParams.readOnly) { - if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { - if (serverGlobalParams.overrideShardIdentity.isEmpty()) { - return {ErrorCodes::InvalidOptions, - "If started with --shardsvr in queryableBackupMode, a shardIdentity " - "document must be provided through --overrideShardIdentity"}; - } - auto swOverrideShardIdentity = ShardIdentityType::fromShardIdentityDocument( - serverGlobalParams.overrideShardIdentity); - if (!swOverrideShardIdentity.isOK()) { - return swOverrideShardIdentity.getStatus(); - } - { - // Global lock is required to call initializeFromShardIdenetity(). - Lock::GlobalWrite lk(opCtx); - auto status = - initializeFromShardIdentity(opCtx, swOverrideShardIdentity.getValue()); - if (!status.isOK()) { - return status; - } - } - return true; - } else { - // Error if --overrideShardIdentity is used but *not* started with --shardsvr. - if (!serverGlobalParams.overrideShardIdentity.isEmpty()) { - return { - ErrorCodes::InvalidOptions, - str::stream() - << "Not started with --shardsvr, but a shardIdentity document was provided " - "through --overrideShardIdentity: " - << serverGlobalParams.overrideShardIdentity}; - } - return false; - } - } - // In sharded *non*-readOnly mode, error if --overrideShardIdentity is provided. Use the - // shardIdentity document on disk if one exists, but it is okay if no shardIdentity document is - // provided at all (sharding awareness will be initialized when a shardIdentity document is - // inserted). - else { - if (!serverGlobalParams.overrideShardIdentity.isEmpty()) { - return { - ErrorCodes::InvalidOptions, - str::stream() << "--overrideShardIdentity is only allowed in sharded " - "queryableBackupMode. If not in queryableBackupMode, you can edit " - "the shardIdentity document by starting the server *without* " - "--shardsvr, manually updating the shardIdentity document in the " - << NamespaceString::kServerConfigurationNamespace.toString() - << " collection, and restarting the server with --shardsvr."}; - } - - // Load the shardIdentity document from disk. - BSONObj shardIdentityBSON; - bool foundShardIdentity = false; - try { - AutoGetCollection autoColl( - opCtx, NamespaceString::kServerConfigurationNamespace, MODE_IS); - foundShardIdentity = Helpers::findOne(opCtx, - autoColl.getCollection(), - BSON("_id" << ShardIdentityType::IdName), - shardIdentityBSON); - } catch (const DBException& ex) { - return ex.toStatus(); - } - - if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { - if (!foundShardIdentity) { - warning() << "Started with --shardsvr, but no shardIdentity document was found on " - "disk in " - << NamespaceString::kServerConfigurationNamespace - << ". This most likely means this server has not yet been added to a " - "sharded cluster."; - return false; - } - - invariant(!shardIdentityBSON.isEmpty()); - - auto swShardIdentity = ShardIdentityType::fromShardIdentityDocument(shardIdentityBSON); - if (!swShardIdentity.isOK()) { - return swShardIdentity.getStatus(); - } - { - // Global lock is required to call initializeFromShardIdenetity(). - Lock::GlobalWrite lk(opCtx); - auto status = initializeFromShardIdentity(opCtx, swShardIdentity.getValue()); - if (!status.isOK()) { - return status; - } - } - return true; - } else { - // Warn if a shardIdentity document is found on disk but *not* started with --shardsvr. - if (!shardIdentityBSON.isEmpty()) { - warning() << "Not started with --shardsvr, but a shardIdentity document was found " - "on disk in " - << NamespaceString::kServerConfigurationNamespace << ": " - << shardIdentityBSON; - } - return false; - } - } -} - -Status ShardingState::updateShardIdentityConfigString(OperationContext* opCtx, - const std::string& newConnectionString) { - BSONObj updateObj(ShardIdentityType::createConfigServerUpdateObject(newConnectionString)); - - UpdateRequest updateReq(NamespaceString::kServerConfigurationNamespace); - updateReq.setQuery(BSON("_id" << ShardIdentityType::IdName)); - updateReq.setUpdates(updateObj); - UpdateLifecycleImpl updateLifecycle(NamespaceString::kServerConfigurationNamespace); - updateReq.setLifecycle(&updateLifecycle); - - try { - AutoGetOrCreateDb autoDb( - opCtx, NamespaceString::kServerConfigurationNamespace.db(), MODE_X); - - auto result = update(opCtx, autoDb.getDb(), updateReq); - if (result.numMatched == 0) { - warning() << "failed to update config string of shard identity document because " - << "it does not exist. This shard could have been removed from the cluster"; - } else { - LOG(2) << "Updated config server connection string in shardIdentity document to" - << newConnectionString; - } - } catch (const DBException& exception) { - return exception.toStatus(); - } - - return Status::OK(); +void ShardingState::clearForTests() { + _initializationState.store(static_cast<uint32_t>(InitializationState::kNew)); } } // namespace mongo |