summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/sharding_state.cpp
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-07-30 15:22:26 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-08-13 17:49:02 -0400
commit200c3dc58410d8b3287a2075cc9b2ad085100e83 (patch)
tree6025b0042d610f3040b65c6c58cba45825122ca1 /src/mongo/db/s/sharding_state.cpp
parent55ca9666a9f40ab480f399f4ed9d915bf55925ef (diff)
downloadmongo-200c3dc58410d8b3287a2075cc9b2ad085100e83.tar.gz
SERVER-29908 Move all runtime logic out of ShardingState
... and move it into a ShardingInitializationMongoD class, which is responsible for driving the sharding-awareness of the node and setting it onto ShardingState. Also gets rid of the 'sharding' library, so there is no more library dependency cycle.
Diffstat (limited to 'src/mongo/db/s/sharding_state.cpp')
-rw-r--r--src/mongo/db/s/sharding_state.cpp331
1 files changed, 34 insertions, 297 deletions
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 8535def6517..8541a3b45c6 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -32,31 +32,9 @@
#include "mongo/db/s/sharding_state.h"
-#include "mongo/bson/util/bson_extract.h"
-#include "mongo/client/connection_string.h"
-#include "mongo/client/replica_set_monitor.h"
-#include "mongo/db/catalog_raii.h"
-#include "mongo/db/client.h"
-#include "mongo/db/dbhelpers.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/ops/update.h"
-#include "mongo/db/ops/update_lifecycle_impl.h"
-#include "mongo/db/repl/optime.h"
-#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/s/chunk_splitter.h"
#include "mongo/db/s/operation_sharding_state.h"
-#include "mongo/db/s/periodic_balancer_config_refresher.h"
#include "mongo/db/s/sharded_connection_info.h"
-#include "mongo/db/s/sharding_initialization_mongod.h"
-#include "mongo/db/s/type_shard_identity.h"
-#include "mongo/executor/task_executor_pool.h"
-#include "mongo/rpc/metadata/metadata_hook.h"
-#include "mongo/s/catalog/sharding_catalog_client.h"
-#include "mongo/s/catalog_cache_loader.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/client/sharding_network_connection_hook.h"
-#include "mongo/s/grid.h"
-#include "mongo/s/sharding_initialization.h"
+#include "mongo/db/server_options.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -64,40 +42,9 @@ namespace {
const auto getShardingState = ServiceContext::declareDecoration<ShardingState>();
-/**
- * Updates the config server field of the shardIdentity document with the given connection string
- * if setName is equal to the config server replica set name.
- *
- * Note: This is intended to be used on a new thread that hasn't called Client::initThread.
- * One example use case is for the ReplicaSetMonitor asynchronous callback when it detects changes
- * to replica set membership.
- */
-void updateShardIdentityConfigStringCB(const std::string& setName,
- const std::string& newConnectionString) {
- auto configsvrConnStr =
- Grid::get(getGlobalServiceContext())->shardRegistry()->getConfigServerConnectionString();
- if (configsvrConnStr.getSetName() != setName) {
- // Ignore all change notification for other sets that are not the config server.
- return;
- }
-
- Client::initThread("updateShardIdentityConfigConnString");
- auto uniqOpCtx = Client::getCurrent()->makeOperationContext();
-
- auto status = ShardingState::get(uniqOpCtx.get())
- ->updateShardIdentityConfigString(uniqOpCtx.get(), newConnectionString);
- if (!status.isOK() && !ErrorCodes::isNotMasterError(status.code())) {
- warning() << "error encountered while trying to update config connection string to "
- << newConnectionString << causedBy(redact(status));
- }
-}
-
} // namespace
-ShardingState::ShardingState()
- : _globalInit(&initializeGlobalShardingStateForMongod),
- _initializationState(static_cast<uint32_t>(InitializationState::kNew)),
- _initializationStatus(Status(ErrorCodes::InternalError, "Uninitialized value")) {}
+ShardingState::ShardingState() = default;
ShardingState::~ShardingState() = default;
@@ -109,13 +56,38 @@ ShardingState* ShardingState::get(OperationContext* operationContext) {
return ShardingState::get(operationContext->getServiceContext());
}
-bool ShardingState::enabled() const {
- return _getInitializationState() == InitializationState::kInitialized;
+void ShardingState::setInitialized(ShardId shardId, OID clusterId) {
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+ invariant(_getInitializationState() == InitializationState::kNew);
+
+ _shardId = std::move(shardId);
+ _clusterId = std::move(clusterId);
+ _initializationStatus = Status::OK();
+
+ _initializationState.store(static_cast<uint32_t>(InitializationState::kInitialized));
+}
+
+void ShardingState::setInitialized(Status failedStatus) {
+ invariant(!failedStatus.isOK());
+ log() << "Failed to initialize sharding components" << causedBy(failedStatus);
+
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+ invariant(_getInitializationState() == InitializationState::kNew);
+
+ _initializationStatus = std::move(failedStatus);
+ _initializationState.store(static_cast<uint32_t>(InitializationState::kError));
}
-void ShardingState::setEnabledForTest(const std::string& shardName) {
- _shardId = shardName;
- _setInitializationState(InitializationState::kInitialized);
+boost::optional<Status> ShardingState::initializationStatus() {
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+ if (_getInitializationState() == InitializationState::kNew)
+ return boost::none;
+
+ return _initializationStatus;
+}
+
+bool ShardingState::enabled() const {
+ return _getInitializationState() == InitializationState::kInitialized;
}
Status ShardingState::canAcceptShardedCommands() const {
@@ -155,243 +127,8 @@ bool ShardingState::needCollectionMetadata(OperationContext* opCtx, const std::s
OperationShardingState::get(opCtx).hasShardVersion();
}
-void ShardingState::shutDown(OperationContext* opCtx) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (enabled()) {
- Grid::get(opCtx)->getExecutorPool()->shutdownAndJoin();
- Grid::get(opCtx)->catalogClient()->shutDown(opCtx);
- }
-}
-
-void ShardingState::setGlobalInitMethodForTest(GlobalInitFunc func) {
- _globalInit = func;
-}
-
-Status ShardingState::initializeFromShardIdentity(OperationContext* opCtx,
- const ShardIdentityType& shardIdentity) {
- invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
- invariant(opCtx->lockState()->isLocked());
-
- Status validationStatus = shardIdentity.validate();
- if (!validationStatus.isOK()) {
- return validationStatus.withContext(
- "Invalid shard identity document found when initializing sharding state");
- }
-
- log() << "initializing sharding state with: " << shardIdentity;
-
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- const auto& configSvrConnStr = shardIdentity.getConfigsvrConnectionString();
-
- if (enabled()) {
- invariant(_shardId.isValid());
- fassert(40372, _shardId == shardIdentity.getShardName());
-
- auto prevConfigsvrConnStr =
- Grid::get(opCtx)->shardRegistry()->getConfigServerConnectionString();
- invariant(prevConfigsvrConnStr.type() == ConnectionString::SET);
- fassert(40373, prevConfigsvrConnStr.getSetName() == configSvrConnStr.getSetName());
-
- invariant(_clusterId.isSet());
- fassert(40374, _clusterId == shardIdentity.getClusterId());
-
- return Status::OK();
- }
-
- if (_getInitializationState() == InitializationState::kError) {
- return {ErrorCodes::ManualInterventionRequired,
- str::stream() << "Server's sharding metadata manager failed to initialize and will "
- "remain in this state until the instance is manually reset"
- << causedBy(_initializationStatus)};
- }
-
- try {
- Status status = _globalInit(opCtx, configSvrConnStr, generateDistLockProcessId(opCtx));
- if (status.isOK()) {
- ReplicaSetMonitor::setSynchronousConfigChangeHook(
- &ShardRegistry::replicaSetChangeShardRegistryUpdateHook);
- ReplicaSetMonitor::setAsynchronousConfigChangeHook(&updateShardIdentityConfigStringCB);
-
- // Determine primary/secondary/standalone state in order to properly initialize sharding
- // components.
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- bool isReplSet =
- replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
- bool isStandaloneOrPrimary =
- !isReplSet || (repl::ReplicationCoordinator::get(opCtx)->getMemberState() ==
- repl::MemberState::RS_PRIMARY);
-
- CatalogCacheLoader::get(opCtx).initializeReplicaSetRole(isStandaloneOrPrimary);
- ChunkSplitter::get(opCtx).onShardingInitialization(isStandaloneOrPrimary);
- PeriodicBalancerConfigRefresher::get(opCtx).onShardingInitialization(
- opCtx->getServiceContext(), isStandaloneOrPrimary);
-
- log() << "initialized sharding components for "
- << (isStandaloneOrPrimary ? "primary" : "secondary") << " node.";
- _setInitializationState(InitializationState::kInitialized);
- } else {
- log() << "failed to initialize sharding components" << causedBy(status);
- _initializationStatus = status;
- _setInitializationState(InitializationState::kError);
- }
- _shardId = shardIdentity.getShardName().toString();
- _clusterId = shardIdentity.getClusterId();
-
- return status;
- } catch (const DBException& ex) {
- auto errorStatus = ex.toStatus();
- _initializationStatus = errorStatus;
- _setInitializationState(InitializationState::kError);
- return errorStatus;
- }
-
- MONGO_UNREACHABLE;
-}
-
-ShardingState::InitializationState ShardingState::_getInitializationState() const {
- return static_cast<InitializationState>(_initializationState.load());
-}
-
-void ShardingState::_setInitializationState(InitializationState newState) {
- _initializationState.store(static_cast<uint32_t>(newState));
-}
-
-StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationContext* opCtx) {
- invariant(!opCtx->lockState()->isLocked());
-
- // In sharded readOnly mode, we ignore the shardIdentity document on disk and instead *require*
- // a shardIdentity document to be passed through --overrideShardIdentity.
- if (storageGlobalParams.readOnly) {
- if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
- if (serverGlobalParams.overrideShardIdentity.isEmpty()) {
- return {ErrorCodes::InvalidOptions,
- "If started with --shardsvr in queryableBackupMode, a shardIdentity "
- "document must be provided through --overrideShardIdentity"};
- }
- auto swOverrideShardIdentity = ShardIdentityType::fromShardIdentityDocument(
- serverGlobalParams.overrideShardIdentity);
- if (!swOverrideShardIdentity.isOK()) {
- return swOverrideShardIdentity.getStatus();
- }
- {
- // Global lock is required to call initializeFromShardIdenetity().
- Lock::GlobalWrite lk(opCtx);
- auto status =
- initializeFromShardIdentity(opCtx, swOverrideShardIdentity.getValue());
- if (!status.isOK()) {
- return status;
- }
- }
- return true;
- } else {
- // Error if --overrideShardIdentity is used but *not* started with --shardsvr.
- if (!serverGlobalParams.overrideShardIdentity.isEmpty()) {
- return {
- ErrorCodes::InvalidOptions,
- str::stream()
- << "Not started with --shardsvr, but a shardIdentity document was provided "
- "through --overrideShardIdentity: "
- << serverGlobalParams.overrideShardIdentity};
- }
- return false;
- }
- }
- // In sharded *non*-readOnly mode, error if --overrideShardIdentity is provided. Use the
- // shardIdentity document on disk if one exists, but it is okay if no shardIdentity document is
- // provided at all (sharding awareness will be initialized when a shardIdentity document is
- // inserted).
- else {
- if (!serverGlobalParams.overrideShardIdentity.isEmpty()) {
- return {
- ErrorCodes::InvalidOptions,
- str::stream() << "--overrideShardIdentity is only allowed in sharded "
- "queryableBackupMode. If not in queryableBackupMode, you can edit "
- "the shardIdentity document by starting the server *without* "
- "--shardsvr, manually updating the shardIdentity document in the "
- << NamespaceString::kServerConfigurationNamespace.toString()
- << " collection, and restarting the server with --shardsvr."};
- }
-
- // Load the shardIdentity document from disk.
- BSONObj shardIdentityBSON;
- bool foundShardIdentity = false;
- try {
- AutoGetCollection autoColl(
- opCtx, NamespaceString::kServerConfigurationNamespace, MODE_IS);
- foundShardIdentity = Helpers::findOne(opCtx,
- autoColl.getCollection(),
- BSON("_id" << ShardIdentityType::IdName),
- shardIdentityBSON);
- } catch (const DBException& ex) {
- return ex.toStatus();
- }
-
- if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
- if (!foundShardIdentity) {
- warning() << "Started with --shardsvr, but no shardIdentity document was found on "
- "disk in "
- << NamespaceString::kServerConfigurationNamespace
- << ". This most likely means this server has not yet been added to a "
- "sharded cluster.";
- return false;
- }
-
- invariant(!shardIdentityBSON.isEmpty());
-
- auto swShardIdentity = ShardIdentityType::fromShardIdentityDocument(shardIdentityBSON);
- if (!swShardIdentity.isOK()) {
- return swShardIdentity.getStatus();
- }
- {
- // Global lock is required to call initializeFromShardIdenetity().
- Lock::GlobalWrite lk(opCtx);
- auto status = initializeFromShardIdentity(opCtx, swShardIdentity.getValue());
- if (!status.isOK()) {
- return status;
- }
- }
- return true;
- } else {
- // Warn if a shardIdentity document is found on disk but *not* started with --shardsvr.
- if (!shardIdentityBSON.isEmpty()) {
- warning() << "Not started with --shardsvr, but a shardIdentity document was found "
- "on disk in "
- << NamespaceString::kServerConfigurationNamespace << ": "
- << shardIdentityBSON;
- }
- return false;
- }
- }
-}
-
-Status ShardingState::updateShardIdentityConfigString(OperationContext* opCtx,
- const std::string& newConnectionString) {
- BSONObj updateObj(ShardIdentityType::createConfigServerUpdateObject(newConnectionString));
-
- UpdateRequest updateReq(NamespaceString::kServerConfigurationNamespace);
- updateReq.setQuery(BSON("_id" << ShardIdentityType::IdName));
- updateReq.setUpdates(updateObj);
- UpdateLifecycleImpl updateLifecycle(NamespaceString::kServerConfigurationNamespace);
- updateReq.setLifecycle(&updateLifecycle);
-
- try {
- AutoGetOrCreateDb autoDb(
- opCtx, NamespaceString::kServerConfigurationNamespace.db(), MODE_X);
-
- auto result = update(opCtx, autoDb.getDb(), updateReq);
- if (result.numMatched == 0) {
- warning() << "failed to update config string of shard identity document because "
- << "it does not exist. This shard could have been removed from the cluster";
- } else {
- LOG(2) << "Updated config server connection string in shardIdentity document to"
- << newConnectionString;
- }
- } catch (const DBException& exception) {
- return exception.toStatus();
- }
-
- return Status::OK();
+void ShardingState::clearForTests() {
+ _initializationState.store(static_cast<uint32_t>(InitializationState::kNew));
}
} // namespace mongo