/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/base/init.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/connection_string.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/catalog_raii.h"
#include "mongo/db/client.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/update.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_initialization_mongod.h"
#include "mongo/db/s/sharding_statistics.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/network_interface_thread_pool.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/metadata/config_server_metadata.h"
#include "mongo/rpc/metadata/metadata_hook.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/client/sharding_network_connection_hook.h"
#include "mongo/s/grid.h"
#include "mongo/s/sharding_initialization.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
using std::shared_ptr;
using std::string;
using std::vector;
using CallbackArgs = executor::TaskExecutor::CallbackArgs;
namespace {
// ServiceContext decoration for the sharding state. ShardingState::get(ServiceContext*) returns
// the address of the decorated instance obtained through this handle.
const auto getShardingState = ServiceContext::declareDecoration();
/**
 * Change-notification callback that persists a new config server connection string into the
 * shardIdentity document.
 *
 * The update is only attempted when 'setName' matches the replica set name of the config server
 * connection string currently known to the shard registry; notifications for any other set are
 * ignored.
 *
 * Note: This is intended to be used on a new thread that hasn't called Client::initThread,
 * because the function calls Client::initThread itself. One example use case is the
 * ReplicaSetMonitor asynchronous callback fired when replica set membership changes.
 *
 * Failures are logged as warnings, except for "not master" errors which are silently dropped.
 */
void updateShardIdentityConfigStringCB(const string& setName, const string& newConnectionString) {
    auto currentConfigConnStr = grid.shardRegistry()->getConfigServerConnectionString();
    const bool isConfigServerSet = (currentConfigConnStr.getSetName() == setName);
    if (!isConfigServerSet) {
        // Ignore all change notification for other sets that are not the config server.
        return;
    }

    Client::initThread("updateShardIdentityConfigConnString");
    auto ownedOpCtx = Client::getCurrent()->makeOperationContext();
    auto* opCtx = ownedOpCtx.get();

    auto updateStatus =
        ShardingState::get(opCtx)->updateShardIdentityConfigString(opCtx, newConnectionString);

    if (updateStatus.isOK() || ErrorCodes::isNotMasterError(updateStatus.code())) {
        return;
    }

    warning() << "error encountered while trying to update config connection string to "
              << newConnectionString << causedBy(redact(updateStatus));
}
} // namespace
// Constructs the sharding state in the kNew initialization state. The initialization status
// starts out as an InternalError placeholder ("Uninitialized value"), and the global
// initialization function defaults to initializeGlobalShardingStateForMongod (replaceable via
// setGlobalInitMethodForTest).
ShardingState::ShardingState()
: _chunkSplitter(stdx::make_unique()),
_initializationState(static_cast(InitializationState::kNew)),
_initializationStatus(Status(ErrorCodes::InternalError, "Uninitialized value")),
_globalInit(&initializeGlobalShardingStateForMongod) {}
// Defaulted destructor, defined out of line in this translation unit.
// NOTE(review): presumably kept here so member types only need to be complete in this file --
// confirm against the header.
ShardingState::~ShardingState() = default;
// Returns the ShardingState instance that decorates the given service context.
ShardingState* ShardingState::get(ServiceContext* serviceContext) {
    auto& decoratedState = getShardingState(serviceContext);
    return &decoratedState;
}
// Convenience overload: resolves the ShardingState through the operation's service context.
ShardingState* ShardingState::get(OperationContext* operationContext) {
    auto* const owningServiceContext = operationContext->getServiceContext();
    return ShardingState::get(owningServiceContext);
}
// Returns true only when the initialization state is kInitialized.
bool ShardingState::enabled() const {
    const auto currentState = _getInitializationState();
    return currentState == InitializationState::kInitialized;
}
// Test-only helper: forces the initialization state to kInitialized and records 'shardName' as
// this shard's name, without running any of the real initialization logic. Does not take _mutex.
void ShardingState::setEnabledForTest(const std::string& shardName) {
_setInitializationState(InitializationState::kInitialized);
_shardName = shardName;
}
/**
 * Reports whether this node is in a position to accept sharding commands.
 *
 * Returns NoShardingEnabled when the server's cluster role is not ShardServer,
 * ShardingStateNotInitialized when the role is ShardServer but the sharding state is not yet
 * enabled, and Status::OK() otherwise.
 */
Status ShardingState::canAcceptShardedCommands() const {
    const bool isShardServer = (serverGlobalParams.clusterRole == ClusterRole::ShardServer);
    if (!isShardServer) {
        return {ErrorCodes::NoShardingEnabled,
                "Cannot accept sharding commands if not started with --shardsvr"};
    }

    if (enabled()) {
        return Status::OK();
    }

    return {ErrorCodes::ShardingStateNotInitialized,
            "Cannot accept sharding commands if sharding state has not "
            "been initialized with a shardIdentity document"};
}
// Returns the config server connection string currently held by the shard registry. The sharding
// state must already be enabled (enforced by invariant); the lookup is performed under _mutex.
ConnectionString ShardingState::getConfigServer(OperationContext* opCtx) {
    invariant(enabled());

    stdx::lock_guard guard(_mutex);
    auto* const registry = Grid::get(opCtx)->shardRegistry();
    return registry->getConfigServerConnectionString();
}
// Returns a copy of this shard's name, read while holding _mutex. The sharding state must already
// be enabled (enforced by invariant).
string ShardingState::getShardName() {
    invariant(enabled());

    stdx::lock_guard guard(_mutex);
    return _shardName;
}
// Shuts down the sharding components owned by the Grid: first the executor pool (shutdown and
// join), then the catalog client. Does nothing when the sharding state was never enabled. The
// whole sequence runs while holding _mutex.
void ShardingState::shutDown(OperationContext* opCtx) {
    stdx::unique_lock lk(_mutex);

    if (!enabled()) {
        return;
    }

    auto* const gridForOp = Grid::get(opCtx);
    gridForOp->getExecutorPool()->shutdownAndJoin();
    gridForOp->catalogClient()->shutDown(opCtx);
}
/**
 * Advances the Grid's config opTime using the opTime carried in the operation's config server
 * metadata, if any.
 *
 * Returns Status::OK() when sharding is not enabled, when the metadata carries no opTime, or
 * after the opTime was advanced. Returns Unauthorized when an opTime is present but the client is
 * not authorized for the 'internal' action on the cluster resource.
 */
Status ShardingState::updateConfigServerOpTimeFromMetadata(OperationContext* opCtx) {
    if (!enabled()) {
        // Nothing to do if sharding state has not been initialized.
        return Status::OK();
    }

    const auto configOpTime = rpc::ConfigServerMetadata::get(opCtx).getOpTime();
    if (!configOpTime) {
        return Status::OK();
    }

    // Only internal (cluster-authorized) clients may move the config opTime forward.
    const bool mayAdvance =
        AuthorizationSession::get(opCtx->getClient())
            ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
                                               ActionType::internal);
    if (!mayAdvance) {
        return Status(ErrorCodes::Unauthorized, "Unauthorized to update config opTime");
    }

    Grid::get(opCtx)->advanceConfigOpTime(*configOpTime);
    return Status::OK();
}
// Returns a non-owning pointer to the chunk splitter held by this sharding state.
ChunkSplitter* ShardingState::getChunkSplitter() {
    ChunkSplitter* const splitter = _chunkSplitter.get();
    return splitter;
}
// Forwards to the chunk splitter's initiateChunkSplitter().
void ShardingState::initiateChunkSplitter() {
    auto& splitter = *_chunkSplitter;
    splitter.initiateChunkSplitter();
}
// Forwards to the chunk splitter's interruptChunkSplitter().
void ShardingState::interruptChunkSplitter() {
    auto& splitter = *_chunkSplitter;
    splitter.interruptChunkSplitter();
}
// Test-only hook: replaces the function that initializeFromShardIdentity invokes to perform
// global sharding initialization. 'func' is taken by value, so it is moved into place rather
// than copied a second time.
void ShardingState::setGlobalInitMethodForTest(GlobalInitFunc func) {
    _globalInit = std::move(func);
}
/**
 * Handles a stale shard version report for 'nss' by refreshing the collection's metadata when
 * the locally known shard version cannot already satisfy 'expectedVersion'.
 *
 * Preconditions (enforced by invariants): the operation is not running in a direct client, holds
 * no locks, and the sharding state is enabled.
 *
 * Waits for any migration critical section signal attached to the operation before comparing
 * versions. If the local shard version is in the same epoch as 'expectedVersion' and is at least
 * as large, no refresh is performed.
 *
 * Returns Status::OK() when no refresh was needed or the refresh succeeded; otherwise the status
 * of the DBException thrown by the refresh (which is also logged).
 */
Status ShardingState::onStaleShardVersion(OperationContext* opCtx,
                                          const NamespaceString& nss,
                                          const ChunkVersion& expectedVersion) {
    invariant(!opCtx->getClient()->isInDirectClient());
    invariant(!opCtx->lockState()->isLocked());
    invariant(enabled());

    LOG(2) << "metadata refresh requested for " << nss.ns() << " at shard version "
           << expectedVersion;

    ShardingStatistics::get(opCtx).countStaleConfigErrors.addAndFetch(1);

    // Ensure any ongoing migrations have completed
    auto& oss = OperationShardingState::get(opCtx);
    oss.waitForMigrationCriticalSectionSignal(opCtx);

    // The collection lock is only held for the duration of this lambda, so that the refresh below
    // runs without any locks held.
    const auto collectionShardVersion = [&] {
        // Fast path - check if the requested version is at a higher version than the current
        // metadata version or a different epoch before verifying against config server
        AutoGetCollection autoColl(opCtx, nss, MODE_IS);
        const auto currentMetadata = CollectionShardingState::get(opCtx, nss)->getMetadata();
        if (currentMetadata) {
            return currentMetadata->getShardVersion();
        }

        return ChunkVersion::UNSHARDED();
    }();

    if (collectionShardVersion.epoch() == expectedVersion.epoch() &&
        collectionShardVersion >= expectedVersion) {
        // Don't need to remotely reload if we're in the same epoch and the requested version is
        // smaller than the one we know about. This means that the remote side is behind.
        return Status::OK();
    }

    try {
        _refreshMetadata(opCtx, nss);
        return Status::OK();
    } catch (const DBException& ex) {
        // The trailing space keeps the namespace from being glued to the word "collection".
        log() << "Failed to refresh metadata for collection " << nss << causedBy(redact(ex));
        return ex.toStatus();
    }
}
/**
 * Unconditionally refreshes the metadata for 'nss' and stores the resulting shard version in
 * '*latestShardVersion'.
 *
 * Returns Status::OK() on success. If the refresh throws a DBException, its status is returned
 * and '*latestShardVersion' is left untouched.
 */
Status ShardingState::refreshMetadataNow(OperationContext* opCtx,
                                         const NamespaceString& nss,
                                         ChunkVersion* latestShardVersion) {
    try {
        auto refreshedVersion = _refreshMetadata(opCtx, nss);
        *latestShardVersion = refreshedVersion;
    } catch (const DBException& ex) {
        return ex.toStatus();
    }

    return Status::OK();
}
/**
* Initializes this server's sharding state from the given shardIdentity document.
*
* Preconditions (enforced by invariants): the server's cluster role is ShardServer and the
* operation already holds a lock.
*
* Outcomes:
* - If 'shardIdentity' fails validate(), returns a status with the validation error code and an
* explanatory prefix; no state is changed.
* - If sharding is already enabled, verifies (fassert) that the document names the same shard,
* the same config server replica set and the same cluster id as the current state, then returns
* Status::OK().
* - If a previous attempt left the state at kError, returns ManualInterventionRequired.
* - Otherwise runs the global initialization function and moves the state to kInitialized or
* kError depending on its result; a DBException thrown along the way also moves the state to
* kError and is returned as a status.
*/
// NOTE: This method will be called inside a database lock so it should never take any database
// locks, perform I/O, or any long running operations.
Status ShardingState::initializeFromShardIdentity(OperationContext* opCtx,
const ShardIdentityType& shardIdentity) {
invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
invariant(opCtx->lockState()->isLocked());
Status validationStatus = shardIdentity.validate();
if (!validationStatus.isOK()) {
return Status(
validationStatus.code(),
str::stream()
<< "Invalid shard identity document found when initializing sharding state: "
<< validationStatus.reason());
}
log() << "initializing sharding state with: " << shardIdentity;
// Everything below runs while holding _mutex.
stdx::unique_lock lk(_mutex);
auto configSvrConnStr = shardIdentity.getConfigsvrConnString();
if (enabled()) {
// Already initialized: the incoming document must agree with the shard name, config server
// replica set name and cluster id that are already in effect. Any mismatch is fatal.
invariant(!_shardName.empty());
fassert(40372, _shardName == shardIdentity.getShardName());
auto prevConfigsvrConnStr =
Grid::get(opCtx)->shardRegistry()->getConfigServerConnectionString();
invariant(prevConfigsvrConnStr.type() == ConnectionString::SET);
fassert(40373, prevConfigsvrConnStr.getSetName() == configSvrConnStr.getSetName());
invariant(_clusterId.isSet());
fassert(40374, _clusterId == shardIdentity.getClusterId());
return Status::OK();
}
// A previously failed initialization is sticky: report the stored failure instead of retrying.
if (_getInitializationState() == InitializationState::kError) {
return {ErrorCodes::ManualInterventionRequired,
str::stream() << "Server's sharding metadata manager failed to initialize and will "
"remain in this state until the instance is manually reset"
<< causedBy(_initializationStatus)};
}
ShardedConnectionInfo::addHook(opCtx->getServiceContext());
try {
Status status = _globalInit(opCtx, configSvrConnStr, generateDistLockProcessId(opCtx));
if (status.isOK()) {
// Register the hooks the ReplicaSetMonitor invokes on replica set configuration changes:
// the shard registry update (synchronous) and the shardIdentity document update
// (asynchronous).
ReplicaSetMonitor::setSynchronousConfigChangeHook(
&ShardRegistry::replicaSetChangeShardRegistryUpdateHook);
ReplicaSetMonitor::setAsynchronousConfigChangeHook(&updateShardIdentityConfigStringCB);
// Determine primary/secondary/standalone state in order to properly initialize sharding
// components.
auto replCoord = repl::ReplicationCoordinator::get(opCtx);
bool isReplSet =
replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
bool isStandaloneOrPrimary =
!isReplSet || (repl::ReplicationCoordinator::get(opCtx)->getMemberState() ==
repl::MemberState::RS_PRIMARY);
CatalogCacheLoader::get(opCtx).initializeReplicaSetRole(isStandaloneOrPrimary);
_chunkSplitter->setReplicaSetMode(isStandaloneOrPrimary);
log() << "initialized sharding components for "
<< (isStandaloneOrPrimary ? "primary" : "secondary") << " node.";
_setInitializationState(InitializationState::kInitialized);
} else {
// Remember why initialization failed so later calls can report it.
log() << "failed to initialize sharding components" << causedBy(status);
_initializationStatus = status;
_setInitializationState(InitializationState::kError);
}
// The shard name and cluster id are recorded whether or not the global initialization
// succeeded.
_shardName = shardIdentity.getShardName();
_clusterId = shardIdentity.getClusterId();
return status;
} catch (const DBException& ex) {
// An exception leaves the state at kError; the shard name and cluster id are not recorded
// on this path.
auto errorStatus = ex.toStatus();
_initializationStatus = errorStatus;
_setInitializationState(InitializationState::kError);
return errorStatus;
}
MONGO_UNREACHABLE;
}
// Reads the stored initialization state via load() and converts it to the InitializationState
// value returned to the caller.
ShardingState::InitializationState ShardingState::_getInitializationState() const {
return static_cast(_initializationState.load());
}
// Converts 'newState' to the stored representation and writes it via store().
void ShardingState::_setInitializationState(InitializationState newState) {
_initializationState.store(static_cast(newState));
}
/**
 * Decides, from the startup options and the on-disk shardIdentity document, whether this server
 * should become sharding-aware, and initializes the sharding state if so.
 *
 * Must be called without any locks held (enforced by invariant).
 *
 * readOnly mode: the on-disk document is ignored. With --shardsvr a document passed through
 * --overrideShardIdentity is required and used for initialization; without --shardsvr supplying
 * one is an error.
 *
 * non-readOnly mode: --overrideShardIdentity is always an error. With --shardsvr the on-disk
 * document, if present, is used for initialization; without --shardsvr a found document only
 * produces a warning.
 *
 * Returns true when sharding state was initialized, false when there was nothing to initialize,
 * or an error status (invalid options, unparsable document, failed read, failed initialization).
 */
StatusWith ShardingState::initializeShardingAwarenessIfNeeded(OperationContext* opCtx) {
    invariant(!opCtx->lockState()->isLocked());

    const bool isShardServer = (serverGlobalParams.clusterRole == ClusterRole::ShardServer);

    // In sharded readOnly mode, we ignore the shardIdentity document on disk and instead *require*
    // a shardIdentity document to be passed through --overrideShardIdentity.
    if (storageGlobalParams.readOnly) {
        if (!isShardServer) {
            // Error if --overrideShardIdentity is used but *not* started with --shardsvr.
            if (!serverGlobalParams.overrideShardIdentity.isEmpty()) {
                return {
                    ErrorCodes::InvalidOptions,
                    str::stream()
                        << "Not started with --shardsvr, but a shardIdentity document was provided "
                           "through --overrideShardIdentity: "
                        << serverGlobalParams.overrideShardIdentity};
            }
            return false;
        }

        if (serverGlobalParams.overrideShardIdentity.isEmpty()) {
            return {ErrorCodes::InvalidOptions,
                    "If started with --shardsvr in queryableBackupMode, a shardIdentity "
                    "document must be provided through --overrideShardIdentity"};
        }

        auto parsedOverride = ShardIdentityType::fromBSON(serverGlobalParams.overrideShardIdentity);
        if (!parsedOverride.isOK()) {
            return parsedOverride.getStatus();
        }

        {
            // Global lock is required to call initializeFromShardIdentity().
            Lock::GlobalWrite globalLock(opCtx);
            auto initStatus = initializeFromShardIdentity(opCtx, parsedOverride.getValue());
            if (!initStatus.isOK()) {
                return initStatus;
            }
        }
        return true;
    }

    // In sharded *non*-readOnly mode, error if --overrideShardIdentity is provided. Use the
    // shardIdentity document on disk if one exists, but it is okay if no shardIdentity document is
    // provided at all (sharding awareness will be initialized when a shardIdentity document is
    // inserted).
    if (!serverGlobalParams.overrideShardIdentity.isEmpty()) {
        return {
            ErrorCodes::InvalidOptions,
            str::stream() << "--overrideShardIdentity is only allowed in sharded "
                             "queryableBackupMode. If not in queryableBackupMode, you can edit "
                             "the shardIdentity document by starting the server *without* "
                             "--shardsvr, manually updating the shardIdentity document in the "
                          << NamespaceString::kServerConfigurationNamespace.toString()
                          << " collection, and restarting the server with --shardsvr."};
    }

    // Load the shardIdentity document from disk.
    BSONObj shardIdentityBSON;
    bool foundShardIdentity = false;
    try {
        AutoGetCollection configColl(
            opCtx, NamespaceString::kServerConfigurationNamespace, MODE_IS);
        foundShardIdentity = Helpers::findOne(opCtx,
                                              configColl.getCollection(),
                                              BSON("_id" << ShardIdentityType::IdName),
                                              shardIdentityBSON);
    } catch (const DBException& ex) {
        return ex.toStatus();
    }

    if (!isShardServer) {
        // Warn if a shardIdentity document is found on disk but *not* started with --shardsvr.
        if (!shardIdentityBSON.isEmpty()) {
            warning() << "Not started with --shardsvr, but a shardIdentity document was found "
                         "on disk in "
                      << NamespaceString::kServerConfigurationNamespace << ": "
                      << shardIdentityBSON;
        }
        return false;
    }

    if (!foundShardIdentity) {
        warning() << "Started with --shardsvr, but no shardIdentity document was found on "
                     "disk in "
                  << NamespaceString::kServerConfigurationNamespace
                  << ". This most likely means this server has not yet been added to a "
                     "sharded cluster.";
        return false;
    }

    invariant(!shardIdentityBSON.isEmpty());

    auto parsedOnDisk = ShardIdentityType::fromBSON(shardIdentityBSON);
    if (!parsedOnDisk.isOK()) {
        return parsedOnDisk.getStatus();
    }

    {
        // Global lock is required to call initializeFromShardIdentity().
        Lock::GlobalWrite globalLock(opCtx);
        auto initStatus = initializeFromShardIdentity(opCtx, parsedOnDisk.getValue());
        if (!initStatus.isOK()) {
            return initStatus;
        }
    }
    return true;
}
/**
* Refreshes the routing information for 'nss' through the catalog cache and installs the
* resulting metadata on the collection's sharding state, unless the collection's current
* metadata is in the same epoch and already at an equal or higher collection version.
*
* Must be called without any locks held and with the sharding state enabled (both enforced by
* invariants).
*
* Returns the collection's shard version after the refresh, or ChunkVersion::UNSHARDED() when the
* refreshed routing information carries no chunk manager.
*
* Throws NotYetInitialized if the shard name does not form a valid ShardId, and whatever
* uassertStatusOK raises when the routing information cannot be obtained.
*/
ChunkVersion ShardingState::_refreshMetadata(OperationContext* opCtx, const NamespaceString& nss) {
invariant(!opCtx->lockState()->isLocked());
invariant(enabled());
const ShardId shardId = getShardName();
uassert(ErrorCodes::NotYetInitialized,
str::stream() << "Cannot refresh metadata for " << nss.ns()
<< " before shard name has been set",
shardId.isValid());
// The routing information is fetched before any collection lock is taken.
const auto routingInfo = uassertStatusOK(
Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, nss));
const auto cm = routingInfo.cm();
if (!cm) {
// No chunk manager, so unsharded.
// Exclusive collection lock needed since we're now changing the metadata
AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
auto css = CollectionShardingState::get(opCtx, nss);
css->refreshMetadata(opCtx, nullptr);
return ChunkVersion::UNSHARDED();
}
// First check under MODE_IS only: if the installed metadata is already current, return without
// ever taking the exclusive collection lock.
{
AutoGetCollection autoColl(opCtx, nss, MODE_IS);
auto css = CollectionShardingState::get(opCtx, nss);
// We already have newer version
if (css->getMetadata() &&
css->getMetadata()->getCollVersion().epoch() == cm->getVersion().epoch() &&
css->getMetadata()->getCollVersion() >= cm->getVersion()) {
LOG(1) << "Skipping refresh of metadata for " << nss << " "
<< css->getMetadata()->getCollVersion() << " with an older " << cm->getVersion();
return css->getMetadata()->getShardVersion();
}
}
// Exclusive collection lock needed since we're now changing the metadata
AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
auto css = CollectionShardingState::get(opCtx, nss);
// The MODE_IS lock above was released before this one was acquired, so the same check is
// repeated now that the exclusive collection lock is held.
// We already have newer version
if (css->getMetadata() &&
css->getMetadata()->getCollVersion().epoch() == cm->getVersion().epoch() &&
css->getMetadata()->getCollVersion() >= cm->getVersion()) {
LOG(1) << "Skipping refresh of metadata for " << nss << " "
<< css->getMetadata()->getCollVersion() << " with an older " << cm->getVersion();
return css->getMetadata()->getShardVersion();
}
// Build the new metadata from the chunk manager and this shard's id, then install it.
std::unique_ptr newCollectionMetadata =
stdx::make_unique(cm, shardId);
css->refreshMetadata(opCtx, std::move(newCollectionMetadata));
return css->getMetadata()->getShardVersion();
}
// Forwards the donate-chunk registration for 'args' to the active migrations registry and
// returns its result unchanged.
StatusWith ShardingState::registerDonateChunk(
const MoveChunkRequest& args) {
return _activeMigrationsRegistry.registerDonateChunk(args);
}
// Forwards the receive-chunk registration for the given namespace, chunk range and source shard
// to the active migrations registry and returns its result unchanged.
StatusWith ShardingState::registerReceiveChunk(
const NamespaceString& nss, const ChunkRange& chunkRange, const ShardId& fromShardId) {
return _activeMigrationsRegistry.registerReceiveChunk(nss, chunkRange, fromShardId);
}
// Returns whatever the active migrations registry reports as the namespace of the active
// donate-chunk operation (an empty optional is passed through as-is).
boost::optional ShardingState::getActiveDonateChunkNss() {
    auto activeDonateNss = _activeMigrationsRegistry.getActiveDonateChunkNss();
    return activeDonateNss;
}
// Returns the status report produced by the active migrations registry for this operation.
BSONObj ShardingState::getActiveMigrationStatusReport(OperationContext* opCtx) {
    BSONObj statusReport = _activeMigrationsRegistry.getActiveMigrationStatusReport(opCtx);
    return statusReport;
}
/**
 * Appends a summary of the sharding state to 'builder'.
 *
 * Always appends the boolean field "enabled". When sharding is enabled it additionally appends,
 * while holding _mutex, "configServer" (the config server connection string as text),
 * "shardName" and "clusterId".
 */
void ShardingState::appendInfo(OperationContext* opCtx, BSONObjBuilder& builder) {
    const bool shardingEnabled = enabled();
    builder.appendBool("enabled", shardingEnabled);

    if (shardingEnabled) {
        stdx::lock_guard lk(_mutex);

        auto* const registry = Grid::get(opCtx)->shardRegistry();
        builder.append("configServer", registry->getConfigServerConnectionString().toString());
        builder.append("shardName", _shardName);
        builder.append("clusterId", _clusterId);
    }
}
/**
 * Reports whether the current operation carries shard version information.
 *
 * Returns false when sharding is not enabled. Otherwise returns true if the client has sharded
 * connection info attached or the operation's sharding state has a shard version. The 'ns'
 * argument is not consulted.
 */
bool ShardingState::needCollectionMetadata(OperationContext* opCtx, const string& ns) {
    if (!enabled()) {
        return false;
    }

    // Shard version information received from mongos may either be attached to the Client or
    // directly to the OperationContext.
    Client* const owningClient = opCtx->getClient();
    if (ShardedConnectionInfo::get(owningClient, false)) {
        return true;
    }

    return OperationShardingState::get(opCtx).hasShardVersion();
}
/**
 * Rewrites the config server connection string stored in the shardIdentity document (in the
 * server configuration namespace) to 'newConnectionString'.
 *
 * The update runs with the database of that namespace held in MODE_X. If no document matches, a
 * warning is logged and the call still returns Status::OK().
 *
 * Returns Status::OK() when the update ran without throwing (whether or not a document was
 * matched), or the status of the DBException thrown while taking the lock or updating.
 */
Status ShardingState::updateShardIdentityConfigString(OperationContext* opCtx,
                                                      const std::string& newConnectionString) {
    BSONObj updateObj(ShardIdentityType::createConfigServerUpdateObject(newConnectionString));

    UpdateRequest updateReq(NamespaceString::kServerConfigurationNamespace);
    updateReq.setQuery(BSON("_id" << ShardIdentityType::IdName));
    updateReq.setUpdates(updateObj);

    // The lifecycle object must outlive the update() call below, since the request only holds a
    // pointer to it.
    UpdateLifecycleImpl updateLifecycle(NamespaceString::kServerConfigurationNamespace);
    updateReq.setLifecycle(&updateLifecycle);

    try {
        AutoGetOrCreateDb autoDb(
            opCtx, NamespaceString::kServerConfigurationNamespace.db(), MODE_X);

        auto result = update(opCtx, autoDb.getDb(), updateReq);
        if (result.numMatched == 0) {
            warning() << "failed to update config string of shard identity document because "
                      << "it does not exist. This shard could have been removed from the cluster";
        } else {
            // The trailing space keeps the connection string from being glued to the word "to".
            LOG(2) << "Updated config server connection string in shardIdentity document to "
                   << newConnectionString;
        }
    } catch (const DBException& exception) {
        return exception.toStatus();
    }

    return Status::OK();
}
/**
 * Returns the task executor used for collection range deletion, creating and starting it on the
 * first call. Creation is serialized by _rangeDeleterExecutor.lock, and the returned pointer is
 * non-owning (the executor stays owned by _rangeDeleterExecutor).
 */
executor::TaskExecutor* ShardingState::getRangeDeleterTaskExecutor() {
    stdx::lock_guard lk(_rangeDeleterExecutor.lock);

    auto& executorSlot = _rangeDeleterExecutor.taskExecutor;
    if (!executorSlot) {
        static const char kExecName[] = "NetworkInterfaceCollectionRangeDeleter-TaskExecutor";

        // The pool is built on top of the network interface, and both are then handed over to
        // the executor.
        auto net = executor::makeNetworkInterface(kExecName);
        auto pool = stdx::make_unique(net.get());
        executorSlot = stdx::make_unique(std::move(pool), std::move(net));
        executorSlot->startup();
    }

    return executorSlot.get();
}
// Shuts down and joins the range deleter task executor, if one was ever created.
ShardingState::RangeDeleterExecutor::~RangeDeleterExecutor() {
    if (!taskExecutor) {
        return;
    }

    taskExecutor->shutdown();
    taskExecutor->join();
}
} // namespace mongo