/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/connection_string.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/update.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/s/collection_metadata.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/metadata_loader.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_initialization_mongod.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/network_interface_thread_pool.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/metadata/config_server_metadata.h"
#include "mongo/rpc/metadata/metadata_hook.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/client/sharding_network_connection_hook.h"
#include "mongo/s/config.h"
#include "mongo/s/grid.h"
#include "mongo/s/sharding_initialization.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include <memory>
#include <string>
#include <utility>
#include <vector>
namespace mongo {
using std::shared_ptr;
using std::string;
using std::vector;
using CallbackArgs = executor::TaskExecutor::CallbackArgs;
namespace {
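
// The sole ShardingState instance is attached to the ServiceContext as a decoration, so its
// lifetime matches that of the service context and it can be retrieved from any operation.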
const auto getShardingState = ServiceContext::declareDecoration<ShardingState>();
// Max number of concurrent config server refresh threads
const int kMaxConfigServerRefreshThreads = 3;
// Maximum number of times to try to refresh the collection metadata if conflicts are occurring
const int kMaxNumMetadataRefreshAttempts = 3;
/**
* Updates the config server field of the shardIdentity document with the given connection string
* if setName is equal to the config server replica set name.
*
* Note: This is intended to be used on a new thread that hasn't called Client::initThread.
* One example use case is for the ReplicaSetMonitor asynchronous callback when it detects changes
* to replica set membership.
*/
void updateShardIdentityConfigStringCB(const string& setName, const string& newConnectionString) {
auto configsvrConnStr = grid.shardRegistry()->getConfigServerConnectionString();
if (configsvrConnStr.getSetName() != setName) {
        // Ignore change notifications for any replica set other than the config server's.
return;
}
Client::initThread("updateShardIdentityConfigConnString");
auto uniqOpCtx = getGlobalServiceContext()->makeOperationContext(&cc());
auto status = ShardingState::get(uniqOpCtx.get())
->updateShardIdentityConfigString(uniqOpCtx.get(), newConnectionString);
if (!status.isOK() && !ErrorCodes::isNotMasterError(status.code())) {
warning() << "error encountered while trying to update config connection string to "
<< newConnectionString << causedBy(status);
}
}
} // namespace
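
// All instances start out in the kNew state with a placeholder initialization status. The
// config server ticket pool bounds the number of concurrent refresh threads, and the cleanup
// callback defaults to a no-op until a real scheduler (or a test override) is installed.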
ShardingState::ShardingState()
    : _initializationState(static_cast<uint32_t>(InitializationState::kNew)),
_initializationStatus(Status(ErrorCodes::InternalError, "Uninitialized value")),
_configServerTickets(kMaxConfigServerRefreshThreads),
_globalInit(&initializeGlobalShardingStateForMongod),
_scheduleWorkFn([](NamespaceString nss) {}) {}
ShardingState::~ShardingState() = default;
ShardingState* ShardingState::get(ServiceContext* serviceContext) {
return &getShardingState(serviceContext);
}
ShardingState* ShardingState::get(OperationContext* operationContext) {
return ShardingState::get(operationContext->getServiceContext());
}
bool ShardingState::enabled() const {
return _getInitializationState() == InitializationState::kInitialized;
}
ConnectionString ShardingState::getConfigServer(OperationContext* txn) {
invariant(enabled());
    stdx::lock_guard<stdx::mutex> lk(_mutex);
return grid.shardRegistry()->getConfigServerConnectionString();
}
string ShardingState::getShardName() {
invariant(enabled());
    stdx::lock_guard<stdx::mutex> lk(_mutex);
return _shardName;
}
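
// Shuts down the sharding subsystem. If initialization never started, the state is moved
// through kInitializing so that _signalInitializationComplete can record ShutdownInProgress,
// which both satisfies its invariant and unblocks any threads waiting on initialization.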
void ShardingState::shutDown(OperationContext* txn) {
bool mustEnterShutdownState = false;
{
        stdx::unique_lock<stdx::mutex> lk(_mutex);
while (_getInitializationState() == InitializationState::kInitializing) {
_initializationFinishedCondition.wait(lk);
}
if (_getInitializationState() == InitializationState::kNew) {
_setInitializationState_inlock(InitializationState::kInitializing);
mustEnterShutdownState = true;
}
}
// Initialization completion must be signalled outside of the mutex
if (mustEnterShutdownState) {
_signalInitializationComplete(
Status(ErrorCodes::ShutdownInProgress,
"Sharding state unavailable because the system is shutting down"));
}
if (_getInitializationState() == InitializationState::kInitialized) {
grid.getExecutorPool()->shutdownAndJoin();
grid.catalogClient(txn)->shutDown(txn);
}
}
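
// Advances this shard's notion of the config server opTime using the value gossiped in the
// incoming request's metadata. Requires the caller to be authorized for internal actions so
// that arbitrary clients cannot move the tracked opTime forward.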
Status ShardingState::updateConfigServerOpTimeFromMetadata(OperationContext* txn) {
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
// Nothing to do if we're a config server ourselves.
return Status::OK();
}
    boost::optional<repl::OpTime> opTime = rpc::ConfigServerMetadata::get(txn).getOpTime();
if (opTime) {
if (!AuthorizationSession::get(txn->getClient())
->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
ActionType::internal)) {
return Status(ErrorCodes::Unauthorized, "Unauthorized to update config opTime");
}
grid.advanceConfigOpTime(*opTime);
}
return Status::OK();
}
void ShardingState::setShardName(const string& name) {
const string clientAddr = cc().clientAddress(true);
    stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_shardName.empty()) {
// TODO SERVER-2299 remotely verify the name is sound w.r.t IPs
_shardName = name;
log() << "remote client " << clientAddr << " initialized this host as shard " << name;
return;
}
if (_shardName != name) {
const string message = str::stream()
<< "remote client " << clientAddr << " tried to initialize this host as shard " << name
<< ", but shard name was previously initialized as " << _shardName;
warning() << message;
uassertStatusOK({ErrorCodes::AlreadyInitialized, message});
}
}
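
// Returns the CollectionShardingState for the given namespace, lazily creating and caching an
// entry the first time the namespace is seen.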
CollectionShardingState* ShardingState::getNS(const std::string& ns, OperationContext* txn) {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
CollectionShardingStateMap::iterator it = _collections.find(ns);
if (it == _collections.end()) {
        auto inserted =
            _collections.insert(make_pair(ns,
                                          stdx::make_unique<CollectionShardingState>(
                                              txn->getServiceContext(), NamespaceString(ns))));
invariant(inserted.second);
it = std::move(inserted.first);
}
return it->second.get();
}
void ShardingState::clearCollectionMetadata() {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
_collections.clear();
}
void ShardingState::resetMetadata(const string& ns) {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
warning() << "resetting metadata for " << ns << ", this should only be used in testing";
_collections.erase(ns);
}
void ShardingState::setGlobalInitMethodForTest(GlobalInitFunc func) {
_globalInit = func;
}
void ShardingState::setScheduleCleanupFunctionForTest(RangeDeleterCleanupNotificationFunc fn) {
_scheduleWorkFn = fn;
}
void ShardingState::scheduleCleanup(const NamespaceString& nss) {
_scheduleWorkFn(nss);
}
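
// Called when a request carries a shard version that is newer than (or from a different epoch
// than) the locally cached metadata. Waits for any migration critical section to clear, takes
// a fast path when the cached version already covers the expected one, and otherwise refreshes
// from the config server, retrying as a full reload if a concurrent metadata change
// (RemoteChangeDetected) interrupts an incremental refresh.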
Status ShardingState::onStaleShardVersion(OperationContext* txn,
const NamespaceString& nss,
const ChunkVersion& expectedVersion) {
invariant(!txn->lockState()->isLocked());
invariant(enabled());
LOG(2) << "metadata refresh requested for " << nss.ns() << " at shard version "
<< expectedVersion;
// Ensure any ongoing migrations have completed
auto& oss = OperationShardingState::get(txn);
oss.waitForMigrationCriticalSectionSignal(txn);
ChunkVersion collectionShardVersion;
// Fast path - check if the requested version is at a higher version than the current metadata
// version or a different epoch before verifying against config server.
ScopedCollectionMetadata currentMetadata;
{
AutoGetCollection autoColl(txn, nss, MODE_IS);
currentMetadata = CollectionShardingState::get(txn, nss)->getMetadata();
if (currentMetadata) {
collectionShardVersion = currentMetadata->getShardVersion();
}
if (collectionShardVersion.epoch() == expectedVersion.epoch() &&
collectionShardVersion >= expectedVersion) {
            // No need to reload remotely if we are in the same epoch and the requested version
            // is not newer than the one we already have, which means the remote side is behind.
return Status::OK();
}
}
// At the first attempt try to use the currently loaded metadata and on subsequent attempts use
// the complete metadata
int numRefreshAttempts = 0;
while (true) {
numRefreshAttempts++;
auto refreshStatusAndVersion =
_refreshMetadata(txn, nss, (currentMetadata ? currentMetadata.getMetadata() : nullptr));
if (refreshStatusAndVersion.isOK()) {
LOG(1) << "Successfully refreshed metadata for " << nss.ns() << " to "
<< refreshStatusAndVersion.getValue();
return Status::OK();
}
if (refreshStatusAndVersion == ErrorCodes::RemoteChangeDetected &&
numRefreshAttempts < kMaxNumMetadataRefreshAttempts) {
currentMetadata = ScopedCollectionMetadata();
log() << "Refresh failed and will be retried as full reload "
<< refreshStatusAndVersion.getStatus();
continue;
}
return refreshStatusAndVersion.getStatus();
}
MONGO_UNREACHABLE;
}
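
// Forces a full metadata reload from the config server (no diff base) and reports the shard
// version that results from it.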
Status ShardingState::refreshMetadataNow(OperationContext* txn,
const string& ns,
ChunkVersion* latestShardVersion) {
auto refreshLatestShardVersionStatus = _refreshMetadata(txn, NamespaceString(ns), nullptr);
if (!refreshLatestShardVersionStatus.isOK()) {
return refreshLatestShardVersionStatus.getStatus();
}
*latestShardVersion = refreshLatestShardVersionStatus.getValue();
return Status::OK();
}
void ShardingState::initializeFromConfigConnString(OperationContext* txn, const string& configSvr) {
{
        stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_getInitializationState() == InitializationState::kNew) {
uassert(18509,
"Unable to obtain host name during sharding initialization.",
!getHostName().empty());
ConnectionString configSvrConnStr = uassertStatusOK(ConnectionString::parse(configSvr));
_setInitializationState_inlock(InitializationState::kInitializing);
stdx::thread thread([this, configSvrConnStr] { _initializeImpl(configSvrConnStr); });
thread.detach();
}
}
uassertStatusOK(_waitForInitialization(txn->getDeadline()));
uassertStatusOK(reloadShardRegistryUntilSuccess(txn));
uassertStatusOK(updateConfigServerOpTimeFromMetadata(txn));
}
// NOTE: This method can be called inside a database lock so it should never take any database
// locks, perform I/O, or any long running operations.
Status ShardingState::initializeFromShardIdentity(OperationContext* txn,
const ShardIdentityType& shardIdentity) {
invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
Status validationStatus = shardIdentity.validate();
if (!validationStatus.isOK()) {
return Status(
validationStatus.code(),
str::stream()
<< "Invalid shard identity document found when initializing sharding state: "
<< validationStatus.reason());
}
log() << "initializing sharding state with: " << shardIdentity;
    stdx::unique_lock<stdx::mutex> lk(_mutex);
// TODO: remove after v3.4.
// This is for backwards compatibility with old style initialization through metadata
// commands/setShardVersion, which can happen concurrently with an insert of a
// shardIdentity document to admin.system.version.
if (_getInitializationState() == InitializationState::kInitializing) {
auto waitStatus = _waitForInitialization_inlock(Date_t::max(), lk);
if (!waitStatus.isOK()) {
return waitStatus;
}
}
if (_getInitializationState() == InitializationState::kError) {
return {ErrorCodes::ManualInterventionRequired,
str::stream() << "Server's sharding metadata manager failed to initialize and will "
"remain in this state until the instance is manually reset"
<< causedBy(_initializationStatus)};
}
auto configSvrConnStr = shardIdentity.getConfigsvrConnString();
// TODO: remove after v3.4.
// This is for backwards compatibility with old style initialization through metadata
// commands/setShardVersion, which sets the shardName and configsvrConnectionString.
if (_getInitializationState() == InitializationState::kInitialized) {
if (_shardName != shardIdentity.getShardName()) {
return {ErrorCodes::InconsistentShardIdentity,
str::stream() << "shard name previously set as " << _shardName
<< " is different from stored: "
<< shardIdentity.getShardName()};
}
auto prevConfigsvrConnStr = grid.shardRegistry()->getConfigServerConnectionString();
if (prevConfigsvrConnStr.type() != ConnectionString::SET) {
return {ErrorCodes::UnsupportedFormat,
str::stream() << "config server connection string was previously initialized as"
" something that is not a replica set: "
<< prevConfigsvrConnStr.toString()};
}
if (prevConfigsvrConnStr.getSetName() != configSvrConnStr.getSetName()) {
return {ErrorCodes::InconsistentShardIdentity,
str::stream() << "config server connection string previously set as "
<< prevConfigsvrConnStr.toString()
<< " is different from stored: "
<< configSvrConnStr.toString()};
}
// The clusterId will only be unset if sharding state was initialized via the sharding
// metadata commands.
if (!_clusterId.isSet()) {
_clusterId = shardIdentity.getClusterId();
} else if (_clusterId != shardIdentity.getClusterId()) {
return {ErrorCodes::InconsistentShardIdentity,
str::stream() << "cluster id previously set as " << _clusterId
<< " is different from stored: "
<< shardIdentity.getClusterId()};
}
return Status::OK();
}
if (_getInitializationState() == InitializationState::kNew) {
ShardedConnectionInfo::addHook();
try {
Status status = _globalInit(txn, configSvrConnStr, generateDistLockProcessId(txn));
// TODO: remove after v3.4.
// This is for backwards compatibility with old style initialization through metadata
// commands/setShardVersion, which can happen concurrently with an insert of a
// shardIdentity document to admin.system.version.
if (status.isOK()) {
_setInitializationState_inlock(InitializationState::kInitialized);
ReplicaSetMonitor::setSynchronousConfigChangeHook(
&ConfigServer::replicaSetChangeShardRegistryUpdateHook);
ReplicaSetMonitor::setAsynchronousConfigChangeHook(
&updateShardIdentityConfigStringCB);
} else {
_initializationStatus = status;
_setInitializationState_inlock(InitializationState::kError);
}
_shardName = shardIdentity.getShardName();
_clusterId = shardIdentity.getClusterId();
_initializeRangeDeleterTaskExecutor();
return status;
} catch (const DBException& ex) {
auto errorStatus = ex.toStatus();
_setInitializationState_inlock(InitializationState::kError);
_initializationStatus = errorStatus;
return errorStatus;
}
}
MONGO_UNREACHABLE;
}
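
// Runs on the detached thread spawned by initializeFromConfigConnString; performs the global
// sharding initialization and signals the outcome to any waiters.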
void ShardingState::_initializeImpl(ConnectionString configSvr) {
Client::initThread("ShardingState initialization");
auto txn = cc().makeOperationContext();
// Do this initialization outside of the lock, since we are already protected by having entered
// the kInitializing state.
ShardedConnectionInfo::addHook();
try {
Status status = _globalInit(txn.get(), configSvr, generateDistLockProcessId(txn.get()));
if (status.isOK()) {
ReplicaSetMonitor::setSynchronousConfigChangeHook(
&ConfigServer::replicaSetChangeShardRegistryUpdateHook);
ReplicaSetMonitor::setAsynchronousConfigChangeHook(&updateShardIdentityConfigStringCB);
_initializeRangeDeleterTaskExecutor();
}
_signalInitializationComplete(status);
} catch (const DBException& ex) {
_signalInitializationComplete(ex.toStatus());
}
}
Status ShardingState::_waitForInitialization(Date_t deadline) {
if (enabled())
return Status::OK();
    stdx::unique_lock<stdx::mutex> lk(_mutex);
return _waitForInitialization_inlock(deadline, lk);
}
Status ShardingState::_waitForInitialization_inlock(Date_t deadline,
                                                    stdx::unique_lock<stdx::mutex>& lk) {
{
while (_getInitializationState() == InitializationState::kInitializing ||
_getInitializationState() == InitializationState::kNew) {
if (deadline == Date_t::max()) {
_initializationFinishedCondition.wait(lk);
} else if (stdx::cv_status::timeout ==
_initializationFinishedCondition.wait_until(lk,
deadline.toSystemTimePoint())) {
return Status(ErrorCodes::ExceededTimeLimit,
"Initializing sharding state exceeded time limit");
}
}
}
auto initializationState = _getInitializationState();
if (initializationState == InitializationState::kInitialized) {
fassertStatusOK(34349, _initializationStatus);
return Status::OK();
}
if (initializationState == InitializationState::kError) {
return Status(ErrorCodes::ManualInterventionRequired,
str::stream()
<< "Server's sharding metadata manager failed to initialize and will "
"remain in this state until the instance is manually reset"
<< causedBy(_initializationStatus));
}
MONGO_UNREACHABLE;
}
ShardingState::InitializationState ShardingState::_getInitializationState() const {
    return static_cast<InitializationState>(_initializationState.load());
}
void ShardingState::_setInitializationState_inlock(InitializationState newState) {
    _initializationState.store(static_cast<uint32_t>(newState));
}
void ShardingState::_signalInitializationComplete(Status status) {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(_getInitializationState() == InitializationState::kInitializing);
if (!status.isOK()) {
_initializationStatus = status;
_setInitializationState_inlock(InitializationState::kError);
} else {
_initializationStatus = Status::OK();
_setInitializationState_inlock(InitializationState::kInitialized);
}
_initializationFinishedCondition.notify_all();
}
Status ShardingState::initializeShardingAwarenessIfNeeded(OperationContext* txn) {
// In sharded readOnly mode, we ignore the shardIdentity document on disk and instead *require*
// a shardIdentity document to be passed through --overrideShardIdentity.
if (storageGlobalParams.readOnly) {
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
if (serverGlobalParams.overrideShardIdentity.isEmpty()) {
return {ErrorCodes::InvalidOptions,
"If started with --shardsvr in queryableBackupMode, a shardIdentity "
"document must be provided through --overrideShardIdentity"};
}
auto swOverrideShardIdentity =
ShardIdentityType::fromBSON(serverGlobalParams.overrideShardIdentity);
if (!swOverrideShardIdentity.isOK()) {
return swOverrideShardIdentity.getStatus();
}
auto status = initializeFromShardIdentity(txn, swOverrideShardIdentity.getValue());
if (!status.isOK()) {
return status;
}
return reloadShardRegistryUntilSuccess(txn);
} else {
// Error if --overrideShardIdentity is used but *not* started with --shardsvr.
if (!serverGlobalParams.overrideShardIdentity.isEmpty()) {
return {
ErrorCodes::InvalidOptions,
str::stream()
<< "Not started with --shardsvr, but a shardIdentity document was provided "
"through --overrideShardIdentity: "
<< serverGlobalParams.overrideShardIdentity};
}
return Status::OK();
}
}
// In sharded *non*-readOnly mode, error if --overrideShardIdentity is provided. Use the
// shardIdentity document on disk if one exists, but it is okay if no shardIdentity document is
// provided at all (sharding awareness will be initialized when a shardIdentity document is
// inserted).
else {
if (!serverGlobalParams.overrideShardIdentity.isEmpty()) {
return {
ErrorCodes::InvalidOptions,
str::stream() << "--overrideShardIdentity is only allowed in sharded "
"queryableBackupMode. If not in queryableBackupMode, you can edit "
"the shardIdentity document by starting the server *without* "
"--shardsvr, manually updating the shardIdentity document in the "
<< NamespaceString::kConfigCollectionNamespace.toString()
<< " collection, and restarting the server with --shardsvr."};
}
// Load the shardIdentity document from disk.
invariant(!txn->lockState()->isLocked());
BSONObj shardIdentityBSON;
try {
AutoGetCollection autoColl(txn, NamespaceString::kConfigCollectionNamespace, MODE_IS);
Helpers::findOne(txn,
autoColl.getCollection(),
BSON("_id" << ShardIdentityType::IdName),
shardIdentityBSON);
} catch (const DBException& ex) {
return ex.toStatus();
}
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
if (shardIdentityBSON.isEmpty()) {
warning() << "Started with --shardsvr, but no shardIdentity document was found on "
"disk in "
<< NamespaceString::kConfigCollectionNamespace
<< ". This most likely means this server has not yet been added to a "
"sharded cluster.";
return Status::OK();
}
auto swShardIdentity = ShardIdentityType::fromBSON(shardIdentityBSON);
if (!swShardIdentity.isOK()) {
return swShardIdentity.getStatus();
}
auto status = initializeFromShardIdentity(txn, swShardIdentity.getValue());
if (!status.isOK()) {
return status;
}
return reloadShardRegistryUntilSuccess(txn);
} else {
// Warn if a shardIdentity document is found on disk but *not* started with --shardsvr.
if (!shardIdentityBSON.isEmpty()) {
warning() << "Not started with --shardsvr, but a shardIdentity document was found "
"on disk in "
<< NamespaceString::kConfigCollectionNamespace << ": "
<< shardIdentityBSON;
}
return Status::OK();
}
}
}
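
// Refreshes the metadata for 'nss' from the config server. When 'metadataForDiff' is non-null,
// only the chunk changes since that metadata's collection version are fetched; when it is null,
// a full reload is performed. Returns the resulting shard version, or UNSHARDED if the
// collection is no longer sharded.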
StatusWith<ChunkVersion> ShardingState::_refreshMetadata(
OperationContext* txn, const NamespaceString& nss, const CollectionMetadata* metadataForDiff) {
invariant(!txn->lockState()->isLocked());
{
Status status = _waitForInitialization(txn->getDeadline());
if (!status.isOK())
return status;
}
// We can't reload if a shard name has not yet been set
{
        stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_shardName.empty()) {
string errMsg = str::stream() << "cannot refresh metadata for " << nss.ns()
<< " before shard name has been set";
warning() << errMsg;
return {ErrorCodes::NotYetInitialized, errMsg};
}
}
// The _configServerTickets serializes this process such that only a small number of threads
// can try to refresh at the same time
_configServerTickets.waitForTicket();
TicketHolderReleaser needTicketFrom(&_configServerTickets);
LOG(1) << "Remotely refreshing metadata for " << nss.ns() << ", based on collection version "
<< (metadataForDiff ? metadataForDiff->getCollVersion().toString() : "(empty)");
    std::unique_ptr<CollectionMetadata> remoteMetadata(stdx::make_unique<CollectionMetadata>());
{
Timer refreshTimer;
MetadataLoader mdLoader;
Status status = mdLoader.makeCollectionMetadata(txn,
grid.catalogClient(txn),
nss.ns(),
getShardName(),
metadataForDiff,
remoteMetadata.get());
if (status.code() == ErrorCodes::NamespaceNotFound) {
remoteMetadata.reset();
} else if (!status.isOK()) {
warning() << "Could not remotely refresh metadata for " << nss.ns()
<< causedBy(status.reason());
return status;
}
}
// Exclusive collection lock needed since we're now potentially changing the metadata, and
// don't want reads/writes to be ongoing
ScopedTransaction transaction(txn, MODE_IX);
AutoGetCollection autoColl(txn, nss, MODE_IX, MODE_X);
auto css = CollectionShardingState::get(txn, nss);
// Resolve newer pending chunks with the remote metadata, finish construction
css->refreshMetadata(txn, std::move(remoteMetadata));
auto metadata = css->getMetadata();
return (metadata ? metadata->getShardVersion() : ChunkVersion::UNSHARDED());
}
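
// Registers an outgoing chunk migration with the active migrations registry. At most one
// donation may be in progress at a time, and none may start while this shard is receiving a
// chunk.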
StatusWith<ScopedRegisterMigration> ShardingState::registerMigration(const MoveChunkRequest& args) {
if (_migrationDestManager.isActive()) {
return {
ErrorCodes::ConflictingOperationInProgress,
str::stream()
<< "Unable start new migration because this shard is currently receiving a chunk"};
}
return _activeMigrationsRegistry.registerMigration(args);
}
boost::optional<NamespaceString> ShardingState::getActiveMigrationNss() {
return _activeMigrationsRegistry.getActiveMigrationNss();
}
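
// Appends a diagnostic summary of the sharding state (config server connection string, shard
// name, cluster id and the cached shard version of every known collection) to 'builder'.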
void ShardingState::appendInfo(OperationContext* txn, BSONObjBuilder& builder) {
const bool isEnabled = enabled();
builder.appendBool("enabled", isEnabled);
if (!isEnabled)
return;
    stdx::lock_guard<stdx::mutex> lk(_mutex);
builder.append("configServer",
grid.shardRegistry()->getConfigServerConnectionString().toString());
builder.append("shardName", _shardName);
builder.append("clusterId", _clusterId);
BSONObjBuilder versionB(builder.subobjStart("versions"));
for (CollectionShardingStateMap::const_iterator it = _collections.begin();
it != _collections.end();
++it) {
ScopedCollectionMetadata metadata = it->second->getMetadata();
if (metadata) {
versionB.appendTimestamp(it->first, metadata->getShardVersion().toLong());
} else {
versionB.appendTimestamp(it->first, ChunkVersion::UNSHARDED().toLong());
}
}
versionB.done();
}
bool ShardingState::needCollectionMetadata(OperationContext* txn, const string& ns) {
if (!enabled())
return false;
Client* client = txn->getClient();
    // Shard version information received from mongos may either be attached to the Client or
// directly to the OperationContext.
return ShardedConnectionInfo::get(client, false) ||
OperationShardingState::get(txn).hasShardVersion();
}
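
// Persists 'newConnectionString' into the configsvrConnectionString field of the shardIdentity
// document. The update is deliberately not an upsert, so a missing document (e.g. because this
// shard was removed from the cluster) only produces a warning.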
Status ShardingState::updateShardIdentityConfigString(OperationContext* txn,
const std::string& newConnectionString) {
BSONObj updateObj(ShardIdentityType::createConfigServerUpdateObject(newConnectionString));
UpdateRequest updateReq(NamespaceString::kConfigCollectionNamespace);
updateReq.setQuery(BSON("_id" << ShardIdentityType::IdName));
updateReq.setUpdates(updateObj);
UpdateLifecycleImpl updateLifecycle(NamespaceString::kConfigCollectionNamespace);
updateReq.setLifecycle(&updateLifecycle);
try {
AutoGetOrCreateDb autoDb(txn, NamespaceString::kConfigCollectionNamespace.db(), MODE_X);
auto result = update(txn, autoDb.getDb(), updateReq);
if (result.numMatched == 0) {
warning() << "failed to update config string of shard identity document because "
<< "it does not exist. This shard could have been removed from the cluster";
} else {
LOG(2) << "Updated config server connection string in shardIdentity document to"
<< newConnectionString;
}
} catch (const DBException& exception) {
return exception.toStatus();
}
return Status::OK();
}
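
// Constructs the task executor used by the collection range deleter. The network interface
// serves double duty: it drives the executor's thread pool (via NetworkInterfaceThreadPool)
// and acts as the executor's network transport.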
void ShardingState::_initializeRangeDeleterTaskExecutor() {
invariant(!_rangeDeleterTaskExecutor);
auto net =
executor::makeNetworkInterface("NetworkInterfaceCollectionRangeDeleter-TaskExecutor");
auto netPtr = net.get();
    _rangeDeleterTaskExecutor = stdx::make_unique<executor::ThreadPoolTaskExecutor>(
        stdx::make_unique<executor::NetworkInterfaceThreadPool>(netPtr), std::move(net));
}
executor::ThreadPoolTaskExecutor* ShardingState::getRangeDeleterTaskExecutor() {
return _rangeDeleterTaskExecutor.get();
}
/**
 * Global free function shared between mongos and mongod to identify the process type. This is
 * the mongod definition, so it always returns false.
 */
bool isMongos() {
return false;
}
} // namespace mongo