// config.cpp
/**
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include <boost/scoped_ptr.hpp>
#include "pcrecpp.h"
#include "mongo/client/connpool.h"
#include "mongo/client/dbclientcursor.h"
#include "mongo/db/client.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/write_concern.h"
#include "mongo/s/chunk.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/cluster_write.h"
#include "mongo/s/config.h"
#include "mongo/s/grid.h"
#include "mongo/s/server.h"
#include "mongo/s/type_changelog.h"
#include "mongo/s/type_chunk.h"
#include "mongo/s/type_collection.h"
#include "mongo/s/type_database.h"
#include "mongo/s/type_locks.h"
#include "mongo/s/type_lockpings.h"
#include "mongo/s/type_settings.h"
#include "mongo/s/type_shard.h"
#include "mongo/s/type_tags.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/net/message.h"
#include "mongo/util/stringutils.h"
namespace mongo {
using boost::scoped_ptr;
using std::auto_ptr;
using std::endl;
using std::pair;
using std::set;
using std::stringstream;
using std::vector;
int ConfigServer::VERSION = 3;
Shard Shard::EMPTY;
/* --- DBConfig --- */
DBConfig::CollectionInfo::CollectionInfo( const BSONObj& in ) {
_dirty = false;
_dropped = in[CollectionType::dropped()].trueValue();
if ( in[CollectionType::keyPattern()].isABSONObj() ) {
shard( new ChunkManager( in ) );
}
_dirty = false;
}
void DBConfig::CollectionInfo::shard( ChunkManager* manager ){
// Do this *first* so we're invisible to everyone else
manager->loadExistingRanges(configServer.getPrimary().getConnString(), NULL);
//
// Collections with no chunks are unsharded, no matter what the collections entry says
// This helps prevent errors when dropping in a different process
//
if( manager->numChunks() != 0 ){
_cm = ChunkManagerPtr( manager );
_key = manager->getShardKeyPattern().toBSON().getOwned();
_unique = manager->isUnique();
_dirty = true;
_dropped = false;
}
else{
warning() << "no chunks found for collection " << manager->getns()
<< ", assuming unsharded" << endl;
unshard();
}
}
void DBConfig::CollectionInfo::unshard() {
_cm.reset();
_dropped = true;
_dirty = true;
_key = BSONObj();
}
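// Persists this collection's metadata to config.collections as an upsert keyed by the
// full namespace, recording the shard key epoch (or the DROPPED epoch when the
// collection is no longer sharded) and clearing the dirty flag on success.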
void DBConfig::CollectionInfo::save( const string& ns ) {
BSONObj key = BSON( "_id" << ns );
BSONObjBuilder val;
val.append(CollectionType::ns(), ns);
val.appendDate(CollectionType::DEPRECATED_lastmod(), jsTime());
val.appendBool(CollectionType::dropped(), _dropped);
if ( _cm ) {
// This also appends the lastmodEpoch.
_cm->getInfo( val );
}
else {
// lastmodEpoch is a required field so we also need to do it here.
val.append(CollectionType::DEPRECATED_lastmodEpoch(), ChunkVersion::DROPPED().epoch());
}
Status result = clusterUpdate(CollectionType::ConfigNS,
key,
val.obj(),
true /* upsert */,
false /* multi */,
WriteConcernOptions::AllConfigs,
NULL);
if ( !result.isOK() ) {
uasserted( 13473, str::stream() << "failed to save collection (" << ns
<< "): " << result.reason() );
}
_dirty = false;
}
bool DBConfig::isSharded( const string& ns ) {
if ( ! _shardingEnabled )
return false;
scoped_lock lk( _lock );
return _isSharded( ns );
}
bool DBConfig::_isSharded( const string& ns ) {
if ( ! _shardingEnabled )
return false;
Collections::iterator i = _collections.find( ns );
if ( i == _collections.end() )
return false;
return i->second.isSharded();
}
ShardPtr DBConfig::getShardIfExists( const string& ns ){
try{
// TODO: this function assumes the _primary will not change under-the-covers, but so does
// getShard() in general
return ShardPtr( new Shard( getShard( ns ) ) );
}
catch( AssertionException& e ){
warning() << "primary not found for " << ns << causedBy( e ) << endl;
return ShardPtr();
}
}
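// Returns the primary shard for an unsharded namespace. A sharded namespace has no
// single owning shard, so Shard::EMPTY is returned in that case.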
const Shard& DBConfig::getShard( const string& ns ) {
if ( isSharded( ns ) )
return Shard::EMPTY;
uassert( 10178 , "no primary!" , _primary.ok() );
return _primary;
}
void DBConfig::enableSharding( bool save ) {
if ( _shardingEnabled )
return;
verify( _name != "config" );
scoped_lock lk( _lock );
_shardingEnabled = true;
if( save ) _save();
}
/**
 * Shards the given collection, creating its initial chunks on the primary shard (or on
 * the given initShards) and recording the operation in the config changelog.
 */
ChunkManagerPtr DBConfig::shardCollection(const string& ns,
const ShardKeyPattern& fieldsAndOrder,
bool unique,
vector<BSONObj>* initPoints,
vector<Shard>* initShards) {
uassert( 8042 , "db doesn't have sharding enabled" , _shardingEnabled );
uassert(13648,
str::stream() << "can't shard collection because not all config servers are up",
configServer.allUp(false));
ChunkManagerPtr manager;
{
scoped_lock lk( _lock );
CollectionInfo& ci = _collections[ns];
uassert( 8043 , "collection already sharded" , ! ci.isSharded() );
log() << "enable sharding on: " << ns << " with shard key: " << fieldsAndOrder << endl;
// Record start in changelog
BSONObjBuilder collectionDetail;
collectionDetail.append("shardKey", fieldsAndOrder.toBSON());
collectionDetail.append("collection", ns);
collectionDetail.append("primary", getPrimary().toString());
BSONArray a;
if (initShards == NULL)
a = BSONArray();
else {
BSONArrayBuilder b;
for (unsigned i = 0; i < initShards->size(); i++) {
b.append((*initShards)[i].getName());
}
a = b.arr();
}
collectionDetail.append("initShards", a);
collectionDetail.append("numChunks", (int)(initPoints->size() + 1));
configServer.logChange("shardCollection.start", ns, collectionDetail.obj());
ChunkManager* cm = new ChunkManager( ns, fieldsAndOrder, unique );
cm->createFirstChunks( configServer.getPrimary().getConnString(),
getPrimary(), initPoints, initShards );
ci.shard( cm );
_save();
// Save the initial chunk manager for later, no need to reload if we're in this lock
manager = ci.getCM();
verify( manager.get() );
}
// Tell the primary mongod to refresh its data
// TODO: Think the real fix here is for mongos to just assume all collections are sharded, when we get there
for( int i = 0; i < 4; i++ ){
if( i == 3 ){
warning() << "too many tries updating initial version of " << ns << " on shard primary " << getPrimary() <<
", other mongoses may not see the collection as sharded immediately" << endl;
break;
}
try {
ShardConnection conn( getPrimary(), ns );
conn.setVersion();
conn.done();
break;
}
catch( DBException& e ){
warning() << "could not update initial version of " << ns << " on shard primary " << getPrimary() <<
causedBy( e ) << endl;
}
sleepsecs( i );
}
// Record finish in changelog
BSONObjBuilder finishDetail;
finishDetail.append("version", manager->getVersion().toString());
configServer.logChange("shardCollection", ns, finishDetail.obj());
return manager;
}
bool DBConfig::removeSharding( const string& ns ) {
if ( ! _shardingEnabled ) {
warning() << "could not remove sharding for collection " << ns
<< ", sharding not enabled for db" << endl;
return false;
}
scoped_lock lk( _lock );
Collections::iterator i = _collections.find( ns );
if ( i == _collections.end() )
return false;
CollectionInfo& ci = _collections[ns];
if ( ! ci.isSharded() ){
warning() << "could not remove sharding for collection " << ns
<< ", no sharding information found" << endl;
return false;
}
ci.unshard();
_save( false, true );
return true;
}
// Handles weird logic related to getting *either* a chunk manager *or* the collection primary shard
void DBConfig::getChunkManagerOrPrimary( const string& ns, ChunkManagerPtr& manager, ShardPtr& primary ){
// The logic here is basically that at any time, our collection can become sharded or unsharded
// via a command. If we're not sharded, we want to send data to the primary; if sharded, we want
// to send data to the correct chunks. We can't check both without the lock.
manager.reset();
primary.reset();
{
scoped_lock lk( _lock );
Collections::iterator i = _collections.find( ns );
// No namespace
if( i == _collections.end() ){
// If we don't know about this namespace, it's unsharded by default
primary.reset( new Shard( _primary ) );
}
else {
CollectionInfo& cInfo = i->second;
// TODO: we need to be careful about handling shardingEnabled, b/c in some places we seem to use it
// and in some we don't. If we use this function in combination with just getChunkManager() on a
// slightly borked config db, we'll get lots of staleconfig retries
if( _shardingEnabled && cInfo.isSharded() ){
manager = cInfo.getCM();
}
else{
// Make a copy, we don't want to be tied to this config object
primary.reset( new Shard( _primary ) );
}
}
}
verify( manager || primary );
verify( ! manager || ! primary );
}
ChunkManagerPtr DBConfig::getChunkManagerIfExists( const string& ns, bool shouldReload, bool forceReload ){
// Don't report exceptions here as errors in GetLastError
LastError::Disabled ignoreForGLE(lastError.get(false));
try{
return getChunkManager( ns, shouldReload, forceReload );
}
catch( AssertionException& e ){
warning() << "chunk manager not found for " << ns << causedBy( e ) << endl;
return ChunkManagerPtr();
}
}
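// Returns the chunk manager for a sharded namespace, optionally refreshing it first.
// The refresh path compares the newest chunk's version against the cached manager under
// _lock, then (if needed) loads a new ChunkManager outside the lock and swaps it in,
// preserving legacy behavior when the reloaded version goes backwards.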
ChunkManagerPtr DBConfig::getChunkManager( const string& ns , bool shouldReload, bool forceReload ) {
BSONObj key;
ChunkVersion oldVersion;
ChunkManagerPtr oldManager;
{
scoped_lock lk( _lock );
bool earlyReload = ! _collections[ns].isSharded() && ( shouldReload || forceReload );
if ( earlyReload ) {
// this is to catch cases where this is a new sharded collection
_reload();
}
CollectionInfo& ci = _collections[ns];
uassert( 10181 , (string)"not sharded:" + ns , ci.isSharded() );
verify( ! ci.key().isEmpty() );
if ( ! ( shouldReload || forceReload ) || earlyReload )
return ci.getCM();
key = ci.key().copy();
if ( ci.getCM() ){
oldManager = ci.getCM();
oldVersion = ci.getCM()->getVersion();
}
}
verify( ! key.isEmpty() );
// TODO: We need to keep this first one-chunk check in until we have a more efficient way of
// creating/reusing a chunk manager, as doing so requires copying the full set of chunks currently
BSONObj newest;
if ( oldVersion.isSet() && ! forceReload ) {
ScopedDbConnection conn(configServer.modelServer(), 30.0);
newest = conn->findOne(ChunkType::ConfigNS,
Query(BSON(ChunkType::ns(ns))).sort(
ChunkType::DEPRECATED_lastmod(), -1));
conn.done();
if ( ! newest.isEmpty() ) {
ChunkVersion v = ChunkVersion::fromBSON(newest, ChunkType::DEPRECATED_lastmod());
if ( v.equals( oldVersion ) ) {
scoped_lock lk( _lock );
CollectionInfo& ci = _collections[ns];
uassert( 15885 , str::stream() << "not sharded after reloading from chunks : " << ns , ci.isSharded() );
return ci.getCM();
}
}
}
else if( ! oldVersion.isSet() ){
warning() << "version 0 found when " << ( forceReload ? "reloading" : "checking" ) << " chunk manager"
<< ", collection '" << ns << "' initially detected as sharded" << endl;
}
// we are not locked now, and want to load a new ChunkManager
auto_ptr<ChunkManager> temp;
{
scoped_lock lll ( _hitConfigServerLock );
if ( ! newest.isEmpty() && ! forceReload ) {
// if we have a target we're going for
// see if we've hit already
scoped_lock lk( _lock );
CollectionInfo& ci = _collections[ns];
if ( ci.isSharded() && ci.getCM() ) {
ChunkVersion currentVersion =
ChunkVersion::fromBSON(newest, ChunkType::DEPRECATED_lastmod());
// Only reload if the version we found is newer than our own in the same
// epoch
if( currentVersion <= ci.getCM()->getVersion() &&
ci.getCM()->getVersion().hasEqualEpoch( currentVersion ) )
{
return ci.getCM();
}
}
}
temp.reset(new ChunkManager(oldManager->getns(),
oldManager->getShardKeyPattern(),
oldManager->isUnique()));
temp->loadExistingRanges(configServer.getPrimary().getConnString(), oldManager.get());
if ( temp->numChunks() == 0 ) {
// maybe we're not sharded any more
reload(); // this is a full reload
return getChunkManager( ns , false );
}
}
scoped_lock lk( _lock );
CollectionInfo& ci = _collections[ns];
uassert( 14822 , (string)"state changed in the middle: " + ns , ci.isSharded() );
// Reset if our versions aren't the same
bool shouldReset = ! temp->getVersion().equals( ci.getCM()->getVersion() );
// Also reset if we're forced to do so
if( ! shouldReset && forceReload ){
shouldReset = true;
warning() << "chunk manager reload forced for collection '" << ns
<< "', config version is " << temp->getVersion() << endl;
}
//
// LEGACY BEHAVIOR
// It's possible to get into a state when dropping collections when our new version is less than our prev
// version. Behave identically to legacy mongos, for now, and warn to draw attention to the problem.
// TODO: Assert in next version, to allow smooth upgrades
//
if( shouldReset && temp->getVersion() < ci.getCM()->getVersion() ){
shouldReset = false;
warning() << "not resetting chunk manager for collection '" << ns
<< "', config version is " << temp->getVersion() << " and "
<< "old version is " << ci.getCM()->getVersion() << endl;
}
// end legacy behavior
if ( shouldReset ){
ci.resetCM( temp.release() );
}
uassert( 15883 , str::stream() << "not sharded after chunk manager reset : " << ns , ci.isSharded() );
return ci.getCM();
}
void DBConfig::setPrimary( const std::string& s ) {
scoped_lock lk( _lock );
_primary.reset( s );
_save();
}
void DBConfig::serialize(BSONObjBuilder& to) {
to.append("_id", _name);
to.appendBool(DatabaseType::DEPRECATED_partitioned(), _shardingEnabled );
to.append(DatabaseType::primary(), _primary.getName() );
}
void DBConfig::unserialize(const BSONObj& from) {
LOG(1) << "DBConfig unserialize: " << _name << " " << from << endl;
verify( _name == from[DatabaseType::name()].String() );
_shardingEnabled = from.getBoolField(DatabaseType::DEPRECATED_partitioned().c_str());
_primary.reset( from.getStringField(DatabaseType::primary().c_str()));
// In the 1.5.x series, we used to have collection metadata nested in the database entry. The 1.6.x series
// had migration code that ported that info to where it belongs now: the 'collections' collection. We now
// just assert that we're not migrating from a 1.5.x directly into a 1.7.x without first converting.
BSONObj sharded = from.getObjectField(DatabaseType::DEPRECATED_sharded().c_str());
if ( ! sharded.isEmpty() )
uasserted( 13509 , "can't migrate from 1.5.x release to the current one; need to upgrade to 1.6.x first");
}
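// Loads this database's entry and its collection metadata from the config servers,
// replacing the in-memory state under _lock.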
bool DBConfig::load() {
scoped_lock lk( _lock );
return _load();
}
bool DBConfig::_load() {
ScopedDbConnection conn(configServer.modelServer(), 30.0);
BSONObj dbObj = conn->findOne( DatabaseType::ConfigNS,
BSON( DatabaseType::name( _name ) ) );
if ( dbObj.isEmpty() ) {
conn.done();
return false;
}
unserialize( dbObj );
BSONObjBuilder b;
b.appendRegex(CollectionType::ns(),
(string)"^" + pcrecpp::RE::QuoteMeta( _name ) + "\\." );
int numCollsErased = 0;
int numCollsSharded = 0;
auto_ptr<DBClientCursor> cursor = conn->query(CollectionType::ConfigNS, b.obj());
verify( cursor.get() );
while ( cursor->more() ) {
BSONObj collObj = cursor->next();
string collName = collObj[CollectionType::ns()].String();
if( collObj[CollectionType::dropped()].trueValue() ){
_collections.erase( collName );
numCollsErased++;
}
else if( !collObj[CollectionType::primary()].eoo() ){
// For future compatibility, explicitly ignore any collection with the
// "primary" field set.
// Erased in case it was previously sharded, dropped, then init'd as unsharded
_collections.erase( collName );
numCollsErased++;
}
else{
_collections[ collName ] = CollectionInfo( collObj );
if( _collections[ collName ].isSharded() ) numCollsSharded++;
}
}
LOG(2) << "found " << numCollsErased << " dropped collections and "
<< numCollsSharded << " sharded collections for database " << _name << endl;
conn.done();
return true;
}
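// Writes this database's entry (when 'db' is true) and any dirty collection entries
// (when 'coll' is true) back to the config servers.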
void DBConfig::_save( bool db, bool coll ) {
if( db ){
BSONObj n;
{
BSONObjBuilder b;
serialize(b);
n = b.obj();
}
BatchedCommandResponse response;
Status result = clusterUpdate( DatabaseType::ConfigNS,
BSON( DatabaseType::name( _name )),
n,
true, // upsert
false, // multi
WriteConcernOptions::AllConfigs,
&response );
if ( !result.isOK() ) {
uasserted( 13396, str::stream() << "DBConfig save failed: "
<< response.toBSON() );
}
}
if( coll ){
for ( Collections::iterator i=_collections.begin(); i!=_collections.end(); ++i ) {
if ( ! i->second.isDirty() )
continue;
i->second.save( i->first );
}
}
}
bool DBConfig::reload() {
bool successful = false;
{
scoped_lock lk( _lock );
successful = _reload();
}
//
// If we aren't successful loading the database entry, we don't want to keep the stale
// object around which has invalid data. We should remove it instead.
//
if( ! successful ) grid.removeDBIfExists( *this );
return successful;
}
bool DBConfig::_reload() {
// TODO: I don't think this is 100% correct
return _load();
}
bool DBConfig::dropDatabase( string& errmsg ) {
/**
* 1) make sure everything is up
* 2) update config server
* 3) drop and reset sharded collections
* 4) drop and reset primary
* 5) drop everywhere to clean up loose ends
*/
log() << "DBConfig::dropDatabase: " << _name << endl;
configServer.logChange( "dropDatabase.start" , _name , BSONObj() );
// 1
if (!configServer.allUp(false, errmsg)) {
LOG(1) << "\t DBConfig::dropDatabase not all up" << endl;
return 0;
}
// 2
grid.removeDB( _name );
Status result = clusterDelete( DatabaseType::ConfigNS,
BSON( DatabaseType::name( _name )),
0 /* limit */,
WriteConcernOptions::AllConfigs,
NULL );
if ( !result.isOK() ) {
errmsg = result.reason();
log() << "could not drop '" << _name << "': " << errmsg << endl;
return false;
}
if (!configServer.allUp(false, errmsg)) {
log() << "error removing from config server even after checking!" << endl;
return 0;
}
LOG(1) << "\t removed entry from config server for: " << _name << endl;
set<Shard> allServers;
// 3
while ( true ) {
int num = 0;
if ( ! _dropShardedCollections( num , allServers , errmsg ) )
return 0;
log() << " DBConfig::dropDatabase: " << _name << " dropped sharded collections: " << num << endl;
if ( num == 0 )
break;
}
// 4
{
ScopedDbConnection conn(_primary.getConnString(), 30.0);
BSONObj res;
if ( ! conn->dropDatabase( _name , &res ) ) {
errmsg = res.toString();
return 0;
}
conn.done();
}
// 5
for ( set<Shard>::iterator i=allServers.begin(); i!=allServers.end(); i++ ) {
ScopedDbConnection conn(i->getConnString(), 30.0);
BSONObj res;
if ( ! conn->dropDatabase( _name , &res ) ) {
errmsg = res.toString();
return 0;
}
conn.done();
}
LOG(1) << "\t dropped primary db for: " << _name << endl;
configServer.logChange( "dropDatabase" , _name , BSONObj() );
return true;
}
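// Drops every sharded collection in this database, accumulating the set of shards that
// held chunks so the caller can clean up on each of them afterwards.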
bool DBConfig::_dropShardedCollections( int& num, set<Shard>& allServers , string& errmsg ) {
num = 0;
set<string> seen;
while ( true ) {
Collections::iterator i = _collections.begin();
for ( ; i != _collections.end(); ++i ) {
// log() << "coll : " << i->first << " and " << i->second.isSharded() << endl;
if ( i->second.isSharded() )
break;
}
if ( i == _collections.end() )
break;
if ( seen.count( i->first ) ) {
errmsg = "seen a collection twice!";
return false;
}
seen.insert( i->first );
LOG(1) << "\t dropping sharded collection: " << i->first << endl;
i->second.getCM()->getAllShards( allServers );
i->second.getCM()->drop( i->second.getCM() );
// We should warn, but it's not a fatal error if someone else reloaded the db/coll as
// unsharded in the meantime
if( ! removeSharding( i->first ) ){
warning() << "collection " << i->first
<< " was reloaded as unsharded before drop completed"
<< " during drop of all collections" << endl;
}
num++;
uassert( 10184 , "_dropShardedCollections too many collections - bailing" , num < 100000 );
LOG(2) << "\t\t dropped " << num << " so far" << endl;
}
return true;
}
void DBConfig::getAllShards(set<Shard>& shards) const {
scoped_lock lk( _lock );
shards.insert(getPrimary());
for (Collections::const_iterator it(_collections.begin()), end(_collections.end()); it != end; ++it) {
if (it->second.isSharded()) {
it->second.getCM()->getAllShards(shards);
} // TODO: handle collections on non-primary shard
}
}
void DBConfig::getAllShardedCollections( set<string>& namespaces ) const {
scoped_lock lk( _lock );
for( Collections::const_iterator i = _collections.begin(); i != _collections.end(); i++ ) {
log() << "Coll : " << i->first << " sharded? " << i->second.isSharded() << endl;
if( i->second.isSharded() ) namespaces.insert( i->first );
}
}
/* --- ConfigServer ---- */
ConfigServer::ConfigServer() : DBConfig( "config" ) {
_shardingEnabled = false;
}
ConfigServer::~ConfigServer() {
}
bool ConfigServer::init( const std::string& s ) {
vector<string> configdbs;
splitStringDelim( s, &configdbs, ',' );
return init( configdbs );
}
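// Initializes the config server "shard" from the list of config hosts: normalizes and
// resolves each host (with retries), verifies the hosts are unique, and installs a SYNC
// connection string spanning all of them as the config primary.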
bool ConfigServer::init( vector<string> configHosts ) {
uassert( 10187 , "need configdbs" , configHosts.size() );
set<string> hosts;
for ( size_t i = 0; i < configHosts.size(); i++ ) {
string host = configHosts[i];
hosts.insert( getHost( host , false ) );
configHosts[i] = getHost( host , true );
}
for ( set<string>::iterator i = hosts.begin(); i != hosts.end(); i++ ) {
string host = *i;
// If this is a CUSTOM connection string (for testing) don't do DNS resolution
string errMsg;
if ( ConnectionString::parse( host, errMsg ).type() == ConnectionString::CUSTOM ) {
continue;
}
bool ok = false;
for ( int x=10; x>0; x-- ) {
if ( ! hostbyname( host.c_str() ).empty() ) {
ok = true;
break;
}
log() << "can't resolve DNS for [" << host << "] sleeping and trying " << x << " more times" << endl;
sleepsecs( 10 );
}
if ( ! ok )
return false;
}
_config = configHosts;
string errmsg;
if( ! checkHostsAreUnique(configHosts, &errmsg) ) {
error() << errmsg << endl;
return false;
}
string fullString;
joinStringDelim( configHosts, &fullString, ',' );
_primary = Shard(_primary.getName(),
ConnectionString(fullString, ConnectionString::SYNC),
_primary.getMaxSizeMB(),
_primary.isDraining());
Shard::installShard(_primary.getName(), _primary);
LOG(1) << " config string : " << fullString << endl;
return true;
}
bool ConfigServer::checkHostsAreUnique( const vector<string>& configHosts, string* errmsg ) {
// If we have only one host, it is always unique.
if ( configHosts.size() == 1 ) {
return true;
}
// Compare each host with all other hosts.
set<string> hostsTest;
pair<set<string>::iterator,bool> ret;
for ( size_t x=0; x < configHosts.size(); x++) {
ret = hostsTest.insert( configHosts[x] );
if ( ret.second == false ) {
*errmsg = str::stream() << "config server " << configHosts[x]
<< " exists twice in config listing.";
return false;
}
}
return true;
}
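// Runs the dbhash command against the "config" database on every config server and
// compares the collection hashes; retries up to 'tries' times before reporting the
// servers as inconsistent.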
bool ConfigServer::checkConfigServersConsistent( string& errmsg , int tries ) const {
if ( tries <= 0 )
return false;
unsigned firstGood = 0;
int up = 0;
vector<BSONObj> res;
// The last error we saw on a config server
string error;
for ( unsigned i=0; i<_config.size(); i++ ) {
BSONObj result;
scoped_ptr<ScopedDbConnection> conn;
try {
conn.reset( new ScopedDbConnection( _config[i], 30.0 ) );
if ( ! conn->get()->runCommand( "config",
BSON( "dbhash" << 1 <<
"collections" << BSON_ARRAY( "chunks" <<
"databases" <<
"collections" <<
"shards" <<
"version" )),
result ) ) {
// TODO: Make this a helper
error = result["errmsg"].eoo() ? "" : result["errmsg"].String();
if (!result["assertion"].eoo()) error = result["assertion"].String();
warning() << "couldn't check dbhash on config server " << _config[i]
<< causedBy(result.toString()) << endl;
result = BSONObj();
}
else {
result = result.getOwned();
if ( up == 0 )
firstGood = i;
up++;
}
conn->done();
}
catch ( const DBException& e ) {
if (conn) {
conn->kill();
}
// We need to catch DBExceptions b/c sometimes we throw them
// instead of socket exceptions when findN fails
error = e.toString();
warning() << " couldn't check dbhash on config server " << _config[i] << causedBy(e) << endl;
}
res.push_back(result);
}
if ( _config.size() == 1 )
return true;
if ( up == 0 ) {
// Use a ptr to error so if empty we won't add causedby
errmsg = str::stream() << "no config servers successfully contacted" << causedBy(&error);
return false;
}
if ( up == 1 ) {
warning() << "only 1 config server reachable, continuing" << endl;
return true;
}
BSONObj base = res[firstGood];
for ( unsigned i = firstGood + 1; i < res.size(); i++ ) {
if ( res[i].isEmpty() )
continue;
// Compare this server's collection hashes against the first good server's.
string chunksHash1 = base.getFieldDotted( "collections.chunks" );
string chunksHash2 = res[i].getFieldDotted( "collections.chunks" );
string databaseHash1 = base.getFieldDotted( "collections.databases" );
string databaseHash2 = res[i].getFieldDotted( "collections.databases" );
if ( chunksHash1 == chunksHash2 && databaseHash1 == databaseHash2 )
continue;
stringstream ss;
ss << "config servers " << _config[firstGood] << " and " << _config[i] << " differ";
warning() << ss.str() << endl;
if ( tries <= 1 ) {
errmsg = ss.str();
return false;
}
// The servers may just be momentarily out of sync; run the whole check again.
return checkConfigServersConsistent( errmsg , tries - 1 );
}
return true;
}
bool ConfigServer::allUp( bool localCheckOnly ) {
string errmsg;
return allUp( localCheckOnly, errmsg );
}
bool ConfigServer::allUp( bool localCheckOnly, string& errmsg ) {
try {
ScopedDbConnection conn(_primary.getConnString(), 30.0);
// Note: SyncClusterConnection can be instantiated even when some of the config
// servers are down, so explicitly check that the connection is usable.
if (!conn->isStillConnected()) {
errmsg = str::stream() << "Not all config servers "
<< _primary.toString() << " are reachable";
LOG(1) << errmsg;
return false;
}
if (localCheckOnly) {
conn.done();
return true;
}
// Note: For SyncClusterConnection, gle will only be sent to the first
// node, and it is not even guaranteed to be invoked.
conn->getLastError();
conn.done();
return true;
}
catch (const DBException& excep) {
errmsg = str::stream() << "Not all config servers "
<< _primary.toString() << " are reachable"
<< causedBy(excep);
return false;
}
}
int ConfigServer::dbConfigVersion() {
ScopedDbConnection conn(_primary.getConnString(), 30.0);
int version = dbConfigVersion( conn.conn() );
conn.done();
return version;
}
int ConfigServer::dbConfigVersion( DBClientBase& conn ) {
auto_ptr<DBClientCursor> c = conn.query( "config.version" , BSONObj() );
int version = 0;
if ( c->more() ) {
BSONObj o = c->next();
version = o["version"].numberInt();
uassert( 10189 , "should only have 1 thing in config.version" , ! c->more() );
}
else {
if ( conn.count(ShardType::ConfigNS) || conn.count( DatabaseType::ConfigNS ) ) {
version = 1;
}
}
return version;
}
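// Reads config.settings (currently just the chunk size and balancer settings), seeds a
// default chunk size document if none exists, and ensures the indexes that the config
// collections require.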
void ConfigServer::reloadSettings() {
set<string> got;
try {
ScopedDbConnection conn(_primary.getConnString(), 30.0);
auto_ptr<DBClientCursor> cursor = conn->query(SettingsType::ConfigNS, BSONObj());
verify(cursor.get());
while (cursor->more()) {
BSONObj o = cursor->nextSafe();
string name = o[SettingsType::key()].valuestrsafe();
got.insert( name );
if ( name == "chunksize" ) {
int csize = o[SettingsType::chunksize()].numberInt();
// validate chunksize before proceeding
if ( csize == 0 ) {
// setting was not modified; mark as such
got.erase(name);
log() << "warning: invalid chunksize (" << csize << ") ignored" << endl;
} else {
LOG(1) << "MaxChunkSize: " << csize << endl;
if ( !Chunk::setMaxChunkSizeSizeMB( csize ) ) {
warning() << "invalid chunksize: " << csize << endl;
}
}
}
else if ( name == "balancer" ) {
// ones we ignore here
}
else {
log() << "warning: unknown setting [" << name << "]" << endl;
}
}
conn.done();
}
catch (const DBException& ex) {
warning() << "couldn't load settings on config db" << causedBy(ex);
}
if ( ! got.count( "chunksize" ) ) {
const int chunkSize = Chunk::MaxChunkSize / (1024 * 1024);
Status result = clusterInsert( SettingsType::ConfigNS,
BSON( SettingsType::key("chunksize") <<
SettingsType::chunksize(chunkSize)),
WriteConcernOptions::AllConfigs,
NULL );
if (!result.isOK()) {
warning() << "couldn't set chunkSize on config db" << causedBy(result);
}
}
// indexes
Status result = clusterCreateIndex( ChunkType::ConfigNS,
BSON( ChunkType::ns() << 1 << ChunkType::min() << 1 ),
true, // unique
WriteConcernOptions::AllConfigs,
NULL );
if (!result.isOK()) {
warning() << "couldn't create ns_1_min_1 index on config db" << causedBy(result);
}
result = clusterCreateIndex( ChunkType::ConfigNS,
BSON( ChunkType::ns() << 1 <<
ChunkType::shard() << 1 <<
ChunkType::min() << 1 ),
true, // unique
WriteConcernOptions::AllConfigs,
NULL );
if (!result.isOK()) {
warning() << "couldn't create ns_1_shard_1_min_1 index on config db"
<< causedBy(result);
}
result = clusterCreateIndex( ChunkType::ConfigNS,
BSON( ChunkType::ns() << 1 <<
ChunkType::DEPRECATED_lastmod() << 1 ),
true, // unique
WriteConcernOptions::AllConfigs,
NULL );
if (!result.isOK()) {
warning() << "couldn't create ns_1_lastmod_1 index on config db" << causedBy(result);
}
result = clusterCreateIndex( ShardType::ConfigNS,
BSON( ShardType::host() << 1 ),
true, // unique
WriteConcernOptions::AllConfigs,
NULL );
if (!result.isOK()) {
warning() << "couldn't create host_1 index on config db" << causedBy(result);
}
result = clusterCreateIndex(LocksType::ConfigNS,
BSON(LocksType::lockID() << 1),
false, // unique
WriteConcernOptions::AllConfigs,
NULL);
if (!result.isOK()) {
warning() << "couldn't create lock id index on config db" << causedBy(result);
}
result = clusterCreateIndex( LocksType::ConfigNS,
BSON( LocksType::state() << 1 <<
LocksType::process() << 1 ),
false, // unique
WriteConcernOptions::AllConfigs,
NULL );
if (!result.isOK()) {
warning() << "couldn't create state and process id index on config db"
<< causedBy(result);
}
result = clusterCreateIndex( LockpingsType::ConfigNS,
BSON( LockpingsType::ping() << 1 ),
false, // unique
WriteConcernOptions::AllConfigs,
NULL );
if (!result.isOK()) {
warning() << "couldn't create lockping ping time index on config db"
<< causedBy(result);
}
result = clusterCreateIndex(TagsType::ConfigNS,
BSON(TagsType::ns() << 1 << TagsType::min() << 1),
true, // unique
WriteConcernOptions::AllConfigs,
NULL);
if (!result.isOK()) {
warning() << "could not create index ns_1_min_1: " << causedBy(result);
}
}
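// Returns the given config host name with or without a port, appending the default
// config server port when none was specified.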
string ConfigServer::getHost( const std::string& name , bool withPort ) {
if ( name.find( ":" ) != string::npos ) {
if ( withPort )
return name;
return name.substr( 0 , name.find( ":" ) );
}
if ( withPort ) {
stringstream ss;
ss << name << ":" << ServerGlobalParams::ConfigServerPort;
return ss.str();
}
return name;
}
/* must never throw */
void ConfigServer::logChange( const string& what , const string& ns , const BSONObj& detail ) {
string changeID;
try {
// get this entry's ID so we can use it on the exception code path too
stringstream id;
id << getHostNameCached() << "-" << terseCurrentTime() << "-" << OID::gen();
changeID = id.str();
// send a copy of the message to the log in case it doesn't manage to reach config.changelog
Client* c = currentClient.get();
BSONObj msg = BSON( ChangelogType::changeID(changeID) <<
ChangelogType::server(getHostNameCached()) <<
ChangelogType::clientAddr((c ? c->clientAddress(true) : "N/A")) <<
ChangelogType::time(jsTime()) <<
ChangelogType::what(what) <<
ChangelogType::ns(ns) <<
ChangelogType::details(detail) );
log() << "about to log metadata event: " << msg << endl;
verify( _primary.ok() );
ScopedDbConnection conn(_primary.getConnString(), 30.0);
static bool createdCapped = false;
if ( ! createdCapped ) {
try {
conn->createCollection( ChangelogType::ConfigNS , 1024 * 1024 * 10 , true );
}
catch ( UserException& e ) {
LOG(1) << "couldn't create changelog (like race condition): " << e << endl;
// don't care
}
createdCapped = true;
}
conn.done();
Status result = clusterInsert( ChangelogType::ConfigNS,
msg,
WriteConcernOptions::AllConfigs,
NULL );
if ( !result.isOK() ) {
log() << "Error encountered while logging config change with ID: " << changeID
<< ": " << result.reason() << endl;
}
}
catch ( std::exception& e ) {
// if we got here, it means the config change is only in the log; it didn't make it to config.changelog
log() << "not logging config change: " << changeID << " " << e.what() << endl;
}
}
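// Invoked when a shard's replica set configuration changes; updates that shard's host
// string in config.shards to the new connection string.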
void ConfigServer::replicaSetChange(const string& setName, const string& newConnectionString) {
// This is run in its own thread. Exceptions escaping would result in a call to terminate.
Client::initThread("replSetChange");
try {
Shard s = Shard::lookupRSName(setName);
if (s == Shard::EMPTY) {
LOG(1) << "shard not found for set: " << newConnectionString;
return;
}
Status result = clusterUpdate(ShardType::ConfigNS,
BSON(ShardType::name(s.getName())),
BSON("$set" << BSON(ShardType::host(newConnectionString))),
false, // upsert
false, // multi
WriteConcernOptions::AllConfigs,
NULL);
if ( !result.isOK() ) {
error() << "RSChangeWatcher: could not update config db for set: "
<< setName
<< " to: " << newConnectionString
<< ": " << result.reason() << endl;
}
}
catch (const std::exception& e) {
log() << "caught exception while updating config servers: " << e.what();
}
catch (...) {
log() << "caught unknown exception while updating config servers";
}
}
DBConfigPtr configServerPtr (new ConfigServer());
ConfigServer& configServer = dynamic_cast<ConfigServer&>(*configServerPtr);
}