/**
* Copyright (C) 2008-2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/s/config.h"
#include "mongo/client/connpool.h"
#include "mongo/db/client.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/write_concern.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
namespace mongo {
using std::set;
using std::string;
using std::unique_ptr;
using std::vector;
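// Loads the chunk metadata for a single collection from the config servers. A collection whose
// entry has no chunks is treated as unsharded, regardless of what config.collections says.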
CollectionInfo::CollectionInfo(OperationContext* txn,
const CollectionType& coll,
repl::OpTime opTime)
: _configOpTime(std::move(opTime)) {
_dropped = coll.getDropped();
// Do this *first* so we're invisible to everyone else
std::unique_ptr<ChunkManager> manager(stdx::make_unique<ChunkManager>(txn, coll));
manager->loadExistingRanges(txn, nullptr);
// Collections with no chunks are unsharded, no matter what the collections entry says. This
// helps prevent errors when dropping in a different process.
if (manager->numChunks() != 0) {
useChunkManager(std::move(manager));
} else {
warning() << "no chunks found for collection " << manager->getns()
<< ", assuming unsharded";
unshard();
}
_dirty = false;
}
CollectionInfo::~CollectionInfo() = default;
void CollectionInfo::resetCM(ChunkManager* cm) {
invariant(cm);
invariant(_cm);
_cm.reset(cm);
}
void CollectionInfo::unshard() {
_cm.reset();
_dropped = true;
_dirty = true;
_key = BSONObj();
}
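// Installs a freshly loaded chunk manager and caches its shard key pattern and unique flag.
// Marks the entry dirty so a subsequent save() persists it to the config servers.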
void CollectionInfo::useChunkManager(std::shared_ptr<ChunkManager> manager) {
_cm = manager;
_key = manager->getShardKeyPattern().toBSON().getOwned();
_unique = manager->isUnique();
_dirty = true;
_dropped = false;
}
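// Writes this collection's metadata to config.collections through the catalog client. A dropped
// collection is written with the special DROPPED epoch and the current wall clock time.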
void CollectionInfo::save(OperationContext* txn, const string& ns) {
CollectionType coll;
coll.setNs(NamespaceString{ns});
if (_cm) {
invariant(!_dropped);
coll.setEpoch(_cm->getVersion().epoch());
// TODO(schwerin): The following isn't really a date, but is stored as one in-memory and
// in config.collections, as a historical oddity.
coll.setUpdatedAt(Date_t::fromMillisSinceEpoch(_cm->getVersion().toLong()));
coll.setKeyPattern(_cm->getShardKeyPattern().toBSON());
coll.setDefaultCollation(
_cm->getDefaultCollator() ? _cm->getDefaultCollator()->getSpec().toBSON() : BSONObj());
coll.setUnique(_cm->isUnique());
} else {
invariant(_dropped);
coll.setDropped(true);
coll.setEpoch(ChunkVersion::DROPPED().epoch());
coll.setUpdatedAt(Date_t::now());
}
uassertStatusOK(grid.catalogClient(txn)->updateCollection(txn, ns, coll));
_dirty = false;
}
DBConfig::DBConfig(std::string name, const DatabaseType& dbt, repl::OpTime configOpTime)
: _name(name), _configOpTime(std::move(configOpTime)) {
invariant(_name == dbt.getName());
_primaryId = dbt.getPrimary();
_shardingEnabled = dbt.getSharded();
}
DBConfig::~DBConfig() = default;
bool DBConfig::isSharded(const string& ns) {
stdx::lock_guard<stdx::mutex> lk(_lock);
if (!_shardingEnabled) {
return false;
}
CollectionInfoMap::iterator i = _collections.find(ns);
if (i == _collections.end()) {
return false;
}
return i->second.isSharded();
}
void DBConfig::invalidateNs(const std::string& ns) {
stdx::lock_guard<stdx::mutex> lk(_lock);
CollectionInfoMap::iterator it = _collections.find(ns);
if (it != _collections.end()) {
_collections.erase(it);
}
}
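// Marks this database as sharding-enabled and persists the change to the config servers.
// Sharding may never be enabled on the 'config' database.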
void DBConfig::enableSharding(OperationContext* txn) {
invariant(_name != "config");
stdx::lock_guard<stdx::mutex> lk(_lock);
if (_shardingEnabled) {
return;
}
_shardingEnabled = true;
_save(txn);
}
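// Marks the given collection as unsharded in the cache and persists that change. Returns false
// if sharding is not enabled for the database or no sharding metadata exists for the collection.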
bool DBConfig::removeSharding(OperationContext* txn, const string& ns) {
stdx::lock_guard<stdx::mutex> lk(_lock);
if (!_shardingEnabled) {
warning() << "could not remove sharding for collection " << ns
<< ", sharding not enabled for db";
return false;
}
CollectionInfoMap::iterator i = _collections.find(ns);
if (i == _collections.end())
return false;
CollectionInfo& ci = _collections[ns];
if (!ci.isSharded()) {
warning() << "could not remove sharding for collection " << ns
<< ", no sharding information found";
return false;
}
ci.unshard();
_save(txn, false, true);
return true;
}
// Handles weird logic related to getting *either* a chunk manager *or* the collection primary
// shard
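// Exactly one of 'manager' or 'primary' is set on return. A hypothetical call site (illustrative
// only, not taken from this file) might look like:
//
//     std::shared_ptr<ChunkManager> manager;
//     std::shared_ptr<Shard> primary;
//     config->getChunkManagerOrPrimary(txn, "test.users", manager, primary);
//     if (manager) {
//         // Sharded: route by chunk ownership.
//     } else {
//         // Unsharded: send everything to the primary shard.
//     }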
void DBConfig::getChunkManagerOrPrimary(OperationContext* txn,
const string& ns,
std::shared_ptr<ChunkManager>& manager,
std::shared_ptr<Shard>& primary) {
// The logic here is basically that at any time, our collection can become sharded or unsharded
// via a command. If we're not sharded, we want to send data to the primary; if sharded, we
// want to send data to the correct chunks, and we can't check both without the lock.
manager.reset();
primary.reset();
{
stdx::lock_guard<stdx::mutex> lk(_lock);
CollectionInfoMap::iterator i = _collections.find(ns);
// No namespace
if (i == _collections.end()) {
// If we don't know about this namespace, it's unsharded by default
auto primaryStatus = grid.shardRegistry()->getShard(txn, _primaryId);
if (primaryStatus.isOK()) {
primary = primaryStatus.getValue();
}
} else {
CollectionInfo& cInfo = i->second;
// TODO: we need to be careful about handling shardingEnabled, because in some places we
// seem to use it and in some we don't. If we use this function in combination with just
// getChunkManager() on a slightly borked config db, we'll get lots of StaleConfig
// retries.
if (_shardingEnabled && cInfo.isSharded()) {
manager = cInfo.getCM();
} else {
auto primaryStatus = grid.shardRegistry()->getShard(txn, _primaryId);
if (primaryStatus.isOK()) {
primary = primaryStatus.getValue();
}
}
}
}
invariant(manager || primary);
invariant(!manager || !primary);
}
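// Same as getChunkManager(), but converts assertion failures into a nullptr return and keeps
// them out of getLastError reporting.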
std::shared_ptr<ChunkManager> DBConfig::getChunkManagerIfExists(OperationContext* txn,
const string& ns,
bool shouldReload,
bool forceReload) {
// Don't report exceptions here as errors in GetLastError
LastError::Disabled ignoreForGLE(&LastError::get(cc()));
try {
return getChunkManager(txn, ns, shouldReload, forceReload);
} catch (AssertionException& e) {
warning() << "chunk manager not found for " << ns << causedBy(e);
return nullptr;
}
}
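// Returns the cached chunk manager for 'ns', reloading chunk metadata from the config servers
// when a reload is requested or a newer collection version is detected. Throws if the collection
// is not sharded.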
std::shared_ptr<ChunkManager> DBConfig::getChunkManager(OperationContext* txn,
const string& ns,
bool shouldReload,
bool forceReload) {
BSONObj key;
ChunkVersion oldVersion;
std::shared_ptr<ChunkManager> oldManager;
{
stdx::lock_guard<stdx::mutex> lk(_lock);
bool earlyReload = !_collections[ns].isSharded() && (shouldReload || forceReload);
if (earlyReload) {
// This is to catch cases where there is a new sharded collection.
// Note: read the _reloadCount inside the _lock mutex, so _loadIfNeeded will always
// be forced to perform a reload.
const auto currentReloadIteration = _reloadCount.load();
_loadIfNeeded(txn, currentReloadIteration);
}
CollectionInfo& ci = _collections[ns];
uassert(10181, str::stream() << "not sharded:" << ns, ci.isSharded());
invariant(!ci.key().isEmpty());
if (!(shouldReload || forceReload) || earlyReload) {
return ci.getCM();
}
key = ci.key().copy();
if (ci.getCM()) {
oldManager = ci.getCM();
oldVersion = ci.getCM()->getVersion();
}
}
invariant(!key.isEmpty());
// TODO: We need to keep this first one-chunk check in until we have a more efficient way of
// creating/reusing a chunk manager, as doing so requires copying the full set of chunks
// currently
vector<ChunkType> newestChunk;
if (oldVersion.isSet() && !forceReload) {
uassertStatusOK(
grid.catalogClient(txn)->getChunks(txn,
BSON(ChunkType::ns(ns)),
BSON(ChunkType::DEPRECATED_lastmod() << -1),
1,
&newestChunk,
nullptr,
repl::ReadConcernLevel::kMajorityReadConcern));
if (!newestChunk.empty()) {
invariant(newestChunk.size() == 1);
ChunkVersion v = newestChunk[0].getVersion();
if (v.equals(oldVersion)) {
stdx::lock_guard<stdx::mutex> lk(_lock);
const CollectionInfo& ci = _collections[ns];
uassert(15885,
str::stream() << "not sharded after reloading from chunks : " << ns,
ci.isSharded());
return ci.getCM();
}
}
} else if (!oldVersion.isSet()) {
warning() << "version 0 found when " << (forceReload ? "reloading" : "checking")
<< " chunk manager; collection '" << ns << "' initially detected as sharded";
}
// we are not locked now, and want to load a new ChunkManager
unique_ptr<ChunkManager> tempChunkManager;
{
stdx::lock_guard<stdx::mutex> lll(_hitConfigServerLock);
if (!newestChunk.empty() && !forceReload) {
// If we have a target version we're going for, see if we've already reached it
stdx::lock_guard<stdx::mutex> lk(_lock);
CollectionInfo& ci = _collections[ns];
if (ci.isSharded() && ci.getCM()) {
ChunkVersion currentVersion = newestChunk[0].getVersion();
// Only reload if the version we found is newer than our own in the same epoch
if (currentVersion <= ci.getCM()->getVersion() &&
ci.getCM()->getVersion().hasEqualEpoch(currentVersion)) {
return ci.getCM();
}
}
}
tempChunkManager.reset(new ChunkManager(
oldManager->getns(),
oldManager->getShardKeyPattern(),
oldManager->getDefaultCollator() ? oldManager->getDefaultCollator()->clone() : nullptr,
oldManager->isUnique()));
tempChunkManager->loadExistingRanges(txn, oldManager.get());
if (tempChunkManager->numChunks() == 0) {
// Maybe we're not sharded any more, so do a full reload
reload(txn);
return getChunkManager(txn, ns, false);
}
}
stdx::lock_guard<stdx::mutex> lk(_lock);
CollectionInfo& ci = _collections[ns];
uassert(14822, (string) "state changed in the middle: " + ns, ci.isSharded());
// Reset if our versions aren't the same
bool shouldReset = !tempChunkManager->getVersion().equals(ci.getCM()->getVersion());
// Also reset if we're forced to do so
if (!shouldReset && forceReload) {
shouldReset = true;
warning() << "chunk manager reload forced for collection '" << ns << "', config version is "
<< tempChunkManager->getVersion();
}
//
// LEGACY BEHAVIOR
//
// It's possible to get into a state when dropping collections when our new version is
// less than our prev version. Behave identically to legacy mongos, for now, and warn to
// draw attention to the problem.
//
// TODO: Assert in next version, to allow smooth upgrades
//
if (shouldReset && tempChunkManager->getVersion() < ci.getCM()->getVersion()) {
shouldReset = false;
warning() << "not resetting chunk manager for collection '" << ns << "', config version is "
<< tempChunkManager->getVersion() << " and "
<< "old version is " << ci.getCM()->getVersion();
}
// end legacy behavior
if (shouldReset) {
const auto cmOpTime = tempChunkManager->getConfigOpTime();
// The existing ChunkManager could have been updated since we last checked, so
// replace the existing chunk manager only if it is strictly newer.
// The condition should be (>) instead of (>=), but use (>=) since legacy non-repl
// config servers will always have an opTime of zero.
if (cmOpTime >= ci.getCM()->getConfigOpTime()) {
ci.resetCM(tempChunkManager.release());
}
}
uassert(
15883, str::stream() << "not sharded after chunk manager reset : " << ns, ci.isSharded());
return ci.getCM();
}
void DBConfig::setPrimary(OperationContext* txn, const ShardId& newPrimaryId) {
stdx::lock_guard<stdx::mutex> lk(_lock);
_primaryId = newPrimaryId;
_save(txn);
}
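// Loads database and collection metadata from the config servers unless another thread has
// already reloaded it since the reload count was sampled.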
bool DBConfig::load(OperationContext* txn) {
const auto currentReloadIteration = _reloadCount.load();
stdx::lock_guard<stdx::mutex> lk(_lock);
return _loadIfNeeded(txn, currentReloadIteration);
}
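// Must be called while holding _lock. Returns true without fetching anything if a reload has
// already happened since 'reloadIteration' was sampled, and false if the database was not found.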
bool DBConfig::_loadIfNeeded(OperationContext* txn, Counter reloadIteration) {
if (reloadIteration != _reloadCount.load()) {
return true;
}
auto status = grid.catalogClient(txn)->getDatabase(txn, _name);
if (status == ErrorCodes::NamespaceNotFound) {
return false;
}
// All other errors are connectivity errors, etc., so throw an exception.
uassertStatusOK(status.getStatus());
const auto& dbOpTimePair = status.getValue();
const auto& dbt = dbOpTimePair.value;
invariant(_name == dbt.getName());
_primaryId = dbt.getPrimary();
_shardingEnabled = dbt.getSharded();
invariant(dbOpTimePair.opTime >= _configOpTime);
_configOpTime = dbOpTimePair.opTime;
// Load all collections
vector<CollectionType> collections;
repl::OpTime configOpTimeWhenLoadingColl;
uassertStatusOK(grid.catalogClient(txn)->getCollections(
txn, &_name, &collections, &configOpTimeWhenLoadingColl));
int numCollsErased = 0;
int numCollsSharded = 0;
invariant(configOpTimeWhenLoadingColl >= _configOpTime);
for (const auto& coll : collections) {
auto collIter = _collections.find(coll.getNs().ns());
if (collIter != _collections.end()) {
invariant(configOpTimeWhenLoadingColl >= collIter->second.getConfigOpTime());
}
if (coll.getDropped()) {
_collections.erase(coll.getNs().ns());
numCollsErased++;
} else {
_collections[coll.getNs().ns()] =
CollectionInfo(txn, coll, configOpTimeWhenLoadingColl);
numCollsSharded++;
}
}
LOG(2) << "found " << numCollsSharded << " collections left and " << numCollsErased
<< " collections dropped for database " << _name;
_reloadCount.fetchAndAdd(1);
return true;
}
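// Persists the database entry and/or any dirty collection entries to the config servers.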
void DBConfig::_save(OperationContext* txn, bool db, bool coll) {
if (db) {
DatabaseType dbt;
dbt.setName(_name);
dbt.setPrimary(_primaryId);
dbt.setSharded(_shardingEnabled);
uassertStatusOK(grid.catalogClient(txn)->updateDatabase(txn, _name, dbt));
}
if (coll) {
for (CollectionInfoMap::iterator i = _collections.begin(); i != _collections.end(); ++i) {
if (!i->second.isDirty()) {
continue;
}
i->second.save(txn, i->first);
}
}
}
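// Reloads this database's metadata, invalidating the catalog cache entry if the database entry
// could no longer be loaded.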
bool DBConfig::reload(OperationContext* txn) {
bool successful = false;
const auto currentReloadIteration = _reloadCount.load();
{
stdx::lock_guard<stdx::mutex> lk(_lock);
successful = _loadIfNeeded(txn, currentReloadIteration);
}
// If we aren't successful loading the database entry, we don't want to keep the stale
// object around which has invalid data.
if (!successful) {
grid.catalogCache()->invalidate(_name);
}
return successful;
}
bool DBConfig::dropDatabase(OperationContext* txn, string& errmsg) {
/**
* 1) update config server
* 2) drop and reset sharded collections
* 3) drop and reset primary
* 4) drop everywhere to clean up loose ends
*/
log() << "DBConfig::dropDatabase: " << _name;
grid.catalogClient(txn)->logChange(
txn, "dropDatabase.start", _name, BSONObj(), ShardingCatalogClient::kMajorityWriteConcern);
// 1
grid.catalogCache()->invalidate(_name);
Status result = grid.catalogClient(txn)->removeConfigDocuments(
txn,
DatabaseType::ConfigNS,
BSON(DatabaseType::name(_name)),
ShardingCatalogClient::kMajorityWriteConcern);
if (!result.isOK()) {
errmsg = result.reason();
log() << "could not drop '" << _name << "': " << errmsg;
return false;
}
LOG(1) << "\t removed entry from config server for: " << _name;
set<ShardId> shardIds;
// 2
while (true) {
int num = 0;
if (!_dropShardedCollections(txn, num, shardIds, errmsg)) {
return false;
}
log() << " DBConfig::dropDatabase: " << _name << " dropped sharded collections: " << num;
if (num == 0) {
break;
}
}
// 3
{
const auto shard = uassertStatusOK(grid.shardRegistry()->getShard(txn, _primaryId));
ScopedDbConnection conn(shard->getConnString(), 30.0);
BSONObj res;
if (!conn->dropDatabase(_name, txn->getWriteConcern(), &res)) {
errmsg = res.toString() + " at " + _primaryId.toString();
return false;
}
conn.done();
if (auto wcErrorElem = res["writeConcernError"]) {
auto wcError = wcErrorElem.Obj();
if (auto errMsgElem = wcError["errmsg"]) {
errmsg = errMsgElem.str() + " at " + _primaryId.toString();
return false;
}
}
}
// 4
for (const ShardId& shardId : shardIds) {
const auto shardStatus = grid.shardRegistry()->getShard(txn, shardId);
if (!shardStatus.isOK()) {
continue;
}
ScopedDbConnection conn(shardStatus.getValue()->getConnString(), 30.0);
BSONObj res;
if (!conn->dropDatabase(_name, txn->getWriteConcern(), &res)) {
errmsg = res.toString() + " at " + shardId.toString();
return false;
}
conn.done();
if (auto wcErrorElem = res["writeConcernError"]) {
auto wcError = wcErrorElem.Obj();
if (auto errMsgElem = wcError["errmsg"]) {
errmsg = errMsgElem.str() + " at " + shardId.toString();
return false;
}
}
}
LOG(1) << "\t dropped primary db for: " << _name;
grid.catalogClient(txn)->logChange(
txn, "dropDatabase", _name, BSONObj(), ShardingCatalogClient::kMajorityWriteConcern);
return true;
}
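// Drops every sharded collection in this database through the catalog client, accumulating in
// 'shardIds' the shards that owned chunks so the caller can clean them up afterwards.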
bool DBConfig::_dropShardedCollections(OperationContext* txn,
int& num,
set<ShardId>& shardIds,
string& errmsg) {
num = 0;
set<string> seen;
while (true) {
std::string aCollection;
{
stdx::lock_guard<stdx::mutex> lk(_lock);
CollectionInfoMap::iterator i = _collections.begin();
for (; i != _collections.end(); ++i) {
if (i->second.isSharded()) {
break;
}
}
if (i == _collections.end()) {
break;
}
aCollection = i->first;
if (seen.count(aCollection)) {
errmsg = "seen a collection twice!";
return false;
}
seen.insert(aCollection);
LOG(1) << "\t dropping sharded collection: " << aCollection;
i->second.getCM()->getAllShardIds(&shardIds);
}
// drop lock before network activity
uassertStatusOK(grid.catalogClient(txn)->dropCollection(txn, NamespaceString(aCollection)));
// We should warn, but it's not a fatal error if someone else reloaded the db/coll as
// unsharded in the meantime
if (!removeSharding(txn, aCollection)) {
warning() << "collection " << aCollection
<< " was reloaded as unsharded before drop completed"
<< " during drop of all collections";
}
num++;
uassert(10184, "_dropShardedCollections too many collections - bailing", num < 100000);
LOG(2) << "\t\t dropped " << num << " so far";
}
return true;
}
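// Returns the primary shard id plus the id of every shard that owns chunks of a sharded
// collection in this database.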
void DBConfig::getAllShardIds(set<ShardId>* shardIds) {
dassert(shardIds);
stdx::lock_guard<stdx::mutex> lk(_lock);
shardIds->insert(_primaryId);
for (CollectionInfoMap::const_iterator it(_collections.begin()), end(_collections.end());
it != end;
++it) {
if (it->second.isSharded()) {
it->second.getCM()->getAllShardIds(shardIds);
} // TODO: handle collections on non-primary shard
}
}
void DBConfig::getAllShardedCollections(set<string>& namespaces) {
stdx::lock_guard<stdx::mutex> lk(_lock);
for (CollectionInfoMap::const_iterator i = _collections.begin(); i != _collections.end(); i++) {
log() << "Coll : " << i->first << " sharded? " << i->second.isSharded();
if (i->second.isSharded())
namespaces.insert(i->first);
}
}
bool DBConfig::isShardingEnabled() {
stdx::lock_guard<stdx::mutex> lk(_lock);
return _shardingEnabled;
}
ShardId DBConfig::getPrimaryId() {
stdx::lock_guard<stdx::mutex> lk(_lock);
return _primaryId;
}
/* --- ConfigServer ---- */
void ConfigServer::replicaSetChangeShardRegistryUpdateHook(const string& setName,
const string& newConnectionString) {
// Inform the ShardRegistry of the new connection string for the shard.
auto connString = fassertStatusOK(28805, ConnectionString::parse(newConnectionString));
invariant(setName == connString.getSetName());
grid.shardRegistry()->updateReplSetHosts(connString);
}
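// Invoked when a monitored shard replica set changes membership; updates the shard's host string
// in config.shards so the config metadata reflects the new connection string.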
void ConfigServer::replicaSetChangeConfigServerUpdateHook(const string& setName,
const string& newConnectionString) {
// This is run in its own thread. Exceptions escaping would result in a call to terminate.
Client::initThread("replSetChange");
auto txn = cc().makeOperationContext();
try {
std::shared_ptr<Shard> s = grid.shardRegistry()->lookupRSName(setName);
if (!s) {
LOG(1) << "shard not found for set: " << newConnectionString
<< " when attempting to inform config servers of updated set membership";
return;
}
if (s->isConfig()) {
// No need to tell the config servers their own connection string.
return;
}
auto status = grid.catalogClient(txn.get())->updateConfigDocument(
txn.get(),
ShardType::ConfigNS,
BSON(ShardType::name(s->getId().toString())),
BSON("$set" << BSON(ShardType::host(newConnectionString))),
false,
ShardingCatalogClient::kMajorityWriteConcern);
if (!status.isOK()) {
error() << "RSChangeWatcher: could not update config db for set: " << setName
<< " to: " << newConnectionString << causedBy(status.getStatus());
}
} catch (const std::exception& e) {
warning() << "caught exception while updating config servers: " << e.what();
} catch (...) {
warning() << "caught unknown exception while updating config servers";
}
}
} // namespace mongo