/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/s/client/shard_registry.h"
#include
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/connection_string.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/client.h"
#include "mongo/db/logical_time_metadata_hook.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/server_options.h"
#include "mongo/executor/network_connection_hook.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/network_interface_thread_pool.h"
#include "mongo/executor/task_executor.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_factory.h"
#include "mongo/s/grid.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/log.h"
#include "mongo/util/map_util.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
using std::shared_ptr;
using std::set;
using std::string;
using std::unique_ptr;
using std::vector;
using executor::NetworkInterface;
using executor::NetworkInterfaceThreadPool;
using executor::TaskExecutorPool;
using executor::ThreadPoolTaskExecutor;
using executor::TaskExecutor;
using CallbackArgs = TaskExecutor::CallbackArgs;
using CallbackHandle = TaskExecutor::CallbackHandle;
namespace {
// Delay between two consecutive background reloads scheduled by _internalReload().
const Seconds kRefreshPeriod{30};
}  // namespace

// Shard id under which the config server shard is registered by init().
const ShardId ShardRegistry::kConfigServerShardId{"config"};
/**
 * Takes ownership of the shard factory and remembers the config server connection string.
 * No shard is created here; init() builds the config shard from _initConfigServerCS.
 */
ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory,
                             const ConnectionString& configServerCS)
    : _shardFactory(std::move(shardFactory)),
      _initConfigServerCS(configServerCS) {}
/**
 * Stops and joins the background reload executor, if one was started.
 */
ShardRegistry::~ShardRegistry() {
    this->shutdown();
}
/**
 * Shuts down and joins the periodic-reload executor.
 *
 * Does nothing if startup() never created the executor, or if this has already run once.
 */
void ShardRegistry::shutdown() {
    const bool nothingToStop = !_executor || _isShutdown;
    if (nothingToStop) {
        return;
    }

    LOG(1) << "Shutting down task executor for reloading shard registry";
    _executor->shutdown();
    _executor->join();
    _isShutdown = true;
}
/**
 * Returns (a copy of) the connection string of the config server shard.
 */
ConnectionString ShardRegistry::getConfigServerConnectionString() const {
    const auto configShard = getConfigShard();
    return configShard->getConnString();
}
/**
 * Looks up a shard by id, reloading from the config server when it is not cached.
 *
 * Returns the shard, or ShardNotFound when it is still absent after an authoritative reload.
 */
StatusWith<shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx,
                                                      const ShardId& shardId) {
    // Fast path: already known.
    if (auto cached = _data.findByShardId(shardId)) {
        return cached;
    }

    // Cache miss: refresh and look again.
    const bool reloadedHere = reload(opCtx);
    if (auto afterFirstReload = _data.findByShardId(shardId)) {
        return afterFirstReload;
    }

    // A reload done by this thread is authoritative. If instead we only waited for someone
    // else's (possibly older) reload, run one more so the data is at least as fresh as ours.
    if (!reloadedHere) {
        reload(opCtx);
        if (auto afterSecondReload = _data.findByShardId(shardId)) {
            return afterSecondReload;
        }
    }

    return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"};
}
/**
 * Returns the cached shard with the given id, or null; never contacts the config server.
 */
shared_ptr<Shard> ShardRegistry::getShardNoReload(const ShardId& shardId) {
    auto found = _data.findByShardId(shardId);
    return found;
}
/**
 * Returns the cached shard that owns the given host, or null; never reloads.
 */
shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) {
    auto owner = _data.findByHostAndPort(host);
    return owner;
}
/**
 * Returns the config server shard. It is an invariant failure to call this before init().
 */
shared_ptr<Shard> ShardRegistry::getConfigShard() const {
    shared_ptr<Shard> configShard = _data.getConfigShard();
    invariant(configShard);
    return configShard;
}
/**
 * Builds a standalone Shard for `connStr` through the shard factory.
 *
 * The result is not added to the registry's data and carries an empty shard id.
 */
unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const {
    const ShardId anonymousId("");
    return _shardFactory->createUniqueShard(anonymousId, connStr);
}
/**
 * Returns the cached shard registered under replica set name `name`, or null.
 */
shared_ptr<Shard> ShardRegistry::lookupRSName(const string& name) const {
    auto byReplSet = _data.findByRSName(name);
    return byReplSet;
}
void ShardRegistry::getAllShardIdsNoReload(vector* all) const {
std::set seen;
_data.getAllShardIds(seen);
all->assign(seen.begin(), seen.end());
}
void ShardRegistry::getAllShardIds(OperationContext* opCtx, vector* all) {
getAllShardIdsNoReload(all);
if (all->empty()) {
bool didReload = reload(opCtx);
getAllShardIdsNoReload(all);
// If we didn't do the reload ourselves, we should retry to ensure
// that the reload is actually initiated while we're executing this
if (!didReload && all->empty()) {
reload(opCtx);
getAllShardIdsNoReload(all);
}
}
}
int ShardRegistry::getNumShards() const {
std::set seen;
_data.getAllShardIds(seen);
return seen.size();
}
/**
 * Appends the cached shard map to `result`. The registry data writes a "map" sub-object.
 */
void ShardRegistry::toBSON(BSONObjBuilder* result) const {
    const auto& data = _data;
    data.toBSON(result);
}
void ShardRegistry::updateReplSetHosts(const ConnectionString& newConnString) {
invariant(newConnString.type() == ConnectionString::SET ||
newConnString.type() == ConnectionString::CUSTOM); // For dbtests
// to prevent update config shard connection string during init
stdx::unique_lock lock(_reloadMutex);
_data.rebuildShardIfExists(newConnString, _shardFactory.get());
}
void ShardRegistry::init() {
stdx::unique_lock reloadLock(_reloadMutex);
invariant(_initConfigServerCS.isValid());
auto configShard =
_shardFactory->createShard(ShardRegistry::kConfigServerShardId, _initConfigServerCS);
_data.addConfigShard(configShard);
// set to invalid so it cant be started more than once.
_initConfigServerCS = ConnectionString();
}
/**
 * Creates and starts the task executor used for periodic reloads, then schedules the first
 * reload immediately. Must be called only once.
 *
 * Fails the process (fassert 40252) if the reload cannot be scheduled for a reason other than
 * executor shutdown.
 */
void ShardRegistry::startup(OperationContext* opCtx) {
    invariant(!_executor);

    // Outgoing requests from this executor go through a LogicalTimeMetadataHook.
    auto hooks = stdx::make_unique<rpc::EgressMetadataHookList>();
    hooks->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(opCtx->getServiceContext()));

    // The thread pool only gets a raw pointer to the network interface; ownership of the
    // interface itself is handed to the executor.
    auto network =
        executor::makeNetworkInterface("ShardRegistryUpdater", nullptr, std::move(hooks));
    auto* const networkRaw = network.get();
    _executor = stdx::make_unique<ThreadPoolTaskExecutor>(
        stdx::make_unique<NetworkInterfaceThreadPool>(networkRaw), std::move(network));

    LOG(1) << "Starting up task executor for periodic reloading of ShardRegistry";
    _executor->startup();

    // First reload runs right away; _internalReload() re-arms itself afterwards.
    const auto scheduled =
        _executor->scheduleWork([this](const CallbackArgs& args) { _internalReload(args); });
    if (scheduled.isOK()) {
        return;
    }
    if (scheduled.getStatus() == ErrorCodes::ShutdownInProgress) {
        LOG(1) << "Cant schedule Shard Registry reload. "
               << "Executor shutdown in progress";
        return;
    }
    severe() << "Can't schedule ShardRegistry reload due to " << causedBy(scheduled.getStatus());
    fassertFailed(40252);
}
/**
 * Executor callback: reloads the registry, then schedules itself again kRefreshPeriod later.
 *
 * A cancelled or failed callback status stops the cycle. A DBException from reload() is only
 * logged. Failing to reschedule for any reason other than executor shutdown fasserts (40253).
 */
void ShardRegistry::_internalReload(const CallbackArgs& cbArgs) {
    LOG(1) << "Reloading shardRegistry";
    if (!cbArgs.status.isOK()) {
        warning() << "cant reload ShardRegistry " << causedBy(cbArgs.status);
        return;
    }

    Client::initThreadIfNotAlready("shard registry reload");
    auto opCtx = cc().makeOperationContext();

    try {
        reload(opCtx.get());
    } catch (const DBException& e) {
        log() << "Periodic reload of shard registry failed " << causedBy(e) << "; will retry after "
              << kRefreshPeriod;
    }

    // Re-arm so the next reload happens one refresh period from now.
    const auto nextRunAt = _executor->now() + kRefreshPeriod;
    const auto scheduled = _executor->scheduleWorkAt(
        nextRunAt, [this](const CallbackArgs& args) { _internalReload(args); });
    if (scheduled.isOK()) {
        return;
    }
    if (scheduled.getStatus() == ErrorCodes::ShutdownInProgress) {
        LOG(1) << "Cant schedule ShardRegistry reload. "
               << "Executor shutdown in progress";
        return;
    }
    severe() << "Can't schedule ShardRegistry reload due to " << causedBy(scheduled.getStatus());
    fassertFailed(40253);
}
bool ShardRegistry::isUp() const {
stdx::unique_lock reloadLock(_reloadMutex);
return _isUp;
}
/**
 * Reloads the shard list from the config server and swaps it into _data.
 *
 * At most one thread performs the reload at a time. If another thread is already reloading,
 * this call waits for it:
 *  - if that reload succeeded (state back to Idle), returns false without reloading again;
 *  - if the wait is interrupted through opCtx, returns false;
 *  - if that reload failed, this thread performs the reload itself.
 *
 * Returns true when this thread performed a successful reload. Errors while fetching the shard
 * list propagate as exceptions from ShardRegistryData's constructor. In that case the state is
 * left as Failed so the next caller retries.
 *
 * Replica set monitors of shards that are no longer in the catalog are removed.
 */
bool ShardRegistry::reload(OperationContext* opCtx) {
    stdx::unique_lock<stdx::mutex> reloadLock(_reloadMutex);

    if (_reloadState == ReloadState::Reloading) {
        // Another thread is already in the process of reloading so no need to do duplicate work.
        // There is also an issue if multiple threads are allowed to call getAllShards()
        // simultaneously because there is no good way to determine which of the threads has the
        // more recent version of the data.
        do {
            auto waitStatus = opCtx->waitForConditionOrInterruptNoAssert(_inReloadCV, reloadLock);
            if (!waitStatus.isOK()) {
                LOG(1) << "ShardRegistry reload is interrupted due to: " << redact(waitStatus);
                return false;
            }
        } while (_reloadState == ReloadState::Reloading);

        if (_reloadState == ReloadState::Idle) {
            return false;
        }
        // else proceed to reload since an error occurred on the last reload attempt.
        invariant(_reloadState == ReloadState::Failed);
    }

    _reloadState = ReloadState::Reloading;
    // Release the mutex while talking to the config server, so that isUp() and other users of
    // _reloadMutex are not blocked for the duration of the round trip.
    reloadLock.unlock();

    auto nextReloadState = ReloadState::Failed;

    // Runs on every exit from here on, including exceptions. It re-takes the mutex if needed,
    // publishes the outcome and wakes the threads waiting in the loop above.
    auto failGuard = MakeGuard([&] {
        if (!reloadLock.owns_lock()) {
            reloadLock.lock();
        }
        _reloadState = nextReloadState;
        _inReloadCV.notify_all();
    });

    // Fetches the shard list from the config server; throws if that fails.
    ShardRegistryData currData(opCtx, _shardFactory.get());
    currData.addConfigShard(_data.getConfigShard());
    _data.swap(currData);

    // After the swap, currData holds the previous contents. Keep only the ids that are no
    // longer present, and remove their replica set monitors.
    std::set<ShardId> removedShardIds;
    currData.getAllShardIds(removedShardIds);
    _data.shardIdSetDifference(removedShardIds);

    for (auto& shardId : removedShardIds) {
        auto shard = currData.findByShardId(shardId);
        invariant(shard);

        auto name = shard->getConnString().getSetName();
        ReplicaSetMonitor::remove(name);
    }

    nextReloadState = ReloadState::Idle;

    // isUp() reads _isUp while holding _reloadMutex, so it must also be written under that
    // mutex; writing it unlocked was a data race. failGuard sees owns_lock() and does not lock
    // a second time.
    reloadLock.lock();
    // first successful reload means that registry is up
    _isUp = true;
    return true;
}
void ShardRegistry::replicaSetChangeShardRegistryUpdateHook(
const std::string& setName, const std::string& newConnectionString) {
// Inform the ShardRegsitry of the new connection string for the shard.
auto connString = fassert(28805, ConnectionString::parse(newConnectionString));
invariant(setName == connString.getSetName());
grid.shardRegistry()->updateReplSetHosts(connString);
}
void ShardRegistry::replicaSetChangeConfigServerUpdateHook(const std::string& setName,
const std::string& newConnectionString) {
// This is run in it's own thread. Exceptions escaping would result in a call to terminate.
Client::initThread("replSetChange");
auto opCtx = cc().makeOperationContext();
auto const grid = Grid::get(opCtx.get());
try {
std::shared_ptr s = grid->shardRegistry()->lookupRSName(setName);
if (!s) {
LOG(1) << "shard not found for set: " << newConnectionString
<< " when attempting to inform config servers of updated set membership";
return;
}
if (s->isConfig()) {
// No need to tell the config servers their own connection string.
return;
}
auto status = grid->catalogClient()->updateConfigDocument(
opCtx.get(),
ShardType::ConfigNS,
BSON(ShardType::name(s->getId().toString())),
BSON("$set" << BSON(ShardType::host(newConnectionString))),
false,
ShardingCatalogClient::kMajorityWriteConcern);
if (!status.isOK()) {
error() << "RSChangeWatcher: could not update config db for set: " << setName
<< " to: " << newConnectionString << causedBy(status.getStatus());
}
} catch (const std::exception& e) {
warning() << "caught exception while updating config servers: " << e.what();
} catch (...) {
warning() << "caught unknown exception while updating config servers";
}
}
////////////// ShardRegistryData //////////////////
/**
 * Builds a snapshot of all non-config shards listed on the config server.
 *
 * Entries whose host cannot be parsed are skipped with a warning. The "config" entry is skipped
 * too; reload() adds the config shard separately through addConfigShard(). Throws if the shard
 * list cannot be fetched.
 */
ShardRegistryData::ShardRegistryData(OperationContext* opCtx, ShardFactory* shardFactory) {
    auto const catalogClient = Grid::get(opCtx)->catalogClient();

    // ShardRemote requires a majority read. We can only allow a non-majority read if we are a
    // config server.
    auto readConcern = repl::ReadConcernLevel::kMajorityReadConcern;
    if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
        const auto& requested = repl::ReadConcernArgs::get(opCtx);
        if (!requested.isEmpty()) {
            readConcern = requested.getLevel();
        }
    }

    auto shardsAndOpTime =
        uassertStatusOKWithContext(catalogClient->getAllShards(opCtx, readConcern),
                                   "could not get updated shard list from config server");
    const auto& shards = shardsAndOpTime.value;
    const auto& reloadOpTime = shardsAndOpTime.opTime;

    LOG(1) << "found " << shards.size()
           << " shards listed on config server(s) with lastVisibleOpTime: "
           << reloadOpTime.toBSON();

    // First pass: parse every host string. Validation lives here rather than in
    // ShardType::validate so that one bad stored host does not fail the whole getAllShards call.
    std::vector<std::pair<std::string, ConnectionString>> parsedShards;
    for (const auto& shardType : shards) {
        auto connStrStatus = ConnectionString::parse(shardType.getHost());
        if (connStrStatus.isOK()) {
            parsedShards.emplace_back(shardType.getName(), connStrStatus.getValue());
        } else {
            warning() << "Unable to parse shard host " << connStrStatus.getStatus().toString();
        }
    }

    // Second pass: create and index the shards. This object is not shared yet, so no lock is
    // needed.
    for (auto& entry : parsedShards) {
        if (entry.first == "config") {
            continue;
        }
        auto shard = shardFactory->createShard(std::move(entry.first), std::move(entry.second));
        _addShard(WithLock::withoutLock(), std::move(shard), false);
    }
}
/**
 * Exchanges all lookup tables and the config shard pointer with `other`.
 *
 * NOTE(review): only this object's mutex is taken. `other` is assumed to be unshared, as it is
 * in reload(), which passes a local.
 */
void ShardRegistryData::swap(ShardRegistryData& other) {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    _configShard.swap(other._configShard);
    _hostLookup.swap(other._hostLookup);
    _rsLookup.swap(other._rsLookup);
    _lookup.swap(other._lookup);
}
/**
 * Returns the config shard pointer (possibly null), copied under _mutex.
 */
shared_ptr<Shard> ShardRegistryData::getConfigShard() const {
    shared_ptr<Shard> configShard;
    {
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        configShard = _configShard;
    }
    return configShard;
}
void ShardRegistryData::addConfigShard(std::shared_ptr shard) {
stdx::lock_guard lk(_mutex);
_configShard = shard;
_addShard(lk, shard, true);
}
/**
 * Returns the shard registered under replica set name `name`, or null.
 */
shared_ptr<Shard> ShardRegistryData::findByRSName(const string& name) const {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    auto it = _rsLookup.find(name);
    if (it == _rsLookup.end()) {
        return nullptr;
    }
    return it->second;
}
/**
 * Returns the shard that owns `hostAndPort`, or null if no shard lists that host.
 */
shared_ptr<Shard> ShardRegistryData::findByHostAndPort(const HostAndPort& hostAndPort) const {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    auto it = _hostLookup.find(hostAndPort);
    return it == _hostLookup.end() ? nullptr : it->second;
}
/**
 * Locking wrapper around _findByShardId().
 */
shared_ptr<Shard> ShardRegistryData::findByShardId(const ShardId& shardId) const {
    stdx::lock_guard<stdx::mutex> guard(_mutex);
    return _findByShardId(guard, shardId);
}
/**
 * Returns the entry for `shardId` in _lookup, or null. The caller must hold _mutex.
 */
shared_ptr<Shard> ShardRegistryData::_findByShardId(WithLock, ShardId const& shardId) const {
    const auto it = _lookup.find(shardId);
    if (it != _lookup.end()) {
        return it->second;
    }
    return nullptr;
}
/**
 * Appends a "map" sub-object with one field per _lookup key (lookup key -> connection
 * string), sorted by key.
 */
void ShardRegistryData::toBSON(BSONObjBuilder* result) const {
    // Snapshot under the mutex; sorting and BSON building happen without it.
    std::vector<std::pair<ShardId, std::string>> entries;
    {
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        entries.reserve(_lookup.size());
        for (const auto& kv : _lookup) {
            entries.emplace_back(kv.first, kv.second->getConnString().toString());
        }
    }
    std::sort(entries.begin(), entries.end());

    BSONObjBuilder mapBob(result->subobjStart("map"));
    for (auto& entry : entries) {
        mapBob.append(entry.first, entry.second);
    }
}
void ShardRegistryData::getAllShardIds(std::set& seen) const {
stdx::lock_guard lk(_mutex);
for (auto i = _lookup.begin(); i != _lookup.end(); ++i) {
const auto& s = i->second;
if (s->getId().toString() == "config") {
continue;
}
seen.insert(s->getId());
}
}
void ShardRegistryData::shardIdSetDifference(std::set& diff) const {
stdx::lock_guard lk(_mutex);
for (auto i = _lookup.begin(); i != _lookup.end(); ++i) {
invariant(i->second);
auto res = diff.find(i->second->getId());
if (res != diff.end()) {
diff.erase(res);
}
}
}
void ShardRegistryData::rebuildShardIfExists(const ConnectionString& newConnString,
ShardFactory* factory) {
stdx::unique_lock updateConnStringLock(_mutex);
auto it = _rsLookup.find(newConnString.getSetName());
if (it == _rsLookup.end()) {
return;
}
_rebuildShard(updateConnStringLock, newConnString, factory);
}
/**
 * Replaces the shard of `newConnString`'s replica set with a freshly created one that keeps
 * the same shard id. It also updates _configShard when the new shard is the config shard.
 *
 * The caller must hold _mutex and must already have checked that the set name is in _rsLookup.
 */
void ShardRegistryData::_rebuildShard(WithLock lk,
                                      ConnectionString const& newConnString,
                                      ShardFactory* factory) {
    auto existing = _rsLookup.find(newConnString.getSetName());
    invariant(existing->second);

    auto rebuilt = factory->createShard(existing->second->getId(), newConnString);
    _addShard(lk, rebuilt, true);
    if (rebuilt->isConfig()) {
        _configShard = rebuilt;
    }
}
/**
 * Indexes `shard` in the lookup tables, replacing any existing shard with the same id.
 *
 * Keys written:
 *  - _lookup: the shard id, the full connection string, and each "host:port" string;
 *  - _hostLookup: each HostAndPort of the connection string;
 *  - _rsLookup: the set name, for SET connection strings;
 *  - for CUSTOM connection strings, "localhost" in both _lookup and _hostLookup.
 *
 * @param lk            proof that _mutex is held (the constructor passes withoutLock() while
 *                      the object is still private to it).
 * @param shard         shard to register.
 * @param useOriginalCS if true, keys come from shard->originalConnString(); otherwise from
 *                      shard->getConnString().
 */
void ShardRegistryData::_addShard(WithLock lk,
                                  std::shared_ptr const& shard,
                                  bool useOriginalCS) {
    const ShardId shardId = shard->getId();
    const ConnectionString connString =
        useOriginalCS ? shard->originalConnString() : shard->getConnString();

    auto currentShard = _findByShardId(lk, shardId);
    if (currentShard) {
        // Replacing a shard with the same id: first drop the host and connection-string keys
        // derived from the previous shard's original connection string.
        // NOTE(review): the old keys are taken from originalConnString() even if the previous
        // shard was added with useOriginalCS == false (as ShardRegistryData's constructor does),
        // so host keys that came from its getConnString() may survive. The previous shard's
        // _rsLookup entry and CUSTOM "localhost" keys are not erased here either; they are only
        // overwritten if the new connection string yields the same keys.
        auto oldConnString = currentShard->originalConnString();

        if (oldConnString.toString() != connString.toString()) {
            log() << "Updating ShardRegistry connection string for shard " << currentShard->getId()
                  << " from: " << oldConnString.toString() << " to: " << connString.toString();
        }

        for (const auto& host : oldConnString.getServers()) {
            _lookup.erase(host.toString());
            _hostLookup.erase(host);
        }
        _lookup.erase(oldConnString.toString());
    }

    _lookup[shard->getId()] = shard;

    LOG(3) << "Adding shard " << shard->getId() << ", with CS " << connString.toString();
    if (connString.type() == ConnectionString::SET) {
        _rsLookup[connString.getSetName()] = shard;
    } else if (connString.type() == ConnectionString::CUSTOM) {
        // CUSTOM connection strings (ie "$dummy:10000") become DBDirectClient connections which
        // always return "localhost" as their response to getServerAddress(). This is just for
        // making dbtest work.
        _lookup[ShardId("localhost")] = shard;
        _hostLookup[HostAndPort("localhost")] = shard;
    }

    // TODO: The only reason to have the shard host names in the lookup table is for the
    // setShardVersion call, which resolves the shard id from the shard address. This is
    // error-prone and will go away eventually when we switch all communications to go through
    // the remote command runner and all nodes are sharding aware by default.
    _lookup[connString.toString()] = shard;
    for (const HostAndPort& hostAndPort : connString.getServers()) {
        _lookup[hostAndPort.toString()] = shard;
        _hostLookup[hostAndPort] = shard;
    }
}
} // namespace mongo